commit 6870c16155b0cf0301b484d09098138c6e4a6a39 Author: Sven Slootweg Date: Sun Jul 19 22:55:53 2020 +0200 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3c3629e --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +node_modules diff --git a/index.js b/index.js new file mode 100644 index 0000000..5768f88 --- /dev/null +++ b/index.js @@ -0,0 +1,71 @@ +"use strict"; + +module.exports = function pipe(streams) { + if (streams.length < 1) { + throw new Error("Must specify at least one stream when defining a pipeline"); + } else { + /* NOTE: We never clean up the cache, because it's very unlikely that this cache will ever grow big. In the vast majority of cases, it's going to contain at most one item. So instead, we let the garbage collector worry about getting rid of it once the pipeline itself becomes obsolete. */ + let boundPipelineCache = new Map(); + + function getPipeline(source) { + if (!boundPipelineCache.has(source)) { + let pipeline = streams.reduce((bound, stream) => { + if (stream.read == null) { + throw new Error(`Stream is missing a read handler (stream description: ${stream.description})`); + } else if (stream.abort == null) { + throw new Error(`Stream is missing an abort handler (stream description: ${stream.description})`); + } else if (stream.peek == null) { + throw new Error(`Stream is missing a peek handler (stream description: ${stream.description})`); + } else { + if (bound != null) { + return Object.assign({}, stream, { + read: stream.read.bind(null, bound), + abort: stream.abort.bind(null, bound), + peek: stream.peek.bind(null, bound) + }); + } else { + /* This only applies to the source stream at the start of a pipeline, which is initialized *without* an upstream source argument. */ + return stream; + } + } + }, source); + + boundPipelineCache.set(source, pipeline); + } + + return boundPipelineCache.get(source); + } + + function verifyFullPipeline(source) { + let firstStream = streams[0]; + let requiresSource = (firstStream.read.length > 0); + + return (source != null || !requiresSource); + } + + return { + description: `piped stream [${streams.map((stream) => stream.description).join(" => ")}]`, + read: function (source) { + if(!verifyFullPipeline(source)) { + throw new Error("Cannot read from a partial pipeline; maybe you forgot to specify a source stream?"); + } else { + return getPipeline(source).read(); + } + }, + peek: function (source) { + if(!verifyFullPipeline(source)) { + throw new Error("Cannot peek from a partial pipeline; maybe you forgot to specify a source stream?"); + } else { + return getPipeline(source).peek(); + } + }, + abort: function (source, reason) { + if(!verifyFullPipeline(source)) { + throw new Error("Cannot abort a partial pipeline; maybe you forgot to specify a source stream?"); + } else { + return getPipeline(source).abort(reason); + } + } + }; + } +}; diff --git a/package.json b/package.json new file mode 100644 index 0000000..34d67e6 --- /dev/null +++ b/package.json @@ -0,0 +1,8 @@ +{ + "name": "@ppstreams/pipe", + "version": "0.1.0", + "main": "index.js", + "repository": "http://git.cryto.net/ppstreams/pipe.git", + "author": "Sven Slootweg ", + "license": "WTFPL OR CC0-1.0" +}