You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
78 lines
2.7 KiB
JavaScript
78 lines
2.7 KiB
JavaScript
"use strict";
|
|
|
|
const Promise = require("bluebird");
|
|
const { chain } = require("error-chain");
|
|
|
|
const mmAxios = require("@modular-matrix/axios");
|
|
const matchProtocolError = require("@modular-matrix/match-protocol-error");
|
|
const errors = require("@modular-matrix/errors");
|
|
const mmParseMessagesResponse = require("../parse-messages-response");
|
|
const mmMapEvents = require("../map-events");
|
|
|
|
const pipe = require("@promistream/pipe");
|
|
const simpleSource = require("@promistream/simple-source");
|
|
const buffer = require("@promistream/buffer");
|
|
const EndOfStream = require("@promistream/end-of-stream");
|
|
|
|
const { validateArguments } = require("@validatem/core");
|
|
const required = require("@validatem/required");
|
|
const isString = require("@validatem/is-string");
|
|
const isInteger = require("@validatem/is-integer");
|
|
const isFunction = require("@validatem/is-function");
|
|
|
|
// FIXME: Ensure that there is a way to define a limit, and get back a token that lets us continue from the point where that limit was reached.
|
|
|
|
module.exports = function streamBacklog(_session, _roomID, _options) {
|
|
let [ session, roomID, options ] = validateArguments(arguments, {
|
|
session: [ required ], // FIXME: Session object validation
|
|
roomID: [ required, isString ], // FIXME: Strict validation
|
|
options: [ required, {
|
|
start: [ required, isString ],
|
|
end: [ isString ],
|
|
chunkSize: [ isInteger ],
|
|
eventMapper: [ isFunction ]
|
|
}]
|
|
});
|
|
|
|
let lastToken = options.start;
|
|
|
|
let axios = mmAxios({ session: session });
|
|
|
|
return pipe([
|
|
simpleSource(() => {
|
|
if (lastToken != null) {
|
|
return Promise.try(() => {
|
|
// FIXME: Write urlEncodedInput`` abstraction
|
|
return axios.get(`/client/r0/rooms/${encodeURIComponent(roomID)}/messages`, { params: {
|
|
from: lastToken,
|
|
to: options.end,
|
|
dir: "b",
|
|
limit: options.chunkSize
|
|
// FIXME: filter?
|
|
}});
|
|
}).then((response) => {
|
|
// console.log(require("util").inspect(response.data.chunk, { colors: true, depth: null }));
|
|
|
|
let { events, paginationToken } = mmParseMessagesResponse(response.data);
|
|
|
|
// Currently the only way to detect the end of pagination
|
|
if (events.length > 0) {
|
|
lastToken = paginationToken;
|
|
|
|
// FIXME: Return type:event items and a type:token item
|
|
return mmMapEvents(events, options.eventMapper);
|
|
} else {
|
|
throw new EndOfStream;
|
|
}
|
|
}).catch(matchProtocolError(403, "M_FORBIDDEN"), (error) => {
|
|
throw chain(error, errors.AccessDenied, "You cannot access the messages in that room, or the room does not exist");
|
|
});
|
|
} else {
|
|
// For supporting the proposed future end-of-pagination detection method (https://github.com/matrix-org/matrix-doc/issues/2251)
|
|
throw new EndOfStream;
|
|
}
|
|
}),
|
|
buffer()
|
|
]);
|
|
};
|