"use strict"; const Promise = require("bluebird"); const { chain } = require("error-chain"); const mmAxios = require("@modular-matrix/axios"); const matchProtocolError = require("@modular-matrix/match-protocol-error"); const errors = require("@modular-matrix/errors"); const mmParseMessagesResponse = require("../parse-messages-response"); const mmMapEvents = require("../map-events"); const pipe = require("@promistream/pipe"); const simpleSource = require("@promistream/simple-source"); const buffer = require("@promistream/buffer"); const EndOfStream = require("@promistream/end-of-stream"); const { validateArguments } = require("@validatem/core"); const required = require("@validatem/required"); const isString = require("@validatem/is-string"); const isInteger = require("@validatem/is-integer"); const isFunction = require("@validatem/is-function"); // FIXME: Ensure that there is a way to define a limit, and get back a token that lets us continue from the point where that limit was reached. module.exports = function streamBacklog(_session, _roomID, _options) { let [ session, roomID, options ] = validateArguments(arguments, { session: [ required ], // FIXME: Session object validation roomID: [ required, isString ], // FIXME: Strict validation options: [ required, { start: [ required, isString ], end: [ isString ], chunkSize: [ isInteger ], eventMapper: [ isFunction ] }] }); let lastToken = options.start; let axios = mmAxios({ session: session }); return pipe([ simpleSource(() => { if (lastToken != null) { return Promise.try(() => { // FIXME: Write urlEncodedInput`` abstraction return axios.get(`/client/r0/rooms/${encodeURIComponent(roomID)}/messages`, { params: { from: lastToken, to: options.end, dir: "b", limit: options.chunkSize // FIXME: filter? }}); }).then((response) => { // console.log(require("util").inspect(response.data.chunk, { colors: true, depth: null })); let { events, paginationToken } = mmParseMessagesResponse(response.data); // Currently the only way to detect the end of pagination if (events.length > 0) { lastToken = paginationToken; // FIXME: Return type:event items and a type:token item return mmMapEvents(events, options.eventMapper); } else { throw new EndOfStream; } }).catch(matchProtocolError(403, "M_FORBIDDEN"), (error) => { throw chain(error, errors.AccessDenied, "You cannot access the messages in that room, or the room does not exist"); }); } else { // For supporting the proposed future end-of-pagination detection method (https://github.com/matrix-org/matrix-doc/issues/2251) throw new EndOfStream; } }), buffer() ]); };