You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
174 lines
5.9 KiB
JavaScript
174 lines
5.9 KiB
JavaScript
"use strict";
|
|
|
|
const Promise = require("bluebird");
|
|
const simpleSource = require("@promistream/simple-source");
|
|
const simpleSink = require("@promistream/simple-sink");
|
|
const buffer = require("@promistream/buffer");
|
|
const propagatePeek = require("@promistream/propagate-peek");
|
|
const propagateAbort = require("@promistream/propagate-abort");
|
|
const pipe = require("@promistream/pipe");
|
|
const isEndOfStream = require("@promistream/is-end-of-stream");
|
|
const debug = require("debug");
|
|
|
|
const createDefer = require("./src/create-defer");
|
|
const wireUpReadableInterface = require("./src/readable");
|
|
const wireUpWritableInterface = require("./src/writable");
|
|
|
|
// FIXME: Maybe also an abstraction for 'handle queue of requests', as this is used in multiple stream implementations
|
|
// TODO: Improve robustness of stream-end handling using https://nodejs.org/dist/latest-v14.x/docs/api/stream.html#stream_stream_finished_stream_options_callback?
|
|
// FIXME: Sequentialize all of these?
|
|
|
|
// readable
|
|
// writable
|
|
// transform
|
|
// duplex
|
|
|
|
module.exports = function convert(stream) {
|
|
// FIXME: Proper validation and tagging
|
|
// FIXME: Wrap v1 streams
|
|
// NOTE: Standard I/O streams are specialcased here because they may be Duplex streams; even though the other half is never actually used. We're only interested in the interface that *is* being used.
|
|
if (stream === process.stdin) {
|
|
return fromReadable(stream);
|
|
} else if (stream === process.stdout || stream === process.stderr) {
|
|
return fromWritable(stream);
|
|
} else if (stream.writable != null) {
|
|
if (stream.readable != null) {
|
|
if (stream._transform != null) {
|
|
// transform
|
|
return fromTransform(stream);
|
|
} else {
|
|
throw new Error(`Duplex streams cannot be converted with the auto-detection API. Instead, use 'fromReadable' and/or 'fromWritable' manually, depending on which parts of the Duplex stream you are interested in.`);
|
|
}
|
|
} else {
|
|
return fromWritable(stream);
|
|
}
|
|
} else if (stream.readable != null) {
|
|
return fromReadable(stream);
|
|
} else {
|
|
throw new Error(`Not a Node stream`);
|
|
}
|
|
};
|
|
|
|
/**
 * Wraps a Node.js readable stream as a promistream source.
 *
 * The stream is handed to `wireUpReadableInterface`; read requests are answered
 * by that interface's `request()` and aborts are forwarded to its `destroy()`.
 *
 * @param {object} stream - The Node.js readable stream to wrap.
 * @returns {object} The source stream produced by `simpleSource`.
 */
function fromReadable(stream) {
	const nodeReadable = wireUpReadableInterface(stream);

	const sourceHandlers = {
		// Every downstream read is served by the wired-up readable interface
		onRequest: () => nodeReadable.request(),
		// Aborting the pipeline tears down the underlying Node stream
		onAbort: () => nodeReadable.destroy()
	};

	return simpleSource(sourceHandlers);
}
|
|
|
|
const debugWritable = debug("promistream:from-node-stream:writable");

/**
 * Wraps a Node.js writable stream as a promistream sink.
 *
 * Values received from upstream are passed to the wired-up writable interface's
 * `write()`, upstream end-of-stream calls its `end()`, and a pipeline abort calls
 * its `destroy()`. When the underlying stream closes before upstream has ended, or
 * emits an error, the sink is aborted so that the rest of the pipeline is notified.
 *
 * @param {object} stream - The Node.js writable stream to wrap.
 * @returns {object} The sink stream produced by `simpleSink`.
 */
