"use strict" ;
const Promise = require ( "bluebird" ) ;
const consumable = require ( "@joepie91/consumable" ) ;
const defaultValue = require ( "default-value" ) ;
const debug = require ( "debug" ) ( "promistream:parallelize" ) ;
const isEndOfStream = require ( "@promistream/is-end-of-stream" ) ;
const isAborted = require ( "@promistream/is-aborted" ) ;
const propagateAbort = require ( "@promistream/propagate-abort" ) ;
const pipe = require ( "@promistream/pipe" ) ;
const sequentialize = require ( "@promistream/sequentialize" ) ;

const createRequestQueue = require("./request-queue");
const asyncWhile = require("./async-while");

// FIXME: Verify that if an EndOfStream or Aborted marker comes in, it is left queued up until all of the in-flight non-marker results have been processed; otherwise the downstream may erroneously believe that the stream has already ended, while more items are still on their way

function isRejectedWithMarker(promise) {
	if (promise.isRejected()) {
		let reason = promise.reason();
		return (isAborted(reason) || isEndOfStream(reason));
	} else {
		return false;
	}
}
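
// Example of where this stream is intended to sit in a pipeline (illustrative sketch only; it
// assumes the consuming project also uses @promistream/from-iterable and @promistream/map, which
// are not dependencies of this module, and `someSink` is a hypothetical placeholder for whatever
// sink drives the pipeline):
//
//     pipe([
//         fromIterable([ 1, 2, 3, 4 ]),
//         map((item) => someSlowAsyncOperation(item)), // the work to be parallelized
//         parallelizeStream(4), // keep up to 4 upstream reads in flight at once
//         someSink()
//     ]);
//
// Passing { ordered: false } as the second argument lets results be delivered in whatever order
// the upstream reads happen to complete.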

module.exports = function parallelizeStream(threadCount, options = {}) {
	let ordered = defaultValue(options.ordered, true);

	let ended = false;
	let parallelMode = true;
	let filling = false;
	let currentSource = null;
	let threadsRunning = 0;
	let peekPointer = 0;
	let markerBuffer = consumable([]);
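	// markerBuffer: in unordered mode, EndOfStream/Aborted markers are held here until every
	// in-flight read has settled. Illustrative timeline (hypothetical): with threadCount = 3,
	// reads A, B and C are in flight and C rejects with EndOfStream while A and B are still
	// pending; the marker is buffered, A and B are pushed to the request queue as they settle,
	// and only then is the EndOfStream marker pushed, so the downstream never sees the stream
	// "end" while results are still on their way.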
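
	// The request queue (./request-queue) pairs downstream read requests with upstream read
	// results. Assumed interface, going by how it is used in this file: pushRequest() returns a
	// Promise that settles once a result has been matched to it, pushResponse() hands it a
	// (possibly still-pending) read result, requestCount()/responseCount() report how many
	// unmatched requests/responses are queued, failAllRequests(error) rejects all pending
	// requests, and onMatch fires after a request/response pair has been matched.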
	let requestQueue = createRequestQueue({
		onMatch: () => {
			tryStartFilling();
		}
	});

	function canStartRead() {
		let maximumThreads = (parallelMode === true)
			? threadCount
			: 1;

		if (threadsRunning >= maximumThreads) {
			return false;
		} else if (ended) {
			// Special case: never optimistically read when the stream has been ended, because it will lead to an unnecessary read
			let canRead = (requestQueue.requestCount() > 0);
			debug(`[filling] stream ended; is there a request in queue? ${canRead}`);
			return canRead;
		} else {
			return Promise.try(() => {
				return currentSource.peek();
			}).then((dataAvailable) => {
				if (dataAvailable && parallelMode === false && ended === false) {
					switchToParallelMode();
				} else if (!dataAvailable && parallelMode === true) {
					switchToSequentialMode();
				}

				debug(`[filling] data available upstream: ${dataAvailable}`);
				return dataAvailable;
			});
		}
	}
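
	// NOTE: asyncWhile (./async-while) is assumed to work like an asynchronous `while` loop: it
	// keeps invoking the body for as long as the predicate resolves to a truthy value, and
	// returns a Promise that resolves once the predicate has resolved to a falsy value.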
	function tryStartFilling() {
		if (filling === false) {
			debug(`[filling] started`);
			filling = true;

			Promise.try(() => {
				return asyncWhile(canStartRead, () => {
					return startRead();
				});
			}).then(() => {
				debug(`[filling] completed`);
				filling = false;
			}).catch((error) => {
				debug(`[filling] error:`, error);
				requestQueue.failAllRequests(error);
			});
		}
	}

	function startRead() {
		threadsRunning += 1;
		debug(`[read] started (parallel mode = ${parallelMode}, thread count = ${threadCount}, in-flight requests = ${threadsRunning})`);

		let readOperation = Promise.try(() => {
			return currentSource.read();
		}).finally(() => {
			threadsRunning -= 1;
			debug(`[read] completed (parallel mode = ${parallelMode}, thread count = ${threadCount}, in-flight requests = ${threadsRunning})`);
		}).tapCatch(isEndOfStream, isAborted, () => {
			if (ended === false) {
				debug("	[mode] marking stream as ended");
				ended = true;
				switchToSequentialMode();
			}
		});

		// This noop is just used to silence unhandled rejection warnings - those get handled by the read request that they are attached to instead. But because they may only actually get attached in a later event loop tick, Bluebird will incorrectly believe them to be 'unhandled'.
		// TODO: Is the assertion that they get handled always true? Is it possible for some read results to never get read out? Will that re-emit a warning elsewhere somehow?
		function noop() {}

		if (ordered) {
			// In ordered mode, (a Promise for) the result is reported to the queue immediately so that it gets matched to a read request in-order
			debug("[response] pushed in-flight read operation");
			requestQueue.pushResponse(readOperation);

			// Unhandled rejection silencer
			readOperation.catch(noop);
		} else {
			// In unordered mode, results are reported to the queue as and when they come in
			readOperation.finally(() => {
				if (isRejectedWithMarker(readOperation)) {
					// We place Aborted/EndOfStream markers in a separate queue in unordered mode; they can occur *before* some other successful reads complete, and we don't want the downstream to prematurely stop reading, so we need to make sure that all non-marker results are processed before throwing the markers downstream.
					debug("[response] pushed read result (to marker buffer)");
					markerBuffer.peek().push(readOperation);
				} else {
					debug("[response] pushed read result");
					requestQueue.pushResponse(readOperation);
				}

				if (ended === true && threadsRunning === 0 && markerBuffer.peek().length > 0) {
					for (let marker of markerBuffer.replace([])) {
						requestQueue.pushResponse(marker);
					}
				}
			}).catch(noop); // Unhandled rejection silencer
		}
	}
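
	/* Mode switching: in parallel mode, up to threadCount reads are started speculatively
	whenever the upstream reports that data is available; in sequential mode - entered when the
	upstream reports no data available, or the stream has ended/aborted - at most one read is in
	flight at a time. */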

	function switchToParallelMode() {
		debug("	[mode] switching to parallel");
		parallelMode = true;
	}

	function switchToSequentialMode() {
		debug("	[mode] switching to sequential");
		parallelMode = false;
	}

	let parallelizer = {
		description: `parallelize (${threadCount} threads)`,
		abort: propagateAbort,
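		// peekPointer tracks how many already-buffered responses have been reported as available
		// to the downstream via peek() but not yet claimed by a read(); each read() first consumes
		// one of those "promised" responses, if any, before queueing its request.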
		peek: async function (source) {
			debug("[peek] requested");

			if (requestQueue.responseCount() > peekPointer) {
				peekPointer += 1;
				return true;
			} else if (parallelMode === true) {
				return source.peek();
			} else {
				return false;
			}
		},
		read: function (source) {
			return Promise.try(() => {
				currentSource = source;

				if (peekPointer > 0) {
					peekPointer -= 1;
				}

				// We leave it up to the request queue to match a read request to a result - we don't assume that it will be the next read, because in unordered mode reads can complete out-of-order
				let request = requestQueue.pushRequest();
				debug("[request] started");

				// NOTE: This should always happen *after* creating a request in the queue, to ensure correct behaviour when the stream has ended; otherwise canStartRead may be called while the requestCount is still 0, and it will fail to initiate a read to satisfy the request
				tryStartFilling();

				return request;
			}).tap(() => {
				debug("[request] satisfied with a value");
			}).tapCatch((error) => {
				debug("[request] satisfied with an error:", error.message);
			});
		}
	};