"use strict"; const Promise = require("bluebird"); const Aborted = require("@promistream/aborted"); const EndOfStream = require("@promistream/end-of-stream"); const errorChain = require("error-chain"); const { validateArguments } = require("@validatem/core"); const required = require("@validatem/required"); const isFunction = require("@validatem/is-function"); module.exports = function simpleSource(_deriver) { let [ deriver ] = validateArguments(arguments, { deriver: [ required, isFunction ] }); let hasBeenDerived = false; let errorReason = null; return { _promistreamVersion: 0, description: `derived stream`, peek: function peekValue_derivedStream(_source) { // A derived-stream is guaranteed to produce a value; either a new stream, or an EndOfStream, or an Aborted marker return true; }, read: function produceValue_derivedStream(source) { // TODO: Abstract out the abort handling here and in `abort`? The logic is almost entirely shared between `simple-source` and `derived-stream`. if (errorReason != null) { if (errorReason === true) { throw new Aborted("Stream was aborted"); } else if (errorReason instanceof Error) { throw new errorChain.chain(errorReason, Aborted, `Stream was aborted due to error: ${errorReason.message}`); } } else if (hasBeenDerived === true) { throw new EndOfStream; } else { hasBeenDerived = true; return deriver(source); } }, abort: function abort_simpleSource(reason, _source) { // NOTE: Even though a `source` is provided, we don't do anything with it; higher-order stream producers should *not* propagate aborts upstream, as a read from such a producer would never constitute a read upstream. return Promise.try(() => { // FIXME: Remove Promise.try here? if (errorReason == null) { if (reason === true || reason instanceof Error) { errorReason = reason; } else { throw new Error("You must specify a reason (either `true` or an Error object) when aborting a stream"); } } else { // FIXME: Require this behaviour in the spec? Or is there a composability-related reason to permit/require quietly ignoring this, to make it idempotent? (Duplicate of simple-source) throw new Error(`The stream is already aborted`); } }); } }; };