Backend refactor: support pluggable backends

master
Sven Slootweg 1 year ago
parent bc1a9349c3
commit 1e1a367cb2

@ -0,0 +1,17 @@
"use strict";
module.exports.up = function(knex, Promise) {
return knex.schema
.alterTable("srap_task_results", (table) => {
table.jsonb("metadata").notNullable().default({}).alter();
table.boolean("is_successful").nullable().alter();
});
};
module.exports.down = function(knex, Promise) {
return knex.schema
.alterTable("srap_task_results", (table) => {
table.jsonb("metadata").notNullable().alter();
table.boolean("is_successful").notNullable().alter();
});
};

@ -1,8 +1,9 @@
"use strict";
const Promise = require("bluebird");
const unreachable = require("@joepie91/unreachable");
const { validateArguments } = require("@validatem/core");
const { validateArguments, validateOptions } = require("@validatem/core");
const required = require("@validatem/required");
const requireEither = require("@validatem/require-either");
const isString = require("@validatem/is-string");
@ -111,8 +112,14 @@ module.exports = function (state) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
// FIXME: Stricter validation
seeds: [ required, arrayOf(anything) ]
// FIXME: This currently duplicates validation logic from forItem.storeItem; figure out a way to deduplicate that
seeds: [ required, arrayOf({
id: [ required, isString ],
// Tags are required to be specified (even if an empty array) because it's easily forgotten
tags: [ required, arrayOf(isString) ],
aliases: [ defaultTo([]), arrayOf(isString) ],
data: [ required, anything ], // FIXME: Check for object
}) ]
}]
});
@ -126,23 +133,230 @@ module.exports = function (state) {
failIfExists: false
});
});
}
},
// FIXME: Other than the missing readOperation wrapper and the tx argument, this is *basically* the same logic as under forItem... this should be simplified somehow.
getItem: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("id"), {
id: [ required, isString ],
optional: [ defaultTo(false), isBoolean ] // FIXME: Can this handling be moved to the wrapper?
}]
});
return backend.getItem(tx, options);
},
},
forItem: function ({ item, task, mutationQueue }) {
forItem: function (_options) {
// FIXME: Proper validation rules here for the other fields as well
let { item, task, mutationQueue, readTX, simulate } = validateOptions(arguments, {
item: anything,
task: [ required, isTask ],
mutationQueue: anything,
readTX: maybeTX,
simulate: anything
});
// We create a new instance of the actual API for every item being processed. This is necessary because some of the input arguments will default to item-specific values, and some of the logic is dependent on task-specific metadata. This is a more efficient (and understandable) approach than pretending the API is stateless and then separately wrapping the API *again* for every individual item with a whole separate layer of input validation rules.
// FIXME: Is this still correct, with the new task (graph) format?
let dependentTaskNames = new Set(task.dependents.map((dependent) => dependent.task));
function mutableOperation(func) {
if (simulate) {
return func(backend);
if (simulate === true) {
return func(readTX, backend);
} else if (mutationQueue != null) {
mutationQueue.push(func);
} else {
unreachable("No mutation queue provided in live mode");
}
}
function readOperation(func) {
return func(readTX, backend);
}
let exposedAPI = {
// NOTE: 'exposed' API methods are the ones that are passed into a user-defined task, and which the task uses to eg. update or create new items
getItem: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("id"), {
id: [ required, isString ],
optional: [ defaultTo(false), isBoolean ] // FIXME: Can this handling be moved to the wrapper?
}]
});
return readOperation((tx) => {
return backend.getItem(tx, options);
});
},
storeItem: function (_options) {
// NOTE: Using `update` instead of `data` makes it an upsert!
let [ options ] = validateArguments(arguments, {
options: [ required, {
id: [ required, isString ],
// Tags are required to be specified (even if an empty array) because it's easily forgotten
tags: [ required, arrayOf(isString) ],
aliases: [ defaultTo([]), arrayOf(isString) ],
data: [ anything ], // FIXME: Check for object
update: [ isFunction ],
failIfExists: [ defaultTo(false), isBoolean ],
allowUpsert: [ defaultTo(true), isBoolean ],
parentID: [ defaultTo(item.id), isString ]
}, requireEither([ "data", "update" ]) ]
});
let { data, ... rest } = options;
return mutableOperation((tx) => {
return backend.storeItem(tx, {
... rest,
// We normalize `data` and `update` (which are mutually-exclusive) into a single option here, so that the backend only needs to deal with the `update` case
// TODO: Can this be folded into the validation rules in a reasonable and readable way?
update: (data != null)
? (existingData) => ({ ... existingData, ... data })
: rest.update
});
});
},
moveItem: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("into"), {
from: [ defaultTo(item.id), isString ],
into: [ required, isString ],
// NOTE: If no `merge` function is specified, that indicates that merging is not allowed (ie. this is strictly a rename), and mergeMetadata is ignored too
merge: [ isFunction ],
mergeMetadata: [ defaultTo({}), anyProperty({
key: [ required ],
value: [ required, isFunction ]
})],
}]
});
return mutableOperation((tx) => {
return backend.moveItem(tx, { options, allowMerge: (options.merge != null) });
});
},
deleteItem: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [
defaultTo({}),
wrapValueAsOption("id"), {
id: [ defaultTo(item.id), isString ]
}
]
});
return mutableOperation((tx) => {
return backend.deleteItem(tx, options);
});
},
createAlias: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("from"), {
from: [ required, isString ],
to: [ defaultTo(item.id), isString ],
failIfExists: [ defaultTo(false), isBoolean ] // TODO: Shouldn't this default to true, for any occurrence outside of a merge/rename?
}]
});
return mutableOperation((tx) => {
return backend.createAlias(tx, options);
});
},
deleteAlias: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("from"), {
from: [ required, isString ]
}]
});
return mutableOperation((tx) => {
return backend.deleteAlias(tx, options);
});
},
updateData: function (_options) {
// NOTE: This is a semantically self-describing convenience wrapper for `storeItem` that updates the currently-being-processed item
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ]
}]
});
return exposedAPI.storeItem({
... options,
tags: []
});
},
updateMetadata: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ],
task: [ required, isTask ]
}]
});
return mutableOperation((tx) => {
return backend.updateMetadata(tx, options);
});
},
expire: function (_options) {
// TODO: It probably doesn't make any semantic sense to leave *both* arguments unspecified. Maybe that should be prohibited via eg. a non-exclusive requireEither? Otherwise the user might expect to immediately expire the *current* task, but since the task is only updated *after* the task logic runs, that is not currently possible to express.
let [ options ] = validateArguments(arguments, {
options: [ required, {
id: [ defaultTo(item.id), isString ],
isTask: [ defaultTo(task), isTask ]
}]
});
return mutableOperation((tx) => {
return backend.expire(tx, options);
});
},
expireDependents: function (_options) {
// NOTE: This method does not have a counterpart in the database backend; it's a convenience abstraction over regular `backend.expire` calls
let [{ id, dependents }] = validateArguments(arguments, {
options: [ defaultTo({}), wrapValueAsOption("dependents"), {
id: [ defaultTo(item.id), isString ],
dependents: [ defaultTo([]), arrayOf(isString), (dependents) => {
// Only consider dependents that actually exist for this task
return dependents.filter((dependent) => dependentTaskNames.has(dependent));
}]
}]
});
// NOTE: This works even with queueing, because each this.expire call just internally queues another operation
return Promise.map(dependents, (dependent) => {
return exposedAPI.expire({
id: id,
taskName: dependent
});
});
},
// Temporary compatibility aliases
createItem: (... args) => exposedAPI.storeItem(... args),
mergeItem: (... args) => exposedAPI.moveItem(... args),
renameItem: (options) => {
if (typeof options === "string") {
return exposedAPI.moveItem(options);
} else {
return exposedAPI.moveItem({ into: options.to, from: options.from });
}
},
};
return {
// NOTE: 'internal' API methods are accessible to srap, but not to user-defined tasks.
@ -203,196 +417,7 @@ module.exports = function (state) {
});
},
},
exposed: {
// NOTE: 'exposed' API methods are the ones that are passed into a user-defined task, and which the task uses to eg. update or create new items
getItem: function (_tx, _id, _optional) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("id"), {
id: [ required, isString ],
optional: [ defaultTo(false), isBoolean ] // FIXME: Can this handling be moved to the wrapper?
}]
});
return backend.getItem(tx, options);
},
storeItem: function (_tx, _options) {
// NOTE: Using `update` instead of `data` makes it an upsert!
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
id: [ required, isString ],
// Tags are required to be specified (even if an empty array) because it's easily forgotten
tags: [ required, arrayOf(isString) ],
aliases: [ defaultTo([]), arrayOf(isString) ],
data: [ anything ], // FIXME: Check for object
update: [ isFunction ],
failIfExists: [ defaultTo(false), isBoolean ],
allowUpsert: [ defaultTo(true), isBoolean ],
parentID: [ defaultTo(item.id), isString ]
}, requireEither([ "data", "update" ]) ]
});
let { data, ... rest } = options;
return mutableOperation((backend) => {
return backend.storeItem(tx, {
... rest,
// We normalize `data` and `update` (which are mutually-exclusive) into a single option here, so that the backend only needs to deal with the `update` case
// TODO: Can this be folded into the validation rules in a reasonable and readable way?
update: (data != null)
? (existingData) => ({ ... existingData, ... data })
: rest.update
});
});
},
moveItem: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("into"), {
from: [ defaultTo(item.id), isString ],
into: [ required, isString ],
// NOTE: If no `merge` function is specified, that indicates that merging is not allowed (ie. this is strictly a rename), and mergeMetadata is ignored too
merge: [ isFunction ],
mergeMetadata: [ defaultTo({}), anyProperty({
key: [ required ],
value: [ required, isFunction ]
})],
}]
});
return mutableOperation((backend) => {
return backend.moveItem(tx, { options, allowMerge: (options.merge != null) });
});
},
deleteItem: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [
defaultTo({}),
wrapValueAsOption("id"), {
id: [ defaultTo(item.id), isString ]
}
]
});
return mutableOperation((backend) => {
return backend.deleteItem(tx, options);
});
},
createAlias: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("from"), {
from: [ required, isString ],
to: [ defaultTo(item.id), isString ],
failIfExists: [ defaultTo(false), isBoolean ] // TODO: Shouldn't this default to true, for any occurrence outside of a merge/rename?
}]
});
return mutableOperation((backend) => {
return backend.createAlias(tx, options);
});
},
deleteAlias: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("from"), {
from: [ required, isString ]
}]
});
return mutableOperation((backend) => {
return backend.deleteAlias(tx, options);
});
},
updateData: function (_tx, _options) {
// NOTE: This is a semantically self-describing convenience wrapper for `createItem` that updates the currently-being-processed item
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ]
}]
});
return mutableOperation((backend) => {
return backend.createItem(tx, {
... options,
tags: []
});
});
},
updateMetadata: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ],
task: [ required, isTask ]
}]
});
return mutableOperation((backend) => {
return backend.updateMetadata(tx, options);
});
},
expire: function (_tx, _options) {
// TODO: It probably doesn't make any semantic sense to leave *both* arguments unspecified. Maybe that should be prohibited via eg. a non-exclusive requireEither? Otherwise the user might expect to immediately expire the *current* task, but since the task is only updated *after* the task logic runs, that is not currently possible to express.
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
id: [ defaultTo(item.id), isString ],
isTask: [ defaultTo(task), isTask ]
}]
});
return mutableOperation((backend) => {
return backend.expire(tx, options);
});
},
expireDependents: function (_tx, _options) {
// NOTE: This method does not have a counterpart in the database backend; it's a convenience abstraction over regular `backend.expire` calls
let [ tx, { id, dependents }] = validateArguments(arguments, {
tx: maybeTX,
options: [ defaultTo({}), wrapValueAsOption("dependents"), {
id: [ defaultTo(item.id), isString ],
dependents: [ defaultTo([]), arrayOf(isString), (dependents) => {
// Only consider dependents that actually exist for this task
return dependents.filter((dependent) => dependentTaskNames.has(dependent));
}]
}]
});
// FIXME: This doesn't work with the synchronous queueing model
return Promise.map(dependents, (dependent) => {
return this.expire(tx, {
id: id,
taskName: dependent
});
});
},
// Temporary compatibility aliases
createItem: (... args) => this.storeItem(... args),
mergeItem: (... args) => this.moveItem(... args),
renameItem: (tx, options) => {
if (typeof options === "string") {
return this.moveItem(tx, options);
} else {
return this.moveItem(tx, { into: options.to, from: options.from });
}
},
}
exposed: exposedAPI
};
}
};

