
fix(shard-distributor): fix high-frequent triggering of the rebalancing loop#7696

Merged
arzonus merged 2 commits into cadence-workflow:master from arzonus:refactor-subscribe-to-subscribe-to-executor-status-changes
Feb 12, 2026
Conversation


@arzonus arzonus commented Feb 11, 2026

What changed?

  • The Subscribe method now filters events so that only changes to an executor's status key are propagated
  • The Subscribe method has been renamed to SubscribeToExecutorStatusChanges
  • A cooldown period has been added so the rebalancing loop is not triggered more frequently than LeaderProcess.Cooldown (250ms by default).
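The key-filtering change above can be sketched as a standalone helper. This is an illustrative sketch, not the store's actual implementation: the event type, the `/status` key suffix, and the function names are all assumptions for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// watchEvent is a simplified stand-in for an etcd watch event;
// only the key matters for this sketch.
type watchEvent struct {
	Key string
}

// isExecutorStatusKey reports whether a key refers to an executor's
// status. The "/status" suffix is an assumption for illustration;
// the real key layout lives in the store package.
func isExecutorStatusKey(key string) bool {
	return strings.HasSuffix(key, "/status")
}

// filterStatusEvents keeps only events touching a status key, dropping
// heartbeat-style keys such as reported_shards that previously
// triggered the rebalancing loop.
func filterStatusEvents(events []watchEvent) []watchEvent {
	var out []watchEvent
	for _, ev := range events {
		if isExecutorStatusKey(ev.Key) {
			out = append(out, ev)
		}
	}
	return out
}

func main() {
	events := []watchEvent{
		{Key: "/ns1/executors/exec-1/status"},
		{Key: "/ns1/executors/exec-1/reported_shards"},
		{Key: "/ns1/executors/exec-2/heartbeat"},
	}
	fmt.Println(len(filterStatusEvents(events))) // prints 1
}
```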

Why?

  • We observed a high trigger rate of the rebalancing loop, which increased latency of the GetShardOwner method. This change reduces the number of events that trigger the loop. Most triggers came from the reported_shards and status etcd keys, which each executor updates roughly every 3 seconds.
  • After this change, when executors report no changes, the rebalancing loop should be triggered no more than once per LeaderProcessor.Period.
  • LeaderProcess.Cooldown has been added to reduce the trigger rate of the rebalancing loop during a burst of status changes.

How did you test it?

  • ETCD=1 go test ./service/sharddistributor/...
  • Run on dev cluster

Potential risks
N/A

Release notes
N/A

Documentation Changes
N/A


Reviewer Validation

PR Description Quality (check these before reviewing code):

  • "What changed" provides a clear 1-2 line summary
    • Project Issue is linked
  • "Why" explains the full motivation with sufficient context
  • Testing is documented:
    • Unit test commands are included (with exact go test invocation)
    • Integration test setup/commands included (if integration tests were run)
    • Canary testing details included (if canary was mentioned)
  • Potential risks section is thoughtfully filled out (or legitimately N/A)
  • Release notes included if this completes a user-facing feature
  • Documentation needs are addressed (or noted if uncertain)


arzonus commented Feb 12, 2026

gitar apply your suggestions

gitar-bot bot commented Feb 12, 2026

gitar apply your suggestions

Applied both suggestions:

  1. ✅ Fixed metric operation string in common/metrics/defs.go:2182 - removed erroneous "Scope" suffix
  2. ✅ Fixed doc comment in service/sharddistributor/store/store.go:73 - now correctly describes status transitions instead of heartbeat updates

Changes have been committed.

@arzonus arzonus force-pushed the refactor-subscribe-to-subscribe-to-executor-status-changes branch 2 times, most recently from 0237e40 to cd8755a Compare February 12, 2026 12:42
@arzonus arzonus force-pushed the refactor-subscribe-to-subscribe-to-executor-status-changes branch from cd8755a to 91826c2 Compare February 12, 2026 12:52
@arzonus arzonus changed the title from "chore(shard-distributor): refactor subscribe method to subscribe to executor status changes" to "fix(shard-distributor): fix high-frequency triggering of the rebalancing loop" Feb 12, 2026
@arzonus arzonus changed the title from "fix(shard-distributor): fix high-frequency triggering of the rebalancing loop" to "fix(shard-distributor): fix high-frequent triggering of the rebalancing loop" Feb 12, 2026

// Cooldown is the minimum duration between shard rebalance operations
// Default: 250ms
Cooldown time.Duration `yaml:"cooldown"`
Member:
Nit: Maybe we should call this "RebalanceCooldown" or something like that

Contributor Author:
thx, yeah, renamed it 👍

// If an update comes in before the cooldown has expired,
// we wait until the cooldown has passed since the last rebalance before processing it.
// This ensures that we don't rebalance too frequently in response to a flurry of updates
p.timeSource.Sleep(nextRebalanceAllowedAt.Sub(p.timeSource.Now()))
Member:
Just to make sure:

We already decoupled the sender, right? So this will not block the sender.

Contributor Author:
Yeah, this loop already doesn't block the watch event processing, so it's safe to have the cooldown here 👍

// If an update comes in before the cooldown has expired,
// we wait until the cooldown has passed since the last rebalance before processing it.
// This ensures that we don't rebalance too frequently in response to a flurry of updates
p.timeSource.Sleep(nextRebalanceAllowedAt.Sub(p.timeSource.Now()))
⚠️ Bug: Cooldown Sleep is not context-aware, blocks shutdown

p.timeSource.Sleep(...) at line 211 is not context-aware. If the context is cancelled (e.g., during graceful shutdown via Terminate()), the goroutine will block for up to the remaining cooldown duration (up to 250ms) before it can observe the cancellation. While 250ms is short, this introduces unnecessary shutdown latency and breaks the expected contract that context cancellation should promptly stop processing.

The TimeSource interface already provides SleepWithContext(ctx, d) which respects context cancellation. Using it here would allow the loop to exit promptly when the context is cancelled during a cooldown sleep.

Additionally, when the cooldown hasn't expired and a negative duration is computed (i.e., nextRebalanceAllowedAt is already in the past), time.Sleep with a negative duration returns immediately — this is fine behavior, but using SleepWithContext is still preferred for the cancellation benefit.

Suggested fix:

			if err := p.timeSource.SleepWithContext(ctx, nextRebalanceAllowedAt.Sub(p.timeSource.Now())); err != nil {
				return
			}


@gitar-bot
Copy link

gitar-bot bot commented Feb 12, 2026

Running post-merge workflows

🔍 CI failure analysis for ee56813: Test failure in unrelated election package due to mock cleanup timing race condition - same flaky test that failed in earlier runs.

Issue

Test TestElector_Run in service/sharddistributor/leader/election failed due to a missing mock expectation.

Root Cause

The test expects Cleanup() to be called on the mock election object before completion, but the test's context is cancelled before the cleanup code path executes. This is a race condition between context cancellation and cleanup execution timing.

controller.go:97: missing call(s) to *store.MockElection.Cleanup(is anything)

Details

This failure is unrelated to the PR changes. The PR modifies:

  • Subscribe → SubscribeToExecutorStatusChanges method rename
  • Event filtering logic in etcd store (filter on ExecutorStatusKey only)
  • Added WithPrevKV for value comparison
  • Added cooldown period with RebalanceCooldown configuration
  • Updates to callers in the processor

The failing test is in leader/election package and tests the elector's Run method, which has no connection to subscription mechanism changes or rebalancing cooldown logic.

Historical Evidence: This exact same test failure occurred in an earlier CI run for this PR (commit faba8f8), confirming it's a pre-existing flaky test unrelated to these changes.

Code Review ⚠️ Changes requested (5 resolved / 6 findings)

Clean and well-structured refactor that effectively filters executor status changes and adds a cooldown. One important issue: the cooldown Sleep call is not context-aware, which can delay graceful shutdown — use SleepWithContext instead.

⚠️ Bug: Cooldown Sleep is not context-aware, blocks shutdown

📄 service/sharddistributor/leader/process/processor.go:211

p.timeSource.Sleep(...) at line 211 is not context-aware. If the context is cancelled (e.g., during graceful shutdown via Terminate()), the goroutine will block for up to the remaining cooldown duration (up to 250ms) before it can observe the cancellation. While 250ms is short, this introduces unnecessary shutdown latency and breaks the expected contract that context cancellation should promptly stop processing.

The TimeSource interface already provides SleepWithContext(ctx, d) which respects context cancellation. Using it here would allow the loop to exit promptly when the context is cancelled during a cooldown sleep.

Additionally, when the cooldown hasn't expired and a negative duration is computed (i.e., nextRebalanceAllowedAt is already in the past), time.Sleep with a negative duration returns immediately — this is fine behavior, but using SleepWithContext is still preferred for the cancellation benefit.

Suggested fix
			if err := p.timeSource.SleepWithContext(ctx, nextRebalanceAllowedAt.Sub(p.timeSource.Now())); err != nil {
				return
			}
✅ 5 resolved
Bug: Cooldown nextRebalanceAllowedAt set from pre-sleep time

📄 service/sharddistributor/leader/process/processor.go:214
The cooldown's nextRebalanceAllowedAt is computed from now which was captured before the sleep, but should be based on the time after the sleep completes.

When the sleep is non-zero (i.e., a burst of updates), the sequence is:

  1. now = T100, nextRebalanceAllowedAt = T200 → sleep 100ms
  2. After sleep, wall clock is ~T200, but nextRebalanceAllowedAt = T100 + 250ms = T350
  3. Rebalance runs at ~T200, next one allowed at T350 → effective cooldown is only ~150ms

The intended behavior is for the cooldown to be measured from when the rebalance actually runs, not from when the update was received. This means the cooldown is shorter than configured during bursts, which is exactly the scenario it's meant to protect against.

Quality: Spurious require.NoError(t, err) checks stale err value

📄 service/sharddistributor/store/etcd/executorstore/etcdstore_test.go:438
On line 438, require.NoError(t, err) checks the outer-scope err variable, which was last assigned and already checked on lines 423–424 (the heartbeat Put call from test case #1). This require.NoError is checking an already-validated error, making it misleading — it appears to validate reportedShardsKey construction (line 437), but BuildExecutorKey doesn't return an error.

This was present in the old code and was carried over during the refactor, but since the test cases are now restructured into separate blocks, this stale assertion is more visible and confusing.

Quality: Stale test comment says reported shards "IS a significant change"

📄 service/sharddistributor/store/etcd/executorstore/etcdstore_test.go:431
The comment on line 431 says // Now update the reported shards, which IS a significant change but in the refactored code, reported shards updates are explicitly NOT significant — the test itself correctly asserts on line 443 that no notification should be received. This comment was leftover from the old Subscribe behavior and is now misleading.

Bug: Metric operation string has erroneous "Scope" suffix

📄 common/metrics/defs.go:2182
The operation string for ShardDistributorStoreSubscribeToExecutorStatusChangesScope is "StoreSubscribeToExecutorStatusChangesScope" — note the trailing "Scope" suffix. All other operation strings in this block follow the pattern of being the operation name without the "Scope" suffix (e.g., "StoreSubscribe", "StoreSubscribeToAssignmentChanges", "StoreRecordHeartbeat").

This will result in inconsistent metric names in dashboards and monitoring, making it harder to query and correlate metrics. The old value was "StoreSubscribe", so the new value should be "StoreSubscribeToExecutorStatusChanges" (without the trailing "Scope").

Quality: Interface doc comment incorrectly cites heartbeat as example

📄 service/sharddistributor/store/store.go:73
The doc comment on SubscribeToExecutorStatusChanges says:

// SubscribeToExecutorStatusChanges subscribes to changes of executors' status (e.g. heartbeat updates) within a namespace.

Heartbeat updates are explicitly filtered OUT by this method — the entire point of this PR is to avoid triggering on heartbeat/statistics/reported_shards changes. The example in the comment contradicts the implementation and will mislead future maintainers.

A more accurate comment would be:

// SubscribeToExecutorStatusChanges subscribes to changes of executors' status key (e.g., RUNNING → DRAINING transitions) within a namespace.
Rules ❌ No requirements met

Repository Rules

PR Description Quality Standards: [How did you test it?] Fix test command path (use `./service/sharddistributor/...`). [Potential risks] Consider mentioning this is an internal API change (method rename) to clarify backward compatibility impact.

@arzonus arzonus merged commit 07c86bf into cadence-workflow:master Feb 12, 2026
42 of 43 checks passed
@arzonus arzonus deleted the refactor-subscribe-to-subscribe-to-executor-status-changes branch February 12, 2026 14:18