Faster redis replication handling (#19138)

Spawning a background process comes with a bunch of overhead, so let's
try to reduce the number of background processes we need to spawn when
handling inbound fed.

Currently, we seem to be doing roughly one per command. Instead, lets
keep the background process alive for a bit waiting for a new command to
come in.
This commit is contained in:
Erik Johnston 2025-11-05 13:42:04 +00:00 committed by GitHub
parent 2fd8d88b42
commit 4906771da1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 262 additions and 33 deletions

1
changelog.d/19138.misc Normal file
View File

@ -0,0 +1 @@
Minor speed up of processing of inbound replication.

View File

@ -20,7 +20,6 @@
#
#
import logging
from collections import deque
from typing import (
TYPE_CHECKING,
Any,
@ -71,6 +70,7 @@ from synapse.replication.tcp.streams._base import (
DeviceListsStream,
ThreadSubscriptionsStream,
)
from synapse.util.bacckground_queue import BackgroundQueue
if TYPE_CHECKING:
from synapse.server import HomeServer
@ -115,8 +115,8 @@ tcp_command_queue_gauge = LaterGauge(
# the type of the entries in _command_queues_by_stream
_StreamCommandQueue = deque[
tuple[Union[RdataCommand, PositionCommand], IReplicationConnection]
_StreamCommandQueueItem = tuple[
Union[RdataCommand, PositionCommand], IReplicationConnection
]
@ -265,7 +265,12 @@ class ReplicationCommandHandler:
# for each stream, a queue of commands that are awaiting processing, and the
# connection that they arrived on.
self._command_queues_by_stream = {
stream_name: _StreamCommandQueue() for stream_name in self._streams
stream_name: BackgroundQueue[_StreamCommandQueueItem](
hs,
"process-replication-data",
self._unsafe_process_item,
)
for stream_name in self._streams
}
# For each connection, the incoming stream names that have received a POSITION
@ -349,38 +354,17 @@ class ReplicationCommandHandler:
logger.error("Got %s for unknown stream: %s", cmd.NAME, stream_name)
return
queue.append((cmd, conn))
queue.add((cmd, conn))
# if we're already processing this stream, there's nothing more to do:
# the new entry on the queue will get picked up in due course
if stream_name in self._processing_streams:
return
async def _unsafe_process_item(self, item: _StreamCommandQueueItem) -> None:
"""Process a single command from the stream queue.
# fire off a background process to start processing the queue.
self.hs.run_as_background_process(
"process-replication-data",
self._unsafe_process_queue,
stream_name,
)
async def _unsafe_process_queue(self, stream_name: str) -> None:
"""Processes the command queue for the given stream, until it is empty
Does not check if there is already a thread processing the queue, hence "unsafe"
This should only be called one at a time per stream, and is called from
the stream's BackgroundQueue.
"""
assert stream_name not in self._processing_streams
self._processing_streams.add(stream_name)
try:
queue = self._command_queues_by_stream.get(stream_name)
while queue:
cmd, conn = queue.popleft()
try:
await self._process_command(cmd, conn, stream_name)
except Exception:
logger.exception("Failed to handle command %s", cmd)
finally:
self._processing_streams.discard(stream_name)
cmd, conn = item
stream_name = cmd.stream_name
await self._process_command(cmd, conn, stream_name)
async def _process_command(
self,

View File

@ -0,0 +1,139 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 Element Creations Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
#
#
import collections
import logging
from typing import (
TYPE_CHECKING,
Awaitable,
Callable,
Generic,
Optional,
TypeVar,
)
from synapse.util.async_helpers import DeferredEvent
from synapse.util.constants import MILLISECONDS_PER_SECOND
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
T = TypeVar("T")
class BackgroundQueue(Generic[T]):
"""A single-producer single-consumer async queue processing items in the
background.
This is optimised for the case where we receive many items, but processing
each one takes a short amount of time. In this case we don't want to pay the
overhead of a new background process each time. Instead, we spawn a
background process that will wait for new items to arrive.
If the background process has been idle for a while, it will exit, and a new
background process will be spawned when new items arrive.
Args:
hs: The homeserver.
name: The name of the background process.
callback: The async callback to process each item.
timeout_ms: The time in milliseconds to wait for new items before
exiting the background process.
"""
def __init__(
self,
hs: "HomeServer",
name: str,
callback: Callable[[T], Awaitable[None]],
timeout_ms: int = 1000,
) -> None:
self._hs = hs
self._name = name
self._callback = callback
self._timeout_ms = timeout_ms
# The queue of items to process.
self._queue: collections.deque[T] = collections.deque()
# Indicates if a background process is running, and if so whether there
# is new data in the queue. Used to signal to an existing background
# process that there is new data added to the queue.
self._wakeup_event: Optional[DeferredEvent] = None
def add(self, item: T) -> None:
"""Add an item into the queue."""
self._queue.append(item)
if self._wakeup_event is None:
self._hs.run_as_background_process(self._name, self._process_queue)
else:
self._wakeup_event.set()
async def _process_queue(self) -> None:
"""Process items in the queue until it is empty."""
# Make sure we're the only background process.
if self._wakeup_event is not None:
# If there is already a background process then we signal it to wake
# up and exit. We do not want multiple background processes running
# at a time.
self._wakeup_event.set()
return
self._wakeup_event = DeferredEvent(self._hs.get_clock())
try:
while True:
# Clear the event before checking the queue. If we cleared after
# we run the risk of the wakeup signal racing with us checking
# the queue. (This can't really happen in Python due to the
# single threaded nature, but let's be a bit defensive anyway.)
self._wakeup_event.clear()
while self._queue:
item = self._queue.popleft()
try:
await self._callback(item)
except Exception:
logger.exception("Error processing background queue item")
# Wait for new data to arrive, timing out after a while to avoid
# keeping the background process alive forever.
#
# New data may have arrived and been processed while we were
# pulling from the queue, so this may return that there is new
# data immediately even though there isn't. That's fine, we'll
# just loop round, clear the event, recheck the queue, and then
# wait here again.
new_data = await self._wakeup_event.wait(
timeout_seconds=self._timeout_ms / MILLISECONDS_PER_SECOND
)
if not new_data:
# Timed out waiting for new data, so exit the loop
break
finally:
# This background process is exiting, so clear the wakeup event to
# indicate that a new one should be started when new data arrives.
self._wakeup_event = None
# The queue must be empty here.
assert not self._queue
def __len__(self) -> int:
return len(self._queue)

View File

@ -0,0 +1,105 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 Element Creations Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
from unittest.mock import Mock
from twisted.internet.defer import Deferred
from twisted.internet.testing import MemoryReactor
from synapse.server import HomeServer
from synapse.util.bacckground_queue import BackgroundQueue
from synapse.util.clock import Clock
from tests.unittest import HomeserverTestCase
class BackgroundQueueTests(HomeserverTestCase):
def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
self._process_item_mock = Mock(spec_set=[])
self.queue = BackgroundQueue[int](
hs=homeserver,
name="test_queue",
callback=self._process_item_mock,
timeout_ms=1000,
)
def test_simple_call(self) -> None:
"""Test that items added to the queue are processed."""
# Register a deferred to be the return value of the callback.
callback_result_deferred: Deferred[None] = Deferred()
self._process_item_mock.side_effect = callback_result_deferred
# Adding an item should cause the callback to be invoked.
self.queue.add(1)
self._process_item_mock.assert_called_once_with(1)
self._process_item_mock.reset_mock()
# Adding another item should not cause the callback to be invoked again
# until the previous one has completed.
self.queue.add(2)
self._process_item_mock.assert_not_called()
# Once the first callback completes, the second item should be
# processed.
callback_result_deferred.callback(None)
self._process_item_mock.assert_called_once_with(2)
def test_timeout(self) -> None:
"""Test that the background process wakes up if its idle, and that it
times out after being idle."""
# Register a deferred to be the return value of the callback.
callback_result_deferred: Deferred[None] = Deferred()
self._process_item_mock.side_effect = callback_result_deferred
# Adding an item should cause the callback to be invoked.
self.queue.add(1)
self._process_item_mock.assert_called_once_with(1)
self._process_item_mock.reset_mock()
# Let the callback complete.
callback_result_deferred.callback(None)
# Advance the clock by less than the timeout, and add another item.
self.reactor.advance(0.5)
self.assertIsNotNone(self.queue._wakeup_event)
self.queue.add(2)
# The callback should be invoked again.
callback_result_deferred = Deferred()
self._process_item_mock.side_effect = callback_result_deferred
self._process_item_mock.assert_called_once_with(2)
self._process_item_mock.reset_mock()
# Let the callback complete.
callback_result_deferred.callback(None)
# Advance the clock by more than the timeout.
self.reactor.advance(1.5)
# The background process should have exited, we check this by checking
# the internal wakeup event has been removed.
self.assertIsNone(self.queue._wakeup_event)
# Add another item. This should cause a new background process to be
# started.
self.queue.add(3)
self._process_item_mock.assert_called_once_with(3)