You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
Sven Slootweg 0d6955fac1 Initial version 5 months ago
.gitignore Initial version 5 months ago
README.md Initial version 5 months ago
example.js Initial version 5 months ago
index.js Initial version 5 months ago
package.json Initial version 5 months ago
pnpm-lock.yaml Initial version 5 months ago

README.md

@promistream/fork-select

A general-purpose forking Promistream. Lets you fork a pipeline into an arbitrary number of downstream forks, and divide values among them based on an arbitrary predicate function. Can be used directly, or as a basis for other types of forking streams.

Stream characteristics:

  • Promistream version: 0
  • Stream type: Fork
  • Supports parallelization: Yes (order-preserving)
  • Buffering: Unbounded; one internal buffer per fork, buffer only fills as needed to unblock other forks
  • Fork distribution strategy: User-specified

Example

A runnable version of this example is included in the package as example.js.

"use strict";

const pipe = require("@promistream/pipe");
const debug = require("@promistream/debug");
const fromIterable = require("@promistream/from-iterable");
const forkSelect = require("@promistream/fork-select");
const collect = require("@promistream/collect");

(async () => {
	let [ a, b ] = await pipe([
		fromIterable([ 0, 0, 0, 1, 1, 2 ]),
		debug("source"),
		forkSelect(2, (value) => {
			return value % 2;
		})
	]).read();

	setTimeout(async () => {
		try {
			let bResults = await pipe([
				b,
				// debug("pipeline B"),
				collect()
			]).read();

			console.log({ bResults }); // { bResults: [ 1, 1 ] }
		} catch (error) {
			console.error("error from B", error);
		}
	}, 1000);

	try {
		let aResults = await pipe([
			a,
			// debug("pipeline A"),
			collect()
		]).read();

		console.log({ aResults }); // { aResults: [ 0, 0, 0, 2 ] }
	} catch (error) {
		console.error("error from A", error);
	}
})();

API

forkSelect(forkCount, select)

Creates a new forkSelect stream. Note that only the value assignment is configurable; errors (including EndOfStream and Aborted markers) are always broadcast to all forks.

  • forkCount: Required. The amount of forks to create.
  • select: Required. The predicate callback to determine which fork a given value should be sent to. Receives the value as its argument, and is expected to return (a Promise of) the index of the fork to send it to. An array of fork indexes may also be specified to send the value to multiple forks.