mirror of https://github.com/watcha-fr/synapse
parent
0a6a966e2b
commit
52bfa604e1
@ -0,0 +1,196 @@ |
||||
# -*- coding: utf-8 -*- |
||||
# Copyright 2017 Vector Creations Ltd |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
"""A replication client for use by synapse workers. |
||||
""" |
||||
|
||||
from twisted.internet import reactor, defer |
||||
from twisted.internet.protocol import ReconnectingClientFactory |
||||
|
||||
from .commands import ( |
||||
FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand, |
||||
) |
||||
from .protocol import ClientReplicationStreamProtocol |
||||
|
||||
import logging |
||||
|
||||
logger = logging.getLogger(__name__) |
||||
|
||||
|
||||
class ReplicationClientFactory(ReconnectingClientFactory): |
||||
"""Factory for building connections to the master. Will reconnect if the |
||||
connection is lost. |
||||
|
||||
Accepts a handler that will be called when new data is available or data |
||||
is required. |
||||
""" |
||||
maxDelay = 5 # Try at least once every N seconds |
||||
|
||||
def __init__(self, hs, client_name, handler): |
||||
self.client_name = client_name |
||||
self.handler = handler |
||||
self.server_name = hs.config.server_name |
||||
self._clock = hs.get_clock() # As self.clock is defined in super class |
||||
|
||||
reactor.addSystemEventTrigger("before", "shutdown", self.stopTrying) |
||||
|
||||
def startedConnecting(self, connector): |
||||
logger.info("Connecting to replication: %r", connector.getDestination()) |
||||
|
||||
def buildProtocol(self, addr): |
||||
logger.info("Connected to replication: %r", addr) |
||||
self.resetDelay() |
||||
return ClientReplicationStreamProtocol( |
||||
self.client_name, self.server_name, self._clock, self.handler |
||||
) |
||||
|
||||
def clientConnectionLost(self, connector, reason): |
||||
logger.error("Lost replication conn: %r", reason) |
||||
ReconnectingClientFactory.clientConnectionLost(self, connector, reason) |
||||
|
||||
def clientConnectionFailed(self, connector, reason): |
||||
logger.error("Failed to connect to replication: %r", reason) |
||||
ReconnectingClientFactory.clientConnectionFailed( |
||||
self, connector, reason |
||||
) |
||||
|
||||
|
||||
class ReplicationClientHandler(object): |
||||
"""A base handler that can be passed to the ReplicationClientFactory. |
||||
|
||||
By default proxies incoming replication data to the SlaveStore. |
||||
""" |
||||
def __init__(self, store): |
||||
self.store = store |
||||
|
||||
# The current connection. None if we are currently (re)connecting |
||||
self.connection = None |
||||
|
||||
# Any pending commands to be sent once a new connection has been |
||||
# established |
||||
self.pending_commands = [] |
||||
|
||||
# Map from string -> deferred, to wake up when receiveing a SYNC with |
||||
# the given string. |
||||
# Used for tests. |
||||
self.awaiting_syncs = {} |
||||
|
||||
def start_replication(self, hs): |
||||
"""Helper method to start a replication connection to the remote server |
||||
using TCP. |
||||
""" |
||||
client_name = hs.config.worker_name |
||||
factory = ReplicationClientFactory(hs, client_name, self) |
||||
host = hs.config.worker_replication_host |
||||
port = hs.config.worker_replication_port |
||||
reactor.connectTCP(host, port, factory) |
||||
|
||||
def on_rdata(self, stream_name, token, rows): |
||||
"""Called when we get new replication data. By default this just pokes |
||||
the slave store. |
||||
|
||||
Can be overriden in subclasses to handle more. |
||||
""" |
||||
logger.info("Received rdata %s -> %s", stream_name, token) |
||||
self.store.process_replication_rows(stream_name, token, rows) |
||||
|
||||
def on_position(self, stream_name, token): |
||||
"""Called when we get new position data. By default this just pokes |
||||
the slave store. |
||||
|
||||
Can be overriden in subclasses to handle more. |
||||
""" |
||||
self.store.process_replication_rows(stream_name, token, []) |
||||
|
||||
def on_sync(self, data): |
||||
"""When we received a SYNC we wake up any deferreds that were waiting |
||||
for the sync with the given data. |
||||
|
||||
Used by tests. |
||||
""" |
||||
d = self.awaiting_syncs.pop(data, None) |
||||
if d: |
||||
d.callback(data) |
||||
|
||||
def get_streams_to_replicate(self): |
||||
"""Called when a new connection has been established and we need to |
||||
subscribe to streams. |
||||
|
||||
Returns a dictionary of stream name to token. |
||||
""" |
||||
args = self.store.stream_positions() |
||||
user_account_data = args.pop("user_account_data", None) |
||||
room_account_data = args.pop("room_account_data", None) |
||||
if user_account_data: |
||||
args["account_data"] = user_account_data |
||||
elif room_account_data: |
||||
args["account_data"] = room_account_data |
||||
return args |
||||
|
||||
def get_currently_syncing_users(self): |
||||
"""Get the list of currently syncing users (if any). This is called |
||||
when a connection has been established and we need to send the |
||||
currently syncing users. (Overriden by the synchrotron's only) |
||||
""" |
||||
return [] |
||||
|
||||
def send_command(self, cmd): |
||||
"""Send a command to master (when we get establish a connection if we |
||||
don't have one already.) |
||||
""" |
||||
if self.connection: |
||||
self.connection.send_command(cmd) |
||||
else: |
||||
logger.warn("Queuing command as not connected: %r", cmd.NAME) |
||||
self.pending_commands.append(cmd) |
||||
|
||||
def send_federation_ack(self, token): |
||||
"""Ack data for the federation stream. This allows the master to drop |
||||
data stored purely in memory. |
||||
""" |
||||
self.send_command(FederationAckCommand(token)) |
||||
|
||||
def send_user_sync(self, user_id, is_syncing, last_sync_ms): |
||||
"""Poke the master that a user has started/stopped syncing. |
||||
""" |
||||
self.send_command(UserSyncCommand(user_id, is_syncing, last_sync_ms)) |
||||
|
||||
def send_remove_pusher(self, app_id, push_key, user_id): |
||||
"""Poke the master to remove a pusher for a user |
||||
""" |
||||
cmd = RemovePusherCommand(app_id, push_key, user_id) |
||||
self.send_command(cmd) |
||||
|
||||
def send_invalidate_cache(self, cache_func, keys): |
||||
"""Poke the master to invalidate a cache. |
||||
""" |
||||
cmd = InvalidateCacheCommand(cache_func, keys) |
||||
self.send_command(cmd) |
||||
|
||||
def await_sync(self, data): |
||||
"""Returns a deferred that is resolved when we receive a SYNC command |
||||
with given data. |
||||
|
||||
Used by tests. |
||||
""" |
||||
return self.awaiting_syncs.setdefault(data, defer.Deferred()) |
||||
|
||||
def update_connection(self, connection): |
||||
"""Called when a connection has been established (or lost with None). |
||||
""" |
||||
self.connection = connection |
||||
if connection: |
||||
for cmd in self.pending_commands: |
||||
connection.send_command(cmd) |
||||
self.pending_commands = [] |
Loading…
Reference in new issue