Add debugging, force sequential operation

master
Sven Slootweg 3 years ago
parent 80975a80f5
commit d927fc4b80

@ -1,13 +1,18 @@
"use strict";
const Promise = require("bluebird");
const propagateAbort = require("@promistream/propagate-abort");
const unreachable = require("@joepie91/unreachable")("@promistream/buffer");
const util = require("util");
const debug = require("debug")("promistream:buffer");
const propagateAbort = require("@promistream/propagate-abort");
const sequentialize = require("@promistream/sequentialize");
const pipe = require("@promistream/pipe");
module.exports = function bufferStream() {
let buffer;
let bufferPeeked;
let hasPeekedUpstream = false;
function attemptRead(readFunc) {
return Promise.try(() => {
@ -15,6 +20,8 @@ module.exports = function bufferStream() {
}).then((result) => {
if (Array.isArray(result)) {
if (result.length > 0) {
debug(`Buffered ${result.length} items`);
buffer = result;
bufferPeeked = result.map(() => false);
} else {
@ -30,35 +37,70 @@ module.exports = function bufferStream() {
});
}
return {
function peekLocal() {
let unpeekedIndex = bufferPeeked.indexOf(false);
if (unpeekedIndex >= 0) {
bufferPeeked[unpeekedIndex] = true;
return true;
}
}
function peekUpstream(source) {
if (!hasPeekedUpstream) {
debug("Peeking upstream");
hasPeekedUpstream = true;
return source.peek();
} else if (buffer == null) {
debug("Not allowed to peek upstream, reading first");
return Promise.try(() => {
return attemptRead(source.read);
}).then(() => {
return peekLocal();
});
} else {
unreachable("Attempted to peek upstream while there is already a local buffer");
}
}
let bufferStream = {
_promistreamVersion: 0,
description: "buffer stream",
abort: propagateAbort,
peek: function peekValue_bufferedSourceStream(source) {
debug("Got peek request");
if (buffer == null) {
return source.peek();
debug("No buffer, forwarding peek to upstream");
return peekUpstream(source);
} else {
let unpeekedIndex = bufferPeeked.indexOf(false);
if (unpeekedIndex > 0) {
bufferPeeked[unpeekedIndex] = true;
if (peekLocal()) {
debug("Answering peek request");
return true;
} else {
return source.peek();
debug("Buffer fully peeked, forwarding peek to upstream");
return peekUpstream(source);
}
}
},
read: function produceValue_bufferedSourceStream(source) {
debug("Got read request");
return Promise.try(() => {
if (buffer == null) {
debug("Waiting for upstream read...");
return attemptRead(source.read);
}
}).then(() => {
if (buffer != null) {
let value = buffer.shift();
bufferPeeked.shift();
debug(`Picked item from buffer, ${buffer.length} items left`);
if (buffer.length === 0) {
debug(`Buffer empty, unsetting...`);
buffer = null;
bufferPeeked = null;
}
@ -70,4 +112,10 @@ module.exports = function bufferStream() {
});
}
};
return pipe([
bufferStream,
// FIXME: Can this be made parallelism-safe without force-sequentializing?
sequentialize()
]);
};

@ -7,13 +7,15 @@
"license": "WTFPL OR CC0-1.0",
"dependencies": {
"@joepie91/unreachable": "^1.0.0",
"@promistream/pipe": "^0.1.4",
"@promistream/propagate-abort": "^0.1.2",
"bluebird": "^3.5.4"
"@promistream/sequentialize": "^0.1.0",
"bluebird": "^3.5.4",
"debug": "^4.3.1"
},
"devDependencies": {
"@promistream/collect": "^0.1.0",
"@promistream/map": "^0.1.0",
"@promistream/pipe": "^0.1.0",
"@promistream/range-numbers": "^0.1.1"
}
}

@ -52,10 +52,19 @@
"@validatem/required" "^0.1.1"
bluebird "^3.5.4"
"@promistream/pipe@^0.1.0":
version "0.1.2"
resolved "https://registry.yarnpkg.com/@promistream/pipe/-/pipe-0.1.2.tgz#3e468916247e762e5ec90a831779e24677e80e57"
integrity sha512-J0SmZcEPZ7+If9Q3zeG6OmgfGHyiQe75iv0RMEXsUwxDQ12y4yDHkK7W09VhULAQ69JGk5j4SICk5LgBZlb60A==
"@promistream/pipe@^0.1.4":
version "0.1.4"
resolved "https://registry.yarnpkg.com/@promistream/pipe/-/pipe-0.1.4.tgz#ef05fe582a33768c7eb56ad20635e1b7b48ac95b"
integrity sha512-4js6lhu/aTNEMosIBFcCz8Rkxc1S2V4zzI2QvZp9HqglhL5UTuxnv5VbU2ZlPFAFVID1aJOurZ8KdiVagHfOCw==
dependencies:
"@validatem/allow-extra-properties" "^0.1.0"
"@validatem/anything" "^0.1.0"
"@validatem/array-of" "^0.1.2"
"@validatem/core" "^0.3.15"
"@validatem/error" "^1.1.0"
"@validatem/remove-nullish-items" "^0.1.0"
"@validatem/required" "^0.1.1"
"@validatem/wrap-error" "^0.3.0"
"@promistream/propagate-abort@^0.1.2", "@promistream/propagate-abort@^0.1.6":
version "0.1.6"
@ -75,6 +84,16 @@
"@promistream/end-of-stream" "^0.1.0"
"@promistream/simple-source" "^0.1.0"
"@promistream/sequentialize@^0.1.0":
version "0.1.0"
resolved "https://registry.yarnpkg.com/@promistream/sequentialize/-/sequentialize-0.1.0.tgz#8cab499c2518ee856fcb1e13943859ca5b77ba71"
integrity sha512-lm7wJmlOSmBvHq49zLfs3cghOt9kcRhLezCbuhXQUXhhiaKLCvYuyA1AGId0kiJDPX2SggrU3Ojb+TOcxPEAqw==
dependencies:
"@joepie91/unreachable" "^1.0.0"
"@promistream/propagate-abort" "^0.1.2"
bluebird "^3.5.4"
p-defer "^3.0.0"
"@promistream/simple-sink@^0.1.0":
version "0.1.1"
resolved "https://registry.yarnpkg.com/@promistream/simple-sink/-/simple-sink-0.1.1.tgz#e3808179102ffe4bc10d70d681f19c649e1f3811"
@ -133,12 +152,27 @@
"@validatem/virtual-property" "^0.1.0"
default-value "^1.0.0"
"@validatem/anything@^0.1.0":
version "0.1.0"
resolved "https://registry.yarnpkg.com/@validatem/anything/-/anything-0.1.0.tgz#09b57720476b9f7ab072c3e5d0a3d4234b721435"
integrity sha512-VJcygPpLw2fAhh29m2qL1AybHY7Ewl7xpvVgNIZpqUwMsSZXWSmzmbZhqE4Sr6Wy2n6FbZVzVoUFREO589SPcQ==
"@validatem/array-of@^0.1.2":
version "0.1.2"
resolved "https://registry.yarnpkg.com/@validatem/array-of/-/array-of-0.1.2.tgz#59c09879fb41c583e45b210e7f7c78fd7f86ac33"
integrity sha512-3YjrZOxxlburFfRdJyPWbNoAA7a72E3/2nPCyVGTE8lekQy9NZSyrPjntMozwE14rsnGGLWFLCgNWKu73cyhxQ==
dependencies:
"@validatem/annotate-errors" "^0.1.2"
"@validatem/combinator" "^0.1.0"
"@validatem/is-array" "^0.1.0"
"@validatem/validation-result" "^0.1.1"
"@validatem/combinator@^0.1.0", "@validatem/combinator@^0.1.1":
version "0.1.2"
resolved "https://registry.yarnpkg.com/@validatem/combinator/-/combinator-0.1.2.tgz#eab893d55f1643b9c6857eaf6ff7ed2a728e89ff"
integrity sha512-vE8t1tNXknmN62FlN6LxQmA2c6TwVKZ+fl/Wit3H2unFdOhu7SZj2kRPGjAXdK/ARh/3svYfUBeD75pea0j1Sw==
"@validatem/core@^0.3.10":
"@validatem/core@^0.3.10", "@validatem/core@^0.3.15":
version "0.3.15"
resolved "https://registry.yarnpkg.com/@validatem/core/-/core-0.3.15.tgz#645a0734dbc6efa3a5c39c62c5f2d8fa773f89f3"
integrity sha512-4nBLGzgpPrPsZ5DDXDXwL5p+GUEvpAFt6I3/YUHoah+ckYmKNh9qwmWKkFZHxJVdRrTewGFRj0FPw5fqje1yxA==
@ -242,6 +276,13 @@
default-value "^1.0.0"
flatten "^1.0.3"
"@validatem/is-array@^0.1.0":
version "0.1.1"
resolved "https://registry.yarnpkg.com/@validatem/is-array/-/is-array-0.1.1.tgz#fbe15ca8c97c30b622a5bbeb536d341e99cfc2c5"
integrity sha512-XD3C+Nqfpnbb4oO//Ufodzvui7SsCIW/stxZ39dP/fyRsBHrdERinkFATH5HepegtDlWMQswm5m1XFRbQiP2oQ==
dependencies:
"@validatem/error" "^1.0.0"
"@validatem/is-boolean@^0.1.1":
version "0.1.1"
resolved "https://registry.yarnpkg.com/@validatem/is-boolean/-/is-boolean-0.1.1.tgz#b7fafd4143ab6d23bca597c86d8c4e0ba6f6cacf"
@ -316,6 +357,11 @@
dependencies:
"@validatem/error" "^1.0.0"
"@validatem/remove-nullish-items@^0.1.0":
version "0.1.0"
resolved "https://registry.yarnpkg.com/@validatem/remove-nullish-items/-/remove-nullish-items-0.1.0.tgz#fe1a8b64d11276b506fae2bd2c41da4985a5b5ff"
integrity sha512-cs4YSF47TA/gHnV5muSUUqGi5PwybP5ztu5SYnPKxQVTyubvcbrFat51nOvJ2PmUasyrIccoYMmATiviXkTi6g==
"@validatem/required@^0.1.0", "@validatem/required@^0.1.1":
version "0.1.1"
resolved "https://registry.yarnpkg.com/@validatem/required/-/required-0.1.1.tgz#64f4a87333fc5955511634036b7f8948ed269170"
@ -353,6 +399,19 @@
default-value "^1.0.0"
split-filter-n "^1.1.2"
"@validatem/wrap-error@^0.3.0":
version "0.3.0"
resolved "https://registry.yarnpkg.com/@validatem/wrap-error/-/wrap-error-0.3.0.tgz#f8d170e79b6fdd68321d82c60581ad345be7d6b9"
integrity sha512-km5v6F/Xm7j8W/tmCmht2BTzxMLSpBUJ5MdhJD7ABEut/fdO0tNca1u1imTnWCULCJcdDHbNtpSmDMvXFg3E7Q==
dependencies:
"@validatem/combinator" "^0.1.1"
"@validatem/error" "^1.0.0"
"@validatem/match-validation-error" "^0.1.0"
"@validatem/validation-result" "^0.1.2"
as-expression "^1.0.0"
default-value "^1.0.0"
split-filter-n "^1.1.2"
"@validatem/wrap-value-as-option@^0.1.0":
version "0.1.0"
resolved "https://registry.yarnpkg.com/@validatem/wrap-value-as-option/-/wrap-value-as-option-0.1.0.tgz#57fa8d535f6cdf40cf8c8846ad45f4dd68f44568"
@ -434,6 +493,13 @@ create-error@^0.3.1:
resolved "https://registry.yarnpkg.com/create-error/-/create-error-0.3.1.tgz#69810245a629e654432bf04377360003a5351a23"
integrity sha1-aYECRaYp5lRDK/BDdzYAA6U1GiM=
debug@^4.3.1:
version "4.3.1"
resolved "https://registry.yarnpkg.com/debug/-/debug-4.3.1.tgz#f0d229c505e0c6d8c49ac553d1b13dc183f6b2ee"
integrity sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==
dependencies:
ms "2.1.2"
default-value@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/default-value/-/default-value-1.0.0.tgz#8c6f52a5a1193fe78fdc9f86eb71d16c9757c83a"
@ -569,6 +635,16 @@ is.object@^1.0.0:
resolved "https://registry.yarnpkg.com/is.object/-/is.object-1.0.0.tgz#e4f4117e9f083b35c8df5cf817ea3efb0452fdfa"
integrity sha1-5PQRfp8IOzXI31z4F+o++wRS/fo=
ms@2.1.2:
version "2.1.2"
resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.2.tgz#d09d1f357b443f493382a8eb3ccd183872ae6009"
integrity sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==
p-defer@^3.0.0:
version "3.0.0"
resolved "https://registry.yarnpkg.com/p-defer/-/p-defer-3.0.0.tgz#d1dceb4ee9b2b604b1d94ffec83760175d4e6f83"
integrity sha512-ugZxsxmtTln604yeYd29EGrNhazN2lywetzpKhfmQjW/VJmhpDmWbiX+h0zL8V91R0UXkhb3KtPmyq9PZw3aYw==
split-filter-n@^1.1.2:
version "1.1.2"
resolved "https://registry.yarnpkg.com/split-filter-n/-/split-filter-n-1.1.2.tgz#268be1ec9c4d93dfb27b030c06165ac1b6f70f66"

Loading…
Cancel
Save