|
|
|
@ -247,9 +247,15 @@ class FederationHandler(BaseHandler): |
|
|
|
|
if set(e_id for e_id, _ in ev.prev_events) - event_ids |
|
|
|
|
] |
|
|
|
|
|
|
|
|
|
logger.info( |
|
|
|
|
"backfill: Got %d events with %d edges", |
|
|
|
|
len(events), len(edges), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
# For each edge get the current state. |
|
|
|
|
|
|
|
|
|
auth_events = {} |
|
|
|
|
state_events = {} |
|
|
|
|
events_to_state = {} |
|
|
|
|
for e_id in edges: |
|
|
|
|
state, auth = yield self.replication_layer.get_state_for_room( |
|
|
|
@ -258,12 +264,46 @@ class FederationHandler(BaseHandler): |
|
|
|
|
event_id=e_id |
|
|
|
|
) |
|
|
|
|
auth_events.update({a.event_id: a for a in auth}) |
|
|
|
|
auth_events.update({s.event_id: s for s in state}) |
|
|
|
|
state_events.update({s.event_id: s for s in state}) |
|
|
|
|
events_to_state[e_id] = state |
|
|
|
|
|
|
|
|
|
seen_events = yield self.store.have_events( |
|
|
|
|
set(auth_events.keys()) | set(state_events.keys()) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
all_events = events + state_events.values() + auth_events.values() |
|
|
|
|
required_auth = set( |
|
|
|
|
a_id for event in all_events for a_id, _ in event.auth_events |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
missing_auth = required_auth - set(auth_events) |
|
|
|
|
results = yield defer.gatherResults( |
|
|
|
|
[ |
|
|
|
|
self.replication_layer.get_pdu( |
|
|
|
|
[dest], |
|
|
|
|
event_id, |
|
|
|
|
outlier=True, |
|
|
|
|
timeout=10000, |
|
|
|
|
) |
|
|
|
|
for event_id in missing_auth |
|
|
|
|
], |
|
|
|
|
consumeErrors=True |
|
|
|
|
).addErrback(unwrapFirstError) |
|
|
|
|
auth_events.update({a.event_id: a for a in results}) |
|
|
|
|
|
|
|
|
|
yield defer.gatherResults( |
|
|
|
|
[ |
|
|
|
|
self._handle_new_event(dest, a) |
|
|
|
|
self._handle_new_event( |
|
|
|
|
dest, a, |
|
|
|
|
auth_events={ |
|
|
|
|
(auth_events[a_id].type, auth_events[a_id].state_key): |
|
|
|
|
auth_events[a_id] |
|
|
|
|
for a_id, _ in a.auth_events |
|
|
|
|
}, |
|
|
|
|
) |
|
|
|
|
for a in auth_events.values() |
|
|
|
|
if a.event_id not in seen_events |
|
|
|
|
], |
|
|
|
|
consumeErrors=True, |
|
|
|
|
).addErrback(unwrapFirstError) |
|
|
|
@ -274,6 +314,11 @@ class FederationHandler(BaseHandler): |
|
|
|
|
dest, event_map[e_id], |
|
|
|
|
state=events_to_state[e_id], |
|
|
|
|
backfilled=True, |
|
|
|
|
auth_events={ |
|
|
|
|
(auth_events[a_id].type, auth_events[a_id].state_key): |
|
|
|
|
auth_events[a_id] |
|
|
|
|
for a_id, _ in event_map[e_id].auth_events |
|
|
|
|
}, |
|
|
|
|
) |
|
|
|
|
for e_id in events_to_state |
|
|
|
|
], |
|
|
|
|