You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
cvm/routes/tasks.js

113 lines
2.9 KiB
JavaScript

'use strict';
const rfr = require("rfr");
const errors = rfr("lib/errors");
const tagWebsocket = rfr("lib/tag-websocket");
const tagMessage = rfr("lib/tag-message");
const websocketTracker = rfr("lib/websockets/tracker");
const websocketDeduplicator = rfr("lib/deduplicator");
const fakeTask = rfr("lib/tasks/fake-task");
module.exports = function({taskTracker}) {
let router = require("express-promise-router")();
/* We manually create a deduplicator, so that we can share it across all the user-level websocket trackers. This does mean that we have to manually remove disconnected clients from our shared deduplicator. */
let deduplicator = websocketDeduplicator();
let allClients = websocketTracker({deduplicator: deduplicator});
let userClients = websocketTracker({deduplicator: deduplicator, namespaced: true});
let roleClients = websocketTracker({deduplicator: deduplicator, namespaced: true});
/* The adminClient tracker is a special case, and can use its own internal deduplicator if needed */
let adminClients = websocketTracker();
function emitTask(task, message) {
task.userIds.forEach((userId) => {
userClients.emit(userId, message);
});
task.roleIds.forEach((roleId) => {
roleClients.emit(roleId, message);
});
adminClients.emit(message);
}
taskTracker.on("newTask", (task) => {
emitTask(task, tagMessage({
// FIXME: Task state? pending/started/paused/cancelled/etc.
taskId: task.id,
type: "newTask",
name: task.name,
progress: task.progress,
started: task.started,
lastUpdated: task.lastUpdated,
lastOperation: task.lastOperation
}));
});
taskTracker.on("progress", (task, progress, lastOperation) => {
emitTask(task, tagMessage({
taskId: task.id,
type: "progress",
progress: progress,
lastOperation: lastOperation
}));
});
taskTracker.on("completed", (task) => {
emitTask(tagMessage({
taskId: task.id,
type: "completed"
}));
});
// FIXME: Initial task list
router.ws("/feed", (ws, req) => {
// FIXME: Auth user?
if (req.session.user == null) {
throw new errors.UnauthorizedError("You must be authenticated to obtain a task feed");
} else {
tagWebsocket(ws);
// We ignore client messages for now
allClients.add(client);
userClients.add(req.session.user.id, client);
req.session.user.roles.forEach((role) => {
roleClients.add(role.id, client);
});
ws.on("close", () => {
allClients.remove(client);
userClients.remove(client);
roleClients.remove(client);
deduplicator.forgetClient(client);
});
}
});
router.ws("/feed/all", (ws, req) => {
// FIXME: Require admin access
tagWebsocket(ws);
// We ignore client messages for now
adminClients.add(client);
ws.on("close", () => {
adminClients.remove(client);
});
});
router.get("/fake-task", (req, res) => {
let task = fakeTask(5000);
addTask("Fake Task", task, [req.session.user.id], [], {});
res.send("Fake task added!");
})
return router;
}