Compare commits

...

19 Commits

Author SHA1 Message Date
Sven Slootweg 4163b7deb9 Fix config path resolution 1 year ago
Sven Slootweg 00685bb7e9 Do not store task result when item deletes itself 1 year ago
Sven Slootweg 8997a762ed Add missing index for aliases table 1 year ago
Sven Slootweg 96e368b432 Add missing index on tags table 1 year ago
Sven Slootweg 7c60c4fa6b Remove invalidation flag and update last-update timestamp on task completion, not on metadata update 1 year ago
Sven Slootweg 48e9f8372b Prioritize tasks that have never run at all 1 year ago
Sven Slootweg e40113a701 Track current queue sizes as a metric 1 year ago
Sven Slootweg 5b8e71c083 Don't automatically retry failures, and consider never-expiring results to always remain valid 1 year ago
Sven Slootweg 59b89a3459 Rewrite queue refill query to be much faster, fix duplicate tags 2 years ago
Sven Slootweg dbd15aa1d7 Update sync server to use refactored API 2 years ago
Sven Slootweg 6e172dd04d Make PostgreSQL pool size configurable 2 years ago
Sven Slootweg 919985bbd2 Improve empty-queue behaviour, reduce stampeding 2 years ago
Sven Slootweg 31742f8638 Pass task name into item metrics, not task object 2 years ago
Sven Slootweg f7cd69d7d0 Track locking metrics 2 years ago
Sven Slootweg e2f2fb6cb1 Remove obsolete handler 2 years ago
Sven Slootweg ea7cd67158 Fix race condition 2 years ago
Sven Slootweg b9b0e63454 Refactor to pre-generate task queue for performance, fix some refactoring errors, fix some model bugs 2 years ago
Sven Slootweg fb93e902a8 Merge branch 'backend-refactor' 2 years ago
Sven Slootweg 1e1a367cb2 Backend refactor: support pluggable backends 2 years ago

1
.gitignore vendored

@@ -1,2 +1,3 @@
node_modules
junk
.clinic

@@ -0,0 +1,21 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "Launch Program",
"skipFiles": [
"<node_internals>/**"
],
// Debug entry point: the single-task simulation runner.
"program": "./bin/simulate",
// Positional args: <configuration path> <task name> <item ID>.
// NOTE(review): these values are machine/developer-specific; adjust before use.
"args": ["../seekseek/scraper-config/", "lcsc:normalizeProduct", "lcsc:product:C494972"],
"env": {
// Restrict debug logging to the PostgreSQL backend namespace.
"DEBUG": "srap:backend:postgresql:*"
}
}
]
}

