|
|
|
@ -17,6 +17,7 @@ from __future__ import division |
|
|
|
|
|
|
|
|
|
import itertools |
|
|
|
|
import logging |
|
|
|
|
import threading |
|
|
|
|
from collections import namedtuple |
|
|
|
|
|
|
|
|
|
from canonicaljson import json |
|
|
|
@ -34,6 +35,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process |
|
|
|
|
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause |
|
|
|
|
from synapse.types import get_domain_from_id |
|
|
|
|
from synapse.util import batch_iter |
|
|
|
|
from synapse.util.caches.descriptors import Cache |
|
|
|
|
from synapse.util.metrics import Measure |
|
|
|
|
|
|
|
|
|
# Module-level logger, named after this module per stdlib logging convention.
logger = logging.getLogger(__name__)
|
|
|
@ -53,6 +55,17 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EventsWorkerStore(SQLBaseStore): |
|
|
|
|
def __init__(self, db_conn, hs):
    """Initialise the event cache and the event-fetch coordination state.

    Args:
        db_conn: database connection, passed through to the base store.
        hs: homeserver object; only ``hs.config.event_cache_size`` is read
            here, to size the event cache.
    """
    super(EventsWorkerStore, self).__init__(db_conn, hs)

    # Shared state used to coordinate event fetching between callers:
    # a condition variable protecting the work list, the list itself,
    # and a count of fetches currently in flight.
    self._event_fetch_lock = threading.Condition()
    self._event_fetch_list = []
    self._event_fetch_ongoing = 0

    # Cache of recently loaded events, keyed on 3-part keys (keylen=3).
    # Sized from homeserver config.
    max_cached = hs.config.event_cache_size
    self._get_event_cache = Cache("*getEvent*", keylen=3, max_entries=max_cached)
|
|
|
|
|
|
|
|
|
def get_received_ts(self, event_id): |
|
|
|
|
"""Get received_ts (when it was persisted) for the event. |
|
|
|
|
|
|
|
|
|