diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..b624ae2 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,21 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "node", + "request": "launch", + "name": "Launch Program", + "skipFiles": [ + "/**" + ], + "program": "./bin/simulate", + "args": ["../seekseek/scraper-config/", "lcsc:normalizeProduct", "lcsc:product:C494972"], + "env": { + "DEBUG": "srap:backend:postgresql:*" + } + } + ] +} diff --git a/migrations/20221123175316_queue-table.js b/migrations/20221123175316_queue-table.js new file mode 100644 index 0000000..0892c7e --- /dev/null +++ b/migrations/20221123175316_queue-table.js @@ -0,0 +1,19 @@ +"use strict"; + +module.exports.up = function(knex, Promise) { + return knex.schema + .alterTable("srap_tasks_in_progress", (table) => { + table.timestamp("started_at").alter().nullable().defaultTo(null); + table.boolean("started").notNullable().defaultTo(false); + }) + .renameTable("srap_tasks_in_progress", "srap_queue"); +}; + +module.exports.down = function(knex, Promise) { + return knex.schema + .renameTable("srap_queue", "srap_tasks_in_progress") + .alterTable("srap_tasks_in_progress", (table) => { + table.timestamp("started_at").alter().notNullable().defaultTo(knex.fn.now()); + table.dropColumn("started"); + }); +}; diff --git a/src/database-backends/index.js b/src/database-backends/index.js index bb70805..d6bde6a 100644 --- a/src/database-backends/index.js +++ b/src/database-backends/index.js @@ -237,7 +237,7 @@ module.exports = function (state) { }); return mutableOperation((tx) => { - return backend.moveItem(tx, { options, allowMerge: (options.merge != null) }); + return backend.moveItem(tx, { ... options, allowMerge: (options.merge != null) }); }); }, diff --git a/src/database-backends/postgresql/index.js b/src/database-backends/postgresql/index.js index 51e53fd..fb23aff 100644 --- a/src/database-backends/postgresql/index.js +++ b/src/database-backends/postgresql/index.js @@ -85,10 +85,9 @@ module.exports = function(state) { lock: function (tx, { id, task }) { return Promise.try(() => { - return db.TaskInProgress.query(tx).insert({ - task: task.name, - item_id: id - }); + // FIXME: use Objection for this? + // FIXME: Read item first to make it prevent edit conflicts via transaction + return tx.raw(`UPDATE srap_queue SET started = TRUE, started_at = NOW() WHERE task = :task AND item_id = :id`, { task: task.name, id: id }); }).then(() => { return true; }).catch({ name: "UniqueViolationError" }, () => { @@ -338,7 +337,7 @@ module.exports = function(state) { countLockedTasks: function (tx) { return Promise.try(() => { - return db.TaskInProgress.query(tx).count({ count: "*" }); + return db.TaskInProgress.query(tx).count({ count: "*" }).where({ started: true }); }).then((result) => { return result[0].count; }); diff --git a/src/database-backends/postgresql/models/item.js b/src/database-backends/postgresql/models/item.js index 9e9cb9f..699fdf9 100644 --- a/src/database-backends/postgresql/models/item.js +++ b/src/database-backends/postgresql/models/item.js @@ -26,7 +26,7 @@ module.exports = function ({ db }) { tasksInProgress: { relation: Model.HasManyRelation, modelClass: db.TaskInProgress, - join: { from: "srap_items.id", to: "srap_tasksInProgress.itemId" } + join: { from: "srap_items.id", to: "srap_queue.itemId" } }, failedTasks: { // Not actually a many-to-many, but that's what objection calls a HasManyThrough... @@ -35,7 +35,7 @@ module.exports = function ({ db }) { modelClass: db.Failure, join: { from: "srap_items.id", - through: { from: "srap_task_results.itemId", to: "srap_task_results.id" }, + through: { from: "srap_taskResults.itemId", to: "srap_taskResults.id" }, to: "srap_failures.taskResultId" } } diff --git a/src/database-backends/postgresql/models/tag.js b/src/database-backends/postgresql/models/tag.js index 4d4b08e..0caf093 100644 --- a/src/database-backends/postgresql/models/tag.js +++ b/src/database-backends/postgresql/models/tag.js @@ -11,7 +11,7 @@ module.exports = function ({ db }) { item: { relation: Model.BelongsToOneRelation, modelClass: db.Item, - join: { from: "srap_tags.itemId", to: "srap_item.id" } + join: { from: "srap_tags.itemId", to: "srap_items.id" } } }; }; diff --git a/src/database-backends/postgresql/models/task-in-progress.js b/src/database-backends/postgresql/models/task-in-progress.js index a9e90e7..b93dc84 100644 --- a/src/database-backends/postgresql/models/task-in-progress.js +++ b/src/database-backends/postgresql/models/task-in-progress.js @@ -4,7 +4,7 @@ const { Model } = require("objection"); module.exports = function ({ db }) { return class TaskInProgress extends Model { - static tableName = "srap_tasksInProgress"; + static tableName = "srap_queue"; static idColumn = [ "task", "itemId" ]; static get relationMappings() { @@ -12,7 +12,7 @@ module.exports = function ({ db }) { item: { relation: Model.BelongsToOneRelation, modelClass: db.Item, - join: { from: "srap_tasksInProgress.itemId", to: "srap_item.id" } + join: { from: "srap_queue.itemId", to: "srap_items.id" } } }; }; diff --git a/src/database-backends/postgresql/models/task-result.js b/src/database-backends/postgresql/models/task-result.js index 103965b..7b9880d 100644 --- a/src/database-backends/postgresql/models/task-result.js +++ b/src/database-backends/postgresql/models/task-result.js @@ -12,7 +12,7 @@ module.exports = function ({ db }) { item: { relation: Model.BelongsToOneRelation, modelClass: db.Item, - join: { from: "srap_taskResults.itemId", to: "srap_item.id" } + join: { from: "srap_taskResults.itemId", to: "srap_items.id" } } }; }; diff --git a/src/database-backends/postgresql/queries/get-task-stream.js b/src/database-backends/postgresql/queries/get-task-stream.js index 95236ce..dff75dc 100644 --- a/src/database-backends/postgresql/queries/get-task-stream.js +++ b/src/database-backends/postgresql/queries/get-task-stream.js @@ -6,7 +6,20 @@ const buffer = require("@promistream/buffer"); const pipe = require("@promistream/pipe"); const simpleSource = require("@promistream/simple-source"); -const query = ` +const fetchQuery = ` + SELECT + srap_items.* + FROM + srap_queue + LEFT JOIN srap_items + ON srap_items.id = srap_queue.item_id + WHERE + srap_queue.task = :task + AND srap_queue.started = FALSE + LIMIT :resultLimit +`; + +const fillQuery = ` WITH dependency_tasks AS ( SELECT * FROM @@ -14,7 +27,6 @@ const query = ` ), matching_items AS ( SELECT - DISTINCT ON (srap_items.id) srap_items.*, results.updated_at AS result_date, results.task_version, @@ -32,22 +44,19 @@ const query = ` LEFT JOIN srap_task_results AS results ON results.item_id = srap_items.id AND results.task = :task - WHERE - NOT EXISTS ( - SELECT FROM srap_tasks_in_progress AS pr WHERE pr.item_id = srap_items.id - ) ), candidates AS ( SELECT * FROM matching_items WHERE result_date IS NULL - UNION + UNION ALL SELECT * FROM matching_items WHERE is_candidate = TRUE OR NOT (task_version = :taskVersion) ) ( SELECT - * + :task AS task, + id AS item_id FROM candidates WHERE @@ -70,26 +79,53 @@ const query = ` ) ) ) - ) LIMIT :resultLimit; + ) `; -module.exports = function ({ metrics, backendSettings }) { +module.exports = function ({ metrics, backendSettings, knex }) { return function (tx, { task }) { + let refillParameters = { + tags: task.tags, + task: task.name, + taskVersion: task.version, + dependencyTaskDefinitions: JSON.stringify(task.dependencies.map((dependency) => { + // Case-mapping for SQL compatibility + return { task_version: dependency.version, task: dependency.name }; + })) + }; + + let fetchParameters = { + task: task.name, + resultLimit: backendSettings.taskBatchSize + }; + + function refillQueue() { + let startTime = Date.now(); + + return Promise.try(() => { + // NOTE: We are deliberately bypassing the transaction here! Also deliberately not using VALUES, since we're inserting from the results of another query instead + return knex.raw(` + INSERT INTO srap_queue (task, item_id) + (${fillQuery}) + ON CONFLICT (task, item_id) DO NOTHING; + `, refillParameters); + }).then((response) => { + let timeElapsed = Date.now() - startTime; + + metrics.taskRefillTime.labels({ task: task.name }).set(timeElapsed / 1000); + metrics.taskRefillResults.labels({ task: task.name }).set(response.rowCount); + + debug(`Queue for '${task.name}' was refilled with ${response.rowCount} items in ${timeElapsed}ms`); + return response.rowCount; + }); + } + return pipe([ simpleSource(() => { let startTime = Date.now(); return Promise.try(() => { - return tx.raw(query, { - tags: task.tags, - task: task.name, - taskVersion: task.version, - resultLimit: backendSettings.taskBatchSize, - dependencyTaskDefinitions: JSON.stringify(task.dependencies.map((dependency) => { - // Case-mapping for SQL compatibility - return { task_version: dependency.version, task: dependency.name }; - })) - }); + return tx.raw(fetchQuery, fetchParameters); }).then((result) => { let timeElapsed = Date.now() - startTime; @@ -101,8 +137,17 @@ module.exports = function ({ metrics, backendSettings }) { if (result.rowCount > 0) { return result.rows; } else { - // TODO: Consider using LISTEN/NOTIFY instead? - return Promise.resolve([]).delay(backendSettings.taskBatchDelay); + return Promise.try(() => { + return refillQueue(); + }).then((newItems) => { + if (newItems === 0) { + // TODO: Consider using LISTEN/NOTIFY instead? Worth the added complexity? + return Promise.resolve([]).delay(backendSettings.taskBatchDelay); + } else { + // Have another go right away + return []; + } + }); } }); }), diff --git a/src/database-backends/simulated/index.js b/src/database-backends/simulated/index.js index 917f70f..2502b7c 100644 --- a/src/database-backends/simulated/index.js +++ b/src/database-backends/simulated/index.js @@ -104,7 +104,7 @@ module.exports = function (state) { return Promise.all([ this.getItem(tx, { id: from, optional: true }), this.getItem(tx, { id: into, optional: true }), - ]).then((fromObj, maybeIntoObj) => { + ]).then(([ fromObj, maybeIntoObj ]) => { if (fromObj != null) { let intoObj = maybeIntoObj ?? { id: into, diff --git a/src/prometheus/index.js b/src/prometheus/index.js index c9420f6..b696835 100644 --- a/src/prometheus/index.js +++ b/src/prometheus/index.js @@ -38,8 +38,19 @@ module.exports = function createPrometheus() { name: "srap_task_fetch_results_count", help: "Amount of new scraping tasks fetched during the most recent attempt", labelNames: [ "task" ] + }), + taskRefillTime: new prometheusClient.Gauge({ + registers: [ prometheusRegistry ], + name: "srap_task_refill_seconds", + help: "Time needed for the most recent refill of the task queue", + labelNames: [ "task" ] + }), + taskRefillResults: new prometheusClient.Gauge({ + registers: [ prometheusRegistry ], + name: "srap_task_refill_results_count", + help: "Amount of new scraping tasks added to queue during the most recent attempt", + labelNames: [ "task" ] }) - // FIXME: Measure queue-refill task } }; };