Backend refactor WIP

backend-refactor
Sven Slootweg 2 years ago
parent 2a24de452e
commit bc1a9349c3

1
.gitignore vendored

@ -1 +1,2 @@
node_modules
junk

@ -7,8 +7,10 @@
"license": "WTFPL OR CC0-1.0",
"dependencies": {
"@joepie91/consumable": "^1.0.1",
"@joepie91/unreachable": "^1.0.0",
"@promistream/buffer": "^0.1.1",
"@promistream/combine-sequential-streaming": "^0.1.0",
"@promistream/filter": "^0.1.1",
"@promistream/from-iterable": "^0.1.0",
"@promistream/from-node-stream": "^0.1.1",
"@promistream/map": "^0.1.1",
@ -23,12 +25,17 @@
"@validatem/array-of": "^0.1.2",
"@validatem/core": "^0.3.16",
"@validatem/default-to": "^0.1.0",
"@validatem/either": "^0.1.9",
"@validatem/error": "^1.1.0",
"@validatem/is-array": "^0.1.1",
"@validatem/is-boolean": "^0.1.1",
"@validatem/is-date": "^0.1.0",
"@validatem/is-function": "^0.1.0",
"@validatem/is-integer": "^0.1.0",
"@validatem/is-number": "^0.1.3",
"@validatem/is-positive": "^1.0.0",
"@validatem/is-string": "^1.0.0",
"@validatem/is-value": "^0.1.0",
"@validatem/matches-format": "^0.1.0",
"@validatem/require-either": "^0.1.0",
"@validatem/required": "^0.1.1",

@ -0,0 +1,222 @@
# runInTransaction
```js
runInTransaction(parentTX, (tx) => {
// ... database operations go here ...
})
```
# lock
```js
lock(tx, {
id,
task
})
```
Returns: `true` if locking succeeded, `false` if the task was already locked
# unlock
```js
lock(tx, {
id,
task
})
```
# getItem
Fetch an item from the database by its unique named ID.
Wrapper API:
```js
getItem(tx, id)
getItem(tx, {
id,
optional = false
})
```
Expected backend API:
```js
getItem(tx, {
id,
optional
})
```
# createItem
Creates a new item or, if the item already exists, updates the existing item (if allowed). Tags/aliases are only ever added by this operation, never removed.
Wrapper API:
```js
createItem(tx, {
id,
tags = [],
aliases = [],
// Either `data` or `update` is required
data,
update, // callback(oldData)
failIfExists = false,
allowUpsert = true, // whether the *item object* is allowed to be updated, or should just be skipped if already exists
parentID
})
```
Expected backend API:
```js
createItem(tx, {
id,
tags,
aliases,
update, // callback(oldData ?? {})
failIfExists,
allowUpsert,
parentID
})
```
# renameItem
```js
let [ tx, { to, from }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
to: [ required, isString ],
from: [ required, isString ]
}]
});
```
# repointAliases
```js
let [ tx, { to, from }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
to: [ required, isString ],
from: [ required, isString ]
}]
});
```
# mergeItem
```js
let [ tx, { from, into, merge, mergeMetadata }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
from: [ required, isString ],
into: [ required, isString ],
merge: [ required, isFunction ],
mergeMetadata: [ defaultTo({}), anyProperty({
key: [ required ],
value: [ required, isFunction ]
})],
}]
});
```
# deleteItem
```js
let [ tx, { id }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
id: [ required, isString ]
}]
});
```
# createAlias
```js
let [ tx, { from, to, failIfExists }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
from: [ required, isString ],
to: [ required, isString ],
failIfExists: [ defaultTo(false), isBoolean ] // TODO: Shouldn't this default to true, for any occurrence outside of a merge/rename?
}]
});
```
# deleteAlias
```js
let [ tx, { from }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
from: [ required, isString ]
}]
});
```
# updateData
```js
let [ tx, { id, update }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
id: [ required, isString ],
update: [ required, isFunction ]
}]
});
```
# updateMetadata
```js
let [ tx, { id, update, taskName, taskVersion, ttl }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
id: [ required, isString ],
update: [ required, isFunction ],
taskName: [ required, isString ],
taskVersion: [ required, isString ],
ttl: [ isNumber ]
}]
});
```
# expire
```js
let [ tx, { id, taskName }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
id: [ required, isString ],
taskName: [ required, isString ]
}]
});
```
# setTTL
# allowFailure?
# log?
# countLockedTasks
Only a `tx` argument
# getUpdates
```js
let [ tx, { timestamp, prefix }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [ defaultTo({}), {
timestamp: [ isDate ],
prefix: [ isString ]
}]
});
```

@ -0,0 +1,418 @@
"use strict";
const unreachable = require("@joepie91/unreachable");
const { validateArguments } = require("@validatem/core");
const required = require("@validatem/required");
const requireEither = require("@validatem/require-either");
const isString = require("@validatem/is-string");
const isBoolean = require("@validatem/is-boolean");
const isFunction = require("@validatem/is-function");
const isDate = require("@validatem/is-date");
const arrayOf = require("@validatem/array-of");
const defaultTo = require("@validatem/default-to");
const anyProperty = require("@validatem/any-property");
const anything = require("@validatem/anything");
const wrapValueAsOption = require("@validatem/wrap-value-as-option");
const ValidationError = require("@validatem/error");
const either = require("@validatem/either");
const isTaskObject = require("../validators/is-task-object");
// NOTE: The purpose of this module is to implement all the database API logic that's common across database backends; that mainly means initialization, settings merging, and input validation/normalization. Each individual database backend can then assume that it will always be called with valid input.
/* TODO: API semantics to document:
- All mutating operations are queued, and executed in a transaction *after* the task has been completed; this is to avoid long-lived transactions interfering with task concurrency
- All read-only operations (as specified in the task logic) are executed immediately, bypassing any queue, and outside of any transaction.
- This means that read-only operations see the state of the database *before* any mutations take place, but mutating methods see the state *after* any preceding mutations (within that task) have taken place.
- Simulated execution works exactly like regular execution, except the mutation transaction is cancelled at the last moment, preventing any true changes to the database. This essentially means you are operating on a read-only view of the database.
- This also means that a transactional database is *required* to build a srap backend. If you want to implement a backend for a non-transactional database and are able to provide semantically equivalent simulation functionality otherwise, please open an issue!
*/
// FIXME: Verify that all internal method calls in the PostgreSQL backend are still valid after moving argument validation/normalization into this module
module.exports = function (state) {
let { tasks } = state;
const backendModules = {
"postgresql": require("./postgresql")(state),
"simulated": require("./simulated")(state),
};
function wrapBackend(backend) {
// NOTE: The backend.getDefaultTransaction method MUST return synchronously.
// TODO: Only accept an explicit `null` when defaulting, not an `undefined` which may be implicit? To ensure that the caller didn't just forget to provide one. Though most/all queries have arguments coming after the TX, so this might not be necessary.
let maybeTX = [ defaultTo(backend.getDefaultTransaction), backend.isTransaction ];
function maybeGetTaskObject(taskName) {
if (tasks.has(taskName)) {
return tasks.get(taskName);
} else {
throw new ValidationError(`No task named '${taskName}' exists`);
}
}
let isTask = either([
[ isTaskObject ],
[ isString, maybeGetTaskObject, isTaskObject ]
]);
return {
topLevel: {
simulate: function () {
// NOTE: Simulated backend is initialized synchronously; all other backends are not!
return wrapBackend(backendModules.simulated.create({ backend: backend }));
},
shutdown: function () {
return backend.shutdown();
},
runInTransaction: function (_tx, _callback) {
let [ tx, callback ] = validateArguments(arguments, {
tx: maybeTX,
callback: [ required, isFunction ]
});
return backend.runInTransaction(tx, callback);
},
countLockedTasks: function (_tx) {
let [ tx ] = validateArguments(arguments, {
tx: maybeTX
});
return backend.countLockedTasks(tx);
},
getUpdateStream: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ defaultTo({}), {
timestamp: [ isDate ],
prefix: [ isString ]
}]
});
return backend.getUpdateStream(tx, options);
},
getTaskStream: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
task: [ required, isTask ]
}]
});
return backend.getTaskStream(tx, options);
},
insertSeeds: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
// FIXME: Stricter validation
seeds: [ required, arrayOf(anything) ]
}]
});
return Promise.map(options.seeds, (seed) => {
let { data, ... props } = seed;
return backend.storeItem(tx, {
... props,
update: () => data,
allowUpsert: false,
failIfExists: false
});
});
}
},
forItem: function ({ item, task, mutationQueue }) {
// We create a new instance of the actual API for every item being processed. This is necessary because some of the input arguments will default to item-specific values, and some of the logic is dependent on task-specific metadata. This is a more efficient (and understandable) approach than pretending the API is stateless and then separately wrapping the API *again* for every individual item with a whole separate layer of input validation rules.
// FIXME: Is this still correct, with the new task (graph) format?
let dependentTaskNames = new Set(task.dependents.map((dependent) => dependent.task));
function mutableOperation(func) {
if (simulate) {
return func(backend);
} else if (mutationQueue != null) {
mutationQueue.push(func);
} else {
unreachable("No mutation queue provided in live mode");
}
}
return {
// NOTE: 'internal' API methods are accessible to srap, but not to user-defined tasks.
internal: {
lock: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ defaultTo({}), {
id: [ defaultTo(item.id), isString ],
task: [ defaultTo(task), isTask ]
}]
});
return backend.lock(tx, options);
},
unlock: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ defaultTo({}), {
id: [ defaultTo(item.id), isString ],
task: [ defaultTo(task), isTask ]
}]
});
return backend.unlock(tx, options);
},
markTaskCompleted: function (_tx) {
// TODO: Allow specifying a different task or item ID?
let [ tx ] = validateArguments(arguments, {
tx: maybeTX
});
return backend.markTaskStatus(tx, {
id: item.id,
task: task,
isSuccessful: true
});
},
markTaskFailed: function (_tx, _options) {
// TODO: Allow specifying a different task or item ID?
let [ tx, { error }] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
error: [ required, anything ] // TODO: Restrict to Error types?
}]
});
// FIXME: Persist error
console.error("FIXME(persist error):", error.stack);
return backend.markTaskStatus(tx, {
id: item.id,
task: task,
isSuccessful: false
});
},
},
exposed: {
// NOTE: 'exposed' API methods are the ones that are passed into a user-defined task, and which the task uses to eg. update or create new items
getItem: function (_tx, _id, _optional) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("id"), {
id: [ required, isString ],
optional: [ defaultTo(false), isBoolean ] // FIXME: Can this handling be moved to the wrapper?
}]
});
return backend.getItem(tx, options);
},
storeItem: function (_tx, _options) {
// NOTE: Using `update` instead of `data` makes it an upsert!
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
id: [ required, isString ],
// Tags are required to be specified (even if an empty array) because it's easily forgotten
tags: [ required, arrayOf(isString) ],
aliases: [ defaultTo([]), arrayOf(isString) ],
data: [ anything ], // FIXME: Check for object
update: [ isFunction ],
failIfExists: [ defaultTo(false), isBoolean ],
allowUpsert: [ defaultTo(true), isBoolean ],
parentID: [ defaultTo(item.id), isString ]
}, requireEither([ "data", "update" ]) ]
});
let { data, ... rest } = options;
return mutableOperation((backend) => {
return backend.storeItem(tx, {
... rest,
// We normalize `data` and `update` (which are mutually-exclusive) into a single option here, so that the backend only needs to deal with the `update` case
// TODO: Can this be folded into the validation rules in a reasonable and readable way?
update: (data != null)
? (existingData) => ({ ... existingData, ... data })
: rest.update
});
});
},
moveItem: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("into"), {
from: [ defaultTo(item.id), isString ],
into: [ required, isString ],
// NOTE: If no `merge` function is specified, that indicates that merging is not allowed (ie. this is strictly a rename), and mergeMetadata is ignored too
merge: [ isFunction ],
mergeMetadata: [ defaultTo({}), anyProperty({
key: [ required ],
value: [ required, isFunction ]
})],
}]
});
return mutableOperation((backend) => {
return backend.moveItem(tx, { options, allowMerge: (options.merge != null) });
});
},
deleteItem: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [
defaultTo({}),
wrapValueAsOption("id"), {
id: [ defaultTo(item.id), isString ]
}
]
});
return mutableOperation((backend) => {
return backend.deleteItem(tx, options);
});
},
createAlias: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("from"), {
from: [ required, isString ],
to: [ defaultTo(item.id), isString ],
failIfExists: [ defaultTo(false), isBoolean ] // TODO: Shouldn't this default to true, for any occurrence outside of a merge/rename?
}]
});
return mutableOperation((backend) => {
return backend.createAlias(tx, options);
});
},
deleteAlias: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("from"), {
from: [ required, isString ]
}]
});
return mutableOperation((backend) => {
return backend.deleteAlias(tx, options);
});
},
updateData: function (_tx, _options) {
// NOTE: This is a semantically self-describing convenience wrapper for `createItem` that updates the currently-being-processed item
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ]
}]
});
return mutableOperation((backend) => {
return backend.createItem(tx, {
... options,
tags: []
});
});
},
updateMetadata: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ],
task: [ required, isTask ]
}]
});
return mutableOperation((backend) => {
return backend.updateMetadata(tx, options);
});
},
expire: function (_tx, _options) {
// TODO: It probably doesn't make any semantic sense to leave *both* arguments unspecified. Maybe that should be prohibited via eg. a non-exclusive requireEither? Otherwise the user might expect to immediately expire the *current* task, but since the task is only updated *after* the task logic runs, that is not currently possible to express.
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
id: [ defaultTo(item.id), isString ],
isTask: [ defaultTo(task), isTask ]
}]
});
return mutableOperation((backend) => {
return backend.expire(tx, options);
});
},
expireDependents: function (_tx, _options) {
// NOTE: This method does not have a counterpart in the database backend; it's a convenience abstraction over regular `backend.expire` calls
let [ tx, { id, dependents }] = validateArguments(arguments, {
tx: maybeTX,
options: [ defaultTo({}), wrapValueAsOption("dependents"), {
id: [ defaultTo(item.id), isString ],
dependents: [ defaultTo([]), arrayOf(isString), (dependents) => {
// Only consider dependents that actually exist for this task
return dependents.filter((dependent) => dependentTaskNames.has(dependent));
}]
}]
});
// FIXME: This doesn't work with the synchronous queueing model
return Promise.map(dependents, (dependent) => {
return this.expire(tx, {
id: id,
taskName: dependent
});
});
},
// Temporary compatibility aliases
createItem: (... args) => this.storeItem(... args),
mergeItem: (... args) => this.moveItem(... args),
renameItem: (tx, options) => {
if (typeof options === "string") {
return this.moveItem(tx, options);
} else {
return this.moveItem(tx, { into: options.to, from: options.from });
}
},
}
};
}
};
}
return function createDatabaseAPI({ backend: backendName, options, simulate }) {
// TODO: Validate inputs here, or maybe that belongs in config validation instead?
return Promise.try(() => {
let backendModule = backendModules[backendName];
if (backendModule != null) {
// FIXME: Smarter merge, maybe expose merge-by-template merge rules from the database module?
let settings = { ... backendModule.defaultSettings, ... options };
return backendModule.create(settings);
} else {
throw new Error(`No backend named '${backendName}' exists`);
}
}).then((backend) => {
return wrapBackend(backend);
});
};
};

