"use strict"; const Promise = require("bluebird"); const propagateAbort = require("@ppstreams/propagate-abort"); const propagatePeek = require("@ppstreams/propagate-peek"); const isEndOfStream = require("@ppstreams/is-end-of-stream"); const isAborted = require("@ppstreams/is-aborted"); const { validateOptions } = require("@validatem/core"); const required = require("@validatem/required"); const isFunction = require("@validatem/is-function"); const defaultTo = require("@validatem/default-to"); const wrapValueAsOption = require("@validatem/wrap-value-as-option"); // FIXME: Update other stream implementations to new API module.exports = function simpleSinkStream(_options) { let { onResult, onEnd, onAbort, onSourceChanged } = validateOptions(arguments, [ required, wrapValueAsOption("onResult"), { onResult: [ required, isFunction ], onAbort: [ isFunction ], onSourceChanged: [ isFunction ], onEnd: [ isFunction, defaultTo.literal(function defaultOnEnd() { // FIXME: Deprecate this, it does not work as-expected when the onResult callback never gets called (eg. due to a stream immediately terminating), as the onResult callback will then never get a chance to set the lastResult, not even to an initializer value (eg. an empty array for `collect`) // We return whatever value we got last from the specified onResult callback. return lastResult; })] } ]); let lastResult; let onEndCalled = false; let onEndResult; let abortHandled = false; let lastKnownSource; return { _promistreamVersion: 0, description: `simple sink stream`, abort: propagateAbort, peek: propagatePeek, read: function produceValue_simpleSinkStream(source) { function attemptRead() { return Promise.try(() => { return source.read(); }).then((value) => { // FIXME: Document that you can pause the sink from the onResult callback, by returning a Promise that resolves when it should be resumed return onResult(value); }).then((result) => { lastResult = result; return attemptRead(); }); } return Promise.try(() => { if (source !== lastKnownSource && onSourceChanged != null) { lastKnownSource = source; return onSourceChanged(source); } }).then(() => { return attemptRead(); }).catch(isEndOfStream, () => { /* Don't attempt to do another read, we're done. */ // FIXME: Is it actually correct to keep returning the same thing? if (onEndCalled) { return onEndResult; } else { return Promise.try(() => { return onEnd(); }).then((result) => { onEndResult = result; return result; }); } }).catch((error) => !isAborted(error), (error) => { return Promise.try(() => { return source.abort(error); }).catch((abortError) => { let message = [ `Tried to abort stream due to encountering an error, but the aborting itself failed`, `Original error message: ${error.message}`, `Abort failure message: ${abortError.message}` ].join("\n"); // FIXME: Make this some sort of chained error let combinedError = new Error(message); combinedError.stack = abortError.stack; // HACK throw combinedError; }).then(() => { // Pass through the original error to the user throw error; }); }).catch(isAborted, (marker) => { if (abortHandled === false) { abortHandled = true; return Promise.try(() => { if (onAbort != null) { return onAbort(); } }).then(() => { if (marker.reason instanceof Error) { // NOTE: This ensures that the original error causing the abort is thrown exactly once throw marker.reason; } else { throw marker; } }); } else { // Don't interfere, we only need special behaviour on the first occurrence throw marker; } }); } }; };