diff --git a/changelog.d/19138.misc b/changelog.d/19138.misc new file mode 100644 index 000000000..118336173 --- /dev/null +++ b/changelog.d/19138.misc @@ -0,0 +1 @@ +Minor speed up of processing of inbound replication. diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index bd1ee5ff9..ed7cff72b 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -20,7 +20,6 @@ # # import logging -from collections import deque from typing import ( TYPE_CHECKING, Any, @@ -71,6 +70,7 @@ from synapse.replication.tcp.streams._base import ( DeviceListsStream, ThreadSubscriptionsStream, ) +from synapse.util.bacckground_queue import BackgroundQueue if TYPE_CHECKING: from synapse.server import HomeServer @@ -115,8 +115,8 @@ tcp_command_queue_gauge = LaterGauge( # the type of the entries in _command_queues_by_stream -_StreamCommandQueue = deque[ - tuple[Union[RdataCommand, PositionCommand], IReplicationConnection] +_StreamCommandQueueItem = tuple[ + Union[RdataCommand, PositionCommand], IReplicationConnection ] @@ -265,7 +265,12 @@ class ReplicationCommandHandler: # for each stream, a queue of commands that are awaiting processing, and the # connection that they arrived on. 
self._command_queues_by_stream = { - stream_name: _StreamCommandQueue() for stream_name in self._streams + stream_name: BackgroundQueue[_StreamCommandQueueItem]( + hs, + "process-replication-data", + self._unsafe_process_item, + ) + for stream_name in self._streams } # For each connection, the incoming stream names that have received a POSITION @@ -349,38 +354,17 @@ class ReplicationCommandHandler: logger.error("Got %s for unknown stream: %s", cmd.NAME, stream_name) return - queue.append((cmd, conn)) + queue.add((cmd, conn)) - # if we're already processing this stream, there's nothing more to do: - # the new entry on the queue will get picked up in due course - if stream_name in self._processing_streams: - return + async def _unsafe_process_item(self, item: _StreamCommandQueueItem) -> None: + """Process a single command from the stream queue. - # fire off a background process to start processing the queue. - self.hs.run_as_background_process( - "process-replication-data", - self._unsafe_process_queue, - stream_name, - ) - - async def _unsafe_process_queue(self, stream_name: str) -> None: - """Processes the command queue for the given stream, until it is empty - - Does not check if there is already a thread processing the queue, hence "unsafe" + This should only be called one at a time per stream, and is called from + the stream's BackgroundQueue. 
""" - assert stream_name not in self._processing_streams - - self._processing_streams.add(stream_name) - try: - queue = self._command_queues_by_stream.get(stream_name) - while queue: - cmd, conn = queue.popleft() - try: - await self._process_command(cmd, conn, stream_name) - except Exception: - logger.exception("Failed to handle command %s", cmd) - finally: - self._processing_streams.discard(stream_name) + cmd, conn = item + stream_name = cmd.stream_name + await self._process_command(cmd, conn, stream_name) async def _process_command( self, diff --git a/synapse/util/bacckground_queue.py b/synapse/util/bacckground_queue.py new file mode 100644 index 000000000..daf6a9484 --- /dev/null +++ b/synapse/util/bacckground_queue.py @@ -0,0 +1,139 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2025 Element Creations Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# +# +# + +import collections +import logging +from typing import ( + TYPE_CHECKING, + Awaitable, + Callable, + Generic, + Optional, + TypeVar, +) + +from synapse.util.async_helpers import DeferredEvent +from synapse.util.constants import MILLISECONDS_PER_SECOND + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + +T = TypeVar("T") + + +class BackgroundQueue(Generic[T]): + """A single-producer single-consumer async queue processing items in the + background. + + This is optimised for the case where we receive many items, but processing + each one takes a short amount of time. In this case we don't want to pay the + overhead of a new background process each time. 
Instead, we spawn a + background process that will wait for new items to arrive. + + If the background process has been idle for a while, it will exit, and a new + background process will be spawned when new items arrive. + + Args: + hs: The homeserver. + name: The name of the background process. + callback: The async callback to process each item. + timeout_ms: The time in milliseconds to wait for new items before + exiting the background process. + """ + + def __init__( + self, + hs: "HomeServer", + name: str, + callback: Callable[[T], Awaitable[None]], + timeout_ms: int = 1000, + ) -> None: + self._hs = hs + self._name = name + self._callback = callback + self._timeout_ms = timeout_ms + + # The queue of items to process. + self._queue: collections.deque[T] = collections.deque() + + # Indicates if a background process is running, and if so whether there + # is new data in the queue. Used to signal to an existing background + # process that there is new data added to the queue. + self._wakeup_event: Optional[DeferredEvent] = None + + def add(self, item: T) -> None: + """Add an item into the queue.""" + + self._queue.append(item) + if self._wakeup_event is None: + self._hs.run_as_background_process(self._name, self._process_queue) + else: + self._wakeup_event.set() + + async def _process_queue(self) -> None: + """Process queued items until no new item arrives within the timeout.""" + + # Make sure we're the only background process. + if self._wakeup_event is not None: + # If there is already a background process then we signal it to wake + # up, and exit ourselves. We do not want multiple background processes running + # at a time. + self._wakeup_event.set() + return + + self._wakeup_event = DeferredEvent(self._hs.get_clock()) + + try: + while True: + # Clear the event before checking the queue. If we cleared after + # we run the risk of the wakeup signal racing with us checking + # the queue.
(This can't really happen in Python due to the + # single threaded nature, but let's be a bit defensive anyway.) + self._wakeup_event.clear() + + while self._queue: + item = self._queue.popleft() + try: + await self._callback(item) + except Exception: + logger.exception("Error processing background queue item") + + # Wait for new data to arrive, timing out after a while to avoid + # keeping the background process alive forever. + # + # New data may have arrived and been processed while we were + # pulling from the queue, so this may return that there is new + # data immediately even though there isn't. That's fine, we'll + # just loop round, clear the event, recheck the queue, and then + # wait here again. + new_data = await self._wakeup_event.wait( + timeout_seconds=self._timeout_ms / MILLISECONDS_PER_SECOND + ) + if not new_data: + # Timed out waiting for new data, so exit the loop + break + finally: + # This background process is exiting, so clear the wakeup event to + # indicate that a new one should be started when new data arrives. + self._wakeup_event = None + + # The queue must be empty here. + assert not self._queue + + def __len__(self) -> int: + return len(self._queue) diff --git a/tests/util/test_background_queue.py b/tests/util/test_background_queue.py new file mode 100644 index 000000000..d7eb4f4f0 --- /dev/null +++ b/tests/util/test_background_queue.py @@ -0,0 +1,105 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2025 Element Creations Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . 
+# + + +from unittest.mock import Mock + +from twisted.internet.defer import Deferred +from twisted.internet.testing import MemoryReactor + +from synapse.server import HomeServer +from synapse.util.bacckground_queue import BackgroundQueue +from synapse.util.clock import Clock + +from tests.unittest import HomeserverTestCase + + +class BackgroundQueueTests(HomeserverTestCase): + def prepare( + self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer + ) -> None: + self._process_item_mock = Mock(spec_set=[]) + + self.queue = BackgroundQueue[int]( + hs=homeserver, + name="test_queue", + callback=self._process_item_mock, + timeout_ms=1000, + ) + + def test_simple_call(self) -> None: + """Test that items added to the queue are processed.""" + # Register a deferred to be the return value of the callback. + callback_result_deferred: Deferred[None] = Deferred() + self._process_item_mock.side_effect = callback_result_deferred + + # Adding an item should cause the callback to be invoked. + self.queue.add(1) + + self._process_item_mock.assert_called_once_with(1) + self._process_item_mock.reset_mock() + + # Adding another item should not cause the callback to be invoked again + # until the previous one has completed. + self.queue.add(2) + self._process_item_mock.assert_not_called() + + # Once the first callback completes, the second item should be + # processed. + callback_result_deferred.callback(None) + self._process_item_mock.assert_called_once_with(2) + + def test_timeout(self) -> None: + """Test that the background process wakes up if it's idle, and that it + times out after being idle.""" + + # Register a deferred to be the return value of the callback. + callback_result_deferred: Deferred[None] = Deferred() + self._process_item_mock.side_effect = callback_result_deferred + + # Adding an item should cause the callback to be invoked.
+ self.queue.add(1) + + self._process_item_mock.assert_called_once_with(1) + self._process_item_mock.reset_mock() + + # Let the callback complete. + callback_result_deferred.callback(None) + + # Advance the clock by less than the timeout, and add another item. + self.reactor.advance(0.5) + self.assertIsNotNone(self.queue._wakeup_event) + self.queue.add(2) + + # The callback should be invoked again. + callback_result_deferred = Deferred() + self._process_item_mock.side_effect = callback_result_deferred + self._process_item_mock.assert_called_once_with(2) + self._process_item_mock.reset_mock() + + # Let the callback complete. + callback_result_deferred.callback(None) + + # Advance the clock by more than the timeout. + self.reactor.advance(1.5) + + # The background process should have exited, we check this by checking + # the internal wakeup event has been removed. + self.assertIsNone(self.queue._wakeup_event) + + # Add another item. This should cause a new background process to be + # started. + self.queue.add(3) + + self._process_item_mock.assert_called_once_with(3)