Fix transform buffer deadlock, add debug messages

master
Sven Slootweg 3 years ago
parent 5cfd5c501a
commit 26f48400a0

@ -8,6 +8,7 @@ const propagatePeek = require("@promistream/propagate-peek");
const propagateAbort = require("@promistream/propagate-abort");
const pipe = require("@promistream/pipe");
const isEndOfStream = require("@promistream/is-end-of-stream");
const debug = require("debug");
const createDefer = require("./src/create-defer");
const wireUpReadableInterface = require("./src/readable");
@ -100,8 +101,9 @@ function fromWritable(stream) {
return convertedStream;
}
let debugTransform = debug("promistream:from-node-stream:transform");
function fromTransform(stream) {
let completionDefer;
let endHandled = false;
// FIXME: we need to specifically watch for the `error` and `end` events on the readable interface, to know when the transform stream has fully completed processing
@ -109,19 +111,15 @@ function fromTransform(stream) {
// request, destroy
let readable = wireUpReadableInterface(stream, {
onEnd: () => {
if (completionDefer != null) {
completionDefer.resolve();
}
debugTransform("Received end/close event from underlying stream");
},
onError: (error) => {
if (completionDefer != null) {
completionDefer.reject(error);
}
onError: () => {
debugTransform("Received error event from underlying stream");
}
});
// write, end, destroy
var writable = wireUpWritableInterface(stream);
let writable = wireUpWritableInterface(stream);
let convertedStream = {
_promistreamVersion: 0,
@ -129,32 +127,36 @@ function fromTransform(stream) {
abort: propagateAbort,
peek: propagatePeek,
read: function produceValue_nodeTransformStream(source) {
return Promise.try(() => {
return source.read();
}).then((value) => {
writable.write(value);
// This will quite possibly return an empty buffer, but that is fine; the `buffer` stream downstream from us will just keep reading (and therefore queueing up new items to be transformed) until it gets some results.
return readable.consumeImmediateBuffer();
}).catch(isEndOfStream, (marker) => {
// Wait for transform stream to drain fully, `error`/`end` event, and then return whatever buffer remains.
// FIXME: Error propagation logic is pretty shaky here. Verify that we don't end up with double error reports.
if (endHandled === false) {
if (endHandled) {
// NOTE: This logic exists at the start, not in the upstream EndOfStream handling code, because any number of buffer reads may be required before the wrapped Node stream can be closed
// NOTE: The push-buffer will automatically produce EndOfStream markers once the buffer has run out and the underlying stream has closed, so long as we're using the wireUpReadableInterface function
return Promise.try(() => {
return readable.request();
}).then((result) => {
return [ result ];
});
} else {
return Promise.try(() => {
debugTransform("Doing upstream read...");
return source.read();
}).then((value) => {
debugTransform("Writing upstream value to writable interface");
writable.write(value);
// This will quite possibly return an empty buffer, but that is fine; the `buffer` stream downstream from us will just keep reading (and therefore queueing up new items to be transformed) until it gets some results.
debugTransform("Consuming immediate buffer from readable interface");
return readable.consumeImmediateBuffer();
}).catch(isEndOfStream, () => {
debugTransform("End of upstream reached");
endHandled = true;
debugTransform("Closing via writable interface");
writable.end();
return Promise.try(() => {
let { promise, defer } = createDefer();
completionDefer = defer;
return promise;
}).then(() => {
return readable.consumeImmediateBuffer();
});
} else {
throw marker;
}
});
// Return nothing, let the next read call (and all of those after that) deal with either underlying stream completion or buffered results
return [];
});
}
}
};

@ -16,6 +16,7 @@
"@promistream/simple-sink": "^0.1.0",
"@promistream/simple-source": "^0.1.1",
"bluebird": "^3.7.2",
"debug": "^4.3.1",
"split-filter": "^1.1.3"
}
}

@ -1,7 +1,10 @@
"use strict";
const debug = require("debug")("promistream:from-node-stream:readable:attach-handlers");
module.exports = function attachReadableStreamHandlers({ stream, onClose, onError, onData }) {
function detachEventHandlers() {
debug("Detaching event handlers");
stream.removeListener("end", onCloseWrapper);
stream.removeListener("close", onCloseWrapper);
stream.removeListener("error", onErrorWrapper);
@ -9,6 +12,7 @@ module.exports = function attachReadableStreamHandlers({ stream, onClose, onErro
}
function attachEventHandlers() {
debug("Attaching event handlers");
stream.on("end", onCloseWrapper);
stream.on("close", onCloseWrapper);
stream.on("error", onErrorWrapper);
@ -16,11 +20,13 @@ module.exports = function attachReadableStreamHandlers({ stream, onClose, onErro
}
function onCloseWrapper() {
debug("onCloseWrapper called");
onClose();
detachEventHandlers();
}
function onErrorWrapper(error) {
debug("onErrorWrapper called");
onError(error);
detachEventHandlers();
}

@ -1,5 +1,7 @@
"use strict";
const debug = require("debug")("promistream:from-node-stream:readable");
const attachHandlers = require("./attach-handlers");
const createPushBuffer = require("./push-buffer");
const destroyStream = require("../destroy-stream");
@ -9,6 +11,7 @@ module.exports = function wireUpReadableInterface(stream, { onEnd, onError } = {
let pushBuffer = createPushBuffer({
onPause: function () {
if (stream.pause != null) {
debug("Pausing underlying stream");
stream.pause();
return true; // FIXME: Can we verify whether the pausing was successful, somehow? Eg. to deal with streams with `readable` event handlers attached.
} else {
@ -17,6 +20,7 @@ module.exports = function wireUpReadableInterface(stream, { onEnd, onError } = {
},
onResume: function () {
if (stream.resume != null) {
debug("Resuming underlying stream");
stream.resume();
return true;
} else {
@ -29,6 +33,12 @@ module.exports = function wireUpReadableInterface(stream, { onEnd, onError } = {
attachHandlers({
stream: stream,
onData: (data) => {
if (Buffer.isBuffer(data)) {
debug(`Chunk emitted of length ${data.length}`);
} else {
debug(`Value emitted`);
}
pushBuffer.queueValue(data);
},
onError: (error) => {

@ -5,6 +5,7 @@
const splitFilter = require("split-filter");
const unreachable = require("@joepie91/unreachable")("@promistream/from-node-stream");
const EndOfStream = require("@promistream/end-of-stream");
const debug = require("debug")("promistream:from-node-stream:push-buffer");
const warn = require("../warn");
const createDefer = require("../create-defer");
@ -18,6 +19,7 @@ module.exports = function createPushBuffer(options) {
return false;
};
// TODO: Use @joepie91/consumable here?
let itemBuffer = [];
let requestQueue = [];
let isPaused = false;
@ -35,33 +37,40 @@ module.exports = function createPushBuffer(options) {
function attemptDrain() {
// NOTE: This must remain fully synchronous, if we want to avoid unnecessary pauses in the `data` handler
while (requestQueue.length > 0) {
let hasItems = (itemBuffer.length > 0);
let hasResponse = (hasEnded || hasItems);
if (hasResponse) {
let defer = requestQueue.shift();
if (hasItems) {
// FIXME: Does this correctly deal with an error event produced as a result of an abort?
let item = itemBuffer.shift();
if (item.type === "value") {
defer.resolve(item.value);
} else if (item.type === "error") {
defer.reject(item.error);
debug("Drain attempt started");
if (requestQueue.length > 0) {
while (requestQueue.length > 0) {
let hasItems = (itemBuffer.length > 0);
let hasResponse = (hasEnded || hasItems);
if (hasResponse) {
debug("Satisfying queued request");
let defer = requestQueue.shift();
if (hasItems) {
// FIXME: Does this correctly deal with an error event produced as a result of an abort?
let item = itemBuffer.shift();
if (item.type === "value") {
defer.resolve(item.value);
} else if (item.type === "error") {
defer.reject(item.error);
} else {
unreachable(`Unexpected item type '${item.type}'`);
}
} else if (hasEnded) {
defer.reject(new EndOfStream());
} else {
unreachable(`Unexpected item type '${item.type}'`);
unreachable("Invalid response state, neither has items in queue nor ended");
}
} else if (hasEnded) {
defer.reject(new EndOfStream());
} else {
unreachable("Invalid response state, neither has items in queue nor ended");
debug("No data available to satisfy queued request");
break;
}
} else {
break;
}
} else {
debug("No outstanding requests to satisfy");
}
resumeIfEmpty();
@ -69,6 +78,7 @@ module.exports = function createPushBuffer(options) {
return {
queueValue: function (value) {
debug("Queueing value");
itemBuffer.push({ type: "value", value: value });
attemptDrain();
@ -84,22 +94,28 @@ module.exports = function createPushBuffer(options) {
}
},
queueError: function (error) {
debug("Queueing error");
itemBuffer.push({ type: "error", error: error });
attemptDrain();
},
queueRequest: function () {
debug("Queueing read request");
let { defer, promise } = createDefer();
requestQueue.push(defer);
attemptDrain();
return promise;
},
markEnded: function () {
debug("Marking as ended");
hasEnded = true;
attemptDrain();
},
consumeImmediateBuffer: function () {
debug("Post-drain remaining buffer requested");
attemptDrain();
debug("Returning immediate buffer");
// FIXME: Only return successful items here?
if (requestQueue.length > 0) {
// We won't ever serve up the buffer until any individual-item requests have been fulfilled.
@ -107,10 +123,16 @@ module.exports = function createPushBuffer(options) {
} else {
let [ values, errors ] = splitFilter(itemBuffer, (item) => item.type === "value");
debug(`Buffer contains ${errors.length} errors and ${values.length} values`);
if (errors.length > 0) {
debug("Throwing first error");
itemBuffer = values; // In case we ever write code that will do something with the remaining values in the buffer
throw errors[0].error;
} else {
debug(`Returning ${values.length} values`);
itemBuffer = [];
resumeIfEmpty(); // Ensure that we haven't left the source stream in a paused state, because that would deadlock the pipeline
return values.map((item) => item.value);

@ -1,5 +1,7 @@
"use strict";
const debug = require("debug")("promistream:from-node-stream:writable");
const attachHandlers = require("./attach-handlers");
const writeToStream = require("./write-to-stream");
const isStdioStream = require("../is-stdio-stream");
@ -30,7 +32,10 @@ module.exports = function wireUpWritableInterface(stream, { onEnd, onError } = {
end: function () {
// stdout/stderr cannot be ended like other streams
if (!isStdioStream(stream)) {
debug("Ending stream");
stream.end();
} else {
debug("Not ending stream because it is stdio");
}
},
destroy: function () {

@ -1,5 +1,7 @@
"use strict";
const debug = require("debug")("promistream:from-node-stream:writable");
const isStdioStream = require("../is-stdio-stream");
module.exports = function writeToStream(stream, value) {
@ -7,10 +9,16 @@ module.exports = function writeToStream(stream, value) {
let canWriteMore = stream.write(value);
if (canWriteMore) {
debug("Stream can accept more data");
return;
} else {
debug("Stream is backed up, waiting for drain event...");
// TODO: Use p-event instead?
return new Promise((resolve, _reject) => {
stream.once("drain", () => resolve());
stream.once("drain", () => {
debug("Drain event received");
resolve();
});
});
}
} else {

@ -460,6 +460,13 @@ create-error@^0.3.1:
resolved "https://registry.yarnpkg.com/create-error/-/create-error-0.3.1.tgz#69810245a629e654432bf04377360003a5351a23"
integrity sha1-aYECRaYp5lRDK/BDdzYAA6U1GiM=
debug@^4.3.1:
version "4.3.1"
resolved "https://registry.yarnpkg.com/debug/-/debug-4.3.1.tgz#f0d229c505e0c6d8c49ac553d1b13dc183f6b2ee"
integrity sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==
dependencies:
ms "2.1.2"
default-value@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/default-value/-/default-value-1.0.0.tgz#8c6f52a5a1193fe78fdc9f86eb71d16c9757c83a"
@ -595,6 +602,11 @@ is.object@^1.0.0:
resolved "https://registry.yarnpkg.com/is.object/-/is.object-1.0.0.tgz#e4f4117e9f083b35c8df5cf817ea3efb0452fdfa"
integrity sha1-5PQRfp8IOzXI31z4F+o++wRS/fo=
ms@2.1.2:
version "2.1.2"
resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.2.tgz#d09d1f357b443f493382a8eb3ccd183872ae6009"
integrity sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==
split-filter-n@^1.1.2:
version "1.1.2"
resolved "https://registry.yarnpkg.com/split-filter-n/-/split-filter-n-1.1.2.tgz#268be1ec9c4d93dfb27b030c06165ac1b6f70f66"

Loading…
Cancel
Save