"use strict";
const Promise = require("bluebird");
const defaultValue = require("default-value");
const chalk = require("chalk");
const util = require("util");
const syncpipe = require("syncpipe");
const rateLimit = require("@ppstreams/rate-limit");
const simpleSink = require("@promistream/simple-sink");
const pipe = require("@promistream/pipe");
const parallelize = require("@ppstreams/parallelize");
const initialize = require("./initialize");
// TODO: Publish this as a separate package
// Inverts an object of arrays, eg. {a: [x, y], b: [x, z]} becomes {x: [a, b], y: [a], z: [b]}
// Useful for eg. tag mappings
/**
 * Inverts an object of arrays: every value found in any of the arrays becomes
 * a key of the result, mapped to the list of original keys whose array
 * contained it (in the iteration order of `Object.entries`).
 *
 * @param {Object<string, Array>} object - Mapping from key to a list of values.
 * @returns {Object<string, string[]>} Mapping from each value to the keys that listed it.
 */
function invertMapping(object) {
	let newObject = {};

	for (let [ key, valueList ] of Object.entries(object)) {
		for (let value of valueList) {
			// Own-property check so that values such as "constructor" or "toString"
			// are not mistaken for already-existing entries inherited from Object.prototype.
			if (!Object.prototype.hasOwnProperty.call(newObject, value)) {
				newObject[value] = [];
			}

			newObject[value].push(key);
		}
	}

	return newObject;
}
/**
 * Pass-through helper: returns its argument unchanged.
 *
 * NOTE(review): the name suggests this was meant to print the value while
 * debugging a pipeline, but no logging statement is visible here -- confirm
 * before relying on it for output.
 *
 * @param {*} value - Any value.
 * @returns {*} The same `value`.
 */
function log(value) {
	return value;
}
module.exports = function createKernel(configuration) {
return Promise.try(() => {
return initialize({
knexfile: {
client: "pg",
connection: configuration.database
}).then((state) => {
const queries = require("./queries")(state);
const createTaskStream = require("./task-stream")(state);
let { knex } = state;
// Creates an item through `queries.createItem(knex, ...)` for every `item` handed to the callback below.
// Each call spreads the item's own properties and adds `allowUpsert: false` and `failIfExists: false`.
// NOTE(review): the call expression in front of `, (item) => {` on the return line is not present in this text,
// so what is being iterated over cannot be determined from here (presumably seed items taken from
// `configuration` -- confirm). The closing brackets of this function are missing as well.
function insertSeeds() {
return, (item) => {
return queries.createItem(knex, {
... item,
allowUpsert: false,
failIfExists: false
// Asks `queries.countLockedTasks(knex)` how many tasks are currently locked and prints a warning
// to the console when that count is greater than zero. Returns a promise that resolves (with no
// value) once the check has run; a failure of the count query rejects it.
// NOTE(review): the original interpolation read `${"WARNING:")}`, i.e. a wrapping call (most
// likely a chalk style) was lost around the label. The label is emitted unstyled here because
// the intended style cannot be determined from this file -- restore it if known.
function checkLockedTasks() {
	return Promise.try(() => {
		return queries.countLockedTasks(knex);
	}).then((lockedCount) => {
		if (lockedCount > 0) {
			console.log(`${"WARNING:"} There are ${lockedCount} tasks currently locked, and they will not be run! This may be caused by a process crash in the past. See the documentation for more details on how to solve this issue.`);
		}
	});
}
// Builds and starts the task streams described by `configuration`.
//
// - `invertMapping(configuration.tags)` produces an object that is iterated below as `[ task, tags ]` pairs.
// - When `configuration.taskInterval` is set, a clonable rate limiter is created once and a fresh
//   clone is handed to every task stream as `globalRateLimiter`; otherwise `null` is passed.
// - When `configuration.parallelTasks` is set, each task stream receives a `parallelize(...)` stream
//   as `globalParallelize`; otherwise `null` is passed.
// - A task name that has no entry in `configuration.tasks` makes the callback throw an Error.
// - The `.catch` handler prints the error with `console.dir` (unlimited depth, colors) and rethrows it.
//
// NOTE(review): this text is truncated in several places -- the call in front of `, ([ task, tags ]) => {`,
// the remaining contents of the `pipe([ ... ])` array, the expression inside `${}` and all closing
// brackets are missing -- so which promise the `.catch` is attached to cannot be determined from here.
function runTaskStreams() {
let tasks = invertMapping(configuration.tags);
let attachToGlobalRateLimit = (configuration.taskInterval != null)
? rateLimit.clonable(configuration.taskInterval)
: undefined;
console.log(`Starting ${Object.keys(tasks).length} tasks...`);
return, ([ task, tags ]) => {
let taskConfiguration = configuration.tasks[task];
if (taskConfiguration != null) {
let taskStream = createTaskStream({
task: task,
tags: tags,
taskVersion: defaultValue(taskConfiguration.version, "0"),
ttl: taskConfiguration.ttl,
globalRateLimiter: (attachToGlobalRateLimit != null)
? attachToGlobalRateLimit()
: null,
globalParallelize: (configuration.parallelTasks != null)
? parallelize(configuration.parallelTasks)
: null,
// The callback parameter `task` below shadows the outer `task` of the enclosing callback.
taskDependencies: defaultValue(taskConfiguration.dependsOn, []).map((task) => {
return {
task: task,
// NOTE(review): this reads `.taskVersion`, whereas the task's own version above is read from
// `.version` -- one of the two property names looks wrong; confirm against the configuration format.
// A dependency name with no entry in `configuration.tasks` would throw a TypeError on this line.
taskVersion: defaultValue(configuration.tasks[task].taskVersion, "0")
return pipe([
simpleSink((completedItem) => {
// NOTE(review): the interpolation below is empty in this text; the logged expression is missing.
console.log(`[completed] ${}`);
} else {
throw new Error(`Task '${task}' is defined to run for tags [${tags}], but no such task is defined`);
}).catch((error) => {
console.dir(error, { depth: null, colors: true });
throw error;
// Dry-run helper for a single task against a single item.
//
// Looks up `configuration.tasks[task]`, builds `simulatedMethods` -- an object (via `Object.fromEntries`)
// with one entry per name in `methods`, where each entry is a function that only prints
// "<method> (simulated):" followed by its inspected arguments -- and then fetches the item with
// `queries.getItem(knex, id)`. The object assembled in the `.then` callback exposes a real `getItem`
// (delegating to `queries.getItem`) next to the spread-in simulated methods.
//
// NOTE(review): this text is truncated -- the mapping call in `(_) => => [ method, ...`, the statement that
// opens the object containing `getItem` (presumably a call into `taskConfiguration` -- confirm) and all
// closing brackets are missing, so the return value cannot be determined from here.
function simulateTask(id, task) {
let taskConfiguration = configuration.tasks[task];
let methods = [ "createItem", "renameItem", "mergeItem", "deleteItem", "createAlias", "deleteAlias", "updateData", "updateMetadata", "expire" ];
let simulatedMethods = syncpipe(methods, [
(_) => => [ method, function() {
console.log(`${chalk.bold.yellow.bgBlack(`${method} (simulated):`)} ${util.inspect(arguments, { colors: true, depth: null })}`);
(_) => Object.fromEntries(_)
return Promise.try(() => {
return queries.getItem(knex, id);
}).then((item) => {
getItem: function (id) {
return queries.getItem(knex, id);
... simulatedMethods
return {
run: function runKernel() {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return runTaskStreams();
simulate: function simulate({ itemID, task }) {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return simulateTask(itemID, task);
shutdown: function () {
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed