You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
216 lines
6.1 KiB
JavaScript
216 lines
6.1 KiB
JavaScript
"use strict";
|
|
|
|
const Promise = require("bluebird");
|
|
const defaultValue = require("default-value");
|
|
const chalk = require("chalk");
|
|
const util = require("util");
|
|
const syncpipe = require("syncpipe");
|
|
|
|
const rateLimit = require("@promistream/rate-limit");
|
|
const simpleSink = require("@promistream/simple-sink");
|
|
const pipe = require("@promistream/pipe");
|
|
const parallelize = require("@promistream/parallelize");
|
|
|
|
const initialize = require("./initialize");
|
|
const logStatus = require("./log-status");
|
|
const createDependencyMap = require("./dependency-map");
|
|
|
|
// FIXME: *Require* a taskInterval to be set, even if explicitly null, to prevent accidentally forgetting it
|
|
|
|
// TODO: Publish this as a separate package
/**
 * Inverts an object of arrays, eg. {a: [x, y], b: [x, z]} becomes {x: [a, b], y: [a], z: [b]}.
 * Useful for eg. tag mappings.
 *
 * @param {Object<string, Iterable<string>>} object - Mapping from key to a list of values.
 * @returns {Object<string, string[]>} Mapping from each value to the list of keys it appeared under,
 *   in the order those keys were encountered.
 */
