Compare commits


4 Commits

Author SHA1 Message Date
Sven Slootweg 47cdc9ff64 1.1.1 8 years ago
Sven Slootweg e77f51891b Fix API documentation example to state requests/minute, rather than requests/second 8 years ago
Sven Slootweg 20d2eaa463 1.1.0 8 years ago
Sven Slootweg b3ece538e3 Add awaitDrained method 8 years ago

@@ -85,7 +85,7 @@ This queue does *not* implement rate-limiting of the "X tasks per Y amount of ti
The "X tasks per Y amount of time" type of rate-limiting will usually result in a 'burst' of tasks being executed at the same time, followed by a long waiting period. However, in many cases, this isn't what you want at all - and for this reason and to reduce implementation complexity, `promise-task-queue` implements a 'smoothed out' version of rate-limiting instead, where there is a minimum interval between each task.
Say that you make a request to a particular API on behalf of your users, and the API limits you to 30 requests per second. When using `promise-task-queue`, you would specify the `interval` as `2` seconds, because `60 / 30 == 2`. When you are going over capacity, this will cause a usually short delay for your users - best case, they would be looking at a 2 second delay for their request, if they'd made it right after the average rate limit was hit.
Say that you make a request to a particular API on behalf of your users, and the API limits you to 30 requests per minute. When using `promise-task-queue`, you would specify the `interval` as `2` seconds, because `60 / 30 == 2`. When you are going over capacity, this will cause a usually short delay for your users - best case, they would be looking at a 2 second delay for their request, if they'd made it right after the average rate limit was hit.
When using a 'bursty' model of rate-limiting, once you go over capacity, the best case is that a user in that same scenario would have to wait *an entire minute* for the next 'batch' of API requests to become available. By 'smoothing out' tasks instead, you avoid this scenario, and your application becomes 'just a bit slow' rather than 'broken', as far as the user is concerned.
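The arithmetic behind the example above (`60 / 30 == 2`) can be sketched as a tiny helper. `intervalForRateLimit` is a hypothetical name for illustration only, not part of promise-task-queue's API:

```javascript
// Hypothetical helper illustrating the arithmetic above; not part of
// promise-task-queue itself.
function intervalForRateLimit(requestsPerMinute) {
    // The minimum number of seconds between two consecutive tasks, so
    // that the per-minute limit is never exceeded.
    return 60 / requestsPerMinute;
}

console.log(intervalForRateLimit(30)); // 2 — one task every 2 seconds
```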
@@ -118,6 +118,18 @@ Adds a task to the queue for the given `type`.
Note that this function will __return a Promise__, passing through the result from the task handler. If the Promise from the task handler resolves, then the one returned from this function will resolve with the same value. If the Promise from the task handler is rejected, this one will also reject, with the same error.
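The pass-through behaviour can be illustrated standalone; `runTask` below is a simplified stand-in for the queue's internals, not the library's actual code:

```javascript
// Simplified stand-in for the pass-through behaviour: the Promise
// returned to the caller settles exactly like the task handler's Promise.
function runTask(handler, data) {
    return Promise.resolve().then(() => handler(data));
}

// Resolution is passed through...
runTask((n) => n * 2, 21).then((result) => console.log(result)); // 42

// ... and so is rejection.
runTask(() => { throw new Error("handler failed"); })
    .catch((err) => console.log(err.message)); // "handler failed"
```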
### queue.awaitDrained(type)
Returns a Promise that will resolve when the task queue has run out of tasks for a given `type`.
* __type__: The name of the task type, as specified in `queue.define`.
This is useful for, for example, complex multi-operation build processes, where you want to wait for all existing tasks to finish before moving on.
__Caution:__ The returned Promise will only resolve __exactly once__, as soon as the number of pending tasks reaches 0 - and since the queue cannot distinguish between the origins of tasks, this function is only useful in cases without concurrency. It will also not work correctly if you add tasks asynchronously and don't sequence your asynchronous operations very carefully.
In short: only use this method if you're very certain that you fully understand - and can predict - the execution order of your (asynchronous) code.
### Events
All of these events are emitted on the `queue` object. Where you see `$type` in an event name, this will be replaced with the task type that it occurred for. For example, for an `apiRequest` task type, you might see a `failed:apiRequest` event.

@@ -112,10 +112,15 @@ module.exports = function createTaskQueue(options) {
var taskQueue = extend(new events.EventEmitter(), {
define: function define(type, handler, options) {
if (handlers[type] != null) {
throw new TaskQueueError("The '" + type + "' task type already exists.");
}
handlers[type] = handler;
taskOptions[type] = defaultValue(options, {});
counters[type] = 0;
running[type] = false;
tasks[type] = [];
},
push: function push(type, data) {
return Promise.try(function () {
@@ -130,10 +135,6 @@ module.exports = function createTaskQueue(options) {
rejectFunc = reject;
});
if (tasks[type] == null) {
tasks[type] = [];
}
tasks[type].push({
data: data,
resolve: resolveFunc,
@@ -144,8 +145,27 @@ module.exports = function createTaskQueue(options) {
return deferredPromise;
});
},
awaitDrained: function awaitDrained(type) {
var _this = this;
return Promise.try(function () {
if (handlers[type] == null) {
throw new TaskQueueError("No such task type exists.");
}
if (tasks[type].length === 0) {
return;
} else {
return new Promise(function (resolve, reject) {
_this.on("queueDrained:" + type, function () {
resolve();
});
});
}
});
}
});
return taskQueue;
};
};

@@ -1,6 +1,6 @@
{
"name": "promise-task-queue",
"version": "1.0.0",
"version": "1.1.1",
"description": "A configurable task queue that supports Promises.",
"main": "index.js",
"scripts": {

@@ -112,10 +112,15 @@ module.exports = function createTaskQueue(options) {
let taskQueue = extend(new events.EventEmitter(), {
define: function(type, handler, options) {
if (handlers[type] != null) {
throw new TaskQueueError(`The '${type}' task type already exists.`)
}
handlers[type] = handler;
taskOptions[type] = defaultValue(options, {});
counters[type] = 0;
running[type] = false;
tasks[type] = [];
},
push: function(type, data) {
return Promise.try(() => {
@@ -128,11 +133,7 @@ module.exports = function createTaskQueue(options) {
resolveFunc = resolve;
rejectFunc = reject;
});
if (tasks[type] == null) {
tasks[type] = [];
}
tasks[type].push({
data: data,
resolve: resolveFunc,
@@ -143,8 +144,25 @@ module.exports = function createTaskQueue(options) {
return deferredPromise;
})
},
awaitDrained: function(type) {
return Promise.try(() => {
if (handlers[type] == null) {
throw new TaskQueueError("No such task type exists.")
}
if (tasks[type].length === 0) {
return;
} else {
return new Promise((resolve, reject) => {
this.on(`queueDrained:${type}`, () => {
resolve();
})
})
}
})
}
});
return taskQueue;
}
}
