You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

122 lines
3.0 KiB
JavaScript

"use strict";
const Promise = require("bluebird");
const unreachable = require("@joepie91/unreachable")("@promistream/buffer");
const util = require("util");
const debug = require("debug")("promistream:buffer");
const propagateAbort = require("@promistream/propagate-abort");
const sequentialize = require("@promistream/sequentialize");
const pipe = require("@promistream/pipe");
module.exports = function bufferStream() {
let buffer;
let bufferPeeked;
let hasPeekedUpstream = false;
function attemptRead(readFunc) {
return Promise.try(() => {
return readFunc();
}).then((result) => {
if (Array.isArray(result)) {
if (result.length > 0) {
debug(`Buffered ${result.length} items`);
buffer = result;
bufferPeeked = result.map(() => false);
} else {
return attemptRead(readFunc);
}
} else {
// FIXME: Make this its own module, and improve its compactness?
let stringified = util.inspect(result, { depth: 0 })
.replace(/\s+/g, " ");
throw new Error(`Buffered stream produced a non-array value: ${stringified}`);
}
});
}
function peekLocal() {
let unpeekedIndex = bufferPeeked.indexOf(false);
if (unpeekedIndex >= 0) {
bufferPeeked[unpeekedIndex] = true;
return true;
}
}
function peekUpstream(source) {
if (!hasPeekedUpstream) {
debug("Peeking upstream");
hasPeekedUpstream = true;
return source.peek();
} else if (buffer == null) {
debug("Not allowed to peek upstream, reading first");
return Promise.try(() => {
return attemptRead(source.read);
}).then(() => {
return peekLocal();
});
} else {
unreachable("Attempted to peek upstream while there is already a local buffer");
}
}
let bufferStream = {
_promistreamVersion: 0,
description: "buffer stream",
abort: propagateAbort,
peek: function peekValue_bufferedSourceStream(source) {
debug("Got peek request");
if (buffer == null) {
debug("No buffer, forwarding peek to upstream");
return peekUpstream(source);
} else {
if (peekLocal()) {
debug("Answering peek request");
return true;
} else {
debug("Buffer fully peeked, forwarding peek to upstream");
return peekUpstream(source);
}
}
},
read: function produceValue_bufferedSourceStream(source) {
debug("Got read request");
return Promise.try(() => {
if (buffer == null) {
debug("Waiting for upstream read...");
return attemptRead(source.read);
}
}).then(() => {
if (buffer != null) {
let value = buffer.shift();
bufferPeeked.shift();
debug(`Picked item from buffer, ${buffer.length} items left`);
if (buffer.length === 0) {
debug(`Buffer empty, unsetting...`);
buffer = null;
bufferPeeked = null;
}
return value;
} else {
unreachable("No buffer was present");
}
});
}
};
return pipe([
bufferStream,
// FIXME: Can this be made parallelism-safe without force-sequentializing?
sequentialize()
]);
};