"use strict"; const Promise = require("bluebird"); const consumable = require("@joepie91/consumable"); const pPause = require("p-pause"); const debug = require("debug")("promistream:last-will"); const isEndOfStream = require("@promistream/is-end-of-stream"); const isAborted = require("@promistream/is-aborted"); const { validateOptions } = require("@validatem/core"); const isFunction = require("@validatem/is-function"); const wrapValueAsOption = require("@validatem/wrap-value-as-option"); const requireEither = require("@validatem/require-either"); let NoValue = Symbol(); // MARKER: Test this in batchOn, then finalize the refactoring // FIXME: Make NoValue its own reusable promistream package? For use as a marker in different types of streams which need to deal with multiple possibly-matching handlers, or eg. map-filter // FIXME: Re-understand and refactor the logic, finish packaging it up module.exports = function lastWill() { let { onEndOfStream, onAborted, onAllEnds } = validateOptions(arguments, [ wrapValueAsOption("onEndOfStream"), { onEndOfStream: [ isFunction ], onAborted: [ isFunction ], onAllEnds: [ isFunction ], }, requireEither([ "onEndOfStream", "onAborted", "onAllEnds" ], { allowMultiple: true }) ]); let errorBuffer = consumable(); let hasExecutedLastWill = false; // NOTE: We use a resettable pauser to ensure that when reading in parallel mode, all concurrent reads get held up while a stream end marker is being processed, but parallelization is otherwise permitted. Think of it as a conditional `sequentialize`. // FIXME: Verify that this doesn't ever introduce race conditions let pauser = pPause(); function throwBufferedError() { // Run after all custom handlers have run out throw errorBuffer.consume(); } function processUntilValue(handlers) { return Promise.reduce(handlers, (lastResult, handler, i) => { if (lastResult === NoValue) { debug(`Calling handler ${i}`); return handler(errorBuffer.peek()); } else { debug(`Skipping handler ${i}`); return lastResult; } }, NoValue); } function handleMarker(marker, markerHandler) { pauser.pause(); return Promise.try(() => { let relevantHandlers = [ markerHandler, onAllEnds, throwBufferedError ] .filter((handler) => handler != null); debug(`${relevantHandlers.length - 1} handlers to try, excluding default handler`); if (hasExecutedLastWill || relevantHandlers.length === 0) { debug("Bailing"); throw marker; } else { errorBuffer.set(marker); hasExecutedLastWill = true; return processUntilValue(relevantHandlers); } }).finally(() => { pauser.unpause(); }); } return { _promistreamVersion: 0, description: `last-will handler`, abort: function abort_lastWill(source, reason) { // FIXME: propagate module return source.abort(reason); }, peek: function peek_lastWill(source) { return Promise.try(() => { return pauser.await(); }).then(() => { if (errorBuffer.isSet()) { /* We've reached the end of the source stream in some way; we'll have at least *some* response ready, whether it is an error or some value produced by a last-will handler */ return true; } else { return source.peek(); } }); }, read: function produceValue_lastWill(source) { return Promise.try(() => { return pauser.await(); }).then(() => { if (errorBuffer.isSet()) { return throwBufferedError(); } else { return Promise.try(() => { return source.read(); }).catch(isEndOfStream, (err) => { debug("Got EndOfStream marker"); return handleMarker(err, onEndOfStream); }).catch(isAborted, (err) => { debug("Got Aborted marker"); return handleMarker(err, onAborted); }); } }); } }; }; module.exports.NoValue = NoValue;