Initial commit

master
Sven Slootweg 3 years ago
commit 0197401ccd

@ -0,0 +1,3 @@
{
"extends": "@joepie91/eslint-config"
}

1
.gitignore vendored

@ -0,0 +1 @@
node_modules

@ -0,0 +1,34 @@
"use strict";
const Promise = require("bluebird");
const preReader = require("./");
const pipe = require("@promistream/pipe");
const rangeNumbers = require("@promistream/range-numbers");
let prereadingPipeline = pipe([
rangeNumbers(0, 3),
preReader()
]);
return Promise.try(() => {
return prereadingPipeline.read();
}).then((result) => {
console.log(result); // null
return prereadingPipeline.read();
}).then((result) => {
console.log(result); // 0
return prereadingPipeline.read();
}).then((result) => {
console.log(result); // 1
return prereadingPipeline.read();
}).then((result) => {
console.log(result); // 2
return prereadingPipeline.read();
}).catch((err) => {
console.error(err); // EndOfStream: There is no more data to read
});

@ -0,0 +1,61 @@
"use strict";
const Promise = require("bluebird");
const consumable = require("@joepie91/consumable");
const propagatePeek = require("@promistream/propagate-peek");
const propagateAbort = require("@promistream/propagate-abort");
const pDefer = require("p-defer");
const debug = require("debug")("@promistream/pre-reader");
// DOCS: This stream is guaranteed to return `null` on the first (usually manual) read call, and will pre-read 1 item from upstream for every subsequent call. It's primarily meant to be used for complex stream internals where a stream will only be first used (much) later than its creation, but it's desired to do the work as soon as possible. If you're just trying to make your pipeline faster, you probably want `parallelize` instead - this module will not do what you want.
const EndReached = Symbol("@promistream/pre-reader/EndReached");
module.exports = function preReader() {
let buffer = consumable(null);
let endOfStreamError;
function prepareRead(source) {
return Promise.try(() => {
return source.read();
}).catch((error) => {
endOfStreamError = error;
// This is a workaround: by ensuring that an EndOfStream state is stored in `nextRead` as a resolved value and not as a rejected error, we prevent an UnhandledRejectionError from occurring in circumstances where an extraneous read occurs before we know that we've reached the end of the stream (which would therefore result in a *second* EndOfStream error that's left unread by downstream). We convert this back to the original error below, if and only if the downstream actually asks for it.
return EndReached;
});
}
return {
_promistreamVersion: 0,
description: `pre-reader`,
peek: propagatePeek, // FIXME: Check that this is correct behaviour - shouldn't we always produce `true` on the first call?
abort: propagateAbort,
read: function produceValue_preReader(source) {
if (endOfStreamError == null) {
let debugID = Math.floor(Math.random() * 100000);
let { promise, resolve } = pDefer();
return Promise.try(() => {
// We put a Promise in the buffer before we decide what to do with it; that way, we are parallelism-safe, as each read builds upon the result of the previous read.
// FIXME: Investigate whether using sequentialize wouldn't have the exact same practical result
debug(`[${debugID}] Awaiting result`);
return buffer.replace(promise);
}).then((result) => {
if (result === EndReached) {
debug(`[${debugID}] Rejecting`);
resolve(EndReached);
throw endOfStreamError;
} else {
debug(`[${debugID}] Preparing new read`);
resolve(prepareRead(source));
debug(`[${debugID}] Returning:`, result);
return result;
}
});
} else {
throw endOfStreamError;
}
},
};
};

@ -0,0 +1,22 @@
{
"name": "@promistream/pre-reader",
"version": "0.1.0",
"main": "index.js",
"repository": "http://git.cryto.net/promistream/pre-reader.git",
"author": "Sven Slootweg <admin@cryto.net>",
"license": "WTFPL OR CC0-1.0",
"dependencies": {
"@joepie91/consumable": "^1.0.1",
"@promistream/propagate-abort": "^0.1.7",
"@promistream/propagate-peek": "^0.1.1",
"bluebird": "^3.7.2",
"debug": "^4.3.1",
"p-defer": "3"
},
"devDependencies": {
"@joepie91/eslint-config": "^1.1.0",
"@promistream/pipe": "^0.1.6",
"@promistream/range-numbers": "^0.1.2",
"eslint": "^7.29.0"
}
}

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save