@ -38,9 +38,10 @@ module.exports = function(state) {
directory: migrationsFolder
});
}).then(() => {
let db = models({ ... state, knex: knex });
let queryState = { ... state, knex, db };
// TODO: Does it really make sense to be merging in the backendSettings here? Shouldn't that happen automatically in some way for *every* backend, rather than just the PostgreSQL one specifically? As backend settings are a generic backend feature
state = { ... state, knex: knex, backendSettings: options};
let db = models(state);
state = { ... state, db: db };
// TODO: Should this be inlined instead?
function repointAliases (tx, { to, from }) {
@ -104,7 +105,7 @@ module.exports = function(state) {
});
// TODO: return true/false depending on whether unlocking succeeded?
},
getItem: function (tx, { id, optional }) {
return Promise.try(() => {
return db.Alias.relatedQuery("item", tx)
@ -122,7 +123,7 @@ module.exports = function(state) {
storeItem: function (tx, { id, tags, aliases, update, failIfExists, allowUpsert, parentID }) {
// FIXME: Make failIfExists actually work, currently it does nothing as the UNIQUE constraint violation cannot occur for an upsert
// TODO: Ensure that we run the transaction in full isolation mode, and retry in case of a conflict
return Promise.try(() => {
// NOTE: We look up by alias, since this is an upsert - and so if the specified ID already exists as an alias, we should update the existing item instead of creating a new one with the specified (aliased) ID
return db.Alias
@ -343,8 +344,8 @@ module.exports = function(state) {
});
},
getUpdateStream: require("./queries/get-update-stream")(queryState),
getTaskStream: require("./queries/get-task-stream")(queryState)
getUpdateStream: require("./queries/get-update-stream")(state),
getTaskStream: require("./queries/get-task-stream")(state)
};
});
}

@ -1,6 +1,9 @@
"use strict";
const Promise = require("bluebird");
const debug = require("debug")("srap:backend:postgresql:query:get-task-stream");
const buffer = require("@promistream/buffer");
const pipe = require("@promistream/pipe");
const simpleSource = require("@promistream/simple-source");
const query = `
@ -72,35 +75,38 @@ const query = `
module.exports = function ({ metrics, backendSettings }) {
return function (tx, { task }) {
return simpleSource(() => {
let startTime = Date.now();
return Promise.try(() => {
return tx.raw(query, {
tags: task.tags,
task: task.name,
taskVersion: task.version,
resultLimit: backendSettings.taskBatchSize,
dependencyTaskDefinitions: JSON.stringify(task.dependencies.map((dependency) => {
// Case-mapping for SQL compatibility
return { task_version: dependency.version, task: dependency.name };
}))
return pipe([
simpleSource(() => {
let startTime = Date.now();
return Promise.try(() => {
return tx.raw(query, {
tags: task.tags,
task: task.name,
taskVersion: task.version,
resultLimit: backendSettings.taskBatchSize,
dependencyTaskDefinitions: JSON.stringify(task.dependencies.map((dependency) => {
// Case-mapping for SQL compatibility
return { task_version: dependency.version, task: dependency.name };
}))
});
}).then((result) => {
let timeElapsed = Date.now() - startTime;
metrics.taskFetchTime.labels({ task: task.name }).set(timeElapsed / 1000);
metrics.taskFetchResults.labels({ task: task.name }).set(result.rowCount);
debug(`Task retrieval query for '${task.name}' took ${timeElapsed}ms and produced ${result.rowCount} results`);
if (result.rowCount > 0) {
return result.rows;
} else {
// TODO: Consider using LISTEN/NOTIFY instead?
return Promise.resolve([]).delay(backendSettings.taskBatchDelay);
}
});
}).then((result) => {
let timeElapsed = Date.now() - startTime;
metrics.taskFetchTime.labels({ task: task.name }).set(timeElapsed / 1000);
metrics.taskFetchResults.labels({ task: task.name }).set(result.rowCount);
debug(`Task retrieval query for '${task.name}' took ${timeElapsed}ms and produced ${result.rowCount} results`);
if (result.rowCount > 0) {
return result.rows;
} else {
// TODO: Consider using LISTEN/NOTIFY instead?
return Promise.resolve([]).delay(backendSettings.taskBatchDelay);
}
});
});
}),
buffer()
]);
};
}
};

@ -7,7 +7,7 @@ const combineSequentialStreaming = require("@promistream/combine-sequential-stre
const fromIterable = require("@promistream/from-iterable");
const fromNodeStream = require("@promistream/from-node-stream");
const createTypeTaggingStream = require("./streams/tag-type");
const createTypeTaggingStream = require("../../../streams/tag-type");
module.exports = function ({ db, knex }) {
return function (tx, { timestamp, prefix }) {

@ -1,179 +1,183 @@
"use strict";
const Promise = require("bluebird");
const chalk = require("chalk");
const mergeItems = require("../../semantics/merge-items");
function printTX(tx) {
// TODO: Print entire chain?
return `[tx ${tx.__txID}]`;
return chalk.bold.yellow(`[tx ${tx.__txID ?? "?"}]`);
}
function printItem(id, task) {
if (task != null) {
return `[${id}:${task.name}]`;
return chalk.bold.white(`[${id}][${task.name}]`);
} else {
return `[${id}]`;
return chalk.bold.white(`[${id}]`);
}
}
// FIXME: Move logs to logging hook
function logSimulated(... args) {
console.log(... args);
console.log(chalk.gray(args[0]), ... args.slice(1));
}
module.exports = function (state) {
// NOTE: The simulated backend needs access to the 'real' backend; a task may eg. mutate an item based on its current data, and we'd need to read that from the real data source. The only constraint is that the simulated backend cannot *mutate* anything in the real backend, but reading is fine!
return function attachSimulatedBackend({ backend }) {
return {
defaultSettings: {},
create: function createSimulatedBackend(_options) {
let txCounter = 0;
let locks = new Map(); // Map<task, Set<id>>
return {
shutdown: function () {
return backend.shutdown();
},
getDefaultTransaction: function () {
return { __txID: null };
},
isTransaction: function (value) {
return ("__txID" in value);
},
runInTransaction: function (tx, callback) {
let newTransaction = { __txID: txCounter++, __parentTX: tx };
return callback(newTransaction);
},
lock: function (tx, { id, task }) {
if (!locks.has(task)) {
locks.set(task, new Set());
}
return {
defaultSettings: {},
create: function createSimulatedBackend({ backend }) {
let txCounter = 0;
let locks = new Map(); // Map<task, Set<id>>
return {
shutdown: function () {
return backend.shutdown();
},
getDefaultTransaction: function () {
return { __txID: null };
},
isTransaction: function (value) {
return ("__txID" in value);
},
runInTransaction: function (tx, callback) {
let newTransaction = { __txID: txCounter++, __parentTX: tx };
return callback(newTransaction);
},
lock: function (tx, { id, task }) {
if (!locks.has(task)) {
locks.set(task, new Set());
}
let taskLocks = locks.get(task);
let taskLocks = locks.get(task);
if (taskLocks.has(id)) {
logSimulated(`${printTX(tx)} Already locked: ${printItem(id, task)}`);
return false;
} else {
logSimulated(`${printTX(tx)} Locking ${printItem(id, task)}`);
taskLocks.add(id);
return true;
}
},
unlock: function (tx, { id, task }) {
logSimulated(`${printTX(tx)} Unlocking ${printItem(id, task)}`);
locks.get(task).delete(id);
},
getItem: function (tx, options) {
return backend.getItem(backend.getDefaultTransaction(), options);
},
storeItem: function (tx, { id, parentID, update, tags, aliases, allowUpsert }) {
return Promise.try(() => {
return this.getItem(tx, { id: id, optional: true });
}).then((currentItem) => {
let actualID = currentItem.id ?? id;
let newItem = {
id: actualID,
data: (currentItem != null)
? update(currentItem.data)
: update({}),
createdBy: parentID,
tags: tags,
aliases: aliases.concat([ actualID ]),
updatedAt: new Date()
if (taskLocks.has(id)) {
logSimulated(`${printTX(tx)} Already locked: ${printItem(id, task)}`);
return false;
} else {
logSimulated(`${printTX(tx)} Locking ${printItem(id, task)}`);
taskLocks.add(id);
return true;
}
},
unlock: function (tx, { id, task }) {
logSimulated(`${printTX(tx)} Unlocking ${printItem(id, task)}`);
locks.get(task).delete(id);
},
getItem: function (tx, options) {
return backend.getItem(backend.getDefaultTransaction(), options);
},
storeItem: function (tx, { id, parentID, update, tags, aliases, allowUpsert }) {
return Promise.try(() => {
return this.getItem(tx, { id: id, optional: true });
}).then((currentItem) => {
let actualID = (currentItem != null)
? currentItem.id
: id;
let newItem = {
id: actualID,
data: (currentItem != null)
? update(currentItem.data)
: update({}),
createdBy: parentID,
tags: tags,
aliases: aliases.concat([ actualID ]),
updatedAt: new Date()
};
logSimulated(`${printTX(tx)} ${printItem(id)} Storing item (upsert=${allowUpsert})`, newItem);
});
},
moveItem: function (tx, { from, into, merge, mergeMetadata, allowMerge }) {
return Promise.all([
this.getItem(tx, { id: from, optional: true }),
this.getItem(tx, { id: into, optional: true }),
]).then((fromObj, maybeIntoObj) => {
if (fromObj != null) {
let intoObj = maybeIntoObj ?? {
id: into,
data: {},
taskResults: []
};
logSimulated(`${printTX(tx)} ${printItem(id)} Storing item (upsert=${allowUpsert})`, newItem);
});
},
moveItem: function (tx, { from, into, merge, mergeMetadata, allowMerge }) {
return Promise.all([
this.getItem(tx, { id: from, optional: true }),
this.getItem(tx, { id: into, optional: true }),
]).then((fromObj, maybeIntoObj) => {
if (fromObj != null) {
let intoObj = maybeIntoObj ?? {
id: into,
data: {},
taskResults: []
};
if (allowMerge) {
let newItem = mergeItems({ fromObj, intoObj, merge, mergeMetadata });
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=true)`, newItem);
} else {
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=false)`);
}
if (allowMerge) {
let newItem = mergeItems({ fromObj, intoObj, merge, mergeMetadata });
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=true)`, newItem);
} else {
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=false)`);
}
});
},
deleteItem: function (tx, { id }) {
logSimulated(`${printTX(tx)} ${printItem(id)} Deleting item`);
},
createAlias: function (tx, { from, to }) {
logSimulated(`${printTX(tx)} ${printItem(to)} Creating alias ${from}`);
},
deleteAlias: function (tx, { from }) {
logSimulated(`${printTX(tx)} ${printItem(from)} Deleting alias`);
},
updateMetadata: function (tx, { id, task, update }) {
return Promise.try(() => {
return this.getItem(tx, { id, optional: true });
}).then((item) => {
let taskResult = (item != null)
? item.taskResults.find((result) => result.task === task)
: undefined;
let existingData = (taskResult != null)
? taskResult.metadata
: {};
let newData = update(existingData);
logSimulated(`${printTX(tx)} ${printItem(id, task)} Storing metadata`, newData);
});
},
expire: function (tx, { id, task }) {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Expiring task result`);
},
markTaskStatus: function (tx, { id, task, isSuccessful }) {
if (isSuccessful) {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as completed`);
} else {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as failed`);
}
},
countLockedTasks: function (tx) {
return Array.from(locks).reduce((total, [ task, taskLocks ]) => {
return total + taskLocks.size;
}, 0);
},
getUpdateStream: function (... args) {
return backend.getUpdateStream(... args);
},
getTaskStream: function (... args) {
return backend.getTaskStream(... args);
});
},
deleteItem: function (tx, { id }) {
logSimulated(`${printTX(tx)} ${printItem(id)} Deleting item`);
},
createAlias: function (tx, { from, to }) {
logSimulated(`${printTX(tx)} ${printItem(to)} Creating alias ${from}`);
},
deleteAlias: function (tx, { from }) {
logSimulated(`${printTX(tx)} ${printItem(from)} Deleting alias`);
},
updateMetadata: function (tx, { id, task, update }) {
return Promise.try(() => {
return this.getItem(tx, { id, optional: true });
}).then((item) => {
let taskResult = (item != null)
? item.taskResults.find((result) => result.task === task)
: undefined;
let existingData = (taskResult != null)
? taskResult.metadata
: {};
let newData = update(existingData);
logSimulated(`${printTX(tx)} ${printItem(id, task)} Storing metadata`, newData);
});
},
expire: function (tx, { id, task }) {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Expiring task result`);
},
markTaskStatus: function (tx, { id, task, isSuccessful }) {
if (isSuccessful) {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as completed`);
} else {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as failed`);
}
};
}
};
},
countLockedTasks: function (tx) {
return Array.from(locks).reduce((total, [ task, taskLocks ]) => {
return total + taskLocks.size;
}, 0);
},
getUpdateStream: function (... args) {
return backend.getUpdateStream(... args);
},
getTaskStream: function (... args) {
return backend.getTaskStream(... args);
}
};
}
};
};

@ -1,21 +1,16 @@
"use strict";
const Promise = require("bluebird");
const defaultValue = require("default-value");
const chalk = require("chalk");
const util = require("util");
const syncpipe = require("syncpipe");
const rateLimit = require("@promistream/rate-limit");
const simpleSink = require("@promistream/simple-sink");
const pipe = require("@promistream/pipe");
const parallelize = require("@promistream/parallelize");
const logStatus = require("./log-status");
const { validateOptions } = require("@validatem/core");
const isValidConfiguration = require("./validators/is-valid-configuration");
const createPrometheus = require("./prometheus");
const generateTaskGraph = require("./generate-task-graph");
const generateTaskGraph = require("./util/generate-task-graph");
const unreachable = require("@joepie91/unreachable")("srap");
// FIXME: *Require* a taskInterval to be set, even if explicitly null, to prevent accidentally forgetting it
@ -46,7 +41,8 @@ module.exports = async function createKernel(_configuration) {
Object.assign(state, { backend: backend });
const createTaskKernel = require("./task-kernel")(state);
const createTaskKernel = require("./streams/task-kernel")(state);
const runTask = require("./run-task")(state);
function checkLockedTasks() {
return Promise.try(() => {
@ -63,23 +59,25 @@ module.exports = async function createKernel(_configuration) {
if (databasePreparePromise == null) {
databasePreparePromise = Promise.all([
checkLockedTasks(),
backend.topLevel.insertSeeds(configuration.seed)
backend.topLevel.insertSeeds(null, { seeds: configuration.seed })
]);
}
return databasePreparePromise;
}
// FIXME: Don't dump to console.log below, since this is meant to be usable as a library as well - provide some sort of object logging hook instead?
return {
run: async function runKernel() {
console.log(`Starting ${tasks.size} tasks...`);
await prepareDatabase();
return Promise.map(tasks.values(), (task) => {
return pipe([
createTaskKernel(task),
simpleSink(({ status, item, error }) => {
createTaskKernel(task, {
globalRateLimiter: (attachToGlobalRateLimit != null) ? attachToGlobalRateLimit() : null,
}),
simpleSink(({ status }) => {
if (status === "completed") {
metrics.successfulItems.inc(1);
metrics.successfulItems.labels({ task: task }).inc(1);
@ -93,187 +91,38 @@ module.exports = async function createKernel(_configuration) {
]).read();
});
},
simulate: async function simulate({ itemID, task }) {
simulate: async function simulate({ itemID, task: taskName }) {
console.log(`Simulating task ${itemID}/${taskName}...`);
await prepareDatabase();
let simulatedBackend = backend.simulate();
return simulateTask(itemID, task);
},
execute: async function simulate({ itemID, task }) {
await prepareDatabase();
return executeTask(itemID, task);
},
shutdown: function () {
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
return backend.shutdown();
},
getMetrics: function () {
return Promise.try(() => {
return state.prometheusRegistry.metrics();
}).then((metrics) => {
return {
contentType: state.prometheusRegistry.contentType,
metrics: metrics
};
});
}
};
function runTaskStreams() {
let simulatedBackend = backend.topLevel.simulate();
return Promise.map(Object.entries(tasks), ([ task, tags ]) => {
let taskConfiguration = configuration.tasks[task];
if (taskConfiguration != null) {
let taskStream = createTaskStream({
task: task,
tags: tags,
taskVersion: defaultValue(taskConfiguration.version, "0"),
taskInterval: taskConfiguration.taskInterval,
parallelTasks: taskConfiguration.parallelTasks,
ttl: taskConfiguration.ttl,
run: taskConfiguration.run,
globalRateLimiter: (attachToGlobalRateLimit != null)
? attachToGlobalRateLimit()
: null,
globalParallelize: (configuration.parallelTasks != null)
? parallelize(configuration.parallelTasks)
: null,
taskDependencies: dependencyMap[task],
taskDependents: dependentMap[task]
});
return pipe([
taskStream,
simpleSink((completedItem) => {
metrics.successfulItems.inc(1);
metrics.successfulItems.labels({ task: task }).inc(1);
logStatus(task, chalk.bold.green, "completed", completedItem.id);
})
]).read();
} else {
throw new Error(`Task '${task}' is defined to run for tags [${tags}], but no such task is defined`);
}
}).catch((error) => {
console.dir(error, { depth: null, colors: true });
throw error;
});
}
function executeTask(id, task) {
let taskConfiguration = configuration.tasks[task];
return knex.transaction((tx) => {
return Promise.try(() => {
return queries.getItem(knex, id);
}).then((item) => {
let queue = createDatabaseQueue({
tx,
item,
task,
taskVersion: defaultValue(taskConfiguration.version, "0"),
taskDependents: dependentMap[task],
taskDependencies: dependencyMap[task]
});
return Promise.try(() => {
return taskConfiguration.run({
id: item.id,
data: item.data,
getItem: function (id) {
return queries.getItem(knex, id);
},
... queue.api
});
}).then(() => {
return queue.execute();
});
});
}, { doNotRejectOnRollback: false });
}
function simulateTask(id, task) {
let taskConfiguration = configuration.tasks[task];
let methods = [ "createItem", "renameItem", "mergeItem", "deleteItem", "createAlias", "deleteAlias", "updateData", "updateMetadata", "expire", "expireDependents" ];
let simulatedMethods = syncpipe(methods, [
(_) => _.map((method) => [ method, function() {
console.log(`${chalk.bold.yellow.bgBlack(`${method} (simulated):`)} ${util.inspect(arguments, { colors: true, depth: null })}`);
}]),
(_) => Object.fromEntries(_)
]);
return Promise.try(() => {
return queries.getItem(knex, id);
}).then((item) => {
return taskConfiguration.run({
id: item.id,
data: item.data,
getItem: function (id) {
return queries.getItem(knex, id);
},
... simulatedMethods
let simulateTask = require("./run-task")({
... state,
backend: simulatedBackend
});
});
}
return {
run: function runKernel() {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return runTaskStreams();
});
},
simulate: function simulate({ itemID, task }) {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return simulateTask(itemID, task);
});
let item = await simulatedBackend.topLevel.getItem(null, { id: itemID });
return simulateTask(tasks.get(taskName), item);
},
execute: function simulate({ itemID, task }) {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return executeTask(itemID, task);
});
execute: async function simulate({ itemID, task: taskName }) {
// TODO: Should this also lock the task? We probably want to ignore any locks, since this method is primarily used for task logic debugging purposes, and overriding locks would be desirable there.
console.log(`Running task ${itemID}/${taskName}...`);
await prepareDatabase();
let item = await backend.topLevel.getItem(null, { id: itemID });
return runTask(tasks.get(taskName), item);
},
shutdown: function () {
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
knex.destroy();
return backend.topLevel.shutdown();
},
getMetrics: function () {
return Promise.try(() => {
return prometheusRegistry.metrics();
return state.prometheusRegistry.metrics();
}).then((metrics) => {
return {
contentType: prometheusRegistry.contentType,
contentType: state.prometheusRegistry.contentType,
metrics: metrics
};
});

@ -1,36 +0,0 @@
"use strict";
const Promise = require("bluebird");
const consumable = require("@joepie91/consumable");
const syncpipe = require("syncpipe");
const createMutationAPIWrapper = require("./mutation-api/wrapper");
module.exports = function (state) {
const createDatabaseMutationAPI = require("./mutation-api/database")(state);
return function createDatabaseQueue(context) {
let databaseMutationAPI = createDatabaseMutationAPI(context);
let mutationAPI = createMutationAPIWrapper(context, databaseMutationAPI);
let queue = consumable([]);
return {
api: syncpipe(Object.keys(mutationAPI), [
(_) => _.map((method) => [ method, function() { queue.peek().push([ method, arguments ]); } ]),
(_) => Object.fromEntries(_)
]),
execute: function () {
if (!queue.peek().some((method) => method[0] === "updateMetadata")) {
// Doing an updateMetadata call is necessary to mark a task 'completed', so we inject a dummy call that doesn't actually change the metadata itself
// FIXME: Split apart 'markTaskCompleted' and 'updateMetadata' queries so that this hack is no longer necessary
queue.peek().push([ "updateMetadata", [ (data) => data ]]);
}
return Promise.each(queue.consume(), ([ method, args ]) => {
return mutationAPI[method](... args);
});
}
};
};
};

@ -0,0 +1,39 @@
"use strict";
const Promise = require("bluebird");
const chalk = require("chalk");
const logStatus = require("./util/log-status");
module.exports = function ({ backend }) {
return function runTask(task, item) {
let queue = [];
let api = backend.forItem({ task: task, item: item, mutationQueue: queue });
return Promise.try(() => {
logStatus(task, chalk.bold.cyan, "started", item.id);
// NOTE: We only pass in the item data itself, *not* any associated metadata like tags. If the scraping task wants access to that sort of information, it should do a `getItem` call from within its task logic where needed.
// FIXME: Is that actually still true post-refactor?
return task.run({
data: item.data,
... api.exposed
});
}).then(() => {
// NOTE: We only apply changes at the very end (outside of simulation mode), so that when a task implementation contains multiple operations, each of those operation always 'sees' the state at the start of the task, not the state after the previous mutation. This makes the model as a whole easier to reason about. In simulation mode, all calls are immediate and the queue is empty - after all, no mutation can happen in that case anyway. This is also another reason to ensure that operations in live mode always see the starting state; that makes its behaviour consistent with simulation mode.
return backend.topLevel.runInTransaction(null, (tx) => {
return Promise.each(queue, (operation) => {
return operation(tx);
});
});
}).then(async () => {
await api.internal.markTaskCompleted();
logStatus(task, chalk.bold.green, "completed", item.id);
return { status: "completed", item: item };
}).catch(async (error) => {
await api.internal.markTaskFailed(null, { error });
logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`);
return { status: "failed", item: item, error: error };
});
};
};

@ -1,42 +0,0 @@
"use strict";
const Promise = require("bluebird");
// const { UniqueViolationError } = require("objection");
const pipe = require("@promistream/pipe");
const map = require("@promistream/map");
const mapFilter = require("@promistream/map-filter");
module.exports = function ({ backend }) {
return function processTaskSafely(task, processHandler) {
let lockStream = mapFilter((item) => {
return Promise.try(() => {
return backend.lock(null, { id: item.id, task: task });
}).then((success) => {
if (success) {
return item;
} else {
return mapFilter.NoValue;
}
});
});
let processUnlockStream = map((item) => {
return Promise.try(() => {
return backend.runInTransaction((tx) => {
return processHandler(item, tx);
});
}).finally(() => {
// NOTE: The unlock deliberately happens outside of a transaction, so that it can always succeed, even if a task and its associated database changes failed
return backend.unlock(null, { id: item.id, task: task });
}).then(() => {
return item;
});
});
return pipe([
lockStream,
processUnlockStream
]);
};
};

@ -0,0 +1,37 @@
"use strict";
const Promise = require("bluebird");
const pipe = require("@promistream/pipe");
const filter = require("@promistream/filter");
const map = require("@promistream/map");
const rateLimit = require("@promistream/rate-limit");
const parallelize = require("@promistream/parallelize");
// FIXME: Move logs to logging hook
module.exports = function (state) {
let { backend } = state;
const runTask = require("../run-task")(state);
return function createTaskKernelStream(task, { globalRateLimiter }) {
return pipe([
backend.topLevel.getTaskStream(null, { task: task }),
filter((item) => backend.forItem({ task: task, item: item }).internal.lock()),
globalRateLimiter,
(task.taskInterval != null)
? rateLimit(task.taskInterval)
: null,
map((item) => {
return Promise.try(() => {
return runTask(task, item);
}).tap(() => {
return backend.forItem({ task: task, item: item }).internal.unlock();
});
}),
(task.parallelTasks != null)
? parallelize(task.parallelTasks)
: null
]);
};
};

@ -1,53 +0,0 @@
"use strict";
const chalk = require("chalk");
const pipe = require("@promistream/pipe");
const filter = require("@promistream/filter");
const map = require("@promistream/map");
const logStatus = require("./log-status");
module.exports = function ({ backend }) {
function runTask(task, item) {
let queue = [];
let api = backend.forItem({ task: task, id: item.id, mutationQueue: queue });
return Promise.try(() => {
logStatus(task, chalk.bold.cyan, "started", item.id);
// NOTE: We only pass in the item data itself, *not* any associated metadata like tags. If the scraping task wants access to that sort of information, it should do a `getItem` call from within its task logic where needed.
// FIXME: Is that actually still true post-refactor?
task.run({
data: item.data,
... api.exposed
});
}).then(() => {
return backend.topLevel.runInTransaction((tx) => {
// FIXME: use queue
});
}).then(async () => {
await api.internal.markTaskCompleted();
logStatus(task, chalk.bold.green, "completed", item.id);
return { status: "completed", item: item };
}).catch(async (error) => {
await api.internal.markTaskFailed(null, { error });
logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`);
return { status: "failed", item: item, error: error };
});
}
return function createTaskKernelStream(task) {
return pipe([
backend.topLevel.getTaskStream(null, { task: task }),
filter((item) => backend.forItem({ task: task, id: item.id }).internal.lock()),
map((item) => {
return Promise.try(() => {
return runTask(task, item);
}).tap(() => {
return backend.forItem({ task: task, id: item.id }).internal.unlock();
});
})
]);
};
};

@ -3,7 +3,7 @@
const syncpipe = require("syncpipe");
const defaultValue = require("default-value"); // FIXME: Move to config validation
const invertMapping = require("./util/invert-mapping");
const invertMapping = require("./invert-mapping");
module.exports = function generateTaskGraph({ tags, tasks }) {
let tagsMapping = invertMapping(tags);
@ -14,7 +14,8 @@ module.exports = function generateTaskGraph({ tags, tasks }) {
return [ name, {
... taskDefinition,
name: name,
tags: tagsMapping[name],
// NOTE: The default here is for cases where a task is 'orphaned' and not associated with any tags; this can happen during development, and in that case the task won't be present in the tagsMapping at all.
tags: tagsMapping[name] ?? [],
dependencies: [],
dependents: []
}];

@ -3,5 +3,5 @@
const chalk = require("chalk");
module.exports = function logStatus(task, color, type, message) {
console.log(`${chalk.bold(`[${task}]`)} ${color(`[${type}]`)} ${message}`);
console.log(`${chalk.bold(`[${task.name}]`)} ${color(`[${type}]`)} ${message}`);
};

@ -5,6 +5,8 @@ const arrayOf = require("@validatem/array-of");
const isString = require("@validatem/is-string");
const isFunction = require("@validatem/is-function");
const isArray = require("@validatem/is-array");
const either = require("@validatem/either");
const isValue = require("@validatem/is-value");
const isPositiveInteger = require("./is-positive-integer");
@ -18,7 +20,7 @@ function makeRules(recurse) {
name: [ required, isString ],
version: [ required, isString ],
ttl: [ isPositiveInteger ],
parallelTasks: [ isPositiveInteger ],
parallelTasks: [ either([ isPositiveInteger, isValue(Infinity) ]) ],
taskInterval: [ isPositiveInteger ],
dependents: [ required, isTaskArray ],
dependencies: [ required, isTaskArray ],

@ -34,7 +34,7 @@ module.exports = {
value: [ required, {
ttl: [ isMS ],
taskInterval: [ isMS ],
parallelTasks: [ defaultTo, either([
parallelTasks: [ defaultTo(1), either([
[ isValue(Infinity) ],
[ isInteger, isPositive ]
])],

@ -0,0 +1,3 @@
- locks table: make locked status a field instead of based on existence
- rename locks table to queue table
- insert tasks into queue table whenever drained for a task
Loading…
Cancel
Save