import type { IMessage, IRoom } from '@rocket.chat/core-typings';
import { Emitter } from '@rocket.chat/emitter';
import { createPredicateFromFilter } from '@rocket.chat/mongo-adapter';
import { clientCallbacks } from '@rocket.chat/ui-client';
import type { Filter } from 'mongodb';

import { upsertMessage, RoomHistoryManager } from './RoomHistoryManager';
import { RoomManager } from '../../../../client/lib/RoomManager';
import { roomCoordinator } from '../../../../client/lib/rooms/roomCoordinator';
import { fireGlobalEvent } from '../../../../client/lib/utils/fireGlobalEvent';
import { getConfig } from '../../../../client/lib/utils/getConfig';
import { modifyMessageOnFilesDelete } from '../../../../client/lib/utils/modifyMessageOnFilesDelete';
import { Messages, Subscriptions } from '../../../../client/stores';
import { sdk } from '../../../utils/client/lib/SDKClient';

/** Maximum number of rooms kept open at once; falls back to 5 when the config value is missing or not a number. */
const maxRoomsOpen = Number.parseInt(getConfig('maxRoomsOpen') ?? '5', 10) || 5;

type ListenRoomPropsByRidProps = keyof OpenedRoom;

type ListenRoomPropsByRidPropsEvent = `${string}/${ListenRoomPropsByRidProps}`;

/** Emits `<rid>/<prop>` events whenever `setPropertyByRid` writes a property of an opened room. */
const listener = new Emitter<{
	[key in ListenRoomPropsByRidPropsEvent]: undefined;
}>();

/** Bookkeeping record for a room that has been opened through this manager. */
type OpenedRoom = {
	typeName: string;
	rid: IRoom['_id'];
	ready: boolean;
	dom?: Node;
	streamActive?: boolean;
	unreadSince: Date | undefined;
	lastSeen: Date;
	unreadFirstId?: string;
	stream?: {
		stop: () => void;
	};
};

/** Opened rooms keyed by `typeName` (one-character room type followed by the room name, see `openRoom`). */
const openedRooms: Record<string, OpenedRoom> = {};

/**
 * Closes the room registered under `typeName`: stops its streams, drops its
 * record and notifies `RoomManager` and `RoomHistoryManager`.
 *
 * @param typeName key of the room in `openedRooms`
 * @returns whatever `RoomHistoryManager.close` returns, or `undefined` when the
 * room is not open or has no `rid`
 */
function close(typeName: string) {
	const room = openedRooms[typeName];
	if (!room) {
		return;
	}

	room.stream?.stop();
	room.ready = false;
	delete room.dom;

	const { rid } = room;
	delete openedRooms[typeName];

	if (rid) {
		RoomManager.close(rid);
		return RoomHistoryManager.close(rid);
	}
}

/**
 * Closes the least recently seen rooms once more than `maxRoomsOpen` are open.
 *
 * @returns the results of `close` for every room that was closed, or
 * `undefined` when the limit is not exceeded
 */
function closeOlderRooms() {
	if (Object.keys(openedRooms).length <= maxRoomsOpen) {
		return;
	}

	// Most recently seen first, so everything past `maxRoomsOpen` is the oldest.
	const roomsToClose = Object.values(openedRooms)
		.sort((a, b) => b.lastSeen.getTime() - a.lastSeen.getTime())
		.slice(maxRoomsOpen);

	// The expression must start on the same line as `return`; otherwise automatic
	// semicolon insertion ends the statement early and no room is ever closed.
	return roomsToClose.map((roomToClose) => close(roomToClose.typeName));
}

/** Closes every opened room, one after another. */
async function closeAllRooms() {
	// `Object.values` takes a snapshot, so `close` deleting entries is safe here.
	for (const room of Object.values(openedRooms)) {
		await close(room.typeName);
	}
}

/**
 * Builds a subscribe/snapshot pair for a single property of an opened room.
 *
 * @param rid id of the room to observe
 * @param prop property of `OpenedRoom` to observe
 * @returns `subscribe` registers `cb` on the `<rid>/<prop>` event and returns
 * what `listener.on` returns; `getSnapshotValue` reads the current value
 * (`undefined` when the room is not open)
 */
