[FIX][ENTERPRISE] Notifications not being sent by ddp-streamer (#24831)

pull/24911/head
Diego Sampaio 4 years ago committed by GitHub
parent b3f4977a04
commit dfb2c31989
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      app/apps/server/bridges/uiInteraction.ts
  2. 6
      app/custom-sounds/server/methods/deleteCustomSound.js
  3. 6
      app/custom-sounds/server/methods/insertOrUpdateSound.js
  4. 8
      app/custom-sounds/server/methods/uploadCustomSound.js
  5. 4
      app/e2e/server/index.js
  6. 6
      app/e2e/server/methods/requestSubscriptionKeys.js
  7. 4
      app/federation/server/endpoints/dispatch.js
  8. 9
      app/invites/server/functions/findOrCreateInvite.js
  9. 4
      app/lib/server/functions/cleanRoomHistory.ts
  10. 4
      app/lib/server/functions/deleteMessage.ts
  11. 16
      app/lib/server/functions/notifications/desktop.js
  12. 4
      app/livechat/server/lib/messageTypes.js
  13. 5
      app/notifications/server/lib/Notifications.ts
  14. 7
      app/webdav/server/methods/addWebdavAccount.ts
  15. 4
      app/webdav/server/methods/removeWebdavAccount.ts
  16. 21
      definition/INotification.ts
  17. 2
      definition/IRoom.ts
  18. 2
      ee/server/services/ddp-streamer/DDPStreamer.ts
  19. 11
      ee/server/services/ddp-streamer/Streamer.ts
  20. 1
      ee/server/services/package.json
  21. 36
      server/modules/listeners/listeners.module.ts
  22. 9
      server/modules/notifications/notifications.module.ts
  23. 55
      server/sdk/lib/Events.ts
  24. 1
      server/services/omnichannel-voip/service.ts
  25. 1
      server/stream/streamBroadcast.js

@ -2,7 +2,7 @@ import { UiInteractionBridge as UiIntBridge } from '@rocket.chat/apps-engine/ser
import { IUIKitInteraction } from '@rocket.chat/apps-engine/definition/uikit';
import { IUser } from '@rocket.chat/apps-engine/definition/users';
import { Notifications } from '../../../notifications/server';
import { api } from '../../../../server/sdk/api';
import { AppServerOrchestrator } from '../orchestrator';
export class UiInteractionBridge extends UiIntBridge {
@ -20,6 +20,6 @@ export class UiInteractionBridge extends UiIntBridge {
throw new Error('Invalid app provided');
}
Notifications.notifyUser(user.id, 'uiInteraction', interaction);
api.broadcast('notify.uiInteraction', user.id, interaction);
}
}

