You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
65 lines
1.9 KiB
JavaScript
65 lines
1.9 KiB
JavaScript
"use strict";
|
|
|
|
const range = require("range").range;
|
|
const asExpression = require("as-expression");
|
|
const pDefer = require("p-defer");
|
|
|
|
// FIXME: Move into separate package
|
|
|
|
module.exports = function createMirroredBuffer(slots, leaderMode = false) {
|
|
let buffers = range(0, slots).map(() => []);
|
|
let bufferLengths = range(0, slots).map(() => []);
|
|
|
|
return {
|
|
getItem: function (slotIndex, callback) {
|
|
let slotBuffer = buffers[slotIndex];
|
|
|
|
if (slotBuffer.length === 0) {
|
|
let value = asExpression(() => {
|
|
if (leaderMode) {
|
|
// NOTE: In leader mode, we don't immediately start the operation, but instead store a placeholder defer. Then whenever the stream with index 0 picks out the item, *it* will start the operation (see further down below) and attach its promise to the existing defer, thereby resolving it for every stream.
|
|
let { promise, resolve } = pDefer();
|
|
|
|
return {
|
|
promise: promise,
|
|
resolve: resolve
|
|
};
|
|
} else {
|
|
return {
|
|
promise: callback(),
|
|
resolve: null
|
|
};
|
|
}
|
|
});
|
|
|
|
buffers.forEach((buffer, i) => {
|
|
buffer.push(value);
|
|
bufferLengths[i] += 1;
|
|
});
|
|
}
|
|
|
|
bufferLengths[slotIndex] -= 1;
|
|
let nextItem = slotBuffer.shift();
|
|
|
|
if (leaderMode && slotIndex === 0) {
|
|
nextItem.resolve(callback());
|
|
}
|
|
|
|
return nextItem.promise;
|
|
},
|
|
// FIXME: Add note to README asking anyone who needs a strict discardItem, to file an issue to that effect
|
|
maybeDiscardItem: function (slotIndex) {
|
|
let slotBuffer = buffers[slotIndex];
|
|
|
|
if (slotBuffer.length > 0) {
|
|
slotBuffer.shift();
|
|
bufferLengths[slotIndex] -= 1;
|
|
}
|
|
},
|
|
getBiggestBufferLength: function () {
|
|
// TODO: Pre-calculate this on changes instead? Using bufferLengths. Figure out how to make that work with external buffer changes...
|
|
return Math.max(... buffers.map((buffer) => buffer.length));
|
|
}
|
|
};
|
|
};
|