You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

79 lines
3.4 KiB
JavaScript

"use strict";
module.exports = function pipe(streams) {
// NOTE: We explicitly check for *null* here, not undefined. We still want to detect accidentally-omitted streams, but permit the user to explicitly insert nulls for streams that are not always inserted (eg. optional parallelization in a library). */
let existentStreams = streams.filter((stream) => stream !== null);
// FIXME: Validate input properly
if (existentStreams.length < 1) {
throw new Error("Must specify at least one stream when defining a pipeline");
} else {
let firstStream = existentStreams[0];
let requiresSource = (firstStream.read.length > 0 && firstStream._promistreamHasSource !== true);
/* NOTE: We never clean up the cache, because it's very unlikely that this cache will ever grow big. In the vast majority of cases, it's going to contain at most one item. So instead, we let the garbage collector worry about getting rid of it once the pipeline itself becomes obsolete. */
let boundPipelineCache = new Map();
function getPipeline(source) {
if (!boundPipelineCache.has(source)) {
let pipeline = existentStreams.reduce((bound, stream) => {
if (stream.read == null) {
throw new Error(`Stream is missing a read handler (stream description: ${stream.description})`);
} else if (stream.abort == null) {
throw new Error(`Stream is missing an abort handler (stream description: ${stream.description})`);
} else if (stream.peek == null) {
throw new Error(`Stream is missing a peek handler (stream description: ${stream.description})`);
} else {
if (bound != null) {
return Object.assign({}, stream, {
read: stream.read.bind(null, bound),
abort: stream.abort.bind(null, bound),
peek: stream.peek.bind(null, bound)
});
} else {
/* This only applies to the source stream at the start of a pipeline, which is initialized *without* an upstream source argument. */
return stream;
}
}
}, source);
boundPipelineCache.set(source, pipeline);
}
return boundPipelineCache.get(source);
}
function verifyFullPipeline(source) {
return (source != null || !requiresSource);
}
return {
// NOTE: This is set to convey to other `pipe` calls (as well as any other composability tools) that this is explicitly a composed stream that does not require a source, even if the `read` signature suggests otherwise. FIXME: Figure out how and whether to spec this, or whether there is a better way to deal with this.
_promistreamHasSource: !requiresSource,
_promistreamVersion: 0,
description: `piped stream [${existentStreams.map((stream) => stream.description).join(" => ")}]`,
read: function (source) {
if(!verifyFullPipeline(source)) {
throw new Error("Cannot read from a partial pipeline; maybe you forgot to specify a source stream?");
} else {
return getPipeline(source).read();
}
},
peek: function (source) {
if(!verifyFullPipeline(source)) {
throw new Error("Cannot peek from a partial pipeline; maybe you forgot to specify a source stream?");
} else {
return getPipeline(source).peek();
}
},
abort: function (source, reason) {
if(!verifyFullPipeline(source)) {
throw new Error("Cannot abort a partial pipeline; maybe you forgot to specify a source stream?");
} else {
return getPipeline(source).abort(reason);
}
}
};
}
};