"use strict" ;
const Promise = require ( "bluebird" ) ;
const consumable = require ( "@joepie91/consumable" ) ;
const defaultValue = require ( "default-value" ) ;
const debug = require ( "debug" ) ( "promistream:parallelize" ) ;
const createRequestMatcher = require ( "request-matcher" ) ;
const isEndOfStream = require ( "@promistream/is-end-of-stream" ) ;
const isAborted = require ( "@promistream/is-aborted" ) ;
const propagateAbort = require ( "@promistream/propagate-abort" ) ;
const pipe = require ( "@promistream/pipe" ) ;
const sequentialize = require ( "@promistream/sequentialize" ) ;
const asyncWhile = require ( "./async-while" ) ;
/**
 * Tells whether an already-settled Bluebird promise was rejected with one of the
 * stream "marker" reasons (Aborted or EndOfStream). Fulfilled promises, and
 * promises rejected with any other reason, yield false.
 *
 * @param {Promise} promise - a promise supporting synchronous inspection (`isRejected()` / `reason()`)
 * @returns {boolean} whether the rejection reason is an Aborted/EndOfStream marker
 */
function isRejectedWithMarker(promise) {
	if (!promise.isRejected()) {
		return false;
	}

	const rejectionReason = promise.reason();
	return (isAborted(rejectionReason) || isEndOfStream(rejectionReason));
}
module . exports = function parallelizeStream ( threadCount , options = { } ) {
let ordered = defaultValue ( options . ordered , true ) ;
/* TODO: Does this need a more efficient FIFO queue implementation? */
let ended = false ;
let parallelMode = true ;
let filling = false ;
let currentSource = null ;
let threadsRunning = 0 ;
let peekPointer = 0 ;
let markerBuffer = consumable ( [ ] ) ;
let requestMatcher = createRequestMatcher ( {
onMatch : ( ) => {
tryStartFilling ( ) ;
}
} ) ;
function canStartRead ( ) {
let maximumThreads = ( parallelMode === true )
? threadCount
: 1 ;
if ( threadsRunning >= maximumThreads ) {
return false ;
} else if ( ended ) {
// Special case: never optimistically read when the stream has been ended, because it will lead to an unnecessary read
let canRead = ( requestMatcher . requestCount ( ) > 0 ) ;
debug ( ` [filling] stream ended; is there a request in queue? ${ canRead } ` ) ;
return canRead ;
} else {
return Promise . try ( ( ) => {
return currentSource . peek ( ) ;
} ) . then ( ( dataAvailable ) => {
if ( dataAvailable && parallelMode === false && ended === false ) {
switchToParallelMode ( ) ;
} else if ( ! dataAvailable && parallelMode === true ) {
switchToSequentialMode ( ) ;
}
debug ( ` [filling] data available upstream: ${ dataAvailable } ` ) ;
return dataAvailable ;
} ) ;
}
}
function tryStartFilling ( ) {
if ( filling === false ) {
debug ( ` [filling] started ` ) ;
filling = true ;
Promise . try ( ( ) => {
return asyncWhile ( canStartRead , ( ) => {
return startRead ( ) ;
} ) ;
} ) . then ( ( ) => {
debug ( ` [filling] completed ` ) ;
filling = false ;
} ) . catch ( ( error ) => {
debug ( ` [filling] error: ` , error ) ;
requestMatcher . failAllRequests ( error ) ;
} ) ;
}
}
function startRead ( ) {
threadsRunning += 1 ;
debug ( ` [read] started (parallel mode = ${ parallelMode } , thread count = ${ threadCount } , in-flight requests = ${ threadsRunning } ) ` ) ;
let readOperation = Promise . try ( ( ) => {
return currentSource . read ( ) ;
} ) . finally ( ( ) => {
threadsRunning -= 1 ;
debug ( ` [read] completed (parallel mode = ${ parallelMode } , thread count = ${ threadCount } , in-flight requests = ${ threadsRunning } ) ` ) ;
} ) . tapCatch ( isEndOfStream , isAborted , ( ) => {
if ( ended === false ) {
debug ( " [mode] marking stream as ended" ) ;
ended = true ;
switchToSequentialMode ( ) ;
}
} ) ;
// This noop is just used to silence unhandled rejection warnings - those get handled by the read request that they are attached to instead. But because they may only actually get attached in a later event loop tick, Bluebird will incorrectly believe them to be 'unhandled'.
// TODO: Is the assertion that they get handled always true? Is it possible for some read results to never get read out? Will that re-emit a warning elsewhere somehow?
function noop ( ) { }
if ( ordered ) {
// In ordered mode, (a Promise for) the result is reported to the queue immediately so that it gets matched to a read request in-order
debug ( "[response] pushed in-flight read operation" ) ;
requestMatcher . pushResponse ( readOperation ) ;
// Unhandled rejection silencer
readOperation . catch ( noop ) ;
} else {
// In unordered mode, results are reported to the queue as and when they come in
readOperation . finally ( ( ) => {
if ( isRejectedWithMarker ( readOperation ) ) {
// We place Aborted/EndOfStream markers in a separate queue in unordered mode; they can occur *before* some other successful reads complete, and we don't want the downstream to prematurely stop reading, so we need to make sure that all non-marker results are processed before throwing the markers downstream.
debug ( "[response] pushed read result (to marker buffer)" ) ;
markerBuffer . peek ( ) . push ( readOperation ) ;
} else {
debug ( "[response] pushed read result" ) ;
requestMatcher . pushResponse ( readOperation ) ;
}
if ( ended === true && threadsRunning === 0 && markerBuffer . peek ( ) . length > 0 ) {
for ( let marker of markerBuffer . replace ( [ ] ) ) {
requestMatcher . pushResponse ( marker ) ;
}
}
} ) . catch ( noop ) ; // Unhandled rejection silencer
}
}
function switchToParallelMode ( ) {
debug ( " [mode] switching to parallel" ) ;
parallelMode = true ;
return tryStartFilling ( ) ;
}
function switchToSequentialMode ( ) {
debug ( " [mode] switching to sequential" ) ;
parallelMode = false ;
}
let parallelizer = {
_promistreamVersion : 0 ,
description : ` parallelize ( ${ threadCount } threads) ` ,
abort : propagateAbort ,
peek : async function ( source ) {
debug ( "[peek] requested" ) ;
if ( requestMatcher . responseCount ( ) > peekPointer ) {
peekPointer += 1 ;
return true ;
} else if ( parallelMode === true ) {
return source . peek ( ) ;
} else {
return false ;
}
} ,
read : function ( source ) {
return Promise . try ( ( ) => {
currentSource = source ;
if ( peekPointer > 0 ) {
peekPointer -= 1 ;
}
// We leave it up to the request queue to match a read request to a result - we don't assume that it will be the next read, because in unordered mode reads can complete out-of-order
let request = requestMatcher . pushRequest ( ) ;
debug ( "[request] started" ) ;
// NOTE: This should always happen *after* creating a request in the queue, to ensure correct behaviour when the stream has ended; otherwise canStartRead may be called while the requestCount is still 0, and it will fail to initiate a read to satisfy the request
tryStartFilling ( ) ;
return request ;
} ) . tap ( ( ) => {
debug ( "[request] satisfied with a value" ) ;
} ) . tapCatch ( ( error ) => {
debug ( "[request] satisfied with an error:" , error . message ) ;
} ) ;
}
} ;
return pipe ( [
parallelizer ,
sequentialize ( )
] ) ;
} ;