Compare commits

...

6 Commits

@ -118,15 +118,33 @@ Adds a task to the queue for the given `type`.
Note that this function will __return a Promise__, passing through the result from the task handler. If the Promise from the task handler resolves, then the one returned from this function will resolve with the same value. If the Promise from the task handler is rejected, this one will also reject, with the same error.
### queue.drain(type)
Drains (ie. empties) the queue for the given `type`. Note that this __will not__ try to stop or 'cancel' *running* tasks; it will simply remove the *upcoming* tasks that are still in the queue.
* __type__: The name of the task type, as specified in `queue.define`.
### queue.awaitDrained(type)
Returns a Promise that will resolve when the task queue has run out of tasks for a given `type`.
Returns a Promise that will resolve when the task queue has run out of tasks for a given `type`. Some of the previously queued tasks may still be running, however - this simply signals that there are no *upcoming* tasks scheduled anymore.
* __type__: The name of the task type, as specified in `queue.define`.
This can be useful for keeping metrics of the queue status.
__Caution:__ The returned Promise will only resolve __exactly once__, as soon as the amount of pending tasks reaches 0 and the queue tries to run the next task - and since the queue cannot distinguish between the origin of tasks, this function will only be useful in cases without concurrency. It will also not work correctly if you add tasks asynchronously and don't handle your asynchronous sequences very carefully.
In short; only use this method if you're very certain that you fully understand - and can predict - the execution order of your (asynchronous) code.
### queue.awaitCompleted(type)
Returns a Promise that will resolve when the task queue has run out of tasks for a given `type`, and all the running tasks have finished.
* __type__: The name of the task type, as specified in `queue.define`.
This is useful for, for example, complex multi-operation build processes, where you want to wait for all existing tasks to finish before moving on.
__Caution:__ The returned Promise will only resolve __exactly once__, as soon as the amount of pending tasks reaches 0 - and since the queue cannot distinguish between the origin of tasks, this function will only be useful in cases without concurrency. It will also not work correctly if you add tasks asynchronously and don't handle your asynchronous sequences very carefully.
__Caution:__ The returned Promise will only resolve __exactly once__, as soon as the last running task finishes and there are no tasks left in the queue - and since the queue cannot distinguish between the origin of tasks, this function will only be useful in cases without concurrency. It will also not work correctly if you add tasks asynchronously and don't handle your asynchronous sequences very carefully.
In short; only use this method if you're very certain that you fully understand - and can predict - the execution order of your (asynchronous) code.
@ -158,7 +176,11 @@ Emitted when the queue for this task type starts running, while it was previousl
#### 'queueDrained:$type'
Emitted when the queue for this task type has drained (ie. ran out of tasks). No arguments are passed to the event handler.
Emitted when the queue for this task type has drained (ie. ran out of queued tasks). Some of the tasks may still be running, however. No arguments are passed to the event handler.
#### 'queueCompleted:$type'
Emitted when the queue for this task type has fully completed (ie. the queue has drained, and all running tasks have finished). No arguments are passed to the event handler.
#### 'delayed:$type'
@ -166,4 +188,14 @@ Emitted when a task has been delayed because of the `interval` rate-limit. Note
#### 'concurrencyReached:$type'
Emitted when a task has been queued up because of the `concurrency` limit. Can be useful to detect when your queue is backing up.
Emitted when a task has been queued up because of the `concurrency` limit. Can be useful to detect when your queue is backing up.
## Changelog
* __1.2.0:__ Various changes:
* Added `awaitCompleted` and `drain` methods, and `queueCompleted` event.
* Fixed the `awaitDrained` Promise never resolving.
* Added debugging statements in the code.
* __1.1.1:__ Fixed typo in the example; unit in the API rate limit should've been 'per minute', not 'per second'.
* __1.1.0:__ Added `awaitDrained` method.
* __1.0.0:__ Initial release.