@ -12,7 +12,7 @@ const chalk = require("chalk");
let argv = yargs.argv;
let [ configurationPath, task, item ] = argv._;
let absoluteConfigurationPath = path.join(process.cwd(), configurationPath);
let absoluteConfigurationPath = path.resolve(process.cwd(), configurationPath);
let configuration = require(absoluteConfigurationPath);
return Promise.try(() => {

@ -17,7 +17,7 @@ let configurationPath = argv._[0];
let listenHost = argv.listenHost ?? "127.0.0.1";
let listenPort = argv.listenPort ?? 3131;
let absoluteConfigurationPath = path.join(process.cwd(), configurationPath);
let absoluteConfigurationPath = path.resolve(process.cwd(), configurationPath);
let configuration = require(absoluteConfigurationPath);
return Promise.try(() => {

@ -31,32 +31,19 @@ const { testValue } = require("@validatem/core");
const matchesFormat = require("@validatem/matches-format");
const isString = require("@validatem/is-string");
const initialize = require("../src/initialize");
const errors = require("../src/errors");
const createKernel = require("../src/kernel");
let argv = yargs.argv;
let configurationPath = argv._[0];
let listenHost = argv.listenHost ?? "127.0.0.1";
let listenPort = argv.listenPort ?? 3000;
let absoluteConfigurationPath = path.join(process.cwd(), configurationPath);
let absoluteConfigurationPath = path.resolve(process.cwd(), configurationPath);
let configuration = require(absoluteConfigurationPath);
return Promise.try(() => {
// FIXME: Deduplicate this with kernel! Also other common wiring across binaries...
return initialize({
knexfile: {
client: "pg",
connection: configuration.database,
pool: { min: 0, max: 32 },
migrations: { tableName: "srap_knex_migrations" }
}
});
}).then((state) => {
let { db, knex } = state;
const queries = require("../src/queries")(state);
return createKernel(configuration);
}).then((kernel) => {
let app = express();
let router = expressPromiseRouter();
@ -125,7 +112,7 @@ return Promise.try(() => {
: undefined;
return pipe([
queries.getUpdates(knex, { prefix: req.query.prefix, timestamp: timestamp }),
kernel.getUpdates({ prefix: req.query.prefix, timestamp }),
map((item) => JSON.stringify(item) + "\n"),
fromNodeStream(res)
]).read();

@ -12,7 +12,7 @@ const chalk = require("chalk");
let argv = yargs.argv;
let [ configurationPath, task, item ] = argv._;
let absoluteConfigurationPath = path.join(process.cwd(), configurationPath);
let absoluteConfigurationPath = path.resolve(process.cwd(), configurationPath);
let configuration = require(absoluteConfigurationPath);
return Promise.try(() => {

@@ -0,0 +1,17 @@
"use strict";
// Migration: relax constraints on srap_task_results so that rows can be
// created before a task has finished — `metadata` gets a default (inserts may
// omit it) and `is_successful` becomes nullable (unknown until completion).
module.exports.up = function(knex, Promise) {
return knex.schema
.alterTable("srap_task_results", (table) => {
// NOTE(review): knex aliases `.default()` to `.defaultTo()`, but for a
// jsonb column an object default may need to be stringified ("{}")
// depending on the knex version — confirm the generated DDL.
table.jsonb("metadata").notNullable().default({}).alter();
table.boolean("is_successful").nullable().alter();
});
};
module.exports.down = function(knex, Promise) {
return knex.schema
.alterTable("srap_task_results", (table) => {
// `.alter()` replaces the full column definition, so reverting to
// NOT NULL here also drops the default that `up` added.
table.jsonb("metadata").notNullable().alter();
table.boolean("is_successful").notNullable().alter();
});
};

@@ -0,0 +1,19 @@
"use strict";
// Migration: turn srap_tasks_in_progress into a general task queue
// (renamed to srap_queue). Queued-but-not-started rows are now representable:
// `started` defaults to false and `started_at` becomes nullable.
// NOTE: statement order matters — `up` alters under the old table name and
// then renames; `down` renames back first and then alters under the old name.
module.exports.up = function(knex, Promise) {
return knex.schema
.alterTable("srap_tasks_in_progress", (table) => {
// NOTE(review): modifiers are chained *after* `.alter()` here; knex
// queues chained modifiers regardless of order, but the docs show
// `.alter()` last — confirm the generated DDL.
table.timestamp("started_at").alter().nullable().defaultTo(null);
table.boolean("started").notNullable().defaultTo(false);
})
.renameTable("srap_tasks_in_progress", "srap_queue");
};
module.exports.down = function(knex, Promise) {
return knex.schema
.renameTable("srap_queue", "srap_tasks_in_progress")
.alterTable("srap_tasks_in_progress", (table) => {
// Restore the pre-queue semantics: every row is a started task.
table.timestamp("started_at").alter().notNullable().defaultTo(knex.fn.now());
table.dropColumn("started");
});
};

@@ -0,0 +1,39 @@
"use strict";

// Migration: deduplicate srap_tags and replace its surrogate `id` key with a
// natural composite primary key on (name, item_id), so duplicate tags are
// prevented at the schema level from now on.
module.exports.up = function(knex, Promise) {
	// Get rid of existing duplicate entries: for each (name, item_id) pair,
	// keep the row with the lowest `id` and delete every later duplicate.
	return knex.raw(`
		DELETE FROM srap_tags
		WHERE id IN (
			SELECT
				id
			FROM (
				SELECT
					id,
					row_number() OVER w as rnum
				FROM srap_tags
				WINDOW w AS (
					PARTITION BY name, item_id
					ORDER BY id
				)
			) t
			WHERE t.rnum > 1);
	`).then(() => {
		return knex.schema
			.alterTable("srap_tags", (table) => {
				table.dropPrimary();
				table.dropIndex("name");
				table.dropColumn("id");
				table.primary([ "name", "item_id" ]);
			});
	});
};

module.exports.down = function(knex, Promise) {
	// NOTE: the duplicate rows deleted in `up` cannot be restored; this only
	// reverts the schema shape.
	return knex.schema
		.alterTable("srap_tags", (table) => {
			table.dropPrimary();
			table.bigIncrements("id").primary();
			// Fix: `up` also dropped the standalone index on `name`, but the
			// original `down` never recreated it — restore it so the migration
			// is fully reversible. knex derives the same conventional index
			// name (srap_tags_name_index) as the one dropped in `up`.
			table.index("name");
		});
};

@@ -0,0 +1,15 @@
"use strict";

// Migration: add an index on srap_tags.item_id so per-item tag lookups (and
// the item relation join) no longer require a sequential scan.

// Small helper so both directions share the alterTable boilerplate.
function alterTags(knex, applyChange) {
	return knex.schema.alterTable("srap_tags", applyChange);
}

module.exports.up = function (knex, Promise) {
	return alterTags(knex, (table) => table.index("item_id"));
};

module.exports.down = function (knex, Promise) {
	return alterTags(knex, (table) => table.dropIndex("item_id"));
};

@@ -0,0 +1,15 @@
"use strict";

// Migration: index srap_aliases.item_id so alias→item joins and per-item
// alias lookups are served by an index scan.

module.exports.up = (knex) => {
	return knex.schema.alterTable("srap_aliases", (table) => {
		table.index("item_id");
	});
};

module.exports.down = (knex) => {
	return knex.schema.alterTable("srap_aliases", (table) => {
		table.dropIndex("item_id");
	});
};

@ -61,7 +61,8 @@
},
"devDependencies": {
"@joepie91/eslint-config": "^1.1.0",
"eslint": "^7.21.0"
"eslint": "^7.21.0",
"clinic": "^12.0.0"
},
"bin": {
"srap-server": "./bin/server",

@ -1,8 +1,9 @@
"use strict";
const Promise = require("bluebird");
const unreachable = require("@joepie91/unreachable");
const { validateArguments } = require("@validatem/core");
const { validateArguments, validateOptions } = require("@validatem/core");
const required = require("@validatem/required");
const requireEither = require("@validatem/require-either");
const isString = require("@validatem/is-string");
@ -32,7 +33,7 @@ const isTaskObject = require("../validators/is-task-object");
// FIXME: Verify that all internal method calls in the PostgreSQL backend are still valid after moving argument validation/normalization into this module
module.exports = function (state) {
let { tasks } = state;
let { tasks, metrics } = state;
const backendModules = {
"postgresql": require("./postgresql")(state),
@ -68,6 +69,14 @@ module.exports = function (state) {
return backend.shutdown();
},
getQueueSize: function (_tx) {
let [ tx ] = validateArguments(arguments, {
tx: maybeTX
});
return backend.getQueueSize(tx);
},
runInTransaction: function (_tx, _callback) {
let [ tx, callback ] = validateArguments(arguments, {
tx: maybeTX,
@ -76,6 +85,7 @@ module.exports = function (state) {
return backend.runInTransaction(tx, callback);
},
countLockedTasks: function (_tx) {
let [ tx ] = validateArguments(arguments, {
tx: maybeTX
@ -111,8 +121,14 @@ module.exports = function (state) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
// FIXME: Stricter validation
seeds: [ required, arrayOf(anything) ]
// FIXME: This currently duplicates validation logic from forItem.storeItem; figure out a way to deduplicate that
seeds: [ required, arrayOf({
id: [ required, isString ],
// Tags are required to be specified (even if an empty array) because it's easily forgotten
tags: [ required, arrayOf(isString) ],
aliases: [ defaultTo([]), arrayOf(isString) ],
data: [ required, anything ], // FIXME: Check for object
}) ]
}]
});
@ -126,23 +142,238 @@ module.exports = function (state) {
failIfExists: false
});
});
}
},
// FIXME: Other than the missing readOperation wrapper and the tx argument, this is *basically* the same logic as under forItem... this should be simplified somehow.
getItem: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("id"), {
id: [ required, isString ],
optional: [ defaultTo(false), isBoolean ] // FIXME: Can this handling be moved to the wrapper?
}]
});
return backend.getItem(tx, options);
},
},
forItem: function ({ item, task, mutationQueue }) {
forItem: function (_options) {
// FIXME: Proper validation rules here for the other fields as well
let { item, task, mutationQueue, readTX, simulate, onDeleteSelf } = validateOptions(arguments, {
item: anything,
task: [ required, isTask ],
mutationQueue: anything,
readTX: maybeTX,
simulate: anything,
onDeleteSelf: isFunction
});
// We create a new instance of the actual API for every item being processed. This is necessary because some of the input arguments will default to item-specific values, and some of the logic is dependent on task-specific metadata. This is a more efficient (and understandable) approach than pretending the API is stateless and then separately wrapping the API *again* for every individual item with a whole separate layer of input validation rules.
// FIXME: Is this still correct, with the new task (graph) format?
let dependentTaskNames = new Set(task.dependents.map((dependent) => dependent.task));
function mutableOperation(func) {
if (simulate) {
return func(backend);
if (simulate === true) {
return func(readTX, backend);
} else if (mutationQueue != null) {
mutationQueue.push(func);
} else {
unreachable("No mutation queue provided in live mode");
}
}
function readOperation(func) {
return func(readTX, backend);
}
let exposedAPI = {
// NOTE: 'exposed' API methods are the ones that are passed into a user-defined task, and which the task uses to eg. update or create new items
getItem: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("id"), {
id: [ required, isString ],
optional: [ defaultTo(false), isBoolean ] // FIXME: Can this handling be moved to the wrapper?
}]
});
return readOperation((tx) => {
return backend.getItem(tx, options);
});
},
storeItem: function (_options) {
// NOTE: Using `update` instead of `data` makes it an upsert!
// FIXME: Add an 'expire' flag for expiring any existing task results for this item? To trigger re-evaluation on updates
let [ options ] = validateArguments(arguments, {
options: [ required, {
id: [ required, isString ],
// Tags are required to be specified (even if an empty array) because it's easily forgotten
tags: [ required, arrayOf(isString) ],
aliases: [ defaultTo([]), arrayOf(isString) ],
data: [ anything ], // FIXME: Check for object
update: [ isFunction ],
failIfExists: [ defaultTo(false), isBoolean ],
allowUpsert: [ defaultTo(true), isBoolean ],
parentID: [ defaultTo(item.id), isString ]
}, requireEither([ "data", "update" ]) ]
});
let { data, ... rest } = options;
return mutableOperation((tx) => {
return backend.storeItem(tx, {
... rest,
// We normalize `data` and `update` (which are mutually-exclusive) into a single option here, so that the backend only needs to deal with the `update` case
// TODO: Can this be folded into the validation rules in a reasonable and readable way?
update: (data != null)
? (existingData) => ({ ... existingData, ... data })
: rest.update
});
});
},
moveItem: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("into"), {
from: [ defaultTo(item.id), isString ],
into: [ required, isString ],
// NOTE: If no `merge` function is specified, that indicates that merging is not allowed (ie. this is strictly a rename), and mergeMetadata is ignored too
merge: [ isFunction ],
mergeMetadata: [ defaultTo({}), anyProperty({
key: [ required ],
value: [ required, isFunction ]
})],
}]
});
return mutableOperation((tx) => {
return backend.moveItem(tx, { ... options, allowMerge: (options.merge != null) });
});
},
deleteItem: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [
defaultTo({}),
wrapValueAsOption("id"), {
id: [ defaultTo(item.id), isString ]
}
]
});
if (options.id === item.id) {
// This hook is necessary to allow the task kernel to skip certain operations in this case, eg. storing the task result - it would be redundant, and reference a now non-existent item.
onDeleteSelf();
}
return mutableOperation((tx) => {
return backend.deleteItem(tx, options);
});
},
createAlias: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("from"), {
from: [ required, isString ],
to: [ defaultTo(item.id), isString ],
failIfExists: [ defaultTo(false), isBoolean ] // TODO: Shouldn't this default to true, for any occurrence outside of a merge/rename?
}]
});
return mutableOperation((tx) => {
return backend.createAlias(tx, options);
});
},
deleteAlias: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("from"), {
from: [ required, isString ]
}]
});
return mutableOperation((tx) => {
return backend.deleteAlias(tx, options);
});
},
updateData: function (_options) {
// NOTE: This is a semantically self-describing convenience wrapper for `storeItem` that updates the currently-being-processed item
// TODO: Have a dedicated alias and/or signature (for this function) for the common case of "just add a few attributes to whatever is already there"? ie. a shallow merge
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ]
}]
});
return exposedAPI.storeItem({
... options,
tags: []
});
},
updateMetadata: function (_options) {
let [ options ] = validateArguments(arguments, {
options: [ required, wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ],
task: [ required, isTask ]
}]
});
return mutableOperation((tx) => {
return backend.updateMetadata(tx, options);
});
},
expire: function (_options) {
// TODO: It probably doesn't make any semantic sense to leave *both* arguments unspecified. Maybe that should be prohibited via eg. a non-exclusive requireEither? Otherwise the user might expect to immediately expire the *current* task, but since the task is only updated *after* the task logic runs, that is not currently possible to express.
let [ options ] = validateArguments(arguments, {
options: [ required, {
id: [ defaultTo(item.id), isString ],
isTask: [ defaultTo(task), isTask ]
}]
});
return mutableOperation((tx) => {
return backend.expire(tx, options);
});
},
expireDependents: function (_options) {
// NOTE: This method does not have a counterpart in the database backend; it's a convenience abstraction over regular `backend.expire` calls
let [{ id, dependents }] = validateArguments(arguments, {
options: [ defaultTo({}), wrapValueAsOption("dependents"), {
id: [ defaultTo(item.id), isString ],
dependents: [ defaultTo([]), arrayOf(isString), (dependents) => {
// Only consider dependents that actually exist for this task
return dependents.filter((dependent) => dependentTaskNames.has(dependent));
}]
}]
});
// NOTE: This works even with queueing, because each this.expire call just internally queues another operation
return Promise.map(dependents, (dependent) => {
return exposedAPI.expire({
id: id,
taskName: dependent
});
});
},
// Temporary compatibility aliases
createItem: (... args) => exposedAPI.storeItem(... args),
mergeItem: (... args) => exposedAPI.moveItem(... args),
renameItem: (options) => {
if (typeof options === "string") {
return exposedAPI.moveItem(options);
} else {
return exposedAPI.moveItem({ into: options.to, from: options.from });
}
},
};
return {
// NOTE: 'internal' API methods are accessible to srap, but not to user-defined tasks.
@ -156,7 +387,15 @@ module.exports = function (state) {
}]
});
return backend.lock(tx, options);
return Promise.try(() => {
return backend.lock(tx, options);
}).tap((succeeded) => {
if (succeeded) {
metrics.successfulLocks.labels({ task: task.name }).inc(1);
} else {
metrics.failedLocks.labels({ task: task.name }).inc(1);
}
});
},
unlock: function (_tx, _options) {
@ -203,196 +442,7 @@ module.exports = function (state) {
});
},
},
exposed: {
// NOTE: 'exposed' API methods are the ones that are passed into a user-defined task, and which the task uses to eg. update or create new items
getItem: function (_tx, _id, _optional) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("id"), {
id: [ required, isString ],
optional: [ defaultTo(false), isBoolean ] // FIXME: Can this handling be moved to the wrapper?
}]
});
return backend.getItem(tx, options);
},
storeItem: function (_tx, _options) {
// NOTE: Using `update` instead of `data` makes it an upsert!
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
id: [ required, isString ],
// Tags are required to be specified (even if an empty array) because it's easily forgotten
tags: [ required, arrayOf(isString) ],
aliases: [ defaultTo([]), arrayOf(isString) ],
data: [ anything ], // FIXME: Check for object
update: [ isFunction ],
failIfExists: [ defaultTo(false), isBoolean ],
allowUpsert: [ defaultTo(true), isBoolean ],
parentID: [ defaultTo(item.id), isString ]
}, requireEither([ "data", "update" ]) ]
});
let { data, ... rest } = options;
return mutableOperation((backend) => {
return backend.storeItem(tx, {
... rest,
// We normalize `data` and `update` (which are mutually-exclusive) into a single option here, so that the backend only needs to deal with the `update` case
// TODO: Can this be folded into the validation rules in a reasonable and readable way?
update: (data != null)
? (existingData) => ({ ... existingData, ... data })
: rest.update
});
});
},
moveItem: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("into"), {
from: [ defaultTo(item.id), isString ],
into: [ required, isString ],
// NOTE: If no `merge` function is specified, that indicates that merging is not allowed (ie. this is strictly a rename), and mergeMetadata is ignored too
merge: [ isFunction ],
mergeMetadata: [ defaultTo({}), anyProperty({
key: [ required ],
value: [ required, isFunction ]
})],
}]
});
return mutableOperation((backend) => {
return backend.moveItem(tx, { options, allowMerge: (options.merge != null) });
});
},
deleteItem: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [
defaultTo({}),
wrapValueAsOption("id"), {
id: [ defaultTo(item.id), isString ]
}
]
});
return mutableOperation((backend) => {
return backend.deleteItem(tx, options);
});
},
createAlias: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("from"), {
from: [ required, isString ],
to: [ defaultTo(item.id), isString ],
failIfExists: [ defaultTo(false), isBoolean ] // TODO: Shouldn't this default to true, for any occurrence outside of a merge/rename?
}]
});
return mutableOperation((backend) => {
return backend.createAlias(tx, options);
});
},
deleteAlias: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("from"), {
from: [ required, isString ]
}]
});
return mutableOperation((backend) => {
return backend.deleteAlias(tx, options);
});
},
updateData: function (_tx, _options) {
// NOTE: This is a semantically self-describing convenience wrapper for `createItem` that updates the currently-being-processed item
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ]
}]
});
return mutableOperation((backend) => {
return backend.createItem(tx, {
... options,
tags: []
});
});
},
updateMetadata: function (_tx, _options) {
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, wrapValueAsOption("update"), {
id: [ defaultTo(item.id), isString ],
update: [ required, isFunction ],
task: [ required, isTask ]
}]
});
return mutableOperation((backend) => {
return backend.updateMetadata(tx, options);
});
},
expire: function (_tx, _options) {
// TODO: It probably doesn't make any semantic sense to leave *both* arguments unspecified. Maybe that should be prohibited via eg. a non-exclusive requireEither? Otherwise the user might expect to immediately expire the *current* task, but since the task is only updated *after* the task logic runs, that is not currently possible to express.
let [ tx, options ] = validateArguments(arguments, {
tx: maybeTX,
options: [ required, {
id: [ defaultTo(item.id), isString ],
isTask: [ defaultTo(task), isTask ]
}]
});
return mutableOperation((backend) => {
return backend.expire(tx, options);
});
},
expireDependents: function (_tx, _options) {
// NOTE: This method does not have a counterpart in the database backend; it's a convenience abstraction over regular `backend.expire` calls
let [ tx, { id, dependents }] = validateArguments(arguments, {
tx: maybeTX,
options: [ defaultTo({}), wrapValueAsOption("dependents"), {
id: [ defaultTo(item.id), isString ],
dependents: [ defaultTo([]), arrayOf(isString), (dependents) => {
// Only consider dependents that actually exist for this task
return dependents.filter((dependent) => dependentTaskNames.has(dependent));
}]
}]
});
// FIXME: This doesn't work with the synchronous queueing model
return Promise.map(dependents, (dependent) => {
return this.expire(tx, {
id: id,
taskName: dependent
});
});
},
// Temporary compatibility aliases
createItem: (... args) => this.storeItem(... args),
mergeItem: (... args) => this.moveItem(... args),
renameItem: (tx, options) => {
if (typeof options === "string") {
return this.moveItem(tx, options);
} else {
return this.moveItem(tx, { into: options.to, from: options.from });
}
},
}
exposed: exposedAPI
};
}
};

