diff --git a/changelog.d/19173.misc b/changelog.d/19173.misc new file mode 100644 index 000000000..b0f144ddc --- /dev/null +++ b/changelog.d/19173.misc @@ -0,0 +1 @@ +Add debug logs to track `Clock` utilities. diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 30c22780b..250f84d64 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -65,8 +65,6 @@ from typing import ( Sequence, ) -from twisted.internet.interfaces import IDelayedCall - from synapse.appservice import ( ApplicationService, ApplicationServiceState, @@ -78,7 +76,7 @@ from synapse.events import EventBase from synapse.logging.context import run_in_background from synapse.storage.databases.main import DataStore from synapse.types import DeviceListUpdates, JsonMapping -from synapse.util.clock import Clock +from synapse.util.clock import Clock, DelayedCallWrapper if TYPE_CHECKING: from synapse.server import HomeServer @@ -503,7 +501,7 @@ class _Recoverer: self.service = service self.callback = callback self.backoff_counter = 1 - self.scheduled_recovery: IDelayedCall | None = None + self.scheduled_recovery: DelayedCallWrapper | None = None def recover(self) -> None: delay = 2**self.backoff_counter diff --git a/synapse/util/clock.py b/synapse/util/clock.py index 5b59cef60..65f716489 100644 --- a/synapse/util/clock.py +++ b/synapse/util/clock.py @@ -14,6 +14,7 @@ # +import logging from typing import ( Any, Callable, @@ -30,10 +31,14 @@ from twisted.internet.task import LoopingCall from synapse.logging import context from synapse.types import ISynapseThreadlessReactor from synapse.util import log_failure +from synapse.util.stringutils import random_string_insecure_fast P = ParamSpec("P") +logger = logging.getLogger(__name__) + + class Clock: """ A Clock wraps a Twisted reactor and provides utilities on top of it. @@ -64,7 +69,12 @@ class Clock: """List of active looping calls""" self._call_id_to_delayed_call: dict[int, IDelayedCall] = {} - """Mapping from unique call ID to delayed call""" + """ + Mapping from unique call ID to delayed call. + + For "performance", this only tracks a subset of delayed calls: those created + with `call_later` with `call_later_cancel_on_shutdown=True`. + """ self._is_shutdown = False """Whether shutdown has been requested by the HomeServer""" @@ -153,11 +163,20 @@ class Clock: **kwargs: P.kwargs, ) -> LoopingCall: """Common functionality for `looping_call` and `looping_call_now`""" + instance_id = random_string_insecure_fast(5) if self._is_shutdown: raise Exception("Cannot start looping call. Clock has been shutdown") + looping_call_context_string = "looping_call" + if now: + looping_call_context_string = "looping_call_now" + def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred: + logger.debug( + "%s(%s): Executing callback", looping_call_context_string, instance_id + ) + assert context.current_context() is context.SENTINEL_CONTEXT, ( "Expected `looping_call` callback from the reactor to start with the sentinel logcontext " f"but saw {context.current_context()}. In other words, another task shouldn't have " @@ -201,6 +220,17 @@ class Clock: d = call.start(msec / 1000.0, now=now) d.addErrback(log_failure, "Looping call died", consumeErrors=False) self._looping_calls.append(call) + + logger.debug( + "%s(%s): Scheduled looping call every %sms later", + looping_call_context_string, + instance_id, + msec, + # Find out who is scheduling the call which makes it easy to follow in the + # logs. + stack_info=True, + ) + return call def cancel_all_looping_calls(self, consumeErrors: bool = True) -> None: @@ -226,7 +256,7 @@ class Clock: *args: Any, call_later_cancel_on_shutdown: bool = True, **kwargs: Any, - ) -> IDelayedCall: + ) -> "DelayedCallWrapper": """Call something later Note that the function will be called with generic `call_later` logcontext, so @@ -245,74 +275,79 @@ class Clock: issue, we can just track all delayed calls. **kwargs: Key arguments to pass to function. """ + call_id = self._delayed_call_id + self._delayed_call_id = self._delayed_call_id + 1 if self._is_shutdown: raise Exception("Cannot start delayed call. Clock has been shutdown") - def create_wrapped_callback( - track_for_shutdown_cancellation: bool, - ) -> Callable[P, None]: - def wrapped_callback(*args: Any, **kwargs: Any) -> None: - assert context.current_context() is context.SENTINEL_CONTEXT, ( - "Expected `call_later` callback from the reactor to start with the sentinel logcontext " - f"but saw {context.current_context()}. In other words, another task shouldn't have " - "leaked their logcontext to us." - ) + def wrapped_callback(*args: Any, **kwargs: Any) -> None: + logger.debug("call_later(%s): Executing callback", call_id) - # Because this is a callback from the reactor, we will be using the - # `sentinel` log context at this point. We want the function to log with - # some logcontext as we want to know which server the logs came from. - # - # We use `PreserveLoggingContext` to prevent our new `call_later` - # logcontext from finishing as soon as we exit this function, in case `f` - # returns an awaitable/deferred which would continue running and may try to - # restore the `call_later` context when it's done (because it's trying to - # adhere to the Synapse logcontext rules.) - # - # This also ensures that we return to the `sentinel` context when we exit - # this function and yield control back to the reactor to avoid leaking the - # current logcontext to the reactor (which would then get picked up and - # associated with the next thing the reactor does) - try: - with context.PreserveLoggingContext( - context.LoggingContext( - name="call_later", server_name=self._server_name - ) - ): - # We use `run_in_background` to reset the logcontext after `f` (or the - # awaitable returned by `f`) completes to avoid leaking the current - # logcontext to the reactor - context.run_in_background(callback, *args, **kwargs) - finally: - if track_for_shutdown_cancellation: - # We still want to remove the call from the tracking map. Even if - # the callback raises an exception. - self._call_id_to_delayed_call.pop(call_id) + assert context.current_context() is context.SENTINEL_CONTEXT, ( + "Expected `call_later` callback from the reactor to start with the sentinel logcontext " + f"but saw {context.current_context()}. In other words, another task shouldn't have " + "leaked their logcontext to us." + ) - return wrapped_callback + # Because this is a callback from the reactor, we will be using the + # `sentinel` log context at this point. We want the function to log with + # some logcontext as we want to know which server the logs came from. + # + # We use `PreserveLoggingContext` to prevent our new `call_later` + # logcontext from finishing as soon as we exit this function, in case `f` + # returns an awaitable/deferred which would continue running and may try to + # restore the `call_later` context when it's done (because it's trying to + # adhere to the Synapse logcontext rules.) + # + # This also ensures that we return to the `sentinel` context when we exit + # this function and yield control back to the reactor to avoid leaking the + # current logcontext to the reactor (which would then get picked up and + # associated with the next thing the reactor does) + try: + with context.PreserveLoggingContext( + context.LoggingContext( + name="call_later", server_name=self._server_name + ) + ): + # We use `run_in_background` to reset the logcontext after `f` (or the + # awaitable returned by `f`) completes to avoid leaking the current + # logcontext to the reactor + context.run_in_background(callback, *args, **kwargs) + finally: + if call_later_cancel_on_shutdown: + # We still want to remove the call from the tracking map. Even if + # the callback raises an exception. + self._call_id_to_delayed_call.pop(call_id) + # We can ignore the lint here since this class is the one location callLater should + # be called. + call = self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) # type: ignore[call-later-not-tracked] + + logger.debug( + "call_later(%s): Scheduled call for %ss later (tracked for shutdown: %s)", + call_id, + delay, + call_later_cancel_on_shutdown, + # Find out who is scheduling the call which makes it easy to follow in the + # logs. + stack_info=True, + ) + + wrapped_call = DelayedCallWrapper(call, call_id, self) if call_later_cancel_on_shutdown: - call_id = self._delayed_call_id - self._delayed_call_id = self._delayed_call_id + 1 + self._call_id_to_delayed_call[call_id] = wrapped_call - # We can ignore the lint here since this class is the one location callLater - # should be called. - call = self._reactor.callLater( - delay, create_wrapped_callback(True), *args, **kwargs - ) # type: ignore[call-later-not-tracked] - call = DelayedCallWrapper(call, call_id, self) - self._call_id_to_delayed_call[call_id] = call - return call - else: - # We can ignore the lint here since this class is the one location callLater should - # be called. - return self._reactor.callLater( - delay, create_wrapped_callback(False), *args, **kwargs - ) # type: ignore[call-later-not-tracked] + return wrapped_call - def cancel_call_later(self, timer: IDelayedCall, ignore_errs: bool = False) -> None: + def cancel_call_later( + self, wrapped_call: "DelayedCallWrapper", ignore_errs: bool = False + ) -> None: try: - timer.cancel() + logger.debug( + "cancel_call_later: cancelling scheduled call %s", wrapped_call.call_id + ) + wrapped_call.delayed_call.cancel() except Exception: if not ignore_errs: raise @@ -327,8 +362,11 @@ class Clock: """ # We make a copy here since calling `cancel()` on a delayed_call # will result in the call removing itself from the map mid-iteration. - for call in list(self._call_id_to_delayed_call.values()): + for call_id, call in list(self._call_id_to_delayed_call.items()): try: + logger.debug( + "cancel_all_delayed_calls: cancelling scheduled call %s", call_id + ) call.cancel() except Exception: if not ignore_errs: @@ -352,8 +390,11 @@ class Clock: *args: Postional arguments to pass to function. **kwargs: Key arguments to pass to function. """ + instance_id = random_string_insecure_fast(5) def wrapped_callback(*args: Any, **kwargs: Any) -> None: + logger.debug("call_when_running(%s): Executing callback", instance_id) + # Since this callback can be invoked immediately if the reactor is already # running, we can't always assume that we're running in the sentinel # logcontext (i.e. we can't assert that we're in the sentinel context like @@ -392,6 +433,14 @@ class Clock: # callWhenRunning should be called. self._reactor.callWhenRunning(wrapped_callback, *args, **kwargs) # type: ignore[prefer-synapse-clock-call-when-running] + logger.debug( + "call_when_running(%s): Scheduled call", + instance_id, + # Find out who is scheduling the call which makes it easy to follow in the + # logs. + stack_info=True, + ) + def add_system_event_trigger( self, phase: str, @@ -417,8 +466,16 @@ class Clock: Returns: an ID that can be used to remove this call with `reactor.removeSystemEventTrigger`. """ + instance_id = random_string_insecure_fast(5) def wrapped_callback(*args: Any, **kwargs: Any) -> None: + logger.debug( + "add_system_event_trigger(%s): Executing %s %s callback", + instance_id, + phase, + event_type, + ) + assert context.current_context() is context.SENTINEL_CONTEXT, ( "Expected `add_system_event_trigger` callback from the reactor to start with the sentinel logcontext " f"but saw {context.current_context()}. In other words, another task shouldn't have " @@ -449,6 +506,16 @@ class Clock: # logcontext to the reactor context.run_in_background(callback, *args, **kwargs) + logger.debug( + "add_system_event_trigger(%s) for %s %s", + instance_id, + phase, + event_type, + # Find out who is scheduling the call which makes it easy to follow in the + # logs. + stack_info=True, + ) + # We can ignore the lint here since this class is the one location # `addSystemEventTrigger` should be called. return self._reactor.addSystemEventTrigger(