"use strict"; // FIXME: Separate this out into its own package const splitFilter = require("split-filter"); const unreachable = require("@joepie91/unreachable")("@promistream/from-node-stream"); const EndOfStream = require("@promistream/end-of-stream"); const warn = require("../warn"); const createDefer = require("../create-defer"); module.exports = function createPushBuffer(options) { let onPause = options.onPause || function pauseNotImplemented() { return false; }; let onResume = options.onResume || function resumeNotImplemented() { return false; }; let itemBuffer = []; let requestQueue = []; let isPaused = false; let hasEnded = false; function resumeIfEmpty() { let bufferIsEmpty = (itemBuffer.length === 0); if (bufferIsEmpty && isPaused) { if (onResume() === true) { isPaused = false; } } } function attemptDrain() { // NOTE: This must remain fully synchronous, if we want to avoid unnecessary pauses in the `data` handler while (requestQueue.length > 0) { let hasItems = (itemBuffer.length > 0); let hasResponse = (hasEnded || hasItems); if (hasResponse) { let defer = requestQueue.shift(); if (hasItems) { // FIXME: Does this correctly deal with an error event produced as a result of an abort? let item = itemBuffer.shift(); if (item.type === "value") { defer.resolve(item.value); } else if (item.type === "error") { defer.reject(item.error); } else { unreachable(`Unexpected item type '${item.type}'`); } } else if (hasEnded) { defer.reject(new EndOfStream()); } else { unreachable("Invalid response state, neither has items in queue nor ended"); } } else { break; } } resumeIfEmpty(); } return { queueValue: function (value) { itemBuffer.push({ type: "value", value: value }); attemptDrain(); let stillHasItemsBuffered = (itemBuffer.length > 0); if (stillHasItemsBuffered && !isPaused) { if (onPause() === true) { isPaused = true; } else { // FIXME: Only show this warning once? warn("The stream you are converting does not support pausing. This may lead to unexpectedly high memory usage!"); } } }, queueError: function (error) { itemBuffer.push({ type: "error", error: error }); attemptDrain(); }, queueRequest: function () { let { defer, promise } = createDefer(); requestQueue.push(defer); attemptDrain(); return promise; }, markEnded: function () { hasEnded = true; attemptDrain(); }, consumeImmediateBuffer: function () { attemptDrain(); // FIXME: Only return successful items here? if (requestQueue.length > 0) { // We won't ever serve up the buffer until any individual-item requests have been fulfilled. return []; } else { let [ values, errors ] = splitFilter(itemBuffer, (item) => item.type === "value"); if (errors.length > 0) { itemBuffer = values; // In case we ever write code that will do something with the remaining values in the buffer throw errors[0].error; } else { itemBuffer = []; resumeIfEmpty(); // Ensure that we haven't left the source stream in a paused state, because that would deadlock the pipeline return values.map((item) => item.value); } } } }; };