Unix Sockets for HTTP Replication (#15708)

Unix socket support for `federation` and `client` Listeners has existed now for a little while(since [1.81.0](https://github.com/matrix-org/synapse/pull/15353)), but there was one last hold out before it could be complete: HTTP Replication communication. This should finish it up. The Listeners would have always worked, but would have had no way to be talked to/at.

---------

Co-authored-by: Eric Eastwood <madlittlemods@gmail.com>
Co-authored-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>
Co-authored-by: Eric Eastwood <erice@element.io>
1.103.0-whithout-watcha
Jason Little 1 year ago committed by GitHub
parent a4243183f0
commit 224ef0b669
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      changelog.d/15708.feature
  2. 4
      docker/conf-workers/nginx.conf.j2
  3. 3
      docker/conf-workers/shared.yaml.j2
  4. 4
      docker/conf-workers/supervisord.conf.j2
  5. 4
      docker/conf-workers/worker.yaml.j2
  6. 10
      docker/conf/homeserver.yaml
  7. 104
      docker/configure_workers_and_start.py
  8. 1
      docs/development/contributing_guide.md
  9. 52
      docs/usage/configuration/config_documentation.md
  10. 9
      docs/workers.md
  11. 4
      scripts-dev/complement.sh
  12. 24
      synapse/config/workers.py
  13. 47
      synapse/http/replicationagent.py
  14. 6
      synapse/logging/opentracing.py
  15. 7
      tests/replication/_base.py
  16. 32
      tests/server.py

@ -0,0 +1 @@
Add Unix Socket support for HTTP Replication Listeners. Document and provide usage instructions for utilizing Unix sockets in Synapse. Contributed by Jason Little.

@ -35,7 +35,11 @@ server {
# Send all other traffic to the main process # Send all other traffic to the main process
location ~* ^(\\/_matrix|\\/_synapse) { location ~* ^(\\/_matrix|\\/_synapse) {
{% if using_unix_sockets %}
proxy_pass http://unix:/run/main_public.sock;
{% else %}
proxy_pass http://localhost:8080; proxy_pass http://localhost:8080;
{% endif %}
proxy_set_header X-Forwarded-For $remote_addr; proxy_set_header X-Forwarded-For $remote_addr;
proxy_set_header X-Forwarded-Proto $scheme; proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header Host $host; proxy_set_header Host $host;

@ -6,6 +6,9 @@
{% if enable_redis %} {% if enable_redis %}
redis: redis:
enabled: true enabled: true
{% if using_unix_sockets %}
path: /tmp/redis.sock
{% endif %}
{% endif %} {% endif %}
{% if appservice_registrations is not none %} {% if appservice_registrations is not none %}

@ -19,7 +19,11 @@ username=www-data
autorestart=true autorestart=true
[program:redis] [program:redis]
{% if using_unix_sockets %}
command=/usr/local/bin/prefix-log /usr/local/bin/redis-server --unixsocket /tmp/redis.sock
{% else %}
command=/usr/local/bin/prefix-log /usr/local/bin/redis-server command=/usr/local/bin/prefix-log /usr/local/bin/redis-server
{% endif %}
priority=1 priority=1
stdout_logfile=/dev/stdout stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0 stdout_logfile_maxbytes=0

@ -8,7 +8,11 @@ worker_name: "{{ name }}"
worker_listeners: worker_listeners:
- type: http - type: http
{% if using_unix_sockets %}
path: "/run/worker.{{ port }}"
{% else %}
port: {{ port }} port: {{ port }}
{% endif %}
{% if listener_resources %} {% if listener_resources %}
resources: resources:
- names: - names:

@ -36,12 +36,17 @@ listeners:
# Allow configuring in case we want to reverse proxy 8008 # Allow configuring in case we want to reverse proxy 8008
# using another process in the same container # using another process in the same container
{% if SYNAPSE_USE_UNIX_SOCKET %}
# Unix sockets don't care about TLS or IP addresses or ports
- path: '/run/main_public.sock'
type: http
{% else %}
- port: {{ SYNAPSE_HTTP_PORT or 8008 }} - port: {{ SYNAPSE_HTTP_PORT or 8008 }}
tls: false tls: false
bind_addresses: ['::'] bind_addresses: ['::']
type: http type: http
x_forwarded: false x_forwarded: false
{% endif %}
resources: resources:
- names: [client] - names: [client]
compress: true compress: true
@ -57,8 +62,11 @@ database:
user: "{{ POSTGRES_USER or "synapse" }}" user: "{{ POSTGRES_USER or "synapse" }}"
password: "{{ POSTGRES_PASSWORD }}" password: "{{ POSTGRES_PASSWORD }}"
database: "{{ POSTGRES_DB or "synapse" }}" database: "{{ POSTGRES_DB or "synapse" }}"
{% if not SYNAPSE_USE_UNIX_SOCKET %}
{# Synapse will use a default unix socket for Postgres when host/port is not specified (behavior from `psycopg2`). #}
host: "{{ POSTGRES_HOST or "db" }}" host: "{{ POSTGRES_HOST or "db" }}"
port: "{{ POSTGRES_PORT or "5432" }}" port: "{{ POSTGRES_PORT or "5432" }}"
{% endif %}
cp_min: 5 cp_min: 5
cp_max: 10 cp_max: 10
{% else %} {% else %}

@ -74,6 +74,9 @@ MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
MAIN_PROCESS_INSTANCE_NAME = "main" MAIN_PROCESS_INSTANCE_NAME = "main"
MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1" MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1"
MAIN_PROCESS_REPLICATION_PORT = 9093 MAIN_PROCESS_REPLICATION_PORT = 9093
# Obviously, these would only be used with the UNIX socket option
MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock"
MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced # A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
# during processing with the name of the worker. # during processing with the name of the worker.
@ -407,11 +410,15 @@ def add_worker_roles_to_shared_config(
) )
# Map of stream writer instance names to host/ports combos # Map of stream writer instance names to host/ports combos
instance_map[worker_name] = { if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
"host": "localhost", instance_map[worker_name] = {
"port": worker_port, "path": f"/run/worker.{worker_port}",
} }
else:
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
# Update the list of stream writers. It's convenient that the name of the worker # Update the list of stream writers. It's convenient that the name of the worker
# type is the same as the stream to write. Iterate over the whole list in case there # type is the same as the stream to write. Iterate over the whole list in case there
# is more than one. # is more than one.
@ -423,10 +430,15 @@ def add_worker_roles_to_shared_config(
# Map of stream writer instance names to host/ports combos # Map of stream writer instance names to host/ports combos
# For now, all stream writers need http replication ports # For now, all stream writers need http replication ports
instance_map[worker_name] = { if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
"host": "localhost", instance_map[worker_name] = {
"port": worker_port, "path": f"/run/worker.{worker_port}",
} }
else:
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
def merge_worker_template_configs( def merge_worker_template_configs(
@ -718,17 +730,29 @@ def generate_worker_files(
# Note that yaml cares about indentation, so care should be taken to insert lines # Note that yaml cares about indentation, so care should be taken to insert lines
# into files at the correct indentation below. # into files at the correct indentation below.
# Convenience helper for if using unix sockets instead of host:port
using_unix_sockets = environ.get("SYNAPSE_USE_UNIX_SOCKET", False)
# First read the original config file and extract the listeners block. Then we'll # First read the original config file and extract the listeners block. Then we'll
# add another listener for replication. Later we'll write out the result to the # add another listener for replication. Later we'll write out the result to the
# shared config file. # shared config file.
listeners = [ listeners: List[Any]
{ if using_unix_sockets:
"port": MAIN_PROCESS_REPLICATION_PORT, listeners = [
"bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS, {
"type": "http", "path": MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH,
"resources": [{"names": ["replication"]}], "type": "http",
} "resources": [{"names": ["replication"]}],
] }
]
else:
listeners = [
{
"port": MAIN_PROCESS_REPLICATION_PORT,
"bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS,
"type": "http",
"resources": [{"names": ["replication"]}],
}
]
with open(config_path) as file_stream: with open(config_path) as file_stream:
original_config = yaml.safe_load(file_stream) original_config = yaml.safe_load(file_stream)
original_listeners = original_config.get("listeners") original_listeners = original_config.get("listeners")
@ -769,7 +793,17 @@ def generate_worker_files(
# A list of internal endpoints to healthcheck, starting with the main process # A list of internal endpoints to healthcheck, starting with the main process
# which exists even if no workers do. # which exists even if no workers do.
healthcheck_urls = ["http://localhost:8080/health"] # This list ends up being part of the command line to curl, (curl added support for
# Unix sockets in version 7.40).
if using_unix_sockets:
healthcheck_urls = [
f"--unix-socket {MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH} "
# The scheme and hostname from the following URL are ignored.
# The only thing that matters is the path `/health`
"http://localhost/health"
]
else:
healthcheck_urls = ["http://localhost:8080/health"]
# Get the set of all worker types that we have configured # Get the set of all worker types that we have configured
all_worker_types_in_use = set(chain(*requested_worker_types.values())) all_worker_types_in_use = set(chain(*requested_worker_types.values()))
@ -806,8 +840,12 @@ def generate_worker_files(
# given worker_type needs to stay assigned and not be replaced. # given worker_type needs to stay assigned and not be replaced.
worker_config["shared_extra_conf"].update(shared_config) worker_config["shared_extra_conf"].update(shared_config)
shared_config = worker_config["shared_extra_conf"] shared_config = worker_config["shared_extra_conf"]
if using_unix_sockets:
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,)) healthcheck_urls.append(
f"--unix-socket /run/worker.{worker_port} http://localhost/health"
)
else:
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
# Update the shared config with sharding-related options if necessary # Update the shared config with sharding-related options if necessary
add_worker_roles_to_shared_config( add_worker_roles_to_shared_config(
@ -826,6 +864,7 @@ def generate_worker_files(
"/conf/workers/{name}.yaml".format(name=worker_name), "/conf/workers/{name}.yaml".format(name=worker_name),
**worker_config, **worker_config,
worker_log_config_filepath=log_config_filepath, worker_log_config_filepath=log_config_filepath,
using_unix_sockets=using_unix_sockets,
) )
# Save this worker's port number to the correct nginx upstreams # Save this worker's port number to the correct nginx upstreams
@ -846,8 +885,13 @@ def generate_worker_files(
nginx_upstream_config = "" nginx_upstream_config = ""
for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items(): for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items():
body = "" body = ""
for port in upstream_worker_ports: if using_unix_sockets:
body += f" server localhost:{port};\n" for port in upstream_worker_ports:
body += f" server unix:/run/worker.{port};\n"
else:
for port in upstream_worker_ports:
body += f" server localhost:{port};\n"
# Add to the list of configured upstreams # Add to the list of configured upstreams
nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format( nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
@ -877,10 +921,15 @@ def generate_worker_files(
# If there are workers, add the main process to the instance_map too. # If there are workers, add the main process to the instance_map too.
if workers_in_use: if workers_in_use:
instance_map = shared_config.setdefault("instance_map", {}) instance_map = shared_config.setdefault("instance_map", {})
instance_map[MAIN_PROCESS_INSTANCE_NAME] = { if using_unix_sockets:
"host": MAIN_PROCESS_LOCALHOST_ADDRESS, instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
"port": MAIN_PROCESS_REPLICATION_PORT, "path": MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH,
} }
else:
instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
"host": MAIN_PROCESS_LOCALHOST_ADDRESS,
"port": MAIN_PROCESS_REPLICATION_PORT,
}
# Shared homeserver config # Shared homeserver config
convert( convert(
@ -890,6 +939,7 @@ def generate_worker_files(
appservice_registrations=appservice_registrations, appservice_registrations=appservice_registrations,
enable_redis=workers_in_use, enable_redis=workers_in_use,
workers_in_use=workers_in_use, workers_in_use=workers_in_use,
using_unix_sockets=using_unix_sockets,
) )
# Nginx config # Nginx config
@ -900,6 +950,7 @@ def generate_worker_files(
upstream_directives=nginx_upstream_config, upstream_directives=nginx_upstream_config,
tls_cert_path=os.environ.get("SYNAPSE_TLS_CERT"), tls_cert_path=os.environ.get("SYNAPSE_TLS_CERT"),
tls_key_path=os.environ.get("SYNAPSE_TLS_KEY"), tls_key_path=os.environ.get("SYNAPSE_TLS_KEY"),
using_unix_sockets=using_unix_sockets,
) )
# Supervisord config # Supervisord config
@ -909,6 +960,7 @@ def generate_worker_files(
"/etc/supervisor/supervisord.conf", "/etc/supervisor/supervisord.conf",
main_config_path=config_path, main_config_path=config_path,
enable_redis=workers_in_use, enable_redis=workers_in_use,
using_unix_sockets=using_unix_sockets,
) )
convert( convert(

@ -370,6 +370,7 @@ The above will run a monolithic (single-process) Synapse with SQLite as the data
See the [worker documentation](../workers.md) for additional information on workers. See the [worker documentation](../workers.md) for additional information on workers.
- Passing `ASYNCIO_REACTOR=1` as an environment variable to use the Twisted asyncio reactor instead of the default one. - Passing `ASYNCIO_REACTOR=1` as an environment variable to use the Twisted asyncio reactor instead of the default one.
- Passing `PODMAN=1` will use the [podman](https://podman.io/) container runtime, instead of docker. - Passing `PODMAN=1` will use the [podman](https://podman.io/) container runtime, instead of docker.
- Passing `UNIX_SOCKETS=1` will utilise Unix socket functionality for Synapse, Redis, and Postgres(when applicable).
To increase the log level for the tests, set `SYNAPSE_TEST_LOG_LEVEL`, e.g: To increase the log level for the tests, set `SYNAPSE_TEST_LOG_LEVEL`, e.g:
```sh ```sh

@ -462,6 +462,20 @@ See the docs [request log format](../administration/request_log.md).
* `additional_resources`: Only valid for an 'http' listener. A map of * `additional_resources`: Only valid for an 'http' listener. A map of
additional endpoints which should be loaded via dynamic modules. additional endpoints which should be loaded via dynamic modules.
Unix socket support (_Added in Synapse 1.88.0_):
* `path`: A path and filename for a Unix socket. Make sure it is located in a
directory with read and write permissions, and that it already exists (the directory
will not be created). Defaults to `None`.
* **Note**: The use of both `path` and `port` options for the same `listener` is not
compatible.
* The `x_forwarded` option defaults to true when using Unix sockets and can be omitted.
* Other options that would not make sense to use with a UNIX socket, such as
`bind_addresses` and `tls` will be ignored and can be removed.
* `mode`: The file permissions to set on the UNIX socket. Defaults to `666`
* **Note:** Must be set as `type: http` (does not support `metrics` and `manhole`).
Also make sure that `metrics` is not included in `resources` -> `names`
Valid resource names are: Valid resource names are:
* `client`: the client-server API (/_matrix/client), and the synapse admin API (/_synapse/admin). Also implies `media` and `static`. * `client`: the client-server API (/_matrix/client), and the synapse admin API (/_synapse/admin). Also implies `media` and `static`.
@ -474,7 +488,7 @@ Valid resource names are:
* `media`: the media API (/_matrix/media). * `media`: the media API (/_matrix/media).
* `metrics`: the metrics interface. See [here](../../metrics-howto.md). * `metrics`: the metrics interface. See [here](../../metrics-howto.md). (Not compatible with Unix sockets)
* `openid`: OpenID authentication. See [here](../../openid.md). * `openid`: OpenID authentication. See [here](../../openid.md).
@ -533,6 +547,22 @@ listeners:
bind_addresses: ['::1', '127.0.0.1'] bind_addresses: ['::1', '127.0.0.1']
type: manhole type: manhole
``` ```
Example configuration #3:
```yaml
listeners:
# Unix socket listener: Ideal for Synapse deployments behind a reverse proxy, offering
# lightweight interprocess communication without TCP/IP overhead, avoid port
# conflicts, and providing enhanced security through system file permissions.
#
# Note that x_forwarded will default to true, when using a UNIX socket. Please see
# https://matrix-org.github.io/synapse/latest/reverse_proxy.html.
#
- path: /var/run/synapse/main_public.sock
type: http
resources:
- names: [client, federation]
```
--- ---
### `manhole_settings` ### `manhole_settings`
@ -3949,6 +3979,14 @@ instance_map:
host: localhost host: localhost
port: 8034 port: 8034
``` ```
Example configuration(#2, for UNIX sockets):
```yaml
instance_map:
main:
path: /var/run/synapse/main_replication.sock
worker1:
path: /var/run/synapse/worker1_replication.sock
```
--- ---
### `stream_writers` ### `stream_writers`
@ -4108,6 +4146,18 @@ worker_listeners:
resources: resources:
- names: [client, federation] - names: [client, federation]
``` ```
Example configuration(#2, using UNIX sockets with a `replication` listener):
```yaml
worker_listeners:
- type: http
path: /var/run/synapse/worker_public.sock
resources:
- names: [client, federation]
- type: http
path: /var/run/synapse/worker_replication.sock
resources:
- names: [replication]
```
--- ---
### `worker_manhole` ### `worker_manhole`

@ -95,9 +95,12 @@ for the main process
* Secondly, you need to enable * Secondly, you need to enable
[redis-based replication](usage/configuration/config_documentation.md#redis) [redis-based replication](usage/configuration/config_documentation.md#redis)
* You will need to add an [`instance_map`](usage/configuration/config_documentation.md#instance_map) * You will need to add an [`instance_map`](usage/configuration/config_documentation.md#instance_map)
with the `main` process defined, as well as the relevant connection information from with the `main` process defined, as well as the relevant connection information from
it's HTTP `replication` listener (defined in step 1 above). Note that the `host` defined it's HTTP `replication` listener (defined in step 1 above).
is the address the worker needs to look for the `main` process at, not necessarily the same address that is bound to. * Note that the `host` defined is the address the worker needs to look for the `main`
process at, not necessarily the same address that is bound to.
* If you are using Unix sockets for the `replication` resource, make sure to
use a `path` to the socket file instead of a `port`.
* Optionally, a [shared secret](usage/configuration/config_documentation.md#worker_replication_secret) * Optionally, a [shared secret](usage/configuration/config_documentation.md#worker_replication_secret)
can be used to authenticate HTTP traffic between workers. For example: can be used to authenticate HTTP traffic between workers. For example:

@ -253,6 +253,10 @@ if [[ -n "$ASYNCIO_REACTOR" ]]; then
export PASS_SYNAPSE_COMPLEMENT_USE_ASYNCIO_REACTOR=true export PASS_SYNAPSE_COMPLEMENT_USE_ASYNCIO_REACTOR=true
fi fi
if [[ -n "$UNIX_SOCKETS" ]]; then
# Enable full on Unix socket mode for Synapse, Redis and Postgresql
export PASS_SYNAPSE_USE_UNIX_SOCKET=1
fi
if [[ -n "$SYNAPSE_TEST_LOG_LEVEL" ]]; then if [[ -n "$SYNAPSE_TEST_LOG_LEVEL" ]]; then
# Set the log level to what is desired # Set the log level to what is desired

@ -94,7 +94,7 @@ class ConfigModel(BaseModel):
allow_mutation = False allow_mutation = False
class InstanceLocationConfig(ConfigModel): class InstanceTcpLocationConfig(ConfigModel):
"""The host and port to talk to an instance via HTTP replication.""" """The host and port to talk to an instance via HTTP replication."""
host: StrictStr host: StrictStr
@ -110,6 +110,23 @@ class InstanceLocationConfig(ConfigModel):
return f"{self.host}:{self.port}" return f"{self.host}:{self.port}"
class InstanceUnixLocationConfig(ConfigModel):
"""The socket file to talk to an instance via HTTP replication."""
path: StrictStr
def scheme(self) -> str:
"""Hardcode a retrievable scheme"""
return "unix"
def netloc(self) -> str:
"""Nicely format the address location data"""
return f"{self.path}"
InstanceLocationConfig = Union[InstanceTcpLocationConfig, InstanceUnixLocationConfig]
@attr.s @attr.s
class WriterLocations: class WriterLocations:
"""Specifies the instances that write various streams. """Specifies the instances that write various streams.
@ -270,9 +287,12 @@ class WorkerConfig(Config):
% MAIN_PROCESS_INSTANCE_MAP_NAME % MAIN_PROCESS_INSTANCE_MAP_NAME
) )
# type-ignore: the expression `Union[A, B]` is not a Type[Union[A, B]] currently
self.instance_map: Dict[ self.instance_map: Dict[
str, InstanceLocationConfig str, InstanceLocationConfig
] = parse_and_validate_mapping(instance_map, InstanceLocationConfig) ] = parse_and_validate_mapping(
instance_map, InstanceLocationConfig # type: ignore[arg-type]
)
# Map from type of streams to source, c.f. WriterLocations. # Map from type of streams to source, c.f. WriterLocations.
writers = config.get("stream_writers") or {} writers = config.get("stream_writers") or {}

@ -18,7 +18,11 @@ from typing import Dict, Optional
from zope.interface import implementer from zope.interface import implementer
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS from twisted.internet.endpoints import (
HostnameEndpoint,
UNIXClientEndpoint,
wrapClientTLS,
)
from twisted.internet.interfaces import IStreamClientEndpoint from twisted.internet.interfaces import IStreamClientEndpoint
from twisted.python.failure import Failure from twisted.python.failure import Failure
from twisted.web.client import URI, HTTPConnectionPool, _AgentBase from twisted.web.client import URI, HTTPConnectionPool, _AgentBase
@ -32,7 +36,11 @@ from twisted.web.iweb import (
IResponse, IResponse,
) )
from synapse.config.workers import InstanceLocationConfig from synapse.config.workers import (
InstanceLocationConfig,
InstanceTcpLocationConfig,
InstanceUnixLocationConfig,
)
from synapse.types import ISynapseReactor from synapse.types import ISynapseReactor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -40,7 +48,7 @@ logger = logging.getLogger(__name__)
@implementer(IAgentEndpointFactory) @implementer(IAgentEndpointFactory)
class ReplicationEndpointFactory: class ReplicationEndpointFactory:
"""Connect to a given TCP socket""" """Connect to a given TCP or UNIX socket"""
def __init__( def __init__(
self, self,
@ -64,24 +72,27 @@ class ReplicationEndpointFactory:
# The given URI has a special scheme and includes the worker name. The # The given URI has a special scheme and includes the worker name. The
# actual connection details are pulled from the instance map. # actual connection details are pulled from the instance map.
worker_name = uri.netloc.decode("utf-8") worker_name = uri.netloc.decode("utf-8")
scheme = self.instance_map[worker_name].scheme() location_config = self.instance_map[worker_name]
scheme = location_config.scheme()
if scheme in ("http", "https"): if isinstance(location_config, InstanceTcpLocationConfig):
endpoint = HostnameEndpoint( endpoint = HostnameEndpoint(
self.reactor, self.reactor,
self.instance_map[worker_name].host, location_config.host,
self.instance_map[worker_name].port, location_config.port,
) )
if scheme == "https": if scheme == "https":
endpoint = wrapClientTLS( endpoint = wrapClientTLS(
# The 'port' argument below isn't actually used by the function # The 'port' argument below isn't actually used by the function
self.context_factory.creatorForNetloc( self.context_factory.creatorForNetloc(
self.instance_map[worker_name].host.encode("utf-8"), location_config.host.encode("utf-8"),
self.instance_map[worker_name].port, location_config.port,
), ),
endpoint, endpoint,
) )
return endpoint return endpoint
elif isinstance(location_config, InstanceUnixLocationConfig):
return UNIXClientEndpoint(self.reactor, location_config.path)
else: else:
raise SchemeNotSupported(f"Unsupported scheme: {scheme}") raise SchemeNotSupported(f"Unsupported scheme: {scheme}")
@ -138,13 +149,16 @@ class ReplicationAgent(_AgentBase):
An existing connection from the connection pool may be used or a new An existing connection from the connection pool may be used or a new
one may be created. one may be created.
Currently, HTTP and HTTPS schemes are supported in uri. Currently, HTTP, HTTPS and UNIX schemes are supported in uri.
This is copied from twisted.web.client.Agent, except: This is copied from twisted.web.client.Agent, except:
* It uses a different pool key (combining the host & port). * It uses a different pool key (combining the scheme with either host & port or
* It does not call _ensureValidURI(...) since it breaks on some socket path).
UNIX paths. * It does not call _ensureValidURI(...) as the strictness of IDNA2008 is not
required when using a worker's name as a 'hostname' for Synapse HTTP
Replication machinery. Specifically, this allows a range of ascii characters
such as '+' and '_' in hostnames/worker's names.
See: twisted.web.iweb.IAgent.request See: twisted.web.iweb.IAgent.request
""" """
@ -154,9 +168,12 @@ class ReplicationAgent(_AgentBase):
except SchemeNotSupported: except SchemeNotSupported:
return defer.fail(Failure()) return defer.fail(Failure())
worker_name = parsedURI.netloc.decode("utf-8")
key_scheme = self._endpointFactory.instance_map[worker_name].scheme()
key_netloc = self._endpointFactory.instance_map[worker_name].netloc()
# This sets the Pool key to be: # This sets the Pool key to be:
# (http(s), <host:ip>) # (http(s), <host:port>) or (unix, <socket_path>)
key = (parsedURI.scheme, parsedURI.netloc) key = (key_scheme, key_netloc)
# _requestWithEndpoint comes from _AgentBase class # _requestWithEndpoint comes from _AgentBase class
return self._requestWithEndpoint( return self._requestWithEndpoint(

@ -1070,7 +1070,7 @@ def trace_servlet(
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER, tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
tags.HTTP_METHOD: request.get_method(), tags.HTTP_METHOD: request.get_method(),
tags.HTTP_URL: request.get_redacted_uri(), tags.HTTP_URL: request.get_redacted_uri(),
tags.PEER_HOST_IPV6: request.getClientAddress().host, tags.PEER_HOST_IPV6: request.get_client_ip_if_available(),
} }
request_name = request.request_metrics.name request_name = request.request_metrics.name
@ -1091,9 +1091,11 @@ def trace_servlet(
# with JsonResource). # with JsonResource).
scope.span.set_operation_name(request.request_metrics.name) scope.span.set_operation_name(request.request_metrics.name)
# Mypy seems to think that start_context.tag below can be Optional[str], but
# that doesn't appear to be correct and works in practice.
request_tags[ request_tags[
SynapseTags.REQUEST_TAG SynapseTags.REQUEST_TAG
] = request.request_metrics.start_context.tag ] = request.request_metrics.start_context.tag # type: ignore[assignment]
# set the tags *after* the servlet completes, in case it decided to # set the tags *after* the servlet completes, in case it decided to
# prioritise the span (tags will get dropped on unprioritised spans) # prioritise the span (tags will get dropped on unprioritised spans)

@ -22,6 +22,7 @@ from twisted.test.proto_helpers import MemoryReactor
from twisted.web.resource import Resource from twisted.web.resource import Resource
from synapse.app.generic_worker import GenericWorkerServer from synapse.app.generic_worker import GenericWorkerServer
from synapse.config.workers import InstanceTcpLocationConfig, InstanceUnixLocationConfig
from synapse.http.site import SynapseRequest, SynapseSite from synapse.http.site import SynapseRequest, SynapseSite
from synapse.replication.http import ReplicationRestResource from synapse.replication.http import ReplicationRestResource
from synapse.replication.tcp.client import ReplicationDataHandler from synapse.replication.tcp.client import ReplicationDataHandler
@ -339,7 +340,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
# `_handle_http_replication_attempt` like we do with the master HS. # `_handle_http_replication_attempt` like we do with the master HS.
instance_name = worker_hs.get_instance_name() instance_name = worker_hs.get_instance_name()
instance_loc = worker_hs.config.worker.instance_map.get(instance_name) instance_loc = worker_hs.config.worker.instance_map.get(instance_name)
if instance_loc: if instance_loc and isinstance(instance_loc, InstanceTcpLocationConfig):
# Ensure the host is one that has a fake DNS entry. # Ensure the host is one that has a fake DNS entry.
if instance_loc.host not in self.reactor.lookups: if instance_loc.host not in self.reactor.lookups:
raise Exception( raise Exception(
@ -360,6 +361,10 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
instance_loc.port, instance_loc.port,
lambda: self._handle_http_replication_attempt(worker_hs, port), lambda: self._handle_http_replication_attempt(worker_hs, port),
) )
elif instance_loc and isinstance(instance_loc, InstanceUnixLocationConfig):
raise Exception(
"Unix sockets are not supported for unit tests at this time."
)
store = worker_hs.get_datastores().main store = worker_hs.get_datastores().main
store.db_pool._db_pool = self.database_pool._db_pool store.db_pool._db_pool = self.database_pool._db_pool

@ -53,6 +53,7 @@ from twisted.internet.interfaces import (
IConnector, IConnector,
IConsumer, IConsumer,
IHostnameResolver, IHostnameResolver,
IListeningPort,
IProducer, IProducer,
IProtocol, IProtocol,
IPullProducer, IPullProducer,
@ -62,7 +63,7 @@ from twisted.internet.interfaces import (
IResolverSimple, IResolverSimple,
ITransport, ITransport,
) )
from twisted.internet.protocol import ClientFactory, DatagramProtocol from twisted.internet.protocol import ClientFactory, DatagramProtocol, Factory
from twisted.python import threadpool from twisted.python import threadpool
from twisted.python.failure import Failure from twisted.python.failure import Failure
from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
@ -523,6 +524,35 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
""" """
self._tcp_callbacks[(host, port)] = callback self._tcp_callbacks[(host, port)] = callback
def connectUNIX(
self,
address: str,
factory: ClientFactory,
timeout: float = 30,
checkPID: int = 0,
) -> IConnector:
"""
Unix sockets aren't supported for unit tests yet. Make it obvious to any
developer trying it out that they will need to do some work before being able
to use it in tests.
"""
raise Exception("Unix sockets are not implemented for tests yet, sorry.")
def listenUNIX(
self,
address: str,
factory: Factory,
backlog: int = 50,
mode: int = 0o666,
wantPID: int = 0,
) -> IListeningPort:
"""
Unix sockets aren't supported for unit tests yet. Make it obvious to any
developer trying it out that they will need to do some work before being able
to use it in tests.
"""
raise Exception("Unix sockets are not implemented for tests, sorry")
def connectTCP( def connectTCP(
self, self,
host: str, host: str,

Loading…
Cancel
Save