You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

74 lines
1.7 KiB
JavaScript

"use strict";
const Promise = require("bluebird");
const propagateAbort = require("@promistream/propagate-abort");
const unreachable = require("@joepie91/unreachable")("@promistream/buffer");
const util = require("util");
module.exports = function bufferStream() {
let buffer;
let bufferPeeked;
function attemptRead(readFunc) {
return Promise.try(() => {
return readFunc();
}).then((result) => {
if (Array.isArray(result)) {
if (result.length > 0) {
buffer = result;
bufferPeeked = result.map(() => false);
} else {
return attemptRead(readFunc);
}
} else {
// FIXME: Make this its own module, and improve its compactness?
let stringified = util.inspect(result, { depth: 0 })
.replace(/\s+/g, " ");
throw new Error(`Buffered stream produced a non-array value: ${stringified}`);
}
});
}
return {
_promistreamVersion: 0,
description: "buffer stream",
abort: propagateAbort,
peek: function peekValue_bufferedSourceStream(source) {
if (buffer == null) {
return source.peek();
} else {
let unpeekedIndex = bufferPeeked.indexOf(false);
if (unpeekedIndex > 0) {
bufferPeeked[unpeekedIndex] = true;
return true;
} else {
return source.peek();
}
}
},
read: function produceValue_bufferedSourceStream(source) {
return Promise.try(() => {
if (buffer == null) {
return attemptRead(source.read);
}
}).then(() => {
if (buffer != null) {
let value = buffer.shift();
bufferPeeked.shift();
if (buffer.length === 0) {
buffer = null;
bufferPeeked = null;
}
return value;
} else {
unreachable("No buffer was present");
}
});
}
};
};