Fixup logcontexts after replication PR. (#19146)

Fixes logcontext leaks introduced in #19138.
This commit is contained in:
Erik Johnston 2025-11-05 15:38:14 +00:00 committed by GitHub
parent d3ffd04f66
commit 6790312831
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 32 additions and 25 deletions

1
changelog.d/19146.misc Normal file
View File

@ -0,0 +1 @@
Minor speed up of processing of inbound replication.

View File

@ -1029,7 +1029,8 @@ class DeferredEvent:
def set(self) -> None: def set(self) -> None:
if not self._deferred.called: if not self._deferred.called:
self._deferred.callback(None) with PreserveLoggingContext():
self._deferred.callback(None)
def clear(self) -> None: def clear(self) -> None:
if self._deferred.called: if self._deferred.called:
@ -1042,26 +1043,15 @@ class DeferredEvent:
if self.is_set(): if self.is_set():
return True return True
# Create a deferred that gets called in N seconds
sleep_deferred: "defer.Deferred[None]" = defer.Deferred()
call = self._clock.call_later(
timeout_seconds,
sleep_deferred.callback,
None,
)
try: try:
await make_deferred_yieldable( await make_deferred_yieldable(
defer.DeferredList( timeout_deferred(
[sleep_deferred, self._deferred], deferred=stop_cancellation(self._deferred),
fireOnOneCallback=True, timeout=timeout_seconds,
fireOnOneErrback=True, clock=self._clock,
consumeErrors=True,
) )
) )
finally: except defer.TimeoutError:
# Cancel the sleep if we were woken up pass
if call.active():
call.cancel()
return self.is_set() return self.is_set()

View File

@ -25,6 +25,8 @@ from typing import (
TypeVar, TypeVar,
) )
from twisted.internet import defer
from synapse.util.async_helpers import DeferredEvent from synapse.util.async_helpers import DeferredEvent
from synapse.util.constants import MILLISECONDS_PER_SECOND from synapse.util.constants import MILLISECONDS_PER_SECOND
@ -110,6 +112,8 @@ class BackgroundQueue(Generic[T]):
item = self._queue.popleft() item = self._queue.popleft()
try: try:
await self._callback(item) await self._callback(item)
except defer.CancelledError:
raise
except Exception: except Exception:
logger.exception("Error processing background queue item") logger.exception("Error processing background queue item")

View File

@ -18,11 +18,12 @@ from unittest.mock import Mock
from twisted.internet.defer import Deferred from twisted.internet.defer import Deferred
from twisted.internet.testing import MemoryReactor from twisted.internet.testing import MemoryReactor
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.util.background_queue import BackgroundQueue from synapse.util.background_queue import BackgroundQueue
from synapse.util.clock import Clock from synapse.util.clock import Clock
from tests.unittest import HomeserverTestCase from tests.unittest import HomeserverTestCase, logcontext_clean
class BackgroundQueueTests(HomeserverTestCase): class BackgroundQueueTests(HomeserverTestCase):
@ -38,11 +39,14 @@ class BackgroundQueueTests(HomeserverTestCase):
timeout_ms=1000, timeout_ms=1000,
) )
@logcontext_clean
def test_simple_call(self) -> None: def test_simple_call(self) -> None:
"""Test that items added to the queue are processed.""" """Test that items added to the queue are processed."""
# Register a deferred to be the return value of the callback. # Register a deferred to be the return value of the callback.
callback_result_deferred: Deferred[None] = Deferred() callback_result_deferred: Deferred[None] = Deferred()
self._process_item_mock.side_effect = callback_result_deferred self._process_item_mock.side_effect = lambda _: make_deferred_yieldable(
callback_result_deferred
)
# Adding an item should cause the callback to be invoked. # Adding an item should cause the callback to be invoked.
self.queue.add(1) self.queue.add(1)
@ -57,16 +61,20 @@ class BackgroundQueueTests(HomeserverTestCase):
# Once the first callback completes, the second item should be # Once the first callback completes, the second item should be
# processed. # processed.
callback_result_deferred.callback(None) with PreserveLoggingContext():
callback_result_deferred.callback(None)
self._process_item_mock.assert_called_once_with(2) self._process_item_mock.assert_called_once_with(2)
@logcontext_clean
def test_timeout(self) -> None: def test_timeout(self) -> None:
"""Test that the background process wakes up if its idle, and that it """Test that the background process wakes up if its idle, and that it
times out after being idle.""" times out after being idle."""
# Register a deferred to be the return value of the callback. # Register a deferred to be the return value of the callback.
callback_result_deferred: Deferred[None] = Deferred() callback_result_deferred: Deferred[None] = Deferred()
self._process_item_mock.side_effect = callback_result_deferred self._process_item_mock.side_effect = lambda _: make_deferred_yieldable(
callback_result_deferred
)
# Adding an item should cause the callback to be invoked. # Adding an item should cause the callback to be invoked.
self.queue.add(1) self.queue.add(1)
@ -75,7 +83,8 @@ class BackgroundQueueTests(HomeserverTestCase):
self._process_item_mock.reset_mock() self._process_item_mock.reset_mock()
# Let the callback complete. # Let the callback complete.
callback_result_deferred.callback(None) with PreserveLoggingContext():
callback_result_deferred.callback(None)
# Advance the clock by less than the timeout, and add another item. # Advance the clock by less than the timeout, and add another item.
self.reactor.advance(0.5) self.reactor.advance(0.5)
@ -84,12 +93,15 @@ class BackgroundQueueTests(HomeserverTestCase):
# The callback should be invoked again. # The callback should be invoked again.
callback_result_deferred = Deferred() callback_result_deferred = Deferred()
self._process_item_mock.side_effect = callback_result_deferred self._process_item_mock.side_effect = lambda _: make_deferred_yieldable(
callback_result_deferred
)
self._process_item_mock.assert_called_once_with(2) self._process_item_mock.assert_called_once_with(2)
self._process_item_mock.reset_mock() self._process_item_mock.reset_mock()
# Let the callback complete. # Let the callback complete.
callback_result_deferred.callback(None) with PreserveLoggingContext():
callback_result_deferred.callback(None)
# Advance the clock by more than the timeout. # Advance the clock by more than the timeout.
self.reactor.advance(1.5) self.reactor.advance(1.5)