Convert streams to async. (#8014)

code_spécifique_watcha
Patrick Cloke 4 years ago committed by GitHub
parent 916cf2d439
commit e19de43eb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      changelog.d/8014.misc
  2. 4
      synapse/handlers/initial_sync.py
  3. 2
      synapse/handlers/pagination.py
  4. 10
      synapse/handlers/room.py
  5. 2
      synapse/handlers/search.py
  6. 2
      synapse/handlers/sync.py
  7. 4
      synapse/notifier.py
  8. 8
      synapse/storage/data_stores/main/stream.py
  9. 22
      synapse/streams/events.py
  10. 2
      tests/server_notices/test_resource_limits_server_notices.py

@ -0,0 +1 @@
Convert various parts of the codebase to async/await.

@ -109,7 +109,7 @@ class InitialSyncHandler(BaseHandler):
rooms_ret = [] rooms_ret = []
now_token = await self.hs.get_event_sources().get_current_token() now_token = self.hs.get_event_sources().get_current_token()
presence_stream = self.hs.get_event_sources().sources["presence"] presence_stream = self.hs.get_event_sources().sources["presence"]
pagination_config = PaginationConfig(from_token=now_token) pagination_config = PaginationConfig(from_token=now_token)
@ -360,7 +360,7 @@ class InitialSyncHandler(BaseHandler):
current_state.values(), time_now current_state.values(), time_now
) )
now_token = await self.hs.get_event_sources().get_current_token() now_token = self.hs.get_event_sources().get_current_token()
limit = pagin_config.limit if pagin_config else None limit = pagin_config.limit if pagin_config else None
if limit is None: if limit is None:

@ -309,7 +309,7 @@ class PaginationHandler(object):
room_token = pagin_config.from_token.room_key room_token = pagin_config.from_token.room_key
else: else:
pagin_config.from_token = ( pagin_config.from_token = (
await self.hs.get_event_sources().get_current_token_for_pagination() self.hs.get_event_sources().get_current_token_for_pagination()
) )
room_token = pagin_config.from_token.room_key room_token = pagin_config.from_token.room_key

@ -22,7 +22,7 @@ import logging
import math import math
import string import string
from collections import OrderedDict from collections import OrderedDict
from typing import Optional, Tuple from typing import Awaitable, Optional, Tuple
from synapse.api.constants import ( from synapse.api.constants import (
EventTypes, EventTypes,
@ -1041,7 +1041,7 @@ class RoomEventSource(object):
): ):
# We just ignore the key for now. # We just ignore the key for now.
to_key = await self.get_current_key() to_key = self.get_current_key()
from_token = RoomStreamToken.parse(from_key) from_token = RoomStreamToken.parse(from_key)
if from_token.topological: if from_token.topological:
@ -1081,10 +1081,10 @@ class RoomEventSource(object):
return (events, end_key) return (events, end_key)
def get_current_key(self): def get_current_key(self) -> str:
return self.store.get_room_events_max_id() return "s%d" % (self.store.get_room_max_stream_ordering(),)
def get_current_key_for_room(self, room_id): def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
return self.store.get_room_events_max_id(room_id) return self.store.get_room_events_max_id(room_id)

@ -340,7 +340,7 @@ class SearchHandler(BaseHandler):
# If client has asked for "context" for each event (i.e. some surrounding # If client has asked for "context" for each event (i.e. some surrounding
# events and state), fetch that # events and state), fetch that
if event_context is not None: if event_context is not None:
now_token = await self.hs.get_event_sources().get_current_token() now_token = self.hs.get_event_sources().get_current_token()
contexts = {} contexts = {}
for event in allowed_events: for event in allowed_events:

