Add sequential mode

main
Sven Slootweg 6 months ago
parent cd732b7447
commit bb1c8a3d07

@ -44,11 +44,12 @@ module.exports = function createPushBuffer(_options) {
});
}
let { lanes, pull, select, selectError, mode, broadcastErrors, broadcastValues } = validateOptions(arguments, {
let { lanes, pull, select, selectError, mode, sequential, broadcastErrors, broadcastValues } = validateOptions(arguments, {
lanes: [ defaultTo(1), isInteger ],
// Currently only either push or pull mode is supported; if you need both at the same time, file an issue!
mode: [ defaultTo("pull"), oneOf([ "push", "pull" ]) ],
pull: [ requiredIfMode("pull"), isFunction ],
sequential: [ defaultTo(false), isBoolean ],
select: [ defaultTo.literal((_value) => 0), isFunction ],
selectError: [ defaultTo.literal((_value) => 0), isFunction ],
broadcastValues: [ defaultTo(false), isBoolean ],
@ -111,10 +112,12 @@ module.exports = function createPushBuffer(_options) {
let longestQueue = Math.max(... laneLengths);
if (longestQueue > 0) {
debug(`there are still requests pending; kicking off ${longestQueue} new pulls to satisfy longest queue`);
let newPulls = (sequential) ? 1 : longestQueue;
debug(`there are still requests pending; kicking off ${newPulls} new pulls to satisfy longest queue (${longestQueue})`);
debug(` lane lengths:`, laneLengths);
for (let i = 0; i < longestQueue; i++) {
for (let i = 0; i < newPulls; i++) {
doPull();
}
}
@ -240,7 +243,11 @@ module.exports = function createPushBuffer(_options) {
if (mode === "pull") {
// We don't need to handle errors in doPull because errors in the actual pull callback are propagated through the defer. Errors while *initiating* the pull should be thrown from here (which happens automatically because `doPull` is itself synchronous).
doPull();
if (sequential === false || pullQueue.length === 0) {
doPull();
} else {
// In sequential mode, we let the reconciliation process kick off the next pull once this one is completed.
}
}
return defer.promise;

@ -1,6 +1,6 @@
{
"name": "push-buffer",
"version": "1.0.0",
"version": "1.1.0",
"description": "Abstraction for converting between push/pull APIs, and managing asynchronous value distribution",
"main": "index.js",
"files": [

@ -108,3 +108,55 @@ test("multiple lane selection", async (t) => {
assert.deepStrictEqual(seen, expected);
});
test("sequential mode", async (t) => {
/* How this test works:
The starting time of the test is recorded, then each pull is delayed by a time proportional to its order - so the first pull is delayed by 100ms from start, the second by 200ms from start, etc. - and finally the actual time of completion (as a delta from the test start) is recorded as the value for each pull, divided by 100 to go back to the original accuracy of the index in the order.
If the stream operates in parallel, then all of these are kicked off at the same time, and so the recorded durations look like 100ms, 200ms, 300ms... but in sequential operation, these happen one after another, and so the durations look like 100ms, 100+200=300ms, 100+200+300=600ms, etc. - and now we can simply at the recorded (divided by 100 and rounded) values to see which of these patterns it has, and that tells us whether it ran sequentially or not.
If this test breaks because a value looks *slightly* off (but is still close enough to the target pattern), that probably means you are running it in such a slow environment that the scheduling is wildly off (more than 100ms). I'm not sure if that can be fixed with this test approach.
*/
let startTime = Date.now();
let i = 1;
let buffer = pushBuffer({
sequential: true,
pull: async () => {
await promiseDelay(i++ * 100);
return Math.round((Date.now() - startTime) / 100);
}
});
let values = await Promise.all([
buffer.request(),
buffer.request(),
buffer.request(),
buffer.request(),
buffer.request(),
]);
assert.deepStrictEqual(values, [ 1, 3, 6, 10, 15 ]);
});
test("parallel mode", async (t) => {
let startTime = Date.now();
let i = 1;
let buffer = pushBuffer({
sequential: false,
pull: async () => {
await promiseDelay(i++ * 100);
return Math.round((Date.now() - startTime) / 100);
}
});
let values = await Promise.all([
buffer.request(),
buffer.request(),
buffer.request(),
buffer.request(),
buffer.request(),
]);
assert.deepStrictEqual(values, [ 1, 2, 3, 4, 5 ]);
});

Loading…
Cancel
Save