diff --git a/index.js b/index.js index 5768f88..f08c467 100644 --- a/index.js +++ b/index.js @@ -1,9 +1,13 @@ "use strict"; module.exports = function pipe(streams) { + // FIXME: Validate input properly if (streams.length < 1) { throw new Error("Must specify at least one stream when defining a pipeline"); } else { + let firstStream = streams[0]; + let requiresSource = (firstStream.read.length > 0 && firstStream.__ppstreams_hasSource !== true); + /* NOTE: We never clean up the cache, because it's very unlikely that this cache will ever grow big. In the vast majority of cases, it's going to contain at most one item. So instead, we let the garbage collector worry about getting rid of it once the pipeline itself becomes obsolete. */ let boundPipelineCache = new Map(); @@ -37,13 +41,12 @@ module.exports = function pipe(streams) { } function verifyFullPipeline(source) { - let firstStream = streams[0]; - let requiresSource = (firstStream.read.length > 0); - return (source != null || !requiresSource); } return { + // NOTE: This is set to convey to other `pipe` calls (as well as any other composability tools) that this is explicitly a composed stream that does not require a source, even if the `read` signature suggests otherwise. FIXME: Figure out how and whether to spec this, or whether there is a better way to deal with this. + __ppstreams_hasSource: !requiresSource, description: `piped stream [${streams.map((stream) => stream.description).join(" => ")}]`, read: function (source) { if(!verifyFullPipeline(source)) {