You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
534 lines
15 KiB
534 lines
15 KiB
import EJSON from 'ejson';
|
|
import { FederationServers, FederationRoomEvents, Rooms, Messages, Subscriptions, Users } from '@rocket.chat/models';
|
|
import { api } from '@rocket.chat/core-services';
|
|
import { eventTypes } from '@rocket.chat/core-typings';
|
|
|
|
import { API } from '../../../api/server';
|
|
import { serverLogger } from '../lib/logger';
|
|
import { contextDefinitions } from '../lib/context';
|
|
import { normalizers } from '../normalizers';
|
|
import { deleteRoom } from '../../../lib/server/functions';
|
|
import { FileUpload } from '../../../file-upload/server';
|
|
import { getFederationDomain } from '../lib/getFederationDomain';
|
|
import { decryptIfNeeded } from '../lib/crypt';
|
|
import { isFederationEnabled } from '../lib/isFederationEnabled';
|
|
import { getUpload, requestEventsFromLatest } from '../handler';
|
|
import { notifyUsersOnMessage } from '../../../lib/server/lib/notifyUsersOnMessage';
|
|
import { sendAllNotifications } from '../../../lib/server/lib/sendNotificationsOnMessage';
|
|
import { processThreads } from '../../../threads/server/hooks/aftersavemessage';
|
|
|
|
const eventHandlers = {
|
|
//
|
|
// PING
|
|
//
|
|
async [eventTypes.PING]() {
|
|
return {
|
|
success: true,
|
|
};
|
|
},
|
|
|
|
//
|
|
// GENESIS
|
|
//
|
|
async [eventTypes.GENESIS](event) {
|
|
switch (event.data.contextType) {
|
|
case contextDefinitions.ROOM.type:
|
|
const eventResult = await FederationRoomEvents.addEvent(event.context, event);
|
|
|
|
// If the event was successfully added, handle the event locally
|
|
if (eventResult.success) {
|
|
const {
|
|
data: { room },
|
|
} = event;
|
|
|
|
// Check if room exists
|
|
const persistedRoom = await Rooms.findOne({ _id: room._id });
|
|
|
|
if (persistedRoom) {
|
|
// Update the federation
|
|
await Rooms.updateOne({ _id: persistedRoom._id }, { $set: { federation: room.federation } });
|
|
} else {
|
|
// Denormalize room
|
|
const denormalizedRoom = normalizers.denormalizeRoom(room);
|
|
|
|
// Create the room
|
|
await Rooms.insertOne(denormalizedRoom);
|
|
}
|
|
}
|
|
return eventResult;
|
|
}
|
|
},
|
|
|
|
//
|
|
// ROOM_DELETE
|
|
//
|
|
async [eventTypes.ROOM_DELETE](event) {
|
|
const {
|
|
data: { roomId },
|
|
} = event;
|
|
|
|
// Check if room exists
|
|
const persistedRoom = await Rooms.findOne({ _id: roomId });
|
|
|
|
if (persistedRoom) {
|
|
// Delete the room
|
|
await deleteRoom(roomId);
|
|
}
|
|
|
|
// Remove all room events
|
|
await FederationRoomEvents.removeRoomEvents(roomId);
|
|
|
|
return {
|
|
success: true,
|
|
};
|
|
},
|
|
|
|
//
|
|
// ROOM_ADD_USER
|
|
//
|
|
async [eventTypes.ROOM_ADD_USER](event) {
|
|
const eventResult = await FederationRoomEvents.addEvent(event.context, event);
|
|
|
|
// We only want to refresh the server list and update the room federation array if something changed
|
|
let federationAltered = false;
|
|
|
|
// If the event was successfully added, handle the event locally
|
|
if (eventResult.success) {
|
|
const {
|
|
data: { roomId, user, subscription, domainsAfterAdd },
|
|
} = event;
|
|
|
|
// Check if user exists
|
|
const persistedUser = await Users.findOne({ _id: user._id });
|
|
|
|
if (persistedUser) {
|
|
// Update the federation, if its not already set (if it's set, this is likely an event being reprocessed)
|
|
if (!persistedUser.federation) {
|
|
await Users.updateOne({ _id: persistedUser._id }, { $set: { federation: user.federation } });
|
|
federationAltered = true;
|
|
}
|
|
} else {
|
|
// Denormalize user
|
|
const denormalizedUser = normalizers.denormalizeUser(user);
|
|
|
|
// Create the user
|
|
await Users.insertOne(denormalizedUser);
|
|
federationAltered = true;
|
|
}
|
|
|
|
// Check if subscription exists
|
|
const persistedSubscription = await Subscriptions.findOne({ _id: subscription._id });
|
|
|
|
try {
|
|
if (persistedSubscription) {
|
|
// Update the federation, if its not already set (if it's set, this is likely an event being reprocessed
|
|
if (!persistedSubscription.federation) {
|
|
await Subscriptions.updateOne({ _id: persistedSubscription._id }, { $set: { federation: subscription.federation } });
|
|
federationAltered = true;
|
|
}
|
|
} else {
|
|
// Denormalize subscription
|
|
const denormalizedSubscription = normalizers.denormalizeSubscription(subscription);
|
|
|
|
// Create the subscription
|
|
await Subscriptions.insertOne(denormalizedSubscription);
|
|
federationAltered = true;
|
|
}
|
|
} catch (ex) {
|
|
serverLogger.debug(`unable to create subscription for user ( ${user._id} ) in room (${roomId})`);
|
|
}
|
|
|
|
// Refresh the servers list
|
|
if (federationAltered) {
|
|
await FederationServers.refreshServers();
|
|
|
|
// Update the room's federation property
|
|
await Rooms.updateOne({ _id: roomId }, { $set: { 'federation.domains': domainsAfterAdd } });
|
|
}
|
|
}
|
|
|
|
return eventResult;
|
|
},
|
|
|
|
//
|
|
// ROOM_REMOVE_USER
|
|
//
|
|
async [eventTypes.ROOM_REMOVE_USER](event) {
|
|
const eventResult = await FederationRoomEvents.addEvent(event.context, event);
|
|
|
|
// If the event was successfully added, handle the event locally
|
|
if (eventResult.success) {
|
|
const {
|
|
data: { roomId, user, domainsAfterRemoval },
|
|
} = event;
|
|
|
|
// Remove the user's subscription
|
|
await Subscriptions.removeByRoomIdAndUserId(roomId, user._id);
|
|
|
|
// Refresh the servers list
|
|
await FederationServers.refreshServers();
|
|
|
|
// Update the room's federation property
|
|
await Rooms.updateOne({ _id: roomId }, { $set: { 'federation.domains': domainsAfterRemoval } });
|
|
}
|
|
|
|
return eventResult;
|
|
},
|
|
|
|
//
|
|
// ROOM_USER_LEFT
|
|
//
|
|
async [eventTypes.ROOM_USER_LEFT](event) {
|
|
const eventResult = await FederationRoomEvents.addEvent(event.context, event);
|
|
|
|
// If the event was successfully added, handle the event locally
|
|
if (eventResult.success) {
|
|
const {
|
|
data: { roomId, user, domainsAfterRemoval },
|
|
} = event;
|
|
|
|
// Remove the user's subscription
|
|
await Subscriptions.removeByRoomIdAndUserId(roomId, user._id);
|
|
|
|
// Refresh the servers list
|
|
await FederationServers.refreshServers();
|
|
|
|
// Update the room's federation property
|
|
await Rooms.updateOne({ _id: roomId }, { $set: { 'federation.domains': domainsAfterRemoval } });
|
|
}
|
|
|
|
return eventResult;
|
|
},
|
|
|
|
//
|
|
// ROOM_MESSAGE
|
|
//
|
|
async [eventTypes.ROOM_MESSAGE](event) {
|
|
const eventResult = await FederationRoomEvents.addEvent(event.context, event);
|
|
|
|
// If the event was successfully added, handle the event locally
|
|
if (eventResult.success) {
|
|
const {
|
|
data: { message },
|
|
} = event;
|
|
|
|
// Check if message exists
|
|
const persistedMessage = await Messages.findOne({ _id: message._id });
|
|
|
|
if (persistedMessage) {
|
|
// Update the federation
|
|
if (!persistedMessage.federation) {
|
|
await Messages.updateOne({ _id: persistedMessage._id }, { $set: { federation: message.federation } });
|
|
}
|
|
} else {
|
|
// Load the room
|
|
const room = await Rooms.findOneById(message.rid);
|
|
|
|
// Denormalize message
|
|
const denormalizedMessage = normalizers.denormalizeMessage(message);
|
|
|
|
// Is there a file?
|
|
if (denormalizedMessage.file) {
|
|
const fileStore = FileUpload.getStore('Uploads');
|
|
|
|
const {
|
|
federation: { origin },
|
|
} = denormalizedMessage;
|
|
|
|
const { upload, buffer } = await getUpload(origin, denormalizedMessage.file._id);
|
|
|
|
const oldUploadId = upload._id;
|
|
|
|
// Normalize upload
|
|
delete upload._id;
|
|
upload.rid = denormalizedMessage.rid;
|
|
upload.userId = denormalizedMessage.u._id;
|
|
upload.federation = {
|
|
_id: denormalizedMessage.file._id,
|
|
origin,
|
|
};
|
|
|
|
await fileStore.insert(upload, buffer);
|
|
|
|
// Update the message's file
|
|
denormalizedMessage.file._id = upload._id;
|
|
|
|
// Update the message's attachments dependent on type
|
|
for (const attachment of denormalizedMessage.attachments) {
|
|
attachment.title_link = attachment.title_link.replace(oldUploadId, upload._id);
|
|
if (/^image\/.+/.test(denormalizedMessage.file.type)) {
|
|
attachment.image_url = attachment.image_url.replace(oldUploadId, upload._id);
|
|
} else if (/^audio\/.+/.test(denormalizedMessage.file.type)) {
|
|
attachment.audio_url = attachment.audio_url.replace(oldUploadId, upload._id);
|
|
} else if (/^video\/.+/.test(denormalizedMessage.file.type)) {
|
|
attachment.video_url = attachment.video_url.replace(oldUploadId, upload._id);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Create the message
|
|
try {
|
|
await Messages.insertOne(denormalizedMessage);
|
|
|
|
await processThreads(denormalizedMessage, room);
|
|
|
|
// Notify users
|
|
await notifyUsersOnMessage(denormalizedMessage, room);
|
|
sendAllNotifications(denormalizedMessage, room);
|
|
} catch (err) {
|
|
serverLogger.debug(`Error on creating message: ${message._id}`);
|
|
}
|
|
}
|
|
}
|
|
|
|
return eventResult;
|
|
},
|
|
|
|
//
|
|
// ROOM_EDIT_MESSAGE
|
|
//
|
|
async [eventTypes.ROOM_EDIT_MESSAGE](event) {
|
|
const eventResult = await FederationRoomEvents.addEvent(event.context, event);
|
|
|
|
// If the event was successfully added, handle the event locally
|
|
if (eventResult.success) {
|
|
const {
|
|
data: { message },
|
|
} = event;
|
|
|
|
// Check if message exists
|
|
const persistedMessage = await Messages.findOne({ _id: message._id });
|
|
|
|
if (!persistedMessage) {
|
|
eventResult.success = false;
|
|
eventResult.reason = 'missingMessageToEdit';
|
|
} else {
|
|
// Update the message
|
|
await Messages.updateOne({ _id: persistedMessage._id }, { $set: { msg: message.msg, federation: message.federation } });
|
|
}
|
|
}
|
|
|
|
return eventResult;
|
|
},
|
|
|
|
//
|
|
// ROOM_DELETE_MESSAGE
|
|
//
|
|
async [eventTypes.ROOM_DELETE_MESSAGE](event) {
|
|
const eventResult = await FederationRoomEvents.addEvent(event.context, event);
|
|
|
|
// If the event was successfully added, handle the event locally
|
|
if (eventResult.success) {
|
|
const {
|
|
data: { roomId, messageId },
|
|
} = event;
|
|
|
|
// Remove the message
|
|
await Messages.removeById(messageId);
|
|
|
|
// Notify the room
|
|
void api.broadcast('notify.deleteMessage', roomId, { _id: messageId });
|
|
}
|
|
|
|
return eventResult;
|
|
},
|
|
|
|
//
|
|
// ROOM_SET_MESSAGE_REACTION
|
|
//
|
|
async [eventTypes.ROOM_SET_MESSAGE_REACTION](event) {
|
|
const eventResult = await FederationRoomEvents.addEvent(event.context, event);
|
|
|
|
// If the event was successfully added, handle the event locally
|
|
if (eventResult.success) {
|
|
const {
|
|
data: { messageId, username, reaction },
|
|
} = event;
|
|
|
|
// Get persisted message
|
|
const persistedMessage = await Messages.findOne({ _id: messageId });
|
|
|
|
// Make sure reactions exist
|
|
persistedMessage.reactions = persistedMessage.reactions || {};
|
|
|
|
let reactionObj = persistedMessage.reactions[reaction];
|
|
|
|
// If there are no reactions of that type, add it
|
|
if (!reactionObj) {
|
|
reactionObj = {
|
|
usernames: [username],
|
|
};
|
|
} else {
|
|
// Otherwise, add the username
|
|
reactionObj.usernames.push(username);
|
|
reactionObj.usernames = [...new Set(reactionObj.usernames)];
|
|
}
|
|
|
|
// Update the property
|
|
await Messages.updateOne({ _id: messageId }, { $set: { [`reactions.${reaction}`]: reactionObj } });
|
|
}
|
|
|
|
return eventResult;
|
|
},
|
|
|
|
//
|
|
// ROOM_UNSET_MESSAGE_REACTION
|
|
//
|
|
async [eventTypes.ROOM_UNSET_MESSAGE_REACTION](event) {
|
|
const eventResult = await FederationRoomEvents.addEvent(event.context, event);
|
|
|
|
// If the event was successfully added, handle the event locally
|
|
if (eventResult.success) {
|
|
const {
|
|
data: { messageId, username, reaction },
|
|
} = event;
|
|
|
|
// Get persisted message
|
|
const persistedMessage = await Messages.findOne({ _id: messageId });
|
|
|
|
// Make sure reactions exist
|
|
persistedMessage.reactions = persistedMessage.reactions || {};
|
|
|
|
// If there are no reactions of that type, ignore
|
|
if (!persistedMessage.reactions[reaction]) {
|
|
return eventResult;
|
|
}
|
|
|
|
const reactionObj = persistedMessage.reactions[reaction];
|
|
|
|
// Get the username index on the list
|
|
const usernameIdx = reactionObj.usernames.indexOf(username);
|
|
|
|
// If the index is not found, ignore
|
|
if (usernameIdx === -1) {
|
|
return eventResult;
|
|
}
|
|
|
|
// Remove the username from the given reaction
|
|
reactionObj.usernames.splice(usernameIdx, 1);
|
|
|
|
// If there are no more users for that reaction, remove the property
|
|
if (reactionObj.usernames.length === 0) {
|
|
await Messages.updateOne({ _id: messageId }, { $unset: { [`reactions.${reaction}`]: 1 } });
|
|
} else {
|
|
// Otherwise, update the property
|
|
await Messages.updateOne({ _id: messageId }, { $set: { [`reactions.${reaction}`]: reactionObj } });
|
|
}
|
|
}
|
|
|
|
return eventResult;
|
|
},
|
|
|
|
//
|
|
// ROOM_MUTE_USER
|
|
//
|
|
async [eventTypes.ROOM_MUTE_USER](event) {
|
|
const eventResult = await FederationRoomEvents.addEvent(event.context, event);
|
|
|
|
// If the event was successfully added, handle the event locally
|
|
if (eventResult.success) {
|
|
const {
|
|
data: { roomId, user },
|
|
} = event;
|
|
|
|
// Denormalize user
|
|
const denormalizedUser = normalizers.denormalizeUser(user);
|
|
|
|
// Mute user
|
|
await Rooms.muteUsernameByRoomId(roomId, denormalizedUser.username);
|
|
}
|
|
|
|
return eventResult;
|
|
},
|
|
|
|
//
|
|
// ROOM_UNMUTE_USER
|
|
//
|
|
async [eventTypes.ROOM_UNMUTE_USER](event) {
|
|
const eventResult = await FederationRoomEvents.addEvent(event.context, event);
|
|
|
|
// If the event was successfully added, handle the event locally
|
|
if (eventResult.success) {
|
|
const {
|
|
data: { roomId, user },
|
|
} = event;
|
|
|
|
// Denormalize user
|
|
const denormalizedUser = normalizers.denormalizeUser(user);
|
|
|
|
// Mute user
|
|
await Rooms.unmuteUsernameByRoomId(roomId, denormalizedUser.username);
|
|
}
|
|
|
|
return eventResult;
|
|
},
|
|
};
|
|
|
|
API.v1.addRoute(
|
|
'federation.events.dispatch',
|
|
{ authRequired: false, rateLimiterOptions: { numRequestsAllowed: 30, intervalTimeInMS: 1000 } },
|
|
{
|
|
async post() {
|
|
if (!isFederationEnabled()) {
|
|
return API.v1.failure('Federation not enabled');
|
|
}
|
|
|
|
//
|
|
// Decrypt the payload if needed
|
|
let payload;
|
|
|
|
try {
|
|
payload = await decryptIfNeeded(this.request, this.bodyParams);
|
|
} catch (err) {
|
|
return API.v1.failure('Could not decrypt payload');
|
|
}
|
|
|
|
//
|
|
// Convert from EJSON
|
|
const { events } = EJSON.fromJSONValue(payload);
|
|
|
|
serverLogger.debug({ msg: 'federation.events.dispatch', events });
|
|
|
|
// Loop over received events
|
|
for (const event of events) {
|
|
/* eslint-disable no-await-in-loop */
|
|
|
|
let eventResult;
|
|
|
|
if (eventHandlers[event.type]) {
|
|
eventResult = await eventHandlers[event.type](event);
|
|
}
|
|
|
|
// If there was an error handling the event, take action
|
|
if (!eventResult || !eventResult.success) {
|
|
try {
|
|
serverLogger.debug({
|
|
msg: 'federation.events.dispatch => Event has missing parents',
|
|
event,
|
|
});
|
|
|
|
await requestEventsFromLatest(
|
|
event.origin,
|
|
getFederationDomain(),
|
|
contextDefinitions.defineType(event),
|
|
event.context,
|
|
eventResult.latestEventIds,
|
|
);
|
|
|
|
// And stop handling the events
|
|
break;
|
|
} catch (err) {
|
|
serverLogger.error({ msg: 'dispatch', event, eventResult, err });
|
|
|
|
throw err;
|
|
}
|
|
}
|
|
|
|
/* eslint-enable no-await-in-loop */
|
|
}
|
|
|
|
// Respond
|
|
return API.v1.success();
|
|
},
|
|
},
|
|
);
|
|
|