|
|
|
@@ -46,8 +46,7 @@ class NotRetryingDestination(Exception):
 
 
 @defer.inlineCallbacks
-def get_retry_limiter(destination, clock, store, ignore_backoff=False,
-                      **kwargs):
+def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs):
     """For a given destination check if we have previously failed to
     send a request there and are waiting before retrying the destination.
     If we are not ready to retry the destination, this will raise a
@@ -60,8 +59,7 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False,
         clock (synapse.util.clock): timing source
         store (synapse.storage.transactions.TransactionStore): datastore
         ignore_backoff (bool): true to ignore the historical backoff data and
-            try the request anyway. We will still update the next
-            retry_interval on success/failure.
+            try the request anyway. We will still reset the retry_interval on success.
 
     Example usage:
 
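The "Example usage:" block referenced above falls outside this hunk. As a hedged sketch of the calling pattern the docstring describes (assuming this file is synapse/util/retryutils.py; `send_to` and `do_request` are placeholders, not part of this change):

```python
from twisted.internet import defer

from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter


@defer.inlineCallbacks
def send_to(destination, clock, store, do_request):
    try:
        # Raises NotRetryingDestination while we are still inside the
        # destination's backoff window (unless ignore_backoff=True is passed).
        limiter = yield get_retry_limiter(destination, clock, store)
        # Leaving the `with` block records success or failure and updates the
        # stored retry_interval for the destination accordingly.
        with limiter:
            response = yield do_request(destination)
        defer.returnValue(response)
    except NotRetryingDestination:
        # Not ready to retry this destination yet; let the caller decide.
        raise
```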
@@ -75,13 +73,12 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False,
     """
     retry_last_ts, retry_interval = (0, 0)
 
-    retry_timings = yield store.get_destination_retry_timings(
-        destination
-    )
+    retry_timings = yield store.get_destination_retry_timings(destination)
 
     if retry_timings:
         retry_last_ts, retry_interval = (
-            retry_timings["retry_last_ts"], retry_timings["retry_interval"]
+            retry_timings["retry_last_ts"],
+            retry_timings["retry_interval"],
         )
 
         now = int(clock.time_msec())
@@ -93,22 +90,31 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False,
                 destination=destination,
             )
 
+    # if we are ignoring the backoff data, we should also not increment the backoff
+    # when we get another failure - otherwise a server can very quickly reach the
+    # maximum backoff even though it might only have been down briefly
+    backoff_on_failure = not ignore_backoff
+
     defer.returnValue(
         RetryDestinationLimiter(
-            destination,
-            clock,
-            store,
-            retry_interval,
-            **kwargs
+            destination, clock, store, retry_interval, backoff_on_failure, **kwargs
         )
     )
 
 
 class RetryDestinationLimiter(object):
-    def __init__(self, destination, clock, store, retry_interval,
-                 min_retry_interval=10 * 60 * 1000,
-                 max_retry_interval=24 * 60 * 60 * 1000,
-                 multiplier_retry_interval=5, backoff_on_404=False):
+    def __init__(
+        self,
+        destination,
+        clock,
+        store,
+        retry_interval,
+        min_retry_interval=10 * 60 * 1000,
+        max_retry_interval=24 * 60 * 60 * 1000,
+        multiplier_retry_interval=5,
+        backoff_on_404=False,
+        backoff_on_failure=True,
+    ):
         """Marks the destination as "down" if an exception is thrown in the
         context, except for CodeMessageException with code < 500.
 
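To make the new comment concrete: with the defaults documented further down (10 minute minimum, 24 hour maximum, multiplier of 5), only a handful of consecutive failures reaches the cap. A rough illustration, ignoring the random jitter the real code applies:

```python
# Back-of-the-envelope: how quickly the default backoff ratchets (jitter ignored).
MIN_RETRY_INTERVAL = 10 * 60 * 1000       # 10 minutes, in ms
MAX_RETRY_INTERVAL = 24 * 60 * 60 * 1000  # 24 hours, in ms
MULTIPLIER = 5

interval = 0
for failure in range(1, 6):
    interval = MIN_RETRY_INTERVAL if not interval else interval * MULTIPLIER
    interval = min(interval, MAX_RETRY_INTERVAL)
    print("failure %d: %d minutes" % (failure, interval // 60000))
# failure 1: 10, 2: 50, 3: 250 (~4 h), 4: 1250 (~21 h), 5: capped at 1440 (24 h)
```

Skipping the increment for `ignore_backoff` callers keeps a brief outage from being recorded as a day-long one.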
@@ -128,6 +134,9 @@ class RetryDestinationLimiter(object):
             multiplier_retry_interval (int): The multiplier to use to increase
                 the retry interval after a failed request.
             backoff_on_404 (bool): Back off if we get a 404
+
+            backoff_on_failure (bool): set to False if we should not increase the
+                retry interval on a failure.
         """
         self.clock = clock
         self.store = store
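Because `get_retry_limiter` forwards its extra keyword arguments into this constructor, per-call policy such as `backoff_on_404` can be set at the call site without touching this class. A hedged sketch (the wrapper function and `do_request` are hypothetical; imports as in the earlier sketch):

```python
@defer.inlineCallbacks
def fetch_remote_resource(destination, clock, store, do_request):
    # backoff_on_404=True travels through get_retry_limiter's **kwargs into
    # RetryDestinationLimiter, so a 404 from this destination counts as a failure.
    limiter = yield get_retry_limiter(destination, clock, store, backoff_on_404=True)
    with limiter:
        result = yield do_request(destination)
    defer.returnValue(result)
```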
@@ -138,6 +147,7 @@ class RetryDestinationLimiter(object):
         self.max_retry_interval = max_retry_interval
         self.multiplier_retry_interval = multiplier_retry_interval
         self.backoff_on_404 = backoff_on_404
+        self.backoff_on_failure = backoff_on_failure
 
     def __enter__(self):
         pass
@@ -173,10 +183,13 @@ class RetryDestinationLimiter(object):
             if not self.retry_interval:
                 return
 
-            logger.debug("Connection to %s was successful; clearing backoff",
-                         self.destination)
+            logger.debug(
+                "Connection to %s was successful; clearing backoff", self.destination
+            )
             retry_last_ts = 0
             self.retry_interval = 0
+        elif not self.backoff_on_failure:
+            return
         else:
             # We couldn't connect.
             if self.retry_interval:
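A minimal sketch of the new early-return path (illustrative only, not a test from this change; assumes the module is synapse.util.retryutils and uses stub clock/store objects): with `backoff_on_failure=False`, a failure inside the context leaves both the in-memory interval and the stored timings untouched.

```python
from unittest import mock

from synapse.util.retryutils import RetryDestinationLimiter

store = mock.Mock()
limiter = RetryDestinationLimiter(
    "remote.example.com",
    mock.Mock(),  # clock; unused on this code path
    store,
    retry_interval=0,
    backoff_on_failure=False,
)

try:
    with limiter:
        raise RuntimeError("connection failed")
except RuntimeError:
    pass

# The early return in __exit__ skips both the interval bump and the background
# call to store.set_destination_retry_timings.
assert limiter.retry_interval == 0
assert not store.set_destination_retry_timings.called
```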
@@ -190,7 +203,10 @@ class RetryDestinationLimiter(object):
 
             logger.info(
                 "Connection to %s was unsuccessful (%s(%s)); backoff now %i",
-                self.destination, exc_type, exc_val, self.retry_interval
+                self.destination,
+                exc_type,
+                exc_val,
+                self.retry_interval,
             )
             retry_last_ts = int(self.clock.time_msec())
 
@@ -201,9 +217,7 @@ class RetryDestinationLimiter(object):
                     self.destination, retry_last_ts, self.retry_interval
                 )
             except Exception:
-                logger.exception(
-                    "Failed to store destination_retry_timings",
-                )
+                logger.exception("Failed to store destination_retry_timings")
 
         # we deliberately do this in the background.
         synapse.util.logcontext.run_in_background(store_retry_timings)