From f0d9b9d0ed0980363c89ad61af03878c581784fb Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Tue, 28 Jun 2016 17:42:04 -0400 Subject: [PATCH] *Use once, not on --- src/index.js | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/index.js b/src/index.js index caaab59..0b3cf27 100644 --- a/src/index.js +++ b/src/index.js @@ -31,11 +31,11 @@ module.exports = function createTaskQueue(options) { let counters = {}; let starts = {}; let running = {}; - + function tryRunTask(type) { let maxTasks = defaultValue(taskOptions[type].concurrency, Infinity); let waitTime = remainingInterval(type); - + if (tasks[type].length > 0) { if (waitTime <= 0) { if (counters[type] < maxTasks) { @@ -43,7 +43,7 @@ module.exports = function createTaskQueue(options) { debugLoop(`Queue for '${type}' is now running`); markQueueRunning(type); } - + runTask(type); } else { debugLoop(`Reached concurrency for '${type}'`); @@ -52,7 +52,7 @@ module.exports = function createTaskQueue(options) { } else { debugLoop(`Registering queue delay for '${type}'`); taskQueue.emit(`delayed:${type}`); - + setTimeout(() => { tryRunTask(type); }, waitTime); @@ -64,19 +64,19 @@ module.exports = function createTaskQueue(options) { } } } - + function remainingInterval(type) { let taskInterval = defaultValue(taskOptions[type].interval, 0) * 1000; let lastTask = defaultValue(starts[type], 0); - + return (lastTask + taskInterval) - Date.now(); } - + function runTask(type) { let task = tasks[type].shift(); - + markStarted(type, task); - + Promise.try(() => { return handlers[type](task.data); }).then((result) => { @@ -89,45 +89,45 @@ module.exports = function createTaskQueue(options) { tryRunTask(type); }); } - + function markStarted(type, task) { debugTasks(`markStarted (${type}): ${debugObject(task.data)}`); counters[type] += 1; starts[type] = Date.now(); taskQueue.emit(`started:${type}`, task.data); } - + function markFinished(type, task) { debugTasks(`markFinished (${type}): ${debugObject(task.data)}`); counters[type] -= 1; taskQueue.emit(`finished:${type}`, task.data); checkCompletion(type); } - + function markSuccess(type, task) { debugTasks(`markSuccess (${type}): ${debugObject(task.data)}`); markFinished(type, task); taskQueue.emit(`success:${type}`, task.data); } - + function markFailed(type, task) { debugTasks(`markFailed (${type}): ${debugObject(task.data)}`); markFinished(type, task); taskQueue.emit(`failed:${type}`, task.data); } - + function markQueueRunning(type) { debugLoop(`markQueueRunning (${type})`); taskQueue.emit(`queueRunning:${type}`); running[type] = true; } - + function markQueueDrained(type) { debugLoop(`markQueueDrained (${type})`); taskQueue.emit(`queueDrained:${type}`); running[type] = false; } - + function markQueueCompleted(type) { debugLoop(`markQueueCompleted (${type})`); taskQueue.emit(`queueCompleted:${type}`); @@ -156,7 +156,7 @@ module.exports = function createTaskQueue(options) { if (handlers[type] == null) { throw new TaskQueueError("No such task type exists.") } - + debugTasks(`Queueing new task for '${type}': ${debugObject(data)}`); let resolveFunc, rejectFunc; @@ -170,9 +170,9 @@ module.exports = function createTaskQueue(options) { resolve: resolveFunc, reject: rejectFunc }); - + tryRunTask(type); - + return deferredPromise; }) }, @@ -198,7 +198,7 @@ module.exports = function createTaskQueue(options) { } else { debugLoop(`Returning awaitDrained Promise for '${type}'`); return new Promise((resolve, reject) => { - this.on(`queueDrained:${type}`, () => { + this.once(`queueDrained:${type}`, () => { debugLoop(`Resolving awaitDrained Promise for '${type}'`); resolve(); }) @@ -220,7 +220,7 @@ module.exports = function createTaskQueue(options) { } else { debugLoop(`Returning awaitCompleted Promise for '${type}'`); return new Promise((resolve, reject) => { - this.on(`queueCompleted:${type}`, () => { + this.once(`queueCompleted:${type}`, () => { debugLoop(`Resolving awaitCompleted Promise for '${type}'`); resolve(); }) @@ -229,6 +229,6 @@ module.exports = function createTaskQueue(options) { }) } }); - + return taskQueue; }