Allow moving typing off master (#7869)

code_spécifique_watcha
Erik Johnston 4 years ago committed by GitHub
parent 649a7ead5c
commit f2e38ca867
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      changelog.d/7869.feature
  2. 36
      synapse/app/generic_worker.py
  3. 19
      synapse/config/workers.py
  4. 125
      synapse/federation/federation_server.py
  5. 241
      synapse/handlers/typing.py
  6. 9
      synapse/replication/tcp/handler.py
  7. 7
      synapse/replication/tcp/streams/_base.py
  8. 9
      synapse/rest/client/v1/room.py
  9. 13
      synapse/server.py
  10. 2
      synapse/server.pyi

@ -0,0 +1 @@
Add experimental support for moving typing off master.

@ -111,6 +111,7 @@ from synapse.rest.client.v1.room import (
RoomSendEventRestServlet,
RoomStateEventRestServlet,
RoomStateRestServlet,
RoomTypingRestServlet,
)
from synapse.rest.client.v1.voip import VoipRestServlet
from synapse.rest.client.v2_alpha import groups, sync, user_directory
@ -451,37 +452,6 @@ class GenericWorkerPresence(BasePresenceHandler):
await self._bump_active_client(user_id=user_id)
class GenericWorkerTyping(object):
def __init__(self, hs):
self._latest_room_serial = 0
self._reset()
def _reset(self):
"""
Reset the typing handler's data caches.
"""
# map room IDs to serial numbers
self._room_serials = {}
# map room IDs to sets of users currently typing
self._room_typing = {}
def process_replication_rows(self, token, rows):
if self._latest_room_serial > token:
# The master has gone backwards. To prevent inconsistent data, just
# clear everything.
self._reset()
# Set the latest serial token to whatever the server gave us.
self._latest_room_serial = token
for row in rows:
self._room_serials[row.room_id] = token
self._room_typing[row.room_id] = row.user_ids
def get_current_token(self) -> int:
return self._latest_room_serial
class GenericWorkerSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
@ -558,6 +528,7 @@ class GenericWorkerServer(HomeServer):
KeyUploadServlet(self).register(resource)
AccountDataServlet(self).register(resource)
RoomAccountDataServlet(self).register(resource)
RoomTypingRestServlet(self).register(resource)
sync.register_servlets(self, resource)
events.register_servlets(self, resource)
@ -669,9 +640,6 @@ class GenericWorkerServer(HomeServer):
def build_presence_handler(self):
return GenericWorkerPresence(self)
def build_typing_handler(self):
return GenericWorkerTyping(self)
class GenericWorkerReplicationHandler(ReplicationDataHandler):
def __init__(self, hs):

@ -34,9 +34,11 @@ class WriterLocations:
Attributes:
events: The instance that writes to the event and backfill streams.
events: The instance that writes to the typing stream.
"""
events = attr.ib(default="master", type=str)
typing = attr.ib(default="master", type=str)
class WorkerConfig(Config):
@ -93,16 +95,15 @@ class WorkerConfig(Config):
writers = config.get("stream_writers") or {}
self.writers = WriterLocations(**writers)
# Check that the configured writer for events also appears in
# Check that the configured writer for events and typing also appears in
# `instance_map`.
if (
self.writers.events != "master"
and self.writers.events not in self.instance_map
):
raise ConfigError(
"Instance %r is configured to write events but does not appear in `instance_map` config."
% (self.writers.events,)
)
for stream in ("events", "typing"):
instance = getattr(self.writers, stream)
if instance != "master" and instance not in self.instance_map:
raise ConfigError(
"Instance %r is configured to write %s but does not appear in `instance_map` config."
% (instance, stream)
)
def read_arguments(self, args):
# We support a bunch of command line arguments that override options in

@ -15,7 +15,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
List,
Match,
Optional,
Tuple,
Union,
)
from canonicaljson import json
from prometheus_client import Counter, Histogram
@ -56,6 +67,9 @@ from synapse.util import glob_to_regex, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
if TYPE_CHECKING:
from synapse.server import HomeServer
# when processing incoming transactions, we try to handle multiple rooms in
# parallel, up to this limit.
TRANSACTION_CONCURRENCY_LIMIT = 10
@ -768,11 +782,30 @@ class FederationHandlerRegistry(object):
query type for incoming federation traffic.
"""
def __init__(self):
self.edu_handlers = {}
self.query_handlers = {}
def __init__(self, hs: "HomeServer"):
self.config = hs.config
self.http_client = hs.get_simple_http_client()
self.clock = hs.get_clock()
self._instance_name = hs.get_instance_name()
def register_edu_handler(self, edu_type: str, handler: Callable[[str, dict], None]):
# These are safe to load in monolith mode, but will explode if we try
# and use them. However we have guards before we use them to ensure that
# we don't route to ourselves, and in monolith mode that will always be
# the case.
self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
self.edu_handlers = (
{}
) # type: Dict[str, Callable[[str, dict], Awaitable[None]]]
self.query_handlers = {} # type: Dict[str, Callable[[dict], Awaitable[None]]]
# Map from type to instance name that we should route EDU handling to.
self._edu_type_to_instance = {} # type: Dict[str, str]
def register_edu_handler(
self, edu_type: str, handler: Callable[[str, dict], Awaitable[None]]
):
"""Sets the handler callable that will be used to handle an incoming
federation EDU of the given type.
@ -809,66 +842,56 @@ class FederationHandlerRegistry(object):
self.query_handlers[query_type] = handler
def register_instance_for_edu(self, edu_type: str, instance_name: str):
"""Register that the EDU handler is on a different instance than master.
"""
self._edu_type_to_instance[edu_type] = instance_name
async def on_edu(self, edu_type: str, origin: str, content: dict):
if not self.config.use_presence and edu_type == "m.presence":
return
# Check if we have a handler on this instance
handler = self.edu_handlers.get(edu_type)
if not handler:
logger.warning("No handler registered for EDU type %s", edu_type)
if handler:
with start_active_span_from_edu(content, "handle_edu"):
try:
await handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
return
with start_active_span_from_edu(content, "handle_edu"):
# Check if we can route it somewhere else that isn't us
route_to = self._edu_type_to_instance.get(edu_type, "master")
if route_to != self._instance_name:
try:
await handler(origin, content)
await self._send_edu(
instance_name=route_to,
edu_type=edu_type,
origin=origin,
content=content,
)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
def on_query(self, query_type: str, args: dict) -> defer.Deferred:
handler = self.query_handlers.get(query_type)
if not handler:
logger.warning("No handler registered for query type %s", query_type)
raise NotFoundError("No handler for Query type '%s'" % (query_type,))
return handler(args)
class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
"""A FederationHandlerRegistry for worker processes.
When receiving EDU or queries it will check if an appropriate handler has
been registered on the worker, if there isn't one then it calls off to the
master process.
"""
def __init__(self, hs):
self.config = hs.config
self.http_client = hs.get_simple_http_client()
self.clock = hs.get_clock()
self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
super(ReplicationFederationHandlerRegistry, self).__init__()
async def on_edu(self, edu_type: str, origin: str, content: dict):
"""Overrides FederationHandlerRegistry
"""
if not self.config.use_presence and edu_type == "m.presence":
return
handler = self.edu_handlers.get(edu_type)
if handler:
return await super(ReplicationFederationHandlerRegistry, self).on_edu(
edu_type, origin, content
)
return await self._send_edu(edu_type=edu_type, origin=origin, content=content)
# Oh well, let's just log and move on.
logger.warning("No handler registered for EDU type %s", edu_type)
async def on_query(self, query_type: str, args: dict):
"""Overrides FederationHandlerRegistry
"""
handler = self.query_handlers.get(query_type)
if handler:
return await handler(args)
return await self._get_query_client(query_type=query_type, args=args)
# Check if we can route it somewhere else that isn't us
if self._instance_name == "master":
return await self._get_query_client(query_type=query_type, args=args)
# Uh oh, no handler! Let's raise an exception so the request returns an
# error.
logger.warning("No handler registered for query type %s", query_type)
raise NotFoundError("No handler for Query type '%s'" % (query_type,))

@ -15,15 +15,19 @@
import logging
from collections import namedtuple
from typing import List, Tuple
from typing import TYPE_CHECKING, List, Set, Tuple
from synapse.api.errors import AuthError, SynapseError
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import TypingStream
from synapse.types import UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@ -39,48 +43,48 @@ FEDERATION_TIMEOUT = 60 * 1000
FEDERATION_PING_INTERVAL = 40 * 1000
class TypingHandler(object):
def __init__(self, hs):
class FollowerTypingHandler:
"""A typing handler on a different process than the writer that is updated
via replication.
"""
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self.server_name = hs.config.server_name
self.auth = hs.get_auth()
self.is_mine_id = hs.is_mine_id
self.notifier = hs.get_notifier()
self.state = hs.get_state_handler()
self.hs = hs
self.clock = hs.get_clock()
self.wheel_timer = WheelTimer(bucket_size=5000)
self.is_mine_id = hs.is_mine_id
self.federation = hs.get_federation_sender()
self.federation = None
if hs.should_send_federation():
self.federation = hs.get_federation_sender()
hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)
if hs.config.worker.writers.typing != hs.get_instance_name():
hs.get_federation_registry().register_instance_for_edu(
"m.typing", hs.config.worker.writers.typing,
)
hs.get_distributor().observe("user_left_room", self.user_left_room)
# map room IDs to serial numbers
self._room_serials = {}
# map room IDs to sets of users currently typing
self._room_typing = {}
self._member_typing_until = {} # clock time we expect to stop
self._member_last_federation_poke = {}
self.wheel_timer = WheelTimer(bucket_size=5000)
self._latest_room_serial = 0
self._reset()
# caches which room_ids changed at which serials
self._typing_stream_change_cache = StreamChangeCache(
"TypingStreamChangeCache", self._latest_room_serial
)
self.clock.looping_call(self._handle_timeouts, 5000)
def _reset(self):
"""
Reset the typing handler's data caches.
"""Reset the typing handler's data caches.
"""
# map room IDs to serial numbers
self._room_serials = {}
# map room IDs to sets of users currently typing
self._room_typing = {}
self._member_last_federation_poke = {}
self.wheel_timer = WheelTimer(bucket_size=5000)
def _handle_timeouts(self):
logger.debug("Checking for typing timeouts")
@ -89,30 +93,140 @@ class TypingHandler(object):
members = set(self.wheel_timer.fetch(now))
for member in members:
if not self.is_typing(member):
# Nothing to do if they're no longer typing
continue
until = self._member_typing_until.get(member, None)
if not until or until <= now:
logger.info("Timing out typing for: %s", member.user_id)
self._stopped_typing(member)
continue
# Check if we need to resend a keep alive over federation for this
# user.
if self.hs.is_mine_id(member.user_id):
last_fed_poke = self._member_last_federation_poke.get(member, None)
if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
run_in_background(self._push_remote, member=member, typing=True)
# Add a paranoia timer to ensure that we always have a timer for
# each person typing.
self.wheel_timer.insert(now=now, obj=member, then=now + 60 * 1000)
self._handle_timeout_for_member(now, member)
def _handle_timeout_for_member(self, now: int, member: RoomMember):
if not self.is_typing(member):
# Nothing to do if they're no longer typing
return
# Check if we need to resend a keep alive over federation for this
# user.
if self.federation and self.is_mine_id(member.user_id):
last_fed_poke = self._member_last_federation_poke.get(member, None)
if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
run_as_background_process(
"typing._push_remote", self._push_remote, member=member, typing=True
)
# Add a paranoia timer to ensure that we always have a timer for
# each person typing.
self.wheel_timer.insert(now=now, obj=member, then=now + 60 * 1000)
def is_typing(self, member):
return member.user_id in self._room_typing.get(member.room_id, [])
async def _push_remote(self, member, typing):
if not self.federation:
return
try:
users = await self.store.get_users_in_room(member.room_id)
self._member_last_federation_poke[member] = self.clock.time_msec()
now = self.clock.time_msec()
self.wheel_timer.insert(
now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
)
for domain in {get_domain_from_id(u) for u in users}:
if domain != self.server_name:
logger.debug("sending typing update to %s", domain)
self.federation.build_and_send_edu(
destination=domain,
edu_type="m.typing",
content={
"room_id": member.room_id,
"user_id": member.user_id,
"typing": typing,
},
key=member,
)
except Exception:
logger.exception("Error pushing typing notif to remotes")
def process_replication_rows(
self, token: int, rows: List[TypingStream.TypingStreamRow]
):
"""Should be called whenever we receive updates for typing stream.
"""
if self._latest_room_serial > token:
# The master has gone backwards. To prevent inconsistent data, just
# clear everything.
self._reset()
# Set the latest serial token to whatever the server gave us.
self._latest_room_serial = token
for row in rows:
self._room_serials[row.room_id] = token
prev_typing = set(self._room_typing.get(row.room_id, []))
now_typing = set(row.user_ids)
self._room_typing[row.room_id] = row.user_ids
run_as_background_process(
"_handle_change_in_typing",
self._handle_change_in_typing,
row.room_id,
prev_typing,
now_typing,
)
async def _handle_change_in_typing(
self, room_id: str, prev_typing: Set[str], now_typing: Set[str]
):
"""Process a change in typing of a room from replication, sending EDUs
for any local users.
"""
for user_id in now_typing - prev_typing:
if self.is_mine_id(user_id):
await self._push_remote(RoomMember(room_id, user_id), True)
for user_id in prev_typing - now_typing:
if self.is_mine_id(user_id):
await self._push_remote(RoomMember(room_id, user_id), False)
def get_current_token(self):
return self._latest_room_serial
class TypingWriterHandler(FollowerTypingHandler):
def __init__(self, hs):
super().__init__(hs)
assert hs.config.worker.writers.typing == hs.get_instance_name()
self.auth = hs.get_auth()
self.notifier = hs.get_notifier()
self.hs = hs
hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)
hs.get_distributor().observe("user_left_room", self.user_left_room)
self._member_typing_until = {} # clock time we expect to stop
# caches which room_ids changed at which serials
self._typing_stream_change_cache = StreamChangeCache(
"TypingStreamChangeCache", self._latest_room_serial
)
def _handle_timeout_for_member(self, now: int, member: RoomMember):
super()._handle_timeout_for_member(now, member)
if not self.is_typing(member):
# Nothing to do if they're no longer typing
return
until = self._member_typing_until.get(member, None)
if not until or until <= now:
logger.info("Timing out typing for: %s", member.user_id)
self._stopped_typing(member)
return
async def started_typing(self, target_user, auth_user, room_id, timeout):
target_user_id = target_user.to_string()
auth_user_id = auth_user.to_string()
@ -179,35 +293,11 @@ class TypingHandler(object):
def _push_update(self, member, typing):
if self.hs.is_mine_id(member.user_id):
# Only send updates for changes to our own users.
run_in_background(self._push_remote, member, typing)
self._push_update_local(member=member, typing=typing)
async def _push_remote(self, member, typing):
try:
users = await self.store.get_users_in_room(member.room_id)
self._member_last_federation_poke[member] = self.clock.time_msec()
now = self.clock.time_msec()
self.wheel_timer.insert(
now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
run_as_background_process(
"typing._push_remote", self._push_remote, member, typing
)
for domain in {get_domain_from_id(u) for u in users}:
if domain != self.server_name:
logger.debug("sending typing update to %s", domain)
self.federation.build_and_send_edu(
destination=domain,
edu_type="m.typing",
content={
"room_id": member.room_id,
"user_id": member.user_id,
"typing": typing,
},
key=member,
)
except Exception:
logger.exception("Error pushing typing notif to remotes")
self._push_update_local(member=member, typing=typing)
async def _recv_edu(self, origin, content):
room_id = content["room_id"]
@ -304,8 +394,11 @@ class TypingHandler(object):
return rows, current_id, limited
def get_current_token(self):
return self._latest_room_serial
def process_replication_rows(
self, token: int, rows: List[TypingStream.TypingStreamRow]
):
# The writing process should never get updates from replication.
raise Exception("Typing writer instance got typing info over replication")
class TypingNotificationEventSource(object):

@ -42,6 +42,7 @@ from synapse.replication.tcp.streams import (
EventsStream,
FederationStream,
Stream,
TypingStream,
)
from synapse.util.async_helpers import Linearizer
@ -96,6 +97,14 @@ class ReplicationCommandHandler:
continue
if isinstance(stream, TypingStream):
# Only add TypingStream as a source on the instance in charge of
# typing.
if hs.config.worker.writers.typing == hs.get_instance_name():
self._streams_to_replicate.append(stream)
continue
# Only add any other streams if we're on master.
if hs.config.worker_app is not None:
continue

@ -294,11 +294,12 @@ class TypingStream(Stream):
def __init__(self, hs):
typing_handler = hs.get_typing_handler()
if hs.config.worker_app is None:
# on the master, query the typing handler
writer_instance = hs.config.worker.writers.typing
if writer_instance == hs.get_instance_name():
# On the writer, query the typing handler
update_function = typing_handler.get_all_typing_updates
else:
# Query master process
# Query the typing writer process
update_function = make_http_update_function(hs, self.NAME)
super().__init__(

@ -817,9 +817,18 @@ class RoomTypingRestServlet(RestServlet):
self.typing_handler = hs.get_typing_handler()
self.auth = hs.get_auth()
# If we're not on the typing writer instance we should scream if we get
# requests.
self._is_typing_writer = (
hs.config.worker.writers.typing == hs.get_instance_name()
)
async def on_PUT(self, request, room_id, user_id):
requester = await self.auth.get_user_by_req(request)
if not self._is_typing_writer:
raise Exception("Got /typing request on instance that is not typing writer")
room_id = urlparse.unquote(room_id)
target_user = UserID.from_string(urlparse.unquote(user_id))

@ -44,7 +44,6 @@ from synapse.federation.federation_client import FederationClient
from synapse.federation.federation_server import (
FederationHandlerRegistry,
FederationServer,
ReplicationFederationHandlerRegistry,
)
from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.sender import FederationSender
@ -84,7 +83,7 @@ from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
from synapse.handlers.set_password import SetPasswordHandler
from synapse.handlers.stats import StatsHandler
from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import TypingHandler
from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler
from synapse.handlers.user_directory import UserDirectoryHandler
from synapse.http.client import InsecureInterceptableContextFactory, SimpleHttpClient
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
@ -378,7 +377,10 @@ class HomeServer(object):
return PresenceHandler(self)
def build_typing_handler(self):
return TypingHandler(self)
if self.config.worker.writers.typing == self.get_instance_name():
return TypingWriterHandler(self)
else:
return FollowerTypingHandler(self)
def build_sync_handler(self):
return SyncHandler(self)
@ -534,10 +536,7 @@ class HomeServer(object):
return RoomMemberMasterHandler(self)
def build_federation_registry(self):
if self.config.worker_app:
return ReplicationFederationHandlerRegistry(self)
else:
return FederationHandlerRegistry()
return FederationHandlerRegistry(self)
def build_server_notices_manager(self):
if self.config.worker_app:

@ -148,3 +148,5 @@ class HomeServer(object):
self,
) -> synapse.http.matrixfederationclient.MatrixFederationHttpClient:
pass
def should_send_federation(self) -> bool:
pass

Loading…
Cancel
Save