From 28c9ed3ccbd95a4a8d06866d4088320e5acfbbd0 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Fri, 11 Jul 2025 10:47:54 +0200 Subject: [PATCH] Remove unnecessary replication calls (#18564) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This should be reviewed commit by commit. Nowadays it's trivial to propagate cache invalidations, which means we can move some things off the main process, and not go through HTTP replication. `ReplicationGetQueryRestServlet` appeared to be unused, and was very weird, as it was being called if the current instance is the main one… to RPC to the main one (if no instance is set on a replication client, it makes it to the main process) The other two handlers could be relatively trivially moved to any workers, moving some methods to the worker store. **I've intentionally not removed the replication servlets yet** so that it's safe to rollout, and will do another PR that clean those up to remove on the N+1 version --- changelog.d/18564.misc | 1 + synapse/federation/federation_server.py | 6 - synapse/handlers/federation.py | 35 +----- synapse/replication/http/federation.py | 6 + synapse/storage/databases/main/cache.py | 11 +- .../databases/main/event_federation.py | 31 +++-- synapse/storage/databases/main/room.py | 118 +++++++++--------- 7 files changed, 97 insertions(+), 111 deletions(-) create mode 100644 changelog.d/18564.misc diff --git a/changelog.d/18564.misc b/changelog.d/18564.misc new file mode 100644 index 000000000..a31c98370 --- /dev/null +++ b/changelog.d/18564.misc @@ -0,0 +1 @@ +Remove unnecessary HTTP replication calls. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8718b7040..2a7f5b2c4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -85,7 +85,6 @@ from synapse.logging.opentracing import ( from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, - ReplicationGetQueryRestServlet, ) from synapse.storage.databases.main.lock import Lock from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary @@ -1380,7 +1379,6 @@ class FederationHandlerRegistry: # and use them. However we have guards before we use them to ensure that # we don't route to ourselves, and in monolith mode that will always be # the case. - self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs) self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs) self.edu_handlers: Dict[str, Callable[[str, dict], Awaitable[None]]] = {} @@ -1469,10 +1467,6 @@ class FederationHandlerRegistry: if handler: return await handler(args) - # Check if we can route it somewhere else that isn't us - if self._instance_name == "master": - return await self._get_query_client(query_type=query_type, args=args) - # Uh oh, no handler! Let's raise an exception so the request returns an # error. logger.warning("No handler registered for query type %s", query_type) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 015fb3edc..f7806e67a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -73,10 +73,6 @@ from synapse.logging.context import nested_logging_context from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import NOT_SPAM -from synapse.replication.http.federation import ( - ReplicationCleanRoomRestServlet, - ReplicationStoreRoomOnOutlierMembershipRestServlet, -) from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.invite_rule import InviteRule from synapse.types import JsonDict, StrCollection, get_domain_from_id @@ -163,19 +159,6 @@ class FederationHandler: self._notifier = hs.get_notifier() self._worker_locks = hs.get_worker_locks_handler() - self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client( - hs - ) - - if hs.config.worker.worker_app: - self._maybe_store_room_on_outlier_membership = ( - ReplicationStoreRoomOnOutlierMembershipRestServlet.make_client(hs) - ) - else: - self._maybe_store_room_on_outlier_membership = ( - self.store.maybe_store_room_on_outlier_membership - ) - self._room_backfill = Linearizer("room_backfill") self._third_party_event_rules = ( @@ -647,7 +630,7 @@ class FederationHandler: # room. # In short, the races either have an acceptable outcome or should be # impossible. - await self._clean_room_for_join(room_id) + await self.store.clean_room_for_join(room_id) try: # Try the host we successfully got a response to /make_join/ @@ -857,7 +840,7 @@ class FederationHandler: event.internal_metadata.out_of_band_membership = True # Record the room ID and its version so that we have a record of the room - await self._maybe_store_room_on_outlier_membership( + await self.store.maybe_store_room_on_outlier_membership( room_id=event.room_id, room_version=event_format_version ) @@ -1115,7 +1098,7 @@ class FederationHandler: # keep a record of the room version, if we don't yet know it. # (this may get overwritten if we later get a different room version in a # join dance). - await self._maybe_store_room_on_outlier_membership( + await self.store.maybe_store_room_on_outlier_membership( room_id=event.room_id, room_version=room_version ) @@ -1761,18 +1744,6 @@ class FederationHandler: if "valid" not in response or not response["valid"]: raise AuthError(403, "Third party certificate was invalid") - async def _clean_room_for_join(self, room_id: str) -> None: - """Called to clean up any data in DB for a given room, ready for the - server to join the room. - - Args: - room_id - """ - if self.config.worker.worker_app: - await self._clean_room_for_join_client(room_id) - else: - await self.store.clean_room_for_join(room_id) - async def get_room_complexity( self, remote_room_hosts: List[str], room_id: str ) -> Optional[dict]: diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 940f41839..f3f8ddfd6 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -202,6 +202,8 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): return 200, {} +# FIXME(2025-07-22): Remove this on the next release, this will only get used +# during rollout to Synapse 1.135 and can be removed after that release. class ReplicationGetQueryRestServlet(ReplicationEndpoint): """Handle responding to queries from federation. @@ -249,6 +251,8 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint): return 200, result +# FIXME(2025-07-22): Remove this on the next release, this will only get used +# during rollout to Synapse 1.135 and can be removed after that release. class ReplicationCleanRoomRestServlet(ReplicationEndpoint): """Called to clean up any data in DB for a given room, ready for the server to join the room. @@ -284,6 +288,8 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint): return 200, {} +# FIXME(2025-07-22): Remove this on the next release, this will only get used +# during rollout to Synapse 1.135 and can be removed after that release. class ReplicationStoreRoomOnOutlierMembershipRestServlet(ReplicationEndpoint): """Called to clean up any data in DB for a given room, ready for the server to join the room. diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 9418fb6dd..dc37f6711 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -41,7 +41,6 @@ from synapse.storage.database import ( LoggingDatabaseConnection, LoggingTransaction, ) -from synapse.storage.databases.main.events import SLIDING_SYNC_RELEVANT_STATE_SET from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.util.caches.descriptors import CachedFunction @@ -284,6 +283,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore): super().process_replication_position(stream_name, instance_name, token) def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: + # This is needed to avoid a circular import. + from synapse.storage.databases.main.events import ( + SLIDING_SYNC_RELEVANT_STATE_SET, + ) + data = row.data if row.type == EventsStreamEventRow.TypeId: @@ -347,6 +351,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore): relates_to: Optional[str], backfilled: bool, ) -> None: + # This is needed to avoid a circular import. + from synapse.storage.databases.main.events import ( + SLIDING_SYNC_RELEVANT_STATE_SET, + ) + # XXX: If you add something to this function make sure you add it to # `_invalidate_caches_for_room_events` as well. diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 46aa5902d..dfc25d893 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -46,13 +46,14 @@ from synapse.api.room_versions import EventFormatVersions, RoomVersion from synapse.events import EventBase, make_event_from_dict from synapse.logging.opentracing import tag_args, trace from synapse.metrics.background_process_metrics import wrap_as_background_process -from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause +from synapse.storage._base import db_to_json, make_in_list_sql_clause from synapse.storage.background_updates import ForeignKeyConstraint from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, LoggingTransaction, ) +from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.signatures import SignatureWorkerStore from synapse.storage.engines import PostgresEngine, Sqlite3Engine @@ -123,7 +124,9 @@ class _NoChainCoverIndex(Exception): super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,)) -class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore): +class EventFederationWorkerStore( + SignatureWorkerStore, EventsWorkerStore, CacheInvalidationWorkerStore +): # TODO: this attribute comes from EventPushActionWorkerStore. Should we inherit from # that store so that mypy can deduce this for itself? stream_ordering_month_ago: Optional[int] @@ -2053,6 +2056,19 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas number_pdus_in_federation_queue.set(count) oldest_pdu_in_federation_staging.set(age) + async def clean_room_for_join(self, room_id: str) -> None: + await self.db_pool.runInteraction( + "clean_room_for_join", self._clean_room_for_join_txn, room_id + ) + + def _clean_room_for_join_txn(self, txn: LoggingTransaction, room_id: str) -> None: + query = "DELETE FROM event_forward_extremities WHERE room_id = ?" + + txn.execute(query, (room_id,)) + self._invalidate_cache_and_stream( + txn, self.get_latest_event_ids_in_room, (room_id,) + ) + class EventFederationStore(EventFederationWorkerStore): """Responsible for storing and serving up the various graphs associated @@ -2078,17 +2094,6 @@ class EventFederationStore(EventFederationWorkerStore): self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth ) - async def clean_room_for_join(self, room_id: str) -> None: - await self.db_pool.runInteraction( - "clean_room_for_join", self._clean_room_for_join_txn, room_id - ) - - def _clean_room_for_join_txn(self, txn: LoggingTransaction, room_id: str) -> None: - query = "DELETE FROM event_forward_extremities WHERE room_id = ?" - - txn.execute(query, (room_id,)) - txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,)) - async def _background_delete_non_state_event_auth( self, progress: JsonDict, batch_size: int ) -> int: diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 58451d3ff..153bdb351 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1935,6 +1935,65 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): desc="set_room_is_public_appservice_false", ) + async def has_auth_chain_index(self, room_id: str) -> bool: + """Check if the room has (or can have) a chain cover index. + + Defaults to True if we don't have an entry in `rooms` table nor any + events for the room. + """ + + has_auth_chain_index = await self.db_pool.simple_select_one_onecol( + table="rooms", + keyvalues={"room_id": room_id}, + retcol="has_auth_chain_index", + desc="has_auth_chain_index", + allow_none=True, + ) + + if has_auth_chain_index: + return True + + # It's possible that we already have events for the room in our DB + # without a corresponding room entry. If we do then we don't want to + # mark the room as having an auth chain cover index. + max_ordering = await self.db_pool.simple_select_one_onecol( + table="events", + keyvalues={"room_id": room_id}, + retcol="MAX(stream_ordering)", + allow_none=True, + desc="has_auth_chain_index_fallback", + ) + + return max_ordering is None + + async def maybe_store_room_on_outlier_membership( + self, room_id: str, room_version: RoomVersion + ) -> None: + """ + When we receive an invite or any other event over federation that may relate to a room + we are not in, store the version of the room if we don't already know the room version. + """ + # It's possible that we already have events for the room in our DB + # without a corresponding room entry. If we do then we don't want to + # mark the room as having an auth chain cover index. + has_auth_chain_index = await self.has_auth_chain_index(room_id) + + await self.db_pool.simple_upsert( + desc="maybe_store_room_on_outlier_membership", + table="rooms", + keyvalues={"room_id": room_id}, + values={}, + insertion_values={ + "room_version": room_version.identifier, + "is_public": False, + # We don't worry about setting the `creator` here because + # we don't process any messages in a room while a user is + # invited (only after the join). + "creator": "", + "has_auth_chain_index": has_auth_chain_index, + }, + ) + class _BackgroundUpdates: REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory" @@ -2186,37 +2245,6 @@ class RoomBackgroundUpdateStore(RoomWorkerStore): return len(rooms) - async def has_auth_chain_index(self, room_id: str) -> bool: - """Check if the room has (or can have) a chain cover index. - - Defaults to True if we don't have an entry in `rooms` table nor any - events for the room. - """ - - has_auth_chain_index = await self.db_pool.simple_select_one_onecol( - table="rooms", - keyvalues={"room_id": room_id}, - retcol="has_auth_chain_index", - desc="has_auth_chain_index", - allow_none=True, - ) - - if has_auth_chain_index: - return True - - # It's possible that we already have events for the room in our DB - # without a corresponding room entry. If we do then we don't want to - # mark the room as having an auth chain cover index. - max_ordering = await self.db_pool.simple_select_one_onecol( - table="events", - keyvalues={"room_id": room_id}, - retcol="MAX(stream_ordering)", - allow_none=True, - desc="has_auth_chain_index_fallback", - ) - - return max_ordering is None - async def _background_populate_room_depth_min_depth2( self, progress: JsonDict, batch_size: int ) -> int: @@ -2567,34 +2595,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): updatevalues={"join_event_id": join_event_id}, ) - async def maybe_store_room_on_outlier_membership( - self, room_id: str, room_version: RoomVersion - ) -> None: - """ - When we receive an invite or any other event over federation that may relate to a room - we are not in, store the version of the room if we don't already know the room version. - """ - # It's possible that we already have events for the room in our DB - # without a corresponding room entry. If we do then we don't want to - # mark the room as having an auth chain cover index. - has_auth_chain_index = await self.has_auth_chain_index(room_id) - - await self.db_pool.simple_upsert( - desc="maybe_store_room_on_outlier_membership", - table="rooms", - keyvalues={"room_id": room_id}, - values={}, - insertion_values={ - "room_version": room_version.identifier, - "is_public": False, - # We don't worry about setting the `creator` here because - # we don't process any messages in a room while a user is - # invited (only after the join). - "creator": "", - "has_auth_chain_index": has_auth_chain_index, - }, - ) - async def add_event_report( self, room_id: str,