You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

75 lines
2.3 KiB
JavaScript

3 years ago
"use strict";
const range = require("range").range;
const derivedStream = require("@promistream/derived-stream");
const { validateArguments } = require("@validatem/core");
const required = require("@validatem/required");
const isInteger = require("@validatem/is-integer");
const isPositive = require("@validatem/is-positive");
const defaultTo = require("@validatem/default-to");
const either = require("@validatem/either");
const isValue = require("@validatem/is-value");
const isBoolean = require("@validatem/is-boolean");
const mirroredBuffer = require("./mirrored-buffer");
const notifiedWhen = require("./notified-when");
// FIXME: Ensure that the consume-peek-on-read model makes coherent sense
let positiveOrInfinity = [
either([ isInteger, isValue(Infinity) ]),
isPositive
];
module.exports = function createMirrorFork(_streamCount, _options) {
let [ streamCount, options ] = validateArguments(arguments, {
streamCount: [ required, isInteger, isPositive ],
options: [ defaultTo({}), {
bufferSize: [ required, positiveOrInfinity ],
peekBufferSize: [ defaultTo(Infinity), positiveOrInfinity ],
leaderMode: [ defaultTo(false), isBoolean ]
}]
});
let peekBuffer = mirroredBuffer(streamCount, options.leaderMode);
let readBuffer = mirroredBuffer(streamCount, options.leaderMode);
let when = notifiedWhen();
// FIXME: Label as 'Mirrored stream fork'
return derivedStream((source) => {
return range(0, streamCount).map((streamIndex) => {
return {
_promistreamVersion: 0,
_promistreamIsSource: true,
description: `Mirrored stream fork (${streamIndex})`,
abort: function (reason) {
return source.abort(reason);
},
peek: function () {
return when(() => peekBuffer.getBiggestBufferLength() < options.peekBufferSize, () => {
let resultPromise = peekBuffer.getItem(streamIndex, () => source.peek());
when.notify();
return resultPromise;
});
},
read: function () {
return when(() => readBuffer.getBiggestBufferLength() < options.bufferSize, () => {
let resultPromise = readBuffer.getItem(streamIndex, () => source.read());
// Throw away the peek corresponding to this read, if there are any
peekBuffer.maybeDiscardItem(streamIndex);
when.notify();
return resultPromise;
});
}
};
});
});
};