@ -0,0 +1,352 @@
"use strict";
const Promise = require("bluebird");
const path = require("path");
const defaultValue = require("default-value");
const knexLibrary = require("knex");
const { knexSnakeCaseMappers } = require("objection");
const { addMilliseconds } = require("date-fns");
const ValidationError = require("@validatem/error");
const models = require("./models");
const mergeItems = require("../../semantics/merge-items");
let migrationsFolder = path.join(__dirname, "../../../migrations");
function noop() {}
module.exports = function(state) {
let { metrics } = state;
return {
defaultSettings: {
taskBatchSize: 1000,
taskBatchDelay: 30 * 1000
},
create: function createPostgreSQLBackend(options) {
let knex = knexLibrary({
client: "pg",
pool: { min: 0, max: 32 },
migrations: { tableName: "srap_knex_migrations" },
connection: options,
... knexSnakeCaseMappers()
});
return Promise.try(() => {
return knex.migrate.latest({
directory: migrationsFolder
});
}).then(() => {
let db = models({ ... state, knex: knex });
let queryState = { ... state, knex, db };
// TODO: Should this be inlined instead?
function repointAliases (tx, { to, from }) {
return db.Alias.query(tx)
.patch({ itemId: to, updatedAt: new Date() })
.where({ itemId: from });
}
function getTaskResult(tx, task, id) {
return Promise.try(() => {
return db.Alias.query(tx).findById(id);
}).then((alias) => {
if (alias != null) {
return Promise.try(() => {
return db.TaskResult.query(tx).findById([ task.name, alias.itemId ]);
});
}
});
}
return {
shutdown: function () {
knex.destroy();
},
getDefaultTransaction: function () {
return knex;
},
isTransaction: function (value) {
if (value.where == null || value.raw == null) {
throw new ValidationError(`Must be a valid Knex or Knex transaction instance`);
}
},
runInTransaction: function (tx, callback) {
return tx.transaction((tx) => {
return callback(tx);
}, { doNotRejectOnRollback: false });
},
lock: function (tx, { id, task }) {
return Promise.try(() => {
return db.TaskInProgress.query(tx).insert({
task: task.name,
item_id: id
});
}).then(() => {
return true;
}).catch({ name: "UniqueViolationError" }, () => {
return false;
});
},
unlock: function (tx, { id, task }) {
return db.TaskInProgress.query(tx)
.delete()
.where({
task: task.name,
item_id: id
});
// TODO: return true/false depending on whether unlocking succeeded?
},
getItem: function (tx, { id, optional }) {
return Promise.try(() => {
return db.Alias.relatedQuery("item", tx)
.for(id)
.withGraphFetched("taskResults");
}).then((results) => {
if (optional === true || results.length > 0) {
return results[0];
} else {
throw new Error(`No item exists with ID '${id}'`);
}
});
},
storeItem: function (tx, { id, tags, aliases, update, failIfExists, allowUpsert, parentID }) {
// FIXME: Make failIfExists actually work, currently it does nothing as the UNIQUE constraint violation cannot occur for an upsert
// TODO: Ensure that we run the transaction in full isolation mode, and retry in case of a conflict
return Promise.try(() => {
// NOTE: We look up by alias, since this is an upsert - and so if the specified ID already exists as an alias, we should update the existing item instead of creating a new one with the specified (aliased) ID
return db.Alias
.relatedQuery("item", tx)
.for(id);
}).then((existingItems) => {
let existingItem = existingItems[0];
let actualID = (existingItem != null)
? existingItem.id
: id;
let existingData = (existingItem != null)
? existingItem.data
: {};
// Make sure to add a self:self alias
let allAliases = aliases.concat([ actualID ]);
let newItem = {
id: actualID,
data: update(existingData),
createdBy: parentID,
tags: tags.map((tag) => ({ name: tag })),
aliases: allAliases.map((alias) => ({ alias: alias })),
updatedAt: new Date()
};
return Promise.try(() => {
if (allowUpsert) {
// NOTE: We *always* do upserts here, even if the user specified `data` rather than `update`, because tags and aliases should always be added even if the item itself already exists. We trust the user not to accidentally reuse IDs between different kinds of objects (which would break in various other ways anyway).
return db.Item.query(tx).upsertGraph(newItem, {
insertMissing: true,
noDelete: true
});
} else {
return db.Item.query(tx).insertGraph(newItem, {
insertMissing: true
});
}
}).tap(() => {
// FIXME: We should probably move the metrics stuff to the wrapper instead, so that it works for *any* backend
metrics.storedItems.inc(1);
// TODO: This currently produces somewhat misleading metrics; it only counts *explicitly specified* tags. That will *mostly* correlate to amount of genuinely-new items, but not perfectly. In the future, we should probably refactor the insertion code such that it is aware of the *current* tags of an item that it is about to merge into - but maybe that should be delayed until the zapdb migration.
if (newItem.tags != null) {
for (let tag of newItem.tags) {
metrics.storedItems.labels({ tag: tag.name }).inc(1);
}
}
});
}).catch({ name: "UniqueViolationError", table: "srap_items" }, (error) => {
if (failIfExists) {
throw error;
} else {
// Do nothing, just ignore the failure
}
});
},
moveItem: function (tx, { from, into, merge, mergeMetadata }) {
let allowMerge = (merge != null);
// TODO: Make stuff like `this.getItem` roundtrip through the backend abstraction instead of directly calling internal database methods, eg. by providing the backend itself as an argument to the method
return Promise.all([
this.getItem(tx, { id: from, optional: true }),
this.getItem(tx, { id: into, optional: true }),
]).then(([ fromObj, maybeIntoObj ]) => {
// NOTE: If the source item does not exist, we silently ignore that. This is to ensure that opportunistic renaming of items (eg. after a naming convention change which only affects older items) in scraper configurations is possible.
if (fromObj != null) {
if (allowMerge) {
let intoObj = defaultValue(maybeIntoObj, {
id: into,
data: {},
taskResults: []
});
let newObject = mergeItems({ fromObj, intoObj, merge, mergeMetadata });
let upsertOptions = {
insertMissing: true,
noDelete: true
};
return Promise.all([
// NOTE: We don't use this.deleteItem, to sidestep any alias lookups
db.Item.query(tx).findById(from).delete(),
// NOTE: Cannot use into.$query here because that adds an implicit query builder operation, which upsertGraph does not allow
db.Item.query(tx).upsertGraph(newObject, upsertOptions),
]);
} else {
return Promise.all([
db.Item.query(tx).findById(from).patch({ id: into }),
this.createAlias(tx, { from: into, to: into })
]);
}
}
}).then(() => {
// NOTE: Repointing aliases has the side-effect of leaving a redirect from the source to the destination item, as each item has a self:self alias
return repointAliases(tx, { from: from, to: into });
});
},
deleteItem: function (tx, { id }) {
return db.Alias.relatedQuery("item", tx)
.for(id)
.delete();
},
createAlias: function (tx, { from, to, failIfExists }) {
// Isolate this operation into a savepoint so that it can fail without breaking the entire transaction
let promise = this.runInTransaction(tx, (tx) => {
return db.Alias.query(tx).insert({
alias: from,
itemId: to,
updatedAt: new Date()
});
});
if (failIfExists) {
return promise;
} else {
return Promise.resolve(promise)
.catch({ name: "UniqueViolationError" }, noop);
}
},
deleteAlias: function (tx, { from }) {
// TODO: This cannot yet be propagated to the update feed, because we don't keep a record of deletions
return db.Alias.query(tx).findById(from).delete();
},
updateMetadata: function (tx, { id, update, task }) {
// TODO: failIfExists
return Promise.try(() => {
return getTaskResult(tx, task, id);
}).then((taskResult) => {
let sharedFields = {
isInvalidated: false,
updatedAt: new Date()
};
if (taskResult != null) {
return taskResult.$query(tx).patch({
... sharedFields,
metadata: update(taskResult.metadata),
});
} else {
return db.TaskResult.query(tx).insert({
... sharedFields,
task: task.name,
itemId: id,
metadata: update({})
});
}
});
},
expire: function (tx, { id, task }) {
return Promise.try(() => {
return db.Alias.query(tx).findById(id);
}).then((alias) => {
return db.TaskResult.query(tx)
.where({ task: task.name, itemId: alias.itemId })
.patch({ isInvalidated: true });
});
},
setTTL: function (options) {
// options = ttl || { id, taskName, ttl }
// FIXME
},
allowFailure: function (allowed) {
},
log: function (category, message) {
},
markTaskStatus: function (tx, { id, task, isSuccessful }) {
return Promise.try(() => {
return getTaskResult(tx, task, id);
}).then((taskResult) => {
let sharedFields = {
isSuccessful: isSuccessful,
taskVersion: task.version,
expiresAt: (task.ttl != null)
? addMilliseconds(new Date(), task.ttl)
: undefined
};
if (taskResult != null) {
return taskResult.$query(tx).patch({
... sharedFields
});
} else {
return db.TaskResult.query(tx).insert({
... sharedFields,
task: task.name,
itemId: id
});
}
});
},
logTaskError: function (tx, { id, task, error }) {
throw new Error(`UNIMPLEMENTED`); // FIXME
},
countLockedTasks: function (tx) {
return Promise.try(() => {
return db.TaskInProgress.query(tx).count({ count: "*" });
}).then((result) => {
return result[0].count;
});
},
getUpdateStream: require("./queries/get-update-stream")(queryState),
getTaskStream: require("./queries/get-task-stream")(queryState)
};
});
}
};
};

