You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

65 lines
1.9 KiB
JavaScript

"use strict";
const range = require("range").range;
const asExpression = require("as-expression");
const pDefer = require("p-defer");
// FIXME: Move into separate package
module.exports = function createMirroredBuffer(slots, leaderMode = false) {
let buffers = range(0, slots).map(() => []);
let bufferLengths = range(0, slots).map(() => []);
return {
getItem: function (slotIndex, callback) {
let slotBuffer = buffers[slotIndex];
if (slotBuffer.length === 0) {
let value = asExpression(() => {
if (leaderMode) {
// NOTE: In leader mode, we don't immediately start the operation, but instead store a placeholder defer. Then whenever the stream with index 0 picks out the item, *it* will start the operation (see further down below) and attach its promise to the existing defer, thereby resolving it for every stream.
let { promise, resolve } = pDefer();
return {
promise: promise,
resolve: resolve
};
} else {
return {
promise: callback(),
resolve: null
};
}
});
buffers.forEach((buffer, i) => {
buffer.push(value);
bufferLengths[i] += 1;
});
}
bufferLengths[slotIndex] -= 1;
let nextItem = slotBuffer.shift();
if (leaderMode && slotIndex === 0) {
nextItem.resolve(callback());
}
return nextItem.promise;
},
// FIXME: Add note to README asking anyone who needs a strict discardItem, to file an issue to that effect
maybeDiscardItem: function (slotIndex) {
let slotBuffer = buffers[slotIndex];
if (slotBuffer.length > 0) {
slotBuffer.shift();
bufferLengths[slotIndex] -= 1;
}
},
getBiggestBufferLength: function () {
// TODO: Pre-calculate this on changes instead? Using bufferLengths. Figure out how to make that work with external buffer changes...
return Math.max(... buffers.map((buffer) => buffer.length));
}
};
};