fix: Omnichannel queue processing (#35112)

Co-authored-by: Kevin Aleman <11577696+KevLehman@users.noreply.github.com>
backport-7.3.1-35112
dionisio-bot[bot] 11 months ago committed by GitHub
parent be0bbd5c0d
commit b7905dfebe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 11
      .changeset/old-schools-build.md
  2. 6
      apps/meteor/ee/app/livechat-enterprise/server/hooks/checkAgentBeforeTakeInquiry.ts
  3. 4
      apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.ts
  4. 45
      apps/meteor/server/services/omnichannel/queue.ts
  5. 70
      apps/meteor/tests/unit/server/services/omnichannel/queue.tests.ts
  6. 10
      packages/model-typings/src/models/ILivechatInquiryModel.ts
  7. 27
      packages/models/src/models/LivechatInquiry.ts

@ -0,0 +1,11 @@
---
"@rocket.chat/meteor": patch
"@rocket.chat/model-typings": patch
"@rocket.chat/models": patch
---
Fixes the queue processing of Omnichannel's waiting queue focusing on 3 main areas:
- Changes the way we fetch the queue list to not append the public queue by default. This prevents the server from always running the public queue (as it does now) even when there is no work to be done.
- Changes how the queue executes: previously, it was executed in a kind of chain: we fetched a list of "queues", then we took one, processed it, and after that we scheduled the next run, which could take some time. Now, every TIMEOUT, the server will try to process all the queues, one by one, and then schedule the next run for all queues after TIMEOUT. This should speed up chat assignment and reduce waiting time when the waiting queue is enabled.
- Removes the unlockAndRequeue and replaces it with just unlock. This change shouldn't be noticeable. The original idea of the requeueing was to iterate over the inquiries when one couldn't be taken: the intent was to avoid blocking the queue by rotating inquiries instead of fetching the same one until it got routed. However, this never worked because we never modified the global sorting for the inquiries — it kept using the ts as the sort key, which always returned the oldest inquiry and ignored the requeueing. So we're removing those extra steps as well.

@ -27,15 +27,18 @@ const validateMaxChats = async ({
};
}) => {
if (!inquiry?._id || !agent?.agentId) {
cbLogger.debug('No inquiry or agent provided');
throw new Error('No inquiry or agent provided');
}
const { agentId } = agent;
if (!(await Livechat.checkOnlineAgents(undefined, agent))) {
cbLogger.debug('Provided agent is not online');
throw new Error('Provided agent is not online');
}
if (!settings.get('Livechat_waiting_queue')) {
cbLogger.info(`Chat can be taken by Agent ${agentId}: waiting queue is disabled`);
return agent;
}
@ -58,11 +61,14 @@ const validateMaxChats = async ({
const user = await Users.getAgentAndAmountOngoingChats(agentId);
if (!user) {
cbLogger.debug({ msg: 'No valid agent found', agentId });
throw new Error('No valid agent found');
}
const { queueInfo: { chats = 0 } = {} } = user;
const maxChats = typeof maxNumberSimultaneousChat === 'number' ? maxNumberSimultaneousChat : parseInt(maxNumberSimultaneousChat, 10);
cbLogger.debug({ msg: 'Validating agent is within max number of chats', agentId, user, maxChats });
if (maxChats <= chats) {
await callbacks.run('livechat.onMaxNumberSimultaneousChatsReached', inquiry);
throw new Error('error-max-number-simultaneous-chats-reached');

@ -38,7 +38,7 @@ export const getMaxNumberSimultaneousChat = async ({ agentId, departmentId }: {
const department = await LivechatDepartmentRaw.findOneById(departmentId);
const { maxNumberSimultaneousChat = 0 } = department || { maxNumberSimultaneousChat: 0 };
if (maxNumberSimultaneousChat > 0) {
return maxNumberSimultaneousChat;
return Number(maxNumberSimultaneousChat);
}
}
@ -46,7 +46,7 @@ export const getMaxNumberSimultaneousChat = async ({ agentId, departmentId }: {
const user = await Users.getAgentInfo(agentId, settings.get('Livechat_show_agent_info'));
const { livechat: { maxNumberSimultaneousChat = 0 } = {} } = user || {};
if (maxNumberSimultaneousChat > 0) {
return maxNumberSimultaneousChat;
return Number(maxNumberSimultaneousChat);
}
}

@ -2,6 +2,7 @@ import { ServiceStarter } from '@rocket.chat/core-services';
import { type InquiryWithAgentInfo, type IOmnichannelQueue } from '@rocket.chat/core-typings';
import { License } from '@rocket.chat/license';
import { LivechatInquiry, LivechatRooms } from '@rocket.chat/models';
import { tracerSpan } from '@rocket.chat/tracing';
import { queueLogger } from './logger';
import { getOmniChatSortQuery } from '../../../app/livechat/lib/inquiries';
@ -26,8 +27,6 @@ export class OmnichannelQueue implements IOmnichannelQueue {
private running = false;
private queues: (string | undefined)[] = [];
private delay() {
const timeout = settings.get<number>('Omnichannel_queue_delay_timeout') ?? 5;
return timeout < 1 ? DEFAULT_RACE_TIMEOUT : timeout * 1000;
@ -76,17 +75,7 @@ export class OmnichannelQueue implements IOmnichannelQueue {
}
private async getActiveQueues() {
// undefined = public queue(without department)
return ([undefined] as typeof this.queues).concat(await LivechatInquiry.getDistinctQueuedDepartments({}));
}
private async nextQueue() {
if (!this.queues.length) {
queueLogger.debug('No more registered queues. Refreshing');
this.queues = await this.getActiveQueues();
}
return this.queues.shift();
return (await LivechatInquiry.getDistinctQueuedDepartments({})).map(({ _id }) => _id);
}
private async execute() {
@ -101,16 +90,20 @@ export class OmnichannelQueue implements IOmnichannelQueue {
return;
}
const queue = await this.nextQueue();
const queueDelayTimeout = this.delay();
queueLogger.debug(`Executing queue ${queue || 'Public'} with timeout of ${queueDelayTimeout}`);
void this.checkQueue(queue).catch((e) => {
queueLogger.error(e);
});
// We still go 1 by 1, but we go with every queue every cycle instead of just 1 queue per cycle
// And we get tracing :)
const queues = await this.getActiveQueues();
for await (const queue of queues) {
await tracerSpan(
'omnichannel.queue',
{ attributes: { workerTime: new Date().toISOString(), queue: queue || 'Public' }, root: true },
() => this.checkQueue(queue),
);
}
this.scheduleExecution();
}
private async checkQueue(queue: string | undefined) {
private async checkQueue(queue: string | null) {
queueLogger.debug(`Processing items for queue ${queue || 'Public'}`);
try {
const nextInquiry = await LivechatInquiry.findNextAndLock(getOmniChatSortQuery(getInquirySortMechanismSetting()), queue);
@ -122,12 +115,8 @@ export class OmnichannelQueue implements IOmnichannelQueue {
const result = await this.processWaitingQueue(queue, nextInquiry as InquiryWithAgentInfo);
if (!result) {
// Note: this removes the "one-shot" behavior of queue, allowing it to take a conversation again in the future
// And sorting them by _updatedAt: -1 will make it so that the oldest inquiries are taken first
// preventing us from playing with the same inquiry over and over again
queueLogger.debug(`Inquiry ${nextInquiry._id} not taken. Unlocking and re-queueing`);
const updatedQueue = await LivechatInquiry.unlockAndQueue(nextInquiry._id);
return updatedQueue;
return await LivechatInquiry.unlock(nextInquiry._id);
}
queueLogger.debug(`Inquiry ${nextInquiry._id} taken successfully. Unlocking`);
@ -144,8 +133,6 @@ export class OmnichannelQueue implements IOmnichannelQueue {
queue: queue || 'Public',
err: e,
});
} finally {
this.scheduleExecution();
}
}
@ -217,7 +204,7 @@ export class OmnichannelQueue implements IOmnichannelQueue {
return true;
}
private async processWaitingQueue(department: string | undefined, inquiry: InquiryWithAgentInfo) {
private async processWaitingQueue(department: string | null, inquiry: InquiryWithAgentInfo) {
const queue = department || 'Public';
queueLogger.debug(`Processing inquiry ${inquiry._id} from queue ${queue}`);

@ -29,7 +29,6 @@ const models = {
unlockAll: Sinon.stub(),
findNextAndLock: Sinon.stub(),
getDistinctQueuedDepartments: Sinon.stub(),
unlockAndQueue: Sinon.stub(),
unlock: Sinon.stub(),
removeByRoomId: Sinon.stub(),
takeInquiry: Sinon.stub(),
@ -93,47 +92,23 @@ describe('Omnichannel Queue processor', () => {
after(() => {
models.LivechatInquiry.getDistinctQueuedDepartments.reset();
});
it('should return [undefined] when there is no other queues', async () => {
models.LivechatInquiry.getDistinctQueuedDepartments.returns([]);
it('should return empty array when there are no active queues', async () => {
models.LivechatInquiry.getDistinctQueuedDepartments.resolves([]);
const queue = new OmnichannelQueue();
expect(await queue.getActiveQueues()).to.be.eql([undefined]);
expect(await queue.getActiveQueues()).to.be.eql([]);
});
it('should return [undefined, department1] when department1 is an active queue', async () => {
models.LivechatInquiry.getDistinctQueuedDepartments.returns(['department1']);
it('should return [department1] when department1 is an active queue', async () => {
models.LivechatInquiry.getDistinctQueuedDepartments.resolves([{ _id: 'department1' }]);
const queue = new OmnichannelQueue();
expect(await queue.getActiveQueues()).to.be.eql([undefined, 'department1']);
expect(await queue.getActiveQueues()).to.be.eql(['department1']);
});
});
describe('nextQueue', () => {
after(() => {
models.LivechatInquiry.getDistinctQueuedDepartments.reset();
});
it('should return undefined when thats the only queue', async () => {
models.LivechatInquiry.getDistinctQueuedDepartments.returns([]);
const queue = new OmnichannelQueue();
queue.getActiveQueues = Sinon.stub().returns([undefined]);
expect(await queue.nextQueue()).to.be.undefined;
});
it('should return undefined, and then the following queue', async () => {
models.LivechatInquiry.getDistinctQueuedDepartments.returns(['department1']);
const queue = new OmnichannelQueue();
queue.getActiveQueues = Sinon.stub().returns([undefined, 'department1']);
expect(await queue.nextQueue()).to.be.undefined;
expect(await queue.nextQueue()).to.be.equal('department1');
});
it('should not call getActiveQueues if there are still queues to process', async () => {
models.LivechatInquiry.getDistinctQueuedDepartments.returns(['department1']);
it('should return [null, department1] when department1 is an active queue and there are elements on public queue', async () => {
models.LivechatInquiry.getDistinctQueuedDepartments.resolves([{ _id: 'department1' }, { _id: null }]);
const queue = new OmnichannelQueue();
queue.queues = ['department1'];
queue.getActiveQueues = Sinon.stub();
expect(await queue.nextQueue()).to.be.equal('department1');
expect(queue.getActiveQueues.notCalled).to.be.true;
expect(await queue.getActiveQueues()).to.be.eql(['department1', null]);
});
});
describe('checkQueue', () => {
@ -141,7 +116,6 @@ describe('Omnichannel Queue processor', () => {
beforeEach(() => {
models.LivechatInquiry.findNextAndLock.resetHistory();
models.LivechatInquiry.takeInquiry.resetHistory();
models.LivechatInquiry.unlockAndQueue.resetHistory();
models.LivechatInquiry.unlock.resetHistory();
queueLogger.error.resetHistory();
queueLogger.info.resetHistory();
@ -153,7 +127,6 @@ describe('Omnichannel Queue processor', () => {
after(() => {
models.LivechatInquiry.findNextAndLock.reset();
models.LivechatInquiry.takeInquiry.reset();
models.LivechatInquiry.unlockAndQueue.reset();
models.LivechatInquiry.unlock.reset();
queueLogger.error.reset();
queueLogger.info.reset();
@ -178,7 +151,7 @@ describe('Omnichannel Queue processor', () => {
expect(models.LivechatInquiry.findNextAndLock.calledOnce).to.be.true;
expect(queue.processWaitingQueue.calledOnce).to.be.true;
});
it('should call unlockAndRequeue when the inquiry could not be processed', async () => {
it('should call unlock when the inquiry could not be processed', async () => {
models.LivechatInquiry.findNextAndLock.returns(mockedInquiry);
const queue = new OmnichannelQueue();
@ -187,7 +160,7 @@ describe('Omnichannel Queue processor', () => {
await queue.checkQueue();
expect(queue.processWaitingQueue.calledOnce).to.be.true;
expect(models.LivechatInquiry.unlockAndQueue.calledOnce).to.be.true;
expect(models.LivechatInquiry.unlock.calledOnce).to.be.true;
});
it('should unlock the inquiry when it was processed succesfully', async () => {
models.LivechatInquiry.findNextAndLock.returns(mockedInquiry);
@ -209,21 +182,6 @@ describe('Omnichannel Queue processor', () => {
expect(queueLogger.error.calledOnce).to.be.true;
});
it('should call execute after finishing', async () => {
models.LivechatInquiry.findNextAndLock.returns(mockedInquiry);
const queue = new OmnichannelQueue();
queue.processWaitingQueue = Sinon.stub().returns(true);
queue.execute = Sinon.stub();
queue.delay = Sinon.stub().returns(100);
await queue.checkQueue();
clock.tick(100);
expect(queue.execute.calledOnce).to.be.true;
expect(models.LivechatInquiry.unlock.calledOnce).to.be.true;
expect(queue.execute.calledAfter(models.LivechatInquiry.unlock)).to.be.true;
expect(queue.execute.calledOnce).to.be.true;
});
});
describe('shouldStart', () => {
beforeEach(() => {
@ -414,11 +372,11 @@ describe('Omnichannel Queue processor', () => {
const queue = new OmnichannelQueue();
queue.running = true;
queue.nextQueue = Sinon.stub();
queue.getActiveQueues = Sinon.stub().resolves([null]);
await queue.execute();
expect(queue.nextQueue.calledOnce).to.be.true;
expect(queueLogger.debug.calledWith('Executing queue Public with timeout of 5000')).to.be.true;
expect(queue.getActiveQueues.calledOnce).to.be.true;
expect(queueLogger.debug.calledWith('Processing items for queue Public')).to.be.true;
});
});
describe('start', () => {

@ -1,5 +1,5 @@
import type { IMessage, ILivechatInquiryRecord, LivechatInquiryStatus } from '@rocket.chat/core-typings';
import type { FindOptions, DistinctOptions, Document, UpdateResult, DeleteResult, FindCursor, DeleteOptions } from 'mongodb';
import type { FindOptions, Document, UpdateResult, DeleteResult, FindCursor, DeleteOptions, AggregateOptions } from 'mongodb';
import type { IBaseModel } from './IBaseModel';
@ -13,12 +13,14 @@ export interface ILivechatInquiryModel extends IBaseModel<ILivechatInquiryRecord
rid: string,
options?: FindOptions<T extends ILivechatInquiryRecord ? ILivechatInquiryRecord : T>,
): Promise<T | null>;
getDistinctQueuedDepartments(options: DistinctOptions): Promise<(string | undefined)[]>;
getDistinctQueuedDepartments(options: AggregateOptions): Promise<{ _id: string | null }[]>;
setDepartmentByInquiryId(inquiryId: string, department: string): Promise<ILivechatInquiryRecord | null>;
setLastMessageByRoomId(rid: ILivechatInquiryRecord['rid'], message: IMessage): Promise<ILivechatInquiryRecord | null>;
findNextAndLock(queueSortBy: FindOptions<ILivechatInquiryRecord>['sort'], department?: string): Promise<ILivechatInquiryRecord | null>;
findNextAndLock(
queueSortBy: FindOptions<ILivechatInquiryRecord>['sort'],
department: string | null,
): Promise<ILivechatInquiryRecord | null>;
unlock(inquiryId: string): Promise<UpdateResult>;
unlockAndQueue(inquiryId: string): Promise<UpdateResult>;
unlockAll(): Promise<UpdateResult | Document>;
getCurrentSortedQueueAsync(props: {
inquiryId?: string;

@ -6,7 +6,6 @@ import type {
Db,
Document,
FindOptions,
DistinctOptions,
UpdateResult,
Filter,
DeleteResult,
@ -14,6 +13,7 @@ import type {
FindCursor,
UpdateFilter,
DeleteOptions,
AggregateOptions,
WithId,
} from 'mongodb';
@ -133,8 +133,20 @@ export class LivechatInquiryRaw extends BaseRaw<ILivechatInquiryRecord> implemen
return this.find({ 'v.token': token }, { projection: { _id: 1 } });
}
getDistinctQueuedDepartments(options: DistinctOptions): Promise<(string | undefined)[]> {
return this.col.distinct('department', { status: LivechatInquiryStatus.QUEUED }, options);
// Returns the distinct department ids that currently have QUEUED inquiries.
// A `_id: null` entry represents the public queue (inquiries with no department),
// so callers only process the public queue when it actually has queued items.
// NOTE(review): implemented as a $group aggregation instead of `distinct` so the
// result keeps the `{ _id }` document shape and accepts AggregateOptions.
getDistinctQueuedDepartments(options: AggregateOptions): Promise<{ _id: string | null }[]> {
return this.col
.aggregate<{ _id: string | null }>(
[
// Only departments with at least one queued inquiry are relevant.
{ $match: { status: LivechatInquiryStatus.QUEUED } },
{
// Group by department; departments with no value collapse into _id: null (public queue).
$group: {
_id: '$department',
},
},
],
options,
)
.toArray();
}
async setDepartmentByInquiryId(inquiryId: string, department: string): Promise<ILivechatInquiryRecord | null> {
@ -147,7 +159,7 @@ export class LivechatInquiryRaw extends BaseRaw<ILivechatInquiryRecord> implemen
async findNextAndLock(
queueSortBy: FindOptions<ILivechatInquiryRecord>['sort'],
department?: string,
department: string | null,
): Promise<ILivechatInquiryRecord | null> {
const date = new Date();
return this.findOneAndUpdate(
@ -183,13 +195,6 @@ export class LivechatInquiryRaw extends BaseRaw<ILivechatInquiryRecord> implemen
return this.updateOne({ _id: inquiryId }, { $unset: { locked: 1, lockedAt: 1 } });
}
async unlockAndQueue(inquiryId: string): Promise<UpdateResult> {
return this.updateOne(
{ _id: inquiryId },
{ $unset: { locked: 1, lockedAt: 1 }, $set: { status: LivechatInquiryStatus.QUEUED, queuedAt: new Date() } },
);
}
async unlockAll(): Promise<UpdateResult | Document> {
return this.updateMany(
{ locked: { $exists: true } },

Loading…
Cancel
Save