Merge pull request #668 from matrix-org/markjh/deduplicate

Deduplicate identical /sync requests
pull/4/merge
Mark Haines 9 years ago
commit 3e8bb99a2b
  1. 12
      synapse/handlers/room.py
  2. 16
      synapse/handlers/sync.py
  3. 3
      synapse/rest/client/v2_alpha/sync.py
  4. 46
      synapse/util/caches/response_cache.py

@@ -25,6 +25,7 @@ from synapse.api.constants import (
from synapse.api.errors import AuthError, StoreError, SynapseError, Codes
from synapse.util import stringutils, unwrapFirstError
from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.caches.response_cache import ResponseCache
from signedjson.sign import verify_signed_json
from signedjson.key import decode_verify_key_bytes
@@ -939,9 +940,18 @@ class RoomMemberHandler(BaseHandler):
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache()
def get_public_room_list(self):
    """Return the public room list, deduplicating concurrent requests.

    If a computation of the room list is already in flight, returns a
    deferred observing that in-flight result instead of starting a second
    lookup; otherwise kicks off ``_get_public_room_list`` and caches the
    pending deferred under the (single, constant) key ``()``.

    Returns:
        Deferred: fires with the public room list.
    """
    # NOTE: this must NOT be decorated with @defer.inlineCallbacks —
    # it contains no `yield`, so inlineCallbacks (which requires a
    # generator function) would break at runtime. It simply returns
    # the deferred produced by the response cache.
    result = self.response_cache.get(())
    if not result:
        result = self.response_cache.set((), self._get_public_room_list())
    return result
@defer.inlineCallbacks
def _get_public_room_list(self):
room_ids = yield self.store.get_public_room_ids()
@defer.inlineCallbacks

@@ -20,6 +20,7 @@ from synapse.api.constants import Membership, EventTypes
from synapse.util import unwrapFirstError
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.metrics import Measure
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
from twisted.internet import defer
@@ -35,6 +36,7 @@ SyncConfig = collections.namedtuple("SyncConfig", [
"user",
"filter_collection",
"is_guest",
"request_key",
])
@@ -136,8 +138,8 @@ class SyncHandler(BaseHandler):
super(SyncHandler, self).__init__(hs)
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.response_cache = ResponseCache()
@defer.inlineCallbacks
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
@@ -146,7 +148,19 @@ class SyncHandler(BaseHandler):
Returns:
A Deferred SyncResult.
"""
result = self.response_cache.get(sync_config.request_key)
if not result:
result = self.response_cache.set(
sync_config.request_key,
self._wait_for_sync_for_user(
sync_config, since_token, timeout, full_state
)
)
return result
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
full_state):
context = LoggingContext.current_context()
if context:
if since_token is None:

@@ -115,6 +115,8 @@ class SyncRestServlet(RestServlet):
)
)
request_key = (user, timeout, since, filter_id, full_state)
if filter_id:
if filter_id.startswith('{'):
try:
@@ -134,6 +136,7 @@ class SyncRestServlet(RestServlet):
user=user,
filter_collection=filter,
is_guest=requester.is_guest,
request_key=request_key,
)
if since is not None:

@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.util.async import ObservableDeferred
class ResponseCache(object):
    """Cache for in-flight request responses.

    A deferred response is held here until it completes. While it is still
    pending, any retry of the same request observes the original deferred
    instead of kicking off a duplicate computation.
    """

    def __init__(self):
        # Maps request key -> ObservableDeferred for responses still in flight.
        self.pending_result_cache = {}

    def get(self, key):
        """Return a deferred observing the pending response for ``key``.

        Returns None when no response is currently being computed for
        that key.
        """
        pending = self.pending_result_cache.get(key)
        if pending is None:
            return None
        return pending.observe()

    def set(self, key, deferred):
        """Record ``deferred`` as the in-flight response for ``key``.

        The entry is dropped as soon as the underlying deferred completes
        (whether with a result or a failure), so only genuinely concurrent
        requests share it.

        Returns:
            A deferred that fires with the response.
        """
        observable = ObservableDeferred(deferred)
        self.pending_result_cache[key] = observable

        def on_complete(result):
            # Whatever the outcome, the response is no longer pending.
            self.pending_result_cache.pop(key, None)
            return result

        observable.addBoth(on_complete)
        return observable.observe()
Loading…
Cancel
Save