From 4203bb1d5a890c9f307907b42c2efc99c06a425c Mon Sep 17 00:00:00 2001 From: Sven Slootweg Date: Sat, 27 Mar 2021 19:10:52 +0100 Subject: [PATCH] Initial commit --- .gitignore | 2 ++ index.js | 26 +++++++++++++++++++++++ package.json | 14 +++++++++++++ sequential-queue.js | 51 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 93 insertions(+) create mode 100644 .gitignore create mode 100644 index.js create mode 100644 package.json create mode 100644 sequential-queue.js diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..97008e5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +node_modules +yarn.lock \ No newline at end of file diff --git a/index.js b/index.js new file mode 100644 index 0000000..b084a6d --- /dev/null +++ b/index.js @@ -0,0 +1,26 @@ +"use strict"; + +const propagateAbort = require("@promistream/propagate-abort"); + +const createSequentialQueue = require("./sequential-queue"); + +module.exports = function sequentialize() { + let withQueue = createSequentialQueue(); + + return { + _promistreamVersion: 0, + description: `sequentialize`, + // FIXME: We don't queue up aborts because once a downstream has encountered an error, it may have stopped trying to read, and we would deadlock. While the Aborted marker reads *do* get queued, the abort itself should probably be immediate. Need to make sure that this doesn't clash with any other part of the spec. + abort: propagateAbort, + peek: function peek(source) { + return withQueue(() => { + return source.peek(); + }); + }, + read: function read(source) { + return withQueue(() => { + return source.read(); + }); + } + }; +}; diff --git a/package.json b/package.json new file mode 100644 index 0000000..97348ee --- /dev/null +++ b/package.json @@ -0,0 +1,14 @@ +{ + "name": "@promistream/sequentialize", + "version": "0.1.0", + "main": "index.js", + "repository": "http://git.cryto.net/promistream/sequentialize.git", + "author": "Sven Slootweg ", + "license": "WTFPL OR CC0-1.0", + "dependencies": { + "@joepie91/unreachable": "^1.0.0", + "@promistream/propagate-abort": "^0.1.2", + "bluebird": "^3.5.4", + "p-defer": "^3.0.0" + } +} diff --git a/sequential-queue.js b/sequential-queue.js new file mode 100644 index 0000000..3dbccd2 --- /dev/null +++ b/sequential-queue.js @@ -0,0 +1,51 @@ +"use strict"; + +// FIXME: Make this a stand-alone package; it should be useful as a general-purpose mechanism for sequentializing multiple asynchronous operations that may originate from multiple different callsites (like with read/peek/abort) + +const Promise = require("bluebird"); +const pDefer = require("p-defer"); +const unreachable = require("@joepie91/unreachable")("@promistream/sequentialize"); // FIXME: Change name when moved out into a stand-alone package + +module.exports = function createSequentialQueue() { + /* TODO: Does this need a more efficient FIFO queue implementation? */ + let queue = []; + let processing = false; + + function nextItem() { + if (queue.length > 0) { + let item = queue.shift(); + item(); + } else { + unreachable("Tried to process an item from an empty queue"); + } + } + + function tryStart() { + if (processing === false) { + processing = true; + nextItem(); + } + } + + function markCompleted() { + if (queue.length > 0) { + nextItem(); + } else { + processing = false; + } + } + + return function withQueue(callback) { + let { resolve, promise } = pDefer(); + queue.push(resolve); + tryStart(); + + return Promise.try(() => { + return promise; + }).then(() => { + return callback(); + }).tap(() => { + markCompleted(); + }); + }; +}