function invertMapping(object) {
	// Accumulate in a Map rather than a plain object: with a plain `{}`, a value such as
	// "toString", "constructor" or "__proto__" would be found on Object.prototype, be treated
	// as "already present", and the subsequent .push() would blow up on a non-array.
	const inverted = new Map();

	for (const [ key, valueList ] of Object.entries(object)) {
		for (const value of valueList) {
			// Plain-object semantics: property keys are always strings
			const property = String(value);

			if (!inverted.has(property)) {
				inverted.set(property, []);
			}

			inverted.get(property).push(key);
		}
	}

	// Object.fromEntries defines own data properties, so even "__proto__" ends up as a regular key
	return Object.fromEntries(inverted);
}
|
|
|
|
module.exports = function createKernel(configuration) {
|
|
return Promise.try(() => {
|
|
return initialize({
|
|
knexfile: {
|
|
client: "pg",
|
|
connection: configuration.database,
|
|
pool: { min: 0, max: 32 },
|
|
migrations: { tableName: "srap_knex_migrations" }
|
|
}
|
|
});
|
|
}).then((state) => {
|
|
const queries = require("./queries")(state);
|
|
const createTaskStream = require("./task-stream")(state);
|
|
const createDatabaseQueue = require("./queued-database-api")(state);
|
|
|
|
let { knex } = state;
|
|
let { dependencyMap, dependentMap } = createDependencyMap(configuration);
|
|
|
|
function insertSeeds() {
|
|
return Promise.map(configuration.seed, (item) => {
|
|
return queries.createItem(knex, {
|
|
... item,
|
|
allowUpsert: false,
|
|
failIfExists: false
|
|
});
|
|
});
|
|
}
|
|
|
|
function checkLockedTasks() {
|
|
return Promise.try(() => {
|
|
return queries.countLockedTasks(knex);
|
|
}).then((lockedCount) => {
|
|
if (lockedCount > 0) {
|
|
console.log(`${chalk.bold.red("WARNING:")} There are ${lockedCount} tasks currently locked, and they will not be run! This may be caused by a process crash in the past. See the documentation for more details on how to solve this issue.`);
|
|
}
|
|
});
|
|
}
|
|
|
|
function runTaskStreams() {
|
|
let tasks = invertMapping(configuration.tags);
|
|
|
|
let attachToGlobalRateLimit = (configuration.taskInterval != null)
|
|
? rateLimit.clonable(configuration.taskInterval)
|
|
: undefined;
|
|
|
|
console.log(`Starting ${Object.keys(tasks).length} tasks...`);
|
|
|
|
return Promise.map(Object.entries(tasks), ([ task, tags ]) => {
|
|
let taskConfiguration = configuration.tasks[task];
|
|
|
|
if (taskConfiguration != null) {
|
|
let taskStream = createTaskStream({
|
|
task: task,
|
|
tags: tags,
|
|
taskVersion: defaultValue(taskConfiguration.version, "0"),
|
|
taskInterval: taskConfiguration.taskInterval,
|
|
parallelTasks: taskConfiguration.parallelTasks,
|
|
ttl: taskConfiguration.ttl,
|
|
run: taskConfiguration.run,
|
|
globalRateLimiter: (attachToGlobalRateLimit != null)
|
|
? attachToGlobalRateLimit()
|
|
: null,
|
|
globalParallelize: (configuration.parallelTasks != null)
|
|
? parallelize(configuration.parallelTasks)
|
|
: null,
|
|
taskDependencies: dependencyMap[task],
|
|
taskDependents: dependentMap[task]
|
|
});
|
|
|
|
return pipe([
|
|
taskStream,
|
|
simpleSink((completedItem) => {
|
|
logStatus(task, chalk.bold.green, "completed", completedItem.id);
|
|
})
|
|
]).read();
|
|
} else {
|
|
throw new Error(`Task '${task}' is defined to run for tags [${tags}], but no such task is defined`);
|
|
}
|
|
}).catch((error) => {
|
|
console.dir(error, { depth: null, colors: true });
|
|
throw error;
|
|
});
|
|
}
|
|
|
|
function executeTask(id, task) {
|
|
let taskConfiguration = configuration.tasks[task];
|
|
|
|
return knex.transaction((tx) => {
|
|
return Promise.try(() => {
|
|
return queries.getItem(knex, id);
|
|
}).then((item) => {
|
|
let queue = createDatabaseQueue({
|
|
tx,
|
|
item,
|
|
task,
|
|
taskVersion: defaultValue(taskConfiguration.version, "0"),
|
|
taskDependents: dependentMap[task],
|
|
taskDependencies: dependencyMap[task]
|
|
});
|
|
|
|
return Promise.try(() => {
|
|
return taskConfiguration.run({
|
|
id: item.id,
|
|
data: item.data,
|
|
getItem: function (id) {
|
|
return queries.getItem(knex, id);
|
|
},
|
|
... queue.api
|
|
});
|
|
}).then(() => {
|
|
return queue.execute();
|
|
});
|
|
});
|
|
});
|
|
}
|
|
|
|
function simulateTask(id, task) {
|
|
let taskConfiguration = configuration.tasks[task];
|
|
|
|
let methods = [ "createItem", "renameItem", "mergeItem", "deleteItem", "createAlias", "deleteAlias", "updateData", "updateMetadata", "expire" ];
|
|
|
|
let simulatedMethods = syncpipe(methods, [
|
|
(_) => _.map((method) => [ method, function() {
|
|
console.log(`${chalk.bold.yellow.bgBlack(`${method} (simulated):`)} ${util.inspect(arguments, { colors: true, depth: null })}`);
|
|
}]),
|
|
(_) => Object.fromEntries(_)
|
|
]);
|
|
|
|
return Promise.try(() => {
|
|
return queries.getItem(knex, id);
|
|
}).then((item) => {
|
|
return taskConfiguration.run({
|
|
id: item.id,
|
|
data: item.data,
|
|
getItem: function (id) {
|
|
return queries.getItem(knex, id);
|
|
},
|
|
... simulatedMethods
|
|
});
|
|
});
|
|
}
|
|
|
|
return {
|
|
run: function runKernel() {
|
|
return Promise.try(() => {
|
|
return insertSeeds();
|
|
}).then(() => {
|
|
return checkLockedTasks();
|
|
}).then(() => {
|
|
return runTaskStreams();
|
|
});
|
|
},
|
|
simulate: function simulate({ itemID, task }) {
|
|
return Promise.try(() => {
|
|
return insertSeeds();
|
|
}).then(() => {
|
|
return checkLockedTasks();
|
|
}).then(() => {
|
|
return simulateTask(itemID, task);
|
|
});
|
|
},
|
|
execute: function simulate({ itemID, task }) {
|
|
return Promise.try(() => {
|
|
return insertSeeds();
|
|
}).then(() => {
|
|
return checkLockedTasks();
|
|
}).then(() => {
|
|
return executeTask(itemID, task);
|
|
});
|
|
},
|
|
shutdown: function () {
|
|
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
|
|
knex.destroy();
|
|
}
|
|
};
|
|
});
|
|
};
|