diff --git a/index.js b/index.js index ddf8038..97e7e73 100644 --- a/index.js +++ b/index.js @@ -1,13 +1,18 @@ "use strict"; const Promise = require("bluebird"); -const propagateAbort = require("@promistream/propagate-abort"); const unreachable = require("@joepie91/unreachable")("@promistream/buffer"); const util = require("util"); +const debug = require("debug")("promistream:buffer"); + +const propagateAbort = require("@promistream/propagate-abort"); +const sequentialize = require("@promistream/sequentialize"); +const pipe = require("@promistream/pipe"); module.exports = function bufferStream() { let buffer; let bufferPeeked; + let hasPeekedUpstream = false; function attemptRead(readFunc) { return Promise.try(() => { @@ -15,6 +20,8 @@ module.exports = function bufferStream() { }).then((result) => { if (Array.isArray(result)) { if (result.length > 0) { + debug(`Buffered ${result.length} items`); + buffer = result; bufferPeeked = result.map(() => false); } else { @@ -30,35 +37,70 @@ module.exports = function bufferStream() { }); } - return { + function peekLocal() { + let unpeekedIndex = bufferPeeked.indexOf(false); + + if (unpeekedIndex >= 0) { + bufferPeeked[unpeekedIndex] = true; + return true; + } + } + + function peekUpstream(source) { + if (!hasPeekedUpstream) { + debug("Peeking upstream"); + hasPeekedUpstream = true; + return source.peek(); + } else if (buffer == null) { + debug("Not allowed to peek upstream, reading first"); + + return Promise.try(() => { + return attemptRead(source.read); + }).then(() => { + return peekLocal(); + }); + } else { + unreachable("Attempted to peek upstream while there is already a local buffer"); + } + } + + let bufferStream = { _promistreamVersion: 0, description: "buffer stream", abort: propagateAbort, peek: function peekValue_bufferedSourceStream(source) { + debug("Got peek request"); + if (buffer == null) { - return source.peek(); + debug("No buffer, forwarding peek to upstream"); + return peekUpstream(source); } else { - let unpeekedIndex = bufferPeeked.indexOf(false); - - if (unpeekedIndex > 0) { - bufferPeeked[unpeekedIndex] = true; + if (peekLocal()) { + debug("Answering peek request"); return true; } else { - return source.peek(); + debug("Buffer fully peeked, forwarding peek to upstream"); + return peekUpstream(source); } } }, read: function produceValue_bufferedSourceStream(source) { + debug("Got read request"); + return Promise.try(() => { if (buffer == null) { + debug("Waiting for upstream read..."); return attemptRead(source.read); } }).then(() => { if (buffer != null) { let value = buffer.shift(); bufferPeeked.shift(); + + debug(`Picked item from buffer, ${buffer.length} items left`); if (buffer.length === 0) { + debug(`Buffer empty, unsetting...`); buffer = null; bufferPeeked = null; } @@ -70,4 +112,10 @@ module.exports = function bufferStream() { }); } }; + + return pipe([ + bufferStream, + // FIXME: Can this be made parallelism-safe without force-sequentializing? + sequentialize() + ]); }; diff --git a/package.json b/package.json index 3a8441f..4a715a5 100644 --- a/package.json +++ b/package.json @@ -7,13 +7,15 @@ "license": "WTFPL OR CC0-1.0", "dependencies": { "@joepie91/unreachable": "^1.0.0", + "@promistream/pipe": "^0.1.4", "@promistream/propagate-abort": "^0.1.2", - "bluebird": "^3.5.4" + "@promistream/sequentialize": "^0.1.0", + "bluebird": "^3.5.4", + "debug": "^4.3.1" }, "devDependencies": { "@promistream/collect": "^0.1.0", "@promistream/map": "^0.1.0", - "@promistream/pipe": "^0.1.0", "@promistream/range-numbers": "^0.1.1" } } diff --git a/yarn.lock b/yarn.lock index 82b0856..a41589e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -52,10 +52,19 @@ "@validatem/required" "^0.1.1" bluebird "^3.5.4" -"@promistream/pipe@^0.1.0": - version "0.1.2" - resolved "https://registry.yarnpkg.com/@promistream/pipe/-/pipe-0.1.2.tgz#3e468916247e762e5ec90a831779e24677e80e57" - integrity sha512-J0SmZcEPZ7+If9Q3zeG6OmgfGHyiQe75iv0RMEXsUwxDQ12y4yDHkK7W09VhULAQ69JGk5j4SICk5LgBZlb60A== +"@promistream/pipe@^0.1.4": + version "0.1.4" + resolved "https://registry.yarnpkg.com/@promistream/pipe/-/pipe-0.1.4.tgz#ef05fe582a33768c7eb56ad20635e1b7b48ac95b" + integrity sha512-4js6lhu/aTNEMosIBFcCz8Rkxc1S2V4zzI2QvZp9HqglhL5UTuxnv5VbU2ZlPFAFVID1aJOurZ8KdiVagHfOCw== + dependencies: + "@validatem/allow-extra-properties" "^0.1.0" + "@validatem/anything" "^0.1.0" + "@validatem/array-of" "^0.1.2" + "@validatem/core" "^0.3.15" + "@validatem/error" "^1.1.0" + "@validatem/remove-nullish-items" "^0.1.0" + "@validatem/required" "^0.1.1" + "@validatem/wrap-error" "^0.3.0" "@promistream/propagate-abort@^0.1.2", "@promistream/propagate-abort@^0.1.6": version "0.1.6" @@ -75,6 +84,16 @@ "@promistream/end-of-stream" "^0.1.0" "@promistream/simple-source" "^0.1.0" +"@promistream/sequentialize@^0.1.0": + version "0.1.0" + resolved "https://registry.yarnpkg.com/@promistream/sequentialize/-/sequentialize-0.1.0.tgz#8cab499c2518ee856fcb1e13943859ca5b77ba71" + integrity sha512-lm7wJmlOSmBvHq49zLfs3cghOt9kcRhLezCbuhXQUXhhiaKLCvYuyA1AGId0kiJDPX2SggrU3Ojb+TOcxPEAqw== + dependencies: + "@joepie91/unreachable" "^1.0.0" + "@promistream/propagate-abort" "^0.1.2" + bluebird "^3.5.4" + p-defer "^3.0.0" + "@promistream/simple-sink@^0.1.0": version "0.1.1" resolved "https://registry.yarnpkg.com/@promistream/simple-sink/-/simple-sink-0.1.1.tgz#e3808179102ffe4bc10d70d681f19c649e1f3811" @@ -133,12 +152,27 @@ "@validatem/virtual-property" "^0.1.0" default-value "^1.0.0" +"@validatem/anything@^0.1.0": + version "0.1.0" + resolved "https://registry.yarnpkg.com/@validatem/anything/-/anything-0.1.0.tgz#09b57720476b9f7ab072c3e5d0a3d4234b721435" + integrity sha512-VJcygPpLw2fAhh29m2qL1AybHY7Ewl7xpvVgNIZpqUwMsSZXWSmzmbZhqE4Sr6Wy2n6FbZVzVoUFREO589SPcQ== + +"@validatem/array-of@^0.1.2": + version "0.1.2" + resolved "https://registry.yarnpkg.com/@validatem/array-of/-/array-of-0.1.2.tgz#59c09879fb41c583e45b210e7f7c78fd7f86ac33" + integrity sha512-3YjrZOxxlburFfRdJyPWbNoAA7a72E3/2nPCyVGTE8lekQy9NZSyrPjntMozwE14rsnGGLWFLCgNWKu73cyhxQ== + dependencies: + "@validatem/annotate-errors" "^0.1.2" + "@validatem/combinator" "^0.1.0" + "@validatem/is-array" "^0.1.0" + "@validatem/validation-result" "^0.1.1" + "@validatem/combinator@^0.1.0", "@validatem/combinator@^0.1.1": version "0.1.2" resolved "https://registry.yarnpkg.com/@validatem/combinator/-/combinator-0.1.2.tgz#eab893d55f1643b9c6857eaf6ff7ed2a728e89ff" integrity sha512-vE8t1tNXknmN62FlN6LxQmA2c6TwVKZ+fl/Wit3H2unFdOhu7SZj2kRPGjAXdK/ARh/3svYfUBeD75pea0j1Sw== -"@validatem/core@^0.3.10": +"@validatem/core@^0.3.10", "@validatem/core@^0.3.15": version "0.3.15" resolved "https://registry.yarnpkg.com/@validatem/core/-/core-0.3.15.tgz#645a0734dbc6efa3a5c39c62c5f2d8fa773f89f3" integrity sha512-4nBLGzgpPrPsZ5DDXDXwL5p+GUEvpAFt6I3/YUHoah+ckYmKNh9qwmWKkFZHxJVdRrTewGFRj0FPw5fqje1yxA== @@ -242,6 +276,13 @@ default-value "^1.0.0" flatten "^1.0.3" +"@validatem/is-array@^0.1.0": + version "0.1.1" + resolved "https://registry.yarnpkg.com/@validatem/is-array/-/is-array-0.1.1.tgz#fbe15ca8c97c30b622a5bbeb536d341e99cfc2c5" + integrity sha512-XD3C+Nqfpnbb4oO//Ufodzvui7SsCIW/stxZ39dP/fyRsBHrdERinkFATH5HepegtDlWMQswm5m1XFRbQiP2oQ== + dependencies: + "@validatem/error" "^1.0.0" + "@validatem/is-boolean@^0.1.1": version "0.1.1" resolved "https://registry.yarnpkg.com/@validatem/is-boolean/-/is-boolean-0.1.1.tgz#b7fafd4143ab6d23bca597c86d8c4e0ba6f6cacf" @@ -316,6 +357,11 @@ dependencies: "@validatem/error" "^1.0.0" +"@validatem/remove-nullish-items@^0.1.0": + version "0.1.0" + resolved "https://registry.yarnpkg.com/@validatem/remove-nullish-items/-/remove-nullish-items-0.1.0.tgz#fe1a8b64d11276b506fae2bd2c41da4985a5b5ff" + integrity sha512-cs4YSF47TA/gHnV5muSUUqGi5PwybP5ztu5SYnPKxQVTyubvcbrFat51nOvJ2PmUasyrIccoYMmATiviXkTi6g== + "@validatem/required@^0.1.0", "@validatem/required@^0.1.1": version "0.1.1" resolved "https://registry.yarnpkg.com/@validatem/required/-/required-0.1.1.tgz#64f4a87333fc5955511634036b7f8948ed269170" @@ -353,6 +399,19 @@ default-value "^1.0.0" split-filter-n "^1.1.2" +"@validatem/wrap-error@^0.3.0": + version "0.3.0" + resolved "https://registry.yarnpkg.com/@validatem/wrap-error/-/wrap-error-0.3.0.tgz#f8d170e79b6fdd68321d82c60581ad345be7d6b9" + integrity sha512-km5v6F/Xm7j8W/tmCmht2BTzxMLSpBUJ5MdhJD7ABEut/fdO0tNca1u1imTnWCULCJcdDHbNtpSmDMvXFg3E7Q== + dependencies: + "@validatem/combinator" "^0.1.1" + "@validatem/error" "^1.0.0" + "@validatem/match-validation-error" "^0.1.0" + "@validatem/validation-result" "^0.1.2" + as-expression "^1.0.0" + default-value "^1.0.0" + split-filter-n "^1.1.2" + "@validatem/wrap-value-as-option@^0.1.0": version "0.1.0" resolved "https://registry.yarnpkg.com/@validatem/wrap-value-as-option/-/wrap-value-as-option-0.1.0.tgz#57fa8d535f6cdf40cf8c8846ad45f4dd68f44568" @@ -434,6 +493,13 @@ create-error@^0.3.1: resolved "https://registry.yarnpkg.com/create-error/-/create-error-0.3.1.tgz#69810245a629e654432bf04377360003a5351a23" integrity sha1-aYECRaYp5lRDK/BDdzYAA6U1GiM= +debug@^4.3.1: + version "4.3.1" + resolved "https://registry.yarnpkg.com/debug/-/debug-4.3.1.tgz#f0d229c505e0c6d8c49ac553d1b13dc183f6b2ee" + integrity sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ== + dependencies: + ms "2.1.2" + default-value@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/default-value/-/default-value-1.0.0.tgz#8c6f52a5a1193fe78fdc9f86eb71d16c9757c83a" @@ -569,6 +635,16 @@ is.object@^1.0.0: resolved "https://registry.yarnpkg.com/is.object/-/is.object-1.0.0.tgz#e4f4117e9f083b35c8df5cf817ea3efb0452fdfa" integrity sha1-5PQRfp8IOzXI31z4F+o++wRS/fo= +ms@2.1.2: + version "2.1.2" + resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.2.tgz#d09d1f357b443f493382a8eb3ccd183872ae6009" + integrity sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w== + +p-defer@^3.0.0: + version "3.0.0" + resolved "https://registry.yarnpkg.com/p-defer/-/p-defer-3.0.0.tgz#d1dceb4ee9b2b604b1d94ffec83760175d4e6f83" + integrity sha512-ugZxsxmtTln604yeYd29EGrNhazN2lywetzpKhfmQjW/VJmhpDmWbiX+h0zL8V91R0UXkhb3KtPmyq9PZw3aYw== + split-filter-n@^1.1.2: version "1.1.2" resolved "https://registry.yarnpkg.com/split-filter-n/-/split-filter-n-1.1.2.tgz#268be1ec9c4d93dfb27b030c06165ac1b6f70f66"