|
|
|
@ -343,9 +343,16 @@ class FederationSender(AbstractFederationSender): |
|
|
|
|
last_token, self._last_poked_id, limit=100 |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
logger.debug("Handling %s -> %s", last_token, next_token) |
|
|
|
|
logger.debug( |
|
|
|
|
"Handling %i -> %i: %i events to send (current id %i)", |
|
|
|
|
last_token, |
|
|
|
|
next_token, |
|
|
|
|
len(events), |
|
|
|
|
self._last_poked_id, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if not events and next_token >= self._last_poked_id: |
|
|
|
|
logger.debug("All events processed") |
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
async def handle_event(event: EventBase) -> None: |
|
|
|
@ -353,6 +360,7 @@ class FederationSender(AbstractFederationSender): |
|
|
|
|
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() |
|
|
|
|
is_mine = self.is_mine_id(event.sender) |
|
|
|
|
if not is_mine and send_on_behalf_of is None: |
|
|
|
|
logger.debug("Not sending remote-origin event %s", event) |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
# We also want to not send out-of-band membership events. |
|
|
|
@ -389,12 +397,16 @@ class FederationSender(AbstractFederationSender): |
|
|
|
|
# above). |
|
|
|
|
# |
|
|
|
|
if event.internal_metadata.is_out_of_band_membership(): |
|
|
|
|
logger.debug("Not sending OOB membership event %s", event) |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
# Finally, there are some other events that we should not send out |
|
|
|
|
# until someone asks for them. They are explicitly flagged as such |
|
|
|
|
# with `proactively_send: False`. |
|
|
|
|
if not event.internal_metadata.should_proactively_send(): |
|
|
|
|
logger.debug( |
|
|
|
|
"Not sending event with proactively_send=false: %s", event |
|
|
|
|
) |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
destinations: Optional[Set[str]] = None |
|
|
|
@ -458,7 +470,10 @@ class FederationSender(AbstractFederationSender): |
|
|
|
|
"federation_sender" |
|
|
|
|
).observe((now - ts) / 1000) |
|
|
|
|
|
|
|
|
|
async def handle_room_events(events: Iterable[EventBase]) -> None: |
|
|
|
|
async def handle_room_events(events: List[EventBase]) -> None: |
|
|
|
|
logger.debug( |
|
|
|
|
"Handling %i events in room %s", len(events), events[0].room_id |
|
|
|
|
) |
|
|
|
|
with Measure(self.clock, "handle_room_events"): |
|
|
|
|
for event in events: |
|
|
|
|
await handle_event(event) |
|
|
|
@ -477,6 +492,7 @@ class FederationSender(AbstractFederationSender): |
|
|
|
|
) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
logger.debug("Successfully handled up to %i", next_token) |
|
|
|
|
await self.store.update_federation_out_pos("events", next_token) |
|
|
|
|
|
|
|
|
|
if events: |
|
|
|
|