You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
289 lines
8.6 KiB
JavaScript
289 lines
8.6 KiB
JavaScript
"use strict";
|
|
|
|
const capturePromise = require("capture-promise");
|
|
const singleConcurrent = require("single-concurrent");
|
|
const assureArray = require("assure-array");
|
|
const promiseDefer = require("@joepie91/promise-defer");
|
|
const debug = require("debug")("push-buffer");
|
|
const NoValue = require("@promistream/no-value");
|
|
|
|
const { validateOptions } = require("@validatem/core");
|
|
const required = require("@validatem/required");
|
|
const isInteger = require("@validatem/is-integer");
|
|
const isFunction = require("@validatem/is-function");
|
|
const defaultTo = require("@validatem/default-to");
|
|
const oneOf = require("@validatem/one-of");
|
|
const dynamic = require("@validatem/dynamic");
|
|
const isBoolean = require("@validatem/is-boolean");
|
|
|
|
/* TODO:
|
|
- Add limits for buffer size? This is difficult to implement since we don't know upfront what lane a value will be assigned to, and buffers are ideally per-lane.
|
|
*/
|
|
|
|
// Attaches a logging-only rejection handler to `promise`, so that a rejection nobody has consumed *yet* is not reported as unhandled.
// Returns the very same promise that was passed in (not the derived one), so later consumers still observe the rejection.
function suppressUncaughtRejection(promise) {
	const logSuppressedError = (error) => {
		// We do log the suppressed error, for debugging purposes
		debug("error temporarily suppressed;", error);
	};

	// The promise produced by .catch is deliberately discarded; only the side effect of registering a handler matters here.
	promise.catch(logSuppressedError);

	return promise;
}
|
|
|
|
module.exports = function createPushBuffer(_options) {
|
|
// Builds a rule (via `dynamic`) that yields `[ required ]` when the configured mode is one of `acceptableModes`, and no rules otherwise.
// `acceptableModes` may be a single mode or an array of modes; it is normalized through assureArray.
function requiredIfMode(acceptableModes) {
	return dynamic((_value) => {
		// This is a bit hacky, but currently necessary to make it work how we want:
		// the raw (not yet validated) options are inspected, so the "pull" default has to be re-applied by hand.
		let configuredMode = _options.mode ?? "pull";
		let isApplicable = assureArray(acceptableModes).includes(configuredMode);

		return isApplicable ? [ required ] : [];
	});
}
|
|
|
|
// Validate the caller-supplied options and apply defaults.
let { lanes, pull, select, selectError, mode, broadcastErrors, broadcastValues } = validateOptions(arguments, {
	lanes: [ defaultTo(1), isInteger ],
	// Currently only either push or pull mode is supported; if you need both at the same time, file an issue!
	mode: [ defaultTo("pull"), oneOf([ "push", "pull" ]) ],
	// `pull` only has to be provided when operating in pull mode
	pull: [ requiredIfMode("pull"), isFunction ],
	// By default, every value and every error is routed to lane 0
	select: [ defaultTo.literal((_value) => 0), isFunction ],
	selectError: [ defaultTo.literal((_value) => 0), isFunction ],
	broadcastValues: [ defaultTo(false), isBoolean ],
	broadcastErrors: [ defaultTo(true), isBoolean ],
});

// append == add after all pending pulls (including pushed promises!), insert == send to outstanding requests immediately (so basically insert *before* pending pulls)
let pushOrder = "append"; // Temporary; this infrastructure exists to support a potential later combined mode

// Outstanding pulls (and pushed, pre-settled entries) in the order they were stored, plus a per-entry flag recording whether each has settled.
let pullQueue = [];
let settlementMap = new WeakMap();

// One buffer of undelivered results and one queue of waiting requests per lane, indexed by lane number.
let valueQueues = Array.from({ length: lanes }, () => []);
let requestQueues = Array.from({ length: lanes }, () => []);
|
|
|
|
// Looks up the value buffer and the request queue belonging to `lane`.
// Returns `{ values, requests }`, both being the live arrays (mutations by the caller affect the buffer state).
// Throws an Error when `lane` does not identify an existing lane.
function getLane(lane) {
	// An upper-bound check alone lets negative, fractional and null lanes through; those have no queue, and would
	// otherwise surface later as an unrelated TypeError on `undefined.length`. Checking that a queue actually exists
	// rejects them here, with the intended error message.
	let laneExists = (lane >= 0 && lane < lanes && Array.isArray(valueQueues[lane]));

	if (!laneExists) {
		throw new Error(`Tried to access lane ${lane}, but only lanes 0-${lanes-1} exist`);
	}

	return {
		values: valueQueues[lane],
		requests: requestQueues[lane]
	};
}
|
|
|
|
// Appends an entry to the pull queue, initially marked as unsettled, and arranges for reconciliation to be attempted once it settles.
// `pullPromise` is either a bare promise, or a wrapper object with `__preset: true` whose `promise` property holds the actual promise.
function storePull(pullPromise) {
	settlementMap.set(pullPromise, false);
	pullQueue.push(pullPromise);
	debug("queued pull");

	let actualPromise;

	if (pullPromise.__preset === true) {
		actualPromise = pullPromise.promise;
	} else {
		actualPromise = pullPromise;
	}

	function markSettled() {
		settlementMap.set(pullPromise, true);
		debug("marked pull as settled");

		tryRunReconciliation();
	}

	// Note that we are storing the *original* pullPromise for later use, and not this 'notification chain' below, because otherwise we would be swallowing errors.
	let notificationChain = actualPromise.finally(markSettled);
	suppressUncaughtRejection(notificationChain);
}
|
|
|
|
// Starts one pull by invoking the user-supplied `pull` callback (without arguments) and queues the resulting promise.
function doPull() {
	// The callback is invoked through capturePromise rather than directly; presumably so that a synchronous throw
	// ends up as a rejected promise in the queue instead of escaping here — confirm against capture-promise's docs.
	storePull(capturePromise(() => pull()));
}
|
|
|
|
// Delivers settled entries from the front of pullQueue, strictly in queue order, to the lanes.
// Each run handles one entry and then recurses; it stops as soon as the queue is empty or its first entry has not settled yet.
// Wrapped in singleConcurrent; presumably that prevents overlapping runs — confirm against the single-concurrent docs.
let tryRunReconciliation = singleConcurrent(async function reconcile() {
	if (pullQueue.length === 0) {
		debug("reconciliation: empty pullQueue; finishing loop");

		// Only pull mode has a `pull` callback to invoke. In push mode, outstanding requests simply wait for the next push;
		// starting a pull there would call a missing callback and deliver the resulting failure to those requests.
		// (`request` applies the same mode guard before it calls doPull.)
		if (mode === "pull") {
			let laneLengths = requestQueues.map((queue) => queue.length);
			let longestQueue = Math.max(... laneLengths);

			if (longestQueue > 0) {
				debug(`there are still requests pending; kicking off ${longestQueue} new pulls to satisfy longest queue`);
				debug(`  lane lengths:`, laneLengths);

				for (let i = 0; i < longestQueue; i++) {
					doPull();
				}
			}
		}

		return;
	}

	let nextPromise = pullQueue[0];

	// Results must be delivered in queue order, so an unsettled entry at the front blocks everything behind it.
	// storePull triggers another reconciliation attempt once that entry settles.
	if (settlementMap.get(nextPromise) !== true) {
		debug(`reconciliation: nextPromise not settled yet; finishing loop`);
		return;
	}

	debug(`reconciliation: nextPromise settled`);
	pullQueue.shift(); // Remove it from the queue

	// Preset entries (created by push/pushError) wrap their promise and may carry a per-entry broadcast override;
	// for plain pulls `broadcast` stays undefined, so sendValue/sendError fall back to their configured defaults.
	let value;
	let broadcast = (nextPromise.__preset === true) ? nextPromise.broadcast : undefined;
	let promise = (nextPromise.__preset === true) ? nextPromise.promise : nextPromise;

	try {
		value = await promise;
	} catch (error) {
		await sendError(error, broadcast);
		return await reconcile();
	}

	// NoValue results are dropped rather than delivered to any lane
	if (value !== NoValue) {
		await sendValue(value, broadcast);
	}

	return await reconcile();
});
|
|
|
|
// Delivers a resolved value, either to every lane or to the lane(s) chosen by `select`.
// When `broadcast` is omitted (or undefined), the `broadcastValues` option decides.
async function sendValue(value, broadcast = broadcastValues) {
	let deliver = broadcast
		? sendValueBroadcast
		: sendValueLane;

	return deliver(value);
}
|
|
|
|
// Asks `select` which lane(s) a value belongs to, and delivers it there.
// `select` may return (or resolve to) a single lane or an array of lanes; each target lane receives its own resolved promise.
async function sendValueLane(value) {
	let lane = await select(value);
	debug(`reconciliation: result: resolved, allocated to lane ${lane}`);

	let targetLanes = Array.isArray(lane) ? lane : [ lane ];

	for (let target of targetLanes) {
		sendToLane(target, Promise.resolve(value));
	}
}
|
|
|
|
// Delivers a resolved value to every lane; each lane receives its own promise resolving to `value`.
async function sendValueBroadcast(value) {
	debug(`reconciliation: result: resolved, broadcasting`);

	requestQueues.forEach((_queue, i) => {
		debug(`reconciliation: (sending to lane ${i})`);
		// This must wrap `value`; the identifier `error` does not exist in this function and would throw a ReferenceError.
		sendToLane(i, Promise.resolve(value));
	});
}
|
|
|
|
// Delivers an error (passed as `value`), either to every lane or to the lane(s) chosen by `selectError`.
// When `broadcast` is omitted (or undefined), the `broadcastErrors` option decides.
async function sendError(value, broadcast = broadcastErrors) {
	let deliver = broadcast
		? sendErrorBroadcast
		: sendErrorLane;

	return deliver(value);
}
|
|
|
|
// Asks `selectError` which lane(s) an error belongs to, and delivers it there as a rejected promise.
// `selectError` may return (or resolve to) a single lane or an array of lanes; each target lane receives its own rejection.
async function sendErrorLane(error) {
	let lane = await selectError(error);
	debug(`reconciliation: result: rejected, allocated to lane ${lane}`);

	let targetLanes = Array.isArray(lane) ? lane : [ lane ];

	for (let target of targetLanes) {
		// The rejection must carry `error`; the identifier `value` does not exist in this function and would throw a ReferenceError.
		// It is passed through suppressUncaughtRejection because it may sit in a lane's buffer for a while before anybody consumes it.
		sendToLane(target, suppressUncaughtRejection(Promise.reject(error)));
	}
}
|
|
|
|
// Delivers an error to every lane; each lane receives its own rejected promise carrying `error`.
async function sendErrorBroadcast(error) {
	debug(`reconciliation: result: rejected, broadcasting`);

	for (let lane = 0; lane < requestQueues.length; lane++) {
		debug(`reconciliation: (sending to lane ${lane})`);

		// A fresh rejection per lane, passed through suppressUncaughtRejection before it is handed to the lane
		let rejection = suppressUncaughtRejection(Promise.reject(error));
		sendToLane(lane, rejection);
	}
}
|
|
|
|
// Hands a result to a lane: the oldest waiting request is satisfied if there is one, otherwise the result is buffered.
// We use Promises as a Result type of sorts here, which is fine since this is all async anyway
function sendToLane(lane, promise) {
	let { requests, values } = getLane(lane);

	debug(`sendToQueue: lane ${lane}; request = ${requests.length}, values = ${values.length}`);

	if (requests.length === 0) {
		// Nobody is waiting on this lane; keep the result until a request comes in
		debug(`sendToQueue: storing in value buffer`);
		values.push(promise);
		return;
	}

	debug(`sendToQueue: satisfying request`);
	let oldestRequest = requests.shift();
	oldestRequest.resolve(promise);
}
|
|
|
|
return {
|
|
request: function (lane = 0) {
|
|
debug(`value requested from lane ${lane}`);
|
|
let { values, requests } = getLane(lane);
|
|
|
|
if (values.length > 0) {
|
|
debug(`value immediately available; reconciling`);
|
|
return Promise.resolve(values.shift());
|
|
} else {
|
|
let defer = promiseDefer();
|
|
requests.push(defer);
|
|
|
|
if (mode === "pull") {
|
|
// We don't need to handle errors in doPull because errors in the actual pull callback are propagated through the defer. Errors while *initiating* the pull should be thrown from here (which happens automatically because `doPull` is itself synchronous).
|
|
doPull();
|
|
}
|
|
|
|
return defer.promise;
|
|
}
|
|
},
|
|
push: async function (value, broadcast) {
|
|
if (mode === "push") {
|
|
debug(`reconciliation: value pushed`);
|
|
|
|
if (pushOrder === "insert") {
|
|
await sendValue(value, broadcast);
|
|
} else if (pushOrder === "append") {
|
|
storePull({ __preset: true, broadcast: broadcast, promise: Promise.resolve(value) });
|
|
} else {
|
|
throw new Error(`Unreachable`);
|
|
}
|
|
} else {
|
|
throw new Error(`This buffer is operating in pull mode; pushing values is not supported. Switch to push mode instead, if this is what you need.`);
|
|
}
|
|
},
|
|
pushError: async function (error, broadcast) {
|
|
if (mode === "push") {
|
|
debug(`reconciliation: error pushed`);
|
|
|
|
if (pushOrder === "insert") {
|
|
await sendError(error, broadcast);
|
|
} else if (pushOrder === "append") {
|
|
storePull({ __preset: true, broadcast: broadcast, promise: Promise.reject(error) });
|
|
} else {
|
|
throw new Error(`Unreachable`);
|
|
}
|
|
} else {
|
|
throw new Error(`This buffer is operating in pull mode; pushing errors is not supported. Switch to push mode instead, if this is what you need.`);
|
|
}
|
|
},
|
|
countLane: function (lane = 0) {
|
|
let { values, requests } = getLane(lane);
|
|
|
|
return {
|
|
values: values.length,
|
|
requests: requests.length
|
|
};
|
|
}
|
|
};
|
|
};
|