diff --git a/app/api/server/v1/settings.js b/app/api/server/v1/settings.js index e323a635ddf..cf926f44d47 100644 --- a/app/api/server/v1/settings.js +++ b/app/api/server/v1/settings.js @@ -6,7 +6,7 @@ import _ from 'underscore'; import { Settings } from '../../../models/server'; import { hasPermission } from '../../../authorization'; import { API } from '../api'; -import { SettingsEvents } from '../../../settings/server'; +import { SettingsEvents, settings } from '../../../settings/server'; const fetchSettings = (query, sort, offset, count, fields) => { const settings = Settings.find(query, { @@ -146,6 +146,10 @@ API.v1.addRoute('settings/:_id', { authRequired: true }, { value: Match.Any, }); if (Settings.updateValueNotHiddenById(this.urlParams._id, this.bodyParams.value)) { + settings.storeSettingValue({ + _id: this.urlParams._id, + value: this.bodyParams.value, + }); return API.v1.success(); } diff --git a/app/assets/server/assets.js b/app/assets/server/assets.js index 3df4a62c2e7..8864c8e372f 100644 --- a/app/assets/server/assets.js +++ b/app/assets/server/assets.js @@ -7,8 +7,7 @@ import _ from 'underscore'; import sizeOf from 'image-size'; import sharp from 'sharp'; -import { settings } from '../../settings'; -import { Settings } from '../../models'; +import { settings } from '../../settings/server'; import { getURL } from '../../utils/lib/getURL'; import { mime } from '../../utils/lib/mimeTypes'; import { hasPermission } from '../../authorization'; @@ -354,19 +353,7 @@ for (const key of Object.keys(assets)) { addAssetToSetting(key, value); } -Settings.find().observe({ - added(record) { - return RocketChatAssets.processAsset(record._id, record.value); - }, - - changed(record) { - return RocketChatAssets.processAsset(record._id, record.value); - }, - - removed(record) { - return RocketChatAssets.processAsset(record._id, undefined); - }, -}); +settings.get(/^Assets_/, (key, value) => RocketChatAssets.processAsset(key, value)); Meteor.startup(function() { return Meteor.setTimeout(function() { diff --git a/app/authorization/server/streamer/permissions/emitter.js b/app/authorization/server/streamer/permissions/emitter.js index a7062439eb4..a258bdc607e 100644 --- a/app/authorization/server/streamer/permissions/emitter.js +++ b/app/authorization/server/streamer/permissions/emitter.js @@ -12,7 +12,7 @@ Permissions.on('change', ({ clientAction, id, data, diff }) => { switch (clientAction) { case 'updated': case 'inserted': - data = data || Permissions.findOneById(id); + data = data ?? Permissions.findOneById(id); break; case 'removed': diff --git a/app/dolphin/lib/common.js b/app/dolphin/lib/common.js index 5366b5fecec..0e74e6d1fca 100644 --- a/app/dolphin/lib/common.js +++ b/app/dolphin/lib/common.js @@ -5,7 +5,6 @@ import { ServiceConfiguration } from 'meteor/service-configuration'; import { settings } from '../../settings'; import { CustomOAuth } from '../../custom-oauth'; import { callbacks } from '../../callbacks'; -import { Settings } from '../../models'; const config = { serverURL: '', @@ -31,15 +30,9 @@ function DolphinOnCreateUser(options, user) { if (Meteor.isServer) { Meteor.startup(() => - Settings.find({ _id: 'Accounts_OAuth_Dolphin_URL' }).observe({ - added() { - config.serverURL = settings.get('Accounts_OAuth_Dolphin_URL'); - return Dolphin.configure(config); - }, - changed() { - config.serverURL = settings.get('Accounts_OAuth_Dolphin_URL'); - return Dolphin.configure(config); - }, + settings.get('Accounts_OAuth_Dolphin_URL', (key, value) => { + config.serverURL = value; + return Dolphin.configure(config); }), ); diff --git a/app/integrations/server/lib/triggerHandler.js b/app/integrations/server/lib/triggerHandler.js index 7ce09dc03f8..0541855cbf6 100644 --- a/app/integrations/server/lib/triggerHandler.js +++ b/app/integrations/server/lib/triggerHandler.js @@ -22,19 +22,26 @@ integrations.triggerHandler = new class RocketChatIntegrationHandler { this.compiledScripts = {}; this.triggers = {}; - Models.Integrations.find({ type: 'webhook-outgoing' }).observe({ - added: (record) => { - this.addIntegration(record); - }, - - changed: (record) => { - this.removeIntegration(record); - this.addIntegration(record); - }, + Models.Integrations.find({ type: 'webhook-outgoing' }).fetch().forEach((data) => this.addIntegration(data)); - removed: (record) => { - this.removeIntegration(record); - }, + Models.Integrations.on('change', ({ clientAction, id, data }) => { + switch (clientAction) { + case 'inserted': + if (data.type === 'webhook-outgoing') { + this.addIntegration(data); + } + break; + case 'updated': + data = data ?? Models.Integrations.findOneById(id); + if (data.type === 'webhook-outgoing') { + this.removeIntegration(data); + this.addIntegration(data); + } + break; + case 'removed': + this.removeIntegration({ _id: id }); + break; + } }); } diff --git a/app/lib/server/startup/userDataStream.js b/app/lib/server/startup/userDataStream.js index 8cdc95e4b66..f30f745de50 100644 --- a/app/lib/server/startup/userDataStream.js +++ b/app/lib/server/startup/userDataStream.js @@ -1,10 +1,73 @@ +import { MongoInternals } from 'meteor/mongo'; + import { Users } from '../../../models/server'; import { Notifications } from '../../../notifications/server'; +let processOnChange; +// eslint-disable-next-line no-undef +const disableOplog = Package['disable-oplog']; + +if (disableOplog) { + // Stores the callbacks for the disconnection reactivity bellow + const userCallbacks = new Map(); + + // Overrides the native observe changes to prevent database polling and stores the callbacks + // for the users' tokens to re-implement the reactivity based on our database listeners + const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); + MongoInternals.Connection.prototype._observeChanges = function({ collectionName, selector, options = {} }, _ordered, callbacks) { + // console.error('Connection.Collection.prototype._observeChanges', collectionName, selector, options); + let cbs; + if (callbacks?.added) { + const records = Promise.await(mongo.rawCollection(collectionName).find(selector, { projection: options.fields }).toArray()); + for (const { _id, ...fields } of records) { + callbacks.added(_id, fields); + } + + if (collectionName === 'users' && selector['services.resume.loginTokens.hashedToken']) { + cbs = userCallbacks.get(selector._id) || new Set(); + cbs.add({ + hashedToken: selector['services.resume.loginTokens.hashedToken'], + callbacks, + }); + userCallbacks.set(selector._id, cbs); + } + } + return { + stop() { + if (cbs) { + cbs.delete(callbacks); + } + }, + }; + }; + + // Re-implement meteor's reactivity that uses observe to disconnect sessions when the token + // associated was removed + processOnChange = (diff, id) => { + const loginTokens = diff['services.resume.loginTokens']; + if (loginTokens) { + const tokens = loginTokens.map(({ hashedToken }) => hashedToken); + + const cbs = userCallbacks.get(id); + if (cbs) { + [...cbs].filter(({ hashedToken }) => !tokens.includes(hashedToken)).forEach((item) => { + item.callbacks.removed(id); + cbs.delete(item); + }); + } + } + }; +} + Users.on('change', ({ clientAction, id, data, diff }) => { switch (clientAction) { case 'updated': Notifications.notifyUserInThisInstance(id, 'userData', { diff, type: clientAction }); + + if (disableOplog) { + processOnChange(diff, id); + } + break; case 'inserted': Notifications.notifyUserInThisInstance(id, 'userData', { data, type: clientAction }); diff --git a/app/metrics/server/index.js b/app/metrics/server/index.js index 3f176d62916..be589206b65 100644 --- a/app/metrics/server/index.js +++ b/app/metrics/server/index.js @@ -1,6 +1,7 @@ import { metrics } from './lib/metrics'; import StatsTracker from './lib/statsTracker'; +import './lib/collectMetrics'; import './callbacksMetrics'; export { diff --git a/app/metrics/server/lib/collectMetrics.js b/app/metrics/server/lib/collectMetrics.js new file mode 100644 index 00000000000..31625134bed --- /dev/null +++ b/app/metrics/server/lib/collectMetrics.js @@ -0,0 +1,177 @@ +import http from 'http'; + +import client from 'prom-client'; +import connect from 'connect'; +import _ from 'underscore'; +import gcStats from 'prometheus-gc-stats'; +import { Meteor } from 'meteor/meteor'; +import { Facts } from 'meteor/facts-base'; + +import { Info, getOplogInfo } from '../../../utils/server'; +import { Migrations } from '../../../migrations'; +import { settings } from '../../../settings'; +import { Statistics } from '../../../models'; +import { metrics } from './metrics'; + +Facts.incrementServerFact = function(pkg, fact, increment) { + metrics.meteorFacts.inc({ pkg, fact }, increment); +}; + +const setPrometheusData = async () => { + metrics.info.set({ + version: Info.version, + unique_id: settings.get('uniqueID'), + site_url: settings.get('Site_Url'), + }, 1); + + const sessions = Array.from(Meteor.server.sessions.values()); + const authenticatedSessions = sessions.filter((s) => s.userId); + metrics.ddpSessions.set(Meteor.server.sessions.size); + metrics.ddpAuthenticatedSessions.set(authenticatedSessions.length); + metrics.ddpConnectedUsers.set(_.unique(authenticatedSessions.map((s) => s.userId)).length); + + const statistics = Statistics.findLast(); + if (!statistics) { + return; + } + + metrics.version.set({ version: statistics.version }, 1); + metrics.migration.set(Migrations._getControl().version); + metrics.instanceCount.set(statistics.instanceCount); + metrics.oplogEnabled.set({ enabled: statistics.oplogEnabled }, 1); + + // User statistics + metrics.totalUsers.set(statistics.totalUsers); + metrics.activeUsers.set(statistics.activeUsers); + metrics.nonActiveUsers.set(statistics.nonActiveUsers); + metrics.onlineUsers.set(statistics.onlineUsers); + metrics.awayUsers.set(statistics.awayUsers); + metrics.offlineUsers.set(statistics.offlineUsers); + + // Room statistics + metrics.totalRooms.set(statistics.totalRooms); + metrics.totalChannels.set(statistics.totalChannels); + metrics.totalPrivateGroups.set(statistics.totalPrivateGroups); + metrics.totalDirect.set(statistics.totalDirect); + metrics.totalLivechat.set(statistics.totalLivechat); + + // Message statistics + metrics.totalMessages.set(statistics.totalMessages); + metrics.totalChannelMessages.set(statistics.totalChannelMessages); + metrics.totalPrivateGroupMessages.set(statistics.totalPrivateGroupMessages); + metrics.totalDirectMessages.set(statistics.totalDirectMessages); + metrics.totalLivechatMessages.set(statistics.totalLivechatMessages); + + const oplogQueue = getOplogInfo().mongo._oplogHandle?._entryQueue?.length || 0; + metrics.oplogQueue.set(oplogQueue); + + metrics.pushQueue.set(statistics.pushQueue || 0); +}; + +const app = connect(); + +// const compression = require('compression'); +// app.use(compression()); + +app.use('/metrics', (req, res) => { + res.setHeader('Content-Type', 'text/plain'); + const data = client.register.metrics(); + + metrics.metricsRequests.inc(); + metrics.metricsSize.set(data.length); + + res.end(data); +}); + +app.use('/', (req, res) => { + const html = ` + + Rocket.Chat Prometheus Exporter + + +

Rocket.Chat Prometheus Exporter

+

Metrics

+ + `; + + res.write(html); + res.end(); +}); + +const server = http.createServer(app); + +let timer; +let resetTimer; +let defaultMetricsInitiated = false; +let gcStatsInitiated = false; +const was = { + enabled: false, + port: 9458, + resetInterval: 0, + collectGC: false, +}; +const updatePrometheusConfig = async () => { + const is = { + port: process.env.PROMETHEUS_PORT || settings.get('Prometheus_Port'), + enabled: settings.get('Prometheus_Enabled'), + resetInterval: settings.get('Prometheus_Reset_Interval'), + collectGC: settings.get('Prometheus_Garbage_Collector'), + }; + + if (Object.values(is).some((s) => s == null)) { + return; + } + + if (Object.entries(is).every(([k, v]) => v === was[k])) { + return; + } + + if (!is.enabled) { + if (was.enabled) { + console.log('Disabling Prometheus'); + server.close(); + Meteor.clearInterval(timer); + } + Object.assign(was, is); + return; + } + + console.log('Configuring Prometheus', is); + + if (!was.enabled) { + server.listen({ + port: is.port, + host: process.env.BIND_IP || '0.0.0.0', + }); + + timer = Meteor.setInterval(setPrometheusData, 5000); + } + + Meteor.clearInterval(resetTimer); + if (is.resetInterval) { + resetTimer = Meteor.setInterval(() => { + client.register.getMetricsAsArray().forEach((metric) => { metric.hashMap = {}; }); + }, is.resetInterval); + } + + // Prevent exceptions on calling those methods twice since + // it's not possible to stop them to be able to restart + try { + if (defaultMetricsInitiated === false) { + defaultMetricsInitiated = true; + client.collectDefaultMetrics(); + } + if (is.collectGC && gcStatsInitiated === false) { + gcStatsInitiated = true; + gcStats()(); + } + } catch (error) { + console.error(error); + } + + Object.assign(was, is); +}; + +Meteor.startup(async () => { + settings.get(/^Prometheus_.+/, updatePrometheusConfig); +}); diff --git a/app/metrics/server/lib/metrics.js b/app/metrics/server/lib/metrics.js index a3d5de8c8db..288daf0d90f 100644 --- a/app/metrics/server/lib/metrics.js +++ b/app/metrics/server/lib/metrics.js @@ -1,17 +1,4 @@ -import http from 'http'; - import client from 'prom-client'; -import connect from 'connect'; -import _ from 'underscore'; -import gcStats from 'prometheus-gc-stats'; -import { Meteor } from 'meteor/meteor'; -import { Facts } from 'meteor/facts-base'; - -import { Info, getOplogInfo } from '../../../utils/server'; -import { Migrations } from '../../../migrations'; -import { settings } from '../../../settings'; -import { Statistics } from '../../../models'; -import { oplogEvents } from '../../../models/server/oplogEvents'; export const metrics = {}; const percentiles = [0.01, 0.1, 0.9, 0.99]; @@ -102,175 +89,3 @@ metrics.totalLivechatMessages = new client.Gauge({ name: 'rocketchat_livechat_me // Meteor Facts metrics.meteorFacts = new client.Gauge({ name: 'rocketchat_meteor_facts', labelNames: ['pkg', 'fact'], help: 'internal meteor facts' }); - -Facts.incrementServerFact = function(pkg, fact, increment) { - metrics.meteorFacts.inc({ pkg, fact }, increment); -}; - -const setPrometheusData = async () => { - metrics.info.set({ - version: Info.version, - unique_id: settings.get('uniqueID'), - site_url: settings.get('Site_Url'), - }, 1); - - const sessions = Array.from(Meteor.server.sessions.values()); - const authenticatedSessions = sessions.filter((s) => s.userId); - metrics.ddpSessions.set(Meteor.server.sessions.size); - metrics.ddpAuthenticatedSessions.set(authenticatedSessions.length); - metrics.ddpConnectedUsers.set(_.unique(authenticatedSessions.map((s) => s.userId)).length); - - const statistics = Statistics.findLast(); - if (!statistics) { - return; - } - - metrics.version.set({ version: statistics.version }, 1); - metrics.migration.set(Migrations._getControl().version); - metrics.instanceCount.set(statistics.instanceCount); - metrics.oplogEnabled.set({ enabled: statistics.oplogEnabled }, 1); - - // User statistics - metrics.totalUsers.set(statistics.totalUsers); - metrics.activeUsers.set(statistics.activeUsers); - metrics.nonActiveUsers.set(statistics.nonActiveUsers); - metrics.onlineUsers.set(statistics.onlineUsers); - metrics.awayUsers.set(statistics.awayUsers); - metrics.offlineUsers.set(statistics.offlineUsers); - - // Room statistics - metrics.totalRooms.set(statistics.totalRooms); - metrics.totalChannels.set(statistics.totalChannels); - metrics.totalPrivateGroups.set(statistics.totalPrivateGroups); - metrics.totalDirect.set(statistics.totalDirect); - metrics.totalLivechat.set(statistics.totalLivechat); - - // Message statistics - metrics.totalMessages.set(statistics.totalMessages); - metrics.totalChannelMessages.set(statistics.totalChannelMessages); - metrics.totalPrivateGroupMessages.set(statistics.totalPrivateGroupMessages); - metrics.totalDirectMessages.set(statistics.totalDirectMessages); - metrics.totalLivechatMessages.set(statistics.totalLivechatMessages); - - const oplogQueue = getOplogInfo().mongo._oplogHandle?._entryQueue?.length || 0; - metrics.oplogQueue.set(oplogQueue); - - metrics.pushQueue.set(statistics.pushQueue || 0); -}; - -const app = connect(); - -// const compression = require('compression'); -// app.use(compression()); - -app.use('/metrics', (req, res) => { - res.setHeader('Content-Type', 'text/plain'); - const data = client.register.metrics(); - - metrics.metricsRequests.inc(); - metrics.metricsSize.set(data.length); - - res.end(data); -}); - -app.use('/', (req, res) => { - const html = ` - - Rocket.Chat Prometheus Exporter - - -

Rocket.Chat Prometheus Exporter

-

Metrics

- - `; - - res.write(html); - res.end(); -}); - -const server = http.createServer(app); - -const oplogMetric = ({ collection, op }) => { - metrics.oplog.inc({ - collection, - op, - }); -}; - -let timer; -let resetTimer; -let defaultMetricsInitiated = false; -let gcStatsInitiated = false; -const was = { - enabled: false, - port: 9458, - resetInterval: 0, - collectGC: false, -}; -const updatePrometheusConfig = async () => { - const is = { - port: process.env.PROMETHEUS_PORT || settings.get('Prometheus_Port'), - enabled: settings.get('Prometheus_Enabled'), - resetInterval: settings.get('Prometheus_Reset_Interval'), - collectGC: settings.get('Prometheus_Garbage_Collector'), - }; - - if (Object.values(is).some((s) => s == null)) { - return; - } - - if (Object.entries(is).every(([k, v]) => v === was[k])) { - return; - } - - if (!is.enabled) { - if (was.enabled) { - console.log('Disabling Prometheus'); - server.close(); - Meteor.clearInterval(timer); - oplogEvents.removeListener('record', oplogMetric); - } - Object.assign(was, is); - return; - } - - console.log('Configuring Prometheus', is); - - if (!was.enabled) { - server.listen({ - port: is.port, - host: process.env.BIND_IP || '0.0.0.0', - }); - - timer = Meteor.setInterval(setPrometheusData, 5000); - oplogEvents.on('record', oplogMetric); - } - - Meteor.clearInterval(resetTimer); - if (is.resetInterval) { - resetTimer = Meteor.setInterval(() => { - client.register.getMetricsAsArray().forEach((metric) => { metric.hashMap = {}; }); - }, is.resetInterval); - } - - // Prevent exceptions on calling those methods twice since - // it's not possible to stop them to be able to restart - try { - if (defaultMetricsInitiated === false) { - defaultMetricsInitiated = true; - client.collectDefaultMetrics(); - } - if (is.collectGC && gcStatsInitiated === false) { - gcStatsInitiated = true; - gcStats()(); - } - } catch (error) { - console.error(error); - } - - Object.assign(was, is); -}; - -Meteor.startup(async () => { - settings.get(/^Prometheus_.+/, updatePrometheusConfig); -}); diff --git a/app/models/server/models/InstanceStatus.js b/app/models/server/models/InstanceStatus.js new file mode 100644 index 00000000000..344381e4426 --- /dev/null +++ b/app/models/server/models/InstanceStatus.js @@ -0,0 +1,7 @@ +import { InstanceStatus } from 'meteor/konecty:multiple-instances-status'; + +import { Base } from './_Base'; + +export class InstanceStatusModel extends Base {} + +export default new InstanceStatusModel(InstanceStatus.getCollection(), { preventSetUpdatedAt: true }); diff --git a/app/models/server/models/UsersSessions.js b/app/models/server/models/UsersSessions.js new file mode 100644 index 00000000000..43aec902d34 --- /dev/null +++ b/app/models/server/models/UsersSessions.js @@ -0,0 +1,7 @@ +import { UsersSessions } from 'meteor/konecty:user-presence'; + +import { Base } from './_Base'; + +export class UsersSessionsModel extends Base {} + +export default new UsersSessionsModel(UsersSessions, { preventSetUpdatedAt: true }); diff --git a/app/models/server/models/_Base.js b/app/models/server/models/_Base.js index d5ba676bd7e..c668d3761ec 100644 --- a/app/models/server/models/_Base.js +++ b/app/models/server/models/_Base.js @@ -4,7 +4,6 @@ import objectPath from 'object-path'; import _ from 'underscore'; import { BaseDb } from './_BaseDb'; -import { oplogEvents } from '../oplogEvents'; export class Base { constructor(nameOrModel, options) { @@ -13,20 +12,11 @@ export class Base { this.collectionName = this._db.collectionName; this.name = this._db.name; + this.removeListener = this._db.removeListener.bind(this._db); this.on = this._db.on.bind(this._db); this.emit = this._db.emit.bind(this._db); this.db = this; - - this._db.on('change', ({ action, oplog }) => { - if (!oplog) { - return; - } - oplogEvents.emit('record', { - collection: this.collectionName, - op: action, - }); - }); } get origin() { diff --git a/app/models/server/models/_BaseDb.js b/app/models/server/models/_BaseDb.js index 87785f6a5ec..7201335c92e 100644 --- a/app/models/server/models/_BaseDb.js +++ b/app/models/server/models/_BaseDb.js @@ -4,7 +4,8 @@ import { Match } from 'meteor/check'; import { Mongo } from 'meteor/mongo'; import _ from 'underscore'; -import { getMongoInfo } from '../../../utils/server/functions/getMongoInfo'; +import { metrics } from '../../../metrics/server/lib/metrics'; +import { getOplogHandle } from './_oplogHandle'; const baseName = 'rocketchat_'; @@ -21,6 +22,12 @@ try { console.log(e); } +const actions = { + i: 'insert', + u: 'update', + d: 'remove', +}; + export class BaseDb extends EventEmitter { constructor(model, baseModel, options = {}) { super(); @@ -37,12 +44,14 @@ export class BaseDb extends EventEmitter { this.baseModel = baseModel; + this.preventSetUpdatedAt = !!options.preventSetUpdatedAt; + this.wrapModel(); - const { oplogEnabled, mongo } = getMongoInfo(); + const _oplogHandle = Promise.await(getOplogHandle()); // When someone start listening for changes we start oplog if available - const handleListener = (event /* , listener*/) => { + const handleListener = async (event /* , listener*/) => { if (event !== 'change') { return; } @@ -53,7 +62,7 @@ export class BaseDb extends EventEmitter { collection: this.collectionName, }; - if (!mongo._oplogHandle) { + if (!_oplogHandle) { throw new Error(`Error: Unable to find Mongodb Oplog. You must run the server with oplog enabled. Try the following:\n 1. Start your mongodb in a replicaset mode: mongod --smallfiles --oplogSize 128 --replSet rs0\n 2. Start the replicaset via mongodb shell: mongo mongo/meteor --eval "rs.initiate({ _id: ''rs0'', members: [ { _id: 0, host: ''localhost:27017'' } ]})"\n @@ -61,19 +70,19 @@ export class BaseDb extends EventEmitter { `); } - mongo._oplogHandle.onOplogEntry( + _oplogHandle.onOplogEntry( query, this.processOplogRecord.bind(this), ); // Meteor will handle if we have a value https://github.com/meteor/meteor/blob/5dcd0b2eb9c8bf881ffbee98bc4cb7631772c4da/packages/mongo/oplog_tailing.js#L5 if (process.env.METEOR_OPLOG_TOO_FAR_BEHIND == null) { - mongo._oplogHandle._defineTooFarBehind( + _oplogHandle._defineTooFarBehind( Number.MAX_SAFE_INTEGER, ); } }; - if (oplogEnabled) { + if (_oplogHandle) { this.on('newListener', handleListener); } @@ -85,6 +94,9 @@ export class BaseDb extends EventEmitter { } setUpdatedAt(record = {}) { + if (this.preventSetUpdatedAt) { + return record; + } // TODO: Check if this can be deleted, Rodrigo does not rememebr WHY he added it. So he removed it to fix issue #5541 // setUpdatedAt(record = {}, checkQuery = false, query) { // if (checkQuery === true) { @@ -189,62 +201,68 @@ export class BaseDb extends EventEmitter { ); } - processOplogRecord(action) { - if (action.op.op === 'i') { + processOplogRecord({ id, op }) { + const action = actions[op.op]; + metrics.oplog.inc({ + collection: this.collectionName, + op: action, + }); + + if (action === 'insert') { this.emit('change', { - action: 'insert', + action, clientAction: 'inserted', - id: action.op.o._id, - data: action.op.o, + id: op.o._id, + data: op.o, oplog: true, }); return; } - if (action.op.op === 'u') { - if (!action.op.o.$set && !action.op.o.$unset) { + if (action === 'update') { + if (!op.o.$set && !op.o.$unset) { this.emit('change', { - action: 'update', + action, clientAction: 'updated', - id: action.id, - data: action.op.o, + id, + data: op.o, oplog: true, }); return; } const diff = {}; - if (action.op.o.$set) { - for (const key in action.op.o.$set) { - if (action.op.o.$set.hasOwnProperty(key)) { - diff[key] = action.op.o.$set[key]; + if (op.o.$set) { + for (const key in op.o.$set) { + if (op.o.$set.hasOwnProperty(key)) { + diff[key] = op.o.$set[key]; } } } - if (action.op.o.$unset) { - for (const key in action.op.o.$unset) { - if (action.op.o.$unset.hasOwnProperty(key)) { + if (op.o.$unset) { + for (const key in op.o.$unset) { + if (op.o.$unset.hasOwnProperty(key)) { diff[key] = undefined; } } } this.emit('change', { - action: 'update', + action, clientAction: 'updated', - id: action.id, + id, diff, oplog: true, }); return; } - if (action.op.op === 'd') { + if (action === 'remove') { this.emit('change', { - action: 'remove', + action, clientAction: 'removed', - id: action.id, + id, oplog: true, }); } diff --git a/app/models/server/models/_oplogHandle.ts b/app/models/server/models/_oplogHandle.ts new file mode 100644 index 00000000000..74d50bf6c23 --- /dev/null +++ b/app/models/server/models/_oplogHandle.ts @@ -0,0 +1,185 @@ +import { Meteor } from 'meteor/meteor'; +import { Promise } from 'meteor/promise'; +import { MongoInternals } from 'meteor/mongo'; +import semver from 'semver'; +import s from 'underscore.string'; +import { MongoClient, Cursor, Timestamp, Db } from 'mongodb'; + +import { urlParser } from './_oplogUrlParser'; + +class OplogHandle { + dbName: string; + + client: MongoClient; + + stream: Cursor; + + db: Db; + + usingChangeStream: boolean; + + async isChangeStreamAvailable(): Promise { + if (process.env.IGNORE_CHANGE_STREAM) { + return false; + } + + const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); + const { version, storageEngine } = await mongo.db.command({ serverStatus: 1 }); + return storageEngine?.name === 'wiredTiger' && semver.satisfies(semver.coerce(version) || '', '>=3.6.0'); + } + + async start(): Promise { + this.usingChangeStream = await this.isChangeStreamAvailable(); + const oplogUrl = this.usingChangeStream ? process.env.MONGO_URL : process.env.MONGO_OPLOG_URL; + + let urlParsed; + try { + urlParsed = await urlParser(oplogUrl); + } catch (e) { + throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); + } + + if (!this.usingChangeStream && (!oplogUrl || urlParsed.dbName !== 'local')) { + throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); + } + + if (!oplogUrl) { + throw Error('$MONGO_URL must be set'); + } + + if (process.env.MONGO_OPLOG_URL) { + const urlParsed = await urlParser(process.env.MONGO_URL); + this.dbName = urlParsed.dbName; + } + + this.client = new MongoClient(oplogUrl, { + useUnifiedTopology: true, + useNewUrlParser: true, + ...!this.usingChangeStream && { poolSize: 1 }, + }); + + await this.client.connect(); + this.db = this.client.db(); + + if (!this.usingChangeStream) { + await this.startOplog(); + } + + return this; + } + + async startOplog(): Promise { + const isMasterDoc = await this.db.admin().command({ ismaster: 1 }); + if (!isMasterDoc || !isMasterDoc.setName) { + throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); + } + + const oplogCollection = this.db.collection('oplog.rs'); + + const lastOplogEntry = await oplogCollection.findOne<{ts: Timestamp}>({}, { sort: { $natural: -1 }, projection: { _id: 0, ts: 1 } }); + + const oplogSelector = { + ns: new RegExp(`^(?:${ [ + s.escapeRegExp(`${ this.dbName }.`), + s.escapeRegExp('admin.$cmd'), + ].join('|') })`), + + op: { $in: ['i', 'u', 'd'] }, + ...lastOplogEntry && { ts: { $gt: lastOplogEntry.ts } }, + }; + + this.stream = oplogCollection.find(oplogSelector, { + tailable: true, + // awaitData: true, + }).stream(); + + // Prevent warning about many listeners, we add 11 + this.stream.setMaxListeners(20); + } + + onOplogEntry(query: {collection: string}, callback: Function): void { + if (this.usingChangeStream) { + return this._onOplogEntryChangeStream(query, callback); + } + + return this._onOplogEntryOplog(query, callback); + } + + _onOplogEntryOplog(query: {collection: string}, callback: Function): void { + this.stream.on('data', Meteor.bindEnvironment((buffer) => { + const doc = buffer as any; + if (doc.ns === `${ this.dbName }.${ query.collection }`) { + callback({ + id: doc.op === 'u' ? doc.o2._id : doc.o._id, + op: doc, + }); + } + })); + } + + _onOplogEntryChangeStream(query: {collection: string}, callback: Function): void { + this.db.collection(query.collection).watch([], { /* fullDocument: 'updateLookup' */ }).on('change', Meteor.bindEnvironment((event) => { + switch (event.operationType) { + case 'insert': + callback({ + id: event.documentKey._id, + op: { + op: 'i', + o: event.fullDocument, + }, + }); + break; + case 'update': + callback({ + id: event.documentKey._id, + op: { + op: 'u', + // o: event.fullDocument, + o: { + $set: event.updateDescription.updatedFields, + $unset: event.updateDescription.removedFields, + }, + }, + }); + break; + case 'delete': + callback({ + id: event.documentKey._id, + op: { + op: 'd', + }, + }); + break; + } + })); + } + + _defineTooFarBehind(): void { + // + } +} + +let oplogHandle: Promise; + +// @ts-ignore +// eslint-disable-next-line no-undef +if (Package['disable-oplog']) { + const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); + try { + Promise.await(mongo.db.admin().command({ replSetGetStatus: 1 })); + oplogHandle = Promise.await(new OplogHandle().start()); + } catch (e) { + console.error(e.message); + } +} + +export const getOplogHandle = async (): Promise => { + if (oplogHandle) { + return oplogHandle; + } + + const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); + if (mongo._oplogHandle?.onOplogEntry) { + return mongo._oplogHandle; + } +}; diff --git a/app/models/server/models/_oplogUrlParser.js b/app/models/server/models/_oplogUrlParser.js new file mode 100644 index 00000000000..8bf3fdd2e83 --- /dev/null +++ b/app/models/server/models/_oplogUrlParser.js @@ -0,0 +1,5 @@ +import { promisify } from 'util'; + +import _urlParser from 'mongodb/lib/url_parser'; + +export const urlParser = promisify(_urlParser); diff --git a/app/models/server/oplogEvents.js b/app/models/server/oplogEvents.js deleted file mode 100644 index 1508641e9d9..00000000000 --- a/app/models/server/oplogEvents.js +++ /dev/null @@ -1,3 +0,0 @@ -import { EventEmitter } from 'events'; - -export const oplogEvents = new EventEmitter(); diff --git a/app/search/server/events/events.js b/app/search/server/events/events.js index 7f0f5032a1d..41a1e217b45 100644 --- a/app/search/server/events/events.js +++ b/app/search/server/events/events.js @@ -1,11 +1,13 @@ -import { callbacks } from '../../../callbacks'; -import { Users, Rooms } from '../../../models'; +import _ from 'underscore'; + +import { settings } from '../../../settings/server'; +import { callbacks } from '../../../callbacks/server'; +import { Users, Rooms } from '../../../models/server'; import { searchProviderService } from '../service/providerService'; import SearchLogger from '../logger/logger'; class EventService { - /* eslint no-unused-vars: [2, { "args": "none" }]*/ - _pushError(name, value, payload) { + _pushError(name, value/* , payload */) { // TODO implement a (performant) cache SearchLogger.debug(`Error on event '${ name }' with id '${ value }'`); } @@ -22,26 +24,24 @@ const eventService = new EventService(); /** * Listen to message changes via Hooks */ -callbacks.add('afterSaveMessage', function(m) { +function afterSaveMessage(m) { eventService.promoteEvent('message.save', m._id, m); return m; -}, callbacks.priority.MEDIUM, 'search-events'); +} -callbacks.add('afterDeleteMessage', function(m) { +function afterDeleteMessage(m) { eventService.promoteEvent('message.delete', m._id); return m; -}, callbacks.priority.MEDIUM, 'search-events-delete'); +} /** * Listen to user and room changes via cursor */ - - -Users.on('change', ({ clientAction, id, data }) => { +function onUsersChange({ clientAction, id, data }) { switch (clientAction) { case 'updated': case 'inserted': - const user = data || Users.findOneById(id); + const user = data ?? Users.findOneById(id); eventService.promoteEvent('user.save', id, user); break; @@ -49,13 +49,13 @@ Users.on('change', ({ clientAction, id, data }) => { eventService.promoteEvent('user.delete', id); break; } -}); +} -Rooms.on('change', ({ clientAction, id, data }) => { +function onRoomsChange({ clientAction, id, data }) { switch (clientAction) { case 'updated': case 'inserted': - const room = data || Rooms.findOneById(id); + const room = data ?? Rooms.findOneById(id); eventService.promoteEvent('room.save', id, room); break; @@ -63,4 +63,18 @@ Rooms.on('change', ({ clientAction, id, data }) => { eventService.promoteEvent('room.delete', id); break; } -}); +} + +settings.get('Search.Provider', _.debounce(() => { + if (searchProviderService.activeProvider?.on) { + Users.on('change', onUsersChange); + Rooms.on('change', onRoomsChange); + callbacks.add('afterSaveMessage', afterSaveMessage, callbacks.priority.MEDIUM, 'search-events'); + callbacks.add('afterDeleteMessage', afterDeleteMessage, callbacks.priority.MEDIUM, 'search-events-delete'); + } else { + Users.removeListener('change', onUsersChange); + Rooms.removeListener('change', onRoomsChange); + callbacks.remove('afterSaveMessage', 'search-events'); + callbacks.remove('afterDeleteMessage', 'search-events-delete'); + } +}, 1000)); diff --git a/app/settings/server/functions/settings.ts b/app/settings/server/functions/settings.ts index c0b239e7be5..3bff85f1f1b 100644 --- a/app/settings/server/functions/settings.ts +++ b/app/settings/server/functions/settings.ts @@ -5,6 +5,7 @@ import _ from 'underscore'; import { SettingsBase, SettingValue } from '../../lib/settings'; import SettingsModel from '../../../models/server/models/Settings'; +import { setValue, updateValue } from '../raw'; const blockedSettings = new Set(); const hiddenSettings = new Set(); @@ -395,13 +396,28 @@ class Settings extends SettingsBase { */ init(): void { this.initialLoad = true; - SettingsModel.find().observe({ - added: (record: ISettingRecord) => this.storeSettingValue(record, this.initialLoad), - changed: (record: ISettingRecord) => this.storeSettingValue(record, this.initialLoad), - removed: (record: ISettingRecord) => this.removeSettingValue(record, this.initialLoad), + SettingsModel.find().fetch().forEach((record: ISettingRecord) => { + this.storeSettingValue(record, this.initialLoad); + updateValue(record._id, { value: record.value }); }); this.initialLoad = false; this.afterInitialLoad.forEach((fn) => fn(Meteor.settings)); + + SettingsModel.on('change', ({ clientAction, id, data }) => { + switch (clientAction) { + case 'inserted': + case 'updated': + data = data ?? SettingsModel.findOneById(id); + this.storeSettingValue(data, this.initialLoad); + updateValue(id, { value: data.value }); + break; + case 'removed': + data = SettingsModel.trashFindOneById(id); + this.removeSettingValue(data, this.initialLoad); + setValue(id, undefined); + break; + } + }); } onAfterInitialLoad(fn: (settings: Meteor.Settings) => void): void { diff --git a/app/settings/server/index.ts b/app/settings/server/index.ts index 7a4f6ebf00b..3adfad5409b 100644 --- a/app/settings/server/index.ts +++ b/app/settings/server/index.ts @@ -1,5 +1,4 @@ import { settings, SettingsEvents } from './functions/settings'; -import './observer'; export { settings, diff --git a/app/settings/server/observer.js b/app/settings/server/observer.js deleted file mode 100644 index 7c376078aaa..00000000000 --- a/app/settings/server/observer.js +++ /dev/null @@ -1,19 +0,0 @@ -import { Meteor } from 'meteor/meteor'; - -import { Settings } from '../../models/server'; -import { setValue } from './raw'; - -const updateValue = (id, fields) => { - if (typeof fields.value === 'undefined') { - return; - } - setValue(id, fields.value); -}; - -Meteor.startup(() => Settings.find({}, { fields: { value: 1 } }).observeChanges({ - added: updateValue, - changed: updateValue, - removed(id) { - setValue(id, undefined); - }, -})); diff --git a/app/settings/server/raw.js b/app/settings/server/raw.js index 9bfd51fbc02..436643cbae4 100644 --- a/app/settings/server/raw.js +++ b/app/settings/server/raw.js @@ -1,15 +1,18 @@ -import { Settings } from '../../models/server/raw'; +import { Settings } from '../../models/server/models/Settings'; const cache = new Map(); export const setValue = (_id, value) => cache.set(_id, value); const setFromDB = async (_id) => { - const value = await Settings.getValueById(_id); + const setting = Settings.findOneById(_id, { fields: { value: 1 } }); + if (!setting) { + return; + } - setValue(_id, value); + setValue(_id, setting.value); - return value; + return setting.value; }; export const getValue = async (_id) => { @@ -19,3 +22,10 @@ export const getValue = async (_id) => { return cache.get(_id); }; + +export const updateValue = (id, fields) => { + if (typeof fields.value === 'undefined') { + return; + } + setValue(id, fields.value); +}; diff --git a/app/ui-master/server/inject.js b/app/ui-master/server/inject.js index 5f0ac8fcaed..2cf6094baa0 100644 --- a/app/ui-master/server/inject.js +++ b/app/ui-master/server/inject.js @@ -6,7 +6,7 @@ import _ from 'underscore'; import s from 'underscore.string'; import { Settings } from '../../models'; -import { settings } from '../../settings'; +import { settings } from '../../settings/server'; const headInjections = new ReactiveDict(); @@ -157,9 +157,7 @@ renderDynamicCssList(); // changed: renderDynamicCssList // }); -Settings.find({ _id: /theme-color-rc/i }, { fields: { value: 1 } }).observe({ - changed: renderDynamicCssList, -}); +settings.get(/theme-color-rc/i, () => renderDynamicCssList()); injectIntoBody('icons', Assets.getText('public/icons.svg')); diff --git a/app/utils/server/functions/getMongoInfo.js b/app/utils/server/functions/getMongoInfo.js index c0cbd9a17a6..7b34b5a3054 100644 --- a/app/utils/server/functions/getMongoInfo.js +++ b/app/utils/server/functions/getMongoInfo.js @@ -1,9 +1,11 @@ import { MongoInternals } from 'meteor/mongo'; +import { getOplogHandle } from '../../../models/server/models/_oplogHandle'; + export function getOplogInfo() { const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); - const oplogEnabled = Boolean(mongo._oplogHandle && mongo._oplogHandle.onOplogEntry); + const oplogEnabled = !!Promise.await(getOplogHandle()); return { oplogEnabled, mongo }; } diff --git a/package-lock.json b/package-lock.json index bb1c770e9e8..dd10719517e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8583,6 +8583,12 @@ "resolved": "https://registry.npmjs.org/@types/retry/-/retry-0.12.0.tgz", "integrity": "sha512-wWKOClTTiizcZhXnPY4wikVAwmdYHp8q6DmC+EJUzAMsycb7HB32Kh9RN4+0gExjmPmZSAQjgURXIGATPegAvA==" }, + "@types/semver": { + "version": "7.3.3", + "resolved": "https://registry.npmjs.org/@types/semver/-/semver-7.3.3.tgz", + "integrity": "sha512-jQxClWFzv9IXdLdhSaTf16XI3NYe6zrEbckSpb5xhKfPbWgIyAY0AFyWWWfaiDcBuj3UHmMkCIwSRqpKMTZL2Q==", + "dev": true + }, "@types/serve-static": { "version": "1.13.5", "resolved": "https://registry.npmjs.org/@types/serve-static/-/serve-static-1.13.5.tgz", @@ -18103,7 +18109,7 @@ }, "minimist": { "version": "0.0.8", - "resolved": "https://registry.npmjs.org/minimist/-/minimist-0.0.8.tgz", + "resolved": false, "integrity": "sha1-hX/Kv8M5fSYluCKCYuhqp6ARsF0=", "dev": true, "optional": true @@ -18131,7 +18137,7 @@ }, "mkdirp": { "version": "0.5.1", - "resolved": "https://registry.npmjs.org/mkdirp/-/mkdirp-0.5.1.tgz", + "resolved": false, "integrity": "sha1-MAV0OOrGz3+MR2fzhkjWaX11yQM=", "dev": true, "optional": true, @@ -18304,7 +18310,7 @@ "dependencies": { "minimist": { "version": "1.2.0", - "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.0.tgz", + "resolved": false, "integrity": "sha1-o1AIsg9BOD7sH7kU9M1d95omQoQ=", "dev": true, "optional": true diff --git a/package.json b/package.json index ff4d16e57a5..45a2f4b661d 100644 --- a/package.json +++ b/package.json @@ -71,6 +71,7 @@ "@types/moment-timezone": "^0.5.30", "@types/mongodb": "^3.5.26", "@types/react-dom": "^16.9.8", + "@types/semver": "^7.3.3", "@types/toastr": "^2.1.38", "@typescript-eslint/eslint-plugin": "^2.34.0", "@typescript-eslint/parser": "^2.34.0", diff --git a/packages/rocketchat-mongo-config/server/index.js b/packages/rocketchat-mongo-config/server/index.js index 5c39b353a5b..2db736697b6 100644 --- a/packages/rocketchat-mongo-config/server/index.js +++ b/packages/rocketchat-mongo-config/server/index.js @@ -5,6 +5,10 @@ import { EmailTest } from 'meteor/email'; import { Mongo } from 'meteor/mongo'; import { HTTP } from 'meteor/http'; +if (!process.env.USE_NATIVE_OPLOG) { + Package['disable-oplog'] = {}; +} + // Set default HTTP call timeout to 20s const envTimeout = parseInt(process.env.HTTP_DEFAULT_TIMEOUT, 10); const timeout = !isNaN(envTimeout) ? envTimeout : 20000; diff --git a/server/lib/roomFiles.js b/server/lib/roomFiles.js deleted file mode 100644 index b1ca1aa8808..00000000000 --- a/server/lib/roomFiles.js +++ /dev/null @@ -1,35 +0,0 @@ -import { Meteor } from 'meteor/meteor'; - -import { Users, Uploads } from '../../app/models'; - -export const roomFiles = (pub, { rid, searchText, fileType, limit = 50 }) => { - if (!pub.userId) { - return pub.ready(); - } - - if (!Meteor.call('canAccessRoom', rid, pub.userId)) { - return this.ready(); - } - - const cursorFileListHandle = Uploads.findNotHiddenFilesOfRoom(rid, searchText, fileType, limit).observeChanges({ - added(_id, record) { - const { username, name } = record.userId ? Users.findOneById(record.userId) : {}; - return pub.added('room_files', _id, { ...record, user: { username, name } }); - }, - changed(_id, recordChanges) { - if (!recordChanges.hasOwnProperty('user') && recordChanges.userId) { - recordChanges.user = Users.findOneById(recordChanges.userId); - } - return pub.changed('room_files', _id, recordChanges); - }, - removed(_id) { - return pub.removed('room_files', _id); - }, - }); - - pub.ready(); - - return pub.onStop(function() { - return cursorFileListHandle.stop(); - }); -}; diff --git a/server/main.d.ts b/server/main.d.ts index 7245264d085..b9bf69c0e03 100644 --- a/server/main.d.ts +++ b/server/main.d.ts @@ -7,6 +7,12 @@ declare module 'meteor/random' { } } +declare module 'meteor/mongo' { + namespace MongoInternals { + function defaultRemoteCollectionDriver(): any; + } +} + declare module 'meteor/accounts-base' { namespace Accounts { function _bcryptRounds(): number; diff --git a/server/main.js b/server/main.js index d24acf47316..8b62fe259bd 100644 --- a/server/main.js +++ b/server/main.js @@ -5,7 +5,6 @@ import '../lib/RegExp'; import '../ee/server'; import './lib/pushConfig'; -import './lib/roomFiles'; import './startup/migrations'; import './startup/appcache'; import './startup/cron'; diff --git a/server/publications/settings/emitter.js b/server/publications/settings/emitter.js index 2a9312b2254..0756d9af8b9 100644 --- a/server/publications/settings/emitter.js +++ b/server/publications/settings/emitter.js @@ -1,6 +1,7 @@ -import { Settings } from '../../../app/models'; -import { Notifications } from '../../../app/notifications'; -import { hasPermission } from '../../../app/authorization'; +import { Settings } from '../../../app/models/server'; +import { Notifications } from '../../../app/notifications/server'; +import { hasAtLeastOnePermission } from '../../../app/authorization/server'; +import { SettingsEvents } from '../../../app/settings/server/functions/settings'; Settings.on('change', ({ clientAction, id, data, diff }) => { if (diff && Object.keys(diff).length === 1 && diff._updatedAt) { // avoid useless changes @@ -9,14 +10,18 @@ Settings.on('change', ({ clientAction, id, data, diff }) => { switch (clientAction) { case 'updated': case 'inserted': { - const setting = data || Settings.findOneById(id); + const setting = data ?? Settings.findOneById(id); const value = { _id: setting._id, value: setting.value, editor: setting.editor, properties: setting.properties, + enterprise: setting.enterprise, + requiredOnWizard: setting.requiredOnWizard, }; + SettingsEvents.emit('change-setting', setting, value); + if (setting.public === true) { Notifications.notifyAllInThisInstance('public-settings-changed', clientAction, value); } @@ -36,10 +41,9 @@ Settings.on('change', ({ clientAction, id, data, diff }) => { } }); - Notifications.streamAll.allowRead('private-settings-changed', function() { if (this.userId == null) { return false; } - return hasPermission(this.userId, 'view-privileged-setting'); + return hasAtLeastOnePermission(this.userId, ['view-privileged-setting', 'edit-privileged-setting', 'manage-selected-settings']); }); diff --git a/server/publications/settings/index.js b/server/publications/settings/index.js index c06948fb22a..d0afdc93a98 100644 --- a/server/publications/settings/index.js +++ b/server/publications/settings/index.js @@ -1,10 +1,10 @@ import { Meteor } from 'meteor/meteor'; import { Settings } from '../../../app/models/server'; -import { Notifications } from '../../../app/notifications/server'; import { hasPermission, hasAtLeastOnePermission } from '../../../app/authorization/server'; import { getSettingPermissionId } from '../../../app/authorization/lib'; import { SettingsEvents } from '../../../app/settings/server/functions/settings'; +import './emitter'; Meteor.methods({ 'public-settings/get'(updatedAt) { @@ -57,7 +57,7 @@ Meteor.methods({ if (!(updatedAfter instanceof Date)) { // this does not only imply an unfiltered setting range, it also identifies the caller's context: - // If called *with* filter (see below), the user wants a colllection as a result. + // If called *with* filter (see below), the user wants a collection as a result. // in this case, it shall only be a plain array return getAuthorizedSettings(updatedAfter, privilegedSetting); } @@ -77,48 +77,3 @@ Meteor.methods({ }; }, }); - -Settings.on('change', ({ clientAction, id, data, diff }) => { - if (diff && Object.keys(diff).length === 1 && diff._updatedAt) { // avoid useless changes - return; - } - switch (clientAction) { - case 'updated': - case 'inserted': { - const setting = data || Settings.findOneById(id); - const value = { - _id: setting._id, - value: setting.value, - editor: setting.editor, - properties: setting.properties, - enterprise: setting.enterprise, - requiredOnWizard: setting.requiredOnWizard, - }; - - SettingsEvents.emit('change-setting', setting, value); - - if (setting.public === true) { - Notifications.notifyAllInThisInstance('public-settings-changed', clientAction, value); - } - Notifications.notifyLoggedInThisInstance('private-settings-changed', clientAction, setting); - break; - } - - case 'removed': { - const setting = data || Settings.findOneById(id, { fields: { public: 1 } }); - - if (setting && setting.public === true) { - Notifications.notifyAllInThisInstance('public-settings-changed', clientAction, { _id: id }); - } - Notifications.notifyLoggedInThisInstance('private-settings-changed', clientAction, { _id: id }); - break; - } - } -}); - -Notifications.streamAll.allowRead('private-settings-changed', function() { - if (this.userId == null) { - return false; - } - return hasAtLeastOnePermission(this.userId, ['view-privileged-setting', 'edit-privileged-setting', 'manage-selected-settings']); -}); diff --git a/server/publications/subscription/emitter.js b/server/publications/subscription/emitter.js index dfcd50f88a3..ba93668f488 100644 --- a/server/publications/subscription/emitter.js +++ b/server/publications/subscription/emitter.js @@ -14,11 +14,18 @@ Subscriptions.on('change', ({ clientAction, id, data }) => { case 'removed': data = Subscriptions.trashFindOneById(id, { fields: { u: 1, rid: 1 } }); + if (!data) { + return; + } // emit a removed event on msg stream to remove the user's stream-room-messages subscription when the user is removed from room msgStream.__emit(data.u._id, clientAction, data); break; } + if (!data) { + return; + } + Notifications.streamUser.__emit(data.u._id, clientAction, data); Notifications.notifyUserInThisInstance( diff --git a/server/startup/presence.js b/server/startup/presence.js index 10abc5b7a78..d28c69906ae 100644 --- a/server/startup/presence.js +++ b/server/startup/presence.js @@ -2,6 +2,8 @@ import { Meteor } from 'meteor/meteor'; import { InstanceStatus } from 'meteor/konecty:multiple-instances-status'; import { UserPresence, UserPresenceMonitor } from 'meteor/konecty:user-presence'; +import InstanceStatusModel from '../../app/models/server/models/InstanceStatus'; +import UsersSessionsModel from '../../app/models/server/models/UsersSessions'; Meteor.startup(function() { const instance = { @@ -20,6 +22,50 @@ Meteor.startup(function() { const startMonitor = typeof process.env.DISABLE_PRESENCE_MONITOR === 'undefined' || !['true', 'yes'].includes(String(process.env.DISABLE_PRESENCE_MONITOR).toLowerCase()); if (startMonitor) { - UserPresenceMonitor.start(); + // UserPresenceMonitor.start(); + + // Remove lost connections + const ids = InstanceStatusModel.find({}, { fields: { _id: 1 } }).fetch().map((id) => id._id); + + const update = { + $pull: { + connections: { + instanceId: { + $nin: ids, + }, + }, + }, + }; + UsersSessionsModel.update({}, update, { multi: true }); + + InstanceStatusModel.on('change', ({ clientAction, id }) => { + switch (clientAction) { + case 'removed': + UserPresence.removeConnectionsByInstanceId(id); + break; + } + }); + + UsersSessionsModel.on('change', ({ clientAction, id, data }) => { + switch (clientAction) { + case 'inserted': + UserPresenceMonitor.processUserSession(data, 'added'); + break; + case 'updated': + data = data ?? UsersSessionsModel.findOneById(id); + if (data) { + UserPresenceMonitor.processUserSession(data, 'changed'); + } + break; + case 'removed': + UserPresenceMonitor.processUserSession({ + _id: id, + connections: [{ + fake: true, + }], + }, 'removed'); + break; + } + }); } }); diff --git a/server/stream/messages/emitter.js b/server/stream/messages/emitter.js index aa1f7260d91..d86dea2639f 100644 --- a/server/stream/messages/emitter.js +++ b/server/stream/messages/emitter.js @@ -31,7 +31,7 @@ Meteor.startup(function() { switch (clientAction) { case 'inserted': case 'updated': - const message = data || Messages.findOne({ _id: id }); + const message = data ?? Messages.findOne({ _id: id }); publishMessage(clientAction, message); break; } diff --git a/server/stream/streamBroadcast.js b/server/stream/streamBroadcast.js index 0b982b6e99b..b615e1b5d68 100644 --- a/server/stream/streamBroadcast.js +++ b/server/stream/streamBroadcast.js @@ -11,6 +11,7 @@ import { hasPermission } from '../../app/authorization'; import { settings } from '../../app/settings'; import { isDocker, getURL } from '../../app/utils'; import { Users } from '../../app/models/server'; +import InstanceStatusModel from '../../app/models/server/models/InstanceStatus'; process.env.PORT = String(process.env.PORT).trim(); process.env.INSTANCE_IP = String(process.env.INSTANCE_IP).trim(); @@ -56,26 +57,17 @@ function authorizeConnection(instance) { return _authorizeConnection(instance); } +const cache = new Map(); const originalSetDefaultStatus = UserPresence.setDefaultStatus; function startMatrixBroadcast() { if (!startMonitor) { UserPresence.setDefaultStatus = originalSetDefaultStatus; } - const query = { - 'extraInformation.port': { - $exists: true, - }, - }; - - const options = { - sort: { - _createdAt: -1, - }, - }; - - return InstanceStatus.getCollection().find(query, options).observe({ + const actions = { added(record) { + cache.set(record._id, record); + const subPath = getURL('', { cdn: false, full: false }); let instance = `${ record.extraInformation.host }:${ record.extraInformation.port }${ subPath }`; @@ -111,7 +103,13 @@ function startMatrixBroadcast() { }; }, - removed(record) { + removed(id) { + const record = cache.get(id); + if (!record) { + return; + } + cache.delete(id); + const subPath = getURL('', { cdn: false, full: false }); let instance = `${ record.extraInformation.host }:${ record.extraInformation.port }${ subPath }`; @@ -130,6 +128,32 @@ function startMatrixBroadcast() { return delete connections[instance]; } }, + }; + + const query = { + 'extraInformation.port': { + $exists: true, + }, + }; + + const options = { + sort: { + _createdAt: -1, + }, + }; + + InstanceStatusModel.find(query, options).fetch().forEach(actions.added); + return InstanceStatusModel.on('change', ({ clientAction, id, data }) => { + switch (clientAction) { + case 'inserted': + if (data.extraInformation?.port) { + actions.added(data); + } + break; + case 'removed': + actions.removed(id); + break; + } }); }