diff --git a/index.js b/index.js index 7e5d212..bc0fe96 100644 --- a/index.js +++ b/index.js @@ -2,6 +2,8 @@ const Promise = require("bluebird"); const Aborted = require("@ppstreams/aborted"); +const EndOfStream = require("@ppstreams/end-of-stream"); +const isEndOfStream = require("@ppstreams/is-end-of-stream"); const { validateOptions } = require("@validatem/core"); const required = require("@validatem/required"); @@ -18,18 +20,28 @@ module.exports = function simpleSource(_options) { ]); let errorReason; + let ended = false; let peekQueue = []; + function getValue() { + return Promise.try(() => { + return onRequest(); + }).catch(isEndOfStream, (marker) => { + ended = true; + throw marker; + }); + } + return { description: `simple source`, peek: function peekValue_simpleSource() { return Promise.try(() => { - if (errorReason != null) { + if (errorReason != null || ended === true) { return true; } else { // FIXME: Store Promises in the peekQueue instead? Or would this make it more difficult to deal with concurrent peeks/reads? And should the peek spec be changed to account for this? return Promise.try(() => { - return onRequest(); + return getValue(); }).then((result) => { peekQueue.push({ type: "value", value: result }); }).catch((error) => { @@ -62,8 +74,10 @@ module.exports = function simpleSource(_options) { // FIXME: Pass in full error, after changing to a different error type implementation that accepts error objects as extra properties throw new Aborted(`Stream was aborted due to error: ${errorReason.message}`, {reason: errorReason.message}); } + } else if (ended === true) { + throw new EndOfStream; } else { - return onRequest(); + return getValue(); } } }); diff --git a/package.json b/package.json index 4522726..969245e 100644 --- a/package.json +++ b/package.json @@ -7,6 +7,8 @@ "license": "WTFPL OR CC0-1.0", "dependencies": { "@ppstreams/aborted": "^0.1.0", + "@ppstreams/end-of-stream": "^0.1.0", + "@ppstreams/is-end-of-stream": "^0.1.0", "@validatem/core": "^0.3.12", "@validatem/is-function": "^0.1.0", "@validatem/required": "^0.1.1", @@ -14,7 +16,6 @@ "bluebird": "^3.7.2" }, "devDependencies": { - "@ppstreams/end-of-stream": "^0.1.0", "@ppstreams/pipe": "^0.1.0" } } diff --git a/yarn.lock b/yarn.lock index 9c6a64f..6502730 100644 --- a/yarn.lock +++ b/yarn.lock @@ -16,6 +16,11 @@ dependencies: create-error "^0.3.1" +"@ppstreams/is-end-of-stream@^0.1.0": + version "0.1.0" + resolved "https://registry.yarnpkg.com/@ppstreams/is-end-of-stream/-/is-end-of-stream-0.1.0.tgz#703b0530698dc920a8fbdeea9812ef63a6fdb42c" + integrity sha512-F7J7ey5oApuH/+QD/CCotePi9ID1b5Wl1h7/IAArjL7ETDOEyHuQevsi/KXIq8kY+0YArW3F0c4QqgQujjvSnQ== + "@ppstreams/pipe@^0.1.0": version "0.1.0" resolved "https://registry.yarnpkg.com/@ppstreams/pipe/-/pipe-0.1.0.tgz#52e72e15b4c8b8c59bcdbd757abfa4546ab380db"