From 7ec5e60671c8516563fb7c8eb45b134a70237f49 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 26 Aug 2025 10:15:03 +0100 Subject: [PATCH] Introduce `EventPersistencePair` type (#18857) `Tuple[EventBase, EventContext]` is everywhere and I keep misspelling it. Let's just define a type for it. --- changelog.d/18857.misc | 1 + synapse/events/snapshot.py | 8 +++- synapse/federation/federation_server.py | 4 +- synapse/handlers/federation_event.py | 12 +++-- synapse/handlers/message.py | 9 ++-- synapse/push/bulk_push_rule_evaluator.py | 4 +- synapse/replication/http/federation.py | 6 +-- synapse/replication/http/send_events.py | 6 +-- synapse/storage/controllers/persist_events.py | 20 ++++----- synapse/storage/databases/main/events.py | 44 +++++++++---------- synapse/storage/databases/state/deletion.py | 5 +-- 11 files changed, 64 insertions(+), 55 deletions(-) create mode 100644 changelog.d/18857.misc diff --git a/changelog.d/18857.misc b/changelog.d/18857.misc new file mode 100644 index 000000000..e679be8b9 --- /dev/null +++ b/changelog.d/18857.misc @@ -0,0 +1 @@ +Introduce `EventPersistencePair` type. diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 0bca4c188..63551143d 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -306,6 +306,12 @@ class EventContext(UnpersistedEventContextBase): ) +EventPersistencePair = Tuple[EventBase, EventContext] +""" +The combination of an event to be persisted and its context. +""" + + @attr.s(slots=True, auto_attribs=True) class UnpersistedEventContext(UnpersistedEventContextBase): """ @@ -363,7 +369,7 @@ class UnpersistedEventContext(UnpersistedEventContextBase): room_id: str, last_known_state_group: int, datastore: "StateGroupDataStore", - ) -> List[Tuple[EventBase, EventContext]]: + ) -> List[EventPersistencePair]: """ Takes a list of events and their associated unpersisted contexts and persists the unpersisted contexts, returning a list of events and persisted contexts. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 127518e1f..a8d5c3c28 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -59,7 +59,7 @@ from synapse.api.errors import ( from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion from synapse.crypto.event_signing import compute_event_signature from synapse.events import EventBase -from synapse.events.snapshot import EventContext +from synapse.events.snapshot import EventPersistencePair from synapse.federation.federation_base import ( FederationBase, InvalidEventSignatureError, @@ -914,7 +914,7 @@ class FederationServer(FederationBase): async def _on_send_membership_event( self, origin: str, content: JsonDict, membership_type: str, room_id: str - ) -> Tuple[EventBase, EventContext]: + ) -> EventPersistencePair: """Handle an on_send_{join,leave,knock} request Does some preliminary validation before passing the request on to the diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 511394c66..04ee774aa 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -66,7 +66,11 @@ from synapse.event_auth import ( validate_event_for_room_version, ) from synapse.events import EventBase -from synapse.events.snapshot import EventContext, UnpersistedEventContextBase +from synapse.events.snapshot import ( + EventContext, + EventPersistencePair, + UnpersistedEventContextBase, +) from synapse.federation.federation_client import InvalidResponseError, PulledPduInfo from synapse.logging.context import nested_logging_context from synapse.logging.opentracing import ( @@ -341,7 +345,7 @@ class FederationEventHandler: async def on_send_membership_event( self, origin: str, event: EventBase - ) -> Tuple[EventBase, EventContext]: + ) -> EventPersistencePair: """ We have received a join/leave/knock event for a room via send_join/leave/knock. @@ -1712,7 +1716,7 @@ class FederationEventHandler: ) auth_map.update(persisted_events) - events_and_contexts_to_persist: List[Tuple[EventBase, EventContext]] = [] + events_and_contexts_to_persist: List[EventPersistencePair] = [] async def prep(event: EventBase) -> None: with nested_logging_context(suffix=event.event_id): @@ -2225,7 +2229,7 @@ class FederationEventHandler: async def persist_events_and_notify( self, room_id: str, - event_and_contexts: Sequence[Tuple[EventBase, EventContext]], + event_and_contexts: Sequence[EventPersistencePair], backfilled: bool = False, ) -> int: """Persists events and tells the notifier/pushers about them, if diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index fff46b640..d850b617d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -57,6 +57,7 @@ from synapse.events import EventBase, relation_from_event from synapse.events.builder import EventBuilder from synapse.events.snapshot import ( EventContext, + EventPersistencePair, UnpersistedEventContext, UnpersistedEventContextBase, ) @@ -1439,7 +1440,7 @@ class EventCreationHandler: async def handle_new_client_event( self, requester: Requester, - events_and_context: List[Tuple[EventBase, EventContext]], + events_and_context: List[EventPersistencePair], ratelimit: bool = True, extra_users: Optional[List[UserID]] = None, ignore_shadow_ban: bool = False, @@ -1651,7 +1652,7 @@ class EventCreationHandler: async def _persist_events( self, requester: Requester, - events_and_context: List[Tuple[EventBase, EventContext]], + events_and_context: List[EventPersistencePair], ratelimit: bool = True, extra_users: Optional[List[UserID]] = None, ) -> EventBase: @@ -1737,7 +1738,7 @@ class EventCreationHandler: raise async def cache_joined_hosts_for_events( - self, events_and_context: List[Tuple[EventBase, EventContext]] + self, events_and_context: List[EventPersistencePair] ) -> None: """Precalculate the joined hosts at each of the given events, when using Redis, so that external federation senders don't have to recalculate it themselves. @@ -1843,7 +1844,7 @@ class EventCreationHandler: async def persist_and_notify_client_events( self, requester: Requester, - events_and_context: List[Tuple[EventBase, EventContext]], + events_and_context: List[EventPersistencePair], ratelimit: bool = True, extra_users: Optional[List[UserID]] = None, ) -> EventBase: diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index bb9d5dbca..ea9169aef 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -49,7 +49,7 @@ from synapse.api.constants import ( from synapse.api.room_versions import PushRuleRoomFlag from synapse.event_auth import auth_types_for_event, get_user_power_level from synapse.events import EventBase, relation_from_event -from synapse.events.snapshot import EventContext +from synapse.events.snapshot import EventContext, EventPersistencePair from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics import SERVER_NAME_LABEL from synapse.state import CREATE_KEY, POWER_KEY @@ -352,7 +352,7 @@ class BulkPushRuleEvaluator: return related_events async def action_for_events_by_user( - self, events_and_context: List[Tuple[EventBase, EventContext]] + self, events_and_context: List[EventPersistencePair] ) -> None: """Given a list of events and their associated contexts, evaluate the push rules for each event, check if the message should increment the unread count, and diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index c29ed8d14..1e302ef59 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -24,8 +24,8 @@ from typing import TYPE_CHECKING, List, Tuple from twisted.web.server import Request from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion -from synapse.events import EventBase, make_event_from_dict -from synapse.events.snapshot import EventContext +from synapse.events import make_event_from_dict +from synapse.events.snapshot import EventContext, EventPersistencePair from synapse.http.server import HttpServer from synapse.replication.http._base import ReplicationEndpoint from synapse.types import JsonDict @@ -86,7 +86,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): async def _serialize_payload( # type: ignore[override] store: "DataStore", room_id: str, - event_and_contexts: List[Tuple[EventBase, EventContext]], + event_and_contexts: List[EventPersistencePair], backfilled: bool, ) -> JsonDict: """ diff --git a/synapse/replication/http/send_events.py b/synapse/replication/http/send_events.py index 15e363b3e..6b1a5a995 100644 --- a/synapse/replication/http/send_events.py +++ b/synapse/replication/http/send_events.py @@ -25,8 +25,8 @@ from typing import TYPE_CHECKING, List, Tuple from twisted.web.server import Request from synapse.api.room_versions import KNOWN_ROOM_VERSIONS -from synapse.events import EventBase, make_event_from_dict -from synapse.events.snapshot import EventContext +from synapse.events import make_event_from_dict +from synapse.events.snapshot import EventContext, EventPersistencePair from synapse.http.server import HttpServer from synapse.replication.http._base import ReplicationEndpoint from synapse.types import JsonDict, Requester, UserID @@ -85,7 +85,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint): @staticmethod async def _serialize_payload( # type: ignore[override] - events_and_context: List[Tuple[EventBase, EventContext]], + events_and_context: List[EventPersistencePair], store: "DataStore", requester: Requester, ratelimit: bool, diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 95a34f7be..120934af5 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -51,7 +51,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase -from synapse.events.snapshot import EventContext +from synapse.events.snapshot import EventContext, EventPersistencePair from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.logging.opentracing import ( @@ -144,7 +144,7 @@ class _PersistEventsTask: name: ClassVar[str] = "persist_event_batch" # used for opentracing - events_and_contexts: List[Tuple[EventBase, EventContext]] + events_and_contexts: List[EventPersistencePair] backfilled: bool def try_merge(self, task: "_EventPersistQueueTask") -> bool: @@ -391,7 +391,7 @@ class EventsPersistenceStorageController: @trace async def persist_events( self, - events_and_contexts: Iterable[Tuple[EventBase, EventContext]], + events_and_contexts: Iterable[EventPersistencePair], backfilled: bool = False, ) -> Tuple[List[EventBase], RoomStreamToken]: """ @@ -414,7 +414,7 @@ class EventsPersistenceStorageController: a room that has been un-partial stated. """ event_ids: List[str] = [] - partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {} + partitioned: Dict[str, List[EventPersistencePair]] = {} for event, ctx in events_and_contexts: partitioned.setdefault(event.room_id, []).append((event, ctx)) event_ids.append(event.event_id) @@ -430,7 +430,7 @@ class EventsPersistenceStorageController: set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled)) async def enqueue( - item: Tuple[str, List[Tuple[EventBase, EventContext]]], + item: Tuple[str, List[EventPersistencePair]], ) -> Dict[str, str]: room_id, evs_ctxs = item return await self._event_persist_queue.add_to_queue( @@ -677,7 +677,7 @@ class EventsPersistenceStorageController: return replaced_events async def _calculate_new_forward_extremities_and_state_delta( - self, room_id: str, ev_ctx_rm: List[Tuple[EventBase, EventContext]] + self, room_id: str, ev_ctx_rm: List[EventPersistencePair] ) -> Tuple[Optional[Set[str]], Optional[DeltaState]]: """Calculates the new forward extremities and state delta for a room given events to persist. @@ -802,7 +802,7 @@ class EventsPersistenceStorageController: async def _calculate_new_extremities( self, room_id: str, - event_contexts: List[Tuple[EventBase, EventContext]], + event_contexts: List[EventPersistencePair], latest_event_ids: AbstractSet[str], ) -> Set[str]: """Calculates the new forward extremities for a room given events to @@ -862,7 +862,7 @@ class EventsPersistenceStorageController: async def _get_new_state_after_events( self, room_id: str, - events_context: List[Tuple[EventBase, EventContext]], + events_context: List[EventPersistencePair], old_latest_event_ids: AbstractSet[str], new_latest_event_ids: Set[str], ) -> Tuple[Optional[StateMap[str]], Optional[StateMap[str]], Set[str]]: @@ -1039,7 +1039,7 @@ class EventsPersistenceStorageController: new_latest_event_ids: Set[str], resolved_state_group: int, event_id_to_state_group: Dict[str, int], - events_context: List[Tuple[EventBase, EventContext]], + events_context: List[EventPersistencePair], ) -> Set[str]: """See if we can prune any of the extremities after calculating the resolved state. @@ -1176,7 +1176,7 @@ class EventsPersistenceStorageController: async def _is_server_still_joined( self, room_id: str, - ev_ctx_rm: List[Tuple[EventBase, EventContext]], + ev_ctx_rm: List[EventPersistencePair], delta: DeltaState, ) -> bool: """Check if the server will still be joined after the given events have diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 2478367f0..a50e889b9 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -57,7 +57,7 @@ from synapse.events import ( is_creator, relation_from_event, ) -from synapse.events.snapshot import EventContext +from synapse.events.snapshot import EventPersistencePair from synapse.events.utils import parse_stripped_state_event from synapse.logging.opentracing import trace from synapse.metrics import SERVER_NAME_LABEL @@ -274,7 +274,7 @@ class PersistEventsStore: async def _persist_events_and_state_updates( self, room_id: str, - events_and_contexts: List[Tuple[EventBase, EventContext]], + events_and_contexts: List[EventPersistencePair], *, state_delta_for_room: Optional[DeltaState], new_forward_extremities: Optional[Set[str]], @@ -532,7 +532,7 @@ class PersistEventsStore: async def _calculate_sliding_sync_table_changes( self, room_id: str, - events_and_contexts: Sequence[Tuple[EventBase, EventContext]], + events_and_contexts: Sequence[EventPersistencePair], delta_state: DeltaState, ) -> SlidingSyncTableChanges: """ @@ -1016,7 +1016,7 @@ class PersistEventsStore: txn: LoggingTransaction, *, room_id: str, - events_and_contexts: List[Tuple[EventBase, EventContext]], + events_and_contexts: List[EventPersistencePair], inhibit_local_membership_updates: bool, state_delta_for_room: Optional[DeltaState], new_forward_extremities: Optional[Set[str]], @@ -1666,7 +1666,7 @@ class PersistEventsStore: def _persist_transaction_ids_txn( self, txn: LoggingTransaction, - events_and_contexts: List[Tuple[EventBase, EventContext]], + events_and_contexts: List[EventPersistencePair], ) -> None: """Persist the mapping from transaction IDs to event IDs (if defined).""" @@ -2316,7 +2316,7 @@ class PersistEventsStore: self, txn: LoggingTransaction, room_id: str, - events_and_contexts: List[Tuple[EventBase, EventContext]], + events_and_contexts: List[EventPersistencePair], ) -> None: """ Update the latest `event_stream_ordering`/`bump_stamp` columns in the @@ -2456,8 +2456,8 @@ class PersistEventsStore: @classmethod def _filter_events_and_contexts_for_duplicates( - cls, events_and_contexts: List[Tuple[EventBase, EventContext]] - ) -> List[Tuple[EventBase, EventContext]]: + cls, events_and_contexts: List[EventPersistencePair] + ) -> List[EventPersistencePair]: """Ensure that we don't have the same event twice. Pick the earliest non-outlier if there is one, else the earliest one. @@ -2468,9 +2468,7 @@ class PersistEventsStore: Returns: filtered list """ - new_events_and_contexts: OrderedDict[str, Tuple[EventBase, EventContext]] = ( - OrderedDict() - ) + new_events_and_contexts: OrderedDict[str, EventPersistencePair] = OrderedDict() for event, context in events_and_contexts: prev_event_context = new_events_and_contexts.get(event.event_id) if prev_event_context: @@ -2488,7 +2486,7 @@ class PersistEventsStore: self, txn: LoggingTransaction, room_id: str, - events_and_contexts: List[Tuple[EventBase, EventContext]], + events_and_contexts: List[EventPersistencePair], ) -> None: """Update min_depth for each room @@ -2530,8 +2528,8 @@ class PersistEventsStore: def _update_outliers_txn( self, txn: LoggingTransaction, - events_and_contexts: List[Tuple[EventBase, EventContext]], - ) -> List[Tuple[EventBase, EventContext]]: + events_and_contexts: List[EventPersistencePair], + ) -> List[EventPersistencePair]: """Update any outliers with new event info. This turns outliers into ex-outliers (unless the new event was rejected), and @@ -2638,7 +2636,7 @@ class PersistEventsStore: def _store_event_txn( self, txn: LoggingTransaction, - events_and_contexts: Collection[Tuple[EventBase, EventContext]], + events_and_contexts: Collection[EventPersistencePair], ) -> None: """Insert new events into the event, event_json, redaction and state_events tables. @@ -2742,8 +2740,8 @@ class PersistEventsStore: def _store_rejected_events_txn( self, txn: LoggingTransaction, - events_and_contexts: List[Tuple[EventBase, EventContext]], - ) -> List[Tuple[EventBase, EventContext]]: + events_and_contexts: List[EventPersistencePair], + ) -> List[EventPersistencePair]: """Add rows to the 'rejections' table for received events which were rejected @@ -2770,8 +2768,8 @@ class PersistEventsStore: self, txn: LoggingTransaction, *, - events_and_contexts: List[Tuple[EventBase, EventContext]], - all_events_and_contexts: List[Tuple[EventBase, EventContext]], + events_and_contexts: List[EventPersistencePair], + all_events_and_contexts: List[EventPersistencePair], inhibit_local_membership_updates: bool = False, ) -> None: """Update all the miscellaneous tables for new events @@ -2865,7 +2863,7 @@ class PersistEventsStore: def _add_to_cache( self, txn: LoggingTransaction, - events_and_contexts: List[Tuple[EventBase, EventContext]], + events_and_contexts: List[EventPersistencePair], ) -> None: to_prefill: List[EventCacheEntry] = [] @@ -3338,8 +3336,8 @@ class PersistEventsStore: def _set_push_actions_for_event_and_users_txn( self, txn: LoggingTransaction, - events_and_contexts: List[Tuple[EventBase, EventContext]], - all_events_and_contexts: List[Tuple[EventBase, EventContext]], + events_and_contexts: List[EventPersistencePair], + all_events_and_contexts: List[EventPersistencePair], ) -> None: """Handles moving push actions from staging table to main event_push_actions table for all events in `events_and_contexts`. @@ -3422,7 +3420,7 @@ class PersistEventsStore: def _store_event_state_mappings_txn( self, txn: LoggingTransaction, - events_and_contexts: Collection[Tuple[EventBase, EventContext]], + events_and_contexts: Collection[EventPersistencePair], ) -> None: """ Raises: diff --git a/synapse/storage/databases/state/deletion.py b/synapse/storage/databases/state/deletion.py index f77c46f6a..9b62c1d81 100644 --- a/synapse/storage/databases/state/deletion.py +++ b/synapse/storage/databases/state/deletion.py @@ -25,8 +25,7 @@ from typing import ( Tuple, ) -from synapse.events import EventBase -from synapse.events.snapshot import EventContext +from synapse.events.snapshot import EventPersistencePair from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, @@ -228,7 +227,7 @@ class StateDeletionDataStore: @contextlib.asynccontextmanager async def persisting_state_group_references( - self, event_and_contexts: Collection[Tuple[EventBase, EventContext]] + self, event_and_contexts: Collection[EventPersistencePair] ) -> AsyncIterator[None]: """Wraps the persistence of the given events and contexts, ensuring that any state groups referenced still exist and that they don't get deleted