diff --git a/.changeset/mean-guests-love.md b/.changeset/mean-guests-love.md new file mode 100644 index 00000000000..783319ae1ba --- /dev/null +++ b/.changeset/mean-guests-love.md @@ -0,0 +1,11 @@ +--- +'@rocket.chat/meteor': minor +--- + +feat: *Enterprise* Add support for different transporters to connect multiple monolith instances. + +To use that, you can use the `TRANSPORTER` env var adding "monolith+" to the transporter value. To use NATS for example, your env var should be: + +```bash +export TRANSPORTER="monolith+nats://localhost:4222" +``` diff --git a/apps/meteor/ee/server/local-services/instance/getTransporter.ts b/apps/meteor/ee/server/local-services/instance/getTransporter.ts new file mode 100644 index 00000000000..c075dd2ad35 --- /dev/null +++ b/apps/meteor/ee/server/local-services/instance/getTransporter.ts @@ -0,0 +1,15 @@ +export function getTransporter({ transporter, port }: { transporter?: string; port?: string } = {}) { + if (transporter) { + if (!transporter.match(/^(?:monolith\+)/)) { + throw new Error('invalid transporter'); + } + + const [, ...url] = transporter.split('+'); + return url.join(''); + } + + return { + port: port ? port.trim() : 0, + udpDiscovery: false, + }; +} diff --git a/apps/meteor/ee/server/local-services/instance/service.ts b/apps/meteor/ee/server/local-services/instance/service.ts index 46dbcaa5194..c48481bc642 100644 --- a/apps/meteor/ee/server/local-services/instance/service.ts +++ b/apps/meteor/ee/server/local-services/instance/service.ts @@ -1,19 +1,24 @@ import os from 'os'; import type { BrokerNode } from 'moleculer'; -import { ServiceBroker } from 'moleculer'; +import { ServiceBroker, Transporters } from 'moleculer'; import { License, ServiceClassInternal } from '@rocket.chat/core-services'; import { InstanceStatus as InstanceStatusRaw } from '@rocket.chat/models'; import { InstanceStatus } from '@rocket.chat/instance-status'; import { StreamerCentral } from '../../../../server/modules/streamer/streamer.module'; import type { IInstanceService } from '../../sdk/types/IInstanceService'; +import { getTransporter } from './getTransporter'; export class InstanceService extends ServiceClassInternal implements IInstanceService { protected name = 'instance'; private broadcastStarted = false; + private transporter: Transporters.TCP | Transporters.NATS; + + private isTransporterTCP = true; + private broker: ServiceBroker; private troubleshootDisableInstanceBroadcast = false; @@ -21,15 +26,25 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe constructor() { super(); - this.onEvent('watch.instanceStatus', async ({ clientAction, data }): Promise => { - if (clientAction === 'removed') { - return; - } + const tx = getTransporter({ transporter: process.env.TRANSPORTER, port: process.env.TCP_PORT }); + if (typeof tx === 'string') { + this.transporter = new Transporters.NATS({ url: tx }); + this.isTransporterTCP = false; + } else { + this.transporter = new Transporters.TCP(tx); + } - if (clientAction === 'inserted' && data?.extraInformation?.port) { - this.connectNode(data); - } - }); + if (this.isTransporterTCP) { + this.onEvent('watch.instanceStatus', async ({ clientAction, data }): Promise => { + if (clientAction === 'removed') { + return; + } + + if (clientAction === 'inserted' && data?.extraInformation?.tcpPort) { + this.connectNode(data); + } + }); + } this.onEvent('license.module', async ({ module, valid }) => { if (module === 'scalability' && valid) { @@ -60,17 +75,9 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe } async created() { - const port = process.env.TCP_PORT ? String(process.env.TCP_PORT).trim() : 0; - this.broker = new ServiceBroker({ nodeID: InstanceStatus.id(), - transporter: { - type: 'TCP', - options: { - port, - udpDiscovery: false, - }, - }, + transporter: this.transporter, }); this.broker.createService({ @@ -135,18 +142,20 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe StreamerCentral.on('broadcast', this.sendBroadcast.bind(this)); - await InstanceStatusRaw.find( - { - 'extraInformation.tcpPort': { - $exists: true, + if (this.isTransporterTCP) { + await InstanceStatusRaw.find( + { + 'extraInformation.tcpPort': { + $exists: true, + }, }, - }, - { - sort: { - _createdAt: -1, + { + sort: { + _createdAt: -1, + }, }, - }, - ).forEach(this.connectNode.bind(this)); + ).forEach(this.connectNode.bind(this)); + } } private connectNode(record: any) { diff --git a/apps/meteor/ee/tests/unit/apps/meteor/ee/server/local-services/instance/getTransporter.spec.ts b/apps/meteor/ee/tests/unit/apps/meteor/ee/server/local-services/instance/getTransporter.spec.ts new file mode 100644 index 00000000000..0fc3707cd8c --- /dev/null +++ b/apps/meteor/ee/tests/unit/apps/meteor/ee/server/local-services/instance/getTransporter.spec.ts @@ -0,0 +1,25 @@ +import { expect } from 'chai'; + +import { getTransporter } from '../../../../../../../../server/local-services/instance/getTransporter'; + +describe('getTransporter', () => { + it('should return TCP with port 0 by default', () => { + expect(getTransporter()).to.deep.equal({ port: 0, udpDiscovery: false }); + }); + + it('should return TCP with port set via env var', () => { + expect(getTransporter({ port: '1234' })).to.deep.equal({ port: '1234', udpDiscovery: false }); + + expect(getTransporter({ port: ' 1234' })).to.deep.equal({ port: '1234', udpDiscovery: false }); + + expect(getTransporter({ port: ' 1234 ' })).to.deep.equal({ port: '1234', udpDiscovery: false }); + }); + + it('should throw if transporter set incorrectly', () => { + expect(() => getTransporter({ transporter: 'something' })).to.throw('invalid transporter'); + }); + + it('should return transporter if set correctly', () => { + expect(getTransporter({ transporter: 'monolith+nats://address' })).to.equal('nats://address'); + }); +});