"use strict"; const capturePromise = require("capture-promise"); const singleConcurrent = require("single-concurrent"); const assureArray = require("assure-array"); const promiseDefer = require("@joepie91/promise-defer"); const debug = require("debug")("push-buffer"); const NoValue = require("@promistream/no-value"); const { validateOptions } = require("@validatem/core"); const required = require("@validatem/required"); const isInteger = require("@validatem/is-integer"); const isFunction = require("@validatem/is-function"); const defaultTo = require("@validatem/default-to"); const oneOf = require("@validatem/one-of"); const dynamic = require("@validatem/dynamic"); const isBoolean = require("@validatem/is-boolean"); /* TODO: - Add limits for buffer size? This is difficult to implement since we don't know upfront what lane a value will be assigned to, and buffers are ideally per-lane. */ function suppressUncaughtRejection(promise) { promise.catch((error) => { // We do log the suppressed error, for debugging purposes debug("error temporarily suppressed;", error); }); return promise; } module.exports = function createPushBuffer(_options) { function requiredIfMode(acceptableModes) { return dynamic((_value) => { // This is a bit hacky, but currently necessary to make it work how we want let mode = _options.mode ?? "pull"; let acceptableArray = assureArray(acceptableModes); if (acceptableArray.includes(mode)) { return [ required ]; } else { return []; } }); } let { lanes, pull, select, selectError, mode, sequential, broadcastErrors, broadcastValues } = validateOptions(arguments, { lanes: [ defaultTo(1), isInteger ], // Currently only either push or pull mode is supported; if you need both at the same time, file an issue! mode: [ defaultTo("pull"), oneOf([ "push", "pull" ]) ], pull: [ requiredIfMode("pull"), isFunction ], sequential: [ defaultTo(false), isBoolean ], select: [ defaultTo.literal((_value) => 0), isFunction ], selectError: [ defaultTo.literal((_value) => 0), isFunction ], broadcastValues: [ defaultTo(false), isBoolean ], broadcastErrors: [ defaultTo(true), isBoolean ], // append == add after all pending pulls (including pushed promises!), insert == send to outstanding requests immediately (so basically insert *before* pending pulls) }); let pushOrder = "append"; // Temporary; this infrastructure exists to support a potential later combined mode let pullQueue = []; let settlementMap = new WeakMap(); let valueQueues = []; let requestQueues = []; for (let i = 0; i < lanes; i++) { valueQueues[i] = []; requestQueues[i] = []; } function getLane(lane) { if (lane < lanes) { return { values: valueQueues[lane], requests: requestQueues[lane] }; } else { throw new Error(`Tried to access lane ${lane}, but only lanes 0-${lanes-1} exist`); } } function storePull(pullPromise) { settlementMap.set(pullPromise, false); pullQueue.push(pullPromise); debug("queued pull"); let actualPromise = (pullPromise.__preset === true) ? pullPromise.promise : pullPromise; // Note that we are storing the *original* pullPromise for later use, and not this 'notification chain' below, because otherwise we would be swallowing errors. suppressUncaughtRejection(actualPromise.finally(() => { settlementMap.set(pullPromise, true); debug("marked pull as settled"); tryRunReconciliation(); })); } function doPull() { let pullPromise = capturePromise(() => pull()); storePull(pullPromise); } let tryRunReconciliation = singleConcurrent(async function reconcile() { if (pullQueue.length === 0) { debug("reconciliation: empty pullQueue; finishing loop"); let laneLengths = requestQueues.map((queue) => queue.length); let longestQueue = Math.max(... laneLengths); if (longestQueue > 0) { let newPulls = (sequential) ? 1 : longestQueue; debug(`there are still requests pending; kicking off ${newPulls} new pulls to satisfy longest queue (${longestQueue})`); debug(` lane lengths:`, laneLengths); for (let i = 0; i < newPulls; i++) { doPull(); } } return; } let nextPromise = pullQueue[0]; if (settlementMap.get(nextPromise) !== true) { debug(`reconciliation: nextPromise not settled yet; finishing loop`); return; } debug(`reconciliation: nextPromise settled`); pullQueue.shift(); // Remove it from the queue let value; let broadcast = (nextPromise.__preset === true) ? nextPromise.broadcast : undefined; let promise = (nextPromise.__preset === true) ? nextPromise.promise : nextPromise; try { value = await promise; } catch (error) { await sendError(error, broadcast); return await reconcile(); } if (value !== NoValue) { await sendValue(value, broadcast); } return await reconcile(); }); async function sendValue(value, broadcast = broadcastValues) { if (broadcast) { return sendValueBroadcast(value); } else { return sendValueLane(value); } } async function sendValueLane(value) { let lane = await select(value); debug(`reconciliation: result: resolved, allocated to lane ${lane}`); if (Array.isArray(lane)) { for (let i of lane) { sendToLane(i, Promise.resolve(value)); } } else { sendToLane(lane, Promise.resolve(value)); } } async function sendValueBroadcast(value) { debug(`reconciliation: result: resolved, broadcasting`); requestQueues.forEach((queue, i) => { debug(`reconciliation: (sending to lane ${i})`); sendToLane(i, Promise.resolve(error)); }); } async function sendError(value, broadcast = broadcastErrors) { if (broadcast) { return sendErrorBroadcast(value); } else { return sendErrorLane(value); } } async function sendErrorLane(error) { let lane = await selectError(error); debug(`reconciliation: result: rejected, allocated to lane ${lane}`); if (Array.isArray(lane)) { for (let i of lane) { sendToLane(i, suppressUncaughtRejection(Promise.reject(value))); } } else { sendToLane(lane, suppressUncaughtRejection(Promise.reject(value))); } } async function sendErrorBroadcast(error) { debug(`reconciliation: result: rejected, broadcasting`); requestQueues.forEach((queue, i) => { debug(`reconciliation: (sending to lane ${i})`); sendToLane(i, suppressUncaughtRejection(Promise.reject(error))); }); } // We use Promises as a Result type of sorts here, which is fine since this is all async anyway function sendToLane(lane, promise) { let { requests, values } = getLane(lane); debug(`sendToQueue: lane ${lane}; request = ${requests.length}, values = ${values.length}`); if (requests.length > 0) { debug(`sendToQueue: satisfying request`); let request = requests.shift(); request.resolve(promise); } else { debug(`sendToQueue: storing in value buffer`); values.push(promise); } } return { request: function (lane = 0) { debug(`value requested from lane ${lane}`); let { values, requests } = getLane(lane); if (values.length > 0) { debug(`value immediately available; reconciling`); return Promise.resolve(values.shift()); } else { let defer = promiseDefer(); requests.push(defer); if (mode === "pull") { // We don't need to handle errors in doPull because errors in the actual pull callback are propagated through the defer. Errors while *initiating* the pull should be thrown from here (which happens automatically because `doPull` is itself synchronous). if (sequential === false || pullQueue.length === 0) { doPull(); } else { // In sequential mode, we let the reconciliation process kick off the next pull once this one is completed. } } return defer.promise; } }, push: async function (value, broadcast) { if (mode === "push") { debug(`reconciliation: value pushed`); if (pushOrder === "insert") { await sendValue(value, broadcast); } else if (pushOrder === "append") { storePull({ __preset: true, broadcast: broadcast, promise: Promise.resolve(value) }); } else { throw new Error(`Unreachable`); } } else { throw new Error(`This buffer is operating in pull mode; pushing values is not supported. Switch to push mode instead, if this is what you need.`); } }, pushError: async function (error, broadcast) { if (mode === "push") { debug(`reconciliation: error pushed`); if (pushOrder === "insert") { await sendError(error, broadcast); } else if (pushOrder === "append") { storePull({ __preset: true, broadcast: broadcast, promise: Promise.reject(error) }); } else { throw new Error(`Unreachable`); } } else { throw new Error(`This buffer is operating in pull mode; pushing errors is not supported. Switch to push mode instead, if this is what you need.`); } }, countLane: function (lane = 0) { let { values, requests } = getLane(lane); return { values: values.length, requests: requests.length }; } }; };