Initial commit

master
Sven Slootweg 3 years ago
commit 2abd52f919

@ -0,0 +1,3 @@
{
"extends": "@joepie91/eslint-config"
}

1
.gitignore vendored

@ -0,0 +1 @@
node_modules

@ -0,0 +1,69 @@
"use strict";
const Promise = require("bluebird");
const pipe = require("@promistream/pipe");
const map = require("@promistream/map");
const spy = require("@promistream/spy");
const collect = require("@promistream/collect");
const fromIterable = require("@promistream/from-iterable");
const forkMirror = require("./");
return Promise.try(() => {
let [ streamA, streamB ] = pipe([
fromIterable([ "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k" ]),
forkMirror(2, { bufferSize: Infinity, leaderMode: false }) // NOTE: Change `leaderMode` to see the output order change
]).read();
return Promise.all([
pipe([
streamA,
map((string) => {
return Promise
.resolve(`[stream A] ${string}`)
.delay(20); // Emulate some slow processing step
}),
spy((value) => console.log(value)),
collect()
]).read(),
pipe([
streamB,
map((string) => `[stream B] ${string}`),
spy((value) => console.log(value)),
collect()
]).read(),
]);
}).then((results) => {
console.log(results);
/*
[
[
'[stream A] a',
'[stream A] b',
'[stream A] c',
'[stream A] d',
'[stream A] e',
'[stream A] f',
'[stream A] g',
'[stream A] h',
'[stream A] i',
'[stream A] j',
'[stream A] k'
],
[
'[stream B] a',
'[stream B] b',
'[stream B] c',
'[stream B] d',
'[stream B] e',
'[stream B] f',
'[stream B] g',
'[stream B] h',
'[stream B] i',
'[stream B] j',
'[stream B] k'
]
]
*/
});

@ -0,0 +1,74 @@
"use strict";
const range = require("range").range;
const derivedStream = require("@promistream/derived-stream");
const { validateArguments } = require("@validatem/core");
const required = require("@validatem/required");
const isInteger = require("@validatem/is-integer");
const isPositive = require("@validatem/is-positive");
const defaultTo = require("@validatem/default-to");
const either = require("@validatem/either");
const isValue = require("@validatem/is-value");
const isBoolean = require("@validatem/is-boolean");
const mirroredBuffer = require("./mirrored-buffer");
const notifiedWhen = require("./notified-when");
// FIXME: Ensure that the consume-peek-on-read model makes coherent sense
let positiveOrInfinity = [
either([ isInteger, isValue(Infinity) ]),
isPositive
];
module.exports = function createMirrorFork(_streamCount, _options) {
let [ streamCount, options ] = validateArguments(arguments, {
streamCount: [ required, isInteger, isPositive ],
options: [ defaultTo({}), {
bufferSize: [ required, positiveOrInfinity ],
peekBufferSize: [ defaultTo(Infinity), positiveOrInfinity ],
leaderMode: [ defaultTo(false), isBoolean ]
}]
});
let peekBuffer = mirroredBuffer(streamCount, options.leaderMode);
let readBuffer = mirroredBuffer(streamCount, options.leaderMode);
let when = notifiedWhen();
// FIXME: Label as 'Mirrored stream fork'
return derivedStream((source) => {
return range(0, streamCount).map((streamIndex) => {
return {
_promistreamVersion: 0,
_promistreamIsSource: true,
description: `Mirrored stream fork (${streamIndex})`,
abort: function (reason) {
return source.abort(reason);
},
peek: function () {
return when(() => peekBuffer.getBiggestBufferLength() < options.peekBufferSize, () => {
let resultPromise = peekBuffer.getItem(streamIndex, () => source.peek());
when.notify();
return resultPromise;
});
},
read: function () {
return when(() => readBuffer.getBiggestBufferLength() < options.bufferSize, () => {
let resultPromise = readBuffer.getItem(streamIndex, () => source.read());
// Throw away the peek corresponding to this read, if there are any
peekBuffer.maybeDiscardItem(streamIndex);
when.notify();
return resultPromise;
});
}
};
});
});
};

