[FIX] MAU when using micro services (#24204)

pull/24211/head^2
Diego Sampaio 3 years ago committed by GitHub
parent 56f892401f
commit 37714424bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 33
      app/models/server/raw/Sessions.ts
  2. 394
      app/statistics/server/lib/SAUMonitor.js
  3. 342
      app/statistics/server/lib/SAUMonitor.ts
  4. 10
      app/statistics/server/lib/UAParserCustom.js
  5. 3
      app/statistics/server/startup/monitor.js
  6. 27
      definition/ISession.ts
  7. 10
      definition/ISocketConnection.ts
  8. 5
      definition/externals/meteor/konecty-multiple-instances-status.d.ts
  9. 57
      ee/server/services/ddp-streamer/Client.ts
  10. 2
      ee/server/services/ddp-streamer/DDPStreamer.ts
  11. 3
      ee/server/services/ddp-streamer/Server.ts
  12. 31
      ee/server/services/ddp-streamer/configureServer.ts
  13. 1
      ee/server/services/ddp-streamer/constants.ts
  14. 17
      package-lock.json
  15. 3
      package.json
  16. 1
      server/hooks/index.ts
  17. 42
      server/hooks/sauMonitorHooks.ts
  18. 2
      server/sdk/index.ts
  19. 5
      server/sdk/lib/Events.ts
  20. 3
      server/sdk/types/ISAUMonitorService.ts
  21. 10
      server/services/sauMonitor/events.ts
  22. 31
      server/services/sauMonitor/service.ts
  23. 2
      server/services/startup.ts
  24. 1
      server/startup/index.ts

@ -6,6 +6,7 @@ import {
IndexSpecification,
UpdateWriteOpResult,
FilterQuery,
Cursor,
} from 'mongodb';
import type { ISession } from '../../../../definition/ISession';
@ -774,6 +775,19 @@ export class SessionsRaw extends BaseRaw<ISession> {
);
}
findSessionsNotClosedByDateWithoutLastActivity({ year, month, day }: DestructuredDate): Cursor<ISession> {
const query = {
year,
month,
day,
type: 'session',
closedAt: { $exists: false },
lastActivityAt: { $exists: false },
};
return this.find(query);
}
async getActiveUsersOfPeriodByDayBetweenDates({ start, end }: DestructuredRange): Promise<
{
day: number;
@ -1163,7 +1177,7 @@ export class SessionsRaw extends BaseRaw<ISession> {
};
}
async createOrUpdate(data: ISession): Promise<UpdateWriteOpResult | undefined> {
async createOrUpdate(data: Omit<ISession, '_id' | 'createdAt' | '_updatedAt'>): Promise<UpdateWriteOpResult | undefined> {
const { year, month, day, sessionId, instanceId } = data;
if (!year || !month || !day || !sessionId || !instanceId) {
@ -1224,6 +1238,23 @@ export class SessionsRaw extends BaseRaw<ISession> {
return this.updateMany(query, update);
}
async updateActiveSessionsByDate({ year, month, day }: DestructuredDate, data = {}): Promise<UpdateWriteOpResult> {
const query = {
year,
month,
day,
type: 'session',
closedAt: { $exists: false },
lastActivityAt: { $exists: false },
};
const update = {
$set: data,
};
return this.updateMany(query, update);
}
async logoutByInstanceIdAndSessionIdAndUserId(instanceId: string, sessionId: string, userId: string): Promise<UpdateWriteOpResult> {
const query = {
instanceId,

@ -1,394 +0,0 @@
import { Meteor } from 'meteor/meteor';
import { Accounts } from 'meteor/accounts-base';
import { SyncedCron } from 'meteor/littledata:synced-cron';
import UAParser from 'ua-parser-js';
import { UAParserMobile, UAParserDesktop } from './UAParserCustom';
import { Sessions } from '../../../models/server/raw';
import { aggregates } from '../../../models/server/raw/Sessions';
import { Logger } from '../../../logger';
import { getMostImportantRole } from './getMostImportantRole';
const getDateObj = (dateTime = new Date()) => ({
day: dateTime.getDate(),
month: dateTime.getMonth() + 1,
year: dateTime.getFullYear(),
});
const isSameDateObj = (oldest, newest) => oldest.year === newest.year && oldest.month === newest.month && oldest.day === newest.day;
const logger = new Logger('SAUMonitor');
/**
* Server Session Monitor for SAU(Simultaneously Active Users) based on Meteor server sessions
*/
export class SAUMonitorClass {
constructor() {
this._started = false;
this._monitorTime = 60000;
this._timer = null;
this._today = getDateObj();
this._instanceId = null;
this._jobName = 'aggregate-sessions';
}
async start(instanceId) {
if (this.isRunning()) {
return;
}
this._instanceId = instanceId;
if (!this._instanceId) {
logger.debug('[start] - InstanceId is not defined.');
return;
}
await this._startMonitoring(() => {
this._started = true;
logger.debug(`[start] - InstanceId: ${this._instanceId}`);
});
}
stop() {
if (!this.isRunning()) {
return;
}
this._started = false;
if (this._timer) {
Meteor.clearInterval(this._timer);
}
SyncedCron.remove(this._jobName);
logger.debug(`[stop] - InstanceId: ${this._instanceId}`);
}
isRunning() {
return this._started === true;
}
async _startMonitoring(callback) {
try {
this._handleAccountEvents();
this._handleOnConnection();
this._startSessionControl();
await this._initActiveServerSessions();
this._startAggregation();
if (callback) {
callback();
}
} catch (err) {
throw new Meteor.Error(err);
}
}
_startSessionControl() {
if (this.isRunning()) {
return;
}
if (this._monitorTime < 0) {
return;
}
this._timer = Meteor.setInterval(async () => {
await this._updateActiveSessions();
}, this._monitorTime);
}
_handleOnConnection() {
if (this.isRunning()) {
return;
}
Meteor.onConnection((connection) => {
if (!this.isRunning()) {
return;
}
// this._handleSession(connection, getDateObj());
connection.onClose(async () => {
await Sessions.closeByInstanceIdAndSessionId(this._instanceId, connection.id);
});
});
}
_handleAccountEvents() {
if (this.isRunning()) {
return;
}
Accounts.onLogin(async (info) => {
if (!this.isRunning()) {
return;
}
const { roles, _id: userId } = info.user;
const mostImportantRole = getMostImportantRole(roles);
const loginAt = new Date();
const params = { userId, roles, mostImportantRole, loginAt, ...getDateObj() };
await this._handleSession(info.connection, params);
this._updateConnectionInfo(info.connection.id, { loginAt });
});
Accounts.onLogout(async (info) => {
if (!this.isRunning()) {
return;
}
const sessionId = info.connection.id;
if (info.user) {
const userId = info.user._id;
await Sessions.logoutByInstanceIdAndSessionIdAndUserId(this._instanceId, sessionId, userId);
}
});
}
async _handleSession(connection, params) {
const data = this._getConnectionInfo(connection, params);
await Sessions.createOrUpdate(data);
}
async _updateActiveSessions() {
if (!this.isRunning()) {
return;
}
const { year, month, day } = this._today;
const currentDateTime = new Date();
const currentDay = getDateObj(currentDateTime);
if (!isSameDateObj(this._today, currentDay)) {
const beforeDateTime = new Date(this._today.year, this._today.month - 1, this._today.day, 23, 59, 59, 999);
const nextDateTime = new Date(currentDay.year, currentDay.month - 1, currentDay.day);
const createSessions = async (objects, ids) => {
await Sessions.createBatch(objects);
Meteor.defer(() => {
Sessions.updateActiveSessionsByDateAndInstanceIdAndIds({ year, month, day }, this._instanceId, ids, {
lastActivityAt: beforeDateTime,
});
});
};
this._applyAllServerSessionsBatch(createSessions, {
createdAt: nextDateTime,
lastActivityAt: nextDateTime,
...currentDay,
});
this._today = currentDay;
return;
}
// Otherwise, just update the lastActivityAt field
await this._applyAllServerSessionsIds(async (sessions) => {
await Sessions.updateActiveSessionsByDateAndInstanceIdAndIds({ year, month, day }, this._instanceId, sessions, {
lastActivityAt: currentDateTime,
});
});
}
_getConnectionInfo(connection, params = {}) {
if (!connection) {
return;
}
const ip = connection.httpHeaders
? connection.httpHeaders['x-real-ip'] || connection.httpHeaders['x-forwarded-for']
: connection.clientAddress;
const host = connection.httpHeaders && connection.httpHeaders.host;
const info = {
type: 'session',
sessionId: connection.id,
instanceId: this._instanceId,
ip,
host,
...this._getUserAgentInfo(connection),
...params,
};
if (connection.loginAt) {
info.loginAt = connection.loginAt;
}
return info;
}
_getUserAgentInfo(connection) {
if (!(connection && connection.httpHeaders && connection.httpHeaders['user-agent'])) {
return;
}
const uaString = connection.httpHeaders['user-agent'];
let result;
if (UAParserMobile.isMobileApp(uaString)) {
result = UAParserMobile.uaObject(uaString);
} else if (UAParserDesktop.isDesktopApp(uaString)) {
result = UAParserDesktop.uaObject(uaString);
} else {
const ua = new UAParser(uaString);
result = ua.getResult();
}
const info = {
type: 'other',
};
const removeEmptyProps = (obj) => {
Object.keys(obj).forEach((p) => (!obj[p] || obj[p] === undefined) && delete obj[p]);
return obj;
};
if (result.browser && result.browser.name) {
info.type = 'browser';
info.name = result.browser.name;
info.longVersion = result.browser.version;
}
if (result.os && result.os.name) {
info.os = removeEmptyProps(result.os);
}
if (result.device && (result.device.type || result.device.model)) {
info.type = result.device.type;
if (result.app && result.app.name) {
info.name = result.app.name;
info.longVersion = result.app.version;
if (result.app.bundle) {
info.longVersion += ` ${result.app.bundle}`;
}
}
}
if (typeof info.longVersion === 'string') {
info.version = info.longVersion.match(/(\d+\.){0,2}\d+/)[0];
}
return {
device: info,
};
}
async _initActiveServerSessions() {
await this._applyAllServerSessions(async (connectionHandle) => {
await this._handleSession(connectionHandle, getDateObj());
});
}
async _applyAllServerSessions(callback) {
if (!callback || typeof callback !== 'function') {
return;
}
const sessions = Object.values(Meteor.server.sessions).filter((session) => session.userId);
for await (const session of sessions) {
await callback(session.connectionHandle);
}
}
async recursive(callback, sessionIds) {
await callback(sessionIds.splice(0, 500));
if (sessionIds.length) {
await this.recursive(callback, sessionIds);
}
}
async _applyAllServerSessionsIds(callback) {
if (!callback || typeof callback !== 'function') {
return;
}
const sessionIds = Object.values(Meteor.server.sessions)
.filter((session) => session.userId)
.map((s) => s.id);
await this.recursive(callback, sessionIds);
}
_updateConnectionInfo(sessionId, data = {}) {
if (!sessionId) {
return;
}
const session = Meteor.server.sessions.get(sessionId);
if (session) {
Object.keys(data).forEach((p) => {
session.connectionHandle = Object.assign({}, session.connectionHandle, { [p]: data[p] });
});
}
}
_applyAllServerSessionsBatch(callback, params) {
const batch = (arr, limit) => {
if (!arr.length) {
return Promise.resolve();
}
const ids = [];
return Promise.all(
arr.splice(0, limit).map((item) => {
ids.push(item.id);
return this._getConnectionInfo(item.connectionHandle, params);
}),
)
.then(async (data) => {
await callback(data, ids);
return batch(arr, limit);
})
.catch((e) => {
logger.debug(`Error: ${e.message}`);
});
};
const sessions = Object.values(Meteor.server.sessions).filter((session) => session.userId);
batch(sessions, 500);
}
_startAggregation() {
logger.info('[aggregate] - Start Cron.');
SyncedCron.add({
name: this._jobName,
schedule: (parser) => parser.text('at 2:00 am'),
job: async () => {
await this.aggregate();
},
});
}
async aggregate() {
if (!this.isRunning()) {
return;
}
logger.info('[aggregate] - Aggregating data.');
const date = new Date();
date.setDate(date.getDate() - 0); // yesterday
const yesterday = getDateObj(date);
const match = {
type: 'session',
year: { $lte: yesterday.year },
month: { $lte: yesterday.month },
day: { $lte: yesterday.day },
};
await aggregates.dailySessionsOfYesterday(Sessions.col, yesterday).forEach(async (record) => {
record._id = `${record.userId}-${record.year}-${record.month}-${record.day}`;
await Sessions.updateOne({ _id: record._id }, { $set: record }, { upsert: true });
});
await Sessions.updateMany(match, {
$set: {
type: 'computed-session',
_computedAt: new Date(),
},
});
}
}

@ -0,0 +1,342 @@
import { Meteor } from 'meteor/meteor';
import { SyncedCron } from 'meteor/littledata:synced-cron';
import UAParser from 'ua-parser-js';
import mem from 'mem';
import { UAParserMobile, UAParserDesktop } from './UAParserCustom';
import { Sessions, Users } from '../../../models/server/raw';
import { aggregates } from '../../../models/server/raw/Sessions';
import { Logger } from '../../../../server/lib/logger/Logger';
import { getMostImportantRole } from './getMostImportantRole';
import { sauEvents } from '../../../../server/services/sauMonitor/events';
import { ISession, ISessionDevice } from '../../../../definition/ISession';
import { ISocketConnection } from '../../../../definition/ISocketConnection';
import { IUser } from '../../../../definition/IUser';
type DateObj = { day: number; month: number; year: number };
const getDateObj = (dateTime = new Date()): DateObj => ({
day: dateTime.getDate(),
month: dateTime.getMonth() + 1,
year: dateTime.getFullYear(),
});
const logger = new Logger('SAUMonitor');
const getUserRoles = mem(
async (userId: string): Promise<string[]> => {
const user = await Users.findOneById<IUser>(userId, { projection: { roles: 1 } });
return user?.roles || [];
},
{ maxAge: 5000 },
);
/**
* Server Session Monitor for SAU(Simultaneously Active Users) based on Meteor server sessions
*/
export class SAUMonitorClass {
private _started: boolean;
private _dailyComputeJobName: string;
private _dailyFinishSessionsJobName: string;
constructor() {
this._started = false;
this._dailyComputeJobName = 'aggregate-sessions';
this._dailyFinishSessionsJobName = 'aggregate-sessions';
}
async start(): Promise<void> {
if (this.isRunning()) {
return;
}
await this._startMonitoring();
this._started = true;
logger.debug('[start]');
}
stop(): void {
if (!this.isRunning()) {
return;
}
this._started = false;
SyncedCron.remove(this._dailyComputeJobName);
SyncedCron.remove(this._dailyFinishSessionsJobName);
logger.debug('[stop]');
}
isRunning(): boolean {
return this._started === true;
}
async _startMonitoring(): Promise<void> {
try {
this._handleAccountEvents();
this._handleOnConnection();
this._startCronjobs();
} catch (err: any) {
throw new Meteor.Error(err);
}
}
private _handleOnConnection(): void {
if (this.isRunning()) {
return;
}
sauEvents.on('socket.disconnected', async ({ id, instanceId }) => {
if (!this.isRunning()) {
return;
}
await Sessions.closeByInstanceIdAndSessionId(instanceId, id);
});
}
private _handleAccountEvents(): void {
if (this.isRunning()) {
return;
}
sauEvents.on('accounts.login', async ({ userId, connection }) => {
if (!this.isRunning()) {
return;
}
const roles = await getUserRoles(userId);
const mostImportantRole = getMostImportantRole(roles);
const loginAt = new Date();
const params = { userId, roles, mostImportantRole, loginAt, ...getDateObj() };
await this._handleSession(connection, params);
});
sauEvents.on('accounts.logout', async ({ userId, connection }) => {
if (!this.isRunning()) {
return;
}
await Sessions.logoutByInstanceIdAndSessionIdAndUserId(connection.instanceId, connection.id, userId);
});
}
private async _handleSession(
connection: ISocketConnection,
params: Pick<ISession, 'userId' | 'mostImportantRole' | 'loginAt' | 'day' | 'month' | 'year' | 'roles'>,
): Promise<void> {
const data = this._getConnectionInfo(connection, params);
if (!data) {
return;
}
await Sessions.createOrUpdate(data);
}
private async _finishSessionsFromDate(yesterday: Date, today: Date): Promise<void> {
if (!this.isRunning()) {
return;
}
const { day, month, year } = getDateObj(yesterday);
const beforeDateTime = new Date(year, month - 1, day, 23, 59, 59, 999);
const currentDate = getDateObj(today);
const nextDateTime = new Date(currentDate.year, currentDate.month - 1, currentDate.day);
const cursor = Sessions.findSessionsNotClosedByDateWithoutLastActivity({ year, month, day });
const batch = [];
for await (const session of cursor) {
// create a new session for the current day
batch.push({
...session,
...currentDate,
createdAt: nextDateTime,
});
if (batch.length === 500) {
await Sessions.createBatch(batch);
batch.length = 0;
}
}
if (batch.length > 0) {
await Sessions.createBatch(batch);
}
// close all sessions from current 'date'
await Sessions.updateActiveSessionsByDate(
{ year, month, day },
{
lastActivityAt: beforeDateTime,
},
);
// TODO missing an action to perform on dangling sessions (for example remove sessions not closed one month ago)
}
private _getConnectionInfo(
connection: ISocketConnection,
params: Pick<ISession, 'userId' | 'mostImportantRole' | 'loginAt' | 'day' | 'month' | 'year' | 'roles'>,
): Omit<ISession, '_id' | '_updatedAt' | 'createdAt'> | undefined {
if (!connection) {
return;
}
const ip = connection.clientAddress || connection.httpHeaders?.['x-real-ip'] || connection.httpHeaders?.['x-forwarded-for'];
const host = connection.httpHeaders?.host || '';
return {
type: 'session',
sessionId: connection.id,
instanceId: connection.instanceId,
ip: (Array.isArray(ip) ? ip[0] : ip) || '',
host,
...this._getUserAgentInfo(connection),
...params,
};
}
private _getUserAgentInfo(connection: ISocketConnection): { device: ISessionDevice } | undefined {
if (!connection?.httpHeaders?.['user-agent']) {
return;
}
const uaString = connection.httpHeaders['user-agent'];
// TODO define a type for "result" below
// | UAParser.IResult
// | { device: { type: string; model?: string }; browser: undefined; os: undefined; app: { name: string; version: string } }
// | {
// device: { type: string; model?: string };
// browser: undefined;
// os: string;
// app: { name: string; version: string };
// }
const result = ((): any => {
if (UAParserMobile.isMobileApp(uaString)) {
return UAParserMobile.uaObject(uaString);
}
if (UAParserDesktop.isDesktopApp(uaString)) {
return UAParserDesktop.uaObject(uaString);
}
const ua = new UAParser(uaString);
return ua.getResult();
})();
const info: ISessionDevice = {
type: 'other',
name: '',
longVersion: '',
os: {
name: '',
version: '',
},
version: '',
};
const removeEmptyProps = (obj: any): any => {
Object.keys(obj).forEach((p) => (!obj[p] || obj[p] === undefined) && delete obj[p]);
return obj;
};
if (result.browser && result.browser.name) {
info.type = 'browser';
info.name = result.browser.name;
info.longVersion = result.browser.version || '';
}
if (typeof result.os !== 'string' && result.os?.name) {
info.os = removeEmptyProps(result.os) || '';
}
if (result.device && (result.device.type || result.device.model)) {
info.type = result.device.type || '';
if (result.hasOwnProperty('app') && result.app?.name) {
info.name = result.app.name;
info.longVersion = result.app.version;
if (result.app.bundle) {
info.longVersion += ` ${result.app.bundle}`;
}
}
}
if (typeof info.longVersion === 'string') {
info.version = info.longVersion.match(/(\d+\.){0,2}\d+/)?.[0] || '';
}
return {
device: info,
};
}
private _startCronjobs(): void {
logger.info('[aggregate] - Start Cron.');
SyncedCron.add({
name: this._dailyComputeJobName,
schedule: (parser: any) => parser.text('at 2:00 am'),
job: async () => {
await this._aggregate();
},
});
SyncedCron.add({
name: this._dailyFinishSessionsJobName,
schedule: (parser: any) => parser.text('at 1:05 am'),
job: async () => {
const yesterday = new Date();
yesterday.setDate(yesterday.getDate() - 1);
await this._finishSessionsFromDate(yesterday, new Date());
},
});
}
private async _aggregate(): Promise<void> {
if (!this.isRunning()) {
return;
}
logger.info('[aggregate] - Aggregating data.');
const date = new Date();
date.setDate(date.getDate() - 0); // yesterday
const yesterday = getDateObj(date);
const match = {
type: 'session',
year: { $lte: yesterday.year },
month: { $lte: yesterday.month },
day: { $lte: yesterday.day },
};
for await (const record of aggregates.dailySessionsOfYesterday(Sessions.col, yesterday)) {
await Sessions.updateOne(
{ _id: `${record.userId}-${record.year}-${record.month}-${record.day}` },
{ $set: record },
{ upsert: true },
);
}
await Sessions.updateMany(match, {
$set: {
type: 'computed-session',
_computedAt: new Date(),
},
});
}
}

@ -59,6 +59,11 @@ export const UAParserMobile = {
return splitUA && splitUA[0] && splitUA[0].trim() === this.appName;
},
/**
*
* @param {string} uaString
* @returns { device: { type: '' }, app: { name: '', version: '' } }
*/
uaObject(uaString) {
if (!this.isMobileApp(uaString)) {
return {};
@ -120,6 +125,11 @@ export const UAParserDesktop = {
return uaString.includes(' Electron/');
},
/**
*
* @param {string} uaString
* @returns { device: { type: '' }, os: '' || {}, app: { name: '', version: '' } }
*/
uaObject(uaString) {
if (!this.isDesktopApp(uaString)) {
return {};

@ -1,5 +1,4 @@
import { Meteor } from 'meteor/meteor';
import { InstanceStatus } from 'meteor/konecty:multiple-instances-status';
import { SAUMonitorClass } from '../lib/SAUMonitor';
import { settings } from '../../../settings/server';
@ -18,6 +17,6 @@ Meteor.startup(() => {
return SAUMonitor.stop();
}
SAUMonitor.start(InstanceStatus.id());
SAUMonitor.start();
});
});

@ -1,20 +1,23 @@
export interface ISessionDevice {
type: string;
name: string;
longVersion: string;
os: {
name: string;
version: string;
};
version: string;
}
export interface ISession {
_id: string;
type: string;
mostImportantRole: string;
userId: string;
lastActivityAt: Date;
device: {
type: string;
name: string;
longVersion: string;
os: {
name: string;
version: string;
};
version: string;
};
lastActivityAt?: Date;
device?: ISessionDevice;
roles: string[];
year: number;
month: number;
day: number;
@ -25,5 +28,5 @@ export interface ISession {
host: string;
ip: string;
loginAt: Date;
closedAt: Date;
closedAt?: Date;
}

@ -0,0 +1,10 @@
import type { IncomingHttpHeaders } from 'http';
export interface ISocketConnection {
id: string;
instanceId: string;
livechatToken?: string;
onClose(fn: (...args: any[]) => void): void;
clientAddress: string | undefined;
httpHeaders: IncomingHttpHeaders;
}

@ -0,0 +1,5 @@
declare module 'meteor/konecty:multiple-instances-status' {
namespace InstanceStatus {
function id(): string;
}
}

@ -1,4 +1,5 @@
import { EventEmitter } from 'events';
import type { IncomingMessage } from 'http';
import { v1 as uuidv1 } from 'uuid';
import WebSocket from 'ws';
@ -7,11 +8,51 @@ import { DDP_EVENTS, WS_ERRORS, WS_ERRORS_MESSAGES, TIMEOUT } from './constants'
import { SERVER_ID } from './Server';
import { server } from './configureServer';
import { IPacket } from './types/IPacket';
import { ISocketConnection } from '../../../../definition/ISocketConnection';
// TODO why localhost not as 127.0.0.1?
// based on Meteor's implementation (link)
const getClientAddress = (req: IncomingMessage): string | undefined => {
// For the reported client address for a connection to be correct,
// the developer must set the HTTP_FORWARDED_COUNT environment
// variable to an integer representing the number of hops they
// expect in the `x-forwarded-for` header. E.g., set to "1" if the
// server is behind one proxy.
//
// This could be computed once at startup instead of every time.
const httpForwardedCount = parseInt(process.env.HTTP_FORWARDED_COUNT || '') || 0;
if (httpForwardedCount === 0) {
return req.socket.remoteAddress;
}
interface IConnection {
livechatToken?: string;
onClose(fn: (...args: any[]) => void): void;
}
const forwardedFor =
(req.headers['x-forwarded-for'] && Array.isArray(req.headers['x-forwarded-for'])
? req.headers['x-forwarded-for'][0]
: req.headers['x-forwarded-for']) || '';
if (!forwardedFor) {
return;
}
const forwardedForClean = forwardedFor
.trim()
.split(',')
.map((ip) => ip.trim());
// Typically the first value in the `x-forwarded-for` header is
// the original IP address of the client connecting to the first
// proxy. However, the end user can easily spoof the header, in
// which case the first value(s) will be the fake IP address from
// the user pretending to be a proxy reporting the original IP
// address value. By counting HTTP_FORWARDED_COUNT back from the
// end of the list, we ensure that we get the IP address being
// reported by *our* first proxy.
if (httpForwardedCount < 0 || httpForwardedCount > forwardedForClean.length) {
return;
}
return forwardedForClean[forwardedForClean.length - httpForwardedCount];
};
export class Client extends EventEmitter {
private chain = Promise.resolve();
@ -22,7 +63,7 @@ export class Client extends EventEmitter {
public subscriptions = new Map();
public connection: IConnection;
public connection: ISocketConnection;
public wait = false;
@ -30,13 +71,17 @@ export class Client extends EventEmitter {
public userToken?: string;
constructor(public ws: WebSocket, public meteorClient = false) {
constructor(public ws: WebSocket, public meteorClient = false, req: IncomingMessage) {
super();
this.connection = {
id: this.session,
instanceId: server.id,
onClose: (fn): void => {
this.on('close', fn);
},
clientAddress: getClientAddress(req),
httpHeaders: req.headers,
};
this.renewTimeout(TIMEOUT / 1000);

@ -59,7 +59,7 @@ httpServer.listen(port);
const wss = new WebSocket.Server({ server: httpServer });
wss.on('connection', (ws, req) => new Client(ws, req.url !== '/websocket'));
wss.on('connection', (ws, req) => new Client(ws, req.url !== '/websocket', req));
// export default {
// name: 'streamer',

@ -1,6 +1,7 @@
import { EventEmitter } from 'events';
import ejson from 'ejson';
import { v1 as uuidv1 } from 'uuid';
import { DDP_EVENTS } from './constants';
import { Publication } from './Publication';
@ -22,6 +23,8 @@ export class Server extends EventEmitter {
private _methods = new Map<string, MethodFn>();
public readonly id = uuidv1();
serialize = ejson.stringify;
parse = (packet: string): IPacket => {

@ -5,6 +5,7 @@ import { Account, Presence, MeteorService } from '../../../../server/sdk';
import { UserStatus } from '../../../../definition/UserStatus';
import { Server } from './Server';
import { AutoUpdateRecord } from '../../../../server/sdk/types/IMeteor';
import { api } from '../../../../server/sdk/api';
export const server = new Server();
@ -91,8 +92,9 @@ server.methods({
await Account.logout({ userId: this.userId, token: this.userToken });
}
// TODO: run the handles on monolith to track SAU correctly
// accounts._successfulLogout(this.connection, this.userId);
this.emit(DDP_EVENTS.LOGGEDOUT);
server.emit(DDP_EVENTS.LOGGEDOUT, this);
this.userToken = undefined;
this.userId = undefined;
@ -149,15 +151,32 @@ server.methods({
},
});
server.on(DDP_EVENTS.LOGGED, ({ userId, session }) => {
Presence.newConnection(userId, session);
server.on(DDP_EVENTS.LOGGED, (info) => {
const { userId, connection } = info;
Presence.newConnection(userId, connection.id);
api.broadcast('accounts.login', { userId, connection });
});
server.on(DDP_EVENTS.LOGGEDOUT, (info) => {
const { userId, connection } = info;
api.broadcast('accounts.logout', { userId, connection });
});
server.on(DDP_EVENTS.DISCONNECTED, ({ userId, session }) => {
server.on(DDP_EVENTS.DISCONNECTED, (info) => {
const { userId, connection } = info;
api.broadcast('socket.disconnected', connection);
if (!userId) {
return;
}
Presence.removeConnection(userId, session);
Presence.removeConnection(userId, connection.id);
});
server.on(DDP_EVENTS.CONNECTED, ({ connection }) => {
api.broadcast('socket.connected', connection);
});
// TODO: resolve metrics

@ -29,6 +29,7 @@ export const DDP_EVENTS = {
UNSUBSCRIBE: 'unsub',
DISCONNECTED: 'disconnected',
LOGGED: 'logged',
LOGGEDOUT: 'loggedout',
};
export const WS_ERRORS = {

17
package-lock.json generated

@ -10584,6 +10584,12 @@
"integrity": "sha512-TmhE+/eI8PP7EwT9EbK8i74F1ryNn0LToCyEaLpN+X+A3LS1j4CpsCk9Jwq6Y2Uu7w9wdrKl6bujdj5LSsDKKA==",
"dev": true
},
"@types/ua-parser-js": {
"version": "0.7.36",
"resolved": "https://registry.npmjs.org/@types/ua-parser-js/-/ua-parser-js-0.7.36.tgz",
"integrity": "sha512-N1rW+njavs70y2cApeIw1vLMYXRwfBy+7trgavGuuTfOd7j1Yh7QTRc/yqsPl6ncokt72ZXuxEU0PiCp9bSwNQ==",
"dev": true
},
"@types/uglify-js": {
"version": "3.13.1",
"resolved": "https://registry.npmjs.org/@types/uglify-js/-/uglify-js-3.13.1.tgz",
@ -19385,6 +19391,11 @@
"version": "1.2.7",
"resolved": "https://registry.npmjs.org/core-js/-/core-js-1.2.7.tgz",
"integrity": "sha1-ZSKUwUZR2yj6k70tX/KYOk8IxjY="
},
"ua-parser-js": {
"version": "0.7.31",
"resolved": "https://registry.npmjs.org/ua-parser-js/-/ua-parser-js-0.7.31.tgz",
"integrity": "sha512-qLK/Xe9E2uzmYI3qLeOmI0tEOt+TBBQyUIAh4aAgU05FVYzeZrKUdkAZfBNVGRaHVgV0TDkdEngJSw/SyQchkQ=="
}
}
},
@ -36561,9 +36572,9 @@
"dev": true
},
"ua-parser-js": {
"version": "0.7.28",
"resolved": "https://registry.npmjs.org/ua-parser-js/-/ua-parser-js-0.7.28.tgz",
"integrity": "sha512-6Gurc1n//gjp9eQNXjD9O3M/sMwVtN5S8Lv9bvOYBfKfDNiIIhqiyi01vMBO45u4zkDE420w/e0se7Vs+sIg+g=="
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/ua-parser-js/-/ua-parser-js-1.0.2.tgz",
"integrity": "sha512-00y/AXhx0/SsnI51fTc0rLRmafiGOM4/O+ny10Ps7f+j/b8p/ZY11ytMgznXkOVo4GQ+KwQG5UQLkLGirsACRg=="
},
"uc.micro": {
"version": "1.0.5",

@ -114,6 +114,7 @@
"@types/string-strip-html": "^5.0.0",
"@types/supertest": "^2.0.11",
"@types/toastr": "^2.1.39",
"@types/ua-parser-js": "^0.7.36",
"@types/underscore.string": "0.0.38",
"@types/use-subscription": "^1.0.0",
"@types/uuid": "^8.3.1",
@ -307,7 +308,7 @@
"turndown": "^5.0.3",
"twilio": "^3.65.0",
"twit": "^2.2.11",
"ua-parser-js": "^0.7.28",
"ua-parser-js": "^1.0.2",
"underscore": "^1.13.1",
"underscore.string": "^3.3.5",
"universal-perf-hooks": "^1.0.1",

@ -0,0 +1 @@
import './sauMonitorHooks';

@ -0,0 +1,42 @@
import type { IncomingHttpHeaders } from 'http';
import { InstanceStatus } from 'meteor/konecty:multiple-instances-status';
import { Accounts } from 'meteor/accounts-base';
import { Meteor } from 'meteor/meteor';
import { sauEvents } from '../services/sauMonitor/events';
Accounts.onLogin((info: { user: Meteor.User; connection: Meteor.Connection }) => {
const { httpHeaders } = info.connection;
sauEvents.emit('accounts.login', {
userId: info.user._id,
connection: { instanceId: InstanceStatus.id(), ...info.connection, httpHeaders: httpHeaders as IncomingHttpHeaders },
});
});
Accounts.onLogout((info: { user: Meteor.User; connection: Meteor.Connection }) => {
const { httpHeaders } = info.connection;
sauEvents.emit('accounts.logout', {
userId: info.user._id,
connection: { instanceId: InstanceStatus.id(), ...info.connection, httpHeaders: httpHeaders as IncomingHttpHeaders },
});
});
Meteor.onConnection((connection) => {
connection.onClose(async () => {
const { httpHeaders } = connection;
sauEvents.emit('socket.disconnected', {
instanceId: InstanceStatus.id(),
...connection,
httpHeaders: httpHeaders as IncomingHttpHeaders,
});
});
});
Meteor.onConnection((connection) => {
const { httpHeaders } = connection;
sauEvents.emit('socket.connected', { instanceId: InstanceStatus.id(), ...connection, httpHeaders: httpHeaders as IncomingHttpHeaders });
});

@ -14,6 +14,7 @@ import { IRoomService } from './types/IRoomService';
import { IMediaService } from './types/IMediaService';
import { IAnalyticsService } from './types/IAnalyticsService';
import { ILDAPService } from './types/ILDAPService';
import { ISAUMonitorService } from './types/ISAUMonitorService';
import { FibersContextStore } from './lib/ContextStore';
// TODO think in a way to not have to pass the service name to proxify here as well
@ -30,6 +31,7 @@ export const Room = proxifyWithWait<IRoomService>('room');
export const Media = proxifyWithWait<IMediaService>('media');
export const Analytics = proxifyWithWait<IAnalyticsService>('analytics');
export const LDAP = proxifyWithWait<ILDAPService>('ldap');
export const SAUMonitor = proxifyWithWait<ISAUMonitorService>('sau-monitor');
// Calls without wait. Means that the service is optional and the result may be an error
// of service/method not available

@ -15,10 +15,15 @@ import { IIntegrationHistory } from '../../../definition/IIntegrationHistory';
import { ILivechatDepartmentAgents } from '../../../definition/ILivechatDepartmentAgents';
import { IIntegration } from '../../../definition/IIntegration';
import { IEmailInbox } from '../../../definition/IEmailInbox';
import { ISocketConnection } from '../../../definition/ISocketConnection';
type ClientAction = 'inserted' | 'updated' | 'removed' | 'changed';
export type EventSignatures = {
'accounts.login': (info: { userId: string; connection: ISocketConnection }) => void;
'accounts.logout': (info: { userId: string; connection: ISocketConnection }) => void;
'socket.connected': (connection: ISocketConnection) => void;
'socket.disconnected': (connection: ISocketConnection) => void;
'banner.new'(bannerId: string): void;
'banner.enabled'(bannerId: string): void;
'banner.disabled'(bannerId: string): void;

@ -0,0 +1,3 @@
import { IServiceClass } from './ServiceClass';
export type ISAUMonitorService = IServiceClass;

@ -0,0 +1,10 @@
import { Emitter } from '@rocket.chat/emitter';
import { ISocketConnection } from '../../../definition/ISocketConnection';
export const sauEvents = new Emitter<{
'accounts.login': { userId: string; connection: ISocketConnection };
'accounts.logout': { userId: string; connection: ISocketConnection };
'socket.connected': ISocketConnection;
'socket.disconnected': ISocketConnection;
}>();

@ -0,0 +1,31 @@
// import type { Db } from 'mongodb';
import { ServiceClass } from '../../sdk/types/ServiceClass';
import { ISAUMonitorService } from '../../sdk/types/ISAUMonitorService';
import { sauEvents } from './events';
export class SAUMonitorService extends ServiceClass implements ISAUMonitorService {
protected name = 'sau-monitor';
constructor() {
super();
this.onEvent('accounts.login', async (data) => {
sauEvents.emit('accounts.login', data);
});
this.onEvent('accounts.logout', async (data) => {
sauEvents.emit('accounts.logout', data);
});
this.onEvent('socket.disconnected', async (data) => {
// console.log('socket.disconnected', data);
sauEvents.emit('socket.disconnected', data);
});
this.onEvent('socket.connected', async (data) => {
// console.log('socket.connected', data);
sauEvents.emit('socket.connected', data);
});
}
}

@ -11,6 +11,7 @@ import { UiKitCoreApp } from './uikit-core-app/service';
import { MediaService } from './image/service';
import { AnalyticsService } from './analytics/service';
import { LDAPService } from './ldap/service';
import { SAUMonitorService } from './sauMonitor/service';
const { db } = MongoInternals.defaultRemoteCollectionDriver().mongo;
@ -24,3 +25,4 @@ api.registerService(new TeamService(db));
api.registerService(new MediaService());
api.registerService(new AnalyticsService(db));
api.registerService(new LDAPService());
api.registerService(new SAUMonitorService());

@ -7,3 +7,4 @@ import './instance';
import './presence';
import './serverRunning';
import './coreApps';
import '../hooks';

Loading…
Cancel
Save