Improve documentation around streams, particularly ID generators and adding new streams. (#18943)
This arises mostly from my recent experience adding a stream for Thread Subscriptions and trying to help others add their own streams.

---------

Signed-off-by: Olivier 'reivilibre <oliverw@matrix.org>
This commit is contained in:
parent 378c5c838c
commit a50923b6bf

changelog.d/18943.doc (new file, 1 line)
@@ -0,0 +1 @@
+Improve documentation around streams, particularly ID generators and adding new streams.
@@ -1,4 +1,4 @@
-## Streams
+# Streams

 Synapse has a concept of "streams", which are roughly described in [`id_generators.py`](
 https://github.com/element-hq/synapse/blob/develop/synapse/storage/util/id_generators.py
@@ -19,7 +19,7 @@ To that end, let's describe streams formally, paraphrasing from the docstring of
 https://github.com/element-hq/synapse/blob/a719b703d9bd0dade2565ddcad0e2f3a7a9d4c37/synapse/storage/util/id_generators.py#L96
 ).

-### Definition
+## Definition

 A stream is an append-only log `T1, T2, ..., Tn, ...` of facts[^1] which grows over time.
 Only "writers" can add facts to a stream, and there may be multiple writers.
@@ -47,7 +47,7 @@ But unhappy cases (e.g. transaction rollback due to an error) also count as comp
 Once completed, the rows written with that stream ID are fixed, and no new rows
 will be inserted with that ID.

-### Current stream ID
+## Current stream ID

 For any given stream reader (including writers themselves), we may define a per-writer current stream ID:

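The per-writer current stream ID can be sketched in a few lines of Python. This is a toy model (not Synapse's implementation, and the function name is illustrative): the current position is the highest ID such that every allocated ID up to and including it has completed.

```python
def current_position(completed: set[int], start: int = 1) -> int:
    """Return the current stream ID: the highest ID such that every ID
    after `start`, up to and including it, has completed."""
    pos = start
    while pos + 1 in completed:
        pos += 1
    return pos

# Mirrors the single-writer examples in the text: the stream starts at ID 1.
completed: set[int] = set()
assert current_position(completed) == 1   # nothing in flight
completed.add(3)                          # ID 3 completes before ID 2
assert current_position(completed) == 1   # cannot advance past the gap at 2
completed.add(2)
assert current_position(completed) == 3   # gap filled; position jumps to 3
```

Note how completing IDs out of order leaves the position stuck at the gap, which is exactly why writers must track facts awaiting completion.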
@@ -93,7 +93,7 @@ Consider a single-writer stream which is initially at ID 1.
 | Complete 6 | 6 | |


-### Multi-writer streams
+## Multi-writer streams

 There are two ways to view a multi-writer stream.

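One way to picture the "multiple single-writer streams" view is as a map of one position per writer, which is roughly the shape Synapse's multi-writer stream tokens take. This toy model (illustrative names only) shows how visibility of a fact depends on the recorded position of the writer that produced it:

```python
def can_see(token: dict[str, int], writer: str, stream_id: int) -> bool:
    """A fact written by `writer` at `stream_id` is visible to a reader
    holding `token` iff the writer's recorded position has reached it."""
    return stream_id <= token.get(writer, 0)

# A multi-writer token records one current position per writer.
token = {"worker1": 6, "worker2": 3}

assert can_see(token, "worker1", 6)
assert not can_see(token, "worker2", 4)  # worker2 has only reached 3
```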
@@ -115,7 +115,7 @@ The facts this stream holds are instructions to "you should now invalidate these
 We only ever treat this as a multiple single-writer streams as there is no important ordering between cache invalidations.
 (Invalidations are self-contained facts; and the invalidations commute/are idempotent).

-### Writing to streams
+## Writing to streams

 Writers need to track:
 - track their current position (i.e. its own per-writer stream ID).
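The writer bookkeeping described in this section can be sketched as a toy writer (names are illustrative; Synapse's real implementation lives in `MultiWriterIdGenerator`): allocate IDs, track facts awaiting completion, and only advance the position over a contiguous prefix of completed facts, emitting one `RDATA` per fact passed over.

```python
class ToyStreamWriter:
    """Toy model of a stream writer: allocates IDs, tracks facts awaiting
    completion, and only advances its position over a contiguous prefix."""

    def __init__(self, start: int = 1) -> None:
        self.position = start           # current per-writer stream ID
        self.next_id = start + 1        # next ID to hand out
        self.pending: set[int] = set()  # facts awaiting completion
        self.rdata: list[int] = []      # RDATA messages we have emitted

    def begin(self) -> int:
        stream_id = self.next_id
        self.next_id += 1
        self.pending.add(stream_id)
        return stream_id

    def complete(self, stream_id: int) -> None:
        self.pending.discard(stream_id)
        # Advance while no earlier fact is still awaiting completion,
        # emitting one RDATA per fact between the old and new position.
        while self.position + 1 < self.next_id and self.position + 1 not in self.pending:
            self.position += 1
            self.rdata.append(self.position)

w = ToyStreamWriter()
a, b = w.begin(), w.begin()   # allocate IDs 2 and 3
w.complete(b)                 # 3 completes first: cannot advance past 2
assert w.position == 1 and w.rdata == []
w.complete(a)                 # 2 completes: advance over both 2 and 3
assert w.position == 3 and w.rdata == [2, 3]
```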
@@ -133,7 +133,7 @@ To complete a fact, first remove it from your map of facts currently awaiting co
 Then, if no earlier fact is awaiting completion, the writer can advance its current position in that stream.
 Upon doing so it should emit an `RDATA` message[^3], once for every fact between the old and the new stream ID.

-### Subscribing to streams
+## Subscribing to streams

 Readers need to track the current position of every writer.

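The reader side can be sketched similarly (a toy model; in Synapse the real handling happens in the replication client's `on_rdata`): the reader tracks one position per writer and advances it as `RDATA` arrives, querying the stream tables for the actual row contents.

```python
class ToyStreamReader:
    """Toy reader: tracks each writer's current position and advances it
    as RDATA arrives. The RDATA token alone is not the fact itself; a real
    reader would query the stream tables for the full rows."""

    def __init__(self) -> None:
        self.positions: dict[str, int] = {}

    def on_rdata(self, writer: str, token: int) -> None:
        old = self.positions.get(writer, 0)
        assert token > old, "RDATA tokens from one writer arrive in order"
        # ... query the stream tables for rows in (old, token] here ...
        self.positions[writer] = token

r = ToyStreamReader()
r.on_rdata("worker1", 2)
r.on_rdata("worker1", 3)
assert r.positions == {"worker1": 3}
```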
@@ -146,10 +146,44 @@ The `RDATA` itself is not a self-contained representation of the fact;
 readers will have to query the stream tables for the full details.
 Readers must also advance their record of the writer's current position for that stream.

-# Summary
+## Summary

 In a nutshell: we have an append-only log with a "buffer/scratchpad" at the end where we have to wait for the sequence to be linear and contiguous.

+---
+
+## Cheatsheet for creating a new stream
+
+These rough notes and links may help you to create a new stream and add all the
+necessary registration and event handling.
+
+**Create your stream:**
+- [create a stream class and stream row class](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/streams/_base.py#L728)
+- will need an [ID generator](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L75)
+- may need [writer configuration](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/config/workers.py#L177), if there isn't already an obvious source of configuration for which workers should be designated as writers to your new stream.
+  - if adding new writer configuration, add Docker-worker configuration, which lets us configure the writer worker in Complement tests: [[1]](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/docker/configure_workers_and_start.py#L331), [[2]](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/docker/configure_workers_and_start.py#L440)
+- most of the time, you will likely introduce a new datastore class for the concept represented by the new stream, unless there is already an obvious datastore that covers it.
+- consider whether it may make sense to introduce a handler
+
+**Register your stream in:**
+- [`STREAMS_MAP`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/streams/__init__.py#L71)
+
+**Advance your stream in:**
+- [`process_replication_position` of your appropriate datastore](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L111)
+  - don't forget the super call
+
+**If you're going to do any caching that needs invalidation from new rows:**
+- add invalidations to [`process_replication_rows` of your appropriate datastore](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L91)
+  - don't forget the super call
+- add local-only [invalidations to your writer transactions](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L201)
+
+**For streams to be used in sync:**
+- add a new field to [`StreamToken`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/types/__init__.py#L1003)
+- add a new [`StreamKeyType`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/types/__init__.py#L999)
+- add appropriate wake-up rules
+  - in [`on_rdata`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/client.py#L260)
+  - locally on the same worker when completing a write, [e.g. in your handler](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/handlers/thread_subscriptions.py#L139)
+- add the stream in [`bound_future_token`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/streams/events.py#L127)
+
 ---

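The "create a stream class and stream row class" step from the cheatsheet roughly takes the shape below. This is a schematic, self-contained sketch only: the `Stream` base class here is a stand-in for Synapse's real base types in `synapse/replication/tcp/streams/_base.py`, and the row fields are illustrative, not the actual Thread Subscriptions schema.

```python
from dataclasses import dataclass

# Stand-in for Synapse's real stream base class, to make the shape concrete.
class Stream:
    NAME: str
    ROW_TYPE: type

@dataclass(frozen=True)
class ThreadSubscriptionsStreamRow:
    """One fact on the stream: enough to identify the changed row.
    (Field names here are hypothetical.)"""
    user_id: str
    room_id: str
    thread_root_event_id: str

class ThreadSubscriptionsStream(Stream):
    NAME = "thread_subscriptions"
    ROW_TYPE = ThreadSubscriptionsStreamRow

# Registration: Synapse collects stream classes into STREAMS_MAP, keyed by
# NAME, so replication can route RDATA to the right stream.
STREAMS_MAP = {
    stream.NAME: stream
    for stream in (ThreadSubscriptionsStream,)
}

assert STREAMS_MAP["thread_subscriptions"] is ThreadSubscriptionsStream
```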
@@ -322,7 +322,7 @@ class LoggingTransaction:
         self, callback: Callable[P, object], *args: P.args, **kwargs: P.kwargs
     ) -> None:
         """Call the given callback on the main twisted thread after the transaction has
-        finished.
+        finished successfully.

         Mostly used to invalidate the caches on the correct thread.
@@ -343,7 +343,7 @@ class LoggingTransaction:
         self, callback: Callable[P, Awaitable], *args: P.args, **kwargs: P.kwargs
     ) -> None:
         """Call the given asynchronous callback on the main twisted thread after
-        the transaction has finished (but before those added in `call_after`).
+        the transaction has finished successfully (but before those added in `call_after`).

         Mostly used to invalidate remote caches after transactions.
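The "finished successfully" wording in these two docstring fixes matters: the queued callbacks must not fire if the transaction rolls back. A minimal model of that contract (not Synapse's actual `LoggingTransaction`):

```python
from typing import Any, Callable

class ToyTransaction:
    """Toy transaction that queues callbacks and runs them only after a
    successful commit, mirroring `call_after`'s contract."""

    def __init__(self) -> None:
        self._after_callbacks: list[tuple[Callable[..., Any], tuple, dict]] = []

    def call_after(self, callback: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        self._after_callbacks.append((callback, args, kwargs))

    def commit(self) -> None:
        # Only a successful commit runs the callbacks (e.g. cache invalidations).
        for cb, args, kwargs in self._after_callbacks:
            cb(*args, **kwargs)
        self._after_callbacks.clear()

    def rollback(self) -> None:
        # On rollback the callbacks are dropped: the caches were never made
        # stale, so they must not be invalidated.
        self._after_callbacks.clear()

invalidated: list[str] = []
txn = ToyTransaction()
txn.call_after(invalidated.append, "some_cache_key")
txn.rollback()
assert invalidated == []   # nothing fired on rollback

txn2 = ToyTransaction()
txn2.call_after(invalidated.append, "some_cache_key")
txn2.commit()
assert invalidated == ["some_cache_key"]
```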
@@ -175,7 +175,8 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
     Uses a Postgres sequence to coordinate ID assignment, but positions of other
     writers will only get updated when `advance` is called (by replication).

-    Note: Only works with Postgres.
+    On SQLite, falls back to a single-writer implementation, which is fine because
+    Synapse only supports monolith mode when SQLite is the database driver.

     Warning: Streams using this generator start at ID 2, because ID 1 is always assumed
     to have been 'seen as persisted'.
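How a shared sequence plus replicated `advance` calls fit together can be sketched as follows (a toy model, not the real `MultiWriterIdGenerator`; a plain counter stands in for the Postgres sequence):

```python
import itertools

# One shared counter stands in for the Postgres sequence.
sequence = itertools.count(2)  # streams with this generator start at ID 2

class ToyMultiWriterIdGen:
    def __init__(self, my_name: str) -> None:
        self.my_name = my_name
        self.positions: dict[str, int] = {}  # last known position per writer

    def get_next(self) -> int:
        stream_id = next(sequence)           # unique across all writers
        self.positions[self.my_name] = stream_id
        return stream_id

    def advance(self, writer: str, stream_id: int) -> None:
        # Called from replication: other writers' positions only move when
        # we hear about them, not when they allocate.
        self.positions[writer] = max(self.positions.get(writer, 0), stream_id)

w1 = ToyMultiWriterIdGen("worker1")
w2 = ToyMultiWriterIdGen("worker2")
a = w1.get_next()
b = w2.get_next()
assert (a, b) == (2, 3)                      # unique even across writers
assert w1.positions.get("worker2") is None   # w1 hasn't heard about b yet
w1.advance("worker2", b)                     # replication catches w1 up
assert w1.positions["worker2"] == 3
```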
@@ -536,6 +537,16 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):

     def get_next_txn(self, txn: LoggingTransaction) -> int:
         """
+        Generate an ID for immediate use within a database transaction.
+
+        The ID will automatically be marked as finished at the end of the
+        database transaction, therefore the stream rows MUST be persisted
+        within the active transaction (MUST NOT be persisted in a later
+        transaction).
+
+        The replication notifier will automatically be notified when the
+        transaction ends successfully.
+
         Usage:

             stream_id = stream_id_gen.get_next_txn(txn)
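The contract this new docstring describes can be sketched with a toy ID generator (illustrative only; the real one coordinates through a Postgres sequence and the transaction's `call_after` machinery): the ID is handed out immediately, but only counts as finished once the transaction it was allocated in commits.

```python
class ToyTxn:
    """Minimal stand-in for LoggingTransaction's callback queue."""
    def __init__(self) -> None:
        self._callbacks = []
    def call_after(self, cb, *args) -> None:
        self._callbacks.append((cb, args))
    def commit(self) -> None:
        for cb, args in self._callbacks:
            cb(*args)

class ToyIdGenerator:
    """Toy analogue of `get_next_txn`: hands out an ID and arranges for it
    to be marked finished when the transaction commits."""

    def __init__(self, start: int = 1) -> None:
        self._next = start + 1   # real streams start handing out ID 2
        self.finished: set[int] = set()

    def get_next_txn(self, txn: ToyTxn) -> int:
        stream_id = self._next
        self._next += 1
        # Mark the ID finished at the end of the transaction; the stream
        # rows must therefore be written within this same transaction.
        txn.call_after(self.finished.add, stream_id)
        return stream_id

gen = ToyIdGenerator()
txn = ToyTxn()
stream_id = gen.get_next_txn(txn)
assert stream_id == 2 and stream_id not in gen.finished  # not finished yet
txn.commit()
assert stream_id in gen.finished  # finished once the transaction ends
```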
@@ -573,6 +584,16 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):

     def get_next_mult_txn(self, txn: LoggingTransaction, n: int) -> list[int]:
         """
+        Generate multiple IDs for immediate use within a database transaction.
+
+        The IDs will automatically be marked as finished at the end of the
+        database transaction, therefore the stream rows MUST be persisted
+        within the active transaction (MUST NOT be persisted in a later
+        transaction).
+
+        The replication notifier will automatically be notified when the
+        transaction ends successfully.
+
         Usage:

             stream_id = stream_id_gen.get_next_txn(txn)