"use strict"; const unreachable = require("@joepie91/unreachable")("@promistream/buffer"); const util = require("util"); const debug = require("debug")("promistream:buffer"); const propagateAbort = require("@promistream/propagate-abort"); const sequentialize = require("@promistream/sequentialize"); const pipe = require("@promistream/pipe"); module.exports = function bufferStream() { let buffer; let bufferPeeked; let hasPeekedUpstream = false; async function attemptRead(readFunc) { let result = await readFunc(); if (Array.isArray(result)) { if (result.length > 0) { debug(`Buffered ${result.length} items`); buffer = result; bufferPeeked = result.map(() => false); } else { return await attemptRead(readFunc); } } else { // FIXME: Make this its own module, and improve its compactness? let stringified = util.inspect(result, { depth: 0 }) .replace(/\s+/g, " "); throw new Error(`Buffered stream produced a non-array value: ${stringified}`); } } function peekLocal() { let unpeekedIndex = bufferPeeked.indexOf(false); if (unpeekedIndex >= 0) { bufferPeeked[unpeekedIndex] = true; return true; } } async function peekUpstream(source) { if (!hasPeekedUpstream) { debug("Peeking upstream"); hasPeekedUpstream = true; return source.peek(); } else if (buffer == null) { debug("Not allowed to peek upstream, reading first"); // FIXME: Capture error and store in buffer? await attemptRead(source.read); return peekLocal(); } else { unreachable("Attempted to peek upstream while there is already a local buffer"); } } let bufferStream = { _promistreamVersion: 0, description: "buffer stream", abort: propagateAbort, peek: function peekValue_bufferedSourceStream(source) { debug("Got peek request"); if (buffer == null) { debug("No buffer, forwarding peek to upstream"); return peekUpstream(source); } else { if (peekLocal()) { debug("Answering peek request"); return true; } else { debug("Buffer fully peeked, forwarding peek to upstream"); return peekUpstream(source); } } }, read: async function produceValue_bufferedSourceStream(source) { debug("Got read request"); if (buffer == null) { debug("Waiting for upstream read..."); try { await attemptRead(source.read); } catch (error) { debug(`Encountered error in upstream read: ${error}`); throw error; } } if (buffer != null) { let value = buffer.shift(); bufferPeeked.shift(); debug(`Picked item from buffer, ${buffer.length} items left`); if (buffer.length === 0) { debug(`Buffer empty, unsetting...`); buffer = null; bufferPeeked = null; } return value; } else { unreachable("No buffer was present"); } } }; return pipe([ bufferStream, // TODO: Can this be made parallelism-safe without force-sequentializing? sequentialize() ]); };