master
Sven Slootweg 8 years ago
parent 0aa2422085
commit 209b45625f

@ -0,0 +1,157 @@
# promise-task-queue
A task queue, with:
* __First-class support for Promises__ (any Promises/A+ compatible implementation, including Bluebird, ES6, Q, ...)
* __Concurrency control__
* __Rate-limiting__ (explained further below)
* __Many event hooks__, for easy metrics and reporting
## License
[WTFPL](http://www.wtfpl.net/txt/copying/) or [CC0](https://creativecommons.org/publicdomain/zero/1.0/), whichever you prefer. A donation and/or attribution are appreciated, but not required.
## Donate
My income consists largely of donations for my projects. If this module is useful to you, consider [making a donation](http://cryto.net/~joepie91/donate.html)!
You can donate using Bitcoin, PayPal, Flattr, cash-in-mail, SEPA transfers, and pretty much anything else.
## Contributing
Pull requests welcome. Please make sure your modifications are in line with the overall code style, and ensure that you're editing the files in `src/`, not those in `lib/`.
Build tool of choice is `gulp`; simply run `gulp` while developing, and it will watch for changes.
Be aware that by making a pull request, you agree to release your modifications under the licenses stated above.
## Usage
A simple usage example:
```javascript
var Promise = require("bluebird");
var bhttp = require("bhttp");
var taskQueue = require("promise-task-queue");

var queue = taskQueue();
var failedRequests = 0;

queue.on("failed:apiRequest", function(task) {
    failedRequests += 1;
});

queue.define("apiRequest", function(task) {
    return Promise.try(function() {
        return bhttp.get("http://api.example.com/users/" + task.username, {decodeJSON: true});
    }).then(function(response) {
        return response.body;
    });
}, {
    concurrency: 2
});

Promise.try(function() {
    /* The following queues up the actual task. Note how it returns a Promise! */
    return queue.push("apiRequest", {username: "joepie91"});
}).then(function(jsonResponse) {
    console.log("This user has " + jsonResponse.repositoryCount + " repositories.");
});
```
This example shows a task for making an API request using [`bhttp`](https://www.npmjs.com/package/bhttp), with a maximum concurrency of two tasks running at the same time. It also demonstrates one of the event types.
## Task mechanics
You can define any number of task types. Each task type will have a single queue, with certain optional limits. The specified `handler` determines the task logic, and will be executed for each added task, but only when that task starts.
A 'task' is a plain object, pushed to the queue for a particular task type using `queue.push`. A task is deemed to have 'finished' when its handler either returns synchronously, or returns a Promise that has since resolved or rejected.
Tasks will execute immediately if allowed by the configured limits, or queue up if not.
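The "run now or queue up" decision can be sketched as a small pure function. This is only an illustration of the idea, not the module's code: `nextAction` and the fields of its `state` argument are hypothetical names, and the order of the checks (pending tasks, then interval, then concurrency) is an assumption.

```javascript
"use strict";

// Hypothetical helper, for illustration only: decides what the queue for a
// single task type would do next, given a snapshot of its state.
function nextAction(state) {
    // A missing limit is treated as "no limit".
    var concurrency = (state.concurrency != null) ? state.concurrency : Infinity;
    var intervalMs = ((state.interval != null) ? state.interval : 0) * 1000;
    // Milliseconds that still have to pass before another task may start.
    var waitTime = (state.lastStart + intervalMs) - state.now;

    if (state.queued === 0) {
        return "idle"; // nothing is waiting to run
    } else if (waitTime > 0) {
        return "delay"; // the interval since the last start has not elapsed
    } else if (state.running >= concurrency) {
        return "hold"; // too many tasks of this type are already running
    } else {
        return "run"; // both limits are satisfied
    }
}

console.log(nextAction({queued: 1, running: 0, lastStart: 0, now: 5000, interval: 2})); // "run"
```

A task that lands in the `delay` or `hold` state simply stays queued until a later check finds both conditions satisfied.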
## Concurrency control vs. Rate-limiting
The difference between the two, summarized:
* __Concurrency control:__ Controlling how many tasks you can run *at any given moment in time*.
* __Rate-limiting:__ Controlling how many tasks can be started within a certain *amount* of time.
This module supports both, even combined - in which case both conditions must be satisfied.
## Rate-limiting (intervals)
This queue does *not* implement rate-limiting of the "X tasks per Y amount of time" type. Instead, it uses intervals between tasks. This is not without reason, however, and it will almost certainly work for your usecase. If you're not interested in rate-limiting, you'll probably want to skip this section.
The "X tasks per Y amount of time" type of rate-limiting will usually result in a 'burst' of tasks being executed at the same time, followed by a long waiting period. However, in many cases, this isn't what you want at all - and for this reason and to reduce implementation complexity, `promise-task-queue` implements a 'smoothed out' version of rate-limiting instead, where there is a minimum interval between each task.
Say that you make requests to a particular API on behalf of your users, and the API limits you to 30 requests per minute. When using `promise-task-queue`, you would specify the `interval` as `2` seconds, because `60 / 30 == 2`. When you are going over capacity, this will cause a usually short delay for your users - best case, they would be looking at a 2 second delay for their request, if they'd made it right after the average rate limit was hit.
When using a 'bursty' model of rate-limiting, once you go over capacity, the best case is that a user in that same scenario would have to wait *an entire minute* for the next 'batch' of API requests to become available. By 'smoothing out' tasks instead, you avoid this scenario, and your application becomes 'just a bit slow' rather than 'broken', as far as the user is concerned.
Another reason is the aforementioned implementation complexity - one user might want a limit per second, another user might want a limit per minute, then per hour, and so on. This would require implementation of a relatively complex time specification API... and it's much simpler to let you specify an interval in seconds, which accommodates all of those use cases. This makes it simpler for everybody involved.
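Converting an "X tasks per Y amount of time" limit into an interval is a single division. A minimal sketch, assuming you know the limit and its period in seconds; `intervalFromRate` is a hypothetical helper and not part of this module:

```javascript
"use strict";

// Hypothetical helper: converts "maxTasks per periodSeconds" into the minimum
// number of seconds between two task starts.
function intervalFromRate(maxTasks, periodSeconds) {
    if (!(maxTasks > 0) || !(periodSeconds > 0)) {
        throw new RangeError("maxTasks and periodSeconds must be positive numbers");
    }

    return periodSeconds / maxTasks;
}

console.log(intervalFromRate(30, 60));     // 2   (30 per minute)
console.log(intervalFromRate(10, 1));      // 0.1 (10 per second)
console.log(intervalFromRate(1000, 3600)); // 3.6 (1000 per hour)
```

The result would be passed as the `interval` option. Whether fractional intervals such as `0.1` are officially supported is not stated here, so treat that case as an assumption and test it before relying on it.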
## API
### taskQueue()
Creates a new task queue.
### queue.define(type, handler, [options])
Defines a new task type.
* __type__: The name of the task type, which is used to refer to it in `queue.push` calls and event names.
* __handler__: The function to run when a task of this type starts. This is where the actual task logic goes, and it *must* return either a Promise or a synchronous value. It will receive a single argument, containing the `data` for the task.
* __options__: *Optional.* An object with options, all of them optional.
    * __concurrency__: The maximum amount of tasks of this type that can run at the same time.
    * __interval__: The rate-limiting interval for this task type, *in seconds*. See the explanation in the "Rate-limiting" section for more information.
Keep in mind that if you reject/throw an error from your task handler, it *must* be an `Error` object (or descendant thereof). This is true for Promises and error-handling in general, but is worth repeating. If you're having trouble creating useful errors, try out the [`create-error` module](https://www.npmjs.com/package/create-error).
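As a rough illustration of the point about `Error` objects, here is a sketch that uses a plain ES2015 class rather than `create-error`. The names `ApiRequestError` and `checkStatus`, and the `statusCode` property, are made up for this example:

```javascript
"use strict";

// Hypothetical error type; it descends from Error, so it is safe to throw or
// reject with from a task handler.
class ApiRequestError extends Error {
    constructor(message, statusCode) {
        super(message);
        this.name = "ApiRequestError";
        this.statusCode = statusCode;
    }
}

// Hypothetical step inside a task handler: throws a real Error object (never
// a string or a plain object) when the response is not usable.
function checkStatus(response) {
    if (response.statusCode !== 200) {
        throw new ApiRequestError("Unexpected status code", response.statusCode);
    }

    return response.body;
}

try {
    checkStatus({statusCode: 500, body: null});
} catch (err) {
    console.log(err instanceof Error, err.name, err.statusCode); // true 'ApiRequestError' 500
}
```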
### queue.push(type, data)
Adds a task to the queue for the given `type`.
* __type__: The name of the task type, as specified in `queue.define`.
* __data__: The task data to pass on - this will be provided to the task handler as the first (and only) callback argument.
Note that this function will __return a Promise__, passing through the result from the task handler. If the Promise from the task handler resolves, then the one returned from this function will resolve with the same value. If the Promise from the task handler is rejected, this one will also reject, with the same error.
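One way to picture this pass-through is a 'deferred': a Promise that is handed out immediately and settled later with whatever the handler produces. The following is a sketch of that idea using native Promises, not the module's actual implementation; `createDeferred` and `settleWith` are hypothetical names.

```javascript
"use strict";

// Hypothetical helper: a Promise, plus the functions that settle it.
function createDeferred() {
    var deferred = {};

    deferred.promise = new Promise(function(resolve, reject) {
        deferred.resolve = resolve;
        deferred.reject = reject;
    });

    return deferred;
}

// Hypothetical helper: runs `handler` with `data`, then forwards the outcome
// (a value, a Promise, or a thrown error) to the deferred.
function settleWith(deferred, handler, data) {
    return new Promise(function(resolve) {
        resolve(handler(data)); // a synchronous throw becomes a rejection
    }).then(deferred.resolve, deferred.reject);
}

var deferred = createDeferred();

deferred.promise.then(function(result) {
    console.log("Resolved with:", result); // Resolved with: JOEPIE91
});

settleWith(deferred, function(data) {
    return data.username.toUpperCase();
}, {username: "joepie91"});
```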
### Events
All of these events are emitted on the `queue` object. Where you see `$type` in an event name, this will be replaced with the task type that it occurred for. For example, for an `apiRequest` task type, you might see a `failed:apiRequest` event.
__Important:__ Keep in mind that for the `finished`, `success` and `failed` events, you usually want to use the Promise that is returned from `queue.push` instead - these events exist primarily for purposes like keeping metrics, and trying to use them in the regular task-queueing process will make your code a mess.
#### 'started:$type'
Emitted when a task is started. The first (and only) argument to the event handler will be the `data` for the task.
#### 'finished:$type'
Emitted when a task has finished, regardless of whether it was successful. The first (and only) argument to the event handler will be the `data` for the task.
#### 'success:$type'
Emitted when a task has finished, but *only* when it was successful - ie. the returned Promise *resolved*. The first (and only) argument to the event handler will be the `data` for the task.
#### 'failed:$type'
Emitted when a task has finished, but *only* when it failed - ie. the returned Promise *rejected*. The first (and only) argument to the event handler will be the `data` for the task.
#### 'queueRunning:$type'
Emitted when the queue for this task type starts running, while it was previously drained (ie. empty). No arguments are passed to the event handler.
#### 'queueDrained:$type'
Emitted when the queue for this task type has drained (ie. ran out of tasks). No arguments are passed to the event handler.
#### 'delayed:$type'
Emitted when a task has been delayed because of the `interval` rate-limit. Note that this event may currently be emitted *many* times if many tasks are queued.
#### 'concurrencyReached:$type'
Emitted when a task has been queued up because of the `concurrency` limit. Can be useful to detect when your queue is backing up.
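As an example of using these events for metrics, here is a minimal sketch of a counter that listens to the event names listed above. `collectMetrics` is a hypothetical helper, and a plain `EventEmitter` stands in for a real queue so that the snippet runs on its own:

```javascript
"use strict";

var EventEmitter = require("events").EventEmitter;

// Hypothetical helper: counts events for one task type on any emitter that
// uses the event names documented above.
function collectMetrics(emitter, type) {
    var metrics = {started: 0, success: 0, failed: 0, finished: 0, drained: 0};

    emitter.on("started:" + type, function() { metrics.started += 1; });
    emitter.on("success:" + type, function() { metrics.success += 1; });
    emitter.on("failed:" + type, function() { metrics.failed += 1; });
    emitter.on("finished:" + type, function() { metrics.finished += 1; });
    emitter.on("queueDrained:" + type, function() { metrics.drained += 1; });

    return metrics;
}

// Stand-in for a real queue; with the module, you would pass `queue` instead.
var fakeQueue = new EventEmitter();
var metrics = collectMetrics(fakeQueue, "apiRequest");

fakeQueue.emit("started:apiRequest", {username: "joepie91"});
fakeQueue.emit("finished:apiRequest", {username: "joepie91"});
fakeQueue.emit("failed:apiRequest", {username: "joepie91"});

console.log(metrics.started, metrics.finished, metrics.failed); // 1 1 1
```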

@ -0,0 +1,24 @@
var gulp = require('gulp');
var gutil = require('gulp-util');
var babel = require('gulp-babel');
var cache = require('gulp-cached');
var remember = require('gulp-remember');
var plumber = require('gulp-plumber');
var source = ["src/**/*.js"]
gulp.task('babel', function() {
return gulp.src(source)
.pipe(plumber())
.pipe(cache("babel"))
.pipe(babel({presets: ["es2015"]}).on('error', gutil.log)).on('data', gutil.log)
.pipe(remember("babel"))
.pipe(gulp.dest("lib/"));
});
gulp.task('watch', function () {
gulp.watch(source, ['babel']);
});
gulp.task('default', ['babel', 'watch']);

@ -0,0 +1,3 @@
'use strict';
module.exports = require("./lib");

@ -0,0 +1,151 @@
'use strict';
var Promise = require("bluebird");
var events = require("events");
var extend = require("extend");
var createError = require("create-error");
var TaskQueueError = createError("TaskQueueError", {
code: "TaskQueueError"
});
function defaultValue(value, defaultVal) {
if (value != null) {
return value;
} else {
return defaultVal;
}
}
module.exports = function createTaskQueue(options) {
var handlers = {};
var taskOptions = {};
var tasks = {};
var counters = {};
var starts = {};
var running = {};
function tryRunTask(type) {
var maxTasks = defaultValue(taskOptions[type].concurrency, Infinity);
var waitTime = remainingInterval(type);
if (tasks[type].length > 0) {
if (waitTime <= 0) {
if (counters[type] < maxTasks) {
if (running[type] === false) {
markQueueRunning(type);
}
runTask(type);
} else {
taskQueue.emit("concurrencyReached:" + type);
}
} else {
taskQueue.emit("delayed:" + type);
setTimeout(function () {
tryRunTask(type);
}, waitTime);
}
} else {
if (running[type] === true) {
markQueueDrained(type);
}
}
}
function remainingInterval(type) {
var taskInterval = defaultValue(taskOptions[type].interval, 0) * 1000;
var lastTask = defaultValue(starts[type], 0);
return lastTask + taskInterval - Date.now();
}
function runTask(type) {
var task = tasks[type].shift();
markStarted(type, task);
Promise.try(function () {
return handlers[type](task.data);
}).then(function (result) {
markSuccess(type, task);
task.resolve(result);
}).catch(function (err) {
markFailed(type, task);
task.reject(err);
}).then(function () {
tryRunTask(type);
});
}
function markStarted(type, task) {
counters[type] += 1;
starts[type] = Date.now();
taskQueue.emit("started:" + type, task.data);
}
function markFinished(type, task) {
counters[type] -= 1;
taskQueue.emit("finished:" + type, task.data);
}
function markSuccess(type, task) {
markFinished(type, task);
taskQueue.emit("success:" + type, task.data);
}
function markFailed(type, task) {
markFinished(type, task);
taskQueue.emit("failed:" + type, task.data);
}
function markQueueRunning(type) {
taskQueue.emit("queueRunning:" + type);
running[type] = true;
}
function markQueueDrained(type) {
taskQueue.emit("queueDrained:" + type);
running[type] = false;
}
var taskQueue = extend(new events.EventEmitter(), {
define: function define(type, handler, options) {
handlers[type] = handler;
taskOptions[type] = defaultValue(options, {});
counters[type] = 0;
running[type] = false;
},
push: function push(type, data) {
return Promise.try(function () {
if (handlers[type] == null) {
throw new TaskQueueError("No such task type exists.");
}
var resolveFunc = void 0,
rejectFunc = void 0;
var deferredPromise = new Promise(function (resolve, reject) {
resolveFunc = resolve;
rejectFunc = reject;
});
if (tasks[type] == null) {
tasks[type] = [];
}
tasks[type].push({
data: data,
resolve: resolveFunc,
reject: rejectFunc
});
tryRunTask(type);
return deferredPromise;
});
}
});
return taskQueue;
};

@ -0,0 +1,34 @@
{
"name": "promise-task-queue",
"version": "1.0.0",
"description": "A configurable task queue that supports Promises.",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"repository": {
"type": "git",
"url": "git://github.com/joepie91/node-promise-task-queue"
},
"keywords": [
"promises",
"async",
"queue"
],
"author": "Sven Slootweg",
"license": "WTFPL",
"dependencies": {
"bluebird": "^3.3.3",
"create-error": "^0.3.1",
"extend": "^3.0.0"
},
"devDependencies": {
"babel-preset-es2015": "^6.6.0",
"gulp": "^3.9.1",
"gulp-babel": "^6.1.2",
"gulp-cached": "^1.1.0",
"gulp-plumber": "^1.1.0",
"gulp-remember": "^0.3.0",
"gulp-util": "^3.0.7"
}
}

@ -0,0 +1,150 @@
'use strict';
const Promise = require("bluebird");
const events = require("events");
const extend = require("extend");
const createError = require("create-error");
const TaskQueueError = createError("TaskQueueError", {
code: "TaskQueueError"
});
function defaultValue(value, defaultVal) {
if (value != null) {
return value;
} else {
return defaultVal;
}
}
module.exports = function createTaskQueue(options) {
let handlers = {};
let taskOptions = {};
let tasks = {};
let counters = {};
let starts = {};
let running = {};
function tryRunTask(type) {
let maxTasks = defaultValue(taskOptions[type].concurrency, Infinity);
let waitTime = remainingInterval(type);
if (tasks[type].length > 0) {
if (waitTime <= 0) {
if (counters[type] < maxTasks) {
if (running[type] === false) {
markQueueRunning(type);
}
runTask(type);
} else {
taskQueue.emit(`concurrencyReached:${type}`);
}
} else {
taskQueue.emit(`delayed:${type}`);
setTimeout(() => {
tryRunTask(type);
}, waitTime);
}
} else {
if (running[type] === true) {
markQueueDrained(type);
}
}
}
function remainingInterval(type) {
let taskInterval = defaultValue(taskOptions[type].interval, 0) * 1000;
let lastTask = defaultValue(starts[type], 0);
return (lastTask + taskInterval) - Date.now();
}
function runTask(type) {
let task = tasks[type].shift();
markStarted(type, task);
Promise.try(() => {
return handlers[type](task.data);
}).then((result) => {
markSuccess(type, task);
task.resolve(result);
}).catch((err) => {
markFailed(type, task);
task.reject(err);
}).then(function(){
tryRunTask(type);
});
}
function markStarted(type, task) {
counters[type] += 1;
starts[type] = Date.now();
taskQueue.emit(`started:${type}`, task.data);
}
function markFinished(type, task) {
counters[type] -= 1;
taskQueue.emit(`finished:${type}`, task.data);
}
function markSuccess(type, task) {
markFinished(type, task);
taskQueue.emit(`success:${type}`, task.data);
}
function markFailed(type, task) {
markFinished(type, task);
taskQueue.emit(`failed:${type}`, task.data);
}
function markQueueRunning(type) {
taskQueue.emit(`queueRunning:${type}`);
running[type] = true;
}
function markQueueDrained(type) {
taskQueue.emit(`queueDrained:${type}`);
running[type] = false;
}
let taskQueue = extend(new events.EventEmitter(), {
define: function(type, handler, options) {
handlers[type] = handler;
taskOptions[type] = defaultValue(options, {});
counters[type] = 0;
running[type] = false;
},
push: function(type, data) {
return Promise.try(() => {
if (handlers[type] == null) {
throw new TaskQueueError("No such task type exists.");
}
let resolveFunc, rejectFunc;
let deferredPromise = new Promise((resolve, reject) => {
resolveFunc = resolve;
rejectFunc = reject;
});
if (tasks[type] == null) {
tasks[type] = [];
}
tasks[type].push({
data: data,
resolve: resolveFunc,
reject: rejectFunc
});
tryRunTask(type);
return deferredPromise;
});
}
});
return taskQueue;
}

@ -0,0 +1,66 @@
'use strict';
const Promise = require("bluebird");
const taskQueue = require("./");
let queue = taskQueue();
queue.define("build", function(data) {
return Promise.try(() => {
console.log("Building with data:", data);
return Promise.delay((Math.random() * 500) + 100)
}).then(function() {
if (Math.random() < 0.25) {
throw new Error("Dummy failure!");
} else {
return `Success! ${data.id}`;
}
});
}, {
concurrency: 2,
interval: 2
});
queue.on("started:build", function(task) {
console.log(`Started task ID ${task.id}...`);
});
queue.on("success:build", function(task) {
console.log(`Succeeded for ID ${task.id}`);
});
queue.on("failed:build", function(task) {
console.log(`Failed for ID ${task.id}`);
});
queue.on("finished:build", function(task) {
console.log(`Completed for ID ${task.id}`);
});
queue.on("concurrencyReached:build", function() {
//console.log(`!! Reached concurrency limit.`);
});
queue.on("delayed:build", function() {
//console.log(`!! Delayed due to interval.`);
});
queue.on("queueRunning:build", function() {
console.log(`## Queue started running...`);
});
queue.on("queueDrained:build", function() {
console.log(`## Queue drained!`);
});
Promise.map((new Array(20)), function(_, index) {
return Promise.try(function() {
return queue.push("build", { id: index });
}).then(function(result) {
return "[SUCCESS] " + result;
}).catch(function(err) {
return "[ERROR ] " + err.toString();
});
}).each(function(message) {
console.log(message);
});