diff --git a/apps/meteor/ee/app/livechat-enterprise/server/hooks/checkAgentBeforeTakeInquiry.ts b/apps/meteor/ee/app/livechat-enterprise/server/hooks/checkAgentBeforeTakeInquiry.ts index 55adb05cbd1..9722cf25b20 100644 --- a/apps/meteor/ee/app/livechat-enterprise/server/hooks/checkAgentBeforeTakeInquiry.ts +++ b/apps/meteor/ee/app/livechat-enterprise/server/hooks/checkAgentBeforeTakeInquiry.ts @@ -27,15 +27,18 @@ const validateMaxChats = async ({ }; }) => { if (!inquiry?._id || !agent?.agentId) { + cbLogger.debug('No inquiry or agent provided'); throw new Error('No inquiry or agent provided'); } const { agentId } = agent; if (!(await Livechat.checkOnlineAgents(undefined, agent))) { + cbLogger.debug('Provided agent is not online'); throw new Error('Provided agent is not online'); } if (!settings.get('Livechat_waiting_queue')) { + cbLogger.info(`Chat can be taken by Agent ${agentId}: waiting queue is disabled`); return agent; } @@ -58,11 +61,14 @@ const validateMaxChats = async ({ const user = await Users.getAgentAndAmountOngoingChats(agentId); if (!user) { + cbLogger.debug({ msg: 'No valid agent found', agentId }); throw new Error('No valid agent found'); } const { queueInfo: { chats = 0 } = {} } = user; const maxChats = typeof maxNumberSimultaneousChat === 'number' ? maxNumberSimultaneousChat : parseInt(maxNumberSimultaneousChat, 10); + + cbLogger.debug({ msg: 'Validating agent is within max number of chats', agentId, user, maxChats }); if (maxChats <= chats) { await callbacks.run('livechat.onMaxNumberSimultaneousChatsReached', inquiry); throw new Error('error-max-number-simultaneous-chats-reached'); diff --git a/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.ts b/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.ts index cb8503d82e7..ec2c9d40d68 100644 --- a/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.ts +++ b/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.ts @@ -38,7 +38,7 @@ export const getMaxNumberSimultaneousChat = async ({ agentId, departmentId }: { const department = await LivechatDepartmentRaw.findOneById(departmentId); const { maxNumberSimultaneousChat = 0 } = department || { maxNumberSimultaneousChat: 0 }; if (maxNumberSimultaneousChat > 0) { - return maxNumberSimultaneousChat; + return Number(maxNumberSimultaneousChat); } } @@ -46,7 +46,7 @@ export const getMaxNumberSimultaneousChat = async ({ agentId, departmentId }: { const user = await Users.getAgentInfo(agentId, settings.get('Livechat_show_agent_info')); const { livechat: { maxNumberSimultaneousChat = 0 } = {} } = user || {}; if (maxNumberSimultaneousChat > 0) { - return maxNumberSimultaneousChat; + return Number(maxNumberSimultaneousChat); } } diff --git a/apps/meteor/server/services/omnichannel/queue.ts b/apps/meteor/server/services/omnichannel/queue.ts index ed7420e78d9..31a159492cf 100644 --- a/apps/meteor/server/services/omnichannel/queue.ts +++ b/apps/meteor/server/services/omnichannel/queue.ts @@ -2,6 +2,7 @@ import { ServiceStarter } from '@rocket.chat/core-services'; import { type InquiryWithAgentInfo, type IOmnichannelQueue } from '@rocket.chat/core-typings'; import { License } from '@rocket.chat/license'; import { LivechatInquiry, LivechatRooms } from '@rocket.chat/models'; +import { tracerSpan } from '@rocket.chat/tracing'; import { queueLogger } from './logger'; import { getOmniChatSortQuery } from '../../../app/livechat/lib/inquiries'; @@ -26,8 +27,6 @@ export class OmnichannelQueue implements IOmnichannelQueue { private running = false; - private queues: (string | undefined)[] = []; - private delay() { const timeout = settings.get('Omnichannel_queue_delay_timeout') ?? 5; return timeout < 1 ? DEFAULT_RACE_TIMEOUT : timeout * 1000; @@ -76,17 +75,7 @@ export class OmnichannelQueue implements IOmnichannelQueue { } private async getActiveQueues() { - // undefined = public queue(without department) - return ([undefined] as typeof this.queues).concat(await LivechatInquiry.getDistinctQueuedDepartments({})); - } - - private async nextQueue() { - if (!this.queues.length) { - queueLogger.debug('No more registered queues. Refreshing'); - this.queues = await this.getActiveQueues(); - } - - return this.queues.shift(); + return (await LivechatInquiry.getDistinctQueuedDepartments({})).map(({ _id }) => _id); } private async execute() { @@ -101,16 +90,20 @@ export class OmnichannelQueue implements IOmnichannelQueue { return; } - const queue = await this.nextQueue(); - const queueDelayTimeout = this.delay(); - queueLogger.debug(`Executing queue ${queue || 'Public'} with timeout of ${queueDelayTimeout}`); - - void this.checkQueue(queue).catch((e) => { - queueLogger.error(e); - }); + // We still go 1 by 1, but we go with every queue every cycle instead of just 1 queue per cycle + // And we get tracing :) + const queues = await this.getActiveQueues(); + for await (const queue of queues) { + await tracerSpan( + 'omnichannel.queue', + { attributes: { workerTime: new Date().toISOString(), queue: queue || 'Public' }, root: true }, + () => this.checkQueue(queue), + ); + } + this.scheduleExecution(); } - private async checkQueue(queue: string | undefined) { + private async checkQueue(queue: string | null) { queueLogger.debug(`Processing items for queue ${queue || 'Public'}`); try { const nextInquiry = await LivechatInquiry.findNextAndLock(getOmniChatSortQuery(getInquirySortMechanismSetting()), queue); @@ -122,12 +115,8 @@ export class OmnichannelQueue implements IOmnichannelQueue { const result = await this.processWaitingQueue(queue, nextInquiry as InquiryWithAgentInfo); if (!result) { - // Note: this removes the "one-shot" behavior of queue, allowing it to take a conversation again in the future - // And sorting them by _updatedAt: -1 will make it so that the oldest inquiries are taken first - // preventing us from playing with the same inquiry over and over again queueLogger.debug(`Inquiry ${nextInquiry._id} not taken. Unlocking and re-queueing`); - const updatedQueue = await LivechatInquiry.unlockAndQueue(nextInquiry._id); - return updatedQueue; + return await LivechatInquiry.unlock(nextInquiry._id); } queueLogger.debug(`Inquiry ${nextInquiry._id} taken successfully. Unlocking`); @@ -144,8 +133,6 @@ export class OmnichannelQueue implements IOmnichannelQueue { queue: queue || 'Public', err: e, }); - } finally { - this.scheduleExecution(); } } @@ -217,7 +204,7 @@ export class OmnichannelQueue implements IOmnichannelQueue { return true; } - private async processWaitingQueue(department: string | undefined, inquiry: InquiryWithAgentInfo) { + private async processWaitingQueue(department: string | null, inquiry: InquiryWithAgentInfo) { const queue = department || 'Public'; queueLogger.debug(`Processing inquiry ${inquiry._id} from queue ${queue}`); diff --git a/apps/meteor/tests/unit/server/services/omnichannel/queue.tests.ts b/apps/meteor/tests/unit/server/services/omnichannel/queue.tests.ts index ce24b0ded64..8db9a9a3edf 100644 --- a/apps/meteor/tests/unit/server/services/omnichannel/queue.tests.ts +++ b/apps/meteor/tests/unit/server/services/omnichannel/queue.tests.ts @@ -29,7 +29,6 @@ const models = { unlockAll: Sinon.stub(), findNextAndLock: Sinon.stub(), getDistinctQueuedDepartments: Sinon.stub(), - unlockAndQueue: Sinon.stub(), unlock: Sinon.stub(), removeByRoomId: Sinon.stub(), takeInquiry: Sinon.stub(), @@ -93,47 +92,23 @@ describe('Omnichannel Queue processor', () => { after(() => { models.LivechatInquiry.getDistinctQueuedDepartments.reset(); }); - it('should return [undefined] when there is no other queues', async () => { - models.LivechatInquiry.getDistinctQueuedDepartments.returns([]); + it('should return empty array when there are no active queues', async () => { + models.LivechatInquiry.getDistinctQueuedDepartments.resolves([]); const queue = new OmnichannelQueue(); - expect(await queue.getActiveQueues()).to.be.eql([undefined]); + expect(await queue.getActiveQueues()).to.be.eql([]); }); - it('should return [undefined, department1] when department1 is an active queue', async () => { - models.LivechatInquiry.getDistinctQueuedDepartments.returns(['department1']); + it('should return [department1] when department1 is an active queue', async () => { + models.LivechatInquiry.getDistinctQueuedDepartments.resolves([{ _id: 'department1' }]); const queue = new OmnichannelQueue(); - expect(await queue.getActiveQueues()).to.be.eql([undefined, 'department1']); + expect(await queue.getActiveQueues()).to.be.eql(['department1']); }); - }); - describe('nextQueue', () => { - after(() => { - models.LivechatInquiry.getDistinctQueuedDepartments.reset(); - }); - it('should return undefined when thats the only queue', async () => { - models.LivechatInquiry.getDistinctQueuedDepartments.returns([]); - - const queue = new OmnichannelQueue(); - queue.getActiveQueues = Sinon.stub().returns([undefined]); - expect(await queue.nextQueue()).to.be.undefined; - }); - it('should return undefined, and then the following queue', async () => { - models.LivechatInquiry.getDistinctQueuedDepartments.returns(['department1']); - - const queue = new OmnichannelQueue(); - queue.getActiveQueues = Sinon.stub().returns([undefined, 'department1']); - expect(await queue.nextQueue()).to.be.undefined; - expect(await queue.nextQueue()).to.be.equal('department1'); - }); - it('should not call getActiveQueues if there are still queues to process', async () => { - models.LivechatInquiry.getDistinctQueuedDepartments.returns(['department1']); + it('should return [null, department1] when department1 is an active queue and there are elements on public queue', async () => { + models.LivechatInquiry.getDistinctQueuedDepartments.resolves([{ _id: 'department1' }, { _id: null }]); const queue = new OmnichannelQueue(); - queue.queues = ['department1']; - queue.getActiveQueues = Sinon.stub(); - - expect(await queue.nextQueue()).to.be.equal('department1'); - expect(queue.getActiveQueues.notCalled).to.be.true; + expect(await queue.getActiveQueues()).to.be.eql(['department1', null]); }); }); describe('checkQueue', () => { @@ -141,7 +116,6 @@ describe('Omnichannel Queue processor', () => { beforeEach(() => { models.LivechatInquiry.findNextAndLock.resetHistory(); models.LivechatInquiry.takeInquiry.resetHistory(); - models.LivechatInquiry.unlockAndQueue.resetHistory(); models.LivechatInquiry.unlock.resetHistory(); queueLogger.error.resetHistory(); queueLogger.info.resetHistory(); @@ -153,7 +127,6 @@ describe('Omnichannel Queue processor', () => { after(() => { models.LivechatInquiry.findNextAndLock.reset(); models.LivechatInquiry.takeInquiry.reset(); - models.LivechatInquiry.unlockAndQueue.reset(); models.LivechatInquiry.unlock.reset(); queueLogger.error.reset(); queueLogger.info.reset(); @@ -178,7 +151,7 @@ describe('Omnichannel Queue processor', () => { expect(models.LivechatInquiry.findNextAndLock.calledOnce).to.be.true; expect(queue.processWaitingQueue.calledOnce).to.be.true; }); - it('should call unlockAndRequeue when the inquiry could not be processed', async () => { + it('should call unlock when the inquiry could not be processed', async () => { models.LivechatInquiry.findNextAndLock.returns(mockedInquiry); const queue = new OmnichannelQueue(); @@ -187,7 +160,7 @@ describe('Omnichannel Queue processor', () => { await queue.checkQueue(); expect(queue.processWaitingQueue.calledOnce).to.be.true; - expect(models.LivechatInquiry.unlockAndQueue.calledOnce).to.be.true; + expect(models.LivechatInquiry.unlock.calledOnce).to.be.true; }); it('should unlock the inquiry when it was processed succesfully', async () => { models.LivechatInquiry.findNextAndLock.returns(mockedInquiry); @@ -209,21 +182,6 @@ describe('Omnichannel Queue processor', () => { expect(queueLogger.error.calledOnce).to.be.true; }); - it('should call execute after finishing', async () => { - models.LivechatInquiry.findNextAndLock.returns(mockedInquiry); - - const queue = new OmnichannelQueue(); - queue.processWaitingQueue = Sinon.stub().returns(true); - queue.execute = Sinon.stub(); - queue.delay = Sinon.stub().returns(100); - await queue.checkQueue(); - clock.tick(100); - - expect(queue.execute.calledOnce).to.be.true; - expect(models.LivechatInquiry.unlock.calledOnce).to.be.true; - expect(queue.execute.calledAfter(models.LivechatInquiry.unlock)).to.be.true; - expect(queue.execute.calledOnce).to.be.true; - }); }); describe('shouldStart', () => { beforeEach(() => { @@ -414,11 +372,11 @@ describe('Omnichannel Queue processor', () => { const queue = new OmnichannelQueue(); queue.running = true; - queue.nextQueue = Sinon.stub(); + queue.getActiveQueues = Sinon.stub().resolves([null]); await queue.execute(); - expect(queue.nextQueue.calledOnce).to.be.true; - expect(queueLogger.debug.calledWith('Executing queue Public with timeout of 5000')).to.be.true; + expect(queue.getActiveQueues.calledOnce).to.be.true; + expect(queueLogger.debug.calledWith('Processing items for queue Public')).to.be.true; }); }); describe('start', () => { diff --git a/packages/model-typings/src/models/ILivechatInquiryModel.ts b/packages/model-typings/src/models/ILivechatInquiryModel.ts index b81c37afb95..19b52f3f90e 100644 --- a/packages/model-typings/src/models/ILivechatInquiryModel.ts +++ b/packages/model-typings/src/models/ILivechatInquiryModel.ts @@ -1,5 +1,5 @@ import type { IMessage, ILivechatInquiryRecord, LivechatInquiryStatus } from '@rocket.chat/core-typings'; -import type { FindOptions, DistinctOptions, Document, UpdateResult, DeleteResult, FindCursor, DeleteOptions } from 'mongodb'; +import type { FindOptions, Document, UpdateResult, DeleteResult, FindCursor, DeleteOptions, AggregateOptions } from 'mongodb'; import type { IBaseModel } from './IBaseModel'; @@ -13,12 +13,14 @@ export interface ILivechatInquiryModel extends IBaseModel, ): Promise; - getDistinctQueuedDepartments(options: DistinctOptions): Promise<(string | undefined)[]>; + getDistinctQueuedDepartments(options: AggregateOptions): Promise<{ _id: string | null }[]>; setDepartmentByInquiryId(inquiryId: string, department: string): Promise; setLastMessageByRoomId(rid: ILivechatInquiryRecord['rid'], message: IMessage): Promise; - findNextAndLock(queueSortBy: FindOptions['sort'], department?: string): Promise; + findNextAndLock( + queueSortBy: FindOptions['sort'], + department: string | null, + ): Promise; unlock(inquiryId: string): Promise; - unlockAndQueue(inquiryId: string): Promise; unlockAll(): Promise; getCurrentSortedQueueAsync(props: { inquiryId?: string; diff --git a/packages/models/src/models/LivechatInquiry.ts b/packages/models/src/models/LivechatInquiry.ts index 3caee4626d2..d1361f26549 100644 --- a/packages/models/src/models/LivechatInquiry.ts +++ b/packages/models/src/models/LivechatInquiry.ts @@ -6,7 +6,6 @@ import type { Db, Document, FindOptions, - DistinctOptions, UpdateResult, Filter, DeleteResult, @@ -14,6 +13,7 @@ import type { FindCursor, UpdateFilter, DeleteOptions, + AggregateOptions, WithId, } from 'mongodb'; @@ -133,8 +133,20 @@ export class LivechatInquiryRaw extends BaseRaw implemen return this.find({ 'v.token': token }, { projection: { _id: 1 } }); } - getDistinctQueuedDepartments(options: DistinctOptions): Promise<(string | undefined)[]> { - return this.col.distinct('department', { status: LivechatInquiryStatus.QUEUED }, options); + getDistinctQueuedDepartments(options: AggregateOptions): Promise<{ _id: string | null }[]> { + return this.col + .aggregate<{ _id: string | null }>( + [ + { $match: { status: LivechatInquiryStatus.QUEUED } }, + { + $group: { + _id: '$department', + }, + }, + ], + options, + ) + .toArray(); } async setDepartmentByInquiryId(inquiryId: string, department: string): Promise { @@ -147,7 +159,7 @@ export class LivechatInquiryRaw extends BaseRaw implemen async findNextAndLock( queueSortBy: FindOptions['sort'], - department?: string, + department: string | null, ): Promise { const date = new Date(); return this.findOneAndUpdate( @@ -183,13 +195,6 @@ export class LivechatInquiryRaw extends BaseRaw implemen return this.updateOne({ _id: inquiryId }, { $unset: { locked: 1, lockedAt: 1 } }); } - async unlockAndQueue(inquiryId: string): Promise { - return this.updateOne( - { _id: inquiryId }, - { $unset: { locked: 1, lockedAt: 1 }, $set: { status: LivechatInquiryStatus.QUEUED, queuedAt: new Date() } }, - ); - } - async unlockAll(): Promise { return this.updateMany( { locked: { $exists: true } },