"use strict"; // DOCS: A basic in-memory queue, with no persistence capabilities. Typically useful for implementing recursive asynchronous operations as a stream, such as recursively walking a directory tree. // FIXME: Add to spec a suggestion to use the `foo.stream` pattern for things that return more than just a stream? const asExpression = require("as-expression"); const matchValue = require("match-value"); const errorChain = require("error-chain"); const EndOfStream = require("@promistream/end-of-stream"); const Aborted = require("@promistream/aborted"); module.exports = function simpleQueue(initialItems) { // FIXME: Validatem let items = initialItems.slice(); let peekPointer = 0; let isEnded = false; let errorReason; return { push: function addItemToQueue(item) { // TODO: Throw an error if stream has already ended? items.push(item); }, stream: { _promistreamVersion: 0, _promistreamIsSource: true, description: `simple queue`, peek: async function peekValue_simpleQueue() { // NOTE: 0 items in the queue does *not* mean that the stream has ended, here! This may be a temporary state, and so only translates into an ended state if it is still true when a read is attempted, as per the spec's defined behaviour for parallelized streams. if (errorReason != null || isEnded === true) { return true; } else if (items.length > 0 && peekPointer < items.length - 1) { peekPointer += 1; return true; } else { return false; } }, read: async function produceValue_simpleQueue() { let action = asExpression(() => { if (peekPointer > 0) { return "readValue"; } else if (errorReason != null) { return "readError"; } else if (items.length === 0 || isEnded) { return "endOfStream"; } else { return "readValue"; } }); return matchValue(action, { readValue: () => { // NOTE: As per the spec, each peek corresponds to a later read, but reads are allowed to happen without preceding peeks. Therefore, we should only reduce the peekPointer when it's actually been incremented to begin with. if (peekPointer > 0) { peekPointer -= 1; } return items.shift(); }, readError: () => { // FIXME: Abstract out this logic, it's reusable across source streams if (errorReason === true) { throw new Aborted("Stream was aborted"); } else if (errorReason instanceof Error) { throw new errorChain.chain(errorReason, Aborted, `Stream was aborted due to error: ${errorReason.message}`); } }, endOfStream: () => { // This locks the queue into an 'ended' state for spec compliance, even if some misbehaving code pushes an item after the pipeline has already been torn down isEnded = true; throw new EndOfStream; } }); }, abort: async function abort_simpleQueue(reason, _source) { // FIXME: Abstract out this logic, it's reusable across source streams if (errorReason == null) { if (reason === true || reason instanceof Error) { errorReason = reason; } else { throw new Error("You must specify a reason (either `true` or an Error object) when aborting a stream"); } } else { // FIXME: Require this behaviour in the spec? Or is there a composability-related reason to permit/require quietly ignoring this, to make it idempotent? throw new Error(`The stream is already aborted`); } } } }; };