You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
95 lines
3.4 KiB
JavaScript
95 lines
3.4 KiB
JavaScript
"use strict";
|
|
|
|
// DOCS: A basic in-memory queue, with no persistence capabilities. Typically useful for implementing recursive asynchronous operations as a stream, such as recursively walking a directory tree.
|
|
// FIXME: Add to spec a suggestion to use the `foo.stream` pattern for things that return more than just a stream?
|
|
|
|
const asExpression = require("as-expression");
|
|
const matchValue = require("match-value");
|
|
const errorChain = require("error-chain");
|
|
|
|
const EndOfStream = require("@promistream/end-of-stream");
|
|
const Aborted = require("@promistream/aborted");
|
|
|
|
module.exports = function simpleQueue(initialItems) {
|
|
// FIXME: Validatem
|
|
let items = initialItems.slice();
|
|
|
|
let peekPointer = 0;
|
|
let isEnded = false;
|
|
let errorReason;
|
|
|
|
return {
|
|
push: function addItemToQueue(item) {
|
|
// TODO: Throw an error if stream has already ended?
|
|
items.push(item);
|
|
},
|
|
stream: {
|
|
_promistreamVersion: 0,
|
|
_promistreamIsSource: true,
|
|
description: `simple queue`,
|
|
peek: async function peekValue_simpleQueue() {
|
|
// NOTE: 0 items in the queue does *not* mean that the stream has ended, here! This may be a temporary state, and so only translates into an ended state if it is still true when a read is attempted, as per the spec's defined behaviour for parallelized streams.
|
|
if (errorReason != null || isEnded === true) {
|
|
return true;
|
|
} else if (items.length > 0 && peekPointer < items.length - 1) {
|
|
peekPointer += 1;
|
|
return true;
|
|
} else {
|
|
return false;
|
|
}
|
|
},
|
|
read: async function produceValue_simpleQueue() {
|
|
let action = asExpression(() => {
|
|
if (peekPointer > 0) {
|
|
return "readValue";
|
|
} else if (errorReason != null) {
|
|
return "readError";
|
|
} else if (items.length === 0 || isEnded) {
|
|
return "endOfStream";
|
|
} else {
|
|
return "readValue";
|
|
}
|
|
});
|
|
|
|
return matchValue(action, {
|
|
readValue: () => {
|
|
// NOTE: As per the spec, each peek corresponds to a later read, but reads are allowed to happen without preceding peeks. Therefore, we should only reduce the peekPointer when it's actually been incremented to begin with.
|
|
if (peekPointer > 0) {
|
|
peekPointer -= 1;
|
|
}
|
|
|
|
return items.shift();
|
|
},
|
|
readError: () => {
|
|
// FIXME: Abstract out this logic, it's reusable across source streams
|
|
if (errorReason === true) {
|
|
throw new Aborted("Stream was aborted");
|
|
} else if (errorReason instanceof Error) {
|
|
throw new errorChain.chain(errorReason, Aborted, `Stream was aborted due to error: ${errorReason.message}`);
|
|
}
|
|
},
|
|
endOfStream: () => {
|
|
// This locks the queue into an 'ended' state for spec compliance, even if some misbehaving code pushes an item after the pipeline has already been torn down
|
|
isEnded = true;
|
|
|
|
throw new EndOfStream;
|
|
}
|
|
});
|
|
},
|
|
abort: async function abort_simpleQueue(reason, _source) {
|
|
// FIXME: Abstract out this logic, it's reusable across source streams
|
|
if (errorReason == null) {
|
|
if (reason === true || reason instanceof Error) {
|
|
errorReason = reason;
|
|
} else {
|
|
throw new Error("You must specify a reason (either `true` or an Error object) when aborting a stream");
|
|
}
|
|
} else {
|
|
// FIXME: Require this behaviour in the spec? Or is there a composability-related reason to permit/require quietly ignoring this, to make it idempotent?
|
|
throw new Error(`The stream is already aborted`);
|
|
}
|
|
}
|
|
}
|
|
};
|
|
};
|