You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
201 lines
6.4 KiB
JavaScript
201 lines
6.4 KiB
JavaScript
"use strict";
|
|
|
|
const Promise = require("bluebird");
|
|
const consumable = require("@joepie91/consumable");
|
|
const defaultValue = require("default-value");
|
|
const debug = require("debug")("promistream:parallelize");
|
|
const createRequestMatcher = require("request-matcher");
|
|
|
|
const isEndOfStream = require("@promistream/is-end-of-stream");
|
|
const isAborted = require("@promistream/is-aborted");
|
|
const propagateAbort = require("@promistream/propagate-abort");
|
|
const pipe = require("@promistream/pipe");
|
|
const sequentialize = require("@promistream/sequentialize");
|
|
|
|
const asyncWhile = require("./async-while");
|
|
|
|
/**
 * Reports whether a settled read Promise was rejected with a stream marker
 * (an Aborted or EndOfStream marker) rather than with a regular error.
 *
 * Uses synchronous inspection (`isRejected()` / `reason()`), so the argument
 * must be a Promise that exposes those methods (Bluebird does).
 *
 * @param {Promise} promise The Promise to inspect.
 * @returns {boolean} `true` when the Promise is rejected and its rejection reason
 *   is recognized by `isAborted` or `isEndOfStream`; `false` otherwise
 *   (including when the Promise is not rejected at all).
 */
