|
|
|
@ -242,10 +242,8 @@ class SQLBaseStore(object): |
|
|
|
|
self._txn_perf_counters = PerformanceCounters() |
|
|
|
|
self._get_event_counters = PerformanceCounters() |
|
|
|
|
|
|
|
|
|
self._get_event_cache = LruCache(hs.config.event_cache_size) |
|
|
|
|
|
|
|
|
|
# Pretend the getEventCache is just another named cache |
|
|
|
|
caches_by_name["*getEvent*"] = self._get_event_cache |
|
|
|
|
self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, |
|
|
|
|
max_entries=hs.config.event_cache_size) |
|
|
|
|
|
|
|
|
|
def start_profiling(self): |
|
|
|
|
self._previous_loop_ts = self._clock.time_msec() |
|
|
|
@ -733,6 +731,12 @@ class SQLBaseStore(object): |
|
|
|
|
|
|
|
|
|
return [e for e in events if e] |
|
|
|
|
|
|
|
|
|
def _invalidate_get_event_cache(self, event_id): |
|
|
|
|
for check_redacted in (False, True): |
|
|
|
|
for get_prev_content in (False, True): |
|
|
|
|
self._get_event_cache.invalidate(event_id, check_redacted, |
|
|
|
|
get_prev_content) |
|
|
|
|
|
|
|
|
|
def _get_event_txn(self, txn, event_id, check_redacted=True, |
|
|
|
|
get_prev_content=False, allow_rejected=False): |
|
|
|
|
|
|
|
|
@ -743,16 +747,14 @@ class SQLBaseStore(object): |
|
|
|
|
sql_getevents_timer.inc_by(curr_time - last_time, desc) |
|
|
|
|
return curr_time |
|
|
|
|
|
|
|
|
|
cache = self._get_event_cache.setdefault(event_id, {}) |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
# Separate cache entries for each way to invoke _get_event_txn |
|
|
|
|
ret = cache[(check_redacted, get_prev_content, allow_rejected)] |
|
|
|
|
ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) |
|
|
|
|
|
|
|
|
|
cache_counter.inc_hits("*getEvent*") |
|
|
|
|
return ret |
|
|
|
|
if allow_rejected or not ret.rejected_reason: |
|
|
|
|
return ret |
|
|
|
|
else: |
|
|
|
|
return None |
|
|
|
|
except KeyError: |
|
|
|
|
cache_counter.inc_misses("*getEvent*") |
|
|
|
|
pass |
|
|
|
|
finally: |
|
|
|
|
start_time = update_counter("event_cache", start_time) |
|
|
|
@ -777,19 +779,22 @@ class SQLBaseStore(object): |
|
|
|
|
|
|
|
|
|
start_time = update_counter("select_event", start_time) |
|
|
|
|
|
|
|
|
|
result = self._get_event_from_row_txn( |
|
|
|
|
txn, internal_metadata, js, redacted, |
|
|
|
|
check_redacted=check_redacted, |
|
|
|
|
get_prev_content=get_prev_content, |
|
|
|
|
rejected_reason=rejected_reason, |
|
|
|
|
) |
|
|
|
|
self._get_event_cache.prefill(event_id, check_redacted, get_prev_content, result) |
|
|
|
|
|
|
|
|
|
if allow_rejected or not rejected_reason: |
|
|
|
|
result = self._get_event_from_row_txn( |
|
|
|
|
txn, internal_metadata, js, redacted, |
|
|
|
|
check_redacted=check_redacted, |
|
|
|
|
get_prev_content=get_prev_content, |
|
|
|
|
) |
|
|
|
|
cache[(check_redacted, get_prev_content, allow_rejected)] = result |
|
|
|
|
return result |
|
|
|
|
else: |
|
|
|
|
return None |
|
|
|
|
|
|
|
|
|
def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, |
|
|
|
|
check_redacted=True, get_prev_content=False): |
|
|
|
|
check_redacted=True, get_prev_content=False, |
|
|
|
|
rejected_reason=None): |
|
|
|
|
|
|
|
|
|
start_time = time.time() * 1000 |
|
|
|
|
|
|
|
|
@ -804,7 +809,11 @@ class SQLBaseStore(object): |
|
|
|
|
internal_metadata = json.loads(internal_metadata) |
|
|
|
|
start_time = update_counter("decode_internal", start_time) |
|
|
|
|
|
|
|
|
|
ev = FrozenEvent(d, internal_metadata_dict=internal_metadata) |
|
|
|
|
ev = FrozenEvent( |
|
|
|
|
d, |
|
|
|
|
internal_metadata_dict=internal_metadata, |
|
|
|
|
rejected_reason=rejected_reason, |
|
|
|
|
) |
|
|
|
|
start_time = update_counter("build_frozen_event", start_time) |
|
|
|
|
|
|
|
|
|
if check_redacted and redacted: |
|
|
|
|