Merge pull request #8425 from matrix-org/rav/extremity_metrics

Add an improved "forward extremities" metric
code_spécifique_watcha
Richard van der Hoff 4 years ago committed by GitHub
commit a0a1ba6973
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      changelog.d/8425.feature
  2. 115
      synapse/metrics/__init__.py
  3. 40
      synapse/metrics/_exposition.py
  4. 53
      synapse/storage/databases/main/metrics.py
  5. 19
      tests/storage/test_event_metrics.py

@ -0,0 +1 @@
Add experimental prometheus metric to track numbers of "large" rooms for state resolutiom.

@ -15,6 +15,7 @@
import functools
import gc
import itertools
import logging
import os
import platform
@ -27,8 +28,8 @@ from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.core import (
REGISTRY,
CounterMetricFamily,
GaugeHistogramMetricFamily,
GaugeMetricFamily,
HistogramMetricFamily,
)
from twisted.internet import reactor
@ -46,7 +47,7 @@ logger = logging.getLogger(__name__)
METRICS_PREFIX = "/_synapse/metrics"
running_on_pypy = platform.python_implementation() == "PyPy"
all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge, BucketCollector]]
all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge]]
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
@ -205,63 +206,83 @@ class InFlightGauge:
all_gauges[self.name] = self
@attr.s(slots=True, hash=True)
class BucketCollector:
"""
Like a Histogram, but allows buckets to be point-in-time instead of
incrementally added to.
class GaugeBucketCollector:
"""Like a Histogram, but the buckets are Gauges which are updated atomically.
Args:
name (str): Base name of metric to be exported to Prometheus.
data_collector (callable -> dict): A synchronous callable that
returns a dict mapping bucket to number of items in the
bucket. If these buckets are not the same as the buckets
given to this class, they will be remapped into them.
buckets (list[float]): List of floats/ints of the buckets to
give to Prometheus. +Inf is ignored, if given.
The data is updated by calling `update_data` with an iterable of measurements.
We assume that the data is updated less frequently than it is reported to
Prometheus, and optimise for that case.
"""
name = attr.ib()
data_collector = attr.ib()
buckets = attr.ib()
__slots__ = ("_name", "_documentation", "_bucket_bounds", "_metric")
def collect(self):
def __init__(
self,
name: str,
documentation: str,
buckets: Iterable[float],
registry=REGISTRY,
):
"""
Args:
name: base name of metric to be exported to Prometheus. (a _bucket suffix
will be added.)
documentation: help text for the metric
buckets: The top bounds of the buckets to report
registry: metric registry to register with
"""
self._name = name
self._documentation = documentation
# Fetch the data -- this must be synchronous!
data = self.data_collector()
# the tops of the buckets
self._bucket_bounds = [float(b) for b in buckets]
if self._bucket_bounds != sorted(self._bucket_bounds):
raise ValueError("Buckets not in sorted order")
buckets = {} # type: Dict[float, int]
if self._bucket_bounds[-1] != float("inf"):
self._bucket_bounds.append(float("inf"))
res = []
for x in data.keys():
for i, bound in enumerate(self.buckets):
if x <= bound:
buckets[bound] = buckets.get(bound, 0) + data[x]
self._metric = self._values_to_metric([])
registry.register(self)
for i in self.buckets:
res.append([str(i), buckets.get(i, 0)])
def collect(self):
yield self._metric
res.append(["+Inf", sum(data.values())])
def update_data(self, values: Iterable[float]):
"""Update the data to be reported by the metric
metric = HistogramMetricFamily(
self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items())
The existing data is cleared, and each measurement in the input is assigned
to the relevant bucket.
"""
self._metric = self._values_to_metric(values)
def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily:
total = 0.0
bucket_values = [0 for _ in self._bucket_bounds]
for v in values:
# assign each value to a bucket
for i, bound in enumerate(self._bucket_bounds):
if v <= bound:
bucket_values[i] += 1
break
# ... and increment the sum
total += v
# now, aggregate the bucket values so that they count the number of entries in
# that bucket or below.
accumulated_values = itertools.accumulate(bucket_values)
return GaugeHistogramMetricFamily(
self._name,
self._documentation,
buckets=list(
zip((str(b) for b in self._bucket_bounds), accumulated_values)
),
gsum_value=total,
)
yield metric
def __attrs_post_init__(self):
self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
if self.buckets != sorted(self.buckets):
raise ValueError("Buckets not sorted")
self.buckets = tuple(self.buckets)
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
#

@ -26,6 +26,7 @@ import math
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from typing import Dict, List
from urllib.parse import parse_qs, urlparse
from prometheus_client import REGISTRY
@ -124,16 +125,33 @@ def generate_latest(registry, emit_help=False):
)
)
output.append("# TYPE {0} {1}\n".format(mname, mtype))
for sample in metric.samples:
# Get rid of the OpenMetrics specific samples
om_samples = {} # type: Dict[str, List[str]]
for s in metric.samples:
for suffix in ["_created", "_gsum", "_gcount"]:
if sample.name.endswith(suffix):
if s.name == metric.name + suffix:
# OpenMetrics specific sample, put in a gauge at the end.
# (these come from gaugehistograms which don't get renamed,
# so no need to faff with mnewname)
om_samples.setdefault(suffix, []).append(sample_line(s, s.name))
break
else:
newname = sample.name.replace(mnewname, mname)
newname = s.name.replace(mnewname, mname)
if ":" in newname and newname.endswith("_total"):
newname = newname[: -len("_total")]
output.append(sample_line(sample, newname))
output.append(sample_line(s, newname))
for suffix, lines in sorted(om_samples.items()):
if emit_help:
output.append(
"# HELP {0}{1} {2}\n".format(
metric.name,
suffix,
metric.documentation.replace("\\", r"\\").replace("\n", r"\n"),
)
)
output.append("# TYPE {0}{1} gauge\n".format(metric.name, suffix))
output.extend(lines)
# Get rid of the weird colon things while we're at it
if mtype == "counter":
@ -152,16 +170,16 @@ def generate_latest(registry, emit_help=False):
)
)
output.append("# TYPE {0} {1}\n".format(mnewname, mtype))
for sample in metric.samples:
# Get rid of the OpenMetrics specific samples
for s in metric.samples:
# Get rid of the OpenMetrics specific samples (we should already have
# dealt with them above anyway.)
for suffix in ["_created", "_gsum", "_gcount"]:
if sample.name.endswith(suffix):
if s.name == metric.name + suffix:
break
else:
output.append(
sample_line(
sample, sample.name.replace(":total", "").replace(":", "_")
)
sample_line(s, s.name.replace(":total", "").replace(":", "_"))
)
return "".join(output).encode("utf-8")

@ -12,10 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import typing
from collections import Counter
from synapse.metrics import BucketCollector
from synapse.metrics import GaugeBucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool
@ -23,6 +21,26 @@ from synapse.storage.databases.main.event_push_actions import (
EventPushActionsWorkerStore,
)
# Collect metrics on the number of forward extremities that exist.
_extremities_collecter = GaugeBucketCollector(
"synapse_forward_extremities",
"Number of rooms on the server with the given number of forward extremities"
" or fewer",
buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500],
)
# we also expose metrics on the "number of excess extremity events", which is
# (E-1)*N, where E is the number of extremities and N is the number of state
# events in the room. This is an approximation to the number of state events
# we could remove from state resolution by reducing the graph to a single
# forward extremity.
_excess_state_events_collecter = GaugeBucketCollector(
"synapse_excess_extremity_events",
"Number of rooms on the server with the given number of excess extremity "
"events, or fewer",
buckets=[0] + [1 << n for n in range(12)],
)
class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
"""Functions to pull various metrics from the DB, for e.g. phone home
@ -32,18 +50,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
# Collect metrics on the number of forward extremities that exist.
# Counter of number of extremities to count
self._current_forward_extremities_amount = (
Counter()
) # type: typing.Counter[int]
BucketCollector(
"synapse_forward_extremities",
lambda: self._current_forward_extremities_amount,
buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"],
)
# Read the extrems every 60 minutes
def read_forward_extremities():
# run as a background process to make sure that the database transactions
@ -58,14 +64,25 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
def fetch(txn):
txn.execute(
"""
select count(*) c from event_forward_extremities
group by room_id
SELECT t1.c, t2.c
FROM (
SELECT room_id, COUNT(*) c FROM event_forward_extremities
GROUP BY room_id
) t1 LEFT JOIN (
SELECT room_id, COUNT(*) c FROM current_state_events
GROUP BY room_id
) t2 ON t1.room_id = t2.room_id
"""
)
return txn.fetchall()
res = await self.db_pool.runInteraction("read_forward_extremities", fetch)
self._current_forward_extremities_amount = Counter([x[0] for x in res])
_extremities_collecter.update_data(x[0] for x in res)
_excess_state_events_collecter.update_data(
(x[0] - 1) * x[1] for x in res if x[1]
)
async def count_daily_messages(self):
"""

@ -52,14 +52,14 @@ class ExtremStatisticsTestCase(HomeserverTestCase):
self.reactor.advance(60 * 60 * 1000)
self.pump(1)
items = set(
items = list(
filter(
lambda x: b"synapse_forward_extremities_" in x,
generate_latest(REGISTRY).split(b"\n"),
generate_latest(REGISTRY, emit_help=False).split(b"\n"),
)
)
expected = {
expected = [
b'synapse_forward_extremities_bucket{le="1.0"} 0.0',
b'synapse_forward_extremities_bucket{le="2.0"} 2.0',
b'synapse_forward_extremities_bucket{le="3.0"} 2.0',
@ -72,9 +72,12 @@ class ExtremStatisticsTestCase(HomeserverTestCase):
b'synapse_forward_extremities_bucket{le="100.0"} 3.0',
b'synapse_forward_extremities_bucket{le="200.0"} 3.0',
b'synapse_forward_extremities_bucket{le="500.0"} 3.0',
b'synapse_forward_extremities_bucket{le="+Inf"} 3.0',
b"synapse_forward_extremities_count 3.0",
b"synapse_forward_extremities_sum 10.0",
}
# per https://docs.google.com/document/d/1KwV0mAXwwbvvifBvDKH_LU1YjyXE_wxCkHNoCGq1GX0/edit#heading=h.wghdjzzh72j9,
# "inf" is valid: "this includes variants such as inf"
b'synapse_forward_extremities_bucket{le="inf"} 3.0',
b"# TYPE synapse_forward_extremities_gcount gauge",
b"synapse_forward_extremities_gcount 3.0",
b"# TYPE synapse_forward_extremities_gsum gauge",
b"synapse_forward_extremities_gsum 10.0",
]
self.assertEqual(items, expected)

Loading…
Cancel
Save