Stream pooling

master
Sven Slootweg 3 years ago
parent 451761d589
commit d0d4fec113

@@ -7,6 +7,7 @@ const rangeNumbers = require("@promistream/range-numbers");
 const map = require("@promistream/map");
 const filter = require("@promistream/filter");
 const collect = require("@promistream/collect");
+const parallelize = require("@promistream/parallelize");
 
 function createMap() {
 	return map((number) => number * 2);
@@ -16,7 +17,6 @@ function createFilter() {
 	return filter(() => false);
 }
 
-// TODO: Test in parallel mode
 return Promise.try(() => {
 	return pipe([
 		rangeNumbers(0, 10),
@@ -27,6 +27,7 @@ return Promise.try(() => {
 				return createFilter;
 			}
 		}),
+		parallelize(3),
 		collect()
 	]).read();
 }).then((result) => {
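For context, a minimal sketch of the pipeline shape this test now exercises. Assumptions: the package entry point exports the createDynamicStream factory shown further down, and parallelize(3) allows up to three reads in flight; the actual picker condition is elided in the hunk above, so the one below is hypothetical.

"use strict";

const Promise = require("bluebird");
const pipe = require("@promistream/pipe");
const rangeNumbers = require("@promistream/range-numbers");
const map = require("@promistream/map");
const filter = require("@promistream/filter");
const collect = require("@promistream/collect");
const parallelize = require("@promistream/parallelize");
const dynamic = require("@promistream/dynamic"); // assumed entry point

Promise.try(() => {
	return pipe([
		rangeNumbers(0, 10),
		dynamic((number) => {
			// Hypothetical picker: double even numbers, drop odd ones
			if (number % 2 === 0) {
				return () => map((n) => n * 2);
			} else {
				return () => filter(() => false);
			}
		}),
		parallelize(3), // up to 3 concurrent reads (assumed semantics)
		collect()
	]).read();
}).then((result) => {
	console.log(result); // e.g. [ 0, 4, 8, 12, 16 ]
});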

@@ -2,11 +2,12 @@
 // FIXME: Stream reuse via pooling. Instead of spawning a new stream every value, reuse existing streams from a pool, and hold upstream reads instead of sending EndOfStreams. This is not only useful for better performance, but also ensures that streams don't see EOS (and therefore don't do cleanup) until we're *actually* done with the entire thing - this matters especially when dealing with clones with shared state.
 // TODO: Make `clone` an optional part of the stream API and use that internally in this module to instantiate clones as-needed from an individual stream rather than a stream factory. Need to figure out, though, how to do resource management in that case - need to ensure that when the upstream ends, not just the clones but also the original input stream receive an EndOfStream signal.
+// FIXME: Verify that all streams in the pool receive an EndOfStream when upstream ends, and have a chance to finalize their internal processing
 
 const Promise = require("bluebird");
 const pDefer = require("p-defer");
-const debug = require("debug")("@promistream/dynamic");
-const valueID = require("./value-id");
+const debug = require("debug")("promistream:dynamic");
+const unreachable = require("@joepie91/unreachable")("@promistream/dynamic");
 
 const pipe = require("@promistream/pipe");
 const fromValue = require("@promistream/from-value");
@@ -15,9 +16,14 @@ const isEndOfStream = require("@promistream/is-end-of-stream");
 const propagatePeek = require("@promistream/propagate-peek");
 const propagateAbort = require("@promistream/propagate-abort");
+
+const valueID = require("./value-id");
+const streamPool = require("./stream-pool");
 
 module.exports = function createDynamicStream(streamPickerFunc) {
 	let requestQueue = [];
 	let streamQueue = [];
+	let pool = streamPool();
+	let wrapperToStream = new WeakMap();
 	let isRunning = false;
 	let lastSource;
@@ -28,12 +34,25 @@ module.exports = function createDynamicStream(streamPickerFunc) {
 				return startPipeline();
 			}
 		}).then(() => {
+			let currentStream = streamQueue[0];
+
 			return Promise.try(() => {
+				debug(`[stream ${valueID(currentStream)}] Attempting to read value...`);
 				return streamQueue[0].read();
+			}).tap(() => {
+				debug(`[stream ${valueID(currentStream)}] Successfully read value`);
 			}).catch(isEndOfStream, () => {
 				// Current stream is depleted, move on to the next stream in line
-				streamQueue.shift();
-				return attemptStreamRead();
+				debug(`[stream ${valueID(currentStream)}] Depleted, removing from queue`);
+				let acquiredStream = wrapperToStream.get(currentStream);
+
+				if (acquiredStream != null) {
+					pool.release(acquiredStream);
+					streamQueue.shift();
+					return attemptStreamRead();
+				} else {
+					throw unreachable("Stream wrapper does not exist in mapping");
+				}
 			});
 		});
 	}
@@ -41,42 +60,53 @@ module.exports = function createDynamicStream(streamPickerFunc) {
 	function tryHandleQueue() {
 		// FIXME: It's possible that a pipeline results in *no* items and so we come up short on pipelines anyway, need to compensate for this by starting a new pipeline
 		if (requestQueue.length > 0) {
-			if (isRunning === false) {
-				// FIXME: Parallelism, locking? We can't sequentialize the entire thing because we *do* want to accept parallel reads from downstream (so that necessary tasks can be queued behind the scenes), it's just the fetching of values from selected pipelines that we want to sequentialize across the stream queue
-				// Can probably do this with a good ol' recursive loop and an isRunning marker -- is there a way to abstract this out? We seem to be using this pattern in a lot of different places
-				let resolveRequest = requestQueue.shift();
-				let read = attemptStreamRead();
-
-				debug(`Attaching read operation to request ID ${valueID(resolveRequest)}`);
-				resolveRequest(read);
-
-				return Promise.try(() => {
-					return read;
-				}).catch(() => {
-					// NOTE: We treat any kind of error as a 'completed read' - the error will come out of the request that the read has been attached to and that is where the consumer can handle it. We only care *here* about whether it is time to start another read attempt.
-				}).then(() => {
-					debug(`Completed read for request ID ${valueID(resolveRequest)}`);
-					return tryHandleQueue();
-				});
-			}
+			// FIXME: Parallelism, locking? We can't sequentialize the entire thing because we *do* want to accept parallel reads from downstream (so that necessary tasks can be queued behind the scenes), it's just the fetching of values from selected pipelines that we want to sequentialize across the stream queue
+			// Can probably do this with a good ol' recursive loop and an isRunning marker -- is there a way to abstract this out? We seem to be using this pattern in a lot of different places
+			let resolveRequest = requestQueue.shift();
+			let read = attemptStreamRead();
+
+			debug(`[request ${valueID(resolveRequest)}] Attaching read operation`);
+			resolveRequest(read);
+
+			return Promise.try(() => {
+				return read;
+			}).catch(() => {
+				// NOTE: We treat any kind of error as a 'completed read' - the error will come out of the request that the read has been attached to and that is where the consumer can handle it. We only care *here* about whether it is time to start another read attempt.
+			}).then(() => {
+				debug(`[request ${valueID(resolveRequest)}] Read completed`);
+				return tryHandleQueue();
+			});
 		} else {
+			debug("Request queue empty; stopping request handling loop");
 			isRunning = false;
 		}
 	}
 
+	function tryStartLoop() {
+		if (isRunning === false) {
+			debug("Starting request handling loop");
+			isRunning = true;
+			return tryHandleQueue();
+		}
+	}
+
 	function startPipeline() {
 		return Promise.try(() => {
 			return lastSource.read();
 		}).then((value) => {
 			let selectedStreamFactory = streamPickerFunc(value);
+			let acquiredStream = pool.acquire(selectedStreamFactory);
+
 			let pipeline = pipe([
 				fromValue(value),
-				selectedStreamFactory(),
+				acquiredStream,
 				preReader()
 			]);
 
-			debug(`Created new pipeline with ID ${valueID(pipeline)}`);
+			// This is needed to correlate it back later, when the acquired stream ends and we want to release it back into the pool
+			wrapperToStream.set(pipeline, acquiredStream);
+
+			debug(`[stream ${valueID(pipeline)}] Created`);
 			pipeline.read(); // Set the pre-reader in motion
 			streamQueue.push(pipeline);
@@ -93,7 +123,7 @@ module.exports = function createDynamicStream(streamPickerFunc) {
 		let { promise, resolve } = pDefer();
 		requestQueue.push(resolve);
-		debug(`Queued request ID ${valueID(resolve)}`);
+		debug(`[request ${valueID(resolve)}] Created`);
 
 		return Promise.try(() => {
 			if (requestQueue.length > streamQueue.length - 1) {
@@ -101,11 +131,11 @@ module.exports = function createDynamicStream(streamPickerFunc) {
 				// NOTE: We do "-1" here to compensate for the fact that after the first read, there is likely to be an ended stream in the stream queue that we don't know has ended yet, and so this code would otherwise always believe that no streams need to be created and then immediately run into an EndOfStream when attempting to actually *use* the available stream. FIXME: Do we need to make this margin $concurrentReadCount instead of a fixed value of 1?
 				// TODO: Queue multiple pipelines, in case one request eats through multiple pipelines due to 0-value streams?
 				// NOTE: startPipeline will produce an EndOfStream if the source runs out of values!
-				debug("Preparing a pipeline upfront");
+				debug("Preparing a pipeline upfront...");
 				return startPipeline();
 			}
 		}).then(() => {
-			tryHandleQueue();
+			tryStartLoop();
 			return promise;
 		});
 	}
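The FIXME in tryHandleQueue asks whether the "recursive loop plus isRunning marker" pattern could be abstracted out. A minimal sketch of what such a helper might look like, under the same bluebird conventions; this is hypothetical and not part of this commit, and all names are illustrative.

"use strict";

const Promise = require("bluebird");

// Hypothetical helper: a guarded, promise-based recursive loop that drains
// work items until none are left, then unsets the flag so a later call to
// tryStartLoop can restart it.
function createRunLoop(hasWork, handleOne) {
	let isRunning = false;

	function loop() {
		if (!hasWork()) {
			isRunning = false;
			return;
		}

		return Promise.try(() => {
			return handleOne();
		}).catch(() => {
			// Errors surface wherever the work item was attached; the loop
			// only cares that the attempt finished.
		}).then(() => {
			return loop();
		});
	}

	return function tryStartLoop() {
		if (isRunning === false) {
			isRunning = true;
			return loop();
		}
	};
}

// Usage in this module would then roughly look like:
//   let tryStartLoop = createRunLoop(
//       () => requestQueue.length > 0,
//       () => { /* shift a request, attach a stream read, return the read */ }
//   );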

@@ -9,6 +9,8 @@
 		"test": "node test.js | tap-diff"
 	},
 	"dependencies": {
+		"@joepie91/unreachable": "^1.0.0",
+		"@promistream/end-of-stream": "^0.1.2",
 		"@promistream/from-value": "^0.1.0",
 		"@promistream/is-end-of-stream": "^0.1.1",
 		"@promistream/pipe": "^0.1.6",
@@ -23,6 +25,7 @@
 		"@promistream/collect": "^0.1.1",
 		"@promistream/filter": "^0.1.1",
 		"@promistream/map": "^0.1.1",
+		"@promistream/parallelize": "^0.1.0",
 		"@promistream/range-numbers": "^0.1.2",
 		"tap-diff": "^0.1.1",
 		"tape": "^5.1.0"

@@ -0,0 +1,46 @@
+"use strict";
+
+const pipe = require("@promistream/pipe");
+const propagatePeek = require("@promistream/propagate-peek");
+const propagateAbort = require("@promistream/propagate-abort");
+const EndOfStream = require("@promistream/end-of-stream");
+const unreachable = require("@joepie91/unreachable")("@promistream/dynamic");
+
+module.exports = function singleValueProcessor(processingStream) {
+	// NOTE: This is *not* compliant with the Promistream spec! It can only be used with code that is specifically written to handle it.
+
+	let streamEnded = true;
+
+	let gateStream = {
+		_promistreamVersion: 0,
+		description: `single-value processor gate stream`,
+		peek: propagatePeek, // FIXME: Is this correct? Should this not be dependent on whether a peek and/or read has already been done for this iteration?
+		abort: propagateAbort,
+		read: function produceValue_singleValueGateStream(source) {
+			if (streamEnded === false) {
+				streamEnded = true;
+				return source.read();
+			} else {
+				throw new EndOfStream;
+			}
+		},
+	};
+
+	let wrappedPipeline = pipe([
+		gateStream,
+		processingStream
+	]);
+
+	return {
+		read: function (source) {
+			return wrappedPipeline.read(source);
+		},
+		reset: function () {
+			if (streamEnded === true) {
+				streamEnded = false;
+			} else {
+				throw unreachable("Tried to reset a non-ended stream");
+			}
+		}
+	};
+};
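Since this wrapper is explicitly not spec-compliant, it has to be driven by cooperating code. A minimal usage sketch, assuming the file is saved as single-value-processor.js, map as the inner processing stream, and a hand-rolled source object; the gate releases exactly one upstream read per reset(), then reports EndOfStream again.

"use strict";

const Promise = require("bluebird");
const map = require("@promistream/map");
const isEndOfStream = require("@promistream/is-end-of-stream");
const singleValueProcessor = require("./single-value-processor"); // assumed filename

let processor = singleValueProcessor(map((n) => n * 2));

// Hand-rolled source; acceptable here since the wrapper is only ever
// driven by code written specifically to handle it.
let source = {
	read: () => Promise.resolve(21),
	peek: () => Promise.resolve(true),
	abort: () => undefined
};

Promise.try(() => {
	processor.reset(); // arm the gate for exactly one upstream read
	return processor.read(source);
}).then((value) => {
	console.log(value); // 42
	return processor.read(source); // the gate has closed itself again
}).catch(isEndOfStream, () => {
	console.log("EndOfStream until the next reset()");
});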

@@ -0,0 +1,52 @@
+"use strict";
+
+const debug = require("debug")("promistream:dynamic:stream-pool");
+const unreachable = require("@joepie91/unreachable")("@promistream/dynamic");
+const valueID = require("./value-id");
+
+// TODO: Separate out into own package + update `unreachable` label accordingly
+// TODO: Validation
+
+module.exports = function createStreamPool() {
+	let pool = new WeakMap();
+	let streamTypes = new Map();
+
+	return {
+		acquire: function (streamFactory) {
+			if (!pool.has(streamFactory)) {
+				pool.set(streamFactory, []);
+			}
+
+			let availableStreams = pool.get(streamFactory);
+
+			if (availableStreams.length === 0) {
+				debug(`Ran out of streams for type ${valueID(streamFactory)}, creating a new one...`);
+				let newStream = streamFactory();
+				streamTypes.set(newStream, streamFactory);
+
+				// TODO: Periodic cleanup and activity tracking for instances
+				availableStreams.push(newStream);
+			} else {
+				debug(`Reusing stream from pool for type ${valueID(streamFactory)}`);
+			}
+
+			let acquiredStream = availableStreams.shift();
+
+			if (acquiredStream.reset != null) {
+				acquiredStream.reset();
+			}
+
+			return acquiredStream;
+		},
+		release: function (stream) {
+			let streamFactory = streamTypes.get(stream);
+
+			if (streamFactory != null) {
+				let availableStreams = pool.get(streamFactory);
+				availableStreams.push(stream);
+			} else {
+				throw unreachable(`Specified stream does not belong to the pool`);
+			}
+		}
+	};
+};
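A minimal usage sketch of the pool, assuming map streams as the pooled type: streams are keyed by their factory, created only when the per-factory list is empty, and reset() is invoked on acquisition when the stream supports it.

"use strict";

const map = require("@promistream/map");
const createStreamPool = require("./stream-pool");

let pool = createStreamPool();
let doubler = () => map((n) => n * 2); // the factory doubles as the pool key

let streamA = pool.acquire(doubler); // pool empty for this factory: creates a stream
let streamB = pool.acquire(doubler); // still empty: creates a second one

pool.release(streamA);               // hand streamA back to the pool
let streamC = pool.acquire(doubler); // reuses streamA; reset() would run if it existed

console.log(streamA === streamC); // true
console.log(streamA === streamB); // false

// pool.release({}) would throw via unreachable(): that stream was never
// handed out by this pool, so it has no known factory.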

@@ -8,9 +8,13 @@ const nanoid = require("nanoid/non-secure").customAlphabet("1234567890abcdefghkl
 let knownValues = new WeakMap();
 
 module.exports = function valueID(value) {
-	if (!knownValues.has(value)) {
-		knownValues.set(value, nanoid());
-	}
+	if (value == null) {
+		return value;
+	} else {
+		if (!knownValues.has(value)) {
+			knownValues.set(value, nanoid());
+		}
 
-	return knownValues.get(value);
+		return knownValues.get(value);
+	}
 };
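The guard above matters because a WeakMap cannot use null or undefined as a key, so valueID(null) previously threw on the set() call. A quick behavior sketch:

"use strict";

const valueID = require("./value-id");

let pipeline = {};

console.log(valueID(pipeline) === valueID(pipeline)); // true: IDs are stable per object
console.log(valueID({}) === valueID(pipeline));       // false: each object gets its own ID
console.log(valueID(null));                           // null: passed through instead of throwing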

@@ -27,7 +27,7 @@
   dependencies:
     "@promistream/simple-sink" "^0.1.0"
 
-"@promistream/end-of-stream@^0.1.0", "@promistream/end-of-stream@^0.1.1":
+"@promistream/end-of-stream@^0.1.0", "@promistream/end-of-stream@^0.1.1", "@promistream/end-of-stream@^0.1.2":
   version "0.1.2"
   resolved "https://registry.yarnpkg.com/@promistream/end-of-stream/-/end-of-stream-0.1.2.tgz#45820c8d29353c480c0219920db95ba075396438"
   integrity sha512-rOeAIkcVZW6oYox2Jc1z/00iLVx0w0cIlcD/TbR798Qg5M5/nhErtjSG08QAtuaPSxAFKNl5ipAD8HHGV5esJw==
@@ -79,7 +79,21 @@
     "@validatem/required" "^0.1.1"
     bluebird "^3.5.4"
 
-"@promistream/pipe@^0.1.6":
+"@promistream/parallelize@^0.1.0":
+  version "0.1.0"
+  resolved "https://registry.yarnpkg.com/@promistream/parallelize/-/parallelize-0.1.0.tgz#0f261c15e58acc141605a88d01ecce6a7a27612c"
+  integrity sha512-kbu2aheDxOl+PxvZWMkiU5WJdpnR3A+4s3/B9ky1W74tSt40Cl1/Qa+5sQtdmMuuqwhdhDFwjhhSlNJx4LwLJw==
+  dependencies:
+    "@promistream/is-aborted" "^0.1.1"
+    "@promistream/is-end-of-stream" "^0.1.1"
+    "@promistream/pipe" "^0.1.4"
+    "@promistream/propagate-abort" "^0.1.2"
+    "@promistream/sequentialize" "^0.1.0"
+    bluebird "^3.5.4"
+    debug "^4.1.1"
+    default-value "^1.0.0"
+
+"@promistream/pipe@^0.1.4", "@promistream/pipe@^0.1.6":
   version "0.1.6"
   resolved "https://registry.yarnpkg.com/@promistream/pipe/-/pipe-0.1.6.tgz#fb7d930fd7011a9542502904049023fdaa87b82d"
   integrity sha512-B/n4WPJ/goXALCWJYgZV0M/lLMIF5OuaqvxezJq/lcSCo9RuV82wmdJBZd+IEmc6Ykn/EYTFtUHCnRjkl56+3w==
@@ -123,6 +137,16 @@
     "@promistream/end-of-stream" "^0.1.0"
     "@promistream/simple-source" "^0.1.0"
 
+"@promistream/sequentialize@^0.1.0":
+  version "0.1.0"
+  resolved "https://registry.yarnpkg.com/@promistream/sequentialize/-/sequentialize-0.1.0.tgz#8cab499c2518ee856fcb1e13943859ca5b77ba71"
+  integrity sha512-lm7wJmlOSmBvHq49zLfs3cghOt9kcRhLezCbuhXQUXhhiaKLCvYuyA1AGId0kiJDPX2SggrU3Ojb+TOcxPEAqw==
+  dependencies:
+    "@joepie91/unreachable" "^1.0.0"
+    "@promistream/propagate-abort" "^0.1.2"
+    bluebird "^3.5.4"
+    p-defer "^3.0.0"
+
 "@promistream/simple-sink@^0.1.0":
   version "0.1.1"
   resolved "https://registry.yarnpkg.com/@promistream/simple-sink/-/simple-sink-0.1.1.tgz#e3808179102ffe4bc10d70d681f19c649e1f3811"
@@ -547,7 +571,7 @@ create-error@^0.3.1:
   resolved "https://registry.yarnpkg.com/create-error/-/create-error-0.3.1.tgz#69810245a629e654432bf04377360003a5351a23"
   integrity sha1-aYECRaYp5lRDK/BDdzYAA6U1GiM=
 
-debug@^4.3.1:
+debug@^4.1.1, debug@^4.3.1:
   version "4.3.1"
   resolved "https://registry.yarnpkg.com/debug/-/debug-4.3.1.tgz#f0d229c505e0c6d8c49ac553d1b13dc183f6b2ee"
   integrity sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==
