refactor: Omnichannel queue processing (#34192)

pull/34939/head^2
Kevin Aleman 1 year ago committed by GitHub
parent d94901634d
commit c9a5c78c2d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 6
      apps/meteor/ee/app/livechat-enterprise/server/hooks/checkAgentBeforeTakeInquiry.ts
  2. 4
      apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.ts
  3. 45
      apps/meteor/server/services/omnichannel/queue.ts
  4. 70
      apps/meteor/tests/unit/server/services/omnichannel/queue.tests.ts
  5. 10
      packages/model-typings/src/models/ILivechatInquiryModel.ts
  6. 27
      packages/models/src/models/LivechatInquiry.ts

@ -27,15 +27,18 @@ const validateMaxChats = async ({
};
}) => {
if (!inquiry?._id || !agent?.agentId) {
cbLogger.debug('No inquiry or agent provided');
throw new Error('No inquiry or agent provided');
}
const { agentId } = agent;
if (!(await Livechat.checkOnlineAgents(undefined, agent))) {
cbLogger.debug('Provided agent is not online');
throw new Error('Provided agent is not online');
}
if (!settings.get('Livechat_waiting_queue')) {
cbLogger.info(`Chat can be taken by Agent ${agentId}: waiting queue is disabled`);
return agent;
}
@ -58,11 +61,14 @@ const validateMaxChats = async ({
const user = await Users.getAgentAndAmountOngoingChats(agentId);
if (!user) {
cbLogger.debug({ msg: 'No valid agent found', agentId });
throw new Error('No valid agent found');
}
const { queueInfo: { chats = 0 } = {} } = user;
const maxChats = typeof maxNumberSimultaneousChat === 'number' ? maxNumberSimultaneousChat : parseInt(maxNumberSimultaneousChat, 10);
cbLogger.debug({ msg: 'Validating agent is within max number of chats', agentId, user, maxChats });
if (maxChats <= chats) {
await callbacks.run('livechat.onMaxNumberSimultaneousChatsReached', inquiry);
throw new Error('error-max-number-simultaneous-chats-reached');

@ -38,7 +38,7 @@ export const getMaxNumberSimultaneousChat = async ({ agentId, departmentId }: {
const department = await LivechatDepartmentRaw.findOneById(departmentId);
const { maxNumberSimultaneousChat = 0 } = department || { maxNumberSimultaneousChat: 0 };
if (maxNumberSimultaneousChat > 0) {
return maxNumberSimultaneousChat;
return Number(maxNumberSimultaneousChat);
}
}
@ -46,7 +46,7 @@ export const getMaxNumberSimultaneousChat = async ({ agentId, departmentId }: {
const user = await Users.getAgentInfo(agentId, settings.get('Livechat_show_agent_info'));
const { livechat: { maxNumberSimultaneousChat = 0 } = {} } = user || {};
if (maxNumberSimultaneousChat > 0) {
return maxNumberSimultaneousChat;
return Number(maxNumberSimultaneousChat);
}
}

@ -2,6 +2,7 @@ import { ServiceStarter } from '@rocket.chat/core-services';
import { type InquiryWithAgentInfo, type IOmnichannelQueue } from '@rocket.chat/core-typings';
import { License } from '@rocket.chat/license';
import { LivechatInquiry, LivechatRooms } from '@rocket.chat/models';
import { tracerSpan } from '@rocket.chat/tracing';
import { queueLogger } from './logger';
import { getOmniChatSortQuery } from '../../../app/livechat/lib/inquiries';
@ -26,8 +27,6 @@ export class OmnichannelQueue implements IOmnichannelQueue {
private running = false;
private queues: (string | undefined)[] = [];
private delay() {
const timeout = settings.get<number>('Omnichannel_queue_delay_timeout') ?? 5;
return timeout < 1 ? DEFAULT_RACE_TIMEOUT : timeout * 1000;
@ -76,17 +75,7 @@ export class OmnichannelQueue implements IOmnichannelQueue {
}
private async getActiveQueues() {
// undefined = public queue(without department)
return ([undefined] as typeof this.queues).concat(await LivechatInquiry.getDistinctQueuedDepartments({}));
}
private async nextQueue() {
if (!this.queues.length) {
queueLogger.debug('No more registered queues. Refreshing');
this.queues = await this.getActiveQueues();
}
return this.queues.shift();
return (await LivechatInquiry.getDistinctQueuedDepartments({})).map(({ _id }) => _id);
}
private async execute() {
@ -101,16 +90,20 @@ export class OmnichannelQueue implements IOmnichannelQueue {
return;
}
const queue = await this.nextQueue();
const queueDelayTimeout = this.delay();
queueLogger.debug(`Executing queue ${queue || 'Public'} with timeout of ${queueDelayTimeout}`);
void this.checkQueue(queue).catch((e) => {
queueLogger.error(e);
});
// We still go 1 by 1, but we go with every queue every cycle instead of just 1 queue per cycle
// And we get tracing :)
const queues = await this.getActiveQueues();
for await (const queue of queues) {
await tracerSpan(
'omnichannel.queue',
{ attributes: { workerTime: new Date().toISOString(), queue: queue || 'Public' }, root: true },
() => this.checkQueue(queue),
);
}
this.scheduleExecution();
}
private async checkQueue(queue: string | undefined) {
private async checkQueue(queue: string | null) {
queueLogger.debug(`Processing items for queue ${queue || 'Public'}`);
try {
const nextInquiry = await LivechatInquiry.findNextAndLock(getOmniChatSortQuery(getInquirySortMechanismSetting()), queue);
@ -122,12 +115,8 @@ export class OmnichannelQueue implements IOmnichannelQueue {
const result = await this.processWaitingQueue(queue, nextInquiry as InquiryWithAgentInfo);
if (!result) {
// Note: this removes the "one-shot" behavior of queue, allowing it to take a conversation again in the future
// And sorting them by _updatedAt: -1 will make it so that the oldest inquiries are taken first
// preventing us from playing with the same inquiry over and over again
queueLogger.debug(`Inquiry ${nextInquiry._id} not taken. Unlocking and re-queueing`);
const updatedQueue = await LivechatInquiry.unlockAndQueue(nextInquiry._id);
return updatedQueue;
return await LivechatInquiry.unlock(nextInquiry._id);
}
queueLogger.debug(`Inquiry ${nextInquiry._id} taken successfully. Unlocking`);
@ -144,8 +133,6 @@ export class OmnichannelQueue implements IOmnichannelQueue {
queue: queue || 'Public',
err: e,
});
} finally {
this.scheduleExecution();
}
}
@ -217,7 +204,7 @@ export class OmnichannelQueue implements IOmnichannelQueue {
return true;
}
private async processWaitingQueue(department: string | undefined, inquiry: InquiryWithAgentInfo) {
private async processWaitingQueue(department: string | null, inquiry: InquiryWithAgentInfo) {
const queue = department || 'Public';
queueLogger.debug(`Processing inquiry ${inquiry._id} from queue ${queue}`);

@ -29,7 +29,6 @@ const models = {
unlockAll: Sinon.stub(),
findNextAndLock: Sinon.stub(),
getDistinctQueuedDepartments: Sinon.stub(),
unlockAndQueue: Sinon.stub(),
unlock: Sinon.stub(),
removeByRoomId: Sinon.stub(),
takeInquiry: Sinon.stub(),
@ -93,47 +92,23 @@ describe('Omnichannel Queue processor', () => {
after(() => {
models.LivechatInquiry.getDistinctQueuedDepartments.reset();
});
it('should return [undefined] when there is no other queues', async () => {
models.LivechatInquiry.getDistinctQueuedDepartments.returns([]);
it('should return empty array when there are no active queues', async () => {
models.LivechatInquiry.getDistinctQueuedDepartments.resolves([]);
const queue = new OmnichannelQueue();
expect(await queue.getActiveQueues()).to.be.eql([undefined]);
expect(await queue.getActiveQueues()).to.be.eql([]);
});
it('should return [undefined, department1] when department1 is an active queue', async () => {
models.LivechatInquiry.getDistinctQueuedDepartments.returns(['department1']);
it('should return [department1] when department1 is an active queue', async () => {
models.LivechatInquiry.getDistinctQueuedDepartments.resolves([{ _id: 'department1' }]);
const queue = new OmnichannelQueue();
expect(await queue.getActiveQueues()).to.be.eql([undefined, 'department1']);
expect(await queue.getActiveQueues()).to.be.eql(['department1']);
});
});
describe('nextQueue', () => {
after(() => {
models.LivechatInquiry.getDistinctQueuedDepartments.reset();
});
it('should return undefined when thats the only queue', async () => {
models.LivechatInquiry.getDistinctQueuedDepartments.returns([]);
const queue = new OmnichannelQueue();
queue.getActiveQueues = Sinon.stub().returns([undefined]);
expect(await queue.nextQueue()).to.be.undefined;
});
it('should return undefined, and then the following queue', async () => {
models.LivechatInquiry.getDistinctQueuedDepartments.returns(['department1']);
const queue = new OmnichannelQueue();
queue.getActiveQueues = Sinon.stub().returns([undefined, 'department1']);
expect(await queue.nextQueue()).to.be.undefined;
expect(await queue.nextQueue()).to.be.equal('department1');
});
it('should not call getActiveQueues if there are still queues to process', async () => {
models.LivechatInquiry.getDistinctQueuedDepartments.returns(['department1']);
it('should return [null, department1] when department1 is an active queue and there are elements on public queue', async () => {
models.LivechatInquiry.getDistinctQueuedDepartments.resolves([{ _id: 'department1' }, { _id: null }]);
const queue = new OmnichannelQueue();
queue.queues = ['department1'];
queue.getActiveQueues = Sinon.stub();
expect(await queue.nextQueue()).to.be.equal('department1');
expect(queue.getActiveQueues.notCalled).to.be.true;
expect(await queue.getActiveQueues()).to.be.eql(['department1', null]);
});
});
describe('checkQueue', () => {
@ -141,7 +116,6 @@ describe('Omnichannel Queue processor', () => {
beforeEach(() => {
models.LivechatInquiry.findNextAndLock.resetHistory();
models.LivechatInquiry.takeInquiry.resetHistory();
models.LivechatInquiry.unlockAndQueue.resetHistory();
models.LivechatInquiry.unlock.resetHistory();
queueLogger.error.resetHistory();
queueLogger.info.resetHistory();
@ -153,7 +127,6 @@ describe('Omnichannel Queue processor', () => {
after(() => {
models.LivechatInquiry.findNextAndLock.reset();
models.LivechatInquiry.takeInquiry.reset();
models.LivechatInquiry.unlockAndQueue.reset();
models.LivechatInquiry.unlock.reset();
queueLogger.error.reset();
queueLogger.info.reset();
@ -178,7 +151,7 @@ describe('Omnichannel Queue processor', () => {
expect(models.LivechatInquiry.findNextAndLock.calledOnce).to.be.true;
expect(queue.processWaitingQueue.calledOnce).to.be.true;
});
it('should call unlockAndRequeue when the inquiry could not be processed', async () => {
it('should call unlock when the inquiry could not be processed', async () => {
models.LivechatInquiry.findNextAndLock.returns(mockedInquiry);
const queue = new OmnichannelQueue();
@ -187,7 +160,7 @@ describe('Omnichannel Queue processor', () => {
await queue.checkQueue();
expect(queue.processWaitingQueue.calledOnce).to.be.true;
expect(models.LivechatInquiry.unlockAndQueue.calledOnce).to.be.true;
expect(models.LivechatInquiry.unlock.calledOnce).to.be.true;
});
it('should unlock the inquiry when it was processed succesfully', async () => {
models.LivechatInquiry.findNextAndLock.returns(mockedInquiry);
@ -209,21 +182,6 @@ describe('Omnichannel Queue processor', () => {
expect(queueLogger.error.calledOnce).to.be.true;
});
it('should call execute after finishing', async () => {
models.LivechatInquiry.findNextAndLock.returns(mockedInquiry);
const queue = new OmnichannelQueue();
queue.processWaitingQueue = Sinon.stub().returns(true);
queue.execute = Sinon.stub();
queue.delay = Sinon.stub().returns(100);
await queue.checkQueue();
clock.tick(100);
expect(queue.execute.calledOnce).to.be.true;
expect(models.LivechatInquiry.unlock.calledOnce).to.be.true;
expect(queue.execute.calledAfter(models.LivechatInquiry.unlock)).to.be.true;
expect(queue.execute.calledOnce).to.be.true;
});
});
describe('shouldStart', () => {
beforeEach(() => {
@ -414,11 +372,11 @@ describe('Omnichannel Queue processor', () => {
const queue = new OmnichannelQueue();
queue.running = true;
queue.nextQueue = Sinon.stub();
queue.getActiveQueues = Sinon.stub().resolves([null]);
await queue.execute();
expect(queue.nextQueue.calledOnce).to.be.true;
expect(queueLogger.debug.calledWith('Executing queue Public with timeout of 5000')).to.be.true;
expect(queue.getActiveQueues.calledOnce).to.be.true;
expect(queueLogger.debug.calledWith('Processing items for queue Public')).to.be.true;
});
});
describe('start', () => {

@ -1,5 +1,5 @@
import type { IMessage, ILivechatInquiryRecord, LivechatInquiryStatus } from '@rocket.chat/core-typings';
import type { FindOptions, DistinctOptions, Document, UpdateResult, DeleteResult, FindCursor, DeleteOptions } from 'mongodb';
import type { FindOptions, Document, UpdateResult, DeleteResult, FindCursor, DeleteOptions, AggregateOptions } from 'mongodb';
import type { IBaseModel } from './IBaseModel';
@ -13,12 +13,14 @@ export interface ILivechatInquiryModel extends IBaseModel<ILivechatInquiryRecord
rid: string,
options?: FindOptions<T extends ILivechatInquiryRecord ? ILivechatInquiryRecord : T>,
): Promise<T | null>;
getDistinctQueuedDepartments(options: DistinctOptions): Promise<(string | undefined)[]>;
getDistinctQueuedDepartments(options: AggregateOptions): Promise<{ _id: string | null }[]>;
setDepartmentByInquiryId(inquiryId: string, department: string): Promise<ILivechatInquiryRecord | null>;
setLastMessageByRoomId(rid: ILivechatInquiryRecord['rid'], message: IMessage): Promise<ILivechatInquiryRecord | null>;
findNextAndLock(queueSortBy: FindOptions<ILivechatInquiryRecord>['sort'], department?: string): Promise<ILivechatInquiryRecord | null>;
findNextAndLock(
queueSortBy: FindOptions<ILivechatInquiryRecord>['sort'],
department: string | null,
): Promise<ILivechatInquiryRecord | null>;
unlock(inquiryId: string): Promise<UpdateResult>;
unlockAndQueue(inquiryId: string): Promise<UpdateResult>;
unlockAll(): Promise<UpdateResult | Document>;
getCurrentSortedQueueAsync(props: {
inquiryId?: string;

@ -6,7 +6,6 @@ import type {
Db,
Document,
FindOptions,
DistinctOptions,
UpdateResult,
Filter,
DeleteResult,
@ -14,6 +13,7 @@ import type {
FindCursor,
UpdateFilter,
DeleteOptions,
AggregateOptions,
WithId,
} from 'mongodb';
@ -133,8 +133,20 @@ export class LivechatInquiryRaw extends BaseRaw<ILivechatInquiryRecord> implemen
return this.find({ 'v.token': token }, { projection: { _id: 1 } });
}
getDistinctQueuedDepartments(options: DistinctOptions): Promise<(string | undefined)[]> {
return this.col.distinct('department', { status: LivechatInquiryStatus.QUEUED }, options);
getDistinctQueuedDepartments(options: AggregateOptions): Promise<{ _id: string | null }[]> {
return this.col
.aggregate<{ _id: string | null }>(
[
{ $match: { status: LivechatInquiryStatus.QUEUED } },
{
$group: {
_id: '$department',
},
},
],
options,
)
.toArray();
}
async setDepartmentByInquiryId(inquiryId: string, department: string): Promise<ILivechatInquiryRecord | null> {
@ -147,7 +159,7 @@ export class LivechatInquiryRaw extends BaseRaw<ILivechatInquiryRecord> implemen
async findNextAndLock(
queueSortBy: FindOptions<ILivechatInquiryRecord>['sort'],
department?: string,
department: string | null,
): Promise<ILivechatInquiryRecord | null> {
const date = new Date();
return this.findOneAndUpdate(
@ -183,13 +195,6 @@ export class LivechatInquiryRaw extends BaseRaw<ILivechatInquiryRecord> implemen
return this.updateOne({ _id: inquiryId }, { $unset: { locked: 1, lockedAt: 1 } });
}
async unlockAndQueue(inquiryId: string): Promise<UpdateResult> {
return this.updateOne(
{ _id: inquiryId },
{ $unset: { locked: 1, lockedAt: 1 }, $set: { status: LivechatInquiryStatus.QUEUED, queuedAt: new Date() } },
);
}
async unlockAll(): Promise<UpdateResult | Document> {
return this.updateMany(
{ locked: { $exists: true } },

Loading…
Cancel
Save