You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
openNG/build/core/pipeline.js

105 lines
3.1 KiB
JavaScript

"use strict";
const path = require("path");
const defaultValue = require("default-value");
const createEventEmitter = require("create-event-emitter");
const mergeMetadata = require("./util/merge-metadata");
module.exports = function pipeline(steps) {
if (steps.length < 2) {
throw new Error("A pipeline must consist of at least two steps");
} else if (steps[0].__buildStep !== "source") {
throw new Error("The first transform in a pipeline must be a file source");
} else if (steps[steps.length - 1].__buildStep !== "sink") {
throw new Error("The last transform in a pipeline must be a file sink");
} else {
let basePath = steps[0].basePath;
let sources = steps.slice(0, -1);
let sinks = steps.slice(1);
let firstSource = sources[0];
let finalSink = sinks[sinks.length - 1];
let emitter = createEventEmitter({
basePath: basePath, /* QUESTION: Should sources be required to provide this? */
steps: steps, /* FIXME: Make private? */
initialize: function () {
let allSinksSupportStreams = sinks.every((sink) => sink.supportsStreams === true);
/* This will start the flow. */
firstSource.initialize({
supportsStreams: allSinksSupportStreams
});
}
});
steps.forEach((step) => {
step.on("error", (data) => {
if (data instanceof Error) {
/* FIXME: This is because any error within an event handler will automatically result in an `error` event. Probably should rename the transform error event to avoid interfering with this. */
process.nextTick(() => {
throw data;
});
} else {
let error = Object.assign({
isFatal: true
}, data);
emitter.emit("error", error);
}
});
});
sources.forEach((source, i) => {
source.on("file", ({metadata, contents, stream}) => {
/* FIXME: Add stream-to-contents conversion? */
let augmentedMetadata;
if (metadata.isVirtual === false && metadata.relativePath == null) {
/* FIXME: Add check that 1) absolute path is set, and 2) it lies within the basePath */
augmentedMetadata = mergeMetadata(metadata, {
relativePath: path.relative(basePath, metadata.path)
});
} else {
augmentedMetadata = metadata;
}
augmentedMetadata.basePath = basePath;
/* NOTE: The below indexes into `steps`, not `sources`, because the last source sends to the final sink. Both `steps` and `sources` arrays start at the same point. */
let nextSink = steps[i + 1];
try {
let sinkInput = {
metadata: augmentedMetadata,
contents: contents,
stream: stream
};
if (nextSink.__buildStep === "transform") {
nextSink.transform(sinkInput);
} else if (nextSink.__buildStep === "sink") {
nextSink.sink(sinkInput);
}
} catch (error) {
emitter.emit("error", {
step: nextSink,
file: augmentedMetadata,
error: error,
isFatal: defaultValue(error.isFatal, true)
});
}
});
});
finalSink.on("sunk", ({metadata}) => {
emitter.emit("done", {
metadata: metadata,
acknowledged: finalSink.supportsAcknowledgment
});
});
return emitter;
}
};