Remove unnecessary replication calls (#18564)
This should be reviewed commit by commit. Nowadays it's trivial to propagate cache invalidations, which means we can move some of this work off the main process and avoid going through HTTP replication entirely.

`ReplicationGetQueryRestServlet` appeared to be unused, and its wiring was odd: it was only invoked when the current instance was already the main process, only to RPC back to the main process (when no instance is set on a replication client, the call goes to the main process anyway).

The other two handlers can be moved to any worker fairly trivially by moving the underlying methods onto the worker stores.

**I've intentionally not removed the replication servlets yet** so that rollout is safe; a follow-up PR will clean them up and remove them in the N+1 version.
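For reviewers who want the shape of the change before reading the diffs: the pattern is to move the storage method onto the worker store and have it invalidate caches via the replication stream (`_invalidate_cache_and_stream`) rather than only locally (`txn.call_after(...)`), so any worker can perform the write directly instead of RPCing to the main process. The sketch below is a minimal, self-contained illustration of that idea, not Synapse's real classes or APIs; the class name, the in-memory "stream", and the sample room ID are made up for the example.

```python
class WorkerStoreSketch:
    """Toy stand-in for a Synapse worker store (illustration only)."""

    def __init__(self) -> None:
        # Toy table of forward extremities per room.
        self.forward_extremities: dict[str, list[str]] = {}
        # Rows appended here stand in for the cache-invalidation replication
        # stream that the main process and every other worker consume.
        self.invalidation_stream: list[tuple[str, tuple[str, ...]]] = []

    def _invalidate_cache_and_stream(
        self, cache_name: str, keys: tuple[str, ...]
    ) -> None:
        # Invalidate locally *and* publish the invalidation so every other
        # instance drops its cached entry too.
        self.invalidation_stream.append((cache_name, keys))

    def clean_room_for_join(self, room_id: str) -> None:
        # The write itself: drop the room's forward extremities...
        self.forward_extremities.pop(room_id, None)
        # ...and stream the invalidation, instead of only doing a local
        # txn.call_after(cache.invalidate, ...) as the old main-process-only
        # version did.
        self._invalidate_cache_and_stream(
            "get_latest_event_ids_in_room", (room_id,)
        )


# Old shape: worker -> main process over HTTP replication:
#     await self._clean_room_for_join_client(room_id)
# New shape: any instance calls the store directly:
#     await self.store.clean_room_for_join(room_id)
store = WorkerStoreSketch()
store.forward_extremities["!room:example.org"] = ["$event_a"]
store.clean_room_for_join("!room:example.org")
assert store.invalidation_stream == [
    ("get_latest_event_ids_in_room", ("!room:example.org",))
]
```

The property that makes this safe is that the invalidation rides the replication stream, so the cached `get_latest_event_ids_in_room` entry is dropped on every instance, not just the one that performed the write.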
This commit is contained in:
parent 1dc29563c1
commit 28c9ed3ccb
changelog.d/18564.misc (new file, 1 line)
@@ -0,0 +1 @@
+Remove unnecessary HTTP replication calls.
@@ -85,7 +85,6 @@ from synapse.logging.opentracing import (
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.replication.http.federation import (
     ReplicationFederationSendEduRestServlet,
-    ReplicationGetQueryRestServlet,
 )
 from synapse.storage.databases.main.lock import Lock
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
@@ -1380,7 +1379,6 @@ class FederationHandlerRegistry:
         # and use them. However we have guards before we use them to ensure that
         # we don't route to ourselves, and in monolith mode that will always be
         # the case.
-        self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
         self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
 
         self.edu_handlers: Dict[str, Callable[[str, dict], Awaitable[None]]] = {}
@@ -1469,10 +1467,6 @@ class FederationHandlerRegistry:
         if handler:
             return await handler(args)
 
-        # Check if we can route it somewhere else that isn't us
-        if self._instance_name == "master":
-            return await self._get_query_client(query_type=query_type, args=args)
-
         # Uh oh, no handler! Let's raise an exception so the request returns an
         # error.
         logger.warning("No handler registered for query type %s", query_type)
@@ -73,10 +73,6 @@ from synapse.logging.context import nested_logging_context
 from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.module_api import NOT_SPAM
-from synapse.replication.http.federation import (
-    ReplicationCleanRoomRestServlet,
-    ReplicationStoreRoomOnOutlierMembershipRestServlet,
-)
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.invite_rule import InviteRule
 from synapse.types import JsonDict, StrCollection, get_domain_from_id
@@ -163,19 +159,6 @@ class FederationHandler:
         self._notifier = hs.get_notifier()
         self._worker_locks = hs.get_worker_locks_handler()
 
-        self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
-            hs
-        )
-
-        if hs.config.worker.worker_app:
-            self._maybe_store_room_on_outlier_membership = (
-                ReplicationStoreRoomOnOutlierMembershipRestServlet.make_client(hs)
-            )
-        else:
-            self._maybe_store_room_on_outlier_membership = (
-                self.store.maybe_store_room_on_outlier_membership
-            )
-
         self._room_backfill = Linearizer("room_backfill")
 
         self._third_party_event_rules = (
@@ -647,7 +630,7 @@ class FederationHandler:
         # room.
         # In short, the races either have an acceptable outcome or should be
         # impossible.
-        await self._clean_room_for_join(room_id)
+        await self.store.clean_room_for_join(room_id)
 
         try:
             # Try the host we successfully got a response to /make_join/
@@ -857,7 +840,7 @@
         event.internal_metadata.out_of_band_membership = True
 
         # Record the room ID and its version so that we have a record of the room
-        await self._maybe_store_room_on_outlier_membership(
+        await self.store.maybe_store_room_on_outlier_membership(
             room_id=event.room_id, room_version=event_format_version
         )
 
@@ -1115,7 +1098,7 @@
         # keep a record of the room version, if we don't yet know it.
         # (this may get overwritten if we later get a different room version in a
         # join dance).
-        await self._maybe_store_room_on_outlier_membership(
+        await self.store.maybe_store_room_on_outlier_membership(
             room_id=event.room_id, room_version=room_version
         )
 
@@ -1761,18 +1744,6 @@ class FederationHandler:
         if "valid" not in response or not response["valid"]:
             raise AuthError(403, "Third party certificate was invalid")
 
-    async def _clean_room_for_join(self, room_id: str) -> None:
-        """Called to clean up any data in DB for a given room, ready for the
-        server to join the room.
-
-        Args:
-            room_id
-        """
-        if self.config.worker.worker_app:
-            await self._clean_room_for_join_client(room_id)
-        else:
-            await self.store.clean_room_for_join(room_id)
-
     async def get_room_complexity(
         self, remote_room_hosts: List[str], room_id: str
     ) -> Optional[dict]:
@@ -202,6 +202,8 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
         return 200, {}
 
 
+# FIXME(2025-07-22): Remove this on the next release, this will only get used
+# during rollout to Synapse 1.135 and can be removed after that release.
 class ReplicationGetQueryRestServlet(ReplicationEndpoint):
     """Handle responding to queries from federation.
 
@@ -249,6 +251,8 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
         return 200, result
 
 
+# FIXME(2025-07-22): Remove this on the next release, this will only get used
+# during rollout to Synapse 1.135 and can be removed after that release.
 class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
     """Called to clean up any data in DB for a given room, ready for the
     server to join the room.
@@ -284,6 +288,8 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
         return 200, {}
 
 
+# FIXME(2025-07-22): Remove this on the next release, this will only get used
+# during rollout to Synapse 1.135 and can be removed after that release.
 class ReplicationStoreRoomOnOutlierMembershipRestServlet(ReplicationEndpoint):
     """Called to clean up any data in DB for a given room, ready for the
     server to join the room.
@@ -41,7 +41,6 @@ from synapse.storage.database import (
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
-from synapse.storage.databases.main.events import SLIDING_SYNC_RELEVANT_STATE_SET
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
 from synapse.util.caches.descriptors import CachedFunction
@@ -284,6 +283,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         super().process_replication_position(stream_name, instance_name, token)
 
     def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
+        # This is needed to avoid a circular import.
+        from synapse.storage.databases.main.events import (
+            SLIDING_SYNC_RELEVANT_STATE_SET,
+        )
+
         data = row.data
 
         if row.type == EventsStreamEventRow.TypeId:
@@ -347,6 +351,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         relates_to: Optional[str],
         backfilled: bool,
     ) -> None:
+        # This is needed to avoid a circular import.
+        from synapse.storage.databases.main.events import (
+            SLIDING_SYNC_RELEVANT_STATE_SET,
+        )
+
         # XXX: If you add something to this function make sure you add it to
         # `_invalidate_caches_for_room_events` as well.
 
@@ -46,13 +46,14 @@ from synapse.api.room_versions import EventFormatVersions, RoomVersion
 from synapse.events import EventBase, make_event_from_dict
 from synapse.logging.opentracing import tag_args, trace
 from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
+from synapse.storage._base import db_to_json, make_in_list_sql_clause
 from synapse.storage.background_updates import ForeignKeyConstraint
 from synapse.storage.database import (
     DatabasePool,
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
+from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.signatures import SignatureWorkerStore
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
@@ -123,7 +124,9 @@ class _NoChainCoverIndex(Exception):
         super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,))
 
 
-class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore):
+class EventFederationWorkerStore(
+    SignatureWorkerStore, EventsWorkerStore, CacheInvalidationWorkerStore
+):
     # TODO: this attribute comes from EventPushActionWorkerStore. Should we inherit from
     # that store so that mypy can deduce this for itself?
     stream_ordering_month_ago: Optional[int]
@@ -2053,6 +2056,19 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore):
         number_pdus_in_federation_queue.set(count)
         oldest_pdu_in_federation_staging.set(age)
 
+    async def clean_room_for_join(self, room_id: str) -> None:
+        await self.db_pool.runInteraction(
+            "clean_room_for_join", self._clean_room_for_join_txn, room_id
+        )
+
+    def _clean_room_for_join_txn(self, txn: LoggingTransaction, room_id: str) -> None:
+        query = "DELETE FROM event_forward_extremities WHERE room_id = ?"
+
+        txn.execute(query, (room_id,))
+        self._invalidate_cache_and_stream(
+            txn, self.get_latest_event_ids_in_room, (room_id,)
+        )
+
 
 class EventFederationStore(EventFederationWorkerStore):
     """Responsible for storing and serving up the various graphs associated
@@ -2078,17 +2094,6 @@ class EventFederationStore(EventFederationWorkerStore):
             self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth
         )
 
-    async def clean_room_for_join(self, room_id: str) -> None:
-        await self.db_pool.runInteraction(
-            "clean_room_for_join", self._clean_room_for_join_txn, room_id
-        )
-
-    def _clean_room_for_join_txn(self, txn: LoggingTransaction, room_id: str) -> None:
-        query = "DELETE FROM event_forward_extremities WHERE room_id = ?"
-
-        txn.execute(query, (room_id,))
-        txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,))
-
     async def _background_delete_non_state_event_auth(
         self, progress: JsonDict, batch_size: int
     ) -> int:
@@ -1935,6 +1935,65 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
             desc="set_room_is_public_appservice_false",
         )
 
+    async def has_auth_chain_index(self, room_id: str) -> bool:
+        """Check if the room has (or can have) a chain cover index.
+
+        Defaults to True if we don't have an entry in `rooms` table nor any
+        events for the room.
+        """
+
+        has_auth_chain_index = await self.db_pool.simple_select_one_onecol(
+            table="rooms",
+            keyvalues={"room_id": room_id},
+            retcol="has_auth_chain_index",
+            desc="has_auth_chain_index",
+            allow_none=True,
+        )
+
+        if has_auth_chain_index:
+            return True
+
+        # It's possible that we already have events for the room in our DB
+        # without a corresponding room entry. If we do then we don't want to
+        # mark the room as having an auth chain cover index.
+        max_ordering = await self.db_pool.simple_select_one_onecol(
+            table="events",
+            keyvalues={"room_id": room_id},
+            retcol="MAX(stream_ordering)",
+            allow_none=True,
+            desc="has_auth_chain_index_fallback",
+        )
+
+        return max_ordering is None
+
+    async def maybe_store_room_on_outlier_membership(
+        self, room_id: str, room_version: RoomVersion
+    ) -> None:
+        """
+        When we receive an invite or any other event over federation that may relate to a room
+        we are not in, store the version of the room if we don't already know the room version.
+        """
+        # It's possible that we already have events for the room in our DB
+        # without a corresponding room entry. If we do then we don't want to
+        # mark the room as having an auth chain cover index.
+        has_auth_chain_index = await self.has_auth_chain_index(room_id)
+
+        await self.db_pool.simple_upsert(
+            desc="maybe_store_room_on_outlier_membership",
+            table="rooms",
+            keyvalues={"room_id": room_id},
+            values={},
+            insertion_values={
+                "room_version": room_version.identifier,
+                "is_public": False,
+                # We don't worry about setting the `creator` here because
+                # we don't process any messages in a room while a user is
+                # invited (only after the join).
+                "creator": "",
+                "has_auth_chain_index": has_auth_chain_index,
+            },
+        )
+
 
 class _BackgroundUpdates:
     REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
@@ -2186,37 +2245,6 @@ class RoomBackgroundUpdateStore(RoomWorkerStore):
 
         return len(rooms)
 
-    async def has_auth_chain_index(self, room_id: str) -> bool:
-        """Check if the room has (or can have) a chain cover index.
-
-        Defaults to True if we don't have an entry in `rooms` table nor any
-        events for the room.
-        """
-
-        has_auth_chain_index = await self.db_pool.simple_select_one_onecol(
-            table="rooms",
-            keyvalues={"room_id": room_id},
-            retcol="has_auth_chain_index",
-            desc="has_auth_chain_index",
-            allow_none=True,
-        )
-
-        if has_auth_chain_index:
-            return True
-
-        # It's possible that we already have events for the room in our DB
-        # without a corresponding room entry. If we do then we don't want to
-        # mark the room as having an auth chain cover index.
-        max_ordering = await self.db_pool.simple_select_one_onecol(
-            table="events",
-            keyvalues={"room_id": room_id},
-            retcol="MAX(stream_ordering)",
-            allow_none=True,
-            desc="has_auth_chain_index_fallback",
-        )
-
-        return max_ordering is None
-
     async def _background_populate_room_depth_min_depth2(
         self, progress: JsonDict, batch_size: int
     ) -> int:
@@ -2567,34 +2595,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
             updatevalues={"join_event_id": join_event_id},
         )
 
-    async def maybe_store_room_on_outlier_membership(
-        self, room_id: str, room_version: RoomVersion
-    ) -> None:
-        """
-        When we receive an invite or any other event over federation that may relate to a room
-        we are not in, store the version of the room if we don't already know the room version.
-        """
-        # It's possible that we already have events for the room in our DB
-        # without a corresponding room entry. If we do then we don't want to
-        # mark the room as having an auth chain cover index.
-        has_auth_chain_index = await self.has_auth_chain_index(room_id)
-
-        await self.db_pool.simple_upsert(
-            desc="maybe_store_room_on_outlier_membership",
-            table="rooms",
-            keyvalues={"room_id": room_id},
-            values={},
-            insertion_values={
-                "room_version": room_version.identifier,
-                "is_public": False,
-                # We don't worry about setting the `creator` here because
-                # we don't process any messages in a room while a user is
-                # invited (only after the join).
-                "creator": "",
-                "has_auth_chain_index": has_auth_chain_index,
-            },
-        )
-
     async def add_event_report(
         self,
         room_id: str,