Compare commits

...

2 Commits

@ -4,6 +4,7 @@ const Promise = require("bluebird");
const consumable = require("@joepie91/consumable"); const consumable = require("@joepie91/consumable");
const defaultValue = require("default-value"); const defaultValue = require("default-value");
const debug = require("debug")("promistream:parallelize"); const debug = require("debug")("promistream:parallelize");
const createRequestMatcher = require("request-matcher");
const isEndOfStream = require("@promistream/is-end-of-stream"); const isEndOfStream = require("@promistream/is-end-of-stream");
const isAborted = require("@promistream/is-aborted"); const isAborted = require("@promistream/is-aborted");
@ -11,10 +12,8 @@ const propagateAbort = require("@promistream/propagate-abort");
const pipe = require("@promistream/pipe"); const pipe = require("@promistream/pipe");
const sequentialize = require("@promistream/sequentialize"); const sequentialize = require("@promistream/sequentialize");
const createRequestQueue = require("./request-queue");
const asyncWhile = require("./async-while"); const asyncWhile = require("./async-while");
function isRejectedWithMarker(promise) { function isRejectedWithMarker(promise) {
if (promise.isRejected()) { if (promise.isRejected()) {
let reason = promise.reason(); let reason = promise.reason();
@ -37,7 +36,7 @@ module.exports = function parallelizeStream(threadCount, options = {}) {
let peekPointer = 0; let peekPointer = 0;
let markerBuffer = consumable([]); let markerBuffer = consumable([]);
let requestQueue = createRequestQueue({ let requestMatcher = createRequestMatcher({
onMatch: () => { onMatch: () => {
tryStartFilling(); tryStartFilling();
} }
@ -52,7 +51,7 @@ module.exports = function parallelizeStream(threadCount, options = {}) {
return false; return false;
} else if (ended) { } else if (ended) {
// Special case: never optimistically read when the stream has been ended, because it will lead to an unnecessary read // Special case: never optimistically read when the stream has been ended, because it will lead to an unnecessary read
let canRead = (requestQueue.requestCount() > 0); let canRead = (requestMatcher.requestCount() > 0);
debug(` [filling] stream ended; is there a request in queue? ${canRead}`); debug(` [filling] stream ended; is there a request in queue? ${canRead}`);
return canRead; return canRead;
@ -87,7 +86,7 @@ module.exports = function parallelizeStream(threadCount, options = {}) {
filling = false; filling = false;
}).catch((error) => { }).catch((error) => {
debug(` [filling] error:`, error); debug(` [filling] error:`, error);
requestQueue.failAllRequests(error); requestMatcher.failAllRequests(error);
}); });
} }
} }
@ -116,7 +115,7 @@ module.exports = function parallelizeStream(threadCount, options = {}) {
if (ordered) { if (ordered) {
// In ordered mode, (a Promise for) the result is reported to the queue immediately so that it gets matched to a read request in-order // In ordered mode, (a Promise for) the result is reported to the queue immediately so that it gets matched to a read request in-order
debug("[response] pushed in-flight read operation"); debug("[response] pushed in-flight read operation");
requestQueue.pushResponse(readOperation); requestMatcher.pushResponse(readOperation);
// Unhandled rejection silencer // Unhandled rejection silencer
readOperation.catch(noop); readOperation.catch(noop);
@ -129,12 +128,12 @@ module.exports = function parallelizeStream(threadCount, options = {}) {
markerBuffer.peek().push(readOperation); markerBuffer.peek().push(readOperation);
} else { } else {
debug("[response] pushed read result"); debug("[response] pushed read result");
requestQueue.pushResponse(readOperation); requestMatcher.pushResponse(readOperation);
} }
if (ended === true && threadsRunning === 0 && markerBuffer.peek().length > 0) { if (ended === true && threadsRunning === 0 && markerBuffer.peek().length > 0) {
for (let marker of markerBuffer.replace([])) { for (let marker of markerBuffer.replace([])) {
requestQueue.pushResponse(marker); requestMatcher.pushResponse(marker);
} }
} }
}).catch(noop); // Unhandled rejection silencer }).catch(noop); // Unhandled rejection silencer
@ -160,7 +159,7 @@ module.exports = function parallelizeStream(threadCount, options = {}) {
peek: async function (source) { peek: async function (source) {
debug("[peek] requested"); debug("[peek] requested");
if (requestQueue.responseCount() > peekPointer) { if (requestMatcher.responseCount() > peekPointer) {
peekPointer += 1; peekPointer += 1;
return true; return true;
@ -179,7 +178,7 @@ module.exports = function parallelizeStream(threadCount, options = {}) {
} }
// We leave it up to the request queue to match a read request to a result - we don't assume that it will be the next read, because in unordered mode reads can complete out-of-order // We leave it up to the request queue to match a read request to a result - we don't assume that it will be the next read, because in unordered mode reads can complete out-of-order
let request = requestQueue.pushRequest(); let request = requestMatcher.pushRequest();
debug("[request] started"); debug("[request] started");
// NOTE: This should always happen *after* creating a request in the queue, to ensure correct behaviour when the stream has ended; otherwise canStartRead may be called while the requestCount is still 0, and it will fail to initiate a read to satisfy the request // NOTE: This should always happen *after* creating a request in the queue, to ensure correct behaviour when the stream has ended; otherwise canStartRead may be called while the requestCount is still 0, and it will fail to initiate a read to satisfy the request

@ -1,6 +1,6 @@
{ {
"name": "@promistream/parallelize", "name": "@promistream/parallelize",
"version": "0.1.2", "version": "0.1.3",
"main": "index.js", "main": "index.js",
"repository": "http://git.cryto.net/promistream/parallelize.git", "repository": "http://git.cryto.net/promistream/parallelize.git",
"author": "Sven Slootweg <admin@cryto.net>", "author": "Sven Slootweg <admin@cryto.net>",
@ -15,7 +15,8 @@
"bluebird": "^3.5.4", "bluebird": "^3.5.4",
"debug": "^4.1.1", "debug": "^4.1.1",
"default-value": "^1.0.0", "default-value": "^1.0.0",
"p-defer": "^3" "p-defer": "^3",
"request-matcher": "^0.1.0"
}, },
"devDependencies": { "devDependencies": {
"@promistream/collect": "^0.1.1", "@promistream/collect": "^0.1.1",

@ -1,63 +0,0 @@
"use strict";
const Promise = require("bluebird");
const pDefer = require("p-defer");
module.exports = function createRequestQueue(options = {}) {
let requestBuffer = [];
let responseBuffer = [];
let seenError;
function failAllRequests(error) {
let failedRequests = requestBuffer;
requestBuffer = [];
seenError = error;
for (let request of failedRequests) {
request.reject(error);
}
}
function maybeOnMatch() {
if (options.onMatch != null) {
return Promise.try(() => {
return options.onMatch();
}).catch((error) => {
failAllRequests(error);
});
}
}
return {
pushRequest: function () {
if (responseBuffer.length > 0) {
let returnValue = Promise.resolve(responseBuffer.shift());
maybeOnMatch();
return returnValue;
} else if (seenError !== undefined) {
return Promise.reject(seenError);
} else {
let { resolve, reject, promise } = pDefer();
requestBuffer.push({ resolve, reject });
return promise;
}
},
pushResponse: function (response) {
if (requestBuffer.length > 0) {
let request = requestBuffer.shift();
request.resolve(response);
maybeOnMatch();
} else {
responseBuffer.push(response);
}
},
failAllRequests: failAllRequests,
requestCount: function () {
return requestBuffer.length;
},
responseCount: function () {
return responseBuffer.length;
}
};
};

@ -621,6 +621,16 @@ p-defer@^3, p-defer@^3.0.0:
resolved "https://registry.yarnpkg.com/p-defer/-/p-defer-3.0.0.tgz#d1dceb4ee9b2b604b1d94ffec83760175d4e6f83" resolved "https://registry.yarnpkg.com/p-defer/-/p-defer-3.0.0.tgz#d1dceb4ee9b2b604b1d94ffec83760175d4e6f83"
integrity sha512-ugZxsxmtTln604yeYd29EGrNhazN2lywetzpKhfmQjW/VJmhpDmWbiX+h0zL8V91R0UXkhb3KtPmyq9PZw3aYw== integrity sha512-ugZxsxmtTln604yeYd29EGrNhazN2lywetzpKhfmQjW/VJmhpDmWbiX+h0zL8V91R0UXkhb3KtPmyq9PZw3aYw==
request-matcher@^0.1.0:
version "0.1.0"
resolved "https://registry.yarnpkg.com/request-matcher/-/request-matcher-0.1.0.tgz#eacbfd7f71698e47c3c46bcc1c38140124e9e11d"
integrity sha512-2wzZ0F6nd3lZAGOfgHXLaOXQ7jVAPBBo7gWUuwERZt1uzXDdJ8Gurl5Agjf2XP+dlw2waC0jIz1I3boH8J0J+w==
dependencies:
"@joepie91/consumable" "^1.0.1"
bluebird "^3.7.2"
default-value "^1.0.0"
p-defer "^3"
split-filter-n@^1.1.2: split-filter-n@^1.1.2:
version "1.1.2" version "1.1.2"
resolved "https://registry.yarnpkg.com/split-filter-n/-/split-filter-n-1.1.2.tgz#268be1ec9c4d93dfb27b030c06165ac1b6f70f66" resolved "https://registry.yarnpkg.com/split-filter-n/-/split-filter-n-1.1.2.tgz#268be1ec9c4d93dfb27b030c06165ac1b6f70f66"

Loading…
Cancel
Save