|
|
@ -16,13 +16,17 @@ module.exports = function pipe(streams) {
|
|
|
|
|
|
|
|
|
|
|
|
function getPipeline(source) {
|
|
|
|
function getPipeline(source) {
|
|
|
|
if (!boundPipelineCache.has(source)) {
|
|
|
|
if (!boundPipelineCache.has(source)) {
|
|
|
|
let pipeline = existentStreams.reduce((bound, stream) => {
|
|
|
|
let pipeline = existentStreams.reduce((bound, stream, i) => {
|
|
|
|
if (stream.read == null) {
|
|
|
|
// TODO: Use Validatem for this instead
|
|
|
|
throw new Error(`Stream is missing a read handler (stream description: ${stream.description})`);
|
|
|
|
// FIXME: Stream index is currently wrong if there were any nulls in the streams list, correct for this
|
|
|
|
|
|
|
|
if (!(typeof stream === "object") || Array.isArray(stream)) {
|
|
|
|
|
|
|
|
throw new Error(`Value at index ${i} in the pipeline is not a stream`);
|
|
|
|
|
|
|
|
} else if (stream.read == null) {
|
|
|
|
|
|
|
|
throw new Error(`Stream at index ${i} is missing a read handler (stream description: ${stream.description})`);
|
|
|
|
} else if (stream.abort == null) {
|
|
|
|
} else if (stream.abort == null) {
|
|
|
|
throw new Error(`Stream is missing an abort handler (stream description: ${stream.description})`);
|
|
|
|
throw new Error(`Stream at index ${i} is missing an abort handler (stream description: ${stream.description})`);
|
|
|
|
} else if (stream.peek == null) {
|
|
|
|
} else if (stream.peek == null) {
|
|
|
|
throw new Error(`Stream is missing a peek handler (stream description: ${stream.description})`);
|
|
|
|
throw new Error(`Stream at index ${i} is missing a peek handler (stream description: ${stream.description})`);
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
if (bound != null) {
|
|
|
|
if (bound != null) {
|
|
|
|
return Object.assign({}, stream, {
|
|
|
|
return Object.assign({}, stream, {
|
|
|
|