diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index e8a48d4db..4f9f88a68 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -87,7 +87,7 @@ func (r *Reconciler) execute( } maxConcurrency, err := calculateMaxConcurrencyValue(updateRunStatus, updatingStageIndex) if err != nil { - return false, 0, err + return false, 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, err.Error()) } waitTime, err = r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, maxConcurrency) // The execution has not finished yet. diff --git a/pkg/controllers/updaterun/validation.go b/pkg/controllers/updaterun/validation.go index ffa5ea3c2..fdf0c5391 100644 --- a/pkg/controllers/updaterun/validation.go +++ b/pkg/controllers/updaterun/validation.go @@ -136,8 +136,9 @@ func (r *Reconciler) validateStagesStatus( func validateUpdateStagesStatus(existingStageStatus []placementv1beta1.StageUpdatingStatus, updateRun placementv1beta1.UpdateRunObj) (int, int, error) { updatingStageIndex := -1 lastFinishedStageIndex := -1 + updateRunStatus := updateRun.GetUpdateRunStatus() // Remember the newly computed stage status. - newStageStatus := updateRun.GetUpdateRunStatus().StagesStatus + newStageStatus := updateRunStatus.StagesStatus // Make sure the number of stages in the updateRun are still the same. if len(existingStageStatus) != len(newStageStatus) { mismatchErr := fmt.Errorf("the number of stages in the updateRun has changed, new: %d, existing: %d", len(newStageStatus), len(existingStageStatus)) @@ -164,9 +165,12 @@ func validateUpdateStagesStatus(existingStageStatus []placementv1beta1.StageUpda return -1, -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, mismatchErr.Error()) } } - - var err error - updatingStageIndex, lastFinishedStageIndex, err = validateClusterUpdatingStatus(curStage, updatingStageIndex, lastFinishedStageIndex, &existingStageStatus[curStage], updateRun) + // Calculate maxConcurrency for the current stage. + maxConcurrency, err := calculateMaxConcurrencyValue(updateRunStatus, curStage) + if err != nil { + return -1, -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, err.Error()) + } + updatingStageIndex, lastFinishedStageIndex, err = validateClusterUpdatingStatus(curStage, updatingStageIndex, lastFinishedStageIndex, &existingStageStatus[curStage], maxConcurrency, updateRun) if err != nil { return -1, -1, err } @@ -181,6 +185,7 @@ func validateUpdateStagesStatus(existingStageStatus []placementv1beta1.StageUpda func validateClusterUpdatingStatus( curStage, updatingStageIndex, lastFinishedStageIndex int, stageStatus *placementv1beta1.StageUpdatingStatus, + maxConcurrency int, updateRun placementv1beta1.UpdateRunObj, ) (int, int, error) { stageSucceedCond := meta.FindStatusCondition(stageStatus.Conditions, string(placementv1beta1.StageUpdatingConditionSucceeded)) @@ -234,7 +239,21 @@ func validateClusterUpdatingStatus( return -1, -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error()) } updatingStageIndex = curStage - // TODO(arvindth): add validation to ensure updating cluster count should not exceed maxConcurrency. + // Collect the updating clusters. + updatingClusterCount := 0 + for j := range stageStatus.Clusters { + clusterStartedCond := meta.FindStatusCondition(stageStatus.Clusters[j].Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted)) + clusterFinishedCond := meta.FindStatusCondition(stageStatus.Clusters[j].Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded)) + // cluster is updating if it has started but not yet finished, we also consider failed clusters as updating clusters in execution. + if condition.IsConditionStatusTrue(clusterStartedCond, updateRun.GetGeneration()) && !(condition.IsConditionStatusTrue(clusterFinishedCond, updateRun.GetGeneration())) { + updatingClusterCount++ + } + } + if updatingClusterCount > maxConcurrency { + unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the number of updating clusters `%d` in the updating stage `%s` exceeds maxConcurrency `%d`", updatingClusterCount, stageStatus.StageName, maxConcurrency)) + klog.ErrorS(unexpectedErr, "The number of updating clusters in the updating stage exceeds maxConcurrency", "updateRun", klog.KObj(updateRun)) + return -1, -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error()) + } } return updatingStageIndex, lastFinishedStageIndex, nil } diff --git a/pkg/controllers/updaterun/validation_test.go b/pkg/controllers/updaterun/validation_test.go index d6ad8215d..14fa2be14 100644 --- a/pkg/controllers/updaterun/validation_test.go +++ b/pkg/controllers/updaterun/validation_test.go @@ -40,6 +40,7 @@ func TestValidateClusterUpdatingStatus(t *testing.T) { updatingStageIndex int lastFinishedStageIndex int stageStatus *placementv1beta1.StageUpdatingStatus + maxConcurrency int wantErr error wantUpdatingStageIndex int wantLastFinishedStageIndex int @@ -144,6 +145,28 @@ func TestValidateClusterUpdatingStatus(t *testing.T) { wantUpdatingStageIndex: -1, wantLastFinishedStageIndex: -1, }, + { + name: "determineUpdatignStage should not return error if there are multiple clusters in an updating stage with no condition set (execution not started)", + curStage: 0, + updatingStageIndex: -1, + lastFinishedStageIndex: -1, + stageStatus: &placementv1beta1.StageUpdatingStatus{ + StageName: "test-stage", + Conditions: []metav1.Condition{generateTrueCondition(updateRun, placementv1beta1.StageUpdatingConditionProgressing)}, + Clusters: []placementv1beta1.ClusterUpdatingStatus{ + { + ClusterName: "cluster-1", + }, + { + ClusterName: "cluster-2", + }, + }, + }, + maxConcurrency: 1, + wantErr: nil, + wantUpdatingStageIndex: 0, + wantLastFinishedStageIndex: -1, + }, { name: "determineUpdatignStage should not return error if there are multiple clusters updating in an updating stage", curStage: 0, @@ -159,14 +182,67 @@ func TestValidateClusterUpdatingStatus(t *testing.T) { }, { ClusterName: "cluster-2", - Conditions: []metav1.Condition{generateTrueCondition(updateRun, placementv1beta1.ClusterUpdatingConditionStarted)}, + Conditions: []metav1.Condition{generateTrueCondition(updateRun, placementv1beta1.ClusterUpdatingConditionStarted), generateFalseCondition(updateRun, placementv1beta1.ClusterUpdatingConditionSucceeded)}, }, }, }, + maxConcurrency: 2, wantErr: nil, wantUpdatingStageIndex: 0, wantLastFinishedStageIndex: -1, }, + { + name: "determineUpdatignStage should not return error if multiple clusters have succeeded in an updating stage", + curStage: 0, + updatingStageIndex: -1, + lastFinishedStageIndex: -1, + stageStatus: &placementv1beta1.StageUpdatingStatus{ + StageName: "test-stage", + Conditions: []metav1.Condition{generateTrueCondition(updateRun, placementv1beta1.StageUpdatingConditionProgressing)}, + Clusters: []placementv1beta1.ClusterUpdatingStatus{ + { + ClusterName: "cluster-1", + Conditions: []metav1.Condition{generateTrueCondition(updateRun, placementv1beta1.ClusterUpdatingConditionStarted), generateTrueCondition(updateRun, placementv1beta1.ClusterUpdatingConditionSucceeded)}, + }, + { + ClusterName: "cluster-2", + Conditions: []metav1.Condition{generateTrueCondition(updateRun, placementv1beta1.ClusterUpdatingConditionStarted), generateTrueCondition(updateRun, placementv1beta1.ClusterUpdatingConditionSucceeded)}, + }, + }, + }, + maxConcurrency: 1, + wantErr: nil, + wantUpdatingStageIndex: 0, + wantLastFinishedStageIndex: -1, + }, + { + name: "validateClusterUpdatingStatus should return error if number of updating clusters exceeds maxConcurrency", + curStage: 0, + updatingStageIndex: -1, + lastFinishedStageIndex: -1, + stageStatus: &placementv1beta1.StageUpdatingStatus{ + StageName: "test-stage", + Conditions: []metav1.Condition{generateTrueCondition(updateRun, placementv1beta1.StageUpdatingConditionProgressing)}, + Clusters: []placementv1beta1.ClusterUpdatingStatus{ + { + ClusterName: "cluster-1", + Conditions: []metav1.Condition{generateTrueCondition(updateRun, placementv1beta1.ClusterUpdatingConditionStarted)}, + }, + { + ClusterName: "cluster-2", + Conditions: []metav1.Condition{generateTrueCondition(updateRun, placementv1beta1.ClusterUpdatingConditionStarted), generateFalseCondition(updateRun, placementv1beta1.ClusterUpdatingConditionSucceeded)}, + }, + { + ClusterName: "cluster-3", + Conditions: []metav1.Condition{generateTrueCondition(updateRun, placementv1beta1.ClusterUpdatingConditionStarted), generateFalseCondition(updateRun, placementv1beta1.ClusterUpdatingConditionSucceeded)}, + }, + }, + }, + maxConcurrency: 1, + wantErr: wrapErr(true, fmt.Errorf("the number of updating clusters `3` in the updating stage `test-stage` exceeds maxConcurrency `1`")), + wantUpdatingStageIndex: -1, + wantLastFinishedStageIndex: -1, + }, { name: "validateClusterUpdatingStatus should return -1 as the updatingStageIndex if no stage is updating", curStage: 0, @@ -188,6 +264,7 @@ func TestValidateClusterUpdatingStatus(t *testing.T) { StageName: "test-stage", Conditions: []metav1.Condition{generateTrueCondition(updateRun, placementv1beta1.StageUpdatingConditionProgressing)}, }, + maxConcurrency: 1, wantErr: nil, wantUpdatingStageIndex: 2, wantLastFinishedStageIndex: 1, @@ -213,7 +290,7 @@ func TestValidateClusterUpdatingStatus(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { gotUpdatingStageIndex, gotLastFinishedStageIndex, err := - validateClusterUpdatingStatus(test.curStage, test.updatingStageIndex, test.lastFinishedStageIndex, test.stageStatus, updateRun) + validateClusterUpdatingStatus(test.curStage, test.updatingStageIndex, test.lastFinishedStageIndex, test.stageStatus, test.maxConcurrency, updateRun) if test.wantErr == nil { if err != nil { t.Fatalf("validateClusterUpdatingStatus() got error = %+v, want error = nil", err)