Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,7 +1487,7 @@ const (
ShardDistributorStoreGetExecutorScope
ShardDistributorStoreGetStateScope
ShardDistributorStoreRecordHeartbeatScope
ShardDistributorStoreSubscribeScope
ShardDistributorStoreSubscribeToExecutorStatusChangesScope
ShardDistributorStoreSubscribeToAssignmentChangesScope
ShardDistributorStoreDeleteAssignedStatesScope

Expand Down Expand Up @@ -2164,24 +2164,24 @@ var ScopeDefs = map[ServiceIdx]map[ScopeIdx]scopeDefinition{
DiagnosticsWorkflowScope: {operation: "DiagnosticsWorkflow"},
},
ShardDistributor: {
ShardDistributorGetShardOwnerScope: {operation: "GetShardOwner"},
ShardDistributorWatchNamespaceStateScope: {operation: "WatchNamespaceState"},
ShardDistributorHeartbeatScope: {operation: "ExecutorHeartbeat"},
ShardDistributorAssignLoopScope: {operation: "ShardAssignLoop"},
ShardDistributorExecutorScope: {operation: "Executor"},
ShardDistributorStoreGetShardOwnerScope: {operation: "StoreGetShardOwner"},
ShardDistributorStoreAssignShardScope: {operation: "StoreAssignShard"},
ShardDistributorStoreAssignShardsScope: {operation: "StoreAssignShards"},
ShardDistributorStoreDeleteExecutorsScope: {operation: "StoreDeleteExecutors"},
ShardDistributorStoreGetShardStatsScope: {operation: "StoreGetShardStats"},
ShardDistributorStoreDeleteShardStatsScope: {operation: "StoreDeleteShardStats"},
ShardDistributorStoreGetHeartbeatScope: {operation: "StoreGetHeartbeat"},
ShardDistributorStoreGetExecutorScope: {operation: "StoreGetExecutor"},
ShardDistributorStoreGetStateScope: {operation: "StoreGetState"},
ShardDistributorStoreRecordHeartbeatScope: {operation: "StoreRecordHeartbeat"},
ShardDistributorStoreSubscribeScope: {operation: "StoreSubscribe"},
ShardDistributorStoreSubscribeToAssignmentChangesScope: {operation: "StoreSubscribeToAssignmentChanges"},
ShardDistributorStoreDeleteAssignedStatesScope: {operation: "StoreDeleteAssignedStates"},
ShardDistributorGetShardOwnerScope: {operation: "GetShardOwner"},
ShardDistributorWatchNamespaceStateScope: {operation: "WatchNamespaceState"},
ShardDistributorHeartbeatScope: {operation: "ExecutorHeartbeat"},
ShardDistributorAssignLoopScope: {operation: "ShardAssignLoop"},
ShardDistributorExecutorScope: {operation: "Executor"},
ShardDistributorStoreGetShardOwnerScope: {operation: "StoreGetShardOwner"},
ShardDistributorStoreAssignShardScope: {operation: "StoreAssignShard"},
ShardDistributorStoreAssignShardsScope: {operation: "StoreAssignShards"},
ShardDistributorStoreDeleteExecutorsScope: {operation: "StoreDeleteExecutors"},
ShardDistributorStoreGetShardStatsScope: {operation: "StoreGetShardStats"},
ShardDistributorStoreDeleteShardStatsScope: {operation: "StoreDeleteShardStats"},
ShardDistributorStoreGetHeartbeatScope: {operation: "StoreGetHeartbeat"},
ShardDistributorStoreGetExecutorScope: {operation: "StoreGetExecutor"},
ShardDistributorStoreGetStateScope: {operation: "StoreGetState"},
ShardDistributorStoreRecordHeartbeatScope: {operation: "StoreRecordHeartbeat"},
ShardDistributorStoreSubscribeToExecutorStatusChangesScope: {operation: "StoreSubscribeToExecutorStatusChanges"},
ShardDistributorStoreSubscribeToAssignmentChangesScope: {operation: "StoreSubscribeToAssignmentChanges"},
ShardDistributorStoreDeleteAssignedStatesScope: {operation: "StoreDeleteAssignedStates"},
},
}

