diff --git a/changelog.d/19226.misc b/changelog.d/19226.misc new file mode 100644 index 000000000..c38d1d3ef --- /dev/null +++ b/changelog.d/19226.misc @@ -0,0 +1 @@ +Add log to determine whether clients are using `/messages` as expected. diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index f869a41c5..63e5dfa70 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -21,22 +21,25 @@ import logging from typing import TYPE_CHECKING, cast +import attr + from twisted.python.failure import Failure from synapse.api.constants import Direction, EventTypes, Membership from synapse.api.errors import SynapseError from synapse.api.filtering import Filter -from synapse.events.utils import SerializeEventConfig +from synapse.events import EventBase +from synapse.handlers.relations import BundledAggregations from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME from synapse.logging.opentracing import trace from synapse.rest.admin._base import assert_user_is_admin from synapse.streams.config import PaginationConfig from synapse.types import ( - JsonDict, JsonMapping, Requester, ScheduledTask, StreamKeyType, + StreamToken, TaskStatus, ) from synapse.types.handlers import ShutdownRoomParams, ShutdownRoomResponse @@ -70,6 +73,58 @@ PURGE_ROOM_ACTION_NAME = "purge_room" SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME = "shutdown_and_purge_room" +@attr.s(slots=True, frozen=True, auto_attribs=True) +class GetMessagesResult: + """ + Everything needed to serialize a `/messages` response. + """ + + messages_chunk: list[EventBase] + """ + A list of room events. + + - When the request is `Direction.FORWARDS`, events will be in the range: + `start_token` < x <= `end_token`, (ascending topological_order) + - When the request is `Direction.BACKWARDS`, events will be in the range: + `start_token` >= x > `end_token`, (descending topological_order) + + Note that an empty chunk does not necessarily imply that no more events are + available. Clients should continue to paginate until no `end_token` property is returned. + """ + + bundled_aggregations: dict[str, BundledAggregations] + """ + A map of event ID to the bundled aggregations for the events in the chunk. + + If an event doesn't have any bundled aggregations, it may not appear in the map. + """ + + state: list[EventBase] | None + """ + A list of state events relevant to showing the chunk. For example, if + lazy_load_members is enabled in the filter then this may contain the membership + events for the senders of events in the chunk. + + Omitted from the response when `None`. + """ + + start_token: StreamToken + """ + Token corresponding to the start of chunk. This will be the same as the value given + in `from` query parameter of the `/messages` request. + """ + + end_token: StreamToken | None + """ + A token corresponding to the end of chunk. This token can be passed back to this + endpoint to request further events. + + If no further events are available (either because we have reached the start of the + timeline, or because the user does not have permission to see any more events), this + property is omitted from the response. + """ + + class PaginationHandler: """Handles pagination and purge history requests. @@ -418,7 +473,7 @@ class PaginationHandler: as_client_event: bool = True, event_filter: Filter | None = None, use_admin_priviledge: bool = False, - ) -> JsonDict: + ) -> GetMessagesResult: """Get messages in a room. Args: @@ -617,10 +672,13 @@ class PaginationHandler: # In that case we do not return end, to tell the client # there is no need for further queries. if not events: - return { - "chunk": [], - "start": await from_token.to_string(self.store), - } + return GetMessagesResult( + messages_chunk=[], + bundled_aggregations={}, + state=None, + start_token=from_token, + end_token=None, + ) if event_filter: events = await event_filter.filter(events) @@ -636,11 +694,13 @@ class PaginationHandler: # if after the filter applied there are no more events # return immediately - but there might be more in next_token batch if not events: - return { - "chunk": [], - "start": await from_token.to_string(self.store), - "end": await next_token.to_string(self.store), - } + return GetMessagesResult( + messages_chunk=[], + bundled_aggregations={}, + state=None, + start_token=from_token, + end_token=next_token, + ) state = None if event_filter and event_filter.lazy_load_members and len(events) > 0: @@ -657,38 +717,20 @@ class PaginationHandler: if state_ids: state_dict = await self.store.get_events(list(state_ids.values())) - state = state_dict.values() + state = list(state_dict.values()) aggregations = await self._relations_handler.get_bundled_aggregations( events, user_id ) - time_now = self.clock.time_msec() - - serialize_options = SerializeEventConfig( - as_client_event=as_client_event, requester=requester + return GetMessagesResult( + messages_chunk=events, + bundled_aggregations=aggregations, + state=state, + start_token=from_token, + end_token=next_token, ) - chunk = { - "chunk": ( - await self._event_serializer.serialize_events( - events, - time_now, - config=serialize_options, - bundle_aggregations=aggregations, - ) - ), - "start": await from_token.to_string(self.store), - "end": await next_token.to_string(self.store), - } - - if state: - chunk["state"] = await self._event_serializer.serialize_events( - state, time_now, config=serialize_options - ) - - return chunk - async def _shutdown_and_purge_room( self, task: ScheduledTask, diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index cf24bc628..a886859ff 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -28,9 +28,13 @@ from immutabledict import immutabledict from synapse.api.constants import Direction, EventTypes, JoinRules, Membership from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError from synapse.api.filtering import Filter +from synapse.events.utils import ( + SerializeEventConfig, +) from synapse.handlers.pagination import ( PURGE_ROOM_ACTION_NAME, SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME, + GetMessagesResult, ) from synapse.http.servlet import ( ResolveRoomIdMixin, @@ -44,11 +48,13 @@ from synapse.http.servlet import ( parse_string, ) from synapse.http.site import SynapseRequest +from synapse.logging.opentracing import trace from synapse.rest.admin._base import ( admin_patterns, assert_requester_is_admin, assert_user_is_admin, ) +from synapse.rest.client.room import SerializeMessagesDeps, encode_messages_response from synapse.storage.databases.main.room import RoomSortOrder from synapse.streams.config import PaginationConfig from synapse.types import JsonDict, RoomID, ScheduledTask, UserID, create_requester @@ -976,6 +982,7 @@ class RoomMessagesRestServlet(RestServlet): self._pagination_handler = hs.get_pagination_handler() self._auth = hs.get_auth() self._store = hs.get_datastores().main + self._event_serializer = hs.get_event_client_serializer() async def on_GET( self, request: SynapseRequest, room_id: str @@ -999,7 +1006,11 @@ class RoomMessagesRestServlet(RestServlet): ): as_client_event = False - msgs = await self._pagination_handler.get_messages( + serialize_options = SerializeEventConfig( + as_client_event=as_client_event, requester=requester + ) + + get_messages_result = await self._pagination_handler.get_messages( room_id=room_id, requester=requester, pagin_config=pagination_config, @@ -1008,7 +1019,27 @@ class RoomMessagesRestServlet(RestServlet): use_admin_priviledge=True, ) - return HTTPStatus.OK, msgs + response_content = await self.encode_response( + get_messages_result, serialize_options + ) + + return HTTPStatus.OK, response_content + + @trace + async def encode_response( + self, + get_messages_result: GetMessagesResult, + serialize_options: SerializeEventConfig, + ) -> JsonDict: + return await encode_messages_response( + get_messages_result=get_messages_result, + serialize_options=serialize_options, + serialize_deps=SerializeMessagesDeps( + clock=self._clock, + event_serializer=self._event_serializer, + store=self._store, + ), + ) class RoomTimestampToEventRestServlet(RestServlet): diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 81a6bd57f..5e7dcb019 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -28,6 +28,7 @@ from http import HTTPStatus from typing import TYPE_CHECKING, Awaitable from urllib import parse as urlparse +import attr from prometheus_client.core import Histogram from twisted.web.server import Request @@ -45,10 +46,12 @@ from synapse.api.errors import ( ) from synapse.api.filtering import Filter from synapse.events.utils import ( + EventClientSerializer, SerializeEventConfig, format_event_for_client_v2, serialize_event, ) +from synapse.handlers.pagination import GetMessagesResult from synapse.http.server import HttpServer from synapse.http.servlet import ( ResolveRoomIdMixin, @@ -64,15 +67,17 @@ from synapse.http.servlet import ( ) from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable, run_in_background -from synapse.logging.opentracing import set_tag +from synapse.logging.opentracing import set_tag, trace from synapse.metrics import SERVER_NAME_LABEL from synapse.rest.client._base import client_patterns from synapse.rest.client.transactions import HttpTransactionCache from synapse.state import CREATE_KEY, POWER_KEY +from synapse.storage.databases.main import DataStore from synapse.streams.config import PaginationConfig from synapse.types import JsonDict, Requester, StreamToken, ThirdPartyInstanceID, UserID from synapse.types.state import StateFilter from synapse.util.cancellation import cancellable +from synapse.util.clock import Clock from synapse.util.events import generate_fake_event_id from synapse.util.stringutils import parse_and_validate_server_name @@ -790,6 +795,56 @@ class JoinedRoomMemberListRestServlet(RestServlet): return 200, {"joined": users_with_profile} +@attr.s(slots=True, frozen=True, auto_attribs=True) +class SerializeMessagesDeps: + clock: Clock + event_serializer: EventClientSerializer + store: DataStore + + +@trace +async def encode_messages_response( + *, + get_messages_result: GetMessagesResult, + serialize_options: SerializeEventConfig, + serialize_deps: SerializeMessagesDeps, +) -> JsonDict: + """ + Serialize a `GetMessagesResult` into the JSON response format for the `/messages` + endpoint. + + This logic is shared between the client API and Synapse admin API. + """ + + time_now = serialize_deps.clock.time_msec() + + serialized_result = { + "chunk": ( + await serialize_deps.event_serializer.serialize_events( + get_messages_result.messages_chunk, + time_now, + config=serialize_options, + bundle_aggregations=get_messages_result.bundled_aggregations, + ) + ), + "start": await get_messages_result.start_token.to_string(serialize_deps.store), + } + + if get_messages_result.end_token is not None: + serialized_result["end"] = await get_messages_result.end_token.to_string( + serialize_deps.store + ) + + if get_messages_result.state is not None: + serialized_result[ + "state" + ] = await serialize_deps.event_serializer.serialize_events( + get_messages_result.state, time_now, config=serialize_options + ) + + return serialized_result + + # TODO: Needs better unit testing class RoomMessageListRestServlet(RestServlet): PATTERNS = client_patterns("/rooms/(?P[^/]*)/messages$", v1=True) @@ -806,6 +861,7 @@ class RoomMessageListRestServlet(RestServlet): self.pagination_handler = hs.get_pagination_handler() self.auth = hs.get_auth() self.store = hs.get_datastores().main + self.event_serializer = hs.get_event_client_serializer() async def on_GET( self, request: SynapseRequest, room_id: str @@ -839,7 +895,11 @@ class RoomMessageListRestServlet(RestServlet): ): as_client_event = False - msgs = await self.pagination_handler.get_messages( + serialize_options = SerializeEventConfig( + as_client_event=as_client_event, requester=requester + ) + + get_messages_result = await self.pagination_handler.get_messages( room_id=room_id, requester=requester, pagin_config=pagination_config, @@ -847,6 +907,24 @@ class RoomMessageListRestServlet(RestServlet): event_filter=event_filter, ) + # Useful for debugging timeline/pagination issues. For example, if a client + # isn't seeing the full history, we can check the homeserver logs to see if the + # client just never made the next request with the given `end` token. + logger.info( + "Responding to `/messages` request: {%s} %s %s -> %d messages with end_token=%s", + requester.user.to_string(), + request.get_method(), + request.get_redacted_uri(), + len(get_messages_result.messages_chunk), + (await get_messages_result.end_token.to_string(self.store)) + if get_messages_result.end_token + else None, + ) + + response_content = await self.encode_response( + get_messages_result, serialize_options + ) + processing_end_time = self.clock.time_msec() room_member_count = await make_deferred_yieldable(room_member_count_deferred) messsages_response_timer.labels( @@ -854,7 +932,23 @@ class RoomMessageListRestServlet(RestServlet): **{SERVER_NAME_LABEL: self.server_name}, ).observe((processing_end_time - processing_start_time) / 1000) - return 200, msgs + return 200, response_content + + @trace + async def encode_response( + self, + get_messages_result: GetMessagesResult, + serialize_options: SerializeEventConfig, + ) -> JsonDict: + return await encode_messages_response( + get_messages_result=get_messages_result, + serialize_options=serialize_options, + serialize_deps=SerializeMessagesDeps( + clock=self.clock, + event_serializer=self.event_serializer, + store=self.store, + ), + ) # TODO: Needs unit testing