diff --git a/index.js b/index.js index 43f6343..a033962 100644 --- a/index.js +++ b/index.js @@ -3,8 +3,8 @@ const Promise = require("bluebird"); const propagateAbort = require("@ppstreams/propagate-abort"); const propagatePeek = require("@ppstreams/propagate-peek"); -const { isEndOfStream } = require("@ppstreams/end-of-stream-marker"); -const { isAborted } = require("@ppstreams/aborted-marker"); +const isEndOfStream = require("@ppstreams/is-end-of-stream"); +const isAborted = require("@ppstreams/is-aborted"); const { validateOptions } = require("@validatem/core"); const required = require("@validatem/required"); @@ -48,58 +48,60 @@ module.exports = function simpleSinkStream(_options) { lastResult = result; return attemptRead(); - }).catch(isEndOfStream, () => { - /* Don't attempt to do another read, we're done. */ - if (onEndCalled) { - return onEndResult; - } else { - return Promise.try(() => { - return onEnd(); - }).then((result) => { - onEndResult = result; - return result; - }); - } - }).catch((error) => !isAborted(error), (error) => { - return Promise.try(() => { - return source.abort(error); - }).catch((abortError) => { - let message = [ - `Tried to abort stream due to encountering an error, but the aborting itself failed`, - `Original error message: ${error.message}`, - `Abort failure message: ${abortError.message}` - ].join("\n"); + }); + } - // FIXME: Make this some sort of chained error - let combinedError = new Error(message); - combinedError.stack = abortError.stack; // HACK - throw combinedError; - }).then(() => { - // Pass through the original error to the user - throw error; + return Promise.try(() => { + return attemptRead(); + }).catch(isEndOfStream, () => { + /* Don't attempt to do another read, we're done. */ + if (onEndCalled) { + return onEndResult; + } else { + return Promise.try(() => { + return onEnd(); + }).then((result) => { + onEndResult = result; + return result; }); - }).catch(isAborted, (marker) => { - if (abortHandled === false) { - abortHandled = true; + } + }).catch((error) => !isAborted(error), (error) => { + return Promise.try(() => { + return source.abort(error); + }).catch((abortError) => { + let message = [ + `Tried to abort stream due to encountering an error, but the aborting itself failed`, + `Original error message: ${error.message}`, + `Abort failure message: ${abortError.message}` + ].join("\n"); - return Promise.try(() => { - return onAbort(); - }).then(() => { - if (marker.reason instanceof Error) { - // NOTE: This ensures that the original error causing the abort is thrown exactly once - throw marker.reason; - } else { - throw marker; - } - }); - } else { - // Don't interfere, we only need special behaviour on the first occurrence - throw marker; - } + // FIXME: Make this some sort of chained error + let combinedError = new Error(message); + combinedError.stack = abortError.stack; // HACK + throw combinedError; + }).then(() => { + // Pass through the original error to the user + throw error; }); - } + }).catch(isAborted, (marker) => { + if (abortHandled === false) { + abortHandled = true; - return attemptRead(); + return Promise.try(() => { + return onAbort(); + }).then(() => { + if (marker.reason instanceof Error) { + // NOTE: This ensures that the original error causing the abort is thrown exactly once + throw marker.reason; + } else { + throw marker; + } + }); + } else { + // Don't interfere, we only need special behaviour on the first occurrence + throw marker; + } + }); } }; };