backend-refactor
Sven Slootweg 3 years ago
parent 1acc039897
commit 128b70fdae

@ -0,0 +1,31 @@
#!/usr/bin/env node
"use strict";
const Promise = require("bluebird");
const yargs = require("yargs");
const path = require("path");
const createKernel = require("../src/kernel");
const chalk = require("chalk");
let argv = yargs.argv;
let [ configurationPath, task, item ] = argv._;
let absoluteConfigurationPath = path.join(process.cwd(), configurationPath);
let configuration = require(absoluteConfigurationPath);
return Promise.try(() => {
return createKernel(configuration);
}).then((kernel) => {
return Promise.try(() => {
return kernel.execute({
task: task,
itemID: item
});
}).then(() => {
console.log(chalk.green.bold("Done!"));
}).finally(() => {
kernel.shutdown();
});
});

@ -7,5 +7,8 @@ module.exports = {
connection: {
host: config.database.socketPath,
database: config.database.database
},
migrations: {
tableName: "srap_knex_migrations"
}
};

@ -2,7 +2,7 @@
module.exports.up = function(knex, Promise) {
return knex.schema
.createTable("items", (table) => {
.createTable("srap_items", (table) => {
// NOTE: The id is the primary name for the item
table.text("id").notNullable().primary();
table.jsonb("data").notNullable();
@ -12,16 +12,16 @@ module.exports.up = function(knex, Promise) {
table.timestamp("updated_at").notNullable(); // FIXME: Maybe should be nullable?
table.timestamp("metadata_updated_at");
})
.createTable("aliases", (table) => {
.createTable("srap_aliases", (table) => {
table.text("alias").notNullable().primary();
table.text("item_id").references("items.id").notNullable().onUpdate("CASCADE").onDelete("CASCADE");
})
.createTable("tags", (table) => {
.createTable("srap_tags", (table) => {
table.bigIncrements("id").primary();
table.text("item_id").references("items.id").notNullable().onUpdate("CASCADE").onDelete("CASCADE");
table.text("name").notNullable().index();
})
.createTable("task_results", (table) => {
.createTable("srap_task_results", (table) => {
table.primary([ "task", "item_id" ]);
table.text("task").notNullable();
table.text("item_id").references("items.id").notNullable().onUpdate("CASCADE").onDelete("CASCADE");
@ -32,13 +32,13 @@ module.exports.up = function(knex, Promise) {
table.timestamp("updated_at").notNullable().defaultTo(knex.fn.now());
table.timestamp("expires_at");
})
.createTable("tasks_in_progress", (table) => {
.createTable("srap_tasks_in_progress", (table) => {
table.primary([ "task", "item_id" ]);
table.text("task").notNullable();
table.text("item_id").references("items.id").notNullable().onUpdate("CASCADE").onDelete("CASCADE");
table.timestamp("started_at").notNullable().defaultTo(knex.fn.now());
})
.createTable("failures", (table) => {
.createTable("srap_failures", (table) => {
table.bigIncrements("id").primary();
table.text("task").notNullable();
table.text("item_id").notNullable();
@ -52,10 +52,10 @@ module.exports.up = function(knex, Promise) {
module.exports.down = function(knex, Promise) {
return knex.schema
.dropTable("failures")
.dropTable("tasks_in_progress")
.dropTable("task_results")
.dropTable("tags")
.dropTable("aliases")
.dropTable("items");
.dropTable("srap_failures")
.dropTable("srap_tasks_in_progress")
.dropTable("srap_task_results")
.dropTable("srap_tags")
.dropTable("srap_aliases")
.dropTable("srap_items");
}

@ -2,14 +2,14 @@
module.exports.up = function(knex, Promise) {
return knex.schema
.alterTable("task_results", (table) => {
.alterTable("srap_task_results", (table) => {
table.index("item_id");
});
};
module.exports.down = function(knex, Promise) {
return knex.schema
.alterTable("task_results", (table) => {
.alterTable("srap_task_results", (table) => {
table.dropIndex("item_id");
});
};

@ -0,0 +1,17 @@
"use strict";

// Migration: add created_at/updated_at timestamp columns to the srap_aliases table.

const TABLE_NAME = "srap_aliases";
const TIMESTAMP_COLUMNS = [ "created_at", "updated_at" ];

module.exports.up = function (knex, Promise) {
	return knex.schema.alterTable(TABLE_NAME, (table) => {
		// Defaulting to NOW() ensures that pre-existing rows also receive a value
		for (let columnName of TIMESTAMP_COLUMNS) {
			table.timestamp(columnName).notNullable().defaultTo(knex.fn.now());
		}
	});
};

module.exports.down = function (knex, Promise) {
	return knex.schema.alterTable(TABLE_NAME, (table) => {
		for (let columnName of TIMESTAMP_COLUMNS) {
			table.dropColumn(columnName);
		}
	});
};

@ -6,11 +6,15 @@
"author": "Sven Slootweg <admin@cryto.net>",
"license": "WTFPL OR CC0-1.0",
"dependencies": {
"@joepie91/consumable": "^1.0.1",
"@promistream/buffer": "^0.1.1",
"@promistream/combine-sequential-streaming": "^0.1.0",
"@promistream/from-iterable": "^0.1.0",
"@promistream/from-node-stream": "^0.1.1",
"@promistream/map": "^0.1.1",
"@promistream/map-filter": "^0.1.0",
"@promistream/parallelize": "^0.1.0",
"@promistream/pipe": "^0.1.2",
"@promistream/pipe": "^0.1.6",
"@promistream/rate-limit": "^1.0.1",
"@promistream/simple-sink": "^0.1.1",
"@promistream/simple-source": "^0.1.3",
@ -21,6 +25,7 @@
"@validatem/default-to": "^0.1.0",
"@validatem/error": "^1.1.0",
"@validatem/is-boolean": "^0.1.1",
"@validatem/is-date": "^0.1.0",
"@validatem/is-function": "^0.1.0",
"@validatem/is-number": "^0.1.3",
"@validatem/is-string": "^1.0.0",
@ -36,11 +41,12 @@
"default-value": "^1.0.0",
"express": "^4.17.1",
"express-promise-router": "^4.0.1",
"knex": "^0.21.17",
"knex": "^0.95.11",
"map-obj": "^4.2.0",
"ms": "^2.1.3",
"objection": "^2.2.14",
"pg": "^8.5.1",
"pg-query-stream": "^4.1.0",
"syncpipe": "^1.0.0",
"yargs": "^16.2.0"
},

@ -4,13 +4,21 @@ const Promise = require("bluebird");
const express = require("express");
const expressPromiseRouter = require("express-promise-router");
const pipe = require("@promistream/pipe");
const fromNodeStream = require("@promistream/from-node-stream");
const map = require("@promistream/map");
const initialize = require("./initialize");
return Promise.try(() => {
return initialize();
return initialize({
knexfile: require("../knexfile")
});
}).then((state) => {
let { db, knex } = state;
const queries = require("./queries")(state);
let app = express();
let router = expressPromiseRouter();
@ -65,6 +73,14 @@ return Promise.try(() => {
router.get("/items/:id", (req, res) => {
});
router.get("/updates", (req, res) => {
return pipe([
queries.getUpdates(knex),
map((item) => JSON.stringify(item)),
fromNodeStream(res)
]).read();
});
app.use(router);
app.listen(3000);

@ -41,12 +41,15 @@ module.exports = function createKernel(configuration) {
return initialize({
knexfile: {
client: "pg",
connection: configuration.database
connection: configuration.database,
pool: { min: 0, max: 32 },
migrations: { tableName: "srap_knex_migrations" }
}
});
}).then((state) => {
const queries = require("./queries")(state);
const createTaskStream = require("./task-stream")(state);
const createDatabaseQueue = require("./queued-database-api")(state);
let { knex } = state;
let { dependencyMap, dependentMap } = createDependencyMap(configuration);
@ -117,6 +120,38 @@ module.exports = function createKernel(configuration) {
});
}
function executeTask(id, task) {
let taskConfiguration = configuration.tasks[task];
return knex.transaction((tx) => {
return Promise.try(() => {
return queries.getItem(knex, id);
}).then((item) => {
let queue = createDatabaseQueue({
tx,
item,
task,
taskVersion: defaultValue(taskConfiguration.version, "0"),
taskDependents: dependentMap[task],
taskDependencies: dependencyMap[task]
});
return Promise.try(() => {
return taskConfiguration.run({
id: item.id,
data: item.data,
getItem: function (id) {
return queries.getItem(knex, id);
},
... queue.api
});
}).then(() => {
return queue.execute();
});
});
});
}
function simulateTask(id, task) {
let taskConfiguration = configuration.tasks[task];
@ -162,6 +197,15 @@ module.exports = function createKernel(configuration) {
return simulateTask(itemID, task);
});
},
execute: function simulate({ itemID, task }) {
return Promise.try(() => {
return insertSeeds();
}).then(() => {
return checkLockedTasks();
}).then(() => {
return executeTask(itemID, task);
});
},
shutdown: function () {
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
knex.destroy();

@ -4,7 +4,7 @@ const { Model } = require("objection");
module.exports = function ({ db }) {
return class Alias extends Model {
static tableName = "aliases";
static tableName = "srap_aliases";
static idColumn = "alias";
static get relationMappings() {
@ -12,7 +12,7 @@ module.exports = function ({ db }) {
item: {
relation: Model.BelongsToOneRelation,
modelClass: db.Item,
join: { from: "aliases.itemId", to: "items.id" }
join: { from: "srap_aliases.itemId", to: "srap_items.id" }
}
};
};

@ -4,14 +4,14 @@ const { Model } = require("objection");
module.exports = function ({ db }) {
return class Failure extends Model {
static tableName = "failures";
static tableName = "srap_failures";
static get relationMappings() {
return {
taskResult: {
relation: Model.BelongsToOneRelation,
modelClass: db.TaskResult,
join: { from: "failures.taskResultId", to: "taskResults.id" }
join: { from: "srap_failures.taskResultId", to: "srap_taskResults.id" }
}
};
};

@ -4,29 +4,29 @@ const { Model } = require("objection");
module.exports = function ({ db }) {
return class Item extends Model {
static tableName = "items";
static tableName = "srap_items";
static get relationMappings() {
return {
aliases: {
relation: Model.HasManyRelation,
modelClass: db.Alias,
join: { from: "items.id", to: "aliases.itemId" }
join: { from: "srap_items.id", to: "srap_aliases.itemId" }
},
tags: {
relation: Model.HasManyRelation,
modelClass: db.Tag,
join: { from: "items.id", to: "tags.itemId" }
join: { from: "srap_items.id", to: "srap_tags.itemId" }
},
taskResults: {
relation: Model.HasManyRelation,
modelClass: db.TaskResult,
join: { from: "items.id", to: "taskResults.itemId" }
join: { from: "srap_items.id", to: "srap_taskResults.itemId" }
},
tasksInProgress: {
relation: Model.HasManyRelation,
modelClass: db.TaskInProgress,
join: { from: "items.id", to: "tasksInProgress.itemId" }
join: { from: "srap_items.id", to: "srap_tasksInProgress.itemId" }
},
failedTasks: {
// Not actually a many-to-many, but that's what objection calls a HasManyThrough...
@ -34,9 +34,9 @@ module.exports = function ({ db }) {
relation: Model.ManyToManyRelation,
modelClass: db.Failure,
join: {
from: "items.id",
through: { from: "task_results.itemId", to: "task_results.id" },
to: "failures.taskResultId"
from: "srap_items.id",
through: { from: "srap_task_results.itemId", to: "srap_task_results.id" },
to: "srap_failures.taskResultId"
}
}
};

@ -4,14 +4,14 @@ const { Model, QueryBuilder } = require("objection");
module.exports = function ({ db }) {
return class Tag extends Model {
static tableName = "tags";
static tableName = "srap_tags";
static get relationMappings() {
return {
item: {
relation: Model.BelongsToOneRelation,
modelClass: db.Item,
join: { from: "tags.itemId", to: "item.id" }
join: { from: "srap_tags.itemId", to: "srap_item.id" }
}
};
};

@ -4,7 +4,7 @@ const { Model } = require("objection");
module.exports = function ({ db }) {
return class TaskInProgress extends Model {
static tableName = "tasksInProgress";
static tableName = "srap_tasksInProgress";
static idColumn = [ "task", "itemId" ];
static get relationMappings() {
@ -12,7 +12,7 @@ module.exports = function ({ db }) {
item: {
relation: Model.BelongsToOneRelation,
modelClass: db.Item,
join: { from: "tasksInProgress.itemId", to: "item.id" }
join: { from: "srap_tasksInProgress.itemId", to: "srap_item.id" }
}
};
};

@ -4,7 +4,7 @@ const { Model } = require("objection");
module.exports = function ({ db }) {
return class TaskResult extends Model {
static tableName = "taskResults";
static tableName = "srap_taskResults";
static idColumn = [ "task", "itemId" ];
static get relationMappings() {
@ -12,7 +12,7 @@ module.exports = function ({ db }) {
item: {
relation: Model.BelongsToOneRelation,
modelClass: db.Item,
join: { from: "taskResults.itemId", to: "item.id" }
join: { from: "srap_taskResults.itemId", to: "srap_item.id" }
}
};
};

@ -8,6 +8,8 @@ function logCall(methodName, args) {
console.log(`${chalk.bold.yellow.bgBlack(`${methodName} (simulated):`)} ${util.inspect(args, { colors: true, depth: null })}`);
}
// TODO: Make this do an actual database query and then rollback; that way the behaviour is the same as when really modifying the DB, in that earlier operations can affect what later operations see (eg. a createItem followed by a mergeItem involving that new item).
module.exports = function (state) {
const queries = require("../queries")(state);

@ -9,6 +9,7 @@ const defaultTo = require("@validatem/default-to");
const validateOptions = require("@validatem/core/src/api/validate-options");
const isFunction = require("@validatem/is-function");
const arrayOf = require("@validatem/array-of");
const defaultValue = require("default-value");
// FIXME: Remaining validators
@ -93,9 +94,11 @@ module.exports = function wrapMutationAPI({ item, task, taskDependents }, api) {
? new Set(options.dependents)
: null;
let allDependents = defaultValue(taskDependents, []);
let affectedDependents = (selectedDependents != null)
? taskDependents.filter((dependent) => selectedDependents.has(dependent.task))
: taskDependents;
? allDependents.filter((dependent) => selectedDependents.has(dependent.task))
: allDependents;
return Promise.map(affectedDependents, (dependent) => {
return this.expire({

@ -11,12 +11,17 @@ const isString = require("@validatem/is-string");
const isBoolean = require("@validatem/is-boolean");
const isFunction = require("@validatem/is-function");
const isNumber = require("@validatem/is-number");
const isDate = require("@validatem/is-date");
const arrayOf = require("@validatem/array-of");
const defaultTo = require("@validatem/default-to");
const anyProperty = require("@validatem/any-property");
const anything = require("@validatem/anything");
const ValidationError = require("@validatem/error");
const pipe = require("@promistream/pipe");
const combineSequentialStreaming = require("@promistream/combine-sequential-streaming");
const fromIterable = require("@promistream/from-iterable");
const fromNodeStream = require("@promistream/from-node-stream");
const { addSeconds } = require("date-fns");
const syncpipe = require("syncpipe");
@ -32,20 +37,21 @@ function noop() {}
function taskResultsToObject(taskResults) {
return syncpipe(taskResults, [
(_) => [ _.taskName, _.metadata ],
(_) => _.map((result) => [ result.taskName, result.metadata ]),
(_) => Object.fromEntries(_)
]);
}
module.exports = function ({ db }) {
return {
getItem: function (tx, id) {
// FIXME: Make object API instead
getItem: function (tx, id, optional = false) {
return Promise.try(() => {
return db.Alias.relatedQuery("item", tx)
.for(id)
.withGraphFetched("taskResults");
}).then((results) => {
if (results.length > 0) {
if (optional === true || results.length > 0) {
return results[0];
} else {
throw new Error(`No item exists with ID '${id}'`);
@ -148,7 +154,7 @@ module.exports = function ({ db }) {
});
return db.Alias.query(tx)
.patch({ itemId: to })
.patch({ itemId: to, updatedAt: new Date() })
.where({ itemId: from });
},
mergeItem: function (_tx, _options) {
@ -167,77 +173,87 @@ module.exports = function ({ db }) {
});
return Promise.all([
this.getItem(tx, { id: from }),
this.getItem(tx, { id: into }),
]).then(([ from, into ]) => {
let newData = merge(into.data, from.data);
let fromTaskResults = taskResultsToObject(from.taskResults);
let intoTaskResults = taskResultsToObject(into.taskResults);
// FIXME: Deduplicate function
let allTaskKeys = Array.from(new Set([
... Object.keys(fromTaskResults),
... Object.keys(intoTaskResults)
]));
function selectNewestResult(taskA, taskB) {
if (taskA == null) {
return taskB;
} else if (taskB == null) {
return taskA;
} else if (taskA.updatedAt > taskB.updatedAt) {
return taskA;
} else {
return taskB;
}
}
this.getItem(tx, from, true),
this.getItem(tx, into, true),
]).then(([ fromObj, intoObj ]) => {
if (fromObj != null) {
let defaultedIntoObj = defaultValue(intoObj, {
id: into,
data: {},
taskResults: []
});
let newData = merge(defaultedIntoObj.data, fromObj.data);
let fromTaskResults = taskResultsToObject(fromObj.taskResults);
let intoTaskResults = taskResultsToObject(defaultedIntoObj.taskResults);
// TODO: Use merge-by-template here instead?
let newTaskResults = allTaskKeys.map((key) => {
let merger = mergeMetadata[key];
let fromTask = fromTaskResults[key];
let intoTask = intoTaskResults[key];
if (merger != null) {
// Generate a new TaskResult that includes data combined from both
let newMetadata = merger(
defaultValue(intoTask.metadata, {}),
defaultValue(fromTask.metadata, {})
);
return {
... intoTask,
metadata: newMetadata,
updatedAt: Date.now()
};
} else {
// Take the newest known TaskResult and just make sure that it is pointing at the correct ID
return {
... selectNewestResult(intoTask, fromTask),
itemId: into.id
};
// FIXME: Deduplicate function
let allTaskKeys = Array.from(new Set([
... Object.keys(fromTaskResults),
... Object.keys(intoTaskResults)
]));
function selectNewestResult(taskA, taskB) {
if (taskA == null) {
return taskB;
} else if (taskB == null) {
return taskA;
} else if (taskA.updatedAt > taskB.updatedAt) {
return taskA;
} else {
return taskB;
}
}
});
let upsertOptions = {
insertMissing: true,
noDelete: true
};
// TODO: Use merge-by-template here instead?
let newTaskResults = allTaskKeys.map((key) => {
let merger = mergeMetadata[key];
let fromTask = fromTaskResults[key];
let intoTask = intoTaskResults[key];
if (merger != null) {
// Generate a new TaskResult that includes data combined from both
let newMetadata = merger(
defaultValue(intoTask.metadata, {}),
defaultValue(fromTask.metadata, {})
);
return {
... intoTask,
metadata: newMetadata,
updatedAt: Date.now()
};
} else {
// Take the newest known TaskResult and just make sure that it is pointing at the correct ID
return {
... selectNewestResult(intoTask, fromTask),
itemId: defaultedIntoObj.id
};
}
});
return Promise.try(() => {
return into.$query(tx).upsertGraph({
data: newData,
taskResults: newTaskResults
}, upsertOptions);
}).then(() => {
// NOTE: Repointing aliases has the side-effect of leaving a redirect from the source to the destination item, as each item has a self:self alias
return this.repointAliases(tx, { from: from.id, to: into.id });
}).then(() => {
// NOTE: We don't use this.deleteItem, to sidestep any alias lookups
return db.Item.query(tx).findById(from.id).delete();
});
let upsertOptions = {
insertMissing: true,
noDelete: true
};
return Promise.try(() => {
// NOTE: Cannot use into.$query here because that adds an implicit query builder operation, which upsertGraph does not allow
return db.Item.query(tx).upsertGraph({
id: defaultedIntoObj.id,
data: newData,
taskResults: newTaskResults
}, upsertOptions);
}).then(() => {
// NOTE: Repointing aliases has the side-effect of leaving a redirect from the source to the destination item, as each item has a self:self alias
return this.repointAliases(tx, { from: fromObj.id, to: intoObj.id });
}).then(() => {
// NOTE: We don't use this.deleteItem, to sidestep any alias lookups
return db.Item.query(tx).findById(fromObj.id).delete();
});
}
});
},
deleteItem: function (_tx, _options) {
@ -265,7 +281,11 @@ module.exports = function ({ db }) {
}]
});
let promise = db.Alias.query(tx).insert({ alias: from, itemId: to });
let promise = db.Alias.query(tx).insert({
alias: from,
itemId: to,
updatedAt: new Date()
});
if (failIfExists) {
return promise;
@ -281,6 +301,7 @@ module.exports = function ({ db }) {
}]
});
// TODO: This cannot yet be propagated to the update feed, because we don't keep a record of deletions
return db.Alias.query(tx).findById(from).delete();
},
updateData: function (_tx, _options) {
@ -379,6 +400,48 @@ module.exports = function ({ db }) {
}).then((result) => {
return result[0].count;
});
},
getUpdates: function (_tx, _options) {
// NOTE: This returns snake_cased keys! As we're bypassing the Objection internals, no casemapping occurs.
let [ tx, { timestamp, prefix }] = validateArguments(arguments, {
tx: [ required, isTX ],
options: [ defaultTo({}), {
timestamp: [ isDate ],
prefix: [ isString ]
}]
});
function applyWhereClauses(query, idField) {
if (timestamp != null) {
query = query.whereRaw(`updated_at > ?`, [ timestamp ]);
}
if (prefix != null) {
query = query.whereRaw(`${idField} LIKE ?`, [ `${prefix.replace(/%/g, "\\%")}%` ]);
}
return query;
}
// FIXME/MARKER: Below query streams are all producing 0 items, why? Running them manually yields results.
function* streamGenerator() {
yield fromNodeStream.fromReadable(
applyWhereClauses(db.Item.query(tx), "id").toKnexQuery().stream()
);
yield fromNodeStream.fromReadable(
applyWhereClauses(db.Alias.query(tx), "item_id").toKnexQuery().stream()
);
yield fromNodeStream.fromReadable(
applyWhereClauses(db.TaskResult.query(tx), "item_id").toKnexQuery().stream()
);
}
return pipe([
fromIterable(streamGenerator()),
combineSequentialStreaming()
]);
}
};
};

@ -0,0 +1,36 @@
"use strict";
const Promise = require("bluebird");
const consumable = require("@joepie91/consumable");
const syncpipe = require("syncpipe");
const createMutationAPIWrapper = require("./mutation-api/wrapper");
module.exports = function (state) {
const createDatabaseMutationAPI = require("./mutation-api/database")(state);
return function createDatabaseQueue(context) {
let databaseMutationAPI = createDatabaseMutationAPI(context);
let mutationAPI = createMutationAPIWrapper(context, databaseMutationAPI);
let queue = consumable([]);
return {
api: syncpipe(Object.keys(mutationAPI), [
(_) => _.map((method) => [ method, function() { queue.peek().push([ method, arguments ]); } ]),
(_) => Object.fromEntries(_)
]),
execute: function () {
if (!queue.peek().some((method) => method[0] === "updateMetadata")) {
// Doing an updateMetadata call is necessary to mark a task 'completed', so we inject a dummy call that doesn't actually change the metadata itself
// FIXME: Split apart 'markTaskCompleted' and 'updateMetadata' queries so that this hack is no longer necessary
queue.peek().push([ "updateMetadata", [ (data) => data ]]);
}
return Promise.each(queue.consume(), ([ method, args ]) => {
return mutationAPI[method](... args);
});
}
};
};
};

@ -3,20 +3,20 @@
const Promise = require("bluebird");
const ms = require("ms");
const dateFns = require("date-fns");
const syncpipe = require("syncpipe");
const debug = require("debug")("scrapingserver");
const chalk = require("chalk");
const simpleSource = require("@promistream/simple-source");
const buffer = require("@promistream/buffer");
const pipe = require("@promistream/pipe");
const rateLimit = require("@promistream/rate-limit");
const parallelize = require("@promistream/parallelize");
const createMutationAPIWrapper = require("./mutation-api/wrapper");
const logStatus = require("./log-status");
const chalk = require("chalk");
const parallelize = require("@promistream/parallelize");
const { UniqueViolationError } = require("objection");
// FIXME: Revert inlining of task_states once switched to PostgreSQL 12+, which can do this automatically using NOT MATERIALIZED
// FIXME: Check whether the dependency task_versions are actually being correctly passed in, and aren't accidentally nulls
let query = `
WITH
dependency_tasks AS (
@ -25,27 +25,27 @@ let query = `
),
matching_items AS (
SELECT
DISTINCT ON (items.id)
items.*,
results.updated_at AS result_date,
results.task_version,
DISTINCT ON (srap_items.id)
srap_items.*,
srap_results.updated_at AS result_date,
srap_results.task_version,
(
results.is_successful = TRUE
srap_results.is_successful = TRUE
AND (
results.expires_at < NOW()
OR results.is_invalidated = TRUE
srap_results.expires_at < NOW()
OR srap_results.is_invalidated = TRUE
)
) AS is_candidate
FROM items
INNER JOIN tags
ON tags.item_id = items.id
AND tags.name = ANY(:tags)
LEFT JOIN task_results AS results
INNER JOIN srap_tags
ON srap_tags.item_id = srap_items.id
AND srap_tags.name = ANY(:tags)
LEFT JOIN srap_task_results AS results
ON results.item_id = items.id
AND results.task = :task
WHERE
NOT EXISTS (
SELECT FROM tasks_in_progress AS pr WHERE pr.item_id = items.id
SELECT FROM srap_tasks_in_progress AS pr WHERE pr.item_id = items.id
)
),
candidates AS (
@ -66,12 +66,13 @@ let query = `
SELECT
results.*
FROM dependency_tasks
LEFT JOIN task_results AS results
LEFT JOIN srap_task_results AS results
ON dependency_tasks.task = results.task
AND dependency_tasks.task_version = results.task_version
AND results.item_id = candidates.id
WHERE
results.is_successful IS NULL
OR results.is_successful = FALSE
OR (
results.is_successful = TRUE
AND (
@ -86,7 +87,7 @@ let query = `
module.exports = function (state) {
const processTaskSafely = require("./streams/process-task-safely")(state);
const queries = require("./queries")(state);
const createDatabaseMutationAPI = require("./mutation-api/database")(state);
const createDatabaseQueue = require("./queued-database-api")(state);
let { knex, db } = state;
@ -138,18 +139,7 @@ module.exports = function (state) {
processTaskSafely(task, (item, tx) => {
logStatus(task, chalk.bold.cyan, "started", item.id);
let context = { tx, item, task, taskVersion, taskDependents, taskDependencies };
let databaseMutationAPI = createDatabaseMutationAPI(context);
let mutationAPI = createMutationAPIWrapper(context, databaseMutationAPI);
let queue = [];
let methods = [ "createItem", "renameItem", "mergeItem", "deleteItem", "createAlias", "deleteAlias", "updateData", "updateMetadata", "expire", "expireDependents" ];
let queueMethods = syncpipe(methods, [
(_) => _.map((method) => [ method, function() { queue.push([ method, arguments ]); } ]),
(_) => Object.fromEntries(_)
]);
let queue = createDatabaseQueue({ tx, item, task, taskVersion, taskDependents, taskDependencies });
return Promise.try(() => {
// TODO: Proper Validatem schemas for each API method
@ -159,18 +149,10 @@ module.exports = function (state) {
getItem: function (id) {
return queries.getItem(tx, id);
},
... queueMethods
... queue.api
});
}).then(() => {
if (!queue.some((method) => method[0] === "updateMetadata")) {
// Doing an updateMetadata call is necessary to mark a task 'completed', so we inject a dummy call that doesn't actually change the metadata itself
// FIXME: Split apart 'markTaskCompleted' and 'updateMetadata' queries so that this hack is no longer necessary
queue.push([ "updateMetadata", [ (data) => data ]]);
}
return Promise.each(queue, ([ method, args ]) => {
return mutationAPI[method](... args);
});
return queue.execute();
}).then(() => {
// Update succeeded
return db.TaskResult.query(tx).findById([ task, item.id ]).patch({
@ -183,10 +165,22 @@ module.exports = function (state) {
}).catch((error) => {
logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`);
let commonUpdate = {
is_successful: false,
task_version: taskVersion
};
return Promise.try(() => {
// Task failed -- note, cannot use tx here because it has failed
return db.TaskResult.query(knex).insert({
item_id: item.id,
task: task,
metadata: {},
... commonUpdate
});
}).catch(UniqueViolationError, () => {
return db.TaskResult.query(knex).findById([ task, item.id ]).patch({
is_successful: false
... commonUpdate
});
}).then(() => {
// throw error;

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save