|
|
|
@ -685,6 +685,7 @@ class _TransactionQueue(object): |
|
|
|
|
self.transport_layer = transport_layer |
|
|
|
|
|
|
|
|
|
self._clock = hs.get_clock() |
|
|
|
|
self.store = hs.get_datastore() |
|
|
|
|
|
|
|
|
|
# Is a mapping from destinations -> deferreds. Used to keep track |
|
|
|
|
# of which destinations have transactions in flight and when they are |
|
|
|
@ -775,11 +776,18 @@ class _TransactionQueue(object): |
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
@log_function |
|
|
|
|
def _attempt_new_transaction(self, destination): |
|
|
|
|
|
|
|
|
|
(retry_last_ts, retry_interval) = self.store.get_destination_retry_timings(destination) |
|
|
|
|
if retry_last_ts + retry_interval > int(self._clock.time_msec()): |
|
|
|
|
logger.info("TX [%s] not ready for retry yet - dropping transaction for now") |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
(retry_last_ts, retry_interval) = (0, 0) |
|
|
|
|
retry_timings = yield self.store.get_destination_retry_timings(destination) |
|
|
|
|
if retry_timings: |
|
|
|
|
(retry_last_ts, retry_interval) = ( |
|
|
|
|
retry_timings.retry_last_ts, retry_timings.retry_interval |
|
|
|
|
) |
|
|
|
|
if retry_last_ts + retry_interval > int(self._clock.time_msec()): |
|
|
|
|
logger.info("TX [%s] not ready for retry yet - dropping transaction for now", destination) |
|
|
|
|
return |
|
|
|
|
else: |
|
|
|
|
logger.info("TX [%s] is ready for retry", destination) |
|
|
|
|
|
|
|
|
|
if destination in self.pending_transactions: |
|
|
|
|
# XXX: pending_transactions can get stuck on by a never-ending request |
|
|
|
@ -866,7 +874,7 @@ class _TransactionQueue(object): |
|
|
|
|
if code == 200: |
|
|
|
|
if retry_last_ts: |
|
|
|
|
# this host is alive! reset retry schedule |
|
|
|
|
self.store.set_destination_retry_timings(destination, 0, 0) |
|
|
|
|
yield self.store.set_destination_retry_timings(destination, 0, 0) |
|
|
|
|
deferred.callback(None) |
|
|
|
|
else: |
|
|
|
|
self.start_retrying(destination, retry_interval) |
|
|
|
@ -899,12 +907,13 @@ class _TransactionQueue(object): |
|
|
|
|
# Check to see if there is anything else to send. |
|
|
|
|
self._attempt_new_transaction(destination) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def start_retrying(self, destination, retry_interval): |
|
|
|
|
# track that this destination is having problems and we should |
|
|
|
|
# give it a chance to recover before trying it again |
|
|
|
|
if retry_interval: |
|
|
|
|
retry_interval *= 2 |
|
|
|
|
else: |
|
|
|
|
retry_interval = 2 # try again at first after 2 seconds |
|
|
|
|
self.store.set_destination_retry_timings(destination, |
|
|
|
|
retry_interval = 2000 # try again at first after 2 seconds |
|
|
|
|
yield self.store.set_destination_retry_timings(destination, |
|
|
|
|
int(self._clock.time_msec()), retry_interval) |
|
|
|
|