The communications platform that puts data protection first.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
Rocket.Chat/apps/meteor/client/lib/cachedStores/CachedStore.ts

399 lines
11 KiB

import type { IRocketChatRecord } from '@rocket.chat/core-typings';
import type { StreamNames } from '@rocket.chat/ddp-client';
import localforage from 'localforage';
import { Accounts } from 'meteor/accounts-base';
import { Meteor } from 'meteor/meteor';
import { Tracker } from 'meteor/tracker';
import { create, type StoreApi, type UseBoundStore } from 'zustand';
import { baseURI } from '../baseURI';
import { onLoggedIn } from '../loggedIn';
import { CachedStoresManager } from './CachedStoresManager';
import type { IDocumentMapStore } from './DocumentMapStore';
import { sdk } from '../../../app/utils/client/lib/SDKClient';
import { isTruthy } from '../../../lib/isTruthy';
import { withDebouncing } from '../../../lib/utils/highOrderFunctions';
import { getUserId } from '../user';
import { getConfig } from '../utils/getConfig';
type Name = 'rooms' | 'subscriptions' | 'permissions' | 'public-settings' | 'private-settings';
const hasId = <T>(record: T): record is T & { _id: string } => typeof record === 'object' && record !== null && '_id' in record;
const hasUpdatedAt = <T>(record: T): record is T & { _updatedAt: Date } =>
typeof record === 'object' &&
record !== null &&
'_updatedAt' in record &&
(record as unknown as { _updatedAt: unknown })._updatedAt instanceof Date;
const hasDeletedAt = <T>(record: T): record is T & { _deletedAt: Date } =>
typeof record === 'object' &&
record !== null &&
'_deletedAt' in record &&
(record as unknown as { _deletedAt: unknown })._deletedAt instanceof Date;
const hasUnserializedUpdatedAt = <T>(record: T): record is T & { _updatedAt: ConstructorParameters<typeof Date>[0] } =>
typeof record === 'object' &&
record !== null &&
'_updatedAt' in record &&
!((record as unknown as { _updatedAt: unknown })._updatedAt instanceof Date);
localforage.config({ name: baseURI });
export interface IWithManageableCache {
clearCacheOnLogout(): void;
}
export abstract class CachedStore<T extends IRocketChatRecord, U = T> implements IWithManageableCache {
private static readonly MAX_CACHE_TIME = 60 * 60 * 24 * 30;
readonly store: UseBoundStore<StoreApi<IDocumentMapStore<T>>>;
protected name: Name;
protected eventType: StreamNames;
private readonly version = 18;
private updatedAt = new Date(0);
protected log: (...args: any[]) => void;
private timer: ReturnType<typeof setTimeout>;
readonly useReady = create(() => false);
constructor({ name, eventType, store }: { name: Name; eventType: StreamNames; store: UseBoundStore<StoreApi<IDocumentMapStore<T>>> }) {
this.name = name;
this.eventType = eventType;
this.store = store;
this.log = [getConfig(`debugCachedCollection-${this.name}`), getConfig('debugCachedCollection'), getConfig('debug')].includes('true')
? console.log.bind(console, `%cCachedCollection ${this.name}`, `color: navy; font-weight: bold;`)
: () => undefined;
CachedStoresManager.register(this);
}
protected get eventName(): `${Name}-changed` | `${string}/${Name}-changed` {
if (this.eventType === 'notify-user') {
return `${getUserId()}/${this.name}-changed`;
}
return `${this.name}-changed`;
}
protected abstract getToken(): unknown;
private async loadFromCache() {
const data = await localforage.getItem<{ version: number; token: unknown; records: unknown[]; updatedAt: Date | string }>(this.name);
if (!data) {
return false;
}
if (data.version < this.version || data.token !== this.getToken()) {
return false;
}
if (data.records.length <= 0) {
return false;
}
// updatedAt may be a Date or a string depending on the used localForage backend
if (!(data.updatedAt instanceof Date)) {
data.updatedAt = new Date(data.updatedAt);
}
if (Date.now() - data.updatedAt.getTime() >= 1000 * CachedStore.MAX_CACHE_TIME) {
return false;
}
this.log(`${data.records.length} records loaded from cache`);
const deserializedRecords = data.records.map((record) => this.deserializeFromCache(record)).filter(isTruthy);
const updatedAt = Math.max(...deserializedRecords.filter(hasUpdatedAt).map((record) => record?._updatedAt.getTime() ?? 0));
if (updatedAt > this.updatedAt.getTime()) {
this.updatedAt = new Date(updatedAt);
}
this.store.getState().replaceAll(deserializedRecords.filter(hasId));
this.updatedAt = data.updatedAt || this.updatedAt;
return true;
}
protected deserializeFromCache(record: unknown): T | undefined {
if (typeof record !== 'object' || record === null) {
return undefined;
}
return {
...(record as unknown as T),
...(hasUnserializedUpdatedAt(record) && {
_updatedAt: new Date(record._updatedAt),
}),
};
}
private async callLoad() {
// TODO: workaround for bad function overload
const data = await sdk.call(`${this.name}/get`);
return data as unknown as U[];
}
private async callSync(updatedSince: Date) {
// TODO: workaround for bad function overload
const data = await sdk.call(`${this.name}/get`, updatedSince);
return data as unknown as { update: U[]; remove: U[] };
}
private async loadFromServer() {
const startTime = new Date();
const lastTime = this.updatedAt;
const data = await this.callLoad();
this.log(`${data.length} records loaded from server`);
const newRecords = data.map((record) => {
const mapped = this.mapRecord(record);
if (hasUpdatedAt(mapped) && mapped._updatedAt > this.updatedAt) {
this.updatedAt = mapped._updatedAt;
}
return mapped;
});
this.store.getState().storeMany(newRecords);
this.handleLoadedFromServer(newRecords);
this.updatedAt = this.updatedAt === lastTime ? startTime : this.updatedAt;
}
protected mapRecord(record: U): T {
return record as unknown as T;
}
protected handleLoadedFromServer(_records: T[]): void {
// This method can be overridden to handle records after they are loaded from the server
}
protected handleSyncEvent(_action: 'removed' | 'changed', _record: T): void {
// This method can be overridden to handle sync events
}
private async loadFromServerAndPopulate() {
await this.loadFromServer();
await this.save();
}
private save = withDebouncing({ wait: 1000 })(async () => {
this.log('saving cache');
await localforage.setItem(this.name, {
updatedAt: this.updatedAt,
version: this.version,
token: this.getToken(),
records: Array.from(this.store.getState().records.values()),
});
this.log('saving cache (done)');
});
abstract clearCacheOnLogout(): void;
protected async clearCache() {
this.log('clearing cache');
await localforage.removeItem(this.name);
this.store.getState().replaceAll([]);
}
protected setupListener() {
return sdk.stream(this.eventType, [this.eventName], (async (action: 'removed' | 'changed', record: U) => {
this.log('record received', action, record);
await this.handleRecordEvent(action, record);
}) as (...args: unknown[]) => void);
}
protected async handleRecordEvent(action: 'removed' | 'changed', record: U) {
const newRecord = this.mapRecord(record);
if (action === 'removed') {
this.store.getState().delete(newRecord._id);
} else {
this.store.getState().store(newRecord);
}
await this.save();
}
private trySync(delay = 10) {
clearTimeout(this.timer);
// Wait for an empty queue to load data again and sync
this.timer = setTimeout(async () => {
if (!(await this.sync())) {
return this.trySync(delay);
}
await this.save();
}, delay);
}
protected async sync() {
if (!this.updatedAt || this.updatedAt.getTime() === 0 || Meteor.connection._outstandingMethodBlocks.length !== 0) {
return false;
}
const startTime = new Date();
const lastTime = this.updatedAt;
this.log(`syncing from ${this.updatedAt}`);
const data = await this.callSync(this.updatedAt);
const changes = [];
if (data.update && data.update.length > 0) {
this.log(`${data.update.length} records updated in sync`);
for (const record of data.update) {
const newRecord = this.mapRecord(record);
const actionTime = hasUpdatedAt(newRecord) ? newRecord._updatedAt : startTime;
changes.push({
action: () => {
this.store.getState().store(newRecord);
if (actionTime > this.updatedAt) {
this.updatedAt = actionTime;
}
this.handleSyncEvent('changed', newRecord);
},
timestamp: actionTime.getTime(),
});
}
}
if (data.remove && data.remove.length > 0) {
this.log(`${data.remove.length} records removed in sync`);
for (const record of data.remove) {
const newRecord = this.mapRecord(record);
if (!hasDeletedAt(newRecord)) {
continue;
}
const actionTime = newRecord._deletedAt;
changes.push({
action: () => {
this.store.getState().delete(newRecord._id);
if (actionTime > this.updatedAt) {
this.updatedAt = actionTime;
}
this.handleSyncEvent('removed', newRecord);
},
timestamp: actionTime.getTime(),
});
}
}
changes
.sort((a, b) => a.timestamp - b.timestamp)
.forEach((c) => {
c.action();
});
this.updatedAt = this.updatedAt === lastTime ? startTime : this.updatedAt;
return true;
}
private listenerUnsubscriber: (() => void) | undefined;
private async performInitialization() {
if (await this.loadFromCache()) {
this.trySync();
} else {
await this.loadFromServerAndPopulate();
}
this.reconnectionComputation?.stop();
let wentOffline = Tracker.nonreactive(() => Meteor.status().status === 'offline');
this.reconnectionComputation = Tracker.autorun(() => {
const { status } = Meteor.status();
if (status === 'offline') {
wentOffline = true;
}
if (status === 'connected' && wentOffline) {
this.trySync();
}
});
const subscription = this.setupListener();
this.listenerUnsubscriber = () => {
subscription.stop();
this.listenerUnsubscriber = undefined;
};
}
private initializationPromise: Promise<void> | undefined;
init() {
if (this.initializationPromise) {
return this.initializationPromise;
}
this.initializationPromise = this.performInitialization()
.catch(console.error)
.finally(() => {
this.initializationPromise = undefined;
this.setReady(true);
});
return this.initializationPromise;
}
async release() {
if (this.initializationPromise) {
await this.initializationPromise;
}
this.listenerUnsubscriber?.();
this.setReady(false);
}
private reconnectionComputation: Tracker.Computation | undefined;
setReady(ready: boolean) {
this.useReady.setState(ready);
}
}
export class PublicCachedStore<T extends IRocketChatRecord, U = T> extends CachedStore<T, U> {
protected override getToken() {
return undefined;
}
override clearCacheOnLogout() {
// do nothing
}
}
export class PrivateCachedStore<T extends IRocketChatRecord, U = T> extends CachedStore<T, U> {
protected override getToken() {
return Accounts._storedLoginToken();
}
override clearCacheOnLogout() {
void this.clearCache();
}
listen() {
if (process.env.NODE_ENV === 'test') {
return;
}
onLoggedIn(() => {
void this.init();
});
Accounts.onLogout(() => {
this.release();
});
}
}