Add ability to run replication protocol over redis. (#7040)

This is configured via the `redis` config options.
code_spécifique_watcha
Erik Johnston 5 years ago committed by GitHub
parent 5308239d5d
commit 51f7eaf908
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      changelog.d/7040.feature
  2. 40
      stubs/txredisapi.pyi
  3. 6
      synapse/app/homeserver.py
  4. 2
      synapse/config/homeserver.py
  5. 35
      synapse/config/redis.py
  6. 1
      synapse/python_dependencies.py
  7. 2
      synapse/replication/tcp/client.py
  8. 18
      synapse/replication/tcp/commands.py
  9. 50
      synapse/replication/tcp/handler.py
  10. 38
      synapse/replication/tcp/protocol.py
  11. 181
      synapse/replication/tcp/redis.py
  12. 4
      tests/replication/slave/storage/_base.py

@ -0,0 +1 @@
Add support for running replication over Redis when using workers.

@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains *incomplete* type hints for txredisapi.
"""
from typing import List, Optional, Union
class RedisProtocol:
def publish(self, channel: str, message: bytes): ...
class SubscriberProtocol:
def subscribe(self, channels: Union[str, List[str]]): ...
def lazyConnection(
host: str = ...,
port: int = ...,
dbid: Optional[int] = ...,
reconnect: bool = ...,
charset: str = ...,
password: Optional[str] = ...,
connectTimeout: Optional[int] = ...,
replyTimeout: Optional[int] = ...,
convertNumbers: bool = ...,
) -> RedisProtocol: ...
class SubscriberFactory:
def buildProtocol(self, addr): ...

@ -273,6 +273,12 @@ class SynapseHomeServer(HomeServer):
def start_listening(self, listeners):
config = self.get_config()
if config.redis_enabled:
# If redis is enabled we connect via the replication command handler
# in the same way as the workers (since we're effectively a client
# rather than a server).
self.get_tcp_replication().start_replication(self)
for listener in listeners:
if listener["type"] == "http":
self._listening_services.extend(self._listener_http(config, listener))

@ -31,6 +31,7 @@ from .password import PasswordConfig
from .password_auth_providers import PasswordAuthProviderConfig
from .push import PushConfig
from .ratelimiting import RatelimitConfig
from .redis import RedisConfig
from .registration import RegistrationConfig
from .repository import ContentRepositoryConfig
from .room_directory import RoomDirectoryConfig
@ -82,4 +83,5 @@ class HomeServerConfig(RootConfig):
RoomDirectoryConfig,
ThirdPartyRulesConfig,
TracerConfig,
RedisConfig,
]

@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.config._base import Config
from synapse.python_dependencies import check_requirements
class RedisConfig(Config):
section = "redis"
def read_config(self, config, **kwargs):
redis_config = config.get("redis", {})
self.redis_enabled = redis_config.get("enabled", False)
if not self.redis_enabled:
return
check_requirements("redis")
self.redis_host = redis_config.get("host", "localhost")
self.redis_port = redis_config.get("port", 6379)
self.redis_dbid = redis_config.get("dbid")
self.redis_password = redis_config.get("password")

@ -98,6 +98,7 @@ CONDITIONAL_REQUIREMENTS = {
"sentry": ["sentry-sdk>=0.7.2"],
"opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"],
"jwt": ["pyjwt>=1.6.4"],
"redis": ["txredisapi>=1.4.7"],
}
ALL_OPTIONAL_REQUIREMENTS = set() # type: Set[str]

@ -30,7 +30,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class ReplicationClientFactory(ReconnectingClientFactory):
class DirectTcpReplicationClientFactory(ReconnectingClientFactory):
"""Factory for building connections to the master. Will reconnect if the
connection is lost.

@ -454,3 +454,21 @@ VALID_CLIENT_COMMANDS = (
ErrorCommand.NAME,
RemoteServerUpCommand.NAME,
)
def parse_command_from_line(line: str) -> Command:
"""Parses a command from a received line.
Line should already be stripped of whitespace and be checked if blank.
"""
idx = line.index(" ")
if idx >= 0:
cmd_name = line[:idx]
rest_of_line = line[idx + 1 :]
else:
cmd_name = line
rest_of_line = ""
cmd_cls = COMMAND_MAP[cmd_name]
return cmd_cls.from_line(rest_of_line)

@ -30,8 +30,10 @@ from typing import (
from prometheus_client import Counter
from twisted.internet.protocol import ReconnectingClientFactory
from synapse.metrics import LaterGauge
from synapse.replication.tcp.client import ReplicationClientFactory
from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
Command,
@ -92,7 +94,7 @@ class ReplicationCommandHandler:
self._pending_batches = {} # type: Dict[str, List[Any]]
# The factory used to create connections.
self._factory = None # type: Optional[ReplicationClientFactory]
self._factory = None # type: Optional[ReconnectingClientFactory]
# The currently connected connections.
self._connections = [] # type: List[AbstractConnection]
@ -119,11 +121,45 @@ class ReplicationCommandHandler:
"""Helper method to start a replication connection to the remote server
using TCP.
"""
client_name = hs.config.worker_name
self._factory = ReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
hs.get_reactor().connectTCP(host, port, self._factory)
if hs.config.redis.redis_enabled:
from synapse.replication.tcp.redis import (
RedisDirectTcpReplicationClientFactory,
)
import txredisapi
logger.info(
"Connecting to redis (host=%r port=%r DBID=%r)",
hs.config.redis_host,
hs.config.redis_port,
hs.config.redis_dbid,
)
# We need two connections to redis, one for the subscription stream and
# one to send commands to (as you can't send further redis commands to a
# connection after SUBSCRIBE is called).
# First create the connection for sending commands.
outbound_redis_connection = txredisapi.lazyConnection(
host=hs.config.redis_host,
port=hs.config.redis_port,
dbid=hs.config.redis_dbid,
password=hs.config.redis.redis_password,
reconnect=True,
)
# Now create the factory/connection for the subscription stream.
self._factory = RedisDirectTcpReplicationClientFactory(
hs, outbound_redis_connection
)
hs.get_reactor().connectTCP(
hs.config.redis.redis_host, hs.config.redis.redis_port, self._factory,
)
else:
client_name = hs.config.worker_name
self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
hs.get_reactor().connectTCP(host, port, self._factory)
async def on_REPLICATE(self, cmd: ReplicateCommand):
# We only want to announce positions by the writer of the streams.

@ -63,7 +63,6 @@ from twisted.python.failure import Failure
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import (
COMMAND_MAP,
VALID_CLIENT_COMMANDS,
VALID_SERVER_COMMANDS,
Command,
@ -72,6 +71,7 @@ from synapse.replication.tcp.commands import (
PingCommand,
ReplicateCommand,
ServerCommand,
parse_command_from_line,
)
from synapse.types import Collection
from synapse.util import Clock
@ -210,38 +210,24 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
linestr = line.decode("utf-8")
# split at the first " ", handling one-word commands
idx = linestr.index(" ")
if idx >= 0:
cmd_name = linestr[:idx]
rest_of_line = linestr[idx + 1 :]
else:
cmd_name = linestr
rest_of_line = ""
try:
cmd = parse_command_from_line(linestr)
except Exception as e:
logger.exception("[%s] failed to parse line: %r", self.id(), linestr)
self.send_error("failed to parse line: %r (%r):" % (e, linestr))
return
if cmd_name not in self.VALID_INBOUND_COMMANDS:
logger.error("[%s] invalid command %s", self.id(), cmd_name)
self.send_error("invalid command: %s", cmd_name)
if cmd.NAME not in self.VALID_INBOUND_COMMANDS:
logger.error("[%s] invalid command %s", self.id(), cmd.NAME)
self.send_error("invalid command: %s", cmd.NAME)
return
self.last_received_command = self.clock.time_msec()
self.inbound_commands_counter[cmd_name] = (
self.inbound_commands_counter[cmd_name] + 1
self.inbound_commands_counter[cmd.NAME] = (
self.inbound_commands_counter[cmd.NAME] + 1
)
cmd_cls = COMMAND_MAP[cmd_name]
try:
cmd = cmd_cls.from_line(rest_of_line)
except Exception as e:
logger.exception(
"[%s] failed to parse line %r: %r", self.id(), cmd_name, rest_of_line
)
self.send_error(
"failed to parse line for %r: %r (%r):" % (cmd_name, e, rest_of_line)
)
return
# Now lets try and call on_<CMD_NAME> function
run_as_background_process(
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd

@ -0,0 +1,181 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING
import txredisapi
from synapse.logging.context import PreserveLoggingContext
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import (
Command,
ReplicateCommand,
parse_command_from_line,
)
from synapse.replication.tcp.protocol import AbstractConnection
if TYPE_CHECKING:
from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
"""Connection to redis subscribed to replication stream.
Parses incoming messages from redis into replication commands, and passes
them to `ReplicationCommandHandler`
Due to the vagaries of `txredisapi` we don't want to have a custom
constructor, so instead we expect the defined attributes below to be set
immediately after initialisation.
Attributes:
handler: The command handler to handle incoming commands.
stream_name: The *redis* stream name to subscribe to (not anything to
do with Synapse replication streams).
outbound_redis_connection: The connection to redis to use to send
commands.
"""
handler = None # type: ReplicationCommandHandler
stream_name = None # type: str
outbound_redis_connection = None # type: txredisapi.RedisProtocol
def connectionMade(self):
logger.info("Connected to redis instance")
self.subscribe(self.stream_name)
self.send_command(ReplicateCommand())
self.handler.new_connection(self)
def messageReceived(self, pattern: str, channel: str, message: str):
"""Received a message from redis.
"""
if message.strip() == "":
# Ignore blank lines
return
try:
cmd = parse_command_from_line(message)
except Exception:
logger.exception(
"[%s] failed to parse line: %r", message,
)
return
# Now lets try and call on_<CMD_NAME> function
run_as_background_process(
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
)
async def handle_command(self, cmd: Command):
"""Handle a command we have received over the replication stream.
By default delegates to on_<COMMAND>, which should return an awaitable.
Args:
cmd: received command
"""
handled = False
# First call any command handlers on this instance. These are for redis
# specific handling.
cmd_func = getattr(self, "on_%s" % (cmd.NAME,), None)
if cmd_func:
await cmd_func(cmd)
handled = True
# Then call out to the handler.
cmd_func = getattr(self.handler, "on_%s" % (cmd.NAME,), None)
if cmd_func:
await cmd_func(cmd)
handled = True
if not handled:
logger.warning("Unhandled command: %r", cmd)
def connectionLost(self, reason):
logger.info("Lost connection to redis instance")
self.handler.lost_connection(self)
def send_command(self, cmd: Command):
"""Send a command if connection has been established.
Args:
cmd (Command)
"""
string = "%s %s" % (cmd.NAME, cmd.to_line())
if "\n" in string:
raise Exception("Unexpected newline in command: %r", string)
encoded_string = string.encode("utf-8")
async def _send():
with PreserveLoggingContext():
# Note that we use the other connection as we can't send
# commands using the subscription connection.
await self.outbound_redis_connection.publish(
self.stream_name, encoded_string
)
run_as_background_process("send-cmd", _send)
class RedisDirectTcpReplicationClientFactory(txredisapi.SubscriberFactory):
"""This is a reconnecting factory that connects to redis and immediately
subscribes to a stream.
Args:
hs
outbound_redis_connection: A connection to redis that will be used to
send outbound commands (this is seperate to the redis connection
used to subscribe).
"""
maxDelay = 5
continueTrying = True
protocol = RedisSubscriber
def __init__(
self, hs: "HomeServer", outbound_redis_connection: txredisapi.RedisProtocol
):
super().__init__()
# This sets the password on the RedisFactory base class (as
# SubscriberFactory constructor doesn't pass it through).
self.password = hs.config.redis.redis_password
self.handler = hs.get_tcp_replication()
self.stream_name = hs.hostname
self.outbound_redis_connection = outbound_redis_connection
def buildProtocol(self, addr):
p = super().buildProtocol(addr) # type: RedisSubscriber
# We do this here rather than add to the constructor of `RedisSubcriber`
# as to do so would involve overriding `buildProtocol` entirely, however
# the base method does some other things than just instantiating the
# protocol.
p.handler = self.handler
p.outbound_redis_connection = self.outbound_redis_connection
p.stream_name = self.stream_name
return p

@ -16,7 +16,7 @@
from mock import Mock, NonCallableMock
from synapse.replication.tcp.client import (
ReplicationClientFactory,
DirectTcpReplicationClientFactory,
ReplicationDataHandler,
)
from synapse.replication.tcp.handler import ReplicationCommandHandler
@ -61,7 +61,7 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
self.slaved_store
)
client_factory = ReplicationClientFactory(
client_factory = DirectTcpReplicationClientFactory(
self.hs, "client_name", self.replication_handler
)
client_factory.handler = self.replication_handler

Loading…
Cancel
Save