"use strict"; const range = require("range").range; const asExpression = require("as-expression"); const pDefer = require("p-defer"); // FIXME: Move into separate package module.exports = function createMirroredBuffer(slots, leaderMode = false) { let buffers = range(0, slots).map(() => []); let bufferLengths = range(0, slots).map(() => []); return { getItem: function (slotIndex, callback) { let slotBuffer = buffers[slotIndex]; if (slotBuffer.length === 0) { let value = asExpression(() => { if (leaderMode) { // NOTE: In leader mode, we don't immediately start the operation, but instead store a placeholder defer. Then whenever the stream with index 0 picks out the item, *it* will start the operation (see further down below) and attach its promise to the existing defer, thereby resolving it for every stream. let { promise, resolve } = pDefer(); return { promise: promise, resolve: resolve }; } else { return { promise: callback(), resolve: null }; } }); buffers.forEach((buffer, i) => { buffer.push(value); bufferLengths[i] += 1; }); } bufferLengths[slotIndex] -= 1; let nextItem = slotBuffer.shift(); if (leaderMode && slotIndex === 0) { nextItem.resolve(callback()); } return nextItem.promise; }, // FIXME: Add note to README asking anyone who needs a strict discardItem, to file an issue to that effect maybeDiscardItem: function (slotIndex) { let slotBuffer = buffers[slotIndex]; if (slotBuffer.length > 0) { slotBuffer.shift(); bufferLengths[slotIndex] -= 1; } }, getBiggestBufferLength: function () { // TODO: Pre-calculate this on changes instead? Using bufferLengths. Figure out how to make that work with external buffer changes... return Math.max(... buffers.map((buffer) => buffer.length)); } }; };