|
|
|
@@ -43,7 +43,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
|
|
|
from synapse.storage.database import DatabasePool |
|
|
|
|
from synapse.storage.util.id_generators import StreamIdGenerator |
|
|
|
|
from synapse.types import get_domain_from_id |
|
|
|
|
from synapse.util.caches.descriptors import Cache, cached, cachedInlineCallbacks |
|
|
|
|
from synapse.util.caches.descriptors import Cache, cachedInlineCallbacks |
|
|
|
|
from synapse.util.iterutils import batch_iter |
|
|
|
|
from synapse.util.metrics import Measure |
|
|
|
|
|
|
|
|
@@ -137,42 +137,6 @@ class EventsWorkerStore(SQLBaseStore):
|
|
|
|
desc="get_received_ts", |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def get_received_ts_by_stream_pos(self, stream_ordering):
    """Given a stream ordering get an approximate timestamp of when it
    happened.

    This is done by simply taking the received ts of the first event that
    has a stream ordering greater than or equal to the given stream pos.
    If none exists returns the current time, on the assumption that it must
    have happened recently.

    Args:
        stream_ordering (int): the stream position to look up.

    Returns:
        Deferred[int]: an approximate received timestamp in milliseconds.
    """

    def _get_approximate_received_ts_txn(txn):
        # ORDER BY is required for correctness here: a bare "LIMIT 1"
        # without it lets the database return an *arbitrary* matching row,
        # not necessarily the first event at or after the given stream
        # position as this method promises.
        sql = """
            SELECT received_ts FROM events
            WHERE stream_ordering >= ?
            ORDER BY stream_ordering ASC
            LIMIT 1
        """

        txn.execute(sql, (stream_ordering,))
        row = txn.fetchone()
        if row and row[0]:
            ts = row[0]
        else:
            # No event at or after this position (or a NULL received_ts):
            # assume it happened just now.
            ts = self.clock.time_msec()

        return ts

    return self.db_pool.runInteraction(
        "get_approximate_received_ts", _get_approximate_received_ts_txn
    )
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def get_event( |
|
|
|
|
self, |
|
|
|
@@ -923,36 +887,6 @@ class EventsWorkerStore(SQLBaseStore):
|
|
|
|
) |
|
|
|
|
return results |
|
|
|
|
|
|
|
|
|
def _get_total_state_event_counts_txn(self, txn, room_id): |
|
|
|
|
""" |
|
|
|
|
See get_total_state_event_counts. |
|
|
|
|
""" |
|
|
|
|
# We join against the events table as that has an index on room_id |
|
|
|
|
sql = """ |
|
|
|
|
SELECT COUNT(*) FROM state_events |
|
|
|
|
INNER JOIN events USING (room_id, event_id) |
|
|
|
|
WHERE room_id=? |
|
|
|
|
""" |
|
|
|
|
txn.execute(sql, (room_id,)) |
|
|
|
|
row = txn.fetchone() |
|
|
|
|
return row[0] if row else 0 |
|
|
|
|
|
|
|
|
|
def get_total_state_event_counts(self, room_id):
    """Gets the total number of state events in a room.

    Args:
        room_id (str): the room whose state events should be counted.

    Returns:
        Deferred[int]: the total count of state events in the room.
    """
    # Delegate the actual counting to the transaction helper.
    return self.db_pool.runInteraction(
        "get_total_state_event_counts",
        self._get_total_state_event_counts_txn,
        room_id,
    )
