You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

135 lines
4.5 KiB
JavaScript

"use strict";
const Promise = require("bluebird");
const createResultBuffer = require("result-buffer");
const propagateAbort = require("@promistream/propagate-abort");
const propagatePeek = require("@promistream/propagate-peek");
const isEndOfStream = require("@promistream/is-end-of-stream");
const isAborted = require("@promistream/is-aborted");
const { validateOptions } = require("@validatem/core");
const required = require("@validatem/required");
const isFunction = require("@validatem/is-function");
const wrapValueAsOption = require("@validatem/wrap-value-as-option");
// FIXME: Update other stream implementations to new API
module.exports = function simpleSinkStream(_options) {
let { onResult, onEnd, onAbort, onSourceChanged } = validateOptions(arguments, [
required,
wrapValueAsOption("onResult"), {
onResult: [ required, isFunction ],
onAbort: [ isFunction ],
onSourceChanged: [ isFunction ],
onEnd: [ isFunction ]
}
]);
// FIXME: Bump minor version!
let onEndCalled = false;
let abortHandled = false;
let lastKnownSource;
let resultBuffer = createResultBuffer();
return {
_promistreamVersion: 0,
description: `simple sink stream`,
abort: propagateAbort,
peek: propagatePeek,
read: function produceValue_simpleSinkStream(source) {
function attemptRead() {
return Promise.try(() => {
return source.read();
}).then((value) => {
// FIXME: Document that you can pause the sink from the onResult callback, by returning a Promise that resolves when it should be resumed
return onResult(value, source.abort.bind(source));
}).then((result) => {
// FIXME: Replace all instances of undefined checks with a standardized NoValue marker
if (result !== undefined) {
// TODO: Force end of stream when this occurs?
return result;
} else {
return attemptRead();
}
});
}
return resultBuffer.maybeRead(() => {
return Promise.try(() => {
if (source !== lastKnownSource && onSourceChanged != null) {
lastKnownSource = source;
return onSourceChanged(source);
}
}).then(() => {
return attemptRead();
}).catch(isEndOfStream, (error) => {
/* Don't attempt to do another read, we're done. */
return Promise.try(() => {
// Note: we *always* push a value to the buffer when handling the end of the stream - that is intentional. This is necessary because the end can only ever be reached as the ultimate result of a read call on this stream, and that call is expected to return a value. If we only pushed a value to the buffer when there's an onEnd handler producing one, then it would be possible in some cases for the buffer to remain empty, causing an entirely valid read call to result in a thrown EndOfStream error. That error should only be thrown if something *continues* to try and read from it, not at the first moment that the end is reached.
if (!onEndCalled) {
onEndCalled = true;
if (onEnd != null) {
return Promise.try(() => {
return onEnd();
}).then((result) => {
resultBuffer.push(result);
});
} else {
resultBuffer.push(undefined);
}
}
}).then(() => {
return resultBuffer.maybeRead(() => {
throw error;
});
});
}).catch((error) => !isAborted(error), (error) => {
return Promise.try(() => {
return source.abort(error);
}).catch((abortError) => {
let message = [
`Tried to abort stream due to encountering an error, but the aborting itself failed`,
`Original error message: ${error.message}`,
`Abort failure message: ${abortError.message}`
].join("\n");
// FIXME: Make this some sort of chained error
let combinedError = new Error(message);
combinedError.stack = abortError.stack; // HACK
throw combinedError;
}).then(() => {
// Pass through the original error to the user
throw error;
});
}).catch(isAborted, (marker) => {
if (abortHandled === false) {
abortHandled = true;
return Promise.try(() => {
if (onAbort != null) {
return onAbort(marker.reason);
}
}).then((value) => {
if (value !== undefined) {
resultBuffer.push(value);
}
if (marker.reason instanceof Error) {
// NOTE: This ensures that the original error causing the abort is thrown exactly once
resultBuffer.push(Promise.reject(marker.reason));
}
});
}
return resultBuffer.maybeRead(() => {
throw marker;
});
});
});
}
};
};