From f28cde310b6cf4dae66ff96d2414d79731aa7e4b Mon Sep 17 00:00:00 2001 From: Sven Slootweg Date: Tue, 2 Feb 2021 15:45:28 +0100 Subject: [PATCH] Handle source changes --- index.js | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/index.js b/index.js index 72c0a88..4032db5 100644 --- a/index.js +++ b/index.js @@ -14,12 +14,14 @@ const wrapValueAsOption = require("@validatem/wrap-value-as-option"); // FIXME: Update other stream implementations to new API module.exports = function simpleSinkStream(_options) { - let { onResult, onEnd, onAbort } = validateOptions(arguments, [ + let { onResult, onEnd, onAbort, onSourceChanged } = validateOptions(arguments, [ required, wrapValueAsOption("onResult"), { onResult: [ required, isFunction ], onAbort: [ isFunction ], + onSourceChanged: [ isFunction ], onEnd: [ isFunction, defaultTo.literal(function defaultOnEnd() { + // FIXME: Deprecate this, it does not work as-expected when the onResult callback never gets called (eg. due to a stream immediately terminating), as the onResult callback will then never get a chance to set the lastResult, not even to an initializer value (eg. an empty array for `collect`) // We return whatever value we got last from the specified onResult callback. return lastResult; })] @@ -30,8 +32,10 @@ module.exports = function simpleSinkStream(_options) { let onEndCalled = false; let onEndResult; let abortHandled = false; + let lastKnownSource; return { + _promistreamVersion: 0, description: `simple sink stream`, abort: propagateAbort, peek: propagatePeek, @@ -50,9 +54,15 @@ module.exports = function simpleSinkStream(_options) { } return Promise.try(() => { + if (source !== lastKnownSource && onSourceChanged != null) { + lastKnownSource = source; + return onSourceChanged(source); + } + }).then(() => { return attemptRead(); }).catch(isEndOfStream, () => { /* Don't attempt to do another read, we're done. */ + // FIXME: Is it actually correct to keep returning the same thing? if (onEndCalled) { return onEndResult; } else {