"use strict" ;
const capturePromise = require ( "capture-promise" ) ;
const singleConcurrent = require ( "single-concurrent" ) ;
const assureArray = require ( "assure-array" ) ;
const promiseDefer = require ( "@joepie91/promise-defer" ) ;
const debug = require ( "debug" ) ( "push-buffer" ) ;
const NoValue = require ( "@promistream/no-value" ) ;
const { validateOptions } = require ( "@validatem/core" ) ;
const required = require ( "@validatem/required" ) ;
const isInteger = require ( "@validatem/is-integer" ) ;
const isFunction = require ( "@validatem/is-function" ) ;
const defaultTo = require ( "@validatem/default-to" ) ;
const oneOf = require ( "@validatem/one-of" ) ;
const dynamic = require ( "@validatem/dynamic" ) ;
const isBoolean = require ( "@validatem/is-boolean" ) ;
/* TODO:
- Add limits for buffer size? This is difficult to implement since we don't know upfront what lane a value will be assigned to, and buffers are ideally per-lane.
*/
/**
 * Attaches a logging rejection handler to `promise`, so that a rejection of it
 * always has at least one handler, and hands the very same promise back.
 *
 * The handler is registered on a side branch (its derived promise is discarded),
 * so anyone who later consumes the returned promise still observes the rejection.
 *
 * @param {Promise} promise - The promise whose rejection should be logged.
 * @returns {Promise} The original `promise`, not the derived `.catch` promise.
 */
