You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

126 lines
4.2 KiB
JavaScript

/* eslint-disable no-loop-func */
"use strict";
const matchValue = require("match-value");
const pushBuffer = require("push-buffer");
const propagateAbort = require("@promistream/propagate-abort");
const propagatePeek = require("@promistream/propagate-peek");
const isEndOfStream = require("@promistream/is-end-of-stream");
const isAborted = require("@promistream/is-aborted");
const { validateOptions } = require("@validatem/core");
const required = require("@validatem/required");
const isFunction = require("@validatem/is-function");
const wrapValueAsOption = require("@validatem/wrap-value-as-option");
const defaultTo = require("@validatem/default-to");
// Lifecycle states of the sink. Every state is a distinct Symbol whose
// description is `states.<NAME>`; the names are listed in the order in which
// the keys appear on the resulting table.
const states = [
  "OPEN",
  "ABORTED",
  "ABORT_HANDLED",
  "ABORT_THROW_CAUSE",
  "ENDED",
  "END_HANDLED"
].reduce((table, stateName) => {
  table[stateName] = Symbol(`states.${stateName}`);
  return table;
}, {});
/**
 * Placeholder callback used as the default for optional handlers.
 * Ignores any arguments and produces no value.
 *
 * @returns {undefined}
 */
function noop() {
  return undefined;
}
module.exports = function simpleSinkStream(_options) {
let { onValue, onEnd, onAbort, onSourceChanged } = validateOptions(arguments, [
required,
wrapValueAsOption("onValue"), {
onValue: [ required, isFunction ],
onAbort: [ defaultTo.literal(noop), isFunction ],
onSourceChanged: [ defaultTo.literal(noop), isFunction ],
onEnd: [ defaultTo.literal(noop), isFunction ]
}
]);
let state = states.OPEN;
let lastKnownSource;
let abortMarker, endMarker;
let buffer = pushBuffer({
sequential: true,
pull: async () => {
let result = undefined;
let forceReturn = false;
while (result === undefined && forceReturn === false) {
await matchValue(state, {
[states.OPEN]: async () => {
try {
let value = await lastKnownSource.read();
result = await onValue(value, (reason) => lastKnownSource.abort(reason));
} catch (error) {
if (isEndOfStream(error)) {
endMarker = error;
state = states.ENDED;
} else if (isAborted(error)) {
abortMarker = error;
state = states.ABORTED;
} else {
try {
await lastKnownSource.abort(error);
} catch (abortError) {
let message = [
`Tried to abort stream due to encountering an error, but the aborting itself failed`,
`Original error message: ${error.message}`,
`Abort failure message: ${abortError.message}`
].join("\n");
// FIXME: Make this some sort of chained error
let combinedError = new Error(message);
combinedError.stack = abortError.stack; // HACK
throw combinedError;
}
}
}
},
[states.ABORTED]: async () => {
result = await onAbort(abortMarker.cause);
state = states.ABORT_THROW_CAUSE;
},
[states.ABORT_THROW_CAUSE]: async () => {
// NOTE: This ensures that the original error causing the abort is thrown exactly once
state = states.ABORT_HANDLED;
throw abortMarker.cause;
},
[states.ABORT_HANDLED]: async () => {
throw abortMarker;
},
[states.ENDED]: async () => {
// Note: we *always* return a value when handling the end of the stream - that is intentional. This is necessary because the end can only ever be reached as the ultimate result of a read call on this stream, and that call is expected to return a value. If we only returned a value when there's an onEnd handler producing one, then it would be possible in some cases for the handler to not produce any result, causing an entirely valid read call to result in a thrown EndOfStream error in the next state. That error should only be thrown if something *continues* to try and read from it, not at the first moment that the end is reached.
state = states.END_HANDLED;
result = await onEnd();
forceReturn = true;
},
[states.END_HANDLED]: async () => {
throw endMarker;
}
});
}
return result;
}
});
return {
_promistreamVersion: 0,
description: `simple sink stream`,
abort: propagateAbort,
peek: propagatePeek,
read: async function produceValue_simpleSinkStream(source) {
if (source !== lastKnownSource && onSourceChanged != null) {
lastKnownSource = source;
await onSourceChanged(source);
}
return buffer.request();
}
};
};