function isRejectedWithMarker(promise) {
	// Only a rejected Promise has a reason to inspect; the reason is not read otherwise
	if (!promise.isRejected()) {
		return false;
	}

	let reason = promise.reason();

	return (isAborted(reason) || isEndOfStream(reason));
}
|
|
|
|
module.exports = function parallelizeStream(threadCount, options = {}) {
|
|
let ordered = defaultValue(options.ordered, true);
|
|
|
|
/* TODO: Does this need a more efficient FIFO queue implementation? */
|
|
let ended = false;
|
|
let parallelMode = true;
|
|
let filling = false;
|
|
let currentSource = null;
|
|
let threadsRunning = 0;
|
|
let peekPointer = 0;
|
|
let markerBuffer = consumable([]);
|
|
|
|
let requestMatcher = createRequestMatcher({
|
|
onMatch: () => {
|
|
tryStartFilling();
|
|
}
|
|
});
|
|
|
|
function canStartRead() {
|
|
let maximumThreads = (parallelMode === true)
|
|
? threadCount
|
|
: 1;
|
|
|
|
if (threadsRunning >= maximumThreads) {
|
|
return false;
|
|
} else if (ended) {
|
|
// Special case: never optimistically read when the stream has been ended, because it will lead to an unnecessary read
|
|
let canRead = (requestMatcher.requestCount() > 0);
|
|
|
|
debug(` [filling] stream ended; is there a request in queue? ${canRead}`);
|
|
return canRead;
|
|
} else {
|
|
return Promise.try(() => {
|
|
return currentSource.peek();
|
|
}).then((dataAvailable) => {
|
|
if (dataAvailable && parallelMode === false && ended === false) {
|
|
switchToParallelMode();
|
|
} else if (!dataAvailable && parallelMode === true) {
|
|
switchToSequentialMode();
|
|
}
|
|
|
|
debug(` [filling] data available upstream: ${dataAvailable}`);
|
|
|
|
return dataAvailable;
|
|
});
|
|
}
|
|
}
|
|
|
|
function tryStartFilling() {
|
|
if (filling === false) {
|
|
debug(` [filling] started`);
|
|
filling = true;
|
|
|
|
Promise.try(() => {
|
|
return asyncWhile(canStartRead, () => {
|
|
return startRead();
|
|
});
|
|
}).then(() => {
|
|
debug(` [filling] completed`);
|
|
filling = false;
|
|
}).catch((error) => {
|
|
debug(` [filling] error:`, error);
|
|
requestMatcher.failAllRequests(error);
|
|
});
|
|
}
|
|
}
|
|
|
|
function startRead() {
|
|
threadsRunning += 1;
|
|
debug(`[read] started (parallel mode = ${parallelMode}, thread count = ${threadCount}, in-flight requests = ${threadsRunning})`);
|
|
|
|
let readOperation = Promise.try(() => {
|
|
return currentSource.read();
|
|
}).finally(() => {
|
|
threadsRunning -= 1;
|
|
debug(`[read] completed (parallel mode = ${parallelMode}, thread count = ${threadCount}, in-flight requests = ${threadsRunning})`);
|
|
}).tapCatch(isEndOfStream, isAborted, () => {
|
|
if (ended === false) {
|
|
debug(" [mode] marking stream as ended");
|
|
ended = true;
|
|
switchToSequentialMode();
|
|
}
|
|
});
|
|
|
|
// This noop is just used to silence unhandled rejection warnings - those get handled by the read request that they are attached to instead. But because they may only actually get attached in a later event loop tick, Bluebird will incorrectly believe them to be 'unhandled'.
|
|
// TODO: Is the assertion that they get handled always true? Is it possible for some read results to never get read out? Will that re-emit a warning elsewhere somehow?
|
|
function noop() {}
|
|
|
|
if (ordered) {
|
|
// In ordered mode, (a Promise for) the result is reported to the queue immediately so that it gets matched to a read request in-order
|
|
debug("[response] pushed in-flight read operation");
|
|
requestMatcher.pushResponse(readOperation);
|
|
|
|
// Unhandled rejection silencer
|
|
readOperation.catch(noop);
|
|
} else {
|
|
// In unordered mode, results are reported to the queue as and when they come in
|
|
readOperation.finally(() => {
|
|
if (isRejectedWithMarker(readOperation)) {
|
|
// We place Aborted/EndOfStream markers in a separate queue in unordered mode; they can occur *before* some other successful reads complete, and we don't want the downstream to prematurely stop reading, so we need to make sure that all non-marker results are processed before throwing the markers downstream.
|
|
debug("[response] pushed read result (to marker buffer)");
|
|
markerBuffer.peek().push(readOperation);
|
|
} else {
|
|
debug("[response] pushed read result");
|
|
requestMatcher.pushResponse(readOperation);
|
|
}
|
|
|
|
if (ended === true && threadsRunning === 0 && markerBuffer.peek().length > 0) {
|
|
for (let marker of markerBuffer.replace([])) {
|
|
requestMatcher.pushResponse(marker);
|
|
}
|
|
}
|
|
}).catch(noop); // Unhandled rejection silencer
|
|
}
|
|
}
|
|
|
|
function switchToParallelMode() {
|
|
debug(" [mode] switching to parallel");
|
|
parallelMode = true;
|
|
|
|
return tryStartFilling();
|
|
}
|
|
|
|
function switchToSequentialMode() {
|
|
debug(" [mode] switching to sequential");
|
|
parallelMode = false;
|
|
}
|
|
|
|
let parallelizer = {
|
|
_promistreamVersion: 0,
|
|
description: `parallelize (${threadCount} threads)`,
|
|
abort: propagateAbort,
|
|
peek: async function (source) {
|
|
debug("[peek] requested");
|
|
|
|
if (requestMatcher.responseCount() > peekPointer) {
|
|
peekPointer += 1;
|
|
|
|
return true;
|
|
} else if (parallelMode === true) {
|
|
return source.peek();
|
|
} else {
|
|
return false;
|
|
}
|
|
},
|
|
read: function (source) {
|
|
return Promise.try(() => {
|
|
currentSource = source;
|
|
|
|
if (peekPointer > 0) {
|
|
peekPointer -= 1;
|
|
}
|
|
|
|
// We leave it up to the request queue to match a read request to a result - we don't assume that it will be the next read, because in unordered mode reads can complete out-of-order
|
|
let request = requestMatcher.pushRequest();
|
|
debug("[request] started");
|
|
|
|
// NOTE: This should always happen *after* creating a request in the queue, to ensure correct behaviour when the stream has ended; otherwise canStartRead may be called while the requestCount is still 0, and it will fail to initiate a read to satisfy the request
|
|
tryStartFilling();
|
|
|
|
return request;
|
|
}).tap(() => {
|
|
debug("[request] satisfied with a value");
|
|
}).tapCatch((error) => {
|
|
debug("[request] satisfied with an error:", error.message);
|
|
});
|
|
}
|
|
};
|
|
|
|
return pipe([
|
|
parallelizer,
|
|
sequentialize()
|
|
]);
|
|
};
|