commit cd732b7447668885ded63ad66773185562cfe34f Author: Sven Slootweg Date: Wed Jun 19 00:42:35 2024 +0200 Initial version diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3c3629e --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +node_modules diff --git a/README.md b/README.md new file mode 100644 index 0000000..dd025ca --- /dev/null +++ b/README.md @@ -0,0 +1,83 @@ +# push-buffer + +An abstraction for handling buffering of asynchronous pull-based and push-based operations. + +- Request a value from the buffer, receive a Promise. +- Depending on mode of operation, values are either acquired through an asynchronous callback (pull), or provided externally at a later point (push). +- Values that come in when there are no outstanding request are buffered, to be reconciled with later requests; ie. the abstraction matches both requests *and* results, and reconciles them in an order-preserving manner. +- Support for dividing values into 'lanes'; ie. independent request queues where values get assigned to specific lanes based on some predicate function, for implementing complex value distribution patterns. +- Supports [@promistream/NoValue](https://www.npmjs.com/package/@promistream/no-value) to explicitly represent cases where no value is produced; this means you can send `null` and `undefined` to a queue just like any other value. +- Values and errors can be sent to one lane, multiple lanes (multicast), or all lanes (broadcast). + +## Typical usecases + +- Implementing [Promistreams](https://promistream.cryto.net/), particularly forking streams. +- Implementing asynchronous task queues and task distribution systems. +- Splitting out asynchronously obtain values into different worker threads. +- Converting push-based APIs (eg. EventEmitters) into pull-based APIs (asynchronous iterators, Promistreams, pull-streams, etc.) + +## Caveats + +- The internal queues are unbounded, and so may use an arbitrary amount of memory. You should ensure that you are continuously requesting values from all lanes, so that they never become too full. +- Likewise, there is nothing that prevents results from sitting in a lane forever and never actually being read, even when the process terminates; you must continuously drive reads to ensure you are not missing any values. +- The buffer can currently only operate in either push *or* pull mode, because there are no obviously-correct semantics for a combined mode; if you still need a combined mode, please file an issue describing your exact usecase, so that I can better understand what the real-world requirements of such cases are. +- While using this abstraction is much less error-prone than manually implementing asynchronous reconciliation, it is still somewhat difficult to get right, and very difficult to debug when you don't; if possible, prefer using higher-level off-the-shelf libraries for your usecase instead. + +## Lanes + +This library supports a concept of 'lanes'. Each lane is an independent queue of values and errors; you make a request for a value to a specific lane, and values and errors are assigned *to* specific lane, even though the source of values is shared among all of them. Additionally, it is possible to broadcast errors or values to all lanes. + +If you just want to convert a push-based API to a pull-based API, you probably won't need lanes, and you can safely ignore all options related to it. However, if you are implementing some kind of value or task distribution mechanism, you will probably want to use lanes to ensure that the right values end up at the right place. It's up to you what to do with the lane assignments; they're just a list of zero-indexed queues internally, and you can map the lane indexes to some sort of other lookup table in your own code if needed. + +If you are implementing a forking Promistream, the most useful lane configuration will most likely be: one lane per output stream, with error broadcasts enabled. + +## Example + +## API + +### let buffer = pushBuffer(options) + +Note that using lanes is __optional__; all options and functions default to lane 0, which is the only lane when no lane count is specified (and so it functions as if lanes didn't exist). + +- __options:__ The settings for this pushBuffer. + - __mode:__ *Default: `"pull"`.* The mode to operate in. One of `"push"`, `"pull"`. + - __lanes__ *Default: `1`.* The amount of lanes to create. Each lane is an independent queue of values and errors, though broadcasts are possible. + - __pull:__ *Required only in pull mode.* An (`async`) callback that's called to acquire a new value/result, and which is expected to return a Promise. This should be providing your values in pull mode. + - __select:__ *Default: `0`.* A callback that receives a value, and returns the index of the lane to assign it to (or an array of such indexes). The callback may return a Promise. + - __selectError:__ *Default: `0`.* Like `select`, but it receives errors instead of values. + - __broadcastValues:__ *Default: `false`.* Whether to broadcast all values to all lanes by default. The `select` callback will not be called for broadcast values. In push mode, this can be overridden for individual values. + - __broadcastErrors:__ *Default: `true`.* Like `broadcastValues`, but for errors. The `selectError` callback will not be called for broadcast errors. In push mode, this can be overridden for individual errors. + +__Returns:__ A new pushBuffer. + +## buffer.request(lane) + +Requests the next value (for a given lane). + +- __lane:__ *Default: `0`.* The lane to request a value for. + +__Returns:__ a Promise that will eventually either resolve or reject, depending on the value/error acquired. + +## buffer.push(value, broadcast) + +Pushes a value (or a Promise) to the next pending request. + +- __value:__ *Required.* The value to push. +- __broadcast:__ *Default: the `broadcastValues` setting.* Whether to broadcast this value to all lanes. + +## buffer.pushError(error, broadcast) + +Pushes an error to the next pending request. Note that if you wish to push a Promise that *may* fail (rather than an error you already have), you should use `buffer.push` instead; it will automatically be handled as an error if it ends up rejecting. + +- __error:__ *Required.* The value to push. +- __broadcast:__ *Default: the `broadcastErrors` setting.* Whether to broadcast this error to all lanes. + +## buffer.countLane(lane) + +Provides queue lengths for the given lane. + +- __lane:__ *Default: `0`.* The lane to request a value for. + +__Returns:__ an object with queue length properties: +- __values:__ The amount of pending results that have not been reconciled with a request yet - note that this includes errors/rejections! +- __requests:__ The amount of pending requests that have not been reconciled with a result yet. diff --git a/example.js b/example.js new file mode 100644 index 0000000..ae7eb3b --- /dev/null +++ b/example.js @@ -0,0 +1,23 @@ +"use strict"; + +const pushBuffer = require("./"); + +function promiseDelay(time) { + return new Promise((resolve, reject) => { + setTimeout(resolve, time); + }); +} + +(async function () { + let i = 0; + let buffer = pushBuffer({ + lanes: 2, + pull: async () => promiseDelay(Math.random() * 50).then(() => i++), + select: (value) => value % 2 + }); + + for (let n = 0; n < 100; n++) { + buffer.request(n % 2).then((value) => console.log(`[lane ${n % 2}] ${n} -> ${value}`)) + await promiseDelay(Math.random() * 50); + } +})(); diff --git a/index.js b/index.js new file mode 100644 index 0000000..c60a78a --- /dev/null +++ b/index.js @@ -0,0 +1,288 @@ +"use strict"; + +const capturePromise = require("capture-promise"); +const singleConcurrent = require("single-concurrent"); +const assureArray = require("assure-array"); +const promiseDefer = require("@joepie91/promise-defer"); +const debug = require("debug")("push-buffer"); +const NoValue = require("@promistream/no-value"); + +const { validateOptions } = require("@validatem/core"); +const required = require("@validatem/required"); +const isInteger = require("@validatem/is-integer"); +const isFunction = require("@validatem/is-function"); +const defaultTo = require("@validatem/default-to"); +const oneOf = require("@validatem/one-of"); +const dynamic = require("@validatem/dynamic"); +const isBoolean = require("@validatem/is-boolean"); + +/* TODO: +- Add limits for buffer size? This is difficult to implement since we don't know upfront what lane a value will be assigned to, and buffers are ideally per-lane. +*/ + +function suppressUncaughtRejection(promise) { + promise.catch((error) => { + // We do log the suppressed error, for debugging purposes + debug("error temporarily suppressed;", error); + }); + + return promise; +} + +module.exports = function createPushBuffer(_options) { + function requiredIfMode(acceptableModes) { + return dynamic((_value) => { + // This is a bit hacky, but currently necessary to make it work how we want + let mode = _options.mode ?? "pull"; + let acceptableArray = assureArray(acceptableModes); + + if (acceptableArray.includes(mode)) { + return [ required ]; + } else { + return []; + } + }); + } + + let { lanes, pull, select, selectError, mode, broadcastErrors, broadcastValues } = validateOptions(arguments, { + lanes: [ defaultTo(1), isInteger ], + // Currently only either push or pull mode is supported; if you need both at the same time, file an issue! + mode: [ defaultTo("pull"), oneOf([ "push", "pull" ]) ], + pull: [ requiredIfMode("pull"), isFunction ], + select: [ defaultTo.literal((_value) => 0), isFunction ], + selectError: [ defaultTo.literal((_value) => 0), isFunction ], + broadcastValues: [ defaultTo(false), isBoolean ], + broadcastErrors: [ defaultTo(true), isBoolean ], + // append == add after all pending pulls (including pushed promises!), insert == send to outstanding requests immediately (so basically insert *before* pending pulls) + }); + + let pushOrder = "append"; // Temporary; this infrastructure exists to support a potential later combined mode + + let pullQueue = []; + let settlementMap = new WeakMap(); + + let valueQueues = []; + let requestQueues = []; + + for (let i = 0; i < lanes; i++) { + valueQueues[i] = []; + requestQueues[i] = []; + } + + function getLane(lane) { + if (lane < lanes) { + return { + values: valueQueues[lane], + requests: requestQueues[lane] + }; + } else { + throw new Error(`Tried to access lane ${lane}, but only lanes 0-${lanes-1} exist`); + } + } + + function storePull(pullPromise) { + settlementMap.set(pullPromise, false); + pullQueue.push(pullPromise); + debug("queued pull"); + + let actualPromise = (pullPromise.__preset === true) + ? pullPromise.promise + : pullPromise; + + // Note that we are storing the *original* pullPromise for later use, and not this 'notification chain' below, because otherwise we would be swallowing errors. + suppressUncaughtRejection(actualPromise.finally(() => { + settlementMap.set(pullPromise, true); + debug("marked pull as settled"); + + tryRunReconciliation(); + })); + } + + function doPull() { + let pullPromise = capturePromise(() => pull()); + storePull(pullPromise); + } + + let tryRunReconciliation = singleConcurrent(async function reconcile() { + if (pullQueue.length === 0) { + debug("reconciliation: empty pullQueue; finishing loop"); + + let laneLengths = requestQueues.map((queue) => queue.length); + let longestQueue = Math.max(... laneLengths); + + if (longestQueue > 0) { + debug(`there are still requests pending; kicking off ${longestQueue} new pulls to satisfy longest queue`); + debug(` lane lengths:`, laneLengths); + + for (let i = 0; i < longestQueue; i++) { + doPull(); + } + } + + return; + } + + let nextPromise = pullQueue[0]; + + if (settlementMap.get(nextPromise) !== true) { + debug(`reconciliation: nextPromise not settled yet; finishing loop`); + return; + } + + debug(`reconciliation: nextPromise settled`); + pullQueue.shift(); // Remove it from the queue + + let value; + let broadcast = (nextPromise.__preset === true) ? nextPromise.broadcast : undefined; + let promise = (nextPromise.__preset === true) ? nextPromise.promise : nextPromise; + + try { + value = await promise; + } catch (error) { + await sendError(error, broadcast); + return await reconcile(); + } + + if (value !== NoValue) { + await sendValue(value, broadcast); + } + + return await reconcile(); + }); + + async function sendValue(value, broadcast = broadcastValues) { + if (broadcast) { + return sendValueBroadcast(value); + } else { + return sendValueLane(value); + } + } + + async function sendValueLane(value) { + let lane = await select(value); + debug(`reconciliation: result: resolved, allocated to lane ${lane}`); + + if (Array.isArray(lane)) { + for (let i of lane) { + sendToLane(i, Promise.resolve(value)); + } + } else { + sendToLane(lane, Promise.resolve(value)); + } + } + + async function sendValueBroadcast(value) { + debug(`reconciliation: result: resolved, broadcasting`); + + requestQueues.forEach((queue, i) => { + debug(`reconciliation: (sending to lane ${i})`); + sendToLane(i, Promise.resolve(error)); + }); + } + + async function sendError(value, broadcast = broadcastErrors) { + if (broadcast) { + return sendErrorBroadcast(value); + } else { + return sendErrorLane(value); + } + } + + async function sendErrorLane(error) { + let lane = await selectError(error); + debug(`reconciliation: result: rejected, allocated to lane ${lane}`); + + if (Array.isArray(lane)) { + for (let i of lane) { + sendToLane(i, suppressUncaughtRejection(Promise.reject(value))); + } + } else { + sendToLane(lane, suppressUncaughtRejection(Promise.reject(value))); + } + } + + async function sendErrorBroadcast(error) { + debug(`reconciliation: result: rejected, broadcasting`); + + requestQueues.forEach((queue, i) => { + debug(`reconciliation: (sending to lane ${i})`); + sendToLane(i, suppressUncaughtRejection(Promise.reject(error))); + }); + } + + // We use Promises as a Result type of sorts here, which is fine since this is all async anyway + function sendToLane(lane, promise) { + let { requests, values } = getLane(lane); + + debug(`sendToQueue: lane ${lane}; request = ${requests.length}, values = ${values.length}`); + + if (requests.length > 0) { + debug(`sendToQueue: satisfying request`); + let request = requests.shift(); + request.resolve(promise); + } else { + debug(`sendToQueue: storing in value buffer`); + values.push(promise); + } + } + + return { + request: function (lane = 0) { + debug(`value requested from lane ${lane}`); + let { values, requests } = getLane(lane); + + if (values.length > 0) { + debug(`value immediately available; reconciling`); + return Promise.resolve(values.shift()); + } else { + let defer = promiseDefer(); + requests.push(defer); + + if (mode === "pull") { + // We don't need to handle errors in doPull because errors in the actual pull callback are propagated through the defer. Errors while *initiating* the pull should be thrown from here (which happens automatically because `doPull` is itself synchronous). + doPull(); + } + + return defer.promise; + } + }, + push: async function (value, broadcast) { + if (mode === "push") { + debug(`reconciliation: value pushed`); + + if (pushOrder === "insert") { + await sendValue(value, broadcast); + } else if (pushOrder === "append") { + storePull({ __preset: true, broadcast: broadcast, promise: Promise.resolve(value) }); + } else { + throw new Error(`Unreachable`); + } + } else { + throw new Error(`This buffer is operating in pull mode; pushing values is not supported. Switch to push mode instead, if this is what you need.`); + } + }, + pushError: async function (error, broadcast) { + if (mode === "push") { + debug(`reconciliation: error pushed`); + + if (pushOrder === "insert") { + await sendError(error, broadcast); + } else if (pushOrder === "append") { + storePull({ __preset: true, broadcast: broadcast, promise: Promise.reject(error) }); + } else { + throw new Error(`Unreachable`); + } + } else { + throw new Error(`This buffer is operating in pull mode; pushing errors is not supported. Switch to push mode instead, if this is what you need.`); + } + }, + countLane: function (lane = 0) { + let { values, requests } = getLane(lane); + + return { + values: values.length, + requests: requests.length + }; + } + }; +}; diff --git a/package.json b/package.json new file mode 100644 index 0000000..4555cb7 --- /dev/null +++ b/package.json @@ -0,0 +1,35 @@ +{ + "name": "push-buffer", + "version": "1.0.0", + "description": "Abstraction for converting between push/pull APIs, and managing asynchronous value distribution", + "main": "index.js", + "files": [ + "index.js", + "example.js", + "test.js", + "README.md" + ], + "repository": "http://git.cryto.net/joepie91/push-buffer.git", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": ["buffer", "async", "promises", "push", "pull", "queue", "streams"], + "author": "Sven Slootweg ", + "license": "WTFPL OR CC0-1.0", + "dependencies": { + "@joepie91/promise-defer": "^1.0.1", + "@promistream/no-value": "^1.0.0", + "@validatem/core": "^0.5.0", + "@validatem/default-to": "^0.1.0", + "@validatem/dynamic": "^0.1.2", + "@validatem/is-boolean": "^0.1.1", + "@validatem/is-function": "^0.1.0", + "@validatem/is-integer": "^0.1.0", + "@validatem/one-of": "^0.1.1", + "@validatem/required": "^0.1.1", + "assure-array": "^1.0.0", + "capture-promise": "^1.0.0", + "debug": "^4.3.5", + "single-concurrent": "^1.0.0" + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml new file mode 100644 index 0000000..4303920 --- /dev/null +++ b/pnpm-lock.yaml @@ -0,0 +1,453 @@ +lockfileVersion: '6.0' + +settings: + autoInstallPeers: true + excludeLinksFromLockfile: false + +dependencies: + '@joepie91/promise-defer': + specifier: ^1.0.1 + version: 1.0.1 + '@promistream/no-value': + specifier: ^1.0.0 + version: 1.0.0 + '@validatem/core': + specifier: ^0.5.0 + version: 0.5.0 + '@validatem/default-to': + specifier: ^0.1.0 + version: 0.1.0 + '@validatem/dynamic': + specifier: ^0.1.2 + version: 0.1.2 + '@validatem/is-boolean': + specifier: ^0.1.1 + version: 0.1.1 + '@validatem/is-function': + specifier: ^0.1.0 + version: 0.1.0 + '@validatem/is-integer': + specifier: ^0.1.0 + version: 0.1.0 + '@validatem/one-of': + specifier: ^0.1.1 + version: 0.1.1 + '@validatem/required': + specifier: ^0.1.1 + version: 0.1.1 + assure-array: + specifier: ^1.0.0 + version: 1.0.0 + capture-promise: + specifier: ^1.0.0 + version: 1.0.0 + debug: + specifier: ^4.3.5 + version: 4.3.5 + single-concurrent: + specifier: ^1.0.0 + version: 1.0.0 + +packages: + + /@joepie91/promise-defer@1.0.1: + resolution: {integrity: sha512-yjDrKMpO155Ajmk+r3RG4WNGbrwd4Mj8UHqJtBSoIL9usT6cC0jhy1yn7dzJiKmYFxvCvgxDbjcWNFNCwQ9UUw==} + dev: false + + /@promistream/no-value@1.0.0: + resolution: {integrity: sha512-PY/1gj60YKRRpzLY0UyM/b1OzXZS+5lvrVMrU3BzjTDv31P0EaCAb2X39DKQAfoX0ygiXOHewCaLVX1/+IZsTA==} + dev: false + + /@validatem/annotate-errors@0.1.2: + resolution: {integrity: sha512-EuX7pzdYI/YpTmZcgdPG481Oi3elAg8JWh/LYXuE1h6MaZk3A8eP5DD33/l7EoKzrysn6y8nCsqNa1ngei562w==} + dependencies: + '@validatem/match-validation-error': 0.1.0 + dev: false + + /@validatem/any-property@0.1.3: + resolution: {integrity: sha512-jYWxif5ff9pccu7566LIQ/4+snlApXEJUimBywzAriBgS3r4eDBbz3oZFHuiPmhxNK/NNof5YUS+L6Sk3zaMfg==} + dependencies: + '@validatem/annotate-errors': 0.1.2 + '@validatem/combinator': 0.1.2 + '@validatem/error': 1.1.0 + '@validatem/validation-result': 0.1.2 + '@validatem/virtual-property': 0.1.0 + default-value: 1.0.0 + dev: false + + /@validatem/combinator@0.1.2: + resolution: {integrity: sha512-vE8t1tNXknmN62FlN6LxQmA2c6TwVKZ+fl/Wit3H2unFdOhu7SZj2kRPGjAXdK/ARh/3svYfUBeD75pea0j1Sw==} + dev: false + + /@validatem/core@0.5.0: + resolution: {integrity: sha512-hLEdoRFRvFGUqHFFK0eR8r7sTJaqQjzB81FVMp86esZJiBrblnWhpZtzVouguoaAaKFX9oiWI3nAQc73xYrTJg==} + dependencies: + '@validatem/annotate-errors': 0.1.2 + '@validatem/any-property': 0.1.3 + '@validatem/error': 1.1.0 + '@validatem/match-validation-error': 0.1.0 + '@validatem/match-versioned-special': 0.1.1 + '@validatem/match-virtual-property': 0.1.0 + '@validatem/normalize-rules': 0.1.3 + '@validatem/required': 0.1.1 + '@validatem/validation-result': 0.1.2 + '@validatem/virtual-property': 0.1.0 + as-expression: 1.0.0 + assure-array: 1.0.0 + create-error: 0.3.1 + default-value: 1.0.0 + execall: 2.0.0 + indent-string: 4.0.0 + is-arguments: 1.1.1 + supports-color: 7.2.0 + syncpipe: 1.0.0 + dev: false + + /@validatem/default-to@0.1.0: + resolution: {integrity: sha512-UE/mJ6ZcHFlBLUhX75PQHDRYf80GFFhB+vZfIcsEWduh7Nm6lTMDnCPj4MI+jd9E/A7HV5D1yCZhaRSwoWo4vg==} + dependencies: + is-callable: 1.2.7 + dev: false + + /@validatem/dynamic@0.1.2: + resolution: {integrity: sha512-TNZMUO9McL2kFYdLWTYSD+zxxZ9fbK9Si+3X5u/JngOWAq7PFxbU7o2oxREkwiSIZi5cjBCK/hvrZMWyl+FWEA==} + dependencies: + '@validatem/combinator': 0.1.2 + dev: false + + /@validatem/error@1.1.0: + resolution: {integrity: sha512-gZJEoZq1COi/8/5v0fVKQ9uX54x5lb5HbV7mzIOhY6dqjmLNfxdQmpECZPQrCAOpcRkRMJ7zaFhq4UTslpY9yA==} + dev: false + + /@validatem/has-shape@0.1.8: + resolution: {integrity: sha512-x2i8toW1uraFF2Vl6WBl4CScbBeg5alrtoCKMyXbJkHf2B5QxL/ftUh2RQRcBzx6U0i7KUb8vdShcWAa+fehRQ==} + dependencies: + '@validatem/annotate-errors': 0.1.2 + '@validatem/combinator': 0.1.2 + '@validatem/error': 1.1.0 + '@validatem/validation-result': 0.1.2 + array-union: 2.1.0 + as-expression: 1.0.0 + assure-array: 1.0.0 + default-value: 1.0.0 + flatten: 1.0.3 + dev: false + + /@validatem/is-boolean@0.1.1: + resolution: {integrity: sha512-eIFq+mCBEDgAp4ezaPn1mbVZd2H+IkQG3CcEFnLSlqfg1XKY5uv8AOI08+UqeWS+C7AIFk3rEqRg63+OuPCpsg==} + dependencies: + '@validatem/error': 1.1.0 + is-boolean-object: 1.1.2 + dev: false + + /@validatem/is-function@0.1.0: + resolution: {integrity: sha512-UtVrwTGhaIdIJ0mPG5XkAmYZUeWgRoMP1G9ZEHbKvAZJ4+SXf/prC0jPgE0pw+sPjdQG4hblsXSfo/9Bf3PGdQ==} + dependencies: + '@validatem/error': 1.1.0 + is-callable: 1.2.7 + dev: false + + /@validatem/is-integer@0.1.0: + resolution: {integrity: sha512-sSp66uxfirIFMqro64DAdfM+UKo+IICmHdy/x3ZJXUM9F4byz/GyFmhR4wfcQswywwF1fqKw9458GE38fozjOQ==} + dependencies: + '@validatem/error': 1.1.0 + '@validatem/is-number': 0.1.3 + dev: false + + /@validatem/is-number@0.1.3: + resolution: {integrity: sha512-GjnbKYfYa0cTCJmsr5OUbylxTKHHZ6FDtJixWl+lEuXzeELDoYRp2UAjzfjTXJ9g2BumESqI/t0hap5rw5tEyQ==} + dependencies: + '@validatem/error': 1.1.0 + is-number-object: 1.0.7 + dev: false + + /@validatem/is-plain-object@0.1.1: + resolution: {integrity: sha512-aNGbNIbKRpYI0lRBczlTBbiA+nqN52ADAASdySKg2/QeSCVtYS4uOIeCNIJRAgXe/5sUnLTuL4pgq628uAl7Kw==} + dependencies: + '@validatem/error': 1.1.0 + is-plain-obj: 2.1.0 + dev: false + + /@validatem/match-special@0.1.0: + resolution: {integrity: sha512-TFiq9Wk/1Hoja4PK85WwNYnwBXk3+Lgoj59ZIMxm2an1qmNYp8j+BnSvkKBflba451yIn6V1laU9NJf+/NYZgw==} + dev: false + + /@validatem/match-validation-error@0.1.0: + resolution: {integrity: sha512-6akGTk7DdulOreyqDiGdikwRSixQz/AlvARSX18dcWaTFc79KxCLouL2hyoFcor9IIUhu5RTY4/i756y4T1yxA==} + dependencies: + '@validatem/match-versioned-special': 0.1.1 + dev: false + + /@validatem/match-versioned-special@0.1.1: + resolution: {integrity: sha512-RRNeFSgzqSo0sKck/92a+yC9zKdt+DD6y4TK70+VDKVppdWsb8YzC/FBTucseN1OYrr1KcBPKNVZePg1NTROYw==} + dev: false + + /@validatem/match-virtual-property@0.1.0: + resolution: {integrity: sha512-ssd3coFgwbLuqvZftLZTy3eHN0TFST8oTS2XTViQdXJPXVoJmwEKBpFhXgwnb5Ly1CE037R/KWpjhd1TP/56kQ==} + dev: false + + /@validatem/normalize-rules@0.1.3: + resolution: {integrity: sha512-HHPceAP2ce9NWymIZrgLCTzpdwXNRBCCB5H6ZPc5ggOrbmh4STpT83fLazleHtvYNlqgXZ4GjQOvCwrjaM+qEA==} + dependencies: + '@validatem/has-shape': 0.1.8 + '@validatem/is-plain-object': 0.1.1 + '@validatem/match-special': 0.1.0 + assure-array: 1.0.0 + default-value: 1.0.0 + flatten: 1.0.3 + is-plain-obj: 2.1.0 + dev: false + + /@validatem/one-of@0.1.1: + resolution: {integrity: sha512-lIgxnkNRouPx5Ydddi8OaAxmzp1ox44OJnrJPRrJkU4ccz9Yb7GSJ+wQJNVkAZCar+DGTDMoXoy51NwDnsf4sw==} + dependencies: + '@validatem/error': 1.1.0 + dev: false + + /@validatem/required@0.1.1: + resolution: {integrity: sha512-vI4NzLfay4RFAzp7xyU34PHb8sAo6w/3frrNh1EY9Xjnw2zxjY5oaxwmbFP1jVevBE6QQEnKogtzUHz/Zuvh6g==} + dev: false + + /@validatem/validation-result@0.1.2: + resolution: {integrity: sha512-okmP8JarIwIgfpaVcvZGuQ1yOsLKT3Egt49Ynz6h1MAeGsP/bGHXkkXtbiWOVsk5Tzku5vDVFSrFnF+5IEHKxw==} + dependencies: + default-value: 1.0.0 + dev: false + + /@validatem/virtual-property@0.1.0: + resolution: {integrity: sha512-JUUvWtdqoSkOwlsl20oB3qFHYIL05a/TAfdY4AJcs55QeVTiX5iI1b8IoQW644sIWWooBuLv+XwoxjRsQFczlQ==} + dev: false + + /array-union@2.1.0: + resolution: {integrity: sha512-HGyxoOTYUyCM6stUe6EJgnd4EoewAI7zMdfqO+kGjnlZmBDz/cR5pf8r/cR4Wq60sL/p0IkcjUEEPwS3GFrIyw==} + engines: {node: '>=8'} + dev: false + + /as-expression@1.0.0: + resolution: {integrity: sha512-Iqh4GxNUfxbJdGn6b7/XMzc8m1Dz2ZHouBQ9DDTzyMRO3VPPIAXeoY/sucRxxxXKbUtzwzWZSN6jPR3zfpYHHA==} + dev: false + + /assure-array@1.0.0: + resolution: {integrity: sha512-igvOvGYidAcJKr6YQIHzLivUpAdqUfi7MN0QfrEnFtifQvuw6D0W4oInrIVgTaefJ+QBVWAj8ZYuUGNnwq6Ydw==} + dev: false + + /call-bind@1.0.7: + resolution: {integrity: sha512-GHTSNSYICQ7scH7sZ+M2rFopRoLh8t2bLSW6BbgrtLsahOIB5iyAVJf9GjWK3cYTDaMj4XdBpM1cA6pIS0Kv2w==} + engines: {node: '>= 0.4'} + dependencies: + es-define-property: 1.0.0 + es-errors: 1.3.0 + function-bind: 1.1.2 + get-intrinsic: 1.2.4 + set-function-length: 1.2.2 + dev: false + + /capture-promise@1.0.0: + resolution: {integrity: sha512-40FXZr0YaUWw2q6T8DcUOIF885saJni87ZbAOXkwDB14nMl6M1vIYAKUs2BjoeMyqnv3PGqEcDskGUXvgNIbZw==} + dev: false + + /clone-regexp@2.2.0: + resolution: {integrity: sha512-beMpP7BOtTipFuW8hrJvREQ2DrRu3BE7by0ZpibtfBA+qfHYvMGTc2Yb1JMYPKg/JUw0CHYvpg796aNTSW9z7Q==} + engines: {node: '>=6'} + dependencies: + is-regexp: 2.1.0 + dev: false + + /create-error@0.3.1: + resolution: {integrity: sha512-n/Q4aSCtYuuDneEW5Q+nd0IIZwbwmX/oF6wKcDUhXGJNwhmp2WHEoWKz7X+/H7rBtjimInW7f0ceouxU0SmuzQ==} + dev: false + + /debug@4.3.5: + resolution: {integrity: sha512-pt0bNEmneDIvdL1Xsd9oDQ/wrQRkXDT4AUWlNZNPKvW5x/jyO9VFXkJUP07vQ2upmw5PlaITaPKc31jK13V+jg==} + engines: {node: '>=6.0'} + peerDependencies: + supports-color: '*' + peerDependenciesMeta: + supports-color: + optional: true + dependencies: + ms: 2.1.2 + dev: false + + /default-value@1.0.0: + resolution: {integrity: sha512-y6j7G55tgWG7nfjXUNy/WkTLGExiPEUlhGv0zqgqKdlOwJnDDy/dbk7yCozn4biAGIRnMI+9fyZ1V2fZ7tjp6Q==} + dependencies: + es6-promise-try: 0.0.1 + dev: false + + /define-data-property@1.1.4: + resolution: {integrity: sha512-rBMvIzlpA8v6E+SJZoo++HAYqsLrkg7MSfIinMPFhmkorw7X+dOXVJQs+QT69zGkzMyfDnIMN2Wid1+NbL3T+A==} + engines: {node: '>= 0.4'} + dependencies: + es-define-property: 1.0.0 + es-errors: 1.3.0 + gopd: 1.0.1 + dev: false + + /es-define-property@1.0.0: + resolution: {integrity: sha512-jxayLKShrEqqzJ0eumQbVhTYQM27CfT1T35+gCgDFoL82JLsXqTJ76zv6A0YLOgEnLUMvLzsDsGIrl8NFpT2gQ==} + engines: {node: '>= 0.4'} + dependencies: + get-intrinsic: 1.2.4 + dev: false + + /es-errors@1.3.0: + resolution: {integrity: sha512-Zf5H2Kxt2xjTvbJvP2ZWLEICxA6j+hAmMzIlypy4xcBg1vKVnx89Wy0GbS+kf5cwCVFFzdCFh2XSCFNULS6csw==} + engines: {node: '>= 0.4'} + dev: false + + /es6-promise-try@0.0.1: + resolution: {integrity: sha512-T6f3cNyF8y+3uua2IDGpGmeoDe2w7PXGfPGS94TyLfQLPzYVvZUfM8dQuN4DuVXpelK4tg9F7zKzZHzNS2f2IQ==} + dev: false + + /execall@2.0.0: + resolution: {integrity: sha512-0FU2hZ5Hh6iQnarpRtQurM/aAvp3RIbfvgLHrcqJYzhXyV2KFruhuChf9NC6waAhiUR7FFtlugkI4p7f2Fqlow==} + engines: {node: '>=8'} + dependencies: + clone-regexp: 2.2.0 + dev: false + + /flatten@1.0.3: + resolution: {integrity: sha512-dVsPA/UwQ8+2uoFe5GHtiBMu48dWLTdsuEd7CKGlZlD78r1TTWBvDuFaFGKCo/ZfEr95Uk56vZoX86OsHkUeIg==} + deprecated: flatten is deprecated in favor of utility frameworks such as lodash. + dev: false + + /function-bind@1.1.2: + resolution: {integrity: sha512-7XHNxH7qX9xG5mIwxkhumTox/MIRNcOgDrxWsMt2pAr23WHp6MrRlN7FBSFpCpr+oVO0F744iUgR82nJMfG2SA==} + dev: false + + /get-intrinsic@1.2.4: + resolution: {integrity: sha512-5uYhsJH8VJBTv7oslg4BznJYhDoRI6waYCxMmCdnTrcCrHA/fCFKoTFz2JKKE0HdDFUF7/oQuhzumXJK7paBRQ==} + engines: {node: '>= 0.4'} + dependencies: + es-errors: 1.3.0 + function-bind: 1.1.2 + has-proto: 1.0.3 + has-symbols: 1.0.3 + hasown: 2.0.2 + dev: false + + /gopd@1.0.1: + resolution: {integrity: sha512-d65bNlIadxvpb/A2abVdlqKqV563juRnZ1Wtk6s1sIR8uNsXR70xqIzVqxVf1eTqDunwT2MkczEeaezCKTZhwA==} + dependencies: + get-intrinsic: 1.2.4 + dev: false + + /has-flag@4.0.0: + resolution: {integrity: sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==} + engines: {node: '>=8'} + dev: false + + /has-property-descriptors@1.0.2: + resolution: {integrity: sha512-55JNKuIW+vq4Ke1BjOTjM2YctQIvCT7GFzHwmfZPGo5wnrgkid0YQtnAleFSqumZm4az3n2BS+erby5ipJdgrg==} + dependencies: + es-define-property: 1.0.0 + dev: false + + /has-proto@1.0.3: + resolution: {integrity: sha512-SJ1amZAJUiZS+PhsVLf5tGydlaVB8EdFpaSO4gmiUKUOxk8qzn5AIy4ZeJUmh22znIdk/uMAUT2pl3FxzVUH+Q==} + engines: {node: '>= 0.4'} + dev: false + + /has-symbols@1.0.3: + resolution: {integrity: sha512-l3LCuF6MgDNwTDKkdYGEihYjt5pRPbEg46rtlmnSPlUbgmB8LOIrKJbYYFBSbnPaJexMKtiPO8hmeRjRz2Td+A==} + engines: {node: '>= 0.4'} + dev: false + + /has-tostringtag@1.0.2: + resolution: {integrity: sha512-NqADB8VjPFLM2V0VvHUewwwsw0ZWBaIdgo+ieHtK3hasLz4qeCRjYcqfB6AQrBggRKppKF8L52/VqdVsO47Dlw==} + engines: {node: '>= 0.4'} + dependencies: + has-symbols: 1.0.3 + dev: false + + /hasown@2.0.2: + resolution: {integrity: sha512-0hJU9SCPvmMzIBdZFqNPXWa6dqh7WdH0cII9y+CyS8rG3nL48Bclra9HmKhVVUHyPWNH5Y7xDwAB7bfgSjkUMQ==} + engines: {node: '>= 0.4'} + dependencies: + function-bind: 1.1.2 + dev: false + + /indent-string@4.0.0: + resolution: {integrity: sha512-EdDDZu4A2OyIK7Lr/2zG+w5jmbuk1DVBnEwREQvBzspBJkCEbRa8GxU1lghYcaGJCnRWibjDXlq779X1/y5xwg==} + engines: {node: '>=8'} + dev: false + + /is-arguments@1.1.1: + resolution: {integrity: sha512-8Q7EARjzEnKpt/PCD7e1cgUS0a6X8u5tdSiMqXhojOdoV9TsMsiO+9VLC5vAmO8N7/GmXn7yjR8qnA6bVAEzfA==} + engines: {node: '>= 0.4'} + dependencies: + call-bind: 1.0.7 + has-tostringtag: 1.0.2 + dev: false + + /is-boolean-object@1.1.2: + resolution: {integrity: sha512-gDYaKHJmnj4aWxyj6YHyXVpdQawtVLHU5cb+eztPGczf6cjuTdwve5ZIEfgXqH4e57An1D1AKf8CZ3kYrQRqYA==} + engines: {node: '>= 0.4'} + dependencies: + call-bind: 1.0.7 + has-tostringtag: 1.0.2 + dev: false + + /is-callable@1.2.7: + resolution: {integrity: sha512-1BC0BVFhS/p0qtw6enp8e+8OD0UrK0oFLztSjNzhcKA3WDuJxxAPXzPuPtKkjEY9UUoEWlX/8fgKeu2S8i9JTA==} + engines: {node: '>= 0.4'} + dev: false + + /is-number-object@1.0.7: + resolution: {integrity: sha512-k1U0IRzLMo7ZlYIfzRu23Oh6MiIFasgpb9X76eqfFZAqwH44UI4KTBvBYIZ1dSL9ZzChTB9ShHfLkR4pdW5krQ==} + engines: {node: '>= 0.4'} + dependencies: + has-tostringtag: 1.0.2 + dev: false + + /is-plain-obj@2.1.0: + resolution: {integrity: sha512-YWnfyRwxL/+SsrWYfOpUtz5b3YD+nyfkHvjbcanzk8zgyO4ASD67uVMRt8k5bM4lLMDnXfriRhOpemw+NfT1eA==} + engines: {node: '>=8'} + dev: false + + /is-regexp@2.1.0: + resolution: {integrity: sha512-OZ4IlER3zmRIoB9AqNhEggVxqIH4ofDns5nRrPS6yQxXE1TPCUpFznBfRQmQa8uC+pXqjMnukiJBxCisIxiLGA==} + engines: {node: '>=6'} + dev: false + + /ms@2.1.2: + resolution: {integrity: sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==} + dev: false + + /set-function-length@1.2.2: + resolution: {integrity: sha512-pgRc4hJ4/sNjWCSS9AmnS40x3bNMDTknHgL5UaMBTMyJnU90EgWh1Rz+MC9eFu4BuN/UwZjKQuY/1v3rM7HMfg==} + engines: {node: '>= 0.4'} + dependencies: + define-data-property: 1.1.4 + es-errors: 1.3.0 + function-bind: 1.1.2 + get-intrinsic: 1.2.4 + gopd: 1.0.1 + has-property-descriptors: 1.0.2 + dev: false + + /single-concurrent@1.0.0: + resolution: {integrity: sha512-lYx5vhQB1jhpVnS11rAZLTDId3E3cJFCteOvl6tsXmRPm1hfCQGFXFAgP12gUQJ4MRh3Cvt8eXwmnE8RIimzGw==} + dependencies: + '@validatem/core': 0.5.0 + '@validatem/is-function': 0.1.0 + '@validatem/required': 0.1.1 + capture-promise: 1.0.0 + debug: 4.3.5 + transitivePeerDependencies: + - supports-color + dev: false + + /supports-color@7.2.0: + resolution: {integrity: sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==} + engines: {node: '>=8'} + dependencies: + has-flag: 4.0.0 + dev: false + + /syncpipe@1.0.0: + resolution: {integrity: sha512-cdiAFTnFJRvUaNPDc2n9CqoFvtIL3+JUMJZrC3kA3FzpugHOqu0TvkgNwmnxPZ5/WjAzMcfMS3xm+AO7rg/j/w==} + dependencies: + assure-array: 1.0.0 + dev: false diff --git a/test.js b/test.js new file mode 100644 index 0000000..065b2b5 --- /dev/null +++ b/test.js @@ -0,0 +1,110 @@ +"use strict"; + +const test = require("node:test"); +const assert = require("node:assert"); + +const pushBuffer = require("./"); + +function promiseDelay(time) { + return new Promise((resolve, _reject) => { + setTimeout(resolve, time); + }); +} + +// Contributions of additional tests are welcome! As long as they test exposed API, not internals. + +test("happy path", async (t) => { + let seenValues = new Map(); + + let expectedValues = new Map(); + for (let m = 0; m < 100; m++) { expectedValues.set(m, m); } + + let i = 0; + let buffer = pushBuffer({ + lanes: 2, + pull: async () => { + let result = i++; + await promiseDelay(Math.random() * 50); + return result; + }, + select: (value) => value % 2 + }); + + let promises = []; + for (let n = 0; n < 100; n++) { + // not awaiting here; we *want* multiple of them to happen concurrently + promises.push(buffer.request(n % 2).then((value) => seenValues.set(n, value))); + await promiseDelay(Math.random() * 50); + } + + // wait for all (potentially concurrent) buffer reads to complete before checking the outcome + await Promise.all(promises); + assert.deepStrictEqual(seenValues, expectedValues); +}); + +test("push mode", async (t) => { + let buffer = pushBuffer({ + mode: "push" + }); + + let expected = []; + for (let n = 0; n < 100; n++) { expected.push(n); } + + let promises = []; + for (let n = 0; n < 100; n++) { + promises.push(buffer.request()); + } + + for (let n = 0; n < 100; n++) { + buffer.push(n); + } + + let outcome = await Promise.all(promises); + assert.deepStrictEqual(outcome, expected); +}); + +test("multiple lane selection", async (t) => { + let allLanes = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ]; + let i = 0; + + let buffer = pushBuffer({ + lanes: 10, + pull: async () => i++, + select: (value) => { + // Only send to lanes that the value is divisible by + return allLanes.filter((lane) => value % (lane+1) === 0); + } + }); + + let seen = new Map(); + let promises = []; + + let expected = new Map([ + [ 0, [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ] ], + [ 1, [ 0, 2, 4, 6, 8, 10, 12, 14, 16, 18 ] ], + [ 2, [ 0, 3, 6, 9, 12, 15, 18, 21, 24, 27 ] ], + [ 3, [ 0, 4, 8, 12, 16, 20, 24, 28, 32, 36 ] ], + [ 4, [ 0, 5, 10, 15, 20, 25, 30, 35, 40, 45 ] ], + [ 5, [ 0, 6, 12, 18, 24, 30, 36, 42, 48, 54 ] ], + [ 6, [ 0, 7, 14, 21, 28, 35, 42, 49, 56, 63 ] ], + [ 7, [ 0, 8, 16, 24, 32, 40, 48, 56, 64, 72 ] ], + [ 8, [ 0, 9, 18, 27, 36, 45, 54, 63, 72, 81 ] ], + [ 9, [ 0, 10, 20, 30, 40, 50, 60, 70, 80, 90 ] ], + ]); + + for (let n = 0; n < 10; n++) { + for (let x = 0; x < 10; x++) { + promises.push(buffer.request(n).then((value) => { + if (!seen.has(n)) { + seen.set(n, []); + } + + seen.get(n).push(value); + })); + } + } + + await Promise.all(promises); + + assert.deepStrictEqual(seen, expected); +});