You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
132 lines
3.9 KiB
JavaScript
132 lines
3.9 KiB
JavaScript
"use strict";
|
|
|
|
const Promise = require("bluebird");
|
|
const createResultBuffer = require("result-buffer");
|
|
|
|
const propagateAbort = require("@promistream/propagate-abort");
|
|
const propagatePeek = require("@promistream/propagate-peek");
|
|
const isEndOfStream = require("@promistream/is-end-of-stream");
|
|
const isAborted = require("@promistream/is-aborted");
|
|
|
|
const { validateOptions } = require("@validatem/core");
|
|
const required = require("@validatem/required");
|
|
const isFunction = require("@validatem/is-function");
|
|
const wrapValueAsOption = require("@validatem/wrap-value-as-option");
|
|
|
|
// FIXME: Update other stream implementations to new API
|
|
module.exports = function simpleSinkStream(_options) {
|
|
let { onResult, onEnd, onAbort, onSourceChanged } = validateOptions(arguments, [
|
|
required,
|
|
wrapValueAsOption("onResult"), {
|
|
onResult: [ required, isFunction ],
|
|
onAbort: [ isFunction ],
|
|
onSourceChanged: [ isFunction ],
|
|
onEnd: [ isFunction ]
|
|
}
|
|
]);
|
|
|
|
// FIXME: Bump minor version!
|
|
|
|
let onEndCalled = false;
|
|
let abortHandled = false;
|
|
let lastKnownSource;
|
|
|
|
let resultBuffer = createResultBuffer();
|
|
|
|
return {
|
|
_promistreamVersion: 0,
|
|
description: `simple sink stream`,
|
|
abort: propagateAbort,
|
|
peek: propagatePeek,
|
|
read: function produceValue_simpleSinkStream(source) {
|
|
function attemptRead() {
|
|
return Promise.try(() => {
|
|
return source.read();
|
|
}).then((value) => {
|
|
// FIXME: Document that you can pause the sink from the onResult callback, by returning a Promise that resolves when it should be resumed
|
|
return onResult(value, source.abort.bind(source));
|
|
}).then((result) => {
|
|
// FIXME: Replace all instances of undefined checks with a standardized NoValue marker
|
|
if (result !== undefined) {
|
|
// TODO: Force end of stream when this occurs?
|
|
return result;
|
|
} else {
|
|
return attemptRead();
|
|
}
|
|
});
|
|
}
|
|
|
|
return resultBuffer.maybeRead(() => {
|
|
return Promise.try(() => {
|
|
if (source !== lastKnownSource && onSourceChanged != null) {
|
|
lastKnownSource = source;
|
|
return onSourceChanged(source);
|
|
}
|
|
}).then(() => {
|
|
return attemptRead();
|
|
}).catch(isEndOfStream, (error) => {
|
|
/* Don't attempt to do another read, we're done. */
|
|
return Promise.try(() => {
|
|
if (!onEndCalled && onEnd != null) {
|
|
onEndCalled = true;
|
|
|
|
return Promise.try(() => {
|
|
return onEnd();
|
|
}).then((result) => {
|
|
if (result !== undefined) {
|
|
resultBuffer.push(result);
|
|
}
|
|
});
|
|
}
|
|
}).then(() => {
|
|
return resultBuffer.maybeRead(() => {
|
|
throw error;
|
|
});
|
|
});
|
|
}).catch((error) => !isAborted(error), (error) => {
|
|
return Promise.try(() => {
|
|
return source.abort(error);
|
|
}).catch((abortError) => {
|
|
let message = [
|
|
`Tried to abort stream due to encountering an error, but the aborting itself failed`,
|
|
`Original error message: ${error.message}`,
|
|
`Abort failure message: ${abortError.message}`
|
|
].join("\n");
|
|
|
|
// FIXME: Make this some sort of chained error
|
|
let combinedError = new Error(message);
|
|
combinedError.stack = abortError.stack; // HACK
|
|
throw combinedError;
|
|
}).then(() => {
|
|
// Pass through the original error to the user
|
|
throw error;
|
|
});
|
|
}).catch(isAborted, (marker) => {
|
|
if (abortHandled === false) {
|
|
abortHandled = true;
|
|
|
|
return Promise.try(() => {
|
|
if (onAbort != null) {
|
|
return onAbort(marker.reason);
|
|
}
|
|
}).then((value) => {
|
|
if (value !== undefined) {
|
|
resultBuffer.push(value);
|
|
}
|
|
|
|
if (marker.reason instanceof Error) {
|
|
// NOTE: This ensures that the original error causing the abort is thrown exactly once
|
|
resultBuffer.push(Promise.reject(marker.reason));
|
|
}
|
|
});
|
|
}
|
|
|
|
return resultBuffer.maybeRead(() => {
|
|
throw marker;
|
|
});
|
|
});
|
|
});
|
|
}
|
|
};
|
|
};
|