Compare commits

...

3 Commits

@ -33,7 +33,7 @@ const isTaskObject = require("../validators/is-task-object");
// FIXME: Verify that all internal method calls in the PostgreSQL backend are still valid after moving argument validation/normalization into this module
module.exports = function (state) {
let { tasks } = state;
let { tasks, metrics } = state;
const backendModules = {
"postgresql": require("./postgresql")(state),
@ -370,7 +370,15 @@ module.exports = function (state) {
}]
});
return backend.lock(tx, options);
return Promise.try(() => {
return backend.lock(tx, options);
}).tap((succeeded) => {
if (succeeded) {
metrics.successfulLocks.labels({ task: task.name }).inc(1);
} else {
metrics.failedLocks.labels({ task: task.name }).inc(1);
}
});
},
unlock: function (_tx, _options) {

@ -102,10 +102,6 @@ module.exports = function(state) {
}).then(() => {
debug(`Task '${task.name}:${id}' locked successfully`);
return true;
}).catch({ name: "UniqueViolationError" }, () => {
// FIXME: This is not the correct error...
debug(`Task '${task.name}:${id}' failed lock because already locked on write`);
return false;
});
} else {
debug(`Task '${task.name}:${id}' failed lock because already locked on read`);

@ -80,10 +80,10 @@ module.exports = async function createKernel(_configuration) {
simpleSink(({ status }) => {
if (status === "completed") {
metrics.successfulItems.inc(1);
metrics.successfulItems.labels({ task: task }).inc(1);
metrics.successfulItems.labels({ task: task.name }).inc(1);
} else if (status === "failed") {
metrics.failedItems.inc(1);
metrics.failedItems.labels({ task: task }).inc(1);
metrics.failedItems.labels({ task: task.name }).inc(1);
} else {
unreachable(`Unrecognized status '${status}'`);
}

@ -27,6 +27,18 @@ module.exports = function createPrometheus() {
help: "Amount of items that have failed during processing",
labelNames: [ "task" ]
}),
successfulLocks: new prometheusClient.Counter({
registers: [ prometheusRegistry ],
name: "srap_successful_locks_total",
help: "Amount of queue item lock attempts that were successful",
labelNames: [ "task" ]
}),
failedLocks: new prometheusClient.Counter({
registers: [ prometheusRegistry ],
name: "srap_failed_locks_total",
help: "Amount of queue item lock attempts that failed",
labelNames: [ "task" ]
}),
taskFetchTime: new prometheusClient.Gauge({
registers: [ prometheusRegistry ],
name: "srap_task_fetch_seconds",

Loading…
Cancel
Save