function listenRoomPropsByRid<T extends ListenRoomPropsByRidProps>(
	rid: IRoom['_id'],
	prop: T,
): {
	subscribe: (cb: () => void) => () => void;
	getSnapshotValue: () => OpenedRoom[T];
} {
	return {
		subscribe: (cb: () => void) => {
			return listener.on(`${rid}/${prop}`, cb);
		},
		getSnapshotValue: (): OpenedRoom[T] => {
			return getOpenedRoomByRid(rid)?.[prop] as OpenedRoom[T];
		},
	};
}

/**
 * Writes `value` into `prop` of an opened room and emits `<rid>/<prop>`.
 * Accepts either the room record itself or its `rid`; does nothing when the
 * room cannot be resolved. The implementation has no `return` with a value, so
 * callers currently always receive `undefined`.
 */
function setPropertyByRid<T extends ListenRoomPropsByRidProps>(room: OpenedRoom, prop: T, value: OpenedRoom[T]): OpenedRoom[T] | undefined;
function setPropertyByRid<T extends ListenRoomPropsByRidProps>(rid: IRoom['_id'], prop: T, value: OpenedRoom[T]): OpenedRoom[T] | undefined;
function setPropertyByRid<T extends ListenRoomPropsByRidProps>(
	ridOrRoom: IRoom['_id'] | OpenedRoom,
	prop: T,
	value: OpenedRoom[T],
): OpenedRoom[T] | undefined {
	const room = typeof ridOrRoom === 'string' ? getOpenedRoomByRid(ridOrRoom) : ridOrRoom;
	const rid = typeof ridOrRoom === 'string' ? ridOrRoom : room?.rid;

	if (!room) {
		return;
	}

	room[prop] = value;
	listener.emit(`${rid}/${prop}`);
}

/**
 * Finds the opened room whose `rid` matches.
 *
 * @returns the room record, or `undefined` when no opened room has that id
 */
function getOpenedRoomByRid(rid: IRoom['_id']) {
	return Object.values(openedRooms).find((openedRoom) => openedRoom.rid === rid);
}

/**
 * Builds the message filter used to apply a bulk delete to the local store.
 *
 * Messages are selected by `ids` when given, otherwise by the `ts` condition;
 * pinned messages, discussion messages and authors are filtered on demand.
 */
function createDeleteQuery({
	excludePinned,
	ignoreDiscussion,
	rid,
	ts,
	users,
	ids,
}: {
	rid: IMessage['rid'];
	excludePinned: boolean;
	ignoreDiscussion: boolean;
	// NOTE(review): assumed to be a map of query operators to dates - confirm against the stream payload type.
	ts: Record<string, Date>;
	users: string[];
	ids?: string[];
}) {
	const query: Filter<IMessage> = { rid };

	if (ids) {
		query._id = { $in: ids };
	} else {
		query.ts = ts;
	}

	if (excludePinned) {
		query.pinned = { $ne: true };
	}

	if (ignoreDiscussion) {
		query.drid = { $exists: false };
	}

	if (users?.length) {
		query['u.username'] = { $in: users };
	}

	return query;
}

/**
 * Subscribes to the streams of a room and keeps the local `Messages` store in
 * sync with them. Marks the record as `ready` and stores a `stream.stop`
 * handle that stops every subscription created here.
 *
 * Does nothing when the room's stream is already active or when the room
 * cannot be found through `roomCoordinator`.
 *
 * @param typeName one-character room type followed by the room name
 * @param record the `openedRooms` entry for the room
 */
