Compare commits

...

3 Commits

@ -1,10 +1,10 @@
"use strict";
const Promise = require("bluebird");
const propagateAbort = require("@ppstreams/propagate-abort");
const propagatePeek = require("@ppstreams/propagate-peek");
const isEndOfStream = require("@ppstreams/is-end-of-stream");
const isAborted = require("@ppstreams/is-aborted");
const propagateAbort = require("@promistream/propagate-abort");
const propagatePeek = require("@promistream/propagate-peek");
const isEndOfStream = require("@promistream/is-end-of-stream");
const isAborted = require("@promistream/is-aborted");
const { validateOptions } = require("@validatem/core");
const required = require("@validatem/required");
@ -14,12 +14,14 @@ const wrapValueAsOption = require("@validatem/wrap-value-as-option");
// FIXME: Update other stream implementations to new API
module.exports = function simpleSinkStream(_options) {
let { onResult, onEnd, onAbort } = validateOptions(arguments, [
let { onResult, onEnd, onAbort, onSourceChanged } = validateOptions(arguments, [
required,
wrapValueAsOption("onResult"), {
onResult: [ required, isFunction ],
onAbort: [ isFunction ],
onSourceChanged: [ isFunction ],
onEnd: [ isFunction, defaultTo.literal(function defaultOnEnd() {
// FIXME: Deprecate this, it does not work as-expected when the onResult callback never gets called (eg. due to a stream immediately terminating), as the onResult callback will then never get a chance to set the lastResult, not even to an initializer value (eg. an empty array for `collect`)
// We return whatever value we got last from the specified onResult callback.
return lastResult;
})]
@ -30,8 +32,10 @@ module.exports = function simpleSinkStream(_options) {
let onEndCalled = false;
let onEndResult;
let abortHandled = false;
let lastKnownSource;
return {
_promistreamVersion: 0,
description: `simple sink stream`,
abort: propagateAbort,
peek: propagatePeek,
@ -50,9 +54,15 @@ module.exports = function simpleSinkStream(_options) {
}
return Promise.try(() => {
if (source !== lastKnownSource && onSourceChanged != null) {
lastKnownSource = source;
return onSourceChanged(source);
}
}).then(() => {
return attemptRead();
}).catch(isEndOfStream, () => {
/* Don't attempt to do another read, we're done. */
// FIXME: Is it actually correct to keep returning the same thing?
if (onEndCalled) {
return onEndResult;
} else {

@ -1,15 +1,15 @@
{
"name": "@ppstreams/simple-sink",
"version": "0.1.0",
"name": "@promistream/simple-sink",
"version": "0.1.1",
"main": "index.js",
"repository": "http://git.cryto.net/ppstreams/simple-sink.git",
"repository": "http://git.cryto.net/promistream/simple-sink.git",
"author": "Sven Slootweg <admin@cryto.net>",
"license": "WTFPL OR CC0-1.0",
"dependencies": {
"@ppstreams/is-aborted": "^0.1.0",
"@ppstreams/is-end-of-stream": "^0.1.0",
"@ppstreams/propagate-abort": "^0.1.2",
"@ppstreams/propagate-peek": "^0.1.0",
"@promistream/is-aborted": "^0.1.1",
"@promistream/is-end-of-stream": "^0.1.1",
"@promistream/propagate-abort": "^0.1.6",
"@promistream/propagate-peek": "^0.1.1",
"@validatem/core": "^0.3.11",
"@validatem/default-to": "^0.1.0",
"@validatem/is-function": "^0.1.0",

Loading…
Cancel
Save