# # This file is licensed under the Affero General Public License (AGPL) version 3. # # Copyright 2022 The Matrix.org Foundation C.I.C. # Copyright (C) 2023 New Vector, Ltd # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # See the GNU Affero General Public License for more details: # . # # Originally licensed under the Apache License, Version 2.0: # . # # [This file includes modifications made by New Vector Limited] # # from unittest.mock import AsyncMock, Mock from twisted.internet.testing import MemoryReactor import synapse.rest.admin from synapse.rest.client import login, notifications, receipts, room from synapse.server import HomeServer from synapse.util.clock import Clock from tests.unittest import HomeserverTestCase class HTTPPusherTests(HomeserverTestCase): servlets = [ synapse.rest.admin.register_servlets_for_client_rest_resource, room.register_servlets, login.register_servlets, receipts.register_servlets, notifications.register_servlets, ] def prepare( self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer ) -> None: self.store = homeserver.get_datastores().main self.module_api = homeserver.get_module_api() self.event_creation_handler = homeserver.get_event_creation_handler() self.sync_handler = homeserver.get_sync_handler() self.auth_handler = homeserver.get_auth_handler() self.user_id = self.register_user("user", "pass") self.access_token = self.login("user", "pass") self.other_user_id = self.register_user("otheruser", "pass") self.other_access_token = self.login("otheruser", "pass") # Create a room self.room_id = self.helper.create_room_as(self.user_id, tok=self.access_token) def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: # Mock out the calls over federation. fed_transport_client = Mock(spec=["send_transaction"]) fed_transport_client.send_transaction = AsyncMock(return_value={}) return self.setup_test_homeserver( federation_transport_client=fed_transport_client, ) def test_notify_for_local_invites(self) -> None: """ Local users will get notified for invites """ # Check we start with no pushes self._request_notifications(from_token=None, limit=1, expected_count=0) # Send an invite self.helper.invite( room=self.room_id, src=self.user_id, targ=self.other_user_id, tok=self.access_token, ) # We should have a notification now channel = self.make_request( "GET", "/notifications", access_token=self.other_access_token, ) self.assertEqual(channel.code, 200) self.assertEqual(len(channel.json_body["notifications"]), 1, channel.json_body) self.assertEqual( channel.json_body["notifications"][0]["event"]["content"]["membership"], "invite", channel.json_body, ) def test_pagination_of_notifications(self) -> None: """ Check that pagination of notifications works. """ # Check we start with no pushes self._request_notifications(from_token=None, limit=1, expected_count=0) # Send an invite and have the other user join the room. self.helper.invite( room=self.room_id, src=self.user_id, targ=self.other_user_id, tok=self.access_token, ) self.helper.join(self.room_id, self.other_user_id, tok=self.other_access_token) # Send 5 messages in the room and note down their event IDs. sent_event_ids = [] for _ in range(5): resp = self.helper.send_event( self.room_id, "m.room.message", {"body": "honk", "msgtype": "m.text"}, tok=self.access_token, ) sent_event_ids.append(resp["event_id"]) # We expect to get notifications for messages in reverse order. # So reverse this list of event IDs to make it easier to compare # against later. sent_event_ids.reverse() # We should have a few notifications now. Let's try and fetch the first 2. notification_event_ids, _ = self._request_notifications( from_token=None, limit=2, expected_count=2 ) # Check we got the expected event IDs back. self.assertEqual(notification_event_ids, sent_event_ids[:2]) # Try requesting again without a 'from' query parameter. We should get the # same two notifications back. notification_event_ids, next_token = self._request_notifications( from_token=None, limit=2, expected_count=2 ) self.assertEqual(notification_event_ids, sent_event_ids[:2]) # Ask for the next 5 notifications, though there should only be # 4 remaining; the next 3 messages and the invite. # # We need to use the "next_token" from the response as the "from" # query parameter in the next request in order to paginate. notification_event_ids, next_token = self._request_notifications( from_token=next_token, limit=5, expected_count=4 ) # Ensure we chop off the invite on the end. notification_event_ids = notification_event_ids[:-1] self.assertEqual(notification_event_ids, sent_event_ids[2:]) def _request_notifications( self, from_token: str | None, limit: int, expected_count: int ) -> tuple[list[str], str]: """ Make a request to /notifications to get the latest events to be notified about. Only the event IDs are returned. The request is made by the "other user". Args: from_token: An optional starting parameter. limit: The maximum number of results to return. expected_count: The number of events to expect in the response. Returns: A list of event IDs that the client should be notified about. Events are returned newest-first. """ # Construct the request path. path = f"/notifications?limit={limit}" if from_token is not None: path += f"&from={from_token}" channel = self.make_request( "GET", path, access_token=self.other_access_token, ) self.assertEqual(channel.code, 200) self.assertEqual( len(channel.json_body["notifications"]), expected_count, channel.json_body ) # Extract the necessary data from the response. next_token = channel.json_body["next_token"] event_ids = [ event["event"]["event_id"] for event in channel.json_body["notifications"] ] return event_ids, next_token def test_parameters(self) -> None: """ Test that appropriate errors are returned when query parameters are malformed. """ # Test that no parameters are required. channel = self.make_request( "GET", "/notifications", access_token=self.other_access_token, ) self.assertEqual(channel.code, 200) # Test that limit cannot be negative channel = self.make_request( "GET", "/notifications?limit=-1", access_token=self.other_access_token, ) self.assertEqual(channel.code, 400) # Test that the 'limit' parameter must be an integer. channel = self.make_request( "GET", "/notifications?limit=foobar", access_token=self.other_access_token, ) self.assertEqual(channel.code, 400) # Test that the 'from' parameter must be an integer. channel = self.make_request( "GET", "/notifications?from=osborne", access_token=self.other_access_token, ) self.assertEqual(channel.code, 400)