Browse Source

Initial commit

master
Sven Slootweg 2 months ago
commit
245a7c494a
  1. 1
      .gitignore
  2. 23
      example.js
  3. 61
      index.js
  4. 25
      package.json
  5. 1438
      yarn.lock

1
.gitignore

@ -0,0 +1 @@
node_modules

23
example.js

@ -0,0 +1,23 @@
"use strict";
const Promise = require("bluebird");
const pipe = require("@promistream/pipe");
const fromIterable = require("@promistream/from-iterable");
const rangeNumbers = require("@promistream/range-numbers");
const combineSequentialStreaming = require("./");
const collect = require("@promistream/collect");
return Promise.try(() => {
return pipe([
fromIterable([
rangeNumbers(0, 3),
rangeNumbers(3, 5),
rangeNumbers(5, 10),
]),
combineSequentialStreaming(),
collect()
]).read();
}).then((result) => {
console.log(result); // [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ]
});

61
index.js

@ -0,0 +1,61 @@
"use strict";
const Promise = require("bluebird");
const pipe = require("@promistream/pipe");
const sequentialize = require("@promistream/sequentialize");
const isEndOfStream = require("@promistream/is-end-of-stream");
const propagateAbort = require("@promistream/propagate-abort");
module.exports = function combineSequentialStreaming() {
let currentStream;
function tryLoadNextStream(source) {
if (currentStream == null) {
return Promise.try(() => {
return source.read();
}).then((stream) => {
// FIXME: Validate that it's actually a promistream
currentStream = stream;
return tryLoadNextStream(source);
});
}
}
function tryRead(source) {
return Promise.try(() => {
return tryLoadNextStream(source);
}).then(() => {
return Promise.try(() => {
return currentStream.read();
}).catch(isEndOfStream, () => {
currentStream = null;
// The next read attempt will load in a new stream
return tryRead(source);
});
});
}
return pipe([
{
_promistreamVersion: 0,
description: `sequential combiner stream`,
abort: propagateAbort, // Once aborted, need to abort any substreams that come in afterwards
peek: function peek(source) {
return Promise.try(() => {
return tryLoadNextStream(source);
}).then(() => {
// FIXME: This won't be completely accurate - if the current stream produces multiple peek=true for EndOfStream markers, then only one of them will actually be read out before the stream is switched out - need to have some code to drain remaining peeks before swapping out the stream, so as to remain spec-compliant
return currentStream.peek();
});
},
read: function produceValue_combineSequentialStreaming(source) {
// FIXME: Define how this interacts with aborts - does an abort in a single source stream also propagate to an abort for the parent pipeline, including any following substreams?
return tryRead(source);
}
},
// FIXME: Make parallelism-capable
sequentialize()
]);
};

25
package.json

@ -0,0 +1,25 @@
{
"name": "@promistream/combine-sequential-streaming",
"version": "0.1.0",
"main": "index.js",
"keywords": [
"promistream"
],
"repository": "http://git.cryto.net/promistream/combine-sequential-streaming.git",
"author": "Sven Slootweg <admin@cryto.net>",
"license": "WTFPL OR CC0-1.0",
"devDependencies": {
"@joepie91/eslint-config": "^1.1.0",
"@promistream/collect": "^0.1.1",
"@promistream/range-numbers": "^0.1.2",
"eslint": "^6.8.0"
},
"dependencies": {
"@promistream/pipe": "^0.1.6",
"@promistream/from-iterable": "^0.1.0",
"@promistream/is-end-of-stream": "^0.1.1",
"@promistream/propagate-abort": "^0.1.7",
"@promistream/sequentialize": "^0.1.0",
"bluebird": "^3.7.2"
}
}

1438
yarn.lock
File diff suppressed because it is too large
View File

Loading…
Cancel
Save