"use strict"; const Promise = require("bluebird"); const isEndOfStream = require("@promistream/is-end-of-stream"); const isAborted = require("@promistream/is-aborted"); const propagateAbort = require("@promistream/propagate-abort"); const pipe = require("@promistream/pipe"); const sequentialize = require("@promistream/sequentialize"); const defaultValue = require("default-value"); const debug = require("debug")("promistream:parallelize"); const createPromiseListener = require("./promise-listener"); // FIXME: Verify that if an EndOfStream or Aborted marker comes in, it is left queued up until all of the in-flight non-marker results have been processed; otherwise the downstream may erroneously believe that the stream has already ended, while more items are still on their way module.exports = function parallelizeStream(threadCount, options = {}) { let ordered = defaultValue(options.ordered, true); /* TODO: Does this need a more efficient FIFO queue implementation? */ let signals = []; let storedErrors = []; let queueListener = createPromiseListener(); let parallelMode = true; let filling = false; let currentSource = null; function fillRequest() { if (parallelMode === true && signals.length < threadCount) { return Promise.try(() => { return currentSource.peek(); }).then((valueAvailable) => { if (valueAvailable) { queueRead(); return fillRequest(currentSource); } else { return switchToSequentialMode(); } }); } else { debug(`Paused internal queue fill (parallel mode = ${parallelMode}, thread count = ${threadCount}, in-flight requests = ${signals.length})`); filling = false; } } function tryStartFilling() { if (!filling) { debug("Starting internal queue fill..."); filling = true; return fillRequest(currentSource); } } function switchToParallelMode() { debug("Switching to parallel mode"); parallelMode = true; return tryStartFilling(); } function switchToSequentialMode() { debug("Switching to sequential mode"); parallelMode = false; } function bufferNotEmpty() { /* NOTE: This should *only* take into account items that have not been peeked yet! */ let peekedSignal; if (ordered) { /* This will return only if there is a contiguous sequence of settled signals from the start, of which *at least one* has not been peeked yet. */ for (let signal of signals) { if (signal.trackingPromise.isPending()) { break; } else if (signal.peeked) { continue; } else { /* Settled, and not peeked yet. */ peekedSignal = signal; break; } } } else { peekedSignal = signals.find((signal) => { return !(signal.trackingPromise.isPending()) && !signal.peeked; }); } if (peekedSignal != null) { peekedSignal.peeked = true; return true; } else { return false; } } function awaitInFlightRequests() { if (signals.length > 0) { return true; } else { debug("Waiting for queue to be non-empty..."); return Promise.try(() => { return queueListener.listen(); }).tap(() => { debug("Got queue-filled notification"); }).tapCatch((error) => { debug(`Queue listener rejected: ${error.stack}`); }); } } function queueRead() { let promise = Promise.try(() => { return currentSource.read(); }); let signalObject = { promise: promise }; signals.push({ peeked: false, object: signalObject, trackingPromise: Promise.try(() => { return promise.reflect(); }).then(() => { return signalObject; }) }); queueListener.resolve(); } function awaitResult() { return Promise.try(() => { return awaitInFlightRequests(); }).then(() => { debug("Awaiting next finished result..."); if (ordered) { return signals[0].trackingPromise; } else { return Promise.race(signals.map((item) => item.trackingPromise)); } }).then((signalObject) => { let resultPromise = signalObject.promise; signals = signals.filter((signal) => (signal.object !== signalObject)); let isRejected = resultPromise.isRejected(); let isEndOfStream_ = isRejected && isEndOfStream(resultPromise.reason()); let isAborted_ = isRejected && isAborted(resultPromise.reason()); if (isEndOfStream_ || isAborted_) { switchToSequentialMode(); if (signals.length > 0) { /* Throw away the marker, and wait for the next result */ return awaitResult(); } else { /* Queue has been exhausted, so this marker will be the final result; pass it through */ return signalObject.promise; } } else { return signalObject.promise; } }); } let parallelizer = { _promistreamVersion: 0, description: `parallelize (${threadCount} threads)`, abort: propagateAbort, peek: function (source) { return Promise.try(() => { debug("Processing peek..."); if (bufferNotEmpty()) { return true; } else { if (parallelMode === true) { return source.peek(); } else { return false; } } }); }, read: function (source) { return Promise.try(() => { debug("Processing read..."); currentSource = source; if (storedErrors.length > 0) { throw storedErrors.shift(); } else { if (parallelMode) { /* This runs in the background, potentially perpetually */ Promise.try(() => { return tryStartFilling(); }).catch((err) => { queueListener.reject(err); debug(`Error occurred during filling: ${err.stack}`); // storedErrors.push(err); }); return awaitResult(); } else { /* Sequential mode */ if (signals.length > 0) { /* Clear out the remaining in-flight reads from the previous parallel-mode operation, first. */ return awaitResult(); } else { debug("Passing through read to upstream..."); return Promise.try(() => { return source.read(); }).then((result) => { switchToParallelMode(); return result; }); } } } }); } }; return pipe([ parallelizer, sequentialize() ]); };