[NEW] Federation (#12370)
parent
dc51cdc4cd
commit
a080cd9b6b
File diff suppressed because it is too large
Load Diff
@ -0,0 +1 @@ |
||||
##Rocket.Chat Federation |
||||
@ -0,0 +1,23 @@ |
||||
import { MessageTypes } from 'meteor/rocketchat:ui-utils'; |
||||
|
||||
// Register message types
|
||||
MessageTypes.registerType({ |
||||
id: 'rejected-message-by-peer', |
||||
system: true, |
||||
message: 'This_message_was_rejected_by__peer__peer', |
||||
data(message) { |
||||
return { |
||||
peer: message.peer, |
||||
}; |
||||
}, |
||||
}); |
||||
MessageTypes.registerType({ |
||||
id: 'peer-does-not-exist', |
||||
system: true, |
||||
message: 'The_peer__peer__does_not_exist', |
||||
data(message) { |
||||
return { |
||||
peer: message.peer, |
||||
}; |
||||
}, |
||||
}); |
||||
@ -0,0 +1,23 @@ |
||||
Package.describe({ |
||||
name: 'rocketchat:federation', |
||||
version: '0.0.1', |
||||
summary: 'RocketChat support for federating with other RocketChat servers', |
||||
git: '', |
||||
}); |
||||
|
||||
Package.onUse(function(api) { |
||||
api.use([ |
||||
'ecmascript', |
||||
'rocketchat:api', |
||||
'rocketchat:lib', |
||||
'rocketchat:reactions', |
||||
'rocketchat:models', |
||||
'rocketchat:settings', |
||||
]); |
||||
|
||||
api.use('accounts-base', 'server'); |
||||
api.use('accounts-password', 'server'); |
||||
|
||||
api.mainModule('client/main.js', 'client'); |
||||
api.mainModule('server/main.js', 'server'); |
||||
}); |
||||
@ -0,0 +1,264 @@ |
||||
import { Meteor } from 'meteor/meteor'; |
||||
import { sendMessage, updateMessage } from 'meteor/rocketchat:lib'; |
||||
import { Messages, Rooms, Users } from 'meteor/rocketchat:models'; |
||||
import { FileUpload } from 'meteor/rocketchat:file-upload'; |
||||
|
||||
import FederatedResource from './FederatedResource'; |
||||
import FederatedRoom from './FederatedRoom'; |
||||
import FederatedUser from './FederatedUser'; |
||||
import peerClient from '../peerClient'; |
||||
|
||||
class FederatedMessage extends FederatedResource { |
||||
constructor(localPeerIdentifier, message) { |
||||
super('message'); |
||||
|
||||
if (!message) { |
||||
throw new Error('message param cannot be empty'); |
||||
} |
||||
|
||||
this.localPeerIdentifier = localPeerIdentifier; |
||||
|
||||
// Make sure room dates are correct
|
||||
message.ts = new Date(message.ts); |
||||
message._updatedAt = new Date(message._updatedAt); |
||||
|
||||
// Set the message author
|
||||
if (message.u.federation) { |
||||
this.federatedAuthor = FederatedUser.loadByFederationId(localPeerIdentifier, message.u.federation._id); |
||||
} else { |
||||
const author = Users.findOneById(message.u._id); |
||||
this.federatedAuthor = new FederatedUser(localPeerIdentifier, author); |
||||
} |
||||
|
||||
message.u = { |
||||
username: this.federatedAuthor.user.username, |
||||
federation: { |
||||
_id: this.federatedAuthor.user.federation._id, |
||||
}, |
||||
}; |
||||
|
||||
// Set the room
|
||||
const room = Rooms.findOneById(message.rid); |
||||
|
||||
// Prepare the federation property
|
||||
if (!message.federation) { |
||||
const federation = { |
||||
_id: message._id, |
||||
peer: localPeerIdentifier, |
||||
roomId: room.federation._id, |
||||
}; |
||||
|
||||
// Prepare the user
|
||||
message.federation = federation; |
||||
|
||||
// Update the user
|
||||
Messages.update(message._id, { $set: { federation } }); |
||||
|
||||
// Prepare mentions
|
||||
for (const mention of message.mentions) { |
||||
|
||||
mention.federation = mention.federation || {}; |
||||
|
||||
if (mention.username.indexOf('@') === -1) { |
||||
mention.federation.peer = localPeerIdentifier; |
||||
} else { |
||||
const [username, peer] = mention.username.split('@'); |
||||
|
||||
mention.username = username; |
||||
mention.federation.peer = peer; |
||||
} |
||||
} |
||||
|
||||
// Prepare channels
|
||||
for (const channel of message.channels) { |
||||
channel.federation = channel.federation || {}; |
||||
|
||||
if (channel.name.indexOf('@') === -1) { |
||||
channel.federation.peer = localPeerIdentifier; |
||||
} else { |
||||
channel.name = channel.name.split('@')[0]; |
||||
channel.federation.peer = channel.name.split('@')[1]; |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Set message property
|
||||
this.message = message; |
||||
} |
||||
|
||||
getFederationId() { |
||||
return this.message.federation._id; |
||||
} |
||||
|
||||
getMessage() { |
||||
return this.message; |
||||
} |
||||
|
||||
getLocalMessage() { |
||||
this.log('getLocalMessage'); |
||||
|
||||
const { localPeerIdentifier, message } = this; |
||||
|
||||
const localMessage = Object.assign({}, message); |
||||
|
||||
// Make sure `u` is correct
|
||||
if (!this.federatedAuthor) { |
||||
throw new Error('Author does not exist'); |
||||
} |
||||
|
||||
const localAuthor = this.federatedAuthor.getLocalUser(); |
||||
|
||||
localMessage.u = { |
||||
_id: localAuthor._id, |
||||
username: localAuthor.username, |
||||
}; |
||||
|
||||
// Make sure `rid` is correct
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerIdentifier, message.federation.roomId); |
||||
|
||||
if (!federatedRoom) { |
||||
throw new Error('Room does not exist'); |
||||
} |
||||
|
||||
const localRoom = federatedRoom.getLocalRoom(); |
||||
|
||||
localMessage.rid = localRoom._id; |
||||
|
||||
return localMessage; |
||||
} |
||||
|
||||
create() { |
||||
this.log('create'); |
||||
|
||||
// Get the local message object
|
||||
const localMessageObject = this.getLocalMessage(); |
||||
|
||||
// Grab the federation id
|
||||
const { federation: { _id: federationId } } = localMessageObject; |
||||
|
||||
// Check if the message exists
|
||||
let localMessage = Messages.findOne({ 'federation._id': federationId }); |
||||
|
||||
// Create if needed
|
||||
if (!localMessage) { |
||||
delete localMessageObject._id; |
||||
|
||||
localMessage = localMessageObject; |
||||
|
||||
const localRoom = Rooms.findOneById(localMessage.rid); |
||||
|
||||
// Normalize mentions
|
||||
for (const mention of localMessage.mentions) { |
||||
// Ignore if we are dealing with all, here or rocket.cat
|
||||
if (['all', 'here', 'rocket.cat'].indexOf(mention.username) !== -1) { continue; } |
||||
|
||||
let usernameToReplace = ''; |
||||
|
||||
if (mention.federation.peer !== this.localPeerIdentifier) { |
||||
usernameToReplace = mention.username; |
||||
|
||||
mention.username = `${ mention.username }@${ mention.federation.peer }`; |
||||
} else { |
||||
usernameToReplace = `${ mention.username }@${ mention.federation.peer }`; |
||||
} |
||||
|
||||
localMessage.msg = localMessage.msg.split(usernameToReplace).join(mention.username); |
||||
} |
||||
|
||||
// Normalize channels
|
||||
for (const channel of localMessage.channels) { |
||||
if (channel.federation.peer !== this.localPeerIdentifier) { |
||||
channel.name = `${ channel.name }@${ channel.federation.peer }`; |
||||
} |
||||
} |
||||
|
||||
// Is there a file?
|
||||
if (localMessage.file) { |
||||
const fileStore = FileUpload.getStore('Uploads'); |
||||
|
||||
const { federation: { peer: identifier } } = localMessage; |
||||
|
||||
const { upload, buffer } = peerClient.getUpload({ identifier, localMessage }); |
||||
|
||||
const oldUploadId = upload._id; |
||||
|
||||
// Normalize upload
|
||||
delete upload._id; |
||||
upload.rid = localMessage.rid; |
||||
upload.userId = localMessage.u._id; |
||||
upload.federation = { |
||||
_id: localMessage.file._id, |
||||
peer: identifier, |
||||
}; |
||||
|
||||
Meteor.runAsUser(upload.userId, () => Meteor.wrapAsync(fileStore.insert.bind(fileStore))(upload, buffer)); |
||||
|
||||
// Update the message's file
|
||||
localMessage.file._id = upload._id; |
||||
|
||||
// Update the message's attachments
|
||||
for (const attachment of localMessage.attachments) { |
||||
attachment.title_link = attachment.title_link.replace(oldUploadId, upload._id); |
||||
attachment.image_url = attachment.image_url.replace(oldUploadId, upload._id); |
||||
} |
||||
} |
||||
|
||||
// Create the message
|
||||
const { _id } = sendMessage(localMessage.u, localMessage, localRoom, false); |
||||
|
||||
localMessage._id = _id; |
||||
} |
||||
|
||||
return localMessage; |
||||
} |
||||
|
||||
update(updatedByFederatedUser) { |
||||
this.log('update'); |
||||
|
||||
// Get the original message
|
||||
const originalMessage = Messages.findOne({ 'federation._id': this.getFederationId() }); |
||||
|
||||
// Error if message does not exist
|
||||
if (!originalMessage) { |
||||
throw new Error('Message does not exist'); |
||||
} |
||||
|
||||
// Get the local message object
|
||||
const localMessage = this.getLocalMessage(); |
||||
|
||||
// Make sure the message has the correct _id
|
||||
localMessage._id = originalMessage._id; |
||||
|
||||
// Get the user who updated
|
||||
const user = updatedByFederatedUser.getLocalUser(); |
||||
|
||||
// Update the message
|
||||
updateMessage(localMessage, user, originalMessage); |
||||
|
||||
return localMessage; |
||||
} |
||||
} |
||||
|
||||
FederatedMessage.loadByFederationId = function loadByFederationId(localPeerIdentifier, federationId) { |
||||
const localMessage = Messages.findOne({ 'federation._id': federationId }); |
||||
|
||||
if (!localMessage) { return; } |
||||
|
||||
return new FederatedMessage(localPeerIdentifier, localMessage); |
||||
}; |
||||
|
||||
FederatedMessage.loadOrCreate = function loadOrCreate(localPeerIdentifier, message) { |
||||
const { federation } = message; |
||||
|
||||
if (federation) { |
||||
const federatedMessage = FederatedMessage.loadByFederationId(localPeerIdentifier, federation._id); |
||||
|
||||
if (federatedMessage) { |
||||
return federatedMessage; |
||||
} |
||||
} |
||||
|
||||
return new FederatedMessage(localPeerIdentifier, message); |
||||
}; |
||||
|
||||
export default FederatedMessage; |
||||
@ -0,0 +1,19 @@ |
||||
import { logger } from '../logger'; |
||||
|
||||
class FederatedResource { |
||||
constructor(name) { |
||||
this.resourceName = `federated-${ name }`; |
||||
|
||||
this.log('Creating federated resource'); |
||||
} |
||||
|
||||
log(message) { |
||||
FederatedResource.log(this.resourceName, message); |
||||
} |
||||
} |
||||
|
||||
FederatedResource.log = function log(name, message) { |
||||
logger.resource.info(`[${ name }] ${ message }`); |
||||
}; |
||||
|
||||
export default FederatedResource; |
||||
@ -0,0 +1,270 @@ |
||||
import { createRoom } from 'meteor/rocketchat:lib'; |
||||
import { Rooms, Subscriptions, Users } from 'meteor/rocketchat:models'; |
||||
|
||||
import FederatedResource from './FederatedResource'; |
||||
import FederatedUser from './FederatedUser'; |
||||
|
||||
class FederatedRoom extends FederatedResource { |
||||
constructor(localPeerIdentifier, room, extras = {}) { |
||||
super('room'); |
||||
|
||||
if (!room) { |
||||
throw new Error('room param cannot be empty'); |
||||
} |
||||
|
||||
this.localPeerIdentifier = localPeerIdentifier; |
||||
|
||||
// Make sure room dates are correct
|
||||
room.ts = new Date(room.ts); |
||||
room._updatedAt = new Date(room._updatedAt); |
||||
|
||||
// Set the name
|
||||
if (room.t !== 'd' && room.name.indexOf('@') === -1) { |
||||
room.name = `${ room.name }@${ localPeerIdentifier }`; |
||||
} |
||||
|
||||
// Set the federated owner, if there is one
|
||||
const { owner } = extras; |
||||
|
||||
if (owner) { |
||||
if (!owner && room.federation) { |
||||
this.federatedOwner = FederatedUser.loadByFederationId(localPeerIdentifier, room.federation.ownerId); |
||||
} else { |
||||
this.federatedOwner = FederatedUser.loadOrCreate(localPeerIdentifier, owner); |
||||
} |
||||
} |
||||
|
||||
// Set base federation
|
||||
room.federation = room.federation || { |
||||
_id: room._id, |
||||
peer: localPeerIdentifier, |
||||
ownerId: this.federatedOwner ? this.federatedOwner.getFederationId() : null, |
||||
}; |
||||
|
||||
// Set room property
|
||||
this.room = room; |
||||
} |
||||
|
||||
getFederationId() { |
||||
return this.room.federation._id; |
||||
} |
||||
|
||||
getPeers() { |
||||
return this.room.federation.peers; |
||||
} |
||||
|
||||
getRoom() { |
||||
return this.room; |
||||
} |
||||
|
||||
getOwner() { |
||||
return this.federatedOwner ? this.federatedOwner.getUser() : null; |
||||
} |
||||
|
||||
getUsers() { |
||||
return this.federatedUsers.map((u) => u.getUser()); |
||||
} |
||||
|
||||
loadUsers() { |
||||
const { room } = this; |
||||
|
||||
// Get all room users
|
||||
const users = FederatedRoom.loadRoomUsers(room); |
||||
|
||||
this.setUsers(users); |
||||
} |
||||
|
||||
setUsers(users) { |
||||
const { localPeerIdentifier } = this; |
||||
|
||||
// Initialize federatedUsers
|
||||
this.federatedUsers = []; |
||||
|
||||
for (const user of users) { |
||||
const federatedUser = FederatedUser.loadOrCreate(localPeerIdentifier, user); |
||||
|
||||
// Keep the federated user
|
||||
this.federatedUsers.push(federatedUser); |
||||
} |
||||
} |
||||
|
||||
refreshFederation() { |
||||
const { room } = this; |
||||
|
||||
// Prepare the federated users
|
||||
let federation = { |
||||
peers: [], |
||||
users: [], |
||||
}; |
||||
|
||||
// Check all the peers
|
||||
for (const federatedUser of this.federatedUsers) { |
||||
// Add federation data to the room
|
||||
const { user: { federation: { _id, peer } } } = federatedUser; |
||||
|
||||
federation.peers.push(peer); |
||||
federation.users.push({ _id, peer }); |
||||
} |
||||
|
||||
federation.peers = [...new Set(federation.peers)]; |
||||
|
||||
federation = Object.assign(room.federation || {}, federation); |
||||
|
||||
// Prepare the room
|
||||
room.federation = federation; |
||||
|
||||
// Update the room
|
||||
Rooms.update(room._id, { $set: { federation } }); |
||||
} |
||||
|
||||
getLocalRoom() { |
||||
this.log('getLocalRoom'); |
||||
|
||||
const { localPeerIdentifier, room, room: { federation } } = this; |
||||
|
||||
const localRoom = Object.assign({}, room); |
||||
|
||||
if (federation.peer === localPeerIdentifier) { |
||||
if (localRoom.t !== 'd') { |
||||
localRoom.name = room.name.split('@')[0]; |
||||
} |
||||
} |
||||
|
||||
return localRoom; |
||||
} |
||||
|
||||
createUsers() { |
||||
this.log('createUsers'); |
||||
|
||||
const { federatedUsers } = this; |
||||
|
||||
// Create, if needed, all room's users
|
||||
for (const federatedUser of federatedUsers) { |
||||
federatedUser.create(); |
||||
} |
||||
} |
||||
|
||||
create() { |
||||
this.log('create'); |
||||
|
||||
// Get the local room object (with or without suffixes)
|
||||
const localRoomObject = this.getLocalRoom(); |
||||
|
||||
// Grab the federation id
|
||||
const { federation: { _id: federationId } } = localRoomObject; |
||||
|
||||
// Check if the user exists
|
||||
let localRoom = FederatedRoom.loadByFederationId(this.localPeerIdentifier, federationId); |
||||
|
||||
// Create if needed
|
||||
if (!localRoom) { |
||||
delete localRoomObject._id; |
||||
|
||||
localRoom = localRoomObject; |
||||
|
||||
const { t: type, name, broadcast, customFields, federation, sysMes } = localRoom; |
||||
const { federatedOwner, federatedUsers } = this; |
||||
|
||||
// Get usernames for the owner and members
|
||||
const ownerUsername = federatedOwner.user.username; |
||||
const members = []; |
||||
|
||||
if (type !== 'd') { |
||||
for (const federatedUser of federatedUsers) { |
||||
const localUser = federatedUser.getLocalUser(); |
||||
members.push(localUser.username); |
||||
} |
||||
} else { |
||||
for (const federatedUser of federatedUsers) { |
||||
const localUser = federatedUser.getLocalUser(); |
||||
members.push(localUser); |
||||
} |
||||
} |
||||
|
||||
// Is this a broadcast channel? Then mute everyone but the owner
|
||||
let muted = []; |
||||
|
||||
if (broadcast) { |
||||
muted = members.filter((u) => u !== ownerUsername); |
||||
} |
||||
|
||||
// Set the extra data and create room options
|
||||
let extraData = { |
||||
federation, |
||||
}; |
||||
|
||||
let createRoomOptions = { |
||||
subscriptionExtra: { |
||||
alert: true, |
||||
open: true, |
||||
}, |
||||
}; |
||||
|
||||
if (type !== 'd') { |
||||
extraData = Object.assign(extraData, { |
||||
broadcast, |
||||
customFields, |
||||
encrypted: false, // Always false for now
|
||||
muted, |
||||
sysMes, |
||||
}); |
||||
|
||||
createRoomOptions = Object.assign(extraData, { |
||||
nameValidationRegex: '^[0-9a-zA-Z-_.@]+$', |
||||
subscriptionExtra: { |
||||
alert: true, |
||||
}, |
||||
}); |
||||
} |
||||
|
||||
// Create the room
|
||||
// !!!! Forcing direct or private only, no public rooms for now
|
||||
const { rid } = createRoom(type === 'd' ? type : 'p', name, ownerUsername, members, false, extraData, createRoomOptions); |
||||
|
||||
localRoom._id = rid; |
||||
} |
||||
|
||||
return localRoom; |
||||
} |
||||
} |
||||
|
||||
FederatedRoom.loadByFederationId = function loadByFederationId(localPeerIdentifier, federationId) { |
||||
const localRoom = Rooms.findOne({ 'federation._id': federationId }); |
||||
|
||||
if (!localRoom) { return; } |
||||
|
||||
return new FederatedRoom(localPeerIdentifier, localRoom); |
||||
}; |
||||
|
||||
FederatedRoom.loadRoomUsers = function loadRoomUsers(room) { |
||||
const subscriptions = Subscriptions.findByRoomIdWhenUsernameExists(room._id, { fields: { 'u._id': 1 } }).fetch(); |
||||
const userIds = subscriptions.map((s) => s.u._id); |
||||
return Users.findUsersWithUsernameByIds(userIds).fetch(); |
||||
}; |
||||
|
||||
FederatedRoom.isFederated = function isFederated(localPeerIdentifier, room, options = {}) { |
||||
this.log('federated-room', `${ room._id } - isFederated?`); |
||||
|
||||
let isFederated = false; |
||||
|
||||
if (options.checkUsingUsers) { |
||||
// Get all room users
|
||||
const users = FederatedRoom.loadRoomUsers(room); |
||||
|
||||
// Check all the users
|
||||
for (const user of users) { |
||||
if (user.federation && user.federation.peer !== localPeerIdentifier) { |
||||
isFederated = true; |
||||
break; |
||||
} |
||||
} |
||||
} else { |
||||
isFederated = room.federation && room.federation.peers.length > 1; |
||||
} |
||||
|
||||
this.log('federated-room', `${ room._id } - isFederated? ${ isFederated ? 'yes' : 'no' }`); |
||||
|
||||
return isFederated; |
||||
}; |
||||
|
||||
export default FederatedRoom; |
||||
@ -0,0 +1,124 @@ |
||||
import { Users } from 'meteor/rocketchat:models'; |
||||
|
||||
import FederatedResource from './FederatedResource'; |
||||
|
||||
class FederatedUser extends FederatedResource { |
||||
constructor(localPeerIdentifier, user) { |
||||
super('user'); |
||||
|
||||
if (!user) { |
||||
throw new Error('user param cannot be empty'); |
||||
} |
||||
|
||||
this.localPeerIdentifier = localPeerIdentifier; |
||||
|
||||
// Make sure all properties are normalized
|
||||
// Prepare the federation property
|
||||
if (!user.federation) { |
||||
const federation = { |
||||
_id: user._id, |
||||
peer: localPeerIdentifier, |
||||
}; |
||||
|
||||
// Prepare the user
|
||||
user.federation = federation; |
||||
|
||||
// Update the user
|
||||
Users.update(user._id, { $set: { federation } }); |
||||
} |
||||
|
||||
// Make sure user dates are correct
|
||||
user.createdAt = new Date(user.createdAt); |
||||
user.lastLogin = new Date(user.lastLogin); |
||||
user._updatedAt = new Date(user._updatedAt); |
||||
|
||||
// Delete sensitive data as well
|
||||
delete user.roles; |
||||
delete user.services; |
||||
|
||||
// Make sure some other properties are ready
|
||||
user.name = user.name; |
||||
user.username = user.username.indexOf('@') === -1 ? `${ user.username }@${ user.federation.peer }` : user.username; |
||||
user.roles = ['user']; |
||||
user.status = 'online'; |
||||
user.statusConnection = 'online'; |
||||
user.type = 'user'; |
||||
|
||||
// Set user property
|
||||
this.user = user; |
||||
} |
||||
|
||||
getFederationId() { |
||||
return this.user.federation._id; |
||||
} |
||||
|
||||
getUser() { |
||||
return this.user; |
||||
} |
||||
|
||||
getLocalUser() { |
||||
this.log('getLocalUser'); |
||||
|
||||
const { localPeerIdentifier, user, user: { federation } } = this; |
||||
|
||||
const localUser = Object.assign({}, user); |
||||
|
||||
if (federation.peer === localPeerIdentifier || user.username === 'rocket.cat') { |
||||
localUser.username = user.username.split('@')[0]; |
||||
localUser.name = user.name.split('@')[0]; |
||||
} |
||||
|
||||
return localUser; |
||||
} |
||||
|
||||
create() { |
||||
this.log('create'); |
||||
|
||||
// Get the local user object (with or without suffixes)
|
||||
const localUserObject = this.getLocalUser(); |
||||
|
||||
// Grab the federation id
|
||||
const { federation: { _id: federationId } } = localUserObject; |
||||
|
||||
// Check if the user exists
|
||||
let localUser = Users.findOne({ 'federation._id': federationId }); |
||||
|
||||
// Create if needed
|
||||
if (!localUser) { |
||||
delete localUserObject._id; |
||||
|
||||
localUser = localUserObject; |
||||
|
||||
localUser._id = Users.create(localUserObject); |
||||
} |
||||
|
||||
// Update the id
|
||||
this.user._id = localUser._id; |
||||
|
||||
return localUser; |
||||
} |
||||
} |
||||
|
||||
FederatedUser.loadByFederationId = function loadByFederationId(localPeerIdentifier, federationId) { |
||||
const localUser = Users.findOne({ 'federation._id': federationId }); |
||||
|
||||
if (!localUser) { return; } |
||||
|
||||
return new FederatedUser(localPeerIdentifier, localUser); |
||||
}; |
||||
|
||||
FederatedUser.loadOrCreate = function loadOrCreate(localPeerIdentifier, user) { |
||||
const { federation } = user; |
||||
|
||||
if (federation) { |
||||
const federatedUser = FederatedUser.loadByFederationId(localPeerIdentifier, federation._id); |
||||
|
||||
if (federatedUser) { |
||||
return federatedUser; |
||||
} |
||||
} |
||||
|
||||
return new FederatedUser(localPeerIdentifier, user); |
||||
}; |
||||
|
||||
export default FederatedUser; |
||||
@ -0,0 +1,4 @@ |
||||
export { default as FederatedMessage } from './FederatedMessage'; |
||||
export { default as FederatedResource } from './FederatedResource'; |
||||
export { default as FederatedRoom } from './FederatedRoom'; |
||||
export { default as FederatedUser } from './FederatedUser'; |
||||
@ -0,0 +1,68 @@ |
||||
import { Meteor } from 'meteor/meteor'; |
||||
import { settings } from 'meteor/rocketchat:settings'; |
||||
import { FederationKeys } from 'meteor/rocketchat:models'; |
||||
|
||||
Meteor.startup(function() { |
||||
// const federationUniqueId = FederationKeys.getUniqueId();
|
||||
const federationPublicKey = FederationKeys.getPublicKeyString(); |
||||
|
||||
settings.addGroup('Federation', function() { |
||||
this.add('FEDERATION_Enabled', false, { |
||||
type: 'boolean', |
||||
i18nLabel: 'Enabled', |
||||
i18nDescription: 'FEDERATION_Enabled', |
||||
alert: 'FEDERATION_Enabled_Alert', |
||||
public: true, |
||||
}); |
||||
|
||||
this.add('FEDERATION_Status', '-', { |
||||
readonly: true, |
||||
type: 'string', |
||||
i18nLabel: 'FEDERATION_Status', |
||||
}); |
||||
|
||||
// this.add('FEDERATION_Unique_Id', federationUniqueId, {
|
||||
// readonly: true,
|
||||
// type: 'string',
|
||||
// i18nLabel: 'FEDERATION_Unique_Id',
|
||||
// i18nDescription: 'FEDERATION_Unique_Id_Description',
|
||||
// });
|
||||
|
||||
this.add('FEDERATION_Domain', '', { |
||||
type: 'string', |
||||
i18nLabel: 'FEDERATION_Domain', |
||||
i18nDescription: 'FEDERATION_Domain_Description', |
||||
alert: 'FEDERATION_Domain_Alert', |
||||
}); |
||||
|
||||
this.add('FEDERATION_Public_Key', federationPublicKey, { |
||||
readonly: true, |
||||
type: 'string', |
||||
multiline: true, |
||||
i18nLabel: 'FEDERATION_Public_Key', |
||||
i18nDescription: 'FEDERATION_Public_Key_Description', |
||||
}); |
||||
|
||||
this.add('FEDERATION_Hub_URL', 'https://hub.rocket.chat', { |
||||
group: 'Federation Hub', |
||||
type: 'string', |
||||
i18nLabel: 'FEDERATION_Hub_URL', |
||||
i18nDescription: 'FEDERATION_Hub_URL_Description', |
||||
}); |
||||
|
||||
this.add('FEDERATION_Discovery_Method', 'dns', { |
||||
type: 'select', |
||||
values: [{ |
||||
key: 'dns', |
||||
i18nLabel: 'DNS', |
||||
}, { |
||||
key: 'hub', |
||||
i18nLabel: 'Hub', |
||||
}], |
||||
i18nLabel: 'FEDERATION_Discovery_Method', |
||||
i18nDescription: 'FEDERATION_Discovery_Method_Description', |
||||
public: true, |
||||
}); |
||||
|
||||
}); |
||||
}); |
||||
@ -0,0 +1,12 @@ |
||||
import { Logger } from 'meteor/rocketchat:logger'; |
||||
|
||||
export const logger = new Logger('Federation', { |
||||
sections: { |
||||
resource: 'Resource', |
||||
setup: 'Setup', |
||||
peerClient: 'Peer Client', |
||||
peerServer: 'Peer Server', |
||||
dns: 'DNS', |
||||
http: 'HTTP', |
||||
}, |
||||
}); |
||||
@ -0,0 +1,152 @@ |
||||
import { Meteor } from 'meteor/meteor'; |
||||
import { _ } from 'meteor/underscore'; |
||||
import { settings } from 'meteor/rocketchat:settings'; |
||||
import { FederationKeys } from 'meteor/rocketchat:models'; |
||||
|
||||
import './federation-settings'; |
||||
import './methods'; |
||||
|
||||
import { logger } from './logger'; |
||||
import peerClient from './peerClient'; |
||||
import peerServer from './peerServer'; |
||||
import peerDNS from './peerDNS'; |
||||
import peerHTTP from './peerHTTP'; |
||||
import * as SettingsUpdater from './settingsUpdater'; |
||||
|
||||
export const Federation = { |
||||
enabled: false, |
||||
privateKey: null, |
||||
publicKey: null, |
||||
usingHub: null, |
||||
uniqueId: null, |
||||
localIdentifier: null, |
||||
}; |
||||
|
||||
// Generate keys
|
||||
|
||||
// Create unique id if needed
|
||||
if (!FederationKeys.getUniqueId()) { |
||||
FederationKeys.generateUniqueId(); |
||||
} |
||||
|
||||
// Create key pair if needed
|
||||
if (!FederationKeys.getPublicKey()) { |
||||
FederationKeys.generateKeys(); |
||||
} |
||||
|
||||
// Initializations
|
||||
|
||||
// Start the client, setting up all the callbacks
|
||||
peerClient.start(); |
||||
|
||||
// Start the server, setting up all the endpoints
|
||||
peerServer.start(); |
||||
|
||||
const updateSettings = _.debounce(Meteor.bindEnvironment(function() { |
||||
const _enabled = settings.get('FEDERATION_Enabled'); |
||||
|
||||
if (!_enabled) { return; } |
||||
|
||||
// If it is enabled, check if the settings are there
|
||||
const _uniqueId = settings.get('FEDERATION_Unique_Id'); |
||||
const _domain = settings.get('FEDERATION_Domain'); |
||||
const _discoveryMethod = settings.get('FEDERATION_Discovery_Method'); |
||||
const _hubUrl = settings.get('FEDERATION_Hub_URL'); |
||||
const _peerUrl = settings.get('Site_Url'); |
||||
|
||||
if (!_domain || !_discoveryMethod || !_hubUrl || !_peerUrl) { |
||||
SettingsUpdater.updateStatus('Could not enable, settings are not fully set'); |
||||
|
||||
logger.setup.error('Could not enable Federation, settings are not fully set'); |
||||
|
||||
return; |
||||
} |
||||
|
||||
logger.setup.info('Updating settings...'); |
||||
|
||||
// Normalize the config values
|
||||
const config = { |
||||
hub: { |
||||
active: _discoveryMethod === 'hub', |
||||
url: _hubUrl.replace(/\/+$/, ''), |
||||
}, |
||||
peer: { |
||||
uniqueId: _uniqueId, |
||||
domain: _domain.replace('@', '').trim(), |
||||
url: _peerUrl.replace(/\/+$/, ''), |
||||
public_key: FederationKeys.getPublicKeyString(), |
||||
}, |
||||
}; |
||||
|
||||
// If the settings are correctly set, let's update the configuration
|
||||
|
||||
// Get the key pair
|
||||
Federation.privateKey = FederationKeys.getPrivateKey(); |
||||
Federation.publicKey = FederationKeys.getPublicKey(); |
||||
|
||||
// Set important information
|
||||
Federation.enabled = true; |
||||
Federation.usingHub = config.hub.active; |
||||
Federation.uniqueId = config.peer.uniqueId; |
||||
Federation.localIdentifier = config.peer.domain; |
||||
|
||||
// Set DNS
|
||||
peerDNS.setConfig(config); |
||||
|
||||
// Set HTTP
|
||||
peerHTTP.setConfig(config); |
||||
|
||||
// Set Client
|
||||
peerClient.setConfig(config); |
||||
peerClient.enable(); |
||||
|
||||
// Set server
|
||||
peerServer.setConfig(config); |
||||
peerServer.enable(); |
||||
|
||||
// Register the client
|
||||
if (peerClient.register()) { |
||||
SettingsUpdater.updateStatus('Running'); |
||||
} else { |
||||
SettingsUpdater.updateNextStatusTo('Disabled, could not register with Hub'); |
||||
SettingsUpdater.updateEnabled(false); |
||||
} |
||||
}), 150); |
||||
|
||||
function enableOrDisable() { |
||||
const _enabled = settings.get('FEDERATION_Enabled'); |
||||
|
||||
// If it was enabled, and was disabled now,
|
||||
// make sure we disable everything: callbacks and endpoints
|
||||
if (Federation.enabled && !_enabled) { |
||||
peerClient.disable(); |
||||
peerServer.disable(); |
||||
|
||||
// Disable federation
|
||||
Federation.enabled = false; |
||||
|
||||
SettingsUpdater.updateStatus('Disabled'); |
||||
|
||||
logger.setup.info('Shutting down...'); |
||||
|
||||
return; |
||||
} |
||||
|
||||
// If not enabled, skip
|
||||
if (!_enabled) { |
||||
SettingsUpdater.updateStatus('Disabled'); |
||||
return; |
||||
} |
||||
|
||||
logger.setup.info('Booting...'); |
||||
|
||||
SettingsUpdater.updateStatus('Booting...'); |
||||
|
||||
updateSettings(); |
||||
} |
||||
|
||||
// Add settings listeners
|
||||
settings.get('FEDERATION_Enabled', enableOrDisable); |
||||
settings.get('FEDERATION_Domain', updateSettings); |
||||
settings.get('FEDERATION_Discovery_Method', updateSettings); |
||||
settings.get('FEDERATION_Hub_URL', updateSettings); |
||||
@ -0,0 +1,48 @@ |
||||
import { Meteor } from 'meteor/meteor'; |
||||
import { Users } from 'meteor/rocketchat:models'; |
||||
|
||||
import { logger } from '../logger'; |
||||
import peerClient from '../peerClient'; |
||||
import peerServer from '../peerClient'; |
||||
|
||||
Meteor.methods({ |
||||
federationAddUser(emailAddress, domainOverride) { |
||||
if (!Meteor.userId()) { |
||||
throw new Meteor.Error('error-invalid-user', 'Invalid user', { method: 'federationAddUser' }); |
||||
} |
||||
|
||||
if (!peerServer.enabled) { |
||||
throw new Meteor.Error('error-federation-disabled', 'Federation disabled', { method: 'federationAddUser' }); |
||||
} |
||||
|
||||
// Make sure the federated user still exists, and get the unique one, by email address
|
||||
const [federatedUser] = peerClient.findUsers(emailAddress, { domainOverride, emailOnly: true }); |
||||
|
||||
if (!federatedUser) { |
||||
throw new Meteor.Error('federation-invalid-user', 'There is no user to add.'); |
||||
} |
||||
|
||||
let user = null; |
||||
|
||||
const localUser = federatedUser.getLocalUser(); |
||||
|
||||
localUser.name += `@${ federatedUser.user.federation.peer }`; |
||||
|
||||
// Delete the _id
|
||||
delete localUser._id; |
||||
|
||||
try { |
||||
// Create the local user
|
||||
user = Users.create(localUser); |
||||
} catch (err) { |
||||
// If the user already exists, return the existing user
|
||||
if (err.code === 11000) { |
||||
user = Users.findOne({ 'federation._id': localUser.federation._id }); |
||||
} |
||||
|
||||
logger.error(err); |
||||
} |
||||
|
||||
return user; |
||||
}, |
||||
}); |
||||
@ -0,0 +1,24 @@ |
||||
import { Meteor } from 'meteor/meteor'; |
||||
|
||||
import peerClient from '../peerClient'; |
||||
import peerServer from '../peerServer'; |
||||
|
||||
Meteor.methods({ |
||||
federationSearchUsers(email) { |
||||
if (!Meteor.userId()) { |
||||
throw new Meteor.Error('error-invalid-user', 'Invalid user', { method: 'federationSearchUsers' }); |
||||
} |
||||
|
||||
if (!peerServer.enabled) { |
||||
throw new Meteor.Error('error-federation-disabled', 'Federation disabled', { method: 'federationAddUser' }); |
||||
} |
||||
|
||||
const federatedUsers = peerClient.findUsers(email); |
||||
|
||||
if (!federatedUsers.length) { |
||||
throw new Meteor.Error('federation-user-not-found', `Could not find federated users using "${ email }"`); |
||||
} |
||||
|
||||
return federatedUsers; |
||||
}, |
||||
}); |
||||
@ -0,0 +1,2 @@ |
||||
import './federationSearchUsers'; |
||||
import './federationAddUser'; |
||||
@ -0,0 +1,614 @@ |
||||
import qs from 'querystring'; |
||||
import { Meteor } from 'meteor/meteor'; |
||||
import { callbacks } from 'meteor/rocketchat:callbacks'; |
||||
import { settings } from 'meteor/rocketchat:settings'; |
||||
import { FederationEvents, FederationKeys, Messages, Rooms, Subscriptions, Users } from 'meteor/rocketchat:models'; |
||||
|
||||
import { Federation } from './main'; |
||||
import peerDNS from './peerDNS'; |
||||
import peerHTTP from './peerHTTP'; |
||||
import { updateStatus } from './settingsUpdater'; |
||||
import { logger } from './logger'; |
||||
import { FederatedMessage, FederatedRoom, FederatedUser } from './federatedResources'; |
||||
|
||||
class PeerClient { |
||||
constructor() { |
||||
this.config = {}; |
||||
|
||||
this.enabled = false; |
||||
|
||||
// Keep resources we should skip callbacks
|
||||
this.callbacksToSkip = {}; |
||||
} |
||||
|
||||
setConfig(config) { |
||||
// General
|
||||
this.config = config; |
||||
|
||||
// Setup HubPeer
|
||||
const { hub: { url } } = this.config; |
||||
|
||||
// Remove trailing slash
|
||||
this.HubPeer = { url }; |
||||
|
||||
// Set the local peer
|
||||
this.peer = { |
||||
domain: this.config.peer.domain, |
||||
url: this.config.peer.url, |
||||
public_key: this.config.peer.public_key, |
||||
}; |
||||
} |
||||
|
||||
log(message) { |
||||
logger.peerClient.info(message); |
||||
} |
||||
|
||||
disable() { |
||||
this.log('Disabling...'); |
||||
|
||||
this.enabled = false; |
||||
} |
||||
|
||||
enable() { |
||||
this.log('Enabling...'); |
||||
|
||||
this.enabled = true; |
||||
} |
||||
|
||||
start() { |
||||
this.setupCallbacks(); |
||||
} |
||||
|
||||
// ###########
|
||||
//
|
||||
// Registering
|
||||
//
|
||||
// ###########
|
||||
register() { |
||||
if (this.config.hub.active) { |
||||
updateStatus('Registering with Hub...'); |
||||
|
||||
return peerDNS.register(this.peer); |
||||
} |
||||
|
||||
return true; |
||||
} |
||||
|
||||
// ###################
|
||||
//
|
||||
// Callback management
|
||||
//
|
||||
// ###################
|
||||
addCallbackToSkip(callback, resourceId) { |
||||
this.callbacksToSkip[`${ callback }_${ resourceId }`] = true; |
||||
} |
||||
|
||||
skipCallbackIfNeeded(callback, resource) { |
||||
const { federation } = resource; |
||||
|
||||
if (!federation) { return false; } |
||||
|
||||
const { _id } = federation; |
||||
|
||||
const callbackName = `${ callback }_${ _id }`; |
||||
|
||||
const skipCallback = this.callbacksToSkip[callbackName]; |
||||
|
||||
delete this.callbacksToSkip[callbackName]; |
||||
|
||||
this.log(`${ callbackName } callback ${ skipCallback ? '' : 'not ' }skipped`); |
||||
|
||||
return skipCallback; |
||||
} |
||||
|
||||
wrapEnabled(callbackHandler) { |
||||
return function(...parameters) { |
||||
if (!this.enabled) { return; } |
||||
|
||||
callbackHandler.apply(this, parameters); |
||||
}.bind(this); |
||||
} |
||||
|
||||
setupCallbacks() { |
||||
// Accounts.onLogin(onLoginCallbackHandler.bind(this));
|
||||
// Accounts.onLogout(onLogoutCallbackHandler.bind(this));
|
||||
|
||||
FederationEvents.on('createEvent', this.wrapEnabled(this.onCreateEvent.bind(this))); |
||||
|
||||
callbacks.add('afterCreateDirectRoom', this.wrapEnabled(this.afterCreateDirectRoom.bind(this)), callbacks.priority.LOW, 'federation-create-direct-room'); |
||||
callbacks.add('afterCreateRoom', this.wrapEnabled(this.afterCreateRoom.bind(this)), callbacks.priority.LOW, 'federation-join-room'); |
||||
callbacks.add('afterSaveRoomSettings', this.wrapEnabled(this.afterSaveRoomSettings.bind(this)), callbacks.priority.LOW, 'federation-after-save-room-settings'); |
||||
callbacks.add('afterAddedToRoom', this.wrapEnabled(this.afterAddedToRoom.bind(this)), callbacks.priority.LOW, 'federation-join-room'); |
||||
callbacks.add('beforeLeaveRoom', this.wrapEnabled(this.beforeLeaveRoom.bind(this)), callbacks.priority.LOW, 'federation-leave-room'); |
||||
callbacks.add('beforeRemoveFromRoom', this.wrapEnabled(this.beforeRemoveFromRoom.bind(this)), callbacks.priority.LOW, 'federation-leave-room'); |
||||
callbacks.add('afterSaveMessage', this.wrapEnabled(this.afterSaveMessage.bind(this)), callbacks.priority.LOW, 'federation-save-message'); |
||||
callbacks.add('afterDeleteMessage', this.wrapEnabled(this.afterDeleteMessage.bind(this)), callbacks.priority.LOW, 'federation-delete-message'); |
||||
callbacks.add('afterReadMessages', this.wrapEnabled(this.afterReadMessages.bind(this)), callbacks.priority.LOW, 'federation-read-messages'); |
||||
callbacks.add('afterSetReaction', this.wrapEnabled(this.afterSetReaction.bind(this)), callbacks.priority.LOW, 'federation-after-set-reaction'); |
||||
callbacks.add('afterUnsetReaction', this.wrapEnabled(this.afterUnsetReaction.bind(this)), callbacks.priority.LOW, 'federation-after-unset-reaction'); |
||||
callbacks.add('afterMuteUser', this.wrapEnabled(this.afterMuteUser.bind(this)), callbacks.priority.LOW, 'federation-mute-user'); |
||||
callbacks.add('afterUnmuteUser', this.wrapEnabled(this.afterUnmuteUser.bind(this)), callbacks.priority.LOW, 'federation-unmute-user'); |
||||
|
||||
this.log('Callbacks set'); |
||||
} |
||||
|
||||
// ################
|
||||
//
|
||||
// Event management
|
||||
//
|
||||
// ################
|
||||
propagateEvent(e) { |
||||
this.log(`propagateEvent: ${ e.t }`); |
||||
|
||||
const { peer: domain } = e; |
||||
|
||||
const peer = peerDNS.searchPeer(domain); |
||||
|
||||
if (!peer || !peer.public_key) { |
||||
this.log(`Could not find valid peer:${ domain }`); |
||||
|
||||
FederationEvents.setEventAsErrored(e, 'Could not find valid peer'); |
||||
} else { |
||||
try { |
||||
const stringPayload = JSON.stringify({ event: e }); |
||||
|
||||
// Encrypt with the peer's public key
|
||||
let payload = FederationKeys.loadKey(peer.public_key, 'public').encrypt(stringPayload); |
||||
|
||||
// Encrypt with the local private key
|
||||
payload = Federation.privateKey.encryptPrivate(payload); |
||||
|
||||
peerHTTP.request(peer, 'POST', '/api/v1/federation.events', { payload }, { total: 5, stepSize: 500, stepMultiplier: 10 }); |
||||
|
||||
FederationEvents.setEventAsFullfilled(e); |
||||
} catch (err) { |
||||
this.log(`[${ e.t }] Event could not be sent to peer:${ domain }`); |
||||
|
||||
if (err.response) { |
||||
const { response: { data: error } } = err; |
||||
|
||||
if (error.errorType === 'error-app-prevented-sending') { |
||||
const { payload: { |
||||
message: { |
||||
rid: roomId, |
||||
u: { |
||||
username, |
||||
federation: { _id: userId }, |
||||
}, |
||||
}, |
||||
} } = e; |
||||
|
||||
const localUsername = username.split('@')[0]; |
||||
|
||||
// Create system message
|
||||
Messages.createRejectedMessageByPeer(roomId, localUsername, { |
||||
u: { |
||||
_id: userId, |
||||
username: localUsername, |
||||
}, |
||||
peer: domain, |
||||
}); |
||||
|
||||
return FederationEvents.setEventAsErrored(e, err.error, true); |
||||
} |
||||
} |
||||
|
||||
if (err.error === 'federation-peer-does-not-exist') { |
||||
const { payload: { |
||||
message: { |
||||
rid: roomId, |
||||
u: { |
||||
username, |
||||
federation: { _id: userId }, |
||||
}, |
||||
}, |
||||
} } = e; |
||||
|
||||
const localUsername = username.split('@')[0]; |
||||
|
||||
// Create system message
|
||||
Messages.createPeerDoesNotExist(roomId, localUsername, { |
||||
u: { |
||||
_id: userId, |
||||
username: localUsername, |
||||
}, |
||||
peer: domain, |
||||
}); |
||||
|
||||
return FederationEvents.setEventAsErrored(e, err.error, true); |
||||
} |
||||
|
||||
return FederationEvents.setEventAsErrored(e, `Could not send request to ${ domain }`); |
||||
} |
||||
} |
||||
} |
||||
|
||||
onCreateEvent(e) { |
||||
this.propagateEvent(e); |
||||
} |
||||
|
||||
resendUnfulfilledEvents() { |
||||
// Should we use queues in here?
|
||||
const events = FederationEvents.getUnfulfilled(); |
||||
|
||||
for (const e of events) { |
||||
this.propagateEvent(e); |
||||
} |
||||
} |
||||
|
||||
// #####
|
||||
//
|
||||
// Users
|
||||
//
|
||||
// #####
|
||||
findUsers(email, options = {}) { |
||||
const [username, domain] = email.split('@'); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this; |
||||
|
||||
let peer = null; |
||||
|
||||
try { |
||||
peer = peerDNS.searchPeer(options.domainOverride || domain); |
||||
} catch (err) { |
||||
this.log(`Could not find peer using domain:${ domain }`); |
||||
throw new Meteor.Error('federation-peer-does-not-exist', `Could not find peer using domain:${ domain }`); |
||||
} |
||||
|
||||
try { |
||||
const { data: { federatedUsers: remoteFederatedUsers } } = peerHTTP.request(peer, 'GET', `/api/v1/federation.users?${ qs.stringify({ username, domain, emailOnly: options.emailOnly }) }`); |
||||
|
||||
const federatedUsers = []; |
||||
|
||||
for (const federatedUser of remoteFederatedUsers) { |
||||
federatedUsers.push(new FederatedUser(localPeerDomain, federatedUser.user)); |
||||
} |
||||
|
||||
return federatedUsers; |
||||
} catch (err) { |
||||
this.log(`Could not find user:${ username } at ${ peer.domain }`); |
||||
throw new Meteor.Error('federation-user-does-not-exist', `Could not find user:${ email } at ${ peer.domain }`); |
||||
} |
||||
} |
||||
|
||||
// #######
|
||||
//
|
||||
// Uploads
|
||||
//
|
||||
// #######
|
||||
getUpload(options) { |
||||
const { identifier: domain, localMessage: { file: { _id: fileId } } } = options; |
||||
|
||||
let peer = null; |
||||
|
||||
try { |
||||
peer = peerDNS.searchPeer(domain); |
||||
} catch (err) { |
||||
this.log(`Could not find peer using domain:${ domain }`); |
||||
throw new Meteor.Error('federation-peer-does-not-exist', `Could not find peer using domain:${ domain }`); |
||||
} |
||||
|
||||
const { data: { upload, buffer } } = peerHTTP.request(peer, 'GET', `/api/v1/federation.uploads?${ qs.stringify({ upload_id: fileId }) }`); |
||||
|
||||
return { upload, buffer: Buffer.from(buffer) }; |
||||
} |
||||
|
||||
// #################
|
||||
//
|
||||
// Callback handlers
|
||||
//
|
||||
// #################
|
||||
afterCreateDirectRoom(room, { from: owner }) { |
||||
this.log('afterCreateDirectRoom'); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this; |
||||
|
||||
// Check if room is federated
|
||||
if (!FederatedRoom.isFederated(localPeerDomain, room, { checkUsingUsers: true })) { return room; } |
||||
|
||||
const federatedRoom = new FederatedRoom(localPeerDomain, room, { owner }); |
||||
|
||||
// Check if this should be skipped
|
||||
if (this.skipCallbackIfNeeded('afterCreateDirectRoom', federatedRoom.getLocalRoom())) { return room; } |
||||
|
||||
// Load federated users
|
||||
federatedRoom.loadUsers(); |
||||
|
||||
// Refresh room's federation
|
||||
federatedRoom.refreshFederation(); |
||||
|
||||
FederationEvents.directRoomCreated(federatedRoom, { skipPeers: [localPeerDomain] }); |
||||
} |
||||
|
||||
afterCreateRoom(roomOwner, room) { |
||||
this.log('afterCreateRoom'); |
||||
|
||||
const { _id: ownerId } = roomOwner; |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this; |
||||
|
||||
// Check if room is federated
|
||||
if (!FederatedRoom.isFederated(localPeerDomain, room, { checkUsingUsers: true })) { return roomOwner; } |
||||
|
||||
const owner = Users.findOneById(ownerId); |
||||
|
||||
const federatedRoom = new FederatedRoom(localPeerDomain, room, { owner }); |
||||
|
||||
// Check if this should be skipped
|
||||
if (this.skipCallbackIfNeeded('afterCreateRoom', federatedRoom.getLocalRoom())) { return roomOwner; } |
||||
|
||||
// Load federated users
|
||||
federatedRoom.loadUsers(); |
||||
|
||||
// Refresh room's federation
|
||||
federatedRoom.refreshFederation(); |
||||
|
||||
FederationEvents.roomCreated(federatedRoom, { skipPeers: [localPeerDomain] }); |
||||
} |
||||
|
||||
afterSaveRoomSettings(/* room */) { |
||||
this.log('afterSaveRoomSettings - NOT IMPLEMENTED'); |
||||
} |
||||
|
||||
afterAddedToRoom(users, room) { |
||||
this.log('afterAddedToRoom'); |
||||
|
||||
const { user: userWhoJoined, inviter: userWhoInvited } = users; |
||||
|
||||
// Check if this should be skipped
|
||||
if (this.skipCallbackIfNeeded('afterAddedToRoom', userWhoJoined)) { return users; } |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this; |
||||
|
||||
// Check if room is federated
|
||||
if (!FederatedRoom.isFederated(localPeerDomain, room, { checkUsingUsers: true })) { return users; } |
||||
|
||||
const extras = {}; |
||||
|
||||
// If the room is not federated and has an owner
|
||||
if (!room.federation) { |
||||
let ownerId; |
||||
|
||||
// If the room does not have an owner, get the first user subscribed to that room
|
||||
if (!room.u) { |
||||
const userSubscription = Subscriptions.findOne({ rid: room._id }, { |
||||
sort: { |
||||
ts: 1, |
||||
}, |
||||
}); |
||||
|
||||
ownerId = userSubscription.u._id; |
||||
} else { |
||||
ownerId = room.u._id; |
||||
} |
||||
|
||||
extras.owner = Users.findOneById(ownerId); |
||||
} |
||||
|
||||
const federatedRoom = new FederatedRoom(localPeerDomain, room, extras); |
||||
|
||||
// Load federated users
|
||||
federatedRoom.loadUsers(); |
||||
|
||||
// Refresh room's federation
|
||||
federatedRoom.refreshFederation(); |
||||
|
||||
// If the user who joined is from a different peer...
|
||||
if (userWhoJoined.federation && userWhoJoined.federation.peer !== localPeerDomain) { |
||||
// ...create a "create room" event for that peer
|
||||
FederationEvents.roomCreated(federatedRoom, { peers: [userWhoJoined.federation.peer] }); |
||||
} |
||||
|
||||
// Then, create a "user join/added" event to the other peers
|
||||
const federatedUserWhoJoined = FederatedUser.loadOrCreate(localPeerDomain, userWhoJoined); |
||||
|
||||
if (userWhoInvited) { |
||||
const federatedInviter = FederatedUser.loadOrCreate(localPeerDomain, userWhoInvited); |
||||
|
||||
FederationEvents.userAdded(federatedRoom, federatedUserWhoJoined, federatedInviter, { skipPeers: [localPeerDomain] }); |
||||
} else { |
||||
FederationEvents.userJoined(federatedRoom, federatedUserWhoJoined, { skipPeers: [localPeerDomain] }); |
||||
} |
||||
} |
||||
|
||||
beforeLeaveRoom(userWhoLeft, room) { |
||||
this.log('beforeLeaveRoom'); |
||||
|
||||
// Check if this should be skipped
|
||||
if (this.skipCallbackIfNeeded('beforeLeaveRoom', userWhoLeft)) { return userWhoLeft; } |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this; |
||||
|
||||
// Check if room is federated
|
||||
if (!FederatedRoom.isFederated(localPeerDomain, room)) { return userWhoLeft; } |
||||
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, room.federation._id); |
||||
|
||||
const federatedUserWhoLeft = FederatedUser.loadByFederationId(localPeerDomain, userWhoLeft.federation._id); |
||||
|
||||
// Then, create a "user left" event to the other peers
|
||||
FederationEvents.userLeft(federatedRoom, federatedUserWhoLeft, { skipPeers: [localPeerDomain] }); |
||||
|
||||
// Load federated users
|
||||
federatedRoom.loadUsers(); |
||||
|
||||
// Refresh room's federation
|
||||
federatedRoom.refreshFederation(); |
||||
} |
||||
|
||||
beforeRemoveFromRoom(users, room) { |
||||
this.log('beforeRemoveFromRoom'); |
||||
|
||||
const { removedUser, userWhoRemoved } = users; |
||||
|
||||
// Check if this should be skipped
|
||||
if (this.skipCallbackIfNeeded('beforeRemoveFromRoom', removedUser)) { return users; } |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this; |
||||
|
||||
// Check if room is federated
|
||||
if (!FederatedRoom.isFederated(localPeerDomain, room)) { return users; } |
||||
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, room.federation._id); |
||||
|
||||
const federatedRemovedUser = FederatedUser.loadByFederationId(localPeerDomain, removedUser.federation._id); |
||||
|
||||
const federatedUserWhoRemoved = FederatedUser.loadByFederationId(localPeerDomain, userWhoRemoved.federation._id); |
||||
|
||||
FederationEvents.userRemoved(federatedRoom, federatedRemovedUser, federatedUserWhoRemoved, { skipPeers: [localPeerDomain] }); |
||||
|
||||
// Load federated users
|
||||
federatedRoom.loadUsers(); |
||||
|
||||
// Refresh room's federation
|
||||
federatedRoom.refreshFederation(); |
||||
} |
||||
|
||||
afterSaveMessage(message, room) { |
||||
this.log('afterSaveMessage'); |
||||
|
||||
// Check if this should be skipped
|
||||
if (this.skipCallbackIfNeeded('afterSaveMessage', message)) { return message; } |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this; |
||||
|
||||
// Check if room is federated
|
||||
if (!FederatedRoom.isFederated(localPeerDomain, room)) { return message; } |
||||
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, room.federation._id); |
||||
|
||||
const federatedMessage = FederatedMessage.loadOrCreate(localPeerDomain, message); |
||||
|
||||
// If editedAt exists, it means it is an update
|
||||
if (message.editedAt) { |
||||
const user = Users.findOneById(message.editedBy._id); |
||||
|
||||
const federatedUser = FederatedUser.loadByFederationId(localPeerDomain, user.federation._id); |
||||
|
||||
FederationEvents.messageUpdated(federatedRoom, federatedMessage, federatedUser, { skipPeers: [localPeerDomain] }); |
||||
} else { |
||||
FederationEvents.messageCreated(federatedRoom, federatedMessage, { skipPeers: [localPeerDomain] }); |
||||
} |
||||
} |
||||
|
||||
afterDeleteMessage(message) { |
||||
this.log('afterDeleteMessage'); |
||||
|
||||
// Check if this should be skipped
|
||||
if (this.skipCallbackIfNeeded('afterDeleteMessage', message)) { return message; } |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this; |
||||
|
||||
const room = Rooms.findOneById(message.rid); |
||||
|
||||
// Check if room is federated
|
||||
if (!FederatedRoom.isFederated(localPeerDomain, room)) { return message; } |
||||
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, room.federation._id); |
||||
|
||||
const federatedMessage = new FederatedMessage(localPeerDomain, message); |
||||
|
||||
FederationEvents.messageDeleted(federatedRoom, federatedMessage, { skipPeers: [localPeerDomain] }); |
||||
} |
||||
|
||||
afterReadMessages(roomId, { userId }) { |
||||
this.log('afterReadMessages'); |
||||
|
||||
if (!settings.get('Message_Read_Receipt_Enabled')) { this.log('Skipping: read receipts are not enabled'); return roomId; } |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this; |
||||
|
||||
const room = Rooms.findOneById(roomId); |
||||
|
||||
// Check if room is federated
|
||||
if (!FederatedRoom.isFederated(localPeerDomain, room)) { return roomId; } |
||||
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, room.federation._id); |
||||
|
||||
if (this.skipCallbackIfNeeded('afterReadMessages', federatedRoom.getLocalRoom())) { return roomId; } |
||||
|
||||
const user = Users.findOneById(userId); |
||||
|
||||
const federatedUser = FederatedUser.loadByFederationId(localPeerDomain, user.federation._id); |
||||
|
||||
FederationEvents.messagesRead(federatedRoom, federatedUser, { skipPeers: [localPeerDomain] }); |
||||
} |
||||
|
||||
afterSetReaction(message, { user, reaction, shouldReact }) { |
||||
this.log('afterSetReaction'); |
||||
|
||||
const room = Rooms.findOneById(message.rid); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this; |
||||
|
||||
// Check if room is federated
|
||||
if (!FederatedRoom.isFederated(localPeerDomain, room)) { return message; } |
||||
|
||||
const federatedUser = FederatedUser.loadByFederationId(localPeerDomain, user.federation._id); |
||||
|
||||
const federatedMessage = FederatedMessage.loadByFederationId(localPeerDomain, message.federation._id); |
||||
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, room.federation._id); |
||||
|
||||
FederationEvents.messagesSetReaction(federatedRoom, federatedMessage, federatedUser, reaction, shouldReact, { skipPeers: [localPeerDomain] }); |
||||
} |
||||
|
||||
afterUnsetReaction(message, { user, reaction, shouldReact }) { |
||||
this.log('afterUnsetReaction'); |
||||
|
||||
const room = Rooms.findOneById(message.rid); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this; |
||||
|
||||
// Check if room is federated
|
||||
if (!FederatedRoom.isFederated(localPeerDomain, room)) { return message; } |
||||
|
||||
const federatedUser = FederatedUser.loadByFederationId(localPeerDomain, user.federation._id); |
||||
|
||||
const federatedMessage = FederatedMessage.loadByFederationId(localPeerDomain, message.federation._id); |
||||
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, room.federation._id); |
||||
|
||||
FederationEvents.messagesUnsetReaction(federatedRoom, federatedMessage, federatedUser, reaction, shouldReact, { skipPeers: [localPeerDomain] }); |
||||
} |
||||
|
||||
afterMuteUser(users, room) { |
||||
this.log('afterMuteUser'); |
||||
|
||||
const { mutedUser, fromUser } = users; |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this; |
||||
|
||||
// Check if room is federated
|
||||
if (!FederatedRoom.isFederated(localPeerDomain, room)) { return users; } |
||||
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, room.federation._id); |
||||
|
||||
const federatedMutedUser = FederatedUser.loadByFederationId(localPeerDomain, mutedUser.federation._id); |
||||
|
||||
const federatedUserWhoMuted = FederatedUser.loadByFederationId(localPeerDomain, fromUser.federation._id); |
||||
|
||||
FederationEvents.userMuted(federatedRoom, federatedMutedUser, federatedUserWhoMuted, { skipPeers: [localPeerDomain] }); |
||||
} |
||||
|
||||
afterUnmuteUser(users, room) { |
||||
this.log('afterUnmuteUser'); |
||||
|
||||
const { unmutedUser, fromUser } = users; |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this; |
||||
|
||||
// Check if room is federated
|
||||
if (!FederatedRoom.isFederated(localPeerDomain, room)) { return users; } |
||||
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, room.federation._id); |
||||
|
||||
const federatedUnmutedUser = FederatedUser.loadByFederationId(localPeerDomain, unmutedUser.federation._id); |
||||
|
||||
const federatedUserWhoUnmuted = FederatedUser.loadByFederationId(localPeerDomain, fromUser.federation._id); |
||||
|
||||
FederationEvents.userUnmuted(federatedRoom, federatedUnmutedUser, federatedUserWhoUnmuted, { skipPeers: [localPeerDomain] }); |
||||
} |
||||
} |
||||
|
||||
export default new PeerClient(); |
||||
@ -0,0 +1,175 @@ |
||||
import dns from 'dns'; |
||||
import { Meteor } from 'meteor/meteor'; |
||||
import { FederationDNSCache } from 'meteor/rocketchat:models'; |
||||
|
||||
import { logger } from './logger'; |
||||
import peerHTTP from './peerHTTP'; |
||||
import { updateStatus } from './settingsUpdater'; |
||||
|
||||
const dnsResolveSRV = Meteor.wrapAsync(dns.resolveSrv); |
||||
const dnsResolveTXT = Meteor.wrapAsync(dns.resolveTxt); |
||||
|
||||
class PeerDNS { |
||||
constructor() { |
||||
this.config = {}; |
||||
} |
||||
|
||||
setConfig(config) { |
||||
// General
|
||||
this.config = config; |
||||
|
||||
// Setup HubPeer
|
||||
const { hub: { url } } = config; |
||||
this.HubPeer = { url }; |
||||
} |
||||
|
||||
log(message) { |
||||
logger.dns.info(message); |
||||
} |
||||
|
||||
// ########
|
||||
//
|
||||
// Register
|
||||
//
|
||||
// ########
|
||||
register(peerConfig) { |
||||
const { uniqueId, domain, url, public_key } = peerConfig; |
||||
|
||||
this.log(`Registering peer with domain ${ domain }...`); |
||||
|
||||
// Attempt to register peer
|
||||
try { |
||||
peerHTTP.request(this.HubPeer, 'POST', '/api/v1/peers', { uniqueId, domain, url, public_key }, { total: 5, stepSize: 1000, tryToUpdateDNS: false }); |
||||
|
||||
this.log('Peer registered!'); |
||||
|
||||
updateStatus('Running, registered to Hub'); |
||||
|
||||
return true; |
||||
} catch (err) { |
||||
this.log(err); |
||||
|
||||
this.log('Could not register peer'); |
||||
|
||||
return false; |
||||
} |
||||
} |
||||
|
||||
// #############
|
||||
//
|
||||
// Peer Handling
|
||||
//
|
||||
// #############
|
||||
searchPeer(domain) { |
||||
this.log(`searchPeer: ${ domain }`); |
||||
|
||||
let peer = FederationDNSCache.findOneByDomain(domain); |
||||
|
||||
// Try to lookup at the DNS Cache
|
||||
if (!peer) { |
||||
this.updatePeerDNS(domain); |
||||
|
||||
peer = FederationDNSCache.findOneByDomain(domain); |
||||
} |
||||
|
||||
return peer; |
||||
} |
||||
|
||||
getPeerUsingDNS(domain) { |
||||
this.log(`getPeerUsingDNS: ${ domain }`); |
||||
|
||||
// Try searching by DNS first
|
||||
const srvEntries = dnsResolveSRV(`_rocketchat._tcp.${ domain }`); |
||||
|
||||
const [srvEntry] = srvEntries; |
||||
|
||||
// Get the public key from the TXT record
|
||||
const txtRecords = dnsResolveTXT(domain); |
||||
|
||||
let publicKey; |
||||
|
||||
for (const txtRecord of txtRecords) { |
||||
const joinedTxtRecord = txtRecord.join(''); |
||||
|
||||
if (joinedTxtRecord.indexOf('rocketchat-public-key=') === 0) { |
||||
publicKey = joinedTxtRecord; |
||||
break; |
||||
} |
||||
} |
||||
|
||||
if (!publicKey) { |
||||
throw new Meteor.Error('ENOTFOUND', 'Could not find public key entry on TXT records'); |
||||
} |
||||
|
||||
publicKey = publicKey.replace('rocketchat-public-key=', ''); |
||||
|
||||
const protocol = srvEntry.name === 'localhost' ? 'http' : 'https'; |
||||
|
||||
return { |
||||
domain, |
||||
url: `${ protocol }://${ srvEntry.name }:${ srvEntry.port }`, |
||||
public_key: publicKey, |
||||
}; |
||||
} |
||||
|
||||
getPeerUsingHub(domain) { |
||||
this.log(`getPeerUsingHub: ${ domain }`); |
||||
|
||||
// If there is no DNS entry for that, get from the Hub
|
||||
const { data: { peer } } = peerHTTP.simpleRequest(this.HubPeer, 'GET', `/api/v1/peers?search=${ domain }`); |
||||
|
||||
return peer; |
||||
} |
||||
|
||||
// ##############
|
||||
//
|
||||
// DNS Management
|
||||
//
|
||||
// ##############
|
||||
updatePeerDNS(domain) { |
||||
this.log(`updatePeerDNS: ${ domain }`); |
||||
|
||||
let peer; |
||||
|
||||
try { |
||||
peer = this.getPeerUsingDNS(domain); |
||||
} catch (err) { |
||||
if (err.code !== 'ENOTFOUND') { |
||||
this.log(err); |
||||
|
||||
throw new Error(`Error trying to fetch SRV DNS entries for ${ domain }`); |
||||
} |
||||
|
||||
peer = this.getPeerUsingHub(domain); |
||||
} |
||||
|
||||
this.updateDNSCache.call(this, peer); |
||||
|
||||
return peer; |
||||
} |
||||
|
||||
updateDNSEntry(peer) { |
||||
this.log('updateDNSEntry'); |
||||
|
||||
const { domain } = peer; |
||||
|
||||
delete peer._id; |
||||
|
||||
// Make sure public_key has no line breaks
|
||||
peer.public_key = peer.public_key.replace(/\n|\r/g, ''); |
||||
|
||||
return FederationDNSCache.upsert({ domain }, peer); |
||||
} |
||||
|
||||
updateDNSCache(peers) { |
||||
this.log('updateDNSCache'); |
||||
|
||||
peers = Array.isArray(peers) ? peers : [peers]; |
||||
|
||||
for (const peer of peers) { |
||||
this.updateDNSEntry.call(this, peer); |
||||
} |
||||
} |
||||
} |
||||
|
||||
export default new PeerDNS(); |
||||
@ -0,0 +1,126 @@ |
||||
import { Meteor } from 'meteor/meteor'; |
||||
import { HTTP } from 'meteor/http'; |
||||
|
||||
import { logger } from './logger'; |
||||
import peerDNS from './peerDNS'; |
||||
|
||||
// Should skip the retry if the error is one of the below?
|
||||
const errorsToSkipRetrying = [ |
||||
'error-app-prevented-sending', |
||||
]; |
||||
|
||||
function skipRetryOnSpecificError(err) { |
||||
return errorsToSkipRetrying.includes(err && err.errorType); |
||||
} |
||||
|
||||
// Delay method to wait a little bit before retrying
|
||||
const delay = Meteor.wrapAsync(function(ms, callback) { |
||||
Meteor.setTimeout(function() { |
||||
callback(null); |
||||
}, ms); |
||||
}); |
||||
|
||||
function doSimpleRequest(peer, method, uri, body) { |
||||
this.log(`Request: ${ method } ${ uri }`); |
||||
|
||||
const { url: serverBaseURL } = peer; |
||||
|
||||
const url = `${ serverBaseURL }${ uri }`; |
||||
|
||||
let data = null; |
||||
|
||||
if (method === 'POST' || method === 'PUT') { |
||||
data = body; |
||||
} |
||||
|
||||
this.log(`Sending request: ${ method } - ${ uri }`); |
||||
|
||||
return HTTP.call(method, url, { data, timeout: 2000, headers: { 'x-federation-domain': this.config.peer.domain } }); |
||||
} |
||||
|
||||
//
|
||||
// Actually does the request, handling retries and everything
|
||||
function doRequest(peer, method, uri, body, retryInfo = {}) { |
||||
// Normalize retry info
|
||||
retryInfo = { |
||||
total: retryInfo.total || 1, |
||||
stepSize: retryInfo.stepSize || 100, |
||||
stepMultiplier: retryInfo.stepMultiplier || 1, |
||||
tryToUpdateDNS: retryInfo.tryToUpdateDNS === undefined ? true : retryInfo.tryToUpdateDNS, |
||||
DNSUpdated: false, |
||||
}; |
||||
|
||||
for (let i = 0; i <= retryInfo.total; i++) { |
||||
try { |
||||
return doSimpleRequest.call(this, peer, method, uri, body); |
||||
} catch (err) { |
||||
try { |
||||
if (retryInfo.tryToUpdateDNS && !retryInfo.DNSUpdated) { |
||||
i--; |
||||
|
||||
retryInfo.DNSUpdated = true; |
||||
|
||||
this.log(`Trying to update local DNS cache for peer:${ peer.domain }`); |
||||
|
||||
peer = peerDNS.updatePeerDNS(peer.domain); |
||||
|
||||
continue; |
||||
} |
||||
} catch (err) { |
||||
if (err.response && err.response.statusCode === 404) { |
||||
throw new Meteor.Error('federation-peer-does-not-exist', 'Peer does not exist'); |
||||
} |
||||
} |
||||
|
||||
// Check if we need to skip due to specific error
|
||||
if (skipRetryOnSpecificError(err && err.response && err.response.data)) { |
||||
this.log('Retry: skipping due to specific error'); |
||||
|
||||
throw err; |
||||
} |
||||
|
||||
if (i === retryInfo.total - 1) { |
||||
// Throw the error, as we could not fulfill the request
|
||||
this.log('Retry: could not fulfill the request'); |
||||
|
||||
throw err; |
||||
} |
||||
|
||||
const timeToRetry = retryInfo.stepSize * (i + 1) * retryInfo.stepMultiplier; |
||||
|
||||
this.log(`Trying again in ${ timeToRetry / 1000 }s: ${ method } - ${ uri }`); |
||||
|
||||
// Otherwise, wait and try again
|
||||
delay(timeToRetry); |
||||
} |
||||
} |
||||
} |
||||
|
||||
class PeerHTTP { |
||||
constructor() { |
||||
this.config = {}; |
||||
} |
||||
|
||||
setConfig(config) { |
||||
// General
|
||||
this.config = config; |
||||
} |
||||
|
||||
log(message) { |
||||
logger.http.info(message); |
||||
} |
||||
|
||||
//
|
||||
// Direct request
|
||||
simpleRequest(peer, method, uri, body) { |
||||
return doSimpleRequest.call(this, peer, method, uri, body); |
||||
} |
||||
|
||||
//
|
||||
// Request trying to find DNS entries
|
||||
request(peer, method, uri, body, retryInfo = {}) { |
||||
return doRequest.call(this, peer, method, uri, body, retryInfo); |
||||
} |
||||
} |
||||
|
||||
export default new PeerHTTP(); |
||||
@ -0,0 +1,8 @@ |
||||
import peerServer from './peerServer'; |
||||
|
||||
// Setup routes
|
||||
import './routes/events'; |
||||
import './routes/uploads'; |
||||
import './routes/users'; |
||||
|
||||
export default peerServer; |
||||
@ -0,0 +1,388 @@ |
||||
import { callbacks } from 'meteor/rocketchat:callbacks'; |
||||
import { setReaction } from 'meteor/rocketchat:reactions'; |
||||
import { addUserToRoom, removeUserFromRoom, deleteMessage } from 'meteor/rocketchat:lib'; |
||||
import { Rooms, Subscriptions } from 'meteor/rocketchat:models'; |
||||
|
||||
import { FederatedMessage, FederatedRoom, FederatedUser } from '../federatedResources'; |
||||
import { logger } from '../logger.js'; |
||||
import peerClient from '../peerClient'; |
||||
|
||||
class PeerServer { |
||||
constructor() { |
||||
this.config = {}; |
||||
this.enabled = false; |
||||
} |
||||
|
||||
setConfig(config) { |
||||
// General
|
||||
this.config = config; |
||||
} |
||||
|
||||
log(message) { |
||||
logger.peerServer.info(message); |
||||
} |
||||
|
||||
disable() { |
||||
this.log('Disabling...'); |
||||
|
||||
this.enabled = false; |
||||
} |
||||
|
||||
enable() { |
||||
this.log('Enabling...'); |
||||
|
||||
this.enabled = true; |
||||
} |
||||
|
||||
start() { |
||||
this.log('Routes are set'); |
||||
} |
||||
|
||||
handleDirectRoomCreatedEvent(e) { |
||||
this.log('handleDirectRoomCreatedEvent'); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this.config; |
||||
|
||||
const { payload: { room, owner, users } } = e; |
||||
|
||||
// Load the federated room
|
||||
const federatedRoom = new FederatedRoom(localPeerDomain, room, { owner }); |
||||
|
||||
// Set users
|
||||
federatedRoom.setUsers(users); |
||||
|
||||
// Create, if needed, all room's users
|
||||
federatedRoom.createUsers(); |
||||
|
||||
// Then, create the room, if needed
|
||||
federatedRoom.create(); |
||||
} |
||||
|
||||
handleRoomCreatedEvent(e) { |
||||
this.log('handleRoomCreatedEvent'); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this.config; |
||||
|
||||
const { payload: { room, owner, users } } = e; |
||||
|
||||
// Load the federated room
|
||||
const federatedRoom = new FederatedRoom(localPeerDomain, room, { owner }); |
||||
|
||||
// Set users
|
||||
federatedRoom.setUsers(users); |
||||
|
||||
// Create, if needed, all room's users
|
||||
federatedRoom.createUsers(); |
||||
|
||||
// Then, create the room, if needed
|
||||
federatedRoom.create(); |
||||
} |
||||
|
||||
handleUserJoinedEvent(e) { |
||||
this.log('handleUserJoinedEvent'); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this.config; |
||||
|
||||
const { payload: { federated_room_id, user } } = e; |
||||
|
||||
// Load the federated room
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, federated_room_id); |
||||
|
||||
// Create the user, if needed
|
||||
const federatedUser = FederatedUser.loadOrCreate(localPeerDomain, user); |
||||
const localUser = federatedUser.create(); |
||||
|
||||
// Callback management
|
||||
peerClient.addCallbackToSkip('afterAddedToRoom', federatedUser.getFederationId()); |
||||
|
||||
// Add the user to the room
|
||||
addUserToRoom(federatedRoom.room._id, localUser, null, false); |
||||
|
||||
// Load federated users
|
||||
federatedRoom.loadUsers(); |
||||
|
||||
// Refresh room's federation
|
||||
federatedRoom.refreshFederation(); |
||||
} |
||||
|
||||
handleUserAddedEvent(e) { |
||||
this.log('handleUserAddedEvent'); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this.config; |
||||
|
||||
const { payload: { federated_room_id, federated_inviter_id, user } } = e; |
||||
|
||||
// Load the federated room
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, federated_room_id); |
||||
|
||||
// Load the inviter
|
||||
const federatedInviter = FederatedUser.loadByFederationId(localPeerDomain, federated_inviter_id); |
||||
|
||||
if (!federatedInviter) { |
||||
throw new Error('Inviting user does not exist'); |
||||
} |
||||
|
||||
const localInviter = federatedInviter.getLocalUser(); |
||||
|
||||
// Create the user, if needed
|
||||
const federatedUser = FederatedUser.loadOrCreate(localPeerDomain, user); |
||||
const localUser = federatedUser.create(); |
||||
|
||||
// Callback management
|
||||
peerClient.addCallbackToSkip('afterAddedToRoom', federatedUser.getFederationId()); |
||||
|
||||
// Add the user to the room
|
||||
addUserToRoom(federatedRoom.room._id, localUser, localInviter, false); |
||||
|
||||
// Load federated users
|
||||
federatedRoom.loadUsers(); |
||||
|
||||
// Refresh room's federation
|
||||
federatedRoom.refreshFederation(); |
||||
} |
||||
|
||||
handleUserLeftEvent(e) { |
||||
this.log('handleUserLeftEvent'); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this.config; |
||||
|
||||
const { payload: { federated_room_id, federated_user_id } } = e; |
||||
|
||||
// Load the federated room
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, federated_room_id); |
||||
|
||||
// Load the user who left
|
||||
const federatedUser = FederatedUser.loadByFederationId(localPeerDomain, federated_user_id); |
||||
const localUser = federatedUser.getLocalUser(); |
||||
|
||||
// Callback management
|
||||
peerClient.addCallbackToSkip('beforeLeaveRoom', federatedUser.getFederationId()); |
||||
|
||||
// Remove the user from the room
|
||||
removeUserFromRoom(federatedRoom.room._id, localUser); |
||||
|
||||
// Load federated users
|
||||
federatedRoom.loadUsers(); |
||||
|
||||
// Refresh room's federation
|
||||
federatedRoom.refreshFederation(); |
||||
} |
||||
|
||||
handleUserRemovedEvent(e) { |
||||
this.log('handleUserRemovedEvent'); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this.config; |
||||
|
||||
const { payload: { federated_room_id, federated_user_id, federated_removed_by_user_id } } = e; |
||||
|
||||
// Load the federated room
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, federated_room_id); |
||||
|
||||
// Load the user who left
|
||||
const federatedUser = FederatedUser.loadByFederationId(localPeerDomain, federated_user_id); |
||||
const localUser = federatedUser.getLocalUser(); |
||||
|
||||
// Load the user who removed
|
||||
const federatedUserWhoRemoved = FederatedUser.loadByFederationId(localPeerDomain, federated_removed_by_user_id); |
||||
const localUserWhoRemoved = federatedUserWhoRemoved.getLocalUser(); |
||||
|
||||
// Callback management
|
||||
peerClient.addCallbackToSkip('beforeRemoveFromRoom', federatedUser.getFederationId()); |
||||
|
||||
// Remove the user from the room
|
||||
removeUserFromRoom(federatedRoom.room._id, localUser, { byUser: localUserWhoRemoved }); |
||||
|
||||
// Load federated users
|
||||
federatedRoom.loadUsers(); |
||||
|
||||
// Refresh room's federation
|
||||
federatedRoom.refreshFederation(); |
||||
} |
||||
|
||||
handleUserMutedEvent(e) { |
||||
this.log('handleUserMutedEvent'); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this.config; |
||||
|
||||
const { payload: { federated_room_id, federated_user_id } } = e; |
||||
// const { payload: { federated_room_id, federated_user_id, federated_muted_by_user_id } } = e;
|
||||
|
||||
// Load the federated room
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, federated_room_id); |
||||
|
||||
// Load the user who left
|
||||
const federatedUser = FederatedUser.loadByFederationId(localPeerDomain, federated_user_id); |
||||
const localUser = federatedUser.getLocalUser(); |
||||
|
||||
// // Load the user who muted
|
||||
// const federatedUserWhoMuted = FederatedUser.loadByFederationId(localPeerDomain, federated_muted_by_user_id);
|
||||
// const localUserWhoMuted = federatedUserWhoMuted.getLocalUser();
|
||||
|
||||
// Mute user
|
||||
Rooms.muteUsernameByRoomId(federatedRoom.room._id, localUser.username); |
||||
|
||||
// TODO: should we create a message?
|
||||
} |
||||
|
||||
handleUserUnmutedEvent(e) { |
||||
this.log('handleUserUnmutedEvent'); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this.config; |
||||
|
||||
const { payload: { federated_room_id, federated_user_id } } = e; |
||||
// const { payload: { federated_room_id, federated_user_id, federated_unmuted_by_user_id } } = e;
|
||||
|
||||
// Load the federated room
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, federated_room_id); |
||||
|
||||
// Load the user who left
|
||||
const federatedUser = FederatedUser.loadByFederationId(localPeerDomain, federated_user_id); |
||||
const localUser = federatedUser.getLocalUser(); |
||||
|
||||
// // Load the user who muted
|
||||
// const federatedUserWhoUnmuted = FederatedUser.loadByFederationId(localPeerDomain, federated_unmuted_by_user_id);
|
||||
// const localUserWhoUnmuted = federatedUserWhoUnmuted.getLocalUser();
|
||||
|
||||
// Unmute user
|
||||
Rooms.unmuteUsernameByRoomId(federatedRoom.room._id, localUser.username); |
||||
|
||||
// TODO: should we create a message?
|
||||
} |
||||
|
||||
handleMessageCreatedEvent(e) { |
||||
this.log('handleMessageCreatedEvent'); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this.config; |
||||
|
||||
const { payload: { message } } = e; |
||||
|
||||
// Load the federated message
|
||||
const federatedMessage = new FederatedMessage(localPeerDomain, message); |
||||
|
||||
// Callback management
|
||||
peerClient.addCallbackToSkip('afterSaveMessage', federatedMessage.getFederationId()); |
||||
|
||||
// Create the federated message
|
||||
federatedMessage.create(); |
||||
} |
||||
|
||||
handleMessageUpdatedEvent(e) { |
||||
this.log('handleMessageUpdatedEvent'); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this.config; |
||||
|
||||
const { payload: { message, federated_user_id } } = e; |
||||
|
||||
// Load the federated message
|
||||
const federatedMessage = new FederatedMessage(localPeerDomain, message); |
||||
|
||||
// Load the federated user
|
||||
const federatedUser = FederatedUser.loadByFederationId(localPeerDomain, federated_user_id); |
||||
|
||||
// Callback management
|
||||
peerClient.addCallbackToSkip('afterSaveMessage', federatedMessage.getFederationId()); |
||||
|
||||
// Update the federated message
|
||||
federatedMessage.update(federatedUser); |
||||
} |
||||
|
||||
handleMessageDeletedEvent(e) { |
||||
this.log('handleMessageDeletedEvent'); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this.config; |
||||
|
||||
const { payload: { federated_message_id } } = e; |
||||
|
||||
const federatedMessage = FederatedMessage.loadByFederationId(localPeerDomain, federated_message_id); |
||||
|
||||
// Load the federated message
|
||||
const localMessage = federatedMessage.getLocalMessage(); |
||||
|
||||
// Load the author
|
||||
const localAuthor = federatedMessage.federatedAuthor.getLocalUser(); |
||||
|
||||
// Callback management
|
||||
peerClient.addCallbackToSkip('afterDeleteMessage', federatedMessage.getFederationId()); |
||||
|
||||
// Create the federated message
|
||||
deleteMessage(localMessage, localAuthor); |
||||
} |
||||
|
||||
handleMessagesReadEvent(e) { |
||||
this.log('handleMessagesReadEvent'); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this.config; |
||||
|
||||
const { payload: { federated_room_id, federated_user_id } } = e; |
||||
|
||||
// Load the federated room
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, federated_room_id); |
||||
|
||||
peerClient.addCallbackToSkip('afterReadMessages', federatedRoom.getFederationId()); |
||||
|
||||
// Load the user who left
|
||||
const federatedUser = FederatedUser.loadByFederationId(localPeerDomain, federated_user_id); |
||||
const localUser = federatedUser.getLocalUser(); |
||||
|
||||
// Mark the messages as read
|
||||
// TODO: move below calls to an exported function
|
||||
const userSubscription = Subscriptions.findOneByRoomIdAndUserId(federatedRoom.room._id, localUser._id, { fields: { ls: 1 } }); |
||||
Subscriptions.setAsReadByRoomIdAndUserId(federatedRoom.room._id, localUser._id); |
||||
|
||||
callbacks.run('afterReadMessages', federatedRoom.room._id, { userId: localUser._id, lastSeen: userSubscription.ls }); |
||||
} |
||||
|
||||
handleMessagesSetReactionEvent(e) { |
||||
this.log('handleMessagesSetReactionEvent'); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this.config; |
||||
|
||||
const { payload: { federated_room_id, federated_message_id, federated_user_id, reaction, shouldReact } } = e; |
||||
|
||||
// Load the federated room
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, federated_room_id); |
||||
const localRoom = federatedRoom.getLocalRoom(); |
||||
|
||||
// Load the user who reacted
|
||||
const federatedUser = FederatedUser.loadByFederationId(localPeerDomain, federated_user_id); |
||||
const localUser = federatedUser.getLocalUser(); |
||||
|
||||
// Load the message
|
||||
const federatedMessage = FederatedMessage.loadByFederationId(localPeerDomain, federated_message_id); |
||||
const localMessage = federatedMessage.getLocalMessage(); |
||||
|
||||
// Callback management
|
||||
peerClient.addCallbackToSkip('afterSetReaction', federatedMessage.getFederationId()); |
||||
|
||||
// Set message reaction
|
||||
setReaction(localRoom, localUser, localMessage, reaction, shouldReact); |
||||
} |
||||
|
||||
handleMessagesUnsetReactionEvent(e) { |
||||
this.log('handleMessagesUnsetReactionEvent'); |
||||
|
||||
const { peer: { domain: localPeerDomain } } = this.config; |
||||
|
||||
const { payload: { federated_room_id, federated_message_id, federated_user_id, reaction, shouldReact } } = e; |
||||
|
||||
// Load the federated room
|
||||
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, federated_room_id); |
||||
const localRoom = federatedRoom.getLocalRoom(); |
||||
|
||||
// Load the user who reacted
|
||||
const federatedUser = FederatedUser.loadByFederationId(localPeerDomain, federated_user_id); |
||||
const localUser = federatedUser.getLocalUser(); |
||||
|
||||
// Load the message
|
||||
const federatedMessage = FederatedMessage.loadByFederationId(localPeerDomain, federated_message_id); |
||||
const localMessage = federatedMessage.getLocalMessage(); |
||||
|
||||
// Callback management
|
||||
peerClient.addCallbackToSkip('afterUnsetReaction', federatedMessage.getFederationId()); |
||||
|
||||
// Unset message reaction
|
||||
setReaction(localRoom, localUser, localMessage, reaction, shouldReact); |
||||
} |
||||
} |
||||
|
||||
export default new PeerServer(); |
||||
@ -0,0 +1,106 @@ |
||||
import { API } from 'meteor/rocketchat:api'; |
||||
import { FederationKeys } from 'meteor/rocketchat:models'; |
||||
|
||||
import { Federation } from '../../main'; |
||||
|
||||
import peerDNS from '../../peerDNS'; |
||||
import peerServer from '../peerServer'; |
||||
|
||||
API.v1.addRoute('federation.events', { authRequired: false }, { |
||||
post() { |
||||
if (!peerServer.enabled) { |
||||
return API.v1.failure('Not found'); |
||||
} |
||||
|
||||
if (!this.bodyParams.payload) { |
||||
return API.v1.failure('Payload was not sent'); |
||||
} |
||||
|
||||
if (!this.request.headers['x-federation-domain']) { |
||||
return API.v1.failure('Cannot handle that request'); |
||||
} |
||||
|
||||
const remotePeerDomain = this.request.headers['x-federation-domain']; |
||||
|
||||
const peer = peerDNS.searchPeer(remotePeerDomain); |
||||
|
||||
if (!peer) { |
||||
return API.v1.failure('Could not find valid peer'); |
||||
} |
||||
|
||||
const payloadBuffer = Buffer.from(this.bodyParams.payload.data); |
||||
|
||||
// Decrypt with the peer's public key
|
||||
let payload = FederationKeys.loadKey(peer.public_key, 'public').decryptPublic(payloadBuffer); |
||||
|
||||
// Decrypt with the local private key
|
||||
payload = Federation.privateKey.decrypt(payload); |
||||
|
||||
// Get the event
|
||||
const { event: e } = JSON.parse(payload.toString()); |
||||
|
||||
if (!e) { |
||||
return API.v1.failure('Event was not sent'); |
||||
} |
||||
|
||||
peerServer.log(`Received event:${ e.t }`); |
||||
|
||||
try { |
||||
switch (e.t) { |
||||
case 'drc': |
||||
peerServer.handleDirectRoomCreatedEvent(e); |
||||
break; |
||||
case 'roc': |
||||
peerServer.handleRoomCreatedEvent(e); |
||||
break; |
||||
case 'usj': |
||||
peerServer.handleUserJoinedEvent(e); |
||||
break; |
||||
case 'usa': |
||||
peerServer.handleUserAddedEvent(e); |
||||
break; |
||||
case 'usl': |
||||
peerServer.handleUserLeftEvent(e); |
||||
break; |
||||
case 'usr': |
||||
peerServer.handleUserRemovedEvent(e); |
||||
break; |
||||
case 'usm': |
||||
peerServer.handleUserMutedEvent(e); |
||||
break; |
||||
case 'usu': |
||||
peerServer.handleUserUnmutedEvent(e); |
||||
break; |
||||
case 'msc': |
||||
peerServer.handleMessageCreatedEvent(e); |
||||
break; |
||||
case 'msu': |
||||
peerServer.handleMessageUpdatedEvent(e); |
||||
break; |
||||
case 'msd': |
||||
peerServer.handleMessageDeletedEvent(e); |
||||
break; |
||||
case 'msr': |
||||
peerServer.handleMessagesReadEvent(e); |
||||
break; |
||||
case 'mrs': |
||||
peerServer.handleMessagesSetReactionEvent(e); |
||||
break; |
||||
case 'mru': |
||||
peerServer.handleMessagesUnsetReactionEvent(e); |
||||
break; |
||||
default: |
||||
throw new Error(`Invalid event:${ e.t }`); |
||||
} |
||||
|
||||
peerServer.log('Success, responding...'); |
||||
|
||||
// Respond
|
||||
return API.v1.success(); |
||||
} catch (err) { |
||||
peerServer.log(`Error handling event:${ e.t } - ${ err.toString() }`); |
||||
|
||||
return API.v1.failure(`Error handling event:${ e.t } - ${ err.toString() }`, err.error || 'unknown-error'); |
||||
} |
||||
}, |
||||
}); |
||||
@ -0,0 +1,28 @@ |
||||
import { Meteor } from 'meteor/meteor'; |
||||
import { API } from 'meteor/rocketchat:api'; |
||||
import { Uploads } from 'meteor/rocketchat:models'; |
||||
import { FileUpload } from 'meteor/rocketchat:file-upload'; |
||||
|
||||
import peerServer from '../peerServer'; |
||||
|
||||
API.v1.addRoute('federation.uploads', { authRequired: false }, { |
||||
get() { |
||||
if (!peerServer.enabled) { |
||||
return API.v1.failure('Not found'); |
||||
} |
||||
|
||||
const { upload_id } = this.requestParams(); |
||||
|
||||
const upload = Uploads.findOneById(upload_id); |
||||
|
||||
if (!upload) { |
||||
return API.v1.failure('There is no such file in this server'); |
||||
} |
||||
|
||||
const getFileBuffer = Meteor.wrapAsync(FileUpload.getBuffer, FileUpload); |
||||
|
||||
const buffer = getFileBuffer(upload); |
||||
|
||||
return API.v1.success({ upload, buffer }); |
||||
}, |
||||
}); |
||||
@ -0,0 +1,49 @@ |
||||
import { API } from 'meteor/rocketchat:api'; |
||||
import { Users } from 'meteor/rocketchat:models'; |
||||
|
||||
import { FederatedUser } from '../../federatedResources'; |
||||
import peerServer from '../peerServer'; |
||||
|
||||
API.v1.addRoute('federation.users', { authRequired: false }, { |
||||
get() { |
||||
if (!peerServer.enabled) { |
||||
return API.v1.failure('Not found'); |
||||
} |
||||
|
||||
const { peer: { domain: localPeerDomain } } = peerServer.config; |
||||
|
||||
const { username, domain, emailOnly } = this.requestParams(); |
||||
|
||||
const email = `${ username }@${ domain }`; |
||||
|
||||
peerServer.log(`[users] Trying to find user by username:${ username } and email:${ email }`); |
||||
|
||||
const query = { |
||||
type: 'user', |
||||
}; |
||||
|
||||
if (emailOnly === 'true') { |
||||
query['emails.address'] = email; |
||||
} else { |
||||
query.$or = [ |
||||
{ name: username }, |
||||
{ username }, |
||||
{ 'emails.address': email }, |
||||
]; |
||||
} |
||||
|
||||
const users = Users.find(query, { fields: { services: 0, roles: 0 } }).fetch(); |
||||
|
||||
if (!users.length) { |
||||
return API.v1.failure('There is no such user in this server'); |
||||
} |
||||
|
||||
const federatedUsers = []; |
||||
|
||||
for (const user of users) { |
||||
federatedUsers.push(new FederatedUser(localPeerDomain, user)); |
||||
} |
||||
|
||||
return API.v1.success({ federatedUsers }); |
||||
}, |
||||
}); |
||||
@ -0,0 +1,17 @@ |
||||
import { Settings } from 'meteor/rocketchat:models'; |
||||
|
||||
let nextStatus; |
||||
|
||||
export function updateStatus(status) { |
||||
Settings.updateValueById('FEDERATION_Status', nextStatus || status); |
||||
|
||||
nextStatus = null; |
||||
} |
||||
|
||||
export function updateNextStatusTo(status) { |
||||
nextStatus = status; |
||||
} |
||||
|
||||
export function updateEnabled(enabled) { |
||||
Settings.updateValueById('FEDERATION_Enabled', enabled); |
||||
} |
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,13 @@ |
||||
import { Base } from './_Base'; |
||||
|
||||
class FederationDNSCacheModel extends Base { |
||||
constructor() { |
||||
super('federation_dns_cache'); |
||||
} |
||||
|
||||
findOneByDomain(domain) { |
||||
return this.findOne({ domain }); |
||||
} |
||||
} |
||||
|
||||
export const FederationDNSCache = new FederationDNSCacheModel(); |
||||
@ -0,0 +1,255 @@ |
||||
import { Base } from './_Base'; |
||||
|
||||
const normalizePeers = (basePeers, options) => { |
||||
const { peers: sentPeers, skipPeers } = options; |
||||
|
||||
let peers = sentPeers || basePeers || []; |
||||
|
||||
if (skipPeers) { |
||||
peers = peers.filter((p) => skipPeers.indexOf(p) === -1); |
||||
} |
||||
|
||||
return peers; |
||||
}; |
||||
|
||||
//
|
||||
// We should create a time to live index in this table to remove fulfilled events
|
||||
//
|
||||
class FederationEventsModel extends Base { |
||||
constructor() { |
||||
super('federation_events'); |
||||
} |
||||
|
||||
// Sometimes events errored but the error is final
|
||||
setEventAsErrored(e, error, fulfilled = false) { |
||||
this.update({ _id: e._id }, { |
||||
$set: { |
||||
fulfilled, |
||||
lastAttemptAt: new Date(), |
||||
error, |
||||
}, |
||||
}); |
||||
} |
||||
|
||||
setEventAsFullfilled(e) { |
||||
this.update({ _id: e._id }, { |
||||
$set: { fulfilled: true }, |
||||
$unset: { error: 1 }, |
||||
}); |
||||
} |
||||
|
||||
createEvent(type, payload, peer) { |
||||
const record = { |
||||
t: type, |
||||
ts: new Date(), |
||||
fulfilled: false, |
||||
payload, |
||||
peer, |
||||
}; |
||||
|
||||
record._id = this.insert(record); |
||||
|
||||
this.emit('createEvent', record); |
||||
|
||||
return record; |
||||
} |
||||
|
||||
createEventForPeers(type, payload, peers) { |
||||
const records = []; |
||||
|
||||
for (const peer of peers) { |
||||
const record = this.createEvent(type, payload, peer); |
||||
|
||||
records.push(record); |
||||
} |
||||
|
||||
return records; |
||||
} |
||||
|
||||
// Create a `directRoomCreated(drc)` event
|
||||
directRoomCreated(federatedRoom, options = {}) { |
||||
const peers = normalizePeers(federatedRoom.getPeers(), options); |
||||
|
||||
const payload = { |
||||
room: federatedRoom.getRoom(), |
||||
owner: federatedRoom.getOwner(), |
||||
users: federatedRoom.getUsers(), |
||||
}; |
||||
|
||||
return this.createEventForPeers('drc', payload, peers); |
||||
} |
||||
|
||||
// Create a `roomCreated(roc)` event
|
||||
roomCreated(federatedRoom, options = {}) { |
||||
const peers = normalizePeers(federatedRoom.getPeers(), options); |
||||
|
||||
const payload = { |
||||
room: federatedRoom.getRoom(), |
||||
owner: federatedRoom.getOwner(), |
||||
users: federatedRoom.getUsers(), |
||||
}; |
||||
|
||||
return this.createEventForPeers('roc', payload, peers); |
||||
} |
||||
|
||||
// Create a `userJoined(usj)` event
|
||||
userJoined(federatedRoom, federatedUser, options = {}) { |
||||
const peers = normalizePeers(federatedRoom.getPeers(), options); |
||||
|
||||
const payload = { |
||||
federated_room_id: federatedRoom.getFederationId(), |
||||
user: federatedUser.getUser(), |
||||
}; |
||||
|
||||
return this.createEventForPeers('usj', payload, peers); |
||||
} |
||||
|
||||
// Create a `userAdded(usa)` event
|
||||
userAdded(federatedRoom, federatedUser, federatedInviter, options = {}) { |
||||
const peers = normalizePeers(federatedRoom.getPeers(), options); |
||||
|
||||
const payload = { |
||||
federated_room_id: federatedRoom.getFederationId(), |
||||
federated_inviter_id: federatedInviter.getFederationId(), |
||||
user: federatedUser.getUser(), |
||||
}; |
||||
|
||||
return this.createEventForPeers('usa', payload, peers); |
||||
} |
||||
|
||||
// Create a `userLeft(usl)` event
|
||||
userLeft(federatedRoom, federatedUser, options = {}) { |
||||
const peers = normalizePeers(federatedRoom.getPeers(), options); |
||||
|
||||
const payload = { |
||||
federated_room_id: federatedRoom.getFederationId(), |
||||
federated_user_id: federatedUser.getFederationId(), |
||||
}; |
||||
|
||||
return this.createEventForPeers('usl', payload, peers); |
||||
} |
||||
|
||||
// Create a `userRemoved(usr)` event
|
||||
userRemoved(federatedRoom, federatedUser, federatedRemovedByUser, options = {}) { |
||||
const peers = normalizePeers(federatedRoom.getPeers(), options); |
||||
|
||||
const payload = { |
||||
federated_room_id: federatedRoom.getFederationId(), |
||||
federated_user_id: federatedUser.getFederationId(), |
||||
federated_removed_by_user_id: federatedRemovedByUser.getFederationId(), |
||||
}; |
||||
|
||||
return this.createEventForPeers('usr', payload, peers); |
||||
} |
||||
|
||||
// Create a `userMuted(usm)` event
|
||||
userMuted(federatedRoom, federatedUser, federatedMutedByUser, options = {}) { |
||||
const peers = normalizePeers(federatedRoom.getPeers(), options); |
||||
|
||||
const payload = { |
||||
federated_room_id: federatedRoom.getFederationId(), |
||||
federated_user_id: federatedUser.getFederationId(), |
||||
federated_muted_by_user_id: federatedMutedByUser.getFederationId(), |
||||
}; |
||||
|
||||
return this.createEventForPeers('usm', payload, peers); |
||||
} |
||||
|
||||
// Create a `userUnmuted(usu)` event
|
||||
userUnmuted(federatedRoom, federatedUser, federatedUnmutedByUser, options = {}) { |
||||
const peers = normalizePeers(federatedRoom.getPeers(), options); |
||||
|
||||
const payload = { |
||||
federated_room_id: federatedRoom.getFederationId(), |
||||
federated_user_id: federatedUser.getFederationId(), |
||||
federated_unmuted_by_user_id: federatedUnmutedByUser.getFederationId(), |
||||
}; |
||||
|
||||
return this.createEventForPeers('usu', payload, peers); |
||||
} |
||||
|
||||
// Create a `messageCreated(msc)` event
|
||||
messageCreated(federatedRoom, federatedMessage, options = {}) { |
||||
const peers = normalizePeers(federatedRoom.getPeers(), options); |
||||
|
||||
const payload = { |
||||
message: federatedMessage.getMessage(), |
||||
}; |
||||
|
||||
return this.createEventForPeers('msc', payload, peers); |
||||
} |
||||
|
||||
// Create a `messageUpdated(msu)` event
|
||||
messageUpdated(federatedRoom, federatedMessage, federatedUser, options = {}) { |
||||
const peers = normalizePeers(federatedRoom.getPeers(), options); |
||||
|
||||
const payload = { |
||||
message: federatedMessage.getMessage(), |
||||
federated_user_id: federatedUser.getFederationId(), |
||||
}; |
||||
|
||||
return this.createEventForPeers('msu', payload, peers); |
||||
} |
||||
|
||||
// Create a `deleteMessage(msd)` event
|
||||
messageDeleted(federatedRoom, federatedMessage, options = {}) { |
||||
const peers = normalizePeers(federatedRoom.getPeers(), options); |
||||
|
||||
const payload = { |
||||
federated_message_id: federatedMessage.getFederationId(), |
||||
}; |
||||
|
||||
return this.createEventForPeers('msd', payload, peers); |
||||
} |
||||
|
||||
// Create a `messagesRead(msr)` event
|
||||
messagesRead(federatedRoom, federatedUser, options = {}) { |
||||
const peers = normalizePeers(federatedRoom.getPeers(), options); |
||||
|
||||
const payload = { |
||||
federated_room_id: federatedRoom.getFederationId(), |
||||
federated_user_id: federatedUser.getFederationId(), |
||||
}; |
||||
|
||||
return this.createEventForPeers('msr', payload, peers); |
||||
} |
||||
|
||||
// Create a `messagesSetReaction(mrs)` event
|
||||
messagesSetReaction(federatedRoom, federatedMessage, federatedUser, reaction, shouldReact, options = {}) { |
||||
const peers = normalizePeers(federatedRoom.getPeers(), options); |
||||
|
||||
const payload = { |
||||
federated_room_id: federatedRoom.getFederationId(), |
||||
federated_message_id: federatedMessage.getFederationId(), |
||||
federated_user_id: federatedUser.getFederationId(), |
||||
reaction, |
||||
shouldReact, |
||||
}; |
||||
|
||||
return this.createEventForPeers('mrs', payload, peers); |
||||
} |
||||
|
||||
// Create a `messagesUnsetReaction(mru)` event
|
||||
messagesUnsetReaction(federatedRoom, federatedMessage, federatedUser, reaction, shouldReact, options = {}) { |
||||
const peers = normalizePeers(federatedRoom.getPeers(), options); |
||||
|
||||
const payload = { |
||||
federated_room_id: federatedRoom.getFederationId(), |
||||
federated_message_id: federatedMessage.getFederationId(), |
||||
federated_user_id: federatedUser.getFederationId(), |
||||
reaction, |
||||
shouldReact, |
||||
}; |
||||
|
||||
return this.createEventForPeers('mru', payload, peers); |
||||
} |
||||
|
||||
// Get all unfulfilled events
|
||||
getUnfulfilled() { |
||||
return this.find({ fulfilled: false }, { sort: { ts: 1 } }).fetch(); |
||||
} |
||||
|
||||
|
||||
} |
||||
|
||||
export const FederationEvents = new FederationEventsModel(); |
||||
@ -0,0 +1,69 @@ |
||||
import NodeRSA from 'node-rsa'; |
||||
import uuid from 'uuid/v4'; |
||||
|
||||
import { Base } from './_Base'; |
||||
|
||||
class FederationKeysModel extends Base { |
||||
constructor() { |
||||
super('federation_keys'); |
||||
} |
||||
|
||||
getKey(type) { |
||||
const keyResource = this.findOne({ type }); |
||||
|
||||
if (!keyResource) { return null; } |
||||
|
||||
return keyResource.key; |
||||
} |
||||
|
||||
loadKey(keyData, type) { |
||||
return new NodeRSA(keyData, `pkcs8-${ type }-pem`); |
||||
} |
||||
|
||||
generateKeys() { |
||||
const key = new NodeRSA({ b: 512 }); |
||||
|
||||
key.generateKeyPair(); |
||||
|
||||
this.update({ type: 'private' }, { type: 'private', key: key.exportKey('pkcs8-private-pem').replace(/\n|\r/g, '') }, { upsert: true }); |
||||
|
||||
this.update({ type: 'public' }, { type: 'public', key: key.exportKey('pkcs8-public-pem').replace(/\n|\r/g, '') }, { upsert: true }); |
||||
|
||||
return { |
||||
privateKey: this.getPrivateKey(), |
||||
publicKey: this.getPublicKey(), |
||||
}; |
||||
} |
||||
|
||||
generateUniqueId() { |
||||
const uniqueId = uuid(); |
||||
|
||||
this.update({ type: 'unique' }, { type: 'unique', key: uniqueId }, { upsert: true }); |
||||
} |
||||
|
||||
getUniqueId() { |
||||
return (this.findOne({ type: 'unique' }) || {}).key; |
||||
} |
||||
|
||||
getPrivateKey() { |
||||
const keyData = this.getKey('private'); |
||||
|
||||
return keyData && this.loadKey(keyData, 'private'); |
||||
} |
||||
|
||||
getPrivateKeyString() { |
||||
return this.getKey('private'); |
||||
} |
||||
|
||||
getPublicKey() { |
||||
const keyData = this.getKey('public'); |
||||
|
||||
return keyData && this.loadKey(keyData, 'public'); |
||||
} |
||||
|
||||
getPublicKeyString() { |
||||
return this.getKey('public'); |
||||
} |
||||
} |
||||
|
||||
export const FederationKeys = new FederationKeysModel(); |
||||
@ -1 +1 @@ |
||||
import './setReaction'; |
||||
export { setReaction } from './setReaction'; |
||||
|
||||
Loading…
Reference in new issue