Compare commits

...

2 Commits

@ -43,6 +43,7 @@ Note that using lanes is __optional__; all options and functions default to lane
- __mode:__ *Default: `"pull"`.* The mode to operate in. One of `"push"`, `"pull"`.
- __lanes__ *Default: `1`.* The amount of lanes to create. Each lane is an independent queue of values and errors, though broadcasts are possible.
- __pull:__ *Required only in pull mode.* An (`async`) callback that's called to acquire a new value/result, and which is expected to return a Promise. This should be providing your values in pull mode.
- __sequential:__ *Default: `false`. Only used in pull mode.* Whether to handle requests sequentially; that is, a second pull will not be started until the previous pull has completed, even if a second request is made.
- __select:__ *Default: `0`.* A callback that receives a value, and returns the index of the lane to assign it to (or an array of such indexes). The callback may return a Promise.
- __selectError:__ *Default: `0`.* Like `select`, but it receives errors instead of values.
- __broadcastValues:__ *Default: `false`.* Whether to broadcast all values to all lanes by default. The `select` callback will not be called for broadcast values. In push mode, this can be overridden for individual values.

@ -44,11 +44,12 @@ module.exports = function createPushBuffer(_options) {
});
}
let { lanes, pull, select, selectError, mode, broadcastErrors, broadcastValues } = validateOptions(arguments, {
let { lanes, pull, select, selectError, mode, sequential, broadcastErrors, broadcastValues } = validateOptions(arguments, {
lanes: [ defaultTo(1), isInteger ],
// Currently only either push or pull mode is supported; if you need both at the same time, file an issue!
mode: [ defaultTo("pull"), oneOf([ "push", "pull" ]) ],
pull: [ requiredIfMode("pull"), isFunction ],
sequential: [ defaultTo(false), isBoolean ],
select: [ defaultTo.literal((_value) => 0), isFunction ],
selectError: [ defaultTo.literal((_value) => 0), isFunction ],
broadcastValues: [ defaultTo(false), isBoolean ],
@ -111,10 +112,12 @@ module.exports = function createPushBuffer(_options) {
let longestQueue = Math.max(... laneLengths);
if (longestQueue > 0) {
debug(`there are still requests pending; kicking off ${longestQueue} new pulls to satisfy longest queue`);
let newPulls = (sequential) ? 1 : longestQueue;
debug(`there are still requests pending; kicking off ${newPulls} new pulls to satisfy longest queue (${longestQueue})`);
debug(` lane lengths:`, laneLengths);
for (let i = 0; i < longestQueue; i++) {
for (let i = 0; i < newPulls; i++) {
doPull();
}
}
@ -240,7 +243,11 @@ module.exports = function createPushBuffer(_options) {
if (mode === "pull") {
// We don't need to handle errors in doPull because errors in the actual pull callback are propagated through the defer. Errors while *initiating* the pull should be thrown from here (which happens automatically because `doPull` is itself synchronous).
doPull();
if (sequential === false || pullQueue.length === 0) {
doPull();
} else {
// In sequential mode, we let the reconciliation process kick off the next pull once this one is completed.
}
}
return defer.promise;

@ -1,6 +1,6 @@
{
"name": "push-buffer",
"version": "1.0.0",
"version": "1.1.1",
"description": "Abstraction for converting between push/pull APIs, and managing asynchronous value distribution",
"main": "index.js",
"files": [

@ -108,3 +108,55 @@ test("multiple lane selection", async (t) => {
assert.deepStrictEqual(seen, expected);
});
test("sequential mode", async (t) => {
/* How this test works:
The starting time of the test is recorded, then each pull is delayed by a time proportional to its order - so the first pull is delayed by 100ms from start, the second by 200ms from start, etc. - and finally the actual time of completion (as a delta from the test start) is recorded as the value for each pull, divided by 100 to go back to the original accuracy of the index in the order.
If the stream operates in parallel, then all of these are kicked off at the same time, and so the recorded durations look like 100ms, 200ms, 300ms... but in sequential operation, these happen one after another, and so the durations look like 100ms, 100+200=300ms, 100+200+300=600ms, etc. - and now we can simply at the recorded (divided by 100 and rounded) values to see which of these patterns it has, and that tells us whether it ran sequentially or not.
If this test breaks because a value looks *slightly* off (but is still close enough to the target pattern), that probably means you are running it in such a slow environment that the scheduling is wildly off (more than 100ms). I'm not sure if that can be fixed with this test approach.
*/
let startTime = Date.now();
let i = 1;
let buffer = pushBuffer({
sequential: true,
pull: async () => {
await promiseDelay(i++ * 100);
return Math.round((Date.now() - startTime) / 100);
}
});
let values = await Promise.all([
buffer.request(),
buffer.request(),
buffer.request(),
buffer.request(),
buffer.request(),
]);
assert.deepStrictEqual(values, [ 1, 3, 6, 10, 15 ]);
});
test("parallel mode", async (t) => {
let startTime = Date.now();
let i = 1;
let buffer = pushBuffer({
sequential: false,
pull: async () => {
await promiseDelay(i++ * 100);
return Math.round((Date.now() - startTime) / 100);
}
});
let values = await Promise.all([
buffer.request(),
buffer.request(),
buffer.request(),
buffer.request(),
buffer.request(),
]);
assert.deepStrictEqual(values, [ 1, 2, 3, 4, 5 ]);
});

Loading…
Cancel
Save