@ -0,0 +1,106 @@
"use strict";
const debug = require("debug")("srap:backend:postgresql:query:get-task-stream");
const simpleSource = require("@promistream/simple-source");
const query = `
WITH
dependency_tasks AS (
SELECT * FROM
json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text)
),
matching_items AS (
SELECT
DISTINCT ON (srap_items.id)
srap_items.*,
results.updated_at AS result_date,
results.task_version,
(
results.is_successful = TRUE
AND (
results.expires_at < NOW()
OR results.is_invalidated = TRUE
)
) AS is_candidate
FROM srap_items
INNER JOIN srap_tags
ON srap_tags.item_id = srap_items.id
AND srap_tags.name = ANY(:tags)
LEFT JOIN srap_task_results AS results
ON results.item_id = srap_items.id
AND results.task = :task
WHERE
NOT EXISTS (
SELECT FROM srap_tasks_in_progress AS pr WHERE pr.item_id = srap_items.id
)
),
candidates AS (
SELECT * FROM matching_items
WHERE result_date IS NULL
UNION
SELECT * FROM matching_items
WHERE is_candidate = TRUE
OR NOT (task_version = :taskVersion)
)
(
SELECT
*
FROM
candidates
WHERE
NOT EXISTS (
SELECT
results.*
FROM dependency_tasks
LEFT JOIN srap_task_results AS results
ON dependency_tasks.task = results.task
AND dependency_tasks.task_version = results.task_version
AND results.item_id = candidates.id
WHERE
results.is_successful IS NULL
OR results.is_successful = FALSE
OR (
results.is_successful = TRUE
AND (
results.expires_at < NOW()
OR results.is_invalidated = TRUE
)
)
)
) LIMIT :resultLimit;
`;
module.exports = function ({ metrics, backendSettings }) {
return function (tx, { task }) {
return simpleSource(() => {
let startTime = Date.now();
return Promise.try(() => {
return tx.raw(query, {
tags: task.tags,
task: task.name,
taskVersion: task.version,
resultLimit: backendSettings.taskBatchSize,
dependencyTaskDefinitions: JSON.stringify(task.dependencies.map((dependency) => {
// Case-mapping for SQL compatibility
return { task_version: dependency.version, task: dependency.name };
}))
});
}).then((result) => {
let timeElapsed = Date.now() - startTime;
metrics.taskFetchTime.labels({ task: task.name }).set(timeElapsed / 1000);
metrics.taskFetchResults.labels({ task: task.name }).set(result.rowCount);
debug(`Task retrieval query for '${task.name}' took ${timeElapsed}ms and produced ${result.rowCount} results`);
if (result.rowCount > 0) {
return result.rows;
} else {
// TODO: Consider using LISTEN/NOTIFY instead?
return Promise.resolve([]).delay(backendSettings.taskBatchDelay);
}
});
});
};
}

@ -0,0 +1,65 @@
"use strict";
const dateFns = require("date-fns");
const pipe = require("@promistream/pipe");
const combineSequentialStreaming = require("@promistream/combine-sequential-streaming");
const fromIterable = require("@promistream/from-iterable");
const fromNodeStream = require("@promistream/from-node-stream");
const createTypeTaggingStream = require("./streams/tag-type");
module.exports = function ({ db, knex }) {
return function (tx, { timestamp, prefix }) {
// NOTE: This returns snake_cased keys! As we're bypassing the Objection internals, no casemapping occurs.
// FIXME/REFACTOR: That needs to be changed in the refactor, for consistency across database backends
// NOTE: This is a hacky workaround - if we don't do this, then for some reason also entries *at* the exact timestamp are included, which is not what we want.
// FIXME: Verify that this doesn't break anything, eg. when an entry is created inbetween the original timestamp and +1ms.
let actualTimestamp = (timestamp != null)
? dateFns.addMilliseconds(timestamp, 1)
: undefined;
function applyWhereClauses(query, idField) {
if (timestamp != null) {
// FIXME: An error in the query here throws an error, resulting in an abort handling bug in a promistream
query = query.whereRaw(`updated_at > ?`, [ actualTimestamp ]);
}
if (prefix != null) {
query = query.whereRaw(`${idField} LIKE ?`, [ `${prefix.replace(/%/g, "\\%")}%` ]);
}
return query;
}
function* streamGenerator() {
yield pipe([
fromNodeStream.fromReadable(
applyWhereClauses(db.Item.query(tx), "id").toKnexQuery().stream()
),
createTypeTaggingStream("item")
]);
yield pipe([
fromNodeStream.fromReadable(
// NOTE: We are only interested in aliases which don't point at themselves
applyWhereClauses(db.Alias.query(tx).where("alias", "!=", knex.ref("item_id")), "alias").toKnexQuery().stream()
),
createTypeTaggingStream("alias")
]);
yield pipe([
fromNodeStream.fromReadable(
applyWhereClauses(db.TaskResult.query(tx), "item_id").toKnexQuery().stream()
),
createTypeTaggingStream("taskResult")
]);
}
return pipe([
fromIterable(streamGenerator()),
combineSequentialStreaming()
]);
}
};

