|
|
|
@ -419,8 +419,6 @@ class FederationEventHandler: |
|
|
|
|
Raises: |
|
|
|
|
SynapseError if the response is in some way invalid. |
|
|
|
|
""" |
|
|
|
|
event_map = {e.event_id: e for e in itertools.chain(auth_events, state)} |
|
|
|
|
|
|
|
|
|
create_event = None |
|
|
|
|
for e in auth_events: |
|
|
|
|
if (e.type, e.state_key) == (EventTypes.Create, ""): |
|
|
|
@ -439,11 +437,6 @@ class FederationEventHandler: |
|
|
|
|
if room_version.identifier != room_version_id: |
|
|
|
|
raise SynapseError(400, "Room version mismatch") |
|
|
|
|
|
|
|
|
|
# filter out any events we have already seen |
|
|
|
|
seen_remotes = await self._store.have_seen_events(room_id, event_map.keys()) |
|
|
|
|
for s in seen_remotes: |
|
|
|
|
event_map.pop(s, None) |
|
|
|
|
|
|
|
|
|
# persist the auth chain and state events. |
|
|
|
|
# |
|
|
|
|
# any invalid events here will be marked as rejected, and we'll carry on. |
|
|
|
@ -455,7 +448,9 @@ class FederationEventHandler: |
|
|
|
|
# signatures right now doesn't mean that we will *never* be able to, so it |
|
|
|
|
# is premature to reject them. |
|
|
|
|
# |
|
|
|
|
await self._auth_and_persist_outliers(room_id, event_map.values()) |
|
|
|
|
await self._auth_and_persist_outliers( |
|
|
|
|
room_id, itertools.chain(auth_events, state) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
# and now persist the join event itself. |
|
|
|
|
logger.info("Peristing join-via-remote %s", event) |
|
|
|
@ -1245,6 +1240,16 @@ class FederationEventHandler: |
|
|
|
|
""" |
|
|
|
|
event_map = {event.event_id: event for event in events} |
|
|
|
|
|
|
|
|
|
# filter out any events we have already seen. This might happen because |
|
|
|
|
# the events were eagerly pushed to us (eg, during a room join), or because |
|
|
|
|
# another thread has raced against us since we decided to request the event. |
|
|
|
|
# |
|
|
|
|
# This is just an optimisation, so it doesn't need to be watertight - the event |
|
|
|
|
# persister does another round of deduplication. |
|
|
|
|
seen_remotes = await self._store.have_seen_events(room_id, event_map.keys()) |
|
|
|
|
for s in seen_remotes: |
|
|
|
|
event_map.pop(s, None) |
|
|
|
|
|
|
|
|
|
# XXX: it might be possible to kick this process off in parallel with fetching |
|
|
|
|
# the events. |
|
|
|
|
while event_map: |
|
|
|
@ -1717,31 +1722,22 @@ class FederationEventHandler: |
|
|
|
|
event_id: the event for which we are lacking auth events |
|
|
|
|
""" |
|
|
|
|
try: |
|
|
|
|
remote_event_map = { |
|
|
|
|
e.event_id: e |
|
|
|
|
for e in await self._federation_client.get_event_auth( |
|
|
|
|
destination, room_id, event_id |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
|
remote_events = await self._federation_client.get_event_auth( |
|
|
|
|
destination, room_id, event_id |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
except RequestSendFailed as e1: |
|
|
|
|
# The other side isn't around or doesn't implement the |
|
|
|
|
# endpoint, so lets just bail out. |
|
|
|
|
logger.info("Failed to get event auth from remote: %s", e1) |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
logger.info("/event_auth returned %i events", len(remote_event_map)) |
|
|
|
|
logger.info("/event_auth returned %i events", len(remote_events)) |
|
|
|
|
|
|
|
|
|
# `event` may be returned, but we should not yet process it. |
|
|
|
|
remote_event_map.pop(event_id, None) |
|
|
|
|
|
|
|
|
|
# nor should we reprocess any events we have already seen. |
|
|
|
|
seen_remotes = await self._store.have_seen_events( |
|
|
|
|
room_id, remote_event_map.keys() |
|
|
|
|
) |
|
|
|
|
for s in seen_remotes: |
|
|
|
|
remote_event_map.pop(s, None) |
|
|
|
|
remote_auth_events = (e for e in remote_events if e.event_id != event_id) |
|
|
|
|
|
|
|
|
|
await self._auth_and_persist_outliers(room_id, remote_event_map.values()) |
|
|
|
|
await self._auth_and_persist_outliers(room_id, remote_auth_events) |
|
|
|
|
|
|
|
|
|
async def _update_context_for_auth_events( |
|
|
|
|
self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase] |
|
|
|
|