fix: ddp-streamer restart keeping opened connections (#35195)

pull/35137/head^2
Diego Sampaio 11 months ago committed by GitHub
parent f89741d174
commit a1d3faa0e2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      _templates/service/new/service.ejs.t
  2. 4
      apps/meteor/ee/server/startup/index.ts
  3. 4
      ee/apps/account-service/src/service.ts
  4. 4
      ee/apps/authorization-service/src/service.ts
  5. 11
      ee/apps/ddp-streamer/src/service.ts
  6. 4
      ee/apps/omnichannel-transcript/src/service.ts
  7. 4
      ee/apps/presence-service/src/service.ts
  8. 4
      ee/apps/queue-worker/src/service.ts
  9. 4
      ee/apps/stream-hub-service/src/service.ts
  10. 152
      ee/packages/network-broker/src/index.ts

@ -3,7 +3,7 @@ to: ee/apps/<%= name %>/src/service.ts
---
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
import { registerServiceModels } from '@rocket.chat/models';
import { broker } from '@rocket.chat/network-broker';
import { startBroker } from '@rocket.chat/network-broker';
import { startTracing } from '@rocket.chat/tracing';
import polka from 'polka';
@ -16,7 +16,7 @@ const PORT = process.env.PORT || <%= h.random() %>;
registerServiceModels(db, await getTrashCollection());
api.setBroker(broker);
api.setBroker(startBroker());
// need to import service after models are registered
const { <%= h.changeCase.pascalCase(name) %> } = await import('./<%= h.changeCase.pascalCase(name) %>');

@ -12,9 +12,9 @@ import { isRunningMs } from '../../../server/lib/isRunningMs';
export const registerEEBroker = async (): Promise<void> => {
// only starts network broker if running in micro services mode
if (isRunningMs()) {
const { broker } = await import('@rocket.chat/network-broker');
const { startBroker } = await import('@rocket.chat/network-broker');
api.setBroker(broker);
api.setBroker(startBroker());
void api.start();
} else {
require('./presence');

@ -1,6 +1,6 @@
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
import { registerServiceModels } from '@rocket.chat/models';
import { broker } from '@rocket.chat/network-broker';
import { startBroker } from '@rocket.chat/network-broker';
import { startTracing } from '@rocket.chat/tracing';
import polka from 'polka';
@ -13,7 +13,7 @@ const PORT = process.env.PORT || 3033;
registerServiceModels(db, await getTrashCollection());
api.setBroker(broker);
api.setBroker(startBroker());
// need to import service after models are registered
const { Account } = await import('./Account');

@ -1,6 +1,6 @@
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
import { registerServiceModels } from '@rocket.chat/models';
import { broker } from '@rocket.chat/network-broker';
import { startBroker } from '@rocket.chat/network-broker';
import { startTracing } from '@rocket.chat/tracing';
import polka from 'polka';
@ -13,7 +13,7 @@ const PORT = process.env.PORT || 3034;
registerServiceModels(db, await getTrashCollection());
api.setBroker(broker);
api.setBroker(startBroker());
// need to import service after models are registered
const { Authorization } = await import('../../../../apps/meteor/server/services/authorization/service');

@ -1,6 +1,9 @@
import os from 'os';
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
import { InstanceStatus } from '@rocket.chat/instance-status';
import { registerServiceModels } from '@rocket.chat/models';
import { broker } from '@rocket.chat/network-broker';
import { startBroker } from '@rocket.chat/network-broker';
import { startTracing } from '@rocket.chat/tracing';
(async () => {
@ -10,7 +13,11 @@ import { startTracing } from '@rocket.chat/tracing';
registerServiceModels(db, await getTrashCollection());
api.setBroker(broker);
api.setBroker(
startBroker({
nodeID: `${os.hostname().toLowerCase()}-${InstanceStatus.id()}`,
}),
);
// need to import service after models are registered
const { NotificationsModule } = await import('../../../../apps/meteor/server/modules/notifications/notifications.module');

@ -1,7 +1,7 @@
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
import { Logger } from '@rocket.chat/logger';
import { registerServiceModels } from '@rocket.chat/models';
import { broker } from '@rocket.chat/network-broker';
import { startBroker } from '@rocket.chat/network-broker';
import { startTracing } from '@rocket.chat/tracing';
import polka from 'polka';
@ -14,7 +14,7 @@ const PORT = process.env.PORT || 3036;
registerServiceModels(db, await getTrashCollection());
api.setBroker(broker);
api.setBroker(startBroker());
// need to import service after models are registered
const { OmnichannelTranscript } = await import('@rocket.chat/omnichannel-services');

@ -1,6 +1,6 @@
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
import { registerServiceModels } from '@rocket.chat/models';
import { broker } from '@rocket.chat/network-broker';
import { startBroker } from '@rocket.chat/network-broker';
import { startTracing } from '@rocket.chat/tracing';
import polka from 'polka';
@ -13,7 +13,7 @@ const PORT = process.env.PORT || 3031;
registerServiceModels(db, await getTrashCollection());
api.setBroker(broker);
api.setBroker(startBroker());
// need to import Presence service after models are registered
const { Presence } = await import('@rocket.chat/presence');

@ -1,7 +1,7 @@
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
import { Logger } from '@rocket.chat/logger';
import { registerServiceModels } from '@rocket.chat/models';
import { broker } from '@rocket.chat/network-broker';
import { startBroker } from '@rocket.chat/network-broker';
import { startTracing } from '@rocket.chat/tracing';
import polka from 'polka';
@ -14,7 +14,7 @@ const PORT = process.env.PORT || 3038;
registerServiceModels(db, await getTrashCollection());
api.setBroker(broker);
api.setBroker(startBroker());
// need to import service after models are registeredpackagfe
const { QueueWorker } = await import('@rocket.chat/omnichannel-services');

@ -1,7 +1,7 @@
import { api, getConnection, getTrashCollection } from '@rocket.chat/core-services';
import { Logger } from '@rocket.chat/logger';
import { DatabaseWatcher, registerServiceModels } from '@rocket.chat/models';
import { broker } from '@rocket.chat/network-broker';
import { startBroker } from '@rocket.chat/network-broker';
import { startTracing } from '@rocket.chat/tracing';
import polka from 'polka';
@ -16,7 +16,7 @@ const PORT = process.env.PORT || 3035;
registerServiceModels(db, await getTrashCollection());
api.setBroker(broker);
api.setBroker(startBroker());
// TODO having to import Logger to pass as a param is a temporary solution. logger should come from the service (either from broker or api)
const watcher = new DatabaseWatcher({ db, logger: Logger });

@ -1,5 +1,6 @@
import { isMeteorError, MeteorError } from '@rocket.chat/core-services';
import EJSON from 'ejson';
import type Moleculer from 'moleculer';
import { Errors, Serializers, ServiceBroker } from 'moleculer';
import { pino } from 'pino';
@ -70,82 +71,85 @@ class EJSONSerializer extends Base {
}
}
const network = new ServiceBroker({
namespace: MS_NAMESPACE,
skipProcessEventRegistration: SKIP_PROCESS_EVENT_REGISTRATION === 'true',
transporter: TRANSPORTER,
metrics: {
enabled: MS_METRICS === 'true',
reporter: [
{
type: 'Prometheus',
options: {
port: MS_METRICS_PORT,
export function startBroker(options: Moleculer.BrokerOptions = {}): NetworkBroker {
const network = new ServiceBroker({
namespace: MS_NAMESPACE,
skipProcessEventRegistration: SKIP_PROCESS_EVENT_REGISTRATION === 'true',
transporter: TRANSPORTER,
metrics: {
enabled: MS_METRICS === 'true',
reporter: [
{
type: 'Prometheus',
options: {
port: MS_METRICS_PORT,
},
},
},
],
},
cacher: CACHE,
serializer: SERIALIZER === 'EJSON' ? new EJSONSerializer() : SERIALIZER,
logger: {
type: 'Pino',
options: {
level: MOLECULER_LOG_LEVEL,
pino: {
options: {
timestamp: pino.stdTimeFunctions.isoTime,
...(process.env.NODE_ENV !== 'production'
? {
transport: {
target: 'pino-pretty',
options: {
colorize: true,
],
},
cacher: CACHE,
serializer: SERIALIZER === 'EJSON' ? new EJSONSerializer() : SERIALIZER,
logger: {
type: 'Pino',
options: {
level: MOLECULER_LOG_LEVEL,
pino: {
options: {
timestamp: pino.stdTimeFunctions.isoTime,
...(process.env.NODE_ENV !== 'production'
? {
transport: {
target: 'pino-pretty',
options: {
colorize: true,
},
},
},
}
: {}),
}
: {}),
},
},
},
},
},
registry: {
strategy: BALANCE_STRATEGY,
preferLocal: BALANCE_PREFER_LOCAL !== 'false',
},
requestTimeout: parseInt(REQUEST_TIMEOUT) * 1000,
retryPolicy: {
enabled: RETRY_ENABLED === 'true',
retries: parseInt(RETRY_RETRIES),
delay: parseInt(RETRY_DELAY),
maxDelay: parseInt(RETRY_MAX_DELAY),
factor: parseInt(RETRY_FACTOR),
check: (err: any): boolean => err && !!err.retryable,
},
maxCallLevel: 100,
heartbeatInterval: parseInt(HEARTBEAT_INTERVAL),
heartbeatTimeout: parseInt(HEARTBEAT_TIMEOUT),
// circuitBreaker: {
// enabled: false,
// threshold: 0.5,
// windowTime: 60,
// minRequestCount: 20,
// halfOpenTime: 10 * 1000,
// check: (err: any): boolean => err && err.code >= 500,
// },
bulkhead: {
enabled: BULKHEAD_ENABLED === 'true',
concurrency: parseInt(BULKHEAD_CONCURRENCY),
maxQueueSize: parseInt(BULKHEAD_MAX_QUEUE_SIZE),
},
errorRegenerator: new CustomRegenerator(),
started(): void {
console.log('NetworkBroker started successfully.');
},
});
export const broker = new NetworkBroker(network);
registry: {
strategy: BALANCE_STRATEGY,
preferLocal: BALANCE_PREFER_LOCAL !== 'false',
},
requestTimeout: parseInt(REQUEST_TIMEOUT) * 1000,
retryPolicy: {
enabled: RETRY_ENABLED === 'true',
retries: parseInt(RETRY_RETRIES),
delay: parseInt(RETRY_DELAY),
maxDelay: parseInt(RETRY_MAX_DELAY),
factor: parseInt(RETRY_FACTOR),
check: (err: any): boolean => err && !!err.retryable,
},
maxCallLevel: 100,
heartbeatInterval: parseInt(HEARTBEAT_INTERVAL),
heartbeatTimeout: parseInt(HEARTBEAT_TIMEOUT),
// circuitBreaker: {
// enabled: false,
// threshold: 0.5,
// windowTime: 60,
// minRequestCount: 20,
// halfOpenTime: 10 * 1000,
// check: (err: any): boolean => err && err.code >= 500,
// },
bulkhead: {
enabled: BULKHEAD_ENABLED === 'true',
concurrency: parseInt(BULKHEAD_CONCURRENCY),
maxQueueSize: parseInt(BULKHEAD_MAX_QUEUE_SIZE),
},
errorRegenerator: new CustomRegenerator(),
started(): void {
console.log('NetworkBroker started successfully.');
},
...options,
});
return new NetworkBroker(network);
}

Loading…
Cancel
Save