Compare commits

...

3 Commits

@ -33,7 +33,7 @@ const isTaskObject = require("../validators/is-task-object");
// FIXME: Verify that all internal method calls in the PostgreSQL backend are still valid after moving argument validation/normalization into this module // FIXME: Verify that all internal method calls in the PostgreSQL backend are still valid after moving argument validation/normalization into this module
module.exports = function (state) { module.exports = function (state) {
let { tasks } = state; let { tasks, metrics } = state;
const backendModules = { const backendModules = {
"postgresql": require("./postgresql")(state), "postgresql": require("./postgresql")(state),
@ -370,7 +370,15 @@ module.exports = function (state) {
}] }]
}); });
return backend.lock(tx, options); return Promise.try(() => {
return backend.lock(tx, options);
}).tap((succeeded) => {
if (succeeded) {
metrics.successfulLocks.labels({ task: task.name }).inc(1);
} else {
metrics.failedLocks.labels({ task: task.name }).inc(1);
}
});
}, },
unlock: function (_tx, _options) { unlock: function (_tx, _options) {

@ -102,10 +102,6 @@ module.exports = function(state) {
}).then(() => { }).then(() => {
debug(`Task '${task.name}:${id}' locked successfully`); debug(`Task '${task.name}:${id}' locked successfully`);
return true; return true;
}).catch({ name: "UniqueViolationError" }, () => {
// FIXME: This is not the correct error...
debug(`Task '${task.name}:${id}' failed lock because already locked on write`);
return false;
}); });
} else { } else {
debug(`Task '${task.name}:${id}' failed lock because already locked on read`); debug(`Task '${task.name}:${id}' failed lock because already locked on read`);

@ -80,10 +80,10 @@ module.exports = async function createKernel(_configuration) {
simpleSink(({ status }) => { simpleSink(({ status }) => {
if (status === "completed") { if (status === "completed") {
metrics.successfulItems.inc(1); metrics.successfulItems.inc(1);
metrics.successfulItems.labels({ task: task }).inc(1); metrics.successfulItems.labels({ task: task.name }).inc(1);
} else if (status === "failed") { } else if (status === "failed") {
metrics.failedItems.inc(1); metrics.failedItems.inc(1);
metrics.failedItems.labels({ task: task }).inc(1); metrics.failedItems.labels({ task: task.name }).inc(1);
} else { } else {
unreachable(`Unrecognized status '${status}'`); unreachable(`Unrecognized status '${status}'`);
} }

@ -27,6 +27,18 @@ module.exports = function createPrometheus() {
help: "Amount of items that have failed during processing", help: "Amount of items that have failed during processing",
labelNames: [ "task" ] labelNames: [ "task" ]
}), }),
successfulLocks: new prometheusClient.Counter({
registers: [ prometheusRegistry ],
name: "srap_successful_locks_total",
help: "Amount of queue item lock attempts that were successful",
labelNames: [ "task" ]
}),
failedLocks: new prometheusClient.Counter({
registers: [ prometheusRegistry ],
name: "srap_failed_locks_total",
help: "Amount of queue item lock attempts that failed",
labelNames: [ "task" ]
}),
taskFetchTime: new prometheusClient.Gauge({ taskFetchTime: new prometheusClient.Gauge({
registers: [ prometheusRegistry ], registers: [ prometheusRegistry ],
name: "srap_task_fetch_seconds", name: "srap_task_fetch_seconds",

Loading…
Cancel
Save