Skip to content

Comments

[IMPROVED] MQTT: Retained msgs handling with cross account and transform#7636

Merged
derekcollison merged 1 commit intomainfrom
mqtt_cross_account_retain_transform
Dec 12, 2025
Merged

[IMPROVED] MQTT: Retained msgs handling with cross account and transform#7636
derekcollison merged 1 commit intomainfrom
mqtt_cross_account_retain_transform

Conversation

@kozlovic
Copy link
Member

This PR allows retained messages to work even if a situation where the retained message stream sources from another account and has subject transform.

Regardless of that, the addition and removal of retained messages are now happening from the retained message stream's JS consumer callback. Previously, when receiving a retained message with an empty payload the server would remove the retained message on the given subject and notify the rest of the servers in the network with a NATS message. With this PR, the server simply stores the message with the empty body and all servers in the cluster can process this "delete" of a retained message by handling the fact that the payload is empty in processRetainedMsg which is the callback for the JS consumer.

This PR still keeps the network notification for backward compatibility and makes sure that the older server cannot decode (and therefore ignore) a stored retained message with an empty payload.

This PR also fixes the fact that if a retained message can no longer be found in the stream, it is appropriately removed from the map.

Also, when recovering a retained message, the topic/subject is recreated based on the subject of the message read from the stream (minus the prefix). This allows subject transform to be properly supported.

The server was previously adding shadow subscriptions' subject in the list of retained message subjects that need to be loaded, which I believe is wrong since those are on different account. No message would be found on the origin subject on the import account.

The PR also includes various other fixes. One thing not done in this PR is to replace the use of Sublist with a GenericSublist that could be more useful/efficient, but would have to port the ReverseMatch to the generic sublist. This is trivial but would have added more changes to this already big PR. This can be done in a different one.

Signed-off-by: Ivan Kozlovic ivan@synadia.com

This PR allows retained messages to work even if a situation where
the retained message stream sources from another account and has
subject transform.

Regardless of that, the addition and removal of retained messages are
now happening from the retained message stream's JS consumer callback.
Previously, when receiving a retained message with an empty payload
the server would remove the retained message on the given subject
and notify the rest of the servers in the network with a NATS message.
With this PR, the server simply stores the message with the empty
body and all servers in the cluster can process this "delete" of
a retained message by handling the fact that the payload is empty
in `processRetainedMsg` which is the callback for the JS consumer.

This PR still keeps the network notification for backward compatibility
and makes sure that the older server cannot decode (and therefore
ignore) a stored retained message with an empty payload.

This PR also fixes the fact that if a retained message can no longer
be found in the stream, it is appropriately removed from the map.

Also, when recovering a retained message, the topic/subject is
recreated based on the subject of the message read from the stream
(minus the prefix). This allows subject transform to be properly
supported.

The server was previously adding shadow subscriptions' subject
in the list of retained message subjects that need to be loaded,
which I believe is wrong since those are on different account.
No message would be found on the origin subject on the import
account.

The PR also includes various other fixes. One thing not done in
this PR is to replace the use of Sublist with a GenericSublist
that could be more useful/efficient, but would have to port
the ReverseMatch to the generic sublist. This is trivial but
would have added more changes to this already big PR. This can
be done in a different one.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
@kozlovic kozlovic requested a review from a team as a code owner December 11, 2025 21:37
// Run from various go routines (JS consumer, etc..).
// No lock held on entry.
func (as *mqttAccountSessionManager) processRetainedMsg(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
func (as *mqttAccountSessionManager) processRetainedMsg(sub *subscription, c *client, acc *Account, subject, reply string, rmsg []byte) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sub and acc are not needed. The change was done during dev but I forgot to revert I guess. I will update as part of other requested changes that may come as part of the review.

Copy link
Member

@derekcollison derekcollison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@derekcollison derekcollison merged commit 8757442 into main Dec 12, 2025
111 of 114 checks passed
@derekcollison derekcollison deleted the mqtt_cross_account_retain_transform branch December 12, 2025 16:16
neilalexander added a commit that referenced this pull request Dec 15, 2025
Includes the following:

- #7636
- #7637
- #7643
- #7639
- #7648
- #7634
- #7649

Signed-off-by: Neil Twigg <neil@nats.io>
neilalexander added a commit that referenced this pull request Dec 18, 2025
Includes the following:

- #7553
- #7555
- #7579
- #7578
- #7581
- #7585
- #7586
- #7588
- #7593
- #7594
- #7595
- #7596
- #7597
- #7598
- #7601
- #7604
- #7605
- #7610
- #7616
- #7614
- #7622
- #7619
- #7624
- #7625
- #7627
- #7636
- #7637
- #7643
- #7648
- #7634
- #7655
- #7656

Signed-off-by: Neil Twigg <neil@nats.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants