|
|
|
@ -38,7 +38,6 @@ from twisted.internet import defer |
|
|
|
|
from synapse.storage._base import SQLBaseStore |
|
|
|
|
from synapse.storage.events import EventsWorkerStore |
|
|
|
|
|
|
|
|
|
from synapse.util.caches.descriptors import cached |
|
|
|
|
from synapse.types import RoomStreamToken |
|
|
|
|
from synapse.util.caches.stream_change_cache import StreamChangeCache |
|
|
|
|
from synapse.util.logcontext import make_deferred_yieldable, run_in_background |
|
|
|
@ -363,60 +362,40 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): |
|
|
|
|
|
|
|
|
|
defer.returnValue((events, token)) |
|
|
|
|
|
|
|
|
|
@cached(num_args=4) |
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None): |
|
|
|
|
end_token = RoomStreamToken.parse_stream_token(end_token) |
|
|
|
|
|
|
|
|
|
if from_token is None: |
|
|
|
|
sql = ( |
|
|
|
|
"SELECT stream_ordering, topological_ordering, event_id" |
|
|
|
|
" FROM events" |
|
|
|
|
" WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?" |
|
|
|
|
" ORDER BY topological_ordering DESC, stream_ordering DESC" |
|
|
|
|
" LIMIT ?" |
|
|
|
|
) |
|
|
|
|
else: |
|
|
|
|
from_token = RoomStreamToken.parse_stream_token(from_token) |
|
|
|
|
sql = ( |
|
|
|
|
"SELECT stream_ordering, topological_ordering, event_id" |
|
|
|
|
" FROM events" |
|
|
|
|
" WHERE room_id = ? AND stream_ordering > ?" |
|
|
|
|
" AND stream_ordering <= ? AND outlier = ?" |
|
|
|
|
" ORDER BY topological_ordering DESC, stream_ordering DESC" |
|
|
|
|
" LIMIT ?" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def get_recent_events_for_room_txn(txn): |
|
|
|
|
if from_token is None: |
|
|
|
|
txn.execute(sql, (room_id, end_token.stream, False, limit,)) |
|
|
|
|
else: |
|
|
|
|
txn.execute(sql, ( |
|
|
|
|
room_id, from_token.stream, end_token.stream, False, limit |
|
|
|
|
)) |
|
|
|
|
"""Get the most recent events in the room in topological ordering. |
|
|
|
|
|
|
|
|
|
rows = self.cursor_to_dict(txn) |
|
|
|
|
Args: |
|
|
|
|
room_id (str) |
|
|
|
|
limit (int) |
|
|
|
|
end_token (str): The stream token representing now. |
|
|
|
|
from_token(str|None): Token to not return events before, if given. |
|
|
|
|
|
|
|
|
|
rows.reverse() # As we selected with reverse ordering |
|
|
|
|
Returns: |
|
|
|
|
Deferred[tuple[list[dict], tuple[str, str]]]: Returns a list of |
|
|
|
|
dicts (which include event_ids, etc), and a tuple for |
|
|
|
|
`(start_token, end_token)` representing the range of rows |
|
|
|
|
returned. |
|
|
|
|
The returned events are in ascending order. |
|
|
|
|
""" |
|
|
|
|
# Allow a zero limit here, and no-op. |
|
|
|
|
if limit == 0: |
|
|
|
|
defer.returnValue(([], (end_token, end_token))) |
|
|
|
|
|
|
|
|
|
if rows: |
|
|
|
|
# Tokens are positions between events. |
|
|
|
|
# This token points *after* the last event in the chunk. |
|
|
|
|
# We need it to point to the event before it in the chunk |
|
|
|
|
# since we are going backwards so we subtract one from the |
|
|
|
|
# stream part. |
|
|
|
|
topo = rows[0]["topological_ordering"] |
|
|
|
|
toke = rows[0]["stream_ordering"] - 1 |
|
|
|
|
start_token = str(RoomStreamToken(topo, toke)) |
|
|
|
|
end_token = RoomStreamToken.parse_stream_token(end_token) |
|
|
|
|
if from_token is not None: |
|
|
|
|
from_token = RoomStreamToken.parse(from_token) |
|
|
|
|
|
|
|
|
|
token = (start_token, str(end_token)) |
|
|
|
|
else: |
|
|
|
|
token = (str(end_token), str(end_token)) |
|
|
|
|
rows, token = yield self.runInteraction( |
|
|
|
|
"get_recent_event_ids_for_room", self._paginate_room_events_txn, |
|
|
|
|
room_id, from_token=end_token, to_token=from_token, limit=limit, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
return rows, token |
|
|
|
|
# We want to return the results in ascending order. |
|
|
|
|
rows.reverse() |
|
|
|
|
|
|
|
|
|
return self.runInteraction( |
|
|
|
|
"get_recent_events_for_room", get_recent_events_for_room_txn |
|
|
|
|
) |
|
|
|
|
defer.returnValue((rows, (token, str(end_token)))) |
|
|
|
|
|
|
|
|
|
def get_room_event_after_stream_ordering(self, room_id, stream_ordering): |
|
|
|
|
"""Gets details of the first event in a room at or after a stream ordering |
|
|
|
|