"use strict"; // FIXME: Separate this out into its own package const splitFilter = require("split-filter"); const unreachable = require("@joepie91/unreachable")("@promistream/from-node-stream"); const EndOfStream = require("@promistream/end-of-stream"); const debug = require("debug")("promistream:from-node-stream:push-buffer"); const warn = require("../warn"); const createDefer = require("../create-defer"); module.exports = function createPushBuffer(options) { let onPause = options.onPause || function pauseNotImplemented() { return false; }; let onResume = options.onResume || function resumeNotImplemented() { return false; }; // TODO: Use @joepie91/consumable here? let itemBuffer = []; let requestQueue = []; let isPaused = false; let hasEnded = false; function resumeIfEmpty() { let bufferIsEmpty = (itemBuffer.length === 0); if (bufferIsEmpty && isPaused) { if (onResume() === true) { isPaused = false; } } } function attemptDrain() { // NOTE: This must remain fully synchronous, if we want to avoid unnecessary pauses in the `data` handler debug("Drain attempt started"); if (requestQueue.length > 0) { while (requestQueue.length > 0) { let hasItems = (itemBuffer.length > 0); let hasResponse = (hasEnded || hasItems); if (hasResponse) { debug("Satisfying queued request"); let defer = requestQueue.shift(); if (hasItems) { // FIXME: Does this correctly deal with an error event produced as a result of an abort? let item = itemBuffer.shift(); if (item.type === "value") { defer.resolve(item.value); } else if (item.type === "error") { defer.reject(item.error); } else { unreachable(`Unexpected item type '${item.type}'`); } } else if (hasEnded) { defer.reject(new EndOfStream()); } else { unreachable("Invalid response state, neither has items in queue nor ended"); } } else { debug("No data available to satisfy queued request"); break; } } } else { debug("No outstanding requests to satisfy"); } resumeIfEmpty(); } return { queueValue: function (value) { debug("Queueing value"); itemBuffer.push({ type: "value", value: value }); attemptDrain(); let stillHasItemsBuffered = (itemBuffer.length > 0); if (stillHasItemsBuffered && !isPaused) { if (onPause() === true) { isPaused = true; } else { // FIXME: Only show this warning once? warn("The stream you are converting does not support pausing. This may lead to unexpectedly high memory usage!"); } } }, queueError: function (error) { debug("Queueing error"); itemBuffer.push({ type: "error", error: error }); attemptDrain(); }, queueRequest: function () { debug("Queueing read request"); let { defer, promise } = createDefer(); requestQueue.push(defer); attemptDrain(); return promise; }, markEnded: function () { debug("Marking as ended"); hasEnded = true; attemptDrain(); }, consumeImmediateBuffer: function () { debug("Post-drain remaining buffer requested"); attemptDrain(); debug("Returning immediate buffer"); // FIXME: Only return successful items here? if (requestQueue.length > 0) { // We won't ever serve up the buffer until any individual-item requests have been fulfilled. return []; } else { let [ values, errors ] = splitFilter(itemBuffer, (item) => item.type === "value"); debug(`Buffer contains ${errors.length} errors and ${values.length} values`); if (errors.length > 0) { debug("Throwing first error"); itemBuffer = values; // In case we ever write code that will do something with the remaining values in the buffer throw errors[0].error; } else { debug(`Returning ${values.length} values`); itemBuffer = []; resumeIfEmpty(); // Ensure that we haven't left the source stream in a paused state, because that would deadlock the pipeline return values.map((item) => item.value); } } } }; };