"use strict"; const defaultValue = require("default-value"); const assureArray = require("assure-array"); const matchValue = require("match-value"); const concatArrays = require("concat-arrays"); const syncpipe = require("syncpipe"); const flatten = require("flatten"); const { validateArguments } = require("@validatem/core"); const required = require("@validatem/required"); const eventDeduplicator = require("../event-deduplicator"); const isSyncResponse = require("../is-sync-response"); const removeStateDuplication = require("./remove-state-duplication"); function maybeMap(array, mappingFunction) { if (array == null) { return []; } else { return array.map(mappingFunction); } } function maybeMapObject(object, mappingFunction) { if (object == null) { return []; } else { return Object.entries(object).map(mappingFunction); } } module.exports = function parseSyncResponse(_syncResponse, strict = false) { // require("fs").writeFileSync("private/dump.json", JSON.stringify(_syncResponse)); let [ syncResponse ] = validateArguments(arguments, { syncResponseBody: [ required, isSyncResponse(strict) ], // TODO: Validate and normalize the response body, including setting defaults, and allowing extra properties }); // We keep an event ID -> event body mapping, to ensure that the same event in different places in the response maps to the same in-memory object in our resulting event list; this is useful both to save memory, and to make equality-checking operations work // FIXME: Check if we need to deep-compare objects here to detect abbreviated versions of events? Otherwise we might end up replacing a full event with an abbreviated version. let deduplicateEvent = eventDeduplicator(); function toTimestampedEvent(event, type) { return { type: type, event: deduplicateEvent(event), timestamp: event.origin_server_ts }; } function toUntimestampedEvent(event, type) { return { type: type, event: deduplicateEvent(event) }; } function toRoomSummaryEvent(key, value) { return { type: "roomSummaryUpdate", key: key, value: value }; } function toNotificationCountsEvent(notificationCount) { return { type: "roomNotificationCounts", data: notificationCount }; } function toDeviceListUserChangedEvent(user) { return { type: "deviceListUserChanged", user: user }; } function toDeviceListUserLeftEvent(user) { return { type: "deviceListUserLeft", user: user }; } function toMemberStateEvent(room, memberState) { return { type: "memberState", room: room, // FIXME: Rename all room to roomID for consistency? state: memberState }; } let roomParsingRules = [ { types: [ "joined", "left", "invited" ], parser: (room, memberState) => { return toMemberStateEvent(room, memberState); } }, { types: [ "joined", "left" ], parser: (room) => { return maybeMap(room.state.events, (event) => toTimestampedEvent(event, "roomStateUpdate")); } }, { types: [ "joined", "left" ], parser: (room) => { // NOTE: Can still contain state events! But they are a part of the timeline, not a 'summarized' state delta like `roomStateUpdate`s. return maybeMap(room.timeline.events, (event) => toTimestampedEvent(event, "roomTimelineEvent")); } }, { types: [ "joined", "left" ], parser: (room) => { return maybeMap(room.account_data.events, (event) => toUntimestampedEvent(event, "roomAccountData")); } }, { types: [ "joined" ], parser: (room) => { return maybeMap(room.ephemeral.events, (event) => toUntimestampedEvent(event, "roomEphemeralEvent")); } }, { types: [ "joined" ], parser: (room) => { return maybeMapObject(room.summary, ([ key, value ]) => toRoomSummaryEvent(key, value)); } }, { types: [ "joined" ], parser: (room) => { if (room.unread_notifications != null) { return toNotificationCountsEvent(room.unread_notifications); } } }, { types: [ "invited" ], parser: (room) => { return maybeMap(room.invite_state.events, (event) => toUntimestampedEvent(event, "roomInviteState")); } }, ]; let globalParsingRules = [ { key: "device_lists", parser: (deviceLists) => { return concatArrays([ maybeMap(deviceLists.changed, (user) => toDeviceListUserChangedEvent(user)), maybeMap(deviceLists.left, (user) => toDeviceListUserLeftEvent(user)), ]); } }, { key: "device_one_time_keys_count", parser: (oneTimeKeysCounts) => { /* QUESTION: Always received, or only when one value updates? And if the latter, only the delta or the full list of algorithms? */ return { type: "deviceOneTimeKeysCount", keyCounts: oneTimeKeysCounts }; } }, ]; let limitedRooms = []; let previousBatchTokens = {}; let roomsWithTimelines = concatArrays( Object.entries(syncResponse.rooms.join), Object.entries(syncResponse.rooms.leave), ); for (let [ roomId, room ] of roomsWithTimelines) { if (room.timeline.prev_batch != null) { previousBatchTokens[roomId] = room.timeline.prev_batch; } if (room.timeline.limited === true) { limitedRooms.push(roomId); } } function parseRoom({ room, roomId, memberState }) { return roomParsingRules.map((rule) => { if (rule.types.includes(memberState)) { return assureArray(rule.parser(room, memberState)) .filter((event) => event != null) .map((event) => { return { ... event, room: roomId, // memberState: memberState }; }); } }); } let roomEvents = ["join", "leave", "invite"].map((roomType) => { if (syncResponse.rooms[roomType] != null) { let memberState = matchValue(roomType, { join: "joined", leave: "left", invite: "invited" }); return Object.entries(syncResponse.rooms[roomType]) .filter(([ _roomId, room ]) => room != null) .map(([ roomId, room ]) => { return parseRoom({ room, roomId, memberState }); }); } else { return []; } }); let globalEvents = globalParsingRules.map((rule) => { let data = syncResponse[rule.key]; if (data != null) { return syncpipe(data, [ (_) => rule.parser(_), (_) => assureArray(_) ]); } else { return []; } }); let globalTimelineMapping = { presence: "presenceEvent", account_data: "accountData", to_device: "toDeviceEvent" }; let globalTimelineEvents = syncpipe(globalTimelineMapping, [ (_) => Object.entries(_), (_) => _.map(([ source, eventType ]) => { let events = defaultValue(syncResponse[source].events, []); return events.map((event) => { return { type: eventType, event: deduplicateEvent(event) }; }); }) ]); let events = syncpipe(null, [ (_) => concatArrays( globalEvents, globalTimelineEvents, roomEvents ), (_) => flatten(_), (_) => _.filter((event) => event !== undefined), (_) => removeStateDuplication(_) ]); // FIXME: In the stream API, translate the metadata into events in and of themselves; probably combining the 'limited' markers and previous-batch tokens into one event, to allow clients to backfill based on that // -> Do we need to emit such events for rooms that are *not* limited? Is there an actual purpose to that? return { syncToken: syncResponse.next_batch, limitedRooms: limitedRooms, previousBatchTokens: previousBatchTokens, events: events }; };