fix: convert network broker stopped lifecycle method to async (#31927)

Co-authored-by: Diego Sampaio <8591547+sampaiodiego@users.noreply.github.com>
pull/31810/head^2
Marcos Spessatto Defendi 2 years ago committed by GitHub
parent 703cb7ebb0
commit b876e4e0fc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 6
      .changeset/slimy-clocks-argue.md
  2. 4
      apps/meteor/app/search/server/search.internalService.ts
  3. 4
      apps/meteor/ee/server/NetworkBroker.ts
  4. 2
      apps/meteor/ee/server/startup/services.ts
  5. 39
      apps/meteor/ee/tests/unit/server/NetworkBroker.tests.ts
  6. 10
      apps/meteor/tests/mocks/server/BrokerMocked.ts
  7. 2
      apps/meteor/tests/unit/server/services/messages/hooks/BeforeSaveCheckMAC.tests.ts
  8. 4
      packages/core-services/src/LocalBroker.ts
  9. 4
      packages/core-services/src/lib/Api.ts
  10. 2
      packages/core-services/src/types/IApiService.ts
  11. 2
      packages/core-services/src/types/IBroker.ts

@ -0,0 +1,6 @@
---
"@rocket.chat/meteor": patch
"@rocket.chat/core-services": patch
---
`stopped` lifecycle method was unexpectedly synchronous when using microservices, causing our code to create race conditions.

@ -36,10 +36,10 @@ class Search extends ServiceClassInternal {
const service = new Search();
settings.watch('Search.Provider', () => {
settings.watch('Search.Provider', async () => {
if (searchProviderService.activeProvider?.on) {
api.registerService(service);
} else {
api.destroyService(service);
await api.destroyService(service);
}
});

@ -71,12 +71,12 @@ export class NetworkBroker implements IBroker {
return this.broker.call(method, data);
}
destroyService(instance: IServiceClass): void {
async destroyService(instance: IServiceClass): Promise<void> {
const name = instance.getName();
if (!name) {
return;
}
void this.broker.destroyService(name);
await this.broker.destroyService(name);
instance.removeAllListeners();
}

@ -33,7 +33,7 @@ if (!License.hasValidLicense()) {
void License.onLicense('federation', async () => {
const federationServiceEE = await FederationServiceEE.createFederationService();
if (federationService) {
api.destroyService(federationService);
await api.destroyService(federationService);
}
api.registerService(federationServiceEE);
});

@ -0,0 +1,39 @@
import { ServiceClass } from '@rocket.chat/core-services';
import { expect } from 'chai';
import sinon from 'sinon';
import { BrokerMocked } from '../../../../tests/mocks/server/BrokerMocked';
import { NetworkBroker } from '../../../server/NetworkBroker';
class DelayedStopBroker extends BrokerMocked {
async destroyService(name: string) {
const instance = this.services.get(name);
await new Promise((resolve) => setTimeout(resolve, 1000));
await instance.stopped();
await super.destroyService(name);
}
}
const broker = new NetworkBroker(new DelayedStopBroker() as any);
describe('NetworkBroker', () => {
it('should wait services to be fully destroyed', async () => {
const stoppedStub = sinon.stub();
const instance = new (class extends ServiceClass {
name = 'test';
async stopped() {
stoppedStub();
}
})();
broker.createService(instance);
await broker.destroyService(instance);
expect(stoppedStub.called).to.be.true;
});
});

@ -1,12 +1,14 @@
export class BrokerMocked {
actions: Record<string, (...params: unknown[]) => Promise<unknown>> = {};
destroyService(): void {
// no op
services: Map<string, any> = new Map();
async destroyService(name: string): Promise<void> {
this.services.delete(name);
}
createService(): void {
// no op
createService(instance: any): void {
this.services.set(instance.name, instance);
}
async call(method: string, data: any): Promise<any> {

@ -36,7 +36,7 @@ const broker = new BrokerMocked();
describe('Check MAC', () => {
before(() => {
api.setBroker(broker);
api.setBroker(broker as any);
});
it('should do nothing if not omnichannel room', async () => {

@ -34,7 +34,7 @@ export class LocalBroker implements IBroker {
return this.call(method, data);
}
destroyService(instance: ServiceClass): void {
async destroyService(instance: ServiceClass): Promise<void> {
const namespace = instance.getName();
instance.getEvents().forEach((event) => event.listeners.forEach((listener) => this.events.removeListener(event.eventName, listener)));
@ -51,7 +51,7 @@ export class LocalBroker implements IBroker {
this.methods.delete(`${namespace}.${method}`);
}
instance.removeAllListeners();
instance.stopped();
await instance.stopped();
}
createService(instance: IServiceClass): void {

@ -15,13 +15,13 @@ export class Api implements IApiService {
this.services.forEach((service) => this.broker?.createService(service));
}
destroyService(instance: IServiceClass): void {
async destroyService(instance: IServiceClass): Promise<void> {
if (!this.services.has(instance)) {
return;
}
if (this.broker) {
this.broker.destroyService(instance);
await this.broker.destroyService(instance);
}
this.services.delete(instance);

@ -5,7 +5,7 @@ import type { IServiceClass } from './ServiceClass';
export interface IApiService {
setBroker(broker: IBroker): void;
destroyService(instance: IServiceClass): void;
destroyService(instance: IServiceClass): Promise<void>;
registerService(instance: IServiceClass): void;

@ -48,7 +48,7 @@ export interface IServiceMetrics {
export interface IBroker {
metrics?: IServiceMetrics;
destroyService(service: IServiceClass): void;
destroyService(service: IServiceClass): Promise<void>;
createService(service: IServiceClass, serviceDependencies?: string[]): void;
call(method: string, data: any): Promise<any>;
waitAndCall(method: string, data: any): Promise<any>;

Loading…
Cancel
Save