/* eslint-disable no-loop-func */
"use strict";
const matchValue = require("match-value");
const pushBuffer = require("push-buffer");
const propagateAbort = require("@promistream/propagate-abort");
const propagatePeek = require("@promistream/propagate-peek");
const isEndOfStream = require("@promistream/is-end-of-stream");
const isAborted = require("@promistream/is-aborted");
const { validateOptions } = require("@validatem/core");
const required = require("@validatem/required");
const isFunction = require("@validatem/is-function");
const wrapValueAsOption = require("@validatem/wrap-value-as-option");
const defaultTo = require("@validatem/default-to");
// Lifecycle phases of a sink, used as keys by the state dispatch in the pull loop.
// Each phase is a unique Symbol whose description is `states.<NAME>`.
const states = Object.fromEntries([
	"OPEN", // still reading from the source and handing values to onValue
	"ABORTED", // the source reported an abort; onAbort has not been invoked yet
	"ABORT_HANDLED", // abort fully processed; further reads rethrow the abort marker
	"ABORT_THROW_CAUSE", // the abort cause is thrown on the next pass, then ABORT_HANDLED follows
	"ENDED", // the source reported its end; onEnd has not been invoked yet
	"END_HANDLED" // end fully processed; further reads rethrow the end marker
].map((phase) => [ phase, Symbol(`states.${phase}`) ]));
/**
 * Placeholder callback that ignores its arguments and has no effect.
 * Used as the default for the optional handlers.
 *
 * @returns {undefined}
 */
function noop() {
	return undefined;
}
module.exports = function simpleSinkStream(_options) {
5 months ago
let { onValue, onEnd, onAbort, onSourceChanged } = validateOptions(arguments, [
required,
5 months ago
wrapValueAsOption("onValue"), {
onValue: [ required, isFunction ],
onAbort: [ defaultTo.literal(noop), isFunction ],
onSourceChanged: [ defaultTo.literal(noop), isFunction ],
onEnd: [ defaultTo.literal(noop), isFunction ]
}
]);
5 months ago
let state = states.OPEN;
let lastKnownSource;
5 months ago
let abortMarker, endMarker;
5 months ago
let buffer = pushBuffer({
sequential: true,
pull: async () => {
let result = undefined;
let forceReturn = false;
while (result === undefined && forceReturn === false) {
await matchValue(state, {
[states.OPEN]: async () => {
try {
let value = await lastKnownSource.read();
result = await onValue(value, (reason) => lastKnownSource.abort(reason));
} catch (error) {
if (isEndOfStream(error)) {
endMarker = error;
state = states.ENDED;
} else if (isAborted(error)) {
abortMarker = error;
state = states.ABORTED;
} else {
try {
await lastKnownSource.abort(error);
} catch (abortError) {
let message = [
`Tried to abort stream due to encountering an error, but the aborting itself failed`,
`Original error message: ${error.message}`,
`Abort failure message: ${abortError.message}`
].join("\n");
// FIXME: Make this some sort of chained error
let combinedError = new Error(message);
combinedError.stack = abortError.stack; // HACK
throw combinedError;
}
}
}
},
[states.ABORTED]: async () => {
result = await onAbort(abortMarker.cause);
state = states.ABORT_THROW_CAUSE;
},
[states.ABORT_THROW_CAUSE]: async () => {
// NOTE: This ensures that the original error causing the abort is thrown exactly once
state = states.ABORT_HANDLED;
throw abortMarker.cause;
},
[states.ABORT_HANDLED]: async () => {
throw abortMarker;
},
[states.ENDED]: async () => {
// Note: we *always* return a value when handling the end of the stream - that is intentional. This is necessary because the end can only ever be reached as the ultimate result of a read call on this stream, and that call is expected to return a value. If we only returned a value when there's an onEnd handler producing one, then it would be possible in some cases for the handler to not produce any result, causing an entirely valid read call to result in a thrown EndOfStream error in the next state. That error should only be thrown if something *continues* to try and read from it, not at the first moment that the end is reached.
state = states.END_HANDLED;
result = await onEnd();
forceReturn = true;
},
[states.END_HANDLED]: async () => {
throw endMarker;
}
});
}
return result;
}
});
6 years ago
return {
_promistreamVersion: 0,
description: `simple sink stream`,
6 years ago
abort: propagateAbort,
6 years ago
peek: propagatePeek,
5 months ago
read: async function produceValue_simpleSinkStream(source) {
if (source !== lastKnownSource && onSourceChanged != null) {
lastKnownSource = source;
await onSourceChanged(source);
}
5 months ago
return buffer.request();
6 years ago
}
};
};