Skip to content

Replace Khepri topic routing projection with trie + ordered_set (v4)#15619

Open
ansd wants to merge 1 commit intomainfrom
da-topic-routing
Open

Replace Khepri topic routing projection with trie + ordered_set (v4)#15619
ansd wants to merge 1 commit intomainfrom
da-topic-routing

Conversation

@ansd
Copy link
Member

@ansd ansd commented Mar 3, 2026

Resolves #15588.

The previous Khepri topic routing projection (v3) stored topic bindings as sets:set(#binding{}) inside trie leaf nodes. This design had a major performance drawback:

On the insertion/deletion path (in the single Khepri Ra process), every binding change required a read-modify-write of the entire sets:set(), making it O(N) in the number of bindings at that leaf. With many MQTT clients connecting concurrently (each subscribing to the same topic filter), this made the Ra process a bottleneck.

Another less severe performance issue was that the entire binding was being copied including the binding arguments containing the MQTT 5.0 subscription options such as:

{<<"x-mqtt-subscription-opts">>,table,
 [{<<"id">>,unsignedint,1},
  {<<"no-local">>,bool,false},
  {<<"qos">>,unsignedbyte,0},
  {<<"retain-as-published">>,bool,false},
  {<<"retain-handling">>,unsignedbyte,0}]}]

Replace the single ETS projection table with two purpose-built tables:

  1. Trie edges table (ETS set, read_concurrency=true):

    • Row: {{XSrc, ParentNodeId, Word}, ChildNodeId, ChildCount}
    • XSrc = {VHost, ExchangeName} (compact 2-tuple of binaries)
    • NodeId = root | reference()
    • ChildCount tracks outgoing edges for garbage collection
  2. Leaf bindings table (ETS ordered_set, read_concurrency=true):

    • Key: {NodeId, BindingKey, Dest}
    • Stored as 1-tuples: {{NodeId, BindingKey, Dest}}
    • No value column; all data is in the key to minimize copying

The trie structure preserves O(depth * 3) routing complexity regardless of the number of overall bindings or wildcard filters. At each trie level, we probe at most 3 edges (literal word, <<"*">>, <<"#">>), each via ets:lookup_element/4 which copies only the ChildNodeId (a reference).

The ordered_set for bindings provides:

  • O(log N) insert and delete per binding (no read-modify-write)
  • The binding key (needed for MQTT subscription identifiers and topic aliases) is part of the key, so it is returned directly during destination collection without additional lookups

Collecting destinations at a matched trie leaf uses a hybrid strategy:

  • Fanout 0-2 (the common case: unicast, device + stream): up to 3 ets:next/2 probes. Each ets:next/2 call costs O(log N) because the CATree (used with read_concurrency) allocates a fresh tree traversal stack on each call.
  • Fanout > 2: ets:select/2 with a partially bound key does an O(log N) seek followed by an O(F) range scan. The match spec compilation overhead amortises over the larger result set.

ets:lookup_element/4 (OTP 26+) returns a default value on miss instead of throwing badarg, and copies only the requested element on hit. This avoids both exception overhead (misses are common during trie traversal of <<"*">> and <<"#">> branches) and unnecessary data copying (we only need the ChildNodeId, not the full row).

Trie node IDs are ephemeral (the tables are rebuilt when the Khepri projection is re-registered). make_ref() is fast, globally unique within a node, and has good hash distribution for the ETS set table.

When a binding is deleted, the trie path from root to leaf is collected in a single downward walk (trie_follow_down_get_path). Empty nodes are then pruned bottom-up: a node is empty when its ChildCount is 0 and it has no bindings in the ordered_set table.

