diff --git a/changelog.d/18342.bugfix b/changelog.d/18342.bugfix
new file mode 100644
index 000000000..6fa2fa679
--- /dev/null
+++ b/changelog.d/18342.bugfix
@@ -0,0 +1 @@
+Fix `ExternalIDReuse` exception after migrating to MAS on workers with high traffic.
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index eadbf4901..c43f31353 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -763,16 +763,33 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
             txn, self.get_user_by_external_id, (auth_provider, external_id)
         )
 
-        self.db_pool.simple_insert_txn(
+        # This INSERT ... ON CONFLICT DO NOTHING statement will cause a
+        # 'could not serialize access due to concurrent update' error
+        # if the row is added concurrently by another transaction.
+        # This is exactly what we want, as it makes the transaction get retried
+        # in a new snapshot where we can check for a genuine conflict.
+        was_inserted = self.db_pool.simple_upsert_txn(
             txn,
             table="user_external_ids",
-            values={
-                "auth_provider": auth_provider,
-                "external_id": external_id,
-                "user_id": user_id,
-            },
+            keyvalues={"auth_provider": auth_provider, "external_id": external_id},
+            values={},
+            insertion_values={"user_id": user_id},
         )
 
+        if not was_inserted:
+            existing_id = self.db_pool.simple_select_one_onecol_txn(
+                txn,
+                table="user_external_ids",
+                keyvalues={"auth_provider": auth_provider, "user_id": user_id},
+                retcol="external_id",
+                allow_none=True,
+            )
+
+            if existing_id != external_id:
+                raise ExternalIDReuseException(
+                    f"{user_id!r} has external id {existing_id!r} for {auth_provider} but trying to add {external_id!r}"
+                )
+
     async def remove_user_external_id(
         self, auth_provider: str, external_id: str, user_id: str
     ) -> None:
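
For reference, below is a minimal standalone sketch (not part of the patch) of the upsert-then-verify pattern the new code follows. It is written against plain sqlite3 with an illustrative user_external_ids schema, a hypothetical record_external_id helper, and a made-up "oidc-mas" provider name, rather than Synapse's db_pool transaction helpers. The retry-on-serialization-failure behaviour described in the code comment is specific to PostgreSQL's snapshot isolation and is not reproduced here; the sketch only demonstrates the conflict check that decides whether to raise ExternalIDReuseException.

    import sqlite3


    class ExternalIDReuseException(Exception):
        """Raised when an external ID is already mapped to a different user."""


    def record_external_id(
        conn: sqlite3.Connection, auth_provider: str, external_id: str, user_id: str
    ) -> None:
        cur = conn.cursor()

        # INSERT ... ON CONFLICT DO NOTHING (SQLite >= 3.24): a no-op if the
        # (auth_provider, external_id) mapping already exists.
        cur.execute(
            """
            INSERT INTO user_external_ids (auth_provider, external_id, user_id)
            VALUES (?, ?, ?)
            ON CONFLICT (auth_provider, external_id) DO NOTHING
            """,
            (auth_provider, external_id, user_id),
        )

        if cur.rowcount == 0:
            # Nothing was inserted, so the external ID is already claimed. Look up
            # which external ID this user currently has; only raise if the existing
            # mapping does not match the one we just tried to record.
            row = cur.execute(
                "SELECT external_id FROM user_external_ids"
                " WHERE auth_provider = ? AND user_id = ?",
                (auth_provider, user_id),
            ).fetchone()
            existing_id = row[0] if row is not None else None

            if existing_id != external_id:
                raise ExternalIDReuseException(
                    f"{user_id!r} has external id {existing_id!r} for {auth_provider}"
                    f" but trying to add {external_id!r}"
                )


    if __name__ == "__main__":
        conn = sqlite3.connect(":memory:")
        conn.execute(
            "CREATE TABLE user_external_ids ("
            " auth_provider TEXT NOT NULL,"
            " external_id TEXT NOT NULL,"
            " user_id TEXT NOT NULL,"
            " UNIQUE (auth_provider, external_id))"
        )
        record_external_id(conn, "oidc-mas", "abc123", "@alice:example.org")
        # Recording the same mapping again is a harmless no-op.
        record_external_id(conn, "oidc-mas", "abc123", "@alice:example.org")
        # A different user claiming the same external ID is rejected.
        try:
            record_external_id(conn, "oidc-mas", "abc123", "@bob:example.org")
        except ExternalIDReuseException as exc:
            print("rejected:", exc)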