Automatically migrated from Gitolite
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

95 lines
2.3 KiB

EventEmitter = require("events").EventEmitter
Promise = require "bluebird"
debug = require("debug")("task-runner")
makeExternalPromise = ->
extResolve = extReject = null
new Promise (resolve, reject) ->
extResolve = resolve
extReject = reject
module.exports = class TaskRunner extends EventEmitter
constructor: (@context = {}) ->
@_taskTypes = {}
@_queue = {}
@_runningCount = {}
@running = false
@context.taskRunner = this
_checkRunTask: ->
if not @running
return
debug "checking for runnable tasks..."
for taskType, options of @_taskTypes
if @_runningCount[taskType] < (options.maxConcurrent ? Infinity)
if options.maxConcurrent?
tasksToRun = options.maxConcurrent - @_runningCount[taskType]
else
tasksToRun = Infinity
debug "running #{tasksToRun} tasks"
for i in [0...tasksToRun]
if @_queue[taskType].length == 0
debug "ran out of tasks"
@emit "tasksDepleted"
break
@_doRunTask taskType, @_queue[taskType].shift()
debug "started #{tasksToRun} tasks, waiting for completion..."
_doRunTask: (taskType, taskData) ->
taskOptions = @_taskTypes[taskType]
@_runningCount[taskType] += 1
@emit "taskStarted", taskType, taskData.task
Promise.resolve(taskOptions.taskFunc(taskData.task, @context))
.then (value) =>
@_markTaskCompleted taskType, taskData
taskData.resolveFunc(value)
.catch (err) =>
@emit "taskFailed", taskType, taskData.task, err
taskData.rejectFunc(err)
debug "started task"
_markTaskCompleted: (taskType, task) ->
@_runningCount[taskType] -= 1
@emit "taskCompleted", taskType, task.task
debug "completed task"
@_checkRunTask()
addTask: (taskType, taskFunc, options = {}) =>
options.taskFunc = taskFunc
@_taskTypes[taskType] = options
@_queue[taskType] = []
@_runningCount[taskType] = 0
debug "added task"
setTaskOptions: (taskType, options) =>
options.taskFunc = @_taskTypes[taskType].taskFunc
@_taskTypes[taskType] = options
do: (taskType, task) =>
@emit "taskQueued", taskType, task
debug "queued task"
new Promise (resolve, reject) =>
@_queue[taskType].push
resolveFunc: resolve
rejectFunc: reject
task: task
@_checkRunTask()
run: =>
@running = true
debug "started task loop"
@_checkRunTask()
pause: =>
@running = false
debug "paused task loop"