@ -43,6 +43,7 @@ Note that using lanes is __optional__; all options and functions default to lane
- __mode:__*Default: `"pull"`.* The mode to operate in. One of `"push"`, `"pull"`.
- __lanes__*Default: `1`.* The amount of lanes to create. Each lane is an independent queue of values and errors, though broadcasts are possible.
- __pull:__*Required only in pull mode.* An (`async`) callback that's called to acquire a new value/result, and which is expected to return a Promise. This should be providing your values in pull mode.
- __sequential:__*Default: `false`. Only used in pull mode.* Whether to handle requests sequentially; that is, a second pull will not be started until the previous pull has completed, even if a second request is made.
- __select:__*Default: `0`.* A callback that receives a value, and returns the index of the lane to assign it to (or an array of such indexes). The callback may return a Promise.
- __selectError:__*Default: `0`.* Like `select`, but it receives errors instead of values.
- __broadcastValues:__*Default: `false`.* Whether to broadcast all values to all lanes by default. The `select` callback will not be called for broadcast values. In push mode, this can be overridden for individual values.
@ -111,10 +112,12 @@ module.exports = function createPushBuffer(_options) {
letlongestQueue=Math.max(...laneLengths);
if(longestQueue>0){
debug(`there are still requests pending; kicking off ${longestQueue} new pulls to satisfy longest queue`);
letnewPulls=(sequential)?1:longestQueue;
debug(`there are still requests pending; kicking off ${newPulls} new pulls to satisfy longest queue (${longestQueue})`);
debug(` lane lengths:`,laneLengths);
for(leti=0;i<longestQueue;i++){
for(leti=0;i<newPulls;i++){
doPull();
}
}
@ -240,7 +243,11 @@ module.exports = function createPushBuffer(_options) {
if(mode==="pull"){
// We don't need to handle errors in doPull because errors in the actual pull callback are propagated through the defer. Errors while *initiating* the pull should be thrown from here (which happens automatically because `doPull` is itself synchronous).
doPull();
if(sequential===false||pullQueue.length===0){
doPull();
}else{
// In sequential mode, we let the reconciliation process kick off the next pull once this one is completed.