You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

166 lines
5.0 KiB

"use strict";
const Promise = require("bluebird");
const simpleSource = require("@ppstreams/simple-source");
const simpleSink = require("@ppstreams/simple-sink");
const buffer = require("@ppstreams/buffer");
const propagatePeek = require("@ppstreams/propagate-peek");
const propagateAbort = require("@ppstreams/propagate-abort");
const pipe = require("@ppstreams/pipe");
const isEndOfStream = require("@ppstreams/is-end-of-stream");
const createDefer = require("./src/create-defer");
const wireUpReadableInterface = require("./src/readable");
const wireUpWritableInterface = require("./src/writable");
// FIXME: Maybe also an abstraction for 'handle queue of requests', as this is used in multiple stream implementations
// TODO: Improve robustness of stream-end handling using
// FIXME: Sequentialize all of these?
// readable
// writable
// transform
// duplex
module.exports = function convert(stream) {
// FIXME: Proper validation and tagging
// FIXME: Wrap v1 streams
// NOTE: Standard I/O streams are specialcased here because they may be Duplex streams; even though the other half is never actually used. We're only interested in the interface that *is* being used.
if (stream === process.stdin) {
return fromReadable(stream);
} else if (stream === process.stdout || stream === process.stderr) {
return fromWritable(stream);
} else if (stream.writable != null) {
if (stream.readable != null) {
if (stream._transform != null) {
// transform
return fromTransform(stream);
} else {
throw new Error(`Duplex streams cannot be converted with the auto-detection API. Instead, use 'fromReadable' and/or 'fromWritable' manually, depending on which parts of the Duplex stream you are interested in.`);
} else {
return fromWritable(stream);
} else if (stream.readable != null) {
return fromReadable(stream);
} else {
throw new Error(`Not a Node stream`);
// FIXME: Duplex APIs
function fromReadable(stream) {
let readable = wireUpReadableInterface(stream);
return simpleSource({
onRequest: () => {
return readable.request();
onAbort: () => {
return readable.destroy();
function fromWritable(stream) {
let upstreamHasEnded = false;
let mostRecentSource = { abort: function() {} }; // FIXME: Replace with a proper spec-compliant dummy stream
let convertedStream = simpleSink({
onResult: (result) => {
return writable.write(result);
onEnd: () => {
upstreamHasEnded = true;
return writable.end();
onAbort: (_reason) => {
return writable.destroy();
onSourceChanged: (source) => {
mostRecentSource = source;
// NOTE: The use of `var` is intentional, to make hoisting possible here; otherwise we'd have a broken cyclical reference
var writable = wireUpWritableInterface(stream, {
onClose: () => {
if (!upstreamHasEnded) {
onError: (error) => {
// Make sure we notify the pipeline, if any, by passing in the most recent source stream that we've seen.
convertedStream.abort(mostRecentSource, error);
return convertedStream;
function fromTransform(stream) {
let completionDefer;
let endHandled = false;
// FIXME: we need to specifically watch for the `error` and `end` events on the readable interface, to know when the transform stream has fully completed processing
// Respond to the EndOfStream produced by the pushbuffer in this case
// request, destroy
let readable = wireUpReadableInterface(stream, {
onEnd: () => {
if (completionDefer != null) {
onError: (error) => {
if (completionDefer != null) {
// write, end, destroy
var writable = wireUpWritableInterface(stream);
let convertedStream = {
_promistreamVersion: 0,
description: `converted Node.js transform stream`,
abort: propagateAbort,
peek: propagatePeek,
read: function produceValue_nodeTransformStream(source) {
return Promise.try(() => {
}).then((value) => {
// This will quite possibly return an empty buffer, but that is fine; the `buffer` stream downstream from us will just keep reading (and therefore queueing up new items to be transformed) until it gets some results.
return readable.consumeImmediateBuffer();
}).catch(isEndOfStream, (marker) => {
// Wait for transform stream to drain fully, `error`/`end` event, and then return whatever buffer remains.
// FIXME: Error propagation logic is pretty shaky here. Verify that we don't end up with double error reports.
if (endHandled === false) {
endHandled = true;
return Promise.try(() => {
let { promise, defer } = createDefer();
completionDefer = defer;
return promise;
}).then(() => {
return readable.consumeImmediateBuffer();
} else {
throw marker;
return pipe([