Simplify internal metadata class. (#16762)

We remove these fields as they're just duplicating data the event
already stores, and (for reasons 🤫) I'd like to simplify
the class to only store simple types.

I'm not entirely convinced that we shouldn't instead add helper methods
to the event class to generate stream tokens, but I don't really think
that's where they belong either
1.103.0-whithout-watcha
Erik Johnston 11 months ago committed by GitHub
parent 9ee3db1de5
commit 7469fa7585
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      changelog.d/16762.misc
  2. 9
      synapse/events/__init__.py
  3. 7
      synapse/handlers/admin.py
  4. 10
      synapse/handlers/room.py
  5. 10
      synapse/handlers/sync.py
  6. 33
      synapse/storage/databases/main/stream.py

@ -0,0 +1 @@
Simplify event internal metadata class.

@ -42,7 +42,7 @@ from unpaddedbase64 import encode_base64
from synapse.api.constants import RelationTypes from synapse.api.constants import RelationTypes
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
from synapse.types import JsonDict, RoomStreamToken, StrCollection from synapse.types import JsonDict, StrCollection
from synapse.util.caches import intern_dict from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze from synapse.util.frozenutils import freeze
from synapse.util.stringutils import strtobool from synapse.util.stringutils import strtobool
@ -211,13 +211,6 @@ class _EventInternalMetadata:
device_id: DictProperty[str] = DictProperty("device_id") device_id: DictProperty[str] = DictProperty("device_id")
"""The device ID of the user who sent this event, if any.""" """The device ID of the user who sent this event, if any."""
# XXX: These are set by StreamWorkerStore._set_before_and_after.
# I'm pretty sure that these are never persisted to the database, so shouldn't
# be here
before: DictProperty[RoomStreamToken] = DictProperty("before")
after: DictProperty[RoomStreamToken] = DictProperty("after")
order: DictProperty[Tuple[int, int]] = DictProperty("order")
def get_dict(self) -> JsonDict: def get_dict(self) -> JsonDict:
return dict(self._dict) return dict(self._dict)

@ -208,7 +208,12 @@ class AdminHandler:
if not events: if not events:
break break
from_key = events[-1].internal_metadata.after last_event = events[-1]
assert last_event.internal_metadata.stream_ordering
from_key = RoomStreamToken(
stream=last_event.internal_metadata.stream_ordering,
topological=last_event.depth,
)
events = await filter_events_for_client( events = await filter_events_for_client(
self._storage_controllers, user_id, events self._storage_controllers, user_id, events

@ -1742,13 +1742,19 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
events = list(room_events) events = list(room_events)
events.extend(e for evs, _ in room_to_events.values() for e in evs) events.extend(e for evs, _ in room_to_events.values() for e in evs)
events.sort(key=lambda e: e.internal_metadata.order) # We know stream_ordering must be not None here, as its been
# persisted, but mypy doesn't know that
events.sort(key=lambda e: cast(int, e.internal_metadata.stream_ordering))
if limit: if limit:
events[:] = events[:limit] events[:] = events[:limit]
if events: if events:
end_key = events[-1].internal_metadata.after last_event = events[-1]
assert last_event.internal_metadata.stream_ordering
end_key = RoomStreamToken(
stream=last_event.internal_metadata.stream_ordering,
)
else: else:
end_key = to_key end_key = to_key

@ -601,7 +601,10 @@ class SyncHandler:
if not limited or block_all_timeline: if not limited or block_all_timeline:
prev_batch_token = upto_token prev_batch_token = upto_token
if recents: if recents:
room_key = recents[0].internal_metadata.before assert recents[0].internal_metadata.stream_ordering
room_key = RoomStreamToken(
stream=recents[0].internal_metadata.stream_ordering - 1
)
prev_batch_token = upto_token.copy_and_replace( prev_batch_token = upto_token.copy_and_replace(
StreamKeyType.ROOM, room_key StreamKeyType.ROOM, room_key
) )
@ -689,7 +692,10 @@ class SyncHandler:
if len(recents) > timeline_limit: if len(recents) > timeline_limit:
limited = True limited = True
recents = recents[-timeline_limit:] recents = recents[-timeline_limit:]
room_key = recents[0].internal_metadata.before assert recents[0].internal_metadata.stream_ordering
room_key = RoomStreamToken(
stream=recents[0].internal_metadata.stream_ordering - 1
)
prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key) prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key)

@ -705,8 +705,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
[r.event_id for r in rows], get_prev_content=True [r.event_id for r in rows], get_prev_content=True
) )
self._set_before_and_after(ret, rows, topo_order=False)
if order.lower() == "desc": if order.lower() == "desc":
ret.reverse() ret.reverse()
@ -793,8 +791,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
[r.event_id for r in rows], get_prev_content=True [r.event_id for r in rows], get_prev_content=True
) )
self._set_before_and_after(ret, rows, topo_order=False)
return ret return ret
async def get_recent_events_for_room( async def get_recent_events_for_room(
@ -820,8 +816,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
[r.event_id for r in rows], get_prev_content=True [r.event_id for r in rows], get_prev_content=True
) )
self._set_before_and_after(events, rows)
return events, token return events, token
async def get_recent_event_ids_for_room( async def get_recent_event_ids_for_room(
@ -1094,31 +1088,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# `[(None,)]` # `[(None,)]`
return rows[0][0] if rows[0][0] is not None else 0 return rows[0][0] if rows[0][0] is not None else 0
@staticmethod
def _set_before_and_after(
events: List[EventBase], rows: List[_EventDictReturn], topo_order: bool = True
) -> None:
"""Inserts ordering information to events' internal metadata from
the DB rows.
Args:
events
rows
topo_order: Whether the events were ordered topologically or by stream
ordering. If true then all rows should have a non null
topological_ordering.
"""
for event, row in zip(events, rows):
stream = row.stream_ordering
if topo_order and row.topological_ordering:
topo: Optional[int] = row.topological_ordering
else:
topo = None
internal = event.internal_metadata
internal.before = RoomStreamToken(topological=topo, stream=stream - 1)
internal.after = RoomStreamToken(topological=topo, stream=stream)
internal.order = (int(topo) if topo else 0, int(stream))
async def get_events_around( async def get_events_around(
self, self,
room_id: str, room_id: str,
@ -1559,8 +1528,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
[r.event_id for r in rows], get_prev_content=True [r.event_id for r in rows], get_prev_content=True
) )
self._set_before_and_after(events, rows)
return events, token return events, token
@cached() @cached()

Loading…
Cancel
Save