This PR reverts https://github.com/element-hq/synapse/pull/18751 ### Why revert? @reivilibre [found](https://matrix.to/#/!vcyiEtMVHIhWXcJAfl:sw1v.org/$u9OEmMxaFYUzWHhCk1A_r50Y0aGrtKEhepF7WxWJkUA?via=matrix.org&via=node.marinchik.ink&via=element.io) that our CI was failing in bizarre ways (thanks for stepping up to dive into this 🙇). Examples: - `twisted.internet.error.ProcessTerminated: A process has ended with a probable error condition: process ended by signal 9.` - `twisted.internet.error.ProcessTerminated: A process has ended with a probable error condition: process ended by signal 15.` <details> <summary>More detailed part of the log</summary> https://github.com/element-hq/synapse/actions/runs/16758038107/job/47500520633#step:9:6809 ``` tests.util.test_wheel_timer.WheelTimerTestCase.test_single_insert_fetch =============================================================================== Error: Traceback (most recent call last): File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/trial/_dist/disttrial.py", line 371, in task await worker.run(case, result) File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/trial/_dist/worker.py", line 305, in run return await self.callRemote(workercommands.Run, testCase=testCaseId) # type: ignore[no-any-return] File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/defer.py", line 1187, in __iter__ yield self File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/defer.py", line 1092, in _runCallbacks current.result = callback( # type: ignore[misc] File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/protocols/amp.py", line 1968, in _massageError error.trap(RemoteAmpError) File 
"/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/python/failure.py", line 431, in trap self.raiseException() File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/python/failure.py", line 455, in raiseException raise self.value.with_traceback(self.tb) twisted.internet.error.ProcessTerminated: A process has ended with a probable error condition: process ended by signal 9. tests.util.test_macaroons.MacaroonGeneratorTestCase.test_guest_access_token ------------------------------------------------------------------------------- Ran 4325 tests in 669.321s FAILED (skips=159, errors=62, successes=4108) while calling from thread Traceback (most recent call last): File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/base.py", line 1064, in runUntilCurrent f(*a, **kw) File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/base.py", line 790, in stop raise error.ReactorNotRunning("Can't stop reactor that isn't running.") twisted.internet.error.ReactorNotRunning: Can't stop reactor that isn't running. 
joining disttrial worker #0 failed Traceback (most recent call last): File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/defer.py", line 1853, in _inlineCallbacks result = context.run( File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/python/failure.py", line 467, in throwExceptionIntoGenerator return g.throw(self.value.with_traceback(self.tb)) File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/trial/_dist/worker.py", line 406, in exit await endDeferred File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/defer.py", line 1187, in __iter__ yield self twisted.internet.error.ProcessTerminated: A process has ended with a probable error condition: process ended by signal 15. ``` </details> With more debugging (thanks @devonh for also stepping in as maintainer), we were finding that the CI was consistently failing at `test_exposed_to_prometheus` which was a bit of smoke because of all of the [metrics changes](https://github.com/element-hq/synapse/issues/18592) that were merged recently. Locally, although I wasn't able to reproduce the bizarre errors, I could easily see increased memory usage (~20GB vs ~2GB) and the `test_exposed_to_prometheus` test taking a while to complete when running a full test run (`SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests`). <img width="1485" height="78" alt="Lots of memory usage" src="https://github.com/user-attachments/assets/811e2a96-75e5-4a3c-966c-00dc0512cea9" /> After updating `test_exposed_to_prometheus` to dump the `latest_metrics_response = generate_latest(REGISTRY)`, I could see that it's a massive 3.2GB response. Inspecting the contents, we can see 4.1M (4,137,123) entries for just `synapse_background_update_status{server_name="test"} 3.0` which is a `LaterGauge`. 
I don't think we have 4.1M test cases, so it's also unclear why we end up with so many samples, but it does make sense that we see a lot of duplicates because each `HomeserverTestCase` will create a homeserver for each test case that will `LaterGauge.register_hook(...)` (part of the https://github.com/element-hq/synapse/pull/18751 changes).

`tests/storage/databases/main/test_metrics.py`
```python
latest_metrics_response = generate_latest(REGISTRY)
with open("/tmp/synapse-test-metrics", "wb") as f:
    f.write(latest_metrics_response)
```

After reverting the https://github.com/element-hq/synapse/pull/18751 changes, running the full test suite locally doesn't result in memory spikes and seems to run normally.

### Dev notes

Discussion in the [`#synapse-dev:matrix.org`](https://matrix.to/#/!vcyiEtMVHIhWXcJAfl:sw1v.org/$vkMATs04yqZggVVd6Noop5nU8M2DVoTkrAWshw7u1-w?via=matrix.org&via=node.marinchik.ink&via=element.io) room.

### Pull Request Checklist

<!-- Please read https://element-hq.github.io/synapse/latest/development/contributing_guide.html before submitting your pull request -->

* [x] Pull request is based on the develop branch
* [ ] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should:
  - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
  - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry.
* [ ] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))
309 lines
11 KiB
Python
309 lines
11 KiB
Python
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2019 Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
|
|
from typing import Dict, Protocol, Tuple
|
|
|
|
from prometheus_client.core import Sample
|
|
|
|
from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
|
|
from synapse.util.caches.deferred_cache import DeferredCache
|
|
|
|
from tests import unittest
|
|
|
|
|
|
def get_sample_labels_value(sample: Sample) -> Tuple[Dict[str, str], float]:
    """Return the ``(labels, value)`` pair carried by a sample.

    Two sample shapes are supported: the named tuple introduced by
    prometheus_client 0.5 (which exposes `labels` and `value` attributes), and
    the plain 3-tuple used by 0.4 and earlier.

    Args:
        sample: The sample to read the labels and value from.
    Returns:
        A tuple of (labels, value) taken from the sample.
    """
    has_named_fields = hasattr(sample, "labels") and hasattr(sample, "value")

    if not has_named_fields:
        # Legacy shape: unpack positionally and discard the first element.
        legacy_labels: Dict[str, str]
        legacy_value: float
        _, legacy_labels, legacy_value = sample  # type: ignore[misc]
        return legacy_labels, legacy_value

    # Modern shape: read the attributes directly.
    return sample.labels, sample.value
