Micro Services: Add metrics capability to Services (#19448)

pull/19384/head^2
Diego Sampaio 6 years ago committed by GitHub
parent 556cc51e26
commit 30df229e58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      ee/server/broker.ts
  2. 66
      ee/server/services/ddp-streamer/DDPStreamer.ts
  3. 26
      server/sdk/types/IBroker.ts

@ -3,7 +3,7 @@ import EJSON from 'ejson';
import { asyncLocalStorage, License } from '../../server/sdk';
import { api } from '../../server/sdk/api';
import { IBroker, IBrokerNode } from '../../server/sdk/types/IBroker';
import { IBroker, IBrokerNode, IServiceMetrics } from '../../server/sdk/types/IBroker';
import { ServiceClass } from '../../server/sdk/types/ServiceClass';
import { EventSignatures } from '../../server/sdk/lib/Events';
import { LocalBroker } from '../../server/sdk/lib/LocalBroker';
@ -45,11 +45,15 @@ class NetworkBroker implements IBroker {
// list of allowed services to run - has precedence over `internalOnly`
private allowedList = new Set<string>(SERVICES_ALLOWED?.split(',').map((i) => i.trim()).filter((i) => i));
metrics: IServiceMetrics;
constructor(broker: ServiceBroker) {
this.broker = broker;
api.setBroker(this);
this.metrics = broker.metrics;
this.started = this.broker.start();
this.allowed = License.hasLicense('scalability');

@ -7,10 +7,11 @@ import WebSocket from 'ws';
import { Client } from './Client';
// import { STREAMER_EVENTS, STREAM_NAMES } from './constants';
import { ServiceClass } from '../../../../server/sdk/types/ServiceClass';
import { events } from './configureServer';
import { events, server } from './configureServer';
import notifications from './streams/index';
import { StreamerCentral } from '../../../../server/modules/streamer/streamer.module';
import { ListenersModule } from '../../../../server/modules/listeners/listeners.module';
import { DDP_EVENTS } from './constants';
const {
PORT: port = 4000,
@ -81,24 +82,6 @@ wss.on('connection', (ws, req) => new Client(ws, req.url !== '/websocket'));
// },
// };
// broker.createService({
// settings: {
// port: PROMETHEUS_PORT,
// metrics: {
// streamer_users_connected: {
// type: 'Gauge',
// labelNames: ['nodeID'],
// help: 'Users connecteds by streamer',
// },
// streamer_users_logged: {
// type: 'Gauge',
// labelNames: ['nodeID'],
// help: 'Users logged by streamer',
// },
// },
// },
// mixins: PROMETHEUS_PORT !== 'false' ? [PromService] : [],
export class DDPStreamer extends ServiceClass {
protected name = 'streamer';
@ -142,6 +125,51 @@ export class DDPStreamer extends ServiceClass {
});
});
}
async created(): Promise<void> {
if (!this.context) {
return;
}
const { broker, nodeID } = this.context;
if (!broker) {
return;
}
const { metrics } = broker;
if (!metrics) {
return;
}
metrics.register({
name: 'users_connected',
type: 'gauge',
labelNames: ['nodeID'],
description: 'Users connected by streamer',
});
metrics.register({
name: 'users_logged',
type: 'gauge',
labelNames: ['nodeID'],
description: 'Users logged by streamer',
});
server.on(DDP_EVENTS.CONNECTED, () => {
metrics.increment('users_connected', { nodeID }, 1);
});
server.on(DDP_EVENTS.LOGGED, () => {
metrics.increment('users_logged', { nodeID }, 1);
});
server.on(DDP_EVENTS.DISCONNECTED, ({ userId }) => {
metrics.decrement('users_connected', { nodeID }, 1);
if (userId) {
metrics.decrement('users_logged', { nodeID }, 1);
}
});
}
}
// broker.start();

@ -20,7 +20,33 @@ export interface IBrokerNode {
// offlineSince: null
}
export type BaseMetricOptions = {
type: string;
name: string;
description?: string;
labelNames?: Array<string>;
unit?: string;
aggregator?: string;
}
export interface IServiceMetrics {
register(opts: BaseMetricOptions): void;
hasMetric(name: string): boolean;
increment(name: string, labels?: Record<string, any>, value?: number, timestamp?: number): void;
decrement(name: string, labels?: Record<string, any>, value?: number, timestamp?: number): void;
set(name: string, value: any | null, labels?: Record<string, any>, timestamp?: number): void;
observe(name: string, value: number, labels?: Record<string, any>, timestamp?: number): void;
reset(name: string, labels?: Record<string, any>, timestamp?: number): void;
resetAll(name: string, timestamp?: number): void;
timer(name: string, labels?: Record<string, any>, timestamp?: number): () => number;
}
export interface IBroker {
metrics?: IServiceMetrics;
destroyService(service: ServiceClass): void;
createService(service: ServiceClass): void;
call(method: string, data: any): Promise<any>;

Loading…
Cancel
Save