You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

462 lines
17 KiB
JavaScript

"use strict";
const Promise = require("bluebird");
const unreachable = require("@joepie91/unreachable");
const { validateArguments, validateOptions } = require("@validatem/core");
const required = require("@validatem/required");
const requireEither = require("@validatem/require-either");
const isString = require("@validatem/is-string");
const isBoolean = require("@validatem/is-boolean");
const isFunction = require("@validatem/is-function");
const isDate = require("@validatem/is-date");
const arrayOf = require("@validatem/array-of");
const defaultTo = require("@validatem/default-to");
const anyProperty = require("@validatem/any-property");
const anything = require("@validatem/anything");
const wrapValueAsOption = require("@validatem/wrap-value-as-option");
const ValidationError = require("@validatem/error");
const either = require("@validatem/either");
const isTaskObject = require("../validators/is-task-object");
// NOTE: The purpose of this module is to implement all the database API logic that's common across database backends; that mainly means initialization, settings merging, and input validation/normalization. Each individual database backend can then assume that it will always be called with valid input.
/* TODO: API semantics to document:
- All mutating operations are queued, and executed in a transaction *after* the task has been completed; this is to avoid long-lived transactions interfering with task concurrency
- All read-only operations (as specified in the task logic) are executed immediately, bypassing any queue, and outside of any transaction.
- This means that read-only operations see the state of the database *before* any mutations take place, but mutating methods see the state *after* any preceding mutations (within that task) have taken place.
- Simulated execution works exactly like regular execution, except the mutation transaction is cancelled at the last moment, preventing any true changes to the database. This essentially means you are operating on a read-only view of the database.
- This also means that a transactional database is *required* to build a srap backend. If you want to implement a backend for a non-transactional database and are able to provide semantically equivalent simulation functionality otherwise, please open an issue!
*/
// FIXME: Verify that all internal method calls in the PostgreSQL backend are still valid after moving argument validation/normalization into this module
module.exports = function (state) {
let { tasks, metrics } = state;
const backendModules = {
"postgresql": require("./postgresql")(state),
"simulated": require("./simulated")(state),
};
function wrapBackend(backend) {
// NOTE: The backend.getDefaultTransaction method MUST return synchronously.
// TODO: Only accept an explicit `null` when defaulting, not an `undefined` which may be implicit? To ensure that the caller didn't just forget to provide one. Though most/all queries have arguments coming after the TX, so this might not be necessary.
let maybeTX = [ defaultTo(backend.getDefaultTransaction), backend.isTransaction ];
function maybeGetTaskObject(taskName) {
if (tasks.has(taskName)) {
return tasks.get(taskName);
} else {
throw new ValidationError(`No task named '${taskName}' exists`);
}
}
let isTask = either([
[ isTaskObject ],
[ isString, maybeGetTaskObject, isTaskObject ]
]);
return {
topLevel: {
simulate: function () {
// NOTE: Simulated backend is initialized synchronously; all other backends are not!
return wrapBackend(backendModules.simulated.create({ backend: backend }));
},
shutdown: function () {
return backend.shutdown();
},
getQueueSize: function (_tx) {
let [ tx ] = validateArguments(arguments, {
tx: maybeTX
});
return backend.getQueueSize(tx);
},
runInTransaction: function (_tx, _callback) {
let [ tx, callback ] = validateArguments(arguments, {
tx: maybeTX,
callback: [ required, isFunction ]
});
return backend.runInTransaction(tx, callback);
},
countLockedTasks: function (_tx) {
let [ tx ] = validateArguments(arguments, {
tx: maybeTX
});
return backend.countLockedTasks(tx);
},
getUpdateStream: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ defaultTo({}), {
timestamp: [ isDate ],
prefix: [ isString ]
}]
});
return backend.getUpdateStream(tx, options);
},
getTaskStream: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
task: [ required, isTask ]
}]
});
return backend.getTaskStream(tx, options);
},
insertSeeds: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
// FIXME: This currently duplicates validation logic from forItem.storeItem; figure out a way to deduplicate that
seeds: [ required, arrayOf({
id: [ required, isString ],
// Tags are required to be specified (even if an empty array) because it's easily forgotten
tags: [ required, arrayOf(isString) ],
aliases: [ defaultTo([]), arrayOf(isString) ],
data: [ required, anything ], // FIXME: Check for object
}) ]
}]
});
return Promise.map(options.seeds, (seed) => {
let { data, ... props } = seed;
return backend.storeItem(tx, {
... props,
update: () => data,
allowUpsert: false,
failIfExists: false
});
});
},
// FIXME: Other than the missing readOperation wrapper and the tx argument, this is *basically* the same logic as under forItem... this should be simplified somehow.
getItem: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("id"), {
id: [ required, isString ],
optional: [ defaultTo(false), isBoolean ] // FIXME: Can this handling be moved to the wrapper?
}]
});
return backend.getItem(tx, options);
},
},
forItem: function (_options) {
// FIXME: Proper validation rules here for the other fields as well
let { item, task, mutationQueue, readTX, simulate } = validateOptions(arguments, {
item: anything,
task: [ required, isTask ],
mutationQueue: anything,
readTX: maybeTX,
simulate: anything
});
// We create a new instance of the actual API for every item being processed. This is necessary because some of the input arguments will default to item-specific values, and some of the logic is dependent on task-specific metadata. This is a more efficient (and understandable) approach than pretending the API is stateless and then separately wrapping the API *again* for every individual item with a whole separate layer of input validation rules.
// FIXME: Is this still correct, with the new task (graph) format?
let dependentTaskNames = new Set(task.dependents.map((dependent) => dependent.task));
function mutableOperation(func) {
if (simulate === true) {
return func(readTX, backend);
} else if (mutationQueue != null) {
mutationQueue.push(func);
} else {
unreachable("No mutation queue provided in live mode");
}
}
function readOperation(func) {
return func(readTX, backend);
}
let exposedAPI = {
// NOTE: 'exposed' API methods are the ones that are passed into a user-defined task, and which the task uses to eg. update or create new items
getItem: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("id"), {
id: [ required, isString ],
optional: [ defaultTo(false), isBoolean ] // FIXME: Can this handling be moved to the wrapper?
}]
});
return readOperation((tx) => {
return backend.getItem(tx, options);
});
},
storeItem: function (_options) {
// NOTE: Using `update` instead of `data` makes it an upsert!
// FIXME: Add an 'expire' flag for expiring any existing task results for this item? To trigger re-evaluation on updates
let [ options ] = validateArguments(arguments, {
options: [ required, {
id: [ required, isString ],
// Tags are required to be specified (even if an empty array) because it's easily forgotten
tags: [ required, arrayOf(isString) ],
aliases: [ defaultTo([]), arrayOf(isString) ],
data: [ anything ], // FIXME: Check for object
update: [ isFunction ],
failIfExists: [ defaultTo(false), isBoolean ],
allowUpsert: [ defaultTo(true), isBoolean ],
parentID: [ defaultTo(item.id), isString ]
}, requireEither([ "data", "update" ]) ]
});
let { data, ... rest } = options;
return mutableOperation((tx) => {
return backend.storeItem(tx, {
... rest,
// We normalize `data` and `update` (which are mutually-exclusive) into a single option here, so that the backend only needs to deal with the `update` case
// TODO: Can this be folded into the validation rules in a reasonable and readable way?
update: (data != null)
? (existingData) => ({ ... existingData, ... data })
: rest.update
});
});
},
moveItem: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("into"), {
from: [ defaultTo(item.id), isString ],
into: [ required, isString ],
// NOTE: If no `merge` function is specified, that indicates that merging is not allowed (ie. this is strictly a rename), and mergeMetadata is ignored too
merge: [ isFunction ],
mergeMetadata: [ defaultTo({}), anyProperty({
key: [ required ],
value: [ required, isFunction ]
})],
}]
});
return mutableOperation((tx) => {
return backend.moveItem(tx, { ... options, allowMerge: (options.merge != null) });
});
},
deleteItem: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [
defaultTo({}),
wrapValueAsOption("id"), {
id: [ defaultTo(item.id), isString ]
}
]
});
return mutableOperation((tx) => {
return backend.deleteItem(tx, options);
});
},
createAlias: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("from"), {
from: [ required, isString ],
to: [ defaultTo(item.id), isString ],
failIfExists: [ defaultTo(false), isBoolean ] // TODO: Shouldn't this default to true, for any occurrence outside of a merge/rename?
}]
});
return mutableOperation((tx) => {
return backend.createAlias(tx, options);
});
},
deleteAlias: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("from"), {
from: [ required, isString ]
}]
});
return mutableOperation((tx) => {
return backend.deleteAlias(tx, options);
});
},
updateData: function (_options) {
// NOTE: This is a semantically self-describing convenience wrapper for `storeItem` that updates the currently-being-processed item
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ]
}]
});
return exposedAPI.storeItem({
... options,
tags: []
});
},
updateMetadata: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ],
task: [ required, isTask ]
}]
});
return mutableOperation((tx) => {
return backend.updateMetadata(tx, options);
});
},
expire: function (_options) {
// TODO: It probably doesn't make any semantic sense to leave *both* arguments unspecified. Maybe that should be prohibited via eg. a non-exclusive requireEither? Otherwise the user might expect to immediately expire the *current* task, but since the task is only updated *after* the task logic runs, that is not currently possible to express.
let [ options ] = validateArguments(arguments, {
options: [ required, {
id: [ defaultTo(item.id), isString ],
isTask: [ defaultTo(task), isTask ]
}]
});
return mutableOperation((tx) => {
return backend.expire(tx, options);
});
},
expireDependents: function (_options) {
// NOTE: This method does not have a counterpart in the database backend; it's a convenience abstraction over regular `backend.expire` calls
let [{ id, dependents }] = validateArguments(arguments, {
options: [ defaultTo({}), wrapValueAsOption("dependents"), {
id: [ defaultTo(item.id), isString ],
dependents: [ defaultTo([]), arrayOf(isString), (dependents) => {
// Only consider dependents that actually exist for this task
return dependents.filter((dependent) => dependentTaskNames.has(dependent));
}]
}]
});
// NOTE: This works even with queueing, because each this.expire call just internally queues another operation
return Promise.map(dependents, (dependent) => {
return exposedAPI.expire({
id: id,
taskName: dependent
});
});
},
// Temporary compatibility aliases
createItem: (... args) => exposedAPI.storeItem(... args),
mergeItem: (... args) => exposedAPI.moveItem(... args),
renameItem: (options) => {
if (typeof options === "string") {
return exposedAPI.moveItem(options);
} else {
return exposedAPI.moveItem({ into: options.to, from: options.from });
}
},
};
return {
// NOTE: 'internal' API methods are accessible to srap, but not to user-defined tasks.
internal: {
lock: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ defaultTo({}), {
id: [ defaultTo(item.id), isString ],
task: [ defaultTo(task), isTask ]
}]
});
return Promise.try(() => {
return backend.lock(tx, options);
}).tap((succeeded) => {
if (succeeded) {
metrics.successfulLocks.labels({ task: task.name }).inc(1);
} else {
metrics.failedLocks.labels({ task: task.name }).inc(1);
}
});
},
unlock: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ defaultTo({}), {
id: [ defaultTo(item.id), isString ],
task: [ defaultTo(task), isTask ]
}]
});
return backend.unlock(tx, options);
},
markTaskCompleted: function (_tx) {
// TODO: Allow specifying a different task or item ID?
let [ tx ] = validateArguments(arguments, {
tx: maybeTX
});
return backend.markTaskStatus(tx, {
id: item.id,
task: task,
isSuccessful: true
});
},
markTaskFailed: function (_tx, _options) {
// TODO: Allow specifying a different task or item ID?
let [ tx, { error }] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
error: [ required, anything ] // TODO: Restrict to Error types?
}]
});
// FIXME: Persist error
console.error("FIXME(persist error):", error.stack);
return backend.markTaskStatus(tx, {
id: item.id,
task: task,
isSuccessful: false
});
},
},
exposed: exposedAPI
};
}
};
}
return function createDatabaseAPI({ backend: backendName, options, simulate }) {
// TODO: Validate inputs here, or maybe that belongs in config validation instead?
return Promise.try(() => {
let backendModule = backendModules[backendName];
if (backendModule != null) {
// FIXME: Smarter merge, maybe expose merge-by-template merge rules from the database module?
let settings = { ... backendModule.defaultSettings, ... options };
return backendModule.create(settings);
} else {
throw new Error(`No backend named '${backendName}' exists`);
}
}).then((backend) => {
return wrapBackend(backend);
});
};
};