Introduce Clock.add_system_event_trigger(...) to include logcontext by default (#18945)
Introduce `Clock.add_system_event_trigger(...)` to wrap system event callback code in a logcontext, ensuring we can identify which server generated the logs. Background: > Ideally, nothing from the Synapse homeserver would be logged against the `sentinel` > logcontext as we want to know which server the logs came from. In practice, this is not > always the case yet especially outside of request handling. > > Global things outside of Synapse (e.g. Twisted reactor code) should run in the > `sentinel` logcontext. It's only when it calls into application code that a logcontext > gets activated. This means the reactor should be started in the `sentinel` logcontext, > and any time an awaitable yields control back to the reactor, it should reset the > logcontext to be the `sentinel` logcontext. This is important to avoid leaking the > current logcontext to the reactor (which would then get picked up and associated with > the next thing the reactor does). > > *-- `docs/log_contexts.md` Also adds a lint to prefer `Clock.add_system_event_trigger(...)` over `reactor.addSystemEventTrigger(...)` Part of https://github.com/element-hq/synapse/issues/18905
This commit is contained in:
parent
8d5d87fb0a
commit
d05f44a1c6
1
changelog.d/18945.misc
Normal file
1
changelog.d/18945.misc
Normal file
@ -0,0 +1 @@
|
||||
Introduce `Clock.add_system_event_trigger(...)` to wrap system event callback code in a logcontext, ensuring we can identify which server generated the logs.
|
||||
@ -74,6 +74,12 @@ PREFER_SYNAPSE_CLOCK_CALL_WHEN_RUNNING = ErrorCode(
|
||||
category="synapse-reactor-clock",
|
||||
)
|
||||
|
||||
PREFER_SYNAPSE_CLOCK_ADD_SYSTEM_EVENT_TRIGGER = ErrorCode(
|
||||
"prefer-synapse-clock-add-system-event-trigger",
|
||||
"`synapse.util.Clock.add_system_event_trigger` should be used instead of `reactor.addSystemEventTrigger`",
|
||||
category="synapse-reactor-clock",
|
||||
)
|
||||
|
||||
|
||||
class Sentinel(enum.Enum):
|
||||
# defining a sentinel in this way allows mypy to correctly handle the
|
||||
@ -242,6 +248,13 @@ class SynapsePlugin(Plugin):
|
||||
):
|
||||
return check_call_when_running
|
||||
|
||||
if fullname in (
|
||||
"twisted.internet.interfaces.IReactorCore.addSystemEventTrigger",
|
||||
"synapse.types.ISynapseThreadlessReactor.addSystemEventTrigger",
|
||||
"synapse.types.ISynapseReactor.addSystemEventTrigger",
|
||||
):
|
||||
return check_add_system_event_trigger
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@ -272,6 +285,33 @@ def check_call_when_running(ctx: MethodSigContext) -> CallableType:
|
||||
return signature
|
||||
|
||||
|
||||
def check_add_system_event_trigger(ctx: MethodSigContext) -> CallableType:
|
||||
"""
|
||||
Ensure that the `reactor.addSystemEventTrigger` callsites aren't used.
|
||||
|
||||
`synapse.util.Clock.add_system_event_trigger` should always be used instead of
|
||||
`reactor.addSystemEventTrigger`.
|
||||
|
||||
Since `reactor.addSystemEventTrigger` is a reactor callback, the callback will start out
|
||||
with the sentinel logcontext. `synapse.util.Clock` starts a default logcontext as we
|
||||
want to know which server the logs came from.
|
||||
|
||||
Args:
|
||||
ctx: The `FunctionSigContext` from mypy.
|
||||
"""
|
||||
signature: CallableType = ctx.default_signature
|
||||
ctx.api.fail(
|
||||
(
|
||||
"Expected all `reactor.addSystemEventTrigger` calls to use `synapse.util.Clock.add_system_event_trigger` instead. "
|
||||
"This is so all Synapse code runs with a logcontext as we want to know which server the logs came from."
|
||||
),
|
||||
ctx.context,
|
||||
code=PREFER_SYNAPSE_CLOCK_ADD_SYSTEM_EVENT_TRIGGER,
|
||||
)
|
||||
|
||||
return signature
|
||||
|
||||
|
||||
def analyze_prometheus_metric_classes(ctx: ClassDefContext) -> None:
|
||||
"""
|
||||
Cross-check the list of Prometheus metric classes against the
|
||||
|
||||
@ -518,7 +518,9 @@ async def start(hs: "HomeServer") -> None:
|
||||
# numbers of DNS requests don't starve out other users of the threadpool.
|
||||
resolver_threadpool = ThreadPool(name="gai_resolver")
|
||||
resolver_threadpool.start()
|
||||
reactor.addSystemEventTrigger("during", "shutdown", resolver_threadpool.stop)
|
||||
hs.get_clock().add_system_event_trigger(
|
||||
"during", "shutdown", resolver_threadpool.stop
|
||||
)
|
||||
reactor.installNameResolver(
|
||||
GAIResolver(reactor, getThreadPool=lambda: resolver_threadpool)
|
||||
)
|
||||
@ -605,7 +607,7 @@ async def start(hs: "HomeServer") -> None:
|
||||
logger.info("Shutting down...")
|
||||
|
||||
# Log when we start the shut down process.
|
||||
hs.get_reactor().addSystemEventTrigger("before", "shutdown", log_shutdown)
|
||||
hs.get_clock().add_system_event_trigger("before", "shutdown", log_shutdown)
|
||||
|
||||
setup_sentry(hs)
|
||||
setup_sdnotify(hs)
|
||||
@ -720,7 +722,7 @@ def setup_sdnotify(hs: "HomeServer") -> None:
|
||||
# we're not using systemd.
|
||||
sdnotify(b"READY=1\nMAINPID=%i" % (os.getpid(),))
|
||||
|
||||
hs.get_reactor().addSystemEventTrigger(
|
||||
hs.get_clock().add_system_event_trigger(
|
||||
"before", "shutdown", sdnotify, b"STOPPING=1"
|
||||
)
|
||||
|
||||
|
||||
@ -541,7 +541,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
||||
self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
|
||||
)
|
||||
|
||||
hs.get_reactor().addSystemEventTrigger(
|
||||
hs.get_clock().add_system_event_trigger(
|
||||
"before",
|
||||
"shutdown",
|
||||
run_as_background_process,
|
||||
@ -842,7 +842,7 @@ class PresenceHandler(BasePresenceHandler):
|
||||
# have not yet been persisted
|
||||
self.unpersisted_users_changes: Set[str] = set()
|
||||
|
||||
hs.get_reactor().addSystemEventTrigger(
|
||||
hs.get_clock().add_system_event_trigger(
|
||||
"before",
|
||||
"shutdown",
|
||||
run_as_background_process,
|
||||
|
||||
@ -1007,7 +1007,7 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
)
|
||||
|
||||
media_threadpool.start()
|
||||
self.get_reactor().addSystemEventTrigger(
|
||||
self.get_clock().add_system_event_trigger(
|
||||
"during", "shutdown", media_threadpool.stop
|
||||
)
|
||||
|
||||
|
||||
@ -455,7 +455,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
|
||||
self._client_ip_looper = self._clock.looping_call(
|
||||
self._update_client_ips_batch, 5 * 1000
|
||||
)
|
||||
self.hs.get_reactor().addSystemEventTrigger(
|
||||
self.hs.get_clock().add_system_event_trigger(
|
||||
"before", "shutdown", self._update_client_ips_batch
|
||||
)
|
||||
|
||||
|
||||
@ -99,7 +99,7 @@ class LockStore(SQLBaseStore):
|
||||
# lead to a race, as we may drop the lock while we are still processing.
|
||||
# However, a) it should be a small window, b) the lock is best effort
|
||||
# anyway and c) we want to really avoid leaking locks when we restart.
|
||||
hs.get_reactor().addSystemEventTrigger(
|
||||
hs.get_clock().add_system_event_trigger(
|
||||
"before",
|
||||
"shutdown",
|
||||
self._on_shutdown,
|
||||
|
||||
@ -206,3 +206,59 @@ class Clock:
|
||||
# We can ignore the lint here since this class is the one location
|
||||
# callWhenRunning should be called.
|
||||
self._reactor.callWhenRunning(wrapped_callback, *args, **kwargs) # type: ignore[prefer-synapse-clock-call-when-running]
|
||||
|
||||
def add_system_event_trigger(
|
||||
self,
|
||||
phase: str,
|
||||
event_type: str,
|
||||
callback: Callable[P, object],
|
||||
*args: P.args,
|
||||
**kwargs: P.kwargs,
|
||||
) -> None:
|
||||
"""
|
||||
Add a function to be called when a system event occurs.
|
||||
|
||||
Equivalent to `reactor.addSystemEventTrigger` (see the that docstring for more
|
||||
details), but ensures that the callback is run in a logging context.
|
||||
|
||||
Args:
|
||||
phase: a time to call the event -- either the string 'before', 'after', or
|
||||
'during', describing when to call it relative to the event's execution.
|
||||
eventType: this is a string describing the type of event.
|
||||
callback: Function to call
|
||||
*args: Postional arguments to pass to function.
|
||||
**kwargs: Key arguments to pass to function.
|
||||
"""
|
||||
|
||||
def wrapped_callback(*args: Any, **kwargs: Any) -> None:
|
||||
assert context.current_context() is context.SENTINEL_CONTEXT, (
|
||||
"Expected `add_system_event_trigger` callback from the reactor to start with the sentinel logcontext "
|
||||
f"but saw {context.current_context()}. In other words, another task shouldn't have "
|
||||
"leaked their logcontext to us."
|
||||
)
|
||||
|
||||
# Because this is a callback from the reactor, we will be using the
|
||||
# `sentinel` log context at this point. We want the function to log with
|
||||
# some logcontext as we want to know which server the logs came from.
|
||||
#
|
||||
# We use `PreserveLoggingContext` to prevent our new `system_event`
|
||||
# logcontext from finishing as soon as we exit this function, in case `f`
|
||||
# returns an awaitable/deferred which would continue running and may try to
|
||||
# restore the `loop_call` context when it's done (because it's trying to
|
||||
# adhere to the Synapse logcontext rules.)
|
||||
#
|
||||
# This also ensures that we return to the `sentinel` context when we exit
|
||||
# this function and yield control back to the reactor to avoid leaking the
|
||||
# current logcontext to the reactor (which would then get picked up and
|
||||
# associated with the next thing the reactor does)
|
||||
with context.PreserveLoggingContext(context.LoggingContext("system_event")):
|
||||
# We use `run_in_background` to reset the logcontext after `f` (or the
|
||||
# awaitable returned by `f`) completes to avoid leaking the current
|
||||
# logcontext to the reactor
|
||||
context.run_in_background(callback, *args, **kwargs)
|
||||
|
||||
# We can ignore the lint here since this class is the one location
|
||||
# `addSystemEventTrigger` should be called.
|
||||
self._reactor.addSystemEventTrigger(
|
||||
phase, event_type, wrapped_callback, *args, **kwargs
|
||||
) # type: ignore[prefer-synapse-clock-add-system-event-trigger]
|
||||
|
||||
Loading…
Reference in New Issue
Block a user