			debugWritable(`[#${objectID(stream)}] Pipeline was aborted`);
			return writable.destroy();
		},
		onSourceChanged: (source) => {
			debugWritable(`[#${objectID(stream)}] A source change occurred`);
			mostRecentSource = source;
		}
	});
@@ -91,16 +91,16 @@ function fromWritable(stream) {
	// NOTE: The use of `var` is intentional, to make hoisting possible here; otherwise we'd have a broken cyclical reference
	var writable = wireUpWritableInterface(stream, {
		onEnd: () => {
			debugWritable(`[#${objectID(stream)}] Underlying stream has reported a close event (upstreamHasEnded = ${upstreamHasEnded})`);
			if (!upstreamHasEnded) {
				debugWritable(`[#${objectID(stream)}] Issuing happy abort on converted stream`);
				convertedStream.abort(true, mostRecentSource);
			}
		},
		onError: (error) => {
			// Make sure we notify the pipeline, if any, by passing in the most recent source stream that we've seen.
			debugWritable(`[#${objectID(stream)}] Issuing error abort on converted stream due to: ${error.message}`);
			convertedStream.abort(error, mostRecentSource);
		}
	});
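	// Illustration of the NOTE above: with `var`, the binding exists for the whole function scope, so a
	// handler can refer to `writable` even though the reference appears before this line in source order
	// (see the `writable.destroy()` call further up), as long as the handler only *runs* after the
	// assignment has completed. With `let`/`const`, evaluating such a forward reference before the
	// declaration has executed would throw a ReferenceError (temporal dead zone), breaking the cyclical
	// reference between the converted stream's handlers and this wrapper.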
@@ -112,54 +112,90 @@ let debugTransform = debug("promistream:from-node-stream:transform");
function fromTransform(stream) {
	let endHandled = false;
	let lastWrite = null;

	// FIXME: we need to specifically watch for the `error` and `end` events on the readable interface, to know when the transform stream has fully completed processing
	// Respond to the EndOfStream produced by the pushbuffer in this case
	// request, destroy
	let nodeReadable = wireUpReadableInterface(stream, {
		onEnd: () => {
			debugTransform(`[#${objectID(stream)}] Received end/close event from underlying stream`);
		},
		onError: () => {
			debugTransform(`[#${objectID(stream)}] Received error event from underlying stream`);
		}
	});

	// write, end, destroy
	let nodeWritable = wireUpWritableInterface(stream);
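	// For reference, based on the call sites in this file (not a full API description): the readable
	// wiring exposes `request()`, `consumeImmediateBuffer()` and `destroy()` on top of a push-buffer
	// that collects whatever the Node stream emits, while the writable wiring exposes `write()`,
	// `end()` and `destroy()`; the `onEnd`/`onError` callbacks above fire when the underlying Node
	// stream emits the corresponding events.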
	/*
		So, let's talk about transform streams. They're a complicated beast to deal with - they separately implement a readable and writable interface (like any Duplex stream), but there is no *reliable* way to associate a given output value (on the readable interface) with a given input value (on the writable interface). However, we still need to respect the backpressure signals; which, in Node streams, are push-based rather than pull-based.

		- If the push-buffer is *also* empty, see if there is still a 'write in progress'. This is a concept that only exists in our abstractions - the corresponding Node stream concept is "the last write returned `false`, and we're waiting for a 'drain' event". If this is the case, we don't do anything else except wait for a new value to appear; we register a notification request with the push-buffer, which returns a Promise that will resolve once one or more new values are produced by the Node stream - whether as a result of our previous write, or otherwise. Either way, that's our signal to continue, as there's now stuff to consume and return.
		- If there is *not* a write-in-progress, then by this point it's extremely unlikely that new values will ever be produced by the underlying Node stream - all the buffers are empty, and backpressure has been relieved, which means that it is most likely expecting us to write something to it. We therefore do an 'upstream read' (from the Promistream pipeline) and pass the result into the Node stream, registering the write as the lastWrite in case it ends up with backpressure (which a next write would need to wait on). We then try to immediately empty the push-buffer, in case the Node stream has immediately/synchronously produced values from our input; if not, we just return nothing, and wait for the next read cycle to happen.
		- Node streams that do *not* implement backpressure will get new values written to them every time our push-buffer (output buffer) runs out; so we still don't write more often than we *need* to (preferring to satisfy requests locally where possible), but we also don't block on waiting for new values to appear, which wouldn't work if there's no 1:1 correspondence between writes and reads in the Node stream.
	*/
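	// The Node-level pattern that "write in progress" maps onto, as a rough sketch using only core Node
	// APIs (illustration only, not code from this module):
	//
	//     const { once } = require("events");
	//     async function writeWithBackpressure(nodeStream, chunk) {
	//         if (!nodeStream.write(chunk)) {
	//             await once(nodeStream, "drain"); // `write` returned false: wait before writing more
	//         }
	//     }
	//
	// The difference here is that we must *not* await the drain before reading, or a full internal
	// buffer would deadlock the read cycle - hence the lastWrite bookkeeping described above.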
	// NOTE: This logic exists at the start, not in the upstream EndOfStream handling code, because any number of buffer reads may be required before the wrapped Node stream can be closed
	// NOTE: The underlying push-buffer will automatically produce EndOfStream markers once the buffer has run out and the underlying stream has closed, so long as we're using the wireUpReadableInterface function
	// TODO: Refactor this design (and/or push-buffer itself) to use request-matcher instead?
	if (endHandled) {
		return Promise.try(() => {
			return nodeReadable.request();
		}).then((result) => {
			return [result];
		});
	} else {
		return Promise.try(() => {
			if (lastWrite != null && !lastWrite.isFulfilled()) {
				debugTransform(`[#${objectID(stream)}] Write already in progress; waiting for new readable values to become available...`);
				// NOTE: We cannot just wait for the write here; it'll block if the internal buffer of the transform stream is full, and our read would block in turn, leading to the buffer never emptying. This would deadlock. Instead, we store the promise representing our write, so that the *next* write cycle can inspect it, which are the semantics we're actually after.
				// HACK: There is *technically* a race condition here; it's possible for another upstream read to start before the first read reaches the "writing to writable interface" point, in which case the second write will 'bypass' the lastWrite check and override the value of lastWrite. However, since all write operations listen to the same `drain` event on the underlying stream anyway, this shouldn't cause any change in behaviour. If the underlying write implementation ever changes, this approach needs to be reevaluated.
				// Note that this sort-of violates the Node streams API - which *allows* further writes while waiting for a drain event, it's just strongly recommended against. But for now we're assuming that if someone decides to use a parallelization stream, they are okay with the additional memory usage.
			} else {
				debugTransform(`[#${objectID(stream)}] Doing upstream read...`);
				return Promise.try(() => {
					return source.read();
				}).then((value) => {
					// NOTE: `nodeWritable.write()` is assumed here to return a promise that settles once the write (and any 'drain' wait) has completed; that promise is what the lastWrite check above inspects on the next read cycle.
					lastWrite = nodeWritable.write(value);
				});
			}
		}).then(() => {
			// This will quite possibly return an empty buffer, but that is fine; the `buffer` stream downstream from us will just keep reading (and therefore queueing up new items to be transformed) until it gets some results.
			debugTransform(`[#${objectID(stream)}] Consuming immediate buffer from nodestream's readable interface`);
			return nodeReadable.consumeImmediateBuffer();
		}).catch(isEndOfStream, () => {
			debugTransform(`[#${objectID(stream)}] End of upstream reached`);
			endHandled = true;

			debugTransform(`[#${objectID(stream)}] Closing via nodestream's writable interface`);
			nodeWritable.end();

			// Return nothing, let the next read call (and all of those after that) deal with either underlying stream completion or buffered results
			return [];
@@ -176,3 +212,4 @@ function fromTransform(stream) {
// TODO: Move this to a stand-alone package, so that it can be reused elsewhere
if (global.__objectID_map == null) {
	// Yes, these are deliberately global - we want to ensure that the same value gets the same assigned ID, regardless of where in the dependency tree this ID is obtained
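	// Sketch of the idea, for orientation (not necessarily the exact code that follows): a process-wide
	// map (e.g. a WeakMap) hands out an incrementing ID per object, so that `objectID(stream)` in the
	// debug output above prints a stable number for the same stream, along the lines of:
	//
	//     map.has(obj) ? map.get(obj) : (map.set(obj, ++counter), counter)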