You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

121 lines
2.9 KiB
JavaScript

"use strict";
const unreachable = require("@joepie91/unreachable")("@promistream/buffer");
const util = require("util");
const debug = require("debug")("promistream:buffer");
const propagateAbort = require("@promistream/propagate-abort");
const sequentialize = require("@promistream/sequentialize");
const pipe = require("@promistream/pipe");
module.exports = function bufferStream() {
let buffer;
let bufferPeeked;
let hasPeekedUpstream = false;
/**
 * Keeps reading from upstream until a non-empty array arrives, then installs
 * that array as the local buffer and marks every slot as not yet peeked.
 *
 * Empty arrays are skipped and another read is issued. Anything thrown or
 * rejected by `readFunc` is not caught here and reaches the caller as-is.
 *
 * @param {Function} readFunc - Invoked with no arguments; its result is awaited.
 * @returns {Promise<undefined>} Resolves once `buffer` and `bufferPeeked` are set.
 * @throws {Error} If the awaited read result is not an array.
 */
async function attemptRead(readFunc) {
	for (;;) {
		const result = await readFunc();

		if (!Array.isArray(result)) {
			// FIXME: Make this its own module, and improve its compactness?
			// Shallow inspection with whitespace collapsed, so the value fits on one line.
			const inspected = util.inspect(result, { depth: 0 });
			const stringified = inspected.replace(/\s+/g, " ");

			throw new Error(`Buffered stream produced a non-array value: ${stringified}`);
		}

		if (result.length > 0) {
			debug(`Buffered ${result.length} items`);
			buffer = result;
			bufferPeeked = result.map(() => false);
			return;
		}

		// Empty batch: nothing to buffer, so go around and read again.
	}
}
/**
 * Claims the first buffered item that has not been peeked yet by flipping the
 * earliest `false` entry in `bufferPeeked` to `true`.
 *
 * `bufferPeeked` is dereferenced unconditionally, so this must only be called
 * while a local buffer is present.
 *
 * @returns {boolean} `true` if an unpeeked slot was found and claimed,
 *   `false` if every slot had already been peeked.
 */
function peekLocal() {
	const unpeekedIndex = bufferPeeked.indexOf(false);

	if (unpeekedIndex < 0) {
		// Explicit boolean instead of falling off the end with `undefined`.
		return false;
	}

	bufferPeeked[unpeekedIndex] = true;
	return true;
}
/**
 * Handles a peek that cannot be answered from the local buffer.
 *
 * - If upstream has not been peeked before, the peek is forwarded to
 *   `source.peek()` and the fact is recorded in `hasPeekedUpstream`.
 * - Otherwise, if there is no local buffer, a batch is read from upstream
 *   first and the peek is then answered from that fresh buffer.
 * - Otherwise (already peeked upstream AND a buffer exists) the state is
 *   reported through `unreachable`.
 *
 * NOTE(review): this function only ever sets `hasPeekedUpstream` to `true`.
 * If nothing resets it, a peek that arrives after the local buffer has been
 * fully peeked would land in the `unreachable` branch — confirm whether the
 * flag should be cleared once the peeked batch has been read.
 *
 * @param {object} source - Upstream stream; its `peek` and `read` members are used.
 * @returns {Promise<*>} The upstream peek result, the local peek result, or
 *   `undefined` if `unreachable` returns instead of throwing.
 */
async function peekUpstream(source) {
	if (!hasPeekedUpstream) {
		debug("Peeking upstream");
		hasPeekedUpstream = true;
		return source.peek();
	}

	if (buffer != null) {
		unreachable("Attempted to peek upstream while there is already a local buffer");
		return undefined;
	}

	debug("Not allowed to peek upstream, reading first");
	// FIXME: Capture error and store in buffer?
	await attemptRead(source.read);
	return peekLocal();
}
// Stream definition: answers peeks from the local buffer where it can, and
// hands out buffered items one at a time on read.
const bufferStream = {
	_promistreamVersion: 0,
	description: "buffer stream",
	abort: propagateAbort,

	/**
	 * Answers a peek request. With a local buffer that still has an unpeeked
	 * item, that item is claimed and `true` is returned synchronously. In
	 * every other case the request is handed to `peekUpstream`.
	 *
	 * @param {object} source - Upstream stream, passed through to `peekUpstream`.
	 * @returns {boolean|Promise<*>} `true`, or whatever `peekUpstream` returns.
	 */
	peek: function peekValue_bufferedSourceStream(source) {
		debug("Got peek request");

		const hasBuffer = (buffer != null);

		if (hasBuffer && peekLocal()) {
			debug("Answering peek request");
			return true;
		}

		debug(hasBuffer
			? "Buffer fully peeked, forwarding peek to upstream"
			: "No buffer, forwarding peek to upstream");

		return peekUpstream(source);
	},

	/**
	 * Produces the next buffered item. If no buffer exists, a batch is read
	 * from upstream first; read errors are logged and rethrown unchanged.
	 * The item's peek marker is removed together with the item, and the
	 * buffer is dropped once its last item has been handed out.
	 *
	 * @param {object} source - Upstream stream; its `read` member is used.
	 * @returns {Promise<*>} The first item of the buffer.
	 */
	read: async function produceValue_bufferedSourceStream(source) {
		debug("Got read request");

		if (buffer == null) {
			debug("Waiting for upstream read...");

			try {
				await attemptRead(source.read);
			} catch (error) {
				debug(`Encountered error in upstream read: ${error}`);
				throw error;
			}
		}

		if (buffer == null) {
			// A successful attemptRead should always have left a buffer behind.
			unreachable("No buffer was present");
			return undefined;
		}

		const value = buffer.shift();
		// Keep the peek markers aligned with the remaining items.
		bufferPeeked.shift();
		debug(`Picked item from buffer, ${buffer.length} items left`);

		if (buffer.length === 0) {
			debug(`Buffer empty, unsetting...`);
			buffer = null;
			bufferPeeked = null;
		}

		return value;
	}
};
return pipe([
bufferStream,
// TODO: Can this be made parallelism-safe without force-sequentializing?
sequentialize()
]);
};