|
|
|
@ -22,6 +22,8 @@ from synapse.util.async import run_on_reactor, ObservableDeferred |
|
|
|
|
from synapse.types import StreamToken |
|
|
|
|
import synapse.metrics |
|
|
|
|
|
|
|
|
|
from collections import namedtuple |
|
|
|
|
|
|
|
|
|
import logging |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -118,6 +120,11 @@ class _NotifierUserStream(object): |
|
|
|
|
return _NotificationListener(self.notify_deferred.observe()) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))): |
|
|
|
|
def __nonzero__(self): |
|
|
|
|
return bool(self.events) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Notifier(object): |
|
|
|
|
""" This class is responsible for notifying any listeners when there are |
|
|
|
|
new events available for it. |
|
|
|
@ -356,7 +363,7 @@ class Notifier(object): |
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def check_for_updates(before_token, after_token): |
|
|
|
|
if not after_token.is_after(before_token): |
|
|
|
|
defer.returnValue(None) |
|
|
|
|
defer.returnValue(EventStreamResult([], (before_token, before_token))) |
|
|
|
|
|
|
|
|
|
events = [] |
|
|
|
|
end_token = from_token |
|
|
|
@ -369,10 +376,14 @@ class Notifier(object): |
|
|
|
|
continue |
|
|
|
|
if only_keys and name not in only_keys: |
|
|
|
|
continue |
|
|
|
|
if limit: |
|
|
|
|
new_limit = max(limit * 2, 10) |
|
|
|
|
else: |
|
|
|
|
new_limit = 10 |
|
|
|
|
new_events, new_key = yield source.get_new_events( |
|
|
|
|
user=user, |
|
|
|
|
from_key=getattr(from_token, keyname), |
|
|
|
|
limit=limit, |
|
|
|
|
limit=new_limit, |
|
|
|
|
is_guest=is_peeking, |
|
|
|
|
room_ids=room_ids, |
|
|
|
|
) |
|
|
|
@ -388,10 +399,7 @@ class Notifier(object): |
|
|
|
|
events.extend(new_events) |
|
|
|
|
end_token = end_token.copy_and_replace(keyname, new_key) |
|
|
|
|
|
|
|
|
|
if events: |
|
|
|
|
defer.returnValue((events, (from_token, end_token))) |
|
|
|
|
else: |
|
|
|
|
defer.returnValue(None) |
|
|
|
|
defer.returnValue(EventStreamResult(events, (from_token, end_token))) |
|
|
|
|
|
|
|
|
|
user_id_for_stream = user.to_string() |
|
|
|
|
if is_peeking: |
|
|
|
@ -415,9 +423,6 @@ class Notifier(object): |
|
|
|
|
from_token=from_token, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if result is None: |
|
|
|
|
result = ([], (from_token, from_token)) |
|
|
|
|
|
|
|
|
|
defer.returnValue(result) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|