diff --git a/changelog.d/18564.misc b/changelog.d/18564.misc new file mode 100644 index 000000000..a31c98370 --- /dev/null +++ b/changelog.d/18564.misc @@ -0,0 +1 @@ +Remove unnecessary HTTP replication calls. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8718b7040..2a7f5b2c4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -85,7 +85,6 @@ from synapse.logging.opentracing import ( from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, - ReplicationGetQueryRestServlet, ) from synapse.storage.databases.main.lock import Lock from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary @@ -1380,7 +1379,6 @@ class FederationHandlerRegistry: # and use them. However we have guards before we use them to ensure that # we don't route to ourselves, and in monolith mode that will always be # the case. - self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs) self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs) self.edu_handlers: Dict[str, Callable[[str, dict], Awaitable[None]]] = {} @@ -1469,10 +1467,6 @@ class FederationHandlerRegistry: if handler: return await handler(args) - # Check if we can route it somewhere else that isn't us - if self._instance_name == "master": - return await self._get_query_client(query_type=query_type, args=args) - # Uh oh, no handler! Let's raise an exception so the request returns an # error. logger.warning("No handler registered for query type %s", query_type) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 015fb3edc..f7806e67a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -73,10 +73,6 @@ from synapse.logging.context import nested_logging_context from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import NOT_SPAM -from synapse.replication.http.federation import ( - ReplicationCleanRoomRestServlet, - ReplicationStoreRoomOnOutlierMembershipRestServlet, -) from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.invite_rule import InviteRule from synapse.types import JsonDict, StrCollection, get_domain_from_id @@ -163,19 +159,6 @@ class FederationHandler: self._notifier = hs.get_notifier() self._worker_locks = hs.get_worker_locks_handler() - self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client( - hs - ) - - if hs.config.worker.worker_app: - self._maybe_store_room_on_outlier_membership = ( - ReplicationStoreRoomOnOutlierMembershipRestServlet.make_client(hs) - ) - else: - self._maybe_store_room_on_outlier_membership = ( - self.store.maybe_store_room_on_outlier_membership - ) - self._room_backfill = Linearizer("room_backfill") self._third_party_event_rules = ( @@ -647,7 +630,7 @@ class FederationHandler: # room. # In short, the races either have an acceptable outcome or should be # impossible. - await self._clean_room_for_join(room_id) + await self.store.clean_room_for_join(room_id) try: # Try the host we successfully got a response to /make_join/ @@ -857,7 +840,7 @@ class FederationHandler: event.internal_metadata.out_of_band_membership = True # Record the room ID and its version so that we have a record of the room - await self._maybe_store_room_on_outlier_membership( + await self.store.maybe_store_room_on_outlier_membership( room_id=event.room_id, room_version=event_format_version ) @@ -1115,7 +1098,7 @@ class FederationHandler: # keep a record of the room version, if we don't yet know it. # (this may get overwritten if we later get a different room version in a # join dance). - await self._maybe_store_room_on_outlier_membership( + await self.store.maybe_store_room_on_outlier_membership( room_id=event.room_id, room_version=room_version ) @@ -1761,18 +1744,6 @@ class FederationHandler: if "valid" not in response or not response["valid"]: raise AuthError(403, "Third party certificate was invalid") - async def _clean_room_for_join(self, room_id: str) -> None: - """Called to clean up any data in DB for a given room, ready for the - server to join the room. - - Args: - room_id - """ - if self.config.worker.worker_app: - await self._clean_room_for_join_client(room_id) - else: - await self.store.clean_room_for_join(room_id) - async def get_room_complexity( self, remote_room_hosts: List[str], room_id: str ) -> Optional[dict]: diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 940f41839..f3f8ddfd6 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -202,6 +202,8 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): return 200, {} +# FIXME(2025-07-22): Remove this on the next release, this will only get used +# during rollout to Synapse 1.135 and can be removed after that release. class ReplicationGetQueryRestServlet(ReplicationEndpoint): """Handle responding to queries from federation. @@ -249,6 +251,8 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint): return 200, result +# FIXME(2025-07-22): Remove this on the next release, this will only get used +# during rollout to Synapse 1.135 and can be removed after that release. class ReplicationCleanRoomRestServlet(ReplicationEndpoint): """Called to clean up any data in DB for a given room, ready for the server to join the room. @@ -284,6 +288,8 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint): return 200, {} +# FIXME(2025-07-22): Remove this on the next release, this will only get used +# during rollout to Synapse 1.135 and can be removed after that release. class ReplicationStoreRoomOnOutlierMembershipRestServlet(ReplicationEndpoint): """Called to clean up any data in DB for a given room, ready for the server to join the room. diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 9418fb6dd..dc37f6711 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -41,7 +41,6 @@ from synapse.storage.database import ( LoggingDatabaseConnection, LoggingTransaction, ) -from synapse.storage.databases.main.events import SLIDING_SYNC_RELEVANT_STATE_SET from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.util.caches.descriptors import CachedFunction @@ -284,6 +283,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore): super().process_replication_position(stream_name, instance_name, token) def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: + # This is needed to avoid a circular import. + from synapse.storage.databases.main.events import ( + SLIDING_SYNC_RELEVANT_STATE_SET, + ) + data = row.data if row.type == EventsStreamEventRow.TypeId: @@ -347,6 +351,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore): relates_to: Optional[str], backfilled: bool, ) -> None: + # This is needed to avoid a circular import. + from synapse.storage.databases.main.events import ( + SLIDING_SYNC_RELEVANT_STATE_SET, + ) + # XXX: If you add something to this function make sure you add it to # `_invalidate_caches_for_room_events` as well. diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 46aa5902d..dfc25d893 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -46,13 +46,14 @@ from synapse.api.room_versions import EventFormatVersions, RoomVersion from synapse.events import EventBase, make_event_from_dict from synapse.logging.opentracing import tag_args, trace from synapse.metrics.background_process_metrics import wrap_as_background_process -from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause +from synapse.storage._base import db_to_json, make_in_list_sql_clause from synapse.storage.background_updates import ForeignKeyConstraint from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, LoggingTransaction, ) +from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.signatures import SignatureWorkerStore from synapse.storage.engines import PostgresEngine, Sqlite3Engine @@ -123,7 +124,9 @@ class _NoChainCoverIndex(Exception): super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,)) -class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore): +class EventFederationWorkerStore( + SignatureWorkerStore, EventsWorkerStore, CacheInvalidationWorkerStore +): # TODO: this attribute comes from EventPushActionWorkerStore. Should we inherit from # that store so that mypy can deduce this for itself? stream_ordering_month_ago: Optional[int] @@ -2053,6 +2056,19 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas number_pdus_in_federation_queue.set(count) oldest_pdu_in_federation_staging.set(age) + async def clean_room_for_join(self, room_id: str) -> None: + await self.db_pool.runInteraction( + "clean_room_for_join", self._clean_room_for_join_txn, room_id + ) + + def _clean_room_for_join_txn(self, txn: LoggingTransaction, room_id: str) -> None: + query = "DELETE FROM event_forward_extremities WHERE room_id = ?" + + txn.execute(query, (room_id,)) + self._invalidate_cache_and_stream( + txn, self.get_latest_event_ids_in_room, (room_id,) + ) + class EventFederationStore(EventFederationWorkerStore): """Responsible for storing and serving up the various graphs associated @@ -2078,17 +2094,6 @@ class EventFederationStore(EventFederationWorkerStore): self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth ) - async def clean_room_for_join(self, room_id: str) -> None: - await self.db_pool.runInteraction( - "clean_room_for_join", self._clean_room_for_join_txn, room_id - ) - - def _clean_room_for_join_txn(self, txn: LoggingTransaction, room_id: str) -> None: - query = "DELETE FROM event_forward_extremities WHERE room_id = ?" - - txn.execute(query, (room_id,)) - txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,)) - async def _background_delete_non_state_event_auth( self, progress: JsonDict, batch_size: int ) -> int: diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 58451d3ff..153bdb351 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1935,6 +1935,65 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): desc="set_room_is_public_appservice_false", ) + async def has_auth_chain_index(self, room_id: str) -> bool: + """Check if the room has (or can have) a chain cover index. + + Defaults to True if we don't have an entry in `rooms` table nor any + events for the room. + """ + + has_auth_chain_index = await self.db_pool.simple_select_one_onecol( + table="rooms", + keyvalues={"room_id": room_id}, + retcol="has_auth_chain_index", + desc="has_auth_chain_index", + allow_none=True, + ) + + if has_auth_chain_index: + return True + + # It's possible that we already have events for the room in our DB + # without a corresponding room entry. If we do then we don't want to + # mark the room as having an auth chain cover index. + max_ordering = await self.db_pool.simple_select_one_onecol( + table="events", + keyvalues={"room_id": room_id}, + retcol="MAX(stream_ordering)", + allow_none=True, + desc="has_auth_chain_index_fallback", + ) + + return max_ordering is None + + async def maybe_store_room_on_outlier_membership( + self, room_id: str, room_version: RoomVersion + ) -> None: + """ + When we receive an invite or any other event over federation that may relate to a room + we are not in, store the version of the room if we don't already know the room version. + """ + # It's possible that we already have events for the room in our DB + # without a corresponding room entry. If we do then we don't want to + # mark the room as having an auth chain cover index. + has_auth_chain_index = await self.has_auth_chain_index(room_id) + + await self.db_pool.simple_upsert( + desc="maybe_store_room_on_outlier_membership", + table="rooms", + keyvalues={"room_id": room_id}, + values={}, + insertion_values={ + "room_version": room_version.identifier, + "is_public": False, + # We don't worry about setting the `creator` here because + # we don't process any messages in a room while a user is + # invited (only after the join). + "creator": "", + "has_auth_chain_index": has_auth_chain_index, + }, + ) + class _BackgroundUpdates: REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory" @@ -2186,37 +2245,6 @@ class RoomBackgroundUpdateStore(RoomWorkerStore): return len(rooms) - async def has_auth_chain_index(self, room_id: str) -> bool: - """Check if the room has (or can have) a chain cover index. - - Defaults to True if we don't have an entry in `rooms` table nor any - events for the room. - """ - - has_auth_chain_index = await self.db_pool.simple_select_one_onecol( - table="rooms", - keyvalues={"room_id": room_id}, - retcol="has_auth_chain_index", - desc="has_auth_chain_index", - allow_none=True, - ) - - if has_auth_chain_index: - return True - - # It's possible that we already have events for the room in our DB - # without a corresponding room entry. If we do then we don't want to - # mark the room as having an auth chain cover index. - max_ordering = await self.db_pool.simple_select_one_onecol( - table="events", - keyvalues={"room_id": room_id}, - retcol="MAX(stream_ordering)", - allow_none=True, - desc="has_auth_chain_index_fallback", - ) - - return max_ordering is None - async def _background_populate_room_depth_min_depth2( self, progress: JsonDict, batch_size: int ) -> int: @@ -2567,34 +2595,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): updatevalues={"join_event_id": join_event_id}, ) - async def maybe_store_room_on_outlier_membership( - self, room_id: str, room_version: RoomVersion - ) -> None: - """ - When we receive an invite or any other event over federation that may relate to a room - we are not in, store the version of the room if we don't already know the room version. - """ - # It's possible that we already have events for the room in our DB - # without a corresponding room entry. If we do then we don't want to - # mark the room as having an auth chain cover index. - has_auth_chain_index = await self.has_auth_chain_index(room_id) - - await self.db_pool.simple_upsert( - desc="maybe_store_room_on_outlier_membership", - table="rooms", - keyvalues={"room_id": room_id}, - values={}, - insertion_values={ - "room_version": room_version.identifier, - "is_public": False, - # We don't worry about setting the `creator` here because - # we don't process any messages in a room while a user is - # invited (only after the join). - "creator": "", - "has_auth_chain_index": has_auth_chain_index, - }, - ) - async def add_event_report( self, room_id: str,