|
|
|
@ -10,7 +10,7 @@ import { fireGlobalEvent } from './fireGlobalEvent'; |
|
|
|
|
import { upsertMessage, RoomHistoryManager } from './RoomHistoryManager'; |
|
|
|
|
import { mainReady } from './mainReady'; |
|
|
|
|
import { roomTypes } from '../../../utils'; |
|
|
|
|
import { promises } from '../../../promises/client'; |
|
|
|
|
import { call } from '../..'; |
|
|
|
|
import { callbacks } from '../../../callbacks'; |
|
|
|
|
import { Notifications } from '../../../notifications'; |
|
|
|
|
import { CachedChatRoom, ChatMessage, ChatSubscription, CachedChatSubscription } from '../../../models'; |
|
|
|
@ -50,56 +50,53 @@ export const RoomManager = new function() { |
|
|
|
|
this.prototype.openedRooms = openedRooms; |
|
|
|
|
this.prototype.onlineUsers = onlineUsers; |
|
|
|
|
this.prototype.computation = Tracker.autorun(() => { |
|
|
|
|
Object.keys(openedRooms).forEach((typeName) => { |
|
|
|
|
const record = openedRooms[typeName]; |
|
|
|
|
const ready = CachedChatRoom.ready.get() && mainReady.get(); |
|
|
|
|
if (ready !== true) { return; } |
|
|
|
|
const user = Meteor.user(); |
|
|
|
|
Tracker.nonreactive(() => Object.entries(openedRooms).forEach(([typeName, record]) => { |
|
|
|
|
if (record.active !== true || record.ready === true) { return; } |
|
|
|
|
const ready = CachedChatRoom.ready.get() && mainReady.get(); |
|
|
|
|
if (ready !== true) { return; } |
|
|
|
|
const user = Meteor.user(); |
|
|
|
|
|
|
|
|
|
const type = typeName.substr(0, 1); |
|
|
|
|
const name = typeName.substr(1); |
|
|
|
|
|
|
|
|
|
const room = Tracker.nonreactive(() => roomTypes.findRoom(type, name, user)); |
|
|
|
|
const room = roomTypes.findRoom(type, name, user); |
|
|
|
|
|
|
|
|
|
if (room != null) { |
|
|
|
|
openedRooms[typeName].rid = room._id; |
|
|
|
|
record.rid = room._id; |
|
|
|
|
RoomHistoryManager.getMoreIfIsEmpty(room._id); |
|
|
|
|
|
|
|
|
|
if (openedRooms[typeName].streamActive !== true) { |
|
|
|
|
openedRooms[typeName].streamActive = true; |
|
|
|
|
msgStream.on(openedRooms[typeName].rid, (msg) => |
|
|
|
|
|
|
|
|
|
promises.run('onClientMessageReceived', msg).then(function(msg) { |
|
|
|
|
// Should not send message to room if room has not loaded all the current messages
|
|
|
|
|
if (RoomHistoryManager.hasMoreNext(openedRooms[typeName].rid) === false) { |
|
|
|
|
// Do not load command messages into channel
|
|
|
|
|
if (msg.t !== 'command') { |
|
|
|
|
const subscription = ChatSubscription.findOne({ rid: openedRooms[typeName].rid }); |
|
|
|
|
upsertMessage({ msg, subscription }); |
|
|
|
|
msg.room = { |
|
|
|
|
type, |
|
|
|
|
name, |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
msg.name = room.name; |
|
|
|
|
RoomManager.updateMentionsMarksOfRoom(typeName); |
|
|
|
|
|
|
|
|
|
callbacks.run('streamMessage', msg); |
|
|
|
|
|
|
|
|
|
return fireGlobalEvent('new-message', msg); |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
Notifications.onRoom(openedRooms[typeName].rid, 'deleteMessage', onDeleteMessageStream); // eslint-disable-line no-use-before-define
|
|
|
|
|
Notifications.onRoom(openedRooms[typeName].rid, 'deleteMessageBulk', onDeleteMessageBulkStream); // eslint-disable-line no-use-before-define
|
|
|
|
|
if (record.streamActive !== true) { |
|
|
|
|
record.streamActive = true; |
|
|
|
|
msgStream.on(record.rid, async (msg) => { |
|
|
|
|
// Should not send message to room if room has not loaded all the current messages
|
|
|
|
|
if (RoomHistoryManager.hasMoreNext(record.rid) !== false) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Do not load command messages into channel
|
|
|
|
|
if (msg.t !== 'command') { |
|
|
|
|
const subscription = ChatSubscription.findOne({ rid: record.rid }, { reactive: false }); |
|
|
|
|
upsertMessage({ msg, subscription }); |
|
|
|
|
msg.room = { |
|
|
|
|
type, |
|
|
|
|
name, |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
msg.name = room.name; |
|
|
|
|
RoomManager.updateMentionsMarksOfRoom(typeName); |
|
|
|
|
|
|
|
|
|
callbacks.run('streamMessage', msg); |
|
|
|
|
|
|
|
|
|
return fireGlobalEvent('new-message', msg); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
Notifications.onRoom(record.rid, 'deleteMessage', onDeleteMessageStream); // eslint-disable-line no-use-before-define
|
|
|
|
|
Notifications.onRoom(record.rid, 'deleteMessageBulk', onDeleteMessageBulkStream); // eslint-disable-line no-use-before-define
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
record.ready = true; |
|
|
|
|
Dep.changed(); |
|
|
|
|
}); |
|
|
|
|
})); |
|
|
|
|
Dep.changed(); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -255,18 +252,24 @@ export const RoomManager = new function() { |
|
|
|
|
return new Cls(); |
|
|
|
|
}(); |
|
|
|
|
|
|
|
|
|
const loadMissedMessages = function(rid) { |
|
|
|
|
const loadMissedMessages = async function(rid) { |
|
|
|
|
const lastMessage = ChatMessage.findOne({ rid, _hidden: { $ne: true }, temp: { $exists: false } }, { sort: { ts: -1 }, limit: 1 }); |
|
|
|
|
|
|
|
|
|
if (lastMessage == null) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
const subscription = ChatSubscription.findOne({ rid }); |
|
|
|
|
return Meteor.call('loadMissedMessages', rid, lastMessage.ts, (err, result) => { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
const result = await call('loadMissedMessages', rid, lastMessage.ts); |
|
|
|
|
if (result) { |
|
|
|
|
return Array.from(result).map((item) => promises.run('onClientMessageReceived', item).then((msg) => upsertMessage({ msg, subscription }))); |
|
|
|
|
const subscription = ChatSubscription.findOne({ rid }); |
|
|
|
|
return Promise.all(Array.from(result).map((msg) => upsertMessage({ msg, subscription }))); |
|
|
|
|
} |
|
|
|
|
return []; |
|
|
|
|
}); |
|
|
|
|
} catch (error) { |
|
|
|
|
return []; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let connectionWasOnline = true; |
|
|
|
|