You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

83 lines
3.7 KiB
JavaScript

"use strict";
module.exports = function pipe(streams) {
// NOTE: We explicitly check for *null* here, not undefined. We still want to detect accidentally-omitted streams, but permit the user to explicitly insert nulls for streams that are not always inserted (eg. optional parallelization in a library). */
let existentStreams = streams.filter((stream) => stream !== null);
// FIXME: Validate input properly
if (existentStreams.length < 1) {
throw new Error("Must specify at least one stream when defining a pipeline");
} else {
let firstStream = existentStreams[0];
let requiresSource = (firstStream.read.length > 0 && firstStream._promistreamHasSource !== true);
/* NOTE: We never clean up the cache, because it's very unlikely that this cache will ever grow big. In the vast majority of cases, it's going to contain at most one item. So instead, we let the garbage collector worry about getting rid of it once the pipeline itself becomes obsolete. */
let boundPipelineCache = new Map();
function getPipeline(source) {
if (!boundPipelineCache.has(source)) {
let pipeline = existentStreams.reduce((bound, stream, i) => {
// TODO: Use Validatem for this instead
// FIXME: Stream index is currently wrong if there were any nulls in the streams list, correct for this
if (!(typeof stream === "object") || Array.isArray(stream)) {
throw new Error(`Value at index ${i} in the pipeline is not a stream`);
} else if (stream.read == null) {
throw new Error(`Stream at index ${i} is missing a read handler (stream description: ${stream.description})`);
} else if (stream.abort == null) {
throw new Error(`Stream at index ${i} is missing an abort handler (stream description: ${stream.description})`);
} else if (stream.peek == null) {
throw new Error(`Stream at index ${i} is missing a peek handler (stream description: ${stream.description})`);
} else {
if (bound != null) {
return Object.assign({}, stream, {
read: stream.read.bind(null, bound),
abort: stream.abort.bind(null, bound),
peek: stream.peek.bind(null, bound)
});
} else {
/* This only applies to the source stream at the start of a pipeline, which is initialized *without* an upstream source argument. */
return stream;
}
}
}, source);
boundPipelineCache.set(source, pipeline);
}
return boundPipelineCache.get(source);
}
function verifyFullPipeline(source) {
return (source != null || !requiresSource);
}
return {
// NOTE: This is set to convey to other `pipe` calls (as well as any other composability tools) that this is explicitly a composed stream that does not require a source, even if the `read` signature suggests otherwise. FIXME: Figure out how and whether to spec this, or whether there is a better way to deal with this.
_promistreamHasSource: !requiresSource,
_promistreamVersion: 0,
description: `piped stream [${existentStreams.map((stream) => stream.description).join(" => ")}]`,
read: function (source) {
if(!verifyFullPipeline(source)) {
throw new Error("Cannot read from a partial pipeline; maybe you forgot to specify a source stream?");
} else {
return getPipeline(source).read();
}
},
peek: function (source) {
if(!verifyFullPipeline(source)) {
throw new Error("Cannot peek from a partial pipeline; maybe you forgot to specify a source stream?");
} else {
return getPipeline(source).peek();
}
},
abort: function (source, reason) {
if(!verifyFullPipeline(source)) {
throw new Error("Cannot abort a partial pipeline; maybe you forgot to specify a source stream?");
} else {
return getPipeline(source).abort(reason);
}
}
};
}
};