Track current queue sizes as a metric
This commit is contained in:
parent
5b8e71c083
commit
e40113a701
|
@ -69,6 +69,14 @@ module.exports = function (state) {
|
|||
return backend.shutdown();
|
||||
},
|
||||
|
||||
getQueueSize: function (_tx) {
|
||||
let [ tx ] = validateArguments(arguments, {
|
||||
tx: maybeTX
|
||||
});
|
||||
|
||||
return backend.getQueueSize(tx);
|
||||
},
|
||||
|
||||
runInTransaction: function (_tx, _callback) {
|
||||
let [ tx, callback ] = validateArguments(arguments, {
|
||||
tx: maybeTX,
|
||||
|
@ -77,6 +85,7 @@ module.exports = function (state) {
|
|||
|
||||
return backend.runInTransaction(tx, callback);
|
||||
},
|
||||
|
||||
countLockedTasks: function (_tx) {
|
||||
let [ tx ] = validateArguments(arguments, {
|
||||
tx: maybeTX
|
||||
|
|
|
@ -12,6 +12,7 @@ const debug = require("debug")("srap:backend:postgresql:queries");
|
|||
|
||||
const models = require("./models");
|
||||
const mergeItems = require("../../semantics/merge-items");
|
||||
const syncpipe = require("syncpipe");
|
||||
|
||||
let migrationsFolder = path.join(__dirname, "../../../migrations");
|
||||
|
||||
|
@ -363,6 +364,17 @@ module.exports = function(state) {
|
|||
return result[0].count;
|
||||
});
|
||||
},
|
||||
|
||||
getQueueSize: function (tx) {
|
||||
return Promise.try(() => {
|
||||
return tx.raw(`SELECT task, COUNT(*) FROM srap_queue WHERE started = false GROUP BY task;`);
|
||||
}).then((result) => {
|
||||
return syncpipe(result.rows, [
|
||||
_ => _.map(({ task, count }) => [ task, parseInt(count) ]),
|
||||
_ => Object.fromEntries(_)
|
||||
]);
|
||||
});
|
||||
},
|
||||
|
||||
getUpdateStream: require("./queries/get-update-stream")(state),
|
||||
getTaskStream: require("./queries/get-task-stream")(state)
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
const Promise = require("bluebird");
|
||||
const chalk = require("chalk");
|
||||
const unreachable = require("@joepie91/unreachable")("srap");
|
||||
|
||||
const rateLimit = require("@promistream/rate-limit");
|
||||
const simpleSink = require("@promistream/simple-sink");
|
||||
|
@ -9,9 +10,10 @@ const pipe = require("@promistream/pipe");
|
|||
|
||||
const { validateOptions } = require("@validatem/core");
|
||||
const isValidConfiguration = require("./validators/is-valid-configuration");
|
||||
|
||||
const createPrometheus = require("./prometheus");
|
||||
const generateTaskGraph = require("./util/generate-task-graph");
|
||||
const unreachable = require("@joepie91/unreachable")("srap");
|
||||
const asyncInterval = require("./util/async-interval");
|
||||
|
||||
// FIXME: *Require* a taskInterval to be set, even if explicitly null, to prevent accidentally forgetting it
|
||||
|
||||
|
@ -71,6 +73,16 @@ module.exports = async function createKernel(_configuration) {
|
|||
run: async function runKernel() {
|
||||
console.log(`Starting ${tasks.size} tasks...`);
|
||||
await prepareDatabase();
|
||||
|
||||
asyncInterval(60 * 1000, () => {
|
||||
return Promise.try(() => {
|
||||
return backend.topLevel.getQueueSize();
|
||||
}).then((queueSize) => {
|
||||
for (let [ taskName, size ] of Object.entries(queueSize)) {
|
||||
metrics.taskQueueSize.labels({ task: taskName }).set(size);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
return Promise.map(tasks.values(), (task) => {
|
||||
return pipe([
|
||||
|
|
|
@ -62,6 +62,12 @@ module.exports = function createPrometheus() {
|
|||
name: "srap_task_refill_results_count",
|
||||
help: "Amount of new scraping tasks added to queue during the most recent attempt",
|
||||
labelNames: [ "task" ]
|
||||
}),
|
||||
taskQueueSize: new prometheusClient.Gauge({
|
||||
registers: [ prometheusRegistry ],
|
||||
name: "srap_task_queue_count",
|
||||
help: "Amount of scraping tasks currently queued up",
|
||||
labelNames: [ "task" ]
|
||||
})
|
||||
}
|
||||
};
|
||||
|
|
24
src/util/async-interval.js
Normal file
24
src/util/async-interval.js
Normal file
|
@ -0,0 +1,24 @@
|
|||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
|
||||
module.exports = function asyncInterval(interval, callback) {
|
||||
function doCycle() {
|
||||
let startTime = Date.now();
|
||||
|
||||
return Promise.try(() => {
|
||||
return callback();
|
||||
}).then(() => {
|
||||
// let elapsed = Date.now() - startTime;
|
||||
let elapsed = 0; // HACK: Temporary way to force that the full interval is always waited *between* operations
|
||||
|
||||
if (elapsed > interval) {
|
||||
return doCycle();
|
||||
} else {
|
||||
return Promise.delay(interval - elapsed).then(doCycle);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return doCycle();
|
||||
};
|
Loading…
Reference in a new issue