You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

108 lines
3.4 KiB
JavaScript

"use strict";
const Promise = require("bluebird");
const Aborted = require("@promistream/aborted");
const EndOfStream = require("@promistream/end-of-stream");
const isEndOfStream = require("@promistream/is-end-of-stream");
const errorChain = require("error-chain");
const unreachable = require("@joepie91/unreachable")("@promistream/simple-source");
const { validateOptions } = require("@validatem/core");
const required = require("@validatem/required");
const isFunction = require("@validatem/is-function");
const wrapValueAsOption = require("@validatem/wrap-value-as-option");
module.exports = function simpleSource(_options) {
let { onRequest, onAbort } = validateOptions(arguments, [
required,
wrapValueAsOption("onRequest"), {
onRequest: [ required, isFunction ],
onAbort: [ isFunction ]
}
]);
let errorReason;
let ended = false;
let peekQueue = [];
function getValue() {
return Promise.try(() => {
return onRequest();
}).catch(isEndOfStream, (marker) => {
ended = true;
throw marker;
});
}
return {
_promistreamVersion: 0,
_promistreamIsSource: true,
description: `simple source`,
peek: function peekValue_simpleSource() {
return Promise.try(() => {
if (errorReason != null || ended === true) {
return true;
} else {
// FIXME: Store Promises in the peekQueue instead? Or would this make it more difficult to deal with concurrent peeks/reads? And should the peek spec be changed to account for this?
return Promise.try(() => {
return getValue();
}).then((result) => {
peekQueue.push({ type: "value", value: result });
}).catch((error) => {
peekQueue.push({ type: "error", error: error });
}).then(() => {
// FIXME: What if there's an EndOfStream marker being produced? Or an abort? Or some other sort of failure? Shouldn't that return/produce something *other than* `true`?
return true;
});
}
});
},
read: function produceValue_simpleSource() {
return Promise.try(() => {
if (peekQueue.length > 0) {
/* FIXME: Move all this logic out into an itemBuffer abstraction of some sort (also useful in from-node-stream?) */
let item = peekQueue.shift();
if (item.type === "value") {
return item.value;
} else if (item.type === "error") {
throw item.error;
} else {
throw unreachable(`Found a queue item of type '${item.type}'`);
}
} else {
if (errorReason != null) {
if (errorReason === true) {
throw new Aborted("Stream was aborted");
} else if (errorReason instanceof Error) {
throw new errorChain.chain(errorReason, Aborted, `Stream was aborted due to error: ${errorReason.message}`);
}
} else if (ended === true) {
throw new EndOfStream;
} else {
return getValue();
}
}
});
},
abort: function abort_simpleSource(reason) {
return Promise.try(() => {
if (errorReason == null) {
if (reason === true || reason instanceof Error) {
errorReason = reason;
if (onAbort != null) {
return onAbort(reason);
}
} else {
throw new Error("You must specify a reason (either `true` or an Error object) when aborting a stream");
}
} else {
// FIXME: Require this behaviour in the spec? Or is there a composability-related reason to permit/require quietly ignoring this, to make it idempotent?
throw new Error(`The stream is already aborted`);
}
});
}
};
};