Merged
65 commits
81bf67d
Remove the `device_id_exists_cache` from the device store.
sandhose Jun 16, 2025
4caf770
Move the store/delete/update device methods to the DeviceWorkerStore
sandhose Jun 16, 2025
a7ba48a
Type the store explicitly on the DeviceHandler
sandhose Jun 16, 2025
cf5d5f5
Allow configuration of device_lists stream writers
sandhose Jun 18, 2025
4c39d80
Replicate the device lists stream from writers
sandhose Jun 26, 2025
1bd919a
Simplify the creation of MultiWriterStreamToken
sandhose Jun 18, 2025
db3facd
Make the device list token a MultiWriterStreamToken
sandhose Jun 20, 2025
2b67e3e
Move all the device storage methods to workers
sandhose Jun 20, 2025
709779f
Add a replication endpoint for DeviceHandler.notify_device_update
sandhose Jun 20, 2025
d7fc913
Move _check_device_name_length to a separate function
sandhose Jun 20, 2025
3e8ff9d
Move DeviceHandler.check_device_registered to work on any worker
sandhose Jun 20, 2025
3a1b253
Move delete_devices to work on any worker
sandhose Jun 20, 2025
4f655a6
Move update_device and upsert_device to be available on all workers
sandhose Jun 20, 2025
839e260
Move dehydrated devices operations on workers
sandhose Jun 20, 2025
4d5ad56
Make notify_user_signature_update avail in workers through replication
sandhose Jun 20, 2025
cd201d8
Instanciate the device handler on device list writers
sandhose Jun 23, 2025
cd9e98e
Route device list updates EDUs to the right writers
sandhose Jun 23, 2025
a197067
Register replication endpoints on device writers
sandhose Jun 23, 2025
10f5b88
Remove the dependency on DeviceHandler in most places
sandhose Jun 23, 2025
e707d43
Setup device_lists workers in complement
sandhose Jun 23, 2025
e07bc58
Move all the E2E keys store methods to the worker store
sandhose Jun 24, 2025
77c718f
Handle signing key updates EDUs on device lists writers
sandhose Jun 24, 2025
1a45bf0
Poke the fed sender if it's on the same instance as device list writers
sandhose Jun 24, 2025
d7cc4ba
Consolidate multi user device resyncs through the DeviceListUpdater
sandhose Jun 25, 2025
242884c
Only handle signing key updates on a single device_lists writer
sandhose Jun 25, 2025
0dddd99
Run _handle_new_device_update_async on the first device list writer only
sandhose Jun 25, 2025
641f729
Ensure a few operations are only running on the first device list writer
sandhose Jun 25, 2025
42e7df4
Run the 'delete_stale_devices' background task on the background worker
sandhose Jun 25, 2025
b94e314
Route room un-partialing to device writers using replication
sandhose Jun 26, 2025
fbe46bc
Comment on my expecetations around the partial state room device stream
sandhose Jun 26, 2025
56a5cf8
TEMP: newsfile
sandhose Jun 23, 2025
1556872
Replace occurences of "master" with MAIN_PROCESS_INSTANCE_NAME
sandhose Jul 11, 2025
d6e0ec4
Merge branch 'develop' into quenting/device-changes-off-main
sandhose Jul 11, 2025
d436e89
get_uncoverted_outbound_room_pokes can also be called from non-writers
sandhose Jul 11, 2025
0e54b94
Add comments why some functions must be called on a writer.
sandhose Jul 11, 2025
bd929d4
Remove unused _is_main_process property
sandhose Jul 15, 2025
04d9780
Rename DeviceHandler -> DeviceWriterHandler & DeviceWorkerHandler ->
sandhose Jul 15, 2025
9a770b3
Reminder to remove replication handler
sandhose Jul 15, 2025
8c94934
Comment about why handle_new_device_update RPCs the first writer
sandhose Jul 15, 2025
fdcd607
Comment on what would be needed for
sandhose Jul 15, 2025
cd400c2
Comment in
sandhose Jul 15, 2025
4394314
Comments about why we use a 'main device list writer'
sandhose Jul 15, 2025
da58cab
Get the event_persister -> events logic out of
sandhose Jul 15, 2025
e02cf43
Typo in comment
sandhose Jul 16, 2025
9572e95
Doc comment on the DeviceHandler
sandhose Jul 16, 2025
38a243b
Only instanciate the DeviceListUpdater on the main device list writer
sandhose Jul 16, 2025
bedf66b
Merge remote-tracking branch 'origin/develop' into quenting/device-ch…
sandhose Jul 16, 2025
90eb9ba
tests: remove frontend_proxy from workers & route device changes to c…
sandhose Jul 16, 2025
8a89a25
Document the new device_lists stream
sandhose Jul 16, 2025
2a28d42
Newsfile
sandhose Jul 16, 2025
8173ad7
Correctly run the delete stale device task on the background worker
sandhose Jul 16, 2025
4e2b9af
Comment why we use _main_device_list_writer
sandhose Jul 16, 2025
3ed6db7
Comment why we run _handle_new_device_update_async on a single worker
sandhose Jul 16, 2025
a0e4e2d
Reword newsfile
sandhose Jul 17, 2025
8488e1f
Comment to keep the main device list writer in sync
sandhose Jul 17, 2025
a64dafd
Explain why it's fine to use the stream ID for the oubound pokes
sandhose Jul 17, 2025
0a5d2d1
Explain why some logic only happens on one writer
sandhose Jul 17, 2025
e94d0d3
/delete_devices can also be delegated to workers
sandhose Jul 17, 2025
bd9a360
Merge remote-tracking branch 'origin/develop' into quenting/device-ch…
sandhose Jul 17, 2025
60ea378
More accurate comment on the minimum stream ID
sandhose Jul 17, 2025
565403e
Forgot delete_devices in one place
sandhose Jul 17, 2025
339c90f
Explain why it's fine to use the min stream pos when processing rdata
sandhose Jul 17, 2025
d766b81
Review the last places where we use the min stream ID
sandhose Jul 17, 2025
07f32aa
Move token deletions to worker store
sandhose Jul 17, 2025
d916e9e
Make sure to listify the device_ids before going to replication
sandhose Jul 17, 2025
1 change: 1 addition & 0 deletions changelog.d/18581.feature
@@ -0,0 +1 @@
+Enable workers to write directly to the device lists stream and handle device list updates, reducing load on the main process.
Contributor:

Actually adding /delete_devices I think uncovered two more bugs:

  • deleting access token was only available on the main store, not worker store (fixed with 07f32aa)
  • when we called notify_device_update through replication, it was missing a conversion from StrCollection (which can be a lot of things) to a proper list to be serializable. Grrr @ replication clients that don't retain proper typing… Fixed with d916e9e

-- @sandhose, #18581 (comment)

Overall, this is the type of thing I can't really review (related discussion: #18581 (comment))

I can add an approval but it's just like "sure".

Feels like some typing/lints are missing around ensuring only serializable data is passed to a ReplicationEndpoint but this is something to address outside of this PR.
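
The serialization bug described above is easy to reproduce in isolation. The following is a minimal sketch, assuming the replication payload ends up JSON-encoded; `StrCollection` is a local stand-in alias and `build_payload` is a hypothetical helper, not Synapse's actual replication client code:

```python
import json
from typing import AbstractSet, List, Tuple, Union

# Local stand-in for Synapse's StrCollection alias (assumption): the device
# IDs may arrive as a tuple, a list, or a set.
StrCollection = Union[Tuple[str, ...], List[str], AbstractSet[str]]


def build_payload(user_id: str, device_ids: StrCollection) -> str:
    """Hypothetical helper: encode a notify_device_update-style payload."""
    # A set is not JSON-serializable, so convert to a plain list before
    # handing the data to the encoder.
    return json.dumps({"user_id": user_id, "device_ids": list(device_ids)})


try:
    # Passing the set straight through fails at encoding time.
    json.dumps({"device_ids": {"DEVICE_A"}})
except TypeError as exc:
    print("unconverted set rejected:", exc)

print(build_payload("@alice:example.org", {"DEVICE_A"}))
```

A type checker does not catch the unconverted case, because a set is a valid `StrCollection`; this is the gap in typing around replication endpoints that the comment refers to.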

2 changes: 1 addition & 1 deletion docker/complement/conf/start_for_complement.sh
@@ -54,7 +54,6 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
 export SYNAPSE_WORKER_TYPES="\
 event_persister:2, \
 background_worker, \
-frontend_proxy, \
 event_creator, \
 user_dir, \
 media_repository, \
@@ -65,6 +64,7 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
 client_reader, \
 appservice, \
 pusher, \
+device_lists:2, \
 stream_writers=account_data+presence+receipts+to_device+typing"

 fi
54 changes: 25 additions & 29 deletions docker/configure_workers_and_start.py
@@ -178,6 +178,8 @@
             "^/_matrix/client/(api/v1|r0|v3|unstable)/login$",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/account/whoami$",
+            "^/_matrix/client/(api/v1|r0|v3|unstable)/devices(/|$)",
+            "^/_matrix/client/(r0|v3)/delete_devices$",
             "^/_matrix/client/versions$",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$",
             "^/_matrix/client/(r0|v3|unstable)/register$",
@@ -194,6 +196,9 @@
             "^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$",
             "^/_matrix/client/(r0|v3|unstable)/capabilities$",
             "^/_matrix/client/(r0|v3|unstable)/notifications$",
+            "^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload",
+            "^/_matrix/client/(api/v1|r0|v3|unstable)/keys/device_signing/upload$",
+            "^/_matrix/client/(api/v1|r0|v3|unstable)/keys/signatures/upload$",
         ],
         "shared_extra_conf": {},
         "worker_extra_conf": "",
@@ -265,13 +270,6 @@
         "shared_extra_conf": {},
         "worker_extra_conf": "",
     },
-    "frontend_proxy": {
-        "app": "synapse.app.generic_worker",
-        "listener_resources": ["client", "replication"],
-        "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
     "account_data": {
         "app": "synapse.app.generic_worker",
         "listener_resources": ["client", "replication"],
@@ -306,6 +304,13 @@
         "shared_extra_conf": {},
         "worker_extra_conf": "",
     },
+    "device_lists": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["client", "replication"],
+        "endpoint_patterns": [],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
     "typing": {
         "app": "synapse.app.generic_worker",
         "listener_resources": ["client", "replication"],
@@ -412,16 +417,17 @@ def add_worker_roles_to_shared_config(
     # streams
     instance_map = shared_config.setdefault("instance_map", {})

-    # This is a list of the stream_writers that there can be only one of. Events can be
-    # sharded, and therefore doesn't belong here.
-    singular_stream_writers = [
+    # This is a list of the stream_writers.
+    stream_writers = {
         "account_data",
+        "events",
+        "device_lists",
         "presence",
         "receipts",
         "to_device",
         "typing",
         "push_rules",
-    ]
+    }

     # Worker-type specific sharding config. Now a single worker can fulfill multiple
     # roles, check each.
@@ -431,28 +437,11 @@ def add_worker_roles_to_shared_config(
     if "federation_sender" in worker_types_set:
         shared_config.setdefault("federation_sender_instances", []).append(worker_name)

-    if "event_persister" in worker_types_set:
-        # Event persisters write to the events stream, so we need to update
-        # the list of event stream writers
-        shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
-            worker_name
-        )
-
-        # Map of stream writer instance names to host/ports combos
-        if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
-            instance_map[worker_name] = {
-                "path": f"/run/worker.{worker_port}",
-            }
-        else:
-            instance_map[worker_name] = {
-                "host": "localhost",
-                "port": worker_port,
-            }
     # Update the list of stream writers. It's convenient that the name of the worker
     # type is the same as the stream to write. Iterate over the whole list in case there
     # is more than one.
     for worker in worker_types_set:
-        if worker in singular_stream_writers:
+        if worker in stream_writers:
             shared_config.setdefault("stream_writers", {}).setdefault(
                 worker, []
             ).append(worker_name)
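
The effect of the loop in this hunk can be shown with a small standalone sketch. It assumes only what the diff shows: the set of stream names and the nested `setdefault` calls. The function name `add_stream_writers` and the worker names `device_lists1`, `device_lists2` and `client_reader1` are hypothetical:

```python
from typing import Any, Dict, Set

# Stream names copied from the `stream_writers` set in the hunk above.
STREAM_WRITERS = {
    "account_data",
    "events",
    "device_lists",
    "presence",
    "receipts",
    "to_device",
    "typing",
    "push_rules",
}


def add_stream_writers(
    shared_config: Dict[str, Any], worker_types: Set[str], worker_name: str
) -> None:
    """Register worker_name as a writer for each stream named in worker_types."""
    for worker_type in sorted(worker_types):
        if worker_type in STREAM_WRITERS:
            shared_config.setdefault("stream_writers", {}).setdefault(
                worker_type, []
            ).append(worker_name)


shared: Dict[str, Any] = {}
add_stream_writers(shared, {"device_lists"}, "device_lists1")
add_stream_writers(shared, {"device_lists"}, "device_lists2")
add_stream_writers(shared, {"client_reader"}, "client_reader1")
print(shared)
```

With `device_lists:2` in `SYNAPSE_WORKER_TYPES`, both workers end up in the same writer list, while worker types that are not stream names are ignored by this step.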
@@ -876,6 +865,13 @@ def generate_worker_files(
         else:
             healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))

+        # Special case for event_persister: those are just workers that write to
+        # the `events` stream. For other workers, the worker name is the same
+        # name of the stream they write to, but for some reason it is not the
+        # case for event_persister.
+        if "event_persister" in worker_types_set:
+            worker_types_set.add("events")
Contributor, on lines +868 to +873:

Feels like we can have this even further up the chain. Maybe in parse_worker_types(...)?

See #18581 (comment)

Contributor:

(still relevant)

Any opinions?

Member Author:

Sorry, missed that one. It wouldn't work to put it in parse_worker_types, as its result is then used to pull config out of WORKERS_CONFIG here:

copy_of_template_config = WORKERS_CONFIG[worker_type].copy()

I think ideally we would decouple the worker name from the stream it writes to, and define that within the WORKERS_CONFIG definition… but that refactoring is out of scope for this already way-too-big PR :p
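
One possible shape for that decoupling, purely as an illustration: each worker type would declare the streams it writes to, so `event_persister` would no longer need a special case. The `writes_streams` key, `WORKERS_CONFIG_SKETCH` and `streams_written_by` are all hypothetical names that do not exist in the script:

```python
from typing import Any, Dict, List, Set

# Hypothetical config shape: the stream names are declared explicitly
# instead of being inferred from the worker type name.
WORKERS_CONFIG_SKETCH: Dict[str, Dict[str, Any]] = {
    "event_persister": {"writes_streams": ["events"]},
    "device_lists": {"writes_streams": ["device_lists"]},
    "client_reader": {"writes_streams": []},
}


def streams_written_by(worker_types: Set[str]) -> List[str]:
    """Collect every stream written by a worker with the given roles."""
    streams: Set[str] = set()
    for worker_type in worker_types:
        streams.update(WORKERS_CONFIG_SKETCH[worker_type]["writes_streams"])
    return sorted(streams)


print(streams_written_by({"event_persister", "client_reader"}))
```

Because the lookup key stays the worker type, the existing `WORKERS_CONFIG[worker_type]` access pattern would keep working under this layout.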

+
         # Update the shared config with sharding-related options if necessary
         add_worker_roles_to_shared_config(
             shared_config, worker_types_set, worker_name, worker_port
2 changes: 2 additions & 0 deletions docs/usage/configuration/config_documentation.md
@@ -4341,6 +4341,8 @@ This setting has the following sub-options:

 * `push_rules` (string): Name of a worker assigned to the `push_rules` stream.

+* `device_lists` (string): Name of a worker assigned to the `device_lists` stream.
+
 Example configuration:
 ```yaml
 stream_writers:
21 changes: 17 additions & 4 deletions docs/workers.md
@@ -238,7 +238,8 @@ information.
     ^/_matrix/client/unstable/im.nheko.summary/summary/.*$
     ^/_matrix/client/(r0|v3|unstable)/account/3pid$
     ^/_matrix/client/(r0|v3|unstable)/account/whoami$
-    ^/_matrix/client/(r0|v3|unstable)/devices$
+    ^/_matrix/client/(r0|v3)/delete_devices$
+    ^/_matrix/client/(api/v1|r0|v3|unstable)/devices(/|$)
     ^/_matrix/client/versions$
     ^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$
     ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event/
@@ -257,7 +258,9 @@ information.
     ^/_matrix/client/(r0|v3|unstable)/keys/changes$
     ^/_matrix/client/(r0|v3|unstable)/keys/claim$
     ^/_matrix/client/(r0|v3|unstable)/room_keys/
-    ^/_matrix/client/(r0|v3|unstable)/keys/upload$
+    ^/_matrix/client/(r0|v3|unstable)/keys/upload
+    ^/_matrix/client/(api/v1|r0|v3|unstable)/keys/device_signing/upload$
+    ^/_matrix/client/(api/v1|r0|v3|unstable)/keys/signatures/upload$

     # Registration/login requests
     ^/_matrix/client/(api/v1|r0|v3|unstable)/login$
@@ -282,7 +285,6 @@ Additionally, the following REST endpoints can be handled for GET requests:

     ^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/
     ^/_matrix/client/unstable/org.matrix.msc4140/delayed_events
-    ^/_matrix/client/(api/v1|r0|v3|unstable)/devices/

     # Account data requests
     ^/_matrix/client/(r0|v3|unstable)/.*/tags
@@ -329,7 +331,6 @@ set to `true`), the following endpoints can be handled by the worker:
     ^/_synapse/admin/v2/users/[^/]+$
     ^/_synapse/admin/v1/username_available$
     ^/_synapse/admin/v1/users/[^/]+/_allow_cross_signing_replacement_without_uia$
-    # Only the GET method:
     ^/_synapse/admin/v1/users/[^/]+/devices$

 Note that a [HTTP listener](usage/configuration/config_documentation.md#listeners)
@@ -550,6 +551,18 @@ the stream writer for the `push_rules` stream:

     ^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/

+##### The `device_lists` stream
+
+The `device_lists` stream supports multiple writers. The following endpoints
+can be handled by any worker, but should be routed directly to one of the workers
+configured as stream writer for the `device_lists` stream:
+
+    ^/_matrix/client/(r0|v3)/delete_devices$
+    ^/_matrix/client/(api/v1|r0|v3|unstable)/devices/
+    ^/_matrix/client/(r0|v3|unstable)/keys/upload
+    ^/_matrix/client/(api/v1|r0|v3|unstable)/keys/device_signing/upload$
+    ^/_matrix/client/(api/v1|r0|v3|unstable)/keys/signatures/upload$
+
 #### Restrict outbound federation traffic to a specific set of workers

 The
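
The endpoint patterns documented for the `device_lists` stream can be checked with a short sketch like the one below. It assumes every alternation group in the patterns is closed, and `should_route_to_device_lists_writer` is a hypothetical helper; a real deployment would express this routing in the reverse proxy configuration instead:

```python
import re

# Patterns from the `device_lists` stream section of docs/workers.md,
# written with every alternation group closed.
DEVICE_LISTS_PATTERNS = [
    r"^/_matrix/client/(r0|v3)/delete_devices$",
    r"^/_matrix/client/(api/v1|r0|v3|unstable)/devices/",
    r"^/_matrix/client/(r0|v3|unstable)/keys/upload",
    r"^/_matrix/client/(api/v1|r0|v3|unstable)/keys/device_signing/upload$",
    r"^/_matrix/client/(api/v1|r0|v3|unstable)/keys/signatures/upload$",
]
COMPILED_PATTERNS = [re.compile(pattern) for pattern in DEVICE_LISTS_PATTERNS]


def should_route_to_device_lists_writer(path: str) -> bool:
    """Return True if the request path matches one of the patterns."""
    return any(pattern.search(path) for pattern in COMPILED_PATTERNS)


for path in (
    "/_matrix/client/v3/delete_devices",
    "/_matrix/client/v3/devices/ABCDEF",
    "/_matrix/client/v3/keys/query",
):
    print(path, should_route_to_device_lists_writer(path))
```

Compiling the patterns up front also surfaces syntax problems such as an unbalanced group, since `re.compile` raises `re.error` for those.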
3 changes: 3 additions & 0 deletions schema/synapse-config.schema.yaml
@@ -5383,6 +5383,9 @@ properties:
     push_rules:
       type: string
       description: Name of a worker assigned to the `push_rules` stream.
+    device_lists:
+      type: string
+      description: Name of a worker assigned to the `device_lists` stream.
   default: {}
   examples:
     - events: worker1
46 changes: 31 additions & 15 deletions synapse/config/workers.py
@@ -134,40 +134,44 @@ class WriterLocations:
             can only be a single instance.
         account_data: The instances that write to the account data streams. Currently
             can only be a single instance.
-        receipts: The instances that write to the receipts stream. Currently
-            can only be a single instance.
+        receipts: The instances that write to the receipts stream.
         presence: The instances that write to the presence stream. Currently
             can only be a single instance.
         push_rules: The instances that write to the push stream. Currently
             can only be a single instance.
+        device_lists: The instances that write to the device list stream.
     """

     events: List[str] = attr.ib(
-        default=["master"],
+        default=[MAIN_PROCESS_INSTANCE_NAME],
         converter=_instance_to_list_converter,
     )
     typing: List[str] = attr.ib(
-        default=["master"],
+        default=[MAIN_PROCESS_INSTANCE_NAME],
         converter=_instance_to_list_converter,
     )
     to_device: List[str] = attr.ib(
-        default=["master"],
+        default=[MAIN_PROCESS_INSTANCE_NAME],
         converter=_instance_to_list_converter,
     )
     account_data: List[str] = attr.ib(
-        default=["master"],
+        default=[MAIN_PROCESS_INSTANCE_NAME],
         converter=_instance_to_list_converter,
     )
     receipts: List[str] = attr.ib(
-        default=["master"],
+        default=[MAIN_PROCESS_INSTANCE_NAME],
         converter=_instance_to_list_converter,
     )
     presence: List[str] = attr.ib(
-        default=["master"],
+        default=[MAIN_PROCESS_INSTANCE_NAME],
         converter=_instance_to_list_converter,
     )
     push_rules: List[str] = attr.ib(
-        default=["master"],
+        default=[MAIN_PROCESS_INSTANCE_NAME],
         converter=_instance_to_list_converter,
     )
+    device_lists: List[str] = attr.ib(
+        default=[MAIN_PROCESS_INSTANCE_NAME],
+        converter=_instance_to_list_converter,
+    )
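
The converter pattern used by every field in this hunk can be sketched without `attrs`. The version below is a stand-in built on the standard library's `dataclasses`, with a hypothetical `WriterLocationsSketch` class and `instance_to_list` function; the value `"master"` for `MAIN_PROCESS_INSTANCE_NAME` is taken from the string literals the diff replaces:

```python
from dataclasses import dataclass, field
from typing import List, Union

# The literal that the diff replaces with the named constant.
MAIN_PROCESS_INSTANCE_NAME = "master"


def instance_to_list(value: Union[str, List[str]]) -> List[str]:
    """Accept either a single instance name or a list of names."""
    if isinstance(value, str):
        return [value]
    return list(value)


@dataclass
class WriterLocationsSketch:
    # Defaults to the main process when no writer is configured.
    device_lists: List[str] = field(
        default_factory=lambda: [MAIN_PROCESS_INSTANCE_NAME]
    )

    def __post_init__(self) -> None:
        # Emulates the attrs `converter=` argument: normalize on construction.
        self.device_lists = instance_to_list(self.device_lists)


print(WriterLocationsSketch().device_lists)
print(WriterLocationsSketch(device_lists="device_lists1").device_lists)
```

Normalizing to a list at construction time lets the rest of the config code treat single-writer and multi-writer streams uniformly.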

@@ -358,7 +362,10 @@ def read_config(
         ):
             instances = _instance_to_list_converter(getattr(self.writers, stream))
             for instance in instances:
-                if instance != "master" and instance not in self.instance_map:
+                if (
+                    instance != MAIN_PROCESS_INSTANCE_NAME
+                    and instance not in self.instance_map
+                ):
                     raise ConfigError(
                         "Instance %r is configured to write %s but does not appear in `instance_map` config."
                         % (instance, stream)
@@ -397,6 +404,11 @@ def read_config(
                 "Must only specify one instance to handle `push` messages."
             )

+        if len(self.writers.device_lists) == 0:
+            raise ConfigError(
+                "Must specify at least one instance to handle `device_lists` messages."
+            )
+
         self.events_shard_config = RoutableShardedWorkerHandlingConfig(
             self.writers.events
         )
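
Taken together, the two checks in `read_config` for a multi-writer stream amount to the following standalone sketch. The `validate_writers` function and the local `ConfigError` class are hypothetical stand-ins; the error messages follow the wording shown in the diff:

```python
from typing import Dict, List

MAIN_PROCESS_INSTANCE_NAME = "master"


class ConfigError(Exception):
    """Local stand-in for Synapse's ConfigError."""


def validate_writers(
    stream: str, instances: List[str], instance_map: Dict[str, dict]
) -> None:
    """Reject an empty writer list and writers missing from instance_map."""
    if len(instances) == 0:
        raise ConfigError(
            "Must specify at least one instance to handle `%s` messages." % (stream,)
        )
    for instance in instances:
        # The main process does not need an instance_map entry.
        if instance != MAIN_PROCESS_INSTANCE_NAME and instance not in instance_map:
            raise ConfigError(
                "Instance %r is configured to write %s but does not appear in "
                "`instance_map` config." % (instance, stream)
            )


validate_writers("device_lists", ["master"], {})
validate_writers("device_lists", ["dl1", "dl2"], {"dl1": {}, "dl2": {}})
```

Both calls above pass silently; an unknown worker name or an empty list raises `ConfigError`.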
@@ -419,9 +431,12 @@ def read_config(
         #
         # No effort is made to ensure only a single instance of these tasks is
         # running.
-        background_tasks_instance = config.get("run_background_tasks_on") or "master"
+        background_tasks_instance = (
+            config.get("run_background_tasks_on") or MAIN_PROCESS_INSTANCE_NAME
+        )
         self.run_background_tasks = (
-            self.worker_name is None and background_tasks_instance == "master"
+            self.worker_name is None
+            and background_tasks_instance == MAIN_PROCESS_INSTANCE_NAME
         ) or self.worker_name == background_tasks_instance

         self.should_notify_appservices = self._should_this_worker_perform_duty(
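
The `run_background_tasks` expression in this hunk is compact, so here is the same boolean logic as a free function with a few sample inputs. The function name `should_run_background_tasks` and the worker name `background_worker1` are hypothetical; the logic mirrors the diff:

```python
from typing import Optional

MAIN_PROCESS_INSTANCE_NAME = "master"


def should_run_background_tasks(
    worker_name: Optional[str], run_background_tasks_on: Optional[str]
) -> bool:
    """Mirror of the run_background_tasks expression in read_config."""
    # An unset or empty option falls back to the main process.
    background_tasks_instance = run_background_tasks_on or MAIN_PROCESS_INSTANCE_NAME
    return (
        worker_name is None
        and background_tasks_instance == MAIN_PROCESS_INSTANCE_NAME
    ) or worker_name == background_tasks_instance


# worker_name is None on the main process.
print(should_run_background_tasks(None, None))
print(should_run_background_tasks(None, "background_worker1"))
print(should_run_background_tasks("background_worker1", "background_worker1"))
```

The main process runs background tasks only when no other instance is designated, and a worker runs them only when it is named explicitly.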
@@ -493,9 +508,10 @@ def _should_this_worker_perform_duty(
         # 'don't run here'.
         new_option_should_run_here = None
         if new_option_name in config:
-            designated_worker = config[new_option_name] or "master"
+            designated_worker = config[new_option_name] or MAIN_PROCESS_INSTANCE_NAME
             new_option_should_run_here = (
-                designated_worker == "master" and self.worker_name is None
+                designated_worker == MAIN_PROCESS_INSTANCE_NAME
+                and self.worker_name is None
             ) or designated_worker == self.worker_name

         legacy_option_should_run_here = None
@@ -592,7 +608,7 @@ def _worker_names_performing_this_duty(
             # If no worker instances are set we check if the legacy option
             # is set, which means use the main process.
             if legacy_option:
-                worker_instances = ["master"]
+                worker_instances = [MAIN_PROCESS_INSTANCE_NAME]

             if self.worker_app == legacy_app_name:
                 if legacy_option:
3 changes: 2 additions & 1 deletion synapse/handlers/appservice.py
@@ -638,7 +638,8 @@ async def _get_device_list_summary(

         # Fetch the users who have modified their device list since then.
         users_with_changed_device_lists = await self.store.get_all_devices_changed(
-            from_key, to_key=new_key
+            MultiWriterStreamToken(stream=from_key),
+            to_key=MultiWriterStreamToken(stream=new_key),
         )

         # Filter out any users the application service is not interested in
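
Wrapping the integer keys in `MultiWriterStreamToken(stream=...)` suggests a token that carries one base position plus optional per-writer positions. The sketch below illustrates that idea only; `MultiWriterTokenSketch` is a simplified, hypothetical stand-in, and the assumption that an unlisted writer falls back to the base `stream` position is not confirmed by this diff:

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass(frozen=True)
class MultiWriterTokenSketch:
    """Simplified stand-in for a multi-writer stream token (assumption)."""

    # Base position: taken here to be a position every writer has reached.
    stream: int
    # Positions of writers that are ahead of the base position.
    instance_map: Dict[str, int] = field(default_factory=dict)

    def get_stream_pos_for_instance(self, instance_name: str) -> int:
        # Writers without an explicit entry are treated as being at `stream`.
        return self.instance_map.get(instance_name, self.stream)


# A token built from a plain integer, as in the appservice handler change.
from_token = MultiWriterTokenSketch(stream=5)
print(from_token.get_stream_pos_for_instance("device_lists1"))

ahead = MultiWriterTokenSketch(stream=5, instance_map={"device_lists2": 8})
print(ahead.get_stream_pos_for_instance("device_lists2"))
```

Under that reading, a token built from a single integer behaves like the old single-writer position, which would explain why the appservice handler can keep storing plain integers.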
5 changes: 0 additions & 5 deletions synapse/handlers/deactivate_account.py
@@ -24,7 +24,6 @@

 from synapse.api.constants import Membership
 from synapse.api.errors import SynapseError
-from synapse.handlers.device import DeviceHandler
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import Codes, Requester, UserID, create_requester

@@ -84,10 +83,6 @@ async def deactivate_account(
         Returns:
             True if identity server supports removing threepids, otherwise False.
         """
-
-        # This can only be called on the main process.
-        assert isinstance(self._device_handler, DeviceHandler)
-
         # Check if this user can be deactivated
         if not await self._third_party_rules.check_can_deactivate_user(
             user_id, by_admin