|
|
|
|
|
|
|
|
|
def _get_current_state_event_counts_txn(self, txn, room_id): |
|
|
|
|
""" |
|
|
|
|
See get_current_state_event_counts. |
|
|
|
@@ -1222,97 +1156,6 @@ class EventsWorkerStore(SQLBaseStore):
|
|
|
|
|
|
|
|
|
return rows, to_token, True |
|
|
|
|
|
|
|
|
|
@cached(num_args=5, max_entries=10)
def get_all_new_events(
    self,
    last_backfill_id,
    last_forward_id,
    current_backfill_id,
    current_forward_id,
    limit,
):
    """Get all the new events that have arrived at the server either as
    new events or as backfilled events.

    Args:
        last_backfill_id (int): previous position in the backfill stream.
        last_forward_id (int): previous position in the forward stream.
        current_backfill_id (int): current position in the backfill stream.
        current_forward_id (int): current position in the forward stream.
        limit (int): maximum number of rows to fetch from each stream.

    Returns:
        Deferred[AllNewEventsResult]
    """
    have_backfill_events = last_backfill_id != current_backfill_id
    have_forward_events = last_forward_id != current_forward_id

    if not have_backfill_events and not have_forward_events:
        # BUG FIX: AllNewEventsResult has exactly four fields, but this
        # call previously passed five empty lists, which raised a
        # TypeError whenever there was nothing new on either stream.
        return defer.succeed(AllNewEventsResult([], [], [], []))

    def get_all_new_events_txn(txn):
        sql = (
            "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
            " state_key, redacts"
            " FROM events AS e"
            " LEFT JOIN redactions USING (event_id)"
            " LEFT JOIN state_events USING (event_id)"
            " WHERE ? < stream_ordering AND stream_ordering <= ?"
            " ORDER BY stream_ordering ASC"
            " LIMIT ?"
        )
        if have_forward_events:
            txn.execute(sql, (last_forward_id, current_forward_id, limit))
            new_forward_events = txn.fetchall()

            # If we hit the limit we may not have fetched everything up to
            # current_forward_id, so only scan outliers up to the last row
            # we actually returned.
            if len(new_forward_events) == limit:
                upper_bound = new_forward_events[-1][0]
            else:
                upper_bound = current_forward_id

            sql = (
                "SELECT event_stream_ordering, event_id, state_group"
                " FROM ex_outlier_stream"
                " WHERE ? > event_stream_ordering"
                " AND event_stream_ordering >= ?"
                " ORDER BY event_stream_ordering DESC"
            )
            txn.execute(sql, (last_forward_id, upper_bound))
            forward_ex_outliers = txn.fetchall()
        else:
            new_forward_events = []
            forward_ex_outliers = []

        sql = (
            "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
            " state_key, redacts"
            " FROM events AS e"
            " LEFT JOIN redactions USING (event_id)"
            " LEFT JOIN state_events USING (event_id)"
            " WHERE ? > stream_ordering AND stream_ordering >= ?"
            " ORDER BY stream_ordering DESC"
            " LIMIT ?"
        )
        if have_backfill_events:
            # Backfill stream positions are stored as negated stream
            # orderings, hence the sign flips on the bind parameters and
            # the SELECTed ordering column.
            txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit))
            new_backfill_events = txn.fetchall()

            # Same limit handling as the forward stream above.
            if len(new_backfill_events) == limit:
                upper_bound = new_backfill_events[-1][0]
            else:
                upper_bound = current_backfill_id

            sql = (
                "SELECT -event_stream_ordering, event_id, state_group"
                " FROM ex_outlier_stream"
                " WHERE ? > event_stream_ordering"
                " AND event_stream_ordering >= ?"
                " ORDER BY event_stream_ordering DESC"
            )
            txn.execute(sql, (-last_backfill_id, -upper_bound))
            backward_ex_outliers = txn.fetchall()
        else:
            new_backfill_events = []
            backward_ex_outliers = []

        return AllNewEventsResult(
            new_forward_events,
            new_backfill_events,
            forward_ex_outliers,
            backward_ex_outliers,
        )

    return self.db_pool.runInteraction("get_all_new_events", get_all_new_events_txn)
|
|
|
|
|
|
|
|
|
async def is_event_after(self, event_id1, event_id2): |
|
|
|
|
"""Returns True if event_id1 is after event_id2 in the stream |
|
|
|
|
""" |
|
|
|
@@ -1357,14 +1200,3 @@ class EventsWorkerStore(SQLBaseStore):
|
|
|
|
return self.db_pool.runInteraction( |
|
|
|
|
desc="get_next_event_to_expire", func=get_next_event_to_expire_txn |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Result bundle returned by get_all_new_events: the new forward-stream and
# backfill-stream event rows, plus any ex-outlier rows found on each stream.
AllNewEventsResult = namedtuple(
    "AllNewEventsResult",
    (
        "new_forward_events",
        "new_backfill_events",
        "forward_ex_outliers",
        "backward_ex_outliers",
    ),
)
|
|
|
|