"use strict" ;
const Promise = require ( "bluebird" ) ;
const consumable = require ( "@joepie91/consumable" ) ;
const pPause = require ( "p-pause" ) ;
const debug = require ( "debug" ) ( "promistream:last-will" ) ;
const isEndOfStream = require ( "@promistream/is-end-of-stream" ) ;
const isAborted = require ( "@promistream/is-aborted" ) ;
const propagateAbort = require ( "@promistream/propagate-abort" ) ;
const { validateOptions } = require ( "@validatem/core" ) ;
const isFunction = require ( "@validatem/is-function" ) ;
const wrapValueAsOption = require ( "@validatem/wrap-value-as-option" ) ;
const requireEither = require ( "@validatem/require-either" ) ;
// Unique sentinel: a last-will handler resolves to this to signal that it did
// not produce a value, so that the next handler in line gets a turn.
const NoValue = Symbol();
// MARKER: Test this in batchOn, then finalize the refactoring
// FIXME: Make NoValue its own reusable promistream package? It could serve as a marker in different types of streams that need to deal with multiple possibly-matching handlers, or in e.g. map-filter.
// FIXME: Re-understand and refactor the logic, finish packaging it up
module . exports = function lastWill ( ) {
let { onEndOfStream , onAborted , onAllEnds } = validateOptions ( arguments , [
wrapValueAsOption ( "onEndOfStream" ) , {
onEndOfStream : [ isFunction ] ,
onAborted : [ isFunction ] ,
onAllEnds : [ isFunction ] ,
} ,
requireEither ( [ "onEndOfStream" , "onAborted" , "onAllEnds" ] , { allowMultiple : true } )
] ) ;
let errorBuffer = consumable ( ) ;
let hasExecutedLastWill = false ;
// NOTE: We use a resettable pauser to ensure that when reading in parallel mode, all concurrent reads get held up while a stream end marker is being processed, but parallelization is otherwise permitted. Think of it as a conditional `sequentialize`.
// FIXME: Verify that this doesn't ever introduce race conditions
let pauser = pPause ( ) ;
function throwBufferedError ( ) {
// Run after all custom handlers have run out
throw errorBuffer . consume ( ) ;
}
function processUntilValue ( handlers ) {
return Promise . reduce ( handlers , ( lastResult , handler , i ) => {
if ( lastResult === NoValue ) {
debug ( ` Calling handler ${ i } ` ) ;
return handler ( errorBuffer . peek ( ) ) ;
} else {
debug ( ` Skipping handler ${ i } ` ) ;
return lastResult ;
}
} , NoValue ) ;
}
function handleMarker ( marker , markerHandler ) {
pauser . pause ( ) ;
return Promise . try ( ( ) => {
let relevantHandlers = [ markerHandler , onAllEnds , throwBufferedError ]
. filter ( ( handler ) => handler != null ) ;
debug ( ` ${ relevantHandlers . length - 1 } handlers to try, excluding default handler ` ) ;
if ( hasExecutedLastWill || relevantHandlers . length === 0 ) {
debug ( "Bailing" ) ;
throw marker ;
} else {
errorBuffer . set ( marker ) ;
hasExecutedLastWill = true ;
return processUntilValue ( relevantHandlers ) ;
}
} ) . finally ( ( ) => {
pauser . unpause ( ) ;
} ) ;
}
return {
_promistreamVersion : 0 ,
description : ` last-will handler ` ,
abort : propagateAbort ,
peek : function peek _lastWill ( source ) {
return Promise . try ( ( ) => {
return pauser . await ( ) ;
} ) . then ( ( ) => {
if ( errorBuffer . isSet ( ) ) {
/* We've reached the end of the source stream in some way; we'll have at least *some* response ready, whether it is an error or some value produced by a last-will handler */
return true ;
} else {
return source . peek ( ) ;
}
} ) ;
} ,
read : function produceValue _lastWill ( source ) {
return Promise . try ( ( ) => {
return pauser . await ( ) ;
} ) . then ( ( ) => {
if ( errorBuffer . isSet ( ) ) {
return throwBufferedError ( ) ;
} else {
return Promise . try ( ( ) => {
return source . read ( ) ;
} ) . catch ( isEndOfStream , ( err ) => {
debug ( "Got EndOfStream marker" ) ;
return handleMarker ( err , onEndOfStream ) ;
} ) . catch ( isAborted , ( err ) => {
debug ( "Got Aborted marker" ) ;
return handleMarker ( err , onAborted ) ;
} ) ;
}
} ) ;
}
} ;
} ;
// Expose the sentinel on the exported function, so that handlers can return it
// to indicate that they did not produce a value.
module . exports . NoValue = NoValue ;