|
|
|
@ -24,8 +24,8 @@ from synapse.replication.http.streams import ReplicationGetStreamUpdates |
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MAX_EVENTS_BEHIND = 500000 |
|
|
|
|
# the number of rows to request from an update_function. |
|
|
|
|
_STREAM_UPDATE_TARGET_ROW_COUNT = 100 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Some type aliases to make things a bit easier. |
|
|
|
@ -56,7 +56,11 @@ StreamUpdateResult = Tuple[List[Tuple[Token, StreamRow]], Token, bool] |
|
|
|
|
# * from_token: the previous stream token: the starting point for fetching the |
|
|
|
|
# updates |
|
|
|
|
# * to_token: the new stream token: the point to get updates up to |
|
|
|
|
# * limit: the maximum number of rows to return |
|
|
|
|
# * target_row_count: a target for the number of rows to be returned. |
|
|
|
|
# |
|
|
|
|
# The update_function is expected to return up to _approximately_ target_row_count rows. |
|
|
|
|
# If there are more updates available, it should set `limited` in the result, and |
|
|
|
|
# it will be called again to get the next batch. |
|
|
|
|
# |
|
|
|
|
UpdateFunction = Callable[[Token, Token, int], Awaitable[StreamUpdateResult]] |
|
|
|
|
|
|
|
|
@ -138,7 +142,7 @@ class Stream(object): |
|
|
|
|
return updates, current_token, limited |
|
|
|
|
|
|
|
|
|
async def get_updates_since( |
|
|
|
|
self, from_token: Token, upto_token: Token, limit: int = 100 |
|
|
|
|
self, from_token: Token, upto_token: Token |
|
|
|
|
) -> StreamUpdateResult: |
|
|
|
|
"""Like get_updates except allows specifying from when we should |
|
|
|
|
stream updates |
|
|
|
@ -156,7 +160,7 @@ class Stream(object): |
|
|
|
|
return [], upto_token, False |
|
|
|
|
|
|
|
|
|
updates, upto_token, limited = await self.update_function( |
|
|
|
|
from_token, upto_token, limit, |
|
|
|
|
from_token, upto_token, _STREAM_UPDATE_TARGET_ROW_COUNT, |
|
|
|
|
) |
|
|
|
|
return updates, upto_token, limited |
|
|
|
|
|
|
|
|
|