@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import logging
import time
@ -19,8 +18,8 @@ import time
from twisted . web . server import Request , Site
from synapse . http import redact_uri
from synapse . http . request_metrics import RequestMetrics
from synapse . util . logcontext import ContextResourceUsage , LoggingContext
from synapse . http . request_metrics import RequestMetrics , requests_counter
from synapse . util . logcontext import Logging Context, Preserve LoggingContext
logger = logging . getLogger ( __name__ )
@ -34,25 +33,43 @@ class SynapseRequest(Request):
It extends twisted ' s twisted.web.server.Request, and adds:
* Unique request ID
* A log context associated with the request
* Redaction of access_token query - params in __repr__
* Logging at start and end
* Metrics to record CPU , wallclock and DB time by endpoint .
It provides a method ` processing ` which should be called by the Resource
which is handling the request , and returns a context manager .
It also provides a method ` processing ` , which returns a context manager . If this
method is called , the request won ' t be logged until the context manager is closed;
this is useful for asynchronous request handlers which may go on processing the
request even after the client has disconnected .
Attributes :
logcontext ( LoggingContext ) : the log context for this request
"""
def __init__ ( self , site , channel , * args , * * kw ) :
Request . __init__ ( self , channel , * args , * * kw )
self . site = site
self . _channel = channel
self . _channel = channel # this is used by the tests
self . authenticated_entity = None
self . start_time = 0
# we can't yet create the logcontext, as we don't know the method.
self . logcontext = None
global _next_request_seq
self . request_seq = _next_request_seq
_next_request_seq + = 1
# whether an asynchronous request handler has called processing()
self . _is_processing = False
# the time when the asynchronous request handler completed its processing
self . _processing_finished_time = None
# what time we finished sending the response to the client (or the connection
# dropped)
self . finish_time = None
def __repr__ ( self ) :
# We overwrite this so that we don't log ``access_token``
return ' < %s at 0x %x method= %r uri= %r clientproto= %r site= %r > ' % (
@ -74,11 +91,116 @@ class SynapseRequest(Request):
return self . requestHeaders . getRawHeaders ( b " User-Agent " , [ None ] ) [ - 1 ]
def render ( self , resrc ) :
# this is called once a Resource has been found to serve the request; in our
# case the Resource in question will normally be a JsonResource.
# create a LogContext for this request
request_id = self . get_request_id ( )
logcontext = self . logcontext = LoggingContext ( request_id )
logcontext . request = request_id
# override the Server header which is set by twisted
self . setHeader ( " Server " , self . site . server_version_string )
return Request . render ( self , resrc )
with PreserveLoggingContext ( self . logcontext ) :
# we start the request metrics timer here with an initial stab
# at the servlet name. For most requests that name will be
# JsonResource (or a subclass), and JsonResource._async_render
# will update it once it picks a servlet.
servlet_name = resrc . __class__ . __name__
self . _started_processing ( servlet_name )
Request . render ( self , resrc )
# record the arrival of the request *after*
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
requests_counter . labels ( self . method ,
self . request_metrics . name ) . inc ( )
@contextlib . contextmanager
def processing ( self ) :
""" Record the fact that we are processing this request.
Returns a context manager ; the correct way to use this is :
@defer . inlineCallbacks
def handle_request ( request ) :
with request . processing ( " FooServlet " ) :
yield really_handle_the_request ( )
Once the context manager is closed , the completion of the request will be logged ,
and the various metrics will be updated .
"""
if self . _is_processing :
raise RuntimeError ( " Request is already processing " )
self . _is_processing = True
try :
yield
except Exception :
# this should already have been caught, and sent back to the client as a 500.
logger . exception ( " Asynchronous messge handler raised an uncaught exception " )
finally :
# the request handler has finished its work and either sent the whole response
# back, or handed over responsibility to a Producer.
self . _processing_finished_time = time . time ( )
self . _is_processing = False
# if we've already sent the response, log it now; otherwise, we wait for the
# response to be sent.
if self . finish_time is not None :
self . _finished_processing ( )
def finish ( self ) :
""" Called when all response data has been written to this Request.
Overrides twisted . web . server . Request . finish to record the finish time and do
logging .
"""
self . finish_time = time . time ( )
Request . finish ( self )
if not self . _is_processing :
with PreserveLoggingContext ( self . logcontext ) :
self . _finished_processing ( )
def connectionLost ( self , reason ) :
""" Called when the client connection is closed before the response is written.
Overrides twisted . web . server . Request . connectionLost to record the finish time and
do logging .
"""
self . finish_time = time . time ( )
Request . connectionLost ( self , reason )
# we only get here if the connection to the client drops before we send
# the response.
#
# It's useful to log it here so that we can get an idea of when
# the client disconnects.
with PreserveLoggingContext ( self . logcontext ) :
logger . warn (
" Error processing request: %s %s " , reason . type , reason . value ,
)
if not self . _is_processing :
self . _finished_processing ( )
def _started_processing ( self , servlet_name ) :
""" Record the fact that we are processing this request.
This will log the request ' s arrival. Once the request completes,
be sure to call finished_processing .
Args :
servlet_name ( str ) : the name of the servlet which will be
processing this request . This is used in the metrics .
It is possible to update this afterwards by updating
self . request_metrics . name .
"""
self . start_time = time . time ( )
self . request_metrics = RequestMetrics ( )
self . request_metrics . start (
@ -94,13 +216,21 @@ class SynapseRequest(Request):
)
def _finished_processing ( self ) :
try :
context = LoggingContext . current_context ( )
usage = context . get_resource_usage ( )
except Exception :
usage = ContextResourceUsage ( )
""" Log the completion of this request and update the metrics
"""
usage = self . logcontext . get_resource_usage ( )
if self . _processing_finished_time is None :
# we completed the request without anything calling processing()
self . _processing_finished_time = time . time ( )
end_time = time . time ( )
# the time between receiving the request and the request handler finishing
processing_time = self . _processing_finished_time - self . start_time
# the time between the request handler finishing and the response being sent
# to the client (nb may be negative)
response_send_time = self . finish_time - self . _processing_finished_time
# need to decode as it could be raw utf-8 bytes
# from a IDN servname in an auth header
@ -116,22 +246,31 @@ class SynapseRequest(Request):
user_agent = self . get_user_agent ( )
if user_agent is not None :
user_agent = user_agent . decode ( " utf-8 " , " replace " )
else :
user_agent = " - "
code = str ( self . code )
if not self . finished :
# we didn't send the full response before we gave up (presumably because
# the connection dropped)
code + = " ! "
self . site . access_logger . info (
" %s - %s - { %s } "
" Processed request: %.3f sec ( %.3f sec, %.3f sec) ( %.3f sec/ %.3f sec/ %d ) "
" Processed request: %.3f sec/ %.3f sec ( %.3f sec, %.3f sec) ( %.3f sec/ %.3f sec/ %d ) "
" %s B %s \" %s %s %s \" \" %s \" [ %d dbevts] " ,
self . getClientIP ( ) ,
self . site . site_tag ,
authenticated_entity ,
end_time - self . start_time ,
processing_time ,
response_send_time ,
usage . ru_utime ,
usage . ru_stime ,
usage . db_sched_duration_sec ,
usage . db_txn_duration_sec ,
int ( usage . db_txn_count ) ,
self . sentLength ,
self . code ,
code ,
self . method ,
self . get_redacted_uri ( ) ,
self . clientproto ,
@ -140,38 +279,10 @@ class SynapseRequest(Request):
)
try :
self . request_metrics . stop ( end _time, self )
self . request_metrics . stop ( self . finish _time, self )
except Exception as e :
logger . warn ( " Failed to stop metrics: %r " , e )
@contextlib . contextmanager
def processing ( self , servlet_name ) :
""" Record the fact that we are processing this request.
Returns a context manager ; the correct way to use this is :
@defer . inlineCallbacks
def handle_request ( request ) :
with request . processing ( " FooServlet " ) :
yield really_handle_the_request ( )
This will log the request ' s arrival. Once the context manager is
closed , the completion of the request will be logged , and the various
metrics will be updated .
Args :
servlet_name ( str ) : the name of the servlet which will be
processing this request . This is used in the metrics .
It is possible to update this afterwards by updating
self . request_metrics . servlet_name .
"""
# TODO: we should probably just move this into render() and finish(),
# to save having to call a separate method.
self . _started_processing ( servlet_name )
yield
self . _finished_processing ( )
class XForwardedForRequest ( SynapseRequest ) :
def __init__ ( self , * args , * * kw ) :