|
|
|
@ -3,8 +3,8 @@
|
|
|
|
|
const Promise = require("bluebird");
|
|
|
|
|
const propagateAbort = require("@ppstreams/propagate-abort");
|
|
|
|
|
const propagatePeek = require("@ppstreams/propagate-peek");
|
|
|
|
|
const { isEndOfStream } = require("@ppstreams/end-of-stream-marker");
|
|
|
|
|
const { isAborted } = require("@ppstreams/aborted-marker");
|
|
|
|
|
const isEndOfStream = require("@ppstreams/is-end-of-stream");
|
|
|
|
|
const isAborted = require("@ppstreams/is-aborted");
|
|
|
|
|
|
|
|
|
|
const { validateOptions } = require("@validatem/core");
|
|
|
|
|
const required = require("@validatem/required");
|
|
|
|
@ -48,58 +48,60 @@ module.exports = function simpleSinkStream(_options) {
|
|
|
|
|
lastResult = result;
|
|
|
|
|
|
|
|
|
|
return attemptRead();
|
|
|
|
|
}).catch(isEndOfStream, () => {
|
|
|
|
|
/* Don't attempt to do another read, we're done. */
|
|
|
|
|
if (onEndCalled) {
|
|
|
|
|
return onEndResult;
|
|
|
|
|
} else {
|
|
|
|
|
return Promise.try(() => {
|
|
|
|
|
return onEnd();
|
|
|
|
|
}).then((result) => {
|
|
|
|
|
onEndResult = result;
|
|
|
|
|
return result;
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}).catch((error) => !isAborted(error), (error) => {
|
|
|
|
|
return Promise.try(() => {
|
|
|
|
|
return source.abort(error);
|
|
|
|
|
}).catch((abortError) => {
|
|
|
|
|
let message = [
|
|
|
|
|
`Tried to abort stream due to encountering an error, but the aborting itself failed`,
|
|
|
|
|
`Original error message: ${error.message}`,
|
|
|
|
|
`Abort failure message: ${abortError.message}`
|
|
|
|
|
].join("\n");
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// FIXME: Make this some sort of chained error
|
|
|
|
|
let combinedError = new Error(message);
|
|
|
|
|
combinedError.stack = abortError.stack; // HACK
|
|
|
|
|
throw combinedError;
|
|
|
|
|
}).then(() => {
|
|
|
|
|
// Pass through the original error to the user
|
|
|
|
|
throw error;
|
|
|
|
|
return Promise.try(() => {
|
|
|
|
|
return attemptRead();
|
|
|
|
|
}).catch(isEndOfStream, () => {
|
|
|
|
|
/* Don't attempt to do another read, we're done. */
|
|
|
|
|
if (onEndCalled) {
|
|
|
|
|
return onEndResult;
|
|
|
|
|
} else {
|
|
|
|
|
return Promise.try(() => {
|
|
|
|
|
return onEnd();
|
|
|
|
|
}).then((result) => {
|
|
|
|
|
onEndResult = result;
|
|
|
|
|
return result;
|
|
|
|
|
});
|
|
|
|
|
}).catch(isAborted, (marker) => {
|
|
|
|
|
if (abortHandled === false) {
|
|
|
|
|
abortHandled = true;
|
|
|
|
|
}
|
|
|
|
|
}).catch((error) => !isAborted(error), (error) => {
|
|
|
|
|
return Promise.try(() => {
|
|
|
|
|
return source.abort(error);
|
|
|
|
|
}).catch((abortError) => {
|
|
|
|
|
let message = [
|
|
|
|
|
`Tried to abort stream due to encountering an error, but the aborting itself failed`,
|
|
|
|
|
`Original error message: ${error.message}`,
|
|
|
|
|
`Abort failure message: ${abortError.message}`
|
|
|
|
|
].join("\n");
|
|
|
|
|
|
|
|
|
|
return Promise.try(() => {
|
|
|
|
|
return onAbort();
|
|
|
|
|
}).then(() => {
|
|
|
|
|
if (marker.reason instanceof Error) {
|
|
|
|
|
// NOTE: This ensures that the original error causing the abort is thrown exactly once
|
|
|
|
|
throw marker.reason;
|
|
|
|
|
} else {
|
|
|
|
|
throw marker;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
} else {
|
|
|
|
|
// Don't interfere, we only need special behaviour on the first occurrence
|
|
|
|
|
throw marker;
|
|
|
|
|
}
|
|
|
|
|
// FIXME: Make this some sort of chained error
|
|
|
|
|
let combinedError = new Error(message);
|
|
|
|
|
combinedError.stack = abortError.stack; // HACK
|
|
|
|
|
throw combinedError;
|
|
|
|
|
}).then(() => {
|
|
|
|
|
// Pass through the original error to the user
|
|
|
|
|
throw error;
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}).catch(isAborted, (marker) => {
|
|
|
|
|
if (abortHandled === false) {
|
|
|
|
|
abortHandled = true;
|
|
|
|
|
|
|
|
|
|
return attemptRead();
|
|
|
|
|
return Promise.try(() => {
|
|
|
|
|
return onAbort();
|
|
|
|
|
}).then(() => {
|
|
|
|
|
if (marker.reason instanceof Error) {
|
|
|
|
|
// NOTE: This ensures that the original error causing the abort is thrown exactly once
|
|
|
|
|
throw marker.reason;
|
|
|
|
|
} else {
|
|
|
|
|
throw marker;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
} else {
|
|
|
|
|
// Don't interfere, we only need special behaviour on the first occurrence
|
|
|
|
|
throw marker;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
};
|
|
|
|
|