You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

296 lines
8.9 KiB
JavaScript

"use strict";
const capturePromise = require("capture-promise");
const singleConcurrent = require("single-concurrent");
const assureArray = require("assure-array");
const promiseDefer = require("@joepie91/promise-defer");
const debug = require("debug")("push-buffer");
const NoValue = require("@promistream/no-value");
const { validateOptions } = require("@validatem/core");
const required = require("@validatem/required");
const isInteger = require("@validatem/is-integer");
const isFunction = require("@validatem/is-function");
const defaultTo = require("@validatem/default-to");
const oneOf = require("@validatem/one-of");
const dynamic = require("@validatem/dynamic");
const isBoolean = require("@validatem/is-boolean");
/* TODO:
- Add limits for buffer size? This is difficult to implement since we don't know upfront what lane a value will be assigned to, and buffers are ideally per-lane.
*/
/**
 * Attaches a logging-only rejection handler to `promise`, so that a rejection which
 * nobody is listening for yet does not surface as an unhandled rejection.
 *
 * The rejection is not consumed: the very same promise is handed back, so whoever
 * awaits it later still observes the error.
 *
 * @param {Promise} promise - Promise whose rejection should be tolerated for now.
 * @returns {Promise} The `promise` that was passed in (not the `.catch` chain).
 */
