'use strict'; const rfr = require("rfr"); const errors = rfr("lib/errors"); const tagWebsocket = rfr("lib/tag-websocket"); const tagMessage = rfr("lib/tag-message"); const websocketTracker = rfr("lib/websockets/tracker"); const websocketDeduplicator = rfr("lib/deduplicator"); const fakeTask = rfr("lib/tasks/fake-task"); module.exports = function({taskTracker}) { let router = require("express-promise-router")(); /* We manually create a deduplicator, so that we can share it across all the user-level websocket trackers. This does mean that we have to manually remove disconnected clients from our shared deduplicator. */ let deduplicator = websocketDeduplicator(); let allClients = websocketTracker({deduplicator: deduplicator}); let userClients = websocketTracker({deduplicator: deduplicator, namespaced: true}); let roleClients = websocketTracker({deduplicator: deduplicator, namespaced: true}); /* The adminClient tracker is a special case, and can use its own internal deduplicator if needed */ let adminClients = websocketTracker(); function emitTask(task, message) { task.userIds.forEach((userId) => { userClients.emit(userId, message); }); task.roleIds.forEach((roleId) => { roleClients.emit(roleId, message); }); adminClients.emit(message); } taskTracker.on("newTask", (task) => { emitTask(task, tagMessage({ // FIXME: Task state? pending/started/paused/cancelled/etc. taskId: task.id, type: "newTask", name: task.name, progress: task.progress, started: task.started, lastUpdated: task.lastUpdated, lastOperation: task.lastOperation })); }); taskTracker.on("progress", (task, progress, lastOperation) => { emitTask(task, tagMessage({ taskId: task.id, type: "progress", progress: progress, lastOperation: lastOperation })); }); taskTracker.on("completed", (task) => { emitTask(tagMessage({ taskId: task.id, type: "completed" })); }); // FIXME: Initial task list router.ws("/feed", (ws, req) => { // FIXME: Auth user? if (req.session.user == null) { throw new errors.UnauthorizedError("You must be authenticated to obtain a task feed"); } else { tagWebsocket(ws); // We ignore client messages for now allClients.add(client); userClients.add(req.session.user.id, client); req.session.user.roles.forEach((role) => { roleClients.add(role.id, client); }); ws.on("close", () => { allClients.remove(client); userClients.remove(client); roleClients.remove(client); deduplicator.forgetClient(client); }); } }); router.ws("/feed/all", (ws, req) => { // FIXME: Require admin access tagWebsocket(ws); // We ignore client messages for now adminClients.add(client); ws.on("close", () => { adminClients.remove(client); }); }); router.get("/fake-task", (req, res) => { let task = fakeTask(5000); addTask("Fake Task", task, [req.session.user.id], [], {}); res.send("Fake task added!"); }) return router; }