You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

66 lines
2.1 KiB
JavaScript

"use strict";
const Promise = require("bluebird");
const range = require("range").range;
const promisePullQueue = require("promise-pull-queue").default;
// FIXME: Move to separate package
module.exports = function createIndexedQueue(queueCount, getIndexForValue) {
let running = false;
let inQueue = [];
let outQueues = range(0, queueCount).map(() => new promisePullQueue());
// TODO: Ideally find a better way of handling this; especially the fallback when there are no queued pulls is pretty weird...
function pushToFirstQueue(promise) {
for (let queue of outQueues) {
if (queue.pullQueueSize > 0) {
queue.push(promise);
return;
}
}
// Nobody is pulling, push to the first queue I guess?
outQueues[0].push(promise);
}
async function startProcessing() {
if (!running) {
running = true;
return tryProcessItem();
}
}
async function tryProcessItem() {
return Promise.try(() => {
return inQueue.shift();
}).then(async (value) => {
let index = await getIndexForValue(value);
let outQueue = outQueues[index];
outQueue.push(Promise.resolve(value));
}).catch((error) => {
// If we don't have a value, then we can't associate the result with a specific forked stream, so we just grab the first one that happens to have a pending read
pushToFirstQueue(Promise.reject(error));
}).finally(() => {
if (inQueue.length > 0) {
return tryProcessItem();
} else {
running = false;
}
});
}
return function withQueue(index, getValue) {
let outQueue = outQueues[index];
let promise = outQueue.pull();
// We always queue a new read, even if we've returned a value from the buffer; maintaining a 1:1 correspondence between requested reads and fulfilled reads (even if they occur in different stream forks) ensures that the process cannot deadlock within the stream implementation. (Deadlocking due to application logic is still possible!)
// TODO: Eventually add a "keep reading until a result is yielded" mode to prevent application-level deadlocks, at the cost of larger internal buffers
inQueue.push(getValue());
startProcessing();
return promise;
};
};