You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

110 lines
4.4 KiB
JavaScript

"use strict";
const { validateArguments } = require("@validatem/core");
const ValidationError = require("@validatem/error");
const required = require("@validatem/required");
const anything = require("@validatem/anything");
const wrapError = require("@validatem/wrap-error");
const arrayOf = require("@validatem/array-of");
const removeNullishItems = require("@validatem/remove-nullish-items");
const allowExtraProperties = require("@validatem/allow-extra-properties");
// FIXME: Move to @validatem/required-or-null
function requiredOrNull(value) {
if (value === void 0 /* undefined */) {
// FIXME: Error code
return new ValidationError("Required value is missing");
}
}
requiredOrNull.callIfNull = true;
// FIXME: Move to separate package, maybe extend to detect specific types of streams as well?
let isPromistream = wrapError("Must be a Promistream", "promistream.pipe.is-promistream", allowExtraProperties({
_promistreamVersion: anything
}));
module.exports = function pipe(_streams) {
// NOTE: We explicitly check for *null* here, not undefined. We still want to detect accidentally-omitted streams, but permit the user to explicitly insert nulls for streams that are not always inserted (eg. optional parallelization in a library). */
let [ streams ] = validateArguments(arguments, {
streams: [
required,
arrayOf([ requiredOrNull, isPromistream ]),
removeNullishItems
]
});
// FIXME: Validate input properly
if (streams.length < 1) {
throw new Error("Must specify at least one stream when defining a pipeline");
} else {
let firstStream = streams[0];
// NOTE: firstStream._promistreamHasSource is backwards compatibility; to be removed in promistreamVersion:1
let requiresSource = (firstStream._promistreamHasSource !== true && firstStream._promistreamIsSource !== true);
/* NOTE: We never clean up the cache, because it's very unlikely that this cache will ever grow big. In the vast majority of cases, it's going to contain at most one item. So instead, we let the garbage collector worry about getting rid of it once the pipeline itself becomes obsolete. */
let boundPipelineCache = new Map();
function getPipeline(source) {
if (!boundPipelineCache.has(source)) {
let pipeline = streams.reduce((bound, stream, i) => {
// FIXME: Stream index is currently wrong if there were any nulls in the streams list, correct for this
if (stream.read == null) {
throw new Error(`Stream at index ${i} is missing a read handler (stream description: ${stream.description})`);
} else if (stream.abort == null) {
throw new Error(`Stream at index ${i} is missing an abort handler (stream description: ${stream.description})`);
} else if (stream.peek == null) {
throw new Error(`Stream at index ${i} is missing a peek handler (stream description: ${stream.description})`);
} else {
if (bound != null) {
return Object.assign({}, stream, {
read: stream.read.bind(null, bound),
abort: (reason) => stream.abort(reason, bound), // bindRight equivalent
peek: stream.peek.bind(null, bound)
});
} else {
/* This only applies to the source stream at the start of a pipeline, which is initialized *without* an upstream source argument. */
return stream;
}
}
}, source);
boundPipelineCache.set(source, pipeline);
}
return boundPipelineCache.get(source);
}
function verifyFullPipeline(source) {
return (source != null || !requiresSource);
}
return {
_promistreamHasSource: !requiresSource, // For backwards compatibility, to be removed in promistreamVersion:1
_promistreamIsSource: !requiresSource,
_promistreamVersion: 0,
description: `piped stream [${streams.map((stream) => stream.description).join(" => ")}]`,
read: function (source) {
if(!verifyFullPipeline(source)) {
throw new Error("Cannot read from a partial pipeline; maybe you forgot to specify a source stream?");
} else {
return getPipeline(source).read();
}
},
peek: function (source) {
if(!verifyFullPipeline(source)) {
throw new Error("Cannot peek from a partial pipeline; maybe you forgot to specify a source stream?");
} else {
return getPipeline(source).peek();
}
},
abort: function (reason, source) {
if(!verifyFullPipeline(source)) {
throw new Error("Cannot abort a partial pipeline; maybe you forgot to specify a source stream?");
} else {
return getPipeline(source).abort(reason);
}
}
};
}
};