"use strict"; const Promise = require("bluebird"); const createResultBuffer = require("result-buffer"); const propagateAbort = require("@promistream/propagate-abort"); const propagatePeek = require("@promistream/propagate-peek"); const isEndOfStream = require("@promistream/is-end-of-stream"); const isAborted = require("@promistream/is-aborted"); const { validateOptions } = require("@validatem/core"); const required = require("@validatem/required"); const isFunction = require("@validatem/is-function"); const wrapValueAsOption = require("@validatem/wrap-value-as-option"); // FIXME: Update other stream implementations to new API module.exports = function simpleSinkStream(_options) { let { onResult, onEnd, onAbort, onSourceChanged } = validateOptions(arguments, [ required, wrapValueAsOption("onResult"), { onResult: [ required, isFunction ], onAbort: [ isFunction ], onSourceChanged: [ isFunction ], onEnd: [ isFunction ] } ]); // FIXME: Bump minor version! let onEndCalled = false; let abortHandled = false; let lastKnownSource; let resultBuffer = createResultBuffer(); return { _promistreamVersion: 0, description: `simple sink stream`, abort: propagateAbort, peek: propagatePeek, read: function produceValue_simpleSinkStream(source) { function attemptRead() { return Promise.try(() => { return source.read(); }).then((value) => { // FIXME: Document that you can pause the sink from the onResult callback, by returning a Promise that resolves when it should be resumed return onResult(value, source.abort.bind(source)); }).then((result) => { // FIXME: Replace all instances of undefined checks with a standardized NoValue marker if (result !== undefined) { // TODO: Force end of stream when this occurs? return result; } else { return attemptRead(); } }); } return resultBuffer.maybeRead(() => { return Promise.try(() => { if (source !== lastKnownSource && onSourceChanged != null) { lastKnownSource = source; return onSourceChanged(source); } }).then(() => { return attemptRead(); }).catch(isEndOfStream, (error) => { /* Don't attempt to do another read, we're done. */ return Promise.try(() => { if (!onEndCalled && onEnd != null) { onEndCalled = true; return Promise.try(() => { return onEnd(); }).then((result) => { if (result !== undefined) { resultBuffer.push(result); } }); } }).then(() => { return resultBuffer.maybeRead(() => { throw error; }); }); }).catch((error) => !isAborted(error), (error) => { return Promise.try(() => { return source.abort(error); }).catch((abortError) => { let message = [ `Tried to abort stream due to encountering an error, but the aborting itself failed`, `Original error message: ${error.message}`, `Abort failure message: ${abortError.message}` ].join("\n"); // FIXME: Make this some sort of chained error let combinedError = new Error(message); combinedError.stack = abortError.stack; // HACK throw combinedError; }).then(() => { // Pass through the original error to the user throw error; }); }).catch(isAborted, (marker) => { if (abortHandled === false) { abortHandled = true; return Promise.try(() => { if (onAbort != null) { return onAbort(marker.reason); } }).then((value) => { if (value !== undefined) { resultBuffer.push(value); } if (marker.reason instanceof Error) { // NOTE: This ensures that the original error causing the abort is thrown exactly once resultBuffer.push(Promise.reject(marker.reason)); } }); } return resultBuffer.maybeRead(() => { throw marker; }); }); }); } }; };