"use strict" ;
const Promise = require ( "bluebird" ) ;
const isEndOfStream = require ( "@promistream/is-end-of-stream" ) ;
const isAborted = require ( "@promistream/is-aborted" ) ;
const propagateAbort = require ( "@promistream/propagate-abort" ) ;
const pipe = require ( "@promistream/pipe" ) ;
const sequentialize = require ( "@promistream/sequentialize" ) ;
const defaultValue = require ( "default-value" ) ;
const debug = require ( "debug" ) ( "promistream:parallelize" ) ;
const debugListener = require ( "debug" ) ( "promistream:parallelize:queue-listener" ) ;
const createPromiseListener = require ( "./promise-listener" ) ;
// FIXME: Verify that if an EndOfStream or Aborted marker comes in, it is left queued up until all of the in-flight non-marker results have been processed; otherwise the downstream may erroneously believe that the stream has already ended, while more items are still on their way
/**
 * Builds a promistream stream that keeps up to `threadCount` upstream reads in flight at once.
 *
 * @param {number} threadCount - Upper bound on the number of queued (in-flight) upstream reads; the fill loop stops queueing once `signals.length` reaches it.
 * @param {object} [options] - Optional settings.
 * @param {boolean} [options.ordered=true] - When truthy, results are handed out in the order in which their reads were queued; otherwise the first read to settle is handed out. The default is applied through `defaultValue` (presumably only when the option is unset - confirm against that package).
 * @returns The result of piping the internal parallelizer object into `sequentialize()`.
 */