function fromWritable(stream) {
	// Set once upstream has reported end-of-stream; a later close of the Node stream is then expected
	let upstreamHasEnded = false;
	let mostRecentSource = { abort: function() {} }; // FIXME: Replace with a proper spec-compliant dummy stream

	const convertedStream = simpleSink({
		onResult: (result) => {
			debugWritable("Received value");
			return writable.write(result);
		},
		onEnd: () => {
			debugWritable("Upstream reported end-of-stream");
			upstreamHasEnded = true;
			return writable.end();
		},
		onAbort: (_reason) => {
			debugWritable("Pipeline was aborted");
			return writable.destroy();
		},
		onSourceChanged: (source) => {
			debugWritable("A source change occurred");
			// Remembered so that aborts triggered by the Node stream itself can be propagated upstream
			mostRecentSource = source;
		}
	});

	// NOTE: The use of `var` is intentional, to make hoisting possible here; otherwise we'd have a broken cyclical reference
	var writable = wireUpWritableInterface(stream, {
		onClose: () => {
			if (!upstreamHasEnded) {
				// The Node stream went away before upstream finished; abort with the `true` reason,
				// passing the most recent source stream so that the pipeline, if any, is notified.
				convertedStream.abort(true, mostRecentSource);
			}
		},
		onError: (error) => {
			// Make sure we notify the pipeline, if any, by passing in the most recent source stream that we've seen.
			// The abort reason comes first and the source second, matching the close handler above.
			convertedStream.abort(error, mostRecentSource);
		}
	});

	return convertedStream;
}
|
|
|
|
const debugTransform = debug("promistream:from-node-stream:transform");

/**
 * Wraps a Node.js Transform stream as a promistream transform.
 *
 * The same Node stream is wired up twice: once through its readable interface
 * and once through its writable interface. Each read either feeds one upstream
 * value into the writable side and returns whatever the readable side has
 * immediately available, or - once upstream has ended - requests the remaining
 * results from the readable side. Every read yields an array, which the
 * `buffer()` stream piped after this one consumes.
 *
 * @param {object} stream - The Node.js Transform stream to wrap.
 * @returns {object} The result of piping the converted stream into `buffer()`.
 */
function fromTransform(stream) {
	// Becomes true once upstream has reported end-of-stream and the writable side has been ended
	let endHandled = false;

	// FIXME: we need to specifically watch for the `error` and `end` events on the readable interface, to know when the transform stream has fully completed processing
	// Respond to the EndOfStream produced by the pushbuffer in this case

	// Readable side; used here for: request, consumeImmediateBuffer
	const readable = wireUpReadableInterface(stream, {
		onEnd: () => {
			debugTransform("Received end/close event from underlying stream");
		},
		onError: () => {
			debugTransform("Received error event from underlying stream");
		}
	});

	// Writable side; used here for: write, end
	const writable = wireUpWritableInterface(stream);

	// Used for every read after upstream has ended.
	// NOTE: This is checked at the start of each read rather than inside the upstream EndOfStream handler, because any number of buffer reads may be required before the wrapped Node stream can be closed
	// NOTE: The push-buffer will automatically produce EndOfStream markers once the buffer has run out and the underlying stream has closed, so long as we're using the wireUpReadableInterface function
	function readRemainingOutput() {
		return Promise.try(() => readable.request()).then((result) => [ result ]);
	}

	// Used for every read while upstream is still producing values.
	function feedFromUpstream(source) {
		return Promise.try(() => {
			debugTransform("Doing upstream read...");
			return source.read();
		}).then((value) => {
			debugTransform("Writing upstream value to writable interface");
			writable.write(value);

			// The immediate buffer may well be empty, and that is fine; the `buffer` stream downstream from us will just keep reading (and therefore queueing up new items to be transformed) until it gets some results.
			debugTransform("Consuming immediate buffer from readable interface");
			return readable.consumeImmediateBuffer();
		}).catch(isEndOfStream, () => {
			debugTransform("End of upstream reached");
			endHandled = true;

			debugTransform("Closing via writable interface");
			writable.end();

			// Produce no values for this read; subsequent reads deal with either underlying stream completion or buffered results
			return [];
		});
	}

	const convertedStream = {
		_promistreamVersion: 0,
		description: `converted Node.js transform stream`,
		abort: propagateAbort,
		peek: propagatePeek,
		read: function produceValue_nodeTransformStream(source) {
			return (endHandled)
				? readRemainingOutput()
				: feedFromUpstream(source);
		}
	};

	return pipe([
		convertedStream,
		buffer()
	]);
}
|
|
|
|
// Expose the explicit converters alongside the auto-detecting default export,
// for callers that want to pick the readable or writable half of a stream themselves.
Object.assign(module.exports, {
	fromReadable: fromReadable,
	fromWritable: fromWritable
});
|