// FIXME: Maybe also add an abstraction for 'handling a queue of requests', as this pattern is used in multiple stream implementations
// TODO: Improve robustness of stream-end handling using https://nodejs.org/dist/latest-v14.x/docs/api/stream.html#stream_stream_finished_stream_options_callback?
// FIXME: Sequentialize all of these?
// readable
// writable
// transform
// duplex
module.exports=functionconvert(stream){
// FIXME: Proper validation and tagging
// FIXME: Wrap v1 streams
// NOTE: Standard I/O streams are special-cased here because they may be Duplex streams, even though the other half is never actually used. We're only interested in the interface that *is* being used.
thrownewError(`Duplex streams cannot be converted with the auto-detection API. Instead, use 'fromReadable' and/or 'fromWritable' manually, depending on which parts of the Duplex stream you are interested in.`);
// FIXME: we need to specifically watch for the `error` and `end` events on the readable interface, to know when the transform stream has fully completed processing
// Respond to the EndOfStream produced by the pushbuffer in this case
// NOTE: This logic exists at the start, not in the upstream EndOfStream handling code, because any number of buffer reads may be required before the wrapped Node stream can be closed
// NOTE: The push-buffer will automatically produce EndOfStream markers once the buffer has run out and the underlying stream has closed, so long as we're using the wireUpReadableInterface function
returnPromise.try(()=>{
returnreadable.request();
}).then((result)=>{
return[result];
});
}else{
returnPromise.try(()=>{
debugTransform("Doing upstream read...");
returnsource.read();
}).then((value)=>{
debugTransform("Writing upstream value to writable interface");
writable.write(value);
// This will quite possibly return an empty buffer, but that is fine; the `buffer` stream downstream from us will just keep reading (and therefore queueing up new items to be transformed) until it gets some results.
debugTransform("Consuming immediate buffer from readable interface");