module . exports = function parallelizeStream ( threadCount , options = { } ) {
/* Whether results must be delivered in queueing order; consulted by bufferNotEmpty and awaitResult. */
let ordered = defaultValue ( options . ordered , true ) ;
/* TODO: Does this need a more efficient FIFO queue implementation? */
/* Queue of in-flight upstream reads, oldest first. Entries are appended by queueRead and removed by awaitResult. */
let signals = [ ] ;
/* Notification channel: resolved whenever a read is queued, rejected when the fill loop fails; awaitInFlightRequests listens on it while the queue is empty. */
let queueListener = createPromiseListener ( ) ;
/* true while reads may be issued ahead of demand; false while reads are passed through to the upstream one at a time. */
let parallelMode = true ;
/* true while the background fill loop is considered active. */
let filling = false ;
/* The upstream source handed to the most recent read() call; the fill loop peeks and reads through it. */
let currentSource = null ;
/**
 * Marks the background fill loop as inactive, after logging the state it
 * stopped in (mode, thread count and number of in-flight reads).
 */
function stopFilling() {
	const inFlightCount = signals.length;
	const message = ` Paused internal queue fill (parallel mode = ${parallelMode} , thread count = ${threadCount} , in-flight requests = ${inFlightCount} ) `;

	debug(message);
	filling = false;
}
/**
 * One step of the background fill loop.
 *
 * While in parallel mode with fewer than `threadCount` reads queued, peeks the
 * upstream: if a value is available a read is queued and the loop continues,
 * otherwise the stream drops to sequential mode. When the precondition does
 * not hold, the loop is marked as stopped instead.
 *
 * @returns A promise for the remainder of the loop when a peek was issued, or
 *   `undefined` when the loop stopped immediately.
 */
function fillRequest() {
	const hasCapacity = (parallelMode === true && signals.length < threadCount);

	if (!hasCapacity) {
		stopFilling();
		return;
	}

	return Promise.try(() => currentSource.peek()).then((valueAvailable) => {
		debug(` Result of upstream peek: ${valueAvailable} `);

		if (!valueAvailable) {
			/* Nothing to read ahead right now; fall back to one-at-a-time reads. */
			return switchToSequentialMode();
		}

		queueRead();
		return fillRequest();
	});
}
/**
 * Starts the background fill loop unless one is already running.
 *
 * The loop is fire-and-forget: nothing is returned to the caller. If it fails
 * (for example because the upstream peek rejects), the error is forwarded to
 * whoever is waiting on `queueListener`.
 */
function tryStartFilling() {
	if (filling) {
		return;
	}

	Promise.try(() => {
		debug("Starting internal queue fill...");
		filling = true;
		return fillRequest();
	}).catch((err) => {
		/* The loop has died, so it is no longer filling. Without this reset the
		   flag would stay set forever and no later call could restart the loop,
		   leaving subsequent reads waiting on an empty queue. */
		filling = false;
		debugListener("Rejecting", err);
		queueListener.reject(err);
		debug(` Error occurred during filling: ${err.stack} `);
	});
}
/**
 * Re-enables read-ahead and (re)starts the background fill loop.
 *
 * @returns Whatever `tryStartFilling()` returns.
 */
function switchToParallelMode() {
	debug("Switching to parallel mode");
	parallelMode = true;

	const fillResult = tryStartFilling();
	return fillResult;
}
/**
 * Disables read-ahead and marks the background fill loop as stopped.
 *
 * @returns Whatever `stopFilling()` returns.
 */
function switchToSequentialMode() {
	debug("Switching to sequential mode");
	parallelMode = false;

	const stopResult = stopFilling();
	return stopResult;
}
/**
 * Reports whether the queue holds a settled read that has not yet been
 * announced through a peek, and - despite the predicate-like name - marks
 * that read as peeked, so each settled read is reported at most once.
 *
 * In ordered mode only the leading run of settled reads counts: a pending
 * read hides every settled read queued behind it. In unordered mode any
 * settled, unpeeked read qualifies.
 *
 * @returns {boolean} `true` if such a read was found (and marked), else `false`.
 */
function bufferNotEmpty() {
	const isSettled = (signal) => !signal.trackingPromise.isPending();
	let candidate;

	if (ordered) {
		/* Skip the already-peeked settled prefix; whatever comes first after it
		   only counts if it has settled as well. */
		const firstRelevant = signals.find((signal) => !isSettled(signal) || !signal.peeked);
		candidate = (firstRelevant != null && isSettled(firstRelevant)) ? firstRelevant : undefined;
	} else {
		candidate = signals.find((signal) => isSettled(signal) && !signal.peeked);
	}

	if (candidate == null) {
		return false;
	}

	candidate.peeked = true;
	return true;
}
/**
 * Makes sure there is at least one in-flight read to wait for.
 *
 * @returns `true` immediately when the queue is non-empty; otherwise a promise
 *   that settles along with the next `queueListener` notification (resolved
 *   when a read is queued, rejected when the fill loop fails).
 */
function awaitInFlightRequests() {
	if (signals.length > 0) {
		return true;
	}

	debug("Waiting for queue to be non-empty...");

	const notification = Promise.try(() => {
		debugListener("Listening...");
		return queueListener.listen();
	});

	return notification
		.tap(() => {
			debug("Got queue-filled notification");
		})
		.tapCatch((error) => {
			debug(` Queue listener rejected: ${error.stack} `);
		});
}
/**
 * Issues one upstream read and appends it to the in-flight queue, then
 * notifies `queueListener` that the queue is no longer empty.
 *
 * Each queue entry carries:
 *  - `peeked`: whether the settled read has already been announced via peek;
 *  - `object`: an identity wrapper `{ promise }` around the read promise;
 *  - `trackingPromise`: a promise that yields `object` once the read has
 *    settled (built on `reflect()`, so a failed read is tracked the same way).
 */
function queueRead() {
	const readPromise = Promise.try(() => currentSource.read());
	const signalObject = { promise: readPromise };

	const entry = {
		peeked: false,
		object: signalObject,
		trackingPromise: Promise.try(() => readPromise.reflect()).then(() => signalObject)
	};

	signals.push(entry);

	debugListener("Resolving");
	queueListener.resolve();
}
/**
 * Waits for the next in-flight read to settle, removes it from the queue and
 * hands out its outcome.
 *
 * In ordered mode the oldest queued read is awaited; otherwise the first of
 * the currently queued reads to settle wins.
 *
 * A read that was rejected with an EndOfStream or Aborted marker switches the
 * stream to sequential mode. While other reads are still queued the marker is
 * dropped and the next result is awaited instead, so the marker is only
 * passed on once it is the last queued result.
 *
 * @returns A promise that settles like the selected read: fulfilled with its
 *   value, or rejected with its error or marker.
 */
function awaitResult() {
	return Promise.try(() => {
		return awaitInFlightRequests();
	}).then(() => {
		debug("Awaiting next finished result...");

		if (ordered) {
			return signals[0].trackingPromise;
		} else {
			return Promise.race(signals.map((item) => item.trackingPromise));
		}
	}).then((signalObject) => {
		debug("A read attempt completed!");

		const resultPromise = signalObject.promise;
		signals = signals.filter((signal) => signal.object !== signalObject);

		if (!resultPromise.isRejected()) {
			return resultPromise;
		}

		/* Only inspect the reason once the read is known to have been rejected. */
		const reason = resultPromise.reason();
		const isMarker = isEndOfStream(reason) || isAborted(reason);

		if (!isMarker) {
			/* An ordinary read error: pass it through as-is. */
			return resultPromise;
		}

		switchToSequentialMode();

		if (signals.length > 0) {
			/* Throw away the marker, and wait for the next result */
			return awaitResult();
		}

		/* Queue has been exhausted, so this marker will be the final result; pass it through */
		return resultPromise;
	});
}
/**
 * The stream object itself.
 *
 *  - `peek(source)`: answers `true` when a settled, not-yet-announced read is
 *    buffered; otherwise defers to the upstream peek in parallel mode and
 *    answers `false` in sequential mode.
 *  - `read(source)`: in parallel mode, kicks off the background fill loop and
 *    waits for the next queued result. In sequential mode it first drains any
 *    reads left over from parallel operation; once none remain it reads from
 *    the upstream directly and, when that read succeeds, switches back to
 *    parallel mode.
 *  - `abort`: handled by `propagateAbort`.
 */
let parallelizer = {
	_promistreamVersion: 0,
	description: ` parallelize ( ${threadCount} threads) `,
	abort: propagateAbort,
	async peek(source) {
		debug("Processing peek...");

		if (bufferNotEmpty()) {
			return true;
		}

		return (parallelMode === true) ? source.peek() : false;
	},
	read(source) {
		return Promise.try(() => {
			debug("Processing read...");
			/* Remember the upstream so that the fill loop can use it. */
			currentSource = source;

			if (parallelMode) {
				/* This runs in the background, potentially perpetually */
				tryStartFilling();
				return awaitResult();
			}

			/* Sequential mode */
			if (signals.length > 0) {
				/* Clear out the remaining in-flight reads from the previous parallel-mode operation, first. */
				debug(` Awaiting in-flight read... (in-flight reads = ${signals.length} ) `);
				return awaitResult();
			}

			debug("Passing through read to upstream...");

			return Promise.try(() => source.read()).tap(() => {
				/* The upstream produced a value again, so reading ahead is worthwhile once more. */
				switchToParallelMode();
			});
		}).tap(() => {
			debug("Read request was satisfied with a value");
		}).tapCatch((error) => {
			debug("Read request was satisfied with an error:", error.message);
		});
	}
};
/* The stream handed back to the caller is the parallelizer piped into a `sequentialize()` stream.
   NOTE(review): presumably this makes sure that the parallelizer's read() is only invoked one call at a time, which awaitResult seems to rely on - confirm against @promistream/sequentialize. */
return pipe ( [
parallelizer ,
sequentialize ( )
] ) ;
} ;