"use strict"; const Promise = require("bluebird"); const pipe = require("@promistream/pipe"); const sequentialize = require("@promistream/sequentialize"); const isEndOfStream = require("@promistream/is-end-of-stream"); const propagateAbort = require("@promistream/propagate-abort"); module.exports = function combineSequentialStreaming() { let currentStream; function tryLoadNextStream(source) { if (currentStream == null) { return Promise.try(() => { return source.read(); }).then((stream) => { // FIXME: Validate that it's actually a promistream currentStream = stream; return tryLoadNextStream(source); }); } } function tryRead(source) { return Promise.try(() => { return tryLoadNextStream(source); }).then(() => { return Promise.try(() => { return currentStream.read(); }).catch(isEndOfStream, () => { currentStream = null; // The next read attempt will load in a new stream return tryRead(source); }); }); } return pipe([ { _promistreamVersion: 0, description: `sequential combiner stream`, abort: propagateAbort, // Once aborted, need to abort any substreams that come in afterwards peek: function peek(source) { return Promise.try(() => { return tryLoadNextStream(source); }).then(() => { // FIXME: This won't be completely accurate - if the current stream produces multiple peek=true for EndOfStream markers, then only one of them will actually be read out before the stream is switched out - need to have some code to drain remaining peeks before swapping out the stream, so as to remain spec-compliant return currentStream.peek(); }); }, read: function produceValue_combineSequentialStreaming(source) { // FIXME: Define how this interacts with aborts - does an abort in a single source stream also propagate to an abort for the parent pipeline, including any following substreams? return tryRead(source); } }, // FIXME: Make parallelism-capable sequentialize() ]); };