diff --git a/CHANGES.md b/CHANGES.md index 673ab28a5..ec24d9e0e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,16 @@ +# Synapse 1.135.0rc2 (2025-07-30) + +### Bugfixes + +- Fix user failing to deactivate with MAS when `/_synapse/mas` is handled by a worker. ([\#18716](https://github.com/element-hq/synapse/issues/18716)) + +### Internal Changes + +- Fix performance regression introduced in [#18238](https://github.com/element-hq/synapse/issues/18238) by adding a cache to `is_server_admin`. ([\#18747](https://github.com/element-hq/synapse/issues/18747)) + + + + # Synapse 1.135.0rc1 (2025-07-22) ### Features diff --git a/debian/changelog b/debian/changelog index 221f10ae3..e654b05ed 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +matrix-synapse-py3 (1.135.0~rc2) stable; urgency=medium + + * New Synapse release 1.135.0rc2. + + -- Synapse Packaging team Wed, 30 Jul 2025 12:19:14 +0100 + matrix-synapse-py3 (1.135.0~rc1) stable; urgency=medium * New Synapse release 1.135.0rc1. diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 6212a9404..6f25653bb 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -178,6 +178,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "^/_matrix/client/(api/v1|r0|v3|unstable)/login$", "^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$", "^/_matrix/client/(api/v1|r0|v3|unstable)/account/whoami$", + "^/_matrix/client/(api/v1|r0|v3|unstable)/account/deactivate$", "^/_matrix/client/(api/v1|r0|v3|unstable)/devices(/|$)", "^/_matrix/client/(r0|v3)/delete_devices$", "^/_matrix/client/versions$", diff --git a/docs/workers.md b/docs/workers.md index 7881aeebb..59c60dd0a 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -238,6 +238,7 @@ information. ^/_matrix/client/unstable/im.nheko.summary/summary/.*$ ^/_matrix/client/(r0|v3|unstable)/account/3pid$ ^/_matrix/client/(r0|v3|unstable)/account/whoami$ + ^/_matrix/client/(r0|v3|unstable)/account/deactivate$ ^/_matrix/client/(r0|v3)/delete_devices$ ^/_matrix/client/(api/v1|r0|v3|unstable)/devices(/|$) ^/_matrix/client/versions$ diff --git a/pyproject.toml b/pyproject.toml index 513918148..a7a9e11f9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,7 +101,7 @@ module-name = "synapse.synapse_rust" [tool.poetry] name = "matrix-synapse" -version = "1.135.0rc1" +version = "1.135.0rc2" description = "Homeserver for the Matrix decentralised comms protocol" authors = ["Matrix.org Team and Contributors "] license = "AGPL-3.0-or-later" diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index abef46222..e07bc2e72 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -222,6 +222,7 @@ class AuthHandler: self._password_localdb_enabled = hs.config.auth.password_localdb_enabled self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules self._account_validity_handler = hs.get_account_validity_handler() + self._pusher_pool = hs.get_pusherpool() # Ratelimiter for failed auth during UIA. Uses same ratelimit config # as per `rc_login.failed_attempts`. @@ -1662,7 +1663,7 @@ class AuthHandler: ) if medium == "email": - await self.store.delete_pusher_by_app_id_pushkey_user_id( + await self._pusher_pool.remove_pusher( app_id="m.email", pushkey=address, user_id=user_id ) diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 8994e0920..e4169321c 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -25,6 +25,9 @@ from typing import TYPE_CHECKING, Optional from synapse.api.constants import Membership from synapse.api.errors import SynapseError from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.replication.http.deactivate_account import ( + ReplicationNotifyAccountDeactivatedServlet, +) from synapse.types import Codes, Requester, UserID, create_requester if TYPE_CHECKING: @@ -45,6 +48,7 @@ class DeactivateAccountHandler: self._room_member_handler = hs.get_room_member_handler() self._identity_handler = hs.get_identity_handler() self._profile_handler = hs.get_profile_handler() + self._pusher_pool = hs.get_pusherpool() self.user_directory_handler = hs.get_user_directory_handler() self._server_name = hs.hostname self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules @@ -53,10 +57,16 @@ class DeactivateAccountHandler: self._user_parter_running = False self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules + self._notify_account_deactivated_client = None + # Start the user parter loop so it can resume parting users from rooms where # it left off (if it has work left to do). - if hs.config.worker.run_background_tasks: + if hs.config.worker.worker_app is None: hs.get_reactor().callWhenRunning(self._start_user_parting) + else: + self._notify_account_deactivated_client = ( + ReplicationNotifyAccountDeactivatedServlet.make_client(hs) + ) self._account_validity_enabled = ( hs.config.account_validity.account_validity_enabled @@ -146,7 +156,7 @@ class DeactivateAccountHandler: # Most of the pushers will have been deleted when we logged out the # associated devices above, but we still need to delete pushers not # associated with devices, e.g. email pushers. - await self.store.delete_all_pushers_for_user(user_id) + await self._pusher_pool.delete_all_pushers_for_user(user_id) # Add the user to a table of users pending deactivation (ie. # removal from all the rooms they're a member of) @@ -170,10 +180,6 @@ class DeactivateAccountHandler: logger.info("Marking %s as erased", user_id) await self.store.mark_user_erased(user_id) - # Now start the process that goes through that list and - # parts users from rooms (if it isn't already running) - self._start_user_parting() - # Reject all pending invites and knocks for the user, so that the # user doesn't show up in the "invited" section of rooms' members list. await self._reject_pending_invites_and_knocks_for_user(user_id) @@ -194,15 +200,37 @@ class DeactivateAccountHandler: # Delete any server-side backup keys await self.store.bulk_delete_backup_keys_and_versions_for_user(user_id) + # Notify modules and start the room parting process. + await self.notify_account_deactivated(user_id, by_admin=by_admin) + + return identity_server_supports_unbinding + + async def notify_account_deactivated( + self, + user_id: str, + by_admin: bool = False, + ) -> None: + """Notify modules and start the room parting process. + Goes through replication if this is not the main process. + """ + if self._notify_account_deactivated_client is not None: + await self._notify_account_deactivated_client( + user_id=user_id, + by_admin=by_admin, + ) + return + + # Now start the process that goes through that list and + # parts users from rooms (if it isn't already running) + self._start_user_parting() + # Let modules know the user has been deactivated. await self._third_party_rules.on_user_deactivation_status_changed( user_id, True, - by_admin, + by_admin=by_admin, ) - return identity_server_supports_unbinding - async def _reject_pending_invites_and_knocks_for_user(self, user_id: str) -> None: """Reject pending invites and knocks addressed to a given user ID. diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index ee51872c2..d1f79ec99 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -32,7 +32,10 @@ from synapse.metrics.background_process_metrics import ( ) from synapse.push import Pusher, PusherConfig, PusherConfigException from synapse.push.pusher import PusherFactory -from synapse.replication.http.push import ReplicationRemovePusherRestServlet +from synapse.replication.http.push import ( + ReplicationDeleteAllPushersForUserRestServlet, + ReplicationRemovePusherRestServlet, +) from synapse.types import JsonDict, RoomStreamToken, StrCollection from synapse.util.async_helpers import concurrently_execute from synapse.util.threepids import canonicalise_email @@ -84,10 +87,14 @@ class PusherPool: # We can only delete pushers on master. self._remove_pusher_client = None + self._delete_all_pushers_for_user_client = None if hs.config.worker.worker_app: self._remove_pusher_client = ReplicationRemovePusherRestServlet.make_client( hs ) + self._delete_all_pushers_for_user_client = ( + ReplicationDeleteAllPushersForUserRestServlet.make_client(hs) + ) # Record the last stream ID that we were poked about so we can get # changes since then. We set this to the current max stream ID on @@ -468,6 +475,13 @@ class PusherPool: app_id, pushkey, user_id ) + async def delete_all_pushers_for_user(self, user_id: str) -> None: + """Deletes all pushers for a user.""" + if self._delete_all_pushers_for_user_client is not None: + await self._delete_all_pushers_for_user_client(user_id=user_id) + else: + await self.store.delete_all_pushers_for_user(user_id=user_id) + def maybe_stop_pusher(self, app_id: str, pushkey: str, user_id: str) -> None: """Stops a pusher with the given app ID and push key if one is running. diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 4dadf33cc..68cc6ce1f 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -23,6 +23,7 @@ from typing import TYPE_CHECKING from synapse.http.server import JsonResource from synapse.replication.http import ( account_data, + deactivate_account, delayed_events, devices, federation, @@ -64,3 +65,4 @@ class ReplicationRestResource(JsonResource): login.register_servlets(hs, self) register.register_servlets(hs, self) delayed_events.register_servlets(hs, self) + deactivate_account.register_servlets(hs, self) diff --git a/synapse/replication/http/deactivate_account.py b/synapse/replication/http/deactivate_account.py new file mode 100644 index 000000000..89658350a --- /dev/null +++ b/synapse/replication/http/deactivate_account.py @@ -0,0 +1,81 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2023 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# +# Originally licensed under the Apache License, Version 2.0: +# . +# +# [This file includes modifications made by New Vector Limited] +# +# + +import logging +from typing import TYPE_CHECKING, Tuple + +from twisted.web.server import Request + +from synapse.http.server import HttpServer +from synapse.replication.http._base import ReplicationEndpoint +from synapse.types import JsonDict + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class ReplicationNotifyAccountDeactivatedServlet(ReplicationEndpoint): + """Notify that an account has been deactivated. + + Request format: + + POST /_synapse/replication/notify_account_deactivated/:user_id + + { + "by_admin": true, + } + + """ + + NAME = "notify_account_deactivated" + PATH_ARGS = ("user_id",) + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + self.deactivate_account_handler = hs.get_deactivate_account_handler() + + @staticmethod + async def _serialize_payload( # type: ignore[override] + user_id: str, + by_admin: bool, + ) -> JsonDict: + """ + Args: + user_id: The user ID which has been deactivated. + by_admin: Whether the user was deactivated by an admin. + """ + return { + "by_admin": by_admin, + } + + async def _handle_request( # type: ignore[override] + self, request: Request, content: JsonDict, user_id: str + ) -> Tuple[int, JsonDict]: + by_admin = content["by_admin"] + await self.deactivate_account_handler.notify_account_deactivated( + user_id, by_admin=by_admin + ) + return 200, {} + + +def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: + ReplicationNotifyAccountDeactivatedServlet(hs).register(http_server) diff --git a/synapse/replication/http/push.py b/synapse/replication/http/push.py index 48e254cdb..6e20a208b 100644 --- a/synapse/replication/http/push.py +++ b/synapse/replication/http/push.py @@ -118,6 +118,39 @@ class ReplicationCopyPusherRestServlet(ReplicationEndpoint): return 200, {} +class ReplicationDeleteAllPushersForUserRestServlet(ReplicationEndpoint): + """Deletes all pushers for a user. + + Request format: + + POST /_synapse/replication/delete_all_pushers_for_user/:user_id + + {} + + """ + + NAME = "delete_all_pushers_for_user" + PATH_ARGS = ("user_id",) + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._store = hs.get_datastores().main + + @staticmethod + async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override] + return {} + + async def _handle_request( # type: ignore[override] + self, request: Request, content: JsonDict, user_id: str + ) -> Tuple[int, JsonDict]: + await self._store.delete_all_pushers_for_user(user_id) + + return 200, {} + + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: ReplicationRemovePusherRestServlet(hs).register(http_server) ReplicationCopyPusherRestServlet(hs).register(http_server) + ReplicationDeleteAllPushersForUserRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py index 667e79abd..86f1c9c9e 100644 --- a/synapse/rest/client/account.py +++ b/synapse/rest/client/account.py @@ -905,23 +905,26 @@ class AccountStatusRestServlet(RestServlet): def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: + ThreepidRestServlet(hs).register(http_server) + WhoamiRestServlet(hs).register(http_server) + + if not hs.config.experimental.msc3861.enabled: + DeactivateAccountRestServlet(hs).register(http_server) + + # These servlets are only registered on the main process if hs.config.worker.worker_app is None: + ThreepidBindRestServlet(hs).register(http_server) + ThreepidUnbindRestServlet(hs).register(http_server) + if not hs.config.experimental.msc3861.enabled: EmailPasswordRequestTokenRestServlet(hs).register(http_server) - DeactivateAccountRestServlet(hs).register(http_server) PasswordRestServlet(hs).register(http_server) EmailThreepidRequestTokenRestServlet(hs).register(http_server) MsisdnThreepidRequestTokenRestServlet(hs).register(http_server) AddThreepidEmailSubmitTokenServlet(hs).register(http_server) AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server) - ThreepidRestServlet(hs).register(http_server) - if hs.config.worker.worker_app is None: - ThreepidBindRestServlet(hs).register(http_server) - ThreepidUnbindRestServlet(hs).register(http_server) - if not hs.config.experimental.msc3861.enabled: ThreepidAddRestServlet(hs).register(http_server) ThreepidDeleteRestServlet(hs).register(http_server) - WhoamiRestServlet(hs).register(http_server) - if hs.config.worker.worker_app is None and hs.config.experimental.msc3720_enabled: - AccountStatusRestServlet(hs).register(http_server) + if hs.config.experimental.msc3720_enabled: + AccountStatusRestServlet(hs).register(http_server) diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 320d29e47..c4131ddf7 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -673,6 +673,7 @@ class RegistrationWorkerStore(StatsStore, CacheInvalidationWorkerStore): desc="delete_account_validity_for_user", ) + @cached(max_entries=100000) async def is_server_admin(self, user: UserID) -> bool: """Determines if a user is an admin of this homeserver. @@ -707,6 +708,9 @@ class RegistrationWorkerStore(StatsStore, CacheInvalidationWorkerStore): self._invalidate_cache_and_stream( txn, self.get_user_by_id, (user.to_string(),) ) + self._invalidate_cache_and_stream( + txn, self.is_server_admin, (user.to_string(),) + ) await self.db_pool.runInteraction("set_server_admin", set_server_admin_txn) @@ -2596,6 +2600,36 @@ class RegistrationWorkerStore(StatsStore, CacheInvalidationWorkerStore): await self.db_pool.runInteraction("delete_access_token", f) + async def user_set_password_hash( + self, user_id: str, password_hash: Optional[str] + ) -> None: + """ + NB. This does *not* evict any cache because the one use for this + removes most of the entries subsequently anyway so it would be + pointless. Use flush_user separately. + """ + + def user_set_password_hash_txn(txn: LoggingTransaction) -> None: + self.db_pool.simple_update_one_txn( + txn, "users", {"name": user_id}, {"password_hash": password_hash} + ) + self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) + + await self.db_pool.runInteraction( + "user_set_password_hash", user_set_password_hash_txn + ) + + async def add_user_pending_deactivation(self, user_id: str) -> None: + """ + Adds a user to the table of users who need to be parted from all the rooms they're + in + """ + await self.db_pool.simple_insert( + "users_pending_deactivation", + values={"user_id": user_id}, + desc="add_user_pending_deactivation", + ) + class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): def __init__( @@ -2820,25 +2854,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): return next_id - async def user_set_password_hash( - self, user_id: str, password_hash: Optional[str] - ) -> None: - """ - NB. This does *not* evict any cache because the one use for this - removes most of the entries subsequently anyway so it would be - pointless. Use flush_user separately. - """ - - def user_set_password_hash_txn(txn: LoggingTransaction) -> None: - self.db_pool.simple_update_one_txn( - txn, "users", {"name": user_id}, {"password_hash": password_hash} - ) - self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) - - await self.db_pool.runInteraction( - "user_set_password_hash", user_set_password_hash_txn - ) - async def user_set_consent_version( self, user_id: str, consent_version: str ) -> None: @@ -2891,17 +2906,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): await self.db_pool.runInteraction("user_set_consent_server_notice_sent", f) - async def add_user_pending_deactivation(self, user_id: str) -> None: - """ - Adds a user to the table of users who need to be parted from all the rooms they're - in - """ - await self.db_pool.simple_insert( - "users_pending_deactivation", - values={"user_id": user_id}, - desc="add_user_pending_deactivation", - ) - async def validate_threepid_session( self, session_id: str, client_secret: str, token: str, current_ts: int ) -> Optional[str]: diff --git a/synapse/storage/databases/main/user_erasure_store.py b/synapse/storage/databases/main/user_erasure_store.py index bbde8491f..cceed484c 100644 --- a/synapse/storage/databases/main/user_erasure_store.py +++ b/synapse/storage/databases/main/user_erasure_store.py @@ -70,8 +70,6 @@ class UserErasureWorkerStore(CacheInvalidationWorkerStore): return {u: u in erased_users for u in user_ids} - -class UserErasureStore(UserErasureWorkerStore): async def mark_user_erased(self, user_id: str) -> None: """Indicate that user_id wishes their message history to be erased. @@ -113,3 +111,7 @@ class UserErasureStore(UserErasureWorkerStore): self._invalidate_cache_and_stream(txn, self.is_user_erased, (user_id,)) await self.db_pool.runInteraction("mark_user_not_erased", f) + + +class UserErasureStore(UserErasureWorkerStore): + pass diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 3f289a4ea..fd1e87296 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -1181,7 +1181,7 @@ class BundledAggregationsTestCase(BaseRelationsTestCase): bundled_aggregations, ) - self._test_bundled_aggregations(RelationTypes.REFERENCE, assert_annotations, 9) + self._test_bundled_aggregations(RelationTypes.REFERENCE, assert_annotations, 7) def test_thread(self) -> None: """ @@ -1226,21 +1226,21 @@ class BundledAggregationsTestCase(BaseRelationsTestCase): # The "user" sent the root event and is making queries for the bundled # aggregations: they have participated. - self._test_bundled_aggregations(RelationTypes.THREAD, _gen_assert(True), 10) + self._test_bundled_aggregations(RelationTypes.THREAD, _gen_assert(True), 7) # The "user2" sent replies in the thread and is making queries for the # bundled aggregations: they have participated. # # Note that this re-uses some cached values, so the total number of # queries is much smaller. self._test_bundled_aggregations( - RelationTypes.THREAD, _gen_assert(True), 7, access_token=self.user2_token + RelationTypes.THREAD, _gen_assert(True), 4, access_token=self.user2_token ) # A user with no interactions with the thread: they have not participated. user3_id, user3_token = self._create_user("charlie") self.helper.join(self.room, user=user3_id, tok=user3_token) self._test_bundled_aggregations( - RelationTypes.THREAD, _gen_assert(False), 7, access_token=user3_token + RelationTypes.THREAD, _gen_assert(False), 4, access_token=user3_token ) def test_thread_with_bundled_aggregations_for_latest(self) -> None: @@ -1287,7 +1287,7 @@ class BundledAggregationsTestCase(BaseRelationsTestCase): bundled_aggregations["latest_event"].get("unsigned"), ) - self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 10) + self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 7) def test_nested_thread(self) -> None: """