Improve debugging, and implement a separate queueCompleted and awaitCompleted API

master
Sven Slootweg 8 years ago
parent 27195f54ab
commit 01771c794b

@ -120,13 +120,25 @@ Note that this function will __return a Promise__, passing through the result fr
### queue.awaitDrained(type)
Returns a Promise that will resolve when the task queue has run out of tasks for a given `type`.
Returns a Promise that will resolve when the task queue has run out of tasks for a given `type`. Some of the previously queued tasks may still be running, however - this simply signals that there are no *upcoming* tasks scheduled anymore.
* __type__: The name of the task type, as specified in `queue.define`.
This can be useful for keeping metrics of the queue status.
__Caution:__ The returned Promise will only resolve __exactly once__, as soon as the amount of pending tasks reaches 0 and the queue tries to run the next task - and since the queue cannot distinguish between the origin of tasks, this function will only be useful in cases without concurrency. It will also not work correctly if you add tasks asynchronously and don't handle your asynchronous sequences very carefully.
In short; only use this method if you're very certain that you fully understand - and can predict - the execution order of your (asynchronous) code.
### queue.awaitCompleted(type)
Returns a Promise that will resolve when the task queue has run out of tasks for a given `type`, and all the running tasks have finished.
* __type__: The name of the task type, as specified in `queue.define`.
This is useful for, for example, complex multi-operation build processes, where you want to wait for all existing tasks to finish before moving on.
__Caution:__ The returned Promise will only resolve __exactly once__, as soon as the amount of pending tasks reaches 0 - and since the queue cannot distinguish between the origin of tasks, this function will only be useful in cases without concurrency. It will also not work correctly if you add tasks asynchronously and don't handle your asynchronous sequences very carefully.
__Caution:__ The returned Promise will only resolve __exactly once__, as soon as the last running task finishes and there are no tasks left in the queue - and since the queue cannot distinguish between the origin of tasks, this function will only be useful in cases without concurrency. It will also not work correctly if you add tasks asynchronously and don't handle your asynchronous sequences very carefully.
In short; only use this method if you're very certain that you fully understand - and can predict - the execution order of your (asynchronous) code.
@ -158,7 +170,11 @@ Emitted when the queue for this task type starts running, while it was previousl
#### 'queueDrained:$type'
Emitted when the queue for this task type has drained (ie. ran out of tasks). No arguments are passed to the event handler.
Emitted when the queue for this task type has drained (ie. ran out of queued tasks). Some of the tasks may still be running, however. No arguments are passed to the event handler.
#### 'queueCompleted:$type'
Emitted when the queue for this task type has fully completed (ie. the queue has drained, and all running tasks have finished). No arguments are passed to the event handler.
#### 'delayed:$type'

@ -91,42 +91,54 @@ module.exports = function createTaskQueue(options) {
}
function markStarted(type, task) {
debugTasks("markStarted: " + debugObject(task.data));
debugTasks("markStarted (" + type + "): " + debugObject(task.data));
counters[type] += 1;
starts[type] = Date.now();
taskQueue.emit("started:" + type, task.data);
}
function markFinished(type, task) {
debugTasks("markFinished: " + debugObject(task.data));
debugTasks("markFinished (" + type + "): " + debugObject(task.data));
counters[type] -= 1;
taskQueue.emit("finished:" + type, task.data);
checkCompletion(type);
}
function markSuccess(type, task) {
debugTasks("markSuccess: " + debugObject(task.data));
debugTasks("markSuccess (" + type + "): " + debugObject(task.data));
markFinished(type, task);
taskQueue.emit("success:" + type, task.data);
}
function markFailed(type, task) {
debugTasks("markFailed: " + debugObject(task.data));
debugTasks("markFailed (" + type + "): " + debugObject(task.data));
markFinished(type, task);
taskQueue.emit("failed:" + type, task.data);
}
function markQueueRunning(type) {
debugLoop("markQueueRunning");
debugLoop("markQueueRunning (" + type + ")");
taskQueue.emit("queueRunning:" + type);
running[type] = true;
}
function markQueueDrained(type) {
debugLoop("markQueueDrained");
debugLoop("markQueueDrained (" + type + ")");
taskQueue.emit("queueDrained:" + type);
running[type] = false;
}
function markQueueCompleted(type) {
debugLoop("markQueueCompleted (" + type + ")");
taskQueue.emit("queueCompleted:" + type);
}
function checkCompletion(type) {
if (tasks[type].length === 0 && counters[type] === 0) {
markQueueCompleted(type);
}
}
var taskQueue = extend(new events.EventEmitter(), {
define: function define(type, handler, options) {
if (handlers[type] != null) {
@ -188,6 +200,30 @@ module.exports = function createTaskQueue(options) {
});
}
});
},
awaitCompleted: function awaitCompleted(type) {
var _this2 = this;
return Promise.try(function () {
if (handlers[type] == null) {
throw new TaskQueueError("No such task type exists.");
}
debugLoop("awaitCompleted requested for '" + type + "'");
if (tasks[type].length === 0 && counters[type] === 0) {
debugLoop("Queue for '" + type + "' is already completed");
return;
} else {
debugLoop("Returning awaitCompleted Promise for '" + type + "'");
return new Promise(function (resolve, reject) {
_this2.on("queueCompleted:" + type, function () {
debugLoop("Resolving awaitCompleted Promise for '" + type + "'");
resolve();
});
});
}
});
}
});

