[FIX] cachedcollection calling multiple times SYNC (#15104)

* fixed cachedcollection calling multiple times

* fix tests

* fix review
pull/15074/head^2
Guilherme Gazzo 6 years ago committed by GitHub
parent 85711287ea
commit b9a9a3ff0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      app/models/server/models/Settings.js
  2. 2
      app/settings/client/lib/settings.js
  3. 413
      app/ui-cached-collection/client/models/CachedCollection.js
  4. 9
      server/publications/settings/index.js

@ -55,7 +55,7 @@ export class Settings extends Base {
};
if (ids.length > 0) {
filter._id = { $in: ids };
filter._id = { $in: ids };
}
return this.find(filter, { fields: { _id: 1, value: 1 } });

@ -13,8 +13,6 @@ settings.cachedCollection = new CachedCollection({
settings.collection = settings.cachedCollection.collection;
settings.cachedCollection.init();
settings.dict = new ReactiveDict('settings');
settings.get = function(_id) {

@ -6,17 +6,35 @@ import { ReactiveVar } from 'meteor/reactive-var';
import { Tracker } from 'meteor/tracker';
import localforage from 'localforage';
import _ from 'underscore';
import EventEmitter from 'wolfy87-eventemitter';
import { callbacks } from '../../../callbacks';
import Notifications from '../../../notifications/client/lib/Notifications';
import { getConfig } from '../../../ui-utils/client/config';
class CachedCollectionManagerClass {
const fromEntries = Object.fromEntries || function fromEntries(iterable) {
return [...iterable].reduce((obj, { 0: key, 1: val }) => Object.assign(obj, { [key]: val }), {});
};
const wrap = (fn) => (...args) => new Promise((resolve, reject) => {
fn(...args, (err, result) => {
if (err) {
return reject(err);
}
return resolve(result);
});
});
const call = wrap(Meteor.call);
const localforageGetItem = wrap(localforage.getItem);
class CachedCollectionManagerClass extends EventEmitter {
constructor() {
super();
this.items = [];
this._syncEnabled = false;
this.reconnectCb = [];
this.loginCb = [];
this.logged = false;
const { _unstoreLoginToken } = Accounts;
@ -27,31 +45,27 @@ class CachedCollectionManagerClass {
// Wait 1s to start or the code will run before the connection and
// on first connection the `reconnect` callbacks will run
Meteor.setTimeout(() => {
let connectionWasOnline = true;
Tracker.autorun(() => {
const { connected } = Meteor.connection.status();
if (connected === true && connectionWasOnline === false) {
for (const cb of this.reconnectCb) {
cb();
}
}
connectionWasOnline = connected;
});
}, 1000);
Tracker.autorun(() => {
if (Meteor.userId() !== null) {
if (this.logged === false) {
for (const cb of this.loginCb) {
cb();
}
}
const [WAITING_FIRST_CONNECTION, WAITING_FIRST_DICONNECTION, LISTENING_RECONNECTIONS] = [0, 1, 2];
this.step = this.step || WAITING_FIRST_CONNECTION;
const { connected } = Meteor.status();
switch (this.step) {
case WAITING_FIRST_CONNECTION:
return !connected || this.step++;
case WAITING_FIRST_DICONNECTION:
return connected || this.step++;
case LISTENING_RECONNECTIONS:
return connected && this.emit('reconnect');
}
});
this.logged = Meteor.userId() !== null;
Tracker.autorun(() => {
const uid = Meteor.userId();
this.logged = uid !== null;
if (this.logged) {
this.emit('login', uid);
}
});
}
@ -87,11 +101,11 @@ class CachedCollectionManagerClass {
}
onReconnect(cb) {
this.reconnectCb.push(cb);
this.on('reconnect', cb);
}
onLogin(cb) {
this.loginCb.push(cb);
this.on('login', cb);
if (this.logged) {
cb();
}
@ -104,11 +118,9 @@ const debug = (name) => [getConfig(`debugCachedCollection-${ name }`), getConfig
const nullLog = function() {};
const log = function(...args) {
console.log(`CachedCollection ${ this.name } =>`, ...args);
};
const log = (...args) => console.log(`CachedCollection ${ this.name } =>`, ...args);
export class CachedCollection {
export class CachedCollection extends EventEmitter {
constructor({
collection,
name,
@ -124,6 +136,7 @@ export class CachedCollection {
maxCacheTime = 60 * 60 * 24 * 30,
onSyncData = (/* action, record */) => {},
}) {
super();
this.collection = collection || new Mongo.Collection(null);
this.ready = new ReactiveVar(false);
@ -141,34 +154,21 @@ export class CachedCollection {
this.maxCacheTime = maxCacheTime;
this.onSyncData = onSyncData;
this.log = debug(name) ? log : nullLog;
CachedCollectionManager.register(this);
if (userRelated === true) {
CachedCollectionManager.onLogin(() => {
this.log('Init on login');
this.ready.set(false);
this.updatedAt = new Date(0);
this.initiated = false;
this.init();
});
}
if (this.useCache === false) {
return this.clearCache();
CachedCollectionManager.register(this);
if (!userRelated) {
this.init();
return;
}
CachedCollectionManager.onLogin(() => {
this.init();
});
}
countQueries() {
this.log(`${ Object.keys(this.collection._collection.queries).length } queries`);
}
recomputeCollectionQueries() {
this.log(`recomputing ${ Object.keys(this.collection._collection.queries).length } queries`);
_.each(this.collection._collection.queries, (query) => {
this.collection._collection._recomputeResults(query);
});
}
getToken() {
if (this.userRelated === false) {
return undefined;
@ -177,157 +177,80 @@ export class CachedCollection {
return Accounts._storedLoginToken();
}
loadFromCache(callback = () => {}) {
if (this.useCache === false) {
return callback(false);
async loadFromCache() {
const data = await localforageGetItem(this.name);
if (!data) {
return false;
}
localforage.getItem(this.name, (error, data) => {
if (data && (data.version < this.version || data.token !== this.getToken() || this.getToken() === undefined)) {
this.clearCache();
callback(false);
return;
}
const now = new Date();
if (data && now - data.updatedAt >= 1000 * this.maxCacheTime) {
this.clearCache();
callback(false);
return;
}
if (data && data.records && data.records.length > 0) {
this.log(`${ data.records.length } records loaded from cache`);
data.records.forEach((record) => {
callbacks.run(`cachedCollection-loadFromCache-${ this.name }`, record);
record.__cache__ = true;
this.collection.upsert({ _id: record._id }, _.omit(record, '_id'));
if (record._updatedAt) {
const _updatedAt = new Date(record._updatedAt);
if (_updatedAt > this.updatedAt) {
this.updatedAt = _updatedAt;
}
}
});
callback(true);
} else {
callback(false);
}
});
}
loadFromServer(callback = () => {}) {
Meteor.call(this.methodName, (error, data) => {
this.log(`${ data.length } records loaded from server`);
data.forEach((record) => {
callbacks.run(`cachedCollection-loadFromServer-${ this.name }`, record, 'changed');
this.collection.upsert({ _id: record._id }, _.omit(record, '_id'));
this.onSyncData('changed', record);
if (record._updatedAt && record._updatedAt > this.updatedAt) {
this.updatedAt = record._updatedAt;
}
});
this.recomputeCollectionQueries();
if (this.updatedAt < new Date()) {
this.updatedAt = new Date();
}
callback(data);
});
}
loadFromServerAndPopulate() {
this.loadFromServer((loadedData) => {
this.ready.set(true);
this.saveCache(loadedData);
});
}
sync() {
if (CachedCollectionManager.syncEnabled === false || Meteor.connection._outstandingMethodBlocks.length !== 0) {
if (data.version < this.version || data.token !== this.getToken()) {
return false;
}
if (data.records.length <= 0) {
return false;
}
this.log(`syncing from ${ this.updatedAt }`);
if (new Date() - data.updatedAt >= 1000 * this.maxCacheTime) {
return false;
}
Meteor.call(this.syncMethodName, this.updatedAt, (error, data) => {
let changes = [];
this.log(`${ data.records.length } records loaded from cache`);
if (data.update && data.update.length > 0) {
this.log(`${ data.update.length } records updated in sync`);
changes.push(...data.update);
}
data.records.forEach((record) => {
callbacks.run(`cachedCollection-loadFromCache-${ this.name }`, record);
// this.collection.direct.insert(record);
if (data.remove && data.remove.length > 0) {
this.log(`${ data.remove.length } records removed in sync`);
changes.push(...data.remove);
if (record._updatedAt) {
return;
}
changes = changes.sort((a, b) => {
const valueA = a._updatedAt || a._deletedAt;
const valueB = b._updatedAt || b._deletedAt;
const _updatedAt = new Date(record._updatedAt);
if (valueA < valueB) {
return -1;
}
if (_updatedAt > this.updatedAt) {
this.updatedAt = _updatedAt;
}
});
if (valueA > valueB) {
return 1;
}
this.collection._collection._docs._map = fromEntries(data.records.map((record) => [record._id, record]));
this.updatedAt = data.updatedAt;
return 0;
});
Object.values(this.collection._collection.queries).forEach((query) => this.collection._collection._recomputeResults(query));
for (const record of changes) {
callbacks.run(`cachedCollection-sync-${ this.name }`, record, record._deletedAt ? 'removed' : 'changed');
if (record._deletedAt) {
this.collection.remove({ _id: record._id });
return true;
}
this.onSyncData('removed', record);
async loadFromServer() {
const startTime = new Date();
const lastTime = this.updatedAt;
const data = await call(this.methodName);
this.log(`${ data.length } records loaded from server`);
data.forEach((record) => {
callbacks.run(`cachedCollection-loadFromServer-${ this.name }`, record, 'changed');
if (record._deletedAt && record._deletedAt > this.updatedAt) {
this.updatedAt = record._deletedAt;
}
} else {
this.collection.upsert({ _id: record._id }, _.omit(record, '_id'));
this.collection.direct.upsert({ _id: record._id }, _.omit(record, '_id'));
this.onSyncData('changed', record);
this.onSyncData('changed', record);
if (record._updatedAt && record._updatedAt > this.updatedAt) {
this.updatedAt = record._updatedAt;
}
}
}
this.saveCache();
if (record._updatedAt && record._updatedAt > this.updatedAt) { this.updatedAt = record._updatedAt; }
});
return true;
this.updatedAt = this.updatedAt === lastTime ? startTime : this.updatedAt;
}
saveCache(data) {
if (this.useCache === false) {
return;
}
async loadFromServerAndPopulate() {
await this.loadFromServer();
this.save();
}
save = _.debounce(() => {
this.log('saving cache');
if (!data) {
data = this.collection.find().fetch();
}
const data = this.collection.find().fetch();
localforage.setItem(this.name, {
updatedAt: new Date(),
updatedAt: this.updatedAt,
version: this.version,
token: this.getToken(),
records: data,
});
this.log('saving cache (done)');
}
}, 1000);
clearCacheOnLogout() {
if (this.userRelated === true) {
@ -343,74 +266,124 @@ export class CachedCollection {
removeRoomFromCacheWhenUserLeaves(roomId, ChatRoom, CachedChatRoom) {
ChatRoom.remove(roomId);
CachedChatRoom.saveCache();
CachedChatRoom.save();
}
async setupListener(eventType, eventName) {
Meteor.startup(async () => {
const { RoomManager } = await import('../../../ui-utils');
const { ChatRoom, CachedChatRoom } = await import('../../../models');
Notifications[eventType || this.eventType](eventName || this.eventName, (t, record) => {
this.log('record received', t, record);
callbacks.run(`cachedCollection-received-${ this.name }`, record, t);
if (t === 'removed') {
let room;
if (this.eventName === 'subscriptions-changed') {
room = ChatRoom.findOne(record.rid);
this.removeRoomFromCacheWhenUserLeaves(room._id, ChatRoom, CachedChatRoom);
} else {
room = this.collection.findOne({ _id: record._id });
}
if (room) {
RoomManager.close(room.t + room.name);
}
this.collection.remove(record._id);
const { RoomManager } = await import('../../../ui-utils');
const { ChatRoom, CachedChatRoom } = await import('../../../models');
Notifications[eventType || this.eventType](eventName || this.eventName, (t, record) => {
this.log('record received', t, record);
callbacks.run(`cachedCollection-received-${ this.name }`, record, t);
if (t === 'removed') {
let room;
if (this.eventName === 'subscriptions-changed') {
room = ChatRoom.findOne(record.rid);
this.removeRoomFromCacheWhenUserLeaves(room._id, ChatRoom, CachedChatRoom);
} else {
this.collection.upsert({ _id: record._id }, _.omit(record, '_id'));
room = this.collection.findOne({
_id: record._id,
});
}
this.saveCache();
});
if (room) {
RoomManager.close(room.t + room.name);
}
this.collection.remove(record._id);
} else {
this.collection.direct.upsert({ _id: record._id }, _.omit(record, '_id'));
}
this.save();
});
}
trySync() {
trySync(delay = 10) {
clearTimeout(this.tm);
// Wait for an empty queue to load data again and sync
const interval = Meteor.setInterval(() => {
if (this.sync()) {
Meteor.clearInterval(interval);
this.tm = setTimeout(async () => {
if (!await this.sync()) {
return this.trySync(delay);
}
}, 200);
this.save();
}, delay);
}
init() {
if (this.initiated === true) {
return;
async sync() {
if (this.updatedAt.valueOf() === 0 || Meteor.connection._outstandingMethodBlocks.length !== 0) {
return false;
}
this.initiated = true;
this.loadFromCache((cacheLoaded) => {
this.ready.set(cacheLoaded);
const startTime = new Date();
const lastTime = this.updatedAt;
if (cacheLoaded === false) {
// If there is no cache load data immediately
this.loadFromServerAndPopulate();
} else if (this.useSync === true) {
this.trySync();
this.log(`syncing from ${ this.updatedAt }`);
const data = await call(this.syncMethodName, this.updatedAt);
let changes = [];
if (data.update && data.update.length > 0) {
this.log(`${ data.update.length } records updated in sync`);
changes.push(...data.update);
}
if (data.remove && data.remove.length > 0) {
this.log(`${ data.remove.length } records removed in sync`);
changes.push(...data.remove);
}
changes = changes.sort((a, b) => {
const valueA = a._updatedAt || a._deletedAt;
const valueB = b._updatedAt || b._deletedAt;
if (valueA < valueB) {
return -1;
}
if (this.useSync === true) {
CachedCollectionManager.onReconnect(() => {
this.trySync();
});
if (valueA > valueB) {
return 1;
}
if (this.listenChangesForLoggedUsersOnly) {
CachedCollectionManager.onLogin(() => {
this.setupListener();
});
return 0;
});
for (const { _id, ...record } of changes) {
const action = record._deletedAt ? 'removed' : 'changed';
callbacks.run(`cachedCollection-sync-${ this.name }`, record, action);
const actionTime = record._deletedAt || record._updatedAt;
if (record._deletedAt) {
this.collection.direct.remove({ _id });
} else {
this.setupListener();
this.collection.direct.upsert({ _id }, record);
}
if (actionTime > this.updatedAt) {
this.updatedAt = record._updatedAt;
}
this.onSyncData(action, { _id, ...record });
}
this.updatedAt = this.updatedAt === lastTime ? startTime : this.updatedAt;
return true;
}
async init() {
this.ready.set(false);
if (await this.loadFromCache()) {
this.trySync();
} else {
await this.loadFromServerAndPopulate();
}
this.ready.set(true);
if (!this.userRelated) {
CachedCollectionManager.onReconnect(() => {
this.trySync();
});
return this.setupListener();
}
CachedCollectionManager.onLogin(async () => {
await this.setupListener();
this.trySync();
});
}
}

@ -6,13 +6,10 @@ import './emitter';
Meteor.methods({
'public-settings/get'(updatedAt) {
const records = Settings.findNotHiddenPublic().fetch();
if (updatedAt instanceof Date) {
const records = Settings.findNotHiddenPublicUpdatedAfter(updatedAt).fetch();
return {
update: records.filter(function(record) {
return record._updatedAt > updatedAt;
}),
update: records,
remove: Settings.trashFindDeletedAfter(updatedAt, {
hidden: {
$ne: true,
@ -26,7 +23,7 @@ Meteor.methods({
}).fetch(),
};
}
return records;
return Settings.findNotHiddenPublic().fetch();
},
'private-settings/get'(updatedAfter) {
if (!Meteor.userId()) {

Loading…
Cancel
Save