Add awaitDrained method

master
Sven Slootweg 8 years ago
parent 209b45625f
commit b3ece538e3

@ -118,6 +118,18 @@ Adds a task to the queue for the given `type`.
Note that this function will __return a Promise__, passing through the result from the task handler. If the Promise from the task handler resolves, then the one returned from this function will resolve with the same value. If the Promise from the task handler is rejected, this one will also reject, with the same error.
### queue.awaitDrained(type)
Returns a Promise that will resolve when the task queue has run out of tasks for a given `type`.
* __type__: The name of the task type, as specified in `queue.define`.
This is useful for, for example, complex multi-operation build processes, where you want to wait for all existing tasks to finish before moving on.
__Caution:__ The returned Promise will only resolve __exactly once__, as soon as the amount of pending tasks reaches 0 - and since the queue cannot distinguish between the origin of tasks, this function will only be useful in cases without concurrency. It will also not work correctly if you add tasks asynchronously and don't handle your asynchronous sequences very carefully.
In short; only use this method if you're very certain that you fully understand - and can predict - the execution order of your (asynchronous) code.
### Events
All of these events are emitted on the `queue` object. Where you see `$type` in an event name, this will be replaced with the task type that it occurred for. For example, for an `apiRequest` task type, you might see a `failed:apiRequest` event.

@ -112,10 +112,15 @@ module.exports = function createTaskQueue(options) {
var taskQueue = extend(new events.EventEmitter(), {
define: function define(type, handler, options) {
if (handlers[type] != null) {
throw new TaskQueueError("The '" + type + "' task type already exists.");
}
handlers[type] = handler;
taskOptions[type] = defaultValue(options, {});
counters[type] = 0;
running[type] = false;
tasks[type] = [];
},
push: function push(type, data) {
return Promise.try(function () {
@ -130,10 +135,6 @@ module.exports = function createTaskQueue(options) {
rejectFunc = reject;
});
if (tasks[type] == null) {
tasks[type] = [];
}
tasks[type].push({
data: data,
resolve: resolveFunc,
@ -144,8 +145,27 @@ module.exports = function createTaskQueue(options) {
return deferredPromise;
});
},
awaitDrained: function awaitDrained(type) {
var _this = this;
return Promise.try(function () {
if (handlers[type] == null) {
throw new TaskQueueError("No such task type exists.");
}
if (tasks[type].length === 0) {
return;
} else {
return new Promise(function (resolve, reject) {
_this.on("queueDrained:type", function () {
resolve();
});
});
}
});
}
});
return taskQueue;
};
};

@ -112,10 +112,15 @@ module.exports = function createTaskQueue(options) {
let taskQueue = extend(new events.EventEmitter(), {
define: function(type, handler, options) {
if (handlers[type] != null) {
throw new TaskQueueError(`The '${type}' task type already exists.`)
}
handlers[type] = handler;
taskOptions[type] = defaultValue(options, {});
counters[type] = 0;
running[type] = false;
tasks[type] = [];
},
push: function(type, data) {
return Promise.try(() => {
@ -128,11 +133,7 @@ module.exports = function createTaskQueue(options) {
resolveFunc = resolve;
rejectFunc = reject;
});
if (tasks[type] == null) {
tasks[type] = [];
}
tasks[type].push({
data: data,
resolve: resolveFunc,
@ -143,8 +144,25 @@ module.exports = function createTaskQueue(options) {
return deferredPromise;
})
},
awaitDrained: function(type) {
return Promise.try(() => {
if (handlers[type] == null) {
throw new TaskQueueError("No such task type exists.")
}
if (tasks[type].length === 0) {
return;
} else {
return new Promise((resolve, reject) => {
this.on(`queueDrained:type`, () => {
resolve();
})
})
}
})
}
});
return taskQueue;
}
}

Loading…
Cancel
Save