Regression: Fix broadcast events when running as monolith (#19498)

* Notify user status to all instances

* Create a local broadcast stream
pull/19519/head
Diego Sampaio 5 years ago committed by GitHub
parent 91b0e4e42d
commit 48af214014
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      app/lib/server/functions/setStatusText.js
  2. 50
      app/models/server/raw/index.ts
  3. 20
      app/notifications/server/lib/Notifications.ts
  4. 4
      ee/server/broker.ts
  5. 3
      ee/server/services/presence/actions/setStatus.ts
  6. 3
      ee/server/services/presence/actions/updateUserPresence.ts
  7. 3
      ee/server/services/stream-hub/StreamHub.ts
  8. 4
      imports/users-presence/server/activeUsers.js
  9. 2
      server/modules/listeners/listeners.module.ts
  10. 9
      server/modules/notifications/notifications.module.ts
  11. 80
      server/modules/watchers/watchers.module.ts
  12. 4
      server/sdk/lib/Api.ts
  13. 2
      server/sdk/lib/Events.ts
  14. 14
      server/sdk/lib/LocalBroker.ts
  15. 1
      server/sdk/types/IBroker.ts

@ -21,7 +21,9 @@ export const _setStatusTextPromise = async function(userId, statusText) {
await UsersRaw.updateStatusText(user._id, statusText);
const { _id, username, status } = user;
api.broadcast('userpresence', { user: { _id, username, status, statusText } });
api.broadcast('presence.status', {
user: { _id, username, status, statusText },
});
return true;
};
@ -48,7 +50,9 @@ export const _setStatusText = function(userId, statusText) {
user.statusText = statusText;
const { _id, username, status } = user;
api.broadcast('userpresence', { user: { _id, username, status, statusText } });
api.broadcast('presence.status', {
user: { _id, username, status, statusText },
});
return true;
};

@ -55,7 +55,6 @@ import { UsersSessionsRaw } from './UsersSessions';
import UsersSessionsModel from '../models/UsersSessions';
import { ServerEventsRaw } from './ServerEvents';
import { trash } from '../models/_BaseDb';
import { initWatchers } from '../../../../server/modules/watchers/watchers.module';
import LoginServiceConfigurationModel from '../models/LoginServiceConfiguration';
import { LoginServiceConfigurationRaw } from './LoginServiceConfiguration';
import { InstanceStatusRaw } from './InstanceStatus';
@ -64,6 +63,8 @@ import { IntegrationHistoryRaw } from './IntegrationHistory';
import IntegrationHistoryModel from '../models/IntegrationHistory';
import OmnichannelQueueModel from '../models/OmnichannelQueue';
import { OmnichannelQueueRaw } from './OmnichannelQueue';
import { api } from '../../../../server/sdk/api';
import { initWatchers } from '../../../../server/modules/watchers/watchers.module';
const trashCollection = trash.rawCollection();
@ -117,27 +118,30 @@ const map = {
[Integrations.col.collectionName]: IntegrationsModel,
};
!process.env.DISABLE_DB_WATCH && initWatchers({
Messages,
Users,
Subscriptions,
Settings,
LivechatInquiry,
LivechatDepartmentAgents,
UsersSessions,
Permissions,
Roles,
Rooms,
LoginServiceConfiguration,
InstanceStatus,
IntegrationHistory,
Integrations,
}, (model, fn) => {
const meteorModel = map[model.col.collectionName];
if (!process.env.DISABLE_DB_WATCH) {
const models = {
Messages,
Users,
Subscriptions,
Settings,
LivechatInquiry,
LivechatDepartmentAgents,
UsersSessions,
Permissions,
Roles,
Rooms,
LoginServiceConfiguration,
InstanceStatus,
IntegrationHistory,
Integrations,
};
if (!meteorModel) {
return;
}
initWatchers(models, api.broadcastLocal.bind(api), (model, fn) => {
const meteorModel = map[model.col.collectionName];
if (!meteorModel) {
return;
}
meteorModel.on('change', fn);
});
meteorModel.on('change', fn);
});
}

@ -3,7 +3,7 @@ import { Promise } from 'meteor/promise';
import { DDPCommon } from 'meteor/ddp-common';
import { NotificationsModule } from '../../../../server/modules/notifications/notifications.module';
import { Streamer, StreamerCentral } from '../../../../server/modules/streamer/streamer.module';
import { Streamer } from '../../../../server/modules/streamer/streamer.module';
import { api } from '../../../../server/sdk/api';
import {
Subscriptions as SubscriptionsRaw,
@ -13,13 +13,13 @@ import {
} from '../../../models/server/raw';
// TODO: Replace this in favor of the api.broadcast
StreamerCentral.on('broadcast', (name, eventName, args) => {
api.broadcast('stream', [
name,
eventName,
args,
]);
});
// StreamerCentral.on('broadcast', (name, eventName, args) => {
// api.broadcast('stream', [
// name,
// eventName,
// args,
// ]);
// });
export class Stream extends Streamer {
registerPublication(name: string, fn: (eventName: string, options: boolean | {useCollection?: boolean; args?: any}) => void): void {
@ -51,4 +51,8 @@ notifications.configure({
Settings: SettingsRaw,
});
notifications.streamLocal.on('broadcast', ({ eventName, args }) => {
api.broadcastLocal(eventName, ...args);
});
export default notifications;

@ -193,6 +193,10 @@ class NetworkBroker implements IBroker {
return this.broker.broadcast(event, args);
}
async broadcastLocal<T extends keyof EventSignatures>(event: T, ...args: Parameters<EventSignatures[T]>): Promise<void> {
this.broker.broadcastLocal(event, args);
}
async nodeList(): Promise<IBrokerNode[]> {
return this.broker.call('$node.list');
}

@ -33,8 +33,7 @@ export async function setStatus(uid: string, statusDefault: USER_STATUS, statusT
if (result.modifiedCount > 0) {
const user = await User.findOne<IUser>(query, { projection: { username: 1 } });
api.broadcast('userpresence', {
action: 'updated',
api.broadcast('presence.status', {
user: { _id: uid, username: user?.username, status, statusText },
});
}

@ -31,8 +31,7 @@ export async function updateUserPresence(uid: string): Promise<void> {
});
if (result.modifiedCount > 0) {
api.broadcast('userpresence', {
action: 'updated',
api.broadcast('presence.status', {
user: { _id: uid, username: user.username, status, statusText: user.statusText },
});
}

@ -15,6 +15,7 @@ import { IntegrationHistoryRaw } from '../../../../app/models/server/raw/Integra
import { LivechatDepartmentAgentsRaw } from '../../../app/models/server/raw/LivechatDepartmentAgents';
import { IntegrationsRaw } from '../../../../app/models/server/raw/Integrations';
import { PermissionsRaw } from '../../../../app/models/server/raw/Permissions';
import { api } from '../../../../server/sdk/api';
export class StreamHub extends ServiceClass implements IServiceClass {
protected name = 'hub';
@ -58,7 +59,7 @@ export class StreamHub extends ServiceClass implements IServiceClass {
Integrations,
};
initWatchers(models, (model, fn) => {
initWatchers(models, api.broadcast.bind(api), (model, fn) => {
model.col.watch([]).on('change', (event) => {
switch (event.operationType) {
case 'insert':

@ -20,7 +20,9 @@ export const setUserStatus = (user, status/* , statusConnection*/) => {
// since this callback can be called by only one instance in the cluster
// we need to broadcast the change to all instances
api.broadcast('userpresence', { user: { status, _id, username, statusText } }); // remove username
api.broadcast('presence.status', {
user: { status, _id, username, statusText }, // TODO remove username
});
};
let TroubleshootDisablePresenceBroadcast;

@ -77,7 +77,7 @@ export class ListenersModule {
notifications.notifyLoggedInThisInstance('roles-change', update);
});
service.onEvent('userpresence', ({ user }) => {
service.onEvent('presence.status', ({ user }) => {
const {
_id, username, status, statusText,
} = user;

@ -48,6 +48,8 @@ export class NotificationsModule {
public readonly streamRoomData: IStreamer;
public readonly streamLocal: IStreamer;
constructor(
private Streamer: IStreamerConstructor,
) {
@ -88,6 +90,8 @@ export class NotificationsModule {
});
this.streamUser = new this.Streamer('notify-user');
this.streamLocal = new this.Streamer('local');
}
async configure({ Rooms, Subscriptions, Users, Settings }: IModelsParam): Promise<void> {
@ -374,6 +378,11 @@ export class NotificationsModule {
});
}
});
this.streamLocal.serverOnly = true;
this.streamLocal.allowRead('none');
this.streamLocal.allowEmit('all');
this.streamLocal.allowWrite('none');
}
notifyAll(eventName: string, ...args: any[]): void {

@ -11,7 +11,6 @@ import { IRole } from '../../../definition/IRole';
import { IRoom } from '../../../definition/IRoom';
import { IBaseRaw } from '../../../app/models/server/raw/BaseRaw';
import { LivechatInquiryRaw } from '../../../app/models/server/raw/LivechatInquiry';
import { api } from '../../sdk/api';
import { IBaseData } from '../../../definition/IBaseData';
import { IPermission } from '../../../definition/IPermission';
import { ISetting } from '../../../definition/ISetting';
@ -30,6 +29,7 @@ import { LivechatDepartmentAgentsRaw } from '../../../app/models/server/raw/Live
import { ILivechatDepartmentAgents } from '../../../definition/ILivechatDepartmentAgents';
import { IIntegration } from '../../../definition/IIntegration';
import { IntegrationsRaw } from '../../../app/models/server/raw/Integrations';
import { EventSignatures } from '../../sdk/lib/Events';
interface IModelsParam {
Subscriptions: SubscriptionsRaw;
@ -59,22 +59,26 @@ interface IChange<T> {
type Watcher = <T extends IBaseData>(model: IBaseRaw<T>, fn: (event: IChange<T>) => void) => void;
export function initWatchers({
Messages,
Users,
Settings,
Subscriptions,
UsersSessions,
Roles,
Permissions,
LivechatInquiry,
LivechatDepartmentAgents,
Rooms,
LoginServiceConfiguration,
InstanceStatus,
IntegrationHistory,
Integrations,
}: IModelsParam, watch: Watcher): void {
type BroadcastCallback = <T extends keyof EventSignatures>(event: T, ...args: Parameters<EventSignatures[T]>) => Promise<void>;
export function initWatchers(models: IModelsParam, broadcast: BroadcastCallback, watch: Watcher): void {
const {
Messages,
Users,
Settings,
Subscriptions,
UsersSessions,
Roles,
Permissions,
LivechatInquiry,
LivechatDepartmentAgents,
Rooms,
LoginServiceConfiguration,
InstanceStatus,
IntegrationHistory,
Integrations,
} = models;
watch<IMessage>(Messages, async ({ clientAction, id, data }) => {
switch (clientAction) {
case 'inserted':
@ -101,7 +105,7 @@ export function initWatchers({
}
}
api.broadcast('watch.messages', { clientAction, message });
broadcast('watch.messages', { clientAction, message });
}
break;
}
@ -116,14 +120,14 @@ export function initWatchers({
if (!subscription) {
return;
}
api.broadcast('watch.subscriptions', { clientAction, subscription });
broadcast('watch.subscriptions', { clientAction, subscription });
break;
}
case 'removed': {
const trash = await Subscriptions.trashFindOneById(id, { projection: { u: 1, rid: 1 } });
const subscription = trash || { _id: id };
api.broadcast('watch.subscriptions', { clientAction, subscription });
broadcast('watch.subscriptions', { clientAction, subscription });
break;
}
}
@ -143,7 +147,7 @@ export function initWatchers({
return;
}
api.broadcast('watch.roles', {
broadcast('watch.roles', {
clientAction: clientAction !== 'removed' ? 'changed' : clientAction,
role,
});
@ -158,10 +162,10 @@ export function initWatchers({
return;
}
api.broadcast('watch.userSessions', { clientAction, userSession: data });
broadcast('watch.userSessions', { clientAction, userSession: data });
break;
case 'removed':
api.broadcast('watch.userSessions', { clientAction, userSession: { _id: id } });
broadcast('watch.userSessions', { clientAction, userSession: { _id: id } });
break;
}
});
@ -182,7 +186,7 @@ export function initWatchers({
return;
}
api.broadcast('watch.inquiries', { clientAction, inquiry: data, diff });
broadcast('watch.inquiries', { clientAction, inquiry: data, diff });
});
watch<ILivechatDepartmentAgents>(LivechatDepartmentAgents, async ({ clientAction, id, data, diff }) => {
@ -191,7 +195,7 @@ export function initWatchers({
if (!data) {
return;
}
api.broadcast('watch.livechatDepartmentAgents', { clientAction, id, data, diff });
broadcast('watch.livechatDepartmentAgents', { clientAction, id, data, diff });
return;
}
@ -199,7 +203,7 @@ export function initWatchers({
if (!data) {
return;
}
api.broadcast('watch.livechatDepartmentAgents', { clientAction, id, data, diff });
broadcast('watch.livechatDepartmentAgents', { clientAction, id, data, diff });
});
@ -223,7 +227,7 @@ export function initWatchers({
return;
}
api.broadcast('permission.changed', { clientAction, data });
broadcast('permission.changed', { clientAction, data });
if (data.level === 'settings' && data.settingId) {
// if the permission changes, the effect on the visible settings depends on the role affected.
@ -233,7 +237,7 @@ export function initWatchers({
if (!setting) {
return;
}
api.broadcast('watch.settings', { clientAction: 'updated', setting });
broadcast('watch.settings', { clientAction: 'updated', setting });
}
});
@ -260,12 +264,12 @@ export function initWatchers({
return;
}
api.broadcast('watch.settings', { clientAction, setting });
broadcast('watch.settings', { clientAction, setting });
});
watch<IRoom>(Rooms, async ({ clientAction, id, data }) => {
if (clientAction === 'removed') {
api.broadcast('watch.rooms', { clientAction, room: { _id: id } });
broadcast('watch.rooms', { clientAction, room: { _id: id } });
return;
}
@ -274,13 +278,13 @@ export function initWatchers({
return;
}
api.broadcast('watch.rooms', { clientAction, room });
broadcast('watch.rooms', { clientAction, room });
});
// TODO: Prevent flood from database on username change, what causes changes on all past messages from that user
// and most of those messages are not loaded by the clients.
watch<IUser>(Users, ({ clientAction, id, data, diff, unset }) => {
api.broadcast('watch.users', { clientAction, data, diff, unset, id });
broadcast('watch.users', { clientAction, data, diff, unset, id });
});
watch<ILoginServiceConfiguration>(LoginServiceConfiguration, async ({ clientAction, id }) => {
@ -289,11 +293,11 @@ export function initWatchers({
return;
}
api.broadcast('watch.loginServiceConfiguration', { clientAction, data, id });
broadcast('watch.loginServiceConfiguration', { clientAction, data, id });
});
watch<IInstanceStatus>(InstanceStatus, ({ clientAction, id, data, diff }) => {
api.broadcast('watch.instanceStatus', { clientAction, data, diff, id });
broadcast('watch.instanceStatus', { clientAction, data, diff, id });
});
watch<IIntegrationHistory>(IntegrationHistory, async ({ clientAction, id, data, diff }) => {
@ -304,14 +308,14 @@ export function initWatchers({
return;
}
data = history;
api.broadcast('watch.integrationHistory', { clientAction, data, diff, id });
broadcast('watch.integrationHistory', { clientAction, data, diff, id });
break;
}
case 'inserted': {
if (!data) {
return;
}
api.broadcast('watch.integrationHistory', { clientAction, data, diff, id });
broadcast('watch.integrationHistory', { clientAction, data, diff, id });
break;
}
}
@ -319,7 +323,7 @@ export function initWatchers({
watch<IIntegration>(Integrations, async ({ clientAction, id, data }) => {
if (clientAction === 'removed') {
api.broadcast('watch.integrations', { clientAction, id, data: { _id: id } });
broadcast('watch.integrations', { clientAction, id, data: { _id: id } });
return;
}
@ -328,6 +332,6 @@ export function initWatchers({
return;
}
api.broadcast('watch.integrations', { clientAction, data, id });
broadcast('watch.integrations', { clientAction, data, id });
});
}

@ -47,4 +47,8 @@ export class Api {
async broadcast<T extends keyof EventSignatures>(event: T, ...args: Parameters<EventSignatures[T]>): Promise<void> {
return this.broker.broadcast(event, ...args);
}
async broadcastLocal<T extends keyof EventSignatures>(event: T, ...args: Parameters<EventSignatures[T]>): Promise<void> {
return this.broker.broadcastLocal(event, ...args);
}
}

@ -35,7 +35,7 @@ export type EventSignatures = {
'user.nameChanged'(user: Partial<IUser>): void;
'user.roleUpdate'(update: Record<string, any>): void;
'user.updateCustomStatus'(userStatus: IUserStatus): void;
'userpresence'(data: { action: string; user: Partial<IUser> }): void;
'presence.status'(data: { user: Partial<IUser> }): void;
'watch.messages'(data: { clientAction: string; message: Partial<IMessage> }): void;
'watch.roles'(data: { clientAction: string; role: Partial<IRole> }): void;
'watch.rooms'(data: { clientAction: string; room: Pick<IRoom, '_id'> & Partial<IRoom> }): void;

@ -4,6 +4,7 @@ import { IBroker, IBrokerNode } from '../types/IBroker';
import { ServiceClass } from '../types/ServiceClass';
import { asyncLocalStorage } from '..';
import { EventSignatures } from './Events';
import { StreamerCentral } from '../../modules/streamer/streamer.module';
export class LocalBroker implements IBroker {
private methods = new Map<string, Function>();
@ -46,7 +47,9 @@ export class LocalBroker implements IBroker {
const namespace = instance.getName();
instance.getEvents().forEach((eventName) => {
this.events.on(eventName, instance.emit);
this.events.on(eventName, (...args) => {
instance.emit(eventName, ...args as Parameters<EventSignatures[typeof eventName]>);
});
});
const methods = instance.constructor?.name === 'Object' ? Object.getOwnPropertyNames(instance) : Object.getOwnPropertyNames(Object.getPrototypeOf(instance));
@ -61,8 +64,13 @@ export class LocalBroker implements IBroker {
}
async broadcast<T extends keyof EventSignatures>(event: T, ...args: Parameters<EventSignatures[T]>): Promise<void> {
// Pass the event name twice to forward it to the instance's emit method
this.events.emit(event, event, ...args);
this.broadcastLocal(event, ...args);
StreamerCentral.emit('broadcast', 'local', 'broadcast', [{ eventName: event, args }]);
}
async broadcastLocal<T extends keyof EventSignatures>(event: T, ...args: Parameters<EventSignatures[T]>): Promise<void> {
this.events.emit(event, ...args);
}
async nodeList(): Promise<IBrokerNode[]> {

@ -52,5 +52,6 @@ export interface IBroker {
call(method: string, data: any): Promise<any>;
waitAndCall(method: string, data: any): Promise<any>;
broadcast<T extends keyof EventSignatures>(event: T, ...args: Parameters<EventSignatures[T]>): Promise<void>;
broadcastLocal<T extends keyof EventSignatures>(event: T, ...args: Parameters<EventSignatures[T]>): Promise<void>;
nodeList(): Promise<IBrokerNode[]>;
}

Loading…
Cancel
Save