From 4a67904a85e0dfde6409bd5137561f4e5c8e8d8f Mon Sep 17 00:00:00 2001 From: Sven Slootweg Date: Sun, 19 Jul 2020 21:04:21 +0200 Subject: [PATCH] Rename, validate input, end/abort handlers, misc. fixes --- index.js | 87 +++++++++++++++++++++++++++++++++++++++++++++++----- package.json | 8 +++-- 2 files changed, 85 insertions(+), 10 deletions(-) diff --git a/index.js b/index.js index 0e6e5f3..0993458 100644 --- a/index.js +++ b/index.js @@ -4,31 +4,102 @@ const Promise = require("bluebird"); const propagateAbort = require("@ppstreams/propagate-abort"); const propagatePeek = require("@ppstreams/propagate-peek"); const { isEndOfStream } = require("@ppstreams/end-of-stream-marker"); +const { isAborted } = require("@ppstreams/aborted-marker"); + +const { validateOptions } = require("@validatem/core"); +const required = require("@validatem/required"); +const isFunction = require("@validatem/is-function"); +const defaultTo = require("@validatem/default-to"); + +// FIXME: Update other stream implementations to new API +module.exports = function greedySinkStream(_options) { + let { onResult, onEnd, onAbort } = validateOptions(arguments, { + onResult: [ required, isFunction ], + onEnd: [ isFunction, defaultTo.literal(defaultOnEnd) ], + onAbort: [ isFunction, defaultTo.literal(defaultOnAbort) ] + }); + + let lastResult; + let onEndCalled = false; + let onEndResult; + let abortHandled = false; + + function defaultOnEnd() { + // We return whatever value we got last from the specified onResult callback. + return lastResult; + } + + function defaultOnAbort() { + // no-op + } -module.exports = function greedySinkStream(description, callback) { return { - description: `greedy sink stream (${description})`, + description: `greedy sink stream`, abort: propagateAbort, peek: propagatePeek, read: function produceValue_greedySinkStream(source) { - let lastResult; - function attemptRead() { return Promise.try(() => { return source.read(); }).then((value) => { - return callback(value); + // FIXME: Document that you can pause the sink from the onResult callback, by returning a Promise that resolves when it should be resumed + return onResult(value); }).then((result) => { lastResult = result; return attemptRead(); }).catch(isEndOfStream, () => { - /* Don't attempt to do another read, we're done. We return whatever value we got last from the specified callback. */ - return lastResult; + /* Don't attempt to do another read, we're done. */ + if (onEndCalled) { + return onEndResult; + } else { + return Promise.try(() => { + return onEnd(); + }).then((result) => { + onEndResult = result; + return result; + }); + } + }).catch((error) => !isAborted(error), (error) => { + return Promise.try(() => { + return source.abort(error); + }).catch((abortError) => { + let message = [ + `Tried to abort stream due to encountering an error, but the aborting itself failed`, + `Original error message: ${error.message}`, + `Abort failure message: ${abortError.message}` + ].join("\n"); + + // FIXME: Make this some sort of chained error + let combinedError = new Error(message); + combinedError.stack = abortError.stack; // HACK + throw combinedError; + }).then(() => { + // Pass through the original error to the user + throw error; + }); + }).catch(isAborted, (marker) => { + if (abortHandled === false) { + abortHandled = true; + + return Promise.try(() => { + return onAbort(); + }).then(() => { + if (marker.reason instanceof Error) { + // NOTE: This ensures that the original error causing the abort is thrown exactly once + throw marker.reason; + } else { + throw marker; + } + }); + } else { + // Don't interfere, we only need special behaviour on the first occurrence + throw marker; + } }); } return attemptRead(); } }; -}; \ No newline at end of file +}; diff --git a/package.json b/package.json index 404d487..7be796c 100644 --- a/package.json +++ b/package.json @@ -1,13 +1,17 @@ { - "name": "@ppstreams/greedy-sink", + "name": "@ppstreams/simple-sink", "version": "0.1.0", "main": "index.js", - "repository": "http://git.cryto.net/ppstreams/greedy-sink.git", + "repository": "http://git.cryto.net/ppstreams/simple-sink.git", "author": "Sven Slootweg ", "license": "WTFPL OR CC0-1.0", "dependencies": { "@ppstreams/propagate-abort": "^0.1.2", "@ppstreams/propagate-peek": "^0.1.0", + "@validatem/core": "^0.3.11", + "@validatem/default-to": "^0.1.0", + "@validatem/is-function": "^0.1.0", + "@validatem/required": "^0.1.1", "bluebird": "^3.5.4" } }