Remove destinations from sending if not whitelisted (#18484)
Co-authored-by: Andrew Morgan <andrew@amorgan.xyz>
This commit is contained in:
parent
82189cbde4
commit
919c362466
1
changelog.d/18484.bugfix
Normal file
1
changelog.d/18484.bugfix
Normal file
@ -0,0 +1 @@
|
||||
Remove destinations from sending if not whitelisted.
|
||||
@ -94,5 +94,21 @@ class FederationConfig(Config):
|
||||
2**62,
|
||||
)
|
||||
|
||||
def is_domain_allowed_according_to_federation_whitelist(self, domain: str) -> bool:
|
||||
"""
|
||||
Returns whether a domain is allowed according to the federation whitelist. If a
|
||||
federation whitelist is not set, all domains are allowed.
|
||||
|
||||
Args:
|
||||
domain: The domain to test.
|
||||
|
||||
Returns:
|
||||
True if the domain is allowed or if a whitelist is not set, False otherwise.
|
||||
"""
|
||||
if self.federation_domain_whitelist is None:
|
||||
return True
|
||||
|
||||
return domain in self.federation_domain_whitelist
|
||||
|
||||
|
||||
_METRICS_FOR_DOMAINS_SCHEMA = {"type": "array", "items": {"type": "string"}}
|
||||
|
||||
@ -342,6 +342,8 @@ class _DestinationWakeupQueue:
|
||||
destination, _ = self.queue.popitem(last=False)
|
||||
|
||||
queue = self.sender._get_per_destination_queue(destination)
|
||||
if queue is None:
|
||||
continue
|
||||
|
||||
if not queue._new_data_to_send:
|
||||
# The per destination queue has already been woken up.
|
||||
@ -436,12 +438,23 @@ class FederationSender(AbstractFederationSender):
|
||||
self._wake_destinations_needing_catchup,
|
||||
)
|
||||
|
||||
def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
|
||||
def _get_per_destination_queue(
|
||||
self, destination: str
|
||||
) -> Optional[PerDestinationQueue]:
|
||||
"""Get or create a PerDestinationQueue for the given destination
|
||||
|
||||
Args:
|
||||
destination: server_name of remote server
|
||||
|
||||
Returns:
|
||||
None if the destination is not allowed by the federation whitelist.
|
||||
Otherwise a PerDestinationQueue for this destination.
|
||||
"""
|
||||
if not self.hs.config.federation.is_domain_allowed_according_to_federation_whitelist(
|
||||
destination
|
||||
):
|
||||
return None
|
||||
|
||||
queue = self._per_destination_queues.get(destination)
|
||||
if not queue:
|
||||
queue = PerDestinationQueue(self.hs, self._transaction_manager, destination)
|
||||
@ -718,6 +731,16 @@ class FederationSender(AbstractFederationSender):
|
||||
# track the fact that we have a PDU for these destinations,
|
||||
# to allow us to perform catch-up later on if the remote is unreachable
|
||||
# for a while.
|
||||
# Filter out any destinations not present in the federation_domain_whitelist, if
|
||||
# the whitelist exists. These destinations should not be sent to so let's not
|
||||
# waste time or space keeping track of events destined for them.
|
||||
destinations = [
|
||||
d
|
||||
for d in destinations
|
||||
if self.hs.config.federation.is_domain_allowed_according_to_federation_whitelist(
|
||||
d
|
||||
)
|
||||
]
|
||||
await self.store.store_destination_rooms_entries(
|
||||
destinations,
|
||||
pdu.room_id,
|
||||
@ -732,7 +755,12 @@ class FederationSender(AbstractFederationSender):
|
||||
)
|
||||
|
||||
for destination in destinations:
|
||||
self._get_per_destination_queue(destination).send_pdu(pdu)
|
||||
queue = self._get_per_destination_queue(destination)
|
||||
# We expect `queue` to not be None as we already filtered out
|
||||
# non-whitelisted destinations above.
|
||||
assert queue is not None
|
||||
|
||||
queue.send_pdu(pdu)
|
||||
|
||||
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
|
||||
"""Send a RR to any other servers in the room
|
||||
@ -841,12 +869,16 @@ class FederationSender(AbstractFederationSender):
|
||||
for domain in immediate_domains:
|
||||
# Add to destination queue and wake the destination up
|
||||
queue = self._get_per_destination_queue(domain)
|
||||
if queue is None:
|
||||
continue
|
||||
queue.queue_read_receipt(receipt)
|
||||
queue.attempt_new_transaction()
|
||||
|
||||
for domain in delay_domains:
|
||||
# Add to destination queue...
|
||||
queue = self._get_per_destination_queue(domain)
|
||||
if queue is None:
|
||||
continue
|
||||
queue.queue_read_receipt(receipt)
|
||||
|
||||
# ... and schedule the destination to be woken up.
|
||||
@ -882,9 +914,10 @@ class FederationSender(AbstractFederationSender):
|
||||
if self.is_mine_server_name(destination):
|
||||
continue
|
||||
|
||||
self._get_per_destination_queue(destination).send_presence(
|
||||
states, start_loop=False
|
||||
)
|
||||
queue = self._get_per_destination_queue(destination)
|
||||
if queue is None:
|
||||
continue
|
||||
queue.send_presence(states, start_loop=False)
|
||||
|
||||
self._destination_wakeup_queue.add_to_queue(destination)
|
||||
|
||||
@ -934,6 +967,8 @@ class FederationSender(AbstractFederationSender):
|
||||
return
|
||||
|
||||
queue = self._get_per_destination_queue(edu.destination)
|
||||
if queue is None:
|
||||
return
|
||||
if key:
|
||||
queue.send_keyed_edu(edu, key)
|
||||
else:
|
||||
@ -958,9 +993,15 @@ class FederationSender(AbstractFederationSender):
|
||||
|
||||
for destination in destinations:
|
||||
if immediate:
|
||||
self._get_per_destination_queue(destination).attempt_new_transaction()
|
||||
queue = self._get_per_destination_queue(destination)
|
||||
if queue is None:
|
||||
continue
|
||||
queue.attempt_new_transaction()
|
||||
else:
|
||||
self._get_per_destination_queue(destination).mark_new_data()
|
||||
queue = self._get_per_destination_queue(destination)
|
||||
if queue is None:
|
||||
continue
|
||||
queue.mark_new_data()
|
||||
self._destination_wakeup_queue.add_to_queue(destination)
|
||||
|
||||
def wake_destination(self, destination: str) -> None:
|
||||
@ -979,7 +1020,9 @@ class FederationSender(AbstractFederationSender):
|
||||
):
|
||||
return
|
||||
|
||||
self._get_per_destination_queue(destination).attempt_new_transaction()
|
||||
queue = self._get_per_destination_queue(destination)
|
||||
if queue is not None:
|
||||
queue.attempt_new_transaction()
|
||||
|
||||
@staticmethod
|
||||
def get_current_token() -> int:
|
||||
@ -1024,6 +1067,9 @@ class FederationSender(AbstractFederationSender):
|
||||
d
|
||||
for d in destinations_to_wake
|
||||
if self._federation_shard_config.should_handle(self._instance_name, d)
|
||||
and self.hs.config.federation.is_domain_allowed_according_to_federation_whitelist(
|
||||
d
|
||||
)
|
||||
]
|
||||
|
||||
for destination in destinations_to_wake:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user