Initial commit
commit 630e3e8027

@@ -0,0 +1 @@
node_modules

@@ -0,0 +1,56 @@
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
|
||||
const pipe = require("@promistream/pipe");
|
||||
const map = require("@promistream/map");
|
||||
const collect = require("@promistream/collect");
|
||||
const fromIterable = require("@promistream/from-iterable");
|
||||
const forkFilter = require("./");
|
||||
|
||||
return Promise.try(() => {
|
||||
let [ streamA, streamB ] = pipe([
|
||||
fromIterable([ 0, 1, 3, 4, 6, 8, 9, 10, 11, 13, 15, 16 ]),
|
||||
forkFilter(2, (value) => value % 2)
|
||||
]).read();
|
||||
|
||||
return Promise.all([
|
||||
pipe([
|
||||
streamA,
|
||||
map((string) => `[stream A] ${string}`),
|
||||
collect()
|
||||
]).read(),
|
||||
pipe([
|
||||
streamB,
|
||||
map((string) => {
|
||||
return Promise
|
||||
.resolve(`[stream B] ${string}`)
|
||||
.delay(20); // Emulate some slow processing step
|
||||
}),
|
||||
collect()
|
||||
]).read(),
|
||||
]);
|
||||
}).then((results) => {
|
||||
console.log(results);
|
||||
|
||||
/*
|
||||
[
|
||||
[
|
||||
'[stream A] 0',
|
||||
'[stream A] 4',
|
||||
'[stream A] 6',
|
||||
'[stream A] 8',
|
||||
'[stream A] 10',
|
||||
'[stream A] 16'
|
||||
],
|
||||
[
|
||||
'[stream B] 1',
|
||||
'[stream B] 3',
|
||||
'[stream B] 9',
|
||||
'[stream B] 11',
|
||||
'[stream B] 13',
|
||||
'[stream B] 15'
|
||||
]
|
||||
]
|
||||
*/
|
||||
});
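
The example above splits into two forks by returning 0 or 1 from the callback; since the callback's return value selects the destination fork by index, the same pattern should extend to any fork count. A minimal sketch under that assumption (the three-way size bucketing below is hypothetical, not part of the commit):

"use strict";

const pipe = require("@promistream/pipe");
const fromIterable = require("@promistream/from-iterable");
const forkFilter = require("./");

// Hypothetical three-way split: the callback returns the index of the target fork.
let [ small, medium, large ] = pipe([
	fromIterable([ 1, 5, 12, 30, 7, 99 ]),
	forkFilter(3, (value) => {
		if (value < 10) { return 0; }
		else if (value < 50) { return 1; }
		else { return 2; }
	})
]).read();

// Each fork is then read through its own pipe (with map/collect), as above.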

@@ -0,0 +1,53 @@
"use strict";
|
||||
|
||||
const range = require("range").range;
|
||||
|
||||
const derivedStream = require("@promistream/derived-stream");
|
||||
|
||||
const { validateArguments } = require("@validatem/core");
|
||||
const required = require("@validatem/required");
|
||||
const isInteger = require("@validatem/is-integer");
|
||||
const isPositive = require("@validatem/is-positive");
|
||||
const isFunction = require("@validatem/is-function");
|
||||
|
||||
// TODO: Think about when exactly an EOS/error should be passed through to all pending stream reads
|
||||
|
||||
const indexedQueue = require("./indexed-queue");
|
||||
|
||||
/* Strategies:
|
||||
- Optimize for speed: Each attempted read from a fork triggers upstream reads until one is sorted into that fork. Other forks that values were sorted into, will buffer those values up for their next reads (which will not cause additional upstream reads). This may cause buffers to grow large, if not all of the fork streams are read simultaneously.
|
||||
- Optimize for resources: Each attempted read from a fork triggers one upstream read, sorting it into the correct queue. That may not be the queue that was read from. Eventually the amount of reads should equal the amount of produced values, so it should all balance out in the end, but it's possible that one fork may need to wait on another to call for more data. This may deadlock, if not all of the fork streams are read simultaneously.
|
||||
|
||||
NOTE: Need to make sure this is order-preserving, ie. stream read results are processed in the order that they were originally requested, not in the order that they succeed in. This is because we don't know what fork a value will be sorted into until we have the result from the callback, so we can't know whether something will be the 'first' value for a given fork until *all* of the preceding values have been completely evaluated. Maybe have an option to disable order preservation, for faster operation?
|
||||
*/
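
// To make the ordering constraint concrete: with createFilterFork(2, (value) => value % 2)
// and upstream values [ 2, 1 ], a read on the odd fork cannot resolve with 1 until the
// callback has evaluated 2 and routed it to the even fork; only once every preceding
// value has been sorted do we know which value is 'next' for any given fork.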

module.exports = function createFilterFork(_streamCount, _callback) {
	let [ streamCount, callback ] = validateArguments(arguments, {
		streamCount: [ required, isInteger, isPositive ],
		callback: [ required, isFunction ]
	});

	let withQueue = indexedQueue(streamCount, callback);

	return derivedStream((source) => {
		return range(0, streamCount).map((streamIndex) => {
			return {
				_promistreamVersion: 0,
				_promistreamIsSource: true,
				description: `Filter stream fork (${streamIndex})`,
				abort: function (reason) {
					return source.abort(reason);
				},
				peek: function () {
					// FIXME: Check that this is correct, and whether peek in fork-round-robin also needs to be changed to *not* use withQueue
					return source.peek();
				},
				read: async function () {
					return withQueue(streamIndex, () => {
						return source.read();
					});
				}
			};
		});
	});
};

@@ -0,0 +1,65 @@
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const range = require("range").range;
|
||||
const promisePullQueue = require("promise-pull-queue").default;
|
||||
|
||||
// FIXME: Move to separate package
|
||||
|
||||
module.exports = function createIndexedQueue(queueCount, getIndexForValue) {
|
||||
let running = false;
|
||||
let inQueue = [];
|
||||
let outQueues = range(0, queueCount).map(() => new promisePullQueue());
|
||||
|
||||
// TODO: Ideally find a better way of handling this; especially the fallback when there are no queued pulls is pretty weird...
|
||||
function pushToFirstQueue(promise) {
|
||||
for (let queue of outQueues) {
|
||||
if (queue.pullQueueSize > 0) {
|
||||
queue.push(promise);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Nobody is pulling, push to the first queue I guess?
|
||||
outQueues[0].push(promise);
|
||||
}
|
||||
|
||||
async function startProcessing() {
|
||||
if (!running) {
|
||||
running = true;
|
||||
return tryProcessItem();
|
||||
}
|
||||
}
|
||||
|
||||
async function tryProcessItem() {
|
||||
return Promise.try(() => {
|
||||
return inQueue.shift();
|
||||
}).then(async (value) => {
|
||||
let index = await getIndexForValue(value);
|
||||
|
||||
let outQueue = outQueues[index];
|
||||
outQueue.push(Promise.resolve(value));
|
||||
}).catch((error) => {
|
||||
// If we don't have a value, then we can't associate the result with a specific forked stream, so we just grab the first one that happens to have a pending read
|
||||
pushToFirstQueue(Promise.reject(error));
|
||||
}).finally(() => {
|
||||
if (inQueue.length > 0) {
|
||||
return tryProcessItem();
|
||||
} else {
|
||||
running = false;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return function withQueue(index, getValue) {
|
||||
let outQueue = outQueues[index];
|
||||
let promise = outQueue.pull();
|
||||
|
||||
// We always queue a new read, even if we've returned a value from the buffer; maintaining a 1:1 correspondence between requested reads and fulfilled reads (even if they occur in different stream forks) ensures that the process cannot deadlock within the stream implementation. (Deadlocking due to application logic is still possible!)
|
||||
// TODO: Eventually add a "keep reading until a result is yielded" mode to prevent application-level deadlocks, at the cost of larger internal buffers
|
||||
inQueue.push(getValue());
|
||||
startProcessing();
|
||||
|
||||
return promise;
|
||||
};
|
||||
};
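
To see the queue's contract in isolation: each withQueue(index, getValue) call pulls one result from the queue at that index while also feeding one new value into the shared input queue, and values land in the queue chosen by getIndexForValue regardless of which queue requested them. A minimal sketch, assuming the module above is saved as ./indexed-queue (the even/odd routing and input values are made up):

"use strict";

const indexedQueue = require("./indexed-queue");

// Hypothetical routing: even values go to queue 0, odd values to queue 1.
let withQueue = indexedQueue(2, (value) => value % 2);

let input = [ 3, 0, 1, 2 ][Symbol.iterator]();
let next = () => input.next().value;

Promise.all([
	withQueue(0, next), // two reads on the 'even' queue...
	withQueue(0, next),
	withQueue(1, next), // ... and two on the 'odd' queue
	withQueue(1, next)
]).then(([ evenA, evenB, oddA, oddB ]) => {
	console.log(evenA, evenB); // 0 2
	console.log(oddA, oddB);   // 3 1
});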

@@ -0,0 +1,30 @@
{
	"name": "@promistream/fork-filter",
	"version": "0.1.0",
	"main": "index.js",
	"keywords": [
		"promistream"
	],
	"repository": "http://git.cryto.net/promistream/fork-filter.git",
	"author": "Sven Slootweg <admin@cryto.net>",
	"license": "WTFPL OR CC0-1.0",
	"devDependencies": {
		"@joepie91/eslint-config": "^1.1.0",
		"@promistream/collect": "^0.1.0",
		"@promistream/from-iterable": "^0.1.0",
		"@promistream/map": "^0.1.0",
		"@promistream/pipe": "^0.1.1",
		"eslint": "^6.8.0"
	},
	"dependencies": {
		"@promistream/derived-stream": "^0.1.0",
		"@validatem/core": "^0.3.15",
		"@validatem/is-function": "^0.1.0",
		"@validatem/is-integer": "^0.1.0",
		"@validatem/is-positive": "^1.0.0",
		"@validatem/required": "^0.1.1",
		"bluebird": "^3.7.2",
		"promise-pull-queue": "^2.0.0",
		"range": "^0.0.3"
	}
}