|
|
|
|
|
|
class TestMauLimit(unittest.TestCase):
    def test_basic(self) -> None:
        """
        Registering and unregistering callbacks on an `InFlightGauge` is
        reflected in the `total`, `foo` and `bar` metrics it collects.
        """

        class MetricEntry(Protocol):
            foo: int
            bar: int

        # This is a test and does not matter if it uses `SERVER_NAME_LABEL`.
        gauge: InFlightGauge[MetricEntry] = InFlightGauge(  # type: ignore[missing-server-name-label]
            "test1", "", labels=["test_label"], sub_metrics=["foo", "bar"]
        )

        def add_two_floor_five(metrics: MetricEntry) -> None:
            metrics.foo += 2
            metrics.bar = max(metrics.bar, 5)

        def add_three_floor_seven(metrics: MetricEntry) -> None:
            metrics.foo += 3
            metrics.bar = max(metrics.bar, 7)

        first_key = ("key1",)
        second_key = ("key2",)

        # One callback registered under the first key.
        gauge.register(first_key, add_two_floor_five)
        expected_one_registered = {
            "test1_total": {first_key: 1},
            "test1_foo": {first_key: 2},
            "test1_bar": {first_key: 5},
        }
        self.assert_dict(
            expected_one_registered, self.get_metrics_from_gauge(gauge)
        )

        # Nothing registered any more: the key is still reported, with zeros.
        gauge.unregister(first_key, add_two_floor_five)
        expected_none_registered = {
            "test1_total": {first_key: 0},
            "test1_foo": {first_key: 0},
            "test1_bar": {first_key: 0},
        }
        self.assert_dict(
            expected_none_registered, self.get_metrics_from_gauge(gauge)
        )

        # One callback under each of the two keys.
        gauge.register(first_key, add_two_floor_five)
        gauge.register(second_key, add_three_floor_seven)
        expected_one_per_key = {
            "test1_total": {first_key: 1, second_key: 1},
            "test1_foo": {first_key: 2, second_key: 3},
            "test1_bar": {first_key: 5, second_key: 7},
        }
        self.assert_dict(expected_one_per_key, self.get_metrics_from_gauge(gauge))

        # Both callbacks under the first key, none under the second.
        gauge.unregister(second_key, add_three_floor_seven)
        gauge.register(first_key, add_three_floor_seven)
        expected_both_on_first = {
            "test1_total": {first_key: 2, second_key: 0},
            "test1_foo": {first_key: 5, second_key: 0},
            "test1_bar": {first_key: 7, second_key: 0},
        }
        self.assert_dict(
            expected_both_on_first, self.get_metrics_from_gauge(gauge)
        )

    def get_metrics_from_gauge(
        self, gauge: InFlightGauge
    ) -> Dict[str, Dict[Tuple[str, ...], float]]:
        """
        Collect `gauge` and index its samples by metric name, then by the tuple
        of label values (ordered as in `gauge.labels`).
        """
        collected: Dict[str, Dict[Tuple[str, ...], float]] = {}

        for family in gauge.collect():
            by_label_values: Dict[Tuple[str, ...], float] = {}
            for sample in family.samples:
                labels, value = get_sample_labels_value(sample)
                label_values = tuple(labels[name] for name in gauge.labels)
                by_label_values[label_values] = value
            collected[family.name] = by_label_values

        return collected
