Initial commit
commit
6870c16155
@ -0,0 +1 @@
|
||||
node_modules
|
@ -0,0 +1,71 @@
|
||||
"use strict";
|
||||
|
||||
module.exports = function pipe(streams) {
|
||||
if (streams.length < 1) {
|
||||
throw new Error("Must specify at least one stream when defining a pipeline");
|
||||
} else {
|
||||
/* NOTE: We never clean up the cache, because it's very unlikely that this cache will ever grow big. In the vast majority of cases, it's going to contain at most one item. So instead, we let the garbage collector worry about getting rid of it once the pipeline itself becomes obsolete. */
|
||||
let boundPipelineCache = new Map();
|
||||
|
||||
function getPipeline(source) {
|
||||
if (!boundPipelineCache.has(source)) {
|
||||
let pipeline = streams.reduce((bound, stream) => {
|
||||
if (stream.read == null) {
|
||||
throw new Error(`Stream is missing a read handler (stream description: ${stream.description})`);
|
||||
} else if (stream.abort == null) {
|
||||
throw new Error(`Stream is missing an abort handler (stream description: ${stream.description})`);
|
||||
} else if (stream.peek == null) {
|
||||
throw new Error(`Stream is missing a peek handler (stream description: ${stream.description})`);
|
||||
} else {
|
||||
if (bound != null) {
|
||||
return Object.assign({}, stream, {
|
||||
read: stream.read.bind(null, bound),
|
||||
abort: stream.abort.bind(null, bound),
|
||||
peek: stream.peek.bind(null, bound)
|
||||
});
|
||||
} else {
|
||||
/* This only applies to the source stream at the start of a pipeline, which is initialized *without* an upstream source argument. */
|
||||
return stream;
|
||||
}
|
||||
}
|
||||
}, source);
|
||||
|
||||
boundPipelineCache.set(source, pipeline);
|
||||
}
|
||||
|
||||
return boundPipelineCache.get(source);
|
||||
}
|
||||
|
||||
function verifyFullPipeline(source) {
|
||||
let firstStream = streams[0];
|
||||
let requiresSource = (firstStream.read.length > 0);
|
||||
|
||||
return (source != null || !requiresSource);
|
||||
}
|
||||
|
||||
return {
|
||||
description: `piped stream [${streams.map((stream) => stream.description).join(" => ")}]`,
|
||||
read: function (source) {
|
||||
if(!verifyFullPipeline(source)) {
|
||||
throw new Error("Cannot read from a partial pipeline; maybe you forgot to specify a source stream?");
|
||||
} else {
|
||||
return getPipeline(source).read();
|
||||
}
|
||||
},
|
||||
peek: function (source) {
|
||||
if(!verifyFullPipeline(source)) {
|
||||
throw new Error("Cannot peek from a partial pipeline; maybe you forgot to specify a source stream?");
|
||||
} else {
|
||||
return getPipeline(source).peek();
|
||||
}
|
||||
},
|
||||
abort: function (source, reason) {
|
||||
if(!verifyFullPipeline(source)) {
|
||||
throw new Error("Cannot abort a partial pipeline; maybe you forgot to specify a source stream?");
|
||||
} else {
|
||||
return getPipeline(source).abort(reason);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
@ -0,0 +1,8 @@
|
||||
{
|
||||
"name": "@ppstreams/pipe",
|
||||
"version": "0.1.0",
|
||||
"main": "index.js",
|
||||
"repository": "http://git.cryto.net/ppstreams/pipe.git",
|
||||
"author": "Sven Slootweg <admin@cryto.net>",
|
||||
"license": "WTFPL OR CC0-1.0"
|
||||
}
|
Loading…
Reference in New Issue