fix(shard-distributor): fix high-frequency triggering of the rebalancing loop #7696
Conversation
gitar apply your suggestions

Applied both suggestions. Changes have been committed.
Force-pushed 0237e40 to cd8755a
Force-pushed cd8755a to 91826c2
    // Cooldown is the minimum duration between shard rebalance operations
    // Default: 250ms
    Cooldown time.Duration `yaml:"cooldown"`
Nit: Maybe we should call this "RebalanceCooldown" or something like that
thx, yeah, renamed it 👍
    // If an update comes in before the cooldown has expired,
    // we wait until the cooldown has passed since the last rebalance before processing it.
    // This ensures that we don't rebalance too frequently in response to a flurry of updates
    p.timeSource.Sleep(nextRebalanceAllowedAt.Sub(p.timeSource.Now()))
Just to make sure:
We already decoupled the sender, right? So this will not block the sender.
Yeah, this loop already doesn't block the watch event processing, so it's safe to have the cooldown here 👍
    // If an update comes in before the cooldown has expired,
    // we wait until the cooldown has passed since the last rebalance before processing it.
    // This ensures that we don't rebalance too frequently in response to a flurry of updates
    p.timeSource.Sleep(nextRebalanceAllowedAt.Sub(p.timeSource.Now()))
⚠️ Bug: Cooldown Sleep is not context-aware, blocks shutdown
p.timeSource.Sleep(...) at line 211 is not context-aware. If the context is cancelled (e.g., during graceful shutdown via Terminate()), the goroutine will block for up to the remaining cooldown duration (up to 250ms) before it can observe the cancellation. While 250ms is short, this introduces unnecessary shutdown latency and breaks the expected contract that context cancellation should promptly stop processing.
The TimeSource interface already provides SleepWithContext(ctx, d) which respects context cancellation. Using it here would allow the loop to exit promptly when the context is cancelled during a cooldown sleep.
Additionally, when nextRebalanceAllowedAt is already in the past (the cooldown has expired), a negative duration is computed; time.Sleep with a negative duration returns immediately, which is fine behavior, but using SleepWithContext is still preferred for the cancellation benefit.
Suggested fix:
if err := p.timeSource.SleepWithContext(ctx, nextRebalanceAllowedAt.Sub(p.timeSource.Now())); err != nil {
return
}
🔍 CI failure analysis for ee56813: Test failure in the unrelated election package due to a mock cleanup timing race condition; it is the same flaky test that failed in earlier runs. This failure is unrelated to the PR changes. Historical evidence: this exact test failure occurred in an earlier CI run for this PR (commit faba8f8), confirming it is a pre-existing flaky test unrelated to these changes.
What changed?
- The `Subscribe` method has been modified to filter only events that contain the executor's changed status key
- The `Subscribe` method has been renamed to `SubscribeToExecutorStatusChanges`
- `LeaderProcess.Cooldown` (250ms by default)

Why?
`GetShardOwner` method. This change should reduce the number of events that trigger the rebalancing loop. Mostly, the triggers were caused by `reported_shards` and `status` etcd keys that were updated roughly every 3 seconds by each executor. `LeaderProcessor.Period` ... `LeaderProcess.Cooldown` has been added to reduce the rate of triggering the rebalancing loop in case of a burst of status changes.

How did you test it?
ETCD=1 go test ./service/sharddistributor/...

Potential risks
N/A
Release notes
N/A
Documentation Changes
N/A
Reviewer Validation

PR Description Quality (check these before reviewing code):
- (`go test` invocation)