@ -257,6 +257,11 @@ class ReplicationCommandHandler:
if hs . config . redis . redis_enabled :
self . _notifier . add_lock_released_callback ( self . on_lock_released )
# Marks if we should send POSITION commands for all streams ASAP. This
# is checked by the `ReplicationStreamer` which manages sending
# RDATA/POSITION commands
self . _should_announce_positions = True
def subscribe_to_channel ( self , channel_name : str ) - > None :
"""
Indicates that we wish to subscribe to a Redis channel by name .
@ -397,29 +402,23 @@ class ReplicationCommandHandler:
return self . _streams_to_replicate
def on_REPLICATE ( self , conn : IReplicationConnection , cmd : ReplicateCommand ) - > None :
self . send_positions_to_connection ( conn )
self . send_positions_to_connection ( )
def send_positions_to_connection ( self , conn : IReplicationConnection ) - > None :
def send_positions_to_connection ( self ) - > None :
""" Send current position of all streams this process is source of to
the connection .
"""
# We respond with current position of all streams this instance
# replicates.
for stream in self . get_streams_to_replicate ( ) :
# Note that we use the current token as the prev token here (rather
# than stream.last_token), as we can't be sure that there have been
# no rows written between last token and the current token (since we
# might be racing with the replication sending bg process).
current_token = stream . current_token ( self . _instance_name )
self . send_command (
PositionCommand (
stream . NAME ,
self . _instance_name ,
current_token ,
current_token ,
)
)
self . _should_announce_positions = True
self . _notifier . notify_replication ( )
def should_announce_positions ( self ) - > bool :
""" Check if we should send POSITION commands for all streams ASAP. """
return self . _should_announce_positions
def will_announce_positions ( self ) - > None :
""" Mark that we ' re about to send POSITIONs out for all streams. """
self . _should_announce_positions = False
def on_USER_SYNC (
self , conn : IReplicationConnection , cmd : UserSyncCommand
@ -653,8 +652,9 @@ class ReplicationCommandHandler:
# for why this can happen.
logger . info (
" Fetching replication rows for ' %s ' between %i and %i " ,
" Fetching replication rows for ' %s ' / %s between %i and %i " ,
stream_name ,
cmd . instance_name ,
current_token ,
cmd . new_token ,
)