Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions service/sharddistributor/handler/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
return nil, &types.BadRequestError{Message: "migration mode is local passthrough, no calls to heartbeat allowed"}
// From SD perspective the behaviour is the same
case types.MigrationModeLOCALPASSTHROUGHSHADOW, types.MigrationModeDISTRIBUTEDPASSTHROUGH:
assignedShards, err = h.assignShardsInCurrentHeartbeat(ctx, request)
assignedShards, err = h.assignShardsInCurrentHeartbeat(ctx, request, assignedShards)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -134,33 +134,34 @@ func (h *executor) emitShardAssignmentMetrics(namespace string, heartbeatTime ti
}

// assignShardsInCurrentHeartbeat is used during the migration phase to assign the shards to the executors according to what is reported during the heartbeat
func (h *executor) assignShardsInCurrentHeartbeat(ctx context.Context, request *types.ExecutorHeartbeatRequest) (*store.AssignedState, error) {
assignedShards := store.AssignedState{
func (h *executor) assignShardsInCurrentHeartbeat(ctx context.Context, request *types.ExecutorHeartbeatRequest, assignedShards *store.AssignedState) (*store.AssignedState, error) {
modRevision := int64(0)
if assignedShards != nil {
modRevision = assignedShards.ModRevision
}
newState := store.AssignedState{
AssignedShards: make(map[string]*types.ShardAssignment),
LastUpdated: h.timeSource.Now().UTC(),
ModRevision: int64(0),
}
err := h.storage.DeleteExecutors(ctx, request.GetNamespace(), []string{request.GetExecutorID()}, store.NopGuard())
if err != nil {
return nil, &types.InternalServiceError{Message: fmt.Sprintf("failed to delete assigned shards: %v", err)}
ModRevision: modRevision,
}

for shard := range request.GetShardStatusReports() {
assignedShards.AssignedShards[shard] = &types.ShardAssignment{
newState.AssignedShards[shard] = &types.ShardAssignment{
Status: types.AssignmentStatusREADY,
}
}
assignShardsRequest := store.AssignShardsRequest{
NewState: &store.NamespaceState{
ShardAssignments: map[string]store.AssignedState{
request.GetExecutorID(): assignedShards,
request.GetExecutorID(): newState,
},
},
}
err = h.storage.AssignShards(ctx, request.GetNamespace(), assignShardsRequest, store.NopGuard())
err := h.storage.AssignShards(ctx, request.GetNamespace(), assignShardsRequest, store.NopGuard())
if err != nil {
return nil, &types.InternalServiceError{Message: fmt.Sprintf("failed to assign shards in the current heartbeat: %v", err)}
}
return &assignedShards, nil
return &newState, nil
}

func _convertResponse(shards *store.AssignedState, mode types.MigrationMode) *types.ExecutorHeartbeatResponse {
Expand Down
49 changes: 2 additions & 47 deletions service/sharddistributor/handler/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ func TestHeartbeat(t *testing.T) {
}

mockStore.EXPECT().GetHeartbeat(gomock.Any(), namespace, executorID).Return(&previousHeartbeat, &assignedState, nil)
mockStore.EXPECT().DeleteExecutors(gomock.Any(), namespace, []string{executorID}, gomock.Any()).Return(nil)
mockStore.EXPECT().AssignShards(gomock.Any(), namespace, gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, namespace string, request store.AssignShardsRequest, guard store.GuardFunc) error {
// Expect to Assign the shard in the request
Expand Down Expand Up @@ -259,49 +258,6 @@ func TestHeartbeat(t *testing.T) {
)

// Test Case 8: Heartbeat with executor associated with distributed passthrough
t.Run("MigrationModeDISTRIBUTEDPASSTHROUGHDeletionFailure", func(t *testing.T) {
ctrl := gomock.NewController(t)
mockStore := store.NewMockStore(ctrl)
mockTimeSource := clock.NewMockedTimeSource()
shardDistributionCfg := config.ShardDistribution{
Namespaces: []config.Namespace{{Name: namespace, Mode: config.MigrationModeLOCALPASSTHROUGHSHADOW}},
}
cfg := newConfig(t, []configEntry{{dynamicproperties.ShardDistributorMigrationMode, config.MigrationModeLOCALPASSTHROUGHSHADOW}})
handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, cfg, metrics.NoopClient)

req := &types.ExecutorHeartbeatRequest{
Namespace: namespace,
ExecutorID: executorID,
Status: types.ExecutorStatusACTIVE,
ShardStatusReports: map[string]*types.ShardStatusReport{
"shard0": {Status: types.ShardStatusREADY, ShardLoad: 1.0},
},
}

previousHeartbeat := store.HeartbeatState{
LastHeartbeat: now,
Status: types.ExecutorStatusACTIVE,
ReportedShards: map[string]*types.ShardStatusReport{
"shard1": {Status: types.ShardStatusREADY, ShardLoad: 1.0},
},
}

assignedState := store.AssignedState{
AssignedShards: map[string]*types.ShardAssignment{
"shard1": {Status: types.AssignmentStatusREADY},
},
}

mockStore.EXPECT().GetHeartbeat(gomock.Any(), namespace, executorID).Return(&previousHeartbeat, &assignedState, nil)
expectedErr := errors.New("deletion failed")
mockStore.EXPECT().DeleteExecutors(gomock.Any(), namespace, []string{executorID}, gomock.Any()).Return(expectedErr)

_, err := handler.Heartbeat(ctx, req)
require.Error(t, err)
require.Contains(t, err.Error(), expectedErr.Error())
})

// Test Case 9: Heartbeat with executor associated with distributed passthrough
t.Run("MigrationModeDISTRIBUTEDPASSTHROUGHAssignmentFailure", func(t *testing.T) {
ctrl := gomock.NewController(t)
mockStore := store.NewMockStore(ctrl)
Expand Down Expand Up @@ -336,7 +292,6 @@ func TestHeartbeat(t *testing.T) {
}

mockStore.EXPECT().GetHeartbeat(gomock.Any(), namespace, executorID).Return(&previousHeartbeat, &assignedState, nil)
mockStore.EXPECT().DeleteExecutors(gomock.Any(), namespace, []string{executorID}, gomock.Any()).Return(nil)
expectedErr := errors.New("assignemnt failed")
mockStore.EXPECT().AssignShards(gomock.Any(), namespace, gomock.Any(), gomock.Any()).Return(expectedErr)

Expand All @@ -345,7 +300,7 @@ func TestHeartbeat(t *testing.T) {
require.Contains(t, err.Error(), expectedErr.Error())
})

// Test Case 10: Heartbeat with metadata validation failure - too many keys
// Test Case 9: Heartbeat with metadata validation failure - too many keys
t.Run("MetadataValidationTooManyKeys", func(t *testing.T) {
ctrl := gomock.NewController(t)
mockStore := store.NewMockStore(ctrl)
Expand Down Expand Up @@ -374,7 +329,7 @@ func TestHeartbeat(t *testing.T) {
require.Contains(t, err.Error(), "invalid metadata: metadata has 33 keys, which exceeds the maximum of 32")
})

// Test Case 11: Heartbeat with executor associated with MigrationModeLOCALPASSTHROUGH (should error)
// Test Case 10: Heartbeat with executor associated with MigrationModeLOCALPASSTHROUGH (should error)
t.Run("MigrationModeLOCALPASSTHROUGH", func(t *testing.T) {
ctrl := gomock.NewController(t)
mockStore := store.NewMockStore(ctrl)
Expand Down