diff --git a/packages/rocketchat-lib/server/models/_Base.js b/packages/rocketchat-lib/server/models/_Base.js index b8bfcdf73d7..940b98d9c46 100644 --- a/packages/rocketchat-lib/server/models/_Base.js +++ b/packages/rocketchat-lib/server/models/_Base.js @@ -5,7 +5,7 @@ RocketChat.models._CacheControl = new Meteor.EnvironmentVariable(); class ModelsBase { constructor(nameOrModel, useCache) { - this._db = new ModelsBaseDb(nameOrModel); + this._db = new ModelsBaseDb(nameOrModel, this); this.model = this._db.model; this.collectionName = this._db.collectionName; this.name = this._db.name; diff --git a/packages/rocketchat-lib/server/models/_BaseCache.js b/packages/rocketchat-lib/server/models/_BaseCache.js index 86154da7d46..39d3b9265c1 100644 --- a/packages/rocketchat-lib/server/models/_BaseCache.js +++ b/packages/rocketchat-lib/server/models/_BaseCache.js @@ -455,86 +455,28 @@ class ModelsBaseCache extends EventEmitter { } startSync() { - const db = this.model._db; - - // INSERT - db.on('afterInsert', (beforeInsertResult, _id, record) => { - record._id = _id; - this.insert(record); - }); - - // REMOVE - db.on('afterRemove', (beforeRemoveResult, result, records/*, query*/) => { - for (const record of records) { - console.log('afterRemove', this.collectionName, record._id); - this.removeById(record._id); - } - }); - - // UPDATE - db.on('beforeUpdate', (response, query, update, options = {}) => { - if (this.defineSyncStrategy(query, update) === 'db') { - const findOptions = {fields: {_id: 1}}; - let records = options.multi ? db.find(query, findOptions).fetch() : db.findOne(query, findOptions) || []; - if (!Array.isArray(records)) { - records = [records]; - } - for (const key in query) { - if (query.hasOwnProperty(key)) { - delete query[key]; - } - } - query._id = { - $in: records.map(item => item._id) - }; - response.strategy = 'db'; - } - }); - - db.on('afterUpdate', (beforeUpdateResult, result, query, update, options = {}) => { - if (beforeUpdateResult.strategy === 'db') { - // Find only updated fields? - let records = options.multi ? db.find(query).fetch() : db.findOne(query) || []; - if (!Array.isArray(records)) { - records = [records]; - } - for (const record of records) { - this.updateDiffById(record._id, record); - } - } else { - this.update(query, update, options); - } - }); - - db.on('beforeUpsert', (response, query, update, options = {}) => { - const findOptions = {fields: {_id: 1}}; - let records = options.multi ? db.find(query, findOptions).fetch() : db.findOne(query, findOptions) || []; - if (!Array.isArray(records)) { - records = [records]; - } + if (this.model._useCache === false) { + return; + } - response.ids = records.map(item => item._id); - }); + this.model._db.on('change', ({action, id, data}) => { + switch (action) { + case 'insert': + data._id = id; + this.insert(data); + break; - // Always go to DB to get update records - db.on('afterUpsert', (beforeUpsertResult, result, query, update, options = {}) => { - if (result.insertedId) { - this.insert(db.findOne({_id: result.insertedId})); - return; - } + case 'remove': + this.removeById(id); + break; - const findQuery = { - _id: { - $in: beforeUpsertResult.ids - } - }; + case 'update:db': + this.updateDiffById(id, data); + break; - let records = options.multi ? db.find(findQuery).fetch() : db.findOne(findQuery) || []; - if (!Array.isArray(records)) { - records = [records]; - } - for (const record of records) { - this.updateDiffById(record._id, record); + case 'update:cache': + this.update(data.query, data.update, data.options); + break; } }); } diff --git a/packages/rocketchat-lib/server/models/_BaseDb.js b/packages/rocketchat-lib/server/models/_BaseDb.js index 872464a5453..840eb431086 100644 --- a/packages/rocketchat-lib/server/models/_BaseDb.js +++ b/packages/rocketchat-lib/server/models/_BaseDb.js @@ -10,7 +10,7 @@ try { } class ModelsBaseDb extends EventEmitter { - constructor(model) { + constructor(model, baseModel) { super(); if (Match.test(model, String)) { @@ -23,6 +23,8 @@ class ModelsBaseDb extends EventEmitter { this.model = model; } + this.baseModel = baseModel; + this.wrapModel(); this.tryEnsureIndex({ '_updatedAt': 1 }); @@ -78,35 +80,120 @@ class ModelsBaseDb extends EventEmitter { return this.model.findOne(...arguments); } + defineSyncStrategy(query, modifier, options) { + if (this.baseModel.useCache === false) { + return 'db'; + } + + if (options.upsert === true) { + return 'db'; + } + + const dbModifiers = [ + '$currentDate', + '$bit', + '$pull', + '$pushAll', + '$push', + '$setOnInsert' + ]; + + const modifierKeys = Object.keys(modifier); + + if (_.intersection(modifierKeys, dbModifiers).length > 0) { + return 'db'; + } + + const placeholderFields = Object.keys(query).filter(item => item.indexOf('$') > -1); + if (placeholderFields.length > 0) { + return 'db'; + } + + return 'cache'; + } + insert(record) { this.setUpdatedAt(record); - const beforeInsertResult = {}; - this.emit('beforeInsert', beforeInsertResult, ...arguments); - const result = this.originals.insert(...arguments); + if (this.listenerCount('change') > 0) { + this.emit('change', { + action: 'insert', + id: result, + data: _.extend({}, record) + }); + } + record._id = result; - this.emit('afterInsert', beforeInsertResult, result, ...arguments); return result; } update(query, update, options = {}) { this.setUpdatedAt(update, true, query); - const beforeUpdateResult = {}; - if (options.upsert === true) { - this.emit('beforeUpsert', beforeUpdateResult, ...arguments); - } else { - this.emit('beforeUpdate', beforeUpdateResult, ...arguments); + let strategy = this.defineSyncStrategy(query, update, options); + let ids = []; + if (this.listenerCount('change') > 0 && strategy === 'db') { + const findOptions = {fields: {_id: 1}}; + let records = options.multi ? this.find(query, findOptions).fetch() : this.findOne(query, findOptions) || []; + if (!Array.isArray(records)) { + records = [records]; + } + + ids = records.map(item => item._id); + if (options.upsert !== true) { + query = { + _id: { + $in: ids + } + }; + } } const result = this.originals.update(query, update, options); - if (options.upsert === true) { - this.emit('afterUpsert', beforeUpdateResult, result, ...arguments); - } else { - this.emit('afterUpdate', beforeUpdateResult, result, ...arguments); + if (this.listenerCount('change') > 0) { + if (strategy === 'db') { + if (options.upsert === true) { + if (result.insertedId) { + this.emit('change', { + action: 'insert', + id: result.insertedId, + data: this.findOne({_id: result.insertedId}) + }); + return; + } + + query = { + _id: { + $in: ids + } + }; + } + + let records = options.multi ? this.find(query).fetch() : this.findOne(query) || []; + if (!Array.isArray(records)) { + records = [records]; + } + for (const record of records) { + this.emit('change', { + action: 'update:db', + id: record._id, + data: _.extend({}, record) + }); + } + } else { + this.emit('change', { + action: 'update:cache', + id: undefined, + data: { + query: query, + update: update, + options: options + } + }); + } } return result; } @@ -118,9 +205,6 @@ class ModelsBaseDb extends EventEmitter { } remove(query) { - const beforeRemoveResult = {}; - this.emit('beforeRemove', beforeRemoveResult, ...arguments); - const records = this.model.find(query).fetch(); const ids = []; @@ -136,7 +220,15 @@ class ModelsBaseDb extends EventEmitter { query = { _id: { $in: ids } }; const result = this.originals.remove(query); - this.emit('afterRemove', beforeRemoveResult, result, records, ...arguments); + + for (const record of records) { + this.emit('change', { + action: 'remove', + id: record._id, + data: _.extend({}, record) + }); + } + return result; }