@ -91,42 +91,54 @@ module.exports = function createTaskQueue(options) {
}
function markStarted(type, task) {
debugTasks(`markStarted: ${debugObject(task.data)}`);
debugTasks(`markStarted (${type}): ${debugObject(task.data)}`);
counters[type] += 1;
starts[type] = Date.now();
taskQueue.emit(`started:${type}`, task.data);
}
function markFinished(type, task) {
debugTasks(`markFinished: ${debugObject(task.data)}`);
debugTasks(`markFinished (${type}): ${debugObject(task.data)}`);
counters[type] -= 1;
taskQueue.emit(`finished:${type}`, task.data);
checkCompletion(type);
}
function markSuccess(type, task) {
debugTasks(`markSuccess: ${debugObject(task.data)}`);
debugTasks(`markSuccess (${type}): ${debugObject(task.data)}`);
markFinished(type, task);
taskQueue.emit(`success:${type}`, task.data);
}
function markFailed(type, task) {
debugTasks(`markFailed: ${debugObject(task.data)}`);
debugTasks(`markFailed (${type}): ${debugObject(task.data)}`);
markFinished(type, task);
taskQueue.emit(`failed:${type}`, task.data);
}
function markQueueRunning(type) {
debugLoop("markQueueRunning");
debugLoop(`markQueueRunning (${type})`);
taskQueue.emit(`queueRunning:${type}`);
running[type] = true;
}
function markQueueDrained(type) {
debugLoop("markQueueDrained");
debugLoop(`markQueueDrained (${type})`);
taskQueue.emit(`queueDrained:${type}`);
running[type] = false;
}
function markQueueCompleted(type) {
debugLoop(`markQueueCompleted (${type})`);
taskQueue.emit(`queueCompleted:${type}`);
}
function checkCompletion(type) {
if (tasks[type].length === 0 && counters[type] === 0) {
markQueueCompleted(type);
}
}
let taskQueue = extend(new events.EventEmitter(), {
define: function(type, handler, options) {
if (handlers[type] != null) {
@ -185,6 +197,28 @@ module.exports = function createTaskQueue(options) {
})
}
})
},
awaitCompleted: function(type) {
return Promise.try(() => {
if (handlers[type] == null) {
throw new TaskQueueError("No such task type exists.")
}
debugLoop(`awaitCompleted requested for '${type}'`);
if (tasks[type].length === 0 && counters[type] === 0) {
debugLoop(`Queue for '${type}' is already completed`);
return;
} else {
debugLoop(`Returning awaitCompleted Promise for '${type}'`);
return new Promise((resolve, reject) => {
this.on(`queueCompleted:${type}`, () => {
debugLoop(`Resolving awaitCompleted Promise for '${type}'`);
resolve();
})
})
}
})
}
});

Loading…
Cancel
Save