You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

239 lines
6.1 KiB
JavaScript

'use strict';
var Promise = require("bluebird");
var events = require("events");
var extend = require("extend");
var createError = require("create-error");
var debugLoop = require("debug")("promise-task-queue:loop");
var debugTasks = require("debug")("promise-task-queue:tasks");
var util = require("util");
/**
 * Formats a value as a single-line string for debug logging.
 *
 * The value is rendered with `util.inspect` (unlimited depth, colors
 * enabled); every newline and carriage-return character is then removed so
 * the result stays on one log line.
 *
 * @param {*} obj - Value to render.
 * @returns {string} The inspected representation without line breaks.
 */
function debugObject(obj) {
  var inspectOptions = { depth: null, colors: true };
  var rendered = util.inspect(obj, inspectOptions);
  return rendered.replace(/\n|\r/g, "");
}
// Error type this module throws for queue misuse: defining a task type that
// already exists, or referring to a task type that was never defined.
// NOTE(review): presumably create-error attaches `code` as a default property
// on every instance -- confirm against the create-error documentation.
var TaskQueueError = createError("TaskQueueError", {
code: "TaskQueueError"
});
/**
 * Returns `value` unless it is `null` or `undefined`, in which case the
 * fallback is returned. Other falsy values (0, "", false, NaN) are kept.
 *
 * @param {*} value - Candidate value.
 * @param {*} defaultVal - Fallback used when `value` is null or undefined.
 * @returns {*} `value` when it is set, otherwise `defaultVal`.
 */