@ -8,9 +8,11 @@ const knexLibrary = require("knex");
const { knexSnakeCaseMappers } = require("objection");
const { addMilliseconds } = require("date-fns");
const ValidationError = require("@validatem/error");
const debug = require("debug")("srap:backend:postgresql:queries");
const models = require("./models");
const mergeItems = require("../../semantics/merge-items");
const syncpipe = require("syncpipe");
let migrationsFolder = path.join(__dirname, "../../../migrations");
@ -22,12 +24,13 @@ module.exports = function(state) {
return {
defaultSettings: {
taskBatchSize: 1000,
taskBatchDelay: 30 * 1000
taskBatchDelay: 5 * 60 * 1000, // FIXME: Rename to reflect changed behaviour?
delayRandomization: 0.2
},
create: function createPostgreSQLBackend(options) {
let knex = knexLibrary({
client: "pg",
pool: { min: 0, max: 32 },
pool: { min: 0, max: 32, ... defaultValue(options.pool, {}) },
migrations: { tableName: "srap_knex_migrations" },
connection: options,
... knexSnakeCaseMappers()
@ -38,9 +41,10 @@ module.exports = function(state) {
directory: migrationsFolder
});
}).then(() => {
let db = models({ ... state, knex: knex });
let queryState = { ... state, knex, db };
// TODO: Does it really make sense to be merging in the backendSettings here? Shouldn't that happen automatically in some way for *every* backend, rather than just the PostgreSQL one specifically? As backend settings are a generic backend feature
state = { ... state, knex: knex, backendSettings: options};
let db = models(state);
state = { ... state, db: db };
// TODO: Should this be inlined instead?
function repointAliases (tx, { to, from }) {
@ -83,15 +87,33 @@ module.exports = function(state) {
},
lock: function (tx, { id, task }) {
return Promise.try(() => {
return db.TaskInProgress.query(tx).insert({
task: task.name,
item_id: id
return this.runInTransaction(tx, (tx) => {
return Promise.try(() => {
return db.TaskInProgress.query(tx).forUpdate().findById([ task.name, id ]);
}).then((item) => {
if (item != null) {
debug(`Task '${task.name}:${id}' currently locked: ${item.started}`);
if (item.started === false) {
return Promise.try(() => {
return db.TaskInProgress.query(tx)
.findById([ task.name, id ])
.update({
started: true,
started_at: knex.fn.now()
});
}).then(() => {
debug(`Task '${task.name}:${id}' locked successfully`);
return true;
});
} else {
debug(`Task '${task.name}:${id}' failed lock because already locked on read`);
return false;
}
} else {
debug(`Task '${task.name}:${id}' failed lock because it no longer exists in queue`);
return false;
}
});
}).then(() => {
return true;
}).catch({ name: "UniqueViolationError" }, () => {
return false;
});
},
@ -104,7 +126,7 @@ module.exports = function(state) {
});
// TODO: return true/false depending on whether unlocking succeeded?
},
getItem: function (tx, { id, optional }) {
return Promise.try(() => {
return db.Alias.relatedQuery("item", tx)
@ -122,7 +144,7 @@ module.exports = function(state) {
storeItem: function (tx, { id, tags, aliases, update, failIfExists, allowUpsert, parentID }) {
// FIXME: Make failIfExists actually work, currently it does nothing as the UNIQUE constraint violation cannot occur for an upsert
// TODO: Ensure that we run the transaction in full isolation mode, and retry in case of a conflict
return Promise.try(() => {
// NOTE: We look up by alias, since this is an upsert - and so if the specified ID already exists as an alias, we should update the existing item instead of creating a new one with the specified (aliased) ID
return db.Alias
@ -146,7 +168,7 @@ module.exports = function(state) {
id: actualID,
data: update(existingData),
createdBy: parentID,
tags: tags.map((tag) => ({ name: tag })),
tags: tags.map((tag) => ({ name: tag, itemId: actualID })),
aliases: allAliases.map((alias) => ({ alias: alias })),
updatedAt: new Date()
};
@ -261,19 +283,12 @@ module.exports = function(state) {
return Promise.try(() => {
return getTaskResult(tx, task, id);
}).then((taskResult) => {
let sharedFields = {
isInvalidated: false,
updatedAt: new Date()
};
if (taskResult != null) {
return taskResult.$query(tx).patch({
... sharedFields,
metadata: update(taskResult.metadata),
});
} else {
return db.TaskResult.query(tx).insert({
... sharedFields,
task: task.name,
itemId: id,
metadata: update({})
@ -311,7 +326,9 @@ module.exports = function(state) {
}).then((taskResult) => {
let sharedFields = {
isSuccessful: isSuccessful,
isInvalidated: false,
taskVersion: task.version,
updatedAt: new Date(),
expiresAt: (task.ttl != null)
? addMilliseconds(new Date(), task.ttl)
: undefined
@ -337,14 +354,25 @@ module.exports = function(state) {
countLockedTasks: function (tx) {
return Promise.try(() => {
return db.TaskInProgress.query(tx).count({ count: "*" });
return db.TaskInProgress.query(tx).count({ count: "*" }).where({ started: true });
}).then((result) => {
return result[0].count;
});
},
getQueueSize: function (tx) {
return Promise.try(() => {
return tx.raw(`SELECT task, COUNT(*) FROM srap_queue WHERE started = false GROUP BY task;`);
}).then((result) => {
return syncpipe(result.rows, [
_ => _.map(({ task, count }) => [ task, parseInt(count) ]),
_ => Object.fromEntries(_)
]);
});
},
getUpdateStream: require("./queries/get-update-stream")(queryState),
getTaskStream: require("./queries/get-task-stream")(queryState)
getUpdateStream: require("./queries/get-update-stream")(state),
getTaskStream: require("./queries/get-task-stream")(state)
};
});
}

@ -26,7 +26,7 @@ module.exports = function ({ db }) {
tasksInProgress: {
relation: Model.HasManyRelation,
modelClass: db.TaskInProgress,
join: { from: "srap_items.id", to: "srap_tasksInProgress.itemId" }
join: { from: "srap_items.id", to: "srap_queue.itemId" }
},
failedTasks: {
// Not actually a many-to-many, but that's what objection calls a HasManyThrough...
@ -35,7 +35,7 @@ module.exports = function ({ db }) {
modelClass: db.Failure,
join: {
from: "srap_items.id",
through: { from: "srap_task_results.itemId", to: "srap_task_results.id" },
through: { from: "srap_taskResults.itemId", to: "srap_taskResults.id" },
to: "srap_failures.taskResultId"
}
}

@ -5,13 +5,14 @@ const { Model, QueryBuilder } = require("objection");
module.exports = function ({ db }) {
return class Tag extends Model {
static tableName = "srap_tags";
static idColumn = [ "name", "itemId" ];
static get relationMappings() {
return {
item: {
relation: Model.BelongsToOneRelation,
modelClass: db.Item,
join: { from: "srap_tags.itemId", to: "srap_item.id" }
join: { from: "srap_tags.itemId", to: "srap_items.id" }
}
};
};

@ -4,7 +4,7 @@ const { Model } = require("objection");
module.exports = function ({ db }) {
return class TaskInProgress extends Model {
static tableName = "srap_tasksInProgress";
static tableName = "srap_queue";
static idColumn = [ "task", "itemId" ];
static get relationMappings() {
@ -12,7 +12,7 @@ module.exports = function ({ db }) {
item: {
relation: Model.BelongsToOneRelation,
modelClass: db.Item,
join: { from: "srap_tasksInProgress.itemId", to: "srap_item.id" }
join: { from: "srap_queue.itemId", to: "srap_items.id" }
}
};
};

@ -12,7 +12,7 @@ module.exports = function ({ db }) {
item: {
relation: Model.BelongsToOneRelation,
modelClass: db.Item,
join: { from: "srap_taskResults.itemId", to: "srap_item.id" }
join: { from: "srap_taskResults.itemId", to: "srap_items.id" }
}
};
};

@ -1,106 +1,182 @@
"use strict";
const Promise = require("bluebird");
const debug = require("debug")("srap:backend:postgresql:query:get-task-stream");
const buffer = require("@promistream/buffer");
const pipe = require("@promistream/pipe");
const simpleSource = require("@promistream/simple-source");
const query = `
WITH
dependency_tasks AS (
SELECT * FROM
json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text)
),
matching_items AS (
SELECT
DISTINCT ON (srap_items.id)
srap_items.*,
results.updated_at AS result_date,
results.task_version,
(
results.is_successful = TRUE
AND (
results.expires_at < NOW()
OR results.is_invalidated = TRUE
)
) AS is_candidate
FROM srap_items
INNER JOIN srap_tags
ON srap_tags.item_id = srap_items.id
AND srap_tags.name = ANY(:tags)
LEFT JOIN srap_task_results AS results
ON results.item_id = srap_items.id
AND results.task = :task
WHERE
NOT EXISTS (
SELECT FROM srap_tasks_in_progress AS pr WHERE pr.item_id = srap_items.id
)
),
candidates AS (
SELECT * FROM matching_items
WHERE result_date IS NULL
UNION
SELECT * FROM matching_items
WHERE is_candidate = TRUE
OR NOT (task_version = :taskVersion)
)
(
SELECT
*
FROM
candidates
WHERE
NOT EXISTS (
SELECT
results.*
FROM dependency_tasks
const fetchQuery = `
SELECT
srap_items.*
FROM
srap_queue
LEFT JOIN srap_items
ON srap_items.id = srap_queue.item_id
WHERE
srap_queue.task = :task
AND srap_queue.started = FALSE
LIMIT :resultLimit
`;
function makeFillQuery(withDependencies) {
return `
WITH
${withDependencies ? `
dependencies AS (
SELECT * FROM json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text)
),
satisfied AS (
SELECT results.* FROM dependencies
LEFT JOIN srap_task_results AS results
ON dependency_tasks.task = results.task
AND dependency_tasks.task_version = results.task_version
AND results.item_id = candidates.id
WHERE
results.is_successful IS NULL
OR results.is_successful = FALSE
OR (
ON dependencies.task = results.task
AND dependencies.task_version = results.task_version
WHERE
results.is_successful = TRUE
AND results.is_invalidated = FALSE
AND (
results.expires_at < NOW()
OR results.is_invalidated = TRUE
results.expires_at > NOW()
OR results.expires_at IS NULL
)
)
),
counts AS (
SELECT item_id, COUNT(task) AS count FROM satisfied GROUP BY item_id
),
dependency_candidates AS (
SELECT item_id FROM counts WHERE count = :dependencyCount
),
` : "" }
tag_candidates AS (
SELECT item_id FROM srap_tags WHERE name = ANY(:tags)
),
full_candidates AS MATERIALIZED (
${withDependencies
? `
SELECT tag_candidates.item_id FROM dependency_candidates
INNER JOIN tag_candidates
ON dependency_candidates.item_id = tag_candidates.item_id
`
: `
SELECT item_id FROM tag_candidates
`
}
),
tasks AS NOT MATERIALIZED (
SELECT
item_id,
is_successful,
(
results.is_successful = FALSE
OR (
results.is_successful = TRUE
AND results.is_invalidated = FALSE
AND (
results.expires_at > NOW()
OR results.expires_at IS NULL
)
)
) AS is_completed
FROM srap_task_results AS results
WHERE
results.task = :task
AND results.task_version = :taskVersion
)
) LIMIT :resultLimit;
`;
SELECT
:task AS task,
full_candidates.item_id
FROM full_candidates
LEFT JOIN tasks ON full_candidates.item_id = tasks.item_id
WHERE tasks.is_completed IS NOT TRUE
ORDER BY tasks.is_successful NULLS FIRST
`;
}
const fillQueryWithDependencies = makeFillQuery(true);
const fillQueryWithoutDependencies = makeFillQuery(false);
module.exports = function ({ metrics, backendSettings }) {
module.exports = function ({ metrics, backendSettings, knex }) {
return function (tx, { task }) {
return simpleSource(() => {
let startTime = Date.now();
return Promise.try(() => {
return tx.raw(query, {
tags: task.tags,
task: task.name,
taskVersion: task.version,
resultLimit: backendSettings.taskBatchSize,
let hasDependencies = (task.dependencies.length > 0);
let refillParameters = {
tags: task.tags,
task: task.name,
taskVersion: task.version,
... hasDependencies
? {
dependencyCount: task.dependencies.length,
dependencyTaskDefinitions: JSON.stringify(task.dependencies.map((dependency) => {
// Case-mapping for SQL compatibility
return { task_version: dependency.version, task: dependency.name };
}))
});
}).then((result) => {
let timeElapsed = Date.now() - startTime;
metrics.taskFetchTime.labels({ task: task.name }).set(timeElapsed / 1000);
metrics.taskFetchResults.labels({ task: task.name }).set(result.rowCount);
debug(`Task retrieval query for '${task.name}' took ${timeElapsed}ms and produced ${result.rowCount} results`);
if (result.rowCount > 0) {
return result.rows;
} else {
// TODO: Consider using LISTEN/NOTIFY instead?
return Promise.resolve([]).delay(backendSettings.taskBatchDelay);
}
: {}
};
let fetchParameters = {
task: task.name,
resultLimit: backendSettings.taskBatchSize
// resultLimit: 1 // For tracking down race conditions
};
function refillQueue() {
let startTime = Date.now();
return Promise.try(() => {
let fillQuery = (hasDependencies)
? fillQueryWithDependencies
: fillQueryWithoutDependencies;
// NOTE: We are deliberately bypassing the transaction here! Also deliberately not using VALUES, since we're inserting from the results of another query instead
return knex.raw(`
INSERT INTO srap_queue (task, item_id)
(${fillQuery})
ON CONFLICT (task, item_id) DO NOTHING;
`, refillParameters);
}).then((response) => {
let timeElapsed = Date.now() - startTime;
metrics.taskRefillTime.labels({ task: task.name }).set(timeElapsed / 1000);
metrics.taskRefillResults.labels({ task: task.name }).set(response.rowCount);
debug(`Queue for '${task.name}' was refilled with ${response.rowCount} items in ${timeElapsed}ms`);
return response.rowCount;
});
});
}
return pipe([
simpleSource(() => {
let startTime = Date.now();
return Promise.try(() => {
return tx.raw(fetchQuery, fetchParameters);
}).then((result) => {
let timeElapsed = Date.now() - startTime;
metrics.taskFetchTime.labels({ task: task.name }).set(timeElapsed / 1000);
metrics.taskFetchResults.labels({ task: task.name }).set(result.rowCount);
debug(`Task retrieval query for '${task.name}' took ${timeElapsed}ms and produced ${result.rowCount} results`);
if (result.rowCount > 0) {
return result.rows;
} else {
return Promise.try(() => {
return refillQueue();
}).then((newItems) => {
if (newItems === 0) {
// TODO: Consider using LISTEN/NOTIFY instead? Worth the added complexity?
let randomization = Math.random() * backendSettings.delayRandomization * backendSettings.taskBatchDelay; // To prevent stampeding by low-throughput tasks
return Promise.resolve([]).delay(backendSettings.taskBatchDelay + randomization);
} else {
// Have another go right away
return [];
}
});
}
});
}),
buffer()
]);
};
}
};

@ -7,7 +7,7 @@ const combineSequentialStreaming = require("@promistream/combine-sequential-stre
const fromIterable = require("@promistream/from-iterable");
const fromNodeStream = require("@promistream/from-node-stream");
const createTypeTaggingStream = require("./streams/tag-type");
const createTypeTaggingStream = require("../../../streams/tag-type");
module.exports = function ({ db, knex }) {
return function (tx, { timestamp, prefix }) {

@ -1,179 +1,183 @@
"use strict";
const Promise = require("bluebird");
const chalk = require("chalk");
const mergeItems = require("../../semantics/merge-items");
function printTX(tx) {
// TODO: Print entire chain?
return `[tx ${tx.__txID}]`;
return chalk.bold.yellow(`[tx ${tx.__txID ?? "?"}]`);
}
function printItem(id, task) {
if (task != null) {
return `[${id}:${task.name}]`;
return chalk.bold.white(`[${id}][${task.name}]`);
} else {
return `[${id}]`;
return chalk.bold.white(`[${id}]`);
}
}
// FIXME: Move logs to logging hook
function logSimulated(... args) {
console.log(... args);
console.log(chalk.gray(args[0]), ... args.slice(1));
}
module.exports = function (state) {
// NOTE: The simulated backend needs access to the 'real' backend; a task may eg. mutate an item based on its current data, and we'd need to read that from the real data source. The only constraint is that the simulated backend cannot *mutate* anything in the real backend, but reading is fine!
return function attachSimulatedBackend({ backend }) {
return {
defaultSettings: {},
create: function createSimulatedBackend(_options) {
let txCounter = 0;
let locks = new Map(); // Map<task, Set<id>>
return {
shutdown: function () {
return backend.shutdown();
},
getDefaultTransaction: function () {
return { __txID: null };
},
isTransaction: function (value) {
return ("__txID" in value);
},
runInTransaction: function (tx, callback) {
let newTransaction = { __txID: txCounter++, __parentTX: tx };
return callback(newTransaction);
},
lock: function (tx, { id, task }) {
if (!locks.has(task)) {
locks.set(task, new Set());
}
return {
defaultSettings: {},
create: function createSimulatedBackend({ backend }) {
let txCounter = 0;
let locks = new Map(); // Map<task, Set<id>>
return {
shutdown: function () {
return backend.shutdown();
},
getDefaultTransaction: function () {
return { __txID: null };
},
isTransaction: function (value) {
return ("__txID" in value);
},
runInTransaction: function (tx, callback) {
let newTransaction = { __txID: txCounter++, __parentTX: tx };
return callback(newTransaction);
},
lock: function (tx, { id, task }) {
if (!locks.has(task)) {
locks.set(task, new Set());
}
let taskLocks = locks.get(task);
let taskLocks = locks.get(task);
if (taskLocks.has(id)) {
logSimulated(`${printTX(tx)} Already locked: ${printItem(id, task)}`);
return false;
} else {
logSimulated(`${printTX(tx)} Locking ${printItem(id, task)}`);
taskLocks.add(id);
return true;
}
},
unlock: function (tx, { id, task }) {
logSimulated(`${printTX(tx)} Unlocking ${printItem(id, task)}`);
locks.get(task).delete(id);
},
getItem: function (tx, options) {
return backend.getItem(backend.getDefaultTransaction(), options);
},
storeItem: function (tx, { id, parentID, update, tags, aliases, allowUpsert }) {
return Promise.try(() => {
return this.getItem(tx, { id: id, optional: true });
}).then((currentItem) => {
let actualID = currentItem.id ?? id;
let newItem = {
id: actualID,
data: (currentItem != null)
? update(currentItem.data)
: update({}),
createdBy: parentID,
tags: tags,
aliases: aliases.concat([ actualID ]),
updatedAt: new Date()
if (taskLocks.has(id)) {
logSimulated(`${printTX(tx)} Already locked: ${printItem(id, task)}`);
return false;
} else {
logSimulated(`${printTX(tx)} Locking ${printItem(id, task)}`);
taskLocks.add(id);
return true;
}
},
unlock: function (tx, { id, task }) {
logSimulated(`${printTX(tx)} Unlocking ${printItem(id, task)}`);
locks.get(task).delete(id);
},
getItem: function (tx, options) {
return backend.getItem(backend.getDefaultTransaction(), options);
},
storeItem: function (tx, { id, parentID, update, tags, aliases, allowUpsert }) {
return Promise.try(() => {
return this.getItem(tx, { id: id, optional: true });
}).then((currentItem) => {
let actualID = (currentItem != null)
? currentItem.id
: id;
let newItem = {
id: actualID,
data: (currentItem != null)
? update(currentItem.data)
: update({}),
createdBy: parentID,
tags: tags,
aliases: aliases.concat([ actualID ]),
updatedAt: new Date()
};
logSimulated(`${printTX(tx)} ${printItem(id)} Storing item (upsert=${allowUpsert})`, newItem);
});
},
moveItem: function (tx, { from, into, merge, mergeMetadata, allowMerge }) {
return Promise.all([
this.getItem(tx, { id: from, optional: true }),
this.getItem(tx, { id: into, optional: true }),
]).then(([ fromObj, maybeIntoObj ]) => {
if (fromObj != null) {
let intoObj = maybeIntoObj ?? {
id: into,
data: {},
taskResults: []
};
logSimulated(`${printTX(tx)} ${printItem(id)} Storing item (upsert=${allowUpsert})`, newItem);
});
},
moveItem: function (tx, { from, into, merge, mergeMetadata, allowMerge }) {
return Promise.all([
this.getItem(tx, { id: from, optional: true }),
this.getItem(tx, { id: into, optional: true }),
]).then((fromObj, maybeIntoObj) => {
if (fromObj != null) {
let intoObj = maybeIntoObj ?? {
id: into,
data: {},
taskResults: []
};
if (allowMerge) {
let newItem = mergeItems({ fromObj, intoObj, merge, mergeMetadata });
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=true)`, newItem);
} else {
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=false)`);
}
if (allowMerge) {
let newItem = mergeItems({ fromObj, intoObj, merge, mergeMetadata });
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=true)`, newItem);
} else {
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=false)`);
}
});
},
deleteItem: function (tx, { id }) {
logSimulated(`${printTX(tx)} ${printItem(id)} Deleting item`);
},
createAlias: function (tx, { from, to }) {
logSimulated(`${printTX(tx)} ${printItem(to)} Creating alias ${from}`);
},
deleteAlias: function (tx, { from }) {
logSimulated(`${printTX(tx)} ${printItem(from)} Deleting alias`);
},
updateMetadata: function (tx, { id, task, update }) {
return Promise.try(() => {
return this.getItem(tx, { id, optional: true });
}).then((item) => {
let taskResult = (item != null)
? item.taskResults.find((result) => result.task === task)
: undefined;
let existingData = (taskResult != null)
? taskResult.metadata
: {};
let newData = update(existingData);
logSimulated(`${printTX(tx)} ${printItem(id, task)} Storing metadata`, newData);
});
},
expire: function (tx, { id, task }) {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Expiring task result`);
},
markTaskStatus: function (tx, { id, task, isSuccessful }) {
if (isSuccessful) {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as completed`);
} else {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as failed`);
}
},
countLockedTasks: function (tx) {
return Array.from(locks).reduce((total, [ task, taskLocks ]) => {
return total + taskLocks.size;
}, 0);
},
getUpdateStream: function (... args) {
return backend.getUpdateStream(... args);
},
getTaskStream: function (... args) {
return backend.getTaskStream(... args);
});
},
deleteItem: function (tx, { id }) {
logSimulated(`${printTX(tx)} ${printItem(id)} Deleting item`);
},
createAlias: function (tx, { from, to }) {
logSimulated(`${printTX(tx)} ${printItem(to)} Creating alias ${from}`);
},
deleteAlias: function (tx, { from }) {
logSimulated(`${printTX(tx)} ${printItem(from)} Deleting alias`);
},
updateMetadata: function (tx, { id, task, update }) {
return Promise.try(() => {
return this.getItem(tx, { id, optional: true });
}).then((item) => {
let taskResult = (item != null)
? item.taskResults.find((result) => result.task === task)
: undefined;
let existingData = (taskResult != null)
? taskResult.metadata
: {};
let newData = update(existingData);
logSimulated(`${printTX(tx)} ${printItem(id, task)} Storing metadata`, newData);
});
},
expire: function (tx, { id, task }) {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Expiring task result`);
},
markTaskStatus: function (tx, { id, task, isSuccessful }) {
if (isSuccessful) {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as completed`);
} else {
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as failed`);
}
};
}
};
},
countLockedTasks: function (tx) {
return Array.from(locks).reduce((total, [ task, taskLocks ]) => {
return total + taskLocks.size;
}, 0);
},
getUpdateStream: function (... args) {
return backend.getUpdateStream(... args);
},
getTaskStream: function (... args) {
return backend.getTaskStream(... args);
}
};
}
};
};

@ -1,22 +1,19 @@
"use strict";
const Promise = require("bluebird");
const defaultValue = require("default-value");
const chalk = require("chalk");
const util = require("util");
const syncpipe = require("syncpipe");
const unreachable = require("@joepie91/unreachable")("srap");
const rateLimit = require("@promistream/rate-limit");
const simpleSink = require("@promistream/simple-sink");
const pipe = require("@promistream/pipe");
const parallelize = require("@promistream/parallelize");
const logStatus = require("./log-status");
const { validateOptions } = require("@validatem/core");
const isValidConfiguration = require("./validators/is-valid-configuration");
const createPrometheus = require("./prometheus");
const generateTaskGraph = require("./generate-task-graph");
const unreachable = require("@joepie91/unreachable")("srap");
const generateTaskGraph = require("./util/generate-task-graph");
const asyncInterval = require("./util/async-interval");
// FIXME: *Require* a taskInterval to be set, even if explicitly null, to prevent accidentally forgetting it
@ -46,7 +43,8 @@ module.exports = async function createKernel(_configuration) {
Object.assign(state, { backend: backend });
const createTaskKernel = require("./task-kernel")(state);
const createTaskKernel = require("./streams/task-kernel")(state);
const runTask = require("./run-task")(state);
function checkLockedTasks() {
return Promise.try(() => {
@ -63,29 +61,41 @@ module.exports = async function createKernel(_configuration) {
if (databasePreparePromise == null) {
databasePreparePromise = Promise.all([
checkLockedTasks(),
backend.topLevel.insertSeeds(configuration.seed)
backend.topLevel.insertSeeds(null, { seeds: configuration.seed })
]);
}
return databasePreparePromise;
}
// FIXME: Don't dump to console.log below, since this is meant to be usable as a library as well - provide some sort of object logging hook instead?
return {
run: async function runKernel() {
console.log(`Starting ${tasks.size} tasks...`);
await prepareDatabase();
asyncInterval(60 * 1000, () => {
return Promise.try(() => {
return backend.topLevel.getQueueSize();
}).then((queueSize) => {
for (let [ taskName, size ] of Object.entries(queueSize)) {
metrics.taskQueueSize.labels({ task: taskName }).set(size);
}
});
});
return Promise.map(tasks.values(), (task) => {
return pipe([
createTaskKernel(task),
simpleSink(({ status, item, error }) => {
createTaskKernel(task, {
globalRateLimiter: (attachToGlobalRateLimit != null) ? attachToGlobalRateLimit() : null,
}),
simpleSink(({ status }) => {
if (status === "completed") {
metrics.successfulItems.inc(1);
metrics.successfulItems.labels({ task: task }).inc(1);
metrics.successfulItems.labels({ task: task.name }).inc(1);
} else if (status === "failed") {
metrics.failedItems.inc(1);
metrics.failedItems.labels({ task: task }).inc(1);
metrics.failedItems.labels({ task: task.name }).inc(1);
} else {
unreachable(`Unrecognized status '${status}'`);
}
@ -93,190 +103,44 @@ module.exports = async function createKernel(_configuration) {
]).read();
});
},
simulate: async function simulate({ itemID, task }) {
simulate: async function simulate({ itemID, task: taskName }) {
console.log(`Simulating task ${itemID}/${taskName}...`);
await prepareDatabase();
let simulatedBackend = backend.simulate();
return simulateTask(itemID, task);
},
execute: async function simulate({ itemID, task }) {
await prepareDatabase();
return executeTask(itemID, task);
},
shutdown: function () {
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
return backend.shutdown();
},
getMetrics: function () {
return Promise.try(() => {
return state.prometheusRegistry.metrics();
}).then((metrics) => {
return {
contentType: state.prometheusRegistry.contentType,
metrics: metrics
};
});
}
};
let simulatedBackend = backend.topLevel.simulate();
function runTaskStreams() {
return Promise.map(Object.entries(tasks), ([ task, tags ]) => {
let taskConfiguration = configuration.tasks[task];
if (taskConfiguration != null) {
let taskStream = createTaskStream({
task: task,
tags: tags,
taskVersion: defaultValue(taskConfiguration.version, "0"),
taskInterval: taskConfiguration.taskInterval,
parallelTasks: taskConfiguration.parallelTasks,
ttl: taskConfiguration.ttl,
run: taskConfiguration.run,
globalRateLimiter: (attachToGlobalRateLimit != null)
? attachToGlobalRateLimit()
: null,
globalParallelize: (configuration.parallelTasks != null)
? parallelize(configuration.parallelTasks)
: null,
taskDependencies: dependencyMap[task],
taskDependents: dependentMap[task]
});
return pipe([
taskStream,
simpleSink((completedItem) => {
metrics.successfulItems.inc(1);
metrics.successfulItems.labels({ task: task }).inc(1);
logStatus(task, chalk.bold.green, "completed", completedItem.id);
})
]).read();
} else {
throw new Error(`Task '${task}' is defined to run for tags [${tags}], but no such task is defined`);
}
}).catch((error) => {
console.dir(error, { depth: null, colors: true });
throw error;
});
}
function executeTask(id, task) {
let taskConfiguration = configuration.tasks[task];
return knex.transaction((tx) => {
return Promise.try(() => {
return queries.getItem(knex, id);
}).then((item) => {
let queue = createDatabaseQueue({
tx,
item,
task,
taskVersion: defaultValue(taskConfiguration.version, "0"),
taskDependents: dependentMap[task],
taskDependencies: dependencyMap[task]
});
return Promise.try(() => {
return taskConfiguration.run({
id: item.id,
data: item.data,
getItem: function (id) {
return queries.getItem(knex, id);
},
... queue.api
});
}).then(() => {
return queue.execute();
});
});
}, { doNotRejectOnRollback: false });
}
function simulateTask(id, task) {
let taskConfiguration = configuration.tasks[task];
let methods = [ "createItem", "renameItem", "mergeItem", "deleteItem", "createAlias", "deleteAlias", "updateData", "updateMetadata", "expire", "expireDependents" ];
let simulatedMethods = syncpipe(methods, [
(_) => _.map((method) => [ method, function() {
console.log(`${chalk.bold.yellow.bgBlack(`${method} (simulated):`)} ${util.inspect(arguments, { colors: true, depth: null })}`);
}]),
(_) => Object.fromEntries(_)
]);
return Promise.try(() => {
return queries.getItem(knex, id);
}).then((item) => {
return taskConfiguration.run({
id: item.id,
data: item.data,
getItem: function (id) {
return queries.getItem(knex, id);
},
... simulatedMethods
let simulateTask = require("./run-task")({
... state,
backend: simulatedBackend
});
});
}
return {
run: function runKernel() {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return runTaskStreams();
});
let item = await simulatedBackend.topLevel.getItem(null, { id: itemID });
return simulateTask(tasks.get(taskName), item);
},
simulate: function simulate({ itemID, task }) {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return simulateTask(itemID, task);
});
},
execute: function simulate({ itemID, task }) {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return executeTask(itemID, task);
});
execute: async function simulate({ itemID, task: taskName }) {
// TODO: Should this also lock the task? We probably want to ignore any locks, since this method is primarily used for task logic debugging purposes, and overriding locks would be desirable there.
console.log(`Running task ${itemID}/${taskName}...`);
await prepareDatabase();
let item = await backend.topLevel.getItem(null, { id: itemID });
return runTask(tasks.get(taskName), item);
},
shutdown: function () {
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
knex.destroy();
return backend.topLevel.shutdown();
},
getMetrics: function () {
return Promise.try(() => {
return prometheusRegistry.metrics();
return state.prometheusRegistry.metrics();
}).then((metrics) => {
return {
contentType: prometheusRegistry.contentType,
contentType: state.prometheusRegistry.contentType,
metrics: metrics
};
});
},
getUpdates: function ({ prefix, timestamp }) {
return backend.topLevel.getUpdateStream(null, { prefix, timestamp });
}
};
};

@ -27,6 +27,18 @@ module.exports = function createPrometheus() {
help: "Amount of items that have failed during processing",
labelNames: [ "task" ]
}),
successfulLocks: new prometheusClient.Counter({
registers: [ prometheusRegistry ],
name: "srap_successful_locks_total",
help: "Amount of queue item lock attempts that were successful",
labelNames: [ "task" ]
}),
failedLocks: new prometheusClient.Counter({
registers: [ prometheusRegistry ],
name: "srap_failed_locks_total",
help: "Amount of queue item lock attempts that failed",
labelNames: [ "task" ]
}),
taskFetchTime: new prometheusClient.Gauge({
registers: [ prometheusRegistry ],
name: "srap_task_fetch_seconds",
@ -38,8 +50,25 @@ module.exports = function createPrometheus() {
name: "srap_task_fetch_results_count",
help: "Amount of new scraping tasks fetched during the most recent attempt",
labelNames: [ "task" ]
}),
taskRefillTime: new prometheusClient.Gauge({
registers: [ prometheusRegistry ],
name: "srap_task_refill_seconds",
help: "Time needed for the most recent refill of the task queue",
labelNames: [ "task" ]
}),
taskRefillResults: new prometheusClient.Gauge({
registers: [ prometheusRegistry ],
name: "srap_task_refill_results_count",
help: "Amount of new scraping tasks added to queue during the most recent attempt",
labelNames: [ "task" ]
}),
taskQueueSize: new prometheusClient.Gauge({
registers: [ prometheusRegistry ],
name: "srap_task_queue_count",
help: "Amount of scraping tasks currently queued up",
labelNames: [ "task" ]
})
// FIXME: Measure queue-refill task
}
};
};

@ -1,36 +0,0 @@
"use strict";
const Promise = require("bluebird");
const consumable = require("@joepie91/consumable");
const syncpipe = require("syncpipe");
const createMutationAPIWrapper = require("./mutation-api/wrapper");
module.exports = function (state) {
const createDatabaseMutationAPI = require("./mutation-api/database")(state);
return function createDatabaseQueue(context) {
let databaseMutationAPI = createDatabaseMutationAPI(context);
let mutationAPI = createMutationAPIWrapper(context, databaseMutationAPI);
let queue = consumable([]);
return {
api: syncpipe(Object.keys(mutationAPI), [
(_) => _.map((method) => [ method, function() { queue.peek().push([ method, arguments ]); } ]),
(_) => Object.fromEntries(_)
]),
execute: function () {
if (!queue.peek().some((method) => method[0] === "updateMetadata")) {
// Doing an updateMetadata call is necessary to mark a task 'completed', so we inject a dummy call that doesn't actually change the metadata itself
// FIXME: Split apart 'markTaskCompleted' and 'updateMetadata' queries so that this hack is no longer necessary
queue.peek().push([ "updateMetadata", [ (data) => data ]]);
}
return Promise.each(queue.consume(), ([ method, args ]) => {
return mutationAPI[method](... args);
});
}
};
};
};

@ -0,0 +1,55 @@
"use strict";
const Promise = require("bluebird");
const chalk = require("chalk");
const logStatus = require("./util/log-status");
module.exports = function ({ backend }) {
return function runTask(task, item) {
let queue = [];
let itemIsDeleted = false;
let api = backend.forItem({
task: task,
item: item,
mutationQueue: queue,
onDeleteSelf: () => { itemIsDeleted = true; }
});
return Promise.try(() => {
// TODO: Standardize logging control/levels interface, also for library use
if (!process.env.SRAP_QUIET) {
logStatus(task, chalk.bold.cyan, "started", item.id);
}
// NOTE: We only pass in the item data itself, *not* any associated metadata like tags. If the scraping task wants access to that sort of information, it should do a `getItem` call from within its task logic where needed.
// FIXME: Is that actually still true post-refactor?
return task.run({
data: item.data,
... api.exposed
});
}).then(() => {
// NOTE: We only apply changes at the very end (outside of simulation mode), so that when a task implementation contains multiple operations, each of those operation always 'sees' the state at the start of the task, not the state after the previous mutation. This makes the model as a whole easier to reason about. In simulation mode, all calls are immediate and the queue is empty - after all, no mutation can happen in that case anyway. This is also another reason to ensure that operations in live mode always see the starting state; that makes its behaviour consistent with simulation mode.
return backend.topLevel.runInTransaction(null, (tx) => {
return Promise.each(queue, (operation) => {
return operation(tx);
});
});
}).then(async () => {
if (!itemIsDeleted) {
await api.internal.markTaskCompleted();
}
if (!process.env.SRAP_QUIET) {
logStatus(task, chalk.bold.green, "completed", item.id);
}
return { status: "completed", item: item };
}).catch(async (error) => {
await api.internal.markTaskFailed(null, { error });
logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`);
return { status: "failed", item: item, error: error };
});
};
};

@ -1,42 +0,0 @@
"use strict";
const Promise = require("bluebird");
// const { UniqueViolationError } = require("objection");
const pipe = require("@promistream/pipe");
const map = require("@promistream/map");
const mapFilter = require("@promistream/map-filter");
module.exports = function ({ backend }) {
return function processTaskSafely(task, processHandler) {
let lockStream = mapFilter((item) => {
return Promise.try(() => {
return backend.lock(null, { id: item.id, task: task });
}).then((success) => {
if (success) {
return item;
} else {
return mapFilter.NoValue;
}
});
});
let processUnlockStream = map((item) => {
return Promise.try(() => {
return backend.runInTransaction((tx) => {
return processHandler(item, tx);
});
}).finally(() => {
// NOTE: The unlock deliberately happens outside of a transaction, so that it can always succeed, even if a task and its associated database changes failed
return backend.unlock(null, { id: item.id, task: task });
}).then(() => {
return item;
});
});
return pipe([
lockStream,
processUnlockStream
]);
};
};

@ -0,0 +1,37 @@
"use strict";
const Promise = require("bluebird");
const pipe = require("@promistream/pipe");
const filter = require("@promistream/filter");
const map = require("@promistream/map");
const rateLimit = require("@promistream/rate-limit");
const parallelize = require("@promistream/parallelize");
// FIXME: Move logs to logging hook
module.exports = function (state) {
let { backend } = state;
const runTask = require("../run-task")(state);
return function createTaskKernelStream(task, { globalRateLimiter }) {
return pipe([
backend.topLevel.getTaskStream(null, { task: task }),
filter((item) => backend.forItem({ task: task, item: item }).internal.lock()),
globalRateLimiter,
(task.taskInterval != null)
? rateLimit(task.taskInterval)
: null,
map((item) => {
return Promise.try(() => {
return runTask(task, item);
}).tap(() => {
return backend.forItem({ task: task, item: item }).internal.unlock();
});
}),
(task.parallelTasks != null)
? parallelize(task.parallelTasks)
: null
]);
};
};

@ -1,53 +0,0 @@
"use strict";
const chalk = require("chalk");
const pipe = require("@promistream/pipe");
const filter = require("@promistream/filter");
const map = require("@promistream/map");
const logStatus = require("./log-status");
module.exports = function ({ backend }) {
function runTask(task, item) {
let queue = [];
let api = backend.forItem({ task: task, id: item.id, mutationQueue: queue });
return Promise.try(() => {
logStatus(task, chalk.bold.cyan, "started", item.id);
// NOTE: We only pass in the item data itself, *not* any associated metadata like tags. If the scraping task wants access to that sort of information, it should do a `getItem` call from within its task logic where needed.
// FIXME: Is that actually still true post-refactor?
task.run({
data: item.data,
... api.exposed
});
}).then(() => {
return backend.topLevel.runInTransaction((tx) => {
// FIXME: use queue
});
}).then(async () => {
await api.internal.markTaskCompleted();
logStatus(task, chalk.bold.green, "completed", item.id);
return { status: "completed", item: item };
}).catch(async (error) => {
await api.internal.markTaskFailed(null, { error });
logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`);
return { status: "failed", item: item, error: error };
});
}
return function createTaskKernelStream(task) {
return pipe([
backend.topLevel.getTaskStream(null, { task: task }),
filter((item) => backend.forItem({ task: task, id: item.id }).internal.lock()),
map((item) => {
return Promise.try(() => {
return runTask(task, item);
}).tap(() => {
return backend.forItem({ task: task, id: item.id }).internal.unlock();
});
})
]);
};
};

@ -0,0 +1,24 @@
"use strict";
const Promise = require("bluebird");
module.exports = function asyncInterval(interval, callback) {
function doCycle() {
let startTime = Date.now();
return Promise.try(() => {
return callback();
}).then(() => {
// let elapsed = Date.now() - startTime;
let elapsed = 0; // HACK: Temporary way to force that the full interval is always waited *between* operations
if (elapsed > interval) {
return doCycle();
} else {
return Promise.delay(interval - elapsed).then(doCycle);
}
});
}
return doCycle();
};

@ -3,7 +3,7 @@
const syncpipe = require("syncpipe");
const defaultValue = require("default-value"); // FIXME: Move to config validation
const invertMapping = require("./util/invert-mapping");
const invertMapping = require("./invert-mapping");
module.exports = function generateTaskGraph({ tags, tasks }) {
let tagsMapping = invertMapping(tags);
@ -14,7 +14,8 @@ module.exports = function generateTaskGraph({ tags, tasks }) {
return [ name, {
... taskDefinition,
name: name,
tags: tagsMapping[name],
// NOTE: The default here is for cases where a task is 'orphaned' and not associated with any tags; this can happen during development, and in that case the task won't be present in the tagsMapping at all.
tags: tagsMapping[name] ?? [],
dependencies: [],
dependents: []
}];

@ -3,5 +3,5 @@
const chalk = require("chalk");
module.exports = function logStatus(task, color, type, message) {
console.log(`${chalk.bold(`[${task}]`)} ${color(`[${type}]`)} ${message}`);
console.log(`${chalk.bold(`[${task.name}]`)} ${color(`[${type}]`)} ${message}`);
};

@ -5,6 +5,8 @@ const arrayOf = require("@validatem/array-of");
const isString = require("@validatem/is-string");
const isFunction = require("@validatem/is-function");
const isArray = require("@validatem/is-array");
const either = require("@validatem/either");
const isValue = require("@validatem/is-value");
const isPositiveInteger = require("./is-positive-integer");
@ -18,7 +20,7 @@ function makeRules(recurse) {
name: [ required, isString ],
version: [ required, isString ],
ttl: [ isPositiveInteger ],
parallelTasks: [ isPositiveInteger ],
parallelTasks: [ either([ isPositiveInteger, isValue(Infinity) ]) ],
taskInterval: [ isPositiveInteger ],
dependents: [ required, isTaskArray ],
dependencies: [ required, isTaskArray ],

@ -34,7 +34,7 @@ module.exports = {
value: [ required, {
ttl: [ isMS ],
taskInterval: [ isMS ],
parallelTasks: [ defaultTo, either([
parallelTasks: [ defaultTo(1), either([
[ isValue(Infinity) ],
[ isInteger, isPositive ]
])],

@ -0,0 +1,3 @@
- locks table: make locked status a field instead of based on existence
- rename locks table to queue table
- insert tasks into queue table whenever drained for a task

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save