Expand Down
4 changes: 4 additions & 0 deletions service/sharddistributor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ type (
// Default: 1 second
Period time.Duration `yaml:"period"`

// RebalanceCooldown is the minimum duration between shard rebalance operations
// Default: 250ms
RebalanceCooldown time.Duration `yaml:"rebalanceCooldown"`

// Timeout is the maximum duration of a single shard rebalance operation
// Default: 1 second
Timeout time.Duration `yaml:"timeout"`
Expand Down
14 changes: 13 additions & 1 deletion service/sharddistributor/leader/process/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
_defaultPeriod = time.Second
_defaultHeartbeatTTL = 10 * time.Second
_defaultTimeout = 1 * time.Second
_defaultCooldown = 250 * time.Millisecond
)

type processorFactory struct {
Expand Down Expand Up @@ -90,6 +91,9 @@ func NewProcessorFactory(
if cfg.Process.Timeout == 0 {
cfg.Process.Timeout = _defaultTimeout
}
if cfg.Process.RebalanceCooldown == 0 {
cfg.Process.RebalanceCooldown = _defaultCooldown
}

return &processorFactory{
logger: logger,
Expand Down Expand Up @@ -192,13 +196,21 @@ func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) {
return
}

nextRebalanceAllowedAt := p.timeSource.Now()

for {
select {
case <-ctx.Done():
p.logger.Info("Rebalancing loop cancelled.")
return

case update := <-updateChan:
// If an update comes in before the cooldown has expired,
// we wait until the cooldown has passed since the last rebalance before processing it.
// This ensures that we don't rebalance too frequently in response to a flurry of updates
p.timeSource.Sleep(nextRebalanceAllowedAt.Sub(p.timeSource.Now()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make sure:

We already de coupled the sender right? So this will not block the sender.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this loop already doesn't block the watch event processing, so it's safe to have the cooldown here 👍

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Bug: Cooldown Sleep is not context-aware, blocks shutdown

p.timeSource.Sleep(...) at line 211 is not context-aware. If the context is cancelled (e.g., during graceful shutdown via Terminate()), the goroutine will block for up to the remaining cooldown duration (up to 250ms) before it can observe the cancellation. While 250ms is short, this introduces unnecessary shutdown latency and breaks the expected contract that context cancellation should promptly stop processing.

The TimeSource interface already provides SleepWithContext(ctx, d) which respects context cancellation. Using it here would allow the loop to exit promptly when the context is cancelled during a cooldown sleep.

Additionally, when the cooldown hasn't expired and a negative duration is computed (i.e., nextRebalanceAllowedAt is already in the past), time.Sleep with a negative duration returns immediately — this is fine behavior, but using SleepWithContext is still preferred for the cancellation benefit.

Suggested fix:

			if err := p.timeSource.SleepWithContext(ctx, nextRebalanceAllowedAt.Sub(p.timeSource.Now())); err != nil {
				return
			}

Was this helpful? React with 👍 / 👎

nextRebalanceAllowedAt = p.timeSource.Now().Add(p.cfg.RebalanceCooldown)

p.logger.Info("Rebalancing triggered", tag.Dynamic("reason", update))
if err := p.rebalanceShards(ctx); err != nil {
p.logger.Error("rebalance failed", tag.Error(err))
Expand All @@ -213,7 +225,7 @@ func (p *namespaceProcessor) runRebalanceTriggeringLoop(ctx context.Context) (<-
// Buffered channel to allow one pending rebalance trigger.
triggerChan := make(chan string, 1)

updateChan, err := p.shardStore.Subscribe(ctx, p.namespaceCfg.Name)
updateChan, err := p.shardStore.SubscribeToExecutorStatusChanges(ctx, p.namespaceCfg.Name)
if err != nil {
p.logger.Error("Failed to subscribe to state changes, stopping rebalancing loop.", tag.Error(err))
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions service/sharddistributor/leader/process/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestRunAndTerminate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(&store.NamespaceState{}, nil).AnyTimes()
mocks.store.EXPECT().Subscribe(gomock.Any(), mocks.cfg.Name).Return(make(chan int64), nil).AnyTimes()
mocks.store.EXPECT().SubscribeToExecutorStatusChanges(gomock.Any(), mocks.cfg.Name).Return(make(chan int64), nil).AnyTimes()

err := processor.Run(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -409,7 +409,7 @@ func TestRunLoop_SubscriptionError(t *testing.T) {

expectedErr := errors.New("subscription failed")
mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(&store.NamespaceState{}, nil)
mocks.store.EXPECT().Subscribe(gomock.Any(), mocks.cfg.Name).Return(nil, expectedErr)
mocks.store.EXPECT().SubscribeToExecutorStatusChanges(gomock.Any(), mocks.cfg.Name).Return(nil, expectedErr)

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -428,7 +428,7 @@ func TestRunLoop_ContextCancellation(t *testing.T) {

// Setup for the initial call to rebalanceShards and the subscription
mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(&store.NamespaceState{}, nil)
mocks.store.EXPECT().Subscribe(gomock.Any(), mocks.cfg.Name).Return(make(chan int64), nil)
mocks.store.EXPECT().SubscribeToExecutorStatusChanges(gomock.Any(), mocks.cfg.Name).Return(make(chan int64), nil)

processor.wg.Add(1)
// Run the process in a separate goroutine to avoid blocking the test
Expand Down
71 changes: 45 additions & 26 deletions service/sharddistributor/store/etcd/executorstore/etcdstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,46 +270,65 @@ func (s *executorStoreImpl) SubscribeToAssignmentChanges(ctx context.Context, na
return s.shardCache.Subscribe(ctx, namespace)
}

func (s *executorStoreImpl) Subscribe(ctx context.Context, namespace string) (<-chan int64, error) {
func (s *executorStoreImpl) SubscribeToExecutorStatusChanges(ctx context.Context, namespace string) (<-chan int64, error) {
revisionChan := make(chan int64, 1)
watchPrefix := etcdkeys.BuildExecutorsPrefix(s.prefix, namespace)

go func() {
defer close(revisionChan)
watchChan := s.client.Watch(ctx, watchPrefix, clientv3.WithPrefix())
watchChan := s.client.Watch(ctx,
etcdkeys.BuildExecutorsPrefix(s.prefix, namespace),
clientv3.WithPrefix(),
clientv3.WithPrevKV(),
)

for watchResp := range watchChan {
if err := watchResp.Err(); err != nil {
return
}
isSignificantChange := false
for _, event := range watchResp.Events {
if !event.IsCreate() && !event.IsModify() {
isSignificantChange = true
break
}
_, keyType, err := etcdkeys.ParseExecutorKey(s.prefix, namespace, string(event.Kv.Key))
if err != nil {
continue
}
// Treat heartbeat, assigned_state and statistics updates as non-significant for rebalancing.
if keyType != etcdkeys.ExecutorHeartbeatKey &&
keyType != etcdkeys.ExecutorAssignedStateKey &&
keyType != etcdkeys.ExecutorShardStatisticsKey {
isSignificantChange = true
break
}

if !s.hasExecutorStatusChanged(watchResp, namespace) {
continue
}
if isSignificantChange {
select {
case <-revisionChan:
default:
}
revisionChan <- watchResp.Header.Revision

// If the channel is full, it means the previous revision hasn't been processed yet.
// Pop the old revision to make room for the new one, ensuring we always have the latest revision.
select {
case <-revisionChan:
default:
}

revisionChan <- watchResp.Header.Revision
}
}()

return revisionChan, nil
}

// hasExecutorStatusChanged reports whether the watch response carries at least
// one event that represents a genuine change to an executor's status key.
// Events on other key types, unparseable keys, and status writes whose value
// is identical to the previous revision are all ignored.
func (s *executorStoreImpl) hasExecutorStatusChanged(watchResp clientv3.WatchResponse, namespace string) bool {
	for _, ev := range watchResp.Events {
		key := string(ev.Kv.Key)

		_, keyType, err := etcdkeys.ParseExecutorKey(s.prefix, namespace, key)
		if err != nil {
			// Keys we cannot parse are logged and skipped rather than treated as changes.
			s.logger.Warn("Received watch event with unrecognized key format", tag.Key(key))
			continue
		}

		// Only the executor status key matters for triggering a revision update.
		if keyType != etcdkeys.ExecutorStatusKey {
			continue
		}

		// A rewrite of the same value is not a real status transition (PrevKv is
		// only populated when the watch was opened with WithPrevKV).
		unchanged := ev.PrevKv != nil && string(ev.PrevKv.Value) == string(ev.Kv.Value)
		if !unchanged {
			return true
		}
	}

	return false
}

func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string, request store.AssignShardsRequest, guard store.GuardFunc) (err error) {
var ops []clientv3.Op
var opsElse []clientv3.Op
Expand Down
100 changes: 74 additions & 26 deletions service/sharddistributor/store/etcd/executorstore/etcdstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func TestGuardedOperations(t *testing.T) {
}

// TestSubscribe verifies that the subscription channel receives notifications for significant changes.
func TestSubscribe(t *testing.T) {
func TestSubscribeToExecutorStatusChanges(t *testing.T) {
tc := testhelper.SetupStoreTestCluster(t)
executorStore := createStore(t, tc)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand All @@ -413,37 +413,85 @@ func TestSubscribe(t *testing.T) {
executorID := "exec-sub"

// Start subscription
sub, err := executorStore.Subscribe(ctx, tc.Namespace)
sub, err := executorStore.SubscribeToExecutorStatusChanges(ctx, tc.Namespace)
require.NoError(t, err)

// Manually put a heartbeat update, which is an insignificant change
heartbeatKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "heartbeat")
_, err = tc.Client.Put(ctx, heartbeatKey, "timestamp")
require.NoError(t, err)
// Test case #1: Update heartbeat without changing status or reported shards - should NOT trigger notification
{
// Manually put a heartbeat update, which is an insignificant change
heartbeatKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "heartbeat")
_, err = tc.Client.Put(ctx, heartbeatKey, "timestamp")
require.NoError(t, err)

select {
case <-sub:
t.Fatal("Should not receive notification for a heartbeat-only update")
case <-time.After(100 * time.Millisecond):
// Expected behavior
select {
case <-sub:
t.Fatal("Should not receive notification for a heartbeat-only update")
case <-time.After(100 * time.Millisecond):
// Expected behavior
}
}

// Now update the reported shards, which IS a significant change
reportedShardsKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "reported_shards")
require.NoError(t, err)
writer, err := common.NewRecordWriter(tc.Compression)
require.NoError(t, err)
compressedShards, err := writer.Write([]byte(`{"shard-1":{"status":"running"}}`))
require.NoError(t, err)
_, err = tc.Client.Put(ctx, reportedShardsKey, string(compressedShards))
require.NoError(t, err)
// Test case #2: Update reported shards without changing status - should NOT trigger notification
{
// Manually put a reported shards update, which is an insignificant change
reportedShardsKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "reported_shards")
writer, err := common.NewRecordWriter(tc.Compression)
require.NoError(t, err)
compressedShards, err := writer.Write([]byte(`{"shard-1":{"status":"running"}}`))
require.NoError(t, err)
_, err = tc.Client.Put(ctx, reportedShardsKey, string(compressedShards))
require.NoError(t, err)

select {
case <-sub:
t.Fatal("Should not receive notification for a reported-shards-only update")
case <-time.After(100 * time.Millisecond):
// Expected behavior
}
}

select {
case rev, ok := <-sub:
require.True(t, ok, "Channel should be open")
assert.Greater(t, rev, int64(0), "Should receive a valid revision for reported shards change")
case <-time.After(1 * time.Second):
t.Fatal("Should have received a notification for a reported shards change")
// Test case #3: Update status without prevKV - should trigger notification
{
statusKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "status")
_, err = tc.Client.Put(ctx, statusKey, stringStatus(types.ExecutorStatusDRAINING))
require.NoError(t, err)

select {
case rev, ok := <-sub:
require.True(t, ok, "Channel should be open")
assert.Greater(t, rev, int64(0), "Should receive a valid revision for status change")
case <-time.After(1 * time.Second):
t.Fatal("Should have received a notification for a status change")
}
}

// Test case #4: Update status with prevKV but the same value - should NOT trigger notification
{
statusKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "status")
_, err = tc.Client.Put(ctx, statusKey, stringStatus(types.ExecutorStatusDRAINING))
require.NoError(t, err)

select {
case <-sub:
t.Fatal("Should not receive notification")
case <-time.After(100 * time.Millisecond):
// Expected behavior
}
}

// Test case #5: Update status with prevKV - should trigger notification
{
statusKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "status")
_, err = tc.Client.Put(ctx, statusKey, stringStatus(types.ExecutorStatusACTIVE))
require.NoError(t, err)

select {
case rev, ok := <-sub:
require.True(t, ok, "Channel should be open")
assert.Greater(t, rev, int64(0), "Should receive a valid revision for status change")
case <-time.After(1 * time.Second):
t.Fatal("Should have received a notification for a status change")
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion service/sharddistributor/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ type Store interface {
// AssignShard assigns a single shard to an executor within a namespace.
AssignShard(ctx context.Context, namespace string, shardID string, executorID string) error

Subscribe(ctx context.Context, namespace string) (<-chan int64, error)
// SubscribeToExecutorStatusChanges subscribes to changes of executors' status key within a namespace.
SubscribeToExecutorStatusChanges(ctx context.Context, namespace string) (<-chan int64, error)
DeleteExecutors(ctx context.Context, namespace string, executorIDs []string, guard GuardFunc) error

// DeleteAssignedStates deletes the assigned states of multiple executors within a namespace.
Expand Down
Loading
Loading