From e40113a701dce97ff485d87ee847e8d7aeb4ac4a Mon Sep 17 00:00:00 2001 From: Sven Slootweg Date: Sat, 26 Nov 2022 15:22:49 +0100 Subject: [PATCH] Track current queue sizes as a metric --- src/database-backends/index.js | 9 +++++++++ src/database-backends/postgresql/index.js | 12 ++++++++++++ src/kernel.js | 14 ++++++++++++- src/prometheus/index.js | 6 ++++++ src/util/async-interval.js | 24 +++++++++++++++++++++++ 5 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 src/util/async-interval.js diff --git a/src/database-backends/index.js b/src/database-backends/index.js index ebaaecd..ae99d96 100644 --- a/src/database-backends/index.js +++ b/src/database-backends/index.js @@ -69,6 +69,14 @@ module.exports = function (state) { return backend.shutdown(); }, + getQueueSize: function (_tx) { + let [ tx ] = validateArguments(arguments, { + tx: maybeTX + }); + + return backend.getQueueSize(tx); + }, + runInTransaction: function (_tx, _callback) { let [ tx, callback ] = validateArguments(arguments, { tx: maybeTX, @@ -77,6 +85,7 @@ module.exports = function (state) { return backend.runInTransaction(tx, callback); }, + countLockedTasks: function (_tx) { let [ tx ] = validateArguments(arguments, { tx: maybeTX diff --git a/src/database-backends/postgresql/index.js b/src/database-backends/postgresql/index.js index 559c292..4f80d52 100644 --- a/src/database-backends/postgresql/index.js +++ b/src/database-backends/postgresql/index.js @@ -12,6 +12,7 @@ const debug = require("debug")("srap:backend:postgresql:queries"); const models = require("./models"); const mergeItems = require("../../semantics/merge-items"); +const syncpipe = require("syncpipe"); let migrationsFolder = path.join(__dirname, "../../../migrations"); @@ -363,6 +364,17 @@ module.exports = function(state) { return result[0].count; }); }, + + getQueueSize: function (tx) { + return Promise.try(() => { + return tx.raw(`SELECT task, COUNT(*) FROM srap_queue WHERE started = false GROUP BY task;`); + }).then((result) => { + return syncpipe(result.rows, [ + _ => _.map(({ task, count }) => [ task, parseInt(count) ]), + _ => Object.fromEntries(_) + ]); + }); + }, getUpdateStream: require("./queries/get-update-stream")(state), getTaskStream: require("./queries/get-task-stream")(state) diff --git a/src/kernel.js b/src/kernel.js index 1e53542..48a957a 100644 --- a/src/kernel.js +++ b/src/kernel.js @@ -2,6 +2,7 @@ const Promise = require("bluebird"); const chalk = require("chalk"); +const unreachable = require("@joepie91/unreachable")("srap"); const rateLimit = require("@promistream/rate-limit"); const simpleSink = require("@promistream/simple-sink"); @@ -9,9 +10,10 @@ const pipe = require("@promistream/pipe"); const { validateOptions } = require("@validatem/core"); const isValidConfiguration = require("./validators/is-valid-configuration"); + const createPrometheus = require("./prometheus"); const generateTaskGraph = require("./util/generate-task-graph"); -const unreachable = require("@joepie91/unreachable")("srap"); +const asyncInterval = require("./util/async-interval"); // FIXME: *Require* a taskInterval to be set, even if explicitly null, to prevent accidentally forgetting it @@ -71,6 +73,16 @@ module.exports = async function createKernel(_configuration) { run: async function runKernel() { console.log(`Starting ${tasks.size} tasks...`); await prepareDatabase(); + + asyncInterval(60 * 1000, () => { + return Promise.try(() => { + return backend.topLevel.getQueueSize(); + }).then((queueSize) => { + for (let [ taskName, size ] of Object.entries(queueSize)) { + metrics.taskQueueSize.labels({ task: taskName }).set(size); + } + }); + }); return Promise.map(tasks.values(), (task) => { return pipe([ diff --git a/src/prometheus/index.js b/src/prometheus/index.js index 43f80b7..82b8e10 100644 --- a/src/prometheus/index.js +++ b/src/prometheus/index.js @@ -62,6 +62,12 @@ module.exports = function createPrometheus() { name: "srap_task_refill_results_count", help: "Amount of new scraping tasks added to queue during the most recent attempt", labelNames: [ "task" ] + }), + taskQueueSize: new prometheusClient.Gauge({ + registers: [ prometheusRegistry ], + name: "srap_task_queue_count", + help: "Amount of scraping tasks currently queued up", + labelNames: [ "task" ] }) } }; diff --git a/src/util/async-interval.js b/src/util/async-interval.js new file mode 100644 index 0000000..4c33dd1 --- /dev/null +++ b/src/util/async-interval.js @@ -0,0 +1,24 @@ +"use strict"; + +const Promise = require("bluebird"); + +module.exports = function asyncInterval(interval, callback) { + function doCycle() { + let startTime = Date.now(); + + return Promise.try(() => { + return callback(); + }).then(() => { + // let elapsed = Date.now() - startTime; + let elapsed = 0; // HACK: Temporary way to force that the full interval is always waited *between* operations + + if (elapsed > interval) { + return doCycle(); + } else { + return Promise.delay(interval - elapsed).then(doCycle); + } + }); + } + + return doCycle(); +};