You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

59 lines
2.3 KiB
JavaScript

"use strict";
const Promise = require("bluebird");
const Aborted = require("@promistream/aborted");
const EndOfStream = require("@promistream/end-of-stream");
const errorChain = require("error-chain");
const { validateArguments } = require("@validatem/core");
const required = require("@validatem/required");
const isFunction = require("@validatem/is-function");
module.exports = function simpleSource(_deriver) {
let [ deriver ] = validateArguments(arguments, {
deriver: [ required, isFunction ]
});
let hasBeenDerived = false;
let errorReason = null;
return {
_promistreamVersion: 0,
description: `derived stream`,
peek: function peekValue_derivedStream(_source) {
// A derived-stream is guaranteed to produce a value; either a new stream, or an EndOfStream, or an Aborted marker
return true;
},
read: function produceValue_derivedStream(source) {
// TODO: Abstract out the abort handling here and in `abort`? The logic is almost entirely shared between `simple-source` and `derived-stream`.
if (errorReason != null) {
if (errorReason === true) {
throw new Aborted("Stream was aborted");
} else if (errorReason instanceof Error) {
throw new errorChain.chain(errorReason, Aborted, `Stream was aborted due to error: ${errorReason.message}`);
}
} else if (hasBeenDerived === true) {
throw new EndOfStream;
} else {
hasBeenDerived = true;
return deriver(source);
}
},
abort: function abort_simpleSource(reason, _source) {
// NOTE: Even though a `source` is provided, we don't do anything with it; higher-order stream producers should *not* propagate aborts upstream, as a read from such a producer would never constitute a read upstream.
return Promise.try(() => { // FIXME: Remove Promise.try here?
if (errorReason == null) {
if (reason === true || reason instanceof Error) {
errorReason = reason;
} else {
throw new Error("You must specify a reason (either `true` or an Error object) when aborting a stream");
}
} else {
// FIXME: Require this behaviour in the spec? Or is there a composability-related reason to permit/require quietly ignoring this, to make it idempotent? (Duplicate of simple-source)
throw new Error(`The stream is already aborted`);
}
});
}
};
};