|
|
|
@ -381,7 +381,9 @@ class PerDestinationQueue: |
|
|
|
|
) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if self._last_successful_stream_ordering is None: |
|
|
|
|
last_successful_stream_ordering = self._last_successful_stream_ordering |
|
|
|
|
|
|
|
|
|
if last_successful_stream_ordering is None: |
|
|
|
|
# if it's still None, then this means we don't have the information |
|
|
|
|
# in our database we haven't successfully sent a PDU to this server |
|
|
|
|
# (at least since the introduction of the feature tracking |
|
|
|
@ -394,8 +396,7 @@ class PerDestinationQueue: |
|
|
|
|
# get at most 50 catchup room/PDUs |
|
|
|
|
while True: |
|
|
|
|
event_ids = await self._store.get_catch_up_room_event_ids( |
|
|
|
|
self._destination, |
|
|
|
|
self._last_successful_stream_ordering, |
|
|
|
|
self._destination, last_successful_stream_ordering |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if not event_ids: |
|
|
|
@ -403,7 +404,7 @@ class PerDestinationQueue: |
|
|
|
|
# of a race condition, so we check that no new events have been |
|
|
|
|
# skipped due to us being in catch-up mode |
|
|
|
|
|
|
|
|
|
if self._catchup_last_skipped > self._last_successful_stream_ordering: |
|
|
|
|
if self._catchup_last_skipped > last_successful_stream_ordering: |
|
|
|
|
# another event has been skipped because we were in catch-up mode |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
@ -470,7 +471,7 @@ class PerDestinationQueue: |
|
|
|
|
# offline |
|
|
|
|
if ( |
|
|
|
|
p.internal_metadata.stream_ordering |
|
|
|
|
< self._last_successful_stream_ordering |
|
|
|
|
< last_successful_stream_ordering |
|
|
|
|
): |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
@ -513,12 +514,11 @@ class PerDestinationQueue: |
|
|
|
|
# from the *original* PDU, rather than the PDU(s) we actually |
|
|
|
|
# send. This is because we use it to mark our position in the |
|
|
|
|
# queue of missed PDUs to process. |
|
|
|
|
self._last_successful_stream_ordering = ( |
|
|
|
|
pdu.internal_metadata.stream_ordering |
|
|
|
|
) |
|
|
|
|
last_successful_stream_ordering = pdu.internal_metadata.stream_ordering |
|
|
|
|
|
|
|
|
|
self._last_successful_stream_ordering = last_successful_stream_ordering |
|
|
|
|
await self._store.set_destination_last_successful_stream_ordering( |
|
|
|
|
self._destination, self._last_successful_stream_ordering |
|
|
|
|
self._destination, last_successful_stream_ordering |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]: |
|
|
|
|