|
|
|
|
|
|
class BuildInfoTests(unittest.TestCase):
    def test_get_build(self) -> None:
        """
        The synapse_build_info metric reports the OS version, Python version,
        and Synapse version.
        """
        build_info_lines = [
            line
            for line in generate_latest(REGISTRY).split(b"\n")
            if b"synapse_build_info{" in line
        ]
        self.assertEqual(len(build_info_lines), 1)
        build_info = build_info_lines[0]

        self.assertIn(b"osversion=", build_info)
        self.assertIn(b"pythonversion=", build_info)
        # `b"version="` is a substring of both `osversion=` and
        # `pythonversion=`, so a plain containment check can never fail here.
        # Require the label name to start at a label boundary (`{` or `,`) so
        # that the `version` label itself has to be present.
        self.assertRegex(build_info, rb"[{,]version=")
|
|
|
|
|
|
class CacheMetricsTests(unittest.HomeserverTestCase):
    def _cache_metric_key(
        self, metric: str, cache_name: str, server_name: str
    ) -> str:
        """
        Build the `metric{name="...",server_name="..."}` key used to look up a
        cache metric in the map returned by `get_latest_metrics()`.
        """
        return f'{metric}{{name="{cache_name}",server_name="{server_name}"}}'

    def _get_required_metric(self, metrics_map: Dict[str, str], metric: str) -> str:
        """
        Look up `metric` in `metrics_map`, failing the test if it is missing.

        Returns:
            The metric value, as the string found in the map.
        """
        value = metrics_map.get(metric)
        self.assertIsNotNone(
            value,
            f"Missing metric {metric} in cache metrics {metrics_map}",
        )
        # For the type checker only; the assertion above already enforced it.
        assert value is not None
        return value

    def test_cache_metric(self) -> None:
        """
        Caches produce metrics reflecting their state when scraped.
        """
        CACHE_NAME = "cache_metrics_test_fgjkbdfg"
        cache: DeferredCache[str, str] = DeferredCache(
            name=CACHE_NAME, server_name=self.hs.hostname, max_entries=777
        )

        metrics_map = get_latest_metrics()

        cache_size_metric = self._cache_metric_key(
            "synapse_util_caches_cache_size", CACHE_NAME, self.hs.hostname
        )
        cache_max_size_metric = self._cache_metric_key(
            "synapse_util_caches_cache_max_size", CACHE_NAME, self.hs.hostname
        )

        # Both metrics must be present before their values are compared.
        cache_size_metric_value = self._get_required_metric(
            metrics_map, cache_size_metric
        )
        cache_max_size_metric_value = self._get_required_metric(
            metrics_map, cache_max_size_metric
        )

        self.assertEqual(cache_size_metric_value, "0.0")
        self.assertEqual(cache_max_size_metric_value, "777.0")

        cache.prefill("1", "hi")

        metrics_map = get_latest_metrics()

        cache_size_metric_value = self._get_required_metric(
            metrics_map, cache_size_metric
        )
        cache_max_size_metric_value = self._get_required_metric(
            metrics_map, cache_max_size_metric
        )

        self.assertEqual(cache_size_metric_value, "1.0")
        self.assertEqual(cache_max_size_metric_value, "777.0")

    def test_cache_metric_multiple_servers(self) -> None:
        """
        Test that cache metrics are reported correctly across multiple servers. We will
        have a metrics entry for each homeserver that is labeled with the `server_name`
        label.
        """
        CACHE_NAME = "cache_metric_multiple_servers_test"
        cache1: DeferredCache[str, str] = DeferredCache(
            name=CACHE_NAME, server_name="hs1", max_entries=777
        )
        cache2: DeferredCache[str, str] = DeferredCache(
            name=CACHE_NAME, server_name="hs2", max_entries=777
        )

        metrics_map = get_latest_metrics()

        hs1_cache_size_metric = self._cache_metric_key(
            "synapse_util_caches_cache_size", CACHE_NAME, "hs1"
        )
        hs2_cache_size_metric = self._cache_metric_key(
            "synapse_util_caches_cache_size", CACHE_NAME, "hs2"
        )
        hs1_cache_max_size_metric = self._cache_metric_key(
            "synapse_util_caches_cache_max_size", CACHE_NAME, "hs1"
        )
        hs2_cache_max_size_metric = self._cache_metric_key(
            "synapse_util_caches_cache_max_size", CACHE_NAME, "hs2"
        )

        # Find the metrics for the caches from both homeservers
        hs1_cache_size_metric_value = self._get_required_metric(
            metrics_map, hs1_cache_size_metric
        )
        hs2_cache_size_metric_value = self._get_required_metric(
            metrics_map, hs2_cache_size_metric
        )
        hs1_cache_max_size_metric_value = self._get_required_metric(
            metrics_map, hs1_cache_max_size_metric
        )
        hs2_cache_max_size_metric_value = self._get_required_metric(
            metrics_map, hs2_cache_max_size_metric
        )

        # Sanity check the metric values
        self.assertEqual(hs1_cache_size_metric_value, "0.0")
        self.assertEqual(hs2_cache_size_metric_value, "0.0")
        self.assertEqual(hs1_cache_max_size_metric_value, "777.0")
        self.assertEqual(hs2_cache_max_size_metric_value, "777.0")

        # Add something to both caches to change the numbers
        cache1.prefill("1", "hi")
        cache2.prefill("2", "ho")

        metrics_map = get_latest_metrics()

        # Find the metrics for the caches from both homeservers
        hs1_cache_size_metric_value = self._get_required_metric(
            metrics_map, hs1_cache_size_metric
        )
        hs2_cache_size_metric_value = self._get_required_metric(
            metrics_map, hs2_cache_size_metric
        )
        hs1_cache_max_size_metric_value = self._get_required_metric(
            metrics_map, hs1_cache_max_size_metric
        )
        hs2_cache_max_size_metric_value = self._get_required_metric(
            metrics_map, hs2_cache_max_size_metric
        )

        # Sanity check the metric values
        self.assertEqual(hs1_cache_size_metric_value, "1.0")
        self.assertEqual(hs2_cache_size_metric_value, "1.0")
        self.assertEqual(hs1_cache_max_size_metric_value, "777.0")
        self.assertEqual(hs2_cache_max_size_metric_value, "777.0")