@ -0,0 +1,179 @@
"use strict";
const mergeItems = require("../../semantics/merge-items");
function printTX(tx) {
// TODO: Print entire chain?
return `[tx ${tx.__txID}]`;
}
function printItem(id, task) {
if (task != null) {
return `[${id}:${task.name}]`;
} else {
return `[${id}]`;
}
}
function logSimulated(... args) {
console.log(... args);
}
module.exports = function (state) {
// NOTE: The simulated backend needs access to the 'real' backend; a task may eg. mutate an item based on its current data, and we'd need to read that from the real data source. The only constraint is that the simulated backend cannot *mutate* anything in the real backend, but reading is fine!
return function attachSimulatedBackend({ backend }) {
return {
defaultSettings: {},
create: function createSimulatedBackend(_options) {
let txCounter = 0;
let locks = new Map(); // Map<task, Set<id>>
return {
shutdown: function () {
return backend.shutdown();
},
getDefaultTransaction: function () {
return { __txID: null };
},
isTransaction: function (value) {
return ("__txID" in value);
},
runInTransaction: function (tx, callback) {
let newTransaction = { __txID: txCounter++, __parentTX: tx };
return callback(newTransaction);
},
lock: function (tx, { id, task }) {
if (!locks.has(task)) {
locks.set(task, new Set());
}
let taskLocks = locks.get(task);
if (taskLocks.has(id)) {
logSimulated(`${printTX(tx)} Already locked: ${printItem(id, task)}`);
return false;
} else {
logSimulated(`${printTX(tx)} Locking ${printItem(id, task)}`);
taskLocks.add(id);
return true;
}
},
unlock: function (tx, { id, task }) {
logSimulated(`${printTX(tx)} Unlocking ${printItem(id, task)}`);
locks.get(task).delete(id);
},
getItem: function (tx, options) {
return backend.getItem(backend.getDefaultTransaction(), options);
},
storeItem: function (tx, { id, parentID, update, tags, aliases, allowUpsert }) {
return Promise.try(() => {
return this.getItem(tx, { id: id, optional: true });
}).then((currentItem) => {
let actualID = currentItem.id ?? id;
let newItem = {
id: actualID,
data: (currentItem != null)
? update(currentItem.data)
: update({}),
createdBy: parentID,
tags: tags,
aliases: aliases.concat([ actualID ]),
updatedAt: new Date()
};
logSimulated(`${printTX(tx)} ${printItem(id)} Storing item (upsert=${allowUpsert})`, newItem);
});
},
moveItem: function (tx, { from, into, merge, mergeMetadata, allowMerge }) {
return Promise.all([
this.getItem(tx, { id: from, optional: true }),
this.getItem(tx, { id: into, optional: true }),
]).then((fromObj, maybeIntoObj) => {
if (fromObj != null) {
let intoObj = maybeIntoObj ?? {
id: into,
data: {},
taskResults: []
};
if (allowMerge) {
let newItem = mergeItems({ fromObj, intoObj, merge, mergeMetadata });
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=true)`, newItem);
} else {
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=false)`);
}
}
});
},
deleteItem: function (tx, { id }) {
logSimulated(`${printTX(tx)} ${printItem(id)} Deleting item`);
},
createAlias: function (tx, { from, to }) {
logSimulated(`${printTX(tx)} ${printItem(to)} Creating alias ${from}`);
},
deleteAlias: function (tx, { from }) {
logSimulated(`${printTX(tx)} ${printItem(from)} Deleting alias`);
},
updateMetadata: function (tx, { id, task, update }) {
return Promise.try(() => {
return this.getItem(tx, { id, optional: true });
}).then((item) => {
let taskResult = (item != null)
? item.taskResults.find((result) => result.task === task)
: undefined;
let existingData = (taskResult != null)
? taskResult.metadata
: {};
let newData = update(existingData);
logSimulated(`${printTX(tx)} ${printItem(id, task)} Storing metadata`, newData);
});
},
expire: function (tx, { id, task }) {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Expiring task result`);
},
markTaskStatus: function (tx, { id, task, isSuccessful }) {
if (isSuccessful) {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as completed`);
} else {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as failed`);
}
},
countLockedTasks: function (tx) {
return Array.from(locks).reduce((total, [ task, taskLocks ]) => {
return total + taskLocks.size;
}, 0);
},
getUpdateStream: function (... args) {
return backend.getUpdateStream(... args);
},
getTaskStream: function (... args) {
return backend.getTaskStream(... args);
}
};
}
};
};
};

@ -1,48 +0,0 @@
"use strict";
const defaultValue = require("default-value");
const mapObj = require("map-obj");
module.exports = function createDependencyMap(configuration) {
let dependencyMap = mapObj(configuration.tasks, (task, definition) => {
return [
task,
defaultValue(definition.dependsOn, []).map((dependencyName) => {
let dependency = configuration.tasks[dependencyName];
if (dependency != null) {
return {
task: dependencyName,
// TODO: Do defaults processing in configuration loading/validation instead
taskVersion: defaultValue(dependency.version, "0")
};
} else {
throw new Error(`Invalid dependency specified, task does not exist: ${dependencyName}`);
}
})
];
});
// NOTE: When inverting the dependencyMap, we totally ignore the taskVersion of dependencies when keying this mapping. While the taskVersion of specific tasks *may* matter to the code that uses these mappings, we don't support more than one version of a task simultaneously existing, and so keying by the task name alone is sufficient.
let dependentMap = {};
for (let [ task, dependencies ] of Object.entries(dependencyMap)) {
let taskDefinition = configuration.tasks[task];
for (let dependency of dependencies) {
if (dependentMap[dependency.task] == null) {
dependentMap[dependency.task] = [];
}
dependentMap[dependency.task].push({
task: task,
taskVersion: defaultValue(taskDefinition.version, "0")
});
}
}
return {
dependencyMap,
dependentMap
};
};

@ -0,0 +1,44 @@
"use strict";
const syncpipe = require("syncpipe");
const defaultValue = require("default-value"); // FIXME: Move to config validation
const invertMapping = require("./util/invert-mapping");
module.exports = function generateTaskGraph({ tags, tasks }) {
let tagsMapping = invertMapping(tags);
let tasksMap = syncpipe(tasks, [
_ => Object.entries(_),
_ => _.map(([ name, taskDefinition ]) => {
return [ name, {
... taskDefinition,
name: name,
tags: tagsMapping[name],
dependencies: [],
dependents: []
}];
}),
_ => new Map(_)
]);
function getTask(name) {
if (tasksMap.has(name)) {
return tasksMap.get(name);
} else {
throw new Error(`Encountered dependency reference to non-existent task '${name}'`);
}
}
for (let task of tasksMap.values()) {
for (let dependencyName of defaultValue(task.dependsOn, [])) {
task.dependencies.push(getTask(dependencyName));
getTask(dependencyName).dependents.push(task);
}
// NOTE: We are mutating a local copy of the task definition here, that we created further up
delete task.dependsOn;
}
return tasksMap;
};

@ -11,217 +11,272 @@ const simpleSink = require("@promistream/simple-sink");
const pipe = require("@promistream/pipe");
const parallelize = require("@promistream/parallelize");
const initialize = require("./initialize");
const logStatus = require("./log-status");
const createDependencyMap = require("./dependency-map");
const { validateOptions } = require("@validatem/core");
const isValidConfiguration = require("./validators/is-valid-configuration");
const createPrometheus = require("./prometheus");
const generateTaskGraph = require("./generate-task-graph");
const unreachable = require("@joepie91/unreachable")("srap");
// FIXME: *Require* a taskInterval to be set, even if explicitly null, to prevent accidentally forgetting it
// TODO: Publish this as a separate package
// Inverts an object of arrays, eg. {a: [x, y], b: [x, z]} becomes {x: [a, b], y: [a], z: [b]}
// Useful for eg. tag mappings
function invertMapping(object) {
let newObject = {};
for (let [ key, valueList ] of Object.entries(object)) {
for (let value of valueList) {
if (newObject[value] == null) {
newObject[value] = [];
}
newObject[value].push(key);
}
}
return newObject;
}
module.exports = function createKernel(configuration) {
return Promise.try(() => {
return initialize({
knexfile: {
client: "pg",
connection: configuration.database,
pool: { min: 0, max: 32 },
migrations: { tableName: "srap_knex_migrations" }
module.exports = async function createKernel(_configuration) {
let configuration = validateOptions(arguments, isValidConfiguration);
let state = {
... createPrometheus(),
tasks: generateTaskGraph({
tags: configuration.tags,
tasks: configuration.tasks
})
};
let { metrics, tasks } = state;
const createBackend = require("./database-backends")(state);
let attachToGlobalRateLimit = (configuration.taskInterval != null)
? rateLimit.clonable(configuration.taskInterval)
: undefined;
let backend = await createBackend({
backend: configuration.backend,
options: configuration.database
});
Object.assign(state, { backend: backend });
const createTaskKernel = require("./task-kernel")(state);
function checkLockedTasks() {
return Promise.try(() => {
return backend.topLevel.countLockedTasks();
}).then((lockedCount) => {
if (lockedCount > 0) {
console.log(`${chalk.bold.red("WARNING:")} There are ${lockedCount} tasks currently locked, and they will not be run! This may be caused by a process crash in the past. See the documentation for more details on how to solve this issue.`);
}
});
}).then((state) => {
const queries = require("./queries")(state);
const createTaskStream = require("./task-stream")(state);
const createDatabaseQueue = require("./queued-database-api")(state);
}
let { knex, prometheusRegistry, metrics } = state;
let { dependencyMap, dependentMap } = createDependencyMap(configuration);
function insertSeeds() {
return Promise.map(configuration.seed, (item) => {
return queries.createItem(knex, {
... item,
allowUpsert: false,
failIfExists: false
});
});
let databasePreparePromise;
async function prepareDatabase() {
if (databasePreparePromise == null) {
databasePreparePromise = Promise.all([
checkLockedTasks(),
backend.topLevel.insertSeeds(configuration.seed)
]);
}
function checkLockedTasks() {
return Promise.try(() => {
return queries.countLockedTasks(knex);
}).then((lockedCount) => {
if (lockedCount > 0) {
console.log(`${chalk.bold.red("WARNING:")} There are ${lockedCount} tasks currently locked, and they will not be run! This may be caused by a process crash in the past. See the documentation for more details on how to solve this issue.`);
}
});
}
return databasePreparePromise;
}
function runTaskStreams() {
let tasks = invertMapping(configuration.tags);
let attachToGlobalRateLimit = (configuration.taskInterval != null)
? rateLimit.clonable(configuration.taskInterval)
: undefined;
console.log(`Starting ${Object.keys(tasks).length} tasks...`);
return Promise.map(Object.entries(tasks), ([ task, tags ]) => {
let taskConfiguration = configuration.tasks[task];
if (taskConfiguration != null) {
let taskStream = createTaskStream({
task: task,
tags: tags,
taskVersion: defaultValue(taskConfiguration.version, "0"),
taskInterval: taskConfiguration.taskInterval,
parallelTasks: taskConfiguration.parallelTasks,
ttl: taskConfiguration.ttl,
run: taskConfiguration.run,
globalRateLimiter: (attachToGlobalRateLimit != null)
? attachToGlobalRateLimit()
: null,
globalParallelize: (configuration.parallelTasks != null)
? parallelize(configuration.parallelTasks)
: null,
taskDependencies: dependencyMap[task],
taskDependents: dependentMap[task]
});
return {
run: async function runKernel() {
console.log(`Starting ${tasks.size} tasks...`);
return pipe([
taskStream,
simpleSink((completedItem) => {
await prepareDatabase();
return Promise.map(tasks.values(), (task) => {
return pipe([
createTaskKernel(task),
simpleSink(({ status, item, error }) => {
if (status === "completed") {
metrics.successfulItems.inc(1);
metrics.successfulItems.labels({ task: task }).inc(1);
logStatus(task, chalk.bold.green, "completed", completedItem.id);
})
]).read();
} else {
throw new Error(`Task '${task}' is defined to run for tags [${tags}], but no such task is defined`);
}
}).catch((error) => {
console.dir(error, { depth: null, colors: true });
throw error;
} else if (status === "failed") {
metrics.failedItems.inc(1);
metrics.failedItems.labels({ task: task }).inc(1);
} else {
unreachable(`Unrecognized status '${status}'`);
}
})
]).read();
});
},
simulate: async function simulate({ itemID, task }) {
await prepareDatabase();
let simulatedBackend = backend.simulate();
return simulateTask(itemID, task);
},
execute: async function simulate({ itemID, task }) {
await prepareDatabase();
return executeTask(itemID, task);
},
shutdown: function () {
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
return backend.shutdown();
},
getMetrics: function () {
return Promise.try(() => {
return state.prometheusRegistry.metrics();
}).then((metrics) => {
return {
contentType: state.prometheusRegistry.contentType,
metrics: metrics
};
});
}
function executeTask(id, task) {
let taskConfiguration = configuration.tasks[task];
};
return knex.transaction((tx) => {
return Promise.try(() => {
return queries.getItem(knex, id);
}).then((item) => {
let queue = createDatabaseQueue({
tx,
item,
task,
taskVersion: defaultValue(taskConfiguration.version, "0"),
taskDependents: dependentMap[task],
taskDependencies: dependencyMap[task]
});
return Promise.try(() => {
return taskConfiguration.run({
id: item.id,
data: item.data,
getItem: function (id) {
return queries.getItem(knex, id);
},
... queue.api
});
}).then(() => {
return queue.execute();
});
});
}, { doNotRejectOnRollback: false });
}
function runTaskStreams() {
function simulateTask(id, task) {
return Promise.map(Object.entries(tasks), ([ task, tags ]) => {
let taskConfiguration = configuration.tasks[task];
let methods = [ "createItem", "renameItem", "mergeItem", "deleteItem", "createAlias", "deleteAlias", "updateData", "updateMetadata", "expire", "expireDependents" ];
if (taskConfiguration != null) {
let taskStream = createTaskStream({
task: task,
tags: tags,
taskVersion: defaultValue(taskConfiguration.version, "0"),
taskInterval: taskConfiguration.taskInterval,
parallelTasks: taskConfiguration.parallelTasks,
ttl: taskConfiguration.ttl,
run: taskConfiguration.run,
globalRateLimiter: (attachToGlobalRateLimit != null)
? attachToGlobalRateLimit()
: null,
globalParallelize: (configuration.parallelTasks != null)
? parallelize(configuration.parallelTasks)
: null,
taskDependencies: dependencyMap[task],
taskDependents: dependentMap[task]
});
let simulatedMethods = syncpipe(methods, [
(_) => _.map((method) => [ method, function() {
console.log(`${chalk.bold.yellow.bgBlack(`${method} (simulated):`)} ${util.inspect(arguments, { colors: true, depth: null })}`);
}]),
(_) => Object.fromEntries(_)
]);
return pipe([
taskStream,
simpleSink((completedItem) => {
metrics.successfulItems.inc(1);
metrics.successfulItems.labels({ task: task }).inc(1);
logStatus(task, chalk.bold.green, "completed", completedItem.id);
})
]).read();
} else {
throw new Error(`Task '${task}' is defined to run for tags [${tags}], but no such task is defined`);
}
}).catch((error) => {
console.dir(error, { depth: null, colors: true });
throw error;
});
}
function executeTask(id, task) {
let taskConfiguration = configuration.tasks[task];
return knex.transaction((tx) => {
return Promise.try(() => {
return queries.getItem(knex, id);
}).then((item) => {
return taskConfiguration.run({
id: item.id,
data: item.data,
getItem: function (id) {
return queries.getItem(knex, id);
},
... simulatedMethods
let queue = createDatabaseQueue({
tx,
item,
task,
taskVersion: defaultValue(taskConfiguration.version, "0"),
taskDependents: dependentMap[task],
taskDependencies: dependencyMap[task]
});
});
}
return {
run: function runKernel() {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return runTaskStreams();
});
},
simulate: function simulate({ itemID, task }) {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return simulateTask(itemID, task);
});
},
execute: function simulate({ itemID, task }) {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
return taskConfiguration.run({
id: item.id,
data: item.data,
getItem: function (id) {
return queries.getItem(knex, id);
},
... queue.api
});
}).then(() => {
return executeTask(itemID, task);
});
},
shutdown: function () {
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
knex.destroy();
},
getMetrics: function () {
return Promise.try(() => {
return prometheusRegistry.metrics();
}).then((metrics) => {
return {
contentType: prometheusRegistry.contentType,
metrics: metrics
};
});
}
};
});
return queue.execute();
});
});
}, { doNotRejectOnRollback: false });
}
function simulateTask(id, task) {
let taskConfiguration = configuration.tasks[task];
let methods = [ "createItem", "renameItem", "mergeItem", "deleteItem", "createAlias", "deleteAlias", "updateData", "updateMetadata", "expire", "expireDependents" ];
let simulatedMethods = syncpipe(methods, [
(_) => _.map((method) => [ method, function() {
console.log(`${chalk.bold.yellow.bgBlack(`${method} (simulated):`)} ${util.inspect(arguments, { colors: true, depth: null })}`);
}]),
(_) => Object.fromEntries(_)
]);
return Promise.try(() => {
return queries.getItem(knex, id);
}).then((item) => {
return taskConfiguration.run({
id: item.id,
data: item.data,
getItem: function (id) {
return queries.getItem(knex, id);
},
... simulatedMethods
});
});
}
return {
run: function runKernel() {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return runTaskStreams();
});
},
simulate: function simulate({ itemID, task }) {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return simulateTask(itemID, task);
});
},
execute: function simulate({ itemID, task }) {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return executeTask(itemID, task);
});
},
shutdown: function () {
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
knex.destroy();
},
getMetrics: function () {
return Promise.try(() => {
return prometheusRegistry.metrics();
}).then((metrics) => {
return {
contentType: prometheusRegistry.contentType,
metrics: metrics
};
});
}
};
};

@ -1,61 +0,0 @@
"use strict";
const defaultValue = require("default-value");
const chalk = require("chalk");
const logStatus = require("../log-status");
module.exports = function (state) {
const queries = require("../queries")(state);
return function createDatabaseMutationAPI({ tx, item, taskVersion, task }) {
return {
createItem: function (options) {
logStatus(task, chalk.gray, "new", options.id);
return queries.createItem(tx, {
...options,
parentID: item.id
});
},
renameItem: function (options) {
return queries.renameItem(tx, options);
},
mergeItem: function (options) {
// FIXME: Move default
return queries.mergeItem(tx, {
...options,
from: defaultValue(options.from, item.id)
});
},
deleteItem: function (options) {
return queries.deleteItem(tx, {
id: options.id
});
},
createAlias: function (options) {
// FIXME: Move default
return queries.createAlias(tx, {
...options,
to: defaultValue(options.to, item.id)
});
},
deleteAlias: function (from) {
return queries.deleteAlias(tx, {
from: from
});
},
updateData: function (options) {
return queries.updateData(tx, options);
},
updateMetadata: function (options) {
return queries.updateMetadata(tx, {
... options,
taskVersion: taskVersion
});
},
expire: function (options) {
return queries.expire(tx, options);
}
};
};
};

@ -1,79 +0,0 @@
"use strict";
const Promise = require("bluebird");
const chalk = require("chalk");
const util = require("util");
function logCall(methodName, args) {
console.log(`${chalk.bold.yellow.bgBlack(`${methodName} (simulated):`)} ${util.inspect(args, { colors: true, depth: null })}`);
}
// TODO: Make this do an actual database query and then rollback; that way the behaviour is the same as when really modifying the DB, in that earlier operations can affect what later operations see (eg. a createItem followed by a mergeItem involving that new item).
module.exports = function (state) {
const queries = require("../queries")(state);
return function createSimulationMutationAPI({ tx, item, taskVersion }) {
return {
createItem: function (options) {
logCall("createItem", {
... options,
update: (options.update != null)
? options.update(item.data)
: undefined
});
},
renameItem: function (options) {
logCall("renameItem", {
... options
});
},
mergeItem: function (options) {
// FIXME: Visualize merges
logCall("createItem", {
... options
});
},
deleteItem: function (options) {
logCall("deleteItem", {
... options
});
},
createAlias: function (options) {
logCall("createAlias", {
... options
});
},
deleteAlias: function (from) {
logCall("deleteAlias", {
from: from
});
},
updateData: function (options) {
logCall("updateData", {
... options,
update: (options.update != null)
? options.update(item.data)
: undefined
});
},
updateMetadata: function (options) {
return Promise.try(() => {
return queries.getItem(tx, id);
}).then((item) => {
// FIXME: Visualize metadata update, actually merge with the correct thing
// MARKER: Expose taskResults as an object mapping instead of an array, somehow
logCall("updateMetadata", {
... options,
update: (options.update != null)
? options.update(item.data)
: undefined
});
});
},
expire: function (options) {
return queries.expire(tx, options);
}
};
};
};

@ -1,111 +0,0 @@
"use strict";
const Promise = require("bluebird");
const wrapValueAsOption = require("@validatem/wrap-value-as-option");
const required = require("@validatem/required");
const isString = require("@validatem/is-string");
const defaultTo = require("@validatem/default-to");
const validateOptions = require("@validatem/core/src/api/validate-options");
const isFunction = require("@validatem/is-function");
const arrayOf = require("@validatem/array-of");
const defaultValue = require("default-value");
// FIXME: Remaining validators
module.exports = function wrapMutationAPI({ item, task, taskDependents }, api) {
return {
createItem: function (options) {
// FIXME: Require tags to be set, even if to an empty array, to avoid accidentally forgetting the tags
return api.createItem(options);
},
renameItem: function (_options) {
let options = validateOptions(arguments, [
wrapValueAsOption("to"), {
to: [ required, isString ],
from: [ defaultTo(item.id), isString ]
}
]);
return api.renameItem(options);
},
mergeItem: function (options) {
return api.mergeItem(options);
},
deleteItem: function (_options) {
let options = validateOptions(arguments, [
defaultTo({}),
wrapValueAsOption("id"), {
id: [ defaultTo(item.id), isString ]
}
]);
return api.deleteItem(options);
},
createAlias: function (options) {
// TODO: Single-parameter variant?
return api.createAlias(options);
},
deleteAlias: function (from) {
// FIXME: options wrapper
return api.deleteAlias(from);
},
updateData: function (_options) {
let options = validateOptions(arguments, [
wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ]
}
]);
return api.updateData(options);
},
updateMetadata: function (_options) {
let options = validateOptions(arguments, [
wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
taskName: [ defaultTo(task), isString ],
update: [ required, isFunction ]
}
]);
return api.updateMetadata(options);
},
expire: function (_options) {
let options = validateOptions(arguments, [
defaultTo({}), {
id: [ defaultTo(item.id), isString ],
taskName: [ defaultTo(task), isString ]
}
]);
return api.expire(options);
},
expireDependents: function (_options) {
let options = validateOptions(arguments, [
defaultTo({}),
wrapValueAsOption("dependents"), {
id: [ defaultTo(item.id), isString ],
dependents: [ arrayOf(isString) ]
}
]);
let selectedDependents = (options.dependents != null)
? new Set(options.dependents)
: null;
let allDependents = defaultValue(taskDependents, []);
let affectedDependents = (selectedDependents != null)
? allDependents.filter((dependent) => selectedDependents.has(dependent.task))
: allDependents;
return Promise.map(affectedDependents, (dependent) => {
return this.expire({
id: options.id,
taskName: dependent.task
});
});
}
};
};

@ -1,26 +1,12 @@
"use strict";
const Promise = require("bluebird");
const path = require("path");
const knex = require("knex");
const { knexSnakeCaseMappers } = require("objection");
const prometheusClient = require("prom-client");
const models = require("./models");
let migrationsFolder = path.join(__dirname, "../migrations");
module.exports = function initialize({ knexfile }) {
module.exports = function createPrometheus() {
let prometheusRegistry = new prometheusClient.Registry();
prometheusClient.collectDefaultMetrics({ register: prometheusRegistry });
let knexInstance = knex({
... knexfile,
... knexSnakeCaseMappers()
});
let state = {
knex: knexInstance,
return {
prometheusRegistry: prometheusRegistry,
metrics: {
storedItems: new prometheusClient.Counter({
@ -53,18 +39,7 @@ module.exports = function initialize({ knexfile }) {
help: "Amount of new scraping tasks fetched during the most recent attempt",
labelNames: [ "task" ]
})
// FIXME: Measure queue-refill task
}
};
return Promise.try(() => {
return knexInstance.migrate.latest({
directory: migrationsFolder
});
}).then(() => {
return {
... state,
db: models(state)
};
});
};

@ -1,487 +0,0 @@
"use strict";
const Promise = require("bluebird");
// const { UniqueViolationError } = require("objection");
const dateFns = require("date-fns");
const { validateArguments } = require("@validatem/core");
const required = require("@validatem/required");
const requireEither = require("@validatem/require-either");
const isString = require("@validatem/is-string");
const isBoolean = require("@validatem/is-boolean");
const isFunction = require("@validatem/is-function");
const isNumber = require("@validatem/is-number");
const isDate = require("@validatem/is-date");
const arrayOf = require("@validatem/array-of");
const defaultTo = require("@validatem/default-to");
const anyProperty = require("@validatem/any-property");
const anything = require("@validatem/anything");
const ValidationError = require("@validatem/error");
const pipe = require("@promistream/pipe");
const combineSequentialStreaming = require("@promistream/combine-sequential-streaming");
const fromIterable = require("@promistream/from-iterable");
const fromNodeStream = require("@promistream/from-node-stream");
const createTypeTaggingStream = require("./streams/tag-type");
const { addSeconds } = require("date-fns");
const syncpipe = require("syncpipe");
const defaultValue = require("default-value");
function isTX(value) {
if (value.where == null || value.raw == null) {
throw new ValidationError(`Must be a valid Knex or Knex transaction instance`);
}
}
function noop() {}
function taskResultsToObject(taskResults) {
return syncpipe(taskResults, [
(_) => _.map((result) => [ result.taskName, result.metadata ]),
(_) => Object.fromEntries(_)
]);
}
module.exports = function ({ db, knex, metrics }) {
return {
// FIXME: Make object API instead
getItem: function (_tx, _id, _optional) {
let [ tx, id, optional ] = validateArguments(arguments, {
tx: [ required, isTX ],
id: [ required, isString ],
optional: [ defaultTo(false), isBoolean ]
});
return Promise.try(() => {
return db.Alias.relatedQuery("item", tx)
.for(id)
.withGraphFetched("taskResults");
}).then((results) => {
if (optional === true || results.length > 0) {
return results[0];
} else {
throw new Error(`No item exists with ID '${id}'`);
}
});
},
createItem: function (_tx, _options) {
// NOTE: Using `update` instead of `data` makes it an upsert!
// FIXME: Make failIfExists actually work, currently it does nothing as the UNIQUE constraint violation cannot occur for an upsert
let [ tx, { id, tags, aliases, data, update, failIfExists, allowUpsert, parentID }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
id: [ required, isString ],
tags: [ defaultTo([]), arrayOf(isString) ],
aliases: [ defaultTo([]), arrayOf(isString) ],
data: [ anything ], // FIXME: Check for object
update: [ isFunction ],
failIfExists: [ defaultTo(false), isBoolean ],
allowUpsert: [ defaultTo(true), isBoolean ],
parentID: [ isString ]
}, requireEither([ "data", "update" ]) ]
});
// FIXME: Ensure that we run the transaction in full isolation mode, and retry in case of a conflict
return Promise.try(() => {
// NOTE: We look up by alias, since this is an upsert - and so if the specified ID already exists as an alias, we should update the existing item instead of creating a new one with the specified (aliased) ID
return db.Alias
.relatedQuery("item", tx)
.for(id);
}).then((existingItems) => {
let existingItem = existingItems[0];
let actualID = (existingItem != null)
? existingItem.id
: id;
let existingData = (existingItem != null)
? existingItem.data
: {};
let newData = (update != null)
? update(existingData)
: { ... existingData, ... data };
// Make sure to add a self:self alias
let allAliases = aliases.concat([ actualID ]);
let newItem = {
id: actualID,
data: newData,
createdBy: parentID,
tags: tags.map((tag) => ({ name: tag })),
aliases: allAliases.map((alias) => ({ alias: alias })),
updatedAt: new Date()
};
return Promise.try(() => {
if (allowUpsert) {
// NOTE: We *always* do upserts here, even if the user specified `data` rather than `update`, because tags and aliases should always be added even if the item itself already exists. We trust the user not to accidentally reuse IDs between different kinds of objects (which would break in various other ways anyway).
return db.Item.query(tx).upsertGraph(newItem, {
insertMissing: true,
noDelete: true
});
} else {
return db.Item.query(tx).insertGraph(newItem, {
insertMissing: true
});
}
}).tap(() => {
// FIXME: We should probably move the metrics stuff to the wrapper instead, so that it works for *any* backend
metrics.storedItems.inc(1);
// TODO: This currently produces somewhat misleading metrics; it only counts *explicitly specified* tags. That will *mostly* correlate to amount of genuinely-new items, but not perfectly. In the future, we should probably refactor the insertion code such that it is aware of the *current* tags of an item that it is about to merge into - but maybe that should be delayed until the zapdb migration.
if (newItem.tags != null) {
for (let tag of newItem.tags) {
metrics.storedItems.labels({ tag: tag.name }).inc(1);
}
}
});
}).catch({ name: "UniqueViolationError", table: "srap_items" }, (error) => {
if (failIfExists) {
throw error;
} else {
// Do nothing, just ignore the failure
}
});
},
renameItem: function (_tx, _options) {
// options == to || { from, to }
let [ tx, { to, from }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
to: [ required, isString ],
from: [ required, isString ]
}]
});
return Promise.all([
db.Item.query(tx).findById(from).patch({ id: to }),
this.createAlias(tx, { from: to, to: to })
]);
},
repointAliases: function (_tx, _options) {
// { from, to }
let [ tx, { to, from }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
to: [ required, isString ],
from: [ required, isString ]
}]
});
return db.Alias.query(tx)
.patch({ itemId: to, updatedAt: new Date() })
.where({ itemId: from });
},
mergeItem: function (_tx, _options) {
// options = { from, into, merge, mergeMetadata{} }
let [ tx, { from, into, merge, mergeMetadata }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
from: [ required, isString ],
into: [ required, isString ],
merge: [ required, isFunction ],
mergeMetadata: [ defaultTo({}), anyProperty({
key: [ required ],
value: [ required, isFunction ]
})],
}]
});
return Promise.all([
this.getItem(tx, from, true),
this.getItem(tx, into, true),
]).then(([ fromObj, intoObj ]) => {
if (fromObj != null) {
let defaultedIntoObj = defaultValue(intoObj, {
id: into,
data: {},
taskResults: []
});
let newData = merge(defaultedIntoObj.data, fromObj.data);
let fromTaskResults = taskResultsToObject(fromObj.taskResults);
let intoTaskResults = taskResultsToObject(defaultedIntoObj.taskResults);
// FIXME: Deduplicate function
let allTaskKeys = Array.from(new Set([
... Object.keys(fromTaskResults),
... Object.keys(intoTaskResults)
]));
function selectNewestResult(taskA, taskB) {
if (taskA == null) {
return taskB;
} else if (taskB == null) {
return taskA;
} else if (taskA.updatedAt > taskB.updatedAt) {
return taskA;
} else {
return taskB;
}
}
// TODO: Use merge-by-template here instead?
let newTaskResults = allTaskKeys.map((key) => {
let merger = mergeMetadata[key];
let fromTask = fromTaskResults[key];
let intoTask = intoTaskResults[key];
if (merger != null) {
// Generate a new TaskResult that includes data combined from both
let newMetadata = merger(
defaultValue(intoTask.metadata, {}),
defaultValue(fromTask.metadata, {})
);
return {
... intoTask,
metadata: newMetadata,
updatedAt: Date.now()
};
} else {
// Take the newest known TaskResult and just make sure that it is pointing at the correct ID
return {
... selectNewestResult(intoTask, fromTask),
itemId: defaultedIntoObj.id
};
}
});
let upsertOptions = {
insertMissing: true,
noDelete: true
};
return Promise.try(() => {
// NOTE: Cannot use into.$query here because that adds an implicit query builder operation, which upsertGraph does not allow
return db.Item.query(tx).upsertGraph({
id: defaultedIntoObj.id,
data: newData,
taskResults: newTaskResults
}, upsertOptions);
}).then(() => {
// NOTE: Repointing aliases has the side-effect of leaving a redirect from the source to the destination item, as each item has a self:self alias
return this.repointAliases(tx, { from: fromObj.id, to: intoObj.id });
}).then(() => {
// NOTE: We don't use this.deleteItem, to sidestep any alias lookups
return db.Item.query(tx).findById(fromObj.id).delete();
});
}
});
},
deleteItem: function (_tx, _options) {
// options = none || { id }
let [ tx, { id }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
id: [ required, isString ]
}]
});
return db.Alias.relatedQuery("item", tx)
.for(id)
.delete();
// return db.Item.query(tx).findById(id).delete();
},
createAlias: function (_tx, _options) {
// options = { from, to, failIfExists }
let [ tx, { from, to, failIfExists }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
from: [ required, isString ],
to: [ required, isString ],
failIfExists: [ defaultTo(false), isBoolean ] // TODO: Shouldn't this default to true, for any occurrence outside of a merge/rename?
}]
});
// Isolate this operation into a savepoint so that it can fail without breaking the entire transaction
let promise = tx.transaction((tx) => {
return db.Alias.query(tx).insert({
alias: from,
itemId: to,
updatedAt: new Date()
});
});
if (failIfExists) {
return promise;
} else {
return Promise.resolve(promise)
.catch({ name: "UniqueViolationError" }, noop);
}
},
deleteAlias: function (_tx, _options) {
let [ tx, { from }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
from: [ required, isString ]
}]
});
// TODO: This cannot yet be propagated to the update feed, because we don't keep a record of deletions
return db.Alias.query(tx).findById(from).delete();
},
updateData: function (_tx, _options) {
// options = update || { id, update }
let [ tx, { id, update }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
id: [ required, isString ],
update: [ required, isFunction ]
}]
});
// TODO: Figure out the proper delineation between 'creating' and 'updating' an item
return this.createItem(tx, { id, update });
},
updateMetadata: function (_tx, _options) {
// options = update || { id, update, taskName }
let [ tx, { id, update, taskName, taskVersion, ttl }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
id: [ required, isString ],
update: [ required, isFunction ],
taskName: [ required, isString ],
taskVersion: [ required, isString ],
ttl: [ isNumber ]
}]
});
// TODO: failIfExists
// FIXME: metadata_updated_at
return Promise.try(() => {
return db.Alias.query(tx).findById(id);
}).then((alias) => {
let sharedFields = {
isSuccessful: true,
isInvalidated: false,
taskVersion: taskVersion,
updatedAt: new Date(),
expiresAt: (ttl != null)
? addSeconds(new Date(), ttl)
: undefined
};
if (alias != null) {
return Promise.try(() => {
return db.TaskResult.query(tx).findById([ taskName, alias.itemId ]);
}).then((taskResult) => {
if (taskResult != null) {
return taskResult.$query(tx).patch({
... sharedFields,
metadata: update(taskResult.metadata),
});
} else {
return db.TaskResult.query(tx).insert({
... sharedFields,
task: taskName,
itemId: id,
metadata: update({})
});
}
});
}
});
},
expire: function (_tx, _options) {
// options = none || { id, taskName }
let [ tx, { id, taskName }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [{
id: [ required, isString ],
taskName: [ required, isString ]
}]
});
return Promise.try(() => {
return db.Alias.query(tx).findById(id);
}).then((alias) => {
return db.TaskResult.query(tx)
.where({ task: taskName, itemId: alias.itemId })
.patch({ isInvalidated: true });
});
},
setTTL: function (options) {
// options = ttl || { id, taskName, ttl }
// FIXME
},
allowFailure: function (allowed) {
},
log: function (category, message) {
},
countLockedTasks: function (tx) {
return Promise.try(() => {
return db.TaskInProgress.query(tx).count({ count: "*" });
}).then((result) => {
return result[0].count;
});
},
getUpdates: function (_tx, _options) {
// NOTE: This returns snake_cased keys! As we're bypassing the Objection internals, no casemapping occurs.
let [ tx, { timestamp, prefix }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [ defaultTo({}), {
timestamp: [ isDate ],
prefix: [ isString ]
}]
});
// NOTE: This is a hacky workaround - if we don't do this, then for some reason also entries *at* the exact timestamp are included, which is not what we want.
// FIXME: Verify that this doesn't break anything, eg. when an entry is created inbetween the original timestamp and +1ms.
let actualTimestamp = (timestamp != null)
? dateFns.addMilliseconds(timestamp, 1)
: undefined;
function applyWhereClauses(query, idField) {
if (timestamp != null) {
// FIXME: An error in the query here throws an error, resulting in an abort handling bug in a promistream
query = query.whereRaw(`updated_at > ?`, [ actualTimestamp ]);
}
if (prefix != null) {
query = query.whereRaw(`${idField} LIKE ?`, [ `${prefix.replace(/%/g, "\\%")}%` ]);
}
return query;
}
function* streamGenerator() {
yield pipe([
fromNodeStream.fromReadable(
applyWhereClauses(db.Item.query(tx), "id").toKnexQuery().stream()
),
createTypeTaggingStream("item")
]);
yield pipe([
fromNodeStream.fromReadable(
// NOTE: We are only interested in aliases which don't point at themselves
applyWhereClauses(db.Alias.query(tx).where("alias", "!=", knex.ref("item_id")), "alias").toKnexQuery().stream()
),
createTypeTaggingStream("alias")
]);
yield pipe([
fromNodeStream.fromReadable(
applyWhereClauses(db.TaskResult.query(tx), "item_id").toKnexQuery().stream()
),
createTypeTaggingStream("taskResult")
]);
}
return pipe([
fromIterable(streamGenerator()),
combineSequentialStreaming()
]);
}
};
};

@ -0,0 +1,61 @@
"use strict";
const defaultValue = require("default-value");
const syncpipe = require("syncpipe");
const arrayUnique = require("../util/array-unique");
const pickTaskResult = require("./pick-task-result");
function taskResultsToObject(taskResults) {
return syncpipe(taskResults, [
(_) => _.map((result) => [ result.taskName, result.metadata ]),
(_) => Object.fromEntries(_)
]);
}
module.exports = function mergeItems({ fromObj, intoObj, merge, mergeMetadata }) {
let mergedData = merge(intoObj.data, fromObj.data);
let fromTaskResults = taskResultsToObject(fromObj.taskResults);
let intoTaskResults = taskResultsToObject(intoObj.taskResults);
let allTaskKeys = arrayUnique([
... Object.keys(fromTaskResults),
... Object.keys(intoTaskResults)
]);
// TODO: Use merge-by-template here instead?
let mergedTaskResults = allTaskKeys.map((key) => {
let merger = mergeMetadata[key];
let fromTask = fromTaskResults[key];
let intoTask = intoTaskResults[key];
if (merger != null) {
// Generate a new TaskResult that includes data combined from both
let newMetadata = merger(
defaultValue(intoTask.metadata, {}),
defaultValue(fromTask.metadata, {})
);
return {
... intoTask,
metadata: newMetadata,
updatedAt: Date.now()
};
} else {
// Take the newest known TaskResult and just make sure that it is pointing at the correct ID
// TODO: Does this really belong in the semantics module? Or is this database-specific?
return {
... pickTaskResult(intoTask, fromTask),
itemId: intoObj.id
};
}
});
return {
id: intoObj.id,
data: mergedData,
taskResults: mergedTaskResults
};
};

@ -0,0 +1,13 @@
"use strict";
module.exports = function pickTaskResult(taskA, taskB) {
if (taskA == null) {
return taskB;
} else if (taskB == null) {
return taskA;
} else if (taskA.updatedAt > taskB.updatedAt) {
return taskA;
} else {
return taskB;
}
};

@ -7,33 +7,28 @@ const pipe = require("@promistream/pipe");
const map = require("@promistream/map");
const mapFilter = require("@promistream/map-filter");
module.exports = function ({ db, knex }) {
module.exports = function ({ backend }) {
return function processTaskSafely(task, processHandler) {
let lockStream = mapFilter((item) => {
return Promise.try(() => {
return db.TaskInProgress.query(knex).insert({
task: task,
item_id: item.id
});
}).then(() => {
return item;
}).catch({ name: "UniqueViolationError" }, () => {
return mapFilter.NoValue;
return backend.lock(null, { id: item.id, task: task });
}).then((success) => {
if (success) {
return item;
} else {
return mapFilter.NoValue;
}
});
});
let processUnlockStream = map((item) => {
return Promise.try(() => {
return knex.transaction((tx) => {
return backend.runInTransaction((tx) => {
return processHandler(item, tx);
}, { doNotRejectOnRollback: false });
});
}).finally(() => {
return db.TaskInProgress.query(knex)
.delete()
.where({
task: task,
item_id: item.id
});
// NOTE: The unlock deliberately happens outside of a transaction, so that it can always succeed, even if a task and its associated database changes failed
return backend.unlock(null, { id: item.id, task: task });
}).then(() => {
return item;
});

@ -0,0 +1,53 @@
"use strict";
const chalk = require("chalk");
const pipe = require("@promistream/pipe");
const filter = require("@promistream/filter");
const map = require("@promistream/map");
const logStatus = require("./log-status");
module.exports = function ({ backend }) {
function runTask(task, item) {
let queue = [];
let api = backend.forItem({ task: task, id: item.id, mutationQueue: queue });
return Promise.try(() => {
logStatus(task, chalk.bold.cyan, "started", item.id);
// NOTE: We only pass in the item data itself, *not* any associated metadata like tags. If the scraping task wants access to that sort of information, it should do a `getItem` call from within its task logic where needed.
// FIXME: Is that actually still true post-refactor?
task.run({
data: item.data,
... api.exposed
});
}).then(() => {
return backend.topLevel.runInTransaction((tx) => {
// FIXME: use queue
});
}).then(async () => {
await api.internal.markTaskCompleted();
logStatus(task, chalk.bold.green, "completed", item.id);
return { status: "completed", item: item };
}).catch(async (error) => {
await api.internal.markTaskFailed(null, { error });
logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`);
return { status: "failed", item: item, error: error };
});
}
return function createTaskKernelStream(task) {
return pipe([
backend.topLevel.getTaskStream(null, { task: task }),
filter((item) => backend.forItem({ task: task, id: item.id }).internal.lock()),
map((item) => {
return Promise.try(() => {
return runTask(task, item);
}).tap(() => {
return backend.forItem({ task: task, id: item.id }).internal.unlock();
});
})
]);
};
};