const openRoom = (typeName: string, record: OpenedRoom) => {
	// NOTE(review): `streamActive` only becomes true after the first stream reports ready;
	// a second call before that subscribes again and overwrites `record.stream`, so the
	// first set of streams can no longer be stopped - confirm whether this can happen.
	if (record.streamActive === true) {
		return;
	}

	const type = typeName.slice(0, 1);
	const name = typeName.slice(1);

	const room = roomCoordinator.getRoomDirectives(type).findRoom(name);

	if (!room) {
		return;
	}

	const streams: ReturnType<typeof sdk.stream>[] = [
		sdk.stream('room-messages', [record.rid], async (msg) => {
			// Should not send message to room if room has not loaded all the current messages
			// if (RoomHistoryManager.hasMoreNext(record.rid) !== false) {
			// 	return;
			// }

			// Do not load command messages into channel
			if (msg.t !== 'command') {
				const subscription = Subscriptions.state.find(({ rid }) => rid === record.rid);
				// A message counts as new unless a non-temporary copy is already stored.
				const isNew = !Messages.state.find((message) => message._id === msg._id && message.temp !== true);

				// Measure and log message receive delay for messages
				if (msg.ts) {
					const receiveDelay = Date.now() - new Date(msg.ts).getTime();

					// Log warning if delay is significant (>2 seconds)
					if (receiveDelay > 2000) {
						console.warn(`[Message Delivery] High delay detected: ${receiveDelay}ms. Possible network or backend issue.`);
					}
				}

				await upsertMessage({ msg, subscription });

				if (isNew) {
					await clientCallbacks.run('streamNewMessage', msg);
				}
			}

			await clientCallbacks.run('streamMessage', { ...msg, name: room.name || '' });

			fireGlobalEvent('new-message', {
				...msg,
				name: room.name || '',
				room: {
					type,
					name,
				},
			});
		}),

		// when we receive a messages imported event we just clear the room history and fetch it again
		sdk.stream('notify-room', [`${record.rid}/messagesImported`], async () => {
			await RoomHistoryManager.clear(record.rid);
			await RoomHistoryManager.getMore(record.rid);
		}),

		sdk.stream('notify-room', [`${record.rid}/deleteMessage`], (msg) => {
			Messages.state.delete(msg._id);

			// remove thread reference from deleted message
			Messages.state.update(
				(message) => message.tmid === msg._id,
				({ tmid: _, ...message }) => message,
			);
		}),

		sdk.stream('notify-room', [`${record.rid}/deleteMessageBulk`], async (params) => {
			const query = createDeleteQuery(params);
			const predicate = createPredicateFromFilter(query);

			// Only the files are removed; the messages themselves stay.
			if (params.filesOnly) {
				return Messages.state.update(predicate, (message) => modifyMessageOnFilesDelete(message, params.replaceFileAttachmentsWith));
			}

			// Keep the messages as "removed" placeholders instead of dropping them.
			if (params.showDeletedStatus) {
				return Messages.state.update(predicate, (message) => ({
					...message,
					t: 'rm',
					msg: '',
					urls: [],
					mentions: [],
					attachments: [],
					reactions: {},
				}));
			}

			return Messages.state.remove(predicate);
		}),

		sdk.stream('notify-room', [`${record.rid}/messagesRead`], ({ tmid, until }) => {
			// Thread read: clear the unread flag of that thread's messages only.
			if (tmid) {
				Messages.state.update(
					(message) => message.tmid === tmid && message.unread === true,
					({ unread: _, ...message }) => message,
				);
				return;
			}

			// Room read: clear unread messages older than `until` that are not
			// thread replies, or are thread replies flagged with `tshow`.
			Messages.state.update(
				(message) =>
					message.rid === record.rid &&
					message.unread === true &&
					message.ts.getTime() < until.getTime() &&
					(message.tmid === undefined || message.tshow === true),
				({ unread: _, ...message }) => message,
			);
		}),
	];

	// The room counts as "stream active" once the room-messages stream is ready.
	const [streamRoomMessages] = streams;

	void streamRoomMessages.ready().then(() => {
		setPropertyByRid(record.rid, 'streamActive', true);
	});

	record.stream = {
		stop: () => {
			streams.forEach((stream) => stream.stop());
		},
	};

	record.ready = true;
};

/**
 * Registers the room under `typeName` if needed, refreshes its `lastSeen`
 * timestamp and opens its streams. Older rooms are closed only when this
 * room's record is already marked `ready`.
 */
function open({ typeName, rid }: { typeName: string; rid: IRoom['_id'] }) {
	if (!openedRooms[typeName]) {
		openedRooms[typeName] = {
			typeName,
			rid,
			ready: false,
			unreadSince: undefined,
			lastSeen: new Date(),
		};
	}

	const record = openedRooms[typeName];
	record.lastSeen = new Date();

	if (record.ready) {
		closeOlderRooms();
	}

	openRoom(typeName, record);
}

/** Value exposed through `LegacyRoomManager.openedRoom`; `undefined` until assigned. */
let openedRoom: string | undefined = undefined;

/** Public facade over the opened-room registry and its helpers. */
export const LegacyRoomManager = {
	get openedRoom() {
		return openedRoom;
	},
	set openedRoom(rid) {
		openedRoom = rid;
	},
	get openedRooms() {
		return openedRooms;
	},
	listenRoomPropsByRid,
	setPropertyByRid,
	getOpenedRoomByRid,
	close,
	closeAllRooms,
	open,
};