Replace Khepri topic routing projection with trie + ordered_set (v4) #15619
Open
Conversation
ansd commented Mar 3, 2026
Comment on lines +1618 to +1630
```erlang
%% TODO: Khepri should support declaring dependencies between projections
%% so that restore order is guaranteed. Until then, we lazily create the
%% binding table here as a safety net: during restore_projections, the trie
%% projection may be triggered before the binding projection's table is
%% initialised due to Khepri's prepend-based list ordering.
ensure_topic_binding_table(Name) ->
    case ets:whereis(Name) of
        undefined ->
            ets:new(Name, [ordered_set, named_table, protected,
                           {read_concurrency, true}]);
        Tid ->
            Tid
    end.
```
Member
Author
@dcorbacho @dumbbell @the-mikedavis How can we register two Khepri projections that depend on each other?
Collaborator
After thinking about this a bit more, what about managing multiple ETS tables for a single projection?
In other words:
- You register a projection with some options:
  `#{tables => #{rabbit_khepri_topic_trie_v4 => EtsOptions, rabbit_khepri_topic_binding_v4 => EtsOptions}}`
- Khepri continues to create the ETS tables with the specified options
- When a projection is triggered, the callback is called with a map of table names/IDs (today, it's called with a table ID as its first argument)
What do you think? In the context of this patch, you would have a single callback to implement that handles both ETS tables.
Member
Author
Our goal is that both ETS tables are present at the point in time the projection is triggered.
I think your suggestion solves this. So, yes, that would be perfect, thank you.
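For illustration, the proposed shape could look roughly like this. This is a hypothetical sketch only: no such option exists in Khepri today, and apart from the table names and the `tables` option quoted above, every name here (including `update_topic_tables`) is assumed.

```erlang
%% Hypothetical registration options and callback under the proposal above
%% (not an existing Khepri API).
Options = #{tables => #{rabbit_khepri_topic_trie_v4    => [set,         {read_concurrency, true}],
                        rabbit_khepri_topic_binding_v4 => [ordered_set, {read_concurrency, true}]}},
%% The projection callback would receive a map of table name => tid instead of
%% a single tid, so both ETS tables are guaranteed to exist when it runs:
Fun = fun(#{rabbit_khepri_topic_trie_v4    := TrieTid,
            rabbit_khepri_topic_binding_v4 := BindingTid},
          Path, Payload) ->
              update_topic_tables(TrieTid, BindingTid, Path, Payload) %% placeholder
      end.
```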
Member
Author
I mark this PR as
Resolves #15588.
The previous Khepri topic routing projection (v3) stored topic bindings as sets:set(#binding{}) inside trie leaf nodes. This design had a major performance drawback:
On the insertion/deletion path (in the single Khepri Ra process), every binding change required a read-modify-write of the entire sets:set(), making it O(N) in the number of bindings at that leaf. With many MQTT clients connecting concurrently (each subscribing to the same topic filter), this made the Ra process a bottleneck.
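For illustration only (not the actual v3 module code; table and key names here are made up), the per-binding cost pattern was essentially this read-modify-write:

```erlang
%% Schematic of the v3 write path: every binding change reads the whole
%% sets:set() for the leaf out of ETS, modifies it, and writes it back,
%% so a leaf with N bindings costs O(N) per change.
v3_add_binding(Tab, LeafKey, Binding) ->
    Set0 = case ets:lookup(Tab, LeafKey) of
               [{LeafKey, Set}] -> Set;
               []               -> sets:new([{version, 2}])
           end,
    true = ets:insert(Tab, {LeafKey, sets:add_element(Binding, Set0)}).
```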
Another, less severe, performance issue was that the entire binding was being copied, including the binding arguments containing the MQTT 5.0 subscription options, such as:

```
{<<"x-mqtt-subscription-opts">>,table,
 [{<<"id">>,unsignedint,1},
  {<<"no-local">>,bool,false},
  {<<"qos">>,unsignedbyte,0},
  {<<"retain-as-published">>,bool,false},
  {<<"retain-handling">>,unsignedbyte,0}]}
```
Replace the single ETS projection table with two purpose-built tables (sketched below):

1. Trie edges table (ETS set, read_concurrency=true):
   - Row: `{{XSrc, ParentNodeId, Word}, ChildNodeId, ChildCount}`
   - XSrc = {VHost, ExchangeName} (compact 2-tuple of binaries)
   - NodeId = root | reference()
   - ChildCount tracks outgoing edges for garbage collection

2. Leaf bindings table (ETS ordered_set, read_concurrency=true):
   - Key: {NodeId, BindingKey, Dest}
   - Stored as 1-tuples: {{NodeId, BindingKey, Dest}}
   - No value column; all data is in the key to minimize copying
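A minimal sketch of this layout (assumed shapes; the table and variable names here are illustrative, not the actual projection tables):

```erlang
%% Create both tables and store one binding on topic "sensor.temp".
demo() ->
    Edges    = ets:new(trie_edges_demo,    [set,         {read_concurrency, true}]),
    Bindings = ets:new(leaf_bindings_demo, [ordered_set, {read_concurrency, true}]),
    XSrc  = {<<"/">>, <<"amq.topic">>},           %% {VHost, ExchangeName}
    Node1 = make_ref(),
    Node2 = make_ref(),
    %% One edge per topic word: root -"sensor"-> Node1 -"temp"-> Node2.
    %% The last element is the ChildCount of the child node (assumed meaning):
    %% Node1 has one outgoing edge, the leaf Node2 has none.
    true = ets:insert(Edges, {{XSrc, root,  <<"sensor">>}, Node1, 1}),
    true = ets:insert(Edges, {{XSrc, Node1, <<"temp">>},   Node2, 0}),
    %% Leaf binding stored as a 1-tuple: the whole row is the key.
    Dest = {queue, <<"q1">>},
    true = ets:insert(Bindings, {{Node2, <<"sensor.temp">>, Dest}}),
    {Edges, Bindings}.
```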
The trie structure preserves O(depth * 3) routing complexity regardless of the number of overall bindings or wildcard filters. At each trie level, we probe at most 3 edges (literal word, <<"*">>, <<"#">>), each via ets:lookup_element/4 which copies only the ChildNodeId (a reference).
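Per trie level, the probe can be sketched as follows (helper names are illustrative, and the zero-or-more-words semantics of <<"#">> is omitted):

```erlang
%% Probe one edge under the edge-row layout above. ets:lookup_element/4
%% (OTP 26+) returns the default on a miss, so a missing edge costs no
%% exception and a hit copies only the ChildNodeId.
child(Edges, XSrc, ParentNodeId, Word) ->
    ets:lookup_element(Edges, {XSrc, ParentNodeId, Word}, 2, undefined).

%% At most 3 probes per level: the literal word, then <<"*">>, then <<"#">>.
children(Edges, XSrc, ParentNodeId, Word) ->
    lists:filtermap(fun(W) ->
                            case child(Edges, XSrc, ParentNodeId, W) of
                                undefined   -> false;
                                ChildNodeId -> {true, ChildNodeId}
                            end
                    end, [Word, <<"*">>, <<"#">>]).
```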
The ordered_set for bindings provides:

- O(log N) insert and delete per binding (no read-modify-write)
- The binding key (needed for MQTT subscription identifiers and topic aliases) is part of the key, so it is returned directly during destination collection without additional lookups
Collecting destinations at a matched trie leaf uses a hybrid strategy (sketched below):

- Fanout 0-2 (the common case: unicast, device + stream): up to 3 ets:next/2 probes. Each ets:next/2 call costs O(log N) because the CATree (used with read_concurrency) allocates a fresh tree traversal stack on each call.
- Fanout > 2: ets:select/2 with a partially bound key does an O(log N) seek followed by an O(F) range scan. The match spec compilation overhead amortises over the larger result set.
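A sketch of that hybrid collection under the 1-tuple key layout above (helper names and the exact probe-budget handling are assumptions):

```erlang
%% Collect {BindingKey, Dest} pairs for a matched leaf node. The seek key
%% {NodeId, 0, 0} sorts before every real key of this node because integers
%% order before binaries and tuples in Erlang term order.
destinations(Bindings, NodeId) ->
    collect(Bindings, NodeId, ets:next(Bindings, {NodeId, 0, 0}), [], 2).

collect(_Bindings, _NodeId, '$end_of_table', Acc, _Left) ->
    lists:reverse(Acc);
collect(Bindings, NodeId, {NodeId, _BKey, _Dest}, _Acc, 0) ->
    %% A third binding exists, so fanout > 2: switch to a range scan with a
    %% partially bound key (O(log N) seek + O(F) scan).
    [{BKey, Dest} || {{_, BKey, Dest}} <-
        ets:select(Bindings, [{{{NodeId, '_', '_'}}, [], ['$_']}])];
collect(Bindings, NodeId, {NodeId, BKey, Dest} = Key, Acc, Left) ->
    collect(Bindings, NodeId, ets:next(Bindings, Key), [{BKey, Dest} | Acc], Left - 1);
collect(_Bindings, _NodeId, _KeyOfOtherNode, Acc, _Left) ->
    %% Walked past this node's key range: done.
    lists:reverse(Acc).
```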
ets:lookup_element/4 (OTP 26+) returns a default value on miss instead of throwing badarg, and copies only the requested element on hit. This avoids both exception overhead (misses are common during trie traversal of <<"*">> and <<"#">> branches) and unnecessary data copying (we only need the ChildNodeId, not the full row).
Trie node IDs are ephemeral (the tables are rebuilt when the Khepri projection is re-registered). make_ref() is fast, globally unique within a node, and has good hash distribution for the ETS set table.
When a binding is deleted, the trie path from root to leaf is collected in a single downward walk (trie_follow_down_get_path). Empty nodes are then pruned bottom-up: a node is empty when its ChildCount is 0 and it has no bindings in the ordered_set table.
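The prune step might look roughly like this, assuming the downward walk returns the edge path as [{EdgeKey, ChildNodeId}] from root to leaf (the path shape and helper names are assumptions, not the actual trie_follow_down_get_path contract):

```erlang
%% Prune empty nodes bottom-up after a binding has been deleted.
prune(Edges, Bindings, Path) ->
    prune_up(Edges, Bindings, lists:reverse(Path)).

prune_up(_Edges, _Bindings, []) ->
    ok;
prune_up(Edges, Bindings, [{EdgeKey, ChildNodeId} | Up]) ->
    %% A node is empty when its ChildCount (element 3 of the edge row) is 0
    %% and it has no bindings in the ordered_set table.
    Empty = ets:lookup_element(Edges, EdgeKey, 3, 0) =:= 0
            andalso not has_bindings(Bindings, ChildNodeId),
    case Empty of
        true ->
            true = ets:delete(Edges, EdgeKey),
            decrement_parent(Edges, Up),
            prune_up(Edges, Bindings, Up);
        false ->
            %% A non-empty node stops the upward prune.
            ok
    end.

has_bindings(Bindings, NodeId) ->
    case ets:next(Bindings, {NodeId, 0, 0}) of
        {NodeId, _BKey, _Dest} -> true;
        _                      -> false
    end.

decrement_parent(_Edges, []) ->
    ok; %% a root-level edge was removed; nothing above to update
decrement_parent(Edges, [{ParentEdgeKey, _ParentNodeId} | _]) ->
    ets:update_counter(Edges, ParentEdgeKey, {3, -1}).
```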
Benchmarks below were run with 500K routing operations per scenario, on the same machine, back-to-back between main (v3) and this commit.
Significant insert/delete improvements (v4 vs v3):

- Churn insert (8K bindings, 4 filters/client): ~1,120 vs ~810 ops/s (+38%). v3 did a read-modify-write of sets:set() per binding; v4 does a single ets:insert into the ordered_set plus trie edge updates.
- MQTT device insert (20K bindings): ~650 vs ~420 ops/s (+55%). Same mechanism as churn insert. Particularly impactful when many clients share the same wildcard filter (e.g. "broadcast.#"), since v3's sets:set() grew with each client while v4 inserts are O(log N) regardless.
- Same-key fanout insert (10K): ~415 vs ~290 ops/s (+43%). The worst case for v3: all 10K bindings share the same key, so each insert copies and rebuilds the growing sets:set().
Routing improvements (v4 vs v3):

- MQTT unicast (10K devices, 20K bindings): ~460K vs ~250K ops/s (+80%). Each route matches 1 queue among 10K unique exact keys plus 10K queues sharing "broadcast.#". v3 stored bindings in the same ETS row as the trie edge, so every trie lookup copied the entire sets:set(). v4 separates trie edges (small rows, set table) from bindings (ordered_set), so the trie walk copies only references.
- Large fanout (10K queues, same key): ~3,100 vs ~1,170 ops/s (+165%). v3 copied a 10K-element sets:set() out of ETS in a single ets:lookup, then called sets:to_list/1. v4 uses ets:select/2 with a partially bound key, which does an O(log N) seek and then an efficient O(F) range scan without intermediate set conversion.
- MQTT broadcast (10K fanout): ~0.6 vs ~0.9 ms/route (+50%). Same mechanism as above.
Scenarios with no significant change (within benchmark noise): exact match, wildcard *, wildcard #, mixed wildcards, and many wildcard filters showed no clear difference. Both v3 and v4 use a trie walk, so routing speed is comparable when the fanout is small and the bottleneck is trie traversal rather than destination collection.