import { Meteor } from 'meteor/meteor'; import type { INotification, INotificationItemPush, INotificationItemEmail, NotificationItem, IUser } from '@rocket.chat/core-typings'; import { NotificationQueue, Users } from '@rocket.chat/models'; import { sendEmailFromData } from '../../lib/server/functions/notifications/email'; import { PushNotification } from '../../push-notifications/server'; import { SystemLogger } from '../../../server/lib/logger/system'; const { NOTIFICATIONS_WORKER_TIMEOUT = 2000, NOTIFICATIONS_BATCH_SIZE = 100, NOTIFICATIONS_SCHEDULE_DELAY_ONLINE = 120, NOTIFICATIONS_SCHEDULE_DELAY_AWAY = 0, NOTIFICATIONS_SCHEDULE_DELAY_OFFLINE = 0, } = process.env; class NotificationClass { private running = false; private cyclePause = Number(NOTIFICATIONS_WORKER_TIMEOUT); private maxBatchSize = Number(NOTIFICATIONS_BATCH_SIZE); private maxScheduleDelaySeconds: { [key: string]: number } = { online: Number(NOTIFICATIONS_SCHEDULE_DELAY_ONLINE), away: Number(NOTIFICATIONS_SCHEDULE_DELAY_AWAY), offline: Number(NOTIFICATIONS_SCHEDULE_DELAY_OFFLINE), }; initWorker(): void { this.running = true; this.executeWorkerLater(); } stopWorker(): void { this.running = false; } executeWorkerLater(): void { if (!this.running) { return; } setTimeout(async () => { try { await this.worker(); } catch (err) { SystemLogger.error({ msg: 'Error sending notification', err }); this.executeWorkerLater(); } }, this.cyclePause); } async worker(counter = 0): Promise { const notification = await this.getNextNotification(); if (!notification) { return this.executeWorkerLater(); } // Once we start notifying the user we anticipate all the schedules const flush = await NotificationQueue.clearScheduleByUserId(notification.uid); // start worker again it queue flushed if (flush.modifiedCount) { await NotificationQueue.unsetSendingById(notification._id); return this.worker(counter); } try { for await (const item of notification.items) { switch (item.type) { case 'push': await this.push(notification, item); break; case 'email': await this.email(item); break; } } await NotificationQueue.removeById(notification._id); } catch (e) { SystemLogger.error(e); await NotificationQueue.setErrorById(notification._id, e instanceof Error ? e.message : String(e)); } if (counter >= this.maxBatchSize) { return this.executeWorkerLater(); } await this.worker(counter++); } getNextNotification(): Promise { const expired = new Date(); expired.setMinutes(expired.getMinutes() - 5); return NotificationQueue.findNextInQueueOrExpired(expired); } async push({ uid, rid, mid }: INotification, item: INotificationItemPush): Promise { await PushNotification.send({ rid, uid, mid, ...item.data, }); } async email(item: INotificationItemEmail): Promise { return sendEmailFromData(item.data); } async scheduleItem({ uid, rid, mid, items, user, }: { uid: string; rid: string; mid: string; items: NotificationItem[]; user?: Partial; }): Promise { const receiver = user || (await Users.findOneById>(uid, { projection: { statusConnection: 1, }, })); if (!receiver) { return; } const { statusConnection = 'offline' } = receiver; let schedule: Date | undefined; const delay = this.maxScheduleDelaySeconds[statusConnection]; if (delay < 0) { return; } if (delay > 0) { schedule = new Date(); schedule.setSeconds(schedule.getSeconds() + delay); } await NotificationQueue.insertOne({ uid, rid, mid, ts: new Date(), schedule, items, }); } } export const Notification = new NotificationClass(); Meteor.startup(() => { Notification.initWorker(); });