*Use once, not on

master
Derrick Hammer 8 years ago
parent 9ddcb57106
commit f0d9b9d0ed

@ -31,11 +31,11 @@ module.exports = function createTaskQueue(options) {
let counters = {}; let counters = {};
let starts = {}; let starts = {};
let running = {}; let running = {};
function tryRunTask(type) { function tryRunTask(type) {
let maxTasks = defaultValue(taskOptions[type].concurrency, Infinity); let maxTasks = defaultValue(taskOptions[type].concurrency, Infinity);
let waitTime = remainingInterval(type); let waitTime = remainingInterval(type);
if (tasks[type].length > 0) { if (tasks[type].length > 0) {
if (waitTime <= 0) { if (waitTime <= 0) {
if (counters[type] < maxTasks) { if (counters[type] < maxTasks) {
@ -43,7 +43,7 @@ module.exports = function createTaskQueue(options) {
debugLoop(`Queue for '${type}' is now running`); debugLoop(`Queue for '${type}' is now running`);
markQueueRunning(type); markQueueRunning(type);
} }
runTask(type); runTask(type);
} else { } else {
debugLoop(`Reached concurrency for '${type}'`); debugLoop(`Reached concurrency for '${type}'`);
@ -52,7 +52,7 @@ module.exports = function createTaskQueue(options) {
} else { } else {
debugLoop(`Registering queue delay for '${type}'`); debugLoop(`Registering queue delay for '${type}'`);
taskQueue.emit(`delayed:${type}`); taskQueue.emit(`delayed:${type}`);
setTimeout(() => { setTimeout(() => {
tryRunTask(type); tryRunTask(type);
}, waitTime); }, waitTime);
@ -64,19 +64,19 @@ module.exports = function createTaskQueue(options) {
} }
} }
} }
function remainingInterval(type) { function remainingInterval(type) {
let taskInterval = defaultValue(taskOptions[type].interval, 0) * 1000; let taskInterval = defaultValue(taskOptions[type].interval, 0) * 1000;
let lastTask = defaultValue(starts[type], 0); let lastTask = defaultValue(starts[type], 0);
return (lastTask + taskInterval) - Date.now(); return (lastTask + taskInterval) - Date.now();
} }
function runTask(type) { function runTask(type) {
let task = tasks[type].shift(); let task = tasks[type].shift();
markStarted(type, task); markStarted(type, task);
Promise.try(() => { Promise.try(() => {
return handlers[type](task.data); return handlers[type](task.data);
}).then((result) => { }).then((result) => {
@ -89,45 +89,45 @@ module.exports = function createTaskQueue(options) {
tryRunTask(type); tryRunTask(type);
}); });
} }
function markStarted(type, task) { function markStarted(type, task) {
debugTasks(`markStarted (${type}): ${debugObject(task.data)}`); debugTasks(`markStarted (${type}): ${debugObject(task.data)}`);
counters[type] += 1; counters[type] += 1;
starts[type] = Date.now(); starts[type] = Date.now();
taskQueue.emit(`started:${type}`, task.data); taskQueue.emit(`started:${type}`, task.data);
} }
function markFinished(type, task) { function markFinished(type, task) {
debugTasks(`markFinished (${type}): ${debugObject(task.data)}`); debugTasks(`markFinished (${type}): ${debugObject(task.data)}`);
counters[type] -= 1; counters[type] -= 1;
taskQueue.emit(`finished:${type}`, task.data); taskQueue.emit(`finished:${type}`, task.data);
checkCompletion(type); checkCompletion(type);
} }
function markSuccess(type, task) { function markSuccess(type, task) {
debugTasks(`markSuccess (${type}): ${debugObject(task.data)}`); debugTasks(`markSuccess (${type}): ${debugObject(task.data)}`);
markFinished(type, task); markFinished(type, task);
taskQueue.emit(`success:${type}`, task.data); taskQueue.emit(`success:${type}`, task.data);
} }
function markFailed(type, task) { function markFailed(type, task) {
debugTasks(`markFailed (${type}): ${debugObject(task.data)}`); debugTasks(`markFailed (${type}): ${debugObject(task.data)}`);
markFinished(type, task); markFinished(type, task);
taskQueue.emit(`failed:${type}`, task.data); taskQueue.emit(`failed:${type}`, task.data);
} }
function markQueueRunning(type) { function markQueueRunning(type) {
debugLoop(`markQueueRunning (${type})`); debugLoop(`markQueueRunning (${type})`);
taskQueue.emit(`queueRunning:${type}`); taskQueue.emit(`queueRunning:${type}`);
running[type] = true; running[type] = true;
} }
function markQueueDrained(type) { function markQueueDrained(type) {
debugLoop(`markQueueDrained (${type})`); debugLoop(`markQueueDrained (${type})`);
taskQueue.emit(`queueDrained:${type}`); taskQueue.emit(`queueDrained:${type}`);
running[type] = false; running[type] = false;
} }
function markQueueCompleted(type) { function markQueueCompleted(type) {
debugLoop(`markQueueCompleted (${type})`); debugLoop(`markQueueCompleted (${type})`);
taskQueue.emit(`queueCompleted:${type}`); taskQueue.emit(`queueCompleted:${type}`);
@ -156,7 +156,7 @@ module.exports = function createTaskQueue(options) {
if (handlers[type] == null) { if (handlers[type] == null) {
throw new TaskQueueError("No such task type exists.") throw new TaskQueueError("No such task type exists.")
} }
debugTasks(`Queueing new task for '${type}': ${debugObject(data)}`); debugTasks(`Queueing new task for '${type}': ${debugObject(data)}`);
let resolveFunc, rejectFunc; let resolveFunc, rejectFunc;
@ -170,9 +170,9 @@ module.exports = function createTaskQueue(options) {
resolve: resolveFunc, resolve: resolveFunc,
reject: rejectFunc reject: rejectFunc
}); });
tryRunTask(type); tryRunTask(type);
return deferredPromise; return deferredPromise;
}) })
}, },
@ -198,7 +198,7 @@ module.exports = function createTaskQueue(options) {
} else { } else {
debugLoop(`Returning awaitDrained Promise for '${type}'`); debugLoop(`Returning awaitDrained Promise for '${type}'`);
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
this.on(`queueDrained:${type}`, () => { this.once(`queueDrained:${type}`, () => {
debugLoop(`Resolving awaitDrained Promise for '${type}'`); debugLoop(`Resolving awaitDrained Promise for '${type}'`);
resolve(); resolve();
}) })
@ -220,7 +220,7 @@ module.exports = function createTaskQueue(options) {
} else { } else {
debugLoop(`Returning awaitCompleted Promise for '${type}'`); debugLoop(`Returning awaitCompleted Promise for '${type}'`);
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
this.on(`queueCompleted:${type}`, () => { this.once(`queueCompleted:${type}`, () => {
debugLoop(`Resolving awaitCompleted Promise for '${type}'`); debugLoop(`Resolving awaitCompleted Promise for '${type}'`);
resolve(); resolve();
}) })
@ -229,6 +229,6 @@ module.exports = function createTaskQueue(options) {
}) })
} }
}); });
return taskQueue; return taskQueue;
} }

Loading…
Cancel
Save