From ea3968b240fae8c330cb74eac82b72090ebfa1af Mon Sep 17 00:00:00 2001 From: Sven Slootweg Date: Sun, 4 Jul 2021 01:16:42 +0200 Subject: [PATCH] Deprecate default onEnd, allow returning from onResult, refactor/fix error handling, fix duplicate onEnd calls --- index.js | 140 ++++++++++++++++++++++++++++----------------------- package.json | 4 +- 2 files changed, 78 insertions(+), 66 deletions(-) diff --git a/index.js b/index.js index 7716894..4361d5b 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,8 @@ "use strict"; const Promise = require("bluebird"); +const createResultBuffer = require("result-buffer"); + const propagateAbort = require("@promistream/propagate-abort"); const propagatePeek = require("@promistream/propagate-peek"); const isEndOfStream = require("@promistream/is-end-of-stream"); @@ -9,7 +11,6 @@ const isAborted = require("@promistream/is-aborted"); const { validateOptions } = require("@validatem/core"); const required = require("@validatem/required"); const isFunction = require("@validatem/is-function"); -const defaultTo = require("@validatem/default-to"); const wrapValueAsOption = require("@validatem/wrap-value-as-option"); // FIXME: Update other stream implementations to new API @@ -20,20 +21,18 @@ module.exports = function simpleSinkStream(_options) { onResult: [ required, isFunction ], onAbort: [ isFunction ], onSourceChanged: [ isFunction ], - onEnd: [ isFunction, defaultTo.literal(function defaultOnEnd() { - // FIXME: Deprecate this, it does not work as-expected when the onResult callback never gets called (eg. due to a stream immediately terminating), as the onResult callback will then never get a chance to set the lastResult, not even to an initializer value (eg. an empty array for `collect`) - // We return whatever value we got last from the specified onResult callback. - return lastResult; - })] + onEnd: [ isFunction ] } ]); - let lastResult; + // FIXME: Bump minor version! + let onEndCalled = false; - let onEndResult; let abortHandled = false; let lastKnownSource; + let resultBuffer = createResultBuffer(); + return { _promistreamVersion: 0, description: `simple sink stream`, @@ -45,72 +44,85 @@ module.exports = function simpleSinkStream(_options) { return source.read(); }).then((value) => { // FIXME: Document that you can pause the sink from the onResult callback, by returning a Promise that resolves when it should be resumed - return onResult(value); + return onResult(value, source.abort.bind(source)); }).then((result) => { - lastResult = result; - - return attemptRead(); + // FIXME: Replace all instances of undefined checks with a standardized NoValue marker + if (result !== undefined) { + // TODO: Force end of stream when this occurs? + return result; + } else { + return attemptRead(); + } }); } - return Promise.try(() => { - if (source !== lastKnownSource && onSourceChanged != null) { - lastKnownSource = source; - return onSourceChanged(source); - } - }).then(() => { - return attemptRead(); - }).catch(isEndOfStream, () => { - /* Don't attempt to do another read, we're done. */ - // FIXME: Is it actually correct to keep returning the same thing? - if (onEndCalled) { - return onEndResult; - } else { - return Promise.try(() => { - return onEnd(); - }).then((result) => { - onEndResult = result; - return result; - }); - } - }).catch((error) => !isAborted(error), (error) => { + return resultBuffer.maybeRead(() => { return Promise.try(() => { - return source.abort(error); - }).catch((abortError) => { - let message = [ - `Tried to abort stream due to encountering an error, but the aborting itself failed`, - `Original error message: ${error.message}`, - `Abort failure message: ${abortError.message}` - ].join("\n"); - - // FIXME: Make this some sort of chained error - let combinedError = new Error(message); - combinedError.stack = abortError.stack; // HACK - throw combinedError; + if (source !== lastKnownSource && onSourceChanged != null) { + lastKnownSource = source; + return onSourceChanged(source); + } }).then(() => { - // Pass through the original error to the user - throw error; - }); - }).catch(isAborted, (marker) => { - if (abortHandled === false) { - abortHandled = true; + return attemptRead(); + }).catch(isEndOfStream, (error) => { + /* Don't attempt to do another read, we're done. */ + if (!onEndCalled && onEnd != null) { + onEndCalled = true; + + return Promise.try(() => { + return onEnd(); + }).then((result) => { + if (result !== undefined) { + resultBuffer.push(result); + } + }); + } + return resultBuffer.maybeRead(() => { + throw error; + }); + }).catch((error) => !isAborted(error), (error) => { return Promise.try(() => { - if (onAbort != null) { - return onAbort(); - } + return source.abort(error); + }).catch((abortError) => { + let message = [ + `Tried to abort stream due to encountering an error, but the aborting itself failed`, + `Original error message: ${error.message}`, + `Abort failure message: ${abortError.message}` + ].join("\n"); + + // FIXME: Make this some sort of chained error + let combinedError = new Error(message); + combinedError.stack = abortError.stack; // HACK + throw combinedError; }).then(() => { - if (marker.reason instanceof Error) { - // NOTE: This ensures that the original error causing the abort is thrown exactly once - throw marker.reason; - } else { - throw marker; - } + // Pass through the original error to the user + throw error; + }); + }).catch(isAborted, (marker) => { + if (abortHandled === false) { + abortHandled = true; + + return Promise.try(() => { + if (onAbort != null) { + return onAbort(marker.reason); + } + }).then((value) => { + if (value !== undefined) { + resultBuffer.push(value); + } + + if (marker.reason instanceof Error) { + // NOTE: This ensures that the original error causing the abort is thrown exactly once + resultBuffer.push(Promise.reject(marker.reason)); + } + }); + } + + return resultBuffer.maybeRead(() => { + throw marker; }); - } else { - // Don't interfere, we only need special behaviour on the first occurrence - throw marker; - } + }); }); } }; diff --git a/package.json b/package.json index 7c478b2..ececca1 100644 --- a/package.json +++ b/package.json @@ -11,10 +11,10 @@ "@promistream/propagate-abort": "^0.1.6", "@promistream/propagate-peek": "^0.1.1", "@validatem/core": "^0.3.11", - "@validatem/default-to": "^0.1.0", "@validatem/is-function": "^0.1.0", "@validatem/required": "^0.1.1", "@validatem/wrap-value-as-option": "^0.1.0", - "bluebird": "^3.5.4" + "bluebird": "^3.5.4", + "result-buffer": "^0.1.0" } }