This PR reverts https://github.com/element-hq/synapse/pull/18751

### Why revert?

@reivilibre [found](https://matrix.to/#/!vcyiEtMVHIhWXcJAfl:sw1v.org/$u9OEmMxaFYUzWHhCk1A_r50Y0aGrtKEhepF7WxWJkUA?via=matrix.org&via=node.marinchik.ink&via=element.io) that our CI was failing in bizarre ways (thanks for stepping up to dive into this 🙇). Examples:

- `twisted.internet.error.ProcessTerminated: A process has ended with a probable error condition: process ended by signal 9.`
- `twisted.internet.error.ProcessTerminated: A process has ended with a probable error condition: process ended by signal 15.`

<details>
<summary>More detailed part of the log</summary>

https://github.com/element-hq/synapse/actions/runs/16758038107/job/47500520633#step:9:6809

```
tests.util.test_wheel_timer.WheelTimerTestCase.test_single_insert_fetch
===============================================================================
Error:
Traceback (most recent call last):
  File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/trial/_dist/disttrial.py", line 371, in task
    await worker.run(case, result)
  File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/trial/_dist/worker.py", line 305, in run
    return await self.callRemote(workercommands.Run, testCase=testCaseId)  # type: ignore[no-any-return]
  File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/defer.py", line 1187, in __iter__
    yield self
  File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/defer.py", line 1092, in _runCallbacks
    current.result = callback(  # type: ignore[misc]
  File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/protocols/amp.py", line 1968, in _massageError
    error.trap(RemoteAmpError)
  File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/python/failure.py", line 431, in trap
    self.raiseException()
  File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/python/failure.py", line 455, in raiseException
    raise self.value.with_traceback(self.tb)
twisted.internet.error.ProcessTerminated: A process has ended with a probable error condition: process ended by signal 9.

tests.util.test_macaroons.MacaroonGeneratorTestCase.test_guest_access_token
-------------------------------------------------------------------------------
Ran 4325 tests in 669.321s

FAILED (skips=159, errors=62, successes=4108)
while calling from thread
Traceback (most recent call last):
  File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/base.py", line 1064, in runUntilCurrent
    f(*a, **kw)
  File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/base.py", line 790, in stop
    raise error.ReactorNotRunning("Can't stop reactor that isn't running.")
twisted.internet.error.ReactorNotRunning: Can't stop reactor that isn't running.
joining disttrial worker #0 failed
Traceback (most recent call last):
  File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/defer.py", line 1853, in _inlineCallbacks
    result = context.run(
  File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/python/failure.py", line 467, in throwExceptionIntoGenerator
    return g.throw(self.value.with_traceback(self.tb))
  File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/trial/_dist/worker.py", line 406, in exit
    await endDeferred
  File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/defer.py", line 1187, in __iter__
    yield self
twisted.internet.error.ProcessTerminated: A process has ended with a probable error condition: process ended by signal 15.
```

</details>

With more debugging (thanks @devonh for also stepping in as maintainer), we found that the CI was consistently failing at `test_exposed_to_prometheus`, which was a bit of a smoke signal given all of the [metrics changes](https://github.com/element-hq/synapse/issues/18592) that were merged recently.

Locally, although I wasn't able to reproduce the bizarre errors, I could easily see increased memory usage (~20GB vs ~2GB) and the `test_exposed_to_prometheus` test taking a while to complete when running a full test run (`SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests`).

<img width="1485" height="78" alt="Lots of memory usage" src="https://github.com/user-attachments/assets/811e2a96-75e5-4a3c-966c-00dc0512cea9" />

After updating `test_exposed_to_prometheus` to dump the `latest_metrics_response = generate_latest(REGISTRY)`, I could see that it's a massive 3.2GB response. Inspecting the contents, we can see 4.1M (4,137,123) entries for just `synapse_background_update_status{server_name="test"} 3.0`, which is a `LaterGauge`. We don't have 4.1M test cases, so it's unclear why we end up with quite so many samples, but the duplication itself makes sense: each `HomeserverTestCase` creates a homeserver for each test case, and each of those homeservers calls `LaterGauge.register_hook(...)` (part of the https://github.com/element-hq/synapse/pull/18751 changes).

`tests/storage/databases/main/test_metrics.py`
```python
latest_metrics_response = generate_latest(REGISTRY)

with open("/tmp/synapse-test-metrics", "wb") as f:
    f.write(latest_metrics_response)
```
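To quantify the duplication from that dump, a small stand-alone script along these lines works (illustrative sketch, not part of this PR; it just reads the dump path from the snippet above):

```python
from collections import Counter

# Count how many times each sample line appears in the dumped Prometheus
# exposition output written by the test snippet above.
counts = Counter()
with open("/tmp/synapse-test-metrics") as f:
    for line in f:
        line = line.strip()
        # Ignore `# HELP`/`# TYPE` metadata lines; only count actual samples.
        if line and not line.startswith("#"):
            counts[line] += 1

# Print the most-duplicated samples first.
for sample, count in counts.most_common(10):
    print(count, sample)
```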
After reverting the https://github.com/element-hq/synapse/pull/18751 changes, running the full test suite locally doesn't result in memory spikes and seems to run normally.

### Dev notes

Discussion in the [`#synapse-dev:matrix.org`](https://matrix.to/#/!vcyiEtMVHIhWXcJAfl:sw1v.org/$vkMATs04yqZggVVd6Noop5nU8M2DVoTkrAWshw7u1-w?via=matrix.org&via=node.marinchik.ink&via=element.io) room.

### Pull Request Checklist

<!-- Please read https://element-hq.github.io/synapse/latest/development/contributing_guide.html before submitting your pull request -->

* [x] Pull request is based on the develop branch
* [ ] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should:
  - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
  - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry.
* [ ] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2022 The Matrix.org Foundation C.I.C.
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#

import itertools
import logging
import os
import platform
import threading
from importlib import metadata
from typing import (
    Callable,
    Dict,
    Generic,
    Iterable,
    Mapping,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    TypeVar,
    Union,
    cast,
)

import attr
from pkg_resources import parse_version
from prometheus_client import (
    CollectorRegistry,
    Counter,
    Gauge,
    Histogram,
    Metric,
    generate_latest,
)
from prometheus_client.core import (
    REGISTRY,
    GaugeHistogramMetricFamily,
    GaugeMetricFamily,
)

from twisted.python.threadpool import ThreadPool
from twisted.web.resource import Resource
from twisted.web.server import Request

# This module is imported for its side effects; flake8 needn't warn that it's unused.
import synapse.metrics._reactor_metrics  # noqa: F401
from synapse.metrics._gc import MIN_TIME_BETWEEN_GCS, install_gc_manager
from synapse.metrics._types import Collector
from synapse.types import StrSequence
from synapse.util import SYNAPSE_VERSION

logger = logging.getLogger(__name__)

METRICS_PREFIX = "/_synapse/metrics"

all_gauges: Dict[str, Collector] = {}

HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")

SERVER_NAME_LABEL = "server_name"
"""
The `server_name` label is used to identify the homeserver that the metrics correspond
to. Because we support multiple instances of Synapse running in the same process and all
metrics are in a single global `REGISTRY`, we need to manually label any metrics.

In the case of a Synapse homeserver, this should be set to the homeserver name
(`hs.hostname`).

We're purposely not using the `instance` label for this purpose as that should be "The
<host>:<port> part of the target's URL that was scraped.". Also: "In Prometheus
terms, an endpoint you can scrape is called an *instance*, usually corresponding to a
single process." (source: https://prometheus.io/docs/concepts/jobs_instances/)
"""
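
# Example (illustrative; `example_counter` is a hypothetical metric): a per-homeserver
# metric is declared with `SERVER_NAME_LABEL` in its label names and then labelled with
# the homeserver's name (`hs.hostname`) at the point of use, along these lines:
#
#     example_counter = Counter(
#         "synapse_example_total", "", labelnames=[SERVER_NAME_LABEL]
#     )
#     example_counter.labels(**{SERVER_NAME_LABEL: hs.hostname}).inc()
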
CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8"
"""
Content type of the latest text format for Prometheus metrics.

Pulled directly from the prometheus_client library.
"""


def _set_prometheus_client_use_created_metrics(new_value: bool) -> None:
    """
    Sets whether prometheus_client should expose `_created`-suffixed metrics for
    all gauges, histograms and summaries.

    There is no programmatic way in the old versions of `prometheus_client` to disable
    this without poking at internals; the proper way in the old `prometheus_client`
    versions (> `0.14.0` < `0.18.0`) is to use an environment variable which
    prometheus_client loads at import time. For versions > `0.18.0`, we can use the
    dedicated `disable_created_metrics()`/`enable_created_metrics()`.

    The motivation for disabling these `_created` metrics is that they're not useful
    but take up space in Prometheus. It's not the end of the world if this doesn't
    work.
    """
    import prometheus_client.metrics

    if hasattr(prometheus_client.metrics, "_use_created"):
        prometheus_client.metrics._use_created = new_value
    # Just log an error for old versions that don't support disabling the unnecessary
    # metrics. It's not the end of the world if this doesn't work as it just means
    # extra wasted space taken up in Prometheus but things keep working.
    elif parse_version(metadata.version("prometheus_client")) < parse_version("0.14.0"):
        logger.error(
            "Can't disable `_created` metrics in prometheus_client (unsupported `prometheus_client` version, too old)"
        )
    # If the attribute doesn't exist on a newer version, this is a sign that the brittle
    # hack is broken. We should consider updating the minimum version of
    # `prometheus_client` to a version (> `0.18.0`) where we can use dedicated
    # `disable_created_metrics()`/`enable_created_metrics()` functions.
    else:
        raise Exception(
            "Can't disable `_created` metrics in prometheus_client (brittle hack broken?)"
        )


# Set this globally so it applies wherever we generate/collect metrics
_set_prometheus_client_use_created_metrics(False)
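
# On `prometheus_client` versions that ship the dedicated helpers mentioned in the
# docstring above (> `0.18.0`), the equivalent without poking at internals would look
# roughly like this (illustrative):
#
#     from prometheus_client import disable_created_metrics
#
#     disable_created_metrics()
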
class _RegistryProxy:
    @staticmethod
    def collect() -> Iterable[Metric]:
        for metric in REGISTRY.collect():
            if not metric.name.startswith("__"):
                yield metric


# A little bit nasty, but collect() above is static so a Protocol doesn't work.
# _RegistryProxy matches the signature of a CollectorRegistry instance enough
# for it to be usable in the contexts in which we use it.
# TODO Do something nicer about this.
RegistryProxy = cast(CollectorRegistry, _RegistryProxy)


@attr.s(slots=True, hash=True, auto_attribs=True, kw_only=True)
class LaterGauge(Collector):
    """A Gauge which periodically calls a user-provided callback to produce metrics."""

    name: str
    desc: str
    labelnames: Optional[StrSequence] = attr.ib(hash=False)
    # callback: should either return a value (if there are no labels for this metric),
    # or dict mapping from a label tuple to a value
    caller: Callable[
        [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
    ]

    def collect(self) -> Iterable[Metric]:
        # The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
        # (we don't enforce it here, one level up).
        g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames)  # type: ignore[missing-server-name-label]

        try:
            calls = self.caller()
        except Exception:
            logger.exception("Exception running callback for LaterGauge(%s)", self.name)
            yield g
            return

        if isinstance(calls, (int, float)):
            g.add_metric([], calls)
        else:
            for k, v in calls.items():
                g.add_metric(k, v)

        yield g

    def __attrs_post_init__(self) -> None:
        self._register()

    def _register(self) -> None:
        if self.name in all_gauges.keys():
            logger.warning("%s already registered, reregistering", self.name)
            REGISTRY.unregister(all_gauges.pop(self.name))

        REGISTRY.register(self)
        all_gauges[self.name] = self
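
# Example (illustrative; the metric name and caller are hypothetical): a `LaterGauge`
# is constructed with a `caller` that is invoked at scrape time and returns either a
# plain number, or a mapping from label-value tuples to numbers matching `labelnames`:
#
#     queue_size_gauge = LaterGauge(
#         name="synapse_example_queue_size",
#         desc="Current size of some queue, per queue name",
#         labelnames=["queue_name"],
#         caller=lambda: {("federation",): 12, ("appservice",): 3},
#     )
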
# `MetricsEntry` only makes sense when it is a `Protocol`,
# but `Protocol` can't be used as a `TypeVar` bound.
MetricsEntry = TypeVar("MetricsEntry")


class InFlightGauge(Generic[MetricsEntry], Collector):
    """Tracks number of things (e.g. requests, Measure blocks, etc) in flight
    at any given time.

    Each InFlightGauge will create a metric called `<name>_total` that counts
    the number of in flight blocks, as well as a metric for each item in the
    given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the
    callbacks.

    Args:
        name
        desc
        labels
        sub_metrics: A list of sub metrics that the callbacks will update.
    """

    def __init__(
        self,
        name: str,
        desc: str,
        labels: StrSequence,
        sub_metrics: StrSequence,
    ):
        self.name = name
        self.desc = desc
        self.labels = labels
        self.sub_metrics = sub_metrics

        # Create a class which has the sub_metrics values as attributes, which
        # default to 0 on initialization. Used to pass to registered callbacks.
        self._metrics_class: Type[MetricsEntry] = attr.make_class(
            "_MetricsEntry",
            attrs={x: attr.ib(default=0) for x in sub_metrics},
            slots=True,
        )

        # Counts number of in flight blocks for a given set of label values
        self._registrations: Dict[
            Tuple[str, ...], Set[Callable[[MetricsEntry], None]]
        ] = {}

        # Protects access to _registrations
        self._lock = threading.Lock()

        self._register_with_collector()

    def register(
        self,
        key: Tuple[str, ...],
        callback: Callable[[MetricsEntry], None],
    ) -> None:
        """Registers that we've entered a new block with labels `key`.

        `callback` gets called each time the metrics are collected. The same
        value must also be given to `unregister`.

        `callback` gets called with an object that has an attribute per
        sub_metric, which should be updated with the necessary values. Note that
        the metrics object is shared between all callbacks registered with the
        same key.

        Note that `callback` may be called on a separate thread.

        Args:
            key: A tuple of label values, which must match the order of the
                `labels` given to the constructor.
            callback
        """
        assert len(key) == len(self.labels), (
            f"Expected {len(self.labels)} labels in `key`, got {len(key)}: {key}"
        )

        with self._lock:
            self._registrations.setdefault(key, set()).add(callback)

    def unregister(
        self,
        key: Tuple[str, ...],
        callback: Callable[[MetricsEntry], None],
    ) -> None:
        """
        Registers that we've exited a block with labels `key`.

        Args:
            key: A tuple of label values, which must match the order of the
                `labels` given to the constructor.
            callback
        """
        assert len(key) == len(self.labels), (
            f"Expected {len(self.labels)} labels in `key`, got {len(key)}: {key}"
        )

        with self._lock:
            self._registrations.setdefault(key, set()).discard(callback)

    def collect(self) -> Iterable[Metric]:
        """Called by prometheus client when it reads metrics.

        Note: may be called by a separate thread.
        """
        # The decision to add `SERVER_NAME_LABEL` is from the `InFlightGauge` usage
        # itself (we don't enforce it here, one level up).
        in_flight = GaugeMetricFamily(  # type: ignore[missing-server-name-label]
            self.name + "_total", self.desc, labels=self.labels
        )

        metrics_by_key = {}

        # We copy so that we don't mutate the list while iterating
        with self._lock:
            keys = list(self._registrations)

        for key in keys:
            with self._lock:
                callbacks = set(self._registrations[key])

            in_flight.add_metric(labels=key, value=len(callbacks))

            metrics = self._metrics_class()
            metrics_by_key[key] = metrics
            for callback in callbacks:
                callback(metrics)

        yield in_flight

        for name in self.sub_metrics:
            # The decision to add `SERVER_NAME_LABEL` is from the `InFlightGauge` usage
            # itself (we don't enforce it here, one level up).
            gauge = GaugeMetricFamily(  # type: ignore[missing-server-name-label]
                "_".join([self.name, name]), "", labels=self.labels
            )
            for key, metrics in metrics_by_key.items():
                gauge.add_metric(labels=key, value=getattr(metrics, name))
            yield gauge

    def _register_with_collector(self) -> None:
        if self.name in all_gauges.keys():
            logger.warning("%s already registered, reregistering", self.name)
            REGISTRY.unregister(all_gauges.pop(self.name))

        REGISTRY.register(self)
        all_gauges[self.name] = self
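
# Example (illustrative; the gauge name, labels and sub-metrics are hypothetical):
# callers register a callback per in-flight block and fill in the sub-metric
# attributes whenever the metrics are collected:
#
#     pending_calls_gauge = InFlightGauge(
#         "synapse_example_pending_calls",
#         "Calls currently in flight",
#         labels=["name"],
#         sub_metrics=["elapsed_seconds"],
#     )
#
#     def _on_collect(metrics) -> None:
#         metrics.elapsed_seconds += 1.23
#
#     pending_calls_gauge.register(("some_call",), _on_collect)
#     ...  # later, once the block has finished:
#     pending_calls_gauge.unregister(("some_call",), _on_collect)
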
class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
    """
    Custom version of `GaugeHistogramMetricFamily` from `prometheus_client` that allows
    specifying labels and label values.

    A single gauge histogram and its samples.

    For use by custom collectors.
    """

    def __init__(
        self,
        *,
        name: str,
        documentation: str,
        gsum_value: float,
        buckets: Optional[Sequence[Tuple[str, float]]] = None,
        labelnames: StrSequence = (),
        labelvalues: StrSequence = (),
        unit: str = "",
    ):
        # Sanity check the number of label values matches the number of label names.
        if len(labelvalues) != len(labelnames):
            raise ValueError(
                "The number of label values must match the number of label names"
            )

        # Call the super to validate and set the labelnames. We use this stable API
        # instead of setting the internal `_labelnames` field directly.
        super().__init__(
            name=name,
            documentation=documentation,
            labels=labelnames,
            # Since `GaugeHistogramMetricFamily` doesn't support supplying `labels` and
            # `buckets` at the same time (artificial limitation), we will just set these
            # as `None` and set up the buckets ourselves just below.
            buckets=None,
            gsum_value=None,
        )

        # Create a gauge for each bucket.
        if buckets is not None:
            self.add_metric(labels=labelvalues, buckets=buckets, gsum_value=gsum_value)


class GaugeBucketCollector(Collector):
    """Like a Histogram, but the buckets are Gauges which are updated atomically.

    The data is updated by calling `update_data` with an iterable of measurements.

    We assume that the data is updated less frequently than it is reported to
    Prometheus, and optimise for that case.
    """

    __slots__ = (
        "_name",
        "_documentation",
        "_labelnames",
        "_bucket_bounds",
        "_metric",
    )

    def __init__(
        self,
        *,
        name: str,
        documentation: str,
        labelnames: Optional[StrSequence],
        buckets: Iterable[float],
        registry: CollectorRegistry = REGISTRY,
    ):
        """
        Args:
            name: base name of metric to be exported to Prometheus. (a _bucket suffix
                will be added.)
            documentation: help text for the metric
            buckets: The top bounds of the buckets to report
            registry: metric registry to register with
        """
        self._name = name
        self._documentation = documentation
        self._labelnames = labelnames if labelnames else ()

        # the tops of the buckets
        self._bucket_bounds = [float(b) for b in buckets]
        if self._bucket_bounds != sorted(self._bucket_bounds):
            raise ValueError("Buckets not in sorted order")

        if self._bucket_bounds[-1] != float("inf"):
            self._bucket_bounds.append(float("inf"))

        # We initially set this to None. We won't report metrics until
        # this has been initialised after a successful data update
        self._metric: Optional[GaugeHistogramMetricFamilyWithLabels] = None

        registry.register(self)

    def collect(self) -> Iterable[Metric]:
        # Don't report metrics unless we've already collected some data
        if self._metric is not None:
            yield self._metric

    def update_data(self, values: Iterable[float], labels: StrSequence = ()) -> None:
        """Update the data to be reported by the metric

        The existing data is cleared, and each measurement in the input is assigned
        to the relevant bucket.

        Args:
            values
            labels
        """
        self._metric = self._values_to_metric(values, labels)

    def _values_to_metric(
        self, values: Iterable[float], labels: StrSequence = ()
    ) -> GaugeHistogramMetricFamilyWithLabels:
        """
        Args:
            values
            labels
        """
        total = 0.0
        bucket_values = [0 for _ in self._bucket_bounds]

        for v in values:
            # assign each value to a bucket
            for i, bound in enumerate(self._bucket_bounds):
                if v <= bound:
                    bucket_values[i] += 1
                    break

            # ... and increment the sum
            total += v

        # now, aggregate the bucket values so that they count the number of entries in
        # that bucket or below.
        accumulated_values = itertools.accumulate(bucket_values)

        # The decision to add `SERVER_NAME_LABEL` is from the `GaugeBucketCollector`
        # usage itself (we don't enforce it here, one level up).
        return GaugeHistogramMetricFamilyWithLabels(  # type: ignore[missing-server-name-label]
            name=self._name,
            documentation=self._documentation,
            labelnames=self._labelnames,
            labelvalues=labels,
            buckets=list(
                zip((str(b) for b in self._bucket_bounds), accumulated_values)
            ),
            gsum_value=total,
        )
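
# Example (illustrative; the metric name and bucket bounds are hypothetical): the
# collector is constructed once and then fed fresh measurements with `update_data`,
# which atomically replaces the previously reported buckets:
#
#     example_delay_histogram = GaugeBucketCollector(
#         name="synapse_example_delay_seconds",
#         documentation="Distribution of some delay, refreshed periodically",
#         labelnames=None,
#         buckets=[0.1, 1, 10, 60],
#     )
#
#     example_delay_histogram.update_data([0.05, 0.3, 2.5, 120])
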
#
# Detailed CPU metrics
#


class CPUMetrics(Collector):
    def __init__(self) -> None:
        ticks_per_sec = 100
        try:
            # Try and get the system config
            ticks_per_sec = os.sysconf("SC_CLK_TCK")
        except (ValueError, TypeError, AttributeError):
            pass

        self.ticks_per_sec = ticks_per_sec

    def collect(self) -> Iterable[Metric]:
        if not HAVE_PROC_SELF_STAT:
            return

        with open("/proc/self/stat") as s:
            line = s.read()
            raw_stats = line.split(") ", 1)[1].split(" ")
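            # Per proc(5): after stripping the "pid (comm) " prefix above, index 11 is
            # field 14 (utime) and index 12 is field 15 (stime), both measured in clock
            # ticks, hence the division by `ticks_per_sec` below.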
            # This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`.
            user = GaugeMetricFamily("process_cpu_user_seconds_total", "")  # type: ignore[missing-server-name-label]
            user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec)
            yield user

            # This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`.
            sys = GaugeMetricFamily("process_cpu_system_seconds_total", "")  # type: ignore[missing-server-name-label]
            sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec)
            yield sys


# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`.
REGISTRY.register(CPUMetrics())  # type: ignore[missing-server-name-label]
#
# Federation Metrics
#

sent_transactions_counter = Counter(
    "synapse_federation_client_sent_transactions", "", labelnames=[SERVER_NAME_LABEL]
)

events_processed_counter = Counter(
    "synapse_federation_client_events_processed", "", labelnames=[SERVER_NAME_LABEL]
)

event_processing_loop_counter = Counter(
    "synapse_event_processing_loop_count",
    "Event processing loop iterations",
    labelnames=["name", SERVER_NAME_LABEL],
)

event_processing_loop_room_count = Counter(
    "synapse_event_processing_loop_room_count",
    "Rooms seen per event processing loop iteration",
    labelnames=["name", SERVER_NAME_LABEL],
)


# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc.
event_processing_positions = Gauge(
    "synapse_event_processing_positions", "", labelnames=["name", SERVER_NAME_LABEL]
)

# Used to track the current max events stream position
event_persisted_position = Gauge(
    "synapse_event_persisted_position", "", labelnames=[SERVER_NAME_LABEL]
)

# Used to track the received_ts of the last event processed by various
# components
event_processing_last_ts = Gauge(
    "synapse_event_processing_last_ts", "", labelnames=["name", SERVER_NAME_LABEL]
)

# Used to track the lag processing events. This is the time difference
# between the last processed event's received_ts and the time it was
# finished being processed.
event_processing_lag = Gauge(
    "synapse_event_processing_lag", "", labelnames=["name", SERVER_NAME_LABEL]
)

event_processing_lag_by_event = Histogram(
    "synapse_event_processing_lag_by_event",
    "Time between an event being persisted and it being queued up to be sent to the relevant remote servers",
    labelnames=["name", SERVER_NAME_LABEL],
)

# Build info of the running server.
#
# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`. We
# consider this process-level because all Synapse homeservers running in the process
# will use the same Synapse version.
build_info = Gauge(  # type: ignore[missing-server-name-label]
    "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
)
build_info.labels(
    " ".join([platform.python_implementation(), platform.python_version()]),
    SYNAPSE_VERSION,
    " ".join([platform.system(), platform.release()]),
).set(1)

# 3PID send info
threepid_send_requests = Histogram(
    "synapse_threepid_send_requests_with_tries",
    documentation="Number of requests for a 3pid token by try count. Note if"
    " there is a request with try count of 4, then there would have been one"
    " each for 1, 2 and 3",
    buckets=(1, 2, 3, 4, 5, 10),
    labelnames=("type", "reason", SERVER_NAME_LABEL),
)

threadpool_total_threads = Gauge(
    "synapse_threadpool_total_threads",
    "Total number of threads currently in the threadpool",
    labelnames=["name", SERVER_NAME_LABEL],
)

threadpool_total_working_threads = Gauge(
    "synapse_threadpool_working_threads",
    "Number of threads currently working in the threadpool",
    labelnames=["name", SERVER_NAME_LABEL],
)

threadpool_total_min_threads = Gauge(
    "synapse_threadpool_min_threads",
    "Minimum number of threads configured in the threadpool",
    labelnames=["name", SERVER_NAME_LABEL],
)

threadpool_total_max_threads = Gauge(
    "synapse_threadpool_max_threads",
    "Maximum number of threads configured in the threadpool",
    labelnames=["name", SERVER_NAME_LABEL],
)
def register_threadpool(*, name: str, server_name: str, threadpool: ThreadPool) -> None:
    """
    Add metrics for the threadpool.

    Args:
        name: The name of the threadpool, used to identify it in the metrics.
        server_name: The homeserver name (used to label metrics) (this should be `hs.hostname`).
        threadpool: The threadpool to register metrics for.
    """

    threadpool_total_min_threads.labels(
        name=name, **{SERVER_NAME_LABEL: server_name}
    ).set(threadpool.min)
    threadpool_total_max_threads.labels(
        name=name, **{SERVER_NAME_LABEL: server_name}
    ).set(threadpool.max)

    threadpool_total_threads.labels(
        name=name, **{SERVER_NAME_LABEL: server_name}
    ).set_function(lambda: len(threadpool.threads))
    threadpool_total_working_threads.labels(
        name=name, **{SERVER_NAME_LABEL: server_name}
    ).set_function(lambda: len(threadpool.working))
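
# Example (illustrative; the threadpool name and variable are hypothetical): a
# homeserver would wire its database threadpool into these gauges with something like:
#
#     register_threadpool(
#         name="database", server_name=hs.hostname, threadpool=db_threadpool
#     )
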
class MetricsResource(Resource):
    """
    Twisted ``Resource`` that serves prometheus metrics.
    """

    isLeaf = True

    def __init__(self, registry: CollectorRegistry = REGISTRY):
        self.registry = registry

    def render_GET(self, request: Request) -> bytes:
        request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii"))
        response = generate_latest(self.registry)
        request.setHeader(b"Content-Length", str(len(response)))
        return response
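
# Example (illustrative; the port and mount point are hypothetical): the resource can
# be mounted on a Twisted site to expose the metrics over HTTP, roughly:
#
#     from twisted.internet import reactor
#     from twisted.web.server import Site
#
#     root = Resource()
#     root.putChild(b"metrics", MetricsResource())
#     reactor.listenTCP(9000, Site(root))
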
__all__ = [
    "Collector",
    "MetricsResource",
    "generate_latest",
    "LaterGauge",
    "InFlightGauge",
    "GaugeBucketCollector",
    "MIN_TIME_BETWEEN_GCS",
    "install_gc_manager",
]