Benchmarks below were run with 500K routing operations per scenario (on the same machine, back-to-back between main (v3) and this commit.

Significant insert/delete improvements:

Churn insert (8K bindings, 4 filters/client): ~1,120 vs ~810 ops/s (+38%)
v3 did a read-modify-write of sets:set() per binding; v4 does
a single ets:insert into the ordered_set plus trie edge updates.

MQTT device insert (20K bindings): ~650 vs ~420 ops/s (+55%)
Same mechanism as churn insert. Particularly impactful when many
clients share the same wildcard filter (e.g. "broadcast.#"),
since v3's sets:set() grew with each client while v4 inserts
are O(log N) regardless.

Same-key fanout insert (10K): ~415 vs ~290 ops/s (+43%)
The worst case for v3: all 10K bindings share the same key,
so each insert copies and rebuilds the growing sets:set().

Routing improvements:

MQTT unicast (10K devices, 20K bindings): ~460K vs ~250K ops/s (+80%)
Each route matches 1 queue among 10K unique exact keys plus
10K queues sharing "broadcast.#". v3 stored bindings in the same
ETS row as the trie edge, so every trie lookup copied the entire
sets:set(). v4 separates trie edges (small rows, set table) from
bindings (ordered_set), so the trie walk copies only references.

Large fanout (10K queues, same key): ~3,100 vs ~1,170 ops/s (+165%)
v3 copied a 10K-element sets:set() out of ETS in a single
ets:lookup, then called sets:to_list/1. v4 uses ets:select/2
with a partially bound key, which does an O(log N) seek and
then an efficient O(F) range scan without intermediate set
conversion.

MQTT broadcast (10K fanout): ~0.6 vs ~0.9 ms/route (+50%)
Same mechanism as above.

Scenarios with no significant change (within benchmark noise):

Exact match, wildcard *, wildcard #, mixed wildcards, and many
wildcard filters showed no clear difference. Both v3 and v4 use
a trie walk, so routing speed is comparable when the fanout is
small and the bottleneck is trie traversal rather than destination
collection.

Comment on lines +1618 to +1630
%% TODO: Khepri should support declaring dependencies between projections
%% so that restore order is guaranteed. Until then, we lazily create the
%% binding table here as a safety net: during restore_projections, the trie
%% projection may be triggered before the binding projection's table is
%% initialised due to Khepri's prepend-based list ordering.
ensure_topic_binding_table(Name) ->
case ets:whereis(Name) of
undefined ->
ets:new(Name, [ordered_set, named_table, protected,
{read_concurrency, true}]);
Tid ->
Tid
end.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dcorbacho @dumbbell @the-mikedavis How can we register two Khepri projections that depend on each other?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking about this a bit more, what about managing multiple ETS tables for a single projection?

In other words:

  1. You register a projection with some options:
    #{tables => #{rabbit_khepri_topic_trie_v4 => EtsOptions,
                  rabbit_khepri_topic_binding_v4 => EtsOptions}}
    
  2. Khepri continues to create the ETS tables with the specified options
  3. When a projection is triggered, the callback is called with a map of table name/IDs (today, it’s called with a table ID as it’s first argument)

What do you think? In the context of this patch, you would have a single callback to implement that handles both ETS tables.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our goal is that at the point in time the projection is triggered both ETS tables must be present.
I think your suggestion solves this. So, yes, that would be perfect, thank you.

@ansd ansd force-pushed the da-topic-routing branch from b13e222 to 6cefc2e Compare March 3, 2026 08:37
Resolves #15588.

