You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
248 lines
7.1 KiB
JavaScript
248 lines
7.1 KiB
JavaScript
"use strict";
|
|
|
|
const defaultValue = require("default-value");
|
|
const assureArray = require("assure-array");
|
|
const matchValue = require("match-value");
|
|
const concatArrays = require("concat-arrays");
|
|
const syncpipe = require("syncpipe");
|
|
const flatten = require("flatten");
|
|
|
|
const { validateArguments } = require("@validatem/core");
|
|
const required = require("@validatem/required");
|
|
|
|
const eventDeduplicator = require("../event-deduplicator");
|
|
const isSyncResponse = require("../is-sync-response");
|
|
const removeStateDuplication = require("./remove-state-duplication");
|
|
|
|
function maybeMap(array, mappingFunction) {
|
|
if (array == null) {
|
|
return [];
|
|
} else {
|
|
return array.map(mappingFunction);
|
|
}
|
|
}
|
|
|
|
function maybeMapObject(object, mappingFunction) {
|
|
if (object == null) {
|
|
return [];
|
|
} else {
|
|
return Object.entries(object).map(mappingFunction);
|
|
}
|
|
}
|
|
|
|
|
|
module.exports = function parseSyncResponse(_syncResponse, strict = false) {
|
|
// require("fs").writeFileSync("private/dump.json", JSON.stringify(_syncResponse));
|
|
let [ syncResponse ] = validateArguments(arguments, {
|
|
syncResponseBody: [ required, isSyncResponse(strict) ], // TODO: Validate and normalize the response body, including setting defaults, and allowing extra properties
|
|
});
|
|
|
|
// We keep an event ID -> event body mapping, to ensure that the same event in different places in the response maps to the same in-memory object in our resulting event list; this is useful both to save memory, and to make equality-checking operations work
|
|
// FIXME: Check if we need to deep-compare objects here to detect abbreviated versions of events? Otherwise we might end up replacing a full event with an abbreviated version.
|
|
let deduplicateEvent = eventDeduplicator();
|
|
|
|
function toTimestampedEvent(event, type) {
|
|
return {
|
|
type: type,
|
|
event: deduplicateEvent(event),
|
|
timestamp: event.origin_server_ts
|
|
};
|
|
}
|
|
|
|
function toUntimestampedEvent(event, type) {
|
|
return {
|
|
type: type,
|
|
event: deduplicateEvent(event)
|
|
};
|
|
}
|
|
|
|
function toRoomSummaryEvent(key, value) {
|
|
return {
|
|
type: "roomSummaryUpdate",
|
|
key: key,
|
|
value: value
|
|
};
|
|
}
|
|
|
|
function toNotificationCountsEvent(notificationCount) {
|
|
return {
|
|
type: "roomNotificationCounts",
|
|
data: notificationCount
|
|
};
|
|
}
|
|
|
|
function toDeviceListUserChangedEvent(user) {
|
|
return {
|
|
type: "deviceListUserChanged",
|
|
user: user
|
|
};
|
|
}
|
|
|
|
function toDeviceListUserLeftEvent(user) {
|
|
return {
|
|
type: "deviceListUserLeft",
|
|
user: user
|
|
};
|
|
}
|
|
|
|
function toMemberStateEvent(room, memberState) {
|
|
return {
|
|
type: "memberState",
|
|
room: room, // FIXME: Rename all room to roomID for consistency?
|
|
state: memberState
|
|
};
|
|
}
|
|
|
|
let roomParsingRules = [
|
|
{ types: [ "joined", "left", "invited" ], parser: (room, memberState) => {
|
|
return toMemberStateEvent(room, memberState);
|
|
} },
|
|
{ types: [ "joined", "left" ], parser: (room) => {
|
|
return maybeMap(room.state.events, (event) => toTimestampedEvent(event, "roomStateUpdate"));
|
|
} },
|
|
{ types: [ "joined", "left" ], parser: (room) => {
|
|
// NOTE: Can still contain state events! But they are a part of the timeline, not a 'summarized' state delta like `roomStateUpdate`s.
|
|
return maybeMap(room.timeline.events, (event) => toTimestampedEvent(event, "roomTimelineEvent"));
|
|
} },
|
|
{ types: [ "joined", "left" ], parser: (room) => {
|
|
return maybeMap(room.account_data.events, (event) => toUntimestampedEvent(event, "roomAccountData"));
|
|
} },
|
|
{ types: [ "joined" ], parser: (room) => {
|
|
return maybeMap(room.ephemeral.events, (event) => toUntimestampedEvent(event, "roomEphemeralEvent"));
|
|
} },
|
|
{ types: [ "joined" ], parser: (room) => {
|
|
return maybeMapObject(room.summary, ([ key, value ]) => toRoomSummaryEvent(key, value));
|
|
} },
|
|
{ types: [ "joined" ], parser: (room) => {
|
|
if (room.unread_notifications != null) {
|
|
return toNotificationCountsEvent(room.unread_notifications);
|
|
}
|
|
} },
|
|
{ types: [ "invited" ], parser: (room) => {
|
|
return maybeMap(room.invite_state.events, (event) => toUntimestampedEvent(event, "roomInviteState"));
|
|
} },
|
|
];
|
|
|
|
let globalParsingRules = [
|
|
{ key: "device_lists", parser: (deviceLists) => {
|
|
return concatArrays([
|
|
maybeMap(deviceLists.changed, (user) => toDeviceListUserChangedEvent(user)),
|
|
maybeMap(deviceLists.left, (user) => toDeviceListUserLeftEvent(user)),
|
|
]);
|
|
} },
|
|
{ key: "device_one_time_keys_count", parser: (oneTimeKeysCounts) => {
|
|
/* QUESTION: Always received, or only when one value updates? And if the latter, only the delta or the full list of algorithms? */
|
|
return {
|
|
type: "deviceOneTimeKeysCount",
|
|
keyCounts: oneTimeKeysCounts
|
|
};
|
|
} },
|
|
];
|
|
|
|
let limitedRooms = [];
|
|
let previousBatchTokens = {};
|
|
|
|
let roomsWithTimelines = concatArrays(
|
|
Object.entries(syncResponse.rooms.join),
|
|
Object.entries(syncResponse.rooms.leave),
|
|
);
|
|
|
|
for (let [ roomId, room ] of roomsWithTimelines) {
|
|
if (room.timeline.prev_batch != null) {
|
|
previousBatchTokens[roomId] = room.timeline.prev_batch;
|
|
}
|
|
|
|
if (room.timeline.limited === true) {
|
|
limitedRooms.push(roomId);
|
|
}
|
|
}
|
|
|
|
function parseRoom({ room, roomId, memberState }) {
|
|
return roomParsingRules.map((rule) => {
|
|
if (rule.types.includes(memberState)) {
|
|
return assureArray(rule.parser(room, memberState))
|
|
.filter((event) => event != null)
|
|
.map((event) => {
|
|
return {
|
|
... event,
|
|
room: roomId,
|
|
// memberState: memberState
|
|
};
|
|
});
|
|
}
|
|
});
|
|
}
|
|
|
|
let roomEvents = ["join", "leave", "invite"].map((roomType) => {
|
|
if (syncResponse.rooms[roomType] != null) {
|
|
let memberState = matchValue(roomType, {
|
|
join: "joined",
|
|
leave: "left",
|
|
invite: "invited"
|
|
});
|
|
|
|
return Object.entries(syncResponse.rooms[roomType])
|
|
.filter(([ _roomId, room ]) => room != null)
|
|
.map(([ roomId, room ]) => {
|
|
return parseRoom({ room, roomId, memberState });
|
|
});
|
|
} else {
|
|
return [];
|
|
}
|
|
});
|
|
|
|
let globalEvents = globalParsingRules.map((rule) => {
|
|
let data = syncResponse[rule.key];
|
|
|
|
if (data != null) {
|
|
return syncpipe(data, [
|
|
(_) => rule.parser(_),
|
|
(_) => assureArray(_)
|
|
]);
|
|
} else {
|
|
return [];
|
|
}
|
|
});
|
|
|
|
let globalTimelineMapping = {
|
|
presence: "presenceEvent",
|
|
account_data: "accountData",
|
|
to_device: "toDeviceEvent"
|
|
};
|
|
|
|
let globalTimelineEvents = syncpipe(globalTimelineMapping, [
|
|
(_) => Object.entries(_),
|
|
(_) => _.map(([ source, eventType ]) => {
|
|
let events = defaultValue(syncResponse[source].events, []);
|
|
|
|
return events.map((event) => {
|
|
return {
|
|
type: eventType,
|
|
event: deduplicateEvent(event)
|
|
};
|
|
});
|
|
})
|
|
]);
|
|
|
|
let events = syncpipe(null, [
|
|
(_) => concatArrays(
|
|
globalEvents,
|
|
globalTimelineEvents,
|
|
roomEvents
|
|
),
|
|
(_) => flatten(_),
|
|
(_) => _.filter((event) => event !== undefined),
|
|
(_) => removeStateDuplication(_)
|
|
]);
|
|
|
|
// FIXME: In the stream API, translate the metadata into events in and of themselves; probably combining the 'limited' markers and previous-batch tokens into one event, to allow clients to backfill based on that
|
|
// -> Do we need to emit such events for rooms that are *not* limited? Is there an actual purpose to that?
|
|
|
|
return {
|
|
syncToken: syncResponse.next_batch,
|
|
limitedRooms: limitedRooms,
|
|
previousBatchTokens: previousBatchTokens,
|
|
events: events
|
|
};
|
|
};
|