function defaultValue(value, defaultVal) {
  return value == null ? defaultVal : value;
}
module.exports = function createTaskQueue(options) {
// Per-task-type state, keyed by the task type name. The maps are created
// without a prototype so that type names such as "constructor", "toString" or
// "__proto__" are not mistaken for entries inherited from Object.prototype
// (which would make define() report them as already existing and let push()
// accept them without a definition).
var handlers = Object.create(null); // type -> handler function given to define()
var taskOptions = Object.create(null); // type -> options object given to define()
var tasks = Object.create(null); // type -> array of queued, not yet started tasks
var counters = Object.create(null); // type -> number of tasks currently executing
var starts = Object.create(null); // type -> Date.now() timestamp of the latest task start
var running = Object.create(null); // type -> whether the queue is currently marked as running
/**
 * Attempts to start the next queued task of the given type.
 *
 * Outcomes, checked in this order:
 *  - nothing is queued: the queue is marked as drained if it was running;
 *  - the interval since the last task start has not elapsed yet: a
 *    "delayed:<type>" event is emitted and another attempt is scheduled
 *    for the remaining wait time;
 *  - the concurrency limit is reached: a "concurrencyReached:<type>" event
 *    is emitted;
 *  - otherwise the queue is marked as running (if it was not already) and
 *    exactly one task is started.
 *
 * @param {string} type - Name of a defined task type.
 */
function tryRunTask(type) {
  var concurrencyLimit = defaultValue(taskOptions[type].concurrency, Infinity);
  var delay = remainingInterval(type);

  // Comparisons are kept in their negated original form so that non-numeric
  // (NaN) values take the same branch as before.
  if (!(tasks[type].length > 0)) {
    if (running[type] === true) {
      debugLoop("Queue for '" + type + "' has now stopped");
      markQueueDrained(type);
    }
    return;
  }

  if (!(delay <= 0)) {
    debugLoop("Registering queue delay for '" + type + "'");
    taskQueue.emit("delayed:" + type);
    setTimeout(function () {
      tryRunTask(type);
    }, delay);
    return;
  }

  if (!(counters[type] < concurrencyLimit)) {
    debugLoop("Reached concurrency for '" + type + "'");
    taskQueue.emit("concurrencyReached:" + type);
    return;
  }

  if (running[type] === false) {
    debugLoop("Queue for '" + type + "' is now running");
    markQueueRunning(type);
  }
  runTask(type);
}
/**
 * Computes how long the given task type still has to wait before its next
 * task may start.
 *
 * The configured `interval` option is given in seconds (default 0) and is
 * counted from the start of the most recent task (0 when none has started).
 *
 * @param {string} type - Name of a defined task type.
 * @returns {number} Milliseconds left to wait; zero or negative when a task
 *   may start right away.
 */
function remainingInterval(type) {
  var intervalMs = defaultValue(taskOptions[type].interval, 0) * 1000;
  var lastStart = defaultValue(starts[type], 0);
  var earliestNextStart = lastStart + intervalMs;
  return earliestNextStart - Date.now();
}
/**
 * Removes the oldest queued task of the given type from the queue and runs
 * its handler.
 *
 * The handler is invoked through `Promise.try`, so it may return a value or
 * a promise, or throw. The task's own promise is settled with the handler's
 * outcome, the matching success/failure bookkeeping is performed, and the
 * queue is then asked to start the next task.
 *
 * @param {string} type - Name of a defined task type with at least one
 *   queued task.
 */
function runTask(type) {
  var task = tasks[type].shift();
  markStarted(type, task);
  Promise.try(function () {
    return handlers[type](task.data);
  }).then(function (result) {
    task.resolve(result);
    markSuccess(type, task);
  }, function (err) {
    // Passed as the second argument of then() rather than a trailing catch()
    // so it only sees handler failures. An exception thrown by the success
    // bookkeeping (e.g. by an event listener) must not be recorded as a
    // second, failed completion of the same task.
    // The caller's promise is settled before listeners are notified, so a
    // throwing listener cannot leave it pending forever.
    task.reject(err);
    markFailed(type, task);
  }).finally(function () {
    // Keep the queue moving even when the bookkeeping above threw; that
    // error still propagates as a rejection of this chain.
    tryRunTask(type);
  });
}
/**
 * Records that a task of the given type has begun executing: increments the
 * running-task counter, stores the current time as the type's latest start,
 * and emits "started:<type>" with the task's data.
 *
 * @param {string} type - Name of the task type.
 * @param {{data: *}} task - The task being started.
 */
function markStarted(type, task) {
  var payload = task.data;
  debugTasks("markStarted (" + type + "): " + debugObject(payload));
  counters[type] = counters[type] + 1;
  starts[type] = Date.now();
  taskQueue.emit("started:" + type, payload);
}
/**
 * Records that a task of the given type has stopped executing, whatever the
 * outcome: decrements the running-task counter, emits "finished:<type>"
 * with the task's data, then checks whether the whole queue has completed.
 *
 * @param {string} type - Name of the task type.
 * @param {{data: *}} task - The task that finished.
 */
function markFinished(type, task) {
  var payload = task.data;
  debugTasks("markFinished (" + type + "): " + debugObject(payload));
  counters[type] = counters[type] - 1;
  taskQueue.emit("finished:" + type, payload);
  checkCompletion(type);
}
/**
 * Records a successfully completed task: performs the common "finished"
 * bookkeeping first, then emits "success:<type>" with the task's data.
 *
 * @param {string} type - Name of the task type.
 * @param {{data: *}} task - The task whose handler succeeded.
 */
function markSuccess(type, task) {
  var payload = task.data;
  debugTasks("markSuccess (" + type + "): " + debugObject(payload));
  markFinished(type, task);
  taskQueue.emit("success:" + type, payload);
}
/**
 * Records a failed task: performs the common "finished" bookkeeping first,
 * then emits "failed:<type>" with the task's data.
 *
 * @param {string} type - Name of the task type.
 * @param {{data: *}} task - The task whose handler failed.
 */
function markFailed(type, task) {
  var payload = task.data;
  debugTasks("markFailed (" + type + "): " + debugObject(payload));
  markFinished(type, task);
  taskQueue.emit("failed:" + type, payload);
}
/**
 * Marks the queue for the given type as running and emits
 * "queueRunning:<type>".
 *
 * @param {string} type - Name of the task type.
 */
function markQueueRunning(type) {
  debugLoop("markQueueRunning (" + type + ")");
  // Update the flag before notifying listeners. Listeners run synchronously,
  // and one that pushes another task would otherwise re-enter the queue loop
  // while the queue still looks idle, emitting "queueRunning" a second time.
  running[type] = true;
  taskQueue.emit("queueRunning:" + type);
}
/**
 * Marks the queue for the given type as no longer running and emits
 * "queueDrained:<type>".
 *
 * @param {string} type - Name of the task type.
 */
function markQueueDrained(type) {
  debugLoop("markQueueDrained (" + type + ")");
  // Update the flag before notifying listeners. Listeners run synchronously,
  // and one that refills the queue restarts it from inside emit(); clearing
  // the flag afterwards would overwrite that restart and the next drain
  // would never be signalled.
  running[type] = false;
  taskQueue.emit("queueDrained:" + type);
}
/**
 * Announces that the queue for the given type has completed by emitting
 * "queueCompleted:<type>".
 *
 * @param {string} type - Name of the task type.
 */
function markQueueCompleted(type) {
  var debugLine = "markQueueCompleted (" + type + ")";
  debugLoop(debugLine);
  var eventName = "queueCompleted:" + type;
  taskQueue.emit(eventName);
}
/**
 * Signals queue completion for the given type when no task is queued and
 * no task is executing any more.
 *
 * @param {string} type - Name of the task type.
 */
function checkCompletion(type) {
  var hasQueuedTasks = tasks[type].length !== 0;
  if (hasQueuedTasks || counters[type] !== 0) {
    return;
  }
  markQueueCompleted(type);
}
/**
 * Throws when the given task type has not been registered with define().
 *
 * @param {string} type - Name of the task type to look up.
 * @throws {TaskQueueError} If no handler is registered for `type`.
 */
function assertTaskTypeExists(type) {
  if (handlers[type] == null) {
    throw new TaskQueueError("No such task type exists.");
  }
}

/**
 * The public queue object: an EventEmitter extended with the queue API.
 *
 * Events emitted on it elsewhere in this module, each suffixed with
 * ":<type>": started, finished, success, failed, delayed,
 * concurrencyReached, queueRunning, queueDrained and queueCompleted.
 */
var taskQueue = extend(new events.EventEmitter(), {
  /**
   * Registers a new task type.
   *
   * @param {string} type - Unique name of the task type.
   * @param {Function} handler - Called with a task's data; may return a
   *   value or a promise, or throw.
   * @param {Object} [options] - Per-type settings read by the queue loop:
   *   `concurrency` (maximum simultaneously running tasks, default
   *   Infinity) and `interval` (minimum seconds between task starts,
   *   default 0).
   * @throws {TaskQueueError} If the type has already been defined.
   */
  define: function define(type, handler, options) {
    if (handlers[type] != null) {
      throw new TaskQueueError("The '" + type + "' task type already exists.");
    }
    handlers[type] = handler;
    taskOptions[type] = defaultValue(options, {});
    counters[type] = 0;
    running[type] = false;
    tasks[type] = [];
  },

  /**
   * Queues a task and immediately tries to start it.
   *
   * @param {string} type - Name of a defined task type.
   * @param {*} data - Value handed to the type's handler.
   * @returns {Promise} Settles with the handler's outcome for this task;
   *   rejects with a TaskQueueError if the type is unknown.
   */
  push: function push(type, data) {
    return Promise.try(function () {
      assertTaskTypeExists(type);
      debugTasks("Queueing new task for '" + type + "': " + debugObject(data));
      // The resolver functions are captured so the queue loop can settle
      // the promise later, once the handler has run.
      var resolveFunc;
      var rejectFunc;
      var deferredPromise = new Promise(function (resolve, reject) {
        resolveFunc = resolve;
        rejectFunc = reject;
      });
      tasks[type].push({
        data: data,
        resolve: resolveFunc,
        reject: rejectFunc
      });
      tryRunTask(type);
      return deferredPromise;
    });
  },

  /**
   * Discards every task of the given type that has not started yet.
   * Tasks that are already executing are not affected. The promises
   * returned by push() for the discarded tasks are never settled.
   *
   * @param {string} type - Name of a defined task type.
   * @throws {TaskQueueError} If the type is unknown.
   */
  drain: function drain(type) {
    assertTaskTypeExists(type);
    debugTasks("Draining tasks for '" + type + "'");
    tasks[type] = [];
  },

  /**
   * Waits until no tasks of the given type are waiting in the queue.
   *
   * @param {string} type - Name of a defined task type.
   * @returns {Promise} Resolves right away when nothing is queued,
   *   otherwise on the next "queueDrained:<type>" event; rejects with a
   *   TaskQueueError if the type is unknown.
   */
  awaitDrained: function awaitDrained(type) {
    return Promise.try(function () {
      assertTaskTypeExists(type);
      debugLoop("awaitDrained requested for '" + type + "'");
      if (tasks[type].length === 0) {
        debugLoop("Queue for '" + type + "' is already drained");
        return undefined;
      }
      debugLoop("Returning awaitDrained Promise for '" + type + "'");
      return new Promise(function (resolve) {
        // once(), not on(): the promise can only resolve a single time, so
        // a permanent listener would just accumulate with every call.
        // The listener is attached to `taskQueue` itself because that is
        // the object the events are emitted on, whatever `this` is.
        taskQueue.once("queueDrained:" + type, function () {
          debugLoop("Resolving awaitDrained Promise for '" + type + "'");
          resolve();
        });
      });
    });
  },

  /**
   * Waits until no tasks of the given type are queued or executing.
   *
   * @param {string} type - Name of a defined task type.
   * @returns {Promise} Resolves right away when the queue is empty and
   *   idle, otherwise on the next "queueCompleted:<type>" event; rejects
   *   with a TaskQueueError if the type is unknown.
   */
  awaitCompleted: function awaitCompleted(type) {
    return Promise.try(function () {
      assertTaskTypeExists(type);
      debugLoop("awaitCompleted requested for '" + type + "'");
      if (tasks[type].length === 0 && counters[type] === 0) {
        debugLoop("Queue for '" + type + "' is already completed");
        return undefined;
      }
      debugLoop("Returning awaitCompleted Promise for '" + type + "'");
      return new Promise(function (resolve) {
        // once(), not on(): see awaitDrained.
        taskQueue.once("queueCompleted:" + type, function () {
          debugLoop("Resolving awaitCompleted Promise for '" + type + "'");
          resolve();
        });
      });
    });
  }
});
return taskQueue;
};