|
|
|
@ -131,6 +131,11 @@ class PusherSlaveStore( |
|
|
|
|
DataStore.get_profile_displayname.__func__ |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
# XXX: This is a bit broken because we don't persist forgotten rooms |
|
|
|
|
# in a way that they can be streamed. This means that we don't have a |
|
|
|
|
# way to invalidate the forgotten rooms cache correctly. |
|
|
|
|
# For now we expire the cache every 10 minutes. |
|
|
|
|
BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000 |
|
|
|
|
who_forgot_in_room = ( |
|
|
|
|
RoomMemberStore.__dict__["who_forgot_in_room"] |
|
|
|
|
) |
|
|
|
@ -214,6 +219,7 @@ class PusherServer(HomeServer): |
|
|
|
|
store = self.get_datastore() |
|
|
|
|
replication_url = self.config.replication_url |
|
|
|
|
pusher_pool = self.get_pusherpool() |
|
|
|
|
clock = self.get_clock() |
|
|
|
|
|
|
|
|
|
def stop_pusher(user_id, app_id, pushkey): |
|
|
|
|
key = "%s:%s" % (app_id, pushkey) |
|
|
|
@ -265,11 +271,21 @@ class PusherServer(HomeServer): |
|
|
|
|
min_stream_id, max_stream_id, affected_room_ids |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def expire_broken_caches(): |
|
|
|
|
store.who_forgot_in_room.invalidate_all() |
|
|
|
|
|
|
|
|
|
next_expire_broken_caches_ms = 0 |
|
|
|
|
while True: |
|
|
|
|
try: |
|
|
|
|
args = store.stream_positions() |
|
|
|
|
args["timeout"] = 30000 |
|
|
|
|
result = yield http_client.get_json(replication_url, args=args) |
|
|
|
|
now_ms = clock.time_msec() |
|
|
|
|
if now_ms > next_expire_broken_caches_ms: |
|
|
|
|
expire_broken_caches() |
|
|
|
|
next_expire_broken_caches_ms = ( |
|
|
|
|
now_ms + store.BROKEN_CACHE_EXPIRY_MS |
|
|
|
|
) |
|
|
|
|
yield store.process_replication(result) |
|
|
|
|
poke_pushers(result) |
|
|
|
|
except: |
|
|
|
|