diff --git a/index.js b/index.js index fd895bc..a60fff1 100644 --- a/index.js +++ b/index.js @@ -8,6 +8,7 @@ const propagatePeek = require("@promistream/propagate-peek"); const propagateAbort = require("@promistream/propagate-abort"); const pipe = require("@promistream/pipe"); const isEndOfStream = require("@promistream/is-end-of-stream"); +const debug = require("debug"); const createDefer = require("./src/create-defer"); const wireUpReadableInterface = require("./src/readable"); @@ -100,8 +101,9 @@ function fromWritable(stream) { return convertedStream; } +let debugTransform = debug("promistream:from-node-stream:transform"); + function fromTransform(stream) { - let completionDefer; let endHandled = false; // FIXME: we need to specifically watch for the `error` and `end` events on the readable interface, to know when the transform stream has fully completed processing @@ -109,19 +111,15 @@ function fromTransform(stream) { // request, destroy let readable = wireUpReadableInterface(stream, { onEnd: () => { - if (completionDefer != null) { - completionDefer.resolve(); - } + debugTransform("Received end/close event from underlying stream"); }, - onError: (error) => { - if (completionDefer != null) { - completionDefer.reject(error); - } + onError: () => { + debugTransform("Received error event from underlying stream"); } }); // write, end, destroy - var writable = wireUpWritableInterface(stream); + let writable = wireUpWritableInterface(stream); let convertedStream = { _promistreamVersion: 0, @@ -129,32 +127,36 @@ function fromTransform(stream) { abort: propagateAbort, peek: propagatePeek, read: function produceValue_nodeTransformStream(source) { - return Promise.try(() => { - return source.read(); - }).then((value) => { - writable.write(value); - - // This will quite possibly return an empty buffer, but that is fine; the `buffer` stream downstream from us will just keep reading (and therefore queueing up new items to be transformed) until it gets some results. - return readable.consumeImmediateBuffer(); - }).catch(isEndOfStream, (marker) => { - // Wait for transform stream to drain fully, `error`/`end` event, and then return whatever buffer remains. - // FIXME: Error propagation logic is pretty shaky here. Verify that we don't end up with double error reports. - if (endHandled === false) { + if (endHandled) { + // NOTE: This logic exists at the start, not in the upstream EndOfStream handling code, because any number of buffer reads may be required before the wrapped Node stream can be closed + // NOTE: The push-buffer will automatically produce EndOfStream markers once the buffer has run out and the underlying stream has closed, so long as we're using the wireUpReadableInterface function + return Promise.try(() => { + return readable.request(); + }).then((result) => { + return [ result ]; + }); + } else { + return Promise.try(() => { + debugTransform("Doing upstream read..."); + return source.read(); + }).then((value) => { + debugTransform("Writing upstream value to writable interface"); + writable.write(value); + + // This will quite possibly return an empty buffer, but that is fine; the `buffer` stream downstream from us will just keep reading (and therefore queueing up new items to be transformed) until it gets some results. + debugTransform("Consuming immediate buffer from readable interface"); + return readable.consumeImmediateBuffer(); + }).catch(isEndOfStream, () => { + debugTransform("End of upstream reached"); endHandled = true; + debugTransform("Closing via writable interface"); writable.end(); - return Promise.try(() => { - let { promise, defer } = createDefer(); - completionDefer = defer; - return promise; - }).then(() => { - return readable.consumeImmediateBuffer(); - }); - } else { - throw marker; - } - }); + // Return nothing, let the next read call (and all of those after that) deal with either underlying stream completion or buffered results + return []; + }); + } } }; diff --git a/package.json b/package.json index 231b2b9..def1d2b 100644 --- a/package.json +++ b/package.json @@ -16,6 +16,7 @@ "@promistream/simple-sink": "^0.1.0", "@promistream/simple-source": "^0.1.1", "bluebird": "^3.7.2", + "debug": "^4.3.1", "split-filter": "^1.1.3" } } diff --git a/src/readable/attach-handlers.js b/src/readable/attach-handlers.js index e7c7992..936d951 100644 --- a/src/readable/attach-handlers.js +++ b/src/readable/attach-handlers.js @@ -1,7 +1,10 @@ "use strict"; +const debug = require("debug")("promistream:from-node-stream:readable:attach-handlers"); + module.exports = function attachReadableStreamHandlers({ stream, onClose, onError, onData }) { function detachEventHandlers() { + debug("Detaching event handlers"); stream.removeListener("end", onCloseWrapper); stream.removeListener("close", onCloseWrapper); stream.removeListener("error", onErrorWrapper); @@ -9,6 +12,7 @@ module.exports = function attachReadableStreamHandlers({ stream, onClose, onErro } function attachEventHandlers() { + debug("Attaching event handlers"); stream.on("end", onCloseWrapper); stream.on("close", onCloseWrapper); stream.on("error", onErrorWrapper); @@ -16,11 +20,13 @@ module.exports = function attachReadableStreamHandlers({ stream, onClose, onErro } function onCloseWrapper() { + debug("onCloseWrapper called"); onClose(); detachEventHandlers(); } function onErrorWrapper(error) { + debug("onErrorWrapper called"); onError(error); detachEventHandlers(); } diff --git a/src/readable/index.js b/src/readable/index.js index 51323e9..2ec987d 100644 --- a/src/readable/index.js +++ b/src/readable/index.js @@ -1,5 +1,7 @@ "use strict"; +const debug = require("debug")("promistream:from-node-stream:readable"); + const attachHandlers = require("./attach-handlers"); const createPushBuffer = require("./push-buffer"); const destroyStream = require("../destroy-stream"); @@ -9,6 +11,7 @@ module.exports = function wireUpReadableInterface(stream, { onEnd, onError } = { let pushBuffer = createPushBuffer({ onPause: function () { if (stream.pause != null) { + debug("Pausing underlying stream"); stream.pause(); return true; // FIXME: Can we verify whether the pausing was successful, somehow? Eg. to deal with streams with `readable` event handlers attached. } else { @@ -17,6 +20,7 @@ module.exports = function wireUpReadableInterface(stream, { onEnd, onError } = { }, onResume: function () { if (stream.resume != null) { + debug("Resuming underlying stream"); stream.resume(); return true; } else { @@ -29,6 +33,12 @@ module.exports = function wireUpReadableInterface(stream, { onEnd, onError } = { attachHandlers({ stream: stream, onData: (data) => { + if (Buffer.isBuffer(data)) { + debug(`Chunk emitted of length ${data.length}`); + } else { + debug(`Value emitted`); + } + pushBuffer.queueValue(data); }, onError: (error) => { diff --git a/src/readable/push-buffer.js b/src/readable/push-buffer.js index 21e2278..d558b6a 100644 --- a/src/readable/push-buffer.js +++ b/src/readable/push-buffer.js @@ -5,6 +5,7 @@ const splitFilter = require("split-filter"); const unreachable = require("@joepie91/unreachable")("@promistream/from-node-stream"); const EndOfStream = require("@promistream/end-of-stream"); +const debug = require("debug")("promistream:from-node-stream:push-buffer"); const warn = require("../warn"); const createDefer = require("../create-defer"); @@ -18,6 +19,7 @@ module.exports = function createPushBuffer(options) { return false; }; + // TODO: Use @joepie91/consumable here? let itemBuffer = []; let requestQueue = []; let isPaused = false; @@ -35,33 +37,40 @@ module.exports = function createPushBuffer(options) { function attemptDrain() { // NOTE: This must remain fully synchronous, if we want to avoid unnecessary pauses in the `data` handler - - while (requestQueue.length > 0) { - let hasItems = (itemBuffer.length > 0); - let hasResponse = (hasEnded || hasItems); - - if (hasResponse) { - let defer = requestQueue.shift(); - - if (hasItems) { - // FIXME: Does this correctly deal with an error event produced as a result of an abort? - let item = itemBuffer.shift(); - - if (item.type === "value") { - defer.resolve(item.value); - } else if (item.type === "error") { - defer.reject(item.error); + debug("Drain attempt started"); + + if (requestQueue.length > 0) { + while (requestQueue.length > 0) { + let hasItems = (itemBuffer.length > 0); + let hasResponse = (hasEnded || hasItems); + + if (hasResponse) { + debug("Satisfying queued request"); + let defer = requestQueue.shift(); + + if (hasItems) { + // FIXME: Does this correctly deal with an error event produced as a result of an abort? + let item = itemBuffer.shift(); + + if (item.type === "value") { + defer.resolve(item.value); + } else if (item.type === "error") { + defer.reject(item.error); + } else { + unreachable(`Unexpected item type '${item.type}'`); + } + } else if (hasEnded) { + defer.reject(new EndOfStream()); } else { - unreachable(`Unexpected item type '${item.type}'`); + unreachable("Invalid response state, neither has items in queue nor ended"); } - } else if (hasEnded) { - defer.reject(new EndOfStream()); } else { - unreachable("Invalid response state, neither has items in queue nor ended"); + debug("No data available to satisfy queued request"); + break; } - } else { - break; } + } else { + debug("No outstanding requests to satisfy"); } resumeIfEmpty(); @@ -69,6 +78,7 @@ module.exports = function createPushBuffer(options) { return { queueValue: function (value) { + debug("Queueing value"); itemBuffer.push({ type: "value", value: value }); attemptDrain(); @@ -84,22 +94,28 @@ module.exports = function createPushBuffer(options) { } }, queueError: function (error) { + debug("Queueing error"); itemBuffer.push({ type: "error", error: error }); attemptDrain(); }, queueRequest: function () { + debug("Queueing read request"); let { defer, promise } = createDefer(); requestQueue.push(defer); attemptDrain(); return promise; }, markEnded: function () { + debug("Marking as ended"); hasEnded = true; attemptDrain(); }, consumeImmediateBuffer: function () { + debug("Post-drain remaining buffer requested"); attemptDrain(); + debug("Returning immediate buffer"); + // FIXME: Only return successful items here? if (requestQueue.length > 0) { // We won't ever serve up the buffer until any individual-item requests have been fulfilled. @@ -107,10 +123,16 @@ module.exports = function createPushBuffer(options) { } else { let [ values, errors ] = splitFilter(itemBuffer, (item) => item.type === "value"); + debug(`Buffer contains ${errors.length} errors and ${values.length} values`); + if (errors.length > 0) { + debug("Throwing first error"); + itemBuffer = values; // In case we ever write code that will do something with the remaining values in the buffer throw errors[0].error; } else { + debug(`Returning ${values.length} values`); + itemBuffer = []; resumeIfEmpty(); // Ensure that we haven't left the source stream in a paused state, because that would deadlock the pipeline return values.map((item) => item.value); diff --git a/src/writable/index.js b/src/writable/index.js index 123da3a..40e4ef5 100644 --- a/src/writable/index.js +++ b/src/writable/index.js @@ -1,5 +1,7 @@ "use strict"; +const debug = require("debug")("promistream:from-node-stream:writable"); + const attachHandlers = require("./attach-handlers"); const writeToStream = require("./write-to-stream"); const isStdioStream = require("../is-stdio-stream"); @@ -30,7 +32,10 @@ module.exports = function wireUpWritableInterface(stream, { onEnd, onError } = { end: function () { // stdout/stderr cannot be ended like other streams if (!isStdioStream(stream)) { + debug("Ending stream"); stream.end(); + } else { + debug("Not ending stream because it is stdio"); } }, destroy: function () { diff --git a/src/writable/write-to-stream.js b/src/writable/write-to-stream.js index 046efa5..8be31cc 100644 --- a/src/writable/write-to-stream.js +++ b/src/writable/write-to-stream.js @@ -1,5 +1,7 @@ "use strict"; +const debug = require("debug")("promistream:from-node-stream:writable"); + const isStdioStream = require("../is-stdio-stream"); module.exports = function writeToStream(stream, value) { @@ -7,10 +9,16 @@ module.exports = function writeToStream(stream, value) { let canWriteMore = stream.write(value); if (canWriteMore) { + debug("Stream can accept more data"); return; } else { + debug("Stream is backed up, waiting for drain event..."); + // TODO: Use p-event instead? return new Promise((resolve, _reject) => { - stream.once("drain", () => resolve()); + stream.once("drain", () => { + debug("Drain event received"); + resolve(); + }); }); } } else { diff --git a/yarn.lock b/yarn.lock index 7cd1f06..5603db3 100644 --- a/yarn.lock +++ b/yarn.lock @@ -460,6 +460,13 @@ create-error@^0.3.1: resolved "https://registry.yarnpkg.com/create-error/-/create-error-0.3.1.tgz#69810245a629e654432bf04377360003a5351a23" integrity sha1-aYECRaYp5lRDK/BDdzYAA6U1GiM= +debug@^4.3.1: + version "4.3.1" + resolved "https://registry.yarnpkg.com/debug/-/debug-4.3.1.tgz#f0d229c505e0c6d8c49ac553d1b13dc183f6b2ee" + integrity sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ== + dependencies: + ms "2.1.2" + default-value@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/default-value/-/default-value-1.0.0.tgz#8c6f52a5a1193fe78fdc9f86eb71d16c9757c83a" @@ -595,6 +602,11 @@ is.object@^1.0.0: resolved "https://registry.yarnpkg.com/is.object/-/is.object-1.0.0.tgz#e4f4117e9f083b35c8df5cf817ea3efb0452fdfa" integrity sha1-5PQRfp8IOzXI31z4F+o++wRS/fo= +ms@2.1.2: + version "2.1.2" + resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.2.tgz#d09d1f357b443f493382a8eb3ccd183872ae6009" + integrity sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w== + split-filter-n@^1.1.2: version "1.1.2" resolved "https://registry.yarnpkg.com/split-filter-n/-/split-filter-n-1.1.2.tgz#268be1ec9c4d93dfb27b030c06165ac1b6f70f66"