Merge branch 'backend-refactor'
This commit is contained in:
commit
fb93e902a8
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -1 +1,2 @@
|
|||
node_modules
|
||||
junk
|
||||
|
|
17
migrations/20220601113806_task-results-optional.js
Normal file
17
migrations/20220601113806_task-results-optional.js
Normal file
|
@ -0,0 +1,17 @@
|
|||
"use strict";
|
||||
|
||||
module.exports.up = function(knex, Promise) {
|
||||
return knex.schema
|
||||
.alterTable("srap_task_results", (table) => {
|
||||
table.jsonb("metadata").notNullable().default({}).alter();
|
||||
table.boolean("is_successful").nullable().alter();
|
||||
});
|
||||
};
|
||||
|
||||
module.exports.down = function(knex, Promise) {
|
||||
return knex.schema
|
||||
.alterTable("srap_task_results", (table) => {
|
||||
table.jsonb("metadata").notNullable().alter();
|
||||
table.boolean("is_successful").notNullable().alter();
|
||||
});
|
||||
};
|
|
@ -7,8 +7,10 @@
|
|||
"license": "WTFPL OR CC0-1.0",
|
||||
"dependencies": {
|
||||
"@joepie91/consumable": "^1.0.1",
|
||||
"@joepie91/unreachable": "^1.0.0",
|
||||
"@promistream/buffer": "^0.1.1",
|
||||
"@promistream/combine-sequential-streaming": "^0.1.0",
|
||||
"@promistream/filter": "^0.1.1",
|
||||
"@promistream/from-iterable": "^0.1.0",
|
||||
"@promistream/from-node-stream": "^0.1.1",
|
||||
"@promistream/map": "^0.1.1",
|
||||
|
@ -23,12 +25,17 @@
|
|||
"@validatem/array-of": "^0.1.2",
|
||||
"@validatem/core": "^0.3.16",
|
||||
"@validatem/default-to": "^0.1.0",
|
||||
"@validatem/either": "^0.1.9",
|
||||
"@validatem/error": "^1.1.0",
|
||||
"@validatem/is-array": "^0.1.1",
|
||||
"@validatem/is-boolean": "^0.1.1",
|
||||
"@validatem/is-date": "^0.1.0",
|
||||
"@validatem/is-function": "^0.1.0",
|
||||
"@validatem/is-integer": "^0.1.0",
|
||||
"@validatem/is-number": "^0.1.3",
|
||||
"@validatem/is-positive": "^1.0.0",
|
||||
"@validatem/is-string": "^1.0.0",
|
||||
"@validatem/is-value": "^0.1.0",
|
||||
"@validatem/matches-format": "^0.1.0",
|
||||
"@validatem/require-either": "^0.1.0",
|
||||
"@validatem/required": "^0.1.1",
|
||||
|
|
222
src/database-backends/api.md
Normal file
222
src/database-backends/api.md
Normal file
|
@ -0,0 +1,222 @@
|
|||
# runInTransaction
|
||||
|
||||
```js
|
||||
runInTransaction(parentTX, (tx) => {
|
||||
// ... database operations go here ...
|
||||
})
|
||||
```
|
||||
|
||||
# lock
|
||||
|
||||
```js
|
||||
lock(tx, {
|
||||
id,
|
||||
task
|
||||
})
|
||||
```
|
||||
|
||||
Returns: `true` if locking succeeded, `false` if the task was already locked
|
||||
|
||||
# unlock
|
||||
|
||||
```js
|
||||
lock(tx, {
|
||||
id,
|
||||
task
|
||||
})
|
||||
```
|
||||
|
||||
# getItem
|
||||
|
||||
Fetch an item from the database by its unique named ID.
|
||||
|
||||
Wrapper API:
|
||||
|
||||
```js
|
||||
getItem(tx, id)
|
||||
|
||||
getItem(tx, {
|
||||
id,
|
||||
optional = false
|
||||
})
|
||||
```
|
||||
|
||||
Expected backend API:
|
||||
|
||||
```js
|
||||
getItem(tx, {
|
||||
id,
|
||||
optional
|
||||
})
|
||||
```
|
||||
|
||||
# createItem
|
||||
|
||||
Creates a new item or, if the item already exists, updates the existing item (if allowed). Tags/aliases are only ever added by this operation, never removed.
|
||||
|
||||
Wrapper API:
|
||||
|
||||
```js
|
||||
createItem(tx, {
|
||||
id,
|
||||
tags = [],
|
||||
aliases = [],
|
||||
// Either `data` or `update` is required
|
||||
data,
|
||||
update, // callback(oldData)
|
||||
failIfExists = false,
|
||||
allowUpsert = true, // whether the *item object* is allowed to be updated, or should just be skipped if already exists
|
||||
parentID
|
||||
})
|
||||
```
|
||||
|
||||
Expected backend API:
|
||||
|
||||
```js
|
||||
createItem(tx, {
|
||||
id,
|
||||
tags,
|
||||
aliases,
|
||||
update, // callback(oldData ?? {})
|
||||
failIfExists,
|
||||
allowUpsert,
|
||||
parentID
|
||||
})
|
||||
```
|
||||
|
||||
# renameItem
|
||||
|
||||
```js
|
||||
let [ tx, { to, from }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
to: [ required, isString ],
|
||||
from: [ required, isString ]
|
||||
}]
|
||||
});
|
||||
```
|
||||
|
||||
# repointAliases
|
||||
|
||||
```js
|
||||
let [ tx, { to, from }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
to: [ required, isString ],
|
||||
from: [ required, isString ]
|
||||
}]
|
||||
});
|
||||
```
|
||||
|
||||
# mergeItem
|
||||
|
||||
```js
|
||||
let [ tx, { from, into, merge, mergeMetadata }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
from: [ required, isString ],
|
||||
into: [ required, isString ],
|
||||
merge: [ required, isFunction ],
|
||||
mergeMetadata: [ defaultTo({}), anyProperty({
|
||||
key: [ required ],
|
||||
value: [ required, isFunction ]
|
||||
})],
|
||||
}]
|
||||
});
|
||||
```
|
||||
|
||||
# deleteItem
|
||||
|
||||
```js
|
||||
let [ tx, { id }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
id: [ required, isString ]
|
||||
}]
|
||||
});
|
||||
```
|
||||
|
||||
# createAlias
|
||||
|
||||
```js
|
||||
let [ tx, { from, to, failIfExists }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
from: [ required, isString ],
|
||||
to: [ required, isString ],
|
||||
failIfExists: [ defaultTo(false), isBoolean ] // TODO: Shouldn't this default to true, for any occurrence outside of a merge/rename?
|
||||
}]
|
||||
});
|
||||
```
|
||||
|
||||
# deleteAlias
|
||||
|
||||
```js
|
||||
let [ tx, { from }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
from: [ required, isString ]
|
||||
}]
|
||||
});
|
||||
```
|
||||
|
||||
# updateData
|
||||
|
||||
```js
|
||||
let [ tx, { id, update }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
id: [ required, isString ],
|
||||
update: [ required, isFunction ]
|
||||
}]
|
||||
});
|
||||
```
|
||||
|
||||
# updateMetadata
|
||||
|
||||
```js
|
||||
let [ tx, { id, update, taskName, taskVersion, ttl }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
id: [ required, isString ],
|
||||
update: [ required, isFunction ],
|
||||
taskName: [ required, isString ],
|
||||
taskVersion: [ required, isString ],
|
||||
ttl: [ isNumber ]
|
||||
}]
|
||||
});
|
||||
```
|
||||
|
||||
# expire
|
||||
|
||||
```js
|
||||
let [ tx, { id, taskName }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
id: [ required, isString ],
|
||||
taskName: [ required, isString ]
|
||||
}]
|
||||
});
|
||||
```
|
||||
|
||||
# setTTL
|
||||
|
||||
# allowFailure?
|
||||
|
||||
# log?
|
||||
|
||||
# countLockedTasks
|
||||
|
||||
Only a `tx` argument
|
||||
|
||||
# getUpdates
|
||||
|
||||
```js
|
||||
let [ tx, { timestamp, prefix }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [ defaultTo({}), {
|
||||
timestamp: [ isDate ],
|
||||
prefix: [ isString ]
|
||||
}]
|
||||
});
|
||||
```
|
443
src/database-backends/index.js
Normal file
443
src/database-backends/index.js
Normal file
|
@ -0,0 +1,443 @@
|
|||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const unreachable = require("@joepie91/unreachable");
|
||||
|
||||
const { validateArguments, validateOptions } = require("@validatem/core");
|
||||
const required = require("@validatem/required");
|
||||
const requireEither = require("@validatem/require-either");
|
||||
const isString = require("@validatem/is-string");
|
||||
const isBoolean = require("@validatem/is-boolean");
|
||||
const isFunction = require("@validatem/is-function");
|
||||
const isDate = require("@validatem/is-date");
|
||||
const arrayOf = require("@validatem/array-of");
|
||||
const defaultTo = require("@validatem/default-to");
|
||||
const anyProperty = require("@validatem/any-property");
|
||||
const anything = require("@validatem/anything");
|
||||
const wrapValueAsOption = require("@validatem/wrap-value-as-option");
|
||||
const ValidationError = require("@validatem/error");
|
||||
const either = require("@validatem/either");
|
||||
|
||||
const isTaskObject = require("../validators/is-task-object");
|
||||
|
||||
// NOTE: The purpose of this module is to implement all the database API logic that's common across database backends; that mainly means initialization, settings merging, and input validation/normalization. Each individual database backend can then assume that it will always be called with valid input.
|
||||
|
||||
/* TODO: API semantics to document:
|
||||
- All mutating operations are queued, and executed in a transaction *after* the task has been completed; this is to avoid long-lived transactions interfering with task concurrency
|
||||
- All read-only operations (as specified in the task logic) are executed immediately, bypassing any queue, and outside of any transaction.
|
||||
- This means that read-only operations see the state of the database *before* any mutations take place, but mutating methods see the state *after* any preceding mutations (within that task) have taken place.
|
||||
- Simulated execution works exactly like regular execution, except the mutation transaction is cancelled at the last moment, preventing any true changes to the database. This essentially means you are operating on a read-only view of the database.
|
||||
- This also means that a transactional database is *required* to build a srap backend. If you want to implement a backend for a non-transactional database and are able to provide semantically equivalent simulation functionality otherwise, please open an issue!
|
||||
*/
|
||||
|
||||
// FIXME: Verify that all internal method calls in the PostgreSQL backend are still valid after moving argument validation/normalization into this module
|
||||
|
||||
module.exports = function (state) {
|
||||
let { tasks } = state;
|
||||
|
||||
const backendModules = {
|
||||
"postgresql": require("./postgresql")(state),
|
||||
"simulated": require("./simulated")(state),
|
||||
};
|
||||
|
||||
function wrapBackend(backend) {
|
||||
// NOTE: The backend.getDefaultTransaction method MUST return synchronously.
|
||||
// TODO: Only accept an explicit `null` when defaulting, not an `undefined` which may be implicit? To ensure that the caller didn't just forget to provide one. Though most/all queries have arguments coming after the TX, so this might not be necessary.
|
||||
let maybeTX = [ defaultTo(backend.getDefaultTransaction), backend.isTransaction ];
|
||||
|
||||
function maybeGetTaskObject(taskName) {
|
||||
if (tasks.has(taskName)) {
|
||||
return tasks.get(taskName);
|
||||
} else {
|
||||
throw new ValidationError(`No task named '${taskName}' exists`);
|
||||
}
|
||||
}
|
||||
|
||||
let isTask = either([
|
||||
[ isTaskObject ],
|
||||
[ isString, maybeGetTaskObject, isTaskObject ]
|
||||
]);
|
||||
|
||||
return {
|
||||
topLevel: {
|
||||
simulate: function () {
|
||||
// NOTE: Simulated backend is initialized synchronously; all other backends are not!
|
||||
return wrapBackend(backendModules.simulated.create({ backend: backend }));
|
||||
},
|
||||
|
||||
shutdown: function () {
|
||||
return backend.shutdown();
|
||||
},
|
||||
|
||||
runInTransaction: function (_tx, _callback) {
|
||||
let [ tx, callback ] = validateArguments(arguments, {
|
||||
tx: maybeTX,
|
||||
callback: [ required, isFunction ]
|
||||
});
|
||||
|
||||
return backend.runInTransaction(tx, callback);
|
||||
},
|
||||
countLockedTasks: function (_tx) {
|
||||
let [ tx ] = validateArguments(arguments, {
|
||||
tx: maybeTX
|
||||
});
|
||||
|
||||
return backend.countLockedTasks(tx);
|
||||
},
|
||||
|
||||
getUpdateStream: function (_tx, _options) {
|
||||
let [ tx, options ] = validateArguments(arguments, {
|
||||
tx: maybeTX,
|
||||
options: [ defaultTo({}), {
|
||||
timestamp: [ isDate ],
|
||||
prefix: [ isString ]
|
||||
}]
|
||||
});
|
||||
|
||||
return backend.getUpdateStream(tx, options);
|
||||
},
|
||||
|
||||
getTaskStream: function (_tx, _options) {
|
||||
let [ tx, options ] = validateArguments(arguments, {
|
||||
tx: maybeTX,
|
||||
options: [ required, {
|
||||
task: [ required, isTask ]
|
||||
}]
|
||||
});
|
||||
|
||||
return backend.getTaskStream(tx, options);
|
||||
},
|
||||
|
||||
insertSeeds: function (_tx, _options) {
|
||||
let [ tx, options ] = validateArguments(arguments, {
|
||||
tx: maybeTX,
|
||||
options: [ required, {
|
||||
// FIXME: This currently duplicates validation logic from forItem.storeItem; figure out a way to deduplicate that
|
||||
seeds: [ required, arrayOf({
|
||||
id: [ required, isString ],
|
||||
// Tags are required to be specified (even if an empty array) because it's easily forgotten
|
||||
tags: [ required, arrayOf(isString) ],
|
||||
aliases: [ defaultTo([]), arrayOf(isString) ],
|
||||
data: [ required, anything ], // FIXME: Check for object
|
||||
}) ]
|
||||
}]
|
||||
});
|
||||
|
||||
return Promise.map(options.seeds, (seed) => {
|
||||
let { data, ... props } = seed;
|
||||
|
||||
return backend.storeItem(tx, {
|
||||
... props,
|
||||
update: () => data,
|
||||
allowUpsert: false,
|
||||
failIfExists: false
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
// FIXME: Other than the missing readOperation wrapper and the tx argument, this is *basically* the same logic as under forItem... this should be simplified somehow.
|
||||
getItem: function (_tx, _options) {
|
||||
let [ tx, options ] = validateArguments(arguments, {
|
||||
tx: maybeTX,
|
||||
options: [ required, wrapValueAsOption("id"), {
|
||||
id: [ required, isString ],
|
||||
optional: [ defaultTo(false), isBoolean ] // FIXME: Can this handling be moved to the wrapper?
|
||||
}]
|
||||
});
|
||||
|
||||
return backend.getItem(tx, options);
|
||||
},
|
||||
},
|
||||
forItem: function (_options) {
|
||||
// FIXME: Proper validation rules here for the other fields as well
|
||||
let { item, task, mutationQueue, readTX, simulate } = validateOptions(arguments, {
|
||||
item: anything,
|
||||
task: [ required, isTask ],
|
||||
mutationQueue: anything,
|
||||
readTX: maybeTX,
|
||||
simulate: anything
|
||||
});
|
||||
|
||||
// We create a new instance of the actual API for every item being processed. This is necessary because some of the input arguments will default to item-specific values, and some of the logic is dependent on task-specific metadata. This is a more efficient (and understandable) approach than pretending the API is stateless and then separately wrapping the API *again* for every individual item with a whole separate layer of input validation rules.
|
||||
|
||||
// FIXME: Is this still correct, with the new task (graph) format?
|
||||
let dependentTaskNames = new Set(task.dependents.map((dependent) => dependent.task));
|
||||
|
||||
function mutableOperation(func) {
|
||||
if (simulate === true) {
|
||||
return func(readTX, backend);
|
||||
} else if (mutationQueue != null) {
|
||||
mutationQueue.push(func);
|
||||
} else {
|
||||
unreachable("No mutation queue provided in live mode");
|
||||
}
|
||||
}
|
||||
|
||||
function readOperation(func) {
|
||||
return func(readTX, backend);
|
||||
}
|
||||
|
||||
let exposedAPI = {
|
||||
// NOTE: 'exposed' API methods are the ones that are passed into a user-defined task, and which the task uses to eg. update or create new items
|
||||
getItem: function (_options) {
|
||||
let [ options ] = validateArguments(arguments, {
|
||||
options: [ required, wrapValueAsOption("id"), {
|
||||
id: [ required, isString ],
|
||||
optional: [ defaultTo(false), isBoolean ] // FIXME: Can this handling be moved to the wrapper?
|
||||
}]
|
||||
});
|
||||
|
||||
return readOperation((tx) => {
|
||||
return backend.getItem(tx, options);
|
||||
});
|
||||
},
|
||||
|
||||
storeItem: function (_options) {
|
||||
// NOTE: Using `update` instead of `data` makes it an upsert!
|
||||
let [ options ] = validateArguments(arguments, {
|
||||
options: [ required, {
|
||||
id: [ required, isString ],
|
||||
// Tags are required to be specified (even if an empty array) because it's easily forgotten
|
||||
tags: [ required, arrayOf(isString) ],
|
||||
aliases: [ defaultTo([]), arrayOf(isString) ],
|
||||
data: [ anything ], // FIXME: Check for object
|
||||
update: [ isFunction ],
|
||||
failIfExists: [ defaultTo(false), isBoolean ],
|
||||
allowUpsert: [ defaultTo(true), isBoolean ],
|
||||
parentID: [ defaultTo(item.id), isString ]
|
||||
}, requireEither([ "data", "update" ]) ]
|
||||
});
|
||||
|
||||
let { data, ... rest } = options;
|
||||
|
||||
return mutableOperation((tx) => {
|
||||
return backend.storeItem(tx, {
|
||||
... rest,
|
||||
// We normalize `data` and `update` (which are mutually-exclusive) into a single option here, so that the backend only needs to deal with the `update` case
|
||||
// TODO: Can this be folded into the validation rules in a reasonable and readable way?
|
||||
update: (data != null)
|
||||
? (existingData) => ({ ... existingData, ... data })
|
||||
: rest.update
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
moveItem: function (_options) {
|
||||
let [ options ] = validateArguments(arguments, {
|
||||
options: [ required, wrapValueAsOption("into"), {
|
||||
from: [ defaultTo(item.id), isString ],
|
||||
into: [ required, isString ],
|
||||
// NOTE: If no `merge` function is specified, that indicates that merging is not allowed (ie. this is strictly a rename), and mergeMetadata is ignored too
|
||||
merge: [ isFunction ],
|
||||
mergeMetadata: [ defaultTo({}), anyProperty({
|
||||
key: [ required ],
|
||||
value: [ required, isFunction ]
|
||||
})],
|
||||
}]
|
||||
});
|
||||
|
||||
return mutableOperation((tx) => {
|
||||
return backend.moveItem(tx, { options, allowMerge: (options.merge != null) });
|
||||
});
|
||||
},
|
||||
|
||||
deleteItem: function (_options) {
|
||||
let [ options ] = validateArguments(arguments, {
|
||||
options: [
|
||||
defaultTo({}),
|
||||
wrapValueAsOption("id"), {
|
||||
id: [ defaultTo(item.id), isString ]
|
||||
}
|
||||
]
|
||||
});
|
||||
|
||||
return mutableOperation((tx) => {
|
||||
return backend.deleteItem(tx, options);
|
||||
});
|
||||
},
|
||||
|
||||
createAlias: function (_options) {
|
||||
let [ options ] = validateArguments(arguments, {
|
||||
options: [ required, wrapValueAsOption("from"), {
|
||||
from: [ required, isString ],
|
||||
to: [ defaultTo(item.id), isString ],
|
||||
failIfExists: [ defaultTo(false), isBoolean ] // TODO: Shouldn't this default to true, for any occurrence outside of a merge/rename?
|
||||
}]
|
||||
});
|
||||
|
||||
return mutableOperation((tx) => {
|
||||
return backend.createAlias(tx, options);
|
||||
});
|
||||
},
|
||||
|
||||
deleteAlias: function (_options) {
|
||||
let [ options ] = validateArguments(arguments, {
|
||||
options: [ required, wrapValueAsOption("from"), {
|
||||
from: [ required, isString ]
|
||||
}]
|
||||
});
|
||||
|
||||
return mutableOperation((tx) => {
|
||||
return backend.deleteAlias(tx, options);
|
||||
});
|
||||
},
|
||||
|
||||
updateData: function (_options) {
|
||||
// NOTE: This is a semantically self-describing convenience wrapper for `storeItem` that updates the currently-being-processed item
|
||||
let [ options ] = validateArguments(arguments, {
|
||||
options: [ required, wrapValueAsOption("update"), {
|
||||
id: [ defaultTo(item.id), isString ],
|
||||
update: [ required, isFunction ]
|
||||
}]
|
||||
});
|
||||
|
||||
return exposedAPI.storeItem({
|
||||
... options,
|
||||
tags: []
|
||||
});
|
||||
},
|
||||
|
||||
updateMetadata: function (_options) {
|
||||
let [ options ] = validateArguments(arguments, {
|
||||
options: [ required, wrapValueAsOption("update"), {
|
||||
id: [ defaultTo(item.id), isString ],
|
||||
update: [ required, isFunction ],
|
||||
task: [ required, isTask ]
|
||||
}]
|
||||
});
|
||||
|
||||
return mutableOperation((tx) => {
|
||||
return backend.updateMetadata(tx, options);
|
||||
});
|
||||
},
|
||||
|
||||
expire: function (_options) {
|
||||
// TODO: It probably doesn't make any semantic sense to leave *both* arguments unspecified. Maybe that should be prohibited via eg. a non-exclusive requireEither? Otherwise the user might expect to immediately expire the *current* task, but since the task is only updated *after* the task logic runs, that is not currently possible to express.
|
||||
let [ options ] = validateArguments(arguments, {
|
||||
options: [ required, {
|
||||
id: [ defaultTo(item.id), isString ],
|
||||
isTask: [ defaultTo(task), isTask ]
|
||||
}]
|
||||
});
|
||||
|
||||
return mutableOperation((tx) => {
|
||||
return backend.expire(tx, options);
|
||||
});
|
||||
},
|
||||
|
||||
expireDependents: function (_options) {
|
||||
// NOTE: This method does not have a counterpart in the database backend; it's a convenience abstraction over regular `backend.expire` calls
|
||||
let [{ id, dependents }] = validateArguments(arguments, {
|
||||
options: [ defaultTo({}), wrapValueAsOption("dependents"), {
|
||||
id: [ defaultTo(item.id), isString ],
|
||||
dependents: [ defaultTo([]), arrayOf(isString), (dependents) => {
|
||||
// Only consider dependents that actually exist for this task
|
||||
return dependents.filter((dependent) => dependentTaskNames.has(dependent));
|
||||
}]
|
||||
}]
|
||||
});
|
||||
|
||||
// NOTE: This works even with queueing, because each this.expire call just internally queues another operation
|
||||
return Promise.map(dependents, (dependent) => {
|
||||
return exposedAPI.expire({
|
||||
id: id,
|
||||
taskName: dependent
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
// Temporary compatibility aliases
|
||||
createItem: (... args) => exposedAPI.storeItem(... args),
|
||||
mergeItem: (... args) => exposedAPI.moveItem(... args),
|
||||
renameItem: (options) => {
|
||||
if (typeof options === "string") {
|
||||
return exposedAPI.moveItem(options);
|
||||
} else {
|
||||
return exposedAPI.moveItem({ into: options.to, from: options.from });
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
return {
|
||||
// NOTE: 'internal' API methods are accessible to srap, but not to user-defined tasks.
|
||||
internal: {
|
||||
lock: function (_tx, _options) {
|
||||
let [ tx, options ] = validateArguments(arguments, {
|
||||
tx: maybeTX,
|
||||
options: [ defaultTo({}), {
|
||||
id: [ defaultTo(item.id), isString ],
|
||||
task: [ defaultTo(task), isTask ]
|
||||
}]
|
||||
});
|
||||
|
||||
return backend.lock(tx, options);
|
||||
},
|
||||
|
||||
unlock: function (_tx, _options) {
|
||||
let [ tx, options ] = validateArguments(arguments, {
|
||||
tx: maybeTX,
|
||||
options: [ defaultTo({}), {
|
||||
id: [ defaultTo(item.id), isString ],
|
||||
task: [ defaultTo(task), isTask ]
|
||||
}]
|
||||
});
|
||||
|
||||
return backend.unlock(tx, options);
|
||||
},
|
||||
|
||||
markTaskCompleted: function (_tx) {
|
||||
// TODO: Allow specifying a different task or item ID?
|
||||
let [ tx ] = validateArguments(arguments, {
|
||||
tx: maybeTX
|
||||
});
|
||||
|
||||
return backend.markTaskStatus(tx, {
|
||||
id: item.id,
|
||||
task: task,
|
||||
isSuccessful: true
|
||||
});
|
||||
},
|
||||
|
||||
markTaskFailed: function (_tx, _options) {
|
||||
// TODO: Allow specifying a different task or item ID?
|
||||
let [ tx, { error }] = validateArguments(arguments, {
|
||||
tx: maybeTX,
|
||||
options: [ required, {
|
||||
error: [ required, anything ] // TODO: Restrict to Error types?
|
||||
}]
|
||||
});
|
||||
|
||||
// FIXME: Persist error
|
||||
console.error("FIXME(persist error):", error.stack);
|
||||
|
||||
return backend.markTaskStatus(tx, {
|
||||
id: item.id,
|
||||
task: task,
|
||||
isSuccessful: false
|
||||
});
|
||||
},
|
||||
},
|
||||
exposed: exposedAPI
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return function createDatabaseAPI({ backend: backendName, options, simulate }) {
|
||||
// TODO: Validate inputs here, or maybe that belongs in config validation instead?
|
||||
return Promise.try(() => {
|
||||
let backendModule = backendModules[backendName];
|
||||
|
||||
if (backendModule != null) {
|
||||
// FIXME: Smarter merge, maybe expose merge-by-template merge rules from the database module?
|
||||
let settings = { ... backendModule.defaultSettings, ... options };
|
||||
|
||||
return backendModule.create(settings);
|
||||
} else {
|
||||
throw new Error(`No backend named '${backendName}' exists`);
|
||||
}
|
||||
}).then((backend) => {
|
||||
return wrapBackend(backend);
|
||||
});
|
||||
};
|
||||
};
|
353
src/database-backends/postgresql/index.js
Normal file
353
src/database-backends/postgresql/index.js
Normal file
|
@ -0,0 +1,353 @@
|
|||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
|
||||
const path = require("path");
|
||||
const defaultValue = require("default-value");
|
||||
const knexLibrary = require("knex");
|
||||
const { knexSnakeCaseMappers } = require("objection");
|
||||
const { addMilliseconds } = require("date-fns");
|
||||
const ValidationError = require("@validatem/error");
|
||||
|
||||
const models = require("./models");
|
||||
const mergeItems = require("../../semantics/merge-items");
|
||||
|
||||
let migrationsFolder = path.join(__dirname, "../../../migrations");
|
||||
|
||||
function noop() {}
|
||||
|
||||
module.exports = function(state) {
|
||||
let { metrics } = state;
|
||||
|
||||
return {
|
||||
defaultSettings: {
|
||||
taskBatchSize: 1000,
|
||||
taskBatchDelay: 30 * 1000
|
||||
},
|
||||
create: function createPostgreSQLBackend(options) {
|
||||
let knex = knexLibrary({
|
||||
client: "pg",
|
||||
pool: { min: 0, max: 32 },
|
||||
migrations: { tableName: "srap_knex_migrations" },
|
||||
connection: options,
|
||||
... knexSnakeCaseMappers()
|
||||
});
|
||||
|
||||
return Promise.try(() => {
|
||||
return knex.migrate.latest({
|
||||
directory: migrationsFolder
|
||||
});
|
||||
}).then(() => {
|
||||
// TODO: Does it really make sense to be merging in the backendSettings here? Shouldn't that happen automatically in some way for *every* backend, rather than just the PostgreSQL one specifically? As backend settings are a generic backend feature
|
||||
state = { ... state, knex: knex, backendSettings: options};
|
||||
let db = models(state);
|
||||
state = { ... state, db: db };
|
||||
|
||||
// TODO: Should this be inlined instead?
|
||||
function repointAliases (tx, { to, from }) {
|
||||
return db.Alias.query(tx)
|
||||
.patch({ itemId: to, updatedAt: new Date() })
|
||||
.where({ itemId: from });
|
||||
}
|
||||
|
||||
function getTaskResult(tx, task, id) {
|
||||
return Promise.try(() => {
|
||||
return db.Alias.query(tx).findById(id);
|
||||
}).then((alias) => {
|
||||
if (alias != null) {
|
||||
return Promise.try(() => {
|
||||
return db.TaskResult.query(tx).findById([ task.name, alias.itemId ]);
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
shutdown: function () {
|
||||
knex.destroy();
|
||||
},
|
||||
|
||||
getDefaultTransaction: function () {
|
||||
return knex;
|
||||
},
|
||||
|
||||
isTransaction: function (value) {
|
||||
if (value.where == null || value.raw == null) {
|
||||
throw new ValidationError(`Must be a valid Knex or Knex transaction instance`);
|
||||
}
|
||||
},
|
||||
|
||||
runInTransaction: function (tx, callback) {
|
||||
return tx.transaction((tx) => {
|
||||
return callback(tx);
|
||||
}, { doNotRejectOnRollback: false });
|
||||
},
|
||||
|
||||
lock: function (tx, { id, task }) {
|
||||
return Promise.try(() => {
|
||||
return db.TaskInProgress.query(tx).insert({
|
||||
task: task.name,
|
||||
item_id: id
|
||||
});
|
||||
}).then(() => {
|
||||
return true;
|
||||
}).catch({ name: "UniqueViolationError" }, () => {
|
||||
return false;
|
||||
});
|
||||
},
|
||||
|
||||
unlock: function (tx, { id, task }) {
|
||||
return db.TaskInProgress.query(tx)
|
||||
.delete()
|
||||
.where({
|
||||
task: task.name,
|
||||
item_id: id
|
||||
});
|
||||
// TODO: return true/false depending on whether unlocking succeeded?
|
||||
},
|
||||
|
||||
getItem: function (tx, { id, optional }) {
|
||||
return Promise.try(() => {
|
||||
return db.Alias.relatedQuery("item", tx)
|
||||
.for(id)
|
||||
.withGraphFetched("taskResults");
|
||||
}).then((results) => {
|
||||
if (optional === true || results.length > 0) {
|
||||
return results[0];
|
||||
} else {
|
||||
throw new Error(`No item exists with ID '${id}'`);
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
storeItem: function (tx, { id, tags, aliases, update, failIfExists, allowUpsert, parentID }) {
|
||||
// FIXME: Make failIfExists actually work, currently it does nothing as the UNIQUE constraint violation cannot occur for an upsert
|
||||
// TODO: Ensure that we run the transaction in full isolation mode, and retry in case of a conflict
|
||||
|
||||
return Promise.try(() => {
|
||||
// NOTE: We look up by alias, since this is an upsert - and so if the specified ID already exists as an alias, we should update the existing item instead of creating a new one with the specified (aliased) ID
|
||||
return db.Alias
|
||||
.relatedQuery("item", tx)
|
||||
.for(id);
|
||||
}).then((existingItems) => {
|
||||
let existingItem = existingItems[0];
|
||||
|
||||
let actualID = (existingItem != null)
|
||||
? existingItem.id
|
||||
: id;
|
||||
|
||||
let existingData = (existingItem != null)
|
||||
? existingItem.data
|
||||
: {};
|
||||
|
||||
// Make sure to add a self:self alias
|
||||
let allAliases = aliases.concat([ actualID ]);
|
||||
|
||||
let newItem = {
|
||||
id: actualID,
|
||||
data: update(existingData),
|
||||
createdBy: parentID,
|
||||
tags: tags.map((tag) => ({ name: tag })),
|
||||
aliases: allAliases.map((alias) => ({ alias: alias })),
|
||||
updatedAt: new Date()
|
||||
};
|
||||
|
||||
return Promise.try(() => {
|
||||
if (allowUpsert) {
|
||||
// NOTE: We *always* do upserts here, even if the user specified `data` rather than `update`, because tags and aliases should always be added even if the item itself already exists. We trust the user not to accidentally reuse IDs between different kinds of objects (which would break in various other ways anyway).
|
||||
return db.Item.query(tx).upsertGraph(newItem, {
|
||||
insertMissing: true,
|
||||
noDelete: true
|
||||
});
|
||||
} else {
|
||||
return db.Item.query(tx).insertGraph(newItem, {
|
||||
insertMissing: true
|
||||
});
|
||||
}
|
||||
}).tap(() => {
|
||||
// FIXME: We should probably move the metrics stuff to the wrapper instead, so that it works for *any* backend
|
||||
metrics.storedItems.inc(1);
|
||||
|
||||
// TODO: This currently produces somewhat misleading metrics; it only counts *explicitly specified* tags. That will *mostly* correlate to amount of genuinely-new items, but not perfectly. In the future, we should probably refactor the insertion code such that it is aware of the *current* tags of an item that it is about to merge into - but maybe that should be delayed until the zapdb migration.
|
||||
if (newItem.tags != null) {
|
||||
for (let tag of newItem.tags) {
|
||||
metrics.storedItems.labels({ tag: tag.name }).inc(1);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
}).catch({ name: "UniqueViolationError", table: "srap_items" }, (error) => {
|
||||
if (failIfExists) {
|
||||
throw error;
|
||||
} else {
|
||||
// Do nothing, just ignore the failure
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
moveItem: function (tx, { from, into, merge, mergeMetadata }) {
|
||||
let allowMerge = (merge != null);
|
||||
|
||||
// TODO: Make stuff like `this.getItem` roundtrip through the backend abstraction instead of directly calling internal database methods, eg. by providing the backend itself as an argument to the method
|
||||
return Promise.all([
|
||||
this.getItem(tx, { id: from, optional: true }),
|
||||
this.getItem(tx, { id: into, optional: true }),
|
||||
]).then(([ fromObj, maybeIntoObj ]) => {
|
||||
// NOTE: If the source item does not exist, we silently ignore that. This is to ensure that opportunistic renaming of items (eg. after a naming convention change which only affects older items) in scraper configurations is possible.
|
||||
if (fromObj != null) {
|
||||
if (allowMerge) {
|
||||
let intoObj = defaultValue(maybeIntoObj, {
|
||||
id: into,
|
||||
data: {},
|
||||
taskResults: []
|
||||
});
|
||||
|
||||
let newObject = mergeItems({ fromObj, intoObj, merge, mergeMetadata });
|
||||
|
||||
let upsertOptions = {
|
||||
insertMissing: true,
|
||||
noDelete: true
|
||||
};
|
||||
|
||||
return Promise.all([
|
||||
// NOTE: We don't use this.deleteItem, to sidestep any alias lookups
|
||||
db.Item.query(tx).findById(from).delete(),
|
||||
// NOTE: Cannot use into.$query here because that adds an implicit query builder operation, which upsertGraph does not allow
|
||||
db.Item.query(tx).upsertGraph(newObject, upsertOptions),
|
||||
]);
|
||||
} else {
|
||||
return Promise.all([
|
||||
db.Item.query(tx).findById(from).patch({ id: into }),
|
||||
this.createAlias(tx, { from: into, to: into })
|
||||
]);
|
||||
}
|
||||
}
|
||||
}).then(() => {
|
||||
// NOTE: Repointing aliases has the side-effect of leaving a redirect from the source to the destination item, as each item has a self:self alias
|
||||
return repointAliases(tx, { from: from, to: into });
|
||||
});
|
||||
},
|
||||
|
||||
deleteItem: function (tx, { id }) {
|
||||
return db.Alias.relatedQuery("item", tx)
|
||||
.for(id)
|
||||
.delete();
|
||||
},
|
||||
|
||||
createAlias: function (tx, { from, to, failIfExists }) {
|
||||
// Isolate this operation into a savepoint so that it can fail without breaking the entire transaction
|
||||
let promise = this.runInTransaction(tx, (tx) => {
|
||||
return db.Alias.query(tx).insert({
|
||||
alias: from,
|
||||
itemId: to,
|
||||
updatedAt: new Date()
|
||||
});
|
||||
});
|
||||
|
||||
if (failIfExists) {
|
||||
return promise;
|
||||
} else {
|
||||
return Promise.resolve(promise)
|
||||
.catch({ name: "UniqueViolationError" }, noop);
|
||||
}
|
||||
},
|
||||
|
||||
deleteAlias: function (tx, { from }) {
|
||||
// TODO: This cannot yet be propagated to the update feed, because we don't keep a record of deletions
|
||||
return db.Alias.query(tx).findById(from).delete();
|
||||
},
|
||||
|
||||
updateMetadata: function (tx, { id, update, task }) {
|
||||
// TODO: failIfExists
|
||||
return Promise.try(() => {
|
||||
return getTaskResult(tx, task, id);
|
||||
}).then((taskResult) => {
|
||||
let sharedFields = {
|
||||
isInvalidated: false,
|
||||
updatedAt: new Date()
|
||||
};
|
||||
|
||||
if (taskResult != null) {
|
||||
return taskResult.$query(tx).patch({
|
||||
... sharedFields,
|
||||
metadata: update(taskResult.metadata),
|
||||
});
|
||||
} else {
|
||||
return db.TaskResult.query(tx).insert({
|
||||
... sharedFields,
|
||||
task: task.name,
|
||||
itemId: id,
|
||||
metadata: update({})
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
expire: function (tx, { id, task }) {
|
||||
return Promise.try(() => {
|
||||
return db.Alias.query(tx).findById(id);
|
||||
}).then((alias) => {
|
||||
return db.TaskResult.query(tx)
|
||||
.where({ task: task.name, itemId: alias.itemId })
|
||||
.patch({ isInvalidated: true });
|
||||
});
|
||||
},
|
||||
|
||||
setTTL: function (options) {
|
||||
// options = ttl || { id, taskName, ttl }
|
||||
// FIXME
|
||||
},
|
||||
|
||||
allowFailure: function (allowed) {
|
||||
|
||||
},
|
||||
|
||||
log: function (category, message) {
|
||||
|
||||
},
|
||||
|
||||
markTaskStatus: function (tx, { id, task, isSuccessful }) {
|
||||
return Promise.try(() => {
|
||||
return getTaskResult(tx, task, id);
|
||||
}).then((taskResult) => {
|
||||
let sharedFields = {
|
||||
isSuccessful: isSuccessful,
|
||||
taskVersion: task.version,
|
||||
expiresAt: (task.ttl != null)
|
||||
? addMilliseconds(new Date(), task.ttl)
|
||||
: undefined
|
||||
};
|
||||
|
||||
if (taskResult != null) {
|
||||
return taskResult.$query(tx).patch({
|
||||
... sharedFields
|
||||
});
|
||||
} else {
|
||||
return db.TaskResult.query(tx).insert({
|
||||
... sharedFields,
|
||||
task: task.name,
|
||||
itemId: id
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
logTaskError: function (tx, { id, task, error }) {
|
||||
throw new Error(`UNIMPLEMENTED`); // FIXME
|
||||
},
|
||||
|
||||
countLockedTasks: function (tx) {
|
||||
return Promise.try(() => {
|
||||
return db.TaskInProgress.query(tx).count({ count: "*" });
|
||||
}).then((result) => {
|
||||
return result[0].count;
|
||||
});
|
||||
},
|
||||
|
||||
getUpdateStream: require("./queries/get-update-stream")(state),
|
||||
getTaskStream: require("./queries/get-task-stream")(state)
|
||||
};
|
||||
});
|
||||
}
|
||||
};
|
||||
};
|
112
src/database-backends/postgresql/queries/get-task-stream.js
Normal file
112
src/database-backends/postgresql/queries/get-task-stream.js
Normal file
|
@ -0,0 +1,112 @@
|
|||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const debug = require("debug")("srap:backend:postgresql:query:get-task-stream");
|
||||
const buffer = require("@promistream/buffer");
|
||||
const pipe = require("@promistream/pipe");
|
||||
const simpleSource = require("@promistream/simple-source");
|
||||
|
||||
const query = `
|
||||
WITH
|
||||
dependency_tasks AS (
|
||||
SELECT * FROM
|
||||
json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text)
|
||||
),
|
||||
matching_items AS (
|
||||
SELECT
|
||||
DISTINCT ON (srap_items.id)
|
||||
srap_items.*,
|
||||
results.updated_at AS result_date,
|
||||
results.task_version,
|
||||
(
|
||||
results.is_successful = TRUE
|
||||
AND (
|
||||
results.expires_at < NOW()
|
||||
OR results.is_invalidated = TRUE
|
||||
)
|
||||
) AS is_candidate
|
||||
FROM srap_items
|
||||
INNER JOIN srap_tags
|
||||
ON srap_tags.item_id = srap_items.id
|
||||
AND srap_tags.name = ANY(:tags)
|
||||
LEFT JOIN srap_task_results AS results
|
||||
ON results.item_id = srap_items.id
|
||||
AND results.task = :task
|
||||
WHERE
|
||||
NOT EXISTS (
|
||||
SELECT FROM srap_tasks_in_progress AS pr WHERE pr.item_id = srap_items.id
|
||||
)
|
||||
),
|
||||
candidates AS (
|
||||
SELECT * FROM matching_items
|
||||
WHERE result_date IS NULL
|
||||
UNION
|
||||
SELECT * FROM matching_items
|
||||
WHERE is_candidate = TRUE
|
||||
OR NOT (task_version = :taskVersion)
|
||||
)
|
||||
(
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
candidates
|
||||
WHERE
|
||||
NOT EXISTS (
|
||||
SELECT
|
||||
results.*
|
||||
FROM dependency_tasks
|
||||
LEFT JOIN srap_task_results AS results
|
||||
ON dependency_tasks.task = results.task
|
||||
AND dependency_tasks.task_version = results.task_version
|
||||
AND results.item_id = candidates.id
|
||||
WHERE
|
||||
results.is_successful IS NULL
|
||||
OR results.is_successful = FALSE
|
||||
OR (
|
||||
results.is_successful = TRUE
|
||||
AND (
|
||||
results.expires_at < NOW()
|
||||
OR results.is_invalidated = TRUE
|
||||
)
|
||||
)
|
||||
)
|
||||
) LIMIT :resultLimit;
|
||||
`;
|
||||
|
||||
module.exports = function ({ metrics, backendSettings }) {
|
||||
return function (tx, { task }) {
|
||||
return pipe([
|
||||
simpleSource(() => {
|
||||
let startTime = Date.now();
|
||||
|
||||
return Promise.try(() => {
|
||||
return tx.raw(query, {
|
||||
tags: task.tags,
|
||||
task: task.name,
|
||||
taskVersion: task.version,
|
||||
resultLimit: backendSettings.taskBatchSize,
|
||||
dependencyTaskDefinitions: JSON.stringify(task.dependencies.map((dependency) => {
|
||||
// Case-mapping for SQL compatibility
|
||||
return { task_version: dependency.version, task: dependency.name };
|
||||
}))
|
||||
});
|
||||
}).then((result) => {
|
||||
let timeElapsed = Date.now() - startTime;
|
||||
|
||||
metrics.taskFetchTime.labels({ task: task.name }).set(timeElapsed / 1000);
|
||||
metrics.taskFetchResults.labels({ task: task.name }).set(result.rowCount);
|
||||
|
||||
debug(`Task retrieval query for '${task.name}' took ${timeElapsed}ms and produced ${result.rowCount} results`);
|
||||
|
||||
if (result.rowCount > 0) {
|
||||
return result.rows;
|
||||
} else {
|
||||
// TODO: Consider using LISTEN/NOTIFY instead?
|
||||
return Promise.resolve([]).delay(backendSettings.taskBatchDelay);
|
||||
}
|
||||
});
|
||||
}),
|
||||
buffer()
|
||||
]);
|
||||
};
|
||||
};
|
|
@ -0,0 +1,65 @@
|
|||
"use strict";
|
||||
|
||||
const dateFns = require("date-fns");
|
||||
|
||||
const pipe = require("@promistream/pipe");
|
||||
const combineSequentialStreaming = require("@promistream/combine-sequential-streaming");
|
||||
const fromIterable = require("@promistream/from-iterable");
|
||||
const fromNodeStream = require("@promistream/from-node-stream");
|
||||
|
||||
const createTypeTaggingStream = require("../../../streams/tag-type");
|
||||
|
||||
module.exports = function ({ db, knex }) {
|
||||
return function (tx, { timestamp, prefix }) {
|
||||
// NOTE: This returns snake_cased keys! As we're bypassing the Objection internals, no casemapping occurs.
|
||||
// FIXME/REFACTOR: That needs to be changed in the refactor, for consistency across database backends
|
||||
|
||||
// NOTE: This is a hacky workaround - if we don't do this, then for some reason also entries *at* the exact timestamp are included, which is not what we want.
|
||||
// FIXME: Verify that this doesn't break anything, eg. when an entry is created inbetween the original timestamp and +1ms.
|
||||
let actualTimestamp = (timestamp != null)
|
||||
? dateFns.addMilliseconds(timestamp, 1)
|
||||
: undefined;
|
||||
|
||||
function applyWhereClauses(query, idField) {
|
||||
if (timestamp != null) {
|
||||
// FIXME: An error in the query here throws an error, resulting in an abort handling bug in a promistream
|
||||
query = query.whereRaw(`updated_at > ?`, [ actualTimestamp ]);
|
||||
}
|
||||
|
||||
if (prefix != null) {
|
||||
query = query.whereRaw(`${idField} LIKE ?`, [ `${prefix.replace(/%/g, "\\%")}%` ]);
|
||||
}
|
||||
|
||||
return query;
|
||||
}
|
||||
|
||||
function* streamGenerator() {
|
||||
yield pipe([
|
||||
fromNodeStream.fromReadable(
|
||||
applyWhereClauses(db.Item.query(tx), "id").toKnexQuery().stream()
|
||||
),
|
||||
createTypeTaggingStream("item")
|
||||
]);
|
||||
|
||||
yield pipe([
|
||||
fromNodeStream.fromReadable(
|
||||
// NOTE: We are only interested in aliases which don't point at themselves
|
||||
applyWhereClauses(db.Alias.query(tx).where("alias", "!=", knex.ref("item_id")), "alias").toKnexQuery().stream()
|
||||
),
|
||||
createTypeTaggingStream("alias")
|
||||
]);
|
||||
|
||||
yield pipe([
|
||||
fromNodeStream.fromReadable(
|
||||
applyWhereClauses(db.TaskResult.query(tx), "item_id").toKnexQuery().stream()
|
||||
),
|
||||
createTypeTaggingStream("taskResult")
|
||||
]);
|
||||
}
|
||||
|
||||
return pipe([
|
||||
fromIterable(streamGenerator()),
|
||||
combineSequentialStreaming()
|
||||
]);
|
||||
}
|
||||
};
|
183
src/database-backends/simulated/index.js
Normal file
183
src/database-backends/simulated/index.js
Normal file
|
@ -0,0 +1,183 @@
|
|||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const chalk = require("chalk");
|
||||
|
||||
const mergeItems = require("../../semantics/merge-items");
|
||||
|
||||
function printTX(tx) {
|
||||
// TODO: Print entire chain?
|
||||
return chalk.bold.yellow(`[tx ${tx.__txID ?? "?"}]`);
|
||||
}
|
||||
|
||||
function printItem(id, task) {
|
||||
if (task != null) {
|
||||
return chalk.bold.white(`[${id}][${task.name}]`);
|
||||
} else {
|
||||
return chalk.bold.white(`[${id}]`);
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: Move logs to logging hook
|
||||
function logSimulated(... args) {
|
||||
console.log(chalk.gray(args[0]), ... args.slice(1));
|
||||
}
|
||||
|
||||
module.exports = function (state) {
|
||||
|
||||
// NOTE: The simulated backend needs access to the 'real' backend; a task may eg. mutate an item based on its current data, and we'd need to read that from the real data source. The only constraint is that the simulated backend cannot *mutate* anything in the real backend, but reading is fine!
|
||||
return {
|
||||
defaultSettings: {},
|
||||
create: function createSimulatedBackend({ backend }) {
|
||||
let txCounter = 0;
|
||||
let locks = new Map(); // Map<task, Set<id>>
|
||||
|
||||
return {
|
||||
shutdown: function () {
|
||||
return backend.shutdown();
|
||||
},
|
||||
|
||||
getDefaultTransaction: function () {
|
||||
return { __txID: null };
|
||||
},
|
||||
|
||||
isTransaction: function (value) {
|
||||
return ("__txID" in value);
|
||||
},
|
||||
|
||||
runInTransaction: function (tx, callback) {
|
||||
let newTransaction = { __txID: txCounter++, __parentTX: tx };
|
||||
|
||||
return callback(newTransaction);
|
||||
},
|
||||
|
||||
lock: function (tx, { id, task }) {
|
||||
if (!locks.has(task)) {
|
||||
locks.set(task, new Set());
|
||||
}
|
||||
|
||||
let taskLocks = locks.get(task);
|
||||
|
||||
if (taskLocks.has(id)) {
|
||||
logSimulated(`${printTX(tx)} Already locked: ${printItem(id, task)}`);
|
||||
return false;
|
||||
} else {
|
||||
logSimulated(`${printTX(tx)} Locking ${printItem(id, task)}`);
|
||||
taskLocks.add(id);
|
||||
return true;
|
||||
}
|
||||
},
|
||||
|
||||
unlock: function (tx, { id, task }) {
|
||||
logSimulated(`${printTX(tx)} Unlocking ${printItem(id, task)}`);
|
||||
locks.get(task).delete(id);
|
||||
},
|
||||
|
||||
getItem: function (tx, options) {
|
||||
return backend.getItem(backend.getDefaultTransaction(), options);
|
||||
},
|
||||
|
||||
storeItem: function (tx, { id, parentID, update, tags, aliases, allowUpsert }) {
|
||||
return Promise.try(() => {
|
||||
return this.getItem(tx, { id: id, optional: true });
|
||||
}).then((currentItem) => {
|
||||
let actualID = (currentItem != null)
|
||||
? currentItem.id
|
||||
: id;
|
||||
|
||||
let newItem = {
|
||||
id: actualID,
|
||||
data: (currentItem != null)
|
||||
? update(currentItem.data)
|
||||
: update({}),
|
||||
createdBy: parentID,
|
||||
tags: tags,
|
||||
aliases: aliases.concat([ actualID ]),
|
||||
updatedAt: new Date()
|
||||
};
|
||||
|
||||
logSimulated(`${printTX(tx)} ${printItem(id)} Storing item (upsert=${allowUpsert})`, newItem);
|
||||
});
|
||||
},
|
||||
|
||||
moveItem: function (tx, { from, into, merge, mergeMetadata, allowMerge }) {
|
||||
return Promise.all([
|
||||
this.getItem(tx, { id: from, optional: true }),
|
||||
this.getItem(tx, { id: into, optional: true }),
|
||||
]).then((fromObj, maybeIntoObj) => {
|
||||
if (fromObj != null) {
|
||||
let intoObj = maybeIntoObj ?? {
|
||||
id: into,
|
||||
data: {},
|
||||
taskResults: []
|
||||
};
|
||||
|
||||
if (allowMerge) {
|
||||
let newItem = mergeItems({ fromObj, intoObj, merge, mergeMetadata });
|
||||
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=true)`, newItem);
|
||||
} else {
|
||||
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=false)`);
|
||||
}
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
deleteItem: function (tx, { id }) {
|
||||
logSimulated(`${printTX(tx)} ${printItem(id)} Deleting item`);
|
||||
},
|
||||
|
||||
createAlias: function (tx, { from, to }) {
|
||||
logSimulated(`${printTX(tx)} ${printItem(to)} Creating alias ${from}`);
|
||||
},
|
||||
|
||||
deleteAlias: function (tx, { from }) {
|
||||
logSimulated(`${printTX(tx)} ${printItem(from)} Deleting alias`);
|
||||
},
|
||||
|
||||
updateMetadata: function (tx, { id, task, update }) {
|
||||
return Promise.try(() => {
|
||||
return this.getItem(tx, { id, optional: true });
|
||||
}).then((item) => {
|
||||
let taskResult = (item != null)
|
||||
? item.taskResults.find((result) => result.task === task)
|
||||
: undefined;
|
||||
|
||||
let existingData = (taskResult != null)
|
||||
? taskResult.metadata
|
||||
: {};
|
||||
|
||||
let newData = update(existingData);
|
||||
|
||||
logSimulated(`${printTX(tx)} ${printItem(id, task)} Storing metadata`, newData);
|
||||
});
|
||||
},
|
||||
|
||||
expire: function (tx, { id, task }) {
|
||||
logSimulated(`${printTX(tx)} ${printItem(id, task)} Expiring task result`);
|
||||
},
|
||||
|
||||
markTaskStatus: function (tx, { id, task, isSuccessful }) {
|
||||
if (isSuccessful) {
|
||||
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as completed`);
|
||||
} else {
|
||||
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as failed`);
|
||||
}
|
||||
},
|
||||
|
||||
countLockedTasks: function (tx) {
|
||||
return Array.from(locks).reduce((total, [ task, taskLocks ]) => {
|
||||
return total + taskLocks.size;
|
||||
}, 0);
|
||||
},
|
||||
|
||||
getUpdateStream: function (... args) {
|
||||
return backend.getUpdateStream(... args);
|
||||
},
|
||||
|
||||
getTaskStream: function (... args) {
|
||||
return backend.getTaskStream(... args);
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
};
|
|
@ -1,48 +0,0 @@
|
|||
"use strict";
|
||||
|
||||
const defaultValue = require("default-value");
|
||||
const mapObj = require("map-obj");
|
||||
|
||||
module.exports = function createDependencyMap(configuration) {
|
||||
let dependencyMap = mapObj(configuration.tasks, (task, definition) => {
|
||||
return [
|
||||
task,
|
||||
defaultValue(definition.dependsOn, []).map((dependencyName) => {
|
||||
let dependency = configuration.tasks[dependencyName];
|
||||
|
||||
if (dependency != null) {
|
||||
return {
|
||||
task: dependencyName,
|
||||
// TODO: Do defaults processing in configuration loading/validation instead
|
||||
taskVersion: defaultValue(dependency.version, "0")
|
||||
};
|
||||
} else {
|
||||
throw new Error(`Invalid dependency specified, task does not exist: ${dependencyName}`);
|
||||
}
|
||||
})
|
||||
];
|
||||
});
|
||||
|
||||
// NOTE: When inverting the dependencyMap, we totally ignore the taskVersion of dependencies when keying this mapping. While the taskVersion of specific tasks *may* matter to the code that uses these mappings, we don't support more than one version of a task simultaneously existing, and so keying by the task name alone is sufficient.
|
||||
let dependentMap = {};
|
||||
|
||||
for (let [ task, dependencies ] of Object.entries(dependencyMap)) {
|
||||
let taskDefinition = configuration.tasks[task];
|
||||
|
||||
for (let dependency of dependencies) {
|
||||
if (dependentMap[dependency.task] == null) {
|
||||
dependentMap[dependency.task] = [];
|
||||
}
|
||||
|
||||
dependentMap[dependency.task].push({
|
||||
task: task,
|
||||
taskVersion: defaultValue(taskDefinition.version, "0")
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
dependencyMap,
|
||||
dependentMap
|
||||
};
|
||||
};
|
298
src/kernel.js
298
src/kernel.js
|
@ -1,227 +1,131 @@
|
|||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const defaultValue = require("default-value");
|
||||
const chalk = require("chalk");
|
||||
const util = require("util");
|
||||
const syncpipe = require("syncpipe");
|
||||
|
||||
const rateLimit = require("@promistream/rate-limit");
|
||||
const simpleSink = require("@promistream/simple-sink");
|
||||
const pipe = require("@promistream/pipe");
|
||||
const parallelize = require("@promistream/parallelize");
|
||||
|
||||
const initialize = require("./initialize");
|
||||
const logStatus = require("./log-status");
|
||||
const createDependencyMap = require("./dependency-map");
|
||||
const { validateOptions } = require("@validatem/core");
|
||||
const isValidConfiguration = require("./validators/is-valid-configuration");
|
||||
const createPrometheus = require("./prometheus");
|
||||
const generateTaskGraph = require("./util/generate-task-graph");
|
||||
const unreachable = require("@joepie91/unreachable")("srap");
|
||||
|
||||
// FIXME: *Require* a taskInterval to be set, even if explicitly null, to prevent accidentally forgetting it
|
||||
|
||||
// TODO: Publish this as a separate package
|
||||
// Inverts an object of arrays, eg. {a: [x, y], b: [x, z]} becomes {x: [a, b], y: [a], z: [b]}
|
||||
// Useful for eg. tag mappings
|
||||
function invertMapping(object) {
|
||||
let newObject = {};
|
||||
|
||||
for (let [ key, valueList ] of Object.entries(object)) {
|
||||
for (let value of valueList) {
|
||||
if (newObject[value] == null) {
|
||||
newObject[value] = [];
|
||||
}
|
||||
|
||||
newObject[value].push(key);
|
||||
}
|
||||
}
|
||||
|
||||
return newObject;
|
||||
}
|
||||
module.exports = async function createKernel(_configuration) {
|
||||
let configuration = validateOptions(arguments, isValidConfiguration);
|
||||
|
||||
module.exports = function createKernel(configuration) {
|
||||
return Promise.try(() => {
|
||||
return initialize({
|
||||
knexfile: {
|
||||
client: "pg",
|
||||
connection: configuration.database,
|
||||
pool: { min: 0, max: 32 },
|
||||
migrations: { tableName: "srap_knex_migrations" }
|
||||
let state = {
|
||||
... createPrometheus(),
|
||||
tasks: generateTaskGraph({
|
||||
tags: configuration.tags,
|
||||
tasks: configuration.tasks
|
||||
})
|
||||
};
|
||||
|
||||
let { metrics, tasks } = state;
|
||||
|
||||
const createBackend = require("./database-backends")(state);
|
||||
|
||||
let attachToGlobalRateLimit = (configuration.taskInterval != null)
|
||||
? rateLimit.clonable(configuration.taskInterval)
|
||||
: undefined;
|
||||
|
||||
let backend = await createBackend({
|
||||
backend: configuration.backend,
|
||||
options: configuration.database
|
||||
});
|
||||
|
||||
Object.assign(state, { backend: backend });
|
||||
|
||||
const createTaskKernel = require("./streams/task-kernel")(state);
|
||||
const runTask = require("./run-task")(state);
|
||||
|
||||
function checkLockedTasks() {
|
||||
return Promise.try(() => {
|
||||
return backend.topLevel.countLockedTasks();
|
||||
}).then((lockedCount) => {
|
||||
if (lockedCount > 0) {
|
||||
console.log(`${chalk.bold.red("WARNING:")} There are ${lockedCount} tasks currently locked, and they will not be run! This may be caused by a process crash in the past. See the documentation for more details on how to solve this issue.`);
|
||||
}
|
||||
});
|
||||
}).then((state) => {
|
||||
const queries = require("./queries")(state);
|
||||
const createTaskStream = require("./task-stream")(state);
|
||||
const createDatabaseQueue = require("./queued-database-api")(state);
|
||||
}
|
||||
|
||||
let { knex, prometheusRegistry, metrics } = state;
|
||||
let { dependencyMap, dependentMap } = createDependencyMap(configuration);
|
||||
|
||||
function insertSeeds() {
|
||||
return Promise.map(configuration.seed, (item) => {
|
||||
return queries.createItem(knex, {
|
||||
... item,
|
||||
allowUpsert: false,
|
||||
failIfExists: false
|
||||
});
|
||||
});
|
||||
let databasePreparePromise;
|
||||
async function prepareDatabase() {
|
||||
if (databasePreparePromise == null) {
|
||||
databasePreparePromise = Promise.all([
|
||||
checkLockedTasks(),
|
||||
backend.topLevel.insertSeeds(null, { seeds: configuration.seed })
|
||||
]);
|
||||
}
|
||||
|
||||
function checkLockedTasks() {
|
||||
return Promise.try(() => {
|
||||
return queries.countLockedTasks(knex);
|
||||
}).then((lockedCount) => {
|
||||
if (lockedCount > 0) {
|
||||
console.log(`${chalk.bold.red("WARNING:")} There are ${lockedCount} tasks currently locked, and they will not be run! This may be caused by a process crash in the past. See the documentation for more details on how to solve this issue.`);
|
||||
}
|
||||
});
|
||||
}
|
||||
return databasePreparePromise;
|
||||
}
|
||||
|
||||
function runTaskStreams() {
|
||||
let tasks = invertMapping(configuration.tags);
|
||||
|
||||
let attachToGlobalRateLimit = (configuration.taskInterval != null)
|
||||
? rateLimit.clonable(configuration.taskInterval)
|
||||
: undefined;
|
||||
|
||||
console.log(`Starting ${Object.keys(tasks).length} tasks...`);
|
||||
|
||||
return Promise.map(Object.entries(tasks), ([ task, tags ]) => {
|
||||
let taskConfiguration = configuration.tasks[task];
|
||||
|
||||
if (taskConfiguration != null) {
|
||||
let taskStream = createTaskStream({
|
||||
task: task,
|
||||
tags: tags,
|
||||
taskVersion: defaultValue(taskConfiguration.version, "0"),
|
||||
taskInterval: taskConfiguration.taskInterval,
|
||||
parallelTasks: taskConfiguration.parallelTasks,
|
||||
ttl: taskConfiguration.ttl,
|
||||
run: taskConfiguration.run,
|
||||
globalRateLimiter: (attachToGlobalRateLimit != null)
|
||||
? attachToGlobalRateLimit()
|
||||
: null,
|
||||
globalParallelize: (configuration.parallelTasks != null)
|
||||
? parallelize(configuration.parallelTasks)
|
||||
: null,
|
||||
taskDependencies: dependencyMap[task],
|
||||
taskDependents: dependentMap[task]
|
||||
});
|
||||
|
||||
return pipe([
|
||||
taskStream,
|
||||
simpleSink((completedItem) => {
|
||||
// FIXME: Don't dump to console.log below, since this is meant to be usable as a library as well - provide some sort of object logging hook instead?
|
||||
return {
|
||||
run: async function runKernel() {
|
||||
console.log(`Starting ${tasks.size} tasks...`);
|
||||
await prepareDatabase();
|
||||
|
||||
return Promise.map(tasks.values(), (task) => {
|
||||
return pipe([
|
||||
createTaskKernel(task, {
|
||||
globalRateLimiter: (attachToGlobalRateLimit != null) ? attachToGlobalRateLimit() : null,
|
||||
}),
|
||||
simpleSink(({ status }) => {
|
||||
if (status === "completed") {
|
||||
metrics.successfulItems.inc(1);
|
||||
metrics.successfulItems.labels({ task: task }).inc(1);
|
||||
logStatus(task, chalk.bold.green, "completed", completedItem.id);
|
||||
})
|
||||
]).read();
|
||||
} else {
|
||||
throw new Error(`Task '${task}' is defined to run for tags [${tags}], but no such task is defined`);
|
||||
}
|
||||
}).catch((error) => {
|
||||
console.dir(error, { depth: null, colors: true });
|
||||
throw error;
|
||||
} else if (status === "failed") {
|
||||
metrics.failedItems.inc(1);
|
||||
metrics.failedItems.labels({ task: task }).inc(1);
|
||||
} else {
|
||||
unreachable(`Unrecognized status '${status}'`);
|
||||
}
|
||||
})
|
||||
]).read();
|
||||
});
|
||||
}
|
||||
|
||||
function executeTask(id, task) {
|
||||
let taskConfiguration = configuration.tasks[task];
|
||||
},
|
||||
simulate: async function simulate({ itemID, task: taskName }) {
|
||||
console.log(`Simulating task ${itemID}/${taskName}...`);
|
||||
await prepareDatabase();
|
||||
|
||||
return knex.transaction((tx) => {
|
||||
return Promise.try(() => {
|
||||
return queries.getItem(knex, id);
|
||||
}).then((item) => {
|
||||
let queue = createDatabaseQueue({
|
||||
tx,
|
||||
item,
|
||||
task,
|
||||
taskVersion: defaultValue(taskConfiguration.version, "0"),
|
||||
taskDependents: dependentMap[task],
|
||||
taskDependencies: dependencyMap[task]
|
||||
});
|
||||
let simulatedBackend = backend.topLevel.simulate();
|
||||
|
||||
return Promise.try(() => {
|
||||
return taskConfiguration.run({
|
||||
id: item.id,
|
||||
data: item.data,
|
||||
getItem: function (id) {
|
||||
return queries.getItem(knex, id);
|
||||
},
|
||||
... queue.api
|
||||
});
|
||||
}).then(() => {
|
||||
return queue.execute();
|
||||
});
|
||||
});
|
||||
}, { doNotRejectOnRollback: false });
|
||||
}
|
||||
|
||||
function simulateTask(id, task) {
|
||||
let taskConfiguration = configuration.tasks[task];
|
||||
let simulateTask = require("./run-task")({
|
||||
... state,
|
||||
backend: simulatedBackend
|
||||
});
|
||||
|
||||
let methods = [ "createItem", "renameItem", "mergeItem", "deleteItem", "createAlias", "deleteAlias", "updateData", "updateMetadata", "expire", "expireDependents" ];
|
||||
let item = await simulatedBackend.topLevel.getItem(null, { id: itemID });
|
||||
return simulateTask(tasks.get(taskName), item);
|
||||
},
|
||||
execute: async function simulate({ itemID, task: taskName }) {
|
||||
// TODO: Should this also lock the task? We probably want to ignore any locks, since this method is primarily used for task logic debugging purposes, and overriding locks would be desirable there.
|
||||
console.log(`Running task ${itemID}/${taskName}...`);
|
||||
await prepareDatabase();
|
||||
|
||||
let simulatedMethods = syncpipe(methods, [
|
||||
(_) => _.map((method) => [ method, function() {
|
||||
console.log(`${chalk.bold.yellow.bgBlack(`${method} (simulated):`)} ${util.inspect(arguments, { colors: true, depth: null })}`);
|
||||
}]),
|
||||
(_) => Object.fromEntries(_)
|
||||
]);
|
||||
|
||||
let item = await backend.topLevel.getItem(null, { id: itemID });
|
||||
return runTask(tasks.get(taskName), item);
|
||||
},
|
||||
shutdown: function () {
|
||||
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
|
||||
return backend.topLevel.shutdown();
|
||||
},
|
||||
getMetrics: function () {
|
||||
return Promise.try(() => {
|
||||
return queries.getItem(knex, id);
|
||||
}).then((item) => {
|
||||
return taskConfiguration.run({
|
||||
id: item.id,
|
||||
data: item.data,
|
||||
getItem: function (id) {
|
||||
return queries.getItem(knex, id);
|
||||
},
|
||||
... simulatedMethods
|
||||
});
|
||||
return state.prometheusRegistry.metrics();
|
||||
}).then((metrics) => {
|
||||
return {
|
||||
contentType: state.prometheusRegistry.contentType,
|
||||
metrics: metrics
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
run: function runKernel() {
|
||||
return Promise.try(() => {
|
||||
return insertSeeds();
|
||||
}).then(() => {
|
||||
return checkLockedTasks();
|
||||
}).then(() => {
|
||||
return runTaskStreams();
|
||||
});
|
||||
},
|
||||
simulate: function simulate({ itemID, task }) {
|
||||
return Promise.try(() => {
|
||||
return insertSeeds();
|
||||
}).then(() => {
|
||||
return checkLockedTasks();
|
||||
}).then(() => {
|
||||
return simulateTask(itemID, task);
|
||||
});
|
||||
},
|
||||
execute: function simulate({ itemID, task }) {
|
||||
return Promise.try(() => {
|
||||
return insertSeeds();
|
||||
}).then(() => {
|
||||
return checkLockedTasks();
|
||||
}).then(() => {
|
||||
return executeTask(itemID, task);
|
||||
});
|
||||
},
|
||||
shutdown: function () {
|
||||
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
|
||||
knex.destroy();
|
||||
},
|
||||
getMetrics: function () {
|
||||
return Promise.try(() => {
|
||||
return prometheusRegistry.metrics();
|
||||
}).then((metrics) => {
|
||||
return {
|
||||
contentType: prometheusRegistry.contentType,
|
||||
metrics: metrics
|
||||
};
|
||||
});
|
||||
}
|
||||
};
|
||||
});
|
||||
};
|
||||
};
|
||||
|
|
|
@ -1,61 +0,0 @@
|
|||
"use strict";
|
||||
|
||||
const defaultValue = require("default-value");
|
||||
const chalk = require("chalk");
|
||||
const logStatus = require("../log-status");
|
||||
|
||||
module.exports = function (state) {
|
||||
const queries = require("../queries")(state);
|
||||
|
||||
return function createDatabaseMutationAPI({ tx, item, taskVersion, task }) {
|
||||
return {
|
||||
createItem: function (options) {
|
||||
logStatus(task, chalk.gray, "new", options.id);
|
||||
|
||||
return queries.createItem(tx, {
|
||||
...options,
|
||||
parentID: item.id
|
||||
});
|
||||
},
|
||||
renameItem: function (options) {
|
||||
return queries.renameItem(tx, options);
|
||||
},
|
||||
mergeItem: function (options) {
|
||||
// FIXME: Move default
|
||||
return queries.mergeItem(tx, {
|
||||
...options,
|
||||
from: defaultValue(options.from, item.id)
|
||||
});
|
||||
},
|
||||
deleteItem: function (options) {
|
||||
return queries.deleteItem(tx, {
|
||||
id: options.id
|
||||
});
|
||||
},
|
||||
createAlias: function (options) {
|
||||
// FIXME: Move default
|
||||
return queries.createAlias(tx, {
|
||||
...options,
|
||||
to: defaultValue(options.to, item.id)
|
||||
});
|
||||
},
|
||||
deleteAlias: function (from) {
|
||||
return queries.deleteAlias(tx, {
|
||||
from: from
|
||||
});
|
||||
},
|
||||
updateData: function (options) {
|
||||
return queries.updateData(tx, options);
|
||||
},
|
||||
updateMetadata: function (options) {
|
||||
return queries.updateMetadata(tx, {
|
||||
... options,
|
||||
taskVersion: taskVersion
|
||||
});
|
||||
},
|
||||
expire: function (options) {
|
||||
return queries.expire(tx, options);
|
||||
}
|
||||
};
|
||||
};
|
||||
};
|
|
@ -1,79 +0,0 @@
|
|||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const chalk = require("chalk");
|
||||
const util = require("util");
|
||||
|
||||
function logCall(methodName, args) {
|
||||
console.log(`${chalk.bold.yellow.bgBlack(`${methodName} (simulated):`)} ${util.inspect(args, { colors: true, depth: null })}`);
|
||||
}
|
||||
|
||||
// TODO: Make this do an actual database query and then rollback; that way the behaviour is the same as when really modifying the DB, in that earlier operations can affect what later operations see (eg. a createItem followed by a mergeItem involving that new item).
|
||||
|
||||
module.exports = function (state) {
|
||||
const queries = require("../queries")(state);
|
||||
|
||||
return function createSimulationMutationAPI({ tx, item, taskVersion }) {
|
||||
return {
|
||||
createItem: function (options) {
|
||||
logCall("createItem", {
|
||||
... options,
|
||||
update: (options.update != null)
|
||||
? options.update(item.data)
|
||||
: undefined
|
||||
});
|
||||
},
|
||||
renameItem: function (options) {
|
||||
logCall("renameItem", {
|
||||
... options
|
||||
});
|
||||
},
|
||||
mergeItem: function (options) {
|
||||
// FIXME: Visualize merges
|
||||
logCall("createItem", {
|
||||
... options
|
||||
});
|
||||
},
|
||||
deleteItem: function (options) {
|
||||
logCall("deleteItem", {
|
||||
... options
|
||||
});
|
||||
},
|
||||
createAlias: function (options) {
|
||||
logCall("createAlias", {
|
||||
... options
|
||||
});
|
||||
},
|
||||
deleteAlias: function (from) {
|
||||
logCall("deleteAlias", {
|
||||
from: from
|
||||
});
|
||||
},
|
||||
updateData: function (options) {
|
||||
logCall("updateData", {
|
||||
... options,
|
||||
update: (options.update != null)
|
||||
? options.update(item.data)
|
||||
: undefined
|
||||
});
|
||||
},
|
||||
updateMetadata: function (options) {
|
||||
return Promise.try(() => {
|
||||
return queries.getItem(tx, id);
|
||||
}).then((item) => {
|
||||
// FIXME: Visualize metadata update, actually merge with the correct thing
|
||||
// MARKER: Expose taskResults as an object mapping instead of an array, somehow
|
||||
logCall("updateMetadata", {
|
||||
... options,
|
||||
update: (options.update != null)
|
||||
? options.update(item.data)
|
||||
: undefined
|
||||
});
|
||||
});
|
||||
},
|
||||
expire: function (options) {
|
||||
return queries.expire(tx, options);
|
||||
}
|
||||
};
|
||||
};
|
||||
};
|
|
@ -1,111 +0,0 @@
|
|||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
|
||||
const wrapValueAsOption = require("@validatem/wrap-value-as-option");
|
||||
const required = require("@validatem/required");
|
||||
const isString = require("@validatem/is-string");
|
||||
const defaultTo = require("@validatem/default-to");
|
||||
const validateOptions = require("@validatem/core/src/api/validate-options");
|
||||
const isFunction = require("@validatem/is-function");
|
||||
const arrayOf = require("@validatem/array-of");
|
||||
const defaultValue = require("default-value");
|
||||
|
||||
// FIXME: Remaining validators
|
||||
|
||||
module.exports = function wrapMutationAPI({ item, task, taskDependents }, api) {
|
||||
return {
|
||||
createItem: function (options) {
|
||||
// FIXME: Require tags to be set, even if to an empty array, to avoid accidentally forgetting the tags
|
||||
return api.createItem(options);
|
||||
},
|
||||
renameItem: function (_options) {
|
||||
let options = validateOptions(arguments, [
|
||||
wrapValueAsOption("to"), {
|
||||
to: [ required, isString ],
|
||||
from: [ defaultTo(item.id), isString ]
|
||||
}
|
||||
]);
|
||||
|
||||
return api.renameItem(options);
|
||||
},
|
||||
mergeItem: function (options) {
|
||||
return api.mergeItem(options);
|
||||
},
|
||||
deleteItem: function (_options) {
|
||||
let options = validateOptions(arguments, [
|
||||
defaultTo({}),
|
||||
wrapValueAsOption("id"), {
|
||||
id: [ defaultTo(item.id), isString ]
|
||||
}
|
||||
]);
|
||||
|
||||
return api.deleteItem(options);
|
||||
},
|
||||
createAlias: function (options) {
|
||||
// TODO: Single-parameter variant?
|
||||
return api.createAlias(options);
|
||||
},
|
||||
deleteAlias: function (from) {
|
||||
// FIXME: options wrapper
|
||||
return api.deleteAlias(from);
|
||||
},
|
||||
updateData: function (_options) {
|
||||
let options = validateOptions(arguments, [
|
||||
wrapValueAsOption("update"), {
|
||||
id: [ defaultTo(item.id), isString ],
|
||||
update: [ required, isFunction ]
|
||||
}
|
||||
]);
|
||||
|
||||
return api.updateData(options);
|
||||
},
|
||||
updateMetadata: function (_options) {
|
||||
let options = validateOptions(arguments, [
|
||||
wrapValueAsOption("update"), {
|
||||
id: [ defaultTo(item.id), isString ],
|
||||
taskName: [ defaultTo(task), isString ],
|
||||
update: [ required, isFunction ]
|
||||
}
|
||||
]);
|
||||
|
||||
return api.updateMetadata(options);
|
||||
},
|
||||
expire: function (_options) {
|
||||
let options = validateOptions(arguments, [
|
||||
defaultTo({}), {
|
||||
id: [ defaultTo(item.id), isString ],
|
||||
taskName: [ defaultTo(task), isString ]
|
||||
}
|
||||
]);
|
||||
|
||||
return api.expire(options);
|
||||
},
|
||||
expireDependents: function (_options) {
|
||||
let options = validateOptions(arguments, [
|
||||
defaultTo({}),
|
||||
wrapValueAsOption("dependents"), {
|
||||
id: [ defaultTo(item.id), isString ],
|
||||
dependents: [ arrayOf(isString) ]
|
||||
}
|
||||
]);
|
||||
|
||||
let selectedDependents = (options.dependents != null)
|
||||
? new Set(options.dependents)
|
||||
: null;
|
||||
|
||||
let allDependents = defaultValue(taskDependents, []);
|
||||
|
||||
let affectedDependents = (selectedDependents != null)
|
||||
? allDependents.filter((dependent) => selectedDependents.has(dependent.task))
|
||||
: allDependents;
|
||||
|
||||
return Promise.map(affectedDependents, (dependent) => {
|
||||
return this.expire({
|
||||
id: options.id,
|
||||
taskName: dependent.task
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
};
|
|
@ -1,26 +1,12 @@
|
|||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const path = require("path");
|
||||
const knex = require("knex");
|
||||
const { knexSnakeCaseMappers } = require("objection");
|
||||
const prometheusClient = require("prom-client");
|
||||
|
||||
const models = require("./models");
|
||||
|
||||
let migrationsFolder = path.join(__dirname, "../migrations");
|
||||
|
||||
module.exports = function initialize({ knexfile }) {
|
||||
module.exports = function createPrometheus() {
|
||||
let prometheusRegistry = new prometheusClient.Registry();
|
||||
prometheusClient.collectDefaultMetrics({ register: prometheusRegistry });
|
||||
|
||||
let knexInstance = knex({
|
||||
... knexfile,
|
||||
... knexSnakeCaseMappers()
|
||||
});
|
||||
|
||||
let state = {
|
||||
knex: knexInstance,
|
||||
return {
|
||||
prometheusRegistry: prometheusRegistry,
|
||||
metrics: {
|
||||
storedItems: new prometheusClient.Counter({
|
||||
|
@ -53,18 +39,7 @@ module.exports = function initialize({ knexfile }) {
|
|||
help: "Amount of new scraping tasks fetched during the most recent attempt",
|
||||
labelNames: [ "task" ]
|
||||
})
|
||||
// FIXME: Measure queue-refill task
|
||||
}
|
||||
};
|
||||
|
||||
return Promise.try(() => {
|
||||
return knexInstance.migrate.latest({
|
||||
directory: migrationsFolder
|
||||
});
|
||||
}).then(() => {
|
||||
return {
|
||||
... state,
|
||||
db: models(state)
|
||||
};
|
||||
});
|
||||
};
|
||||
|
487
src/queries.js
487
src/queries.js
|
@ -1,487 +0,0 @@
|
|||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
// const { UniqueViolationError } = require("objection");
|
||||
const dateFns = require("date-fns");
|
||||
|
||||
const { validateArguments } = require("@validatem/core");
|
||||
const required = require("@validatem/required");
|
||||
const requireEither = require("@validatem/require-either");
|
||||
const isString = require("@validatem/is-string");
|
||||
const isBoolean = require("@validatem/is-boolean");
|
||||
const isFunction = require("@validatem/is-function");
|
||||
const isNumber = require("@validatem/is-number");
|
||||
const isDate = require("@validatem/is-date");
|
||||
const arrayOf = require("@validatem/array-of");
|
||||
const defaultTo = require("@validatem/default-to");
|
||||
const anyProperty = require("@validatem/any-property");
|
||||
const anything = require("@validatem/anything");
|
||||
const ValidationError = require("@validatem/error");
|
||||
|
||||
const pipe = require("@promistream/pipe");
|
||||
const combineSequentialStreaming = require("@promistream/combine-sequential-streaming");
|
||||
const fromIterable = require("@promistream/from-iterable");
|
||||
const fromNodeStream = require("@promistream/from-node-stream");
|
||||
const createTypeTaggingStream = require("./streams/tag-type");
|
||||
|
||||
const { addSeconds } = require("date-fns");
|
||||
const syncpipe = require("syncpipe");
|
||||
const defaultValue = require("default-value");
|
||||
|
||||
function isTX(value) {
|
||||
if (value.where == null || value.raw == null) {
|
||||
throw new ValidationError(`Must be a valid Knex or Knex transaction instance`);
|
||||
}
|
||||
}
|
||||
|
||||
function noop() {}
|
||||
|
||||
function taskResultsToObject(taskResults) {
|
||||
return syncpipe(taskResults, [
|
||||
(_) => _.map((result) => [ result.taskName, result.metadata ]),
|
||||
(_) => Object.fromEntries(_)
|
||||
]);
|
||||
}
|
||||
|
||||
module.exports = function ({ db, knex, metrics }) {
|
||||
return {
|
||||
// FIXME: Make object API instead
|
||||
getItem: function (_tx, _id, _optional) {
|
||||
let [ tx, id, optional ] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
id: [ required, isString ],
|
||||
optional: [ defaultTo(false), isBoolean ]
|
||||
});
|
||||
|
||||
return Promise.try(() => {
|
||||
return db.Alias.relatedQuery("item", tx)
|
||||
.for(id)
|
||||
.withGraphFetched("taskResults");
|
||||
}).then((results) => {
|
||||
if (optional === true || results.length > 0) {
|
||||
return results[0];
|
||||
} else {
|
||||
throw new Error(`No item exists with ID '${id}'`);
|
||||
}
|
||||
});
|
||||
},
|
||||
createItem: function (_tx, _options) {
|
||||
// NOTE: Using `update` instead of `data` makes it an upsert!
|
||||
// FIXME: Make failIfExists actually work, currently it does nothing as the UNIQUE constraint violation cannot occur for an upsert
|
||||
let [ tx, { id, tags, aliases, data, update, failIfExists, allowUpsert, parentID }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
id: [ required, isString ],
|
||||
tags: [ defaultTo([]), arrayOf(isString) ],
|
||||
aliases: [ defaultTo([]), arrayOf(isString) ],
|
||||
data: [ anything ], // FIXME: Check for object
|
||||
update: [ isFunction ],
|
||||
failIfExists: [ defaultTo(false), isBoolean ],
|
||||
allowUpsert: [ defaultTo(true), isBoolean ],
|
||||
parentID: [ isString ]
|
||||
}, requireEither([ "data", "update" ]) ]
|
||||
});
|
||||
|
||||
// FIXME: Ensure that we run the transaction in full isolation mode, and retry in case of a conflict
|
||||
|
||||
return Promise.try(() => {
|
||||
// NOTE: We look up by alias, since this is an upsert - and so if the specified ID already exists as an alias, we should update the existing item instead of creating a new one with the specified (aliased) ID
|
||||
return db.Alias
|
||||
.relatedQuery("item", tx)
|
||||
.for(id);
|
||||
}).then((existingItems) => {
|
||||
let existingItem = existingItems[0];
|
||||
|
||||
let actualID = (existingItem != null)
|
||||
? existingItem.id
|
||||
: id;
|
||||
|
||||
let existingData = (existingItem != null)
|
||||
? existingItem.data
|
||||
: {};
|
||||
|
||||
let newData = (update != null)
|
||||
? update(existingData)
|
||||
: { ... existingData, ... data };
|
||||
|
||||
// Make sure to add a self:self alias
|
||||
let allAliases = aliases.concat([ actualID ]);
|
||||
|
||||
let newItem = {
|
||||
id: actualID,
|
||||
data: newData,
|
||||
createdBy: parentID,
|
||||
tags: tags.map((tag) => ({ name: tag })),
|
||||
aliases: allAliases.map((alias) => ({ alias: alias })),
|
||||
updatedAt: new Date()
|
||||
};
|
||||
|
||||
return Promise.try(() => {
|
||||
if (allowUpsert) {
|
||||
// NOTE: We *always* do upserts here, even if the user specified `data` rather than `update`, because tags and aliases should always be added even if the item itself already exists. We trust the user not to accidentally reuse IDs between different kinds of objects (which would break in various other ways anyway).
|
||||
return db.Item.query(tx).upsertGraph(newItem, {
|
||||
insertMissing: true,
|
||||
noDelete: true
|
||||
});
|
||||
} else {
|
||||
return db.Item.query(tx).insertGraph(newItem, {
|
||||
insertMissing: true
|
||||
});
|
||||
}
|
||||
}).tap(() => {
|
||||
// FIXME: We should probably move the metrics stuff to the wrapper instead, so that it works for *any* backend
|
||||
metrics.storedItems.inc(1);
|
||||
|
||||
// TODO: This currently produces somewhat misleading metrics; it only counts *explicitly specified* tags. That will *mostly* correlate to amount of genuinely-new items, but not perfectly. In the future, we should probably refactor the insertion code such that it is aware of the *current* tags of an item that it is about to merge into - but maybe that should be delayed until the zapdb migration.
|
||||
if (newItem.tags != null) {
|
||||
for (let tag of newItem.tags) {
|
||||
metrics.storedItems.labels({ tag: tag.name }).inc(1);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
}).catch({ name: "UniqueViolationError", table: "srap_items" }, (error) => {
|
||||
if (failIfExists) {
|
||||
throw error;
|
||||
} else {
|
||||
// Do nothing, just ignore the failure
|
||||
}
|
||||
});
|
||||
},
|
||||
renameItem: function (_tx, _options) {
|
||||
// options == to || { from, to }
|
||||
let [ tx, { to, from }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
to: [ required, isString ],
|
||||
from: [ required, isString ]
|
||||
}]
|
||||
});
|
||||
|
||||
return Promise.all([
|
||||
db.Item.query(tx).findById(from).patch({ id: to }),
|
||||
this.createAlias(tx, { from: to, to: to })
|
||||
]);
|
||||
},
|
||||
repointAliases: function (_tx, _options) {
|
||||
// { from, to }
|
||||
let [ tx, { to, from }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
to: [ required, isString ],
|
||||
from: [ required, isString ]
|
||||
}]
|
||||
});
|
||||
|
||||
return db.Alias.query(tx)
|
||||
.patch({ itemId: to, updatedAt: new Date() })
|
||||
.where({ itemId: from });
|
||||
},
|
||||
mergeItem: function (_tx, _options) {
|
||||
// options = { from, into, merge, mergeMetadata{} }
|
||||
let [ tx, { from, into, merge, mergeMetadata }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
from: [ required, isString ],
|
||||
into: [ required, isString ],
|
||||
merge: [ required, isFunction ],
|
||||
mergeMetadata: [ defaultTo({}), anyProperty({
|
||||
key: [ required ],
|
||||
value: [ required, isFunction ]
|
||||
})],
|
||||
}]
|
||||
});
|
||||
|
||||
return Promise.all([
|
||||
this.getItem(tx, from, true),
|
||||
this.getItem(tx, into, true),
|
||||
]).then(([ fromObj, intoObj ]) => {
|
||||
if (fromObj != null) {
|
||||
let defaultedIntoObj = defaultValue(intoObj, {
|
||||
id: into,
|
||||
data: {},
|
||||
taskResults: []
|
||||
});
|
||||
|
||||
let newData = merge(defaultedIntoObj.data, fromObj.data);
|
||||
|
||||
let fromTaskResults = taskResultsToObject(fromObj.taskResults);
|
||||
let intoTaskResults = taskResultsToObject(defaultedIntoObj.taskResults);
|
||||
|
||||
// FIXME: Deduplicate function
|
||||
let allTaskKeys = Array.from(new Set([
|
||||
... Object.keys(fromTaskResults),
|
||||
... Object.keys(intoTaskResults)
|
||||
]));
|
||||
|
||||
function selectNewestResult(taskA, taskB) {
|
||||
if (taskA == null) {
|
||||
return taskB;
|
||||
} else if (taskB == null) {
|
||||
return taskA;
|
||||
} else if (taskA.updatedAt > taskB.updatedAt) {
|
||||
return taskA;
|
||||
} else {
|
||||
return taskB;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Use merge-by-template here instead?
|
||||
|
||||
let newTaskResults = allTaskKeys.map((key) => {
|
||||
let merger = mergeMetadata[key];
|
||||
let fromTask = fromTaskResults[key];
|
||||
let intoTask = intoTaskResults[key];
|
||||
|
||||
if (merger != null) {
|
||||
// Generate a new TaskResult that includes data combined from both
|
||||
let newMetadata = merger(
|
||||
defaultValue(intoTask.metadata, {}),
|
||||
defaultValue(fromTask.metadata, {})
|
||||
);
|
||||
|
||||
return {
|
||||
... intoTask,
|
||||
metadata: newMetadata,
|
||||
updatedAt: Date.now()
|
||||
};
|
||||
} else {
|
||||
// Take the newest known TaskResult and just make sure that it is pointing at the correct ID
|
||||
return {
|
||||
... selectNewestResult(intoTask, fromTask),
|
||||
itemId: defaultedIntoObj.id
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
let upsertOptions = {
|
||||
insertMissing: true,
|
||||
noDelete: true
|
||||
};
|
||||
|
||||
return Promise.try(() => {
|
||||
// NOTE: Cannot use into.$query here because that adds an implicit query builder operation, which upsertGraph does not allow
|
||||
return db.Item.query(tx).upsertGraph({
|
||||
id: defaultedIntoObj.id,
|
||||
data: newData,
|
||||
taskResults: newTaskResults
|
||||
}, upsertOptions);
|
||||
}).then(() => {
|
||||
// NOTE: Repointing aliases has the side-effect of leaving a redirect from the source to the destination item, as each item has a self:self alias
|
||||
return this.repointAliases(tx, { from: fromObj.id, to: intoObj.id });
|
||||
}).then(() => {
|
||||
// NOTE: We don't use this.deleteItem, to sidestep any alias lookups
|
||||
return db.Item.query(tx).findById(fromObj.id).delete();
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
deleteItem: function (_tx, _options) {
|
||||
// options = none || { id }
|
||||
let [ tx, { id }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
id: [ required, isString ]
|
||||
}]
|
||||
});
|
||||
|
||||
return db.Alias.relatedQuery("item", tx)
|
||||
.for(id)
|
||||
.delete();
|
||||
// return db.Item.query(tx).findById(id).delete();
|
||||
},
|
||||
createAlias: function (_tx, _options) {
|
||||
// options = { from, to, failIfExists }
|
||||
let [ tx, { from, to, failIfExists }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
from: [ required, isString ],
|
||||
to: [ required, isString ],
|
||||
failIfExists: [ defaultTo(false), isBoolean ] // TODO: Shouldn't this default to true, for any occurrence outside of a merge/rename?
|
||||
}]
|
||||
});
|
||||
|
||||
// Isolate this operation into a savepoint so that it can fail without breaking the entire transaction
|
||||
let promise = tx.transaction((tx) => {
|
||||
return db.Alias.query(tx).insert({
|
||||
alias: from,
|
||||
itemId: to,
|
||||
updatedAt: new Date()
|
||||
});
|
||||
});
|
||||
|
||||
if (failIfExists) {
|
||||
return promise;
|
||||
} else {
|
||||
return Promise.resolve(promise)
|
||||
.catch({ name: "UniqueViolationError" }, noop);
|
||||
}
|
||||
},
|
||||
deleteAlias: function (_tx, _options) {
|
||||
let [ tx, { from }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
from: [ required, isString ]
|
||||
}]
|
||||
});
|
||||
|
||||
// TODO: This cannot yet be propagated to the update feed, because we don't keep a record of deletions
|
||||
return db.Alias.query(tx).findById(from).delete();
|
||||
},
|
||||
updateData: function (_tx, _options) {
|
||||
// options = update || { id, update }
|
||||
let [ tx, { id, update }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
id: [ required, isString ],
|
||||
update: [ required, isFunction ]
|
||||
}]
|
||||
});
|
||||
|
||||
// TODO: Figure out the proper delineation between 'creating' and 'updating' an item
|
||||
return this.createItem(tx, { id, update });
|
||||
},
|
||||
updateMetadata: function (_tx, _options) {
|
||||
// options = update || { id, update, taskName }
|
||||
let [ tx, { id, update, taskName, taskVersion, ttl }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
id: [ required, isString ],
|
||||
update: [ required, isFunction ],
|
||||
taskName: [ required, isString ],
|
||||
taskVersion: [ required, isString ],
|
||||
ttl: [ isNumber ]
|
||||
}]
|
||||
});
|
||||
// TODO: failIfExists
|
||||
|
||||
// FIXME: metadata_updated_at
|
||||
return Promise.try(() => {
|
||||
return db.Alias.query(tx).findById(id);
|
||||
}).then((alias) => {
|
||||
let sharedFields = {
|
||||
isSuccessful: true,
|
||||
isInvalidated: false,
|
||||
taskVersion: taskVersion,
|
||||
updatedAt: new Date(),
|
||||
expiresAt: (ttl != null)
|
||||
? addSeconds(new Date(), ttl)
|
||||
: undefined
|
||||
};
|
||||
|
||||
if (alias != null) {
|
||||
return Promise.try(() => {
|
||||
return db.TaskResult.query(tx).findById([ taskName, alias.itemId ]);
|
||||
}).then((taskResult) => {
|
||||
if (taskResult != null) {
|
||||
return taskResult.$query(tx).patch({
|
||||
... sharedFields,
|
||||
metadata: update(taskResult.metadata),
|
||||
});
|
||||
} else {
|
||||
return db.TaskResult.query(tx).insert({
|
||||
... sharedFields,
|
||||
task: taskName,
|
||||
itemId: id,
|
||||
metadata: update({})
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
expire: function (_tx, _options) {
|
||||
// options = none || { id, taskName }
|
||||
let [ tx, { id, taskName }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [{
|
||||
id: [ required, isString ],
|
||||
taskName: [ required, isString ]
|
||||
}]
|
||||
});
|
||||
|
||||
return Promise.try(() => {
|
||||
return db.Alias.query(tx).findById(id);
|
||||
}).then((alias) => {
|
||||
return db.TaskResult.query(tx)
|
||||
.where({ task: taskName, itemId: alias.itemId })
|
||||
.patch({ isInvalidated: true });
|
||||
});
|
||||
},
|
||||
setTTL: function (options) {
|
||||
// options = ttl || { id, taskName, ttl }
|
||||
// FIXME
|
||||
},
|
||||
allowFailure: function (allowed) {
|
||||
|
||||
},
|
||||
log: function (category, message) {
|
||||
|
||||
},
|
||||
countLockedTasks: function (tx) {
|
||||
return Promise.try(() => {
|
||||
return db.TaskInProgress.query(tx).count({ count: "*" });
|
||||
}).then((result) => {
|
||||
return result[0].count;
|
||||
});
|
||||
},
|
||||
getUpdates: function (_tx, _options) {
|
||||
// NOTE: This returns snake_cased keys! As we're bypassing the Objection internals, no casemapping occurs.
|
||||
let [ tx, { timestamp, prefix }] = validateArguments(arguments, {
|
||||
tx: [ required, isTX ],
|
||||
options: [ defaultTo({}), {
|
||||
timestamp: [ isDate ],
|
||||
prefix: [ isString ]
|
||||
}]
|
||||
});
|
||||
|
||||
// NOTE: This is a hacky workaround - if we don't do this, then for some reason also entries *at* the exact timestamp are included, which is not what we want.
|
||||
// FIXME: Verify that this doesn't break anything, eg. when an entry is created inbetween the original timestamp and +1ms.
|
||||
let actualTimestamp = (timestamp != null)
|
||||
? dateFns.addMilliseconds(timestamp, 1)
|
||||
: undefined;
|
||||
|
||||
function applyWhereClauses(query, idField) {
|
||||
if (timestamp != null) {
|
||||
// FIXME: An error in the query here throws an error, resulting in an abort handling bug in a promistream
|
||||
query = query.whereRaw(`updated_at > ?`, [ actualTimestamp ]);
|
||||
}
|
||||
|
||||
if (prefix != null) {
|
||||
query = query.whereRaw(`${idField} LIKE ?`, [ `${prefix.replace(/%/g, "\\%")}%` ]);
|
||||
}
|
||||
|
||||
return query;
|
||||
}
|
||||
|
||||
function* streamGenerator() {
|
||||
yield pipe([
|
||||
fromNodeStream.fromReadable(
|
||||
applyWhereClauses(db.Item.query(tx), "id").toKnexQuery().stream()
|
||||
),
|
||||
createTypeTaggingStream("item")
|
||||
]);
|
||||
|
||||
yield pipe([
|
||||
fromNodeStream.fromReadable(
|
||||
// NOTE: We are only interested in aliases which don't point at themselves
|
||||
applyWhereClauses(db.Alias.query(tx).where("alias", "!=", knex.ref("item_id")), "alias").toKnexQuery().stream()
|
||||
),
|
||||
createTypeTaggingStream("alias")
|
||||
]);
|
||||
|
||||
yield pipe([
|
||||
fromNodeStream.fromReadable(
|
||||
applyWhereClauses(db.TaskResult.query(tx), "item_id").toKnexQuery().stream()
|
||||
),
|
||||
createTypeTaggingStream("taskResult")
|
||||
]);
|
||||
}
|
||||
|
||||
return pipe([
|
||||
fromIterable(streamGenerator()),
|
||||
combineSequentialStreaming()
|
||||
]);
|
||||
}
|
||||
};
|
||||
};
|
|
@ -1,36 +0,0 @@
|
|||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const consumable = require("@joepie91/consumable");
|
||||
const syncpipe = require("syncpipe");
|
||||
|
||||
const createMutationAPIWrapper = require("./mutation-api/wrapper");
|
||||
|
||||
module.exports = function (state) {
|
||||
const createDatabaseMutationAPI = require("./mutation-api/database")(state);
|
||||
|
||||
return function createDatabaseQueue(context) {
|
||||
let databaseMutationAPI = createDatabaseMutationAPI(context);
|
||||
let mutationAPI = createMutationAPIWrapper(context, databaseMutationAPI);
|
||||
|
||||
let queue = consumable([]);
|
||||
|
||||
return {
|
||||
api: syncpipe(Object.keys(mutationAPI), [
|
||||
(_) => _.map((method) => [ method, function() { queue.peek().push([ method, arguments ]); } ]),
|
||||
(_) => Object.fromEntries(_)
|
||||
]),
|
||||
execute: function () {
|
||||
if (!queue.peek().some((method) => method[0] === "updateMetadata")) {
|
||||
// Doing an updateMetadata call is necessary to mark a task 'completed', so we inject a dummy call that doesn't actually change the metadata itself
|
||||
// FIXME: Split apart 'markTaskCompleted' and 'updateMetadata' queries so that this hack is no longer necessary
|
||||
queue.peek().push([ "updateMetadata", [ (data) => data ]]);
|
||||
}
|
||||
|
||||
return Promise.each(queue.consume(), ([ method, args ]) => {
|
||||
return mutationAPI[method](... args);
|
||||
});
|
||||
}
|
||||
};
|
||||
};
|
||||
};
|
39
src/run-task.js
Normal file
39
src/run-task.js
Normal file
|
@ -0,0 +1,39 @@
|
|||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const chalk = require("chalk");
|
||||
|
||||
const logStatus = require("./util/log-status");
|
||||
|
||||
module.exports = function ({ backend }) {
|
||||
return function runTask(task, item) {
|
||||
let queue = [];
|
||||
let api = backend.forItem({ task: task, item: item, mutationQueue: queue });
|
||||
|
||||
return Promise.try(() => {
|
||||
logStatus(task, chalk.bold.cyan, "started", item.id);
|
||||
|
||||
// NOTE: We only pass in the item data itself, *not* any associated metadata like tags. If the scraping task wants access to that sort of information, it should do a `getItem` call from within its task logic where needed.
|
||||
// FIXME: Is that actually still true post-refactor?
|
||||
return task.run({
|
||||
data: item.data,
|
||||
... api.exposed
|
||||
});
|
||||
}).then(() => {
|
||||
// NOTE: We only apply changes at the very end (outside of simulation mode), so that when a task implementation contains multiple operations, each of those operation always 'sees' the state at the start of the task, not the state after the previous mutation. This makes the model as a whole easier to reason about. In simulation mode, all calls are immediate and the queue is empty - after all, no mutation can happen in that case anyway. This is also another reason to ensure that operations in live mode always see the starting state; that makes its behaviour consistent with simulation mode.
|
||||
return backend.topLevel.runInTransaction(null, (tx) => {
|
||||
return Promise.each(queue, (operation) => {
|
||||
return operation(tx);
|
||||
});
|
||||
});
|
||||
}).then(async () => {
|
||||
await api.internal.markTaskCompleted();
|
||||
logStatus(task, chalk.bold.green, "completed", item.id);
|
||||
return { status: "completed", item: item };
|
||||
}).catch(async (error) => {
|
||||
await api.internal.markTaskFailed(null, { error });
|
||||
logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`);
|
||||
return { status: "failed", item: item, error: error };
|
||||
});
|
||||
};
|
||||
};
|
61
src/semantics/merge-items.js
Normal file
61
src/semantics/merge-items.js
Normal file
|
@ -0,0 +1,61 @@
|
|||
"use strict";
|
||||
|
||||
const defaultValue = require("default-value");
|
||||
const syncpipe = require("syncpipe");
|
||||
|
||||
const arrayUnique = require("../util/array-unique");
|
||||
const pickTaskResult = require("./pick-task-result");
|
||||
|
||||
function taskResultsToObject(taskResults) {
|
||||
return syncpipe(taskResults, [
|
||||
(_) => _.map((result) => [ result.taskName, result.metadata ]),
|
||||
(_) => Object.fromEntries(_)
|
||||
]);
|
||||
}
|
||||
|
||||
module.exports = function mergeItems({ fromObj, intoObj, merge, mergeMetadata }) {
|
||||
let mergedData = merge(intoObj.data, fromObj.data);
|
||||
|
||||
let fromTaskResults = taskResultsToObject(fromObj.taskResults);
|
||||
let intoTaskResults = taskResultsToObject(intoObj.taskResults);
|
||||
|
||||
let allTaskKeys = arrayUnique([
|
||||
... Object.keys(fromTaskResults),
|
||||
... Object.keys(intoTaskResults)
|
||||
]);
|
||||
|
||||
// TODO: Use merge-by-template here instead?
|
||||
|
||||
let mergedTaskResults = allTaskKeys.map((key) => {
|
||||
let merger = mergeMetadata[key];
|
||||
let fromTask = fromTaskResults[key];
|
||||
let intoTask = intoTaskResults[key];
|
||||
|
||||
if (merger != null) {
|
||||
// Generate a new TaskResult that includes data combined from both
|
||||
let newMetadata = merger(
|
||||
defaultValue(intoTask.metadata, {}),
|
||||
defaultValue(fromTask.metadata, {})
|
||||
);
|
||||
|
||||
return {
|
||||
... intoTask,
|
||||
metadata: newMetadata,
|
||||
updatedAt: Date.now()
|
||||
};
|
||||
} else {
|
||||
// Take the newest known TaskResult and just make sure that it is pointing at the correct ID
|
||||
// TODO: Does this really belong in the semantics module? Or is this database-specific?
|
||||
return {
|
||||
... pickTaskResult(intoTask, fromTask),
|
||||
itemId: intoObj.id
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
return {
|
||||
id: intoObj.id,
|
||||
data: mergedData,
|
||||
taskResults: mergedTaskResults
|
||||
};
|
||||
};
|
13
src/semantics/pick-task-result.js
Normal file
13
src/semantics/pick-task-result.js
Normal file
|
@ -0,0 +1,13 @@
|
|||
"use strict";
|
||||
|
||||
module.exports = function pickTaskResult(taskA, taskB) {
|
||||
if (taskA == null) {
|
||||
return taskB;
|
||||
} else if (taskB == null) {
|
||||
return taskA;
|
||||
} else if (taskA.updatedAt > taskB.updatedAt) {
|
||||
return taskA;
|
||||
} else {
|
||||
return taskB;
|
||||
}
|
||||
};
|
|
@ -1,47 +0,0 @@
|
|||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
// const { UniqueViolationError } = require("objection");
|
||||
|
||||
const pipe = require("@promistream/pipe");
|
||||
const map = require("@promistream/map");
|
||||
const mapFilter = require("@promistream/map-filter");
|
||||
|
||||
module.exports = function ({ db, knex }) {
|
||||
return function processTaskSafely(task, processHandler) {
|
||||
let lockStream = mapFilter((item) => {
|
||||
return Promise.try(() => {
|
||||
return db.TaskInProgress.query(knex).insert({
|
||||
task: task,
|
||||
item_id: item.id
|
||||
});
|
||||
}).then(() => {
|
||||
return item;
|
||||
}).catch({ name: "UniqueViolationError" }, () => {
|
||||
return mapFilter.NoValue;
|
||||
});
|
||||
});
|
||||
|
||||
let processUnlockStream = map((item) => {
|
||||
return Promise.try(() => {
|
||||
return knex.transaction((tx) => {
|
||||
return processHandler(item, tx);
|
||||
}, { doNotRejectOnRollback: false });
|
||||
}).finally(() => {
|
||||
return db.TaskInProgress.query(knex)
|
||||
.delete()
|
||||
.where({
|
||||
task: task,
|
||||
item_id: item.id
|
||||
});
|
||||
}).then(() => {
|
||||
return item;
|
||||
});
|
||||
});
|
||||
|
||||
return pipe([
|
||||
lockStream,
|
||||
processUnlockStream
|
||||
]);
|
||||
};
|
||||
};
|
37
src/streams/task-kernel.js
Normal file
37
src/streams/task-kernel.js
Normal file
|
@ -0,0 +1,37 @@
|
|||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const pipe = require("@promistream/pipe");
|
||||
const filter = require("@promistream/filter");
|
||||
const map = require("@promistream/map");
|
||||
const rateLimit = require("@promistream/rate-limit");
|
||||
|
||||
const parallelize = require("@promistream/parallelize");
|
||||
|
||||
// FIXME: Move logs to logging hook
|
||||
|
||||
module.exports = function (state) {
|
||||
let { backend } = state;
|
||||
const runTask = require("../run-task")(state);
|
||||
|
||||
return function createTaskKernelStream(task, { globalRateLimiter }) {
|
||||
return pipe([
|
||||
backend.topLevel.getTaskStream(null, { task: task }),
|
||||
filter((item) => backend.forItem({ task: task, item: item }).internal.lock()),
|
||||
globalRateLimiter,
|
||||
(task.taskInterval != null)
|
||||
? rateLimit(task.taskInterval)
|
||||
: null,
|
||||
map((item) => {
|
||||
return Promise.try(() => {
|
||||
return runTask(task, item);
|
||||
}).tap(() => {
|
||||
return backend.forItem({ task: task, item: item }).internal.unlock();
|
||||
});
|
||||
}),
|
||||
(task.parallelTasks != null)
|
||||
? parallelize(task.parallelTasks)
|
||||
: null
|
||||
]);
|
||||
};
|
||||
};
|
|
@ -1,201 +0,0 @@
|
|||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const ms = require("ms");
|
||||
const dateFns = require("date-fns");
|
||||
const debug = require("debug")("scrapingserver");
|
||||
const chalk = require("chalk");
|
||||
|
||||
const simpleSource = require("@promistream/simple-source");
|
||||
const buffer = require("@promistream/buffer");
|
||||
const pipe = require("@promistream/pipe");
|
||||
const rateLimit = require("@promistream/rate-limit");
|
||||
const parallelize = require("@promistream/parallelize");
|
||||
|
||||
const logStatus = require("./log-status");
|
||||
// const { UniqueViolationError } = require("objection");
|
||||
|
||||
// FIXME: Revert inlining of task_states once switched to PostgreSQL 12+, which can do this automatically using NOT MATERIALIZED
|
||||
// FIXME: Check whether the dependency task_versions are actually being correctly passed in, and aren't accidentally nulls
|
||||
let query = `
|
||||
WITH
|
||||
dependency_tasks AS (
|
||||
SELECT * FROM
|
||||
json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text)
|
||||
),
|
||||
matching_items AS (
|
||||
SELECT
|
||||
DISTINCT ON (srap_items.id)
|
||||
srap_items.*,
|
||||
results.updated_at AS result_date,
|
||||
results.task_version,
|
||||
(
|
||||
results.is_successful = TRUE
|
||||
AND (
|
||||
results.expires_at < NOW()
|
||||
OR results.is_invalidated = TRUE
|
||||
)
|
||||
) AS is_candidate
|
||||
FROM srap_items
|
||||
INNER JOIN srap_tags
|
||||
ON srap_tags.item_id = srap_items.id
|
||||
AND srap_tags.name = ANY(:tags)
|
||||
LEFT JOIN srap_task_results AS results
|
||||
ON results.item_id = srap_items.id
|
||||
AND results.task = :task
|
||||
WHERE
|
||||
NOT EXISTS (
|
||||
SELECT FROM srap_tasks_in_progress AS pr WHERE pr.item_id = srap_items.id
|
||||
)
|
||||
),
|
||||
candidates AS (
|
||||
SELECT * FROM matching_items
|
||||
WHERE result_date IS NULL
|
||||
UNION
|
||||
SELECT * FROM matching_items
|
||||
WHERE is_candidate = TRUE
|
||||
OR NOT (task_version = :taskVersion)
|
||||
)
|
||||
(
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
candidates
|
||||
WHERE
|
||||
NOT EXISTS (
|
||||
SELECT
|
||||
results.*
|
||||
FROM dependency_tasks
|
||||
LEFT JOIN srap_task_results AS results
|
||||
ON dependency_tasks.task = results.task
|
||||
AND dependency_tasks.task_version = results.task_version
|
||||
AND results.item_id = candidates.id
|
||||
WHERE
|
||||
results.is_successful IS NULL
|
||||
OR results.is_successful = FALSE
|
||||
OR (
|
||||
results.is_successful = TRUE
|
||||
AND (
|
||||
results.expires_at < NOW()
|
||||
OR results.is_invalidated = TRUE
|
||||
)
|
||||
)
|
||||
)
|
||||
) LIMIT :resultLimit;
|
||||
`;
|
||||
|
||||
module.exports = function (state) {
|
||||
const processTaskSafely = require("./streams/process-task-safely")(state);
|
||||
const queries = require("./queries")(state);
|
||||
const createDatabaseQueue = require("./queued-database-api")(state);
|
||||
|
||||
let { knex, db, metrics } = state;
|
||||
|
||||
// FIXME: Transaction support!
|
||||
|
||||
return function createTaskStream({ task, taskVersion, taskDependencies, taskDependents, taskInterval, tags, run, ttl, globalRateLimiter, globalParallelize, parallelTasks }) {
|
||||
// TODO: Make nicer
|
||||
let ttlInSeconds = (ttl != null)
|
||||
? (typeof ttl === "number")
|
||||
? ttl / 1000
|
||||
: ms(ttl) / 1000
|
||||
: undefined;
|
||||
|
||||
return pipe([
|
||||
simpleSource(() => {
|
||||
let startTime = Date.now();
|
||||
|
||||
return Promise.try(() => {
|
||||
// console.log("Fetching new batch");
|
||||
return knex.raw(query, {
|
||||
tags: tags,
|
||||
task: task,
|
||||
taskVersion: taskVersion,
|
||||
resultLimit: 1000, // TODO: Make configurable
|
||||
dependencyTaskDefinitions: JSON.stringify(taskDependencies.map((dependency) => {
|
||||
// Case-mapping for SQL compatibility
|
||||
return { task_version: dependency.taskVersion, task: dependency.task };
|
||||
}))
|
||||
});
|
||||
}).then((result) => {
|
||||
let timeElapsed = Date.now() - startTime;
|
||||
|
||||
metrics.taskFetchTime.labels({ task: task }).set(timeElapsed / 1000);
|
||||
metrics.taskFetchResults.labels({ task: task }).set(result.rowCount);
|
||||
|
||||
debug(`Task retrieval query for '${task}' took ${timeElapsed}ms and produced ${result.rowCount} results`);
|
||||
|
||||
if (result.rowCount > 0) {
|
||||
// console.log("rows:", result.rows);
|
||||
return result.rows;
|
||||
} else {
|
||||
// FIXME: Make this delay configurable, or maybe even use LISTEN/NOTIFY
|
||||
return Promise.resolve([]).delay(30000);
|
||||
}
|
||||
});
|
||||
}),
|
||||
buffer(),
|
||||
globalRateLimiter,
|
||||
(taskInterval != null)
|
||||
? rateLimit(taskInterval)
|
||||
: null,
|
||||
processTaskSafely(task, (item, tx) => {
|
||||
logStatus(task, chalk.bold.cyan, "started", item.id);
|
||||
|
||||
let queue = createDatabaseQueue({ tx, item, task, taskVersion, taskDependents, taskDependencies });
|
||||
|
||||
return Promise.try(() => {
|
||||
// TODO: Proper Validatem schemas for each API method
|
||||
return run({
|
||||
id: item.id,
|
||||
data: item.data,
|
||||
getItem: function (id) {
|
||||
return queries.getItem(tx, id);
|
||||
},
|
||||
... queue.api
|
||||
});
|
||||
}).then(() => {
|
||||
return queue.execute();
|
||||
}).then(() => {
|
||||
// Update succeeded
|
||||
return db.TaskResult.query(tx).findById([ task, item.id ]).patch({
|
||||
is_successful: true,
|
||||
updated_at: new Date(),
|
||||
expires_at: (ttlInSeconds != null)
|
||||
? dateFns.add(new Date(), { seconds: ttlInSeconds })
|
||||
: null
|
||||
});
|
||||
}).catch((error) => {
|
||||
metrics.failedItems.inc(1);
|
||||
metrics.failedItems.labels({ task: task }).inc(1);
|
||||
logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`);
|
||||
|
||||
let commonUpdate = {
|
||||
is_successful: false,
|
||||
task_version: taskVersion
|
||||
};
|
||||
|
||||
return Promise.try(() => {
|
||||
// Task failed -- note, cannot use tx here because it has failed
|
||||
return db.TaskResult.query(knex).insert({
|
||||
item_id: item.id,
|
||||
task: task,
|
||||
metadata: {},
|
||||
... commonUpdate
|
||||
});
|
||||
}).catch({ name: "UniqueViolationError" }, () => {
|
||||
return db.TaskResult.query(knex).findById([ task, item.id ]).patch({
|
||||
... commonUpdate
|
||||
});
|
||||
}).then(() => {
|
||||
// throw error;
|
||||
});
|
||||
});
|
||||
}),
|
||||
// TODO: Sort out a cleaner way to organize local vs. global parallelization
|
||||
(parallelTasks != null)
|
||||
? parallelize(parallelTasks)
|
||||
: globalParallelize
|
||||
]);
|
||||
};
|
||||
};
|
5
src/util/array-unique.js
Normal file
5
src/util/array-unique.js
Normal file
|
@ -0,0 +1,5 @@
|
|||
"use strict";
|
||||
|
||||
module.exports = function arrayUnique(array) {
|
||||
return Array.from(new Set(array));
|
||||
};
|
45
src/util/generate-task-graph.js
Normal file
45
src/util/generate-task-graph.js
Normal file
|
@ -0,0 +1,45 @@
|
|||
"use strict";
|
||||
|
||||
const syncpipe = require("syncpipe");
|
||||
const defaultValue = require("default-value"); // FIXME: Move to config validation
|
||||
|
||||
const invertMapping = require("./invert-mapping");
|
||||
|
||||
module.exports = function generateTaskGraph({ tags, tasks }) {
|
||||
let tagsMapping = invertMapping(tags);
|
||||
|
||||
let tasksMap = syncpipe(tasks, [
|
||||
_ => Object.entries(_),
|
||||
_ => _.map(([ name, taskDefinition ]) => {
|
||||
return [ name, {
|
||||
... taskDefinition,
|
||||
name: name,
|
||||
// NOTE: The default here is for cases where a task is 'orphaned' and not associated with any tags; this can happen during development, and in that case the task won't be present in the tagsMapping at all.
|
||||
tags: tagsMapping[name] ?? [],
|
||||
dependencies: [],
|
||||
dependents: []
|
||||
}];
|
||||
}),
|
||||
_ => new Map(_)
|
||||
]);
|
||||
|
||||
function getTask(name) {
|
||||
if (tasksMap.has(name)) {
|
||||
return tasksMap.get(name);
|
||||
} else {
|
||||
throw new Error(`Encountered dependency reference to non-existent task '${name}'`);
|
||||
}
|
||||
}
|
||||
|
||||
for (let task of tasksMap.values()) {
|
||||
for (let dependencyName of defaultValue(task.dependsOn, [])) {
|
||||
task.dependencies.push(getTask(dependencyName));
|
||||
getTask(dependencyName).dependents.push(task);
|
||||
}
|
||||
|
||||
// NOTE: We are mutating a local copy of the task definition here, that we created further up
|
||||
delete task.dependsOn;
|
||||
}
|
||||
|
||||
return tasksMap;
|
||||
};
|
20
src/util/invert-mapping.js
Normal file
20
src/util/invert-mapping.js
Normal file
|
@ -0,0 +1,20 @@
|
|||
"use strict";
|
||||
|
||||
// Inverts an object of arrays, eg. {a: [x, y], b: [x, z]} becomes {x: [a, b], y: [a], z: [b]}
|
||||
// TODO: See if this can be replaced with an off-the-shelf module with equivalent semantics, something transpose-y maybe?
|
||||
|
||||
module.exports = function invertMapping(mapping) {
|
||||
let newObject = {};
|
||||
|
||||
for (let [ key, values ] of Object.entries(mapping)) {
|
||||
for (let value of values) {
|
||||
if (newObject[value] == null) {
|
||||
newObject[value] = [];
|
||||
}
|
||||
|
||||
newObject[value].push(key);
|
||||
}
|
||||
}
|
||||
|
||||
return newObject;
|
||||
};
|
|
@ -3,5 +3,5 @@
|
|||
const chalk = require("chalk");
|
||||
|
||||
module.exports = function logStatus(task, color, type, message) {
|
||||
console.log(`${chalk.bold(`[${task}]`)} ${color(`[${type}]`)} ${message}`);
|
||||
console.log(`${chalk.bold(`[${task.name}]`)} ${color(`[${type}]`)} ${message}`);
|
||||
};
|
16
src/validators/is-ms.js
Normal file
16
src/validators/is-ms.js
Normal file
|
@ -0,0 +1,16 @@
|
|||
"use strict";
|
||||
|
||||
// FIXME: Move to own @validatem/is-ms module
|
||||
|
||||
const ms = require("ms");
|
||||
|
||||
const either = require("@validatem/either");
|
||||
const isString = require("@validatem/is-string");
|
||||
const required = require("@validatem/required");
|
||||
|
||||
const isPositiveInteger = require("./is-positive-integer");
|
||||
|
||||
module.exports = either([
|
||||
isPositiveInteger,
|
||||
[ isString, (value) => ms(value), required, isPositiveInteger ]
|
||||
]);
|
6
src/validators/is-positive-integer.js
Normal file
6
src/validators/is-positive-integer.js
Normal file
|
@ -0,0 +1,6 @@
|
|||
"use strict";
|
||||
|
||||
const isInteger = require("@validatem/is-integer");
|
||||
const isPositive = require("@validatem/is-positive");
|
||||
|
||||
module.exports = [ isInteger, isPositive ];
|
32
src/validators/is-task-object.js
Normal file
32
src/validators/is-task-object.js
Normal file
|
@ -0,0 +1,32 @@
|
|||
"use strict";
|
||||
|
||||
const required = require("@validatem/required");
|
||||
const arrayOf = require("@validatem/array-of");
|
||||
const isString = require("@validatem/is-string");
|
||||
const isFunction = require("@validatem/is-function");
|
||||
const isArray = require("@validatem/is-array");
|
||||
const either = require("@validatem/either");
|
||||
const isValue = require("@validatem/is-value");
|
||||
|
||||
const isPositiveInteger = require("./is-positive-integer");
|
||||
|
||||
function makeRules(recurse) {
|
||||
// We want to recurse exactly one level
|
||||
let isTaskArray = (recurse === true)
|
||||
? arrayOf(makeRules(false))
|
||||
: isArray;
|
||||
|
||||
return {
|
||||
name: [ required, isString ],
|
||||
version: [ required, isString ],
|
||||
ttl: [ isPositiveInteger ],
|
||||
parallelTasks: [ either([ isPositiveInteger, isValue(Infinity) ]) ],
|
||||
taskInterval: [ isPositiveInteger ],
|
||||
dependents: [ required, isTaskArray ],
|
||||
dependencies: [ required, isTaskArray ],
|
||||
tags: [ required, arrayOf(isString) ],
|
||||
run: [ required, isFunction ]
|
||||
};
|
||||
}
|
||||
|
||||
module.exports = makeRules(true);
|
46
src/validators/is-valid-configuration.js
Normal file
46
src/validators/is-valid-configuration.js
Normal file
|
@ -0,0 +1,46 @@
|
|||
"use strict";
|
||||
|
||||
const required = require("@validatem/required");
|
||||
const defaultTo = require("@validatem/default-to");
|
||||
const either = require("@validatem/either");
|
||||
const anything = require("@validatem/anything");
|
||||
const anyProperty = require("@validatem/any-property");
|
||||
const arrayOf = require("@validatem/array-of");
|
||||
const isFunction = require("@validatem/is-function");
|
||||
const isString = require("@validatem/is-string");
|
||||
const isInteger = require("@validatem/is-integer");
|
||||
const isPositive = require("@validatem/is-positive");
|
||||
const isValue = require("@validatem/is-value");
|
||||
const isBoolean = require("@validatem/is-boolean");
|
||||
|
||||
const isMS = require("./is-ms");
|
||||
|
||||
module.exports = {
|
||||
backend: [ required, isValue("postgresql") ], // No other backends exist yet
|
||||
simulate: [ isBoolean, defaultTo(false) ],
|
||||
taskInterval: [ isMS ], // global rate limit
|
||||
database: anything, // FIXME: Validate various database options
|
||||
seed: arrayOf({
|
||||
id: [ required, isString ],
|
||||
tags: [ required, arrayOf(isString) ],
|
||||
data: [ required, anything ] // FIXME: Validate object
|
||||
}),
|
||||
tags: anyProperty({
|
||||
key: [ required, isString ],
|
||||
value: [ required, arrayOf(isString) ]
|
||||
}),
|
||||
tasks: anyProperty({
|
||||
key: [ required, isString ],
|
||||
value: [ required, {
|
||||
ttl: [ isMS ],
|
||||
taskInterval: [ isMS ],
|
||||
parallelTasks: [ defaultTo(1), either([
|
||||
[ isValue(Infinity) ],
|
||||
[ isInteger, isPositive ]
|
||||
])],
|
||||
version: [ defaultTo("0"), isString ],
|
||||
dependsOn: [ defaultTo([]), arrayOf(isString) ],
|
||||
run: [ required, isFunction ]
|
||||
}]
|
||||
})
|
||||
};
|
3
todo-db-rework.txt
Normal file
3
todo-db-rework.txt
Normal file
|
@ -0,0 +1,3 @@
|
|||
- locks table: make locked status a field instead of based on existence
|
||||
- rename locks table to queue table
|
||||
- insert tasks into queue table whenever drained for a task
|
45
yarn.lock
45
yarn.lock
|
@ -112,6 +112,18 @@
|
|||
default-value "^1.0.0"
|
||||
error-chain "^0.1.0"
|
||||
|
||||
"@promistream/filter@^0.1.1":
|
||||
version "0.1.1"
|
||||
resolved "https://registry.yarnpkg.com/@promistream/filter/-/filter-0.1.1.tgz#126f2d581c801d0e8e30e1392700384f00da94be"
|
||||
integrity sha512-rE4mTC8I7FiwH5HZTcqzfwHTRXAAShBuNoLMhhWcKPvqPIjAbEoqGg8klEs4fNDl8LXrd0Txm7PeBQ8cgZYwmg==
|
||||
dependencies:
|
||||
"@promistream/propagate-abort" "^0.1.3"
|
||||
"@promistream/propagate-peek" "^0.1.0"
|
||||
"@validatem/core" "^0.3.12"
|
||||
"@validatem/is-function" "^0.1.0"
|
||||
"@validatem/required" "^0.1.1"
|
||||
bluebird "^3.7.2"
|
||||
|
||||
"@promistream/from-iterable@^0.1.0":
|
||||
version "0.1.0"
|
||||
resolved "https://registry.yarnpkg.com/@promistream/from-iterable/-/from-iterable-0.1.0.tgz#f0f28e5f53449f4dc0cb90fc3d6753b9181b8a8b"
|
||||
|
@ -421,7 +433,7 @@
|
|||
default-value "^1.0.0"
|
||||
flatten "^1.0.3"
|
||||
|
||||
"@validatem/is-array@^0.1.0":
|
||||
"@validatem/is-array@^0.1.0", "@validatem/is-array@^0.1.1":
|
||||
version "0.1.1"
|
||||
resolved "https://registry.yarnpkg.com/@validatem/is-array/-/is-array-0.1.1.tgz#fbe15ca8c97c30b622a5bbeb536d341e99cfc2c5"
|
||||
integrity sha512-XD3C+Nqfpnbb4oO//Ufodzvui7SsCIW/stxZ39dP/fyRsBHrdERinkFATH5HepegtDlWMQswm5m1XFRbQiP2oQ==
|
||||
|
@ -452,7 +464,15 @@
|
|||
"@validatem/error" "^1.0.0"
|
||||
is-callable "^1.1.5"
|
||||
|
||||
"@validatem/is-number@^0.1.3":
|
||||
"@validatem/is-integer@^0.1.0":
|
||||
version "0.1.0"
|
||||
resolved "https://registry.yarnpkg.com/@validatem/is-integer/-/is-integer-0.1.0.tgz#52c544063aaeabb630854e1822298f5c043196a0"
|
||||
integrity sha512-sSp66uxfirIFMqro64DAdfM+UKo+IICmHdy/x3ZJXUM9F4byz/GyFmhR4wfcQswywwF1fqKw9458GE38fozjOQ==
|
||||
dependencies:
|
||||
"@validatem/error" "^1.0.0"
|
||||
"@validatem/is-number" "^0.1.2"
|
||||
|
||||
"@validatem/is-number@^0.1.2", "@validatem/is-number@^0.1.3":
|
||||
version "0.1.3"
|
||||
resolved "https://registry.yarnpkg.com/@validatem/is-number/-/is-number-0.1.3.tgz#0f8ce8c72970dbedbbd04d12942e5ab48a44cda6"
|
||||
integrity sha512-GjnbKYfYa0cTCJmsr5OUbylxTKHHZ6FDtJixWl+lEuXzeELDoYRp2UAjzfjTXJ9g2BumESqI/t0hap5rw5tEyQ==
|
||||
|
@ -468,6 +488,15 @@
|
|||
"@validatem/error" "^1.0.0"
|
||||
is-plain-obj "^2.1.0"
|
||||
|
||||
"@validatem/is-positive@^1.0.0":
|
||||
version "1.0.0"
|
||||
resolved "https://registry.yarnpkg.com/@validatem/is-positive/-/is-positive-1.0.0.tgz#8820e39dc34e94ffc216217830ad4d2a3b26d415"
|
||||
integrity sha512-ei8YL+IxwdZd7QGaR9coAo/MJRJKMqN3RunoM6lLbtFGPJyMa6hOlcLWiZ9UmRI9qE96YUHr+AI80AJ91SeOYg==
|
||||
dependencies:
|
||||
"@validatem/error" "^1.0.0"
|
||||
"@validatem/is-number" "^0.1.3"
|
||||
is-negative-zero "^2.0.0"
|
||||
|
||||
"@validatem/is-string@^0.1.1":
|
||||
version "0.1.1"
|
||||
resolved "https://registry.yarnpkg.com/@validatem/is-string/-/is-string-0.1.1.tgz#0710d8cebedd4d6861b4a8c63d7803ed6d2f9d6c"
|
||||
|
@ -484,6 +513,13 @@
|
|||
"@validatem/error" "^1.0.0"
|
||||
is-string "^1.0.5"
|
||||
|
||||
"@validatem/is-value@^0.1.0":
|
||||
version "0.1.0"
|
||||
resolved "https://registry.yarnpkg.com/@validatem/is-value/-/is-value-0.1.0.tgz#b4c7481818a88d7c24400b9b587f83b388b37d56"
|
||||
integrity sha512-FrwC6AP4W8dN6GCJEX02qNphoRUaN2JtdbIU2ztwPpLFUSgaJ+zvUrIPYDr8f+8YfrI/QIHm+uiY2TNEjQq/iQ==
|
||||
dependencies:
|
||||
"@validatem/error" "^1.0.0"
|
||||
|
||||
"@validatem/match-special@^0.1.0":
|
||||
version "0.1.0"
|
||||
resolved "https://registry.yarnpkg.com/@validatem/match-special/-/match-special-0.1.0.tgz#4e0c28f1aee5bf53c1ef30bbf8c755d4946ae0ff"
|
||||
|
@ -1860,6 +1896,11 @@ is-glob@^4.0.0, is-glob@^4.0.1:
|
|||
dependencies:
|
||||
is-extglob "^2.1.1"
|
||||
|
||||
is-negative-zero@^2.0.0:
|
||||
version "2.0.2"
|
||||
resolved "https://registry.yarnpkg.com/is-negative-zero/-/is-negative-zero-2.0.2.tgz#7bf6f03a28003b8b3965de3ac26f664d765f3150"
|
||||
integrity sha512-dqJvarLawXsFbNDeJW7zAz8ItJ9cd28YufuuFzh0G8pNHjJMnY08Dv7sYX2uF5UpQOwieAeOExEYAWWfu7ZZUA==
|
||||
|
||||
is-number-object@^1.0.4:
|
||||
version "1.0.6"
|
||||
resolved "https://registry.yarnpkg.com/is-number-object/-/is-number-object-1.0.6.tgz#6a7aaf838c7f0686a50b4553f7e54a96494e89f0"
|
||||
|
|
Loading…
Reference in a new issue