"use strict"; const range = require("range").range; const pushBuffer = require("push-buffer"); const derivedStream = require("@promistream/derived-stream"); const { validateArguments } = require("@validatem/core"); const required = require("@validatem/required"); const isInteger = require("@validatem/is-integer"); const isPositive = require("@validatem/is-positive"); const isFunction = require("@validatem/is-function"); module.exports = function createForkSelectForks(_streamCount, _selectionCallback) { let [ streamCount, selectionCallback ] = validateArguments(arguments, { streamCount: [ required, isInteger, isPositive ], selectionCallback: [ required, isFunction ] }); return derivedStream((source) => { let buffer = pushBuffer({ lanes: streamCount, select: selectionCallback, pull: async () => source.read() }); return range(0, streamCount).map((streamIndex) => { return { _promistreamVersion: true, _promistreamIsSource: true, description: `fork-select stream (${streamIndex})`, abort: function (reason) { return source.abort(reason); }, peek: async function () { // TODO: Improve the behaviour here somehow, maybe buffering up read values on every peek? return (buffer.countLanes(streamIndex).values > 0); }, read: async function () { try { let value = await buffer.request(streamIndex); return value; } catch (error) { throw error; } } }; }); }); };