Add debug logs to track Clock callbacks (#19173)
Spawning from wanting to find the source of a `Clock.call_later()` callback, https://github.com/element-hq/synapse/issues/19165
This commit is contained in:
parent
b9dda0ff22
commit
47d24bd234
1
changelog.d/19173.misc
Normal file
1
changelog.d/19173.misc
Normal file
@ -0,0 +1 @@
|
||||
Add debug logs to track `Clock` utilities.
|
||||
@ -65,8 +65,6 @@ from typing import (
|
||||
Sequence,
|
||||
)
|
||||
|
||||
from twisted.internet.interfaces import IDelayedCall
|
||||
|
||||
from synapse.appservice import (
|
||||
ApplicationService,
|
||||
ApplicationServiceState,
|
||||
@ -78,7 +76,7 @@ from synapse.events import EventBase
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.storage.databases.main import DataStore
|
||||
from synapse.types import DeviceListUpdates, JsonMapping
|
||||
from synapse.util.clock import Clock
|
||||
from synapse.util.clock import Clock, DelayedCallWrapper
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
@ -503,7 +501,7 @@ class _Recoverer:
|
||||
self.service = service
|
||||
self.callback = callback
|
||||
self.backoff_counter = 1
|
||||
self.scheduled_recovery: IDelayedCall | None = None
|
||||
self.scheduled_recovery: DelayedCallWrapper | None = None
|
||||
|
||||
def recover(self) -> None:
|
||||
delay = 2**self.backoff_counter
|
||||
|
||||
@ -14,6 +14,7 @@
|
||||
#
|
||||
|
||||
|
||||
import logging
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
@ -30,10 +31,14 @@ from twisted.internet.task import LoopingCall
|
||||
from synapse.logging import context
|
||||
from synapse.types import ISynapseThreadlessReactor
|
||||
from synapse.util import log_failure
|
||||
from synapse.util.stringutils import random_string_insecure_fast
|
||||
|
||||
P = ParamSpec("P")
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Clock:
|
||||
"""
|
||||
A Clock wraps a Twisted reactor and provides utilities on top of it.
|
||||
@ -64,7 +69,12 @@ class Clock:
|
||||
"""List of active looping calls"""
|
||||
|
||||
self._call_id_to_delayed_call: dict[int, IDelayedCall] = {}
|
||||
"""Mapping from unique call ID to delayed call"""
|
||||
"""
|
||||
Mapping from unique call ID to delayed call.
|
||||
|
||||
For "performance", this only tracks a subset of delayed calls: those created
|
||||
with `call_later` with `call_later_cancel_on_shutdown=True`.
|
||||
"""
|
||||
|
||||
self._is_shutdown = False
|
||||
"""Whether shutdown has been requested by the HomeServer"""
|
||||
@ -153,11 +163,20 @@ class Clock:
|
||||
**kwargs: P.kwargs,
|
||||
) -> LoopingCall:
|
||||
"""Common functionality for `looping_call` and `looping_call_now`"""
|
||||
instance_id = random_string_insecure_fast(5)
|
||||
|
||||
if self._is_shutdown:
|
||||
raise Exception("Cannot start looping call. Clock has been shutdown")
|
||||
|
||||
looping_call_context_string = "looping_call"
|
||||
if now:
|
||||
looping_call_context_string = "looping_call_now"
|
||||
|
||||
def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred:
|
||||
logger.debug(
|
||||
"%s(%s): Executing callback", looping_call_context_string, instance_id
|
||||
)
|
||||
|
||||
assert context.current_context() is context.SENTINEL_CONTEXT, (
|
||||
"Expected `looping_call` callback from the reactor to start with the sentinel logcontext "
|
||||
f"but saw {context.current_context()}. In other words, another task shouldn't have "
|
||||
@ -201,6 +220,17 @@ class Clock:
|
||||
d = call.start(msec / 1000.0, now=now)
|
||||
d.addErrback(log_failure, "Looping call died", consumeErrors=False)
|
||||
self._looping_calls.append(call)
|
||||
|
||||
logger.debug(
|
||||
"%s(%s): Scheduled looping call every %sms later",
|
||||
looping_call_context_string,
|
||||
instance_id,
|
||||
msec,
|
||||
# Find out who is scheduling the call which makes it easy to follow in the
|
||||
# logs.
|
||||
stack_info=True,
|
||||
)
|
||||
|
||||
return call
|
||||
|
||||
def cancel_all_looping_calls(self, consumeErrors: bool = True) -> None:
|
||||
@ -226,7 +256,7 @@ class Clock:
|
||||
*args: Any,
|
||||
call_later_cancel_on_shutdown: bool = True,
|
||||
**kwargs: Any,
|
||||
) -> IDelayedCall:
|
||||
) -> "DelayedCallWrapper":
|
||||
"""Call something later
|
||||
|
||||
Note that the function will be called with generic `call_later` logcontext, so
|
||||
@ -245,74 +275,79 @@ class Clock:
|
||||
issue, we can just track all delayed calls.
|
||||
**kwargs: Key arguments to pass to function.
|
||||
"""
|
||||
call_id = self._delayed_call_id
|
||||
self._delayed_call_id = self._delayed_call_id + 1
|
||||
|
||||
if self._is_shutdown:
|
||||
raise Exception("Cannot start delayed call. Clock has been shutdown")
|
||||
|
||||
def create_wrapped_callback(
|
||||
track_for_shutdown_cancellation: bool,
|
||||
) -> Callable[P, None]:
|
||||
def wrapped_callback(*args: Any, **kwargs: Any) -> None:
|
||||
assert context.current_context() is context.SENTINEL_CONTEXT, (
|
||||
"Expected `call_later` callback from the reactor to start with the sentinel logcontext "
|
||||
f"but saw {context.current_context()}. In other words, another task shouldn't have "
|
||||
"leaked their logcontext to us."
|
||||
)
|
||||
def wrapped_callback(*args: Any, **kwargs: Any) -> None:
|
||||
logger.debug("call_later(%s): Executing callback", call_id)
|
||||
|
||||
# Because this is a callback from the reactor, we will be using the
|
||||
# `sentinel` log context at this point. We want the function to log with
|
||||
# some logcontext as we want to know which server the logs came from.
|
||||
#
|
||||
# We use `PreserveLoggingContext` to prevent our new `call_later`
|
||||
# logcontext from finishing as soon as we exit this function, in case `f`
|
||||
# returns an awaitable/deferred which would continue running and may try to
|
||||
# restore the `call_later` context when it's done (because it's trying to
|
||||
# adhere to the Synapse logcontext rules.)
|
||||
#
|
||||
# This also ensures that we return to the `sentinel` context when we exit
|
||||
# this function and yield control back to the reactor to avoid leaking the
|
||||
# current logcontext to the reactor (which would then get picked up and
|
||||
# associated with the next thing the reactor does)
|
||||
try:
|
||||
with context.PreserveLoggingContext(
|
||||
context.LoggingContext(
|
||||
name="call_later", server_name=self._server_name
|
||||
)
|
||||
):
|
||||
# We use `run_in_background` to reset the logcontext after `f` (or the
|
||||
# awaitable returned by `f`) completes to avoid leaking the current
|
||||
# logcontext to the reactor
|
||||
context.run_in_background(callback, *args, **kwargs)
|
||||
finally:
|
||||
if track_for_shutdown_cancellation:
|
||||
# We still want to remove the call from the tracking map. Even if
|
||||
# the callback raises an exception.
|
||||
self._call_id_to_delayed_call.pop(call_id)
|
||||
assert context.current_context() is context.SENTINEL_CONTEXT, (
|
||||
"Expected `call_later` callback from the reactor to start with the sentinel logcontext "
|
||||
f"but saw {context.current_context()}. In other words, another task shouldn't have "
|
||||
"leaked their logcontext to us."
|
||||
)
|
||||
|
||||
return wrapped_callback
|
||||
# Because this is a callback from the reactor, we will be using the
|
||||
# `sentinel` log context at this point. We want the function to log with
|
||||
# some logcontext as we want to know which server the logs came from.
|
||||
#
|
||||
# We use `PreserveLoggingContext` to prevent our new `call_later`
|
||||
# logcontext from finishing as soon as we exit this function, in case `f`
|
||||
# returns an awaitable/deferred which would continue running and may try to
|
||||
# restore the `call_later` context when it's done (because it's trying to
|
||||
# adhere to the Synapse logcontext rules.)
|
||||
#
|
||||
# This also ensures that we return to the `sentinel` context when we exit
|
||||
# this function and yield control back to the reactor to avoid leaking the
|
||||
# current logcontext to the reactor (which would then get picked up and
|
||||
# associated with the next thing the reactor does)
|
||||
try:
|
||||
with context.PreserveLoggingContext(
|
||||
context.LoggingContext(
|
||||
name="call_later", server_name=self._server_name
|
||||
)
|
||||
):
|
||||
# We use `run_in_background` to reset the logcontext after `f` (or the
|
||||
# awaitable returned by `f`) completes to avoid leaking the current
|
||||
# logcontext to the reactor
|
||||
context.run_in_background(callback, *args, **kwargs)
|
||||
finally:
|
||||
if call_later_cancel_on_shutdown:
|
||||
# We still want to remove the call from the tracking map. Even if
|
||||
# the callback raises an exception.
|
||||
self._call_id_to_delayed_call.pop(call_id)
|
||||
|
||||
# We can ignore the lint here since this class is the one location callLater should
|
||||
# be called.
|
||||
call = self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) # type: ignore[call-later-not-tracked]
|
||||
|
||||
logger.debug(
|
||||
"call_later(%s): Scheduled call for %ss later (tracked for shutdown: %s)",
|
||||
call_id,
|
||||
delay,
|
||||
call_later_cancel_on_shutdown,
|
||||
# Find out who is scheduling the call which makes it easy to follow in the
|
||||
# logs.
|
||||
stack_info=True,
|
||||
)
|
||||
|
||||
wrapped_call = DelayedCallWrapper(call, call_id, self)
|
||||
if call_later_cancel_on_shutdown:
|
||||
call_id = self._delayed_call_id
|
||||
self._delayed_call_id = self._delayed_call_id + 1
|
||||
self._call_id_to_delayed_call[call_id] = wrapped_call
|
||||
|
||||
# We can ignore the lint here since this class is the one location callLater
|
||||
# should be called.
|
||||
call = self._reactor.callLater(
|
||||
delay, create_wrapped_callback(True), *args, **kwargs
|
||||
) # type: ignore[call-later-not-tracked]
|
||||
call = DelayedCallWrapper(call, call_id, self)
|
||||
self._call_id_to_delayed_call[call_id] = call
|
||||
return call
|
||||
else:
|
||||
# We can ignore the lint here since this class is the one location callLater should
|
||||
# be called.
|
||||
return self._reactor.callLater(
|
||||
delay, create_wrapped_callback(False), *args, **kwargs
|
||||
) # type: ignore[call-later-not-tracked]
|
||||
return wrapped_call
|
||||
|
||||
def cancel_call_later(self, timer: IDelayedCall, ignore_errs: bool = False) -> None:
|
||||
def cancel_call_later(
|
||||
self, wrapped_call: "DelayedCallWrapper", ignore_errs: bool = False
|
||||
) -> None:
|
||||
try:
|
||||
timer.cancel()
|
||||
logger.debug(
|
||||
"cancel_call_later: cancelling scheduled call %s", wrapped_call.call_id
|
||||
)
|
||||
wrapped_call.delayed_call.cancel()
|
||||
except Exception:
|
||||
if not ignore_errs:
|
||||
raise
|
||||
@ -327,8 +362,11 @@ class Clock:
|
||||
"""
|
||||
# We make a copy here since calling `cancel()` on a delayed_call
|
||||
# will result in the call removing itself from the map mid-iteration.
|
||||
for call in list(self._call_id_to_delayed_call.values()):
|
||||
for call_id, call in list(self._call_id_to_delayed_call.items()):
|
||||
try:
|
||||
logger.debug(
|
||||
"cancel_all_delayed_calls: cancelling scheduled call %s", call_id
|
||||
)
|
||||
call.cancel()
|
||||
except Exception:
|
||||
if not ignore_errs:
|
||||
@ -352,8 +390,11 @@ class Clock:
|
||||
*args: Postional arguments to pass to function.
|
||||
**kwargs: Key arguments to pass to function.
|
||||
"""
|
||||
instance_id = random_string_insecure_fast(5)
|
||||
|
||||
def wrapped_callback(*args: Any, **kwargs: Any) -> None:
|
||||
logger.debug("call_when_running(%s): Executing callback", instance_id)
|
||||
|
||||
# Since this callback can be invoked immediately if the reactor is already
|
||||
# running, we can't always assume that we're running in the sentinel
|
||||
# logcontext (i.e. we can't assert that we're in the sentinel context like
|
||||
@ -392,6 +433,14 @@ class Clock:
|
||||
# callWhenRunning should be called.
|
||||
self._reactor.callWhenRunning(wrapped_callback, *args, **kwargs) # type: ignore[prefer-synapse-clock-call-when-running]
|
||||
|
||||
logger.debug(
|
||||
"call_when_running(%s): Scheduled call",
|
||||
instance_id,
|
||||
# Find out who is scheduling the call which makes it easy to follow in the
|
||||
# logs.
|
||||
stack_info=True,
|
||||
)
|
||||
|
||||
def add_system_event_trigger(
|
||||
self,
|
||||
phase: str,
|
||||
@ -417,8 +466,16 @@ class Clock:
|
||||
Returns:
|
||||
an ID that can be used to remove this call with `reactor.removeSystemEventTrigger`.
|
||||
"""
|
||||
instance_id = random_string_insecure_fast(5)
|
||||
|
||||
def wrapped_callback(*args: Any, **kwargs: Any) -> None:
|
||||
logger.debug(
|
||||
"add_system_event_trigger(%s): Executing %s %s callback",
|
||||
instance_id,
|
||||
phase,
|
||||
event_type,
|
||||
)
|
||||
|
||||
assert context.current_context() is context.SENTINEL_CONTEXT, (
|
||||
"Expected `add_system_event_trigger` callback from the reactor to start with the sentinel logcontext "
|
||||
f"but saw {context.current_context()}. In other words, another task shouldn't have "
|
||||
@ -449,6 +506,16 @@ class Clock:
|
||||
# logcontext to the reactor
|
||||
context.run_in_background(callback, *args, **kwargs)
|
||||
|
||||
logger.debug(
|
||||
"add_system_event_trigger(%s) for %s %s",
|
||||
instance_id,
|
||||
phase,
|
||||
event_type,
|
||||
# Find out who is scheduling the call which makes it easy to follow in the
|
||||
# logs.
|
||||
stack_info=True,
|
||||
)
|
||||
|
||||
# We can ignore the lint here since this class is the one location
|
||||
# `addSystemEventTrigger` should be called.
|
||||
return self._reactor.addSystemEventTrigger(
|
||||
|
||||
Loading…
Reference in New Issue
Block a user