@ -1,8 +1,8 @@
import { Meteor } from 'meteor/meteor';
import { CustomSounds } from '../../../models/server/raw';
import { hasPermission } from '../../../authorization';
import { Notifications } from '../../../notifications';
import { hasPermission } from '../../../authorization/server';
import { api } from '../../../../server/sdk/api';
import { RocketChatFileCustomSoundsInstance } from '../startup/custom-sounds';
Meteor.methods({
@ -23,7 +23,7 @@ Meteor.methods({
RocketChatFileCustomSoundsInstance.deleteFile(`${sound._id}.${sound.extension}`);
await CustomSounds.removeById(_id);
Notifications.notifyAll('deleteCustomSound', { soundData: sound });
api.broadcast('notify.deleteCustomSound', { soundData: sound });
return true;
},

@ -2,9 +2,9 @@ import { Meteor } from 'meteor/meteor';
import s from 'underscore.string';
import { check } from 'meteor/check';
import { hasPermission } from '../../../authorization';
import { hasPermission } from '../../../authorization/server';
import { CustomSounds } from '../../../models/server/raw';
import { Notifications } from '../../../notifications';
import { api } from '../../../../server/sdk/api';
import { RocketChatFileCustomSoundsInstance } from '../startup/custom-sounds';
Meteor.methods({
@ -71,7 +71,7 @@ Meteor.methods({
if (soundData.name !== soundData.previousName) {
await CustomSounds.setName(soundData._id, soundData.name);
Notifications.notifyAll('updateCustomSound', { soundData });
api.broadcast('notify.updateCustomSound', { soundData });
}
return soundData._id;

@ -1,8 +1,8 @@
import { Meteor } from 'meteor/meteor';
import { hasPermission } from '../../../authorization';
import { Notifications } from '../../../notifications';
import { RocketChatFile } from '../../../file';
import { hasPermission } from '../../../authorization/server';
import { api } from '../../../../server/sdk/api';
import { RocketChatFile } from '../../../file/server';
import { RocketChatFileCustomSoundsInstance } from '../startup/custom-sounds';
Meteor.methods({
@ -18,7 +18,7 @@ Meteor.methods({
const ws = RocketChatFileCustomSoundsInstance.createWriteStream(`${soundData._id}.${soundData.extension}`, contentType);
ws.on(
'end',
Meteor.bindEnvironment(() => Meteor.setTimeout(() => Notifications.notifyAll('updateCustomSound', { soundData }), 500)),
setTimeout(() => api.broadcast('notify.updateCustomSound', { soundData }), 500),
);
rs.pipe(ws);

@ -1,5 +1,5 @@
import { callbacks } from '../../../lib/callbacks';
import { Notifications } from '../../notifications';
import { api } from '../../../server/sdk/api';
import './settings';
import './beforeCreateRoom';
@ -14,7 +14,7 @@ import './methods/requestSubscriptionKeys';
callbacks.add(
'afterJoinRoom',
(user, room) => {
Notifications.notifyRoom('e2e.keyRequest', room._id, room.e2eKeyId);
api.broadcast('notify.e2e.keyRequest', room._id, room.e2eKeyId);
},
callbacks.priority.MEDIUM,
'e2e',

@ -1,7 +1,7 @@
import { Meteor } from 'meteor/meteor';
import { Subscriptions, Rooms } from '../../../models';
import { Notifications } from '../../../notifications';
import { Subscriptions, Rooms } from '../../../models/server';
import { api } from '../../../../server/sdk/api';
Meteor.methods({
'e2e.requestSubscriptionKeys'() {
@ -27,7 +27,7 @@ Meteor.methods({
const rooms = Rooms.find(query);
rooms.forEach((room) => {
Notifications.notifyRoom('e2e.keyRequest', room._id, room.e2eKeyId);
api.broadcast('notify.e2e.keyRequest', room._id, room.e2eKeyId);
});
return true;

@ -7,7 +7,7 @@ import { FederationRoomEvents, Messages, Rooms, Subscriptions, Users } from '../
import { FederationServers } from '../../../models/server/raw';
import { normalizers } from '../normalizers';
import { deleteRoom } from '../../../lib/server/functions';
import { Notifications } from '../../../notifications/server';
import { api } from '../../../../server/sdk/api';
import { FileUpload } from '../../../file-upload';
import { getFederationDomain } from '../lib/getFederationDomain';
import { decryptIfNeeded } from '../lib/crypt';
@ -327,7 +327,7 @@ const eventHandlers = {
Messages.removeById(messageId);
// Notify the room
Notifications.notifyRoom(roomId, 'deleteMessage', { _id: messageId });
api.broadcast('notify.deleteMessage', roomId, { _id: messageId });
}
return eventResult;

@ -1,11 +1,11 @@
import { Meteor } from 'meteor/meteor';
import { Random } from 'meteor/random';
import { hasPermission } from '../../../authorization';
import { Notifications } from '../../../notifications';
import { hasPermission } from '../../../authorization/server';
import { api } from '../../../../server/sdk/api';
import { Subscriptions, Rooms } from '../../../models/server';
import { Invites } from '../../../models/server/raw';
import { settings } from '../../../settings';
import { settings } from '../../../settings/server';
import { getURL } from '../../../utils/lib/getURL';
import { RoomMemberActions } from '../../../../definition/IRoomTypeConfig';
import { roomCoordinator } from '../../../../server/lib/rooms/roomCoordinator';
@ -99,7 +99,8 @@ export const findOrCreateInvite = async (userId, invite) => {
};
await Invites.insertOne(createInvite);
Notifications.notifyUser(userId, 'updateInvites', { invite: createInvite });
api.broadcast('notify.updateInvites', userId, { invite: createInvite });
createInvite.url = getInviteUrl(createInvite);
return createInvite;

@ -4,7 +4,7 @@ import { TAPi18n } from 'meteor/rocketchat:tap-i18n';
import { deleteRoom } from './deleteRoom';
import { FileUpload } from '../../../file-upload/server';
import { Messages, Rooms, Subscriptions } from '../../../models/server';
import { Notifications } from '../../../notifications/server';
import { api } from '../../../../server/sdk/api';
import { IMessage, IMessageDiscussion } from '../../../../definition/IMessage';
export const cleanRoomHistory = function ({
@ -68,7 +68,7 @@ export const cleanRoomHistory = function ({
const count = Messages.removeByIdPinnedTimestampLimitAndUsers(rid, excludePinned, ignoreDiscussion, ts, limit, fromUsers, ignoreThreads);
if (count) {
Rooms.resetLastMessageById(rid);
Notifications.notifyRoom(rid, 'deleteMessageBulk', {
api.broadcast('notify.deleteMessageBulk', rid, {
rid,
excludePinned,
ignoreDiscussion,

@ -4,7 +4,7 @@ import { FileUpload } from '../../../file-upload/server';
import { settings } from '../../../settings/server';
import { Messages, Rooms } from '../../../models/server';
import { Uploads } from '../../../models/server/raw';
import { Notifications } from '../../../notifications/server';
import { api } from '../../../../server/sdk/api';
import { callbacks } from '../../../../lib/callbacks';
import { Apps } from '../../../apps/server';
import { IMessage } from '../../../../definition/IMessage';
@ -66,7 +66,7 @@ export const deleteMessage = async function (message: IMessage, user: IUser): Pr
if (showDeletedStatus) {
Messages.setAsDeletedByIdAndUser(message._id, user);
} else {
Notifications.notifyRoom(message.rid, 'deleteMessage', { _id: message._id });
api.broadcast('notify.deleteMessage', message.rid, { _id: message._id });
}
if (bridges) {

@ -1,7 +1,8 @@
import { metrics } from '../../../../metrics';
import { settings } from '../../../../settings';
import { Notifications } from '../../../../notifications';
import { roomCoordinator } from '../../../../../server/lib/rooms/roomCoordinator';
import { api } from '../../../../../server/sdk/api';
import { metrics } from '../../../../metrics/server';
import { settings } from '../../../../settings/server';
/**
* Send notification to user
*
@ -15,8 +16,7 @@ import { roomCoordinator } from '../../../../../server/lib/rooms/roomCoordinator
export function notifyDesktopUser({ userId, user, message, room, duration, notificationMessage }) {
const { title, text } = roomCoordinator.getRoomDirectives(room.t)?.getNotificationDetails(room, user, notificationMessage, userId);
metrics.notificationsSent.inc({ notification_type: 'desktop' });
Notifications.notifyUser(userId, 'notification', {
const payload = {
title,
text,
duration,
@ -32,7 +32,11 @@ export function notifyDesktopUser({ userId, user, message, room, duration, notif
t: message.t,
},
},
});
};
metrics.notificationsSent.inc({ notification_type: 'desktop' });
api.broadcast('notify.desktop', userId, payload);
}
export function shouldNotifyDesktop({

@ -2,7 +2,7 @@ import { Meteor } from 'meteor/meteor';
import { TAPi18n } from 'meteor/rocketchat:tap-i18n';
import { actionLinks } from '../../../action-links/server';
import { Notifications } from '../../../notifications/server';
import { api } from '../../../../server/sdk/api';
import { Messages, LivechatRooms } from '../../../models/server';
import { settings } from '../../../settings/server';
import { Livechat } from './Livechat';
@ -11,7 +11,7 @@ actionLinks.register('denyLivechatCall', function (message /* , params*/) {
const user = Meteor.user();
Messages.createWithTypeRoomIdMessageAndUser('command', message.rid, 'endCall', user);
Notifications.notifyRoom(message.rid, 'deleteMessage', { _id: message._id });
api.broadcast('notify.deleteMessage', message.rid, { _id: message._id });
const language = user.language || settings.get('Language') || 'en';

@ -3,7 +3,6 @@ import { DDPCommon } from 'meteor/ddp-common';
import { NotificationsModule } from '../../../../server/modules/notifications/notifications.module';
import { Streamer } from '../../../../server/modules/streamer/streamer.module';
import { api } from '../../../../server/sdk/api';
import {
Subscriptions as SubscriptionsRaw,
Rooms as RoomsRaw,
@ -42,8 +41,4 @@ notifications.configure({
Settings: SettingsRaw,
});
notifications.streamLocal.on('broadcast', ({ eventName, args }) => {
api.broadcastLocal(eventName, ...args);
});
export default notifications;

@ -4,7 +4,7 @@ import { Match, check } from 'meteor/check';
import { settings } from '../../../settings/server';
import { WebdavAccounts } from '../../../models/server/raw';
import { WebdavClientAdapter } from '../lib/webdavClientAdapter';
import { Notifications } from '../../../notifications/server';
import { api } from '../../../../server/sdk/api';
Meteor.methods({
async addWebdavAccount(formData) {
@ -57,7 +57,8 @@ Meteor.methods({
await client.stat('/');
await WebdavAccounts.insertOne(accountData);
Notifications.notifyUser(userId, 'webdav', {
api.broadcast('notify.webdav', userId, {
type: 'changed',
account: accountData,
});
@ -115,7 +116,7 @@ Meteor.methods({
upsert: true,
},
);
Notifications.notifyUser(userId, 'webdav', {
api.broadcast('notify.webdav', userId, {
type: 'changed',
account: accountData,
});

@ -2,7 +2,7 @@ import { Meteor } from 'meteor/meteor';
import { check } from 'meteor/check';
import { WebdavAccounts } from '../../../models/server/raw';
import { Notifications } from '../../../notifications/server';
import { api } from '../../../../server/sdk/api';
Meteor.methods({
async removeWebdavAccount(accountId) {
@ -18,7 +18,7 @@ Meteor.methods({
const removed = await WebdavAccounts.removeByUserAndId(accountId, userId);
if (removed) {
Notifications.notifyUser(userId, 'webdav', {
api.broadcast('notify.webdav', userId, {
type: 'removed',
account: { _id: accountId },
});

@ -1,3 +1,6 @@
import { IMessage } from './IMessage';
import { IRoom } from './IRoom';
export interface INotificationItemPush {
type: 'push';
data: {
@ -43,3 +46,21 @@ export interface INotification {
error?: string;
items: NotificationItem[];
}
export interface INotificationDesktop {
title: string;
text: string;
duration?: number;
payload: {
_id: IMessage['_id'];
rid: IMessage['rid'];
tmid?: IMessage['tmid'];
sender: IMessage['u'];
type: IRoom['t'];
name: IRoom['name'];
message: {
msg: IMessage['msg'];
t?: IMessage['t'];
};
};
}

@ -80,6 +80,8 @@ export interface IRoom extends IRocketChatRecord {
archived?: boolean;
announcement?: string;
description?: string;
e2eKeyId?: string;
}
export interface ICreatedRoom extends IRoom {

@ -5,8 +5,8 @@ import WebSocket from 'ws';
import { ListenersModule } from '../../../../server/modules/listeners/listeners.module';
import { StreamerCentral } from '../../../../server/modules/streamer/streamer.module';
import { ServiceClass } from '../../../../server/sdk/types/ServiceClass';
import { MeteorService } from '../../../../server/sdk';
import { ServiceClass } from '../../../../server/sdk/types/ServiceClass';
import { Client } from './Client';
import { events, server } from './configureServer';
import { DDP_EVENTS } from './constants';

@ -62,8 +62,15 @@ export class Stream extends Streamer {
}
if (await this.isEmitAllowed(subscription, eventName, ...args)) {
await new Promise((resolve) => {
subscription.client.ws._sender.sendFrame(data[subscription.client.meteorClient ? 'meteor' : 'normal'], resolve);
await new Promise<void>((resolve, reject) => {
const frame = data[subscription.client.meteorClient ? 'meteor' : 'normal'];
subscription.client.ws._sender.sendFrame(frame, (err: unknown) => {
if (err) {
return reject(err);
}
resolve();
});
});
}
}

@ -22,6 +22,7 @@
"author": "Rocket.Chat",
"license": "MIT",
"dependencies": {
"@rocket.chat/apps-engine": "^1.31.0",
"@rocket.chat/emitter": "~0.31.4",
"@rocket.chat/message-parser": "~0.31.4",
"@rocket.chat/string-helpers": "~0.31.4",

@ -289,5 +289,41 @@ export class ListenersModule {
service.onEvent('queue.callabandoned', (userId, queuename: string, queuedcallafterabandon: string): void => {
notifications.notifyUserInThisInstance(userId, 'callabandoned', { queuename, queuedcallafterabandon });
});
service.onEvent('notify.desktop', (uid, notification): void => {
notifications.notifyUserInThisInstance(uid, 'notification', notification);
});
service.onEvent('notify.uiInteraction', (uid, interaction): void => {
notifications.notifyUserInThisInstance(uid, 'uiInteraction', interaction);
});
service.onEvent('notify.updateInvites', (uid, data): void => {
notifications.notifyUserInThisInstance(uid, 'updateInvites', data);
});
service.onEvent('notify.webdav', (uid, data): void => {
notifications.notifyUserInThisInstance(uid, 'webdav', data);
});
service.onEvent('notify.e2e.keyRequest', (rid, data): void => {
notifications.notifyRoomInThisInstance(rid, 'e2e.keyRequest', data);
});
service.onEvent('notify.deleteMessage', (rid, data): void => {
notifications.notifyRoomInThisInstance(rid, 'deleteMessage', data);
});
service.onEvent('notify.deleteMessageBulk', (rid, data): void => {
notifications.notifyRoomInThisInstance(rid, 'deleteMessageBulk', data);
});
service.onEvent('notify.deleteCustomSound', (data): void => {
notifications.notifyAllInThisInstance('deleteCustomSound', data);
});
service.onEvent('notify.updateCustomSound', (data): void => {
notifications.notifyAllInThisInstance('updateCustomSound', data);
});
}
}

@ -51,8 +51,6 @@ export class NotificationsModule {
public readonly streamRoomData: IStreamer;
public readonly streamLocal: IStreamer;
public readonly streamPresence: IStreamer;
constructor(private Streamer: IStreamerConstructor) {
@ -95,8 +93,6 @@ export class NotificationsModule {
});
this.streamUser = new this.Streamer('notify-user');
this.streamLocal = new this.Streamer('local');
}
async configure({ Rooms, Subscriptions, Users, Settings }: IModelsParam): Promise<void> {
@ -452,11 +448,6 @@ export class NotificationsModule {
}
});
this.streamLocal.serverOnly = true;
this.streamLocal.allowRead('none');
this.streamLocal.allowEmit('all');
this.streamLocal.allowWrite('none');
this.streamPresence.allowRead('logged');
this.streamPresence.allowWrite('none');
}

@ -1,22 +1,28 @@
import { IUIKitInteraction } from '@rocket.chat/apps-engine/definition/uikit';
import { IEmailInbox } from '../../../definition/IEmailInbox';
import { IEmoji } from '../../../definition/IEmoji';
import { IInquiry } from '../../../definition/IInquiry';
import { IInstanceStatus } from '../../../definition/IInstanceStatus';
import { IIntegration } from '../../../definition/IIntegration';
import { IIntegrationHistory } from '../../../definition/IIntegrationHistory';
import { ILivechatDepartmentAgents } from '../../../definition/ILivechatDepartmentAgents';
import { ILoginServiceConfiguration } from '../../../definition/ILoginServiceConfiguration';
import { IMessage } from '../../../definition/IMessage';
import { INotificationDesktop } from '../../../definition/INotification';
import { IPbxEvent } from '../../../definition/IPbxEvent';
import { IRole } from '../../../definition/IRole';
import { IRoom } from '../../../definition/IRoom';
import { ISetting } from '../../../definition/ISetting';
import { ISocketConnection } from '../../../definition/ISocketConnection';
import { ISubscription } from '../../../definition/ISubscription';
import { IUser } from '../../../definition/IUser';
import { IEmoji } from '../../../definition/IEmoji';
import { IUserStatus } from '../../../definition/IUserStatus';
import { IUserSession } from '../../../definition/IUserSession';
import { ILoginServiceConfiguration } from '../../../definition/ILoginServiceConfiguration';
import { IInstanceStatus } from '../../../definition/IInstanceStatus';
import { IIntegrationHistory } from '../../../definition/IIntegrationHistory';
import { ILivechatDepartmentAgents } from '../../../definition/ILivechatDepartmentAgents';
import { IIntegration } from '../../../definition/IIntegration';
import { IEmailInbox } from '../../../definition/IEmailInbox';
import { ISocketConnection } from '../../../definition/ISocketConnection';
import { IPbxEvent } from '../../../definition/IPbxEvent';
import { IUserStatus } from '../../../definition/IUserStatus';
import { AutoUpdateRecord } from '../types/IMeteor';
import { IInvite } from '../../../definition/IInvite';
import { IWebdavAccount } from '../../../definition/IWebdavAccount';
import { ICustomSound } from '../../../definition/ICustomSound';
type ClientAction = 'inserted' | 'updated' | 'removed' | 'changed';
@ -37,7 +43,36 @@ export type EventSignatures = {
'livechat-inquiry-queue-observer'(data: { action: string; inquiry: IInquiry }): void;
'message'(data: { action: string; message: IMessage }): void;
'meteor.clientVersionUpdated'(data: AutoUpdateRecord): void;
'notify.desktop'(uid: string, data: INotificationDesktop): void;
'notify.uiInteraction'(uid: string, data: IUIKitInteraction): void;
'notify.updateInvites'(uid: string, data: { invite: IInvite }): void;
'notify.ephemeralMessage'(uid: string, rid: string, message: Partial<IMessage>): void;
'notify.webdav'(
uid: string,
data:
| {
type: 'changed';
account: Partial<IWebdavAccount>;
}
| {
type: 'removed';
account: { _id: IWebdavAccount['_id'] };
},
): void;
'notify.e2e.keyRequest'(rid: string, data: IRoom['e2eKeyId']): void;
'notify.deleteMessage'(rid: string, data: { _id: string }): void;
'notify.deleteMessageBulk'(
rid: string,
data: {
rid: string;
excludePinned: boolean;
ignoreDiscussion: boolean;
ts: Record<string, Date>;
users: string[];
},
): void;
'notify.deleteCustomSound'(data: { soundData: ICustomSound }): void;
'notify.updateCustomSound'(data: { soundData: ICustomSound }): void;
'permission.changed'(data: { clientAction: ClientAction; data: any }): void;
'room'(data: { action: string; room: Partial<IRoom> }): void;
'room.avatarUpdate'(room: Partial<IRoom>): void;

@ -70,6 +70,7 @@ export class OmnichannelVoipService extends ServiceClassInternal implements IOmn
return;
}
this.logger.debug(`Notifying agent ${agent._id} of hangup on room ${currentRoom._id}`);
// TODO evalute why this is 'notifyUserInThisInstance'
Notifications.notifyUserInThisInstance(agent._id, 'call.callerhangup', { roomId: currentRoom._id });
}

@ -289,6 +289,7 @@ export function startStreamBroadcast() {
return StreamerCentral.removeListener('broadcast', onBroadcast);
}
// TODO move to a service and stop using StreamerCentral
StreamerCentral.on('broadcast', onBroadcast);
});
}

Loading…
Cancel
Save