Browse Source

Ensure that a writable stream doesn't end until the underlying stream has ended, plus misc changes

master
Sven Slootweg 1 month ago
parent
commit
e6ad12417d
5 changed files with 87 additions and 40 deletions
  1. +1
    -1
      example.js
  2. +6
    -0
      index.js
  3. +6
    -0
      package.json
  4. +4
    -0
      src/writable/index.js
  5. +70
    -39
      yarn.lock

+ 1
- 1
example.js View File

@ -33,7 +33,7 @@ return Promise.try(() => {
return pipe([
rangeNumbers(0, 10),
map((line) => String(line) + "\n"),
map((number) => String(number) + "\n"),
fromNodeStream(fs.createWriteStream("numbers.txt", { encoding: "utf8" }))
]).read();
}).then((result) => {


+ 6
- 0
index.js View File

@ -65,22 +65,28 @@ function fromReadable(stream) {
});
}
let debugWritable = debug("promistream:from-node-stream:writable");
function fromWritable(stream) {
let upstreamHasEnded = false;
let mostRecentSource = { abort: function() {} }; // FIXME: Replace with a proper spec-compliant dummy stream
let convertedStream = simpleSink({
onResult: (result) => {
debugWritable("Received value");
return writable.write(result);
},
onEnd: () => {
debugWritable("Upstream reported end-of-stream");
upstreamHasEnded = true;
return writable.end();
},
onAbort: (_reason) => {
debugWritable("Pipeline was aborted");
return writable.destroy();
},
onSourceChanged: (source) => {
debugWritable("A source change occurred");
mostRecentSource = source;
}
});


+ 6
- 0
package.json View File

@ -17,6 +17,12 @@
"@promistream/simple-source": "^0.1.1",
"bluebird": "^3.7.2",
"debug": "^4.3.1",
"p-event": "^4.2.0",
"split-filter": "^1.1.3"
},
"devDependencies": {
"@promistream/buffered-map": "^0.1.0",
"@promistream/collect": "^0.1.1",
"@promistream/range-numbers": "^0.1.2"
}
}

+ 4
- 0
src/writable/index.js View File

