[NEW] Stream to get individual presence updates (#22950)

Co-authored-by: Guilherme Gazzo <guilhermegazzo@gmail.com>
pull/23461/head
Diego Sampaio 5 years ago committed by GitHub
parent d9043a6c0d
commit d03c2b7e7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      app/notifications/client/lib/Notifications.js
  2. 13
      app/notifications/client/lib/Presence.ts
  3. 1
      app/notifications/server/lib/Notifications.ts
  4. 103
      app/notifications/server/lib/Presence.ts
  5. 1
      app/ui-message/client/message.html
  6. 54
      app/ui-sidenav/client/userPresence.js
  7. 10
      client/components/Message/StatusMessage.tsx
  8. 2
      client/components/UserStatus/ReactiveUserStatus.js
  9. 24
      client/hooks/usePresence.ts
  10. 28
      client/hooks/useUserData.ts
  11. 33
      client/lib/presence.ts
  12. 1
      client/startup/index.ts
  13. 69
      client/startup/listenActiveUsers.ts
  14. 7
      client/templates.ts
  15. 4
      client/views/room/Header/DirectRoomHeader.js
  16. 2
      client/views/room/contextualBar/OTR/OTRWithData.js
  17. 4
      server/modules/listeners/listeners.module.ts
  18. 19
      server/modules/notifications/notifications.module.ts
  19. 2
      server/modules/streamer/streamer.module.ts

@ -1,6 +1,8 @@
import { Meteor } from 'meteor/meteor';
import { Tracker } from 'meteor/tracker';
import './Presence';
class Notifications {
constructor(...args) {
this.logged = Meteor.userId() !== null;
@ -17,6 +19,7 @@ class Notifications {
this.streamRoom = new Meteor.Streamer('notify-room');
this.streamRoomUsers = new Meteor.Streamer('notify-room-users');
this.streamUser = new Meteor.Streamer('notify-user');
if (this.debug === true) {
this.onAll(function() {
return console.log('RocketChat.Notifications: onAll', args);

@ -0,0 +1,13 @@
import { Meteor } from 'meteor/meteor';
import { Presence, STATUS_MAP } from '../../../../client/lib/presence';
// TODO implement API on Streamer to be able to listen to all streamed data
// this is a hacky way to listen to all streamed data from user-presense Streamer
(Meteor as any).StreamerCentral.on('stream-user-presence', (uid: string, args: unknown) => {
if (!Array.isArray(args)) {
throw new Error('Presence event must be an array');
}
const [username, status, statusText] = args as [string, number, string | undefined];
Presence.notify({ _id: uid, username, status: STATUS_MAP[status], statusText });
});

@ -11,6 +11,7 @@ import {
Users as UsersRaw,
Settings as SettingsRaw,
} from '../../../models/server/raw';
import './Presence';
// TODO: Replace this in favor of the api.broadcast
// StreamerCentral.on('broadcast', (name, eventName, args) => {

@ -0,0 +1,103 @@
import { Emitter } from '@rocket.chat/emitter';
import { IUser } from '../../../../definition/IUser';
import { IPublication, IStreamerConstructor, Connection, IStreamer } from '../../../../server/modules/streamer/streamer.module';
export type UserPresenseStreamProps = {
added: IUser['_id'][];
removed: IUser['_id'][];
}
export type UserPresenseStreamArgs = {
'uid': string;
args: unknown;
}
const e = new Emitter<{
[key: string]: UserPresenseStreamArgs;
}>();
const clients = new WeakMap<Connection, UserPresence>();
export class UserPresence {
private readonly streamer: IStreamer;
private readonly publication: IPublication;
private readonly listeners: Set<string>;
constructor(publication: IPublication, streamer: IStreamer) {
this.listeners = new Set();
this.publication = publication;
this.streamer = streamer;
}
listen(uid: string): void {
if (this.listeners.has(uid)) {
return;
}
e.on(uid, this.run);
this.listeners.add(uid);
}
off = (uid: string): void => {
e.off(uid, this.run);
this.listeners.delete(uid);
}
run = (args: UserPresenseStreamArgs): void => {
const payload = this.streamer.changedPayload(this.streamer.subscriptionName, args.uid, { ...args, eventName: args.uid }); // there is no good explanation to keep eventName, I just want to save one 'DDPCommon.parseDDP' on the client side, so I'm trying to fit the Meteor Streamer's payload
(this.publication as any)._session.socket.send(payload);
}
stop(): void {
this.listeners.forEach(this.off);
clients.delete(this.publication.connection);
}
static getClient(publication: IPublication, streamer: IStreamer): [UserPresence, boolean] {
const { connection } = publication;
const stored = clients.get(connection);
const client = stored || new UserPresence(publication, streamer);
const main = Boolean(!stored);
clients.set(connection, client);
return [client, main];
}
}
export class StreamPresence {
static getInstance(Streamer: IStreamerConstructor, name = 'user-presence'): IStreamer {
return new class StreamPresence extends Streamer {
async _publish(publication: IPublication, _eventName: string, options: boolean | {useCollection?: boolean; args?: any} = false): Promise<void> {
const { added, removed } = (typeof options !== 'boolean' ? options : {}) as unknown as UserPresenseStreamProps;
const [client, main] = UserPresence.getClient(publication, this);
added?.forEach((uid) => client.listen(uid));
removed?.forEach((uid) => client.off(uid));
if (!main) {
publication.stop();
return;
}
publication.ready();
publication.onStop(() => client.stop());
}
}(name);
}
}
export const emit = (uid: string, args: UserPresenseStreamArgs): void => {
e.emit(uid, { uid, args });
};

@ -40,7 +40,6 @@
<button type="button" class="user user-card-message color-primary-font-color" data-username="{{msg.u.username}}" tabindex="1">
{{getName}}{{#if showUsername}} <span class="message-alias border-component-color color-info-font-color">@{{msg.u.username}}</span>{{/if}}
</button>
{{> StatusMessage uid=msg.u._id}}
<span class="info border-component-color color-info-font-color"></span>
{{# if showRoles }}
{{#each role in roleTags}}

@ -2,58 +2,12 @@ import { Meteor } from 'meteor/meteor';
import { Accounts } from 'meteor/accounts-base';
import { Template } from 'meteor/templating';
import { Tracker } from 'meteor/tracker';
import _ from 'underscore';
import mem from 'mem';
import { APIClient } from '../../utils/client';
import { saveUser, interestedUserIds } from '../../../client/startup/listenActiveUsers';
import { Presence } from '../../../client/lib/presence';
import './userPresence.html';
const data = new Map();
const promises = new Map();
const pending = new Map();
const getAll = _.debounce(async function getAll() {
const ids = Array.from(pending.keys());
if (ids.length === 0) {
return;
}
const params = {
ids,
};
try {
const {
users,
} = await APIClient.v1.get('users.presence', params);
users.forEach((user) => saveUser(user, true));
ids.forEach((id) => {
const { resolve } = promises.get(id);
resolve();
});
} catch (e) {
ids.forEach((id) => {
const { reject } = promises.get(id);
reject();
});
}
}, 100);
export const get = mem(function get(id) {
interestedUserIds.add(id);
const promise = pending.get(id) || new Promise((resolve, reject) => {
promises.set(id, { resolve, reject });
});
pending.set(id, promise);
return promise;
});
const options = {
threshold: 0.1,
};
@ -63,10 +17,8 @@ const handleEntries = function(entries) {
lastEntries = entries.filter(({ isIntersecting }) => isIntersecting);
lastEntries.forEach(async (entry) => {
const { uid } = data.get(entry.target);
await get(uid);
pending.delete(uid);
Presence.get(uid);
});
getAll();
};
const featureExists = !!window.IntersectionObserver;
@ -80,7 +32,6 @@ Tracker.autorun(() => {
Presence.reset();
return Meteor.users.update({ status: { $exists: true } }, { $unset: { status: true } }, { multi: true });
}
mem.clear(get);
Presence.restart();
@ -93,11 +44,8 @@ Tracker.autorun(() => {
}
getAll();
Accounts.onLogout(() => {
Presence.reset();
interestedUserIds.clear();
});
});

@ -1,10 +1,14 @@
import { Box, Icon } from '@rocket.chat/fuselage';
import React, { ReactElement, memo } from 'react';
import React, { ReactElement, memo, useEffect } from 'react';
import { useUserData } from '../../hooks/useUserData';
import { usePresence } from '../../hooks/usePresence';
// TODO: deprecate this component
const StatusMessage = ({ uid }: { uid: string }): ReactElement | null => {
const data = useUserData(uid);
const data = usePresence(uid);
useEffect(() => {
process.env.NODE_ENV === 'development' && console.warn('StatusMessage component is deprecated');
}, [data]);
if (!data || !data.statusText) {
return null;

@ -4,7 +4,7 @@ import { usePresence } from '../../hooks/usePresence';
import UserStatus from './UserStatus';
const ReactiveUserStatus = ({ uid, ...props }) => {
const status = usePresence(uid);
const status = usePresence(uid)?.status;
return <UserStatus status={status} {...props} />;
};

@ -1,4 +1,7 @@
import { useUserData } from './useUserData';
import { useMemo } from 'react';
import { useSubscription } from 'use-subscription';
import { Presence, UserPresence } from '../lib/presence';
type Presence = 'online' | 'offline' | 'busy' | 'away' | 'loading';
@ -6,9 +9,22 @@ type Presence = 'online' | 'offline' | 'busy' | 'away' | 'loading';
* Hook to fetch and subscribe users presence
*
* @param uid - User Id
* @returns User Presence - 'online' | 'offline' | 'busy' | 'away' | 'loading'
* @returns UserPresence
* @public
*/
export const usePresence = (uid: string): UserPresence | undefined => {
const subscription = useMemo(
() => ({
getCurrentValue: (): UserPresence | undefined => (uid ? Presence.store.get(uid) : undefined),
subscribe: (callback: any): any => {
uid && Presence.listen(uid, callback);
return (): void => {
uid && Presence.stop(uid, callback);
};
},
}),
[uid],
);
export const usePresence = (uid: string, presence: Presence): Presence =>
useUserData(uid)?.status || presence;
return useSubscription(subscription);
};

@ -1,28 +0,0 @@
import { useMemo } from 'react';
import { useSubscription } from 'use-subscription';
import { Presence, UserPresence } from '../lib/presence';
/**
* Hook to fetch and subscribe users data
*
* @param uid - User Id
* @returns Users data: status, statusText, username, name
* @public
*/
export const useUserData = (uid: string): UserPresence | undefined => {
const subscription = useMemo(
() => ({
getCurrentValue: (): UserPresence | undefined => Presence.store.get(uid),
subscribe: (callback: any): any => {
Presence.listen(uid, callback);
return (): void => {
Presence.stop(uid, callback);
};
},
}),
[uid],
);
return useSubscription(subscription);
};

@ -1,9 +1,12 @@
import { Emitter, EventHandlerOf } from '@rocket.chat/emitter';
import { Meteor } from 'meteor/meteor';
import { APIClient } from '../../app/utils/client';
import { IUser } from '../../definition/IUser';
import { UserStatus } from '../../definition/UserStatus';
export const STATUS_MAP = [UserStatus.OFFLINE, UserStatus.ONLINE, UserStatus.AWAY, UserStatus.BUSY];
type InternalEvents = {
remove: IUser['_id'];
reset: undefined;
@ -54,11 +57,28 @@ const notify = (presence: UserPresence): void => {
const getPresence = ((): ((uid: UserPresence['_id']) => void) => {
let timer: ReturnType<typeof setTimeout>;
const fetch = (delay = 250): void => {
const deletedUids = new Set<UserPresence['_id']>();
const fetch = (delay = 500): void => {
timer && clearTimeout(timer);
timer = setTimeout(async () => {
const currentUids = new Set(uids);
uids.clear();
const ids = Array.from(currentUids);
const removed = Array.from(deletedUids);
Meteor.subscribe('stream-user-presence', '', {
...(ids.length > 0 && { added: Array.from(currentUids) }),
...(removed.length && { removed: Array.from(deletedUids) }),
});
deletedUids.clear();
if (ids.length === 0) {
return;
}
try {
const params = {
ids: [...currentUids],
@ -93,13 +113,17 @@ const getPresence = ((): ((uid: UserPresence['_id']) => void) => {
uids.add(uid);
fetch();
};
const stop = (uid: UserPresence['_id']): void => {
deletedUids.add(uid);
fetch();
};
emitter.on('remove', (uid) => {
if (emitter.has(uid)) {
return;
}
store.delete(uid);
stop(uid);
});
emitter.on('reset', () => {
@ -121,7 +145,9 @@ const listen = (
uid: UserPresence['_id'],
handler: EventHandlerOf<ExternalEvents, UserPresence['_id']> | (() => void),
): void => {
// emitter.on(uid, update);
if (!uid) {
return;
}
emitter.on(uid, handler);
const user = store.has(uid) && store.get(uid);
@ -154,7 +180,6 @@ const restart = (): void => {
const get = async (uid: UserPresence['_id']): Promise<UserPresence | undefined> =>
new Promise((resolve) => {
const user = store.has(uid) && store.get(uid);
if (user) {
return resolve(user);
}

@ -6,7 +6,6 @@ import './customTranslations';
import './e2e';
import './emailVerification';
import './i18n';
import './listenActiveUsers';
import './ldap';
import './loginViaQuery';
import './messageTypes';

@ -1,69 +0,0 @@
import { Accounts } from 'meteor/accounts-base';
import { Meteor } from 'meteor/meteor';
import { Notifications } from '../../app/notifications/client';
import { IUser } from '../../definition/IUser';
import { UserStatus } from '../../definition/UserStatus';
import { Presence } from '../lib/presence';
const STATUS_MAP = [UserStatus.OFFLINE, UserStatus.ONLINE, UserStatus.AWAY, UserStatus.BUSY];
export const interestedUserIds = new Set<IUser['_id']>();
export const saveUser = (
user: Pick<IUser, '_id' | 'username' | 'status' | 'statusText' | 'avatarETag'>,
force = false,
): void => {
// do not update my own user, my user's status will come from a subscription
if (user._id === (Accounts as any).connection?._userId) {
return;
}
const found = (Meteor.users as any)._collection._docs._map[user._id];
if (found && force) {
Meteor.users.update(
{ _id: user._id },
{
$set: {
...(user.username && { username: user.username }),
// name: user.name,
// utcOffset: user.utcOffset,
status: user.status,
statusText: user.statusText,
...(user.avatarETag && { avatarETag: user.avatarETag }),
},
},
);
return;
}
if (!found) {
Meteor.users.insert(user);
}
};
Meteor.startup(() => {
Notifications.onLogged(
'user-status',
([_id, username, status, statusText]: [
IUser['_id'],
IUser['username'],
number,
IUser['statusText'],
]) => {
Presence.notify({
_id,
username,
status: STATUS_MAP[status],
statusText,
});
if (!interestedUserIds.has(_id)) {
return;
}
saveUser({ _id, username, status: STATUS_MAP[status], statusText }, true);
},
);
});

@ -232,11 +232,4 @@ createTemplateForComponent(
createTemplateForComponent('UserDropdown', () => import('./sidebar/header/UserDropdown'));
createTemplateForComponent('StatusMessage', () => import('./components/Message/StatusMessage'), {
renderContainerView: () =>
HTML.DIV({
class: 'message-custom-status',
}),
});
createTemplateForComponent('sidebarFooter', () => import('./sidebar/footer'));

@ -1,13 +1,13 @@
import React from 'react';
import { useUserId } from '../../../contexts/UserContext';
import { useUserData } from '../../../hooks/useUserData';
import { usePresence } from '../../../hooks/usePresence';
import RoomHeader from './RoomHeader';
const DirectRoomHeader = ({ room, slots }) => {
const userId = useUserId();
const directUserId = room.uids.filter((uid) => uid !== userId).shift();
const directUserData = useUserData(directUserId);
const directUserData = usePresence(directUserId);
return <RoomHeader slots={slots} room={room} topic={directUserData?.statusText} />;
};

@ -22,7 +22,7 @@ const OTRWithData = ({ rid, tabBar }) => {
),
);
const userStatus = usePresence(otr.peerId);
const userStatus = usePresence(otr.peerId)?.status;
const isOnline = !['offline', 'loading'].includes(userStatus);

@ -87,6 +87,10 @@ export class ListenersModule {
}
notifications.notifyLoggedInThisInstance('user-status', [_id, username, STATUS_MAP[status], statusText]);
if (_id) {
notifications.sendPresence(_id, [username, STATUS_MAP[status], statusText]);
}
});
service.onEvent('user.updateCustomStatus', (userStatus) => {

@ -3,12 +3,14 @@ import { Authorization } from '../../sdk';
import { RoomsRaw } from '../../../app/models/server/raw/Rooms';
import { SubscriptionsRaw } from '../../../app/models/server/raw/Subscriptions';
import { ISubscription } from '../../../definition/ISubscription';
import { emit, StreamPresence } from '../../../app/notifications/server/lib/Presence';
import { UsersRaw } from '../../../app/models/server/raw/Users';
import { SettingsRaw } from '../../../app/models/server/raw/Settings';
import { IOmnichannelRoom } from '../../../definition/IRoom';
import { IUser } from '../../../definition/IUser';
import { SystemLogger } from '../../lib/logger/system';
interface IModelsParam {
Rooms: RoomsRaw;
Subscriptions: SubscriptionsRaw;
@ -51,6 +53,9 @@ export class NotificationsModule {
public readonly streamLocal: IStreamer;
public readonly streamPresence: IStreamer;
constructor(
private Streamer: IStreamerConstructor,
) {
@ -68,7 +73,7 @@ export class NotificationsModule {
this.streamLivechatQueueData = new this.Streamer('livechat-inquiry-queue-observer');
this.streamStdout = new this.Streamer('stdout');
this.streamRoomData = new this.Streamer('room-data');
this.streamPresence = StreamPresence.getInstance(Streamer, 'user-presence');
this.streamRoomMessage = new this.Streamer('room-messages');
this.streamRoomMessage.on('_afterPublish', async (streamer: IStreamer, publication: IPublication, eventName: string): Promise<void> => {
@ -161,6 +166,7 @@ export class NotificationsModule {
this.streamLogged.allowWrite('none');
this.streamLogged.allowRead('logged');
this.streamRoom.allowRead(async function(eventName, extraData): Promise<boolean> {
const [rid] = eventName.split('/');
@ -406,6 +412,9 @@ export class NotificationsModule {
this.streamLocal.allowRead('none');
this.streamLocal.allowEmit('all');
this.streamLocal.allowWrite('none');
this.streamPresence.allowRead('logged');
this.streamPresence.allowWrite('none');
}
notifyAll(eventName: string, ...args: any[]): void {
@ -440,6 +449,14 @@ export class NotificationsModule {
return this.streamUser.emitWithoutBroadcast(`${ userId }/${ eventName }`, ...args);
}
sendPresence(uid: string, ...args: any[]): void {
// if (this.debug === true) {
// console.log('notifyUserAndBroadcast', [userId, eventName, ...args]);
// }
emit(uid, args as any);
return this.streamPresence.emitWithoutBroadcast(uid, ...args);
}
progressUpdated(progress: {rate: number}): void {
this.streamImporters.emit('progress', progress);
}

@ -76,6 +76,8 @@ export interface IStreamer {
emitWithoutBroadcast(event: string, ...data: any[]): void;
changedPayload(collection: string, id: string, fields: Record<string, any>): string | false;
_publish(publication: IPublication, eventName: string, options: boolean | {useCollection?: boolean; args?: any}): Promise<void>;
}
export interface IStreamerConstructor {

Loading…
Cancel
Save