New: Use database change streams when available (#18892)
Co-authored-by: Diego Sampaio <chinello@gmail.com>pull/18993/head
parent
43bcd30543
commit
707aa1f76b
@ -0,0 +1,177 @@ |
||||
import http from 'http'; |
||||
|
||||
import client from 'prom-client'; |
||||
import connect from 'connect'; |
||||
import _ from 'underscore'; |
||||
import gcStats from 'prometheus-gc-stats'; |
||||
import { Meteor } from 'meteor/meteor'; |
||||
import { Facts } from 'meteor/facts-base'; |
||||
|
||||
import { Info, getOplogInfo } from '../../../utils/server'; |
||||
import { Migrations } from '../../../migrations'; |
||||
import { settings } from '../../../settings'; |
||||
import { Statistics } from '../../../models'; |
||||
import { metrics } from './metrics'; |
||||
|
||||
Facts.incrementServerFact = function(pkg, fact, increment) { |
||||
metrics.meteorFacts.inc({ pkg, fact }, increment); |
||||
}; |
||||
|
||||
const setPrometheusData = async () => { |
||||
metrics.info.set({ |
||||
version: Info.version, |
||||
unique_id: settings.get('uniqueID'), |
||||
site_url: settings.get('Site_Url'), |
||||
}, 1); |
||||
|
||||
const sessions = Array.from(Meteor.server.sessions.values()); |
||||
const authenticatedSessions = sessions.filter((s) => s.userId); |
||||
metrics.ddpSessions.set(Meteor.server.sessions.size); |
||||
metrics.ddpAuthenticatedSessions.set(authenticatedSessions.length); |
||||
metrics.ddpConnectedUsers.set(_.unique(authenticatedSessions.map((s) => s.userId)).length); |
||||
|
||||
const statistics = Statistics.findLast(); |
||||
if (!statistics) { |
||||
return; |
||||
} |
||||
|
||||
metrics.version.set({ version: statistics.version }, 1); |
||||
metrics.migration.set(Migrations._getControl().version); |
||||
metrics.instanceCount.set(statistics.instanceCount); |
||||
metrics.oplogEnabled.set({ enabled: statistics.oplogEnabled }, 1); |
||||
|
||||
// User statistics
|
||||
metrics.totalUsers.set(statistics.totalUsers); |
||||
metrics.activeUsers.set(statistics.activeUsers); |
||||
metrics.nonActiveUsers.set(statistics.nonActiveUsers); |
||||
metrics.onlineUsers.set(statistics.onlineUsers); |
||||
metrics.awayUsers.set(statistics.awayUsers); |
||||
metrics.offlineUsers.set(statistics.offlineUsers); |
||||
|
||||
// Room statistics
|
||||
metrics.totalRooms.set(statistics.totalRooms); |
||||
metrics.totalChannels.set(statistics.totalChannels); |
||||
metrics.totalPrivateGroups.set(statistics.totalPrivateGroups); |
||||
metrics.totalDirect.set(statistics.totalDirect); |
||||
metrics.totalLivechat.set(statistics.totalLivechat); |
||||
|
||||
// Message statistics
|
||||
metrics.totalMessages.set(statistics.totalMessages); |
||||
metrics.totalChannelMessages.set(statistics.totalChannelMessages); |
||||
metrics.totalPrivateGroupMessages.set(statistics.totalPrivateGroupMessages); |
||||
metrics.totalDirectMessages.set(statistics.totalDirectMessages); |
||||
metrics.totalLivechatMessages.set(statistics.totalLivechatMessages); |
||||
|
||||
const oplogQueue = getOplogInfo().mongo._oplogHandle?._entryQueue?.length || 0; |
||||
metrics.oplogQueue.set(oplogQueue); |
||||
|
||||
metrics.pushQueue.set(statistics.pushQueue || 0); |
||||
}; |
||||
|
||||
const app = connect(); |
||||
|
||||
// const compression = require('compression');
|
||||
// app.use(compression());
|
||||
|
||||
app.use('/metrics', (req, res) => { |
||||
res.setHeader('Content-Type', 'text/plain'); |
||||
const data = client.register.metrics(); |
||||
|
||||
metrics.metricsRequests.inc(); |
||||
metrics.metricsSize.set(data.length); |
||||
|
||||
res.end(data); |
||||
}); |
||||
|
||||
app.use('/', (req, res) => { |
||||
const html = `<html>
|
||||
<head> |
||||
<title>Rocket.Chat Prometheus Exporter</title> |
||||
</head> |
||||
<body> |
||||
<h1>Rocket.Chat Prometheus Exporter</h1> |
||||
<p><a href="/metrics">Metrics</a></p> |
||||
</body> |
||||
</html>`; |
||||
|
||||
res.write(html); |
||||
res.end(); |
||||
}); |
||||
|
||||
const server = http.createServer(app); |
||||
|
||||
let timer; |
||||
let resetTimer; |
||||
let defaultMetricsInitiated = false; |
||||
let gcStatsInitiated = false; |
||||
const was = { |
||||
enabled: false, |
||||
port: 9458, |
||||
resetInterval: 0, |
||||
collectGC: false, |
||||
}; |
||||
const updatePrometheusConfig = async () => { |
||||
const is = { |
||||
port: process.env.PROMETHEUS_PORT || settings.get('Prometheus_Port'), |
||||
enabled: settings.get('Prometheus_Enabled'), |
||||
resetInterval: settings.get('Prometheus_Reset_Interval'), |
||||
collectGC: settings.get('Prometheus_Garbage_Collector'), |
||||
}; |
||||
|
||||
if (Object.values(is).some((s) => s == null)) { |
||||
return; |
||||
} |
||||
|
||||
if (Object.entries(is).every(([k, v]) => v === was[k])) { |
||||
return; |
||||
} |
||||
|
||||
if (!is.enabled) { |
||||
if (was.enabled) { |
||||
console.log('Disabling Prometheus'); |
||||
server.close(); |
||||
Meteor.clearInterval(timer); |
||||
} |
||||
Object.assign(was, is); |
||||
return; |
||||
} |
||||
|
||||
console.log('Configuring Prometheus', is); |
||||
|
||||
if (!was.enabled) { |
||||
server.listen({ |
||||
port: is.port, |
||||
host: process.env.BIND_IP || '0.0.0.0', |
||||
}); |
||||
|
||||
timer = Meteor.setInterval(setPrometheusData, 5000); |
||||
} |
||||
|
||||
Meteor.clearInterval(resetTimer); |
||||
if (is.resetInterval) { |
||||
resetTimer = Meteor.setInterval(() => { |
||||
client.register.getMetricsAsArray().forEach((metric) => { metric.hashMap = {}; }); |
||||
}, is.resetInterval); |
||||
} |
||||
|
||||
// Prevent exceptions on calling those methods twice since
|
||||
// it's not possible to stop them to be able to restart
|
||||
try { |
||||
if (defaultMetricsInitiated === false) { |
||||
defaultMetricsInitiated = true; |
||||
client.collectDefaultMetrics(); |
||||
} |
||||
if (is.collectGC && gcStatsInitiated === false) { |
||||
gcStatsInitiated = true; |
||||
gcStats()(); |
||||
} |
||||
} catch (error) { |
||||
console.error(error); |
||||
} |
||||
|
||||
Object.assign(was, is); |
||||
}; |
||||
|
||||
Meteor.startup(async () => { |
||||
settings.get(/^Prometheus_.+/, updatePrometheusConfig); |
||||
}); |
||||
@ -0,0 +1,7 @@ |
||||
import { InstanceStatus } from 'meteor/konecty:multiple-instances-status'; |
||||
|
||||
import { Base } from './_Base'; |
||||
|
||||
export class InstanceStatusModel extends Base {} |
||||
|
||||
export default new InstanceStatusModel(InstanceStatus.getCollection(), { preventSetUpdatedAt: true }); |
||||
@ -0,0 +1,7 @@ |
||||
import { UsersSessions } from 'meteor/konecty:user-presence'; |
||||
|
||||
import { Base } from './_Base'; |
||||
|
||||
export class UsersSessionsModel extends Base {} |
||||
|
||||
export default new UsersSessionsModel(UsersSessions, { preventSetUpdatedAt: true }); |
||||
@ -0,0 +1,185 @@ |
||||
import { Meteor } from 'meteor/meteor'; |
||||
import { Promise } from 'meteor/promise'; |
||||
import { MongoInternals } from 'meteor/mongo'; |
||||
import semver from 'semver'; |
||||
import s from 'underscore.string'; |
||||
import { MongoClient, Cursor, Timestamp, Db } from 'mongodb'; |
||||
|
||||
import { urlParser } from './_oplogUrlParser'; |
||||
|
||||
class OplogHandle { |
||||
dbName: string; |
||||
|
||||
client: MongoClient; |
||||
|
||||
stream: Cursor; |
||||
|
||||
db: Db; |
||||
|
||||
usingChangeStream: boolean; |
||||
|
||||
async isChangeStreamAvailable(): Promise<boolean> { |
||||
if (process.env.IGNORE_CHANGE_STREAM) { |
||||
return false; |
||||
} |
||||
|
||||
const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); |
||||
const { version, storageEngine } = await mongo.db.command({ serverStatus: 1 }); |
||||
return storageEngine?.name === 'wiredTiger' && semver.satisfies(semver.coerce(version) || '', '>=3.6.0'); |
||||
} |
||||
|
||||
async start(): Promise<OplogHandle> { |
||||
this.usingChangeStream = await this.isChangeStreamAvailable(); |
||||
const oplogUrl = this.usingChangeStream ? process.env.MONGO_URL : process.env.MONGO_OPLOG_URL; |
||||
|
||||
let urlParsed; |
||||
try { |
||||
urlParsed = await urlParser(oplogUrl); |
||||
} catch (e) { |
||||
throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); |
||||
} |
||||
|
||||
if (!this.usingChangeStream && (!oplogUrl || urlParsed.dbName !== 'local')) { |
||||
throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); |
||||
} |
||||
|
||||
if (!oplogUrl) { |
||||
throw Error('$MONGO_URL must be set'); |
||||
} |
||||
|
||||
if (process.env.MONGO_OPLOG_URL) { |
||||
const urlParsed = await urlParser(process.env.MONGO_URL); |
||||
this.dbName = urlParsed.dbName; |
||||
} |
||||
|
||||
this.client = new MongoClient(oplogUrl, { |
||||
useUnifiedTopology: true, |
||||
useNewUrlParser: true, |
||||
...!this.usingChangeStream && { poolSize: 1 }, |
||||
}); |
||||
|
||||
await this.client.connect(); |
||||
this.db = this.client.db(); |
||||
|
||||
if (!this.usingChangeStream) { |
||||
await this.startOplog(); |
||||
} |
||||
|
||||
return this; |
||||
} |
||||
|
||||
async startOplog(): Promise<void> { |
||||
const isMasterDoc = await this.db.admin().command({ ismaster: 1 }); |
||||
if (!isMasterDoc || !isMasterDoc.setName) { |
||||
throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); |
||||
} |
||||
|
||||
const oplogCollection = this.db.collection('oplog.rs'); |
||||
|
||||
const lastOplogEntry = await oplogCollection.findOne<{ts: Timestamp}>({}, { sort: { $natural: -1 }, projection: { _id: 0, ts: 1 } }); |
||||
|
||||
const oplogSelector = { |
||||
ns: new RegExp(`^(?:${ [ |
||||
s.escapeRegExp(`${ this.dbName }.`), |
||||
s.escapeRegExp('admin.$cmd'), |
||||
].join('|') })`),
|
||||
|
||||
op: { $in: ['i', 'u', 'd'] }, |
||||
...lastOplogEntry && { ts: { $gt: lastOplogEntry.ts } }, |
||||
}; |
||||
|
||||
this.stream = oplogCollection.find(oplogSelector, { |
||||
tailable: true, |
||||
// awaitData: true,
|
||||
}).stream(); |
||||
|
||||
// Prevent warning about many listeners, we add 11
|
||||
this.stream.setMaxListeners(20); |
||||
} |
||||
|
||||
onOplogEntry(query: {collection: string}, callback: Function): void { |
||||
if (this.usingChangeStream) { |
||||
return this._onOplogEntryChangeStream(query, callback); |
||||
} |
||||
|
||||
return this._onOplogEntryOplog(query, callback); |
||||
} |
||||
|
||||
_onOplogEntryOplog(query: {collection: string}, callback: Function): void { |
||||
this.stream.on('data', Meteor.bindEnvironment((buffer) => { |
||||
const doc = buffer as any; |
||||
if (doc.ns === `${ this.dbName }.${ query.collection }`) { |
||||
callback({ |
||||
id: doc.op === 'u' ? doc.o2._id : doc.o._id, |
||||
op: doc, |
||||
}); |
||||
} |
||||
})); |
||||
} |
||||
|
||||
_onOplogEntryChangeStream(query: {collection: string}, callback: Function): void { |
||||
this.db.collection(query.collection).watch([], { /* fullDocument: 'updateLookup' */ }).on('change', Meteor.bindEnvironment((event) => { |
||||
switch (event.operationType) { |
||||
case 'insert': |
||||
callback({ |
||||
id: event.documentKey._id, |
||||
op: { |
||||
op: 'i', |
||||
o: event.fullDocument, |
||||
}, |
||||
}); |
||||
break; |
||||
case 'update': |
||||
callback({ |
||||
id: event.documentKey._id, |
||||
op: { |
||||
op: 'u', |
||||
// o: event.fullDocument,
|
||||
o: { |
||||
$set: event.updateDescription.updatedFields, |
||||
$unset: event.updateDescription.removedFields, |
||||
}, |
||||
}, |
||||
}); |
||||
break; |
||||
case 'delete': |
||||
callback({ |
||||
id: event.documentKey._id, |
||||
op: { |
||||
op: 'd', |
||||
}, |
||||
}); |
||||
break; |
||||
} |
||||
})); |
||||
} |
||||
|
||||
_defineTooFarBehind(): void { |
||||
//
|
||||
} |
||||
} |
||||
|
||||
let oplogHandle: Promise<OplogHandle>; |
||||
|
||||
// @ts-ignore
|
||||
// eslint-disable-next-line no-undef
|
||||
if (Package['disable-oplog']) { |
||||
const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); |
||||
try { |
||||
Promise.await(mongo.db.admin().command({ replSetGetStatus: 1 })); |
||||
oplogHandle = Promise.await(new OplogHandle().start()); |
||||
} catch (e) { |
||||
console.error(e.message); |
||||
} |
||||
} |
||||
|
||||
export const getOplogHandle = async (): Promise<OplogHandle | undefined> => { |
||||
if (oplogHandle) { |
||||
return oplogHandle; |
||||
} |
||||
|
||||
const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); |
||||
if (mongo._oplogHandle?.onOplogEntry) { |
||||
return mongo._oplogHandle; |
||||
} |
||||
}; |
||||
@ -0,0 +1,5 @@ |
||||
import { promisify } from 'util'; |
||||
|
||||
import _urlParser from 'mongodb/lib/url_parser'; |
||||
|
||||
export const urlParser = promisify(_urlParser); |
||||
@ -1,3 +0,0 @@ |
||||
import { EventEmitter } from 'events'; |
||||
|
||||
export const oplogEvents = new EventEmitter(); |
||||
@ -1,19 +0,0 @@ |
||||
import { Meteor } from 'meteor/meteor'; |
||||
|
||||
import { Settings } from '../../models/server'; |
||||
import { setValue } from './raw'; |
||||
|
||||
const updateValue = (id, fields) => { |
||||
if (typeof fields.value === 'undefined') { |
||||
return; |
||||
} |
||||
setValue(id, fields.value); |
||||
}; |
||||
|
||||
Meteor.startup(() => Settings.find({}, { fields: { value: 1 } }).observeChanges({ |
||||
added: updateValue, |
||||
changed: updateValue, |
||||
removed(id) { |
||||
setValue(id, undefined); |
||||
}, |
||||
})); |
||||
@ -1,35 +0,0 @@ |
||||
import { Meteor } from 'meteor/meteor'; |
||||
|
||||
import { Users, Uploads } from '../../app/models'; |
||||
|
||||
export const roomFiles = (pub, { rid, searchText, fileType, limit = 50 }) => { |
||||
if (!pub.userId) { |
||||
return pub.ready(); |
||||
} |
||||
|
||||
if (!Meteor.call('canAccessRoom', rid, pub.userId)) { |
||||
return this.ready(); |
||||
} |
||||
|
||||
const cursorFileListHandle = Uploads.findNotHiddenFilesOfRoom(rid, searchText, fileType, limit).observeChanges({ |
||||
added(_id, record) { |
||||
const { username, name } = record.userId ? Users.findOneById(record.userId) : {}; |
||||
return pub.added('room_files', _id, { ...record, user: { username, name } }); |
||||
}, |
||||
changed(_id, recordChanges) { |
||||
if (!recordChanges.hasOwnProperty('user') && recordChanges.userId) { |
||||
recordChanges.user = Users.findOneById(recordChanges.userId); |
||||
} |
||||
return pub.changed('room_files', _id, recordChanges); |
||||
}, |
||||
removed(_id) { |
||||
return pub.removed('room_files', _id); |
||||
}, |
||||
}); |
||||
|
||||
pub.ready(); |
||||
|
||||
return pub.onStop(function() { |
||||
return cursorFileListHandle.stop(); |
||||
}); |
||||
}; |
||||
Loading…
Reference in new issue