Expose statistics on extrems to prometheus (#5384)

code_spécifique_watcha
Amber Brown 6 years ago committed by GitHub
parent 09e9a26b71
commit 6312d6cc7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      changelog.d/5384.feature
  2. 2
      scripts/generate_signing_key.py
  3. 112
      synapse/metrics/__init__.py
  4. 44
      synapse/storage/events.py
  5. 128
      tests/storage/test_cleanup_extrems.py
  6. 97
      tests/storage/test_event_metrics.py
  7. 61
      tests/unittest.py

@ -0,0 +1 @@
Statistics on forward extremities per room are now exposed via Prometheus.

@ -16,7 +16,7 @@
import argparse import argparse
import sys import sys
from signedjson.key import write_signing_keys, generate_signing_key from signedjson.key import generate_signing_key, write_signing_keys
from synapse.util.stringutils import random_string from synapse.util.stringutils import random_string

@ -25,7 +25,7 @@ import six
import attr import attr
from prometheus_client import Counter, Gauge, Histogram from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.core import REGISTRY, GaugeMetricFamily from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily
from twisted.internet import reactor from twisted.internet import reactor
@ -40,7 +40,6 @@ HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
class RegistryProxy(object): class RegistryProxy(object):
@staticmethod @staticmethod
def collect(): def collect():
for metric in REGISTRY.collect(): for metric in REGISTRY.collect():
@ -63,10 +62,7 @@ class LaterGauge(object):
try: try:
calls = self.caller() calls = self.caller()
except Exception: except Exception:
logger.exception( logger.exception("Exception running callback for LaterGauge(%s)", self.name)
"Exception running callback for LaterGauge(%s)",
self.name,
)
yield g yield g
return return
@ -116,9 +112,7 @@ class InFlightGauge(object):
# Create a class which have the sub_metrics values as attributes, which # Create a class which have the sub_metrics values as attributes, which
# default to 0 on initialization. Used to pass to registered callbacks. # default to 0 on initialization. Used to pass to registered callbacks.
self._metrics_class = attr.make_class( self._metrics_class = attr.make_class(
"_MetricsEntry", "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
attrs={x: attr.ib(0) for x in sub_metrics},
slots=True,
) )
# Counts number of in flight blocks for a given set of label values # Counts number of in flight blocks for a given set of label values
@ -157,7 +151,9 @@ class InFlightGauge(object):
Note: may be called by a separate thread. Note: may be called by a separate thread.
""" """
in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels) in_flight = GaugeMetricFamily(
self.name + "_total", self.desc, labels=self.labels
)
metrics_by_key = {} metrics_by_key = {}
@ -179,7 +175,9 @@ class InFlightGauge(object):
yield in_flight yield in_flight
for name in self.sub_metrics: for name in self.sub_metrics:
gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels) gauge = GaugeMetricFamily(
"_".join([self.name, name]), "", labels=self.labels
)
for key, metrics in six.iteritems(metrics_by_key): for key, metrics in six.iteritems(metrics_by_key):
gauge.add_metric(key, getattr(metrics, name)) gauge.add_metric(key, getattr(metrics, name))
yield gauge yield gauge
@ -193,12 +191,75 @@ class InFlightGauge(object):
all_gauges[self.name] = self all_gauges[self.name] = self
@attr.s(hash=True)
class BucketCollector(object):
"""
Like a Histogram, but allows buckets to be point-in-time instead of
incrementally added to.
Args:
name (str): Base name of metric to be exported to Prometheus.
data_collector (callable -> dict): A synchronous callable that
returns a dict mapping bucket to number of items in the
bucket. If these buckets are not the same as the buckets
given to this class, they will be remapped into them.
buckets (list[float]): List of floats/ints of the buckets to
give to Prometheus. +Inf is ignored, if given.
"""
name = attr.ib()
data_collector = attr.ib()
buckets = attr.ib()
def collect(self):
# Fetch the data -- this must be synchronous!
data = self.data_collector()
buckets = {}
res = []
for x in data.keys():
for i, bound in enumerate(self.buckets):
if x <= bound:
buckets[bound] = buckets.get(bound, 0) + data[x]
break
for i in self.buckets:
res.append([i, buckets.get(i, 0)])
res.append(["+Inf", sum(data.values())])
metric = HistogramMetricFamily(
self.name,
"",
buckets=res,
sum_value=sum([x * y for x, y in data.items()]),
)
yield metric
def __attrs_post_init__(self):
self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
if self.buckets != sorted(self.buckets):
raise ValueError("Buckets not sorted")
self.buckets = tuple(self.buckets)
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
# #
# Detailed CPU metrics # Detailed CPU metrics
# #
class CPUMetrics(object):
class CPUMetrics(object):
def __init__(self): def __init__(self):
ticks_per_sec = 100 ticks_per_sec = 100
try: try:
@ -237,13 +298,28 @@ gc_time = Histogram(
"python_gc_time", "python_gc_time",
"Time taken to GC (sec)", "Time taken to GC (sec)",
["gen"], ["gen"],
buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50, buckets=[
5.00, 7.50, 15.00, 30.00, 45.00, 60.00], 0.0025,
0.005,
0.01,
0.025,
0.05,
0.10,
0.25,
0.50,
1.00,
2.50,
5.00,
7.50,
15.00,
30.00,
45.00,
60.00,
],
) )
class GCCounts(object): class GCCounts(object):
def collect(self): def collect(self):
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"]) cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()): for n, m in enumerate(gc.get_count()):
@ -279,9 +355,7 @@ sent_transactions_counter = Counter("synapse_federation_client_sent_transactions
events_processed_counter = Counter("synapse_federation_client_events_processed", "") events_processed_counter = Counter("synapse_federation_client_events_processed", "")
event_processing_loop_counter = Counter( event_processing_loop_counter = Counter(
"synapse_event_processing_loop_count", "synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
"Event processing loop iterations",
["name"],
) )
event_processing_loop_room_count = Counter( event_processing_loop_room_count = Counter(
@ -311,7 +385,6 @@ last_ticked = time.time()
class ReactorLastSeenMetric(object): class ReactorLastSeenMetric(object):
def collect(self): def collect(self):
cm = GaugeMetricFamily( cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen", "python_twisted_reactor_last_seen",
@ -325,7 +398,6 @@ REGISTRY.register(ReactorLastSeenMetric())
def runUntilCurrentTimer(func): def runUntilCurrentTimer(func):
@functools.wraps(func) @functools.wraps(func)
def f(*args, **kwargs): def f(*args, **kwargs):
now = reactor.seconds() now = reactor.seconds()

@ -17,7 +17,7 @@
import itertools import itertools
import logging import logging
from collections import OrderedDict, deque, namedtuple from collections import Counter as c_counter, OrderedDict, deque, namedtuple
from functools import wraps from functools import wraps
from six import iteritems, text_type from six import iteritems, text_type
@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401 from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401
from synapse.metrics import BucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateResolutionStore from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.background_updates import BackgroundUpdateStore
@ -220,13 +221,38 @@ class EventsStore(
EventsWorkerStore, EventsWorkerStore,
BackgroundUpdateStore, BackgroundUpdateStore,
): ):
def __init__(self, db_conn, hs): def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs) super(EventsStore, self).__init__(db_conn, hs)
self._event_persist_queue = _EventPeristenceQueue() self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler() self._state_resolution_handler = hs.get_state_resolution_handler()
# Collect metrics on the number of forward extremities that exist.
self._current_forward_extremities_amount = {}
BucketCollector(
"synapse_forward_extremities",
lambda: self._current_forward_extremities_amount,
buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"]
)
# Read the extrems every 60 minutes
hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)
@defer.inlineCallbacks
def _read_forward_extremities(self):
def fetch(txn):
txn.execute(
"""
select count(*) c from event_forward_extremities
group by room_id
"""
)
return txn.fetchall()
res = yield self.runInteraction("read_forward_extremities", fetch)
self._current_forward_extremities_amount = c_counter(list(x[0] for x in res))
@defer.inlineCallbacks @defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False): def persist_events(self, events_and_contexts, backfilled=False):
""" """
@ -568,17 +594,11 @@ class EventsStore(
) )
txn.execute(sql, batch) txn.execute(sql, batch)
results.extend( results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
r[0]
for r in txn
if not json.loads(r[1]).get("soft_failed")
)
for chunk in batch_iter(event_ids, 100): for chunk in batch_iter(event_ids, 100):
yield self.runInteraction( yield self.runInteraction(
"_get_events_which_are_prevs", "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
_get_events_which_are_prevs_txn,
chunk,
) )
defer.returnValue(results) defer.returnValue(results)
@ -640,9 +660,7 @@ class EventsStore(
for chunk in batch_iter(event_ids, 100): for chunk in batch_iter(event_ids, 100):
yield self.runInteraction( yield self.runInteraction(
"_get_prevs_before_rejected", "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
_get_prevs_before_rejected_txn,
chunk,
) )
defer.returnValue(existing_prevs) defer.returnValue(existing_prevs)

@ -15,7 +15,6 @@
import os.path import os.path
from synapse.api.constants import EventTypes
from synapse.storage import prepare_database from synapse.storage import prepare_database
from synapse.types import Requester, UserID from synapse.types import Requester, UserID
@ -23,17 +22,12 @@ from tests.unittest import HomeserverTestCase
class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase): class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
"""Test the background update to clean forward extremities table.
""" """
def make_homeserver(self, reactor, clock): Test the background update to clean forward extremities table.
# Hack until we understand why test_forked_graph_cleanup fails with v4 """
config = self.default_config()
config['default_room_version'] = '1'
return self.setup_test_homeserver(config=config)
def prepare(self, reactor, clock, homeserver): def prepare(self, reactor, clock, homeserver):
self.store = homeserver.get_datastore() self.store = homeserver.get_datastore()
self.event_creator = homeserver.get_event_creation_handler()
self.room_creator = homeserver.get_room_creation_handler() self.room_creator = homeserver.get_room_creation_handler()
# Create a test user and room # Create a test user and room
@ -42,56 +36,6 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
info = self.get_success(self.room_creator.create_room(self.requester, {})) info = self.get_success(self.room_creator.create_room(self.requester, {}))
self.room_id = info["room_id"] self.room_id = info["room_id"]
def create_and_send_event(self, soft_failed=False, prev_event_ids=None):
"""Create and send an event.
Args:
soft_failed (bool): Whether to create a soft failed event or not
prev_event_ids (list[str]|None): Explicitly set the prev events,
or if None just use the default
Returns:
str: The new event's ID.
"""
prev_events_and_hashes = None
if prev_event_ids:
prev_events_and_hashes = [[p, {}, 0] for p in prev_event_ids]
event, context = self.get_success(
self.event_creator.create_event(
self.requester,
{
"type": EventTypes.Message,
"room_id": self.room_id,
"sender": self.user.to_string(),
"content": {"body": "", "msgtype": "m.text"},
},
prev_events_and_hashes=prev_events_and_hashes,
)
)
if soft_failed:
event.internal_metadata.soft_failed = True
self.get_success(
self.event_creator.send_nonmember_event(self.requester, event, context)
)
return event.event_id
def add_extremity(self, event_id):
"""Add the given event as an extremity to the room.
"""
self.get_success(
self.store._simple_insert(
table="event_forward_extremities",
values={"room_id": self.room_id, "event_id": event_id},
desc="test_add_extremity",
)
)
self.store.get_latest_event_ids_in_room.invalidate((self.room_id,))
def run_background_update(self): def run_background_update(self):
"""Re run the background update to clean up the extremities. """Re run the background update to clean up the extremities.
""" """
@ -131,10 +75,16 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
""" """
# Create the room graph # Create the room graph
event_id_1 = self.create_and_send_event() event_id_1 = self.create_and_send_event(self.room_id, self.user)
event_id_2 = self.create_and_send_event(True, [event_id_1]) event_id_2 = self.create_and_send_event(
event_id_3 = self.create_and_send_event(True, [event_id_2]) self.room_id, self.user, True, [event_id_1]
event_id_4 = self.create_and_send_event(False, [event_id_3]) )
event_id_3 = self.create_and_send_event(
self.room_id, self.user, True, [event_id_2]
)
event_id_4 = self.create_and_send_event(
self.room_id, self.user, False, [event_id_3]
)
# Check the latest events are as expected # Check the latest events are as expected
latest_event_ids = self.get_success( latest_event_ids = self.get_success(
@ -154,12 +104,16 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
Where SF* are soft failed, and with extremities of A and B Where SF* are soft failed, and with extremities of A and B
""" """
# Create the room graph # Create the room graph
event_id_a = self.create_and_send_event() event_id_a = self.create_and_send_event(self.room_id, self.user)
event_id_sf1 = self.create_and_send_event(True, [event_id_a]) event_id_sf1 = self.create_and_send_event(
event_id_b = self.create_and_send_event(False, [event_id_sf1]) self.room_id, self.user, True, [event_id_a]
)
event_id_b = self.create_and_send_event(
self.room_id, self.user, False, [event_id_sf1]
)
# Add the new extremity and check the latest events are as expected # Add the new extremity and check the latest events are as expected
self.add_extremity(event_id_a) self.add_extremity(self.room_id, event_id_a)
latest_event_ids = self.get_success( latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id) self.store.get_latest_event_ids_in_room(self.room_id)
@ -185,13 +139,19 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
Where SF* are soft failed, and with extremities of A and B Where SF* are soft failed, and with extremities of A and B
""" """
# Create the room graph # Create the room graph
event_id_a = self.create_and_send_event() event_id_a = self.create_and_send_event(self.room_id, self.user)
event_id_sf1 = self.create_and_send_event(True, [event_id_a]) event_id_sf1 = self.create_and_send_event(
event_id_sf2 = self.create_and_send_event(True, [event_id_sf1]) self.room_id, self.user, True, [event_id_a]
event_id_b = self.create_and_send_event(False, [event_id_sf2]) )
event_id_sf2 = self.create_and_send_event(
self.room_id, self.user, True, [event_id_sf1]
)
event_id_b = self.create_and_send_event(
self.room_id, self.user, False, [event_id_sf2]
)
# Add the new extremity and check the latest events are as expected # Add the new extremity and check the latest events are as expected
self.add_extremity(event_id_a) self.add_extremity(self.room_id, event_id_a)
latest_event_ids = self.get_success( latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id) self.store.get_latest_event_ids_in_room(self.room_id)
@ -227,16 +187,26 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
""" """
# Create the room graph # Create the room graph
event_id_a = self.create_and_send_event() event_id_a = self.create_and_send_event(self.room_id, self.user)
event_id_b = self.create_and_send_event() event_id_b = self.create_and_send_event(self.room_id, self.user)
event_id_sf1 = self.create_and_send_event(True, [event_id_a]) event_id_sf1 = self.create_and_send_event(
event_id_sf2 = self.create_and_send_event(True, [event_id_a, event_id_b]) self.room_id, self.user, True, [event_id_a]
event_id_sf3 = self.create_and_send_event(True, [event_id_sf1]) )
self.create_and_send_event(True, [event_id_sf2, event_id_sf3]) # SF4 event_id_sf2 = self.create_and_send_event(
event_id_c = self.create_and_send_event(False, [event_id_sf3]) self.room_id, self.user, True, [event_id_a, event_id_b]
)
event_id_sf3 = self.create_and_send_event(
self.room_id, self.user, True, [event_id_sf1]
)
self.create_and_send_event(
self.room_id, self.user, True, [event_id_sf2, event_id_sf3]
) # SF4
event_id_c = self.create_and_send_event(
self.room_id, self.user, False, [event_id_sf3]
)
# Add the new extremity and check the latest events are as expected # Add the new extremity and check the latest events are as expected
self.add_extremity(event_id_a) self.add_extremity(self.room_id, event_id_a)
latest_event_ids = self.get_success( latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id) self.store.get_latest_event_ids_in_room(self.room_id)

@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.metrics import REGISTRY
from synapse.types import Requester, UserID
from tests.unittest import HomeserverTestCase
class ExtremStatisticsTestCase(HomeserverTestCase):
def test_exposed_to_prometheus(self):
"""
Forward extremity counts are exposed via Prometheus.
"""
room_creator = self.hs.get_room_creation_handler()
user = UserID("alice", "test")
requester = Requester(user, None, False, None, None)
# Real events, forward extremities
events = [(3, 2), (6, 2), (4, 6)]
for event_count, extrems in events:
info = self.get_success(room_creator.create_room(requester, {}))
room_id = info["room_id"]
last_event = None
# Make a real event chain
for i in range(event_count):
ev = self.create_and_send_event(room_id, user, False, last_event)
last_event = [ev]
# Sprinkle in some extremities
for i in range(extrems):
ev = self.create_and_send_event(room_id, user, False, last_event)
# Let it run for a while, then pull out the statistics from the
# Prometheus client registry
self.reactor.advance(60 * 60 * 1000)
self.pump(1)
items = list(
filter(
lambda x: x.name == "synapse_forward_extremities",
list(REGISTRY.collect()),
)
)
# Check the values are what we want
buckets = {}
_count = 0
_sum = 0
for i in items[0].samples:
if i[0].endswith("_bucket"):
buckets[i[1]['le']] = i[2]
elif i[0].endswith("_count"):
_count = i[2]
elif i[0].endswith("_sum"):
_sum = i[2]
# 3 buckets, 2 with 2 extrems, 1 with 6 extrems (bucketed as 7), and
# +Inf which is all
self.assertEqual(
buckets,
{
1.0: 0,
2.0: 2,
3.0: 0,
5.0: 0,
7.0: 1,
10.0: 0,
15.0: 0,
20.0: 0,
50.0: 0,
100.0: 0,
200.0: 0,
500.0: 0,
"+Inf": 3,
},
)
# 3 rooms, with 10 total events
self.assertEqual(_count, 3)
self.assertEqual(_sum, 10)

@ -27,11 +27,12 @@ import twisted.logger
from twisted.internet.defer import Deferred from twisted.internet.defer import Deferred
from twisted.trial import unittest from twisted.trial import unittest
from synapse.api.constants import EventTypes
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.http.site import SynapseRequest from synapse.http.site import SynapseRequest
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.types import UserID, create_requester from synapse.types import Requester, UserID, create_requester
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext
from tests.server import get_clock, make_request, render, setup_test_homeserver from tests.server import get_clock, make_request, render, setup_test_homeserver
@ -442,6 +443,64 @@ class HomeserverTestCase(TestCase):
access_token = channel.json_body["access_token"] access_token = channel.json_body["access_token"]
return access_token return access_token
def create_and_send_event(
self, room_id, user, soft_failed=False, prev_event_ids=None
):
"""
Create and send an event.
Args:
soft_failed (bool): Whether to create a soft failed event or not
prev_event_ids (list[str]|None): Explicitly set the prev events,
or if None just use the default
Returns:
str: The new event's ID.
"""
event_creator = self.hs.get_event_creation_handler()
secrets = self.hs.get_secrets()
requester = Requester(user, None, False, None, None)
prev_events_and_hashes = None
if prev_event_ids:
prev_events_and_hashes = [[p, {}, 0] for p in prev_event_ids]
event, context = self.get_success(
event_creator.create_event(
requester,
{
"type": EventTypes.Message,
"room_id": room_id,
"sender": user.to_string(),
"content": {"body": secrets.token_hex(), "msgtype": "m.text"},
},
prev_events_and_hashes=prev_events_and_hashes,
)
)
if soft_failed:
event.internal_metadata.soft_failed = True
self.get_success(
event_creator.send_nonmember_event(requester, event, context)
)
return event.event_id
def add_extremity(self, room_id, event_id):
"""
Add the given event as an extremity to the room.
"""
self.get_success(
self.hs.get_datastore()._simple_insert(
table="event_forward_extremities",
values={"room_id": room_id, "event_id": event_id},
desc="test_add_extremity",
)
)
self.hs.get_datastore().get_latest_event_ids_in_room.invalidate((room_id,))
def attempt_wrong_password_login(self, username, password): def attempt_wrong_password_login(self, username, password):
"""Attempts to login as the user with the given password, asserting """Attempts to login as the user with the given password, asserting
that the attempt *fails*. that the attempt *fails*.

Loading…
Cancel
Save