@ -1,5 +1,6 @@
"use strict";
const pEvent = require("p-event");
const debug = require("debug")("promistream:from-node-stream:writable");
const attachHandlers = require("./attach-handlers");
@ -33,7 +34,10 @@ module.exports = function wireUpWritableInterface(stream, { onEnd, onError } = {
// stdout/stderr cannot be ended like other streams
if (!isStdioStream(stream)) {
debug("Ending stream");
let finishPromise = pEvent(stream, "finish");
stream.end();
return finishPromise;
} else {
debug("Not ending stream because it is stdio");
}


+ 70
- 39
yarn.lock View File

@ -15,7 +15,7 @@
default-value "^1.0.0"
error-chain "^0.1.0"
"@promistream/buffer@^0.1.0":
"@promistream/buffer@^0.1.0", "@promistream/buffer@^0.1.2":
version "0.1.2"
resolved "https://registry.yarnpkg.com/@promistream/buffer/-/buffer-0.1.2.tgz#73c63476aa5cfeb111298b791a33d8008384721b"
integrity sha512-EquWW7HTpjngNkMxHhxww0rvODWAaEN715BXQWP9zXkm+CXdhYmadeol7G2kMPTTYuYjKFmiNUdjjfm/z2puyg==
@ -24,6 +24,22 @@
"@promistream/propagate-abort" "^0.1.2"
bluebird "^3.5.4"
"@promistream/buffered-map@^0.1.0":
version "0.1.1"
resolved "https://registry.yarnpkg.com/@promistream/buffered-map/-/buffered-map-0.1.1.tgz#7c0f2542102b0a9f8b738100ea4587a084dcb110"
integrity sha512-6KIcJEW75oJmKzYEJoGTm+vZgwH7oeOJCLJj9eRKnqSYaGFBtWWyyEN0uEui5qrXbPqFW4l0Eu609HhRWPc3Gg==
dependencies:
"@promistream/buffer" "^0.1.2"
"@promistream/map" "^0.1.1"
"@promistream/pipe" "^0.1.4"
"@promistream/collect@^0.1.1":
version "0.1.1"
resolved "https://registry.yarnpkg.com/@promistream/collect/-/collect-0.1.1.tgz#085360a66c5cab7616833542009212be34a447ff"
integrity sha512-zXnp8SFm2dFsvJBASLfYMUxfaNkvzyhU56WT1iAXxlN5w2Rb0vArP2pIXvpbiSVrWkUObNeZ8t715nGxqsWEow==
dependencies:
"@promistream/simple-sink" "^0.1.0"
"@promistream/end-of-stream@^0.1.0", "@promistream/end-of-stream@^0.1.1":
version "0.1.2"
resolved "https://registry.yarnpkg.com/@promistream/end-of-stream/-/end-of-stream-0.1.2.tgz#45820c8d29353c480c0219920db95ba075396438"
@ -42,7 +58,19 @@
resolved "https://registry.yarnpkg.com/@promistream/is-end-of-stream/-/is-end-of-stream-0.1.1.tgz#7f84e630c9e49a92739df6a8c574eff99dd4c09d"
integrity sha512-GZn7W0wrUen7kkgWCcwFFgr0g/ftfuddnuK/Tp0MLWCCJA4hyAboglCZP0JzEJdi34gClEP8lCfDwGekw18LHg==
"@promistream/pipe@^0.1.1":
"@promistream/map@^0.1.1":
version "0.1.1"
resolved "https://registry.yarnpkg.com/@promistream/map/-/map-0.1.1.tgz#2f771372e5d1dd12f41b6efd57874014d406f123"
integrity sha512-ggyNqWlvNXVY9Gf/pLUgbHROK8mEqu46hbpJftmN9etPr724YPhL+vxA7+9b6bBmTLAU1Tw4Th3BWG5EHVBn1g==
dependencies:
"@promistream/propagate-abort" "^0.1.2"
"@promistream/propagate-peek" "^0.1.0"
"@validatem/core" "^0.3.12"
"@validatem/is-function" "^0.1.0"
"@validatem/required" "^0.1.1"
bluebird "^3.5.4"
"@promistream/pipe@^0.1.1", "@promistream/pipe@^0.1.4":
version "0.1.4"
resolved "https://registry.yarnpkg.com/@promistream/pipe/-/pipe-0.1.4.tgz#ef05fe582a33768c7eb56ad20635e1b7b48ac95b"
integrity sha512-4js6lhu/aTNEMosIBFcCz8Rkxc1S2V4zzI2QvZp9HqglhL5UTuxnv5VbU2ZlPFAFVID1aJOurZ8KdiVagHfOCw==
@ -66,6 +94,14 @@
resolved "https://registry.yarnpkg.com/@promistream/propagate-peek/-/propagate-peek-0.1.1.tgz#c7dd69efcd894c408d7a3e9713b6a9036f70a501"
integrity sha512-4xfkSmtPQzlvL4+KCquPHX7sPXiAACGJac/y7fB3Sv6ZKXAT/cjTfms1nEjlDGn1nroN0MzReBza2HnpF59deg==
"@promistream/range-numbers@^0.1.2":
version "0.1.2"
resolved "https://registry.yarnpkg.com/@promistream/range-numbers/-/range-numbers-0.1.2.tgz#2e5bbe012338eb238ee7ba469cde3ecb8a135239"
integrity sha512-yoCstn6vYhGjl0swIspyVUck3N/X8B97yODcbrCd2sqaFJoLClepIW9Fz24lZ26PHQc6pZTUrubrw+Fc7Dcvng==
dependencies:
"@promistream/end-of-stream" "^0.1.0"
"@promistream/simple-source" "^0.1.0"
"@promistream/simple-sink@^0.1.0":
version "0.1.1"
resolved "https://registry.yarnpkg.com/@promistream/simple-sink/-/simple-sink-0.1.1.tgz#e3808179102ffe4bc10d70d681f19c649e1f3811"
@ -82,7 +118,7 @@
"@validatem/wrap-value-as-option" "^0.1.0"
bluebird "^3.5.4"
"@promistream/simple-source@^0.1.1":
"@promistream/simple-source@^0.1.0", "@promistream/simple-source@^0.1.1":
version "0.1.3"
resolved "https://registry.yarnpkg.com/@promistream/simple-source/-/simple-source-0.1.3.tgz#8139ed088f8249eb9a93287fc04213008325cf06"
integrity sha512-rmpEW0Ec/9Ajrgnx0FHV+mYk4uZ+X3tRhACexUjeal6Jxgzp1oITES59+y2FZA86/a7VPCaadXBA6sWuRfcc3w==
@ -144,7 +180,7 @@
resolved "https://registry.yarnpkg.com/@validatem/combinator/-/combinator-0.1.2.tgz#eab893d55f1643b9c6857eaf6ff7ed2a728e89ff"
integrity sha512-vE8t1tNXknmN62FlN6LxQmA2c6TwVKZ+fl/Wit3H2unFdOhu7SZj2kRPGjAXdK/ARh/3svYfUBeD75pea0j1Sw==
"@validatem/core@^0.3.10", "@validatem/core@^0.3.15":
"@validatem/core@^0.3.10", "@validatem/core@^0.3.11", "@validatem/core@^0.3.12", "@validatem/core@^0.3.15":
version "0.3.15"
resolved "https://registry.yarnpkg.com/@validatem/core/-/core-0.3.15.tgz#645a0734dbc6efa3a5c39c62c5f2d8fa773f89f3"
integrity sha512-4nBLGzgpPrPsZ5DDXDXwL5p+GUEvpAFt6I3/YUHoah+ckYmKNh9qwmWKkFZHxJVdRrTewGFRj0FPw5fqje1yxA==
@ -170,32 +206,6 @@
supports-color "^7.1.0"
syncpipe "^1.0.0"
"@validatem/core@^0.3.11", "@validatem/core@^0.3.12":
version "0.3.12"
resolved "https://registry.yarnpkg.com/@validatem/core/-/core-0.3.12.tgz#e4e8a566850571bf55412862e88a3b06e75c8072"
integrity sha512-ngrFk6PT/pPZntpleG6q55SByongNxRk7wJhUiCihyv4yqIqqG+bNGH4wb6yW33IHefreWxkkJ53yM1Yj9srNA==
dependencies:
"@validatem/annotate-errors" "^0.1.2"
"@validatem/any-property" "^0.1.0"
"@validatem/error" "^1.0.0"
"@validatem/match-validation-error" "^0.1.0"
"@validatem/match-versioned-special" "^0.1.0"
"@validatem/match-virtual-property" "^0.1.0"
"@validatem/normalize-rules" "^0.1.0"
"@validatem/required" "^0.1.0"
"@validatem/validation-result" "^0.1.1"
"@validatem/virtual-property" "^0.1.0"
as-expression "^1.0.0"
assure-array "^1.0.0"
create-error "^0.3.1"
default-value "^1.0.0"
execall "^2.0.0"
flatten "^1.0.3"
indent-string "^4.0.0"
is-arguments "^1.0.4"
supports-color "^7.1.0"
syncpipe "^1.0.0"
"@validatem/default-to@^0.1.0":
version "0.1.0"
resolved "https://registry.yarnpkg.com/@validatem/default-to/-/default-to-0.1.0.tgz#62766a3ca24d2f61a96c713bcb629a5b3c6427c5"
@ -566,9 +576,11 @@ indent-string@^4.0.0:
integrity sha512-EdDDZu4A2OyIK7Lr/2zG+w5jmbuk1DVBnEwREQvBzspBJkCEbRa8GxU1lghYcaGJCnRWibjDXlq779X1/y5xwg==
is-arguments@^1.0.4:
version "1.0.4"
resolved "https://registry.yarnpkg.com/is-arguments/-/is-arguments-1.0.4.tgz#3faf966c7cba0ff437fb31f6250082fcf0448cf3"
integrity sha512-xPh0Rmt8NE65sNzvyUmWgI1tz3mKq74lGA0mL8LYZcoIzKOzDh6HmrYm3d18k60nHerC8A9Km8kYu87zfSFnLA==
version "1.1.0"
resolved "https://registry.yarnpkg.com/is-arguments/-/is-arguments-1.1.0.tgz#62353031dfbee07ceb34656a6bde59efecae8dd9"
integrity sha512-1Ij4lOMPl/xB5kBDn7I+b2ttPMKa8szhEIrXDuXQD/oe3HJLTLhqhgGspwgyGd6MOywBUqVvYicF72lkgDnIHg==
dependencies:
call-bind "^1.0.0"
is-boolean-object@^1.0.1:
version "1.1.0"
@ -578,9 +590,9 @@ is-boolean-object@^1.0.1:
call-bind "^1.0.0"
is-callable@^1.1.5:
version "1.2.0"
resolved "https://registry.yarnpkg.com/is-callable/-/is-callable-1.2.0.tgz#83336560b54a38e35e3a2df7afd0454d691468bb"
integrity sha512-pyVD9AaGLxtg6srb2Ng6ynWJqkHU9bEM087AKck0w8QwDarTfNcpIYoU8x8Hv2Icm8u6kFJM18Dag8lyqGkviw==
version "1.2.3"
resolved "https://registry.yarnpkg.com/is-callable/-/is-callable-1.2.3.tgz#8b1e0500b73a1d76c70487636f368e519de8db8e"
integrity sha512-J1DcMe8UYTBSrKezuIUTUwjXsho29693unXM2YhJUTR2txK/eG47bvNa/wipPFmZFgr/N6f1GA66dv0mEyTIyQ==
is-plain-obj@^2.1.0:
version "2.1.0"
@ -607,6 +619,25 @@ ms@2.1.2:
resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.2.tgz#d09d1f357b443f493382a8eb3ccd183872ae6009"
integrity sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==
p-event@^4.2.0:
version "4.2.0"
resolved "https://registry.yarnpkg.com/p-event/-/p-event-4.2.0.tgz#af4b049c8acd91ae81083ebd1e6f5cae2044c1b5"
integrity sha512-KXatOjCRXXkSePPb1Nbi0p0m+gQAwdlbhi4wQKJPI1HsMQS9g+Sqp2o+QHziPr7eYJyOZet836KoHEVM1mwOrQ==
dependencies:
p-timeout "^3.1.0"
p-finally@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/p-finally/-/p-finally-1.0.0.tgz#3fbcfb15b899a44123b34b6dcc18b724336a2cae"
integrity sha1-P7z7FbiZpEEjs0ttzBi3JDNqLK4=
p-timeout@^3.1.0:
version "3.2.0"
resolved "https://registry.yarnpkg.com/p-timeout/-/p-timeout-3.2.0.tgz#c7e17abc971d2a7962ef83626b35d635acf23dfe"
integrity sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg==
dependencies:
p-finally "^1.0.0"
split-filter-n@^1.1.2:
version "1.1.2"
resolved "https://registry.yarnpkg.com/split-filter-n/-/split-filter-n-1.1.2.tgz#268be1ec9c4d93dfb27b030c06165ac1b6f70f66"
@ -625,9 +656,9 @@ supports-color@^5.3.0:
has-flag "^3.0.0"
supports-color@^7.1.0:
version "7.1.0"
resolved "https://registry.yarnpkg.com/supports-color/-/supports-color-7.1.0.tgz#68e32591df73e25ad1c4b49108a2ec507962bfd1"
integrity sha512-oRSIpR8pxT1Wr2FquTNnGet79b3BWljqOuoW/h4oBhxJ/HUbX5nX6JSruTkvXDCFMwDPvsaTTbvMLKZWSy0R5g==
version "7.2.0"
resolved "https://registry.yarnpkg.com/supports-color/-/supports-color-7.2.0.tgz#1b7dcdcb32b8138801b3e478ba6a51caa89648da"
integrity sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==
dependencies:
has-flag "^4.0.0"


Loading…
Cancel
Save