You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
75 lines
2.3 KiB
JavaScript
75 lines
2.3 KiB
JavaScript
"use strict";
|
|
|
|
const range = require("range").range;
|
|
|
|
const derivedStream = require("@promistream/derived-stream");
|
|
|
|
const { validateArguments } = require("@validatem/core");
|
|
const required = require("@validatem/required");
|
|
const isInteger = require("@validatem/is-integer");
|
|
const isPositive = require("@validatem/is-positive");
|
|
const defaultTo = require("@validatem/default-to");
|
|
const either = require("@validatem/either");
|
|
const isValue = require("@validatem/is-value");
|
|
const isBoolean = require("@validatem/is-boolean");
|
|
|
|
const mirroredBuffer = require("./mirrored-buffer");
|
|
const notifiedWhen = require("./notified-when");
|
|
|
|
// FIXME: Ensure that the consume-peek-on-read model makes coherent sense
|
|
|
|
let positiveOrInfinity = [
|
|
either([ isInteger, isValue(Infinity) ]),
|
|
isPositive
|
|
];
|
|
|
|
module.exports = function createMirrorFork(_streamCount, _options) {
|
|
let [ streamCount, options ] = validateArguments(arguments, {
|
|
streamCount: [ required, isInteger, isPositive ],
|
|
options: [ defaultTo({}), {
|
|
bufferSize: [ required, positiveOrInfinity ],
|
|
peekBufferSize: [ defaultTo(Infinity), positiveOrInfinity ],
|
|
leaderMode: [ defaultTo(false), isBoolean ]
|
|
}]
|
|
});
|
|
|
|
let peekBuffer = mirroredBuffer(streamCount, options.leaderMode);
|
|
let readBuffer = mirroredBuffer(streamCount, options.leaderMode);
|
|
let when = notifiedWhen();
|
|
|
|
// FIXME: Label as 'Mirrored stream fork'
|
|
return derivedStream((source) => {
|
|
return range(0, streamCount).map((streamIndex) => {
|
|
return {
|
|
_promistreamVersion: 0,
|
|
_promistreamIsSource: true,
|
|
description: `Mirrored stream fork (${streamIndex})`,
|
|
abort: function (reason) {
|
|
return source.abort(reason);
|
|
},
|
|
peek: function () {
|
|
return when(() => peekBuffer.getBiggestBufferLength() < options.peekBufferSize, () => {
|
|
let resultPromise = peekBuffer.getItem(streamIndex, () => source.peek());
|
|
|
|
when.notify();
|
|
|
|
return resultPromise;
|
|
});
|
|
},
|
|
read: function () {
|
|
return when(() => readBuffer.getBiggestBufferLength() < options.bufferSize, () => {
|
|
let resultPromise = readBuffer.getItem(streamIndex, () => source.read());
|
|
|
|
// Throw away the peek corresponding to this read, if there are any
|
|
peekBuffer.maybeDiscardItem(streamIndex);
|
|
|
|
when.notify();
|
|
|
|
return resultPromise;
|
|
});
|
|
}
|
|
};
|
|
});
|
|
});
|
|
};
|