@@ -20,18 +20,28 @@ import urllib
 from inspect import signature
 from typing import Dict, List, Tuple

-from synapse.api.errors import (
-    CodeMessageException,
-    HttpResponseException,
-    RequestSendFailed,
-    SynapseError,
-)
+from prometheus_client import Counter, Gauge
+
+from synapse.api.errors import HttpResponseException, SynapseError
+from synapse.http import RequestTimedOutError
 from synapse.logging.opentracing import inject_active_span_byte_dict, trace
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.stringutils import random_string

 logger = logging.getLogger(__name__)

+_pending_outgoing_requests = Gauge(
+    "synapse_pending_outgoing_replication_requests",
+    "Number of active outgoing replication requests, by replication method name",
+    ["name"],
+)
+
+_outgoing_request_counter = Counter(
+    "synapse_outgoing_replication_requests",
+    "Number of outgoing replication requests, by replication method name and result",
+    ["name", "code"],
+)
+

 class ReplicationEndpoint(metaclass=abc.ABCMeta):
     """Helper base class for defining new replication HTTP endpoints.
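For reference, the two metrics added in the hunk above use the standard prometheus_client labelled-metric API: .labels(...) returns a per-endpoint child, the counter child is bumped with .inc() per result, and the gauge child tracks in-flight work. A minimal sketch with made-up metric and endpoint names (only the API calls, not the names, reflect the patch):

    from prometheus_client import Counter, Gauge

    pending = Gauge("pending_requests", "Requests currently in flight", ["name"])
    results = Counter("requests_total", "Requests by name and result", ["name", "code"])

    # labels() returns (and caches) one child per label combination; label
    # values are coerced to strings internally.
    child = pending.labels("some_endpoint")
    child.inc()   # a request has started
    child.dec()   # ...and completed
    results.labels("some_endpoint", 200).inc()   # record the outcome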
@@ -138,7 +148,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
         instance_map = hs.config.worker.instance_map

+        outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME)
+
         @trace(opname="outgoing_replication_request")
+        @outgoing_gauge.track_inprogress()
         async def send_request(instance_name="master", **kwargs):
             if instance_name == local_instance_name:
                 raise Exception("Trying to send HTTP request to self")

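The @outgoing_gauge.track_inprogress() decorator added above keeps the gauge in step with the number of calls currently executing. A rough sketch of that prometheus_client helper, again with placeholder names rather than Synapse code; it works both as a decorator and as a context manager:

    from prometheus_client import Gauge

    inflight = Gauge("inflight_calls", "Calls currently in progress", ["name"])
    child = inflight.labels("some_endpoint")

    @child.track_inprogress()
    def handle():
        # the gauge is incremented on entry and decremented on exit
        ...

    # equivalent explicit form
    with child.track_inprogress():
        ...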
@@ -193,23 +206,26 @@
                     try:
                         result = await request_func(uri, data, headers=headers)
                         break
-                    except CodeMessageException as e:
-                        if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
+                    except RequestTimedOutError:
+                        if not cls.RETRY_ON_TIMEOUT:
                             raise

-                        logger.warning("%s request timed out", cls.NAME)
+                        logger.warning("%s request timed out; retrying", cls.NAME)

                         # If we timed out we probably don't need to worry about backing
                         # off too much, but lets just wait a little anyway.
                         await clock.sleep(1)
             except HttpResponseException as e:
                 # We convert to SynapseError as we know that it was a SynapseError
-                # on the master process that we should send to the client. (And
+                # on the main process that we should send to the client. (And
                 # importantly, not stack traces everywhere)
+                _outgoing_request_counter.labels(cls.NAME, e.code).inc()
                 raise e.to_synapse_error()
-            except RequestSendFailed as e:
-                raise SynapseError(502, "Failed to talk to master") from e
+            except Exception as e:
+                _outgoing_request_counter.labels(cls.NAME, "ERR").inc()
+                raise SynapseError(502, "Failed to talk to main process") from e

+            _outgoing_request_counter.labels(cls.NAME, 200).inc()
             return result

         return send_request
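The reworked error handling above retries only on transport timeouts, converts upstream HTTP errors back into a SynapseError for the caller, and maps anything else to a 502, counting each outcome as it goes. A generic sketch of that retry shape, assuming placeholder names rather than Synapse APIs:

    import asyncio

    class RequestTimedOut(Exception):
        """Placeholder for a transport-level timeout."""

    async def send_with_retry(request_func, uri, data, retry_on_timeout=True):
        # Keep re-sending the same request on timeout so we eventually learn
        # whether it succeeded or failed; other exceptions propagate at once.
        while True:
            try:
                result = await request_func(uri, data)
                break
            except RequestTimedOut:
                if not retry_on_timeout:
                    raise
                # A timeout usually means the other side is slow, so back off
                # only briefly before retrying.
                await asyncio.sleep(1)
        return result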