|
|
|
"use strict";
|
|
|
|
|
|
|
|
const debug = require("debug")("promistream:from-node-stream:readable");
|
|
|
|
|
|
|
|
const objectID = require("../object-id");
|
|
|
|
const attachHandlers = require("./attach-handlers");
|
|
|
|
const createPushBuffer = require("./push-buffer");
|
|
|
|
const destroyStream = require("../destroy-stream");
|
|
|
|
const assertErrorType = require("../assert-error-type");
|
|
|
|
|
|
|
|
module.exports = function wireUpReadableInterface(stream, { onEnd, onError } = {}) {
|
|
|
|
let pushBuffer = createPushBuffer({
|
|
|
|
onPause: function () {
|
|
|
|
if (stream.pause != null) {
|
|
|
|
debug(`[#${objectID(stream)}] Pausing underlying stream`);
|
|
|
|
stream.pause();
|
|
|
|
return true; // FIXME: Can we verify whether the pausing was successful, somehow? Eg. to deal with streams with `readable` event handlers attached.
|
|
|
|
} else {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
},
|
|
|
|
onResume: function () {
|
|
|
|
if (stream.resume != null) {
|
|
|
|
debug(`[#${objectID(stream)}] Resuming underlying stream`);
|
|
|
|
stream.resume();
|
|
|
|
return true;
|
|
|
|
} else {
|
|
|
|
throw new Error(`Stream was successfully paused but does not have a resume method. This should never happen!`);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
// TODO: Verify that auto-detaching all event handlers upon end/error is actually the correct thing to do!
|
|
|
|
attachHandlers({
|
|
|
|
stream: stream,
|
|
|
|
onData: (data) => {
|
|
|
|
if (Buffer.isBuffer(data)) {
|
|
|
|
debug(`[#${objectID(stream)}] Chunk emitted of length ${data.length}`);
|
|
|
|
} else {
|
|
|
|
debug(`[#${objectID(stream)}] Value emitted`);
|
|
|
|
}
|
|
|
|
|
|
|
|
pushBuffer.queueValue(data);
|
|
|
|
},
|
|
|
|
onError: (error) => {
|
|
|
|
assertErrorType(error);
|
|
|
|
pushBuffer.queueError(error);
|
|
|
|
|
|
|
|
if (onError != null) {
|
|
|
|
onError(error);
|
|
|
|
}
|
|
|
|
},
|
|
|
|
onClose: () => {
|
|
|
|
pushBuffer.markEnded();
|
|
|
|
|
|
|
|
if (onEnd != null) {
|
|
|
|
onEnd();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
return {
|
|
|
|
request: pushBuffer.queueRequest,
|
|
|
|
consumeImmediateBuffer: pushBuffer.consumeImmediateBuffer,
|
|
|
|
awaitBuffer: pushBuffer.awaitBuffer,
|
|
|
|
destroy: () => {
|
|
|
|
return destroyStream(stream);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
};
|