"use strict"; const Promise = require("bluebird"); const pipe = require("@promistream/pipe"); const filter = require("@promistream/filter"); const map = require("@promistream/map"); const rateLimit = require("@promistream/rate-limit"); const parallelize = require("@promistream/parallelize"); // FIXME: Move logs to logging hook module.exports = function (state) { let { backend } = state; const runTask = require("../run-task")(state); return function createTaskKernelStream(task, { globalRateLimiter }) { return pipe([ backend.topLevel.getTaskStream(null, { task: task }), filter((item) => backend.forItem({ task: task, item: item }).internal.lock()), globalRateLimiter, (task.taskInterval != null) ? rateLimit(task.taskInterval) : null, map((item) => { return Promise.try(() => { return runTask(task, item); }).tap(() => { return backend.forItem({ task: task, item: item }).internal.unlock(); }); }), (task.parallelTasks != null) ? parallelize(task.parallelTasks) : null ]); }; };