|
|
@ -233,19 +233,31 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): |
|
|
|
@defer.inlineCallbacks |
|
|
|
@defer.inlineCallbacks |
|
|
|
def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0, |
|
|
|
def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0, |
|
|
|
order='DESC'): |
|
|
|
order='DESC'): |
|
|
|
# Note: If from_key is None then we return in topological order. This |
|
|
|
|
|
|
|
# is because in that case we're using this as a "get the last few messages |
|
|
|
|
|
|
|
# in a room" function, rather than "get new messages since last sync" |
|
|
|
|
|
|
|
if from_key is not None: |
|
|
|
|
|
|
|
from_id = RoomStreamToken.parse_stream_token(from_key).stream |
|
|
|
|
|
|
|
else: |
|
|
|
|
|
|
|
from_id = None |
|
|
|
|
|
|
|
to_id = RoomStreamToken.parse_stream_token(to_key).stream |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""Get new room events in stream ordering since `from_key`. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Args: |
|
|
|
|
|
|
|
room_id (str) |
|
|
|
|
|
|
|
from_key (str): Token from which no events are returned before |
|
|
|
|
|
|
|
to_key (str): Token from which no events are returned after. (This |
|
|
|
|
|
|
|
is typically the current stream token) |
|
|
|
|
|
|
|
limit (int): Maximum number of events to return |
|
|
|
|
|
|
|
order (str): Either "DESC" or "ASC". Determines which events are |
|
|
|
|
|
|
|
returned when the result is limited. If "DESC" then the most |
|
|
|
|
|
|
|
recent `limit` events are returned, otherwise returns the |
|
|
|
|
|
|
|
oldest `limit` events. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Returns: |
|
|
|
|
|
|
|
Deferred[tuple[list[FrozenEvent], str]]: Returns the list of |
|
|
|
|
|
|
|
events (in ascending order) and the token from the start of |
|
|
|
|
|
|
|
the chunk of events returned. |
|
|
|
|
|
|
|
""" |
|
|
|
if from_key == to_key: |
|
|
|
if from_key == to_key: |
|
|
|
defer.returnValue(([], from_key)) |
|
|
|
defer.returnValue(([], from_key)) |
|
|
|
|
|
|
|
|
|
|
|
if from_id: |
|
|
|
from_id = RoomStreamToken.parse_stream_token(from_key).stream |
|
|
|
|
|
|
|
to_id = RoomStreamToken.parse_stream_token(to_key).stream |
|
|
|
|
|
|
|
|
|
|
|
has_changed = yield self._events_stream_cache.has_entity_changed( |
|
|
|
has_changed = yield self._events_stream_cache.has_entity_changed( |
|
|
|
room_id, from_id |
|
|
|
room_id, from_id |
|
|
|
) |
|
|
|
) |
|
|
@ -254,7 +266,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): |
|
|
|
defer.returnValue(([], from_key)) |
|
|
|
defer.returnValue(([], from_key)) |
|
|
|
|
|
|
|
|
|
|
|
def f(txn): |
|
|
|
def f(txn): |
|
|
|
if from_id is not None: |
|
|
|
|
|
|
|
sql = ( |
|
|
|
sql = ( |
|
|
|
"SELECT event_id, stream_ordering FROM events WHERE" |
|
|
|
"SELECT event_id, stream_ordering FROM events WHERE" |
|
|
|
" room_id = ?" |
|
|
|
" room_id = ?" |
|
|
@ -265,20 +276,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): |
|
|
|
txn.execute(sql, (room_id, from_id, to_id, limit)) |
|
|
|
txn.execute(sql, (room_id, from_id, to_id, limit)) |
|
|
|
|
|
|
|
|
|
|
|
rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] |
|
|
|
rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] |
|
|
|
else: |
|
|
|
|
|
|
|
sql = ( |
|
|
|
|
|
|
|
"SELECT event_id, topological_ordering, stream_ordering" |
|
|
|
|
|
|
|
" FROM events" |
|
|
|
|
|
|
|
" WHERE" |
|
|
|
|
|
|
|
" room_id = ?" |
|
|
|
|
|
|
|
" AND not outlier" |
|
|
|
|
|
|
|
" AND stream_ordering <= ?" |
|
|
|
|
|
|
|
" ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?" |
|
|
|
|
|
|
|
) % (order, order,) |
|
|
|
|
|
|
|
txn.execute(sql, (room_id, to_id, limit)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return rows |
|
|
|
return rows |
|
|
|
|
|
|
|
|
|
|
|
rows = yield self.runInteraction("get_room_events_stream_for_room", f) |
|
|
|
rows = yield self.runInteraction("get_room_events_stream_for_room", f) |
|
|
@ -393,7 +390,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): |
|
|
|
if limit == 0: |
|
|
|
if limit == 0: |
|
|
|
defer.returnValue(([], end_token)) |
|
|
|
defer.returnValue(([], end_token)) |
|
|
|
|
|
|
|
|
|
|
|
end_token = RoomStreamToken.parse_stream_token(end_token) |
|
|
|
end_token = RoomStreamToken.parse(end_token) |
|
|
|
|
|
|
|
|
|
|
|
rows, token = yield self.runInteraction( |
|
|
|
rows, token = yield self.runInteraction( |
|
|
|
"get_recent_event_ids_for_room", self._paginate_room_events_txn, |
|
|
|
"get_recent_event_ids_for_room", self._paginate_room_events_txn, |
|
|
|