You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
119 lines
4.3 KiB
JavaScript
119 lines
4.3 KiB
JavaScript
"use strict";
|
|
|
|
const Promise = require("bluebird");
|
|
const Aborted = require("@promistream/aborted");
|
|
const EndOfStream = require("@promistream/end-of-stream");
|
|
const isEndOfStream = require("@promistream/is-end-of-stream");
|
|
const errorChain = require("error-chain");
|
|
const unreachable = require("@joepie91/unreachable")("@promistream/simple-source");
|
|
const debug = require("debug")("promistream:simple-source");
|
|
|
|
const { validateOptions } = require("@validatem/core");
|
|
const required = require("@validatem/required");
|
|
const isFunction = require("@validatem/is-function");
|
|
const wrapValueAsOption = require("@validatem/wrap-value-as-option");
|
|
|
|
module.exports = function simpleSource(_options) {
|
|
let { onRequest, onAbort } = validateOptions(arguments, [
|
|
required,
|
|
wrapValueAsOption("onRequest"), {
|
|
onRequest: [ required, isFunction ],
|
|
onAbort: [ isFunction ]
|
|
}
|
|
]);
|
|
|
|
let errorReason;
|
|
let ended = false;
|
|
let peekQueue = [];
|
|
|
|
function getValue() {
|
|
return Promise.try(() => {
|
|
return onRequest();
|
|
}).catch(isEndOfStream, (marker) => {
|
|
ended = true;
|
|
throw marker;
|
|
});
|
|
}
|
|
|
|
return {
|
|
_promistreamVersion: 0,
|
|
_promistreamIsSource: true,
|
|
description: `simple source`,
|
|
peek: function peekValue_simpleSource() {
|
|
return Promise.try(() => {
|
|
if (errorReason != null || ended === true) {
|
|
return true;
|
|
} else {
|
|
// FIXME: Store Promises in the peekQueue instead? Or would this make it more difficult to deal with concurrent peeks/reads? And should the peek spec be changed to account for this?
|
|
return Promise.try(() => {
|
|
return getValue();
|
|
}).then((result) => {
|
|
debug("pushing value to peek queue");
|
|
peekQueue.push({ type: "value", value: result });
|
|
}).catch((error) => {
|
|
debug("pushing error to peek queue");
|
|
peekQueue.push({ type: "error", error: error });
|
|
}).then(() => {
|
|
// FIXME: What if there's an EndOfStream marker being produced? Or an abort? Or some other sort of failure? Shouldn't that return/produce something *other than* `true`?
|
|
return true;
|
|
});
|
|
}
|
|
});
|
|
},
|
|
read: function produceValue_simpleSource() {
|
|
return Promise.try(() => {
|
|
if (peekQueue.length > 0) {
|
|
debug("returning read result from peek queue");
|
|
/* FIXME: Move all this logic out into an itemBuffer abstraction of some sort (also useful in from-node-stream?) */
|
|
let item = peekQueue.shift();
|
|
|
|
if (item.type === "value") {
|
|
return item.value;
|
|
} else if (item.type === "error") {
|
|
throw item.error;
|
|
} else {
|
|
throw unreachable(`Found a queue item of type '${item.type}'`);
|
|
}
|
|
} else {
|
|
if (errorReason != null) {
|
|
debug("stream was aborted; throwing Aborted");
|
|
if (errorReason === true) {
|
|
throw new Aborted("Stream was aborted");
|
|
} else if (errorReason instanceof Error) {
|
|
throw new errorChain.chain(errorReason, Aborted, `Stream was aborted due to error: ${errorReason.message}`);
|
|
}
|
|
} else if (ended === true) {
|
|
debug("stream was ended; throwing EndOfStream");
|
|
throw new EndOfStream;
|
|
} else {
|
|
debug("fetching value for read");
|
|
return getValue();
|
|
}
|
|
}
|
|
});
|
|
},
|
|
abort: function abort_simpleSource(reason, _source) {
|
|
debug(`received abort`, reason);
|
|
|
|
// TODO: Support different behaviour when a source stream is provided, and so we are not the source stream ourselves?
|
|
return Promise.try(() => {
|
|
if (errorReason == null) {
|
|
if (reason === true || reason instanceof Error) {
|
|
errorReason = reason;
|
|
|
|
if (onAbort != null) {
|
|
return onAbort(reason);
|
|
}
|
|
} else {
|
|
throw new Error("You must specify a reason (either `true` or an Error object) when aborting a stream");
|
|
}
|
|
} else {
|
|
// Further thoughts: We should probably allow this to fail silently - if an error occurs several times in a transform stream, multiple aborts may be issued (reading from an internal buffer) before the first Aborted signal has a chance to be supplied by the source stream, and so multiple aborts may occur in normal operation, and should be tolerated
|
|
// TODO: Make the warning silenceable?
|
|
console.warn("WARNING: A Promistream pipeline was double-aborted. This can happen normally in some high-throughput pipelines, but it can also indicate a bug. Aborted because of:", reason);
|
|
}
|
|
});
|
|
}
|
|
};
|
|
};
|