You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

78 lines
2.7 KiB
JavaScript

"use strict";
const Promise = require("bluebird");
const { chain } = require("error-chain");
const mmAxios = require("@modular-matrix/axios");
const matchProtocolError = require("@modular-matrix/match-protocol-error");
const errors = require("@modular-matrix/errors");
const mmParseMessagesResponse = require("../parse-messages-response");
const mmMapEvents = require("../map-events");
const pipe = require("@promistream/pipe");
const simpleSource = require("@promistream/simple-source");
const buffer = require("@promistream/buffer");
const EndOfStream = require("@promistream/end-of-stream");
const { validateArguments } = require("@validatem/core");
const required = require("@validatem/required");
const isString = require("@validatem/is-string");
const isInteger = require("@validatem/is-integer");
const isFunction = require("@validatem/is-function");
// FIXME: Ensure that there is a way to define a limit, and get back a token that lets us continue from the point where that limit was reached.
module.exports = function streamBacklog(_session, _roomID, _options) {
let [ session, roomID, options ] = validateArguments(arguments, {
session: [ required ], // FIXME: Session object validation
roomID: [ required, isString ], // FIXME: Strict validation
options: [ required, {
start: [ required, isString ],
end: [ isString ],
chunkSize: [ isInteger ],
eventMapper: [ isFunction ]
}]
});
let lastToken = options.start;
let axios = mmAxios({ session: session });
return pipe([
simpleSource(() => {
if (lastToken != null) {
return Promise.try(() => {
// FIXME: Write urlEncodedInput`` abstraction
return axios.get(`/client/r0/rooms/${encodeURIComponent(roomID)}/messages`, { params: {
from: lastToken,
to: options.end,
dir: "b",
limit: options.chunkSize
// FIXME: filter?
}});
}).then((response) => {
// console.log(require("util").inspect(response.data.chunk, { colors: true, depth: null }));
let { events, paginationToken } = mmParseMessagesResponse(response.data);
// Currently the only way to detect the end of pagination
if (events.length > 0) {
lastToken = paginationToken;
// FIXME: Return type:event items and a type:token item
return mmMapEvents(events, options.eventMapper);
} else {
throw new EndOfStream;
}
}).catch(matchProtocolError(403, "M_FORBIDDEN"), (error) => {
throw chain(error, errors.AccessDenied, "You cannot access the messages in that room, or the room does not exist");
});
} else {
// For supporting the proposed future end-of-pagination detection method (https://github.com/matrix-org/matrix-doc/issues/2251)
throw new EndOfStream;
}
}),
buffer()
]);
};