-
Notifications
You must be signed in to change notification settings - Fork 424
Move device changes off the main process #18581
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
81bf67d
4caf770
a7ba48a
cf5d5f5
4c39d80
1bd919a
db3facd
2b67e3e
709779f
d7fc913
3e8ff9d
3a1b253
4f655a6
839e260
4d5ad56
cd201d8
cd9e98e
a197067
10f5b88
e707d43
e07bc58
77c718f
1a45bf0
d7cc4ba
242884c
0dddd99
641f729
42e7df4
b94e314
fbe46bc
56a5cf8
1556872
d6e0ec4
d436e89
0e54b94
bd929d4
04d9780
9a770b3
8c94934
fdcd607
cd400c2
4394314
da58cab
e02cf43
9572e95
38a243b
bedf66b
90eb9ba
8a89a25
2a28d42
8173ad7
4e2b9af
3ed6db7
a0e4e2d
8488e1f
a64dafd
0a5d2d1
e94d0d3
bd9a360
60ea378
565403e
339c90f
d766b81
07f32aa
d916e9e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Enable workers to write directly to the device lists stream and handle device list updates, reducing load on the main process. | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -178,6 +178,8 @@ | |||||
| "^/_matrix/client/(api/v1|r0|v3|unstable)/login$", | ||||||
| "^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$", | ||||||
| "^/_matrix/client/(api/v1|r0|v3|unstable)/account/whoami$", | ||||||
| "^/_matrix/client/(api/v1|r0|v3|unstable)/devices(/|$)", | ||||||
| "^/_matrix/client/(r0|v3)/delete_devices$", | ||||||
| "^/_matrix/client/versions$", | ||||||
| "^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$", | ||||||
| "^/_matrix/client/(r0|v3|unstable)/register$", | ||||||
|
|
@@ -194,6 +196,9 @@ | |||||
| "^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$", | ||||||
| "^/_matrix/client/(r0|v3|unstable)/capabilities$", | ||||||
| "^/_matrix/client/(r0|v3|unstable)/notifications$", | ||||||
| "^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload", | ||||||
| "^/_matrix/client/(api/v1|r0|v3|unstable)/keys/device_signing/upload$", | ||||||
| "^/_matrix/client/(api/v1|r0|v3|unstable)/keys/signatures/upload$", | ||||||
| ], | ||||||
| "shared_extra_conf": {}, | ||||||
| "worker_extra_conf": "", | ||||||
|
|
@@ -265,13 +270,6 @@ | |||||
| "shared_extra_conf": {}, | ||||||
| "worker_extra_conf": "", | ||||||
| }, | ||||||
| "frontend_proxy": { | ||||||
| "app": "synapse.app.generic_worker", | ||||||
| "listener_resources": ["client", "replication"], | ||||||
| "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], | ||||||
| "shared_extra_conf": {}, | ||||||
| "worker_extra_conf": "", | ||||||
| }, | ||||||
| "account_data": { | ||||||
| "app": "synapse.app.generic_worker", | ||||||
| "listener_resources": ["client", "replication"], | ||||||
|
|
@@ -306,6 +304,13 @@ | |||||
| "shared_extra_conf": {}, | ||||||
| "worker_extra_conf": "", | ||||||
| }, | ||||||
| "device_lists": { | ||||||
| "app": "synapse.app.generic_worker", | ||||||
| "listener_resources": ["client", "replication"], | ||||||
| "endpoint_patterns": [], | ||||||
| "shared_extra_conf": {}, | ||||||
| "worker_extra_conf": "", | ||||||
| }, | ||||||
| "typing": { | ||||||
| "app": "synapse.app.generic_worker", | ||||||
| "listener_resources": ["client", "replication"], | ||||||
|
|
@@ -412,16 +417,17 @@ def add_worker_roles_to_shared_config( | |||||
| # streams | ||||||
| instance_map = shared_config.setdefault("instance_map", {}) | ||||||
|
|
||||||
| # This is a list of the stream_writers that there can be only one of. Events can be | ||||||
| # sharded, and therefore doesn't belong here. | ||||||
| singular_stream_writers = [ | ||||||
| # This is a list of the stream_writers. | ||||||
| stream_writers = { | ||||||
| "account_data", | ||||||
| "events", | ||||||
| "device_lists", | ||||||
| "presence", | ||||||
| "receipts", | ||||||
| "to_device", | ||||||
| "typing", | ||||||
| "push_rules", | ||||||
| ] | ||||||
| } | ||||||
|
|
||||||
| # Worker-type specific sharding config. Now a single worker can fulfill multiple | ||||||
| # roles, check each. | ||||||
|
|
@@ -431,28 +437,11 @@ def add_worker_roles_to_shared_config( | |||||
| if "federation_sender" in worker_types_set: | ||||||
| shared_config.setdefault("federation_sender_instances", []).append(worker_name) | ||||||
|
|
||||||
| if "event_persister" in worker_types_set: | ||||||
| # Event persisters write to the events stream, so we need to update | ||||||
| # the list of event stream writers | ||||||
| shared_config.setdefault("stream_writers", {}).setdefault("events", []).append( | ||||||
| worker_name | ||||||
| ) | ||||||
|
|
||||||
| # Map of stream writer instance names to host/ports combos | ||||||
| if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False): | ||||||
| instance_map[worker_name] = { | ||||||
| "path": f"/run/worker.{worker_port}", | ||||||
| } | ||||||
| else: | ||||||
| instance_map[worker_name] = { | ||||||
| "host": "localhost", | ||||||
| "port": worker_port, | ||||||
| } | ||||||
| # Update the list of stream writers. It's convenient that the name of the worker | ||||||
| # type is the same as the stream to write. Iterate over the whole list in case there | ||||||
| # is more than one. | ||||||
| for worker in worker_types_set: | ||||||
| if worker in singular_stream_writers: | ||||||
| if worker in stream_writers: | ||||||
| shared_config.setdefault("stream_writers", {}).setdefault( | ||||||
| worker, [] | ||||||
| ).append(worker_name) | ||||||
|
|
@@ -876,6 +865,13 @@ def generate_worker_files( | |||||
| else: | ||||||
| healthcheck_urls.append("http://localhost:%d/health" % (worker_port,)) | ||||||
|
|
||||||
| # Special case for event_persister: those are just workers that write to | ||||||
| # the `events` stream. For other workers, the worker name is the same | ||||||
| # name of the stream they write to, but for some reason it is not the | ||||||
| # case for event_persister. | ||||||
| if "event_persister" in worker_types_set: | ||||||
| worker_types_set.add("events") | ||||||
|
Comment on lines
+868
to
+873
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Feels like we can have this even further up the chain. Maybe in See #18581 (comment)
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (still relevant) Any opinions?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, missed that one. It wouldn't work to put it in parse_worker_types, as this is then used to pull out config out of WORKER_CONFIG here: synapse/docker/configure_workers_and_start.py Lines 841 to 842 in 0a5d2d1
I think ideally we would need to decouple the worker name from the stream they write to, and define that within the WORKER_CONFIG definition… but this refactoring is out of scope of this PR already way too big PR :p |
||||||
|
|
||||||
| # Update the shared config with sharding-related options if necessary | ||||||
| add_worker_roles_to_shared_config( | ||||||
| shared_config, worker_types_set, worker_name, worker_port | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, this is the type of thing I can't really review (related discussion: #18581 (comment))
I can add an approval but it's just like "sure".
Feels like some typing/lints are missing around ensuring only serializable data is passed to a
ReplicationEndpointbut this is something to address outside of this PR.