function suppressUncaughtRejection(promise) {
	const logSuppressedError = (error) => {
		// The suppressed error is still logged, for debugging purposes
		debug("error temporarily suppressed;", error);
	};

	promise.catch(logSuppressedError);

	return promise;
}
module.exports = function createPushBuffer(_options) {
/**
 * Builds a validation rule that makes an option mandatory only while the buffer is
 * configured for one of the given modes.
 *
 * @param {string|string[]} acceptableModes - Mode name(s) in which the option is required.
 * @returns The rule produced by `dynamic`; its callback yields `[ required ]` when the
 *   configured mode is one of `acceptableModes`, and an empty rule list otherwise.
 */
function requiredIfMode(acceptableModes) {
	return dynamic((_value) => {
		// This is a bit hacky, but currently necessary to make it work how we want: the raw, not-yet-validated options are inspected, so the "pull" default has to be re-applied here by hand.
		// Optional chaining treats a missing options object like an empty one, instead of throwing a TypeError from inside the rule.
		const mode = _options?.mode ?? "pull";
		const acceptableArray = assureArray(acceptableModes);

		return acceptableArray.includes(mode) ? [ required ] : [];
	});
}
// Validate the options passed to the factory and apply their defaults. The rules are given to `validateOptions` together with the raw `arguments` object.
let { lanes, pull, select, selectError, mode, sequential, broadcastErrors, broadcastValues } = validateOptions(arguments, {
// Number of lanes; one value buffer and one request queue are created per lane
lanes: [ defaultTo(1), isInteger ],
// Currently only either push or pull mode is supported; if you need both at the same time, file an issue!
mode: [ defaultTo("pull"), oneOf([ "push", "pull" ]) ],
// Callback invoked without arguments to obtain the next value; only required in pull mode
pull: [ requiredIfMode("pull"), isFunction ],
// When true, a new pull is only started once no other pull is queued
sequential: [ defaultTo(false), isBoolean ],
// Maps a value to the lane index (or array of lane indexes) it should be delivered to; the result is awaited
select: [ defaultTo.literal((_value) => 0), isFunction ],
// Same as `select`, but for errors
selectError: [ defaultTo.literal((_value) => 0), isFunction ],
// Default for whether values go to every lane instead of the selected one(s)
broadcastValues: [ defaultTo(false), isBoolean ],
// Default for whether errors go to every lane instead of the selected one(s)
broadcastErrors: [ defaultTo(true), isBoolean ],
// NOTE: there is no `pushOrder` option here; the push order is hard-coded right after this statement. For reference, its two possible values mean:
// append == add after all pending pulls (including pushed promises!), insert == send to outstanding requests immediately (so basically insert *before* pending pulls)
});
// How pushed items are ordered relative to pending pulls ("append" or "insert").
// Temporary; this infrastructure exists to support a potential later combined mode
const pushOrder = "append";

// Pulls (and pushed preset entries) in the order in which they were queued
const pullQueue = [];

// Remembers, per queued entry, whether its promise has settled yet
const settlementMap = new WeakMap();

// Per lane: results that nobody has requested yet, and requests still waiting for a result
const valueQueues = Array.from({ length: lanes }, () => []);
const requestQueues = Array.from({ length: lanes }, () => []);
/**
 * Looks up the queues that belong to a single lane.
 *
 * @param {number} lane - Zero-based lane index.
 * @returns {{ values: Array, requests: Array }} The lane's buffered results and its
 *   queue of waiting requests.
 * @throws {Error} When `lane` does not refer to an existing lane.
 */
function getLane(lane) {
	const values = valueQueues[lane];
	const requests = requestQueues[lane];

	// The upper-bound check alone lets negative or fractional indexes through; those would yield `undefined` queues and make the caller fail with an unrelated TypeError. Requiring that both queues exist turns that into the descriptive error below.
	if (lane < lanes && values !== undefined && requests !== undefined) {
		return { values, requests };
	}

	throw new Error(`Tried to access lane ${lane}, but only lanes 0-${lanes-1} exist`);
}
/**
 * Queues a pull (or a pushed preset entry) for in-order delivery and arranges for a
 * reconciliation run once it settles.
 *
 * @param {Promise|{ __preset: true, promise: Promise }} pullPromise - Either the pull's
 *   promise itself, or a preset wrapper object that carries the promise.
 */
function storePull(pullPromise) {
	settlementMap.set(pullPromise, false);
	pullQueue.push(pullPromise);
	debug("queued pull");

	// Preset entries wrap the real promise; regular pulls *are* the promise
	const isPreset = (pullPromise.__preset === true);
	const underlyingPromise = isPreset ? pullPromise.promise : pullPromise;

	function markSettled() {
		settlementMap.set(pullPromise, true);
		debug("marked pull as settled");
		tryRunReconciliation();
	}

	// Note that the *original* pullPromise is what was stored above for later use, and not this 'notification chain', because otherwise we would be swallowing errors.
	const notificationChain = underlyingPromise.finally(markSettled);
	suppressUncaughtRejection(notificationChain);
}
/**
 * Starts a single pull and queues its outcome for delivery.
 *
 * The `pull` callback is invoked without arguments, through `capturePromise`.
 * NOTE(review): `capturePromise` presumably normalizes the callback's outcome into a
 * promise; confirm against the capture-promise documentation.
 */
function doPull() {
	storePull(capturePromise(() => pull()));
}
/**
 * Delivers settled pulls to the lanes, strictly in the order in which they were queued.
 *
 * Each pass only looks at the head of `pullQueue`: if that entry has settled, its value
 * or error is dispatched and the function recurses to handle the next entry; if it has
 * not settled, the pass ends and a later settlement triggers a new run (see `storePull`).
 * When the queue is empty but requests are still waiting, new pulls are started.
 *
 * NOTE(review): `singleConcurrent` presumably prevents overlapping runs of this
 * function; confirm against the single-concurrent documentation.
 */
let tryRunReconciliation = singleConcurrent(async function reconcile() {
if (pullQueue.length === 0) {
debug("reconciliation: empty pullQueue; finishing loop");
// No pull is outstanding any more, yet requests may still be waiting on some lane
let laneLengths = requestQueues.map((queue) => queue.length);
let longestQueue = Math.max(... laneLengths);
if (longestQueue > 0) {
// In sequential mode only one pull is started at a time; otherwise enough pulls are started to cover the longest request queue
let newPulls = (sequential) ? 1 : longestQueue;
debug(`there are still requests pending; kicking off ${newPulls} new pulls to satisfy longest queue (${longestQueue})`);
debug(` lane lengths:`, laneLengths);
for (let i = 0; i < newPulls; i++) {
doPull();
}
}
return;
}
let nextPromise = pullQueue[0];
// Only the oldest entry is ever considered, so that results are delivered in queue order even when a later pull settles first
if (settlementMap.get(nextPromise) !== true) {
debug(`reconciliation: nextPromise not settled yet; finishing loop`);
return;
}
debug(`reconciliation: nextPromise settled`);
pullQueue.shift(); // Remove it from the queue
let value;
// Pushed (preset) entries carry their own broadcast flag and wrap the actual promise; for regular pulls `broadcast` stays undefined, which makes sendValue/sendError fall back to their configured defaults
let broadcast = (nextPromise.__preset === true) ? nextPromise.broadcast : undefined;
let promise = (nextPromise.__preset === true) ? nextPromise.promise : nextPromise;
try {
value = await promise;
} catch (error) {
await sendError(error, broadcast);
// Continue with the next queue entry
return await reconcile();
}
// `NoValue` results are dropped without being delivered to any lane
if (value !== NoValue) {
await sendValue(value, broadcast);
}
// Continue with the next queue entry
return await reconcile();
});
/**
 * Delivers a value either to every lane or only to the lane(s) chosen by `select`.
 *
 * @param {*} value - The value to deliver.
 * @param {boolean} [broadcast=broadcastValues] - Whether to send to all lanes; falls
 *   back to the `broadcastValues` option when omitted.
 * @returns {Promise} Settles with the outcome of the chosen delivery function.
 */
async function sendValue(value, broadcast = broadcastValues) {
	const deliver = (broadcast) ? sendValueBroadcast : sendValueLane;

	return deliver(value);
}
/**
 * Delivers a value to the lane, or lanes, picked by the `select` callback.
 *
 * @param {*} value - The value to deliver.
 * @returns {Promise<void>} Settles once the value has been handed to every selected lane.
 */
async function sendValueLane(value) {
	const lane = await select(value);
	debug(`reconciliation: result: resolved, allocated to lane ${lane}`);

	// `select` may name a single lane or an array of lanes; every target gets its own promise
	const targets = Array.isArray(lane) ? lane : [ lane ];

	for (const target of targets) {
		sendToLane(target, Promise.resolve(value));
	}
}
/**
 * Delivers a value to every lane.
 *
 * @param {*} value - The value to deliver; each lane receives its own resolved promise for it.
 * @returns {Promise<void>} Settles once the value has been handed to all lanes.
 */
async function sendValueBroadcast(value) {
	debug(`reconciliation: result: resolved, broadcasting`);
	requestQueues.forEach((_queue, i) => {
		debug(`reconciliation: (sending to lane ${i})`);
		// This must wrap `value`; the previous code referenced an undefined `error` identifier here, which made every broadcast fail with a ReferenceError
		sendToLane(i, Promise.resolve(value));
	});
}
/**
 * Delivers an error either to every lane or only to the lane(s) chosen by `selectError`.
 *
 * @param {*} value - The error to deliver.
 * @param {boolean} [broadcast=broadcastErrors] - Whether to send to all lanes; falls
 *   back to the `broadcastErrors` option when omitted.
 * @returns {Promise} Settles with the outcome of the chosen delivery function.
 */
async function sendError(value, broadcast = broadcastErrors) {
	const deliver = (broadcast) ? sendErrorBroadcast : sendErrorLane;

	return deliver(value);
}
/**
 * Delivers an error to the lane, or lanes, picked by the `selectError` callback.
 *
 * @param {*} error - The error to deliver; each target lane receives its own rejected promise.
 * @returns {Promise<void>} Settles once the error has been handed to every selected lane.
 */
async function sendErrorLane(error) {
	let lane = await selectError(error);
	debug(`reconciliation: result: rejected, allocated to lane ${lane}`);
	// The rejection must carry `error`; the previous code referenced an undefined `value` identifier here, which replaced the real error with a ReferenceError.
	// Each rejected promise is suppressed because it may sit in a lane's buffer for a while before anybody requests it.
	if (Array.isArray(lane)) {
		for (let i of lane) {
			sendToLane(i, suppressUncaughtRejection(Promise.reject(error)));
		}
	} else {
		sendToLane(lane, suppressUncaughtRejection(Promise.reject(error)));
	}
}
/**
 * Delivers an error to every lane.
 *
 * @param {*} error - The error to deliver; each lane receives its own rejected promise,
 *   passed through `suppressUncaughtRejection` first.
 * @returns {Promise<void>} Settles once the error has been handed to all lanes.
 */
async function sendErrorBroadcast(error) {
	debug(`reconciliation: result: rejected, broadcasting`);

	const laneCount = requestQueues.length;

	for (let lane = 0; lane < laneCount; lane++) {
		debug(`reconciliation: (sending to lane ${lane})`);
		const rejection = suppressUncaughtRejection(Promise.reject(error));
		sendToLane(lane, rejection);
	}
}
/**
 * Hands a result to a lane: the oldest waiting request is satisfied if there is one,
 * otherwise the result is buffered until it is requested.
 *
 * Promises serve as a Result type of sorts here (resolved == value, rejected == error),
 * which is fine since this is all async anyway.
 *
 * @param {number} lane - Index of the target lane.
 * @param {Promise} promise - Resolved or rejected promise representing the result.
 */
function sendToLane(lane, promise) {
	const { requests, values } = getLane(lane);
	debug(`sendToQueue: lane ${lane}; request = ${requests.length}, values = ${values.length}`);

	if (requests.length === 0) {
		debug(`sendToQueue: storing in value buffer`);
		values.push(promise);
		return;
	}

	debug(`sendToQueue: satisfying request`);
	const oldestRequest = requests.shift();
	oldestRequest.resolve(promise);
}
return {
request: function (lane = 0) {
debug(`value requested from lane ${lane}`);
let { values, requests } = getLane(lane);
if (values.length > 0) {
debug(`value immediately available; reconciling`);
return Promise.resolve(values.shift());
} else {
let defer = promiseDefer();
requests.push(defer);
if (mode === "pull") {
// We don't need to handle errors in doPull because errors in the actual pull callback are propagated through the defer. Errors while *initiating* the pull should be thrown from here (which happens automatically because `doPull` is itself synchronous).
if (sequential === false || pullQueue.length === 0) {
doPull();
} else {
// In sequential mode, we let the reconciliation process kick off the next pull once this one is completed.
}
}
return defer.promise;
}
},
push: async function (value, broadcast) {
if (mode === "push") {
debug(`reconciliation: value pushed`);
if (pushOrder === "insert") {
await sendValue(value, broadcast);
} else if (pushOrder === "append") {
storePull({ __preset: true, broadcast: broadcast, promise: Promise.resolve(value) });
} else {
throw new Error(`Unreachable`);
}
} else {
throw new Error(`This buffer is operating in pull mode; pushing values is not supported. Switch to push mode instead, if this is what you need.`);
}
},
pushError: async function (error, broadcast) {
if (mode === "push") {
debug(`reconciliation: error pushed`);
if (pushOrder === "insert") {
await sendError(error, broadcast);
} else if (pushOrder === "append") {
storePull({ __preset: true, broadcast: broadcast, promise: Promise.reject(error) });
} else {
throw new Error(`Unreachable`);
}
} else {
throw new Error(`This buffer is operating in pull mode; pushing errors is not supported. Switch to push mode instead, if this is what you need.`);
}
},
countLane: function (lane = 0) {
let { values, requests } = getLane(lane);
return {
values: values.length,
requests: requests.length
};
}
};
};