|
|
|
@ -17,19 +17,25 @@ |
|
|
|
|
import synapse |
|
|
|
|
|
|
|
|
|
from synapse.server import HomeServer |
|
|
|
|
from synapse.util.versionstring import get_version_string |
|
|
|
|
from synapse.config._base import ConfigError |
|
|
|
|
from synapse.config.database import DatabaseConfig |
|
|
|
|
from synapse.config.logger import LoggingConfig |
|
|
|
|
from synapse.http.site import SynapseSite |
|
|
|
|
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX |
|
|
|
|
from synapse.replication.slave.storage.events import SlavedEventStore |
|
|
|
|
from synapse.replication.slave.storage.pushers import SlavedPusherStore |
|
|
|
|
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore |
|
|
|
|
from synapse.storage.engines import create_engine |
|
|
|
|
from synapse.storage import DataStore |
|
|
|
|
from synapse.util.async import sleep |
|
|
|
|
from synapse.util.logcontext import (LoggingContext, preserve_fn) |
|
|
|
|
from synapse.util.httpresourcetree import create_resource_tree |
|
|
|
|
from synapse.util.logcontext import LoggingContext, preserve_fn |
|
|
|
|
from synapse.util.manhole import manhole |
|
|
|
|
from synapse.util.rlimit import change_resource_limit |
|
|
|
|
from synapse.util.versionstring import get_version_string |
|
|
|
|
|
|
|
|
|
from twisted.internet import reactor, defer |
|
|
|
|
from twisted.web.resource import Resource |
|
|
|
|
|
|
|
|
|
import sys |
|
|
|
|
import logging |
|
|
|
@ -46,12 +52,28 @@ class SlaveConfig(DatabaseConfig): |
|
|
|
|
) |
|
|
|
|
self.user_agent_suffix = None |
|
|
|
|
self.start_pushers = True |
|
|
|
|
self.listeners = config["listeners"] |
|
|
|
|
self.soft_file_limit = config.get("soft_file_limit") |
|
|
|
|
|
|
|
|
|
def default_config(self, **kwargs): |
|
|
|
|
return """\ |
|
|
|
|
## Slave ## |
|
|
|
|
# The replication listener on the synapse to talk to. |
|
|
|
|
#replication_url: https://localhost:{replication_port}/_synapse/replication |
|
|
|
|
|
|
|
|
|
listeners: [] |
|
|
|
|
# Uncomment to enable a ssh manhole listener on the pusher. |
|
|
|
|
# - type: manhole |
|
|
|
|
# port: {manhole_port} |
|
|
|
|
# bind_address: 127.0.0.1 |
|
|
|
|
# Uncomment to enable a metric listener on the pusher. |
|
|
|
|
# - type: http |
|
|
|
|
# port: {metrics_port} |
|
|
|
|
# bind_address: 127.0.0.1 |
|
|
|
|
# resources: |
|
|
|
|
# - names: ["metrics"], |
|
|
|
|
# compress: False |
|
|
|
|
|
|
|
|
|
report_stats: False |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
@ -100,6 +122,46 @@ class PusherServer(HomeServer): |
|
|
|
|
}] |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
def _listen_http(self, listener_config): |
|
|
|
|
port = listener_config["port"] |
|
|
|
|
bind_address = listener_config.get("bind_address", "") |
|
|
|
|
site_tag = listener_config.get("tag", port) |
|
|
|
|
resources = {} |
|
|
|
|
for res in listener_config["resources"]: |
|
|
|
|
for name in res["names"]: |
|
|
|
|
if name == "metrics": |
|
|
|
|
resources[METRICS_PREFIX] = MetricsResource(self) |
|
|
|
|
|
|
|
|
|
root_resource = create_resource_tree(resources, Resource()) |
|
|
|
|
reactor.listenTCP( |
|
|
|
|
port, |
|
|
|
|
SynapseSite( |
|
|
|
|
"synapse.access.http.%s" % (site_tag,), |
|
|
|
|
site_tag, |
|
|
|
|
listener_config, |
|
|
|
|
root_resource, |
|
|
|
|
), |
|
|
|
|
interface=bind_address |
|
|
|
|
) |
|
|
|
|
logger.info("Synapse pusher now listening on port %d", port) |
|
|
|
|
|
|
|
|
|
def start_listening(self): |
|
|
|
|
for listener in self.config.listeners: |
|
|
|
|
if listener["type"] == "http": |
|
|
|
|
self._listen_http(listener) |
|
|
|
|
elif listener["type"] == "manhole": |
|
|
|
|
reactor.listenTCP( |
|
|
|
|
listener["port"], |
|
|
|
|
manhole( |
|
|
|
|
username="matrix", |
|
|
|
|
password="rabbithole", |
|
|
|
|
globals={"hs": self}, |
|
|
|
|
), |
|
|
|
|
interface=listener.get("bind_address", '127.0.0.1') |
|
|
|
|
) |
|
|
|
|
else: |
|
|
|
|
logger.warn("Unrecognized listener type: %s", listener["type"]) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def replicate(self): |
|
|
|
|
http_client = self.get_simple_http_client() |
|
|
|
@ -191,6 +253,9 @@ def setup(config_options): |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
ps.setup() |
|
|
|
|
ps.start_listening() |
|
|
|
|
|
|
|
|
|
change_resource_limit(ps.config.soft_file_limit) |
|
|
|
|
|
|
|
|
|
def start(): |
|
|
|
|
ps.replicate() |
|
|
|
|