diff --git a/packages/rocketchat-lib/server/models/_BaseCache.js b/packages/rocketchat-lib/server/models/_BaseCache.js index 39d3b9265c1..f3537131d9a 100644 --- a/packages/rocketchat-lib/server/models/_BaseCache.js +++ b/packages/rocketchat-lib/server/models/_BaseCache.js @@ -1,4 +1,3 @@ -/* globals MongoInternals */ /* eslint new-cap: 0 */ import loki from 'lokijs'; @@ -424,7 +423,7 @@ class ModelsBaseCache extends EventEmitter { } console.log(String(data.length), 'records load from', this.collectionName); RocketChat.statsTracker.timing('cache.load', RocketChat.statsTracker.now() - time, [`collection:${this.collectionName}`]); - // this.startOplog(); + this.startSync(); this.loaded = true; this.emit('afterload'); @@ -459,7 +458,7 @@ class ModelsBaseCache extends EventEmitter { return; } - this.model._db.on('change', ({action, id, data}) => { + this.model._db.on('change', ({action, id, data, oplog}) => { switch (action) { case 'insert': data._id = id; @@ -470,71 +469,21 @@ class ModelsBaseCache extends EventEmitter { this.removeById(id); break; - case 'update:db': + case 'update:record': + this.updateDiffById(id, data); + break; + + case 'update:diff': this.updateDiffById(id, data); break; - case 'update:cache': + case 'update:query': this.update(data.query, data.update, data.options); break; } }); } - startOplog() { - const query = { - collection: this.collectionName - }; - - if (!MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle || !MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle.onOplogEntry) { - console.error('\nYour MongoDB is not with ReplicaSet enabled.\nPlease enable it.\nYou can see more information at:\n* https://docs.mongodb.com/v3.2/tutorial/convert-standalone-to-replica-set/ \n* https://github.com/RocketChat/Rocket.Chat/issues/5212\n'); - throw new Meteor.Error('Your MongoDB is not with ReplicaSet enabled.'); - } - - MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle.onOplogEntry(query, (record) => { - this.processOplogRecord(record); - }); - } - - processOplogRecord(action) { - if (action.op.op === 'i') { - this.insert(action.op.o); - return; - } - - if (action.op.op === 'u') { - let diff = {}; - - if (!action.op.o.$set && !action.op.o.$unset) { - diff = action.op.o; - } else { - if (action.op.o.$set) { - for (let key in action.op.o.$set) { - if (action.op.o.$set.hasOwnProperty(key)) { - diff[key] = action.op.o.$set[key]; - } - } - } - - if (action.op.o.$unset) { - for (let key in action.op.o.$unset) { - if (action.op.o.$unset.hasOwnProperty(key)) { - diff[key] = undefined; - } - } - } - } - - this.updateDiffById(action.id, diff); - return; - } - - if (action.op.op === 'd') { - this.removeById(action.id); - return; - } - } - processQueryOptionsOnResult(result, options={}) { if (result === undefined || result === null) { return undefined; diff --git a/packages/rocketchat-lib/server/models/_BaseDb.js b/packages/rocketchat-lib/server/models/_BaseDb.js index 840eb431086..b831391fcbf 100644 --- a/packages/rocketchat-lib/server/models/_BaseDb.js +++ b/packages/rocketchat-lib/server/models/_BaseDb.js @@ -1,3 +1,5 @@ +/* globals MongoInternals */ + const baseName = 'rocketchat_'; import {EventEmitter} from 'events'; @@ -27,6 +29,21 @@ class ModelsBaseDb extends EventEmitter { this.wrapModel(); + this.isOplogAvailable = MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle && !!MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle.onOplogEntry; + + // When someone start listening for changes we start oplog if available + this.once('newListener', (event/*, listener*/) => { + if (event === 'change') { + if (this.isOplogAvailable) { + const query = { + collection: this.collectionName + }; + + MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle.onOplogEntry(query, this.processOplogRecord.bind(this)); + } + } + }); + this.tryEnsureIndex({ '_updatedAt': 1 }); } @@ -112,15 +129,74 @@ class ModelsBaseDb extends EventEmitter { return 'cache'; } + processOplogRecord(action) { + if (action.op.op === 'i') { + this.emit('change', { + action: 'insert', + id: action.op.o._id, + data: action.op.o, + oplog: true + }); + return; + } + + if (action.op.op === 'u') { + if (!action.op.o.$set && !action.op.o.$unset) { + this.emit('change', { + action: 'update:record', + id: action.id, + data: action.op.o, + oplog: true + }); + return; + } + + let diff = {}; + if (action.op.o.$set) { + for (let key in action.op.o.$set) { + if (action.op.o.$set.hasOwnProperty(key)) { + diff[key] = action.op.o.$set[key]; + } + } + } + + if (action.op.o.$unset) { + for (let key in action.op.o.$unset) { + if (action.op.o.$unset.hasOwnProperty(key)) { + diff[key] = undefined; + } + } + } + + this.emit('change', { + action: 'update:diff', + id: action.id, + data: diff, + oplog: true + }); + return; + } + + if (action.op.op === 'd') { + this.emit('change', { + action: 'remove', + id: action.id, + oplog: true + }); + return; + } + } + insert(record) { this.setUpdatedAt(record); const result = this.originals.insert(...arguments); - if (this.listenerCount('change') > 0) { + if (!this.isOplogAvailable && this.listenerCount('change') > 0) { this.emit('change', { action: 'insert', id: result, - data: _.extend({}, record) + data: _.extend({}, record), + oplog: false }); } @@ -134,7 +210,7 @@ class ModelsBaseDb extends EventEmitter { let strategy = this.defineSyncStrategy(query, update, options); let ids = []; - if (this.listenerCount('change') > 0 && strategy === 'db') { + if (!this.isOplogAvailable && this.listenerCount('change') > 0 && strategy === 'db') { const findOptions = {fields: {_id: 1}}; let records = options.multi ? this.find(query, findOptions).fetch() : this.findOne(query, findOptions) || []; if (!Array.isArray(records)) { @@ -153,14 +229,15 @@ class ModelsBaseDb extends EventEmitter { const result = this.originals.update(query, update, options); - if (this.listenerCount('change') > 0) { + if (!this.isOplogAvailable && this.listenerCount('change') > 0) { if (strategy === 'db') { if (options.upsert === true) { if (result.insertedId) { this.emit('change', { action: 'insert', id: result.insertedId, - data: this.findOne({_id: result.insertedId}) + data: this.findOne({_id: result.insertedId}), + oplog: false }); return; } @@ -178,20 +255,22 @@ class ModelsBaseDb extends EventEmitter { } for (const record of records) { this.emit('change', { - action: 'update:db', + action: 'update:record', id: record._id, - data: _.extend({}, record) + data: _.extend({}, record), + oplog: false }); } } else { this.emit('change', { - action: 'update:cache', + action: 'update:query', id: undefined, data: { query: query, update: update, options: options - } + }, + oplog: false }); } } @@ -221,12 +300,15 @@ class ModelsBaseDb extends EventEmitter { const result = this.originals.remove(query); - for (const record of records) { - this.emit('change', { - action: 'remove', - id: record._id, - data: _.extend({}, record) - }); + if (!this.isOplogAvailable && this.listenerCount('change') > 0) { + for (const record of records) { + this.emit('change', { + action: 'remove', + id: record._id, + data: _.extend({}, record), + oplog: false + }); + } } return result; diff --git a/server/stream/messages.coffee b/server/stream/messages.coffee index b858de982ae..6141449dd36 100644 --- a/server/stream/messages.coffee +++ b/server/stream/messages.coffee @@ -45,15 +45,17 @@ Meteor.startup -> query = collection: RocketChat.models.Messages.collectionName - MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle.onOplogEntry query, (action) -> - if action.op.op is 'i' - publishMessage 'inserted', action.op.o - return - - if action.op.op is 'u' - publishMessage 'updated', RocketChat.models.Messages.findOne({_id: action.id}) - return - - # if action.op.op is 'd' - # publishMessage 'deleted', {_id: action.id} - # return + RocketChat.models.Messages._db.on 'change', ({action, id, data, oplog}) => + switch action + when 'insert' + data._id = id; + publishMessage 'inserted', data + break; + + when 'update:record' + publishMessage 'updated', data; + break; + + when 'update:diff' + publishMessage 'updated', RocketChat.models.Messages.findOne({_id: id}) + break;