diff --git a/app/lib/server/functions/setStatusText.js b/app/lib/server/functions/setStatusText.js index d6705468ab3..cfef155220e 100644 --- a/app/lib/server/functions/setStatusText.js +++ b/app/lib/server/functions/setStatusText.js @@ -21,7 +21,9 @@ export const _setStatusTextPromise = async function(userId, statusText) { await UsersRaw.updateStatusText(user._id, statusText); const { _id, username, status } = user; - api.broadcast('userpresence', { user: { _id, username, status, statusText } }); + api.broadcast('presence.status', { + user: { _id, username, status, statusText }, + }); return true; }; @@ -48,7 +50,9 @@ export const _setStatusText = function(userId, statusText) { user.statusText = statusText; const { _id, username, status } = user; - api.broadcast('userpresence', { user: { _id, username, status, statusText } }); + api.broadcast('presence.status', { + user: { _id, username, status, statusText }, + }); return true; }; diff --git a/app/models/server/raw/index.ts b/app/models/server/raw/index.ts index f161d4523fd..7c2a4d92ab5 100644 --- a/app/models/server/raw/index.ts +++ b/app/models/server/raw/index.ts @@ -55,7 +55,6 @@ import { UsersSessionsRaw } from './UsersSessions'; import UsersSessionsModel from '../models/UsersSessions'; import { ServerEventsRaw } from './ServerEvents'; import { trash } from '../models/_BaseDb'; -import { initWatchers } from '../../../../server/modules/watchers/watchers.module'; import LoginServiceConfigurationModel from '../models/LoginServiceConfiguration'; import { LoginServiceConfigurationRaw } from './LoginServiceConfiguration'; import { InstanceStatusRaw } from './InstanceStatus'; @@ -64,6 +63,8 @@ import { IntegrationHistoryRaw } from './IntegrationHistory'; import IntegrationHistoryModel from '../models/IntegrationHistory'; import OmnichannelQueueModel from '../models/OmnichannelQueue'; import { OmnichannelQueueRaw } from './OmnichannelQueue'; +import { api } from '../../../../server/sdk/api'; +import { initWatchers } from '../../../../server/modules/watchers/watchers.module'; const trashCollection = trash.rawCollection(); @@ -117,27 +118,30 @@ const map = { [Integrations.col.collectionName]: IntegrationsModel, }; -!process.env.DISABLE_DB_WATCH && initWatchers({ - Messages, - Users, - Subscriptions, - Settings, - LivechatInquiry, - LivechatDepartmentAgents, - UsersSessions, - Permissions, - Roles, - Rooms, - LoginServiceConfiguration, - InstanceStatus, - IntegrationHistory, - Integrations, -}, (model, fn) => { - const meteorModel = map[model.col.collectionName]; +if (!process.env.DISABLE_DB_WATCH) { + const models = { + Messages, + Users, + Subscriptions, + Settings, + LivechatInquiry, + LivechatDepartmentAgents, + UsersSessions, + Permissions, + Roles, + Rooms, + LoginServiceConfiguration, + InstanceStatus, + IntegrationHistory, + Integrations, + }; - if (!meteorModel) { - return; - } + initWatchers(models, api.broadcastLocal.bind(api), (model, fn) => { + const meteorModel = map[model.col.collectionName]; + if (!meteorModel) { + return; + } - meteorModel.on('change', fn); -}); + meteorModel.on('change', fn); + }); +} diff --git a/app/notifications/server/lib/Notifications.ts b/app/notifications/server/lib/Notifications.ts index a84a8f2d4d3..0bd8eaf7d05 100644 --- a/app/notifications/server/lib/Notifications.ts +++ b/app/notifications/server/lib/Notifications.ts @@ -3,7 +3,7 @@ import { Promise } from 'meteor/promise'; import { DDPCommon } from 'meteor/ddp-common'; import { NotificationsModule } from '../../../../server/modules/notifications/notifications.module'; -import { Streamer, StreamerCentral } from '../../../../server/modules/streamer/streamer.module'; +import { Streamer } from '../../../../server/modules/streamer/streamer.module'; import { api } from '../../../../server/sdk/api'; import { Subscriptions as SubscriptionsRaw, @@ -13,13 +13,13 @@ import { } from '../../../models/server/raw'; // TODO: Replace this in favor of the api.broadcast -StreamerCentral.on('broadcast', (name, eventName, args) => { - api.broadcast('stream', [ - name, - eventName, - args, - ]); -}); +// StreamerCentral.on('broadcast', (name, eventName, args) => { +// api.broadcast('stream', [ +// name, +// eventName, +// args, +// ]); +// }); export class Stream extends Streamer { registerPublication(name: string, fn: (eventName: string, options: boolean | {useCollection?: boolean; args?: any}) => void): void { @@ -51,4 +51,8 @@ notifications.configure({ Settings: SettingsRaw, }); +notifications.streamLocal.on('broadcast', ({ eventName, args }) => { + api.broadcastLocal(eventName, ...args); +}); + export default notifications; diff --git a/ee/server/broker.ts b/ee/server/broker.ts index 77c35e3fad0..536319fbb2b 100644 --- a/ee/server/broker.ts +++ b/ee/server/broker.ts @@ -193,6 +193,10 @@ class NetworkBroker implements IBroker { return this.broker.broadcast(event, args); } + async broadcastLocal(event: T, ...args: Parameters): Promise { + this.broker.broadcastLocal(event, args); + } + async nodeList(): Promise { return this.broker.call('$node.list'); } diff --git a/ee/server/services/presence/actions/setStatus.ts b/ee/server/services/presence/actions/setStatus.ts index 0396e28043c..b763ca37d88 100755 --- a/ee/server/services/presence/actions/setStatus.ts +++ b/ee/server/services/presence/actions/setStatus.ts @@ -33,8 +33,7 @@ export async function setStatus(uid: string, statusDefault: USER_STATUS, statusT if (result.modifiedCount > 0) { const user = await User.findOne(query, { projection: { username: 1 } }); - api.broadcast('userpresence', { - action: 'updated', + api.broadcast('presence.status', { user: { _id: uid, username: user?.username, status, statusText }, }); } diff --git a/ee/server/services/presence/actions/updateUserPresence.ts b/ee/server/services/presence/actions/updateUserPresence.ts index 805de4742dd..a7ac52f67d7 100755 --- a/ee/server/services/presence/actions/updateUserPresence.ts +++ b/ee/server/services/presence/actions/updateUserPresence.ts @@ -31,8 +31,7 @@ export async function updateUserPresence(uid: string): Promise { }); if (result.modifiedCount > 0) { - api.broadcast('userpresence', { - action: 'updated', + api.broadcast('presence.status', { user: { _id: uid, username: user.username, status, statusText: user.statusText }, }); } diff --git a/ee/server/services/stream-hub/StreamHub.ts b/ee/server/services/stream-hub/StreamHub.ts index b364fe1c182..bfa1c64644c 100755 --- a/ee/server/services/stream-hub/StreamHub.ts +++ b/ee/server/services/stream-hub/StreamHub.ts @@ -15,6 +15,7 @@ import { IntegrationHistoryRaw } from '../../../../app/models/server/raw/Integra import { LivechatDepartmentAgentsRaw } from '../../../app/models/server/raw/LivechatDepartmentAgents'; import { IntegrationsRaw } from '../../../../app/models/server/raw/Integrations'; import { PermissionsRaw } from '../../../../app/models/server/raw/Permissions'; +import { api } from '../../../../server/sdk/api'; export class StreamHub extends ServiceClass implements IServiceClass { protected name = 'hub'; @@ -58,7 +59,7 @@ export class StreamHub extends ServiceClass implements IServiceClass { Integrations, }; - initWatchers(models, (model, fn) => { + initWatchers(models, api.broadcast.bind(api), (model, fn) => { model.col.watch([]).on('change', (event) => { switch (event.operationType) { case 'insert': diff --git a/imports/users-presence/server/activeUsers.js b/imports/users-presence/server/activeUsers.js index f3225b933b8..5d93fb9769c 100644 --- a/imports/users-presence/server/activeUsers.js +++ b/imports/users-presence/server/activeUsers.js @@ -20,7 +20,9 @@ export const setUserStatus = (user, status/* , statusConnection*/) => { // since this callback can be called by only one instance in the cluster // we need to broadcast the change to all instances - api.broadcast('userpresence', { user: { status, _id, username, statusText } }); // remove username + api.broadcast('presence.status', { + user: { status, _id, username, statusText }, // TODO remove username + }); }; let TroubleshootDisablePresenceBroadcast; diff --git a/server/modules/listeners/listeners.module.ts b/server/modules/listeners/listeners.module.ts index 1ed5008128c..01cf4a8406f 100644 --- a/server/modules/listeners/listeners.module.ts +++ b/server/modules/listeners/listeners.module.ts @@ -77,7 +77,7 @@ export class ListenersModule { notifications.notifyLoggedInThisInstance('roles-change', update); }); - service.onEvent('userpresence', ({ user }) => { + service.onEvent('presence.status', ({ user }) => { const { _id, username, status, statusText, } = user; diff --git a/server/modules/notifications/notifications.module.ts b/server/modules/notifications/notifications.module.ts index 8ec868ea359..27c824c691d 100644 --- a/server/modules/notifications/notifications.module.ts +++ b/server/modules/notifications/notifications.module.ts @@ -48,6 +48,8 @@ export class NotificationsModule { public readonly streamRoomData: IStreamer; + public readonly streamLocal: IStreamer; + constructor( private Streamer: IStreamerConstructor, ) { @@ -88,6 +90,8 @@ export class NotificationsModule { }); this.streamUser = new this.Streamer('notify-user'); + + this.streamLocal = new this.Streamer('local'); } async configure({ Rooms, Subscriptions, Users, Settings }: IModelsParam): Promise { @@ -374,6 +378,11 @@ export class NotificationsModule { }); } }); + + this.streamLocal.serverOnly = true; + this.streamLocal.allowRead('none'); + this.streamLocal.allowEmit('all'); + this.streamLocal.allowWrite('none'); } notifyAll(eventName: string, ...args: any[]): void { diff --git a/server/modules/watchers/watchers.module.ts b/server/modules/watchers/watchers.module.ts index 038a50f1292..4e5e7fac482 100644 --- a/server/modules/watchers/watchers.module.ts +++ b/server/modules/watchers/watchers.module.ts @@ -11,7 +11,6 @@ import { IRole } from '../../../definition/IRole'; import { IRoom } from '../../../definition/IRoom'; import { IBaseRaw } from '../../../app/models/server/raw/BaseRaw'; import { LivechatInquiryRaw } from '../../../app/models/server/raw/LivechatInquiry'; -import { api } from '../../sdk/api'; import { IBaseData } from '../../../definition/IBaseData'; import { IPermission } from '../../../definition/IPermission'; import { ISetting } from '../../../definition/ISetting'; @@ -30,6 +29,7 @@ import { LivechatDepartmentAgentsRaw } from '../../../app/models/server/raw/Live import { ILivechatDepartmentAgents } from '../../../definition/ILivechatDepartmentAgents'; import { IIntegration } from '../../../definition/IIntegration'; import { IntegrationsRaw } from '../../../app/models/server/raw/Integrations'; +import { EventSignatures } from '../../sdk/lib/Events'; interface IModelsParam { Subscriptions: SubscriptionsRaw; @@ -59,22 +59,26 @@ interface IChange { type Watcher = (model: IBaseRaw, fn: (event: IChange) => void) => void; -export function initWatchers({ - Messages, - Users, - Settings, - Subscriptions, - UsersSessions, - Roles, - Permissions, - LivechatInquiry, - LivechatDepartmentAgents, - Rooms, - LoginServiceConfiguration, - InstanceStatus, - IntegrationHistory, - Integrations, -}: IModelsParam, watch: Watcher): void { +type BroadcastCallback = (event: T, ...args: Parameters) => Promise; + +export function initWatchers(models: IModelsParam, broadcast: BroadcastCallback, watch: Watcher): void { + const { + Messages, + Users, + Settings, + Subscriptions, + UsersSessions, + Roles, + Permissions, + LivechatInquiry, + LivechatDepartmentAgents, + Rooms, + LoginServiceConfiguration, + InstanceStatus, + IntegrationHistory, + Integrations, + } = models; + watch(Messages, async ({ clientAction, id, data }) => { switch (clientAction) { case 'inserted': @@ -101,7 +105,7 @@ export function initWatchers({ } } - api.broadcast('watch.messages', { clientAction, message }); + broadcast('watch.messages', { clientAction, message }); } break; } @@ -116,14 +120,14 @@ export function initWatchers({ if (!subscription) { return; } - api.broadcast('watch.subscriptions', { clientAction, subscription }); + broadcast('watch.subscriptions', { clientAction, subscription }); break; } case 'removed': { const trash = await Subscriptions.trashFindOneById(id, { projection: { u: 1, rid: 1 } }); const subscription = trash || { _id: id }; - api.broadcast('watch.subscriptions', { clientAction, subscription }); + broadcast('watch.subscriptions', { clientAction, subscription }); break; } } @@ -143,7 +147,7 @@ export function initWatchers({ return; } - api.broadcast('watch.roles', { + broadcast('watch.roles', { clientAction: clientAction !== 'removed' ? 'changed' : clientAction, role, }); @@ -158,10 +162,10 @@ export function initWatchers({ return; } - api.broadcast('watch.userSessions', { clientAction, userSession: data }); + broadcast('watch.userSessions', { clientAction, userSession: data }); break; case 'removed': - api.broadcast('watch.userSessions', { clientAction, userSession: { _id: id } }); + broadcast('watch.userSessions', { clientAction, userSession: { _id: id } }); break; } }); @@ -182,7 +186,7 @@ export function initWatchers({ return; } - api.broadcast('watch.inquiries', { clientAction, inquiry: data, diff }); + broadcast('watch.inquiries', { clientAction, inquiry: data, diff }); }); watch(LivechatDepartmentAgents, async ({ clientAction, id, data, diff }) => { @@ -191,7 +195,7 @@ export function initWatchers({ if (!data) { return; } - api.broadcast('watch.livechatDepartmentAgents', { clientAction, id, data, diff }); + broadcast('watch.livechatDepartmentAgents', { clientAction, id, data, diff }); return; } @@ -199,7 +203,7 @@ export function initWatchers({ if (!data) { return; } - api.broadcast('watch.livechatDepartmentAgents', { clientAction, id, data, diff }); + broadcast('watch.livechatDepartmentAgents', { clientAction, id, data, diff }); }); @@ -223,7 +227,7 @@ export function initWatchers({ return; } - api.broadcast('permission.changed', { clientAction, data }); + broadcast('permission.changed', { clientAction, data }); if (data.level === 'settings' && data.settingId) { // if the permission changes, the effect on the visible settings depends on the role affected. @@ -233,7 +237,7 @@ export function initWatchers({ if (!setting) { return; } - api.broadcast('watch.settings', { clientAction: 'updated', setting }); + broadcast('watch.settings', { clientAction: 'updated', setting }); } }); @@ -260,12 +264,12 @@ export function initWatchers({ return; } - api.broadcast('watch.settings', { clientAction, setting }); + broadcast('watch.settings', { clientAction, setting }); }); watch(Rooms, async ({ clientAction, id, data }) => { if (clientAction === 'removed') { - api.broadcast('watch.rooms', { clientAction, room: { _id: id } }); + broadcast('watch.rooms', { clientAction, room: { _id: id } }); return; } @@ -274,13 +278,13 @@ export function initWatchers({ return; } - api.broadcast('watch.rooms', { clientAction, room }); + broadcast('watch.rooms', { clientAction, room }); }); // TODO: Prevent flood from database on username change, what causes changes on all past messages from that user // and most of those messages are not loaded by the clients. watch(Users, ({ clientAction, id, data, diff, unset }) => { - api.broadcast('watch.users', { clientAction, data, diff, unset, id }); + broadcast('watch.users', { clientAction, data, diff, unset, id }); }); watch(LoginServiceConfiguration, async ({ clientAction, id }) => { @@ -289,11 +293,11 @@ export function initWatchers({ return; } - api.broadcast('watch.loginServiceConfiguration', { clientAction, data, id }); + broadcast('watch.loginServiceConfiguration', { clientAction, data, id }); }); watch(InstanceStatus, ({ clientAction, id, data, diff }) => { - api.broadcast('watch.instanceStatus', { clientAction, data, diff, id }); + broadcast('watch.instanceStatus', { clientAction, data, diff, id }); }); watch(IntegrationHistory, async ({ clientAction, id, data, diff }) => { @@ -304,14 +308,14 @@ export function initWatchers({ return; } data = history; - api.broadcast('watch.integrationHistory', { clientAction, data, diff, id }); + broadcast('watch.integrationHistory', { clientAction, data, diff, id }); break; } case 'inserted': { if (!data) { return; } - api.broadcast('watch.integrationHistory', { clientAction, data, diff, id }); + broadcast('watch.integrationHistory', { clientAction, data, diff, id }); break; } } @@ -319,7 +323,7 @@ export function initWatchers({ watch(Integrations, async ({ clientAction, id, data }) => { if (clientAction === 'removed') { - api.broadcast('watch.integrations', { clientAction, id, data: { _id: id } }); + broadcast('watch.integrations', { clientAction, id, data: { _id: id } }); return; } @@ -328,6 +332,6 @@ export function initWatchers({ return; } - api.broadcast('watch.integrations', { clientAction, data, id }); + broadcast('watch.integrations', { clientAction, data, id }); }); } diff --git a/server/sdk/lib/Api.ts b/server/sdk/lib/Api.ts index 9b646f479e5..7631341c1d8 100644 --- a/server/sdk/lib/Api.ts +++ b/server/sdk/lib/Api.ts @@ -47,4 +47,8 @@ export class Api { async broadcast(event: T, ...args: Parameters): Promise { return this.broker.broadcast(event, ...args); } + + async broadcastLocal(event: T, ...args: Parameters): Promise { + return this.broker.broadcastLocal(event, ...args); + } } diff --git a/server/sdk/lib/Events.ts b/server/sdk/lib/Events.ts index 38982672534..5b81503f3e3 100644 --- a/server/sdk/lib/Events.ts +++ b/server/sdk/lib/Events.ts @@ -35,7 +35,7 @@ export type EventSignatures = { 'user.nameChanged'(user: Partial): void; 'user.roleUpdate'(update: Record): void; 'user.updateCustomStatus'(userStatus: IUserStatus): void; - 'userpresence'(data: { action: string; user: Partial }): void; + 'presence.status'(data: { user: Partial }): void; 'watch.messages'(data: { clientAction: string; message: Partial }): void; 'watch.roles'(data: { clientAction: string; role: Partial }): void; 'watch.rooms'(data: { clientAction: string; room: Pick & Partial }): void; diff --git a/server/sdk/lib/LocalBroker.ts b/server/sdk/lib/LocalBroker.ts index 636e81e51f2..738018c8f1c 100644 --- a/server/sdk/lib/LocalBroker.ts +++ b/server/sdk/lib/LocalBroker.ts @@ -4,6 +4,7 @@ import { IBroker, IBrokerNode } from '../types/IBroker'; import { ServiceClass } from '../types/ServiceClass'; import { asyncLocalStorage } from '..'; import { EventSignatures } from './Events'; +import { StreamerCentral } from '../../modules/streamer/streamer.module'; export class LocalBroker implements IBroker { private methods = new Map(); @@ -46,7 +47,9 @@ export class LocalBroker implements IBroker { const namespace = instance.getName(); instance.getEvents().forEach((eventName) => { - this.events.on(eventName, instance.emit); + this.events.on(eventName, (...args) => { + instance.emit(eventName, ...args as Parameters); + }); }); const methods = instance.constructor?.name === 'Object' ? Object.getOwnPropertyNames(instance) : Object.getOwnPropertyNames(Object.getPrototypeOf(instance)); @@ -61,8 +64,13 @@ export class LocalBroker implements IBroker { } async broadcast(event: T, ...args: Parameters): Promise { - // Pass the event name twice to forward it to the instance's emit method - this.events.emit(event, event, ...args); + this.broadcastLocal(event, ...args); + + StreamerCentral.emit('broadcast', 'local', 'broadcast', [{ eventName: event, args }]); + } + + async broadcastLocal(event: T, ...args: Parameters): Promise { + this.events.emit(event, ...args); } async nodeList(): Promise { diff --git a/server/sdk/types/IBroker.ts b/server/sdk/types/IBroker.ts index b5c3eda737e..647c57c3035 100644 --- a/server/sdk/types/IBroker.ts +++ b/server/sdk/types/IBroker.ts @@ -52,5 +52,6 @@ export interface IBroker { call(method: string, data: any): Promise; waitAndCall(method: string, data: any): Promise; broadcast(event: T, ...args: Parameters): Promise; + broadcastLocal(event: T, ...args: Parameters): Promise; nodeList(): Promise; }