The previous Khepri topic routing projection (v3) stored topic bindings
as sets:set(#binding{}) inside trie leaf nodes. This design had a
major performance drawback:

On the insertion/deletion path (in the single Khepri Ra process),
every binding change required a read-modify-write of the entire
sets:set(), making it O(N) in the number of bindings at that leaf.
With many MQTT clients connecting concurrently (each subscribing to
the same topic filter), this made the Ra process a bottleneck.

Another less severe performance issue was that the entire binding
was being copied including the binding arguments containing the MQTT 5.0
subscription options such as:
```
{<<"x-mqtt-subscription-opts">>,table,
 [{<<"id">>,unsignedint,1},
  {<<"no-local">>,bool,false},
  {<<"qos">>,unsignedbyte,0},
  {<<"retain-as-published">>,bool,false},
  {<<"retain-handling">>,unsignedbyte,0}]}]
```

Replace the single ETS projection table with two purpose-built tables:

1. Trie edges table (ETS set, read_concurrency=true):
   - Row: `{{XSrc, ParentNodeId, Word}, ChildNodeId, ChildCount}`
   - XSrc = {VHost, ExchangeName} (compact 2-tuple of binaries)
   - NodeId = root | reference()
   - ChildCount tracks outgoing edges for garbage collection

2. Leaf bindings table (ETS ordered_set, read_concurrency=true):
   - Key: {NodeId, BindingKey, Dest}
   - Stored as 1-tuples: {{NodeId, BindingKey, Dest}}
   - No value column; all data is in the key to minimize copying

The trie structure preserves O(depth * 3) routing complexity regardless
of the number of overall bindings or wildcard filters. At each trie level, we
probe at most 3 edges (literal word, <<"*">>, <<"#">>), each via
ets:lookup_element/4 which copies only the ChildNodeId (a reference).

The ordered_set for bindings provides:
- O(log N) insert and delete per binding (no read-modify-write)
- The binding key (needed for MQTT subscription identifiers and topic
  aliases) is part of the key, so it is returned directly during
  destination collection without additional lookups

Collecting destinations at a matched trie leaf uses a hybrid strategy:
- Fanout 0-2 (the common case: unicast, device + stream): up to 3
  ets:next/2 probes. Each ets:next/2 call costs O(log N) because the
  CATree (used with read_concurrency) allocates a fresh tree traversal
  stack on each call.
- Fanout > 2: ets:select/2 with a partially bound key does an O(log N)
  seek followed by an O(F) range scan. The match spec compilation
  overhead amortises over the larger result set.

ets:lookup_element/4 (OTP 26+) returns a default value on miss
instead of throwing badarg, and copies only the requested element
on hit. This avoids both exception overhead (misses are common during
trie traversal of <<"*">> and <<"#">> branches) and unnecessary data
copying (we only need the ChildNodeId, not the full row).

Trie node IDs are ephemeral (the tables are rebuilt when the Khepri
projection is re-registered). make_ref() is fast, globally unique
within a node, and has good hash distribution for the ETS set table.

When a binding is deleted, the trie path from root to leaf is collected
in a single downward walk (trie_follow_down_get_path). Empty nodes are
then pruned bottom-up: a node is empty when its ChildCount is 0 and it
has no bindings in the ordered_set table.

Benchmarks below were run with 500K routing operations per scenario
(on the same machine, back-to-back between main (v3) and this commit.

Significant insert/delete improvements:

  Churn insert (8K bindings, 4 filters/client): ~1,120 vs ~810 ops/s (+38%)
    v3 did a read-modify-write of sets:set() per binding; v4 does
    a single ets:insert into the ordered_set plus trie edge updates.

  MQTT device insert (20K bindings): ~650 vs ~420 ops/s (+55%)
    Same mechanism as churn insert. Particularly impactful when many
    clients share the same wildcard filter (e.g. "broadcast.#"),
    since v3's sets:set() grew with each client while v4 inserts
    are O(log N) regardless.

  Same-key fanout insert (10K): ~415 vs ~290 ops/s (+43%)
    The worst case for v3: all 10K bindings share the same key,
    so each insert copies and rebuilds the growing sets:set().

Routing improvements:

  MQTT unicast (10K devices, 20K bindings): ~460K vs ~250K ops/s (+80%)
    Each route matches 1 queue among 10K unique exact keys plus
    10K queues sharing "broadcast.#". v3 stored bindings in the same
    ETS row as the trie edge, so every trie lookup copied the entire
    sets:set(). v4 separates trie edges (small rows, set table) from
    bindings (ordered_set), so the trie walk copies only references.

  Large fanout (10K queues, same key): ~3,100 vs ~1,170 ops/s (+165%)
    v3 copied a 10K-element sets:set() out of ETS in a single
    ets:lookup, then called sets:to_list/1. v4 uses ets:select/2
    with a partially bound key, which does an O(log N) seek and
    then an efficient O(F) range scan without intermediate set
    conversion.

  MQTT broadcast (10K fanout): ~0.6 vs ~0.9 ms/route (+50%)
    Same mechanism as above.

Scenarios with no significant change (within benchmark noise):

  Exact match, wildcard *, wildcard #, mixed wildcards, and many
  wildcard filters showed no clear difference. Both v3 and v4 use
  a trie walk, so routing speed is comparable when the fanout is
  small and the bottleneck is trie traversal rather than destination
  collection.
@ansd ansd force-pushed the da-topic-routing branch from 6cefc2e to 22464ce Compare March 3, 2026 09:19
@ansd
Copy link
Member Author

ansd commented Mar 3, 2026

I mark this PR as Ready for review but we should not merge this PR until all of the following are done:

@ansd ansd marked this pull request as ready for review March 3, 2026 10:33
@ansd ansd requested a review from the-mikedavis March 3, 2026 10:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Performance issue for subscriptions with the same topic filter

2 participants