Remember EndOfStream internally so that the source callback doesn't get called unnecessarily

master
Sven Slootweg 4 years ago
parent 92ee209de2
commit 370ac14fd1

@ -2,6 +2,8 @@
const Promise = require("bluebird");
const Aborted = require("@ppstreams/aborted");
const EndOfStream = require("@ppstreams/end-of-stream");
const isEndOfStream = require("@ppstreams/is-end-of-stream");
const { validateOptions } = require("@validatem/core");
const required = require("@validatem/required");
@ -18,18 +20,28 @@ module.exports = function simpleSource(_options) {
]);
let errorReason;
let ended = false;
let peekQueue = [];
function getValue() {
return Promise.try(() => {
return onRequest();
}).catch(isEndOfStream, (marker) => {
ended = true;
throw marker;
});
}
return {
description: `simple source`,
peek: function peekValue_simpleSource() {
return Promise.try(() => {
if (errorReason != null) {
if (errorReason != null || ended === true) {
return true;
} else {
// FIXME: Store Promises in the peekQueue instead? Or would this make it more difficult to deal with concurrent peeks/reads? And should the peek spec be changed to account for this?
return Promise.try(() => {
return onRequest();
return getValue();
}).then((result) => {
peekQueue.push({ type: "value", value: result });
}).catch((error) => {
@ -62,8 +74,10 @@ module.exports = function simpleSource(_options) {
// FIXME: Pass in full error, after changing to a different error type implementation that accepts error objects as extra properties
throw new Aborted(`Stream was aborted due to error: ${errorReason.message}`, {reason: errorReason.message});
}
} else if (ended === true) {
throw new EndOfStream;
} else {
return onRequest();
return getValue();
}
}
});

@ -7,6 +7,8 @@
"license": "WTFPL OR CC0-1.0",
"dependencies": {
"@ppstreams/aborted": "^0.1.0",
"@ppstreams/end-of-stream": "^0.1.0",
"@ppstreams/is-end-of-stream": "^0.1.0",
"@validatem/core": "^0.3.12",
"@validatem/is-function": "^0.1.0",
"@validatem/required": "^0.1.1",
@ -14,7 +16,6 @@
"bluebird": "^3.7.2"
},
"devDependencies": {
"@ppstreams/end-of-stream": "^0.1.0",
"@ppstreams/pipe": "^0.1.0"
}
}

@ -16,6 +16,11 @@
dependencies:
create-error "^0.3.1"
"@ppstreams/is-end-of-stream@^0.1.0":
version "0.1.0"
resolved "https://registry.yarnpkg.com/@ppstreams/is-end-of-stream/-/is-end-of-stream-0.1.0.tgz#703b0530698dc920a8fbdeea9812ef63a6fdb42c"
integrity sha512-F7J7ey5oApuH/+QD/CCotePi9ID1b5Wl1h7/IAArjL7ETDOEyHuQevsi/KXIq8kY+0YArW3F0c4QqgQujjvSnQ==
"@ppstreams/pipe@^0.1.0":
version "0.1.0"
resolved "https://registry.yarnpkg.com/@ppstreams/pipe/-/pipe-0.1.0.tgz#52e72e15b4c8b8c59bcdbd757abfa4546ab380db"

Loading…
Cancel
Save