"use strict";

const Promise = require("bluebird");
const consumable = require("@joepie91/consumable");
const defaultValue = require("default-value");
const debug = require("debug")("promistream:parallelize");
const createRequestMatcher = require("request-matcher");
const isEndOfStream = require("@promistream/is-end-of-stream");
const isAborted = require("@promistream/is-aborted");
const propagateAbort = require("@promistream/propagate-abort");
const pipe = require("@promistream/pipe");
const sequentialize = require("@promistream/sequentialize");
const asyncWhile = require("./async-while");

// Returns true when the given (Bluebird) promise has *already* rejected with one
// of the two stream-control markers (Aborted / EndOfStream). Uses Bluebird's
// synchronous inspection API, so it must only be called on a settled promise's
// state — a still-pending promise simply yields false.
function isRejectedWithMarker(promise) {
	if (promise.isRejected()) {
		let reason = promise.reason();
		return (isAborted(reason) || isEndOfStream(reason));
	} else {
		return false;
	}
}

// Creates a promistream transform stream that issues up to `threadCount`
// concurrent reads against its upstream source, so that slow asynchronous
// upstream pipelines can be saturated.
//
// threadCount: maximum number of in-flight upstream reads while in parallel mode.
// options.ordered (default true): when true, results are handed downstream in
//   the order the reads were *started*; when false, results are handed
//   downstream as they *complete* (out-of-order).
//
// The returned stream is the parallelizer piped into a sequentialize(), which
// (presumably) serializes downstream read calls — TODO confirm against
// @promistream/sequentialize docs.
module.exports = function parallelizeStream(threadCount, options = {}) {
	let ordered = defaultValue(options.ordered, true);
	/* TODO: Does this need a more efficient FIFO queue implementation? */
	// State machine:
	// - ended: set once an upstream read rejects with an EndOfStream/Aborted marker.
	// - parallelMode: true => allow up to threadCount in-flight reads; false => at most 1.
	// - filling: guards tryStartFilling so only one asyncWhile fill loop runs at a time.
	// - currentSource: the upstream source, captured on each read() call.
	// - threadsRunning: count of in-flight upstream reads.
	// - peekPointer: number of buffered responses already "promised" via peek()
	//   but not yet claimed by a read().
	let ended = false;
	let parallelMode = true;
	let filling = false;
	let currentSource = null;
	let threadsRunning = 0;
	let peekPointer = 0;
	// Holds marker rejections (Aborted/EndOfStream) in unordered mode until all
	// successful in-flight reads have been delivered; see startRead below.
	let markerBuffer = consumable([]);
	let requestMatcher = createRequestMatcher({
		onMatch: () => {
			// A request/response pair matched; there may now be room to start
			// another optimistic read.
			tryStartFilling();
		}
	});

	// Decides whether the fill loop may start another upstream read. May return
	// a plain boolean (fast paths) or a Promise of one (peek path); asyncWhile
	// is expected to handle both — TODO confirm against ./async-while.
	function canStartRead() {
		let maximumThreads = (parallelMode === true) ? threadCount : 1;
		if (threadsRunning >= maximumThreads) {
			return false;
		} else if (ended) {
			// Special case: never optimistically read when the stream has been ended, because it will lead to an unnecessary read
			let canRead = (requestMatcher.requestCount() > 0);
			debug(` [filling] stream ended; is there a request in queue? ${canRead}`);
			return canRead;
		} else {
			return Promise.try(() => {
				return currentSource.peek();
			}).then((dataAvailable) => {
				// Peeking doubles as the mode-switch trigger: data appearing
				// upstream flips us back to parallel, data drying up flips us
				// to sequential (one read at a time).
				if (dataAvailable && parallelMode === false && ended === false) {
					switchToParallelMode();
				} else if (!dataAvailable && parallelMode === true) {
					switchToSequentialMode();
				}
				debug(` [filling] data available upstream: ${dataAvailable}`);
				return dataAvailable;
			});
		}
	}

	// Starts the fill loop (a sequence of startRead calls gated by canStartRead)
	// unless one is already running. The `filling` flag makes this idempotent.
	function tryStartFilling() {
		if (filling === false) {
			debug(` [filling] started`);
			filling = true;
			Promise.try(() => {
				return asyncWhile(canStartRead, () => {
					return startRead();
				});
			}).then(() => {
				debug(` [filling] completed`);
				filling = false;
			}).catch((error) => {
				// NOTE(review): `filling` is not reset to false here, so after a
				// fill-loop error no new fill loop can ever start. Presumably
				// intentional because failAllRequests tears the stream down —
				// confirm upstream.
				debug(` [filling] error:`, error);
				requestMatcher.failAllRequests(error);
			});
		}
	}

	// Issues one upstream read and routes its (eventual) result into the
	// request matcher. Note that startRead itself returns synchronously after
	// *starting* the read; the fill loop does not wait for completion.
	function startRead() {
		threadsRunning += 1;
		debug(`[read] started (parallel mode = ${parallelMode}, thread count = ${threadCount}, in-flight requests = ${threadsRunning})`);
		let readOperation = Promise.try(() => {
			return currentSource.read();
		}).finally(() => {
			// Attached first, so the counter is decremented before any of the
			// handlers attached below observe completion.
			threadsRunning -= 1;
			debug(`[read] completed (parallel mode = ${parallelMode}, thread count = ${threadCount}, in-flight requests = ${threadsRunning})`);
		}).tapCatch(isEndOfStream, isAborted, () => {
			// Bluebird predicate-filtered tapCatch: only marker rejections land
			// here; the rejection itself is passed through unchanged.
			if (ended === false) {
				debug(" [mode] marking stream as ended");
				ended = true;
				switchToSequentialMode();
			}
		});
		// This noop is just used to silence unhandled rejection warnings - those get handled by the read request that they are attached to instead. But because they may only actually get attached in a later event loop tick, Bluebird will incorrectly believe them to be 'unhandled'.
		// TODO: Is the assertion that they get handled always true? Is it possible for some read results to never get read out? Will that re-emit a warning elsewhere somehow?
		function noop() {}
		if (ordered) {
			// In ordered mode, (a Promise for) the result is reported to the queue immediately so that it gets matched to a read request in-order
			debug("[response] pushed in-flight read operation");
			requestMatcher.pushResponse(readOperation);
			// Unhandled rejection silencer
			readOperation.catch(noop);
		} else {
			// In unordered mode, results are reported to the queue as and when they come in
			readOperation.finally(() => {
				if (isRejectedWithMarker(readOperation)) {
					// We place Aborted/EndOfStream markers in a separate queue in unordered mode; they can occur *before* some other successful reads complete, and we don't want the downstream to prematurely stop reading, so we need to make sure that all non-marker results are processed before throwing the markers downstream.
					debug("[response] pushed read result (to marker buffer)");
					markerBuffer.peek().push(readOperation);
				} else {
					debug("[response] pushed read result");
					requestMatcher.pushResponse(readOperation);
				}
				// Once the stream has ended and the last in-flight read has
				// settled, flush any buffered markers downstream.
				// `markerBuffer.replace([])` atomically swaps in an empty list
				// and hands back the old contents — TODO confirm against
				// @joepie91/consumable docs.
				if (ended === true && threadsRunning === 0 && markerBuffer.peek().length > 0) {
					for (let marker of markerBuffer.replace([])) {
						requestMatcher.pushResponse(marker);
					}
				}
			}).catch(noop); // Unhandled rejection silencer
		}
	}

	// Re-enter parallel mode and immediately try to (re)start the fill loop.
	function switchToParallelMode() {
		debug(" [mode] switching to parallel");
		parallelMode = true;
		return tryStartFilling();
	}

	// Drop to sequential mode (at most one in-flight read). Does not cancel
	// reads that are already in flight.
	function switchToSequentialMode() {
		debug(" [mode] switching to sequential");
		parallelMode = false;
	}

	// The actual promistream object; piped into sequentialize() below.
	let parallelizer = {
		_promistreamVersion: 0,
		description: `parallelize (${threadCount} threads)`,
		abort: propagateAbort,
		// peek answers "would a read produce a value?"; each `true` answered
		// from the response buffer advances peekPointer so the same buffered
		// response is not promised twice.
		peek: async function (source) {
			debug("[peek] requested");
			if (requestMatcher.responseCount() > peekPointer) {
				peekPointer += 1;
				return true;
			} else if (parallelMode === true) {
				return source.peek();
			} else {
				return false;
			}
		},
		read: function (source) {
			return Promise.try(() => {
				currentSource = source;
				// This read consumes one previously-peeked buffered response,
				// if any were promised.
				if (peekPointer > 0) {
					peekPointer -= 1;
				}
				// We leave it up to the request queue to match a read request to a result - we don't assume that it will be the next read, because in unordered mode reads can complete out-of-order
				let request = requestMatcher.pushRequest();
				debug("[request] started");
				// NOTE: This should always happen *after* creating a request in the queue, to ensure correct behaviour when the stream has ended; otherwise canStartRead may be called while the requestCount is still 0, and it will fail to initiate a read to satisfy the request
				tryStartFilling();
				return request;
			}).tap(() => {
				debug("[request] satisfied with a value");
			}).tapCatch((error) => {
				debug("[request] satisfied with an error:", error.message);
			});
		}
	};

	return pipe([
		parallelizer,
		sequentialize()
	]);
};