import collections
import contextlib
import logging
import threading
import typing
from typing import (
    Any,
    Callable,
    DefaultDict,
    Dict,
    Iterator,
    List,
    Mapping,
    Optional,
    Set,
    Tuple,
)

from prometheus_client.core import Counter, Histogram
from typing_extensions import ContextManager

from twisted.internet import defer
logger = logging.getLogger(__name__)

# Track how much the ratelimiter is affecting requests.
# Labelled by `rate_limiter_name` so different FederationRateLimiter
# instances (those constructed with a `metrics_name`) report separately.
rate_limit_sleep_counter = Counter(
    "synapse_rate_limit_sleep",
    "Number of requests slept by the rate limiter",
    ["rate_limiter_name"],
)
rate_limit_reject_counter = Counter(
    "synapse_rate_limit_reject",
    "Number of requests rejected by the rate limiter",
    ["rate_limiter_name"],
)
queue_wait_timer = Histogram (
" synapse_rate_limit_queue_wait_time_seconds " ,
" sec " ,
[ ] ,
" Amount of time spent waiting for the rate limiter to let our request through. " ,
[ " rate_limiter_name " ] ,
buckets = (
0.005 ,
0.01 ,
@ -65,35 +86,92 @@ queue_wait_timer = Histogram(
)
_rate_limiter_instances : Set [ " FederationRateLimiter " ] = set ( )
# Protects the _rate_limiter_instances set from concurrent access
_rate_limiter_instances_lock = threading . Lock ( )
def _get_counts_from_rate_limiter_instance (
count_func : Callable [ [ " FederationRateLimiter " ] , int ]
) - > Mapping [ Tuple [ str , . . . ] , int ] :
""" Returns a count of something (slept/rejected hosts) by (metrics_name) """
# Cast to a list to prevent it changing while the Prometheus
# thread is collecting metrics
with _rate_limiter_instances_lock :
rate_limiter_instances = list ( _rate_limiter_instances )
# Map from (metrics_name,) -> int, the number of something like slept hosts
# or rejected hosts. The key type is Tuple[str], but we leave the length
# unspecified for compatability with LaterGauge's annotations.
counts : Dict [ Tuple [ str , . . . ] , int ] = { }
for rate_limiter_instance in rate_limiter_instances :
# Only track metrics if they provided a `metrics_name` to
# differentiate this instance of the rate limiter.
if rate_limiter_instance . metrics_name :
key = ( rate_limiter_instance . metrics_name , )
counts [ key ] = count_func ( rate_limiter_instance )
return counts
# We track the number of affected hosts per time-period so we can
# differentiate one really noisy homeserver from a general
# ratelimit tuning problem across the federation.
LaterGauge(
    "synapse_rate_limit_sleep_affected_hosts",
    "Number of hosts that had requests put to sleep",
    ["rate_limiter_name"],
    lambda: _get_counts_from_rate_limiter_instance(
        lambda rate_limiter_instance: sum(
            ratelimiter.should_sleep()
            for ratelimiter in rate_limiter_instance.ratelimiters.values()
        )
    ),
)
LaterGauge(
    "synapse_rate_limit_reject_affected_hosts",
    "Number of hosts that had requests rejected",
    ["rate_limiter_name"],
    lambda: _get_counts_from_rate_limiter_instance(
        lambda rate_limiter_instance: sum(
            ratelimiter.should_reject()
            for ratelimiter in rate_limiter_instance.ratelimiters.values()
        )
    ),
)
class FederationRateLimiter :
def __init__ ( self , clock : Clock , config : FederationRatelimitSettings ) :
""" Used to rate limit request per-host. """
def __init__ (
self ,
clock : Clock ,
config : FederationRatelimitSettings ,
metrics_name : Optional [ str ] = None ,
) :
"""
Args :
clock
config
metrics_name : The name of the rate limiter so we can differentiate it
from the rest in the metrics . If ` None ` , we don ' t track metrics
for this rate limiter .
"""
self . metrics_name = metrics_name
def new_limiter ( ) - > " _PerHostRatelimiter " :
return _PerHostRatelimiter ( clock = clock , config = config )
return _PerHostRatelimiter (
clock = clock , config = config , metrics_name = metrics_name
)
self . ratelimiters : DefaultDict [
str , " _PerHostRatelimiter "
] = collections . defaultdict ( new_limiter )
# We track the number of affected hosts per time-period so we can
# differentiate one really noisy homeserver from a general
# ratelimit tuning problem across the federation.
LaterGauge (
" synapse_rate_limit_sleep_affected_hosts " ,
" Number of hosts that had requests put to sleep " ,
[ ] ,
lambda : sum (
ratelimiter . should_sleep ( ) for ratelimiter in self . ratelimiters . values ( )
) ,
)
LaterGauge (
" synapse_rate_limit_reject_affected_hosts " ,
" Number of hosts that had requests rejected " ,
[ ] ,
lambda : sum (
ratelimiter . should_reject ( )
for ratelimiter in self . ratelimiters . values ( )
) ,
)
with _rate_limiter_instances_lock :
_rate_limiter_instances . add ( self )
def ratelimit ( self , host : str ) - > " _GeneratorContextManager[defer.Deferred[None]] " :
""" Used to ratelimit an incoming request from a given host
@ -114,13 +192,23 @@ class FederationRateLimiter:
class _PerHostRatelimiter :
def __init__ ( self , clock : Clock , config : FederationRatelimitSettings ) :
def __init__ (
self ,
clock : Clock ,
config : FederationRatelimitSettings ,
metrics_name : Optional [ str ] = None ,
) :
"""
Args :
clock
config
metrics_name : The name of the rate limiter so we can differentiate it
from the rest in the metrics . If ` None ` , we don ' t track metrics
for this rate limiter .
from the rest in the metrics
"""
self . clock = clock
self . metrics_name = metrics_name
self . window_size = config . window_size
self . sleep_limit = config . sleep_limit
@ -178,7 +266,10 @@ class _PerHostRatelimiter:
return len ( self . request_times ) > self . sleep_limit
async def _on_enter_with_tracing ( self , request_id : object ) - > None :
with start_active_span ( " ratelimit wait " ) , queue_wait_timer . time ( ) :
maybe_metrics_cm : ContextManager = contextlib . nullcontext ( )
if self . metrics_name :
maybe_metrics_cm = queue_wait_timer . labels ( self . metrics_name ) . time ( )
with start_active_span ( " ratelimit wait " ) , maybe_metrics_cm :
await self . _on_enter ( request_id )
def _on_enter ( self , request_id : object ) - > " defer.Deferred[None] " :
@ -193,7 +284,8 @@ class _PerHostRatelimiter:
# sleeping or in the ready queue).
if self . should_reject ( ) :
logger . debug ( " Ratelimiter( %s ): rejecting request " , self . host )
rate_limit_reject_counter . inc ( )
if self . metrics_name :
rate_limit_reject_counter . labels ( self . metrics_name ) . inc ( )
raise LimitExceededError (
retry_after_ms = int ( self . window_size / self . sleep_limit )
)
@ -228,7 +320,8 @@ class _PerHostRatelimiter:
id ( request_id ) ,
self . sleep_sec ,
)
rate_limit_sleep_counter . inc ( )
if self . metrics_name :
rate_limit_sleep_counter . labels ( self . metrics_name ) . inc ( )
ret_defer = run_in_background ( self . clock . sleep , self . sleep_sec )
self . sleeping_requests . add ( request_id )