"use strict"; const Promise = require("bluebird"); const Aborted = require("@promistream/aborted"); const EndOfStream = require("@promistream/end-of-stream"); const isEndOfStream = require("@promistream/is-end-of-stream"); const errorChain = require("error-chain"); const unreachable = require("@joepie91/unreachable")("@promistream/simple-source"); const debug = require("debug")("promistream:simple-source"); const { validateOptions } = require("@validatem/core"); const required = require("@validatem/required"); const isFunction = require("@validatem/is-function"); const wrapValueAsOption = require("@validatem/wrap-value-as-option"); module.exports = function simpleSource(_options) { let { onRequest, onAbort } = validateOptions(arguments, [ required, wrapValueAsOption("onRequest"), { onRequest: [ required, isFunction ], onAbort: [ isFunction ] } ]); let errorReason; let ended = false; let peekQueue = []; function getValue() { return Promise.try(() => { return onRequest(); }).catch(isEndOfStream, (marker) => { ended = true; throw marker; }); } return { _promistreamVersion: 0, _promistreamIsSource: true, description: `simple source`, peek: function peekValue_simpleSource() { return Promise.try(() => { if (errorReason != null || ended === true) { return true; } else { // FIXME: Store Promises in the peekQueue instead? Or would this make it more difficult to deal with concurrent peeks/reads? And should the peek spec be changed to account for this? return Promise.try(() => { return getValue(); }).then((result) => { debug("pushing value to peek queue"); peekQueue.push({ type: "value", value: result }); }).catch((error) => { debug("pushing error to peek queue"); peekQueue.push({ type: "error", error: error }); }).then(() => { // FIXME: What if there's an EndOfStream marker being produced? Or an abort? Or some other sort of failure? Shouldn't that return/produce something *other than* `true`? return true; }); } }); }, read: function produceValue_simpleSource() { return Promise.try(() => { if (peekQueue.length > 0) { debug("returning read result from peek queue"); /* FIXME: Move all this logic out into an itemBuffer abstraction of some sort (also useful in from-node-stream?) */ let item = peekQueue.shift(); if (item.type === "value") { return item.value; } else if (item.type === "error") { throw item.error; } else { throw unreachable(`Found a queue item of type '${item.type}'`); } } else { if (errorReason != null) { debug("stream was aborted; throwing Aborted"); if (errorReason === true) { throw new Aborted("Stream was aborted"); } else if (errorReason instanceof Error) { throw new errorChain.chain(errorReason, Aborted, `Stream was aborted due to error: ${errorReason.message}`); } } else if (ended === true) { debug("stream was ended; throwing EndOfStream"); throw new EndOfStream; } else { debug("fetching value for read"); return getValue(); } } }); }, abort: function abort_simpleSource(reason, _source) { debug(`received abort`, reason); // TODO: Support different behaviour when a source stream is provided, and so we are not the source stream ourselves? return Promise.try(() => { if (errorReason == null) { if (reason === true || reason instanceof Error) { errorReason = reason; if (onAbort != null) { return onAbort(reason); } } else { throw new Error("You must specify a reason (either `true` or an Error object) when aborting a stream"); } } else { // Further thoughts: We should probably allow this to fail silently - if an error occurs several times in a transform stream, multiple aborts may be issued (reading from an internal buffer) before the first Aborted signal has a chance to be supplied by the source stream, and so multiple aborts may occur in normal operation, and should be tolerated // TODO: Make the warning silenceable? console.warn("WARNING: A Promistream pipeline was double-aborted. This can happen normally in some high-throughput pipelines, but it can also indicate a bug. Aborted because of:", reason); } }); } }; };