"use strict";
const Promise = require("bluebird");
const concatArrays = require("concat-arrays");
const flatten = require("flatten");
const mmParseSyncResponse = require("../parse-sync-response");
const mmAxios = require("@modular-matrix/axios");
const mmStreamBacklog = require("../stream-backlog");
const isSession = require("@modular-matrix/is-session");
const sortEventComparator = require("../sort-event-comparator");
const mapEvents = require("../map-events");
const pipe = require("@promistream/pipe");
const simpleSource = require("@promistream/simple-source");
const buffer = require("@promistream/buffer");
const collect = require("@promistream/collect");
const { validateArguments } = require("@validatem/core");
const isString = require("@validatem/is-string");
const isInteger = require("@validatem/is-integer");
const isFunction = require("@validatem/is-function");
const optionalObject = require("../optional-object");
const defaultTo = require("@validatem/default-to");
/* Promistream implementation notes:
	- track the last sync token in state; on every call, make a new sync request
	- buffer the stream
	- suggest using a time-based batching stream for batch handling (see the sketch below)
*/
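
// A rough sketch of the time-based batching idea above. `timedBatch` is a
// hypothetical transform (not an existing @promistream package) that collects
// items until an interval elapses and then emits them as one array; `forEach`
// and `handleBatch` are illustrative as well:
//
//   pipe([
//       streamEvents(session, null, {}),
//       timedBatch(5000), // hypothetical: emit an array of pending events every 5 seconds
//       forEach((batch) => handleBatch(batch))
//   ]).read();
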
// FIXME: Can we deduplicate between stream-events and stream-backlog? Is this a useful thing to do in the first place?
// FIXME: Accept a decryption implementation as an argument, to allow decryption to occur transparently, so that only the underlying plaintext events (or decryption errors or their recoveries) actually end up in the event stream
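// One possible shape for that, as a hedged sketch: `decryptEvent` below is a
// hypothetical helper, and this assumes that the event mapper receives one raw
// event at a time (as mapEvents and stream-backlog use it here):
//
//   streamEvents(session, null, {
//       eventMapper: (event) => {
//           return (event.type === "m.room.encrypted")
//               ? decryptEvent(session, event) // could yield a plaintext event or a decryption-error event
//               : event;
//       }
//   });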
/* TODO: Add limited/lazy mode
	- for each limited room, return a 'paginator' object; this object internally stores the start and end of the pagination, and so represents a finite set of items (a rough sketch follows below)
	- when invoked, the paginator should produce a promistream stream, which should be aborted when the desired amount of items has been obtained (and will then internally stop making requests for them)

	Different settings:
	- limited/lazy mode for initial sync
	- limited/lazy mode for subsequent syncs
*/
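
// Rough sketch of what such a paginator object might look like (all names are
// illustrative, not an implemented API):
//
//   {
//       type: "paginator",
//       room: roomId,
//       start: startToken,
//       end: endToken,
//       stream: function () {
//           // produce a promistream stream over this finite range; the caller
//           // aborts it once it has read as many items as it needs
//       }
//   }
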
// Builds a Matrix sync filter that caps the number of timeline events returned per room.
function generateLimitFilter(limit) {
	return {
		room: {
			timeline: { limit: limit }
		}
	};
}
// Merges several arrays of events into one, sorted with the shared event comparator.
function combineEvents(arrays) {
	return concatArrays(...arrays).sort(sortEventComparator);
}
module.exports = function streamEvents(_session, _initialSyncToken, _options) {
	let [ session, initialSyncToken, options ] = validateArguments(arguments, {
		session: isSession,
		initialSyncToken: [ isString ],
		options: optionalObject({
			initialLimit: [ isInteger ],
			updateLimit: [ isInteger ],
			eventMapper: [ isFunction ],
			timeout: [ defaultTo(10000), isInteger ]
		})
	});

	let lastSyncToken = initialSyncToken;
	let isInitialSync = true;
	let axios = mmAxios({ session: session });
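
	// The source below makes one /sync request per upstream read and produces the
	// resulting events as a single array; buffer() then splits that array up, so
	// that the composed stream yields one item (an event, a limitedRoom marker,
	// or a syncToken marker) per read.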
	return pipe([
		simpleSource(() => {
			let applicableLimit = (isInitialSync)
				? options.initialLimit
				: options.updateLimit;

			let filter = (applicableLimit != null)
				? generateLimitFilter(applicableLimit)
				: {};

			if (isInitialSync) {
				isInitialSync = false;
			}

			return Promise.try(() => {
				return axios.get("/client/r0/sync", {
					params: {
						since: lastSyncToken,
						filter: JSON.stringify(filter),
						timeout: options.timeout
					}
				});
			}).then((response) => {
				let { events, syncToken, limitedRooms, previousBatchTokens } = mmParseSyncResponse(response.data);

				let limitedRoomMarkers = limitedRooms.map((room) => {
					let backlogStart = previousBatchTokens[room];
					let backlogEnd = lastSyncToken;

					return {
						type: "limitedRoom",
						room: room,
						backlogStart: backlogStart,
						backlogEnd: backlogEnd,
						streamBacklog: function () {
							return mmStreamBacklog(session, room, {
								start: backlogStart,
								end: backlogEnd,
								// chunkSize: applicableLimit // FIXME: Make configurable separately?
								chunkSize: 200,
								eventMapper: options.eventMapper
							});
						}
					};
				});
				lastSyncToken = syncToken;

				let syncTokenEvent = {
					type: "syncToken",
					token: syncToken
				};

				let mappedEvents = mapEvents(events, options.eventMapper);

				if (limitedRooms.length > 0 && applicableLimit == null) {
					// No explicit limit was configured, so eagerly fetch the entire
					// backlog of every limited room and splice it into this batch.
					return Promise.map(limitedRoomMarkers, (marker) => {
						return pipe([
							marker.streamBacklog(),
							collect()
						]).read();
					}).then((backlogs) => {
						return combineEvents([
							flatten(backlogs),
							mappedEvents
						]).concat([ syncTokenEvent ]);
					});
				} else {
					// Either nothing was limited, or a limit was explicitly configured;
					// emit any limitedRoom markers as-is, and leave it up to the caller
					// to fetch backlog via marker.streamBacklog().
					return combineEvents([
						limitedRoomMarkers,
						mappedEvents
					]).concat([ syncTokenEvent ]);
				}
			});
		}),
		buffer()
	]);
};
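
// Usage sketch (hedged): assumes a valid `session` object for this library, a
// `null` initial sync token, and that the composed stream can be read one item
// at a time with `.read()`, like the backlog pipeline above:
//
//   const streamEvents = require("./stream-events");
//
//   let eventStream = streamEvents(session, null, { timeout: 30000 });
//
//   function loop() {
//       return Promise.try(() => {
//           return eventStream.read();
//       }).then((item) => {
//           console.log(item); // an event, a limitedRoom marker, or a syncToken marker
//           return loop();
//       });
//   }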