Fixed bug in awaitDrained that made it never resolve, and add debug statements

master
Sven Slootweg 8 years ago
parent 47cdc9ff64
commit 27195f54ab

@ -4,6 +4,13 @@ var Promise = require("bluebird");
var events = require("events"); var events = require("events");
var extend = require("extend"); var extend = require("extend");
var createError = require("create-error"); var createError = require("create-error");
var debugLoop = require("debug")("promise-task-queue:loop");
var debugTasks = require("debug")("promise-task-queue:tasks");
var util = require("util");
function debugObject(obj) {
return util.inspect(obj, { depth: null, colors: true }).replace(/\n|\r/g, "");
}
var TaskQueueError = createError("TaskQueueError", { var TaskQueueError = createError("TaskQueueError", {
code: "TaskQueueError" code: "TaskQueueError"
@ -33,14 +40,17 @@ module.exports = function createTaskQueue(options) {
if (waitTime <= 0) { if (waitTime <= 0) {
if (counters[type] < maxTasks) { if (counters[type] < maxTasks) {
if (running[type] === false) { if (running[type] === false) {
debugLoop("Queue for '" + type + "' is now running");
markQueueRunning(type); markQueueRunning(type);
} }
runTask(type); runTask(type);
} else { } else {
debugLoop("Reached concurrency for '" + type + "'");
taskQueue.emit("concurrencyReached:" + type); taskQueue.emit("concurrencyReached:" + type);
} }
} else { } else {
debugLoop("Registering queue delay for '" + type + "'");
taskQueue.emit("delayed:" + type); taskQueue.emit("delayed:" + type);
setTimeout(function () { setTimeout(function () {
@ -49,6 +59,7 @@ module.exports = function createTaskQueue(options) {
} }
} else { } else {
if (running[type] === true) { if (running[type] === true) {
debugLoop("Queue for '" + type + "' has now stopped");
markQueueDrained(type); markQueueDrained(type);
} }
} }
@ -80,32 +91,38 @@ module.exports = function createTaskQueue(options) {
} }
function markStarted(type, task) { function markStarted(type, task) {
debugTasks("markStarted: " + debugObject(task.data));
counters[type] += 1; counters[type] += 1;
starts[type] = Date.now(); starts[type] = Date.now();
taskQueue.emit("started:" + type, task.data); taskQueue.emit("started:" + type, task.data);
} }
function markFinished(type, task) { function markFinished(type, task) {
debugTasks("markFinished: " + debugObject(task.data));
counters[type] -= 1; counters[type] -= 1;
taskQueue.emit("finished:" + type, task.data); taskQueue.emit("finished:" + type, task.data);
} }
function markSuccess(type, task) { function markSuccess(type, task) {
debugTasks("markSuccess: " + debugObject(task.data));
markFinished(type, task); markFinished(type, task);
taskQueue.emit("success:" + type, task.data); taskQueue.emit("success:" + type, task.data);
} }
function markFailed(type, task) { function markFailed(type, task) {
debugTasks("markFailed: " + debugObject(task.data));
markFinished(type, task); markFinished(type, task);
taskQueue.emit("failed:" + type, task.data); taskQueue.emit("failed:" + type, task.data);
} }
function markQueueRunning(type) { function markQueueRunning(type) {
debugLoop("markQueueRunning");
taskQueue.emit("queueRunning:" + type); taskQueue.emit("queueRunning:" + type);
running[type] = true; running[type] = true;
} }
function markQueueDrained(type) { function markQueueDrained(type) {
debugLoop("markQueueDrained");
taskQueue.emit("queueDrained:" + type); taskQueue.emit("queueDrained:" + type);
running[type] = false; running[type] = false;
} }
@ -128,6 +145,8 @@ module.exports = function createTaskQueue(options) {
throw new TaskQueueError("No such task type exists."); throw new TaskQueueError("No such task type exists.");
} }
debugTasks("Queueing new task for '" + type + "': " + debugObject(data));
var resolveFunc = void 0, var resolveFunc = void 0,
rejectFunc = void 0; rejectFunc = void 0;
var deferredPromise = new Promise(function (resolve, reject) { var deferredPromise = new Promise(function (resolve, reject) {
@ -154,11 +173,16 @@ module.exports = function createTaskQueue(options) {
throw new TaskQueueError("No such task type exists."); throw new TaskQueueError("No such task type exists.");
} }
debugLoop("awaitDrained requested for '" + type + "'");
if (tasks[type].length === 0) { if (tasks[type].length === 0) {
debugLoop("Queue for '" + type + "' is already drained");
return; return;
} else { } else {
debugLoop("Returning awaitDrained Promise for '" + type + "'");
return new Promise(function (resolve, reject) { return new Promise(function (resolve, reject) {
_this.on("queueDrained:type", function () { _this.on("queueDrained:" + type, function () {
debugLoop("Resolving awaitDrained Promise for '" + type + "'");
resolve(); resolve();
}); });
}); });
@ -168,4 +192,4 @@ module.exports = function createTaskQueue(options) {
}); });
return taskQueue; return taskQueue;
}; };

@ -20,6 +20,7 @@
"dependencies": { "dependencies": {
"bluebird": "^3.3.3", "bluebird": "^3.3.3",
"create-error": "^0.3.1", "create-error": "^0.3.1",
"debug": "^2.2.0",
"extend": "^3.0.0" "extend": "^3.0.0"
}, },
"devDependencies": { "devDependencies": {

@ -4,6 +4,13 @@ const Promise = require("bluebird");
const events = require("events"); const events = require("events");
const extend = require("extend"); const extend = require("extend");
const createError = require("create-error"); const createError = require("create-error");
const debugLoop = require("debug")("promise-task-queue:loop");
const debugTasks = require("debug")("promise-task-queue:tasks");
const util = require("util");
function debugObject(obj) {
return util.inspect(obj, {depth: null, colors: true}).replace(/\n|\r/g, "");
}
const TaskQueueError = createError("TaskQueueError", { const TaskQueueError = createError("TaskQueueError", {
code: "TaskQueueError" code: "TaskQueueError"
@ -33,14 +40,17 @@ module.exports = function createTaskQueue(options) {
if (waitTime <= 0) { if (waitTime <= 0) {
if (counters[type] < maxTasks) { if (counters[type] < maxTasks) {
if (running[type] === false) { if (running[type] === false) {
debugLoop(`Queue for '${type}' is now running`);
markQueueRunning(type); markQueueRunning(type);
} }
runTask(type); runTask(type);
} else { } else {
debugLoop(`Reached concurrency for '${type}'`);
taskQueue.emit(`concurrencyReached:${type}`); taskQueue.emit(`concurrencyReached:${type}`);
} }
} else { } else {
debugLoop(`Registering queue delay for '${type}'`);
taskQueue.emit(`delayed:${type}`); taskQueue.emit(`delayed:${type}`);
setTimeout(() => { setTimeout(() => {
@ -49,6 +59,7 @@ module.exports = function createTaskQueue(options) {
} }
} else { } else {
if (running[type] === true) { if (running[type] === true) {
debugLoop(`Queue for '${type}' has now stopped`);
markQueueDrained(type); markQueueDrained(type);
} }
} }
@ -80,32 +91,38 @@ module.exports = function createTaskQueue(options) {
} }
function markStarted(type, task) { function markStarted(type, task) {
debugTasks(`markStarted: ${debugObject(task.data)}`);
counters[type] += 1; counters[type] += 1;
starts[type] = Date.now(); starts[type] = Date.now();
taskQueue.emit(`started:${type}`, task.data); taskQueue.emit(`started:${type}`, task.data);
} }
function markFinished(type, task) { function markFinished(type, task) {
debugTasks(`markFinished: ${debugObject(task.data)}`);
counters[type] -= 1; counters[type] -= 1;
taskQueue.emit(`finished:${type}`, task.data); taskQueue.emit(`finished:${type}`, task.data);
} }
function markSuccess(type, task) { function markSuccess(type, task) {
debugTasks(`markSuccess: ${debugObject(task.data)}`);
markFinished(type, task); markFinished(type, task);
taskQueue.emit(`success:${type}`, task.data); taskQueue.emit(`success:${type}`, task.data);
} }
function markFailed(type, task) { function markFailed(type, task) {
debugTasks(`markFailed: ${debugObject(task.data)}`);
markFinished(type, task); markFinished(type, task);
taskQueue.emit(`failed:${type}`, task.data); taskQueue.emit(`failed:${type}`, task.data);
} }
function markQueueRunning(type) { function markQueueRunning(type) {
debugLoop("markQueueRunning");
taskQueue.emit(`queueRunning:${type}`); taskQueue.emit(`queueRunning:${type}`);
running[type] = true; running[type] = true;
} }
function markQueueDrained(type) { function markQueueDrained(type) {
debugLoop("markQueueDrained");
taskQueue.emit(`queueDrained:${type}`); taskQueue.emit(`queueDrained:${type}`);
running[type] = false; running[type] = false;
} }
@ -128,6 +145,8 @@ module.exports = function createTaskQueue(options) {
throw new TaskQueueError("No such task type exists.") throw new TaskQueueError("No such task type exists.")
} }
debugTasks(`Queueing new task for '${type}': ${debugObject(data)}`);
let resolveFunc, rejectFunc; let resolveFunc, rejectFunc;
let deferredPromise = new Promise((resolve, reject) => { let deferredPromise = new Promise((resolve, reject) => {
resolveFunc = resolve; resolveFunc = resolve;
@ -151,11 +170,16 @@ module.exports = function createTaskQueue(options) {
throw new TaskQueueError("No such task type exists.") throw new TaskQueueError("No such task type exists.")
} }
debugLoop(`awaitDrained requested for '${type}'`);
if (tasks[type].length === 0) { if (tasks[type].length === 0) {
debugLoop(`Queue for '${type}' is already drained`);
return; return;
} else { } else {
debugLoop(`Returning awaitDrained Promise for '${type}'`);
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
this.on(`queueDrained:type`, () => { this.on(`queueDrained:${type}`, () => {
debugLoop(`Resolving awaitDrained Promise for '${type}'`);
resolve(); resolve();
}) })
}) })

Loading…
Cancel
Save