diff --git a/.changeset/old-schools-build.md b/.changeset/old-schools-build.md new file mode 100644 index 00000000000..d318748ef18 --- /dev/null +++ b/.changeset/old-schools-build.md @@ -0,0 +1,11 @@ +--- +"@rocket.chat/meteor": patch +"@rocket.chat/model-typings": patch +"@rocket.chat/models": patch +--- + +Fixes the queue processing of Omnichannel's waiting queue focusing on 3 main areas: +- Changes the way we fetch the queue list to not append the public queue by default. This makes the server to not run the public queue always (as it is now) even if there was no work to be done. +- Changes how the queue executes: previously, it was executed in a kind of chain: We fetched a list of "queues", then we took one, processed it, and after that we scheduled the next run, which could take some time. Now, every TIMEOUT, server will try to process all the queues, 1 by 1, and then schedule the next run for all queues after TIMEOUT. This should speed up chat assignment and reduce waiting time when waiting queue is enabled. +- Removes the unlockAndRequeue and replcaes it with just unlock. This change shouldn't be noticeable. The original idea of the requeueing was to iterate over the inquiries when 1 wasn't being able to be taken. Idea was to avoid blocking the queue by rotating them instead of fetching the same until it gets routed, however this never worked cause we never modified the global sorting for the inquiries and it kept using the ts as the sorting, which returned always the oldest and ignored the requeing. So we're removing those extra steps as well. + diff --git a/apps/meteor/ee/app/livechat-enterprise/server/hooks/checkAgentBeforeTakeInquiry.ts b/apps/meteor/ee/app/livechat-enterprise/server/hooks/checkAgentBeforeTakeInquiry.ts index 55adb05cbd1..9722cf25b20 100644 --- a/apps/meteor/ee/app/livechat-enterprise/server/hooks/checkAgentBeforeTakeInquiry.ts +++ b/apps/meteor/ee/app/livechat-enterprise/server/hooks/checkAgentBeforeTakeInquiry.ts @@ -27,15 +27,18 @@ const validateMaxChats = async ({ }; }) => { if (!inquiry?._id || !agent?.agentId) { + cbLogger.debug('No inquiry or agent provided'); throw new Error('No inquiry or agent provided'); } const { agentId } = agent; if (!(await Livechat.checkOnlineAgents(undefined, agent))) { + cbLogger.debug('Provided agent is not online'); throw new Error('Provided agent is not online'); } if (!settings.get('Livechat_waiting_queue')) { + cbLogger.info(`Chat can be taken by Agent ${agentId}: waiting queue is disabled`); return agent; } @@ -58,11 +61,14 @@ const validateMaxChats = async ({ const user = await Users.getAgentAndAmountOngoingChats(agentId); if (!user) { + cbLogger.debug({ msg: 'No valid agent found', agentId }); throw new Error('No valid agent found'); } const { queueInfo: { chats = 0 } = {} } = user; const maxChats = typeof maxNumberSimultaneousChat === 'number' ? maxNumberSimultaneousChat : parseInt(maxNumberSimultaneousChat, 10); + + cbLogger.debug({ msg: 'Validating agent is within max number of chats', agentId, user, maxChats }); if (maxChats <= chats) { await callbacks.run('livechat.onMaxNumberSimultaneousChatsReached', inquiry); throw new Error('error-max-number-simultaneous-chats-reached'); diff --git a/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.ts b/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.ts index cb8503d82e7..ec2c9d40d68 100644 --- a/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.ts +++ b/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.ts @@ -38,7 +38,7 @@ export const getMaxNumberSimultaneousChat = async ({ agentId, departmentId }: { const department = await LivechatDepartmentRaw.findOneById(departmentId); const { maxNumberSimultaneousChat = 0 } = department || { maxNumberSimultaneousChat: 0 }; if (maxNumberSimultaneousChat > 0) { - return maxNumberSimultaneousChat; + return Number(maxNumberSimultaneousChat); } } @@ -46,7 +46,7 @@ export const getMaxNumberSimultaneousChat = async ({ agentId, departmentId }: { const user = await Users.getAgentInfo(agentId, settings.get('Livechat_show_agent_info')); const { livechat: { maxNumberSimultaneousChat = 0 } = {} } = user || {}; if (maxNumberSimultaneousChat > 0) { - return maxNumberSimultaneousChat; + return Number(maxNumberSimultaneousChat); } } diff --git a/apps/meteor/server/services/omnichannel/queue.ts b/apps/meteor/server/services/omnichannel/queue.ts index ed7420e78d9..31a159492cf 100644 --- a/apps/meteor/server/services/omnichannel/queue.ts +++ b/apps/meteor/server/services/omnichannel/queue.ts @@ -2,6 +2,7 @@ import { ServiceStarter } from '@rocket.chat/core-services'; import { type InquiryWithAgentInfo, type IOmnichannelQueue } from '@rocket.chat/core-typings'; import { License } from '@rocket.chat/license'; import { LivechatInquiry, LivechatRooms } from '@rocket.chat/models'; +import { tracerSpan } from '@rocket.chat/tracing'; import { queueLogger } from './logger'; import { getOmniChatSortQuery } from '../../../app/livechat/lib/inquiries'; @@ -26,8 +27,6 @@ export class OmnichannelQueue implements IOmnichannelQueue { private running = false; - private queues: (string | undefined)[] = []; - private delay() { const timeout = settings.get('Omnichannel_queue_delay_timeout') ?? 5; return timeout < 1 ? DEFAULT_RACE_TIMEOUT : timeout * 1000; @@ -76,17 +75,7 @@ export class OmnichannelQueue implements IOmnichannelQueue { } private async getActiveQueues() { - // undefined = public queue(without department) - return ([undefined] as typeof this.queues).concat(await LivechatInquiry.getDistinctQueuedDepartments({})); - } - - private async nextQueue() { - if (!this.queues.length) { - queueLogger.debug('No more registered queues. Refreshing'); - this.queues = await this.getActiveQueues(); - } - - return this.queues.shift(); + return (await LivechatInquiry.getDistinctQueuedDepartments({})).map(({ _id }) => _id); } private async execute() { @@ -101,16 +90,20 @@ export class OmnichannelQueue implements IOmnichannelQueue { return; } - const queue = await this.nextQueue(); - const queueDelayTimeout = this.delay(); - queueLogger.debug(`Executing queue ${queue || 'Public'} with timeout of ${queueDelayTimeout}`); - - void this.checkQueue(queue).catch((e) => { - queueLogger.error(e); - }); + // We still go 1 by 1, but we go with every queue every cycle instead of just 1 queue per cycle + // And we get tracing :) + const queues = await this.getActiveQueues(); + for await (const queue of queues) { + await tracerSpan( + 'omnichannel.queue', + { attributes: { workerTime: new Date().toISOString(), queue: queue || 'Public' }, root: true }, + () => this.checkQueue(queue), + ); + } + this.scheduleExecution(); } - private async checkQueue(queue: string | undefined) { + private async checkQueue(queue: string | null) { queueLogger.debug(`Processing items for queue ${queue || 'Public'}`); try { const nextInquiry = await LivechatInquiry.findNextAndLock(getOmniChatSortQuery(getInquirySortMechanismSetting()), queue); @@ -122,12 +115,8 @@ export class OmnichannelQueue implements IOmnichannelQueue { const result = await this.processWaitingQueue(queue, nextInquiry as InquiryWithAgentInfo); if (!result) { - // Note: this removes the "one-shot" behavior of queue, allowing it to take a conversation again in the future - // And sorting them by _updatedAt: -1 will make it so that the oldest inquiries are taken first - // preventing us from playing with the same inquiry over and over again queueLogger.debug(`Inquiry ${nextInquiry._id} not taken. Unlocking and re-queueing`); - const updatedQueue = await LivechatInquiry.unlockAndQueue(nextInquiry._id); - return updatedQueue; + return await LivechatInquiry.unlock(nextInquiry._id); } queueLogger.debug(`Inquiry ${nextInquiry._id} taken successfully. Unlocking`); @@ -144,8 +133,6 @@ export class OmnichannelQueue implements IOmnichannelQueue { queue: queue || 'Public', err: e, }); - } finally { - this.scheduleExecution(); } } @@ -217,7 +204,7 @@ export class OmnichannelQueue implements IOmnichannelQueue { return true; } - private async processWaitingQueue(department: string | undefined, inquiry: InquiryWithAgentInfo) { + private async processWaitingQueue(department: string | null, inquiry: InquiryWithAgentInfo) { const queue = department || 'Public'; queueLogger.debug(`Processing inquiry ${inquiry._id} from queue ${queue}`); diff --git a/apps/meteor/tests/unit/server/services/omnichannel/queue.tests.ts b/apps/meteor/tests/unit/server/services/omnichannel/queue.tests.ts index ce24b0ded64..8db9a9a3edf 100644 --- a/apps/meteor/tests/unit/server/services/omnichannel/queue.tests.ts +++ b/apps/meteor/tests/unit/server/services/omnichannel/queue.tests.ts @@ -29,7 +29,6 @@ const models = { unlockAll: Sinon.stub(), findNextAndLock: Sinon.stub(), getDistinctQueuedDepartments: Sinon.stub(), - unlockAndQueue: Sinon.stub(), unlock: Sinon.stub(), removeByRoomId: Sinon.stub(), takeInquiry: Sinon.stub(), @@ -93,47 +92,23 @@ describe('Omnichannel Queue processor', () => { after(() => { models.LivechatInquiry.getDistinctQueuedDepartments.reset(); }); - it('should return [undefined] when there is no other queues', async () => { - models.LivechatInquiry.getDistinctQueuedDepartments.returns([]); + it('should return empty array when there are no active queues', async () => { + models.LivechatInquiry.getDistinctQueuedDepartments.resolves([]); const queue = new OmnichannelQueue(); - expect(await queue.getActiveQueues()).to.be.eql([undefined]); + expect(await queue.getActiveQueues()).to.be.eql([]); }); - it('should return [undefined, department1] when department1 is an active queue', async () => { - models.LivechatInquiry.getDistinctQueuedDepartments.returns(['department1']); + it('should return [department1] when department1 is an active queue', async () => { + models.LivechatInquiry.getDistinctQueuedDepartments.resolves([{ _id: 'department1' }]); const queue = new OmnichannelQueue(); - expect(await queue.getActiveQueues()).to.be.eql([undefined, 'department1']); + expect(await queue.getActiveQueues()).to.be.eql(['department1']); }); - }); - describe('nextQueue', () => { - after(() => { - models.LivechatInquiry.getDistinctQueuedDepartments.reset(); - }); - it('should return undefined when thats the only queue', async () => { - models.LivechatInquiry.getDistinctQueuedDepartments.returns([]); - - const queue = new OmnichannelQueue(); - queue.getActiveQueues = Sinon.stub().returns([undefined]); - expect(await queue.nextQueue()).to.be.undefined; - }); - it('should return undefined, and then the following queue', async () => { - models.LivechatInquiry.getDistinctQueuedDepartments.returns(['department1']); - - const queue = new OmnichannelQueue(); - queue.getActiveQueues = Sinon.stub().returns([undefined, 'department1']); - expect(await queue.nextQueue()).to.be.undefined; - expect(await queue.nextQueue()).to.be.equal('department1'); - }); - it('should not call getActiveQueues if there are still queues to process', async () => { - models.LivechatInquiry.getDistinctQueuedDepartments.returns(['department1']); + it('should return [null, department1] when department1 is an active queue and there are elements on public queue', async () => { + models.LivechatInquiry.getDistinctQueuedDepartments.resolves([{ _id: 'department1' }, { _id: null }]); const queue = new OmnichannelQueue(); - queue.queues = ['department1']; - queue.getActiveQueues = Sinon.stub(); - - expect(await queue.nextQueue()).to.be.equal('department1'); - expect(queue.getActiveQueues.notCalled).to.be.true; + expect(await queue.getActiveQueues()).to.be.eql(['department1', null]); }); }); describe('checkQueue', () => { @@ -141,7 +116,6 @@ describe('Omnichannel Queue processor', () => { beforeEach(() => { models.LivechatInquiry.findNextAndLock.resetHistory(); models.LivechatInquiry.takeInquiry.resetHistory(); - models.LivechatInquiry.unlockAndQueue.resetHistory(); models.LivechatInquiry.unlock.resetHistory(); queueLogger.error.resetHistory(); queueLogger.info.resetHistory(); @@ -153,7 +127,6 @@ describe('Omnichannel Queue processor', () => { after(() => { models.LivechatInquiry.findNextAndLock.reset(); models.LivechatInquiry.takeInquiry.reset(); - models.LivechatInquiry.unlockAndQueue.reset(); models.LivechatInquiry.unlock.reset(); queueLogger.error.reset(); queueLogger.info.reset(); @@ -178,7 +151,7 @@ describe('Omnichannel Queue processor', () => { expect(models.LivechatInquiry.findNextAndLock.calledOnce).to.be.true; expect(queue.processWaitingQueue.calledOnce).to.be.true; }); - it('should call unlockAndRequeue when the inquiry could not be processed', async () => { + it('should call unlock when the inquiry could not be processed', async () => { models.LivechatInquiry.findNextAndLock.returns(mockedInquiry); const queue = new OmnichannelQueue(); @@ -187,7 +160,7 @@ describe('Omnichannel Queue processor', () => { await queue.checkQueue(); expect(queue.processWaitingQueue.calledOnce).to.be.true; - expect(models.LivechatInquiry.unlockAndQueue.calledOnce).to.be.true; + expect(models.LivechatInquiry.unlock.calledOnce).to.be.true; }); it('should unlock the inquiry when it was processed succesfully', async () => { models.LivechatInquiry.findNextAndLock.returns(mockedInquiry); @@ -209,21 +182,6 @@ describe('Omnichannel Queue processor', () => { expect(queueLogger.error.calledOnce).to.be.true; }); - it('should call execute after finishing', async () => { - models.LivechatInquiry.findNextAndLock.returns(mockedInquiry); - - const queue = new OmnichannelQueue(); - queue.processWaitingQueue = Sinon.stub().returns(true); - queue.execute = Sinon.stub(); - queue.delay = Sinon.stub().returns(100); - await queue.checkQueue(); - clock.tick(100); - - expect(queue.execute.calledOnce).to.be.true; - expect(models.LivechatInquiry.unlock.calledOnce).to.be.true; - expect(queue.execute.calledAfter(models.LivechatInquiry.unlock)).to.be.true; - expect(queue.execute.calledOnce).to.be.true; - }); }); describe('shouldStart', () => { beforeEach(() => { @@ -414,11 +372,11 @@ describe('Omnichannel Queue processor', () => { const queue = new OmnichannelQueue(); queue.running = true; - queue.nextQueue = Sinon.stub(); + queue.getActiveQueues = Sinon.stub().resolves([null]); await queue.execute(); - expect(queue.nextQueue.calledOnce).to.be.true; - expect(queueLogger.debug.calledWith('Executing queue Public with timeout of 5000')).to.be.true; + expect(queue.getActiveQueues.calledOnce).to.be.true; + expect(queueLogger.debug.calledWith('Processing items for queue Public')).to.be.true; }); }); describe('start', () => { diff --git a/packages/model-typings/src/models/ILivechatInquiryModel.ts b/packages/model-typings/src/models/ILivechatInquiryModel.ts index b81c37afb95..19b52f3f90e 100644 --- a/packages/model-typings/src/models/ILivechatInquiryModel.ts +++ b/packages/model-typings/src/models/ILivechatInquiryModel.ts @@ -1,5 +1,5 @@ import type { IMessage, ILivechatInquiryRecord, LivechatInquiryStatus } from '@rocket.chat/core-typings'; -import type { FindOptions, DistinctOptions, Document, UpdateResult, DeleteResult, FindCursor, DeleteOptions } from 'mongodb'; +import type { FindOptions, Document, UpdateResult, DeleteResult, FindCursor, DeleteOptions, AggregateOptions } from 'mongodb'; import type { IBaseModel } from './IBaseModel'; @@ -13,12 +13,14 @@ export interface ILivechatInquiryModel extends IBaseModel, ): Promise; - getDistinctQueuedDepartments(options: DistinctOptions): Promise<(string | undefined)[]>; + getDistinctQueuedDepartments(options: AggregateOptions): Promise<{ _id: string | null }[]>; setDepartmentByInquiryId(inquiryId: string, department: string): Promise; setLastMessageByRoomId(rid: ILivechatInquiryRecord['rid'], message: IMessage): Promise; - findNextAndLock(queueSortBy: FindOptions['sort'], department?: string): Promise; + findNextAndLock( + queueSortBy: FindOptions['sort'], + department: string | null, + ): Promise; unlock(inquiryId: string): Promise; - unlockAndQueue(inquiryId: string): Promise; unlockAll(): Promise; getCurrentSortedQueueAsync(props: { inquiryId?: string; diff --git a/packages/models/src/models/LivechatInquiry.ts b/packages/models/src/models/LivechatInquiry.ts index 3caee4626d2..d1361f26549 100644 --- a/packages/models/src/models/LivechatInquiry.ts +++ b/packages/models/src/models/LivechatInquiry.ts @@ -6,7 +6,6 @@ import type { Db, Document, FindOptions, - DistinctOptions, UpdateResult, Filter, DeleteResult, @@ -14,6 +13,7 @@ import type { FindCursor, UpdateFilter, DeleteOptions, + AggregateOptions, WithId, } from 'mongodb'; @@ -133,8 +133,20 @@ export class LivechatInquiryRaw extends BaseRaw implemen return this.find({ 'v.token': token }, { projection: { _id: 1 } }); } - getDistinctQueuedDepartments(options: DistinctOptions): Promise<(string | undefined)[]> { - return this.col.distinct('department', { status: LivechatInquiryStatus.QUEUED }, options); + getDistinctQueuedDepartments(options: AggregateOptions): Promise<{ _id: string | null }[]> { + return this.col + .aggregate<{ _id: string | null }>( + [ + { $match: { status: LivechatInquiryStatus.QUEUED } }, + { + $group: { + _id: '$department', + }, + }, + ], + options, + ) + .toArray(); } async setDepartmentByInquiryId(inquiryId: string, department: string): Promise { @@ -147,7 +159,7 @@ export class LivechatInquiryRaw extends BaseRaw implemen async findNextAndLock( queueSortBy: FindOptions['sort'], - department?: string, + department: string | null, ): Promise { const date = new Date(); return this.findOneAndUpdate( @@ -183,13 +195,6 @@ export class LivechatInquiryRaw extends BaseRaw implemen return this.updateOne({ _id: inquiryId }, { $unset: { locked: 1, lockedAt: 1 } }); } - async unlockAndQueue(inquiryId: string): Promise { - return this.updateOne( - { _id: inquiryId }, - { $unset: { locked: 1, lockedAt: 1 }, $set: { status: LivechatInquiryStatus.QUEUED, queuedAt: new Date() } }, - ); - } - async unlockAll(): Promise { return this.updateMany( { locked: { $exists: true } },