@ -0,0 +1,64 @@
"use strict";
const range = require("range").range;
const asExpression = require("as-expression");
const pDefer = require("p-defer");
// FIXME: Move into separate package
module.exports = function createMirroredBuffer(slots, leaderMode = false) {
let buffers = range(0, slots).map(() => []);
let bufferLengths = range(0, slots).map(() => []);
return {
getItem: function (slotIndex, callback) {
let slotBuffer = buffers[slotIndex];
if (slotBuffer.length === 0) {
let value = asExpression(() => {
if (leaderMode) {
// NOTE: In leader mode, we don't immediately start the operation, but instead store a placeholder defer. Then whenever the stream with index 0 picks out the item, *it* will start the operation (see further down below) and attach its promise to the existing defer, thereby resolving it for every stream.
let { promise, resolve } = pDefer();
return {
promise: promise,
resolve: resolve
};
} else {
return {
promise: callback(),
resolve: null
};
}
});
buffers.forEach((buffer, i) => {
buffer.push(value);
bufferLengths[i] += 1;
});
}
bufferLengths[slotIndex] -= 1;
let nextItem = slotBuffer.shift();
if (leaderMode && slotIndex === 0) {
nextItem.resolve(callback());
}
return nextItem.promise;
},
// FIXME: Add note to README asking anyone who needs a strict discardItem, to file an issue to that effect
maybeDiscardItem: function (slotIndex) {
let slotBuffer = buffers[slotIndex];
if (slotBuffer.length > 0) {
slotBuffer.shift();
bufferLengths[slotIndex] -= 1;
}
},
getBiggestBufferLength: function () {
// TODO: Pre-calculate this on changes instead? Using bufferLengths. Figure out how to make that work with external buffer changes...
return Math.max(... buffers.map((buffer) => buffer.length));
}
};
};

@ -0,0 +1,29 @@
"use strict";
const pDefer = require("p-defer");
// FIXME: Move into separate package, and see if this can be used in other stream implementations too
module.exports = function createPromiseNotifier() {
let promise, resolve;
return {
wait: function () {
if (promise == null) {
let defer = pDefer();
promise = defer.promise;
resolve = defer.resolve;
}
return promise;
},
notify: function (value) {
if (resolve != null) {
resolve(value);
promise = null;
resolve = null;
}
}
};
};

@ -0,0 +1,28 @@
"use strict";
const Promise = require("bluebird");
const notifiedPromise = require("./notified-promise");
// FIXME: Move into separate package
module.exports = function createNotifiedWhen() {
let notifier = notifiedPromise();
function when(condition, callback) {
if (condition()) {
return callback();
} else {
return Promise.try(() => {
return notifier.wait();
}).then(() => {
return when(condition, callback);
});
}
}
when.notify = function notifyWhen() {
return notifier.notify();
};
return when;
};

@ -0,0 +1,35 @@
{
"name": "@promistream/fork-mirror",
"version": "0.1.0",
"main": "index.js",
"keywords": [
"promistream"
],
"repository": "http://git.cryto.net/promistream/fork-mirror.git",
"author": "Sven Slootweg <admin@cryto.net>",
"license": "WTFPL OR CC0-1.0",
"devDependencies": {
"@joepie91/eslint-config": "^1.1.0",
"@promistream/collect": "^0.1.0",
"@promistream/from-iterable": "^0.1.0",
"@promistream/map": "^0.1.0",
"@promistream/pipe": "^0.1.1",
"@promistream/spy": "^0.1.0",
"eslint": "^6.8.0"
},
"dependencies": {
"@promistream/derived-stream": "^0.1.0",
"@validatem/core": "^0.3.15",
"@validatem/default-to": "^0.1.0",
"@validatem/either": "^0.1.9",
"@validatem/is-boolean": "^0.1.1",
"@validatem/is-integer": "^0.1.0",
"@validatem/is-positive": "^1.0.0",
"@validatem/is-value": "^0.1.0",
"@validatem/required": "^0.1.1",
"as-expression": "^1.0.0",
"bluebird": "^3.7.2",
"p-defer": "3",
"range": "^0.0.3"
}
}

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save