|
|
|
|
|
|
def _split_sample_line(line: bytes) -> Tuple[bytes, bytes]:
    """Split one exposition sample line into its series key and its value.

    The key is the metric name plus the `{...}` label block when there is one.
    Label values are quoted strings that may contain spaces, so the line cannot
    simply be split on its first space.
    """
    labels_end = line.rfind(b"}")
    if labels_end == -1:
        # No label block: the metric name runs up to the first space.
        key, _, remainder = line.partition(b" ")
    else:
        # Whatever follows the label block (value, optional timestamp) is
        # numeric, so the last `}` on the line is the one closing the labels.
        key, remainder = line[: labels_end + 1], line[labels_end + 1 :]

    # The first token after the key is the value; anything after it is ignored.
    return key, remainder.split()[0]


def get_latest_metrics() -> Dict[str, str]:
    """
    Collect the latest metrics from the registry and parse them into an easy to use map.
    The key includes the metric name and labels.

    Blank lines and `#` comment lines of the exposition output are skipped.

    Example output:
    {
        "some_metric_without_labels": "0.0",
        "synapse_util_caches_cache_max_size{name="some_cache",server_name="hs1"}": "777.0",
        ...
    }
    """
    metric_map: Dict[str, str] = {}

    for line in generate_latest(REGISTRY).split(b"\n"):
        if not line or line.startswith(b"#"):
            continue

        key, value = _split_sample_line(line)
        # UTF-8 is a superset of ASCII, so plain-ASCII payloads decode exactly
        # as before while non-ASCII label values no longer raise.
        metric_map[key.decode("utf-8")] = value.decode("utf-8")

    return metric_map
|