|
27 | 27 | from synapse.replication.tcp.commands import PositionCommand |
28 | 28 | from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol |
29 | 29 | from synapse.replication.tcp.streams import EventsStream |
30 | | -from synapse.replication.tcp.streams._base import StreamRow, Token |
| 30 | +from synapse.replication.tcp.streams._base import CachesStream, StreamRow, Token |
31 | 31 | from synapse.util.metrics import Measure |
32 | 32 |
|
33 | 33 | if TYPE_CHECKING: |
@@ -204,6 +204,23 @@ async def _run_notifier_loop(self) -> None: |
204 | 204 | # The token has advanced but there is no data to |
205 | 205 | # send, so we send a `POSITION` to inform other |
206 | 206 | # workers of the updated position. |
| 207 | + # |
| 208 | + # There are two reasons for this: 1) this instance |
| 209 | + # requested a stream ID but didn't use it, or 2) |
| 210 | + # this instance advanced its own stream position due |
| 211 | + # to receiving notifications about other instances |
| 212 | + # advancing their stream position. |
| 213 | + |
| 214 | + # We skip sending `POSITION` for the `caches` stream |
| 215 | + # for the second case as a) it generates a lot of |
| 216 | + # traffic as every worker would echo each write, and |
| 217 | + # b) nothing cares if a given worker's caches stream |
| 218 | + # position lags. |
| 219 | + if stream.NAME == CachesStream.NAME: |
| 220 | + # If there haven't been any writes since the |
| 221 | + # `last_token` then we're in the second case. |
| 222 | + if stream.minimal_local_current_token() <= last_token: |
| 223 | + continue |
207 | 224 |
|
208 | 225 | # Note: `last_token` may not *actually* be the |
209 | 226 | # last token we sent out in a RDATA or POSITION. |
|
0 commit comments