"use strict";
const Promise = require("bluebird");
const consumable = require("@joepie91/consumable");
const propagatePeek = require("@promistream/propagate-peek");
const propagateAbort = require("@promistream/propagate-abort");
const pDefer = require("p-defer");
const debug = require("debug")("@promistream/pre-reader");
// DOCS: This stream is guaranteed to return `null` on the first (usually manual) read call, and will pre-read 1 item
// from upstream for every subsequent call. It's primarily meant to be used for complex stream internals where a
// stream will only be first used (much) later than its creation, but it's desired to do the work as soon as possible.
// If you're just trying to make your pipeline faster, you probably want `parallelize` instead - this module will not
// do what you want.
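
// Illustrative usage sketch (not part of the module): this assumes `@promistream/pipe` and
// `@promistream/from-iterable` with their usual promise-based pipeline API; adapt the surrounding
// streams to your actual setup.
//
//     const pipe = require("@promistream/pipe");
//     const fromIterable = require("@promistream/from-iterable");
//     const preReader = require("@promistream/pre-reader");
//
//     let stream = pipe([
//         fromIterable([ 1, 2, 3 ]),
//         preReader()
//     ]);
//
//     // The first (manual) read returns `null` and immediately pre-reads `1` from upstream;
//     // each later read returns the previously pre-read item and queues the next one.
//     await stream.read(); // -> null
//     await stream.read(); // -> 1
//     await stream.read(); // -> 2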
const EndReached = Symbol("@promistream/pre-reader/EndReached");

module.exports = function preReader() {
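	// `buffer` holds the most recent pre-read (initially `null`), and `endOfStreamError` latches the first error
	// thrown by the upstream (usually an end-of-stream marker) so that every later read can re-throw it.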
	let buffer = consumable(null);
	let endOfStreamError;
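
	// Kicks off a single upstream read; a rejection is stored and converted into the `EndReached` sentinel so that
	// it can sit in the buffer as a *resolved* value (see the workaround note below).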
	function prepareRead(source) {
		return Promise.try(() => {
			return source.read();
		}).catch((error) => {
			endOfStreamError = error;
			// This is a workaround: by ensuring that an EndOfStream state is stored in `nextRead` as a resolved
			// value and not as a rejected error, we prevent an UnhandledRejectionError from occurring in
			// circumstances where an extraneous read occurs before we know that we've reached the end of the
			// stream (which would therefore result in a *second* EndOfStream error that's left unread by
			// downstream). We convert this back to the original error below, if and only if the downstream
			// actually asks for it.
			return EndReached;
		});
	}
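
	// The promistream stream object; peek and abort calls are simply propagated to the upstream stream.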
	return {
		_promistreamVersion: 0,
		description: `pre-reader`,
		peek: propagatePeek, // FIXME: Check that this is correct behaviour - shouldn't we always produce `true` on the first call?
		abort: propagateAbort,
		read: function produceValue_preReader(source) {
			if (endOfStreamError == null) {
				let debugID = Math.floor(Math.random() * 100000);
				let { promise, resolve } = pDefer();
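				// `promise` is what the *next* read will find in the buffer; `resolve` is called below once we know
				// what that next read should receive (either a fresh pre-read, or the EndReached sentinel).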
				return Promise.try(() => {
					// We put a Promise in the buffer before we decide what to do with it; that way, we are
					// parallelism-safe, as each read builds upon the result of the previous read.
					// FIXME: Investigate whether using sequentialize wouldn't have the exact same practical result
					debug(`[${debugID}] Awaiting result`);
					return buffer.replace(promise);
				}).then((result) => {
					if (result === EndReached) {
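						// Pass the end-of-stream state on to the next queued read, and surface the original error
						// to the current caller.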
						debug(`[${debugID}] Rejecting`);
						resolve(EndReached);
						throw endOfStreamError;
					} else {
						debug(`[${debugID}] Preparing new read`);
						resolve(prepareRead(source));
						debug(`[${debugID}] Returning:`, result);
						return result;
					}
				});
			} else {
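				// The upstream has already ended (or failed); keep re-throwing the same error for every further read.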
				throw endOfStreamError;
			}
		},
	};
};