|
|
|
@ -116,7 +116,16 @@ class HttpPusher(object): |
|
|
|
|
return |
|
|
|
|
try: |
|
|
|
|
self.processing = True |
|
|
|
|
yield self._unsafe_process() |
|
|
|
|
# if the max ordering changes while we're running _unsafe_process, |
|
|
|
|
# call it again, and so on until we've caught up. |
|
|
|
|
while True: |
|
|
|
|
starting_max_ordering = self.max_stream_ordering |
|
|
|
|
try: |
|
|
|
|
yield self._unsafe_process() |
|
|
|
|
except: |
|
|
|
|
logger.exception("Exception processing notifs") |
|
|
|
|
if self.max_stream_ordering == starting_max_ordering: |
|
|
|
|
break |
|
|
|
|
finally: |
|
|
|
|
self.processing = False |
|
|
|
|
|
|
|
|
@ -127,7 +136,7 @@ class HttpPusher(object): |
|
|
|
|
Never call this directly: use _process which will only allow this to |
|
|
|
|
run once per pusher. |
|
|
|
|
""" |
|
|
|
|
starting_max_ordering = self.max_stream_ordering |
|
|
|
|
|
|
|
|
|
unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( |
|
|
|
|
self.user_id, self.last_stream_ordering, self.max_stream_ordering |
|
|
|
|
) |
|
|
|
@ -188,8 +197,6 @@ class HttpPusher(object): |
|
|
|
|
self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer) |
|
|
|
|
self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC) |
|
|
|
|
break |
|
|
|
|
if self.max_stream_ordering != starting_max_ordering: |
|
|
|
|
self._unsafe_process() |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def _process_one(self, push_action): |
|
|
|
|