|
|
|
|
@ -1,48 +1,47 @@ |
|
|
|
|
import type { IMessage, IRoom } from '@rocket.chat/core-typings'; |
|
|
|
|
import { Emitter } from '@rocket.chat/emitter'; |
|
|
|
|
import { createPredicateFromFilter } from '@rocket.chat/mongo-adapter'; |
|
|
|
|
import type { Filter } from '@rocket.chat/mongo-adapter'; |
|
|
|
|
import { ReactiveVar } from 'meteor/reactive-var'; |
|
|
|
|
import { Tracker } from 'meteor/tracker'; |
|
|
|
|
|
|
|
|
|
import { upsertMessage, RoomHistoryManager } from './RoomHistoryManager'; |
|
|
|
|
import { mainReady } from './mainReady'; |
|
|
|
|
import { RoomManager } from '../../../../client/lib/RoomManager'; |
|
|
|
|
import { roomCoordinator } from '../../../../client/lib/rooms/roomCoordinator'; |
|
|
|
|
import { fireGlobalEvent } from '../../../../client/lib/utils/fireGlobalEvent'; |
|
|
|
|
import { getConfig } from '../../../../client/lib/utils/getConfig'; |
|
|
|
|
import { callbacks } from '../../../../lib/callbacks'; |
|
|
|
|
import { Messages, Subscriptions, CachedChatSubscription } from '../../../models/client'; |
|
|
|
|
import { Messages, Subscriptions } from '../../../models/client'; |
|
|
|
|
import { sdk } from '../../../utils/client/lib/SDKClient'; |
|
|
|
|
|
|
|
|
|
const maxRoomsOpen = parseInt(getConfig('maxRoomsOpen') ?? '5') || 5; |
|
|
|
|
|
|
|
|
|
const openedRooms: Record< |
|
|
|
|
string, |
|
|
|
|
{ |
|
|
|
|
typeName: string; |
|
|
|
|
rid: IRoom['_id']; |
|
|
|
|
ready: boolean; |
|
|
|
|
active: boolean; |
|
|
|
|
dom?: Node; |
|
|
|
|
streamActive?: boolean; |
|
|
|
|
unreadSince: ReactiveVar<Date | undefined>; |
|
|
|
|
lastSeen: Date; |
|
|
|
|
unreadFirstId?: string; |
|
|
|
|
} |
|
|
|
|
> = {}; |
|
|
|
|
type ListenRoomPropsByRidProps = keyof OpenedRoom; |
|
|
|
|
type ListenRoomPropsByRidPropsEvent = `${string}/${ListenRoomPropsByRidProps}`; |
|
|
|
|
|
|
|
|
|
const listener = new Emitter<{ |
|
|
|
|
[key in ListenRoomPropsByRidPropsEvent]: undefined; |
|
|
|
|
}>(); |
|
|
|
|
|
|
|
|
|
type OpenedRoom = { |
|
|
|
|
typeName: string; |
|
|
|
|
rid: IRoom['_id']; |
|
|
|
|
ready: boolean; |
|
|
|
|
dom?: Node; |
|
|
|
|
streamActive?: boolean; |
|
|
|
|
unreadSince: Date | undefined; |
|
|
|
|
lastSeen: Date; |
|
|
|
|
unreadFirstId?: string; |
|
|
|
|
stream?: { |
|
|
|
|
stop: () => void; |
|
|
|
|
}; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
const openedRoomsDependency = new Tracker.Dependency(); |
|
|
|
|
const openedRooms: Record<string, OpenedRoom> = {}; |
|
|
|
|
|
|
|
|
|
function close(typeName: string) { |
|
|
|
|
if (openedRooms[typeName]) { |
|
|
|
|
if (openedRooms[typeName].rid) { |
|
|
|
|
sdk.stop('room-messages', openedRooms[typeName].rid); |
|
|
|
|
sdk.stop('notify-room', `${openedRooms[typeName].rid}/deleteMessage`); |
|
|
|
|
sdk.stop('notify-room', `${openedRooms[typeName].rid}/deleteMessageBulk`); |
|
|
|
|
} |
|
|
|
|
openedRooms[typeName].stream?.stop(); |
|
|
|
|
|
|
|
|
|
openedRooms[typeName].ready = false; |
|
|
|
|
openedRooms[typeName].active = false; |
|
|
|
|
|
|
|
|
|
delete openedRooms[typeName].dom; |
|
|
|
|
|
|
|
|
|
@ -72,152 +71,187 @@ async function closeAllRooms() { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
function listenRoomPropsByRid<T extends ListenRoomPropsByRidProps>( |
|
|
|
|
rid: IRoom['_id'], |
|
|
|
|
prop: T, |
|
|
|
|
): { |
|
|
|
|
subscribe: (cb: () => void) => () => void; |
|
|
|
|
getSnapshotValue: () => OpenedRoom[T]; |
|
|
|
|
} { |
|
|
|
|
return { |
|
|
|
|
subscribe: (cb: () => void) => { |
|
|
|
|
return listener.on(`${rid}/${prop}`, cb); |
|
|
|
|
}, |
|
|
|
|
getSnapshotValue: (): OpenedRoom[T] => { |
|
|
|
|
return getOpenedRoomByRid(rid)?.[prop] as OpenedRoom[T]; |
|
|
|
|
}, |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
function setPropertyByRid<T extends ListenRoomPropsByRidProps>(room: OpenedRoom, prop: T, value: OpenedRoom[T]): OpenedRoom[T] | undefined; |
|
|
|
|
function setPropertyByRid<T extends ListenRoomPropsByRidProps>(rid: IRoom['_id'], prop: T, value: OpenedRoom[T]): OpenedRoom[T] | undefined; |
|
|
|
|
function setPropertyByRid<T extends ListenRoomPropsByRidProps>( |
|
|
|
|
ridOrRoom: IRoom['_id'] | OpenedRoom, |
|
|
|
|
prop: T, |
|
|
|
|
value: OpenedRoom[T], |
|
|
|
|
): OpenedRoom[T] | undefined { |
|
|
|
|
const room = typeof ridOrRoom === 'string' ? getOpenedRoomByRid(ridOrRoom) : ridOrRoom; |
|
|
|
|
const rid = typeof ridOrRoom === 'string' ? ridOrRoom : room?.rid; |
|
|
|
|
if (!room) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
room[prop] = value; |
|
|
|
|
listener.emit(`${rid}/${prop}`); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
function getOpenedRoomByRid(rid: IRoom['_id']) { |
|
|
|
|
openedRoomsDependency.depend(); |
|
|
|
|
return Object.keys(openedRooms) |
|
|
|
|
.map((typeName) => openedRooms[typeName]) |
|
|
|
|
.find((openedRoom) => openedRoom.rid === rid); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const computation = Tracker.autorun(() => { |
|
|
|
|
if (!mainReady.get()) { |
|
|
|
|
const openRoom = (typeName: string, record: OpenedRoom) => { |
|
|
|
|
if (record.ready === true && record.streamActive === true) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
Tracker.nonreactive(() => |
|
|
|
|
Object.entries(openedRooms).forEach(([typeName, record]) => { |
|
|
|
|
if (record.active !== true || (record.ready === true && record.streamActive === true)) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const type = typeName.slice(0, 1); |
|
|
|
|
const name = typeName.slice(1); |
|
|
|
|
|
|
|
|
|
const room = roomCoordinator.getRoomDirectives(type).findRoom(name); |
|
|
|
|
|
|
|
|
|
if (room) { |
|
|
|
|
if (record.streamActive !== true) { |
|
|
|
|
void sdk |
|
|
|
|
.stream('room-messages', [record.rid], async (msg) => { |
|
|
|
|
// Should not send message to room if room has not loaded all the current messages
|
|
|
|
|
// if (RoomHistoryManager.hasMoreNext(record.rid) !== false) {
|
|
|
|
|
// return;
|
|
|
|
|
// }
|
|
|
|
|
// Do not load command messages into channel
|
|
|
|
|
if (msg.t !== 'command') { |
|
|
|
|
const subscription = Subscriptions.findOne({ rid: record.rid }, { reactive: false }); |
|
|
|
|
const isNew = !Messages.state.find((record) => record._id === msg._id && record.temp !== true); |
|
|
|
|
({ _id: msg._id, temp: { $ne: true } }); |
|
|
|
|
await upsertMessage({ msg, subscription }); |
|
|
|
|
|
|
|
|
|
if (isNew) { |
|
|
|
|
await callbacks.run('streamNewMessage', msg); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
await callbacks.run('streamMessage', { ...msg, name: room.name || '' }); |
|
|
|
|
|
|
|
|
|
fireGlobalEvent('new-message', { |
|
|
|
|
...msg, |
|
|
|
|
name: room.name || '', |
|
|
|
|
room: { |
|
|
|
|
type, |
|
|
|
|
name, |
|
|
|
|
}, |
|
|
|
|
}); |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
.ready() |
|
|
|
|
.then(() => { |
|
|
|
|
record.streamActive = true; |
|
|
|
|
openedRoomsDependency.changed(); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
// when we receive a messages imported event we just clear the room history and fetch it again
|
|
|
|
|
sdk.stream('notify-room', [`${record.rid}/messagesImported`], async () => { |
|
|
|
|
await RoomHistoryManager.clear(record.rid); |
|
|
|
|
await RoomHistoryManager.getMore(record.rid); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
sdk.stream('notify-room', [`${record.rid}/deleteMessage`], (msg) => { |
|
|
|
|
Messages.state.delete(msg._id); |
|
|
|
|
|
|
|
|
|
// remove thread refenrece from deleted message
|
|
|
|
|
Messages.state.update( |
|
|
|
|
(record) => record.tmid === msg._id, |
|
|
|
|
({ tmid: _, ...record }) => record, |
|
|
|
|
); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
sdk.stream( |
|
|
|
|
'notify-room', |
|
|
|
|
[`${record.rid}/deleteMessageBulk`], |
|
|
|
|
({ rid, ts, excludePinned, ignoreDiscussion, users, ids, showDeletedStatus }) => { |
|
|
|
|
const query: Filter<IMessage> = { rid }; |
|
|
|
|
|
|
|
|
|
if (ids) { |
|
|
|
|
query._id = { $in: ids }; |
|
|
|
|
} else { |
|
|
|
|
query.ts = ts; |
|
|
|
|
} |
|
|
|
|
if (excludePinned) { |
|
|
|
|
query.pinned = { $ne: true }; |
|
|
|
|
} |
|
|
|
|
if (ignoreDiscussion) { |
|
|
|
|
query.drid = { $exists: false }; |
|
|
|
|
} |
|
|
|
|
if (users?.length) { |
|
|
|
|
query['u.username'] = { $in: users }; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const predicate = createPredicateFromFilter(query); |
|
|
|
|
|
|
|
|
|
if (showDeletedStatus) { |
|
|
|
|
return Messages.state.update(predicate, (record) => ({ |
|
|
|
|
...record, |
|
|
|
|
t: 'rm', |
|
|
|
|
msg: '', |
|
|
|
|
urls: [], |
|
|
|
|
mentions: [], |
|
|
|
|
attachments: [], |
|
|
|
|
reactions: {}, |
|
|
|
|
})); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return Messages.state.remove(predicate); |
|
|
|
|
}, |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
sdk.stream('notify-room', [`${record.rid}/messagesRead`], ({ tmid, until }) => { |
|
|
|
|
if (tmid) { |
|
|
|
|
Messages.state.update( |
|
|
|
|
(record) => record.tmid === tmid && record.unread === true, |
|
|
|
|
({ unread: _, ...record }) => record, |
|
|
|
|
); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
Messages.state.update( |
|
|
|
|
(r) => |
|
|
|
|
r.rid === record.rid && r.unread === true && r.ts.getTime() < until.getTime() && (r.tmid === undefined || r.tshow === true), |
|
|
|
|
({ unread: _, ...r }) => r, |
|
|
|
|
); |
|
|
|
|
}); |
|
|
|
|
if (record.streamActive === true) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const type = typeName.slice(0, 1); |
|
|
|
|
const name = typeName.slice(1); |
|
|
|
|
|
|
|
|
|
const room = roomCoordinator.getRoomDirectives(type).findRoom(name); |
|
|
|
|
|
|
|
|
|
if (!room) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const streams: ReturnType<typeof sdk.stream>[] = []; |
|
|
|
|
|
|
|
|
|
streams.push( |
|
|
|
|
...[ |
|
|
|
|
sdk.stream('room-messages', [record.rid], async (msg) => { |
|
|
|
|
// Should not send message to room if room has not loaded all the current messages
|
|
|
|
|
// if (RoomHistoryManager.hasMoreNext(record.rid) !== false) {
|
|
|
|
|
// return;
|
|
|
|
|
// }
|
|
|
|
|
// Do not load command messages into channel
|
|
|
|
|
if (msg.t !== 'command') { |
|
|
|
|
const subscription = Subscriptions.findOne({ rid: record.rid }, { reactive: false }); |
|
|
|
|
const isNew = !Messages.state.find((record) => record._id === msg._id && record.temp !== true); |
|
|
|
|
({ _id: msg._id, temp: { $ne: true } }); |
|
|
|
|
await upsertMessage({ msg, subscription }); |
|
|
|
|
if (isNew) { |
|
|
|
|
await callbacks.run('streamNewMessage', msg); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
record.ready = true; |
|
|
|
|
}), |
|
|
|
|
await callbacks.run('streamMessage', { ...msg, name: room.name || '' }); |
|
|
|
|
|
|
|
|
|
fireGlobalEvent('new-message', { |
|
|
|
|
...msg, |
|
|
|
|
name: room.name || '', |
|
|
|
|
room: { |
|
|
|
|
type, |
|
|
|
|
name, |
|
|
|
|
}, |
|
|
|
|
}); |
|
|
|
|
}), |
|
|
|
|
|
|
|
|
|
// when we receive a messages imported event we just clear the room history and fetch it again
|
|
|
|
|
sdk.stream('notify-room', [`${record.rid}/messagesImported`], async () => { |
|
|
|
|
await RoomHistoryManager.clear(record.rid); |
|
|
|
|
await RoomHistoryManager.getMore(record.rid); |
|
|
|
|
}), |
|
|
|
|
sdk.stream('notify-room', [`${record.rid}/deleteMessage`], (msg) => { |
|
|
|
|
Messages.state.delete(msg._id); |
|
|
|
|
|
|
|
|
|
// remove thread refenrece from deleted message
|
|
|
|
|
Messages.state.update( |
|
|
|
|
(record) => record.tmid === msg._id, |
|
|
|
|
({ tmid: _, ...record }) => record, |
|
|
|
|
); |
|
|
|
|
}), |
|
|
|
|
sdk.stream( |
|
|
|
|
'notify-room', |
|
|
|
|
[`${record.rid}/deleteMessageBulk`], |
|
|
|
|
({ rid, ts, excludePinned, ignoreDiscussion, users, ids, showDeletedStatus }) => { |
|
|
|
|
const query: Filter<IMessage> = { rid }; |
|
|
|
|
|
|
|
|
|
if (ids) { |
|
|
|
|
query._id = { $in: ids }; |
|
|
|
|
} else { |
|
|
|
|
query.ts = ts; |
|
|
|
|
} |
|
|
|
|
if (excludePinned) { |
|
|
|
|
query.pinned = { $ne: true }; |
|
|
|
|
} |
|
|
|
|
if (ignoreDiscussion) { |
|
|
|
|
query.drid = { $exists: false }; |
|
|
|
|
} |
|
|
|
|
if (users?.length) { |
|
|
|
|
query['u.username'] = { $in: users }; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const predicate = createPredicateFromFilter(query); |
|
|
|
|
|
|
|
|
|
if (showDeletedStatus) { |
|
|
|
|
return Messages.state.update(predicate, (record) => ({ |
|
|
|
|
...record, |
|
|
|
|
t: 'rm', |
|
|
|
|
msg: '', |
|
|
|
|
urls: [], |
|
|
|
|
mentions: [], |
|
|
|
|
attachments: [], |
|
|
|
|
reactions: {}, |
|
|
|
|
})); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return Messages.state.remove(predicate); |
|
|
|
|
}, |
|
|
|
|
), |
|
|
|
|
|
|
|
|
|
sdk.stream('notify-room', [`${record.rid}/messagesRead`], ({ tmid, until }) => { |
|
|
|
|
if (tmid) { |
|
|
|
|
Messages.state.update( |
|
|
|
|
(record) => record.tmid === tmid && record.unread === true, |
|
|
|
|
({ unread: _, ...record }) => record, |
|
|
|
|
); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
Messages.state.update( |
|
|
|
|
(r) => |
|
|
|
|
r.rid === record.rid && r.unread === true && r.ts.getTime() < until.getTime() && (r.tmid === undefined || r.tshow === true), |
|
|
|
|
({ unread: _, ...r }) => r, |
|
|
|
|
); |
|
|
|
|
}), |
|
|
|
|
], |
|
|
|
|
); |
|
|
|
|
openedRoomsDependency.changed(); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
const [streamRoomMessages] = streams; |
|
|
|
|
|
|
|
|
|
void streamRoomMessages.ready().then(() => { |
|
|
|
|
setPropertyByRid(record.rid, 'streamActive', true); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
record.stream = { |
|
|
|
|
stop: () => { |
|
|
|
|
streams.forEach((stream) => stream.stop()); |
|
|
|
|
}, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
record.ready = true; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
function open({ typeName, rid }: { typeName: string; rid: IRoom['_id'] }) { |
|
|
|
|
if (!openedRooms[typeName]) { |
|
|
|
|
openedRooms[typeName] = { |
|
|
|
|
typeName, |
|
|
|
|
rid, |
|
|
|
|
active: false, |
|
|
|
|
ready: false, |
|
|
|
|
unreadSince: new ReactiveVar(undefined), |
|
|
|
|
unreadSince: undefined, |
|
|
|
|
lastSeen: new Date(), |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
@ -228,21 +262,7 @@ function open({ typeName, rid }: { typeName: string; rid: IRoom['_id'] }) { |
|
|
|
|
closeOlderRooms(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (CachedChatSubscription.ready.get() === true) { |
|
|
|
|
if (openedRooms[typeName].active !== true) { |
|
|
|
|
openedRooms[typeName].active = true; |
|
|
|
|
if (computation) { |
|
|
|
|
computation.invalidate(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return { |
|
|
|
|
ready() { |
|
|
|
|
openedRoomsDependency.depend(); |
|
|
|
|
return openedRooms[typeName].ready; |
|
|
|
|
}, |
|
|
|
|
}; |
|
|
|
|
openRoom(typeName, openedRooms[typeName]); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let openedRoom: string | undefined = undefined; |
|
|
|
|
@ -259,16 +279,13 @@ export const LegacyRoomManager = { |
|
|
|
|
get openedRooms() { |
|
|
|
|
return openedRooms; |
|
|
|
|
}, |
|
|
|
|
|
|
|
|
|
listenRoomPropsByRid, |
|
|
|
|
setPropertyByRid, |
|
|
|
|
getOpenedRoomByRid, |
|
|
|
|
|
|
|
|
|
close, |
|
|
|
|
|
|
|
|
|
closeAllRooms, |
|
|
|
|
|
|
|
|
|
get computation() { |
|
|
|
|
return computation; |
|
|
|
|
}, |
|
|
|
|
|
|
|
|
|
open, |
|
|
|
|
}; |
|
|
|
|
|