diff --git a/changelog.d/83.misc b/changelog.d/83.misc new file mode 100644 index 000000000..fad525272 --- /dev/null +++ b/changelog.d/83.misc @@ -0,0 +1 @@ +Add a parameter to `upgrade_rooms(..)` to allow auto join local users. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4a939b964..fff46b640 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1560,7 +1560,7 @@ class EventCreationHandler: self, requester: Requester, room_id: str, - prev_event_id: str, + prev_event_id: Optional[str], event_dicts: Sequence[JsonDict], ratelimit: bool = True, ignore_shadow_ban: bool = False, @@ -1591,6 +1591,12 @@ class EventCreationHandler: # Nothing to do. return + if prev_event_id is None: + # Pick the latest forward extremity as the previous event ID. + prev_event_ids = await self.store.get_forward_extremities_for_room(room_id) + prev_event_ids.sort(key=lambda x: x[2]) # Sort by depth. + prev_event_id = prev_event_ids[-1][0] + state_groups = await self._storage_controllers.state.get_state_group_for_events( [prev_event_id] ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 72afb35ed..8b3377b39 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -94,6 +94,7 @@ from synapse.types import ( from synapse.types.handlers import ShutdownRoomParams, ShutdownRoomResponse from synapse.types.state import StateFilter from synapse.util import stringutils +from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.response_cache import ResponseCache from synapse.util.iterutils import batch_iter from synapse.util.stringutils import parse_and_validate_server_name @@ -201,6 +202,8 @@ class RoomCreationHandler: old_room_id: str, new_version: RoomVersion, additional_creators: Optional[List[str]], + auto_member: bool = False, + ratelimit: bool = True, ) -> str: """Replace a room with a new room with a different version @@ -209,6 +212,8 @@ class RoomCreationHandler: old_room_id: the id of the room to be replaced new_version: the new room version to use additional_creators: additional room creators, for MSC4289. + auto_member: Whether to automatically join local users to the new + room and send out invites to remote users. Returns: the new room id @@ -216,11 +221,12 @@ class RoomCreationHandler: Raises: ShadowBanError if the requester is shadow-banned. """ - await self.creation_ratelimiter.ratelimit(requester, update=False) + if ratelimit: + await self.creation_ratelimiter.ratelimit(requester, update=False) - # then apply the ratelimits - await self.common_request_ratelimiter.ratelimit(requester) - await self.creation_ratelimiter.ratelimit(requester) + # then apply the ratelimits + await self.common_request_ratelimiter.ratelimit(requester) + await self.creation_ratelimiter.ratelimit(requester) user_id = requester.user.to_string() @@ -314,6 +320,7 @@ class RoomCreationHandler: tombstone_context, additional_creators, creation_event_with_context, + auto_member=auto_member, ) return ret @@ -341,6 +348,7 @@ class RoomCreationHandler: creation_event_with_context: Optional[ Tuple[EventBase, synapse.events.snapshot.EventContext] ] = None, + auto_member: bool = False, ) -> str: """ Args: @@ -354,6 +362,8 @@ class RoomCreationHandler: tombstone_context: the context for the tombstone event additional_creators: additional room creators, for MSC4289. creation_event_with_context: The new room's create event, for room IDs as create event IDs. + auto_member: Whether to automatically join local users to the new + room and send out invites to remote users. Raises: ShadowBanError if the requester is shadow-banned. @@ -382,6 +392,7 @@ class RoomCreationHandler: tombstone_event_id=tombstone_event.event_id, additional_creators=additional_creators, creation_event_with_context=creation_event_with_context, + auto_member=auto_member, ) # now send the tombstone @@ -548,6 +559,7 @@ class RoomCreationHandler: creation_event_with_context: Optional[ Tuple[EventBase, synapse.events.snapshot.EventContext] ] = None, + auto_member: bool = False, ) -> None: """Populate a new room based on an old room @@ -561,6 +573,8 @@ class RoomCreationHandler: additional_creators: additional room creators, for MSC4289. creation_event_with_context: The create event of the new room, if the new room supports room ID as create event ID hash. + auto_member: Whether to automatically join local users to the new + room and send out invites to remote users. """ user_id = requester.user.to_string() @@ -714,7 +728,7 @@ class RoomCreationHandler: # `update_membership`, however in this case its fine to bypass as # these bans don't need any special treatment, i.e. the sender is in # the room and they don't need any extra signatures, etc. - for batched_events in batch_iter(ban_events, 1000): + for batched_ban_events in batch_iter(ban_events, 1000): await self.event_creation_handler.create_and_send_new_client_events( requester=requester, room_id=new_room_id, @@ -727,13 +741,201 @@ class RoomCreationHandler: "sender": requester.user.to_string(), "content": ban_event.content, } - for ban_event in batched_events + for ban_event in batched_ban_events ], - ratelimit=False, + ratelimit=False, # We ratelimit the entire upgrade, not individual events. ) - # XXX invites/joins - # XXX 3pid invites + if auto_member: + logger.info("Joining local users to %s", new_room_id) + + # 1. Copy over all joins for local + joined_profiles = await self.store.get_users_in_room_with_profiles( + old_room_id + ) + + local_user_ids = [ + user_id for user_id in joined_profiles if self.hs.is_mine_id(user_id) + ] + + logger.info("Local user IDs %s", local_user_ids) + + for batched_local_user_ids in batch_iter(local_user_ids, 1000): + invites_to_send = [] + + # For each local user we create an invite event (from the + # upgrading user) plus a join event. + for local_user_id in batched_local_user_ids: + if local_user_id == user_id: + # Ignore the upgrading user, as they are already in the + # new room. + continue + + invites_to_send.append( + { + "type": EventTypes.Member, + "state_key": local_user_id, + "room_id": new_room_id, + "sender": requester.user.to_string(), + "content": { + "membership": Membership.INVITE, + }, + } + ) + + # If the user has profile information in the previous join, + # add it to the content. + # + # We could instead copy over the contents from the old join + # event, however a) that would require us to fetch all the + # old join events (which is slow), and b) generally the join + # events have no extra information in them. (We also believe + # that most clients don't copy this information over either, + # but we could be wrong.) + content_profile = {} + user_profile = joined_profiles[local_user_id] + if user_profile.display_name: + content_profile["displayname"] = user_profile.display_name + if user_profile.avatar_url: + content_profile["avatar_url"] = user_profile.avatar_url + + invites_to_send.append( + { + "type": EventTypes.Member, + "state_key": local_user_id, + "room_id": new_room_id, + "sender": local_user_id, + "content": { + "membership": Membership.JOIN, + **content_profile, + }, + } + ) + + await self.event_creation_handler.create_and_send_new_client_events( + requester=requester, + room_id=new_room_id, + prev_event_id=None, + event_dicts=invites_to_send, + ratelimit=False, # We ratelimit the entire upgrade, not individual events. + ) + + # Invite other users if the room is not public. If the room *is* + # public then users can simply directly join, and inviting them as + # well may lead to confusion. + + join_rule_content = initial_state.get((EventTypes.JoinRules, ""), None) + is_public = False + if join_rule_content: + is_public = join_rule_content["join_rule"] == JoinRules.PUBLIC + + if not is_public: + # Copy invites + # TODO: Copy over 3pid invites as well. + invited_users = await self.store.get_invited_users_in_room( + room_id=old_room_id + ) + + # For local users we can just batch send the invites. + local_invited_users = [ + user_id for user_id in invited_users if self.hs.is_mine_id(user_id) + ] + + logger.info( + "Joining local user IDs %s to new room %s", + local_invited_users, + new_room_id, + ) + + for batched_local_invited_users in batch_iter( + local_invited_users, 1000 + ): + invites_to_send = [] + leaves_to_send = [] + + # For each local user we create an invite event (from the + # upgrading user), and reject the invite event in the old + # room. + # + # This ensures that the user ends up with a single invite to + # the new room (rather than multiple invites which may be + # noisy and confusing). + for local_user_id in batched_local_invited_users: + leaves_to_send.append( + { + "type": EventTypes.Member, + "state_key": local_user_id, + "room_id": old_room_id, + "sender": local_user_id, + "content": { + "membership": Membership.LEAVE, + }, + } + ) + invites_to_send.append( + { + "type": EventTypes.Member, + "state_key": local_user_id, + "room_id": new_room_id, + "sender": requester.user.to_string(), + "content": { + "membership": Membership.INVITE, + }, + } + ) + + await self.event_creation_handler.create_and_send_new_client_events( + requester=requester, + room_id=old_room_id, + prev_event_id=None, + event_dicts=leaves_to_send, + ratelimit=False, # We ratelimit the entire upgrade, not individual events. + ) + await self.event_creation_handler.create_and_send_new_client_events( + requester=requester, + room_id=new_room_id, + prev_event_id=None, + event_dicts=invites_to_send, + ratelimit=False, + ) + + # For remote users we send invites one by one, as we need to + # send each one to the remote server. + # + # We also invite joined remote users who were in the old room. + remote_user_ids = [ + user_id + for user_id in itertools.chain(invited_users, joined_profiles) + if not self.hs.is_mine_id(user_id) + ] + + logger.debug("Inviting remote user IDs %s", remote_user_ids) + + async def remote_invite(remote_user: str) -> None: + try: + await self.room_member_handler.update_membership( + requester, + UserID.from_string(remote_user), + new_room_id, + Membership.INVITE, + ratelimit=False, # We ratelimit the entire upgrade, not individual events. + ) + except SynapseError as e: + # If we fail to invite a remote user, we log it but continue + # on with the upgrade. + logger.warning( + "Failed to invite remote user %s to new room %s: %s", + remote_user, + new_room_id, + e, + ) + + # We do this concurrently, as it can take a while to invite + await concurrently_execute( + remote_invite, + remote_user_ids, + 10, + ) async def _move_aliases_to_new_room( self, diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 654250fad..67e7e99ba 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -196,6 +196,19 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): retcol="state_key", ) + async def get_invited_users_in_room(self, room_id: str) -> StrCollection: + """Returns a list of users invited to the room.""" + return await self.db_pool.simple_select_onecol( + table="current_state_events", + keyvalues={ + "type": EventTypes.Member, + "room_id": room_id, + "membership": Membership.INVITE, + }, + retcol="state_key", + desc="get_invited_users_in_room", + ) + @cached() def get_user_in_room_with_profile(self, room_id: str, user_id: str) -> ProfileInfo: raise NotImplementedError()