|
|
|
@ -14,7 +14,9 @@ |
|
|
|
|
"""A replication client for use by synapse workers. |
|
|
|
|
""" |
|
|
|
|
import logging |
|
|
|
|
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple |
|
|
|
|
from typing import TYPE_CHECKING, Dict, Iterable, Optional, Set, Tuple |
|
|
|
|
|
|
|
|
|
from sortedcontainers import SortedList |
|
|
|
|
|
|
|
|
|
from twisted.internet import defer |
|
|
|
|
from twisted.internet.defer import Deferred |
|
|
|
@ -86,7 +88,9 @@ class ReplicationDataHandler: |
|
|
|
|
|
|
|
|
|
# Map from stream and instance to list of deferreds waiting for the stream to |
|
|
|
|
# arrive at a particular position. The lists are sorted by stream position. |
|
|
|
|
self._streams_to_waiters: Dict[Tuple[str, str], List[Tuple[int, Deferred]]] = {} |
|
|
|
|
self._streams_to_waiters: Dict[ |
|
|
|
|
Tuple[str, str], SortedList[Tuple[int, Deferred]] |
|
|
|
|
] = {} |
|
|
|
|
|
|
|
|
|
async def on_rdata( |
|
|
|
|
self, stream_name: str, instance_name: str, token: int, rows: list |
|
|
|
@ -238,7 +242,9 @@ class ReplicationDataHandler: |
|
|
|
|
# Notify any waiting deferreds. The list is ordered by position so we |
|
|
|
|
# just iterate through the list until we reach a position that is |
|
|
|
|
# greater than the received row position. |
|
|
|
|
waiting_list = self._streams_to_waiters.get((stream_name, instance_name), []) |
|
|
|
|
waiting_list = self._streams_to_waiters.get((stream_name, instance_name)) |
|
|
|
|
if not waiting_list: |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
# Index of first item with a position after the current token, i.e we |
|
|
|
|
# have called all deferreds before this index. If not overwritten by |
|
|
|
@ -262,7 +268,7 @@ class ReplicationDataHandler: |
|
|
|
|
|
|
|
|
|
# Drop all entries in the waiting list that were called in the above |
|
|
|
|
# loop. (This maintains the order so no need to resort) |
|
|
|
|
waiting_list[:] = waiting_list[index_of_first_deferred_not_called:] |
|
|
|
|
del waiting_list[:index_of_first_deferred_not_called] |
|
|
|
|
|
|
|
|
|
for deferred in deferreds_to_callback: |
|
|
|
|
try: |
|
|
|
@ -322,11 +328,10 @@ class ReplicationDataHandler: |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
waiting_list = self._streams_to_waiters.setdefault( |
|
|
|
|
(stream_name, instance_name), [] |
|
|
|
|
(stream_name, instance_name), SortedList(key=lambda t: t[0]) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
waiting_list.append((position, deferred)) |
|
|
|
|
waiting_list.sort(key=lambda t: t[0]) |
|
|
|
|
waiting_list.add((position, deferred)) |
|
|
|
|
|
|
|
|
|
# We measure here to get in flight counts and average waiting time. |
|
|
|
|
with Measure(self._clock, "repl.wait_for_stream_position"): |
|
|
|
|