diff --git a/_templates/service/new/service.ejs.t b/_templates/service/new/service.ejs.t index b0880bd4753..ec307ecdce7 100644 --- a/_templates/service/new/service.ejs.t +++ b/_templates/service/new/service.ejs.t @@ -3,7 +3,7 @@ to: ee/apps/<%= name %>/src/service.ts --- import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services'; import { registerServiceModels } from '@rocket.chat/models'; -import { broker } from '@rocket.chat/network-broker'; +import { startBroker } from '@rocket.chat/network-broker'; import { startTracing } from '@rocket.chat/tracing'; import polka from 'polka'; @@ -16,7 +16,7 @@ const PORT = process.env.PORT || <%= h.random() %>; registerServiceModels(db, await getTrashCollection()); - api.setBroker(broker); + api.setBroker(startBroker()); // need to import service after models are registered const { <%= h.changeCase.pascalCase(name) %> } = await import('./<%= h.changeCase.pascalCase(name) %>'); diff --git a/apps/meteor/ee/server/startup/index.ts b/apps/meteor/ee/server/startup/index.ts index 51bb011828b..e295467e117 100644 --- a/apps/meteor/ee/server/startup/index.ts +++ b/apps/meteor/ee/server/startup/index.ts @@ -12,9 +12,9 @@ import { isRunningMs } from '../../../server/lib/isRunningMs'; export const registerEEBroker = async (): Promise => { // only starts network broker if running in micro services mode if (isRunningMs()) { - const { broker } = await import('@rocket.chat/network-broker'); + const { startBroker } = await import('@rocket.chat/network-broker'); - api.setBroker(broker); + api.setBroker(startBroker()); void api.start(); } else { require('./presence'); diff --git a/ee/apps/account-service/src/service.ts b/ee/apps/account-service/src/service.ts index 54ecd217091..470d9833136 100755 --- a/ee/apps/account-service/src/service.ts +++ b/ee/apps/account-service/src/service.ts @@ -1,6 +1,6 @@ import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services'; import { registerServiceModels } from '@rocket.chat/models'; -import { broker } from '@rocket.chat/network-broker'; +import { startBroker } from '@rocket.chat/network-broker'; import { startTracing } from '@rocket.chat/tracing'; import polka from 'polka'; @@ -13,7 +13,7 @@ const PORT = process.env.PORT || 3033; registerServiceModels(db, await getTrashCollection()); - api.setBroker(broker); + api.setBroker(startBroker()); // need to import service after models are registered const { Account } = await import('./Account'); diff --git a/ee/apps/authorization-service/src/service.ts b/ee/apps/authorization-service/src/service.ts index 0ed1e83177a..e09d87de6d2 100755 --- a/ee/apps/authorization-service/src/service.ts +++ b/ee/apps/authorization-service/src/service.ts @@ -1,6 +1,6 @@ import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services'; import { registerServiceModels } from '@rocket.chat/models'; -import { broker } from '@rocket.chat/network-broker'; +import { startBroker } from '@rocket.chat/network-broker'; import { startTracing } from '@rocket.chat/tracing'; import polka from 'polka'; @@ -13,7 +13,7 @@ const PORT = process.env.PORT || 3034; registerServiceModels(db, await getTrashCollection()); - api.setBroker(broker); + api.setBroker(startBroker()); // need to import service after models are registered const { Authorization } = await import('../../../../apps/meteor/server/services/authorization/service'); diff --git a/ee/apps/ddp-streamer/src/service.ts b/ee/apps/ddp-streamer/src/service.ts index 0afea31dc23..e0e430c7eeb 100755 --- a/ee/apps/ddp-streamer/src/service.ts +++ b/ee/apps/ddp-streamer/src/service.ts @@ -1,6 +1,9 @@ +import os from 'os'; + import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services'; +import { InstanceStatus } from '@rocket.chat/instance-status'; import { registerServiceModels } from '@rocket.chat/models'; -import { broker } from '@rocket.chat/network-broker'; +import { startBroker } from '@rocket.chat/network-broker'; import { startTracing } from '@rocket.chat/tracing'; (async () => { @@ -10,7 +13,11 @@ import { startTracing } from '@rocket.chat/tracing'; registerServiceModels(db, await getTrashCollection()); - api.setBroker(broker); + api.setBroker( + startBroker({ + nodeID: `${os.hostname().toLowerCase()}-${InstanceStatus.id()}`, + }), + ); // need to import service after models are registered const { NotificationsModule } = await import('../../../../apps/meteor/server/modules/notifications/notifications.module'); diff --git a/ee/apps/omnichannel-transcript/src/service.ts b/ee/apps/omnichannel-transcript/src/service.ts index 840da856b6d..689f710327c 100644 --- a/ee/apps/omnichannel-transcript/src/service.ts +++ b/ee/apps/omnichannel-transcript/src/service.ts @@ -1,7 +1,7 @@ import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services'; import { Logger } from '@rocket.chat/logger'; import { registerServiceModels } from '@rocket.chat/models'; -import { broker } from '@rocket.chat/network-broker'; +import { startBroker } from '@rocket.chat/network-broker'; import { startTracing } from '@rocket.chat/tracing'; import polka from 'polka'; @@ -14,7 +14,7 @@ const PORT = process.env.PORT || 3036; registerServiceModels(db, await getTrashCollection()); - api.setBroker(broker); + api.setBroker(startBroker()); // need to import service after models are registered const { OmnichannelTranscript } = await import('@rocket.chat/omnichannel-services'); diff --git a/ee/apps/presence-service/src/service.ts b/ee/apps/presence-service/src/service.ts index e87d659dac2..ab1c1ee4056 100755 --- a/ee/apps/presence-service/src/service.ts +++ b/ee/apps/presence-service/src/service.ts @@ -1,6 +1,6 @@ import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services'; import { registerServiceModels } from '@rocket.chat/models'; -import { broker } from '@rocket.chat/network-broker'; +import { startBroker } from '@rocket.chat/network-broker'; import { startTracing } from '@rocket.chat/tracing'; import polka from 'polka'; @@ -13,7 +13,7 @@ const PORT = process.env.PORT || 3031; registerServiceModels(db, await getTrashCollection()); - api.setBroker(broker); + api.setBroker(startBroker()); // need to import Presence service after models are registered const { Presence } = await import('@rocket.chat/presence'); diff --git a/ee/apps/queue-worker/src/service.ts b/ee/apps/queue-worker/src/service.ts index 69817f42a32..7895e78b37e 100644 --- a/ee/apps/queue-worker/src/service.ts +++ b/ee/apps/queue-worker/src/service.ts @@ -1,7 +1,7 @@ import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services'; import { Logger } from '@rocket.chat/logger'; import { registerServiceModels } from '@rocket.chat/models'; -import { broker } from '@rocket.chat/network-broker'; +import { startBroker } from '@rocket.chat/network-broker'; import { startTracing } from '@rocket.chat/tracing'; import polka from 'polka'; @@ -14,7 +14,7 @@ const PORT = process.env.PORT || 3038; registerServiceModels(db, await getTrashCollection()); - api.setBroker(broker); + api.setBroker(startBroker()); // need to import service after models are registeredpackagfe const { QueueWorker } = await import('@rocket.chat/omnichannel-services'); diff --git a/ee/apps/stream-hub-service/src/service.ts b/ee/apps/stream-hub-service/src/service.ts index 576e4e5e9be..12f3d4fe026 100755 --- a/ee/apps/stream-hub-service/src/service.ts +++ b/ee/apps/stream-hub-service/src/service.ts @@ -1,7 +1,7 @@ import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services'; import { Logger } from '@rocket.chat/logger'; import { DatabaseWatcher, registerServiceModels } from '@rocket.chat/models'; -import { broker } from '@rocket.chat/network-broker'; +import { startBroker } from '@rocket.chat/network-broker'; import { startTracing } from '@rocket.chat/tracing'; import polka from 'polka'; @@ -16,7 +16,7 @@ const PORT = process.env.PORT || 3035; registerServiceModels(db, await getTrashCollection()); - api.setBroker(broker); + api.setBroker(startBroker()); // TODO having to import Logger to pass as a param is a temporary solution. logger should come from the service (either from broker or api) const watcher = new DatabaseWatcher({ db, logger: Logger }); diff --git a/ee/packages/network-broker/src/index.ts b/ee/packages/network-broker/src/index.ts index 4424c084bda..2c6da09750e 100644 --- a/ee/packages/network-broker/src/index.ts +++ b/ee/packages/network-broker/src/index.ts @@ -1,5 +1,6 @@ import { isMeteorError, MeteorError } from '@rocket.chat/core-services'; import EJSON from 'ejson'; +import type Moleculer from 'moleculer'; import { Errors, Serializers, ServiceBroker } from 'moleculer'; import { pino } from 'pino'; @@ -70,82 +71,85 @@ class EJSONSerializer extends Base { } } -const network = new ServiceBroker({ - namespace: MS_NAMESPACE, - skipProcessEventRegistration: SKIP_PROCESS_EVENT_REGISTRATION === 'true', - transporter: TRANSPORTER, - metrics: { - enabled: MS_METRICS === 'true', - reporter: [ - { - type: 'Prometheus', - options: { - port: MS_METRICS_PORT, +export function startBroker(options: Moleculer.BrokerOptions = {}): NetworkBroker { + const network = new ServiceBroker({ + namespace: MS_NAMESPACE, + skipProcessEventRegistration: SKIP_PROCESS_EVENT_REGISTRATION === 'true', + transporter: TRANSPORTER, + metrics: { + enabled: MS_METRICS === 'true', + reporter: [ + { + type: 'Prometheus', + options: { + port: MS_METRICS_PORT, + }, }, - }, - ], - }, - cacher: CACHE, - serializer: SERIALIZER === 'EJSON' ? new EJSONSerializer() : SERIALIZER, - logger: { - type: 'Pino', - options: { - level: MOLECULER_LOG_LEVEL, - pino: { - options: { - timestamp: pino.stdTimeFunctions.isoTime, - ...(process.env.NODE_ENV !== 'production' - ? { - transport: { - target: 'pino-pretty', - options: { - colorize: true, + ], + }, + cacher: CACHE, + serializer: SERIALIZER === 'EJSON' ? new EJSONSerializer() : SERIALIZER, + logger: { + type: 'Pino', + options: { + level: MOLECULER_LOG_LEVEL, + pino: { + options: { + timestamp: pino.stdTimeFunctions.isoTime, + ...(process.env.NODE_ENV !== 'production' + ? { + transport: { + target: 'pino-pretty', + options: { + colorize: true, + }, }, - }, - } - : {}), + } + : {}), + }, }, }, }, - }, - registry: { - strategy: BALANCE_STRATEGY, - preferLocal: BALANCE_PREFER_LOCAL !== 'false', - }, - - requestTimeout: parseInt(REQUEST_TIMEOUT) * 1000, - retryPolicy: { - enabled: RETRY_ENABLED === 'true', - retries: parseInt(RETRY_RETRIES), - delay: parseInt(RETRY_DELAY), - maxDelay: parseInt(RETRY_MAX_DELAY), - factor: parseInt(RETRY_FACTOR), - check: (err: any): boolean => err && !!err.retryable, - }, - - maxCallLevel: 100, - heartbeatInterval: parseInt(HEARTBEAT_INTERVAL), - heartbeatTimeout: parseInt(HEARTBEAT_TIMEOUT), - - // circuitBreaker: { - // enabled: false, - // threshold: 0.5, - // windowTime: 60, - // minRequestCount: 20, - // halfOpenTime: 10 * 1000, - // check: (err: any): boolean => err && err.code >= 500, - // }, - - bulkhead: { - enabled: BULKHEAD_ENABLED === 'true', - concurrency: parseInt(BULKHEAD_CONCURRENCY), - maxQueueSize: parseInt(BULKHEAD_MAX_QUEUE_SIZE), - }, - - errorRegenerator: new CustomRegenerator(), - started(): void { - console.log('NetworkBroker started successfully.'); - }, -}); - -export const broker = new NetworkBroker(network); + registry: { + strategy: BALANCE_STRATEGY, + preferLocal: BALANCE_PREFER_LOCAL !== 'false', + }, + + requestTimeout: parseInt(REQUEST_TIMEOUT) * 1000, + retryPolicy: { + enabled: RETRY_ENABLED === 'true', + retries: parseInt(RETRY_RETRIES), + delay: parseInt(RETRY_DELAY), + maxDelay: parseInt(RETRY_MAX_DELAY), + factor: parseInt(RETRY_FACTOR), + check: (err: any): boolean => err && !!err.retryable, + }, + + maxCallLevel: 100, + heartbeatInterval: parseInt(HEARTBEAT_INTERVAL), + heartbeatTimeout: parseInt(HEARTBEAT_TIMEOUT), + + // circuitBreaker: { + // enabled: false, + // threshold: 0.5, + // windowTime: 60, + // minRequestCount: 20, + // halfOpenTime: 10 * 1000, + // check: (err: any): boolean => err && err.code >= 500, + // }, + + bulkhead: { + enabled: BULKHEAD_ENABLED === 'true', + concurrency: parseInt(BULKHEAD_CONCURRENCY), + maxQueueSize: parseInt(BULKHEAD_MAX_QUEUE_SIZE), + }, + + errorRegenerator: new CustomRegenerator(), + started(): void { + console.log('NetworkBroker started successfully.'); + }, + ...options, + }); + + return new NetworkBroker(network); +}