You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
srap/src/kernel.js

147 lines
4.8 KiB
JavaScript

"use strict";
const Promise = require("bluebird");
const chalk = require("chalk");
const unreachable = require("@joepie91/unreachable")("srap");
const rateLimit = require("@promistream/rate-limit");
const simpleSink = require("@promistream/simple-sink");
const pipe = require("@promistream/pipe");
const { validateOptions } = require("@validatem/core");
const isValidConfiguration = require("./validators/is-valid-configuration");
const createPrometheus = require("./prometheus");
const generateTaskGraph = require("./util/generate-task-graph");
const asyncInterval = require("./util/async-interval");
// FIXME: *Require* a taskInterval to be set, even if explicitly null, to prevent accidentally forgetting it
module.exports = async function createKernel(_configuration) {
let configuration = validateOptions(arguments, isValidConfiguration);
let state = {
... createPrometheus(),
tasks: generateTaskGraph({
tags: configuration.tags,
tasks: configuration.tasks
})
};
let { metrics, tasks } = state;
const createBackend = require("./database-backends")(state);
let attachToGlobalRateLimit = (configuration.taskInterval != null)
? rateLimit.clonable(configuration.taskInterval)
: undefined;
let backend = await createBackend({
backend: configuration.backend,
options: configuration.database
});
Object.assign(state, { backend: backend });
const createTaskKernel = require("./streams/task-kernel")(state);
const runTask = require("./run-task")(state);
function checkLockedTasks() {
return Promise.try(() => {
return backend.topLevel.countLockedTasks();
}).then((lockedCount) => {
if (lockedCount > 0) {
console.log(`${chalk.bold.red("WARNING:")} There are ${lockedCount} tasks currently locked, and they will not be run! This may be caused by a process crash in the past. See the documentation for more details on how to solve this issue.`);
}
});
}
let databasePreparePromise;
async function prepareDatabase() {
if (databasePreparePromise == null) {
databasePreparePromise = Promise.all([
checkLockedTasks(),
backend.topLevel.insertSeeds(null, { seeds: configuration.seed })
]);
}
return databasePreparePromise;
}
// FIXME: Don't dump to console.log below, since this is meant to be usable as a library as well - provide some sort of object logging hook instead?
return {
run: async function runKernel() {
console.log(`Starting ${tasks.size} tasks...`);
await prepareDatabase();
asyncInterval(60 * 1000, () => {
return Promise.try(() => {
return backend.topLevel.getQueueSize();
}).then((queueSize) => {
for (let [ taskName, size ] of Object.entries(queueSize)) {
metrics.taskQueueSize.labels({ task: taskName }).set(size);
}
});
});
return Promise.map(tasks.values(), (task) => {
return pipe([
createTaskKernel(task, {
globalRateLimiter: (attachToGlobalRateLimit != null) ? attachToGlobalRateLimit() : null,
}),
simpleSink(({ status }) => {
if (status === "completed") {
metrics.successfulItems.inc(1);
metrics.successfulItems.labels({ task: task.name }).inc(1);
} else if (status === "failed") {
metrics.failedItems.inc(1);
metrics.failedItems.labels({ task: task.name }).inc(1);
} else {
unreachable(`Unrecognized status '${status}'`);
}
})
]).read();
});
},
simulate: async function simulate({ itemID, task: taskName }) {
console.log(`Simulating task ${itemID}/${taskName}...`);
await prepareDatabase();
let simulatedBackend = backend.topLevel.simulate();
let simulateTask = require("./run-task")({
... state,
backend: simulatedBackend
});
let item = await simulatedBackend.topLevel.getItem(null, { id: itemID });
return simulateTask(tasks.get(taskName), item);
},
execute: async function simulate({ itemID, task: taskName }) {
// TODO: Should this also lock the task? We probably want to ignore any locks, since this method is primarily used for task logic debugging purposes, and overriding locks would be desirable there.
console.log(`Running task ${itemID}/${taskName}...`);
await prepareDatabase();
let item = await backend.topLevel.getItem(null, { id: itemID });
return runTask(tasks.get(taskName), item);
},
shutdown: function () {
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
return backend.topLevel.shutdown();
},
getMetrics: function () {
return Promise.try(() => {
return state.prometheusRegistry.metrics();
}).then((metrics) => {
return {
contentType: state.prometheusRegistry.contentType,
metrics: metrics
};
});
},
getUpdates: function ({ prefix, timestamp }) {
return backend.topLevel.getUpdateStream(null, { prefix, timestamp });
}
};
};