You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

52 lines
1.5 KiB
JavaScript

"use strict";
const range = require("range").range;
const pushBuffer = require("push-buffer");
const derivedStream = require("@promistream/derived-stream");
const { validateArguments } = require("@validatem/core");
const required = require("@validatem/required");
const isInteger = require("@validatem/is-integer");
const isPositive = require("@validatem/is-positive");
const isFunction = require("@validatem/is-function");
module.exports = function createForkSelectForks(_streamCount, _selectionCallback) {
let [ streamCount, selectionCallback ] = validateArguments(arguments, {
streamCount: [ required, isInteger, isPositive ],
selectionCallback: [ required, isFunction ]
});
return derivedStream((source) => {
let buffer = pushBuffer({
lanes: streamCount,
select: selectionCallback,
pull: async () => source.read()
});
return range(0, streamCount).map((streamIndex) => {
return {
_promistreamVersion: true,
_promistreamIsSource: true,
description: `fork-select stream (${streamIndex})`,
abort: function (reason) {
return source.abort(reason);
},
peek: async function () {
// TODO: Improve the behaviour here somehow, maybe buffering up read values on every peek?
return (buffer.countLanes(streamIndex).values > 0);
},
read: async function () {
try {
let value = await buffer.request(streamIndex);
return value;
} catch (error) {
throw error;
}
}
};
});
});
};