You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

248 lines
7.1 KiB
JavaScript

"use strict";
const defaultValue = require("default-value");
const assureArray = require("assure-array");
const matchValue = require("match-value");
const concatArrays = require("concat-arrays");
const syncpipe = require("syncpipe");
const flatten = require("flatten");
const { validateArguments } = require("@validatem/core");
const required = require("@validatem/required");
const eventDeduplicator = require("../event-deduplicator");
const isSyncResponse = require("../is-sync-response");
const removeStateDuplication = require("./remove-state-duplication");
function maybeMap(array, mappingFunction) {
if (array == null) {
return [];
} else {
return array.map(mappingFunction);
}
}
function maybeMapObject(object, mappingFunction) {
if (object == null) {
return [];
} else {
return Object.entries(object).map(mappingFunction);
}
}
module.exports = function parseSyncResponse(_syncResponse, strict = false) {
// require("fs").writeFileSync("private/dump.json", JSON.stringify(_syncResponse));
let [ syncResponse ] = validateArguments(arguments, {
syncResponseBody: [ required, isSyncResponse(strict) ], // TODO: Validate and normalize the response body, including setting defaults, and allowing extra properties
});
// We keep an event ID -> event body mapping, to ensure that the same event in different places in the response maps to the same in-memory object in our resulting event list; this is useful both to save memory, and to make equality-checking operations work
// FIXME: Check if we need to deep-compare objects here to detect abbreviated versions of events? Otherwise we might end up replacing a full event with an abbreviated version.
let deduplicateEvent = eventDeduplicator();
function toTimestampedEvent(event, type) {
return {
type: type,
event: deduplicateEvent(event),
timestamp: event.origin_server_ts
};
}
function toUntimestampedEvent(event, type) {
return {
type: type,
event: deduplicateEvent(event)
};
}
function toRoomSummaryEvent(key, value) {
return {
type: "roomSummaryUpdate",
key: key,
value: value
};
}
function toNotificationCountsEvent(notificationCount) {
return {
type: "roomNotificationCounts",
data: notificationCount
};
}
function toDeviceListUserChangedEvent(user) {
return {
type: "deviceListUserChanged",
user: user
};
}
function toDeviceListUserLeftEvent(user) {
return {
type: "deviceListUserLeft",
user: user
};
}
function toMemberStateEvent(room, memberState) {
return {
type: "memberState",
room: room, // FIXME: Rename all room to roomID for consistency?
state: memberState
};
}
let roomParsingRules = [
{ types: [ "joined", "left", "invited" ], parser: (room, memberState) => {
return toMemberStateEvent(room, memberState);
} },
{ types: [ "joined", "left" ], parser: (room) => {
return maybeMap(room.state.events, (event) => toTimestampedEvent(event, "roomStateUpdate"));
} },
{ types: [ "joined", "left" ], parser: (room) => {
// NOTE: Can still contain state events! But they are a part of the timeline, not a 'summarized' state delta like `roomStateUpdate`s.
return maybeMap(room.timeline.events, (event) => toTimestampedEvent(event, "roomTimelineEvent"));
} },
{ types: [ "joined", "left" ], parser: (room) => {
return maybeMap(room.account_data.events, (event) => toUntimestampedEvent(event, "roomAccountData"));
} },
{ types: [ "joined" ], parser: (room) => {
return maybeMap(room.ephemeral.events, (event) => toUntimestampedEvent(event, "roomEphemeralEvent"));
} },
{ types: [ "joined" ], parser: (room) => {
return maybeMapObject(room.summary, ([ key, value ]) => toRoomSummaryEvent(key, value));
} },
{ types: [ "joined" ], parser: (room) => {
if (room.unread_notifications != null) {
return toNotificationCountsEvent(room.unread_notifications);
}
} },
{ types: [ "invited" ], parser: (room) => {
return maybeMap(room.invite_state.events, (event) => toUntimestampedEvent(event, "roomInviteState"));
} },
];
let globalParsingRules = [
{ key: "device_lists", parser: (deviceLists) => {
return concatArrays([
maybeMap(deviceLists.changed, (user) => toDeviceListUserChangedEvent(user)),
maybeMap(deviceLists.left, (user) => toDeviceListUserLeftEvent(user)),
]);
} },
{ key: "device_one_time_keys_count", parser: (oneTimeKeysCounts) => {
/* QUESTION: Always received, or only when one value updates? And if the latter, only the delta or the full list of algorithms? */
return {
type: "deviceOneTimeKeysCount",
keyCounts: oneTimeKeysCounts
};
} },
];
let limitedRooms = [];
let previousBatchTokens = {};
let roomsWithTimelines = concatArrays(
Object.entries(syncResponse.rooms.join),
Object.entries(syncResponse.rooms.leave),
);
for (let [ roomId, room ] of roomsWithTimelines) {
if (room.timeline.prev_batch != null) {
previousBatchTokens[roomId] = room.timeline.prev_batch;
}
if (room.timeline.limited === true) {
limitedRooms.push(roomId);
}
}
function parseRoom({ room, roomId, memberState }) {
return roomParsingRules.map((rule) => {
if (rule.types.includes(memberState)) {
return assureArray(rule.parser(room, memberState))
.filter((event) => event != null)
.map((event) => {
return {
... event,
room: roomId,
// memberState: memberState
};
});
}
});
}
let roomEvents = ["join", "leave", "invite"].map((roomType) => {
if (syncResponse.rooms[roomType] != null) {
let memberState = matchValue(roomType, {
join: "joined",
leave: "left",
invite: "invited"
});
return Object.entries(syncResponse.rooms[roomType])
.filter(([ _roomId, room ]) => room != null)
.map(([ roomId, room ]) => {
return parseRoom({ room, roomId, memberState });
});
} else {
return [];
}
});
let globalEvents = globalParsingRules.map((rule) => {
let data = syncResponse[rule.key];
if (data != null) {
return syncpipe(data, [
(_) => rule.parser(_),
(_) => assureArray(_)
]);
} else {
return [];
}
});
let globalTimelineMapping = {
presence: "presenceEvent",
account_data: "accountData",
to_device: "toDeviceEvent"
};
let globalTimelineEvents = syncpipe(globalTimelineMapping, [
(_) => Object.entries(_),
(_) => _.map(([ source, eventType ]) => {
let events = defaultValue(syncResponse[source].events, []);
return events.map((event) => {
return {
type: eventType,
event: deduplicateEvent(event)
};
});
})
]);
let events = syncpipe(null, [
(_) => concatArrays(
globalEvents,
globalTimelineEvents,
roomEvents
),
(_) => flatten(_),
(_) => _.filter((event) => event !== undefined),
(_) => removeStateDuplication(_)
]);
// FIXME: In the stream API, translate the metadata into events in and of themselves; probably combining the 'limited' markers and previous-batch tokens into one event, to allow clients to backfill based on that
// -> Do we need to emit such events for rooms that are *not* limited? Is there an actual purpose to that?
return {
syncToken: syncResponse.next_batch,
limitedRooms: limitedRooms,
previousBatchTokens: previousBatchTokens,
events: events
};
};