Move sync code to baseDb and unify under a single event

pull/5371/head
Rodrigo Nascimento 9 years ago
parent 444b43926a
commit cc704a00af
No known key found for this signature in database
GPG Key ID: 2C85B3AFE75D23F9
  1. 2
      packages/rocketchat-lib/server/models/_Base.js
  2. 94
      packages/rocketchat-lib/server/models/_BaseCache.js
  3. 128
      packages/rocketchat-lib/server/models/_BaseDb.js

@ -5,7 +5,7 @@ RocketChat.models._CacheControl = new Meteor.EnvironmentVariable();
class ModelsBase {
constructor(nameOrModel, useCache) {
this._db = new ModelsBaseDb(nameOrModel);
this._db = new ModelsBaseDb(nameOrModel, this);
this.model = this._db.model;
this.collectionName = this._db.collectionName;
this.name = this._db.name;

@ -455,86 +455,28 @@ class ModelsBaseCache extends EventEmitter {
}
startSync() {
const db = this.model._db;
// INSERT
db.on('afterInsert', (beforeInsertResult, _id, record) => {
record._id = _id;
this.insert(record);
});
// REMOVE
db.on('afterRemove', (beforeRemoveResult, result, records/*, query*/) => {
for (const record of records) {
console.log('afterRemove', this.collectionName, record._id);
this.removeById(record._id);
}
});
// UPDATE
db.on('beforeUpdate', (response, query, update, options = {}) => {
if (this.defineSyncStrategy(query, update) === 'db') {
const findOptions = {fields: {_id: 1}};
let records = options.multi ? db.find(query, findOptions).fetch() : db.findOne(query, findOptions) || [];
if (!Array.isArray(records)) {
records = [records];
}
for (const key in query) {
if (query.hasOwnProperty(key)) {
delete query[key];
}
}
query._id = {
$in: records.map(item => item._id)
};
response.strategy = 'db';
}
});
db.on('afterUpdate', (beforeUpdateResult, result, query, update, options = {}) => {
if (beforeUpdateResult.strategy === 'db') {
// Find only updated fields?
let records = options.multi ? db.find(query).fetch() : db.findOne(query) || [];
if (!Array.isArray(records)) {
records = [records];
}
for (const record of records) {
this.updateDiffById(record._id, record);
}
} else {
this.update(query, update, options);
}
});
db.on('beforeUpsert', (response, query, update, options = {}) => {
const findOptions = {fields: {_id: 1}};
let records = options.multi ? db.find(query, findOptions).fetch() : db.findOne(query, findOptions) || [];
if (!Array.isArray(records)) {
records = [records];
}
if (this.model._useCache === false) {
return;
}
response.ids = records.map(item => item._id);
});
this.model._db.on('change', ({action, id, data}) => {
switch (action) {
case 'insert':
data._id = id;
this.insert(data);
break;
// Always go to DB to get update records
db.on('afterUpsert', (beforeUpsertResult, result, query, update, options = {}) => {
if (result.insertedId) {
this.insert(db.findOne({_id: result.insertedId}));
return;
}
case 'remove':
this.removeById(id);
break;
const findQuery = {
_id: {
$in: beforeUpsertResult.ids
}
};
case 'update:db':
this.updateDiffById(id, data);
break;
let records = options.multi ? db.find(findQuery).fetch() : db.findOne(findQuery) || [];
if (!Array.isArray(records)) {
records = [records];
}
for (const record of records) {
this.updateDiffById(record._id, record);
case 'update:cache':
this.update(data.query, data.update, data.options);
break;
}
});
}

@ -10,7 +10,7 @@ try {
}
class ModelsBaseDb extends EventEmitter {
constructor(model) {
constructor(model, baseModel) {
super();
if (Match.test(model, String)) {
@ -23,6 +23,8 @@ class ModelsBaseDb extends EventEmitter {
this.model = model;
}
this.baseModel = baseModel;
this.wrapModel();
this.tryEnsureIndex({ '_updatedAt': 1 });
@ -78,35 +80,120 @@ class ModelsBaseDb extends EventEmitter {
return this.model.findOne(...arguments);
}
defineSyncStrategy(query, modifier, options) {
if (this.baseModel.useCache === false) {
return 'db';
}
if (options.upsert === true) {
return 'db';
}
const dbModifiers = [
'$currentDate',
'$bit',
'$pull',
'$pushAll',
'$push',
'$setOnInsert'
];
const modifierKeys = Object.keys(modifier);
if (_.intersection(modifierKeys, dbModifiers).length > 0) {
return 'db';
}
const placeholderFields = Object.keys(query).filter(item => item.indexOf('$') > -1);
if (placeholderFields.length > 0) {
return 'db';
}
return 'cache';
}
insert(record) {
this.setUpdatedAt(record);
const beforeInsertResult = {};
this.emit('beforeInsert', beforeInsertResult, ...arguments);
const result = this.originals.insert(...arguments);
if (this.listenerCount('change') > 0) {
this.emit('change', {
action: 'insert',
id: result,
data: _.extend({}, record)
});
}
record._id = result;
this.emit('afterInsert', beforeInsertResult, result, ...arguments);
return result;
}
update(query, update, options = {}) {
this.setUpdatedAt(update, true, query);
const beforeUpdateResult = {};
if (options.upsert === true) {
this.emit('beforeUpsert', beforeUpdateResult, ...arguments);
} else {
this.emit('beforeUpdate', beforeUpdateResult, ...arguments);
let strategy = this.defineSyncStrategy(query, update, options);
let ids = [];
if (this.listenerCount('change') > 0 && strategy === 'db') {
const findOptions = {fields: {_id: 1}};
let records = options.multi ? this.find(query, findOptions).fetch() : this.findOne(query, findOptions) || [];
if (!Array.isArray(records)) {
records = [records];
}
ids = records.map(item => item._id);
if (options.upsert !== true) {
query = {
_id: {
$in: ids
}
};
}
}
const result = this.originals.update(query, update, options);
if (options.upsert === true) {
this.emit('afterUpsert', beforeUpdateResult, result, ...arguments);
} else {
this.emit('afterUpdate', beforeUpdateResult, result, ...arguments);
if (this.listenerCount('change') > 0) {
if (strategy === 'db') {
if (options.upsert === true) {
if (result.insertedId) {
this.emit('change', {
action: 'insert',
id: result.insertedId,
data: this.findOne({_id: result.insertedId})
});
return;
}
query = {
_id: {
$in: ids
}
};
}
let records = options.multi ? this.find(query).fetch() : this.findOne(query) || [];
if (!Array.isArray(records)) {
records = [records];
}
for (const record of records) {
this.emit('change', {
action: 'update:db',
id: record._id,
data: _.extend({}, record)
});
}
} else {
this.emit('change', {
action: 'update:cache',
id: undefined,
data: {
query: query,
update: update,
options: options
}
});
}
}
return result;
}
@ -118,9 +205,6 @@ class ModelsBaseDb extends EventEmitter {
}
remove(query) {
const beforeRemoveResult = {};
this.emit('beforeRemove', beforeRemoveResult, ...arguments);
const records = this.model.find(query).fetch();
const ids = [];
@ -136,7 +220,15 @@ class ModelsBaseDb extends EventEmitter {
query = { _id: { $in: ids } };
const result = this.originals.remove(query);
this.emit('afterRemove', beforeRemoveResult, result, records, ...arguments);
for (const record of records) {
this.emit('change', {
action: 'remove',
id: record._id,
data: _.extend({}, record)
});
}
return result;
}

Loading…
Cancel
Save