diff --git a/apps/meteor/app/models/server/index.js b/apps/meteor/app/models/server/index.js index 697bad8152d..da1ab9c8549 100644 --- a/apps/meteor/app/models/server/index.js +++ b/apps/meteor/app/models/server/index.js @@ -11,7 +11,6 @@ import LivechatDepartment from './models/LivechatDepartment'; import LivechatDepartmentAgents from './models/LivechatDepartmentAgents'; import LivechatRooms from './models/LivechatRooms'; import LivechatInquiry from './models/LivechatInquiry'; -import OmnichannelQueue from './models/OmnichannelQueue'; import ImportData from './models/ImportData'; import './lib/watchModels'; @@ -36,6 +35,5 @@ export { LivechatDepartmentAgents, LivechatRooms, LivechatInquiry, - OmnichannelQueue, ImportData, }; diff --git a/apps/meteor/app/models/server/models/LivechatInquiry.js b/apps/meteor/app/models/server/models/LivechatInquiry.ts similarity index 69% rename from apps/meteor/app/models/server/models/LivechatInquiry.js rename to apps/meteor/app/models/server/models/LivechatInquiry.ts index c85169cc15a..331398d21c7 100644 --- a/apps/meteor/app/models/server/models/LivechatInquiry.js +++ b/apps/meteor/app/models/server/models/LivechatInquiry.ts @@ -1,3 +1,6 @@ +import { ILivechatInquiryRecord } from '@rocket.chat/core-typings'; +import { FindOneOptions, Cursor, WriteOpResult, DeleteWriteOpResultObject } from 'mongodb'; + import { Base } from './_Base'; export class LivechatInquiry extends Base { @@ -12,17 +15,18 @@ export class LivechatInquiry extends Base { this.tryEnsureIndex({ status: 1 }); // 'ready', 'queued', 'taken' this.tryEnsureIndex({ queueOrder: 1, estimatedWaitingTimeQueue: 1, estimatedServiceTimeAt: 1 }); this.tryEnsureIndex({ 'v.token': 1, 'status': 1 }); // visitor token and status + this.tryEnsureIndex({ locked: 1, lockedAt: 1 }, { sparse: true }); // locked and lockedAt } - findOneById(inquiryId) { + findOneById(inquiryId: string): ILivechatInquiryRecord { return this.findOne({ _id: inquiryId }); } - findOneByRoomId(rid, options) { + findOneByRoomId(rid: string, options?: FindOneOptions): ILivechatInquiryRecord { return this.findOne({ rid }, options); } - getNextInquiryQueued(department) { + getNextInquiryQueued(department?: string): ILivechatInquiryRecord { return this.findOne( { status: 'queued', @@ -38,14 +42,14 @@ export class LivechatInquiry extends Base { ); } - getQueuedInquiries(options) { + getQueuedInquiries(options?: FindOneOptions): Cursor { return this.find({ status: 'queued' }, options); } /* * mark the inquiry as taken */ - takeInquiry(inquiryId) { + takeInquiry(inquiryId: string): void { this.update( { _id: inquiryId, @@ -60,7 +64,7 @@ export class LivechatInquiry extends Base { /* * mark inquiry as open */ - openInquiry(inquiryId) { + openInquiry(inquiryId: string): WriteOpResult { return this.update( { _id: inquiryId, @@ -74,7 +78,7 @@ export class LivechatInquiry extends Base { /* * mark inquiry as queued */ - queueInquiry(inquiryId) { + queueInquiry(inquiryId: string): WriteOpResult { return this.update( { _id: inquiryId, @@ -86,7 +90,7 @@ export class LivechatInquiry extends Base { ); } - queueInquiryAndRemoveDefaultAgent(inquiryId) { + queueInquiryAndRemoveDefaultAgent(inquiryId: string): WriteOpResult { return this.update( { _id: inquiryId, @@ -101,7 +105,7 @@ export class LivechatInquiry extends Base { /* * mark inquiry as ready */ - readyInquiry(inquiryId) { + readyInquiry(inquiryId: string): WriteOpResult { return this.update( { _id: inquiryId, @@ -114,27 +118,27 @@ export class LivechatInquiry extends Base { ); } - changeDepartmentIdByRoomId(rid, department) { + changeDepartmentIdByRoomId(rid: string, department: string): void { const query = { rid, }; - const update = { + const updateObj = { $set: { department, }, }; - this.update(query, update); + this.update(query, updateObj); } /* * return the status of the inquiry (open or taken) */ - getStatus(inquiryId) { + getStatus(inquiryId: string): ILivechatInquiryRecord['status'] { return this.findOne({ _id: inquiryId }).status; } - updateVisitorStatus(token, status) { + updateVisitorStatus(token: string, status: string): WriteOpResult { const query = { 'v.token': token, 'status': 'queued', @@ -149,7 +153,7 @@ export class LivechatInquiry extends Base { return this.update(query, update); } - setDefaultAgentById(inquiryId, defaultAgent) { + setDefaultAgentById(inquiryId: string, defaultAgent: ILivechatInquiryRecord['defaultAgent']): WriteOpResult { return this.update( { _id: inquiryId, @@ -162,7 +166,7 @@ export class LivechatInquiry extends Base { ); } - setNameByRoomId(rid, name) { + setNameByRoomId(rid: string, name: string): WriteOpResult { const query = { rid }; const update = { @@ -173,7 +177,7 @@ export class LivechatInquiry extends Base { return this.update(query, update); } - findOneByToken(token) { + findOneByToken(token: string): ILivechatInquiryRecord { const query = { 'v.token': token, 'status': 'queued', @@ -181,7 +185,13 @@ export class LivechatInquiry extends Base { return this.findOne(query); } - async getCurrentSortedQueueAsync({ _id, department }) { + async getCurrentSortedQueueAsync({ + _id, + department, + }: { + _id: string; + department: string; + }): Promise & { position: number }> { const collectionObj = this.model.rawCollection(); const aggregate = [ { @@ -227,7 +237,7 @@ export class LivechatInquiry extends Base { position: 1, }, }, - ]; + ] as any[]; // To get the current room position in the queue, we need to apply the next $match after the $project if (_id) { @@ -237,7 +247,7 @@ export class LivechatInquiry extends Base { return collectionObj.aggregate(aggregate).toArray(); } - removeDefaultAgentById(inquiryId) { + removeDefaultAgentById(inquiryId: string): WriteOpResult { return this.update( { _id: inquiryId, @@ -251,54 +261,17 @@ export class LivechatInquiry extends Base { /* * remove the inquiry by roomId */ - removeByRoomId(rid) { + removeByRoomId(rid: string): DeleteWriteOpResultObject { return this.remove({ rid }); } - removeByVisitorToken(token) { + removeByVisitorToken(token: string): void { const query = { 'v.token': token, }; this.remove(query); } - - getUnnatendedQueueItems(date) { - const query = { - status: 'queued', - estimatedInactivityCloseTimeAt: { $lte: new Date(date) }, - }; - return this.find(query); - } - - setEstimatedInactivityCloseTime(_id, date) { - return this.update( - { _id }, - { - $set: { - estimatedInactivityCloseTimeAt: new Date(date), - }, - }, - ); - } - - // This is a better solution, but update pipelines are not supported until version 4.2 of mongo - // leaving this here for when the time comes - /* updateEstimatedInactivityCloseTime(milisecondsToAdd) { - return this.model.rawCollection().updateMany( - { status: 'queued' }, - [{ - // in case this field doesn't exists, set at the last time the item was modified (updatedAt) - $set: { estimatedInactivityCloseTimeAt: '$_updatedAt' }, - }, { - $set: { - estimatedInactivityCloseTimeAt: { - $add: ['$estimatedInactivityCloseTimeAt', milisecondsToAdd], - }, - }, - }], - ); - } */ } export default new LivechatInquiry(); diff --git a/apps/meteor/app/models/server/models/OmnichannelQueue.js b/apps/meteor/app/models/server/models/OmnichannelQueue.js deleted file mode 100644 index fb37a83af6d..00000000000 --- a/apps/meteor/app/models/server/models/OmnichannelQueue.js +++ /dev/null @@ -1,9 +0,0 @@ -import { Base } from './_Base'; - -export class OmnichannelQueue extends Base { - constructor() { - super('omnichannel_queue'); - } -} - -export default new OmnichannelQueue(); diff --git a/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.js b/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.js index 0d092e0e0b9..457bd918289 100644 --- a/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.js +++ b/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.js @@ -109,14 +109,9 @@ export const dispatchWaitingQueueStatus = async (department) => { // but we don't need to notify _each_ change that takes place, just their final position export const debouncedDispatchWaitingQueueStatus = memoizeDebounce(dispatchWaitingQueueStatus, 1200); -export const processWaitingQueue = async (department) => { +export const processWaitingQueue = async (department, inquiry) => { const queue = department || 'Public'; helperLogger.debug(`Processing items on queue ${queue}`); - const inquiry = LivechatInquiry.getNextInquiryQueued(department); - if (!inquiry) { - helperLogger.debug(`No items to process on queue ${queue}`); - return; - } helperLogger.debug(`Processing inquiry ${inquiry._id} from queue ${queue}`); const { defaultAgent } = inquiry; @@ -132,10 +127,14 @@ export const processWaitingQueue = async (department) => { servedBy: { _id: agentId }, } = room; helperLogger.debug(`Inquiry ${inquiry._id} taken successfully by agent ${agentId}. Notifying`); - return setTimeout(() => { + setTimeout(() => { propagateAgentDelegated(rid, agentId); }, 1000); + + return true; } + + return false; }; export const setPredictedVisitorAbandonmentTime = (room) => { diff --git a/apps/meteor/ee/app/livechat-enterprise/server/lib/LivechatEnterprise.js b/apps/meteor/ee/app/livechat-enterprise/server/lib/LivechatEnterprise.js index f84bfddc8a2..3e0049f63eb 100644 --- a/apps/meteor/ee/app/livechat-enterprise/server/lib/LivechatEnterprise.js +++ b/apps/meteor/ee/app/livechat-enterprise/server/lib/LivechatEnterprise.js @@ -1,6 +1,6 @@ import { Meteor } from 'meteor/meteor'; import { Match, check } from 'meteor/check'; -import { LivechatInquiry, OmnichannelQueue } from '@rocket.chat/models'; +import { LivechatInquiry } from '@rocket.chat/models'; import LivechatUnit from '../../../models/server/models/LivechatUnit'; import LivechatTag from '../../../models/server/models/LivechatTag'; @@ -229,14 +229,14 @@ const queueWorker = { const activeQueues = await this.getActiveQueues(); queueLogger.debug(`Active queues: ${activeQueues.length}`); - await OmnichannelQueue.initQueue(); this.running = true; return this.execute(); }, async stop() { queueLogger.debug('Stopping queue'); + await LivechatInquiry.unlockAll(); + this.running = false; - return OmnichannelQueue.stopQueue(); }, async getActiveQueues() { // undefined = public queue(without department) @@ -265,12 +265,16 @@ const queueWorker = { async checkQueue(queue) { queueLogger.debug(`Processing items for queue ${queue || 'Public'}`); try { - if (await OmnichannelQueue.lockQueue()) { - await processWaitingQueue(queue); - queueLogger.debug(`Queue ${queue || 'Public'} processed. Unlocking`); - await OmnichannelQueue.unlockQueue(); - } else { - queueLogger.debug('Queue locked. Waiting'); + const nextInquiry = await LivechatInquiry.findNextAndLock(queue); + if (!nextInquiry) { + queueLogger.debug(`No more items for queue ${queue || 'Public'}`); + return; + } + + const result = await processWaitingQueue(queue, nextInquiry); + + if (!result) { + await LivechatInquiry.unlock(nextInquiry._id); } } catch (e) { queueLogger.error({ diff --git a/apps/meteor/server/models/OmnichannelQueue.ts b/apps/meteor/server/models/OmnichannelQueue.ts deleted file mode 100644 index 9321b2bd6fa..00000000000 --- a/apps/meteor/server/models/OmnichannelQueue.ts +++ /dev/null @@ -1,7 +0,0 @@ -import { registerModel } from '@rocket.chat/models'; - -import { trashCollection } from '../database/trash'; -import { db } from '../database/utils'; -import { OmnichannelQueueRaw } from './raw/OmnichannelQueue'; - -registerModel('IOmnichannelQueueModel', new OmnichannelQueueRaw(db, trashCollection)); diff --git a/apps/meteor/server/models/raw/LivechatInquiry.ts b/apps/meteor/server/models/raw/LivechatInquiry.ts index eeabc0700d3..b499be73521 100644 --- a/apps/meteor/server/models/raw/LivechatInquiry.ts +++ b/apps/meteor/server/models/raw/LivechatInquiry.ts @@ -40,4 +40,52 @@ export class LivechatInquiryRaw extends BaseRaw implemen async setLastMessageByRoomId(rid: string, message: IMessage): Promise { return this.updateOne({ rid }, { $set: { lastMessage: message } }); } + + async findNextAndLock(department?: string): Promise { + const date = new Date(); + const result = await this.col.findOneAndUpdate( + { + status: LivechatInquiryStatus.QUEUED, + ...(department && { department }), + $or: [ + { + locked: true, + lockedAt: { + $lte: new Date(date.getTime() - 5000), + }, + }, + { + locked: false, + }, + { + locked: { $exists: false }, + }, + ], + }, + { + $set: { + locked: true, + // apply 5 secs lock lifetime + lockedAt: new Date(), + }, + }, + { + sort: { + queueOrder: 1, + estimatedWaitingTimeQueue: 1, + estimatedServiceTimeAt: 1, + }, + }, + ); + + return result.value; + } + + async unlock(inquiryId: string): Promise { + return this.updateOne({ _id: inquiryId }, { $unset: { locked: 1, lockedAt: 1 } }); + } + + async unlockAll(): Promise { + return this.updateMany({}, { $unset: { locked: 1, lockedAt: 1 } }); + } } diff --git a/apps/meteor/server/models/raw/OmnichannelQueue.ts b/apps/meteor/server/models/raw/OmnichannelQueue.ts deleted file mode 100644 index 1b02b093d1f..00000000000 --- a/apps/meteor/server/models/raw/OmnichannelQueue.ts +++ /dev/null @@ -1,104 +0,0 @@ -import type { IOmnichannelQueueStatus, RocketChatRecordDeleted } from '@rocket.chat/core-typings'; -import type { IOmnichannelQueueModel } from '@rocket.chat/model-typings'; -import type { Collection, Db } from 'mongodb'; -import { getCollectionName } from '@rocket.chat/models'; - -import { BaseRaw } from './BaseRaw'; - -const UNIQUE_QUEUE_ID = 'queue'; -export class OmnichannelQueueRaw extends BaseRaw implements IOmnichannelQueueModel { - constructor(db: Db, trash?: Collection>) { - super(db, getCollectionName('omnichannel_queue'), trash); - } - - initQueue(): any { - return this.col.updateOne( - { - _id: UNIQUE_QUEUE_ID, - }, - { - $unset: { - stoppedAt: 1, - }, - $set: { - startedAt: new Date(), - locked: false, - }, - }, - { - upsert: true, - }, - ); - } - - stopQueue(): any { - return this.col.updateOne( - { - _id: UNIQUE_QUEUE_ID, - }, - { - $set: { - stoppedAt: new Date(), - locked: false, - }, - }, - ); - } - - async lockQueue(): Promise { - const date = new Date(); - const result = await this.col.findOneAndUpdate( - { - _id: UNIQUE_QUEUE_ID, - $or: [ - { - locked: true, - lockedAt: { - $lte: new Date(date.getTime() - 5000), - }, - }, - { - locked: false, - }, - ], - }, - { - $set: { - locked: true, - // apply 5 secs lock lifetime - lockedAt: new Date(), - }, - }, - { - sort: { - _id: 1, - }, - }, - ); - - return result.value; - } - - async unlockQueue(): Promise { - const result = await this.col.findOneAndUpdate( - { - _id: UNIQUE_QUEUE_ID, - }, - { - $set: { - locked: false, - }, - $unset: { - lockedAt: 1, - }, - }, - { - sort: { - _id: 1, - }, - }, - ); - - return result.value; - } -} diff --git a/apps/meteor/server/models/startup.ts b/apps/meteor/server/models/startup.ts index 987deb8b2ea..0d2618b0251 100644 --- a/apps/meteor/server/models/startup.ts +++ b/apps/meteor/server/models/startup.ts @@ -32,7 +32,6 @@ import './Nps'; import './NpsVote'; import './OAuthApps'; import './OEmbedCache'; -import './OmnichannelQueue'; import './PbxEvents'; import './PushToken'; import './Permissions'; diff --git a/apps/meteor/server/startup/migrations/index.ts b/apps/meteor/server/startup/migrations/index.ts index a8e134993c7..6466e0d9ff2 100644 --- a/apps/meteor/server/startup/migrations/index.ts +++ b/apps/meteor/server/startup/migrations/index.ts @@ -97,4 +97,5 @@ import './v270'; import './v271'; import './v272'; import './v273'; +import './v274'; import './xrun'; diff --git a/apps/meteor/server/startup/migrations/v274.ts b/apps/meteor/server/startup/migrations/v274.ts new file mode 100644 index 00000000000..7a9fad44adc --- /dev/null +++ b/apps/meteor/server/startup/migrations/v274.ts @@ -0,0 +1,18 @@ +import { MongoInternals } from 'meteor/mongo'; + +import { addMigration } from '../../lib/migrations'; + +// Remove Deprecated Omnichannel Queue Collection +addMigration({ + version: 274, + async up() { + // Remove collection + try { + const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); + await mongo.db.dropCollection('rocketchat_omnichannel_queue'); + } catch (e: any) { + // ignore + console.warn('Error deleting collection. Perhaps collection was already deleted?'); + } + }, +}); diff --git a/packages/core-typings/src/IInquiry.ts b/packages/core-typings/src/IInquiry.ts index 90cccbef6cf..bc53ca7c685 100644 --- a/packages/core-typings/src/IInquiry.ts +++ b/packages/core-typings/src/IInquiry.ts @@ -1,3 +1,4 @@ +import type { IUser } from './IUser'; import type { IMessage } from './IMessage'; import type { IRocketChatRecord } from './IRocketChatRecord'; @@ -33,5 +34,11 @@ export interface ILivechatInquiryRecord extends IRocketChatRecord { estimatedServiceTimeAt: string; department: string; estimatedInactivityCloseTimeAt: Date; + locked?: boolean; + lockedAt?: Date; lastMessage?: IMessage & { token?: string }; + defaultAgent?: { + agentId: IUser['_id']; + username?: IUser['username']; + }; } diff --git a/packages/model-typings/src/index.ts b/packages/model-typings/src/index.ts index e994857d7ac..cf73129041b 100644 --- a/packages/model-typings/src/index.ts +++ b/packages/model-typings/src/index.ts @@ -38,7 +38,6 @@ export * from './models/INpsModel'; export * from './models/INpsVoteModel'; export * from './models/IOAuthAppsModel'; export * from './models/IOEmbedCacheModel'; -export * from './models/IOmnichannelQueueModel'; export * from './models/IPbxEventsModel'; export * from './models/IPushTokenModel'; export * from './models/IPermissionsModel'; diff --git a/packages/model-typings/src/models/ILivechatInquiryModel.ts b/packages/model-typings/src/models/ILivechatInquiryModel.ts index d0b989d8459..bc9e9916834 100644 --- a/packages/model-typings/src/models/ILivechatInquiryModel.ts +++ b/packages/model-typings/src/models/ILivechatInquiryModel.ts @@ -12,4 +12,7 @@ export interface ILivechatInquiryModel extends IBaseModel; setDepartmentByInquiryId(inquiryId: string, department: string): Promise; setLastMessageByRoomId(rid: string, message: IMessage): Promise; + findNextAndLock(department?: string): Promise; + unlock(inquiryId: string): Promise; + unlockAll(): Promise; } diff --git a/packages/model-typings/src/models/IOmnichannelQueueModel.ts b/packages/model-typings/src/models/IOmnichannelQueueModel.ts deleted file mode 100644 index bcd4390eb12..00000000000 --- a/packages/model-typings/src/models/IOmnichannelQueueModel.ts +++ /dev/null @@ -1,10 +0,0 @@ -import type { IOmnichannelQueueStatus } from '@rocket.chat/core-typings'; - -import type { IBaseModel } from './IBaseModel'; - -export interface IOmnichannelQueueModel extends IBaseModel { - initQueue(): any; - stopQueue(): any; - lockQueue(): Promise; - unlockQueue(): Promise; -} diff --git a/packages/models/src/index.ts b/packages/models/src/index.ts index 2b50e685385..30db255dc6c 100644 --- a/packages/models/src/index.ts +++ b/packages/models/src/index.ts @@ -38,7 +38,6 @@ import type { INpsVoteModel, IOAuthAppsModel, IOEmbedCacheModel, - IOmnichannelQueueModel, IPbxEventsModel, IPushTokenModel, IPermissionsModel, @@ -110,7 +109,6 @@ export const Nps = proxify('INpsModel'); export const NpsVote = proxify('INpsVoteModel'); export const OAuthApps = proxify('IOAuthAppsModel'); export const OEmbedCache = proxify('IOEmbedCacheModel'); -export const OmnichannelQueue = proxify('IOmnichannelQueueModel'); export const PbxEvents = proxify('IPbxEventsModel'); export const PushToken = proxify('IPushTokenModel'); export const Permissions = proxify('IPermissionsModel');