From 67903128316b6ad4e8aebb42b1e318091b873f49 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Nov 2025 15:38:14 +0000 Subject: [PATCH] Fixup logcontexts after replication PR. (#19146) Fixes logcontext leaks introduced in #19138. --- changelog.d/19146.misc | 1 + synapse/util/async_helpers.py | 26 ++++++++------------------ synapse/util/background_queue.py | 4 ++++ tests/util/test_background_queue.py | 26 +++++++++++++++++++------- 4 files changed, 32 insertions(+), 25 deletions(-) create mode 100644 changelog.d/19146.misc diff --git a/changelog.d/19146.misc b/changelog.d/19146.misc new file mode 100644 index 000000000..118336173 --- /dev/null +++ b/changelog.d/19146.misc @@ -0,0 +1 @@ +Minor speed up of processing of inbound replication. diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 99e899d1e..8322a1bb3 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -1029,7 +1029,8 @@ class DeferredEvent: def set(self) -> None: if not self._deferred.called: - self._deferred.callback(None) + with PreserveLoggingContext(): + self._deferred.callback(None) def clear(self) -> None: if self._deferred.called: @@ -1042,26 +1043,15 @@ class DeferredEvent: if self.is_set(): return True - # Create a deferred that gets called in N seconds - sleep_deferred: "defer.Deferred[None]" = defer.Deferred() - call = self._clock.call_later( - timeout_seconds, - sleep_deferred.callback, - None, - ) - try: await make_deferred_yieldable( - defer.DeferredList( - [sleep_deferred, self._deferred], - fireOnOneCallback=True, - fireOnOneErrback=True, - consumeErrors=True, + timeout_deferred( + deferred=stop_cancellation(self._deferred), + timeout=timeout_seconds, + clock=self._clock, ) ) - finally: - # Cancel the sleep if we were woken up - if call.active(): - call.cancel() + except defer.TimeoutError: + pass return self.is_set() diff --git a/synapse/util/background_queue.py b/synapse/util/background_queue.py index daf6a9484..7e4c32266 100644 --- a/synapse/util/background_queue.py +++ b/synapse/util/background_queue.py @@ -25,6 +25,8 @@ from typing import ( TypeVar, ) +from twisted.internet import defer + from synapse.util.async_helpers import DeferredEvent from synapse.util.constants import MILLISECONDS_PER_SECOND @@ -110,6 +112,8 @@ class BackgroundQueue(Generic[T]): item = self._queue.popleft() try: await self._callback(item) + except defer.CancelledError: + raise except Exception: logger.exception("Error processing background queue item") diff --git a/tests/util/test_background_queue.py b/tests/util/test_background_queue.py index 56fa12128..901b01484 100644 --- a/tests/util/test_background_queue.py +++ b/tests/util/test_background_queue.py @@ -18,11 +18,12 @@ from unittest.mock import Mock from twisted.internet.defer import Deferred from twisted.internet.testing import MemoryReactor +from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.server import HomeServer from synapse.util.background_queue import BackgroundQueue from synapse.util.clock import Clock -from tests.unittest import HomeserverTestCase +from tests.unittest import HomeserverTestCase, logcontext_clean class BackgroundQueueTests(HomeserverTestCase): @@ -38,11 +39,14 @@ class BackgroundQueueTests(HomeserverTestCase): timeout_ms=1000, ) + @logcontext_clean def test_simple_call(self) -> None: """Test that items added to the queue are processed.""" # Register a deferred to be the return value of the callback. callback_result_deferred: Deferred[None] = Deferred() - self._process_item_mock.side_effect = callback_result_deferred + self._process_item_mock.side_effect = lambda _: make_deferred_yieldable( + callback_result_deferred + ) # Adding an item should cause the callback to be invoked. self.queue.add(1) @@ -57,16 +61,20 @@ class BackgroundQueueTests(HomeserverTestCase): # Once the first callback completes, the second item should be # processed. - callback_result_deferred.callback(None) + with PreserveLoggingContext(): + callback_result_deferred.callback(None) self._process_item_mock.assert_called_once_with(2) + @logcontext_clean def test_timeout(self) -> None: """Test that the background process wakes up if its idle, and that it times out after being idle.""" # Register a deferred to be the return value of the callback. callback_result_deferred: Deferred[None] = Deferred() - self._process_item_mock.side_effect = callback_result_deferred + self._process_item_mock.side_effect = lambda _: make_deferred_yieldable( + callback_result_deferred + ) # Adding an item should cause the callback to be invoked. self.queue.add(1) @@ -75,7 +83,8 @@ class BackgroundQueueTests(HomeserverTestCase): self._process_item_mock.reset_mock() # Let the callback complete. - callback_result_deferred.callback(None) + with PreserveLoggingContext(): + callback_result_deferred.callback(None) # Advance the clock by less than the timeout, and add another item. self.reactor.advance(0.5) @@ -84,12 +93,15 @@ class BackgroundQueueTests(HomeserverTestCase): # The callback should be invoked again. callback_result_deferred = Deferred() - self._process_item_mock.side_effect = callback_result_deferred + self._process_item_mock.side_effect = lambda _: make_deferred_yieldable( + callback_result_deferred + ) self._process_item_mock.assert_called_once_with(2) self._process_item_mock.reset_mock() # Let the callback complete. - callback_result_deferred.callback(None) + with PreserveLoggingContext(): + callback_result_deferred.callback(None) # Advance the clock by more than the timeout. self.reactor.advance(1.5)