Fix race condition

master
Sven Slootweg 2 years ago
parent b9b0e63454
commit ea7cd67158

@ -8,6 +8,7 @@ const knexLibrary = require("knex");
const { knexSnakeCaseMappers } = require("objection");
const { addMilliseconds } = require("date-fns");
const ValidationError = require("@validatem/error");
const debug = require("debug")("srap:backend:postgresql:queries");
const models = require("./models");
const mergeItems = require("../../semantics/merge-items");
@ -84,14 +85,37 @@ module.exports = function(state) {
},
lock: function (tx, { id, task }) {
return this.runInTransaction(tx, (tx) => {
return Promise.try(() => {
// FIXME: use Objection for this?
// FIXME: Read item first to make it prevent edit conflicts via transaction
return tx.raw(`UPDATE srap_queue SET started = TRUE, started_at = NOW() WHERE task = :task AND item_id = :id`, { task: task.name, id: id });
return db.TaskInProgress.query(tx).forUpdate().findById([ task.name, id ]);
}).then((item) => {
if (item != null) {
debug(`Task '${task.name}:${id}' currently locked: ${item.started}`);
if (item.started === false) {
return Promise.try(() => {
return db.TaskInProgress.query(tx)
.findById([ task.name, id ])
.update({
started: true,
started_at: knex.fn.now()
});
}).then(() => {
debug(`Task '${task.name}:${id}' locked successfully`);
return true;
}).catch({ name: "UniqueViolationError" }, () => {
// FIXME: This is not the correct error...
debug(`Task '${task.name}:${id}' failed lock because already locked on write`);
return false;
});
} else {
debug(`Task '${task.name}:${id}' failed lock because already locked on read`);
return false;
}
} else {
debug(`Task '${task.name}:${id}' failed lock because it no longer exists in queue`);
return false;
}
});
});
},

@ -97,6 +97,7 @@ module.exports = function ({ metrics, backendSettings, knex }) {
let fetchParameters = {
task: task.name,
resultLimit: backendSettings.taskBatchSize
// resultLimit: 1 // For tracking down race conditions
};
function refillQueue() {

@ -11,7 +11,10 @@ module.exports = function ({ backend }) {
let api = backend.forItem({ task: task, item: item, mutationQueue: queue });
return Promise.try(() => {
// TODO: Standardize logging control/levels interface, also for library use
if (!process.env.SRAP_QUIET) {
logStatus(task, chalk.bold.cyan, "started", item.id);
}
// NOTE: We only pass in the item data itself, *not* any associated metadata like tags. If the scraping task wants access to that sort of information, it should do a `getItem` call from within its task logic where needed.
// FIXME: Is that actually still true post-refactor?
@ -28,7 +31,11 @@ module.exports = function ({ backend }) {
});
}).then(async () => {
await api.internal.markTaskCompleted();
if (!process.env.SRAP_QUIET) {
logStatus(task, chalk.bold.green, "completed", item.id);
}
return { status: "completed", item: item };
}).catch(async (error) => {
await api.internal.markTaskFailed(null, { error });

Loading…
Cancel
Save