Compare commits

..

No commits in common. 'master' and 'backend-refactor' have entirely different histories.

1
.gitignore vendored

@ -1,3 +1,2 @@
node_modules
junk
.clinic

@ -1,21 +0,0 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "Launch Program",
"skipFiles": [
"<node_internals>/**"
],
"program": "./bin/simulate",
"args": ["../seekseek/scraper-config/", "lcsc:normalizeProduct", "lcsc:product:C494972"],
"env": {
"DEBUG": "srap:backend:postgresql:*"
}
}
]
}

@ -12,7 +12,7 @@ const chalk = require("chalk");
let argv = yargs.argv;
let [ configurationPath, task, item ] = argv._;
let absoluteConfigurationPath = path.resolve(process.cwd(), configurationPath);
let absoluteConfigurationPath = path.join(process.cwd(), configurationPath);
let configuration = require(absoluteConfigurationPath);
return Promise.try(() => {

@ -17,7 +17,7 @@ let configurationPath = argv._[0];
let listenHost = argv.listenHost ?? "127.0.0.1";
let listenPort = argv.listenPort ?? 3131;
let absoluteConfigurationPath = path.resolve(process.cwd(), configurationPath);
let absoluteConfigurationPath = path.join(process.cwd(), configurationPath);
let configuration = require(absoluteConfigurationPath);
return Promise.try(() => {

@ -31,19 +31,32 @@ const { testValue } = require("@validatem/core");
const matchesFormat = require("@validatem/matches-format");
const isString = require("@validatem/is-string");
const createKernel = require("../src/kernel");
const initialize = require("../src/initialize");
const errors = require("../src/errors");
let argv = yargs.argv;
let configurationPath = argv._[0];
let listenHost = argv.listenHost ?? "127.0.0.1";
let listenPort = argv.listenPort ?? 3000;
let absoluteConfigurationPath = path.resolve(process.cwd(), configurationPath);
let absoluteConfigurationPath = path.join(process.cwd(), configurationPath);
let configuration = require(absoluteConfigurationPath);
return Promise.try(() => {
return createKernel(configuration);
}).then((kernel) => {
// FIXME: Deduplicate this with kernel! Also other common wiring across binaries...
return initialize({
knexfile: {
client: "pg",
connection: configuration.database,
pool: { min: 0, max: 32 },
migrations: { tableName: "srap_knex_migrations" }
}
});
}).then((state) => {
let { db, knex } = state;
const queries = require("../src/queries")(state);
let app = express();
let router = expressPromiseRouter();
@ -112,7 +125,7 @@ return Promise.try(() => {
: undefined;
return pipe([
kernel.getUpdates({ prefix: req.query.prefix, timestamp }),
queries.getUpdates(knex, { prefix: req.query.prefix, timestamp: timestamp }),
map((item) => JSON.stringify(item) + "\n"),
fromNodeStream(res)
]).read();

@ -12,7 +12,7 @@ const chalk = require("chalk");
let argv = yargs.argv;
let [ configurationPath, task, item ] = argv._;
let absoluteConfigurationPath = path.resolve(process.cwd(), configurationPath);
let absoluteConfigurationPath = path.join(process.cwd(), configurationPath);
let configuration = require(absoluteConfigurationPath);
return Promise.try(() => {

@ -1,17 +0,0 @@
"use strict";
module.exports.up = function(knex, Promise) {
return knex.schema
.alterTable("srap_task_results", (table) => {
table.jsonb("metadata").notNullable().default({}).alter();
table.boolean("is_successful").nullable().alter();
});
};
module.exports.down = function(knex, Promise) {
return knex.schema
.alterTable("srap_task_results", (table) => {
table.jsonb("metadata").notNullable().alter();
table.boolean("is_successful").notNullable().alter();
});
};

@ -1,19 +0,0 @@
"use strict";
module.exports.up = function(knex, Promise) {
return knex.schema
.alterTable("srap_tasks_in_progress", (table) => {
table.timestamp("started_at").alter().nullable().defaultTo(null);
table.boolean("started").notNullable().defaultTo(false);
})
.renameTable("srap_tasks_in_progress", "srap_queue");
};
module.exports.down = function(knex, Promise) {
return knex.schema
.renameTable("srap_queue", "srap_tasks_in_progress")
.alterTable("srap_tasks_in_progress", (table) => {
table.timestamp("started_at").alter().notNullable().defaultTo(knex.fn.now());
table.dropColumn("started");
});
};

@ -1,39 +0,0 @@
"use strict";
module.exports.up = function(knex, Promise) {
// Get rid of existing duplicate entries
return knex.raw(`
DELETE FROM srap_tags
WHERE id IN (
SELECT
id
FROM (
SELECT
id,
row_number() OVER w as rnum
FROM srap_tags
WINDOW w AS (
PARTITION BY name, item_id
ORDER BY id
)
) t
WHERE t.rnum > 1);
`).then(() => {
return knex.schema
.alterTable("srap_tags", (table) => {
table.dropPrimary();
table.dropIndex("name");
table.dropColumn("id");
table.primary([ "name", "item_id" ]);
});
});
};
module.exports.down = function(knex, Promise) {
return knex.schema
.alterTable("srap_tags", (table) => {
table.dropPrimary();
table.bigIncrements("id").primary();
});
};

@ -1,15 +0,0 @@
"use strict";
module.exports.up = function(knex, Promise) {
return knex.schema
.alterTable("srap_tags", (table) => {
table.index("item_id");
});
};
module.exports.down = function(knex, Promise) {
return knex.schema
.alterTable("srap_tags", (table) => {
table.dropIndex("item_id");
});
};

@ -1,15 +0,0 @@
"use strict";
module.exports.up = function(knex, Promise) {
return knex.schema
.alterTable("srap_aliases", (table) => {
table.index("item_id");
});
};
module.exports.down = function(knex, Promise) {
return knex.schema
.alterTable("srap_aliases", (table) => {
table.dropIndex("item_id");
});
};

@ -61,8 +61,7 @@
},
"devDependencies": {
"@joepie91/eslint-config": "^1.1.0",
"eslint": "^7.21.0",
"clinic": "^12.0.0"
"eslint": "^7.21.0"
},
"bin": {
"srap-server": "./bin/server",

@ -1,9 +1,8 @@
"use strict";
const Promise = require("bluebird");
const unreachable = require("@joepie91/unreachable");
const { validateArguments, validateOptions } = require("@validatem/core");
const { validateArguments } = require("@validatem/core");
const required = require("@validatem/required");
const requireEither = require("@validatem/require-either");
const isString = require("@validatem/is-string");
@ -33,7 +32,7 @@ const isTaskObject = require("../validators/is-task-object");
// FIXME: Verify that all internal method calls in the PostgreSQL backend are still valid after moving argument validation/normalization into this module
module.exports = function (state) {
let { tasks, metrics } = state;
let { tasks } = state;
const backendModules = {
"postgresql": require("./postgresql")(state),
@ -69,14 +68,6 @@ module.exports = function (state) {
return backend.shutdown();
},
getQueueSize: function (_tx) {
let [ tx ] = validateArguments(arguments, {
tx: maybeTX
});
return backend.getQueueSize(tx);
},
runInTransaction: function (_tx, _callback) {
let [ tx, callback ] = validateArguments(arguments, {
tx: maybeTX,
@ -85,7 +76,6 @@ module.exports = function (state) {
return backend.runInTransaction(tx, callback);
},
countLockedTasks: function (_tx) {
let [ tx ] = validateArguments(arguments, {
tx: maybeTX
@ -121,14 +111,8 @@ module.exports = function (state) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
// FIXME: This currently duplicates validation logic from forItem.storeItem; figure out a way to deduplicate that
seeds: [ required, arrayOf({
id: [ required, isString ],
// Tags are required to be specified (even if an empty array) because it's easily forgotten
tags: [ required, arrayOf(isString) ],
aliases: [ defaultTo([]), arrayOf(isString) ],
data: [ required, anything ], // FIXME: Check for object
}) ]
// FIXME: Stricter validation
seeds: [ required, arrayOf(anything) ]
}]
});
@ -142,238 +126,23 @@ module.exports = function (state) {
failIfExists: false
});
});
},
// FIXME: Other than the missing readOperation wrapper and the tx argument, this is *basically* the same logic as under forItem... this should be simplified somehow.
getItem: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("id"), {
id: [ required, isString ],
optional: [ defaultTo(false), isBoolean ] // FIXME: Can this handling be moved to the wrapper?
}]
});
return backend.getItem(tx, options);
},
}
},
forItem: function (_options) {
// FIXME: Proper validation rules here for the other fields as well
let { item, task, mutationQueue, readTX, simulate, onDeleteSelf } = validateOptions(arguments, {
item: anything,
task: [ required, isTask ],
mutationQueue: anything,
readTX: maybeTX,
simulate: anything,
onDeleteSelf: isFunction
});
forItem: function ({ item, task, mutationQueue }) {
// We create a new instance of the actual API for every item being processed. This is necessary because some of the input arguments will default to item-specific values, and some of the logic is dependent on task-specific metadata. This is a more efficient (and understandable) approach than pretending the API is stateless and then separately wrapping the API *again* for every individual item with a whole separate layer of input validation rules.
// FIXME: Is this still correct, with the new task (graph) format?
let dependentTaskNames = new Set(task.dependents.map((dependent) => dependent.task));
function mutableOperation(func) {
if (simulate === true) {
return func(readTX, backend);
if (simulate) {
return func(backend);
} else if (mutationQueue != null) {
mutationQueue.push(func);
} else {
unreachable("No mutation queue provided in live mode");
}
}
function readOperation(func) {
return func(readTX, backend);
}
let exposedAPI = {
// NOTE: 'exposed' API methods are the ones that are passed into a user-defined task, and which the task uses to eg. update or create new items
getItem: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("id"), {
id: [ required, isString ],
optional: [ defaultTo(false), isBoolean ] // FIXME: Can this handling be moved to the wrapper?
}]
});
return readOperation((tx) => {
return backend.getItem(tx, options);
});
},
storeItem: function (_options) {
// NOTE: Using `update` instead of `data` makes it an upsert!
// FIXME: Add an 'expire' flag for expiring any existing task results for this item? To trigger re-evaluation on updates
let [ options ] = validateArguments(arguments, {
options: [ required, {
id: [ required, isString ],
// Tags are required to be specified (even if an empty array) because it's easily forgotten
tags: [ required, arrayOf(isString) ],
aliases: [ defaultTo([]), arrayOf(isString) ],
data: [ anything ], // FIXME: Check for object
update: [ isFunction ],
failIfExists: [ defaultTo(false), isBoolean ],
allowUpsert: [ defaultTo(true), isBoolean ],
parentID: [ defaultTo(item.id), isString ]
}, requireEither([ "data", "update" ]) ]
});
let { data, ... rest } = options;
return mutableOperation((tx) => {
return backend.storeItem(tx, {
... rest,
// We normalize `data` and `update` (which are mutually-exclusive) into a single option here, so that the backend only needs to deal with the `update` case
// TODO: Can this be folded into the validation rules in a reasonable and readable way?
update: (data != null)
? (existingData) => ({ ... existingData, ... data })
: rest.update
});
});
},
moveItem: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("into"), {
from: [ defaultTo(item.id), isString ],
into: [ required, isString ],
// NOTE: If no `merge` function is specified, that indicates that merging is not allowed (ie. this is strictly a rename), and mergeMetadata is ignored too
merge: [ isFunction ],
mergeMetadata: [ defaultTo({}), anyProperty({
key: [ required ],
value: [ required, isFunction ]
})],
}]
});
return mutableOperation((tx) => {
return backend.moveItem(tx, { ... options, allowMerge: (options.merge != null) });
});
},
deleteItem: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [
defaultTo({}),
wrapValueAsOption("id"), {
id: [ defaultTo(item.id), isString ]
}
]
});
if (options.id === item.id) {
// This hook is necessary to allow the task kernel to skip certain operations in this case, eg. storing the task result - it would be redundant, and reference a now non-existent item.
onDeleteSelf();
}
return mutableOperation((tx) => {
return backend.deleteItem(tx, options);
});
},
createAlias: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("from"), {
from: [ required, isString ],
to: [ defaultTo(item.id), isString ],
failIfExists: [ defaultTo(false), isBoolean ] // TODO: Shouldn't this default to true, for any occurrence outside of a merge/rename?
}]
});
return mutableOperation((tx) => {
return backend.createAlias(tx, options);
});
},
deleteAlias: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("from"), {
from: [ required, isString ]
}]
});
return mutableOperation((tx) => {
return backend.deleteAlias(tx, options);
});
},
updateData: function (_options) {
// NOTE: This is a semantically self-describing convenience wrapper for `storeItem` that updates the currently-being-processed item
// TODO: Have a dedicated alias and/or signature (for this function) for the common case of "just add a few attributes to whatever is already there"? ie. a shallow merge
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ]
}]
});
return exposedAPI.storeItem({
... options,
tags: []
});
},
updateMetadata: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ],
task: [ required, isTask ]
}]
});
return mutableOperation((tx) => {
return backend.updateMetadata(tx, options);
});
},
expire: function (_options) {
// TODO: It probably doesn't make any semantic sense to leave *both* arguments unspecified. Maybe that should be prohibited via eg. a non-exclusive requireEither? Otherwise the user might expect to immediately expire the *current* task, but since the task is only updated *after* the task logic runs, that is not currently possible to express.
let [ options ] = validateArguments(arguments, {
options: [ required, {
id: [ defaultTo(item.id), isString ],
isTask: [ defaultTo(task), isTask ]
}]
});
return mutableOperation((tx) => {
return backend.expire(tx, options);
});
},
expireDependents: function (_options) {
// NOTE: This method does not have a counterpart in the database backend; it's a convenience abstraction over regular `backend.expire` calls
let [{ id, dependents }] = validateArguments(arguments, {
options: [ defaultTo({}), wrapValueAsOption("dependents"), {
id: [ defaultTo(item.id), isString ],
dependents: [ defaultTo([]), arrayOf(isString), (dependents) => {
// Only consider dependents that actually exist for this task
return dependents.filter((dependent) => dependentTaskNames.has(dependent));
}]
}]
});
// NOTE: This works even with queueing, because each this.expire call just internally queues another operation
return Promise.map(dependents, (dependent) => {
return exposedAPI.expire({
id: id,
taskName: dependent
});
});
},
// Temporary compatibility aliases
createItem: (... args) => exposedAPI.storeItem(... args),
mergeItem: (... args) => exposedAPI.moveItem(... args),
renameItem: (options) => {
if (typeof options === "string") {
return exposedAPI.moveItem(options);
} else {
return exposedAPI.moveItem({ into: options.to, from: options.from });
}
},
};
return {
// NOTE: 'internal' API methods are accessible to srap, but not to user-defined tasks.
@ -387,15 +156,7 @@ module.exports = function (state) {
}]
});
return Promise.try(() => {
return backend.lock(tx, options);
}).tap((succeeded) => {
if (succeeded) {
metrics.successfulLocks.labels({ task: task.name }).inc(1);
} else {
metrics.failedLocks.labels({ task: task.name }).inc(1);
}
});
return backend.lock(tx, options);
},
unlock: function (_tx, _options) {
@ -442,7 +203,196 @@ module.exports = function (state) {
});
},
},
exposed: exposedAPI
exposed: {
// NOTE: 'exposed' API methods are the ones that are passed into a user-defined task, and which the task uses to eg. update or create new items
getItem: function (_tx, _id, _optional) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("id"), {
id: [ required, isString ],
optional: [ defaultTo(false), isBoolean ] // FIXME: Can this handling be moved to the wrapper?
}]
});
return backend.getItem(tx, options);
},
storeItem: function (_tx, _options) {
// NOTE: Using `update` instead of `data` makes it an upsert!
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
id: [ required, isString ],
// Tags are required to be specified (even if an empty array) because it's easily forgotten
tags: [ required, arrayOf(isString) ],
aliases: [ defaultTo([]), arrayOf(isString) ],
data: [ anything ], // FIXME: Check for object
update: [ isFunction ],
failIfExists: [ defaultTo(false), isBoolean ],
allowUpsert: [ defaultTo(true), isBoolean ],
parentID: [ defaultTo(item.id), isString ]
}, requireEither([ "data", "update" ]) ]
});
let { data, ... rest } = options;
return mutableOperation((backend) => {
return backend.storeItem(tx, {
... rest,
// We normalize `data` and `update` (which are mutually-exclusive) into a single option here, so that the backend only needs to deal with the `update` case
// TODO: Can this be folded into the validation rules in a reasonable and readable way?
update: (data != null)
? (existingData) => ({ ... existingData, ... data })
: rest.update
});
});
},
moveItem: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("into"), {
from: [ defaultTo(item.id), isString ],
into: [ required, isString ],
// NOTE: If no `merge` function is specified, that indicates that merging is not allowed (ie. this is strictly a rename), and mergeMetadata is ignored too
merge: [ isFunction ],
mergeMetadata: [ defaultTo({}), anyProperty({
key: [ required ],
value: [ required, isFunction ]
})],
}]
});
return mutableOperation((backend) => {
return backend.moveItem(tx, { options, allowMerge: (options.merge != null) });
});
},
deleteItem: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [
defaultTo({}),
wrapValueAsOption("id"), {
id: [ defaultTo(item.id), isString ]
}
]
});
return mutableOperation((backend) => {
return backend.deleteItem(tx, options);
});
},
createAlias: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("from"), {
from: [ required, isString ],
to: [ defaultTo(item.id), isString ],
failIfExists: [ defaultTo(false), isBoolean ] // TODO: Shouldn't this default to true, for any occurrence outside of a merge/rename?
}]
});
return mutableOperation((backend) => {
return backend.createAlias(tx, options);
});
},
deleteAlias: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("from"), {
from: [ required, isString ]
}]
});
return mutableOperation((backend) => {
return backend.deleteAlias(tx, options);
});
},
updateData: function (_tx, _options) {
// NOTE: This is a semantically self-describing convenience wrapper for `createItem` that updates the currently-being-processed item
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ]
}]
});
return mutableOperation((backend) => {
return backend.createItem(tx, {
... options,
tags: []
});
});
},
updateMetadata: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ],
task: [ required, isTask ]
}]
});
return mutableOperation((backend) => {
return backend.updateMetadata(tx, options);
});
},
expire: function (_tx, _options) {
// TODO: It probably doesn't make any semantic sense to leave *both* arguments unspecified. Maybe that should be prohibited via eg. a non-exclusive requireEither? Otherwise the user might expect to immediately expire the *current* task, but since the task is only updated *after* the task logic runs, that is not currently possible to express.
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
id: [ defaultTo(item.id), isString ],
isTask: [ defaultTo(task), isTask ]
}]
});
return mutableOperation((backend) => {
return backend.expire(tx, options);
});
},
expireDependents: function (_tx, _options) {
// NOTE: This method does not have a counterpart in the database backend; it's a convenience abstraction over regular `backend.expire` calls
let [ tx, { id, dependents }] = validateArguments(arguments, {
tx: maybeTX,
options: [ defaultTo({}), wrapValueAsOption("dependents"), {
id: [ defaultTo(item.id), isString ],
dependents: [ defaultTo([]), arrayOf(isString), (dependents) => {
// Only consider dependents that actually exist for this task
return dependents.filter((dependent) => dependentTaskNames.has(dependent));
}]
}]
});
// FIXME: This doesn't work with the synchronous queueing model
return Promise.map(dependents, (dependent) => {
return this.expire(tx, {
id: id,
taskName: dependent
});
});
},
// Temporary compatibility aliases
createItem: (... args) => this.storeItem(... args),
mergeItem: (... args) => this.moveItem(... args),
renameItem: (tx, options) => {
if (typeof options === "string") {
return this.moveItem(tx, options);
} else {
return this.moveItem(tx, { into: options.to, from: options.from });
}
},
}
};
}
};

