fix(shard-distributor): send initial state to new subscribers #7499

Merged
jakobht merged 8 commits into cadence-workflow:master from jakobht:bugInitialState
Dec 2, 2025

Member

@jakobht jakobht commented Dec 1, 2025

What changed?
Modified the Subscribe method in namespaceShardToExecutor to send the initial executor state to new subscribers immediately upon subscription, and refactored getExecutorState into a separate method to safely retrieve a copy of the current state.

Removed the logic for sending initial state from the handler. That logic didn't send the metadata for the executors.

Improved test infrastructure by updating setupExecutorWithShards to use atomic etcd transactions and to properly delete old test data.

Why?
New subscribers were not receiving the initial state when they subscribed to the executor state pub/sub, so they had to wait for the next update before getting any data. This led to delays in subscribers getting the current shard distribution.
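The delay can be illustrated with a small stand-alone sketch. The `broadcaster` type and `tryReceive` helper are hypothetical and are not the pub/sub used in the shard distributor; they only model a publisher that forwards updates to whoever is subscribed at publish time.

```go
package main

import "fmt"

// broadcaster is a hypothetical minimal pub/sub that only forwards updates
// published after a subscriber has registered.
type broadcaster struct {
	subs []chan string
}

func (b *broadcaster) subscribe() <-chan string {
	ch := make(chan string, 1)
	b.subs = append(b.subs, ch)
	return ch
}

func (b *broadcaster) publish(state string) {
	for _, ch := range b.subs {
		select {
		case ch <- state:
		default: // drop the update if the subscriber's buffer is full
		}
	}
}

// tryReceive reports whether a value is available without blocking.
func tryReceive(ch <-chan string) (string, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		return "", false
	}
}

func main() {
	b := &broadcaster{}
	b.publish("distribution v1") // published before anyone subscribed

	ch := b.subscribe()
	_, ok := tryReceive(ch)
	fmt.Println("has state right after subscribing:", ok) // false

	b.publish("distribution v2")
	v, ok := tryReceive(ch)
	fmt.Println("after next update:", v, ok) // distribution v2 true
}
```

The late subscriber sees nothing until `distribution v2` is published, which is the gap that sending the current state on subscription closes.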

The test improvements ensure executor setup happens atomically and make tests more maintainable by ensuring stale state is deleted.

How did you test it?
Unit tests

Potential risks

Release notes

Documentation Changes

Signed-off-by: Jakob Haahr Taankvist <jht@uber.com>
Signed-off-by: Jakob Haahr Taankvist <jht@uber.com>
Signed-off-by: Jakob Haahr Taankvist <jht@uber.com>
@jakobht jakobht changed the title from "Bug initial state" to "fix(shard-distributor): send initial state to new subscribers" Dec 1, 2025
Signed-off-by: Jakob Haahr Taankvist <jht@uber.com>

// The go routine sends the initial state to the subscriber.
go func() {
	n.RLock()
Contributor

We are already locking inside the function, so we can remove this.

Member Author

great catch! removed
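A short sketch of the locking point raised in this thread, under the assumption that the helper acquires the read lock itself. `registry`, `snapshot`, `add`, and `sendInitial` are hypothetical names, not identifiers from the PR. Beyond being redundant, nesting read locks is something the Go `sync` documentation prohibits: once a writer is blocked in `Lock`, new readers are held back, so a goroutine that tries to re-acquire a read lock it already holds can deadlock.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical type guarded by an embedded RWMutex.
type registry struct {
	sync.RWMutex
	shards []string
}

// snapshot takes the read lock itself and returns a copy of the shard list.
func (r *registry) snapshot() []string {
	r.RLock()
	defer r.RUnlock()
	return append([]string(nil), r.shards...)
}

// add appends a shard under the write lock.
func (r *registry) add(shard string) {
	r.Lock()
	defer r.Unlock()
	r.shards = append(r.shards, shard)
}

// sendInitial calls snapshot without wrapping it in its own RLock/RUnlock
// pair, because snapshot already handles locking.
func sendInitial(r *registry, out chan<- []string) {
	out <- r.snapshot()
}

func main() {
	r := &registry{shards: []string{"shard-a"}}
	out := make(chan []string, 1)

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); sendInitial(r, out) }()
	go func() { defer wg.Done(); r.add("shard-b") }()
	wg.Wait()

	// The snapshot holds one or two shards depending on scheduling.
	fmt.Println(len(<-out))
}
```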

 func (n *namespaceShardToExecutor) Subscribe(ctx context.Context) (<-chan map[*store.ShardOwner][]string, func()) {
-	return n.pubSub.subscribe(ctx)
+	subCh, unSub := n.pubSub.subscribe(ctx)
+	n.logger.Info("initial state", tag.ShardNamespace(n.namespace), tag.Value(n.getExecutorState()))
Contributor

I would suggest moving this into the goroutine and using the same state that we are going to send, instead of calling the method twice.

Member Author

I think we don't need this log at all, deleted it

Signed-off-by: Jakob Haahr Taankvist <jht@uber.com>
Signed-off-by: Jakob Haahr Taankvist <jht@uber.com>
@jakobht jakobht enabled auto-merge (squash) December 2, 2025 11:53
@jakobht jakobht merged commit 956a742 into cadence-workflow:master Dec 2, 2025
41 checks passed
@jakobht jakobht deleted the bugInitialState branch March 2, 2026 11:31