From ea7cd67158aff674fddae833f261ddfdbba424f5 Mon Sep 17 00:00:00 2001 From: Sven Slootweg Date: Thu, 24 Nov 2022 16:01:04 +0100 Subject: [PATCH] Fix race condition --- src/database-backends/postgresql/index.js | 40 +++++++++++++++---- .../postgresql/queries/get-task-stream.js | 1 + src/run-task.js | 11 ++++- 3 files changed, 42 insertions(+), 10 deletions(-) diff --git a/src/database-backends/postgresql/index.js b/src/database-backends/postgresql/index.js index fb23aff..07cb6e5 100644 --- a/src/database-backends/postgresql/index.js +++ b/src/database-backends/postgresql/index.js @@ -8,6 +8,7 @@ const knexLibrary = require("knex"); const { knexSnakeCaseMappers } = require("objection"); const { addMilliseconds } = require("date-fns"); const ValidationError = require("@validatem/error"); +const debug = require("debug")("srap:backend:postgresql:queries"); const models = require("./models"); const mergeItems = require("../../semantics/merge-items"); @@ -84,14 +85,37 @@ module.exports = function(state) { }, lock: function (tx, { id, task }) { - return Promise.try(() => { - // FIXME: use Objection for this? - // FIXME: Read item first to make it prevent edit conflicts via transaction - return tx.raw(`UPDATE srap_queue SET started = TRUE, started_at = NOW() WHERE task = :task AND item_id = :id`, { task: task.name, id: id }); - }).then(() => { - return true; - }).catch({ name: "UniqueViolationError" }, () => { - return false; + return this.runInTransaction(tx, (tx) => { + return Promise.try(() => { + return db.TaskInProgress.query(tx).forUpdate().findById([ task.name, id ]); + }).then((item) => { + if (item != null) { + debug(`Task '${task.name}:${id}' currently locked: ${item.started}`); + if (item.started === false) { + return Promise.try(() => { + return db.TaskInProgress.query(tx) + .findById([ task.name, id ]) + .update({ + started: true, + started_at: knex.fn.now() + }); + }).then(() => { + debug(`Task '${task.name}:${id}' locked successfully`); + return true; + }).catch({ name: "UniqueViolationError" }, () => { + // FIXME: This is not the correct error... + debug(`Task '${task.name}:${id}' failed lock because already locked on write`); + return false; + }); + } else { + debug(`Task '${task.name}:${id}' failed lock because already locked on read`); + return false; + } + } else { + debug(`Task '${task.name}:${id}' failed lock because it no longer exists in queue`); + return false; + } + }); }); }, diff --git a/src/database-backends/postgresql/queries/get-task-stream.js b/src/database-backends/postgresql/queries/get-task-stream.js index dff75dc..50db093 100644 --- a/src/database-backends/postgresql/queries/get-task-stream.js +++ b/src/database-backends/postgresql/queries/get-task-stream.js @@ -97,6 +97,7 @@ module.exports = function ({ metrics, backendSettings, knex }) { let fetchParameters = { task: task.name, resultLimit: backendSettings.taskBatchSize + // resultLimit: 1 // For tracking down race conditions }; function refillQueue() { diff --git a/src/run-task.js b/src/run-task.js index 1820247..3e1aaa0 100644 --- a/src/run-task.js +++ b/src/run-task.js @@ -11,7 +11,10 @@ module.exports = function ({ backend }) { let api = backend.forItem({ task: task, item: item, mutationQueue: queue }); return Promise.try(() => { - logStatus(task, chalk.bold.cyan, "started", item.id); + // TODO: Standardize logging control/levels interface, also for library use + if (!process.env.SRAP_QUIET) { + logStatus(task, chalk.bold.cyan, "started", item.id); + } // NOTE: We only pass in the item data itself, *not* any associated metadata like tags. If the scraping task wants access to that sort of information, it should do a `getItem` call from within its task logic where needed. // FIXME: Is that actually still true post-refactor? @@ -28,7 +31,11 @@ module.exports = function ({ backend }) { }); }).then(async () => { await api.internal.markTaskCompleted(); - logStatus(task, chalk.bold.green, "completed", item.id); + + if (!process.env.SRAP_QUIET) { + logStatus(task, chalk.bold.green, "completed", item.id); + } + return { status: "completed", item: item }; }).catch(async (error) => { await api.internal.markTaskFailed(null, { error });