Federation performance and bug fixes (#17504)

pull/17824/head
Andrew Louis 5 years ago committed by GitHub
parent 46b8f1711b
commit 1cfd820111
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 68
      app/federation/server/endpoints/dispatch.js
  2. 2
      app/federation/server/functions/helpers.js
  3. 7
      app/federation/server/handler/index.js
  4. 11
      app/federation/server/hooks/afterAddedToRoom.js
  5. 2
      app/federation/server/hooks/afterCreateRoom.js
  6. 4
      app/federation/server/hooks/afterLeaveRoom.js
  7. 4
      app/federation/server/hooks/afterRemoveFromRoom.js
  8. 1
      app/federation/server/hooks/afterUnsetReaction.js
  9. 15
      app/federation/server/lib/dns.js
  10. 2
      app/federation/server/normalizers/room.js

@ -21,6 +21,8 @@ import { isFederationEnabled } from '../lib/isFederationEnabled';
import { getUpload, requestEventsFromLatest } from '../handler';
import { notifyUsersOnMessage } from '../../../lib/server/lib/notifyUsersOnMessage';
import { sendAllNotifications } from '../../../lib/server/lib/sendNotificationsOnMessage';
import { processThreads } from '../../../threads/server/hooks/aftersavemessage';
import { processDeleteInThread } from '../../../threads/server/hooks/afterdeletemessage';
const eventHandlers = {
//
@ -90,6 +92,9 @@ const eventHandlers = {
async [eventTypes.ROOM_ADD_USER](event) {
const eventResult = await FederationRoomEvents.addEvent(event.context, event);
// We only want to refresh the server list and update the room federation array if something changed
let federationAltered = false;
// If the event was successfully added, handle the event locally
if (eventResult.success) {
const { data: { roomId, user, subscription, domainsAfterAdd } } = event;
@ -98,35 +103,49 @@ const eventHandlers = {
const persistedUser = Users.findOne({ _id: user._id });
if (persistedUser) {
// Update the federation
Users.update({ _id: persistedUser._id }, { $set: { federation: user.federation } });
// Update the federation, if its not already set (if it's set, this is likely an event being reprocessed)
if (!persistedUser.federation) {
Users.update({ _id: persistedUser._id }, { $set: { federation: user.federation } });
federationAltered = true;
}
} else {
// Denormalize user
const denormalizedUser = normalizers.denormalizeUser(user);
// Create the user
Users.insert(denormalizedUser);
federationAltered = true;
}
// Check if subscription exists
const persistedSubscription = Subscriptions.findOne({ _id: subscription._id });
if (persistedSubscription) {
// Update the federation
Subscriptions.update({ _id: persistedSubscription._id }, { $set: { federation: subscription.federation } });
} else {
// Denormalize subscription
const denormalizedSubscription = normalizers.denormalizeSubscription(subscription);
try {
if (persistedSubscription) {
// Update the federation, if its not already set (if it's set, this is likely an event being reprocessed
if (!persistedSubscription.federation) {
Subscriptions.update({ _id: persistedSubscription._id }, { $set: { federation: subscription.federation } });
federationAltered = true;
}
} else {
// Denormalize subscription
const denormalizedSubscription = normalizers.denormalizeSubscription(subscription);
// Create the subscription
Subscriptions.insert(denormalizedSubscription);
// Create the subscription
Subscriptions.insert(denormalizedSubscription);
federationAltered = true;
}
} catch (ex) {
logger.server.debug(`unable to create subscription for user ( ${ user._id } ) in room (${ roomId })`);
}
// Refresh the servers list
FederationServers.refreshServers();
if (federationAltered) {
FederationServers.refreshServers();
// Update the room's federation property
Rooms.update({ _id: roomId }, { $set: { 'federation.domains': domainsAfterAdd } });
// Update the room's federation property
Rooms.update({ _id: roomId }, { $set: { 'federation.domains': domainsAfterAdd } });
}
}
return eventResult;
@ -193,7 +212,9 @@ const eventHandlers = {
if (persistedMessage) {
// Update the federation
Messages.update({ _id: persistedMessage._id }, { $set: { federation: message.federation } });
if (!persistedMessage.federation) {
Messages.update({ _id: persistedMessage._id }, { $set: { federation: message.federation } });
}
} else {
// Load the room
const room = Rooms.findOneById(message.rid);
@ -239,11 +260,17 @@ const eventHandlers = {
}
// Create the message
Messages.insert(denormalizedMessage);
try {
Messages.insert(denormalizedMessage);
processThreads(denormalizedMessage, room);
// Notify users
notifyUsersOnMessage(denormalizedMessage, room);
sendAllNotifications(denormalizedMessage, room);
// Notify users
notifyUsersOnMessage(denormalizedMessage, room);
sendAllNotifications(denormalizedMessage, room);
} catch (err) {
logger.server.debug(`Error on creating message: ${ message._id }`);
}
}
}
@ -285,6 +312,11 @@ const eventHandlers = {
if (eventResult.success) {
const { data: { roomId, messageId } } = event;
const message = Messages.findOne({ _id: messageId });
if (message) {
processDeleteInThread(message);
}
// Remove the message
Messages.removeById(messageId);

@ -18,7 +18,7 @@ export function updateEnabled(enabled) {
}
export const checkRoomType = (room) => room.t === 'p' || room.t === 'd';
export const checkRoomDomainsLength = (domains) => domains.length <= 10;
export const checkRoomDomainsLength = (domains) => domains.length <= (process.env.FEDERATED_DOMAINS_LENGTH || 10);
export const hasExternalDomain = ({ federation }) => {
// same test as isFederated(room)

@ -55,6 +55,8 @@ export function dispatchEvents(domains, events) {
throw disabled('client.dispatchEvents');
}
domains = [...new Set(domains)];
logger.client.debug(() => `dispatchEvents => domains=${ domains.join(', ') } events=${ events.map((e) => JSON.stringify(e, null, 2)) }`);
const uri = '/api/v1/federation.events.dispatch';
@ -65,7 +67,10 @@ export function dispatchEvents(domains, events) {
}
export function dispatchEvent(domains, event) {
dispatchEvents(domains, [event]);
// Ensure the domain list is distinct to avoid excessive events
const distinctDomains = [...new Set(domains)].filter((domain) => domain === event.origin);
dispatchEvents(distinctDomains, [event]);
}
export function getUpload(domain, fileId) {

@ -45,11 +45,16 @@ async function afterAddedToRoom(involvedUsers, room) {
//
// Get the users domains
const domainsAfterAdd = users.map((u) => u.federation.origin);
const domainsAfterAdd = [];
users.forEach((user) => {
if (user.hasOwnProperty('federation') && !domainsAfterAdd.includes(user.federation.origin)) {
domainsAfterAdd.push(user.federation.origin);
}
});
// Check if the number of domains is allowed
if (!checkRoomDomainsLength(room.federation.domains)) {
throw new Error('Cannot federate rooms with more than 10 domains');
if (!checkRoomDomainsLength(domainsAfterAdd)) {
throw new Error(`Cannot federate rooms with more than ${ process.env.FEDERATED_DOMAINS_LENGTH || 10 } domains`);
}
//

@ -40,7 +40,7 @@ export async function doAfterCreateRoom(room, users, subscriptions) {
// Check if the number of domains is allowed
if (!checkRoomDomainsLength(normalizedRoom.federation.domains)) {
throw new Error('Cannot federate rooms with more than 10 domains');
throw new Error(`Cannot federate rooms with more than ${ process.env.FEDERATED_DOMAINS_LENGTH || 10 } domains`);
}
// Ensure a genesis event for this room

@ -19,7 +19,7 @@ async function afterLeaveRoom(user, room) {
try {
// Get the domains after leave
const domainsAfterLeave = users.map((u) => u.federation.origin);
const domainsAfterLeave = [...new Set(users.map((u) => u.federation.origin))];
//
// Normalize the room's federation status
@ -28,7 +28,7 @@ async function afterLeaveRoom(user, room) {
usersBeforeLeave.push(user);
// Get the users domains
const domainsBeforeLeft = usersBeforeLeave.map((u) => u.federation.origin);
const domainsBeforeLeft = [...new Set(usersBeforeLeave.map((u) => u.federation.origin))];
//
// Create the user left event

@ -21,7 +21,7 @@ async function afterRemoveFromRoom(involvedUsers, room) {
try {
// Get the domains after removal
const domainsAfterRemoval = users.map((u) => u.federation.origin);
const domainsAfterRemoval = [...new Set(users.map((u) => u.federation.origin))];
//
// Normalize the room's federation status
@ -30,7 +30,7 @@ async function afterRemoveFromRoom(involvedUsers, room) {
usersBeforeRemoval.push(removedUser);
// Get the users domains
const domainsBeforeRemoval = usersBeforeRemoval.map((u) => u.federation.origin);
const domainsBeforeRemoval = [...new Set(usersBeforeRemoval.map((u) => u.federation.origin))];
//
// Create the user remove event

@ -4,6 +4,7 @@ import { FederationRoomEvents, Rooms } from '../../../models/server';
import { logger } from '../lib/logger';
import { hasExternalDomain } from '../functions/helpers';
import { getFederationDomain } from '../lib/getFederationDomain';
import { dispatchEvent } from '../handler';
async function afterUnsetReaction(message, { user, reaction }) {
const room = Rooms.findOneById(message.rid, { fields: { federation: 1 } });

@ -1,6 +1,7 @@
import dnsResolver from 'dns';
import { Meteor } from 'meteor/meteor';
import mem from 'mem';
import * as federationErrors from '../functions/errors';
import { logger } from './logger';
@ -10,6 +11,10 @@ import { federationRequest } from './http';
const dnsResolveSRV = Meteor.wrapAsync(dnsResolver.resolveSrv);
const dnsResolveTXT = Meteor.wrapAsync(dnsResolver.resolveTxt);
const cacheMaxAge = 3600000; // one hour
const memoizedDnsResolveSRV = mem(dnsResolveSRV, { maxAge: cacheMaxAge });
const memoizedDnsResolveTXT = mem(dnsResolveTXT, { maxAge: cacheMaxAge });
const hubUrl = process.env.NODE_ENV === 'development' ? 'http://localhost:8080' : 'https://hub.rocket.chat';
export function registerWithHub(peerDomain, url, publicKey) {
@ -68,7 +73,7 @@ export function search(peerDomain) {
// Search by HTTPS first
try {
logger.dns.debug(`search: peerDomain=${ peerDomain } srv=_rocketchat._https.${ peerDomain }`);
srvEntries = dnsResolveSRV(`_rocketchat._https.${ peerDomain }`);
srvEntries = memoizedDnsResolveSRV(`_rocketchat._https.${ peerDomain }`);
protocol = 'https';
} catch (err) {
// Ignore errors when looking for DNS entries
@ -78,7 +83,7 @@ export function search(peerDomain) {
if (!srvEntries.length) {
try {
logger.dns.debug(`search: peerDomain=${ peerDomain } srv=_rocketchat._http.${ peerDomain }`);
srvEntries = dnsResolveSRV(`_rocketchat._http.${ peerDomain }`);
srvEntries = memoizedDnsResolveSRV(`_rocketchat._http.${ peerDomain }`);
protocol = 'http';
} catch (err) {
// Ignore errors when looking for DNS entries
@ -89,12 +94,12 @@ export function search(peerDomain) {
if (!srvEntries.length) {
try {
logger.dns.debug(`search: peerDomain=${ peerDomain } srv=_rocketchat._tcp.${ peerDomain }`);
srvEntries = dnsResolveSRV(`_rocketchat._tcp.${ peerDomain }`);
srvEntries = memoizedDnsResolveSRV(`_rocketchat._tcp.${ peerDomain }`);
protocol = 'https'; // https is the default
// Then, also try to get the protocol
logger.dns.debug(`search: peerDomain=${ peerDomain } txt=rocketchat-tcp-protocol.${ peerDomain }`);
protocol = dnsResolveTXT(`rocketchat-tcp-protocol.${ peerDomain }`);
protocol = memoizedDnsResolveSRV(`rocketchat-tcp-protocol.${ peerDomain }`);
protocol = protocol[0].join('');
if (protocol !== 'http' && protocol !== 'https') {
@ -119,7 +124,7 @@ export function search(peerDomain) {
// Get the public key from the TXT record
try {
logger.dns.debug(`search: peerDomain=${ peerDomain } txt=rocketchat-public-key.${ peerDomain }`);
const publicKeyTxtRecords = dnsResolveTXT(`rocketchat-public-key.${ peerDomain }`);
const publicKeyTxtRecords = memoizedDnsResolveTXT(`rocketchat-public-key.${ peerDomain }`);
// Join the TXT record, that might be split
publicKey = publicKeyTxtRecords[0].join('');

@ -78,6 +78,8 @@ const normalizeRoom = (originalResource, users) => {
}
}
domains = [...new Set(domains)];
// Federation
resource.federation = resource.federation || {
origin: getFederationDomain(), // The origin of this resource, where it was created

Loading…
Cancel
Save