You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

6.4 KiB

push-buffer

An abstraction for handling buffering of asynchronous pull-based and push-based operations.

  • Request a value from the buffer, receive a Promise.
  • Depending on mode of operation, values are either acquired through an asynchronous callback (pull), or provided externally at a later point (push).
  • Values that come in when there are no outstanding request are buffered, to be reconciled with later requests; ie. the abstraction matches both requests and results, and reconciles them in an order-preserving manner.
  • Support for dividing values into 'lanes'; ie. independent request queues where values get assigned to specific lanes based on some predicate function, for implementing complex value distribution patterns.
  • Supports @promistream/NoValue to explicitly represent cases where no value is produced; this means you can send null and undefined to a queue just like any other value.
  • Values and errors can be sent to one lane, multiple lanes (multicast), or all lanes (broadcast).

Typical usecases

  • Implementing Promistreams, particularly forking streams.
  • Implementing asynchronous task queues and task distribution systems.
  • Splitting out asynchronously obtain values into different worker threads.
  • Converting push-based APIs (eg. EventEmitters) into pull-based APIs (asynchronous iterators, Promistreams, pull-streams, etc.)

Caveats

  • The internal queues are unbounded, and so may use an arbitrary amount of memory. You should ensure that you are continuously requesting values from all lanes, so that they never become too full.
  • Likewise, there is nothing that prevents results from sitting in a lane forever and never actually being read, even when the process terminates; you must continuously drive reads to ensure you are not missing any values.
  • The buffer can currently only operate in either push or pull mode, because there are no obviously-correct semantics for a combined mode; if you still need a combined mode, please file an issue describing your exact usecase, so that I can better understand what the real-world requirements of such cases are.
  • While using this abstraction is much less error-prone than manually implementing asynchronous reconciliation, it is still somewhat difficult to get right, and very difficult to debug when you don't; if possible, prefer using higher-level off-the-shelf libraries for your usecase instead.

Lanes

This library supports a concept of 'lanes'. Each lane is an independent queue of values and errors; you make a request for a value to a specific lane, and values and errors are assigned to specific lane, even though the source of values is shared among all of them. Additionally, it is possible to broadcast errors or values to all lanes.

If you just want to convert a push-based API to a pull-based API, you probably won't need lanes, and you can safely ignore all options related to it. However, if you are implementing some kind of value or task distribution mechanism, you will probably want to use lanes to ensure that the right values end up at the right place. It's up to you what to do with the lane assignments; they're just a list of zero-indexed queues internally, and you can map the lane indexes to some sort of other lookup table in your own code if needed.

If you are implementing a forking Promistream, the most useful lane configuration will most likely be: one lane per output stream, with error broadcasts enabled.

Example

API

let buffer = pushBuffer(options)

Note that using lanes is optional; all options and functions default to lane 0, which is the only lane when no lane count is specified (and so it functions as if lanes didn't exist).

  • options: The settings for this pushBuffer.
    • mode: Default: "pull". The mode to operate in. One of "push", "pull".
    • lanes Default: 1. The amount of lanes to create. Each lane is an independent queue of values and errors, though broadcasts are possible.
    • pull: Required only in pull mode. An (async) callback that's called to acquire a new value/result, and which is expected to return a Promise. This should be providing your values in pull mode.
    • sequential: Default: false. Only used in pull mode. Whether to handle requests sequentially; that is, a second pull will not be started until the previous pull has completed, even if a second request is made.
    • select: Default: 0. A callback that receives a value, and returns the index of the lane to assign it to (or an array of such indexes). The callback may return a Promise.
    • selectError: Default: 0. Like select, but it receives errors instead of values.
    • broadcastValues: Default: false. Whether to broadcast all values to all lanes by default. The select callback will not be called for broadcast values. In push mode, this can be overridden for individual values.
    • broadcastErrors: Default: true. Like broadcastValues, but for errors. The selectError callback will not be called for broadcast errors. In push mode, this can be overridden for individual errors.

Returns: A new pushBuffer.

buffer.request(lane)

Requests the next value (for a given lane).

  • lane: Default: 0. The lane to request a value for.

Returns: a Promise that will eventually either resolve or reject, depending on the value/error acquired.

buffer.push(value, broadcast)

Pushes a value (or a Promise) to the next pending request.

  • value: Required. The value to push.
  • broadcast: Default: the broadcastValues setting. Whether to broadcast this value to all lanes.

buffer.pushError(error, broadcast)

Pushes an error to the next pending request. Note that if you wish to push a Promise that may fail (rather than an error you already have), you should use buffer.push instead; it will automatically be handled as an error if it ends up rejecting.

  • error: Required. The value to push.
  • broadcast: Default: the broadcastErrors setting. Whether to broadcast this error to all lanes.

buffer.countLane(lane)

Provides queue lengths for the given lane.

  • lane: Default: 0. The lane to request a value for.

Returns: an object with queue length properties:

  • values: The amount of pending results that have not been reconciled with a request yet - note that this includes errors/rejections!
  • requests: The amount of pending requests that have not been reconciled with a result yet.