From b3ece538e39ed838aba9c743240a57f290fa60f9 Mon Sep 17 00:00:00 2001 From: Sven Slootweg Date: Sat, 2 Apr 2016 02:17:19 +0200 Subject: [PATCH] Add awaitDrained method --- README.md | 12 ++++++++++++ lib/index.js | 30 +++++++++++++++++++++++++----- src/index.js | 30 ++++++++++++++++++++++++------ 3 files changed, 61 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index af5cfa9..09e4fa1 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,18 @@ Adds a task to the queue for the given `type`. Note that this function will __return a Promise__, passing through the result from the task handler. If the Promise from the task handler resolves, then the one returned from this function will resolve with the same value. If the Promise from the task handler is rejected, this one will also reject, with the same error. +### queue.awaitDrained(type) + +Returns a Promise that will resolve when the task queue has run out of tasks for a given `type`. + +* __type__: The name of the task type, as specified in `queue.define`. + +This is useful for, for example, complex multi-operation build processes, where you want to wait for all existing tasks to finish before moving on. + +__Caution:__ The returned Promise will only resolve __exactly once__, as soon as the amount of pending tasks reaches 0 - and since the queue cannot distinguish between the origin of tasks, this function will only be useful in cases without concurrency. It will also not work correctly if you add tasks asynchronously and don't handle your asynchronous sequences very carefully. + +In short; only use this method if you're very certain that you fully understand - and can predict - the execution order of your (asynchronous) code. + ### Events All of these events are emitted on the `queue` object. Where you see `$type` in an event name, this will be replaced with the task type that it occurred for. For example, for an `apiRequest` task type, you might see a `failed:apiRequest` event. diff --git a/lib/index.js b/lib/index.js index 0a75306..ba55549 100644 --- a/lib/index.js +++ b/lib/index.js @@ -112,10 +112,15 @@ module.exports = function createTaskQueue(options) { var taskQueue = extend(new events.EventEmitter(), { define: function define(type, handler, options) { + if (handlers[type] != null) { + throw new TaskQueueError("The '" + type + "' task type already exists."); + } + handlers[type] = handler; taskOptions[type] = defaultValue(options, {}); counters[type] = 0; running[type] = false; + tasks[type] = []; }, push: function push(type, data) { return Promise.try(function () { @@ -130,10 +135,6 @@ module.exports = function createTaskQueue(options) { rejectFunc = reject; }); - if (tasks[type] == null) { - tasks[type] = []; - } - tasks[type].push({ data: data, resolve: resolveFunc, @@ -144,8 +145,27 @@ module.exports = function createTaskQueue(options) { return deferredPromise; }); + }, + awaitDrained: function awaitDrained(type) { + var _this = this; + + return Promise.try(function () { + if (handlers[type] == null) { + throw new TaskQueueError("No such task type exists."); + } + + if (tasks[type].length === 0) { + return; + } else { + return new Promise(function (resolve, reject) { + _this.on("queueDrained:type", function () { + resolve(); + }); + }); + } + }); } }); return taskQueue; -}; \ No newline at end of file +}; diff --git a/src/index.js b/src/index.js index a6d325a..c9b6c41 100644 --- a/src/index.js +++ b/src/index.js @@ -112,10 +112,15 @@ module.exports = function createTaskQueue(options) { let taskQueue = extend(new events.EventEmitter(), { define: function(type, handler, options) { + if (handlers[type] != null) { + throw new TaskQueueError(`The '${type}' task type already exists.`) + } + handlers[type] = handler; taskOptions[type] = defaultValue(options, {}); counters[type] = 0; running[type] = false; + tasks[type] = []; }, push: function(type, data) { return Promise.try(() => { @@ -128,11 +133,7 @@ module.exports = function createTaskQueue(options) { resolveFunc = resolve; rejectFunc = reject; }); - - if (tasks[type] == null) { - tasks[type] = []; - } - + tasks[type].push({ data: data, resolve: resolveFunc, @@ -143,8 +144,25 @@ module.exports = function createTaskQueue(options) { return deferredPromise; }) + }, + awaitDrained: function(type) { + return Promise.try(() => { + if (handlers[type] == null) { + throw new TaskQueueError("No such task type exists.") + } + + if (tasks[type].length === 0) { + return; + } else { + return new Promise((resolve, reject) => { + this.on(`queueDrained:type`, () => { + resolve(); + }) + }) + } + }) } }); return taskQueue; -} \ No newline at end of file +}