You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

95 lines
3.4 KiB
JavaScript

"use strict";
// DOCS: A basic in-memory queue, with no persistence capabilities. Typically useful for implementing recursive asynchronous operations as a stream, such as recursively walking a directory tree.
// FIXME: Add to spec a suggestion to use the `foo.stream` pattern for things that return more than just a stream?
const asExpression = require("as-expression");
const matchValue = require("match-value");
const errorChain = require("error-chain");
const EndOfStream = require("@promistream/end-of-stream");
const Aborted = require("@promistream/aborted");
module.exports = function simpleQueue(initialItems) {
// FIXME: Validatem
let items = initialItems.slice();
let peekPointer = 0;
let isEnded = false;
let errorReason;
return {
push: function addItemToQueue(item) {
// TODO: Throw an error if stream has already ended?
items.push(item);
},
stream: {
_promistreamVersion: 0,
_promistreamIsSource: true,
description: `simple queue`,
peek: async function peekValue_simpleQueue() {
// NOTE: 0 items in the queue does *not* mean that the stream has ended, here! This may be a temporary state, and so only translates into an ended state if it is still true when a read is attempted, as per the spec's defined behaviour for parallelized streams.
if (errorReason != null || isEnded === true) {
return true;
} else if (items.length > 0 && peekPointer < items.length - 1) {
peekPointer += 1;
return true;
} else {
return false;
}
},
read: async function produceValue_simpleQueue() {
let action = asExpression(() => {
if (peekPointer > 0) {
return "readValue";
} else if (errorReason != null) {
return "readError";
} else if (items.length === 0 || isEnded) {
return "endOfStream";
} else {
return "readValue";
}
});
return matchValue(action, {
readValue: () => {
// NOTE: As per the spec, each peek corresponds to a later read, but reads are allowed to happen without preceding peeks. Therefore, we should only reduce the peekPointer when it's actually been incremented to begin with.
if (peekPointer > 0) {
peekPointer -= 1;
}
return items.shift();
},
readError: () => {
// FIXME: Abstract out this logic, it's reusable across source streams
if (errorReason === true) {
throw new Aborted("Stream was aborted");
} else if (errorReason instanceof Error) {
throw new errorChain.chain(errorReason, Aborted, `Stream was aborted due to error: ${errorReason.message}`);
}
},
endOfStream: () => {
// This locks the queue into an 'ended' state for spec compliance, even if some misbehaving code pushes an item after the pipeline has already been torn down
isEnded = true;
throw new EndOfStream;
}
});
},
abort: async function abort_simpleQueue(reason, _source) {
// FIXME: Abstract out this logic, it's reusable across source streams
if (errorReason == null) {
if (reason === true || reason instanceof Error) {
errorReason = reason;
} else {
throw new Error("You must specify a reason (either `true` or an Error object) when aborting a stream");
}
} else {
// FIXME: Require this behaviour in the spec? Or is there a composability-related reason to permit/require quietly ignoring this, to make it idempotent?
throw new Error(`The stream is already aborted`);
}
}
}
};
};