fix: Omnichannel queue starting multiple times due to race condition (#34062)

Co-authored-by: Pierre Lehnen <55164754+pierre-lehnen-rc@users.noreply.github.com>
pull/34109/head^2
Aleksander Nicacio da Silva 1 year ago committed by GitHub
parent 18cea50a5b
commit 072a749470
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 5
      .changeset/green-shirts-fold.md
  2. 43
      apps/meteor/server/services/omnichannel/queue.ts
  3. 6
      apps/meteor/server/services/omnichannel/service.ts
  4. 1
      packages/core-services/src/index.ts
  5. 68
      packages/core-services/src/lib/ServiceStarter.ts
  6. 91
      packages/core-services/tests/ServiceStarter.test.ts

@ -0,0 +1,5 @@
---
"@rocket.chat/meteor": patch
---
Fixes condition causing Omnichannel queue to start more than once.

@ -1,3 +1,4 @@
import { ServiceStarter } from '@rocket.chat/core-services';
import { type InquiryWithAgentInfo, type IOmnichannelQueue } from '@rocket.chat/core-typings';
import { License } from '@rocket.chat/license';
import { LivechatInquiry, LivechatRooms } from '@rocket.chat/models';
@ -11,6 +12,17 @@ import { settings } from '../../../app/settings/server';
const DEFAULT_RACE_TIMEOUT = 5000;
export class OmnichannelQueue implements IOmnichannelQueue {
private serviceStarter: ServiceStarter;
private timeoutHandler: ReturnType<typeof setTimeout> | null = null;
constructor() {
this.serviceStarter = new ServiceStarter(
() => this._start(),
() => this._stop(),
);
}
private running = false;
private queues: (string | undefined)[] = [];
@ -24,7 +36,7 @@ export class OmnichannelQueue implements IOmnichannelQueue {
return this.running;
}
async start() {
private async _start() {
if (this.running) {
return;
}
@ -37,7 +49,7 @@ export class OmnichannelQueue implements IOmnichannelQueue {
return this.execute();
}
async stop() {
private async _stop() {
if (!this.running) {
return;
}
@ -45,9 +57,23 @@ export class OmnichannelQueue implements IOmnichannelQueue {
await LivechatInquiry.unlockAll();
this.running = false;
if (this.timeoutHandler !== null) {
clearTimeout(this.timeoutHandler);
this.timeoutHandler = null;
}
queueLogger.info('Service stopped');
}
async start() {
return this.serviceStarter.start();
}
async stop() {
return this.serviceStarter.stop();
}
private async getActiveQueues() {
// undefined = public queue(without department)
return ([undefined] as typeof this.queues).concat(await LivechatInquiry.getDistinctQueuedDepartments({}));
@ -118,10 +144,21 @@ export class OmnichannelQueue implements IOmnichannelQueue {
err: e,
});
} finally {
setTimeout(this.execute.bind(this), this.delay());
this.scheduleExecution();
}
}
private scheduleExecution(): void {
if (this.timeoutHandler !== null) {
return;
}
this.timeoutHandler = setTimeout(() => {
this.timeoutHandler = null;
return this.execute();
}, this.delay());
}
async shouldStart() {
if (!settings.get('Livechat_enabled')) {
void this.stop();

@ -33,11 +33,7 @@ export class OmnichannelService extends ServiceClassInternal implements IOmnicha
}
async started() {
settings.watch<boolean>('Livechat_enabled', (enabled) => {
void (enabled && RoutingManager.isMethodSet() ? this.queueWorker.shouldStart() : this.queueWorker.stop());
});
settings.watch<string>('Livechat_Routing_Method', async () => {
settings.watchMultiple(['Livechat_enabled', 'Livechat_Routing_Method'], () => {
this.queueWorker.shouldStart();
});

@ -78,6 +78,7 @@ export {
} from './types/IOmnichannelAnalyticsService';
export { getConnection, getTrashCollection } from './lib/mongo';
export { ServiceStarter } from './lib/ServiceStarter';
export {
AutoUpdateRecord,

@ -0,0 +1,68 @@
// This class is used to manage calls to a service's .start and .stop functions
// Specifically for cases where the start function has different conditions that may cause the service to actually start or not,
// or when the start process can take a while to complete
// Using this class, you ensure that calls to .start and .stop will be chained, so you avoid race conditions
// At the same time, it prevents those functions from running more times than necessary if there are several calls to them (for example when loading setting values)
export class ServiceStarter {
private lock = Promise.resolve();
private currentCall?: 'start' | 'stop';
private nextCall?: 'start' | 'stop';
private starterFn: () => Promise<void>;
private stopperFn?: () => Promise<void>;
constructor(starterFn: () => Promise<void>, stopperFn?: () => Promise<void>) {
this.starterFn = starterFn;
this.stopperFn = stopperFn;
}
private async checkStatus(): Promise<void> {
if (this.nextCall === 'start') {
return this.doCall('start');
}
if (this.nextCall === 'stop') {
return this.doCall('stop');
}
}
private async doCall(call: 'start' | 'stop'): Promise<void> {
this.nextCall = undefined;
this.currentCall = call;
try {
if (call === 'start') {
await this.starterFn();
} else if (this.stopperFn) {
await this.stopperFn();
}
} finally {
this.currentCall = undefined;
await this.checkStatus();
}
}
private async call(call: 'start' | 'stop'): Promise<void> {
// If something is already chained to run after the current call, it's okay to replace it with the new call
this.nextCall = call;
if (this.currentCall) {
return this.lock;
}
this.lock = this.checkStatus();
return this.lock;
}
async start(): Promise<void> {
return this.call('start');
}
async stop(): Promise<void> {
return this.call('stop');
}
async wait(): Promise<void> {
return this.lock;
}
}

@ -0,0 +1,91 @@
import { ServiceStarter } from '../src/lib/ServiceStarter';
const wait = (time: number) => {
return new Promise((resolve) => {
setTimeout(() => resolve(undefined), time);
});
};
describe('ServiceStarter', () => {
it('should call the starterFn and stopperFn when calling .start and .stop', async () => {
const start = jest.fn();
const stop = jest.fn();
const instance = new ServiceStarter(start, stop);
expect(start).not.toHaveBeenCalled();
expect(stop).not.toHaveBeenCalled();
await instance.start();
expect(start).toHaveBeenCalled();
expect(stop).not.toHaveBeenCalled();
start.mockReset();
await instance.stop();
expect(start).not.toHaveBeenCalled();
expect(stop).toHaveBeenCalled();
});
it('should only call .start for the second time after the initial call has finished running', async () => {
let running = false;
const start = jest.fn(async () => {
expect(running).toBe(false);
running = true;
await wait(100);
running = false;
});
const stop = jest.fn();
const instance = new ServiceStarter(start, stop);
void instance.start();
void instance.start();
await instance.wait();
expect(start).toHaveBeenCalledTimes(2);
expect(stop).not.toHaveBeenCalled();
});
it('should chain up to two calls to .start', async () => {
const start = jest.fn(async () => {
await wait(100);
});
const stop = jest.fn();
const instance = new ServiceStarter(start, stop);
void instance.start();
void instance.start();
void instance.start();
void instance.start();
await instance.wait();
expect(start).toHaveBeenCalledTimes(2);
expect(stop).not.toHaveBeenCalled();
});
it('should skip the chained calls to .start if .stop is called', async () => {
const start = jest.fn(async () => {
await wait(100);
});
const stop = jest.fn();
const instance = new ServiceStarter(start, stop);
void instance.start();
void instance.start();
void instance.start();
void instance.stop();
await instance.wait();
expect(start).toHaveBeenCalledTimes(1);
expect(stop).toHaveBeenCalledTimes(1);
});
});
Loading…
Cancel
Save