@ -8,11 +8,9 @@ const knexLibrary = require("knex");
const { knexSnakeCaseMappers } = require("objection");
const { addMilliseconds } = require("date-fns");
const ValidationError = require("@validatem/error");
const debug = require("debug")("srap:backend:postgresql:queries");
const models = require("./models");
const mergeItems = require("../../semantics/merge-items");
const syncpipe = require("syncpipe");
let migrationsFolder = path.join(__dirname, "../../../migrations");
@ -24,13 +22,12 @@ module.exports = function(state) {
return {
defaultSettings: {
taskBatchSize: 1000,
taskBatchDelay: 5 * 60 * 1000, // FIXME: Rename to reflect changed behaviour?
delayRandomization: 0.2
taskBatchDelay: 30 * 1000
},
create: function createPostgreSQLBackend(options) {
let knex = knexLibrary({
client: "pg",
pool: { min: 0, max: 32, ... defaultValue(options.pool, {}) },
pool: { min: 0, max: 32 },
migrations: { tableName: "srap_knex_migrations" },
connection: options,
... knexSnakeCaseMappers()
@ -41,10 +38,9 @@ module.exports = function(state) {
directory: migrationsFolder
});
}).then(() => {
// TODO: Does it really make sense to be merging in the backendSettings here? Shouldn't that happen automatically in some way for *every* backend, rather than just the PostgreSQL one specifically? As backend settings are a generic backend feature
state = { ... state, knex: knex, backendSettings: options};
let db = models(state);
state = { ... state, db: db };
let db = models({ ... state, knex: knex });
let queryState = { ... state, knex, db };
// TODO: Should this be inlined instead?
function repointAliases (tx, { to, from }) {
@ -87,33 +83,15 @@ module.exports = function(state) {
},
lock: function (tx, { id, task }) {
return this.runInTransaction(tx, (tx) => {
return Promise.try(() => {
return db.TaskInProgress.query(tx).forUpdate().findById([ task.name, id ]);
}).then((item) => {
if (item != null) {
debug(`Task '${task.name}:${id}' currently locked: ${item.started}`);
if (item.started === false) {
return Promise.try(() => {
return db.TaskInProgress.query(tx)
.findById([ task.name, id ])
.update({
started: true,
started_at: knex.fn.now()
});
}).then(() => {
debug(`Task '${task.name}:${id}' locked successfully`);
return true;
});
} else {
debug(`Task '${task.name}:${id}' failed lock because already locked on read`);
return false;
}
} else {
debug(`Task '${task.name}:${id}' failed lock because it no longer exists in queue`);
return false;
}
return Promise.try(() => {
return db.TaskInProgress.query(tx).insert({
task: task.name,
item_id: id
});
}).then(() => {
return true;
}).catch({ name: "UniqueViolationError" }, () => {
return false;
});
},
@ -126,7 +104,7 @@ module.exports = function(state) {
});
// TODO: return true/false depending on whether unlocking succeeded?
},
getItem: function (tx, { id, optional }) {
return Promise.try(() => {
return db.Alias.relatedQuery("item", tx)
@ -144,7 +122,7 @@ module.exports = function(state) {
storeItem: function (tx, { id, tags, aliases, update, failIfExists, allowUpsert, parentID }) {
// FIXME: Make failIfExists actually work, currently it does nothing as the UNIQUE constraint violation cannot occur for an upsert
// TODO: Ensure that we run the transaction in full isolation mode, and retry in case of a conflict
return Promise.try(() => {
// NOTE: We look up by alias, since this is an upsert - and so if the specified ID already exists as an alias, we should update the existing item instead of creating a new one with the specified (aliased) ID
return db.Alias
@ -168,7 +146,7 @@ module.exports = function(state) {
id: actualID,
data: update(existingData),
createdBy: parentID,
tags: tags.map((tag) => ({ name: tag, itemId: actualID })),
tags: tags.map((tag) => ({ name: tag })),
aliases: allAliases.map((alias) => ({ alias: alias })),
updatedAt: new Date()
};
@ -283,12 +261,19 @@ module.exports = function(state) {
return Promise.try(() => {
return getTaskResult(tx, task, id);
}).then((taskResult) => {
let sharedFields = {
isInvalidated: false,
updatedAt: new Date()
};
if (taskResult != null) {
return taskResult.$query(tx).patch({
... sharedFields,
metadata: update(taskResult.metadata),
});
} else {
return db.TaskResult.query(tx).insert({
... sharedFields,
task: task.name,
itemId: id,
metadata: update({})
@ -326,9 +311,7 @@ module.exports = function(state) {
}).then((taskResult) => {
let sharedFields = {
isSuccessful: isSuccessful,
isInvalidated: false,
taskVersion: task.version,
updatedAt: new Date(),
expiresAt: (task.ttl != null)
? addMilliseconds(new Date(), task.ttl)
: undefined
@ -354,25 +337,14 @@ module.exports = function(state) {
countLockedTasks: function (tx) {
return Promise.try(() => {
return db.TaskInProgress.query(tx).count({ count: "*" }).where({ started: true });
return db.TaskInProgress.query(tx).count({ count: "*" });
}).then((result) => {
return result[0].count;
});
},
getQueueSize: function (tx) {
return Promise.try(() => {
return tx.raw(`SELECT task, COUNT(*) FROM srap_queue WHERE started = false GROUP BY task;`);
}).then((result) => {
return syncpipe(result.rows, [
_ => _.map(({ task, count }) => [ task, parseInt(count) ]),
_ => Object.fromEntries(_)
]);
});
},
getUpdateStream: require("./queries/get-update-stream")(state),
getTaskStream: require("./queries/get-task-stream")(state)
getUpdateStream: require("./queries/get-update-stream")(queryState),
getTaskStream: require("./queries/get-task-stream")(queryState)
};
});
}

@ -26,7 +26,7 @@ module.exports = function ({ db }) {
tasksInProgress: {
relation: Model.HasManyRelation,
modelClass: db.TaskInProgress,
join: { from: "srap_items.id", to: "srap_queue.itemId" }
join: { from: "srap_items.id", to: "srap_tasksInProgress.itemId" }
},
failedTasks: {
// Not actually a many-to-many, but that's what objection calls a HasManyThrough...
@ -35,7 +35,7 @@ module.exports = function ({ db }) {
modelClass: db.Failure,
join: {
from: "srap_items.id",
through: { from: "srap_taskResults.itemId", to: "srap_taskResults.id" },
through: { from: "srap_task_results.itemId", to: "srap_task_results.id" },
to: "srap_failures.taskResultId"
}
}

@ -5,14 +5,13 @@ const { Model, QueryBuilder } = require("objection");
module.exports = function ({ db }) {
return class Tag extends Model {
static tableName = "srap_tags";
static idColumn = [ "name", "itemId" ];
static get relationMappings() {
return {
item: {
relation: Model.BelongsToOneRelation,
modelClass: db.Item,
join: { from: "srap_tags.itemId", to: "srap_items.id" }
join: { from: "srap_tags.itemId", to: "srap_item.id" }
}
};
};

@ -4,7 +4,7 @@ const { Model } = require("objection");
module.exports = function ({ db }) {
return class TaskInProgress extends Model {
static tableName = "srap_queue";
static tableName = "srap_tasksInProgress";
static idColumn = [ "task", "itemId" ];
static get relationMappings() {
@ -12,7 +12,7 @@ module.exports = function ({ db }) {
item: {
relation: Model.BelongsToOneRelation,
modelClass: db.Item,
join: { from: "srap_queue.itemId", to: "srap_items.id" }
join: { from: "srap_tasksInProgress.itemId", to: "srap_item.id" }
}
};
};

@ -12,7 +12,7 @@ module.exports = function ({ db }) {
item: {
relation: Model.BelongsToOneRelation,
modelClass: db.Item,
join: { from: "srap_taskResults.itemId", to: "srap_items.id" }
join: { from: "srap_taskResults.itemId", to: "srap_item.id" }
}
};
};

@ -1,182 +1,106 @@
"use strict";
const Promise = require("bluebird");
const debug = require("debug")("srap:backend:postgresql:query:get-task-stream");
const buffer = require("@promistream/buffer");
const pipe = require("@promistream/pipe");
const simpleSource = require("@promistream/simple-source");
const fetchQuery = `
SELECT
srap_items.*
FROM
srap_queue
LEFT JOIN srap_items
ON srap_items.id = srap_queue.item_id
WHERE
srap_queue.task = :task
AND srap_queue.started = FALSE
LIMIT :resultLimit
`;
function makeFillQuery(withDependencies) {
return `
WITH
${withDependencies ? `
dependencies AS (
SELECT * FROM json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text)
),
satisfied AS (
SELECT results.* FROM dependencies
const query = `
WITH
dependency_tasks AS (
SELECT * FROM
json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text)
),
matching_items AS (
SELECT
DISTINCT ON (srap_items.id)
srap_items.*,
results.updated_at AS result_date,
results.task_version,
(
results.is_successful = TRUE
AND (
results.expires_at < NOW()
OR results.is_invalidated = TRUE
)
) AS is_candidate
FROM srap_items
INNER JOIN srap_tags
ON srap_tags.item_id = srap_items.id
AND srap_tags.name = ANY(:tags)
LEFT JOIN srap_task_results AS results
ON results.item_id = srap_items.id
AND results.task = :task
WHERE
NOT EXISTS (
SELECT FROM srap_tasks_in_progress AS pr WHERE pr.item_id = srap_items.id
)
),
candidates AS (
SELECT * FROM matching_items
WHERE result_date IS NULL
UNION
SELECT * FROM matching_items
WHERE is_candidate = TRUE
OR NOT (task_version = :taskVersion)
)
(
SELECT
*
FROM
candidates
WHERE
NOT EXISTS (
SELECT
results.*
FROM dependency_tasks
LEFT JOIN srap_task_results AS results
ON dependencies.task = results.task
AND dependencies.task_version = results.task_version
WHERE
ON dependency_tasks.task = results.task
AND dependency_tasks.task_version = results.task_version
AND results.item_id = candidates.id
WHERE
results.is_successful IS NULL
OR results.is_successful = FALSE
OR (
results.is_successful = TRUE
AND results.is_invalidated = FALSE
AND (
results.expires_at > NOW()
OR results.expires_at IS NULL
results.expires_at < NOW()
OR results.is_invalidated = TRUE
)
),
counts AS (
SELECT item_id, COUNT(task) AS count FROM satisfied GROUP BY item_id
),
dependency_candidates AS (
SELECT item_id FROM counts WHERE count = :dependencyCount
),
` : "" }
tag_candidates AS (
SELECT item_id FROM srap_tags WHERE name = ANY(:tags)
),
full_candidates AS MATERIALIZED (
${withDependencies
? `
SELECT tag_candidates.item_id FROM dependency_candidates
INNER JOIN tag_candidates
ON dependency_candidates.item_id = tag_candidates.item_id
`
: `
SELECT item_id FROM tag_candidates
`
}
),
tasks AS NOT MATERIALIZED (
SELECT
item_id,
is_successful,
(
results.is_successful = FALSE
OR (
results.is_successful = TRUE
AND results.is_invalidated = FALSE
AND (
results.expires_at > NOW()
OR results.expires_at IS NULL
)
)
) AS is_completed
FROM srap_task_results AS results
WHERE
results.task = :task
AND results.task_version = :taskVersion
)
)
SELECT
:task AS task,
full_candidates.item_id
FROM full_candidates
LEFT JOIN tasks ON full_candidates.item_id = tasks.item_id
WHERE tasks.is_completed IS NOT TRUE
ORDER BY tasks.is_successful NULLS FIRST
`;
}
const fillQueryWithDependencies = makeFillQuery(true);
const fillQueryWithoutDependencies = makeFillQuery(false);
) LIMIT :resultLimit;
`;
module.exports = function ({ metrics, backendSettings, knex }) {
module.exports = function ({ metrics, backendSettings }) {
return function (tx, { task }) {
let hasDependencies = (task.dependencies.length > 0);
let refillParameters = {
tags: task.tags,
task: task.name,
taskVersion: task.version,
... hasDependencies
? {
dependencyCount: task.dependencies.length,
return simpleSource(() => {
let startTime = Date.now();
return Promise.try(() => {
return tx.raw(query, {
tags: task.tags,
task: task.name,
taskVersion: task.version,
resultLimit: backendSettings.taskBatchSize,
dependencyTaskDefinitions: JSON.stringify(task.dependencies.map((dependency) => {
// Case-mapping for SQL compatibility
return { task_version: dependency.version, task: dependency.name };
}))
}
: {}
};
let fetchParameters = {
task: task.name,
resultLimit: backendSettings.taskBatchSize
// resultLimit: 1 // For tracking down race conditions
};
function refillQueue() {
let startTime = Date.now();
return Promise.try(() => {
let fillQuery = (hasDependencies)
? fillQueryWithDependencies
: fillQueryWithoutDependencies;
// NOTE: We are deliberately bypassing the transaction here! Also deliberately not using VALUES, since we're inserting from the results of another query instead
return knex.raw(`
INSERT INTO srap_queue (task, item_id)
(${fillQuery})
ON CONFLICT (task, item_id) DO NOTHING;
`, refillParameters);
}).then((response) => {
});
}).then((result) => {
let timeElapsed = Date.now() - startTime;
metrics.taskRefillTime.labels({ task: task.name }).set(timeElapsed / 1000);
metrics.taskRefillResults.labels({ task: task.name }).set(response.rowCount);
debug(`Queue for '${task.name}' was refilled with ${response.rowCount} items in ${timeElapsed}ms`);
return response.rowCount;
metrics.taskFetchTime.labels({ task: task.name }).set(timeElapsed / 1000);
metrics.taskFetchResults.labels({ task: task.name }).set(result.rowCount);
debug(`Task retrieval query for '${task.name}' took ${timeElapsed}ms and produced ${result.rowCount} results`);
if (result.rowCount > 0) {
return result.rows;
} else {
// TODO: Consider using LISTEN/NOTIFY instead?
return Promise.resolve([]).delay(backendSettings.taskBatchDelay);
}
});
}
return pipe([
simpleSource(() => {
let startTime = Date.now();
return Promise.try(() => {
return tx.raw(fetchQuery, fetchParameters);
}).then((result) => {
let timeElapsed = Date.now() - startTime;
metrics.taskFetchTime.labels({ task: task.name }).set(timeElapsed / 1000);
metrics.taskFetchResults.labels({ task: task.name }).set(result.rowCount);
debug(`Task retrieval query for '${task.name}' took ${timeElapsed}ms and produced ${result.rowCount} results`);
if (result.rowCount > 0) {
return result.rows;
} else {
return Promise.try(() => {
return refillQueue();
}).then((newItems) => {
if (newItems === 0) {
// TODO: Consider using LISTEN/NOTIFY instead? Worth the added complexity?
let randomization = Math.random() * backendSettings.delayRandomization * backendSettings.taskBatchDelay; // To prevent stampeding by low-throughput tasks
return Promise.resolve([]).delay(backendSettings.taskBatchDelay + randomization);
} else {
// Have another go right away
return [];
}
});
}
});
}),
buffer()
]);
});
};
};
}

@ -7,7 +7,7 @@ const combineSequentialStreaming = require("@promistream/combine-sequential-stre
const fromIterable = require("@promistream/from-iterable");
const fromNodeStream = require("@promistream/from-node-stream");
const createTypeTaggingStream = require("../../../streams/tag-type");
const createTypeTaggingStream = require("./streams/tag-type");
module.exports = function ({ db, knex }) {
return function (tx, { timestamp, prefix }) {

@ -1,183 +1,179 @@
"use strict";
const Promise = require("bluebird");
const chalk = require("chalk");
const mergeItems = require("../../semantics/merge-items");
function printTX(tx) {
// TODO: Print entire chain?
return chalk.bold.yellow(`[tx ${tx.__txID ?? "?"}]`);
return `[tx ${tx.__txID}]`;
}
function printItem(id, task) {
if (task != null) {
return chalk.bold.white(`[${id}][${task.name}]`);
return `[${id}:${task.name}]`;
} else {
return chalk.bold.white(`[${id}]`);
return `[${id}]`;
}
}
// FIXME: Move logs to logging hook
function logSimulated(... args) {
console.log(chalk.gray(args[0]), ... args.slice(1));
console.log(... args);
}
module.exports = function (state) {
// NOTE: The simulated backend needs access to the 'real' backend; a task may eg. mutate an item based on its current data, and we'd need to read that from the real data source. The only constraint is that the simulated backend cannot *mutate* anything in the real backend, but reading is fine!
return {
defaultSettings: {},
create: function createSimulatedBackend({ backend }) {
let txCounter = 0;
let locks = new Map(); // Map<task, Set<id>>
return {
shutdown: function () {
return backend.shutdown();
},
getDefaultTransaction: function () {
return { __txID: null };
},
isTransaction: function (value) {
return ("__txID" in value);
},
runInTransaction: function (tx, callback) {
let newTransaction = { __txID: txCounter++, __parentTX: tx };
return callback(newTransaction);
},
lock: function (tx, { id, task }) {
if (!locks.has(task)) {
locks.set(task, new Set());
}
return function attachSimulatedBackend({ backend }) {
return {
defaultSettings: {},
create: function createSimulatedBackend(_options) {
let txCounter = 0;
let locks = new Map(); // Map<task, Set<id>>
return {
shutdown: function () {
return backend.shutdown();
},
getDefaultTransaction: function () {
return { __txID: null };
},
isTransaction: function (value) {
return ("__txID" in value);
},
runInTransaction: function (tx, callback) {
let newTransaction = { __txID: txCounter++, __parentTX: tx };
return callback(newTransaction);
},
lock: function (tx, { id, task }) {
if (!locks.has(task)) {
locks.set(task, new Set());
}
let taskLocks = locks.get(task);
let taskLocks = locks.get(task);
if (taskLocks.has(id)) {
logSimulated(`${printTX(tx)} Already locked: ${printItem(id, task)}`);
return false;
} else {
logSimulated(`${printTX(tx)} Locking ${printItem(id, task)}`);
taskLocks.add(id);
return true;
}
},
unlock: function (tx, { id, task }) {
logSimulated(`${printTX(tx)} Unlocking ${printItem(id, task)}`);
locks.get(task).delete(id);
},
getItem: function (tx, options) {
return backend.getItem(backend.getDefaultTransaction(), options);
},
storeItem: function (tx, { id, parentID, update, tags, aliases, allowUpsert }) {
return Promise.try(() => {
return this.getItem(tx, { id: id, optional: true });
}).then((currentItem) => {
let actualID = (currentItem != null)
? currentItem.id
: id;
let newItem = {
id: actualID,
data: (currentItem != null)
? update(currentItem.data)
: update({}),
createdBy: parentID,
tags: tags,
aliases: aliases.concat([ actualID ]),
updatedAt: new Date()
};
logSimulated(`${printTX(tx)} ${printItem(id)} Storing item (upsert=${allowUpsert})`, newItem);
});
},
moveItem: function (tx, { from, into, merge, mergeMetadata, allowMerge }) {
return Promise.all([
this.getItem(tx, { id: from, optional: true }),
this.getItem(tx, { id: into, optional: true }),
]).then(([ fromObj, maybeIntoObj ]) => {
if (fromObj != null) {
let intoObj = maybeIntoObj ?? {
id: into,
data: {},
taskResults: []
if (taskLocks.has(id)) {
logSimulated(`${printTX(tx)} Already locked: ${printItem(id, task)}`);
return false;
} else {
logSimulated(`${printTX(tx)} Locking ${printItem(id, task)}`);
taskLocks.add(id);
return true;
}
},
unlock: function (tx, { id, task }) {
logSimulated(`${printTX(tx)} Unlocking ${printItem(id, task)}`);
locks.get(task).delete(id);
},
getItem: function (tx, options) {
return backend.getItem(backend.getDefaultTransaction(), options);
},
storeItem: function (tx, { id, parentID, update, tags, aliases, allowUpsert }) {
return Promise.try(() => {
return this.getItem(tx, { id: id, optional: true });
}).then((currentItem) => {
let actualID = currentItem.id ?? id;
let newItem = {
id: actualID,
data: (currentItem != null)
? update(currentItem.data)
: update({}),
createdBy: parentID,
tags: tags,
aliases: aliases.concat([ actualID ]),
updatedAt: new Date()
};
if (allowMerge) {
let newItem = mergeItems({ fromObj, intoObj, merge, mergeMetadata });
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=true)`, newItem);
} else {
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=false)`);
logSimulated(`${printTX(tx)} ${printItem(id)} Storing item (upsert=${allowUpsert})`, newItem);
});
},
moveItem: function (tx, { from, into, merge, mergeMetadata, allowMerge }) {
return Promise.all([
this.getItem(tx, { id: from, optional: true }),
this.getItem(tx, { id: into, optional: true }),
]).then((fromObj, maybeIntoObj) => {
if (fromObj != null) {
let intoObj = maybeIntoObj ?? {
id: into,
data: {},
taskResults: []
};
if (allowMerge) {
let newItem = mergeItems({ fromObj, intoObj, merge, mergeMetadata });
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=true)`, newItem);
} else {
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=false)`);
}
}
});
},
deleteItem: function (tx, { id }) {
logSimulated(`${printTX(tx)} ${printItem(id)} Deleting item`);
},
createAlias: function (tx, { from, to }) {
logSimulated(`${printTX(tx)} ${printItem(to)} Creating alias ${from}`);
},
deleteAlias: function (tx, { from }) {
logSimulated(`${printTX(tx)} ${printItem(from)} Deleting alias`);
},
updateMetadata: function (tx, { id, task, update }) {
return Promise.try(() => {
return this.getItem(tx, { id, optional: true });
}).then((item) => {
let taskResult = (item != null)
? item.taskResults.find((result) => result.task === task)
: undefined;
let existingData = (taskResult != null)
? taskResult.metadata
: {};
let newData = update(existingData);
logSimulated(`${printTX(tx)} ${printItem(id, task)} Storing metadata`, newData);
});
},
expire: function (tx, { id, task }) {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Expiring task result`);
},
markTaskStatus: function (tx, { id, task, isSuccessful }) {
if (isSuccessful) {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as completed`);
} else {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as failed`);
}
});
},
deleteItem: function (tx, { id }) {
logSimulated(`${printTX(tx)} ${printItem(id)} Deleting item`);
},
createAlias: function (tx, { from, to }) {
logSimulated(`${printTX(tx)} ${printItem(to)} Creating alias ${from}`);
},
deleteAlias: function (tx, { from }) {
logSimulated(`${printTX(tx)} ${printItem(from)} Deleting alias`);
},
updateMetadata: function (tx, { id, task, update }) {
return Promise.try(() => {
return this.getItem(tx, { id, optional: true });
}).then((item) => {
let taskResult = (item != null)
? item.taskResults.find((result) => result.task === task)
: undefined;
let existingData = (taskResult != null)
? taskResult.metadata
: {};
let newData = update(existingData);
logSimulated(`${printTX(tx)} ${printItem(id, task)} Storing metadata`, newData);
});
},
expire: function (tx, { id, task }) {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Expiring task result`);
},
markTaskStatus: function (tx, { id, task, isSuccessful }) {
if (isSuccessful) {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as completed`);
} else {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as failed`);
},
countLockedTasks: function (tx) {
return Array.from(locks).reduce((total, [ task, taskLocks ]) => {
return total + taskLocks.size;
}, 0);
},
getUpdateStream: function (... args) {
return backend.getUpdateStream(... args);
},
getTaskStream: function (... args) {
return backend.getTaskStream(... args);
}
},
countLockedTasks: function (tx) {
return Array.from(locks).reduce((total, [ task, taskLocks ]) => {
return total + taskLocks.size;
}, 0);
},
getUpdateStream: function (... args) {
return backend.getUpdateStream(... args);
},
getTaskStream: function (... args) {
return backend.getTaskStream(... args);
}
};
}
};
}
};
};
};

@ -3,7 +3,7 @@
const syncpipe = require("syncpipe");
const defaultValue = require("default-value"); // FIXME: Move to config validation
const invertMapping = require("./invert-mapping");
const invertMapping = require("./util/invert-mapping");
module.exports = function generateTaskGraph({ tags, tasks }) {
let tagsMapping = invertMapping(tags);
@ -14,8 +14,7 @@ module.exports = function generateTaskGraph({ tags, tasks }) {
return [ name, {
... taskDefinition,
name: name,
// NOTE: The default here is for cases where a task is 'orphaned' and not associated with any tags; this can happen during development, and in that case the task won't be present in the tagsMapping at all.
tags: tagsMapping[name] ?? [],
tags: tagsMapping[name],
dependencies: [],
dependents: []
}];

@ -1,19 +1,22 @@
"use strict";
const Promise = require("bluebird");
const defaultValue = require("default-value");
const chalk = require("chalk");
const unreachable = require("@joepie91/unreachable")("srap");
const util = require("util");
const syncpipe = require("syncpipe");
const rateLimit = require("@promistream/rate-limit");
const simpleSink = require("@promistream/simple-sink");
const pipe = require("@promistream/pipe");
const parallelize = require("@promistream/parallelize");
const logStatus = require("./log-status");
const { validateOptions } = require("@validatem/core");
const isValidConfiguration = require("./validators/is-valid-configuration");
const createPrometheus = require("./prometheus");
const generateTaskGraph = require("./util/generate-task-graph");
const asyncInterval = require("./util/async-interval");
const generateTaskGraph = require("./generate-task-graph");
const unreachable = require("@joepie91/unreachable")("srap");
// FIXME: *Require* a taskInterval to be set, even if explicitly null, to prevent accidentally forgetting it
@ -43,8 +46,7 @@ module.exports = async function createKernel(_configuration) {
Object.assign(state, { backend: backend });
const createTaskKernel = require("./streams/task-kernel")(state);
const runTask = require("./run-task")(state);
const createTaskKernel = require("./task-kernel")(state);
function checkLockedTasks() {
return Promise.try(() => {
@ -61,41 +63,29 @@ module.exports = async function createKernel(_configuration) {
if (databasePreparePromise == null) {
databasePreparePromise = Promise.all([
checkLockedTasks(),
backend.topLevel.insertSeeds(null, { seeds: configuration.seed })
backend.topLevel.insertSeeds(configuration.seed)
]);
}
return databasePreparePromise;
}
// FIXME: Don't dump to console.log below, since this is meant to be usable as a library as well - provide some sort of object logging hook instead?
return {
run: async function runKernel() {
console.log(`Starting ${tasks.size} tasks...`);
await prepareDatabase();
asyncInterval(60 * 1000, () => {
return Promise.try(() => {
return backend.topLevel.getQueueSize();
}).then((queueSize) => {
for (let [ taskName, size ] of Object.entries(queueSize)) {
metrics.taskQueueSize.labels({ task: taskName }).set(size);
}
});
});
await prepareDatabase();
return Promise.map(tasks.values(), (task) => {
return pipe([
createTaskKernel(task, {
globalRateLimiter: (attachToGlobalRateLimit != null) ? attachToGlobalRateLimit() : null,
}),
simpleSink(({ status }) => {
createTaskKernel(task),
simpleSink(({ status, item, error }) => {
if (status === "completed") {
metrics.successfulItems.inc(1);
metrics.successfulItems.labels({ task: task.name }).inc(1);
metrics.successfulItems.labels({ task: task }).inc(1);
} else if (status === "failed") {
metrics.failedItems.inc(1);
metrics.failedItems.labels({ task: task.name }).inc(1);
metrics.failedItems.labels({ task: task }).inc(1);
} else {
unreachable(`Unrecognized status '${status}'`);
}
@ -103,31 +93,19 @@ module.exports = async function createKernel(_configuration) {
]).read();
});
},
simulate: async function simulate({ itemID, task: taskName }) {
console.log(`Simulating task ${itemID}/${taskName}...`);
simulate: async function simulate({ itemID, task }) {
await prepareDatabase();
let simulatedBackend = backend.topLevel.simulate();
let simulateTask = require("./run-task")({
... state,
backend: simulatedBackend
});
let item = await simulatedBackend.topLevel.getItem(null, { id: itemID });
return simulateTask(tasks.get(taskName), item);
let simulatedBackend = backend.simulate();
return simulateTask(itemID, task);
},
execute: async function simulate({ itemID, task: taskName }) {
// TODO: Should this also lock the task? We probably want to ignore any locks, since this method is primarily used for task logic debugging purposes, and overriding locks would be desirable there.
console.log(`Running task ${itemID}/${taskName}...`);
execute: async function simulate({ itemID, task }) {
await prepareDatabase();
let item = await backend.topLevel.getItem(null, { id: itemID });
return runTask(tasks.get(taskName), item);
return executeTask(itemID, task);
},
shutdown: function () {
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
return backend.topLevel.shutdown();
return backend.shutdown();
},
getMetrics: function () {
return Promise.try(() => {
@ -138,9 +116,167 @@ module.exports = async function createKernel(_configuration) {
metrics: metrics
};
});
}
};
function runTaskStreams() {
return Promise.map(Object.entries(tasks), ([ task, tags ]) => {
let taskConfiguration = configuration.tasks[task];
if (taskConfiguration != null) {
let taskStream = createTaskStream({
task: task,
tags: tags,
taskVersion: defaultValue(taskConfiguration.version, "0"),
taskInterval: taskConfiguration.taskInterval,
parallelTasks: taskConfiguration.parallelTasks,
ttl: taskConfiguration.ttl,
run: taskConfiguration.run,
globalRateLimiter: (attachToGlobalRateLimit != null)
? attachToGlobalRateLimit()
: null,
globalParallelize: (configuration.parallelTasks != null)
? parallelize(configuration.parallelTasks)
: null,
taskDependencies: dependencyMap[task],
taskDependents: dependentMap[task]
});
return pipe([
taskStream,
simpleSink((completedItem) => {
metrics.successfulItems.inc(1);
metrics.successfulItems.labels({ task: task }).inc(1);
logStatus(task, chalk.bold.green, "completed", completedItem.id);
})
]).read();
} else {
throw new Error(`Task '${task}' is defined to run for tags [${tags}], but no such task is defined`);
}
}).catch((error) => {
console.dir(error, { depth: null, colors: true });
throw error;
});
}
function executeTask(id, task) {
let taskConfiguration = configuration.tasks[task];
return knex.transaction((tx) => {
return Promise.try(() => {
return queries.getItem(knex, id);
}).then((item) => {
let queue = createDatabaseQueue({
tx,
item,
task,
taskVersion: defaultValue(taskConfiguration.version, "0"),
taskDependents: dependentMap[task],
taskDependencies: dependencyMap[task]
});
return Promise.try(() => {
return taskConfiguration.run({
id: item.id,
data: item.data,
getItem: function (id) {
return queries.getItem(knex, id);
},
... queue.api
});
}).then(() => {
return queue.execute();
});
});
}, { doNotRejectOnRollback: false });
}
function simulateTask(id, task) {
let taskConfiguration = configuration.tasks[task];
let methods = [ "createItem", "renameItem", "mergeItem", "deleteItem", "createAlias", "deleteAlias", "updateData", "updateMetadata", "expire", "expireDependents" ];
let simulatedMethods = syncpipe(methods, [
(_) => _.map((method) => [ method, function() {
console.log(`${chalk.bold.yellow.bgBlack(`${method} (simulated):`)} ${util.inspect(arguments, { colors: true, depth: null })}`);
}]),
(_) => Object.fromEntries(_)
]);
return Promise.try(() => {
return queries.getItem(knex, id);
}).then((item) => {
return taskConfiguration.run({
id: item.id,
data: item.data,
getItem: function (id) {
return queries.getItem(knex, id);
},
... simulatedMethods
});
});
}
return {
run: function runKernel() {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return runTaskStreams();
});
},
simulate: function simulate({ itemID, task }) {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return simulateTask(itemID, task);
});
},
getUpdates: function ({ prefix, timestamp }) {
return backend.topLevel.getUpdateStream(null, { prefix, timestamp });
execute: function simulate({ itemID, task }) {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return executeTask(itemID, task);
});
},
shutdown: function () {
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
knex.destroy();
},
getMetrics: function () {
return Promise.try(() => {
return prometheusRegistry.metrics();
}).then((metrics) => {
return {
contentType: prometheusRegistry.contentType,
metrics: metrics
};
});
}
};
};

@ -3,5 +3,5 @@
const chalk = require("chalk");
module.exports = function logStatus(task, color, type, message) {
console.log(`${chalk.bold(`[${task.name}]`)} ${color(`[${type}]`)} ${message}`);
console.log(`${chalk.bold(`[${task}]`)} ${color(`[${type}]`)} ${message}`);
};

@ -27,18 +27,6 @@ module.exports = function createPrometheus() {
help: "Amount of items that have failed during processing",
labelNames: [ "task" ]
}),
successfulLocks: new prometheusClient.Counter({
registers: [ prometheusRegistry ],
name: "srap_successful_locks_total",
help: "Amount of queue item lock attempts that were successful",
labelNames: [ "task" ]
}),
failedLocks: new prometheusClient.Counter({
registers: [ prometheusRegistry ],
name: "srap_failed_locks_total",
help: "Amount of queue item lock attempts that failed",
labelNames: [ "task" ]
}),
taskFetchTime: new prometheusClient.Gauge({
registers: [ prometheusRegistry ],
name: "srap_task_fetch_seconds",
@ -50,25 +38,8 @@ module.exports = function createPrometheus() {
name: "srap_task_fetch_results_count",
help: "Amount of new scraping tasks fetched during the most recent attempt",
labelNames: [ "task" ]
}),
taskRefillTime: new prometheusClient.Gauge({
registers: [ prometheusRegistry ],
name: "srap_task_refill_seconds",
help: "Time needed for the most recent refill of the task queue",
labelNames: [ "task" ]
}),
taskRefillResults: new prometheusClient.Gauge({
registers: [ prometheusRegistry ],
name: "srap_task_refill_results_count",
help: "Amount of new scraping tasks added to queue during the most recent attempt",
labelNames: [ "task" ]
}),
taskQueueSize: new prometheusClient.Gauge({
registers: [ prometheusRegistry ],
name: "srap_task_queue_count",
help: "Amount of scraping tasks currently queued up",
labelNames: [ "task" ]
})
// FIXME: Measure queue-refill task
}
};
};

@ -0,0 +1,36 @@
"use strict";
const Promise = require("bluebird");
const consumable = require("@joepie91/consumable");
const syncpipe = require("syncpipe");
const createMutationAPIWrapper = require("./mutation-api/wrapper");
module.exports = function (state) {
const createDatabaseMutationAPI = require("./mutation-api/database")(state);
return function createDatabaseQueue(context) {
let databaseMutationAPI = createDatabaseMutationAPI(context);
let mutationAPI = createMutationAPIWrapper(context, databaseMutationAPI);
let queue = consumable([]);
return {
api: syncpipe(Object.keys(mutationAPI), [
(_) => _.map((method) => [ method, function() { queue.peek().push([ method, arguments ]); } ]),
(_) => Object.fromEntries(_)
]),
execute: function () {
if (!queue.peek().some((method) => method[0] === "updateMetadata")) {
// Doing an updateMetadata call is necessary to mark a task 'completed', so we inject a dummy call that doesn't actually change the metadata itself
// FIXME: Split apart 'markTaskCompleted' and 'updateMetadata' queries so that this hack is no longer necessary
queue.peek().push([ "updateMetadata", [ (data) => data ]]);
}
return Promise.each(queue.consume(), ([ method, args ]) => {
return mutationAPI[method](... args);
});
}
};
};
};

@ -1,55 +0,0 @@
"use strict";
const Promise = require("bluebird");
const chalk = require("chalk");
const logStatus = require("./util/log-status");
module.exports = function ({ backend }) {
return function runTask(task, item) {
let queue = [];
let itemIsDeleted = false;
let api = backend.forItem({
task: task,
item: item,
mutationQueue: queue,
onDeleteSelf: () => { itemIsDeleted = true; }
});
return Promise.try(() => {
// TODO: Standardize logging control/levels interface, also for library use
if (!process.env.SRAP_QUIET) {
logStatus(task, chalk.bold.cyan, "started", item.id);
}
// NOTE: We only pass in the item data itself, *not* any associated metadata like tags. If the scraping task wants access to that sort of information, it should do a `getItem` call from within its task logic where needed.
// FIXME: Is that actually still true post-refactor?
return task.run({
data: item.data,
... api.exposed
});
}).then(() => {
// NOTE: We only apply changes at the very end (outside of simulation mode), so that when a task implementation contains multiple operations, each of those operation always 'sees' the state at the start of the task, not the state after the previous mutation. This makes the model as a whole easier to reason about. In simulation mode, all calls are immediate and the queue is empty - after all, no mutation can happen in that case anyway. This is also another reason to ensure that operations in live mode always see the starting state; that makes its behaviour consistent with simulation mode.
return backend.topLevel.runInTransaction(null, (tx) => {
return Promise.each(queue, (operation) => {
return operation(tx);
});
});
}).then(async () => {
if (!itemIsDeleted) {
await api.internal.markTaskCompleted();
}
if (!process.env.SRAP_QUIET) {
logStatus(task, chalk.bold.green, "completed", item.id);
}
return { status: "completed", item: item };
}).catch(async (error) => {
await api.internal.markTaskFailed(null, { error });
logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`);
return { status: "failed", item: item, error: error };
});
};
};

@ -0,0 +1,42 @@
"use strict";
const Promise = require("bluebird");
// const { UniqueViolationError } = require("objection");
const pipe = require("@promistream/pipe");
const map = require("@promistream/map");
const mapFilter = require("@promistream/map-filter");
module.exports = function ({ backend }) {
return function processTaskSafely(task, processHandler) {
let lockStream = mapFilter((item) => {
return Promise.try(() => {
return backend.lock(null, { id: item.id, task: task });
}).then((success) => {
if (success) {
return item;
} else {
return mapFilter.NoValue;
}
});
});
let processUnlockStream = map((item) => {
return Promise.try(() => {
return backend.runInTransaction((tx) => {
return processHandler(item, tx);
});
}).finally(() => {
// NOTE: The unlock deliberately happens outside of a transaction, so that it can always succeed, even if a task and its associated database changes failed
return backend.unlock(null, { id: item.id, task: task });
}).then(() => {
return item;
});
});
return pipe([
lockStream,
processUnlockStream
]);
};
};

@ -1,37 +0,0 @@
"use strict";
const Promise = require("bluebird");
const pipe = require("@promistream/pipe");
const filter = require("@promistream/filter");
const map = require("@promistream/map");
const rateLimit = require("@promistream/rate-limit");
const parallelize = require("@promistream/parallelize");
// FIXME: Move logs to logging hook
module.exports = function (state) {
let { backend } = state;
const runTask = require("../run-task")(state);
return function createTaskKernelStream(task, { globalRateLimiter }) {
return pipe([
backend.topLevel.getTaskStream(null, { task: task }),
filter((item) => backend.forItem({ task: task, item: item }).internal.lock()),
globalRateLimiter,
(task.taskInterval != null)
? rateLimit(task.taskInterval)
: null,
map((item) => {
return Promise.try(() => {
return runTask(task, item);
}).tap(() => {
return backend.forItem({ task: task, item: item }).internal.unlock();
});
}),
(task.parallelTasks != null)
? parallelize(task.parallelTasks)
: null
]);
};
};

@ -0,0 +1,53 @@
"use strict";
const chalk = require("chalk");
const pipe = require("@promistream/pipe");
const filter = require("@promistream/filter");
const map = require("@promistream/map");
const logStatus = require("./log-status");
module.exports = function ({ backend }) {
function runTask(task, item) {
let queue = [];
let api = backend.forItem({ task: task, id: item.id, mutationQueue: queue });
return Promise.try(() => {
logStatus(task, chalk.bold.cyan, "started", item.id);
// NOTE: We only pass in the item data itself, *not* any associated metadata like tags. If the scraping task wants access to that sort of information, it should do a `getItem` call from within its task logic where needed.
// FIXME: Is that actually still true post-refactor?
task.run({
data: item.data,
... api.exposed
});
}).then(() => {
return backend.topLevel.runInTransaction((tx) => {
// FIXME: use queue
});
}).then(async () => {
await api.internal.markTaskCompleted();
logStatus(task, chalk.bold.green, "completed", item.id);
return { status: "completed", item: item };
}).catch(async (error) => {
await api.internal.markTaskFailed(null, { error });
logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`);
return { status: "failed", item: item, error: error };
});
}
return function createTaskKernelStream(task) {
return pipe([
backend.topLevel.getTaskStream(null, { task: task }),
filter((item) => backend.forItem({ task: task, id: item.id }).internal.lock()),
map((item) => {
return Promise.try(() => {
return runTask(task, item);
}).tap(() => {
return backend.forItem({ task: task, id: item.id }).internal.unlock();
});
})
]);
};
};

@ -1,24 +0,0 @@
"use strict";
const Promise = require("bluebird");
module.exports = function asyncInterval(interval, callback) {
function doCycle() {
let startTime = Date.now();
return Promise.try(() => {
return callback();
}).then(() => {
// let elapsed = Date.now() - startTime;
let elapsed = 0; // HACK: Temporary way to force that the full interval is always waited *between* operations
if (elapsed > interval) {
return doCycle();
} else {
return Promise.delay(interval - elapsed).then(doCycle);
}
});
}
return doCycle();
};

@ -5,8 +5,6 @@ const arrayOf = require("@validatem/array-of");
const isString = require("@validatem/is-string");
const isFunction = require("@validatem/is-function");
const isArray = require("@validatem/is-array");
const either = require("@validatem/either");
const isValue = require("@validatem/is-value");
const isPositiveInteger = require("./is-positive-integer");
@ -20,7 +18,7 @@ function makeRules(recurse) {
name: [ required, isString ],
version: [ required, isString ],
ttl: [ isPositiveInteger ],
parallelTasks: [ either([ isPositiveInteger, isValue(Infinity) ]) ],
parallelTasks: [ isPositiveInteger ],
taskInterval: [ isPositiveInteger ],
dependents: [ required, isTaskArray ],
dependencies: [ required, isTaskArray ],

@ -34,7 +34,7 @@ module.exports = {
value: [ required, {
ttl: [ isMS ],
taskInterval: [ isMS ],
parallelTasks: [ defaultTo(1), either([
parallelTasks: [ defaultTo, either([
[ isValue(Infinity) ],
[ isInteger, isPositive ]
])],

@ -1,3 +0,0 @@
- locks table: make locked status a field instead of based on existence
- rename locks table to queue table
- insert tasks into queue table whenever drained for a task

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save