From 27195f54ab5d601ba01fac817a3cd693ec96198e Mon Sep 17 00:00:00 2001 From: Sven Slootweg Date: Fri, 3 Jun 2016 02:44:14 +0200 Subject: [PATCH] Fixed bug in awaitDrained that made it never resolve, and add debug statements --- lib/index.js | 28 ++++++++++++++++++++++++++-- package.json | 1 + src/index.js | 26 +++++++++++++++++++++++++- 3 files changed, 52 insertions(+), 3 deletions(-) diff --git a/lib/index.js b/lib/index.js index ba55549..8d70f0f 100644 --- a/lib/index.js +++ b/lib/index.js @@ -4,6 +4,13 @@ var Promise = require("bluebird"); var events = require("events"); var extend = require("extend"); var createError = require("create-error"); +var debugLoop = require("debug")("promise-task-queue:loop"); +var debugTasks = require("debug")("promise-task-queue:tasks"); +var util = require("util"); + +function debugObject(obj) { + return util.inspect(obj, { depth: null, colors: true }).replace(/\n|\r/g, ""); +} var TaskQueueError = createError("TaskQueueError", { code: "TaskQueueError" @@ -33,14 +40,17 @@ module.exports = function createTaskQueue(options) { if (waitTime <= 0) { if (counters[type] < maxTasks) { if (running[type] === false) { + debugLoop("Queue for '" + type + "' is now running"); markQueueRunning(type); } runTask(type); } else { + debugLoop("Reached concurrency for '" + type + "'"); taskQueue.emit("concurrencyReached:" + type); } } else { + debugLoop("Registering queue delay for '" + type + "'"); taskQueue.emit("delayed:" + type); setTimeout(function () { @@ -49,6 +59,7 @@ module.exports = function createTaskQueue(options) { } } else { if (running[type] === true) { + debugLoop("Queue for '" + type + "' has now stopped"); markQueueDrained(type); } } @@ -80,32 +91,38 @@ module.exports = function createTaskQueue(options) { } function markStarted(type, task) { + debugTasks("markStarted: " + debugObject(task.data)); counters[type] += 1; starts[type] = Date.now(); taskQueue.emit("started:" + type, task.data); } function markFinished(type, task) { + debugTasks("markFinished: " + debugObject(task.data)); counters[type] -= 1; taskQueue.emit("finished:" + type, task.data); } function markSuccess(type, task) { + debugTasks("markSuccess: " + debugObject(task.data)); markFinished(type, task); taskQueue.emit("success:" + type, task.data); } function markFailed(type, task) { + debugTasks("markFailed: " + debugObject(task.data)); markFinished(type, task); taskQueue.emit("failed:" + type, task.data); } function markQueueRunning(type) { + debugLoop("markQueueRunning"); taskQueue.emit("queueRunning:" + type); running[type] = true; } function markQueueDrained(type) { + debugLoop("markQueueDrained"); taskQueue.emit("queueDrained:" + type); running[type] = false; } @@ -128,6 +145,8 @@ module.exports = function createTaskQueue(options) { throw new TaskQueueError("No such task type exists."); } + debugTasks("Queueing new task for '" + type + "': " + debugObject(data)); + var resolveFunc = void 0, rejectFunc = void 0; var deferredPromise = new Promise(function (resolve, reject) { @@ -154,11 +173,16 @@ module.exports = function createTaskQueue(options) { throw new TaskQueueError("No such task type exists."); } + debugLoop("awaitDrained requested for '" + type + "'"); + if (tasks[type].length === 0) { + debugLoop("Queue for '" + type + "' is already drained"); return; } else { + debugLoop("Returning awaitDrained Promise for '" + type + "'"); return new Promise(function (resolve, reject) { - _this.on("queueDrained:type", function () { + _this.on("queueDrained:" + type, function () { + debugLoop("Resolving awaitDrained Promise for '" + type + "'"); resolve(); }); }); @@ -168,4 +192,4 @@ module.exports = function createTaskQueue(options) { }); return taskQueue; -}; +}; \ No newline at end of file diff --git a/package.json b/package.json index 68e22e5..1888743 100644 --- a/package.json +++ b/package.json @@ -20,6 +20,7 @@ "dependencies": { "bluebird": "^3.3.3", "create-error": "^0.3.1", + "debug": "^2.2.0", "extend": "^3.0.0" }, "devDependencies": { diff --git a/src/index.js b/src/index.js index c9b6c41..a5d8fbc 100644 --- a/src/index.js +++ b/src/index.js @@ -4,6 +4,13 @@ const Promise = require("bluebird"); const events = require("events"); const extend = require("extend"); const createError = require("create-error"); +const debugLoop = require("debug")("promise-task-queue:loop"); +const debugTasks = require("debug")("promise-task-queue:tasks"); +const util = require("util"); + +function debugObject(obj) { + return util.inspect(obj, {depth: null, colors: true}).replace(/\n|\r/g, ""); +} const TaskQueueError = createError("TaskQueueError", { code: "TaskQueueError" @@ -33,14 +40,17 @@ module.exports = function createTaskQueue(options) { if (waitTime <= 0) { if (counters[type] < maxTasks) { if (running[type] === false) { + debugLoop(`Queue for '${type}' is now running`); markQueueRunning(type); } runTask(type); } else { + debugLoop(`Reached concurrency for '${type}'`); taskQueue.emit(`concurrencyReached:${type}`); } } else { + debugLoop(`Registering queue delay for '${type}'`); taskQueue.emit(`delayed:${type}`); setTimeout(() => { @@ -49,6 +59,7 @@ module.exports = function createTaskQueue(options) { } } else { if (running[type] === true) { + debugLoop(`Queue for '${type}' has now stopped`); markQueueDrained(type); } } @@ -80,32 +91,38 @@ module.exports = function createTaskQueue(options) { } function markStarted(type, task) { + debugTasks(`markStarted: ${debugObject(task.data)}`); counters[type] += 1; starts[type] = Date.now(); taskQueue.emit(`started:${type}`, task.data); } function markFinished(type, task) { + debugTasks(`markFinished: ${debugObject(task.data)}`); counters[type] -= 1; taskQueue.emit(`finished:${type}`, task.data); } function markSuccess(type, task) { + debugTasks(`markSuccess: ${debugObject(task.data)}`); markFinished(type, task); taskQueue.emit(`success:${type}`, task.data); } function markFailed(type, task) { + debugTasks(`markFailed: ${debugObject(task.data)}`); markFinished(type, task); taskQueue.emit(`failed:${type}`, task.data); } function markQueueRunning(type) { + debugLoop("markQueueRunning"); taskQueue.emit(`queueRunning:${type}`); running[type] = true; } function markQueueDrained(type) { + debugLoop("markQueueDrained"); taskQueue.emit(`queueDrained:${type}`); running[type] = false; } @@ -128,6 +145,8 @@ module.exports = function createTaskQueue(options) { throw new TaskQueueError("No such task type exists.") } + debugTasks(`Queueing new task for '${type}': ${debugObject(data)}`); + let resolveFunc, rejectFunc; let deferredPromise = new Promise((resolve, reject) => { resolveFunc = resolve; @@ -151,11 +170,16 @@ module.exports = function createTaskQueue(options) { throw new TaskQueueError("No such task type exists.") } + debugLoop(`awaitDrained requested for '${type}'`); + if (tasks[type].length === 0) { + debugLoop(`Queue for '${type}' is already drained`); return; } else { + debugLoop(`Returning awaitDrained Promise for '${type}'`); return new Promise((resolve, reject) => { - this.on(`queueDrained:type`, () => { + this.on(`queueDrained:${type}`, () => { + debugLoop(`Resolving awaitDrained Promise for '${type}'`); resolve(); }) })