"use strict"; const Promise = require("bluebird"); const unreachable = require("@joepie91/unreachable")("@promistream/buffer"); const util = require("util"); const debug = require("debug")("promistream:buffer"); const propagateAbort = require("@promistream/propagate-abort"); const sequentialize = require("@promistream/sequentialize"); const pipe = require("@promistream/pipe"); module.exports = function bufferStream() { let buffer; let bufferPeeked; let hasPeekedUpstream = false; function attemptRead(readFunc) { return Promise.try(() => { return readFunc(); }).then((result) => { if (Array.isArray(result)) { if (result.length > 0) { debug(`Buffered ${result.length} items`); buffer = result; bufferPeeked = result.map(() => false); } else { return attemptRead(readFunc); } } else { // FIXME: Make this its own module, and improve its compactness? let stringified = util.inspect(result, { depth: 0 }) .replace(/\s+/g, " "); throw new Error(`Buffered stream produced a non-array value: ${stringified}`); } }); } function peekLocal() { let unpeekedIndex = bufferPeeked.indexOf(false); if (unpeekedIndex >= 0) { bufferPeeked[unpeekedIndex] = true; return true; } } function peekUpstream(source) { if (!hasPeekedUpstream) { debug("Peeking upstream"); hasPeekedUpstream = true; return source.peek(); } else if (buffer == null) { debug("Not allowed to peek upstream, reading first"); return Promise.try(() => { return attemptRead(source.read); }).then(() => { return peekLocal(); }); } else { unreachable("Attempted to peek upstream while there is already a local buffer"); } } let bufferStream = { _promistreamVersion: 0, description: "buffer stream", abort: propagateAbort, peek: function peekValue_bufferedSourceStream(source) { debug("Got peek request"); if (buffer == null) { debug("No buffer, forwarding peek to upstream"); return peekUpstream(source); } else { if (peekLocal()) { debug("Answering peek request"); return true; } else { debug("Buffer fully peeked, forwarding peek to upstream"); return peekUpstream(source); } } }, read: function produceValue_bufferedSourceStream(source) { debug("Got read request"); return Promise.try(() => { if (buffer == null) { debug("Waiting for upstream read..."); return attemptRead(source.read); } }).then(() => { if (buffer != null) { let value = buffer.shift(); bufferPeeked.shift(); debug(`Picked item from buffer, ${buffer.length} items left`); if (buffer.length === 0) { debug(`Buffer empty, unsetting...`); buffer = null; bufferPeeked = null; } return value; } else { unreachable("No buffer was present"); } }); } }; return pipe([ bufferStream, // FIXME: Can this be made parallelism-safe without force-sequentializing? sequentialize() ]); };