@ -1,6 +1,8 @@
"use strict" ;
const Promise = require ( "bluebird" ) ;
const createResultBuffer = require ( "result-buffer" ) ;
const propagateAbort = require ( "@promistream/propagate-abort" ) ;
const propagatePeek = require ( "@promistream/propagate-peek" ) ;
const isEndOfStream = require ( "@promistream/is-end-of-stream" ) ;
@ -9,7 +11,6 @@ const isAborted = require("@promistream/is-aborted");
const { validateOptions } = require ( "@validatem/core" ) ;
const required = require ( "@validatem/required" ) ;
const isFunction = require ( "@validatem/is-function" ) ;
const defaultTo = require ( "@validatem/default-to" ) ;
const wrapValueAsOption = require ( "@validatem/wrap-value-as-option" ) ;
// FIXME: Update other stream implementations to new API
@ -20,20 +21,18 @@ module.exports = function simpleSinkStream(_options) {
onResult : [ required , isFunction ] ,
onAbort : [ isFunction ] ,
onSourceChanged : [ isFunction ] ,
onEnd : [ isFunction , defaultTo . literal ( function defaultOnEnd ( ) {
// FIXME: Deprecate this, it does not work as-expected when the onResult callback never gets called (eg. due to a stream immediately terminating), as the onResult callback will then never get a chance to set the lastResult, not even to an initializer value (eg. an empty array for `collect`)
// We return whatever value we got last from the specified onResult callback.
return lastResult ;
} ) ]
onEnd : [ isFunction ]
}
] ) ;
let lastResult ;
// FIXME: Bump minor version!
let onEndCalled = false ;
let onEndResult ;
let abortHandled = false ;
let lastKnownSource ;
let resultBuffer = createResultBuffer ( ) ;
return {
_promistreamVersion : 0 ,
description : ` simple sink stream ` ,
@ -45,72 +44,85 @@ module.exports = function simpleSinkStream(_options) {
return source . read ( ) ;
} ) . then ( ( value ) => {
// FIXME: Document that you can pause the sink from the onResult callback, by returning a Promise that resolves when it should be resumed
return onResult ( value );
return onResult ( value , source . abort . bind ( source ) );
} ) . then ( ( result ) => {
lastResult = result ;
return attemptRead ( ) ;
// FIXME: Replace all instances of undefined checks with a standardized NoValue marker
if ( result !== undefined ) {
// TODO: Force end of stream when this occurs?
return result ;
} else {
return attemptRead ( ) ;
}
} ) ;
}
return Promise . try ( ( ) => {
if ( source !== lastKnownSource && onSourceChanged != null ) {
lastKnownSource = source ;
return onSourceChanged ( source ) ;
}
} ) . then ( ( ) => {
return attemptRead ( ) ;
} ) . catch ( isEndOfStream , ( ) => {
/* Don't attempt to do another read, we're done. */
// FIXME: Is it actually correct to keep returning the same thing?
if ( onEndCalled ) {
return onEndResult ;
} else {
return Promise . try ( ( ) => {
return onEnd ( ) ;
} ) . then ( ( result ) => {
onEndResult = result ;
return result ;
} ) ;
}
} ) . catch ( ( error ) => ! isAborted ( error ) , ( error ) => {
return resultBuffer . maybeRead ( ( ) => {
return Promise . try ( ( ) => {
return source . abort ( error ) ;
} ) . catch ( ( abortError ) => {
let message = [
` Tried to abort stream due to encountering an error, but the aborting itself failed ` ,
` Original error message: ${ error . message } ` ,
` Abort failure message: ${ abortError . message } `
] . join ( "\n" ) ;
// FIXME: Make this some sort of chained error
let combinedError = new Error ( message ) ;
combinedError . stack = abortError . stack ; // HACK
throw combinedError ;
if ( source !== lastKnownSource && onSourceChanged != null ) {
lastKnownSource = source ;
return onSourceChanged ( source ) ;
}
} ) . then ( ( ) => {
// Pass through the original error to the user
throw error ;
} ) ;
} ) . catch ( isAborted , ( marker ) => {
if ( abortHandled === false ) {
abortHandled = true ;
return attemptRead ( ) ;
} ) . catch ( isEndOfStream , ( error ) => {
/* Don't attempt to do another read, we're done. */
if ( ! onEndCalled && onEnd != null ) {
onEndCalled = true ;
return Promise . try ( ( ) => {
return onEnd ( ) ;
} ) . then ( ( result ) => {
if ( result !== undefined ) {
resultBuffer . push ( result ) ;
}
} ) ;
}
return resultBuffer . maybeRead ( ( ) => {
throw error ;
} ) ;
} ) . catch ( ( error ) => ! isAborted ( error ) , ( error ) => {
return Promise . try ( ( ) => {
if ( onAbort != null ) {
return onAbort ( ) ;
}
return source . abort ( error ) ;
} ) . catch ( ( abortError ) => {
let message = [
` Tried to abort stream due to encountering an error, but the aborting itself failed ` ,
` Original error message: ${ error . message } ` ,
` Abort failure message: ${ abortError . message } `
] . join ( "\n" ) ;
// FIXME: Make this some sort of chained error
let combinedError = new Error ( message ) ;
combinedError . stack = abortError . stack ; // HACK
throw combinedError ;
} ) . then ( ( ) => {
if ( marker . reason instanceof Error ) {
// NOTE: This ensures that the original error causing the abort is thrown exactly once
throw marker . reason ;
} else {
throw marker ;
}
// Pass through the original error to the user
throw error ;
} ) ;
} ) . catch ( isAborted , ( marker ) => {
if ( abortHandled === false ) {
abortHandled = true ;
return Promise . try ( ( ) => {
if ( onAbort != null ) {
return onAbort ( marker . reason ) ;
}
} ) . then ( ( value ) => {
if ( value !== undefined ) {
resultBuffer . push ( value ) ;
}
if ( marker . reason instanceof Error ) {
// NOTE: This ensures that the original error causing the abort is thrown exactly once
resultBuffer . push ( Promise . reject ( marker . reason ) ) ;
}
} ) ;
}
return resultBuffer . maybeRead ( ( ) => {
throw marker ;
} ) ;
} else {
// Don't interfere, we only need special behaviour on the first occurrence
throw marker ;
}
} ) ;
} ) ;
}
} ;