function suppressUncaughtRejection(promise) {
	const logSuppressedError = (error) => {
		// We do log the suppressed error, for debugging purposes
		debug("error temporarily suppressed;", error);
	};

	promise.catch(logSuppressedError);

	return promise;
}
module . exports = function createPushBuffer ( _options ) {
function requiredIfMode ( acceptableModes ) {
return dynamic ( ( _value ) => {
// This is a bit hacky, but currently necessary to make it work how we want
let mode = _options . mode ? ? "pull" ;
let acceptableArray = assureArray ( acceptableModes ) ;
if ( acceptableArray . includes ( mode ) ) {
return [ required ] ;
} else {
return [ ] ;
}
} ) ;
}
let { lanes , pull , select , selectError , mode , sequential , broadcastErrors , broadcastValues } = validateOptions ( arguments , {
lanes : [ defaultTo ( 1 ) , isInteger ] ,
// Currently only either push or pull mode is supported; if you need both at the same time, file an issue!
mode : [ defaultTo ( "pull" ) , oneOf ( [ "push" , "pull" ] ) ] ,
pull : [ requiredIfMode ( "pull" ) , isFunction ] ,
sequential : [ defaultTo ( false ) , isBoolean ] ,
select : [ defaultTo . literal ( ( _value ) => 0 ) , isFunction ] ,
selectError : [ defaultTo . literal ( ( _value ) => 0 ) , isFunction ] ,
broadcastValues : [ defaultTo ( false ) , isBoolean ] ,
broadcastErrors : [ defaultTo ( true ) , isBoolean ] ,
// append == add after all pending pulls (including pushed promises!), insert == send to outstanding requests immediately (so basically insert *before* pending pulls)
} ) ;
let pushOrder = "append" ; // Temporary; this infrastructure exists to support a potential later combined mode
let pullQueue = [ ] ;
let settlementMap = new WeakMap ( ) ;
let valueQueues = [ ] ;
let requestQueues = [ ] ;
for ( let i = 0 ; i < lanes ; i ++ ) {
valueQueues [ i ] = [ ] ;
requestQueues [ i ] = [ ] ;
}
function getLane ( lane ) {
if ( lane < lanes ) {
return {
values : valueQueues [ lane ] ,
requests : requestQueues [ lane ]
} ;
} else {
throw new Error ( ` Tried to access lane ${ lane } , but only lanes 0- ${ lanes - 1 } exist ` ) ;
}
}
function storePull ( pullPromise ) {
settlementMap . set ( pullPromise , false ) ;
pullQueue . push ( pullPromise ) ;
debug ( "queued pull" ) ;
let actualPromise = ( pullPromise . _ _preset === true )
? pullPromise . promise
: pullPromise ;
// Note that we are storing the *original* pullPromise for later use, and not this 'notification chain' below, because otherwise we would be swallowing errors.
suppressUncaughtRejection ( actualPromise . finally ( ( ) => {
settlementMap . set ( pullPromise , true ) ;
debug ( "marked pull as settled" ) ;
tryRunReconciliation ( ) ;
} ) ) ;
}
function doPull ( ) {
let pullPromise = capturePromise ( ( ) => pull ( ) ) ;
storePull ( pullPromise ) ;
}
let tryRunReconciliation = singleConcurrent ( async function reconcile ( ) {
if ( pullQueue . length === 0 ) {
debug ( "reconciliation: empty pullQueue; finishing loop" ) ;
let laneLengths = requestQueues . map ( ( queue ) => queue . length ) ;
let longestQueue = Math . max ( ... laneLengths ) ;
if ( longestQueue > 0 ) {
let newPulls = ( sequential ) ? 1 : longestQueue ;
debug ( ` there are still requests pending; kicking off ${ newPulls } new pulls to satisfy longest queue ( ${ longestQueue } ) ` ) ;
debug ( ` lane lengths: ` , laneLengths ) ;
for ( let i = 0 ; i < newPulls ; i ++ ) {
doPull ( ) ;
}
}
return ;
}
let nextPromise = pullQueue [ 0 ] ;
if ( settlementMap . get ( nextPromise ) !== true ) {
debug ( ` reconciliation: nextPromise not settled yet; finishing loop ` ) ;
return ;
}
debug ( ` reconciliation: nextPromise settled ` ) ;
pullQueue . shift ( ) ; // Remove it from the queue
let value ;
let broadcast = ( nextPromise . _ _preset === true ) ? nextPromise . broadcast : undefined ;
let promise = ( nextPromise . _ _preset === true ) ? nextPromise . promise : nextPromise ;
try {
value = await promise ;
} catch ( error ) {
await sendError ( error , broadcast ) ;
return await reconcile ( ) ;
}
if ( value !== NoValue ) {
await sendValue ( value , broadcast ) ;
}
return await reconcile ( ) ;
} ) ;
async function sendValue ( value , broadcast = broadcastValues ) {
if ( broadcast ) {
return sendValueBroadcast ( value ) ;
} else {
return sendValueLane ( value ) ;
}
}
async function sendValueLane ( value ) {
let lane = await select ( value ) ;
debug ( ` reconciliation: result: resolved, allocated to lane ${ lane } ` ) ;
if ( Array . isArray ( lane ) ) {
for ( let i of lane ) {
sendToLane ( i , Promise . resolve ( value ) ) ;
}
} else {
sendToLane ( lane , Promise . resolve ( value ) ) ;
}
}
async function sendValueBroadcast ( value ) {
debug ( ` reconciliation: result: resolved, broadcasting ` ) ;
requestQueues . forEach ( ( queue , i ) => {
debug ( ` reconciliation: (sending to lane ${ i } ) ` ) ;
sendToLane ( i , Promise . resolve ( error ) ) ;
} ) ;
}
async function sendError ( value , broadcast = broadcastErrors ) {
if ( broadcast ) {
return sendErrorBroadcast ( value ) ;
} else {
return sendErrorLane ( value ) ;
}
}
async function sendErrorLane ( error ) {
let lane = await selectError ( error ) ;
debug ( ` reconciliation: result: rejected, allocated to lane ${ lane } ` ) ;
if ( Array . isArray ( lane ) ) {
for ( let i of lane ) {
sendToLane ( i , suppressUncaughtRejection ( Promise . reject ( value ) ) ) ;
}
} else {
sendToLane ( lane , suppressUncaughtRejection ( Promise . reject ( value ) ) ) ;
}
}
async function sendErrorBroadcast ( error ) {
debug ( ` reconciliation: result: rejected, broadcasting ` ) ;
requestQueues . forEach ( ( queue , i ) => {
debug ( ` reconciliation: (sending to lane ${ i } ) ` ) ;
sendToLane ( i , suppressUncaughtRejection ( Promise . reject ( error ) ) ) ;
} ) ;
}
// We use Promises as a Result type of sorts here, which is fine since this is all async anyway
function sendToLane ( lane , promise ) {
let { requests , values } = getLane ( lane ) ;
debug ( ` sendToQueue: lane ${ lane } ; request = ${ requests . length } , values = ${ values . length } ` ) ;
if ( requests . length > 0 ) {
debug ( ` sendToQueue: satisfying request ` ) ;
let request = requests . shift ( ) ;
request . resolve ( promise ) ;
} else {
debug ( ` sendToQueue: storing in value buffer ` ) ;
values . push ( promise ) ;
}
}
return {
request : function ( lane = 0 ) {
debug ( ` value requested from lane ${ lane } ` ) ;
let { values , requests } = getLane ( lane ) ;
if ( values . length > 0 ) {
debug ( ` value immediately available; reconciling ` ) ;
return Promise . resolve ( values . shift ( ) ) ;
} else {
let defer = promiseDefer ( ) ;
requests . push ( defer ) ;
if ( mode === "pull" ) {
// We don't need to handle errors in doPull because errors in the actual pull callback are propagated through the defer. Errors while *initiating* the pull should be thrown from here (which happens automatically because `doPull` is itself synchronous).
if ( sequential === false || pullQueue . length === 0 ) {
doPull ( ) ;
} else {
// In sequential mode, we let the reconciliation process kick off the next pull once this one is completed.
}
}
return defer . promise ;
}
} ,
push : async function ( value , broadcast ) {
if ( mode === "push" ) {
debug ( ` reconciliation: value pushed ` ) ;
if ( pushOrder === "insert" ) {
await sendValue ( value , broadcast ) ;
} else if ( pushOrder === "append" ) {
storePull ( { _ _preset : true , broadcast : broadcast , promise : Promise . resolve ( value ) } ) ;
} else {
throw new Error ( ` Unreachable ` ) ;
}
} else {
throw new Error ( ` This buffer is operating in pull mode; pushing values is not supported. Switch to push mode instead, if this is what you need. ` ) ;
}
} ,
pushError : async function ( error , broadcast ) {
if ( mode === "push" ) {
debug ( ` reconciliation: error pushed ` ) ;
if ( pushOrder === "insert" ) {
await sendError ( error , broadcast ) ;
} else if ( pushOrder === "append" ) {
storePull ( { _ _preset : true , broadcast : broadcast , promise : Promise . reject ( error ) } ) ;
} else {
throw new Error ( ` Unreachable ` ) ;
}
} else {
throw new Error ( ` This buffer is operating in pull mode; pushing errors is not supported. Switch to push mode instead, if this is what you need. ` ) ;
}
} ,
countLane : function ( lane = 0 ) {
let { values , requests } = getLane ( lane ) ;
return {
values : values . length ,
requests : requests . length
} ;
}
} ;
} ;