// srap/src/kernel.js
"use strict";
const Promise = require("bluebird");
const chalk = require("chalk");
const unreachable = require("@joepie91/unreachable")("srap");
3 years ago
3 years ago
const rateLimit = require("@promistream/rate-limit");
3 years ago
const simpleSink = require("@promistream/simple-sink");
const pipe = require("@promistream/pipe");
const { validateOptions } = require("@validatem/core");
const isValidConfiguration = require("./validators/is-valid-configuration");
const createPrometheus = require("./prometheus");
const generateTaskGraph = require("./util/generate-task-graph");
const asyncInterval = require("./util/async-interval");
// FIXME: *Require* a taskInterval to be set, even if it is explicitly null, so that it cannot be forgotten by accident
// Kernel factory. Validates the supplied configuration, builds the shared state (metrics and
// task graph), creates the database backend, and resolves to an object exposing the kernel
// operations defined further down (run, simulate, execute, shutdown, getMetrics, getUpdates).
module.exports = async function createKernel(_configuration) {
// Validation is run against `arguments` rather than the named parameter; only the validated
// result is used from here on.
let configuration = validateOptions(arguments, isValidConfiguration);
// Shared state object that is handed to the sub-module factories required below. It holds
// everything returned by createPrometheus() plus the task graph generated from the configured
// tags and tasks.
let state = {
... createPrometheus(),
tasks: generateTaskGraph({
tags: configuration.tags,
tasks: configuration.tasks
})
};
let { metrics, tasks } = state;
// NOTE(review): the backend factory receives `state` before `state.backend` exists (it is only
// assigned further down) - presumably it does not need `backend` at this point; confirm in
// ./database-backends.
const createBackend = require("./database-backends")(state);
// A clonable rate limiter is only built when a task interval is configured (neither null nor
// undefined); otherwise this stays undefined and no global rate limiter is attached to tasks.
let attachToGlobalRateLimit = (configuration.taskInterval != null)
? rateLimit.clonable(configuration.taskInterval)
: undefined;
let backend = await createBackend({
backend: configuration.backend,
options: configuration.database
});
// `state` is mutated in place (instead of being replaced by a new object), so the factories
// below receive the backend through the very same object that was created above.
Object.assign(state, { backend: backend });
const createTaskKernel = require("./streams/task-kernel")(state);
const runTask = require("./run-task")(state);
// Asks the backend how many tasks are currently locked and prints a warning to the console when
// there are any. Resolves to undefined; rejects when the backend call fails (a synchronous throw
// from the backend is turned into a rejection as well).
async function checkLockedTasks() {
	let lockedCount = await backend.topLevel.countLockedTasks();

	if (lockedCount > 0) {
		console.log(`${chalk.bold.red("WARNING:")} There are ${lockedCount} tasks currently locked, and they will not be run! This may be caused by a process crash in the past. See the documentation for more details on how to solve this issue.`);
	}
}
// Holds the promise of the one-time database preparation, once it has been started.
let databasePreparePromise;

// Runs the database preparation (locked-task check and seed insertion, started together) the
// first time it is called; every later call gets the promise of that same first run.
// NOTE: the promise is stored as soon as preparation starts and is never reset, so a failed
// preparation is also what every subsequent call will receive.
async function prepareDatabase() {
	if (databasePreparePromise == null) {
		databasePreparePromise = Promise.all([
			checkLockedTasks(),
			backend.topLevel.insertSeeds(null, { seeds: configuration.seed })
		]);
	}

	return databasePreparePromise;
}
// FIXME: Don't write to console.log below, since this is meant to be usable as a library as well - provide some sort of object logging hook instead?
return {
run: async function runKernel() {
console.log(`Starting ${tasks.size} tasks...`);
await prepareDatabase();
asyncInterval(60 * 1000, () => {
return Promise.try(() => {
return backend.topLevel.getQueueSize();
}).then((queueSize) => {
for (let [ taskName, size ] of Object.entries(queueSize)) {
metrics.taskQueueSize.labels({ task: taskName }).set(size);
}
});
});
return Promise.map(tasks.values(), (task) => {
return pipe([
createTaskKernel(task, {
globalRateLimiter: (attachToGlobalRateLimit != null) ? attachToGlobalRateLimit() : null,
}),
simpleSink(({ status }) => {
if (status === "completed") {
2 years ago
metrics.successfulItems.inc(1);
metrics.successfulItems.labels({ task: task.name }).inc(1);
} else if (status === "failed") {
metrics.failedItems.inc(1);
metrics.failedItems.labels({ task: task.name }).inc(1);
} else {
unreachable(`Unrecognized status '${status}'`);
}
})
]).read();
});
},
simulate: async function simulate({ itemID, task: taskName }) {
console.log(`Simulating task ${itemID}/${taskName}...`);
await prepareDatabase();
let simulatedBackend = backend.topLevel.simulate();
let simulateTask = require("./run-task")({
... state,
backend: simulatedBackend
});
let item = await simulatedBackend.topLevel.getItem(null, { id: itemID });
return simulateTask(tasks.get(taskName), item);
},
execute: async function simulate({ itemID, task: taskName }) {
// TODO: Should this also lock the task? We probably want to ignore any locks, since this method is primarily used for task logic debugging purposes, and overriding locks would be desirable there.
console.log(`Running task ${itemID}/${taskName}...`);
await prepareDatabase();
let item = await backend.topLevel.getItem(null, { id: itemID });
return runTask(tasks.get(taskName), item);
},
shutdown: function () {
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
return backend.topLevel.shutdown();
},
getMetrics: function () {
return Promise.try(() => {
return state.prometheusRegistry.metrics();
}).then((metrics) => {
return {
contentType: state.prometheusRegistry.contentType,
metrics: metrics
};
});
},
getUpdates: function ({ prefix, timestamp }) {
return backend.topLevel.getUpdateStream(null, { prefix, timestamp });
}
};
3 years ago
};