From ff03a51cb08f48efb0eded36fbd0e72446dfb67e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 6 Aug 2025 17:14:40 -0500 Subject: [PATCH] Revert "Fix `LaterGauge` metrics to collect from all servers (#18751)" (#18789) This PR reverts https://github.com/element-hq/synapse/pull/18751 ### Why revert? @reivilibre [found](https://matrix.to/#/!vcyiEtMVHIhWXcJAfl:sw1v.org/$u9OEmMxaFYUzWHhCk1A_r50Y0aGrtKEhepF7WxWJkUA?via=matrix.org&via=node.marinchik.ink&via=element.io) that our CI was failing in bizarre ways (thanks for stepping up to dive into this :bow:). Examples: - `twisted.internet.error.ProcessTerminated: A process has ended with a probable error condition: process ended by signal 9.` - `twisted.internet.error.ProcessTerminated: A process has ended with a probable error condition: process ended by signal 15.`
More detailed part of the log https://github.com/element-hq/synapse/actions/runs/16758038107/job/47500520633#step:9:6809 ``` tests.util.test_wheel_timer.WheelTimerTestCase.test_single_insert_fetch =============================================================================== Error: Traceback (most recent call last): File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/trial/_dist/disttrial.py", line 371, in task await worker.run(case, result) File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/trial/_dist/worker.py", line 305, in run return await self.callRemote(workercommands.Run, testCase=testCaseId) # type: ignore[no-any-return] File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/defer.py", line 1187, in __iter__ yield self File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/defer.py", line 1092, in _runCallbacks current.result = callback( # type: ignore[misc] File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/protocols/amp.py", line 1968, in _massageError error.trap(RemoteAmpError) File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/python/failure.py", line 431, in trap self.raiseException() File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/python/failure.py", line 455, in raiseException raise self.value.with_traceback(self.tb) twisted.internet.error.ProcessTerminated: A process has ended with a probable error condition: process ended by signal 9. 
tests.util.test_macaroons.MacaroonGeneratorTestCase.test_guest_access_token ------------------------------------------------------------------------------- Ran 4325 tests in 669.321s FAILED (skips=159, errors=62, successes=4108) while calling from thread Traceback (most recent call last): File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/base.py", line 1064, in runUntilCurrent f(*a, **kw) File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/base.py", line 790, in stop raise error.ReactorNotRunning("Can't stop reactor that isn't running.") twisted.internet.error.ReactorNotRunning: Can't stop reactor that isn't running. joining disttrial worker #0 failed Traceback (most recent call last): File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/defer.py", line 1853, in _inlineCallbacks result = context.run( File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/python/failure.py", line 467, in throwExceptionIntoGenerator return g.throw(self.value.with_traceback(self.tb)) File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/trial/_dist/worker.py", line 406, in exit await endDeferred File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/defer.py", line 1187, in __iter__ yield self twisted.internet.error.ProcessTerminated: A process has ended with a probable error condition: process ended by signal 15. ```
With more debugging (thanks @devonh for also stepping in as maintainer), we were finding that the CI was consistently failing at `test_exposed_to_prometheus` which was a bit of a smoke signal because of all of the [metrics changes](https://github.com/element-hq/synapse/issues/18592) that were merged recently. Locally, although I wasn't able to reproduce the bizarre errors, I could easily see increased memory usage (~20GB vs ~2GB) and the `test_exposed_to_prometheus` test taking a while to complete when running a full test run (`SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests`). Lots of memory usage After updating `test_exposed_to_prometheus` to dump the `latest_metrics_response = generate_latest(REGISTRY)`, I could see that it's a massive 3.2GB response. Inspecting the contents, we can see 4.1M (4,137,123) entries for just `synapse_background_update_status{server_name="test"} 3.0` which is a `LaterGauge`. I don't think we have 4.1M test cases so it's also unclear why we end up with so many samples but it does make sense that we do see a lot of duplicates because each `HomeserverTestCase` will create a homeserver for each test case that will call `LaterGauge.register_hook(...)` (part of the https://github.com/element-hq/synapse/pull/18751 changes). `tests/storage/databases/main/test_metrics.py` ```python latest_metrics_response = generate_latest(REGISTRY) with open("/tmp/synapse-test-metrics", "wb") as f: f.write(latest_metrics_response) ``` After reverting the https://github.com/element-hq/synapse/pull/18751 changes, running the full test suite locally doesn't result in memory spikes and seems to run normally. ### Dev notes Discussion in the [`#synapse-dev:matrix.org`](https://matrix.to/#/!vcyiEtMVHIhWXcJAfl:sw1v.org/$vkMATs04yqZggVVd6Noop5nU8M2DVoTkrAWshw7u1-w?via=matrix.org&via=node.marinchik.ink&via=element.io) room. 
### Pull Request Checklist * [x] Pull request is based on the develop branch * [ ] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should: - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.". - Use markdown where necessary, mostly for `code blocks`. - End with either a period (.) or an exclamation mark (!). - Start with a capital letter. - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry. * [ ] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters)) --- changelog.d/18751.misc | 1 - synapse/federation/send_queue.py | 42 +++++------- synapse/federation/sender/__init__.py | 46 ++++++------- synapse/handlers/presence.py | 26 +++----- synapse/http/request_metrics.py | 4 +- synapse/metrics/__init__.py | 68 +++++++++++--------- synapse/notifier.py | 42 ++++++------ synapse/replication/tcp/handler.py | 28 ++++---- synapse/replication/tcp/protocol.py | 24 +++---- synapse/storage/database.py | 13 ++-- synapse/storage/databases/main/roommember.py | 14 ++-- synapse/util/ratelimitutils.py | 16 ++--- synapse/util/task_scheduler.py | 14 ++-- tests/metrics/test_metrics.py | 44 +------------ 14 files changed, 141 insertions(+), 241 deletions(-) delete mode 100644 changelog.d/18751.misc diff --git a/changelog.d/18751.misc b/changelog.d/18751.misc deleted file mode 100644 index 6ecd49828..000000000 --- a/changelog.d/18751.misc +++ /dev/null @@ -1 +0,0 @@ -Fix `LaterGauge` metrics to collect from all servers. 
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 1e9722e0d..7f511d570 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -37,7 +37,6 @@ Events are replicated via a separate events stream. """ import logging -from enum import Enum from typing import ( TYPE_CHECKING, Dict, @@ -68,25 +67,6 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class QueueNames(str, Enum): - PRESENCE_MAP = "presence_map" - KEYED_EDU = "keyed_edu" - KEYED_EDU_CHANGED = "keyed_edu_changed" - EDUS = "edus" - POS_TIME = "pos_time" - PRESENCE_DESTINATIONS = "presence_destinations" - - -queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {} - -for queue_name in QueueNames: - queue_name_to_gauge_map[queue_name] = LaterGauge( - name=f"synapse_federation_send_queue_{queue_name.value}_size", - desc="", - labelnames=[SERVER_NAME_LABEL], - ) - - class FederationRemoteSendQueue(AbstractFederationSender): """A drop in replacement for FederationSender""" @@ -131,15 +111,23 @@ class FederationRemoteSendQueue(AbstractFederationSender): # we make a new function, so we need to make a new function so the inner # lambda binds to the queue rather than to the name of the queue which # changes. ARGH. 
- def register(queue_name: QueueNames, queue: Sized) -> None: - queue_name_to_gauge_map[queue_name].register_hook( - lambda: {(self.server_name,): len(queue)} + def register(name: str, queue: Sized) -> None: + LaterGauge( + name="synapse_federation_send_queue_%s_size" % (queue_name,), + desc="", + labelnames=[SERVER_NAME_LABEL], + caller=lambda: {(self.server_name,): len(queue)}, ) - for queue_name in QueueNames: - queue = getattr(self, queue_name.value) - assert isinstance(queue, Sized) - register(queue_name, queue=queue) + for queue_name in [ + "presence_map", + "keyed_edu", + "keyed_edu_changed", + "edus", + "pos_time", + "presence_destinations", + ]: + register(queue_name, getattr(self, queue_name)) self.clock.looping_call(self._clear_queue, 30 * 1000) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 21af12354..8befbe372 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -199,24 +199,6 @@ sent_pdus_destination_dist_total = Counter( labelnames=[SERVER_NAME_LABEL], ) -transaction_queue_pending_destinations_gauge = LaterGauge( - name="synapse_federation_transaction_queue_pending_destinations", - desc="", - labelnames=[SERVER_NAME_LABEL], -) - -transaction_queue_pending_pdus_gauge = LaterGauge( - name="synapse_federation_transaction_queue_pending_pdus", - desc="", - labelnames=[SERVER_NAME_LABEL], -) - -transaction_queue_pending_edus_gauge = LaterGauge( - name="synapse_federation_transaction_queue_pending_edus", - desc="", - labelnames=[SERVER_NAME_LABEL], -) - # Time (in s) to wait before trying to wake up destinations that have # catch-up outstanding. 
# Please note that rate limiting still applies, so while the loop is @@ -416,28 +398,38 @@ class FederationSender(AbstractFederationSender): # map from destination to PerDestinationQueue self._per_destination_queues: Dict[str, PerDestinationQueue] = {} - transaction_queue_pending_destinations_gauge.register_hook( - lambda: { + LaterGauge( + name="synapse_federation_transaction_queue_pending_destinations", + desc="", + labelnames=[SERVER_NAME_LABEL], + caller=lambda: { (self.server_name,): sum( 1 for d in self._per_destination_queues.values() if d.transmission_loop_running ) - } + }, ) - transaction_queue_pending_pdus_gauge.register_hook( - lambda: { + + LaterGauge( + name="synapse_federation_transaction_queue_pending_pdus", + desc="", + labelnames=[SERVER_NAME_LABEL], + caller=lambda: { (self.server_name,): sum( d.pending_pdu_count() for d in self._per_destination_queues.values() ) - } + }, ) - transaction_queue_pending_edus_gauge.register_hook( - lambda: { + LaterGauge( + name="synapse_federation_transaction_queue_pending_edus", + desc="", + labelnames=[SERVER_NAME_LABEL], + caller=lambda: { (self.server_name,): sum( d.pending_edu_count() for d in self._per_destination_queues.values() ) - } + }, ) self._is_processing = False diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index fb9f96267..b25311749 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -173,18 +173,6 @@ state_transition_counter = Counter( labelnames=["locality", "from", "to", SERVER_NAME_LABEL], ) -presence_user_to_current_state_size_gauge = LaterGauge( - name="synapse_handlers_presence_user_to_current_state_size", - desc="", - labelnames=[SERVER_NAME_LABEL], -) - -presence_wheel_timer_size_gauge = LaterGauge( - name="synapse_handlers_presence_wheel_timer_size", - desc="", - labelnames=[SERVER_NAME_LABEL], -) - # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them # "currently_active" LAST_ACTIVE_GRANULARITY = 60 * 1000 @@ 
-791,8 +779,11 @@ class PresenceHandler(BasePresenceHandler): EduTypes.PRESENCE, self.incoming_presence ) - presence_user_to_current_state_size_gauge.register_hook( - lambda: {(self.server_name,): len(self.user_to_current_state)} + LaterGauge( + name="synapse_handlers_presence_user_to_current_state_size", + desc="", + labelnames=[SERVER_NAME_LABEL], + caller=lambda: {(self.server_name,): len(self.user_to_current_state)}, ) # The per-device presence state, maps user to devices to per-device presence state. @@ -891,8 +882,11 @@ class PresenceHandler(BasePresenceHandler): 60 * 1000, ) - presence_wheel_timer_size_gauge.register_hook( - lambda: {(self.server_name,): len(self.wheel_timer)} + LaterGauge( + name="synapse_handlers_presence_wheel_timer_size", + desc="", + labelnames=[SERVER_NAME_LABEL], + caller=lambda: {(self.server_name,): len(self.wheel_timer)}, ) # Used to handle sending of presence to newly joined users/servers diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index c5274c758..a9b049f90 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -164,12 +164,12 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]: return counts -in_flight_requests = LaterGauge( +LaterGauge( name="synapse_http_server_in_flight_requests_count", desc="", labelnames=["method", "servlet", SERVER_NAME_LABEL], + caller=_get_in_flight_counts, ) -in_flight_requests.register_hook(_get_in_flight_counts) class RequestMetrics: diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 8c99d3c77..11e2551a1 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -31,7 +31,6 @@ from typing import ( Dict, Generic, Iterable, - List, Mapping, Optional, Sequence, @@ -74,6 +73,8 @@ logger = logging.getLogger(__name__) METRICS_PREFIX = "/_synapse/metrics" +all_gauges: Dict[str, Collector] = {} + HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") SERVER_NAME_LABEL = "server_name" @@ 
-162,47 +163,42 @@ class LaterGauge(Collector): name: str desc: str labelnames: Optional[StrSequence] = attr.ib(hash=False) - # List of callbacks: each callback should either return a value (if there are no - # labels for this metric), or dict mapping from a label tuple to a value - _hooks: List[ - Callable[ - [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] - ] - ] = attr.ib(factory=list, hash=False) + # callback: should either return a value (if there are no labels for this metric), + # or dict mapping from a label tuple to a value + caller: Callable[ + [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] + ] def collect(self) -> Iterable[Metric]: # The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself # (we don't enforce it here, one level up). g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label] - for hook in self._hooks: - try: - hook_result = hook() - except Exception: - logger.exception( - "Exception running callback for LaterGauge(%s)", self.name - ) - yield g - return - - if isinstance(hook_result, (int, float)): - g.add_metric([], hook_result) - else: - for k, v in hook_result.items(): - g.add_metric(k, v) - + try: + calls = self.caller() + except Exception: + logger.exception("Exception running callback for LaterGauge(%s)", self.name) yield g + return - def register_hook( - self, - hook: Callable[ - [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] - ], - ) -> None: - self._hooks.append(hook) + if isinstance(calls, (int, float)): + g.add_metric([], calls) + else: + for k, v in calls.items(): + g.add_metric(k, v) + + yield g def __attrs_post_init__(self) -> None: + self._register() + + def _register(self) -> None: + if self.name in all_gauges.keys(): + logger.warning("%s already registered, reregistering", self.name) + REGISTRY.unregister(all_gauges.pop(self.name)) + REGISTRY.register(self) + 
all_gauges[self.name] = self # `MetricsEntry` only makes sense when it is a `Protocol`, @@ -254,7 +250,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector): # Protects access to _registrations self._lock = threading.Lock() - REGISTRY.register(self) + self._register_with_collector() def register( self, @@ -345,6 +341,14 @@ class InFlightGauge(Generic[MetricsEntry], Collector): gauge.add_metric(labels=key, value=getattr(metrics, name)) yield gauge + def _register_with_collector(self) -> None: + if self.name in all_gauges.keys(): + logger.warning("%s already registered, reregistering", self.name) + REGISTRY.unregister(all_gauges.pop(self.name)) + + REGISTRY.register(self) + all_gauges[self.name] = self + class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily): """ diff --git a/synapse/notifier.py b/synapse/notifier.py index d56a7b26b..448a715e2 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -86,24 +86,6 @@ users_woken_by_stream_counter = Counter( labelnames=["stream", SERVER_NAME_LABEL], ) - -notifier_listeners_gauge = LaterGauge( - name="synapse_notifier_listeners", - desc="", - labelnames=[SERVER_NAME_LABEL], -) - -notifier_rooms_gauge = LaterGauge( - name="synapse_notifier_rooms", - desc="", - labelnames=[SERVER_NAME_LABEL], -) -notifier_users_gauge = LaterGauge( - name="synapse_notifier_users", - desc="", - labelnames=[SERVER_NAME_LABEL], -) - T = TypeVar("T") @@ -299,16 +281,28 @@ class Notifier: ) } - notifier_listeners_gauge.register_hook(count_listeners) - notifier_rooms_gauge.register_hook( - lambda: { + LaterGauge( + name="synapse_notifier_listeners", + desc="", + labelnames=[SERVER_NAME_LABEL], + caller=count_listeners, + ) + + LaterGauge( + name="synapse_notifier_rooms", + desc="", + labelnames=[SERVER_NAME_LABEL], + caller=lambda: { (self.server_name,): count( bool, list(self.room_to_user_streams.values()) ) - } + }, ) - notifier_users_gauge.register_hook( - lambda: {(self.server_name,): len(self.user_to_user_stream)} + 
LaterGauge( + name="synapse_notifier_users", + desc="", + labelnames=[SERVER_NAME_LABEL], + caller=lambda: {(self.server_name,): len(self.user_to_user_stream)}, ) def add_replication_callback(self, cb: Callable[[], None]) -> None: diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index f033eaaeb..0f14c7e38 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -106,18 +106,6 @@ user_ip_cache_counter = Counter( "synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL] ) -tcp_resource_total_connections_gauge = LaterGauge( - name="synapse_replication_tcp_resource_total_connections", - desc="", - labelnames=[SERVER_NAME_LABEL], -) - -tcp_command_queue_gauge = LaterGauge( - name="synapse_replication_tcp_command_queue", - desc="Number of inbound RDATA/POSITION commands queued for processing", - labelnames=["stream_name", SERVER_NAME_LABEL], -) - # the type of the entries in _command_queues_by_stream _StreamCommandQueue = Deque[ @@ -255,8 +243,11 @@ class ReplicationCommandHandler: # outgoing replication commands to.) self._connections: List[IReplicationConnection] = [] - tcp_resource_total_connections_gauge.register_hook( - lambda: {(self.server_name,): len(self._connections)} + LaterGauge( + name="synapse_replication_tcp_resource_total_connections", + desc="", + labelnames=[SERVER_NAME_LABEL], + caller=lambda: {(self.server_name,): len(self._connections)}, ) # When POSITION or RDATA commands arrive, we stick them in a queue and process @@ -275,11 +266,14 @@ class ReplicationCommandHandler: # from that connection. 
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {} - tcp_command_queue_gauge.register_hook( - lambda: { + LaterGauge( + name="synapse_replication_tcp_command_queue", + desc="Number of inbound RDATA/POSITION commands queued for processing", + labelnames=["stream_name", SERVER_NAME_LABEL], + caller=lambda: { (stream_name, self.server_name): len(queue) for stream_name, queue in self._command_queues_by_stream.items() - } + }, ) self._is_master = hs.config.worker.worker_app is None diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 4d8381646..969f0303e 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -527,11 +527,9 @@ pending_commands = LaterGauge( name="synapse_replication_tcp_protocol_pending_commands", desc="", labelnames=["name", SERVER_NAME_LABEL], -) -pending_commands.register_hook( - lambda: { + caller=lambda: { (p.name, p.server_name): len(p.pending_commands) for p in connected_connections - } + }, ) @@ -546,11 +544,9 @@ transport_send_buffer = LaterGauge( name="synapse_replication_tcp_protocol_transport_send_buffer", desc="", labelnames=["name", SERVER_NAME_LABEL], -) -transport_send_buffer.register_hook( - lambda: { + caller=lambda: { (p.name, p.server_name): transport_buffer_size(p) for p in connected_connections - } + }, ) @@ -575,12 +571,10 @@ tcp_transport_kernel_send_buffer = LaterGauge( name="synapse_replication_tcp_protocol_transport_kernel_send_buffer", desc="", labelnames=["name", SERVER_NAME_LABEL], -) -tcp_transport_kernel_send_buffer.register_hook( - lambda: { + caller=lambda: { (p.name, p.server_name): transport_kernel_read_buffer_size(p, False) for p in connected_connections - } + }, ) @@ -588,10 +582,8 @@ tcp_transport_kernel_read_buffer = LaterGauge( name="synapse_replication_tcp_protocol_transport_kernel_read_buffer", desc="", labelnames=["name", SERVER_NAME_LABEL], -) -tcp_transport_kernel_read_buffer.register_hook( - lambda: { + 
caller=lambda: { (p.name, p.server_name): transport_kernel_read_buffer_size(p, True) for p in connected_connections - } + }, ) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index bbdc5b9d2..f7aec16c9 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -100,12 +100,6 @@ sql_txn_duration = Counter( labelnames=["desc", SERVER_NAME_LABEL], ) -background_update_status = LaterGauge( - name="synapse_background_update_status", - desc="Background update status", - labelnames=[SERVER_NAME_LABEL], -) - # Unique indexes which have been added in background updates. Maps from table name # to the name of the background update which added the unique index to that table. @@ -617,8 +611,11 @@ class DatabasePool: ) self.updates = BackgroundUpdater(hs, self) - background_update_status.register_hook( - lambda: {(self.server_name,): self.updates.get_status()}, + LaterGauge( + name="synapse_background_update_status", + desc="Background update status", + labelnames=[SERVER_NAME_LABEL], + caller=lambda: {(self.server_name,): self.updates.get_status()}, ) self._previous_txn_total_time = 0.0 diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 94a1274ed..654250fad 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -84,13 +84,6 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership" _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000 -federation_known_servers_gauge = LaterGauge( - name="synapse_federation_known_servers", - desc="", - labelnames=[SERVER_NAME_LABEL], -) - - @attr.s(frozen=True, slots=True, auto_attribs=True) class EventIdMembership: """Returned by `get_membership_from_event_ids`""" @@ -123,8 +116,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): 1, self._count_known_servers, ) - federation_known_servers_gauge.register_hook( - lambda: {(self.server_name,): 
self._known_servers_count} + LaterGauge( + name="synapse_federation_known_servers", + desc="", + labelnames=[SERVER_NAME_LABEL], + caller=lambda: {(self.server_name,): self._known_servers_count}, ) @wrap_as_background_process("_count_known_servers") diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index b3c65676c..f5e592d80 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -131,31 +131,27 @@ def _get_counts_from_rate_limiter_instance( # We track the number of affected hosts per time-period so we can # differentiate one really noisy homeserver from a general # ratelimit tuning problem across the federation. -sleep_affected_hosts_gauge = LaterGauge( +LaterGauge( name="synapse_rate_limit_sleep_affected_hosts", desc="Number of hosts that had requests put to sleep", labelnames=["rate_limiter_name", SERVER_NAME_LABEL], -) -sleep_affected_hosts_gauge.register_hook( - lambda: _get_counts_from_rate_limiter_instance( + caller=lambda: _get_counts_from_rate_limiter_instance( lambda rate_limiter_instance: sum( ratelimiter.should_sleep() for ratelimiter in rate_limiter_instance.ratelimiters.values() ) - ) + ), ) -reject_affected_hosts_gauge = LaterGauge( +LaterGauge( name="synapse_rate_limit_reject_affected_hosts", desc="Number of hosts that had requests rejected", labelnames=["rate_limiter_name", SERVER_NAME_LABEL], -) -reject_affected_hosts_gauge.register_hook( - lambda: _get_counts_from_rate_limiter_instance( + caller=lambda: _get_counts_from_rate_limiter_instance( lambda rate_limiter_instance: sum( ratelimiter.should_reject() for ratelimiter in rate_limiter_instance.ratelimiters.values() ) - ) + ), ) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 904f99fa4..fdcacdf12 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -44,13 +44,6 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -running_tasks_gauge = LaterGauge( - 
name="synapse_scheduler_running_tasks", - desc="The number of concurrent running tasks handled by the TaskScheduler", - labelnames=[SERVER_NAME_LABEL], -) - - class TaskScheduler: """ This is a simple task scheduler designed for resumable tasks. Normally, @@ -137,8 +130,11 @@ class TaskScheduler: TaskScheduler.SCHEDULE_INTERVAL_MS, ) - running_tasks_gauge.register_hook( - lambda: {(self.server_name,): len(self._running_tasks)} + LaterGauge( + name="synapse_scheduler_running_tasks", + desc="The number of concurrent running tasks handled by the TaskScheduler", + labelnames=[SERVER_NAME_LABEL], + caller=lambda: {(self.server_name,): len(self._running_tasks)}, ) def register_action( diff --git a/tests/metrics/test_metrics.py b/tests/metrics/test_metrics.py index 5a3c3c1c4..61874564a 100644 --- a/tests/metrics/test_metrics.py +++ b/tests/metrics/test_metrics.py @@ -22,13 +22,7 @@ from typing import Dict, Protocol, Tuple from prometheus_client.core import Sample -from synapse.metrics import ( - REGISTRY, - SERVER_NAME_LABEL, - InFlightGauge, - LaterGauge, - generate_latest, -) +from synapse.metrics import REGISTRY, InFlightGauge, generate_latest from synapse.util.caches.deferred_cache import DeferredCache from tests import unittest @@ -291,42 +285,6 @@ class CacheMetricsTests(unittest.HomeserverTestCase): self.assertEqual(hs2_cache_max_size_metric_value, "777.0") -class LaterGaugeTests(unittest.HomeserverTestCase): - def test_later_gauge_multiple_servers(self) -> None: - """ - Test that LaterGauge metrics are reported correctly across multiple servers. We - will have an metrics entry for each homeserver that is labeled with the - `server_name` label. 
- """ - later_gauge = LaterGauge( - name="foo", - desc="", - labelnames=[SERVER_NAME_LABEL], - ) - later_gauge.register_hook(lambda: {("hs1",): 1}) - later_gauge.register_hook(lambda: {("hs2",): 2}) - - metrics_map = get_latest_metrics() - - # Find the metrics for the caches from both homeservers - hs1_metric = 'foo{server_name="hs1"}' - hs1_metric_value = metrics_map.get(hs1_metric) - self.assertIsNotNone( - hs1_metric_value, - f"Missing metric {hs1_metric} in cache metrics {metrics_map}", - ) - hs2_metric = 'foo{server_name="hs2"}' - hs2_metric_value = metrics_map.get(hs2_metric) - self.assertIsNotNone( - hs2_metric_value, - f"Missing metric {hs2_metric} in cache metrics {metrics_map}", - ) - - # Sanity check the metric values - self.assertEqual(hs1_metric_value, "1.0") - self.assertEqual(hs2_metric_value, "2.0") - - def get_latest_metrics() -> Dict[str, str]: """ Collect the latest metrics from the registry and parse them into an easy to use map.