"use strict"; const range = require("range").range; const derivedStream = require("@promistream/derived-stream"); const { validateArguments } = require("@validatem/core"); const required = require("@validatem/required"); const isInteger = require("@validatem/is-integer"); const isPositive = require("@validatem/is-positive"); const defaultTo = require("@validatem/default-to"); const either = require("@validatem/either"); const isValue = require("@validatem/is-value"); const isBoolean = require("@validatem/is-boolean"); const mirroredBuffer = require("./mirrored-buffer"); const notifiedWhen = require("./notified-when"); // FIXME: Ensure that the consume-peek-on-read model makes coherent sense let positiveOrInfinity = [ either([ isInteger, isValue(Infinity) ]), isPositive ]; module.exports = function createMirrorFork(_streamCount, _options) { let [ streamCount, options ] = validateArguments(arguments, { streamCount: [ required, isInteger, isPositive ], options: [ defaultTo({}), { bufferSize: [ required, positiveOrInfinity ], peekBufferSize: [ defaultTo(Infinity), positiveOrInfinity ], leaderMode: [ defaultTo(false), isBoolean ] }] }); let peekBuffer = mirroredBuffer(streamCount, options.leaderMode); let readBuffer = mirroredBuffer(streamCount, options.leaderMode); let when = notifiedWhen(); // FIXME: Label as 'Mirrored stream fork' return derivedStream((source) => { return range(0, streamCount).map((streamIndex) => { return { _promistreamVersion: 0, _promistreamIsSource: true, description: `Mirrored stream fork (${streamIndex})`, abort: function (reason) { return source.abort(reason); }, peek: function () { return when(() => peekBuffer.getBiggestBufferLength() < options.peekBufferSize, () => { let resultPromise = peekBuffer.getItem(streamIndex, () => source.peek()); when.notify(); return resultPromise; }); }, read: function () { return when(() => readBuffer.getBiggestBufferLength() < options.bufferSize, () => { let resultPromise = readBuffer.getItem(streamIndex, () => source.read()); // Throw away the peek corresponding to this read, if there are any peekBuffer.maybeDiscardItem(streamIndex); when.notify(); return resultPromise; }); } }; }); }); };