diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 54be02540..da43bb132 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -38,7 +38,6 @@ from twisted.internet import defer from synapse.storage._base import SQLBaseStore from synapse.storage.events import EventsWorkerStore -from synapse.util.caches.descriptors import cached from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.logcontext import make_deferred_yieldable, run_in_background @@ -363,60 +362,40 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): defer.returnValue((events, token)) - @cached(num_args=4) + @defer.inlineCallbacks def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None): - end_token = RoomStreamToken.parse_stream_token(end_token) - - if from_token is None: - sql = ( - "SELECT stream_ordering, topological_ordering, event_id" - " FROM events" - " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?" - " ORDER BY topological_ordering DESC, stream_ordering DESC" - " LIMIT ?" - ) - else: - from_token = RoomStreamToken.parse_stream_token(from_token) - sql = ( - "SELECT stream_ordering, topological_ordering, event_id" - " FROM events" - " WHERE room_id = ? AND stream_ordering > ?" - " AND stream_ordering <= ? AND outlier = ?" - " ORDER BY topological_ordering DESC, stream_ordering DESC" - " LIMIT ?" - ) - - def get_recent_events_for_room_txn(txn): - if from_token is None: - txn.execute(sql, (room_id, end_token.stream, False, limit,)) - else: - txn.execute(sql, ( - room_id, from_token.stream, end_token.stream, False, limit - )) + """Get the most recent events in the room in topological ordering. - rows = self.cursor_to_dict(txn) + Args: + room_id (str) + limit (int) + end_token (str): The stream token representing now. + from_token(str|None): Token to not return events before, if given. - rows.reverse() # As we selected with reverse ordering + Returns: + Deferred[tuple[list[dict], tuple[str, str]]]: Returns a list of + dicts (which include event_ids, etc), and a tuple for + `(start_token, end_token)` representing the range of rows + returned. + The returned events are in ascending order. + """ + # Allow a zero limit here, and no-op. + if limit == 0: + defer.returnValue(([], (end_token, end_token))) - if rows: - # Tokens are positions between events. - # This token points *after* the last event in the chunk. - # We need it to point to the event before it in the chunk - # since we are going backwards so we subtract one from the - # stream part. - topo = rows[0]["topological_ordering"] - toke = rows[0]["stream_ordering"] - 1 - start_token = str(RoomStreamToken(topo, toke)) + end_token = RoomStreamToken.parse_stream_token(end_token) + if from_token is not None: + from_token = RoomStreamToken.parse(from_token) - token = (start_token, str(end_token)) - else: - token = (str(end_token), str(end_token)) + rows, token = yield self.runInteraction( + "get_recent_event_ids_for_room", self._paginate_room_events_txn, + room_id, from_token=end_token, to_token=from_token, limit=limit, + ) - return rows, token + # We want to return the results in ascending order. + rows.reverse() - return self.runInteraction( - "get_recent_events_for_room", get_recent_events_for_room_txn - ) + defer.returnValue((rows, (token, str(end_token)))) def get_room_event_after_stream_ordering(self, room_id, stream_ordering): """Gets details of the first event in a room at or after a stream ordering