You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

54 lines
2.7 KiB
JavaScript

"use strict";
const range = require("range").range;
const derivedStream = require("@promistream/derived-stream");
const { validateArguments } = require("@validatem/core");
const required = require("@validatem/required");
const isInteger = require("@validatem/is-integer");
const isPositive = require("@validatem/is-positive");
const isFunction = require("@validatem/is-function");
// TODO: Think about when exactly an EOS/error should be passed through to all pending stream reads
const indexedQueue = require("./indexed-queue");
/* Strategies:
- Optimize for speed: Each attempted read from a fork triggers upstream reads until one is sorted into that fork. Other forks that values were sorted into, will buffer those values up for their next reads (which will not cause additional upstream reads). This may cause buffers to grow large, if not all of the fork streams are read simultaneously.
- Optimize for resources: Each attempted read from a fork triggers one upstream read, sorting it into the correct queue. That may not be the queue that was read from. Eventually the amount of reads should equal the amount of produced values, so it should all balance out in the end, but it's possible that one fork may need to wait on another to call for more data. This may deadlock, if not all of the fork streams are read simultaneously.
NOTE: Need to make sure this is order-preserving, ie. stream read results are processed in the order that they were originally requested, not in the order that they succeed in. This is because we don't know what fork a value will be sorted into until we have the result from the callback, so we can't know whether something will be the 'first' value for a given fork until *all* of the preceding values have been completely evaluated. Maybe have an option to disable order preservation, for faster operation?
*/
module.exports = function createFilterFork(_streamCount, _callback) {
let [ streamCount, callback ] = validateArguments(arguments, {
streamCount: [ required, isInteger, isPositive ],
callback: [ required, isFunction ]
});
let withQueue = indexedQueue(streamCount, callback);
return derivedStream((source) => {
return range(0, streamCount).map((streamIndex) => {
return {
_promistreamVersion: 0,
_promistreamIsSource: true,
description: `Filter stream fork (${streamIndex})`,
abort: function (reason) {
return source.abort(reason);
},
peek: function () {
// FIXME: Check that this is correct, and whether peek in fork-round-robin needs to also be changed to *not* use withQueue
return source.peek();
},
read: async function () {
return withQueue(streamIndex, () => {
return source.read();
});
}
};
});
});
};