@ -4,6 +4,13 @@ var Promise = require("bluebird");
var events = require("events");
var extend = require("extend");
var createError = require("create-error");
var debugLoop = require("debug")("promise-task-queue:loop");
var debugTasks = require("debug")("promise-task-queue:tasks");
var util = require("util");
function debugObject(obj) {
return util.inspect(obj, { depth: null, colors: true }).replace(/\n|\r/g, "");
}
var TaskQueueError = createError("TaskQueueError", {
code: "TaskQueueError"
@ -33,14 +40,17 @@ module.exports = function createTaskQueue(options) {
if (waitTime <= 0) {
if (counters[type] < maxTasks) {
if (running[type] === false) {
debugLoop("Queue for '" + type + "' is now running");
markQueueRunning(type);
}
runTask(type);
} else {
debugLoop("Reached concurrency for '" + type + "'");
taskQueue.emit("concurrencyReached:" + type);
}
} else {
debugLoop("Registering queue delay for '" + type + "'");
taskQueue.emit("delayed:" + type);
setTimeout(function () {
@ -49,6 +59,7 @@ module.exports = function createTaskQueue(options) {
}
} else {
if (running[type] === true) {
debugLoop("Queue for '" + type + "' has now stopped");
markQueueDrained(type);
}
}
@ -69,8 +80,8 @@ module.exports = function createTaskQueue(options) {
Promise.try(function () {
return handlers[type](task.data);
}).then(function (result) {
markSuccess(type, task);
task.resolve(result);
markSuccess(type, task);
}).catch(function (err) {
markFailed(type, task);
task.reject(err);
@ -80,36 +91,54 @@ module.exports = function createTaskQueue(options) {
}
function markStarted(type, task) {
debugTasks("markStarted (" + type + "): " + debugObject(task.data));
counters[type] += 1;
starts[type] = Date.now();
taskQueue.emit("started:" + type, task.data);
}
function markFinished(type, task) {
debugTasks("markFinished (" + type + "): " + debugObject(task.data));
counters[type] -= 1;
taskQueue.emit("finished:" + type, task.data);
checkCompletion(type);
}
function markSuccess(type, task) {
debugTasks("markSuccess (" + type + "): " + debugObject(task.data));
markFinished(type, task);
taskQueue.emit("success:" + type, task.data);
}
function markFailed(type, task) {
debugTasks("markFailed (" + type + "): " + debugObject(task.data));
markFinished(type, task);
taskQueue.emit("failed:" + type, task.data);
}
function markQueueRunning(type) {
debugLoop("markQueueRunning (" + type + ")");
taskQueue.emit("queueRunning:" + type);
running[type] = true;
}
function markQueueDrained(type) {
debugLoop("markQueueDrained (" + type + ")");
taskQueue.emit("queueDrained:" + type);
running[type] = false;
}
function markQueueCompleted(type) {
debugLoop("markQueueCompleted (" + type + ")");
taskQueue.emit("queueCompleted:" + type);
}
function checkCompletion(type) {
if (tasks[type].length === 0 && counters[type] === 0) {
markQueueCompleted(type);
}
}
var taskQueue = extend(new events.EventEmitter(), {
define: function define(type, handler, options) {
if (handlers[type] != null) {
@ -128,6 +157,8 @@ module.exports = function createTaskQueue(options) {
throw new TaskQueueError("No such task type exists.");
}
debugTasks("Queueing new task for '" + type + "': " + debugObject(data));
var resolveFunc = void 0,
rejectFunc = void 0;
var deferredPromise = new Promise(function (resolve, reject) {
@ -146,6 +177,14 @@ module.exports = function createTaskQueue(options) {
return deferredPromise;
});
},
drain: function drain(type) {
if (handlers[type] == null) {
throw new TaskQueueError("No such task type exists.");
}
debugTasks("Draining tasks for '" + type + "'");
tasks[type] = [];
},
awaitDrained: function awaitDrained(type) {
var _this = this;
@ -154,11 +193,40 @@ module.exports = function createTaskQueue(options) {
throw new TaskQueueError("No such task type exists.");
}
debugLoop("awaitDrained requested for '" + type + "'");
if (tasks[type].length === 0) {
debugLoop("Queue for '" + type + "' is already drained");
return;
} else {
debugLoop("Returning awaitDrained Promise for '" + type + "'");
return new Promise(function (resolve, reject) {
_this.on("queueDrained:" + type, function () {
debugLoop("Resolving awaitDrained Promise for '" + type + "'");
resolve();
});
});
}
});
},
awaitCompleted: function awaitCompleted(type) {
var _this2 = this;
return Promise.try(function () {
if (handlers[type] == null) {
throw new TaskQueueError("No such task type exists.");
}
debugLoop("awaitCompleted requested for '" + type + "'");
if (tasks[type].length === 0 && counters[type] === 0) {
debugLoop("Queue for '" + type + "' is already completed");
return;
} else {
debugLoop("Returning awaitCompleted Promise for '" + type + "'");
return new Promise(function (resolve, reject) {
_this.on("queueDrained:type", function () {
_this2.on("queueCompleted:" + type, function () {
debugLoop("Resolving awaitCompleted Promise for '" + type + "'");
resolve();
});
});
@ -168,4 +236,4 @@ module.exports = function createTaskQueue(options) {
});
return taskQueue;
};
};

@ -1,6 +1,6 @@
{
"name": "promise-task-queue",
"version": "1.1.1",
"version": "1.2.0",
"description": "A configurable task queue that supports Promises.",
"main": "index.js",
"scripts": {
@ -20,6 +20,7 @@
"dependencies": {
"bluebird": "^3.3.3",
"create-error": "^0.3.1",
"debug": "^2.2.0",
"extend": "^3.0.0"
},
"devDependencies": {

@ -4,6 +4,13 @@ const Promise = require("bluebird");
const events = require("events");
const extend = require("extend");
const createError = require("create-error");
const debugLoop = require("debug")("promise-task-queue:loop");
const debugTasks = require("debug")("promise-task-queue:tasks");
const util = require("util");
function debugObject(obj) {
return util.inspect(obj, {depth: null, colors: true}).replace(/\n|\r/g, "");
}
const TaskQueueError = createError("TaskQueueError", {
code: "TaskQueueError"
@ -33,14 +40,17 @@ module.exports = function createTaskQueue(options) {
if (waitTime <= 0) {
if (counters[type] < maxTasks) {
if (running[type] === false) {
debugLoop(`Queue for '${type}' is now running`);
markQueueRunning(type);
}
runTask(type);
} else {
debugLoop(`Reached concurrency for '${type}'`);
taskQueue.emit(`concurrencyReached:${type}`);
}
} else {
debugLoop(`Registering queue delay for '${type}'`);
taskQueue.emit(`delayed:${type}`);
setTimeout(() => {
@ -49,6 +59,7 @@ module.exports = function createTaskQueue(options) {
}
} else {
if (running[type] === true) {
debugLoop(`Queue for '${type}' has now stopped`);
markQueueDrained(type);
}
}
@ -69,8 +80,8 @@ module.exports = function createTaskQueue(options) {
Promise.try(() => {
return handlers[type](task.data);
}).then((result) => {
markSuccess(type, task);
task.resolve(result);
markSuccess(type, task);
}).catch((err) => {
markFailed(type, task);
task.reject(err);
@ -80,36 +91,54 @@ module.exports = function createTaskQueue(options) {
}
function markStarted(type, task) {
debugTasks(`markStarted (${type}): ${debugObject(task.data)}`);
counters[type] += 1;
starts[type] = Date.now();
taskQueue.emit(`started:${type}`, task.data);
}
function markFinished(type, task) {
debugTasks(`markFinished (${type}): ${debugObject(task.data)}`);
counters[type] -= 1;
taskQueue.emit(`finished:${type}`, task.data);
checkCompletion(type);
}
function markSuccess(type, task) {
debugTasks(`markSuccess (${type}): ${debugObject(task.data)}`);
markFinished(type, task);
taskQueue.emit(`success:${type}`, task.data);
}
function markFailed(type, task) {
debugTasks(`markFailed (${type}): ${debugObject(task.data)}`);
markFinished(type, task);
taskQueue.emit(`failed:${type}`, task.data);
}
function markQueueRunning(type) {
debugLoop(`markQueueRunning (${type})`);
taskQueue.emit(`queueRunning:${type}`);
running[type] = true;
}
function markQueueDrained(type) {
debugLoop(`markQueueDrained (${type})`);
taskQueue.emit(`queueDrained:${type}`);
running[type] = false;
}
function markQueueCompleted(type) {
debugLoop(`markQueueCompleted (${type})`);
taskQueue.emit(`queueCompleted:${type}`);
}
function checkCompletion(type) {
if (tasks[type].length === 0 && counters[type] === 0) {
markQueueCompleted(type);
}
}
let taskQueue = extend(new events.EventEmitter(), {
define: function(type, handler, options) {
if (handlers[type] != null) {
@ -128,6 +157,8 @@ module.exports = function createTaskQueue(options) {
throw new TaskQueueError("No such task type exists.")
}
debugTasks(`Queueing new task for '${type}': ${debugObject(data)}`);
let resolveFunc, rejectFunc;
let deferredPromise = new Promise((resolve, reject) => {
resolveFunc = resolve;
@ -145,17 +176,52 @@ module.exports = function createTaskQueue(options) {
return deferredPromise;
})
},
drain: function(type) {
if (handlers[type] == null) {
throw new TaskQueueError("No such task type exists.")
}
debugTasks(`Draining tasks for '${type}'`);
tasks[type] = [];
},
awaitDrained: function(type) {
return Promise.try(() => {
if (handlers[type] == null) {
throw new TaskQueueError("No such task type exists.")
}
debugLoop(`awaitDrained requested for '${type}'`);
if (tasks[type].length === 0) {
debugLoop(`Queue for '${type}' is already drained`);
return;
} else {
debugLoop(`Returning awaitDrained Promise for '${type}'`);
return new Promise((resolve, reject) => {
this.on(`queueDrained:${type}`, () => {
debugLoop(`Resolving awaitDrained Promise for '${type}'`);
resolve();
})
})
}
})
},
awaitCompleted: function(type) {
return Promise.try(() => {
if (handlers[type] == null) {
throw new TaskQueueError("No such task type exists.")
}
debugLoop(`awaitCompleted requested for '${type}'`);
if (tasks[type].length === 0 && counters[type] === 0) {
debugLoop(`Queue for '${type}' is already completed`);
return;
} else {
debugLoop(`Returning awaitCompleted Promise for '${type}'`);
return new Promise((resolve, reject) => {
this.on(`queueDrained:type`, () => {
this.on(`queueCompleted:${type}`, () => {
debugLoop(`Resolving awaitCompleted Promise for '${type}'`);
resolve();
})
})

Loading…
Cancel
Save