@ -1,201 +0,0 @@
"use strict";
const Promise = require("bluebird");
const ms = require("ms");
const dateFns = require("date-fns");
const debug = require("debug")("scrapingserver");
const chalk = require("chalk");
const simpleSource = require("@promistream/simple-source");
const buffer = require("@promistream/buffer");
const pipe = require("@promistream/pipe");
const rateLimit = require("@promistream/rate-limit");
const parallelize = require("@promistream/parallelize");
const logStatus = require("./log-status");
// const { UniqueViolationError } = require("objection");
// FIXME: Revert inlining of task_states once switched to PostgreSQL 12+, which can do this automatically using NOT MATERIALIZED
// FIXME: Check whether the dependency task_versions are actually being correctly passed in, and aren't accidentally nulls
let query = `
WITH
dependency_tasks AS (
SELECT * FROM
json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text)
),
matching_items AS (
SELECT
DISTINCT ON (srap_items.id)
srap_items.*,
results.updated_at AS result_date,
results.task_version,
(
results.is_successful = TRUE
AND (
results.expires_at < NOW()
OR results.is_invalidated = TRUE
)
) AS is_candidate
FROM srap_items
INNER JOIN srap_tags
ON srap_tags.item_id = srap_items.id
AND srap_tags.name = ANY(:tags)
LEFT JOIN srap_task_results AS results
ON results.item_id = srap_items.id
AND results.task = :task
WHERE
NOT EXISTS (
SELECT FROM srap_tasks_in_progress AS pr WHERE pr.item_id = srap_items.id
)
),
candidates AS (
SELECT * FROM matching_items
WHERE result_date IS NULL
UNION
SELECT * FROM matching_items
WHERE is_candidate = TRUE
OR NOT (task_version = :taskVersion)
)
(
SELECT
*
FROM
candidates
WHERE
NOT EXISTS (
SELECT
results.*
FROM dependency_tasks
LEFT JOIN srap_task_results AS results
ON dependency_tasks.task = results.task
AND dependency_tasks.task_version = results.task_version
AND results.item_id = candidates.id
WHERE
results.is_successful IS NULL
OR results.is_successful = FALSE
OR (
results.is_successful = TRUE
AND (
results.expires_at < NOW()
OR results.is_invalidated = TRUE
)
)
)
) LIMIT :resultLimit;
`;
module.exports = function (state) {
const processTaskSafely = require("./streams/process-task-safely")(state);
const queries = require("./queries")(state);
const createDatabaseQueue = require("./queued-database-api")(state);
let { knex, db, metrics } = state;
// FIXME: Transaction support!
return function createTaskStream({ task, taskVersion, taskDependencies, taskDependents, taskInterval, tags, run, ttl, globalRateLimiter, globalParallelize, parallelTasks }) {
// TODO: Make nicer
let ttlInSeconds = (ttl != null)
? (typeof ttl === "number")
? ttl / 1000
: ms(ttl) / 1000
: undefined;
return pipe([
simpleSource(() => {
let startTime = Date.now();
return Promise.try(() => {
// console.log("Fetching new batch");
return knex.raw(query, {
tags: tags,
task: task,
taskVersion: taskVersion,
resultLimit: 1000, // TODO: Make configurable
dependencyTaskDefinitions: JSON.stringify(taskDependencies.map((dependency) => {
// Case-mapping for SQL compatibility
return { task_version: dependency.taskVersion, task: dependency.task };
}))
});
}).then((result) => {
let timeElapsed = Date.now() - startTime;
metrics.taskFetchTime.labels({ task: task }).set(timeElapsed / 1000);
metrics.taskFetchResults.labels({ task: task }).set(result.rowCount);
debug(`Task retrieval query for '${task}' took ${timeElapsed}ms and produced ${result.rowCount} results`);
if (result.rowCount > 0) {
// console.log("rows:", result.rows);
return result.rows;
} else {
// FIXME: Make this delay configurable, or maybe even use LISTEN/NOTIFY
return Promise.resolve([]).delay(30000);
}
});
}),
buffer(),
globalRateLimiter,
(taskInterval != null)
? rateLimit(taskInterval)
: null,
processTaskSafely(task, (item, tx) => {
logStatus(task, chalk.bold.cyan, "started", item.id);
let queue = createDatabaseQueue({ tx, item, task, taskVersion, taskDependents, taskDependencies });
return Promise.try(() => {
// TODO: Proper Validatem schemas for each API method
return run({
id: item.id,
data: item.data,
getItem: function (id) {
return queries.getItem(tx, id);
},
... queue.api
});
}).then(() => {
return queue.execute();
}).then(() => {
// Update succeeded
return db.TaskResult.query(tx).findById([ task, item.id ]).patch({
is_successful: true,
updated_at: new Date(),
expires_at: (ttlInSeconds != null)
? dateFns.add(new Date(), { seconds: ttlInSeconds })
: null
});
}).catch((error) => {
metrics.failedItems.inc(1);
metrics.failedItems.labels({ task: task }).inc(1);
logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`);
let commonUpdate = {
is_successful: false,
task_version: taskVersion
};
return Promise.try(() => {
// Task failed -- note, cannot use tx here because it has failed
return db.TaskResult.query(knex).insert({
item_id: item.id,
task: task,
metadata: {},
... commonUpdate
});
}).catch({ name: "UniqueViolationError" }, () => {
return db.TaskResult.query(knex).findById([ task, item.id ]).patch({
... commonUpdate
});
}).then(() => {
// throw error;
});
});
}),
// TODO: Sort out a cleaner way to organize local vs. global parallelization
(parallelTasks != null)
? parallelize(parallelTasks)
: globalParallelize
]);
};
};

@ -0,0 +1,5 @@
"use strict";
module.exports = function arrayUnique(array) {
return Array.from(new Set(array));
};

@ -0,0 +1,20 @@
"use strict";
// Inverts an object of arrays, eg. {a: [x, y], b: [x, z]} becomes {x: [a, b], y: [a], z: [b]}
// TODO: See if this can be replaced with an off-the-shelf module with equivalent semantics, something transpose-y maybe?
module.exports = function invertMapping(mapping) {
let newObject = {};
for (let [ key, values ] of Object.entries(mapping)) {
for (let value of values) {
if (newObject[value] == null) {
newObject[value] = [];
}
newObject[value].push(key);
}
}
return newObject;
};

@ -0,0 +1,16 @@
"use strict";
// FIXME: Move to own @validatem/is-ms module
const ms = require("ms");
const either = require("@validatem/either");
const isString = require("@validatem/is-string");
const required = require("@validatem/required");
const isPositiveInteger = require("./is-positive-integer");
module.exports = either([
isPositiveInteger,
[ isString, (value) => ms(value), required, isPositiveInteger ]
]);

@ -0,0 +1,6 @@
"use strict";
const isInteger = require("@validatem/is-integer");
const isPositive = require("@validatem/is-positive");
module.exports = [ isInteger, isPositive ];

@ -0,0 +1,30 @@
"use strict";
const required = require("@validatem/required");
const arrayOf = require("@validatem/array-of");
const isString = require("@validatem/is-string");
const isFunction = require("@validatem/is-function");
const isArray = require("@validatem/is-array");
const isPositiveInteger = require("./is-positive-integer");
function makeRules(recurse) {
// We want to recurse exactly one level
let isTaskArray = (recurse === true)
? arrayOf(makeRules(false))
: isArray;
return {
name: [ required, isString ],
version: [ required, isString ],
ttl: [ isPositiveInteger ],
parallelTasks: [ isPositiveInteger ],
taskInterval: [ isPositiveInteger ],
dependents: [ required, isTaskArray ],
dependencies: [ required, isTaskArray ],
tags: [ required, arrayOf(isString) ],
run: [ required, isFunction ]
};
}
module.exports = makeRules(true);

@ -0,0 +1,46 @@
"use strict";
const required = require("@validatem/required");
const defaultTo = require("@validatem/default-to");
const either = require("@validatem/either");
const anything = require("@validatem/anything");
const anyProperty = require("@validatem/any-property");
const arrayOf = require("@validatem/array-of");
const isFunction = require("@validatem/is-function");
const isString = require("@validatem/is-string");
const isInteger = require("@validatem/is-integer");
const isPositive = require("@validatem/is-positive");
const isValue = require("@validatem/is-value");
const isBoolean = require("@validatem/is-boolean");
const isMS = require("./is-ms");
module.exports = {
backend: [ required, isValue("postgresql") ], // No other backends exist yet
simulate: [ isBoolean, defaultTo(false) ],
taskInterval: [ isMS ], // global rate limit
database: anything, // FIXME: Validate various database options
seed: arrayOf({
id: [ required, isString ],
tags: [ required, arrayOf(isString) ],
data: [ required, anything ] // FIXME: Validate object
}),
tags: anyProperty({
key: [ required, isString ],
value: [ required, arrayOf(isString) ]
}),
tasks: anyProperty({
key: [ required, isString ],
value: [ required, {
ttl: [ isMS ],
taskInterval: [ isMS ],
parallelTasks: [ defaultTo, either([
[ isValue(Infinity) ],
[ isInteger, isPositive ]
])],
version: [ defaultTo("0"), isString ],
dependsOn: [ defaultTo([]), arrayOf(isString) ],
run: [ required, isFunction ]
}]
})
};

@ -112,6 +112,18 @@
default-value "^1.0.0"
error-chain "^0.1.0"
"@promistream/filter@^0.1.1":
version "0.1.1"
resolved "https://registry.yarnpkg.com/@promistream/filter/-/filter-0.1.1.tgz#126f2d581c801d0e8e30e1392700384f00da94be"
integrity sha512-rE4mTC8I7FiwH5HZTcqzfwHTRXAAShBuNoLMhhWcKPvqPIjAbEoqGg8klEs4fNDl8LXrd0Txm7PeBQ8cgZYwmg==
dependencies:
"@promistream/propagate-abort" "^0.1.3"
"@promistream/propagate-peek" "^0.1.0"
"@validatem/core" "^0.3.12"
"@validatem/is-function" "^0.1.0"
"@validatem/required" "^0.1.1"
bluebird "^3.7.2"
"@promistream/from-iterable@^0.1.0":
version "0.1.0"
resolved "https://registry.yarnpkg.com/@promistream/from-iterable/-/from-iterable-0.1.0.tgz#f0f28e5f53449f4dc0cb90fc3d6753b9181b8a8b"
@ -421,7 +433,7 @@
default-value "^1.0.0"
flatten "^1.0.3"
"@validatem/is-array@^0.1.0":
"@validatem/is-array@^0.1.0", "@validatem/is-array@^0.1.1":
version "0.1.1"
resolved "https://registry.yarnpkg.com/@validatem/is-array/-/is-array-0.1.1.tgz#fbe15ca8c97c30b622a5bbeb536d341e99cfc2c5"
integrity sha512-XD3C+Nqfpnbb4oO//Ufodzvui7SsCIW/stxZ39dP/fyRsBHrdERinkFATH5HepegtDlWMQswm5m1XFRbQiP2oQ==
@ -452,7 +464,15 @@
"@validatem/error" "^1.0.0"
is-callable "^1.1.5"
"@validatem/is-number@^0.1.3":
"@validatem/is-integer@^0.1.0":
version "0.1.0"
resolved "https://registry.yarnpkg.com/@validatem/is-integer/-/is-integer-0.1.0.tgz#52c544063aaeabb630854e1822298f5c043196a0"
integrity sha512-sSp66uxfirIFMqro64DAdfM+UKo+IICmHdy/x3ZJXUM9F4byz/GyFmhR4wfcQswywwF1fqKw9458GE38fozjOQ==
dependencies:
"@validatem/error" "^1.0.0"
"@validatem/is-number" "^0.1.2"
"@validatem/is-number@^0.1.2", "@validatem/is-number@^0.1.3":
version "0.1.3"
resolved "https://registry.yarnpkg.com/@validatem/is-number/-/is-number-0.1.3.tgz#0f8ce8c72970dbedbbd04d12942e5ab48a44cda6"
integrity sha512-GjnbKYfYa0cTCJmsr5OUbylxTKHHZ6FDtJixWl+lEuXzeELDoYRp2UAjzfjTXJ9g2BumESqI/t0hap5rw5tEyQ==
@ -468,6 +488,15 @@
"@validatem/error" "^1.0.0"
is-plain-obj "^2.1.0"
"@validatem/is-positive@^1.0.0":
version "1.0.0"
resolved "https://registry.yarnpkg.com/@validatem/is-positive/-/is-positive-1.0.0.tgz#8820e39dc34e94ffc216217830ad4d2a3b26d415"
integrity sha512-ei8YL+IxwdZd7QGaR9coAo/MJRJKMqN3RunoM6lLbtFGPJyMa6hOlcLWiZ9UmRI9qE96YUHr+AI80AJ91SeOYg==
dependencies:
"@validatem/error" "^1.0.0"
"@validatem/is-number" "^0.1.3"
is-negative-zero "^2.0.0"
"@validatem/is-string@^0.1.1":
version "0.1.1"
resolved "https://registry.yarnpkg.com/@validatem/is-string/-/is-string-0.1.1.tgz#0710d8cebedd4d6861b4a8c63d7803ed6d2f9d6c"
@ -484,6 +513,13 @@
"@validatem/error" "^1.0.0"
is-string "^1.0.5"
"@validatem/is-value@^0.1.0":
version "0.1.0"
resolved "https://registry.yarnpkg.com/@validatem/is-value/-/is-value-0.1.0.tgz#b4c7481818a88d7c24400b9b587f83b388b37d56"
integrity sha512-FrwC6AP4W8dN6GCJEX02qNphoRUaN2JtdbIU2ztwPpLFUSgaJ+zvUrIPYDr8f+8YfrI/QIHm+uiY2TNEjQq/iQ==
dependencies:
"@validatem/error" "^1.0.0"
"@validatem/match-special@^0.1.0":
version "0.1.0"
resolved "https://registry.yarnpkg.com/@validatem/match-special/-/match-special-0.1.0.tgz#4e0c28f1aee5bf53c1ef30bbf8c755d4946ae0ff"
@ -1860,6 +1896,11 @@ is-glob@^4.0.0, is-glob@^4.0.1:
dependencies:
is-extglob "^2.1.1"
is-negative-zero@^2.0.0:
version "2.0.2"
resolved "https://registry.yarnpkg.com/is-negative-zero/-/is-negative-zero-2.0.2.tgz#7bf6f03a28003b8b3965de3ac26f664d765f3150"
integrity sha512-dqJvarLawXsFbNDeJW7zAz8ItJ9cd28YufuuFzh0G8pNHjJMnY08Dv7sYX2uF5UpQOwieAeOExEYAWWfu7ZZUA==
is-number-object@^1.0.4:
version "1.0.6"
resolved "https://registry.yarnpkg.com/is-number-object/-/is-number-object-1.0.6.tgz#6a7aaf838c7f0686a50b4553f7e54a96494e89f0"

Loading…
Cancel
Save