You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

163 lines
4.7 KiB
JavaScript

"use strict";
const test = require("node:test");
const assert = require("node:assert");
const pushBuffer = require("./");
function promiseDelay(time) {
return new Promise((resolve, _reject) => {
setTimeout(resolve, time);
});
}
// Contributions of additional tests are welcome! As long as they test exposed API, not internals.
test("happy path", async (t) => {
let seenValues = new Map();
let expectedValues = new Map();
for (let m = 0; m < 100; m++) { expectedValues.set(m, m); }
let i = 0;
let buffer = pushBuffer({
lanes: 2,
pull: async () => {
let result = i++;
await promiseDelay(Math.random() * 50);
return result;
},
select: (value) => value % 2
});
let promises = [];
for (let n = 0; n < 100; n++) {
// not awaiting here; we *want* multiple of them to happen concurrently
promises.push(buffer.request(n % 2).then((value) => seenValues.set(n, value)));
await promiseDelay(Math.random() * 50);
}
// wait for all (potentially concurrent) buffer reads to complete before checking the outcome
await Promise.all(promises);
assert.deepStrictEqual(seenValues, expectedValues);
});
test("push mode", async (t) => {
let buffer = pushBuffer({
mode: "push"
});
let expected = [];
for (let n = 0; n < 100; n++) { expected.push(n); }
let promises = [];
for (let n = 0; n < 100; n++) {
promises.push(buffer.request());
}
for (let n = 0; n < 100; n++) {
buffer.push(n);
}
let outcome = await Promise.all(promises);
assert.deepStrictEqual(outcome, expected);
});
test("multiple lane selection", async (t) => {
let allLanes = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ];
let i = 0;
let buffer = pushBuffer({
lanes: 10,
pull: async () => i++,
select: (value) => {
// Only send to lanes that the value is divisible by
return allLanes.filter((lane) => value % (lane+1) === 0);
}
});
let seen = new Map();
let promises = [];
let expected = new Map([
[ 0, [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ] ],
[ 1, [ 0, 2, 4, 6, 8, 10, 12, 14, 16, 18 ] ],
[ 2, [ 0, 3, 6, 9, 12, 15, 18, 21, 24, 27 ] ],
[ 3, [ 0, 4, 8, 12, 16, 20, 24, 28, 32, 36 ] ],
[ 4, [ 0, 5, 10, 15, 20, 25, 30, 35, 40, 45 ] ],
[ 5, [ 0, 6, 12, 18, 24, 30, 36, 42, 48, 54 ] ],
[ 6, [ 0, 7, 14, 21, 28, 35, 42, 49, 56, 63 ] ],
[ 7, [ 0, 8, 16, 24, 32, 40, 48, 56, 64, 72 ] ],
[ 8, [ 0, 9, 18, 27, 36, 45, 54, 63, 72, 81 ] ],
[ 9, [ 0, 10, 20, 30, 40, 50, 60, 70, 80, 90 ] ],
]);
for (let n = 0; n < 10; n++) {
for (let x = 0; x < 10; x++) {
promises.push(buffer.request(n).then((value) => {
if (!seen.has(n)) {
seen.set(n, []);
}
seen.get(n).push(value);
}));
}
}
await Promise.all(promises);
assert.deepStrictEqual(seen, expected);
});
test("sequential mode", async (t) => {
/* How this test works:
The starting time of the test is recorded, then each pull is delayed by a time proportional to its order - so the first pull is delayed by 100ms from start, the second by 200ms from start, etc. - and finally the actual time of completion (as a delta from the test start) is recorded as the value for each pull, divided by 100 to go back to the original accuracy of the index in the order.
If the stream operates in parallel, then all of these are kicked off at the same time, and so the recorded durations look like 100ms, 200ms, 300ms... but in sequential operation, these happen one after another, and so the durations look like 100ms, 100+200=300ms, 100+200+300=600ms, etc. - and now we can simply at the recorded (divided by 100 and rounded) values to see which of these patterns it has, and that tells us whether it ran sequentially or not.
If this test breaks because a value looks *slightly* off (but is still close enough to the target pattern), that probably means you are running it in such a slow environment that the scheduling is wildly off (more than 100ms). I'm not sure if that can be fixed with this test approach.
*/
let startTime = Date.now();
let i = 1;
let buffer = pushBuffer({
sequential: true,
pull: async () => {
await promiseDelay(i++ * 100);
return Math.round((Date.now() - startTime) / 100);
}
});
let values = await Promise.all([
buffer.request(),
buffer.request(),
buffer.request(),
buffer.request(),
buffer.request(),
]);
assert.deepStrictEqual(values, [ 1, 3, 6, 10, 15 ]);
});
test("parallel mode", async (t) => {
let startTime = Date.now();
let i = 1;
let buffer = pushBuffer({
sequential: false,
pull: async () => {
await promiseDelay(i++ * 100);
return Math.round((Date.now() - startTime) / 100);
}
});
let values = await Promise.all([
buffer.request(),
buffer.request(),
buffer.request(),
buffer.request(),
buffer.request(),
]);
assert.deepStrictEqual(values, [ 1, 2, 3, 4, 5 ]);
});