|
|
@@ -4,31 +4,102 @@ const Promise = require("bluebird"); |
|
|
|
const propagateAbort = require("@ppstreams/propagate-abort"); |
|
|
|
const propagatePeek = require("@ppstreams/propagate-peek"); |
|
|
|
const { isEndOfStream } = require("@ppstreams/end-of-stream-marker"); |
|
|
|
const { isAborted } = require("@ppstreams/aborted-marker"); |
|
|
|
|
|
|
|
const { validateOptions } = require("@validatem/core"); |
|
|
|
const required = require("@validatem/required"); |
|
|
|
const isFunction = require("@validatem/is-function"); |
|
|
|
const defaultTo = require("@validatem/default-to"); |
|
|
|
|
|
|
|
// FIXME: Update other stream implementations to new API |
|
|
|
module.exports = function greedySinkStream(_options) { |
|
|
|
let { onResult, onEnd, onAbort } = validateOptions(arguments, { |
|
|
|
onResult: [ required, isFunction ], |
|
|
|
onEnd: [ isFunction, defaultTo.literal(defaultOnEnd) ], |
|
|
|
onAbort: [ isFunction, defaultTo.literal(defaultOnAbort) ] |
|
|
|
}); |
|
|
|
|
|
|
|
let lastResult; |
|
|
|
let onEndCalled = false; |
|
|
|
let onEndResult; |
|
|
|
let abortHandled = false; |
|
|
|
|
|
|
|
function defaultOnEnd() { |
|
|
|
// We return whatever value we got last from the specified onResult callback. |
|
|
|
return lastResult; |
|
|
|
} |
|
|
|
|
|
|
|
function defaultOnAbort() { |
|
|
|
// no-op |
|
|
|
} |
|
|
|
|
|
|
|
module.exports = function greedySinkStream(description, callback) { |
|
|
|
return { |
|
|
|
description: `greedy sink stream (${description})`, |
|
|
|
description: `greedy sink stream`, |
|
|
|
abort: propagateAbort, |
|
|
|
peek: propagatePeek, |
|
|
|
read: function produceValue_greedySinkStream(source) { |
|
|
|
let lastResult; |
|
|
|
|
|
|
|
function attemptRead() { |
|
|
|
return Promise.try(() => { |
|
|
|
return source.read(); |
|
|
|
}).then((value) => { |
|
|
|
return callback(value); |
|
|
|
// FIXME: Document that you can pause the sink from the onResult callback, by returning a Promise that resolves when it should be resumed |
|
|
|
return onResult(value); |
|
|
|
}).then((result) => { |
|
|
|
lastResult = result; |
|
|
|
|
|
|
|
return attemptRead(); |
|
|
|
}).catch(isEndOfStream, () => { |
|
|
|
/* Don't attempt to do another read, we're done. We return whatever value we got last from the specified callback. */ |
|
|
|
return lastResult; |
|
|
|
/* Don't attempt to do another read, we're done. */ |
|
|
|
if (onEndCalled) { |
|
|
|
return onEndResult; |
|
|
|
} else { |
|
|
|
return Promise.try(() => { |
|
|
|
return onEnd(); |
|
|
|
}).then((result) => { |
|
|
|
onEndResult = result; |
|
|
|
return result; |
|
|
|
}); |
|
|
|
} |
|
|
|
}).catch((error) => !isAborted(error), (error) => { |
|
|
|
return Promise.try(() => { |
|
|
|
return source.abort(error); |
|
|
|
}).catch((abortError) => { |
|
|
|
let message = [ |
|
|
|
`Tried to abort stream due to encountering an error, but the aborting itself failed`, |
|
|
|
`Original error message: ${error.message}`, |
|
|
|
`Abort failure message: ${abortError.message}` |
|
|
|
].join("\n"); |
|
|
|
|
|
|
|
// FIXME: Make this some sort of chained error |
|
|
|
let combinedError = new Error(message); |
|
|
|
combinedError.stack = abortError.stack; // HACK |
|
|
|
throw combinedError; |
|
|
|
}).then(() => { |
|
|
|
// Pass through the original error to the user |
|
|
|
throw error; |
|
|
|
}); |
|
|
|
}).catch(isAborted, (marker) => { |
|
|
|
if (abortHandled === false) { |
|
|
|
abortHandled = true; |
|
|
|
|
|
|
|
return Promise.try(() => { |
|
|
|
return onAbort(); |
|
|
|
}).then(() => { |
|
|
|
if (marker.reason instanceof Error) { |
|
|
|
// NOTE: This ensures that the original error causing the abort is thrown exactly once |
|
|
|
throw marker.reason; |
|
|
|
} else { |
|
|
|
throw marker; |
|
|
|
} |
|
|
|
}); |
|
|
|
} else { |
|
|
|
// Don't interfere, we only need special behaviour on the first occurrence |
|
|
|
throw marker; |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
return attemptRead(); |
|
|
|
} |
|
|
|
}; |
|
|
|
}; |
|
|
|
}; |