From 4842920469605968ee84f648f15fc8376c191f4a Mon Sep 17 00:00:00 2001 From: Sven Slootweg Date: Tue, 6 Jul 2021 01:24:17 +0200 Subject: [PATCH] Move request-queue into standalone package --- index.js | 19 +++++++-------- package.json | 3 ++- request-queue.js | 63 ------------------------------------------------ yarn.lock | 10 ++++++++ 4 files changed, 21 insertions(+), 74 deletions(-) delete mode 100644 request-queue.js diff --git a/index.js b/index.js index 6d80ef9..4385dcf 100644 --- a/index.js +++ b/index.js @@ -4,6 +4,7 @@ const Promise = require("bluebird"); const consumable = require("@joepie91/consumable"); const defaultValue = require("default-value"); const debug = require("debug")("promistream:parallelize"); +const createRequestMatcher = require("request-matcher"); const isEndOfStream = require("@promistream/is-end-of-stream"); const isAborted = require("@promistream/is-aborted"); @@ -11,10 +12,8 @@ const propagateAbort = require("@promistream/propagate-abort"); const pipe = require("@promistream/pipe"); const sequentialize = require("@promistream/sequentialize"); -const createRequestQueue = require("./request-queue"); const asyncWhile = require("./async-while"); - function isRejectedWithMarker(promise) { if (promise.isRejected()) { let reason = promise.reason(); @@ -37,7 +36,7 @@ module.exports = function parallelizeStream(threadCount, options = {}) { let peekPointer = 0; let markerBuffer = consumable([]); - let requestQueue = createRequestQueue({ + let requestMatcher = createRequestMatcher({ onMatch: () => { tryStartFilling(); } @@ -52,7 +51,7 @@ module.exports = function parallelizeStream(threadCount, options = {}) { return false; } else if (ended) { // Special case: never optimistically read when the stream has been ended, because it will lead to an unnecessary read - let canRead = (requestQueue.requestCount() > 0); + let canRead = (requestMatcher.requestCount() > 0); debug(` [filling] stream ended; is there a request in queue? ${canRead}`); return canRead; @@ -87,7 +86,7 @@ module.exports = function parallelizeStream(threadCount, options = {}) { filling = false; }).catch((error) => { debug(` [filling] error:`, error); - requestQueue.failAllRequests(error); + requestMatcher.failAllRequests(error); }); } } @@ -116,7 +115,7 @@ module.exports = function parallelizeStream(threadCount, options = {}) { if (ordered) { // In ordered mode, (a Promise for) the result is reported to the queue immediately so that it gets matched to a read request in-order debug("[response] pushed in-flight read operation"); - requestQueue.pushResponse(readOperation); + requestMatcher.pushResponse(readOperation); // Unhandled rejection silencer readOperation.catch(noop); @@ -129,12 +128,12 @@ module.exports = function parallelizeStream(threadCount, options = {}) { markerBuffer.peek().push(readOperation); } else { debug("[response] pushed read result"); - requestQueue.pushResponse(readOperation); + requestMatcher.pushResponse(readOperation); } if (ended === true && threadsRunning === 0 && markerBuffer.peek().length > 0) { for (let marker of markerBuffer.replace([])) { - requestQueue.pushResponse(marker); + requestMatcher.pushResponse(marker); } } }).catch(noop); // Unhandled rejection silencer @@ -160,7 +159,7 @@ module.exports = function parallelizeStream(threadCount, options = {}) { peek: async function (source) { debug("[peek] requested"); - if (requestQueue.responseCount() > peekPointer) { + if (requestMatcher.responseCount() > peekPointer) { peekPointer += 1; return true; @@ -179,7 +178,7 @@ module.exports = function parallelizeStream(threadCount, options = {}) { } // We leave it up to the request queue to match a read request to a result - we don't assume that it will be the next read, because in unordered mode reads can complete out-of-order - let request = requestQueue.pushRequest(); + let request = requestMatcher.pushRequest(); debug("[request] started"); // NOTE: This should always happen *after* creating a request in the queue, to ensure correct behaviour when the stream has ended; otherwise canStartRead may be called while the requestCount is still 0, and it will fail to initiate a read to satisfy the request diff --git a/package.json b/package.json index e28ff52..3415aa8 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,8 @@ "bluebird": "^3.5.4", "debug": "^4.1.1", "default-value": "^1.0.0", - "p-defer": "^3" + "p-defer": "^3", + "request-matcher": "^0.1.0" }, "devDependencies": { "@promistream/collect": "^0.1.1", diff --git a/request-queue.js b/request-queue.js deleted file mode 100644 index 255173b..0000000 --- a/request-queue.js +++ /dev/null @@ -1,63 +0,0 @@ -"use strict"; - -const Promise = require("bluebird"); -const pDefer = require("p-defer"); - -module.exports = function createRequestQueue(options = {}) { - let requestBuffer = []; - let responseBuffer = []; - let seenError; - - function failAllRequests(error) { - let failedRequests = requestBuffer; - requestBuffer = []; - - seenError = error; - - for (let request of failedRequests) { - request.reject(error); - } - } - - function maybeOnMatch() { - if (options.onMatch != null) { - return Promise.try(() => { - return options.onMatch(); - }).catch((error) => { - failAllRequests(error); - }); - } - } - - return { - pushRequest: function () { - if (responseBuffer.length > 0) { - let returnValue = Promise.resolve(responseBuffer.shift()); - maybeOnMatch(); - return returnValue; - } else if (seenError !== undefined) { - return Promise.reject(seenError); - } else { - let { resolve, reject, promise } = pDefer(); - requestBuffer.push({ resolve, reject }); - return promise; - } - }, - pushResponse: function (response) { - if (requestBuffer.length > 0) { - let request = requestBuffer.shift(); - request.resolve(response); - maybeOnMatch(); - } else { - responseBuffer.push(response); - } - }, - failAllRequests: failAllRequests, - requestCount: function () { - return requestBuffer.length; - }, - responseCount: function () { - return responseBuffer.length; - } - }; -}; diff --git a/yarn.lock b/yarn.lock index 683bed0..d3986de 100644 --- a/yarn.lock +++ b/yarn.lock @@ -621,6 +621,16 @@ p-defer@^3, p-defer@^3.0.0: resolved "https://registry.yarnpkg.com/p-defer/-/p-defer-3.0.0.tgz#d1dceb4ee9b2b604b1d94ffec83760175d4e6f83" integrity sha512-ugZxsxmtTln604yeYd29EGrNhazN2lywetzpKhfmQjW/VJmhpDmWbiX+h0zL8V91R0UXkhb3KtPmyq9PZw3aYw== +request-matcher@^0.1.0: + version "0.1.0" + resolved "https://registry.yarnpkg.com/request-matcher/-/request-matcher-0.1.0.tgz#eacbfd7f71698e47c3c46bcc1c38140124e9e11d" + integrity sha512-2wzZ0F6nd3lZAGOfgHXLaOXQ7jVAPBBo7gWUuwERZt1uzXDdJ8Gurl5Agjf2XP+dlw2waC0jIz1I3boH8J0J+w== + dependencies: + "@joepie91/consumable" "^1.0.1" + bluebird "^3.7.2" + default-value "^1.0.0" + p-defer "^3" + split-filter-n@^1.1.2: version "1.1.2" resolved "https://registry.yarnpkg.com/split-filter-n/-/split-filter-n-1.1.2.tgz#268be1ec9c4d93dfb27b030c06165ac1b6f70f66"