@ -961,7 +961,7 @@ class SyncHandler(object):
# this is due to some of the underlying streams not supporting the ability # this is due to some of the underlying streams not supporting the ability
# to query up to a given point. # to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder` # Always use the `now_token` in `SyncResultBuilder`
now_token = await self.event_sources.get_current_token() now_token = self.event_sources.get_current_token()
logger.debug( logger.debug(
"Calculating sync response for %r between %s and %s", "Calculating sync response for %r between %s and %s",

@ -320,7 +320,7 @@ class Notifier(object):
""" """
user_stream = self.user_to_user_stream.get(user_id) user_stream = self.user_to_user_stream.get(user_id)
if user_stream is None: if user_stream is None:
current_token = await self.event_sources.get_current_token() current_token = self.event_sources.get_current_token()
if room_ids is None: if room_ids is None:
room_ids = await self.store.get_rooms_for_user(user_id) room_ids = await self.store.get_rooms_for_user(user_id)
user_stream = _NotifierUserStream( user_stream = _NotifierUserStream(
@ -397,7 +397,7 @@ class Notifier(object):
""" """
from_token = pagination_config.from_token from_token = pagination_config.from_token
if not from_token: if not from_token:
from_token = await self.event_sources.get_current_token() from_token = self.event_sources.get_current_token()
limit = pagination_config.limit limit = pagination_config.limit

@ -39,6 +39,7 @@ what sort order was used:
import abc import abc
import logging import logging
from collections import namedtuple from collections import namedtuple
from typing import Optional
from twisted.internet import defer from twisted.internet import defer
@ -557,19 +558,18 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return self.db.runInteraction("get_room_event_before_stream_ordering", _f) return self.db.runInteraction("get_room_event_before_stream_ordering", _f)
@defer.inlineCallbacks async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str:
def get_room_events_max_id(self, room_id=None):
"""Returns the current token for rooms stream. """Returns the current token for rooms stream.
By default, it returns the current global stream token. Specifying a By default, it returns the current global stream token. Specifying a
`room_id` causes it to return the current room specific topological `room_id` causes it to return the current room specific topological
token. token.
""" """
token = yield self.get_room_max_stream_ordering() token = self.get_room_max_stream_ordering()
if room_id is None: if room_id is None:
return "s%d" % (token,) return "s%d" % (token,)
else: else:
topo = yield self.db.runInteraction( topo = await self.db.runInteraction(
"_get_max_topological_txn", self._get_max_topological_txn, room_id "_get_max_topological_txn", self._get_max_topological_txn, room_id
) )
return "t%d-%d" % (topo, token) return "t%d-%d" % (topo, token)

@ -15,8 +15,6 @@
from typing import Any, Dict from typing import Any, Dict
from twisted.internet import defer
from synapse.handlers.account_data import AccountDataEventSource from synapse.handlers.account_data import AccountDataEventSource
from synapse.handlers.presence import PresenceEventSource from synapse.handlers.presence import PresenceEventSource
from synapse.handlers.receipts import ReceiptEventSource from synapse.handlers.receipts import ReceiptEventSource
@ -40,19 +38,18 @@ class EventSources(object):
} # type: Dict[str, Any] } # type: Dict[str, Any]
self.store = hs.get_datastore() self.store = hs.get_datastore()
@defer.inlineCallbacks def get_current_token(self) -> StreamToken:
def get_current_token(self):
push_rules_key, _ = self.store.get_push_rules_stream_token() push_rules_key, _ = self.store.get_push_rules_stream_token()
to_device_key = self.store.get_to_device_stream_token() to_device_key = self.store.get_to_device_stream_token()
device_list_key = self.store.get_device_stream_token() device_list_key = self.store.get_device_stream_token()
groups_key = self.store.get_group_stream_token() groups_key = self.store.get_group_stream_token()
token = StreamToken( token = StreamToken(
room_key=(yield self.sources["room"].get_current_key()), room_key=self.sources["room"].get_current_key(),
presence_key=(yield self.sources["presence"].get_current_key()), presence_key=self.sources["presence"].get_current_key(),
typing_key=(yield self.sources["typing"].get_current_key()), typing_key=self.sources["typing"].get_current_key(),
receipt_key=(yield self.sources["receipt"].get_current_key()), receipt_key=self.sources["receipt"].get_current_key(),
account_data_key=(yield self.sources["account_data"].get_current_key()), account_data_key=self.sources["account_data"].get_current_key(),
push_rules_key=push_rules_key, push_rules_key=push_rules_key,
to_device_key=to_device_key, to_device_key=to_device_key,
device_list_key=device_list_key, device_list_key=device_list_key,
@ -60,8 +57,7 @@ class EventSources(object):
) )
return token return token
@defer.inlineCallbacks def get_current_token_for_pagination(self) -> StreamToken:
def get_current_token_for_pagination(self):
"""Get the current token for a given room to be used to paginate """Get the current token for a given room to be used to paginate
events. events.
@ -69,10 +65,10 @@ class EventSources(object):
than `room`, since they are not used during pagination. than `room`, since they are not used during pagination.
Returns: Returns:
Deferred[StreamToken] The current token for pagination.
""" """
token = StreamToken( token = StreamToken(
room_key=(yield self.sources["room"].get_current_key()), room_key=self.sources["room"].get_current_key(),
presence_key=0, presence_key=0,
typing_key=0, typing_key=0,
receipt_key=0, receipt_key=0,

@ -275,7 +275,7 @@ class TestResourceLimitsServerNoticesWithRealRooms(unittest.HomeserverTestCase):
self.server_notices_manager.get_or_create_notice_room_for_user(self.user_id) self.server_notices_manager.get_or_create_notice_room_for_user(self.user_id)
) )
token = self.get_success(self.event_source.get_current_token()) token = self.event_source.get_current_token()
events, _ = self.get_success( events, _ = self.get_success(
self.store.get_recent_events_for_room( self.store.get_recent_events_for_room(
room_id, limit=100, end_token=token.room_key room_id, limit=100, end_token=token.room_key

Loading…
Cancel
Save