Minor performance improvements to notifier/replication (#18367)
These are some improvements to `on_new_event` which is a hot path. Not sure how much this will save, but maybe like ~5%? Possibly easier to review commit-by-commit
This commit is contained in:
parent
ad140130cc
commit
4eaab31757
1
changelog.d/18367.misc
Normal file
1
changelog.d/18367.misc
Normal file
@ -0,0 +1 @@
|
||||
Minor performance improvements to the notifier.
|
||||
@ -66,7 +66,6 @@ from synapse.types import (
|
||||
from synapse.util.async_helpers import (
|
||||
timeout_deferred,
|
||||
)
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.util.stringutils import shortstr
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
@ -520,20 +519,22 @@ class Notifier:
|
||||
users = users or []
|
||||
rooms = rooms or []
|
||||
|
||||
with Measure(self.clock, "on_new_event"):
|
||||
user_streams: Set[_NotifierUserStream] = set()
|
||||
user_streams: Set[_NotifierUserStream] = set()
|
||||
|
||||
log_kv(
|
||||
{
|
||||
"waking_up_explicit_users": len(users),
|
||||
"waking_up_explicit_rooms": len(rooms),
|
||||
"users": shortstr(users),
|
||||
"rooms": shortstr(rooms),
|
||||
"stream": stream_key,
|
||||
"stream_id": new_token,
|
||||
}
|
||||
)
|
||||
log_kv(
|
||||
{
|
||||
"waking_up_explicit_users": len(users),
|
||||
"waking_up_explicit_rooms": len(rooms),
|
||||
"users": shortstr(users),
|
||||
"rooms": shortstr(rooms),
|
||||
"stream": stream_key,
|
||||
"stream_id": new_token,
|
||||
}
|
||||
)
|
||||
|
||||
# Only calculate which user streams to wake up if there are, in fact,
|
||||
# any user streams registered.
|
||||
if self.user_to_user_stream or self.room_to_user_streams:
|
||||
for user in users:
|
||||
user_stream = self.user_to_user_stream.get(str(user))
|
||||
if user_stream is not None:
|
||||
@ -565,25 +566,25 @@ class Notifier:
|
||||
# We resolve all these deferreds in one go so that we only need to
|
||||
# call `PreserveLoggingContext` once, as it has a bunch of overhead
|
||||
# (to calculate performance stats)
|
||||
with PreserveLoggingContext():
|
||||
for listener in listeners:
|
||||
listener.callback(current_token)
|
||||
if listeners:
|
||||
with PreserveLoggingContext():
|
||||
for listener in listeners:
|
||||
listener.callback(current_token)
|
||||
|
||||
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
|
||||
if user_streams:
|
||||
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
|
||||
|
||||
self.notify_replication()
|
||||
self.notify_replication()
|
||||
|
||||
# Notify appservices.
|
||||
try:
|
||||
self.appservice_handler.notify_interested_services_ephemeral(
|
||||
stream_key,
|
||||
new_token,
|
||||
users,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Error notifying application services of ephemeral events"
|
||||
)
|
||||
# Notify appservices.
|
||||
try:
|
||||
self.appservice_handler.notify_interested_services_ephemeral(
|
||||
stream_key,
|
||||
new_token,
|
||||
users,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error notifying application services of ephemeral events")
|
||||
|
||||
def on_new_replication_data(self) -> None:
|
||||
"""Used to inform replication listeners that something has happened
|
||||
|
||||
Loading…
Reference in New Issue
Block a user