
Commit efe044d

fix(shard distributor): remove heartbeat write cooldown (#7513)
**What changed?**
- Removed `_heartbeatRefreshRate` and the early-return block in service/sharddistributor/handler/executor.go, so executor heartbeats are always persisted.
- Updated TestHeartbeat in service/sharddistributor/handler/executor_test.go: replaced the "within/after refresh rate" cases, which depended on `_heartbeatRefreshRate`, with a single test that expects `RecordHeartbeat` to be called on a second heartbeat with the same status.

**Why?**
- Previously, the handler skipped writing a heartbeat when:
  - the migration mode was ONBOARDED,
  - a previous heartbeat existed and the status was unchanged,
  - and, most importantly, the new heartbeat arrived within `_heartbeatRefreshRate` (2s) of the last one.
- This meant the stored LastHeartbeat in etcd could lag behind the real heartbeat rate. With a heartbeat TTL of the same value (2s), healthy executors could be misclassified as stale and removed, which matched what we saw in the canary: executors disappearing from GetState, and shards "collapsing" onto only a few executors.
- Removing the write cooldown gives simpler and safer behavior:
  - executors heartbeat roughly once a second (in development),
  - every heartbeat is persisted,
  - the staleness check reflects the real heartbeat frequency and cannot be gated by another setting elsewhere in the code.

Two alternatives were considered instead of removing the check and cooldown:
- increasing the heartbeat TTL (e.g., to 5–10s) while keeping the cooldown,
- or decreasing `_heartbeatRefreshRate` (e.g., to 1s).

Both would reduce the chance of misclassifying healthy executors as stale, but they keep a hidden coupling between heartbeat.TTL and the write cooldown. Removing the cooldown entirely makes the behavior easier to reason about and avoids this subtle issue, which can otherwise surface through configuration.

**How did you test it?**
- Updated and ran the unit tests in service/sharddistributor/handler/executor_test.go (TestHeartbeat).
- Ran the sharddistributor canary with multiple executors and observed stable executor counts and shard distribution.

**Potential risks**
- Higher etcd write load for heartbeats: we now persist every heartbeat instead of at most one per `_heartbeatRefreshRate`.
- Adopters without external rate limiting may see more frequent heartbeat writes than before; this change removes a local write throttle, so they must rely on their own rate limiting.

**Release notes**

**Documentation Changes**

Signed-off-by: Andreas Holt <6665487+AndreasHolt@users.noreply.github.com>
1 parent 1836420 commit efe044d

File tree

2 files changed: +12 −52 lines


service/sharddistributor/handler/executor.go

Lines changed: 0 additions & 11 deletions

```diff
@@ -16,8 +16,6 @@ import (
 )

 const (
-	_heartbeatRefreshRate = 2 * time.Second
-
 	_maxMetadataKeys      = 32
 	_maxMetadataKeyLength = 128
 	_maxMetadataValueSize = 512 * 1024 // 512KB
@@ -76,15 +74,6 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
 		shardAssignedInBackground = false
 	}

-	// If the state has changed we need to update heartbeat data.
-	// Otherwise, we want to do it with controlled frequency - at most every _heartbeatRefreshRate.
-	if previousHeartbeat != nil && request.Status == previousHeartbeat.Status && mode == types.MigrationModeONBOARDED {
-		lastHeartbeatTime := previousHeartbeat.LastHeartbeat
-		if heartbeatTime.Sub(lastHeartbeatTime) < _heartbeatRefreshRate {
-			return _convertResponse(assignedShards, mode), nil
-		}
-	}
-
 	newHeartbeat := store.HeartbeatState{
 		LastHeartbeat: heartbeatTime,
 		Status:        request.Status,
```
service/sharddistributor/handler/executor_test.go

Lines changed: 12 additions & 41 deletions

```diff
@@ -52,8 +52,8 @@ func TestHeartbeat(t *testing.T) {
 		require.NoError(t, err)
 	})

-	// Test Case 2: Subsequent Heartbeat within the refresh rate (no update)
-	t.Run("SubsequentHeartbeatWithinRate", func(t *testing.T) {
+	// Test Case 2: Subsequent heartbeat records a new heartbeat
+	t.Run("SubsequentHeartbeatRecords", func(t *testing.T) {
 		ctrl := gomock.NewController(t)
 		mockStore := store.NewMockStore(ctrl)
 		mockTimeSource := clock.NewMockedTimeSourceAt(now)
@@ -72,46 +72,17 @@ func TestHeartbeat(t *testing.T) {
 			Status: types.ExecutorStatusACTIVE,
 		}

-		mockStore.EXPECT().GetHeartbeat(gomock.Any(), namespace, executorID).Return(&previousHeartbeat, nil, nil)
-
-		_, err := handler.Heartbeat(ctx, req)
-		require.NoError(t, err)
-	})
-
-	// Test Case 3: Subsequent Heartbeat after refresh rate (with update)
-	t.Run("SubsequentHeartbeatAfterRate", func(t *testing.T) {
-		ctrl := gomock.NewController(t)
-		mockStore := store.NewMockStore(ctrl)
-		mockTimeSource := clock.NewMockedTimeSourceAt(now)
-		shardDistributionCfg := config.ShardDistribution{}
-		migrationConfig := newMigrationConfig(t, []configEntry{})
-		handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig, metrics.NoopClient)
-
-		req := &types.ExecutorHeartbeatRequest{
-			Namespace:  namespace,
-			ExecutorID: executorID,
-			Status:     types.ExecutorStatusACTIVE,
-		}
-
-		previousHeartbeat := store.HeartbeatState{
-			LastHeartbeat: now,
-			Status:        types.ExecutorStatusACTIVE,
-		}
-
-		// Advance time
-		mockTimeSource.Advance(_heartbeatRefreshRate + time.Second)
-
 		mockStore.EXPECT().GetHeartbeat(gomock.Any(), namespace, executorID).Return(&previousHeartbeat, nil, nil)
 		mockStore.EXPECT().RecordHeartbeat(gomock.Any(), namespace, executorID, store.HeartbeatState{
-			LastHeartbeat: mockTimeSource.Now().UTC(),
+			LastHeartbeat: now,
 			Status:        types.ExecutorStatusACTIVE,
 		})

 		_, err := handler.Heartbeat(ctx, req)
 		require.NoError(t, err)
 	})

-	// Test Case 4: Status Change (with update)
+	// Test Case 3: Status Change (with update)
 	t.Run("StatusChange", func(t *testing.T) {
 		ctrl := gomock.NewController(t)
 		mockStore := store.NewMockStore(ctrl)
@@ -141,7 +112,7 @@ func TestHeartbeat(t *testing.T) {
 		require.NoError(t, err)
 	})

-	// Test Case 5: Storage Error
+	// Test Case 4: Storage Error
 	t.Run("StorageError", func(t *testing.T) {
 		ctrl := gomock.NewController(t)
 		mockStore := store.NewMockStore(ctrl)
@@ -164,7 +135,7 @@ func TestHeartbeat(t *testing.T) {
 		require.Contains(t, err.Error(), expectedErr.Error())
 	})

-	// Test Case 6: Heartbeat with executor associated invalid migration mode
+	// Test Case 5: Heartbeat with executor associated invalid migration mode
 	t.Run("MigrationModeInvald", func(t *testing.T) {
 		ctrl := gomock.NewController(t)
 		mockStore := store.NewMockStore(ctrl)
@@ -193,7 +164,7 @@ func TestHeartbeat(t *testing.T) {
 		require.Contains(t, err.Error(), expectedErr.Error())
 	})

-	// Test Case 7: Heartbeat with executor associated with local passthrough mode
+	// Test Case 6: Heartbeat with executor associated with local passthrough mode
 	t.Run("MigrationModeLocalPassthrough", func(t *testing.T) {
 		ctrl := gomock.NewController(t)
 		mockStore := store.NewMockStore(ctrl)
@@ -222,7 +193,7 @@ func TestHeartbeat(t *testing.T) {
 		require.Contains(t, err.Error(), expectedErr.Error())
 	})

-	// Test Case 8: Heartbeat with executor associated with local passthrough shadow
+	// Test Case 7: Heartbeat with executor associated with local passthrough shadow
 	t.Run("MigrationModeLocalPassthroughWithAssignmentChanges", func(t *testing.T) {
 		ctrl := gomock.NewController(t)
 		mockStore := store.NewMockStore(ctrl)
@@ -287,7 +258,7 @@ func TestHeartbeat(t *testing.T) {
 		},
 	)

-	// Test Case 9: Heartbeat with executor associated with distributed passthrough
+	// Test Case 8: Heartbeat with executor associated with distributed passthrough
 	t.Run("MigrationModeDISTRIBUTEDPASSTHROUGHDeletionFailure", func(t *testing.T) {
 		ctrl := gomock.NewController(t)
 		mockStore := store.NewMockStore(ctrl)
@@ -330,7 +301,7 @@ func TestHeartbeat(t *testing.T) {
 		require.Contains(t, err.Error(), expectedErr.Error())
 	})

-	// Test Case 10: Heartbeat with executor associated with distributed passthrough
+	// Test Case 9: Heartbeat with executor associated with distributed passthrough
 	t.Run("MigrationModeDISTRIBUTEDPASSTHROUGHAssignmentFailure", func(t *testing.T) {
 		ctrl := gomock.NewController(t)
 		mockStore := store.NewMockStore(ctrl)
@@ -374,7 +345,7 @@ func TestHeartbeat(t *testing.T) {
 		require.Contains(t, err.Error(), expectedErr.Error())
 	})

-	// Test Case 11: Heartbeat with metadata validation failure - too many keys
+	// Test Case 10: Heartbeat with metadata validation failure - too many keys
 	t.Run("MetadataValidationTooManyKeys", func(t *testing.T) {
 		ctrl := gomock.NewController(t)
 		mockStore := store.NewMockStore(ctrl)
@@ -403,7 +374,7 @@ func TestHeartbeat(t *testing.T) {
 		require.Contains(t, err.Error(), "invalid metadata: metadata has 33 keys, which exceeds the maximum of 32")
 	})

-	// Test Case: Heartbeat with executor associated with MigrationModeLOCALPASSTHROUGH (should error)
+	// Test Case 11: Heartbeat with executor associated with MigrationModeLOCALPASSTHROUGH (should error)
 	t.Run("MigrationModeLOCALPASSTHROUGH", func(t *testing.T) {
 		ctrl := gomock.NewController(t)
 		mockStore := store.NewMockStore(ctrl)
```
