"use strict";

// FIXME: Stream reuse via pooling. Instead of spawning a new stream for every value, reuse
// existing streams from a pool, and hold upstream reads instead of sending EndOfStreams.
// This is not only useful for performance; it also ensures that streams don't see an EOS
// (and therefore don't do cleanup) until we're *actually* done with the entire thing -
// this matters especially when dealing with clones that have shared state.

// TODO: Make `clone` an optional part of the stream API, and use that internally in this
// module to instantiate clones as-needed from an individual stream rather than from a
// stream factory. Need to figure out, though, how to do resource management in that case -
// we need to ensure that when the upstream ends, not just the clones but also the original
// input stream receive an EndOfStream signal.

const Promise = require("bluebird");
const pDefer = require("p-defer");
const debug = require("debug")("@promistream/dynamic");
const valueID = require("./value-id");

const pipe = require("@promistream/pipe");
const fromValue = require("@promistream/from-value");
const preReader = require("@promistream/pre-reader");
const isEndOfStream = require("@promistream/is-end-of-stream");
const propagatePeek = require("@promistream/propagate-peek");
const propagateAbort = require("@promistream/propagate-abort");

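// Usage sketch (illustrative only; `parseJsonStream` and `parseCsvStream` are hypothetical
// stream factories, and `fromIterable`/`collect` are assumed to come from their respective
// @promistream packages). The picker function receives each upstream value and decides
// which pipeline that value gets routed through:
//
//     const createDynamicStream = require("@promistream/dynamic");
//
//     pipe([
//         fromIterable(filePaths),
//         createDynamicStream((path) => path.endsWith(".json") ? parseJsonStream : parseCsvStream),
//         collect()
//     ]).read();
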
module.exports = function createDynamicStream(streamPickerFunc) {
	let requestQueue = [];
	let streamQueue = [];
	let isRunning = false;
	let lastSource;

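	// Reads the next value from the stream at the head of the queue. If the queue is
	// empty, a new pipeline is started first; if the head stream turns out to be depleted
	// (EndOfStream), it is dropped and the read is retried on the next stream in line.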
	function attemptStreamRead() {
		return Promise.try(() => {
			if (streamQueue.length === 0) {
				debug("Ran out of streams to read, creating new pipeline");
				return startPipeline();
			}
		}).then(() => {
			return Promise.try(() => {
				return streamQueue[0].read();
			}).catch(isEndOfStream, () => {
				// Current stream is depleted, move on to the next stream in line
				streamQueue.shift();
				return attemptStreamRead();
			});
		});
	}

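	// Drains the request queue one read at a time. `isRunning` guards against multiple
	// concurrent drain loops, while still allowing reads to be queued in parallel from
	// downstream.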
	function tryHandleQueue() {
		// FIXME: It's possible that a pipeline results in *no* items, and so we come up
		// short on pipelines anyway; need to compensate for this by starting a new pipeline
		if (requestQueue.length > 0) {
			if (isRunning === false) {
				// FIXME: Parallelism, locking? We can't sequentialize the entire thing,
				// because we *do* want to accept parallel reads from downstream (so that
				// necessary tasks can be queued behind the scenes); it's just the fetching
				// of values from selected pipelines that we want to sequentialize across
				// the stream queue.
				// Can probably do this with a good ol' recursive loop and an isRunning
				// marker -- is there a way to abstract this out? We seem to be using this
				// pattern in a lot of different places.
				isRunning = true; // Mark the drain loop as active so concurrent calls bail out above
				let resolveRequest = requestQueue.shift();
				let read = attemptStreamRead();

				debug(`Attaching read operation to request ID ${valueID(resolveRequest)}`);
				resolveRequest(read);

				return Promise.try(() => {
					return read;
				}).catch(() => {
					// NOTE: We treat any kind of error as a 'completed read' - the error
					// will come out of the request that the read has been attached to, and
					// that is where the consumer can handle it. We only care *here* about
					// whether it is time to start another read attempt.
				}).then(() => {
					debug(`Completed read for request ID ${valueID(resolveRequest)}`);
					isRunning = false; // Release the marker so the recursive call below isn't a no-op
					return tryHandleQueue();
				});
			}
		} else {
			isRunning = false;
		}
	}

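	// Reads a single value from the upstream source, asks `streamPickerFunc` to select a
	// stream factory for it, and wraps the result in a self-contained pipeline that is
	// pushed onto the stream queue. The explicit `pipeline.read()` call sets the pre-reader
	// in motion, so the pipeline starts producing before a downstream consumer asks for it.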
	function startPipeline() {
		return Promise.try(() => {
			return lastSource.read();
		}).then((value) => {
			let selectedStreamFactory = streamPickerFunc(value);

			let pipeline = pipe([
				fromValue(value),
				selectedStreamFactory(),
				preReader()
			]);

			debug(`Created new pipeline with ID ${valueID(pipeline)}`);

			pipeline.read(); // Set the pre-reader in motion
			streamQueue.push(pipeline);
		});
	}

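	// The stream object exposed to the outside world: a standard Promistream that
	// propagates peek/abort, and whose `read` queues a deferred request which gets
	// resolved with (a promise for) a value from whichever pipeline is next in line.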
	return {
		_promistreamVersion: 0,
		peek: propagatePeek,
		abort: propagateAbort, // TODO: Clean up internal stream pool as well; simulate an abort on those
		read: function (source) {
			debug(`Setting source to ID ${valueID(source)}`);
			lastSource = source;

			let { promise, resolve } = pDefer();
			requestQueue.push(resolve);
			debug(`Queued request ID ${valueID(resolve)}`);

			return Promise.try(() => {
				if (requestQueue.length > streamQueue.length - 1) {
					// There are not as many active streams as there are in-flight requests,
					// which means we may end up not having enough streams to fulfill all
					// requests. We could rectify that once we actually run into that
					// situation, but it would harm performance, as read operations would be
					// started unnecessarily late. Therefore, we queue a new pipeline right
					// away.
					// NOTE: We do "-1" here to compensate for the fact that after the first
					// read, there is likely to be an ended stream in the stream queue that
					// we don't know has ended yet; this code would otherwise always believe
					// that no streams need to be created, and then immediately run into an
					// EndOfStream when attempting to actually *use* the available stream.
					// FIXME: Do we need to make this margin $concurrentReadCount instead of
					// a fixed value of 1?
					// TODO: Queue multiple pipelines, in case one request eats through
					// multiple pipelines due to 0-value streams?
					// NOTE: startPipeline will produce an EndOfStream if the source runs
					// out of values!
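					// Example (illustrative): with 2 in-flight requests and 2 queued
					// streams, one of which may already be silently depleted, 2 > (2 - 1)
					// holds, so we prepare one more pipeline upfront rather than stalling
					// a request later.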
debug("Preparing a pipeline upfront");
|
|
|
|
|
return startPipeline();
|
|
|
|
|
}
|
|
|
|
|
}).then(() => {
|
|
|
|
|
tryHandleQueue();
|
|
|
|
|
return promise;
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
};
|