Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 84 additions & 15 deletions pkg/controllers/rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,23 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
// marked for deletion yet. Note that even unscheduled bindings will receive this update;
// as apply strategy changes might have an effect on its Applied and Available status, and
// consequently on the rollout progress.
if err := r.processApplyStrategyUpdates(ctx, &crp, allBindings); err != nil {
applyStrategyUpdated, err := r.processApplyStrategyUpdates(ctx, &crp, allBindings)
switch {
case err != nil:
klog.ErrorS(err, "Failed to process apply strategy updates", "clusterResourcePlacement", crpName)
return runtime.Result{}, err
case applyStrategyUpdated:
// After the apply strategy is updated (a spec change), all status conditions on the
// ClusterResourceBinding object will become stale. To simplify the workflow of
// the rollout controller, Fleet will requeue the request now, and let the subsequent
// reconciliation loop handle the status condition refresh.
//
// Note that work generator will skip processing ClusterResourceBindings with stale
// RolloutStarted conditions.
klog.V(2).InfoS("Apply strategy has been updated; requeue the request", "clusterResourcePlacement", crpName)
return reconcile.Result{Requeue: true}, nil
default:
klog.V(2).InfoS("Apply strategy is up to date on all bindings; continue with the rollout process", "clusterResourcePlacement", crpName)
}

// handle the case that a cluster was unselected by the scheduler and then selected again but the unselected binding is not completely deleted yet
Expand Down Expand Up @@ -144,7 +158,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim

// pick the bindings to be updated according to the rollout plan
// staleBoundBindings is a list of "Bound" bindings and are not selected in this round because of the rollout strategy.
toBeUpdatedBindings, staleBoundBindings, needRoll, waitTime, err := r.pickBindingsToRoll(ctx, allBindings, latestResourceSnapshot, &crp, matchedCRO, matchedRO)
toBeUpdatedBindings, staleBoundBindings, upToDateBoundBindings, needRoll, waitTime, err := r.pickBindingsToRoll(ctx, allBindings, latestResourceSnapshot, &crp, matchedCRO, matchedRO)
if err != nil {
klog.ErrorS(err, "Failed to pick the bindings to roll", "clusterResourcePlacement", crpName)
return runtime.Result{}, err
Expand All @@ -157,16 +171,40 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
// Here it will correct the binding status just in case this happens last time.
return runtime.Result{}, r.checkAndUpdateStaleBindingsStatus(ctx, allBindings)
}
klog.V(2).InfoS("Picked the bindings to be updated", "clusterResourcePlacement", crpName, "numberOfBindings", len(toBeUpdatedBindings), "numberOfStaleBindings", len(staleBoundBindings))

// StaleBindings is the list that contains bindings that need to be updated but are blocked by the rollout strategy.
// Update the status first, so that if the rolling out (updateBindings func) fails in the middle, the controller will
// recompute the list and the result may be different.
klog.V(2).InfoS("Picked the bindings to be updated",
"clusterResourcePlacement", crpName,
"numberOfToBeUpdatedBindings", len(toBeUpdatedBindings),
"numberOfStaleBindings", len(staleBoundBindings),
"numberOfUpToDateBindings", len(upToDateBoundBindings))

// StaleBindings is the list that contains bindings that need to be updated (binding to a
// cluster, upgrading to a newer resource/override snapshot) but are blocked by
// the rollout strategy.
//
// Note that Fleet does not consider unscheduled bindings as stale bindings, even if the
// status conditions on them have become stale (the work generator will handle them as an
// exception).
//
// TO-DO (chenyu1): evaluate how we could improve the flow to reduce coupling.
//
// Update the status first, so that if the rolling out (updateBindings func) fails in the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this comment mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Ryan! This (Update the status first...) was the original comment I believe.

// middle, the controller will recompute the list so the rollout can move forward.
if err := r.updateStaleBindingsStatus(ctx, staleBoundBindings); err != nil {
return runtime.Result{}, err
}
klog.V(2).InfoS("Successfully updated status of the stale bindings", "clusterResourcePlacement", crpName, "numberOfStaleBindings", len(staleBoundBindings))

// upToDateBoundBindings contains all the ClusterResourceBindings that do not need to have
// their resource/override snapshots updated, but might need to have their status updated.
//
// Bindings might have up to date resource/override snapshots but stale status information when
// an apply strategy update has just been applied, or an error has occurred during the
// previous rollout process (specifically after the spec update but before the status update).
if err := r.refreshUpToDateBindingStatus(ctx, upToDateBoundBindings); err != nil {
return runtime.Result{}, err
}
klog.V(2).InfoS("Successfully updated status of the up-to-date bindings", "clusterResourcePlacement", crpName, "numberOfUpToDateBindings", len(upToDateBoundBindings))

// Update all the bindings in parallel according to the rollout plan.
// We need to requeue the request regardless if the binding updates succeed or not
// to avoid the case that the rollout process stalling because the time based binding readiness does not trigger any event.
Expand Down Expand Up @@ -309,8 +347,14 @@ func createUpdateInfo(binding *fleetv1beta1.ClusterResourceBinding,
// if there are out of sync bindings.
// Thus, it also returns a bool indicating whether there are out of sync bindings to be rolled to differentiate those
// two cases.
func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*fleetv1beta1.ClusterResourceBinding, latestResourceSnapshot *fleetv1beta1.ClusterResourceSnapshot, crp *fleetv1beta1.ClusterResourcePlacement,
matchedCROs []*fleetv1alpha1.ClusterResourceOverrideSnapshot, matchedROs []*fleetv1alpha1.ResourceOverrideSnapshot) ([]toBeUpdatedBinding, []toBeUpdatedBinding, bool, time.Duration, error) {
func (r *Reconciler) pickBindingsToRoll(
ctx context.Context,
allBindings []*fleetv1beta1.ClusterResourceBinding,
latestResourceSnapshot *fleetv1beta1.ClusterResourceSnapshot,
crp *fleetv1beta1.ClusterResourcePlacement,
matchedCROs []*fleetv1alpha1.ClusterResourceOverrideSnapshot,
matchedROs []*fleetv1alpha1.ResourceOverrideSnapshot,
) ([]toBeUpdatedBinding, []toBeUpdatedBinding, []toBeUpdatedBinding, bool, time.Duration, error) {
// Those are the bindings that are chosen by the scheduler to be applied to selected clusters.
// They include the bindings that are already applied to the clusters and the bindings that are newly selected by the scheduler.
schedulerTargetedBinds := make([]*fleetv1beta1.ClusterResourceBinding, 0)
Expand Down Expand Up @@ -340,6 +384,10 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
// minimum AvailableNumber of copies as we won't reduce the total unavailable number of bindings.
applyFailedUpdateCandidates := make([]toBeUpdatedBinding, 0)

// Those are the bindings that have been bound to a cluster and have the latest
// resource/override snapshots, but might or might not have refreshed status information.
upToDateBoundBindings := make([]toBeUpdatedBinding, 0)

// calculate the cutoff time for a binding to be applied before so that it can be considered ready
readyTimeCutOff := time.Now().Add(-time.Duration(*crp.Spec.Strategy.RollingUpdate.UnavailablePeriodSeconds) * time.Second)

Expand Down Expand Up @@ -385,7 +433,7 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
// PickFromResourceMatchedOverridesForTargetCluster always returns the ordered list of the overrides.
cro, ro, err := overrider.PickFromResourceMatchedOverridesForTargetCluster(ctx, r.Client, binding.Spec.TargetCluster, matchedCROs, matchedROs)
if err != nil {
return nil, nil, false, minWaitTime, err
return nil, nil, nil, false, minWaitTime, err
}
boundingCandidates = append(boundingCandidates, createUpdateInfo(binding, latestResourceSnapshot, cro, ro))
case fleetv1beta1.BindingStateBound:
Expand Down Expand Up @@ -414,7 +462,7 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
// PickFromResourceMatchedOverridesForTargetCluster always returns the ordered list of the overrides.
cro, ro, err := overrider.PickFromResourceMatchedOverridesForTargetCluster(ctx, r.Client, binding.Spec.TargetCluster, matchedCROs, matchedROs)
if err != nil {
return nil, nil, false, 0, err
return nil, nil, nil, false, 0, err
}
// The binding needs update if it's not pointing to the latest resource resourceBinding or the overrides.
if binding.Spec.ResourceSnapshotName != latestResourceSnapshot.Name || !equality.Semantic.DeepEqual(binding.Spec.ClusterResourceOverrideSnapshots, cro) || !equality.Semantic.DeepEqual(binding.Spec.ResourceOverrideSnapshots, ro) {
Expand All @@ -425,6 +473,10 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
} else {
updateCandidates = append(updateCandidates, updateInfo)
}
} else {
// The binding does not need an update, but Fleet might need to refresh its status
// information.
upToDateBoundBindings = append(upToDateBoundBindings, toBeUpdatedBinding{currentBinding: binding})
}
} else if bindingReady {
// it is being deleted, it can be removed from the cluster at any time, so it can be unavailable at any time
Expand All @@ -447,13 +499,13 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
// the list of bindings that are to be updated by this rolling phase
toBeUpdatedBindingList := make([]toBeUpdatedBinding, 0)
if len(removeCandidates)+len(updateCandidates)+len(boundingCandidates)+len(applyFailedUpdateCandidates) == 0 {
return toBeUpdatedBindingList, nil, false, minWaitTime, nil
return toBeUpdatedBindingList, nil, upToDateBoundBindings, false, minWaitTime, nil
}

toBeUpdatedBindingList, staleUnselectedBinding := determineBindingsToUpdate(crp, removeCandidates, updateCandidates, boundingCandidates, applyFailedUpdateCandidates, targetNumber,
readyBindings, canBeReadyBindings, canBeUnavailableBindings)

return toBeUpdatedBindingList, staleUnselectedBinding, true, minWaitTime, nil
return toBeUpdatedBindingList, staleUnselectedBinding, upToDateBoundBindings, true, minWaitTime, nil
}

// determineBindingsToUpdate determines which bindings to update
Expand Down Expand Up @@ -866,6 +918,22 @@ func (r *Reconciler) updateStaleBindingsStatus(ctx context.Context, staleBinding
return errs.Wait()
}

// refreshUpToDateBindingStatus refreshes the status of every binding that
// already points at the latest resource/override snapshots. It is a no-op for
// an empty list; otherwise all status updates are issued concurrently and the
// first error encountered (if any) is returned.
func (r *Reconciler) refreshUpToDateBindingStatus(ctx context.Context, upToDateBindings []toBeUpdatedBinding) error {
	if len(upToDateBindings) == 0 {
		// Nothing to refresh.
		return nil
	}

	// Fan out one status update per binding; cctx is canceled as soon as any
	// update fails, letting in-flight siblings bail out early.
	errs, cctx := errgroup.WithContext(ctx)
	for _, b := range upToDateBindings {
		binding := b // per-iteration copy for the closure (pre-Go 1.22 loop-variable semantics)
		errs.Go(func() error {
			return r.updateBindingStatus(cctx, binding.currentBinding, true)
		})
	}

	// Wait for every update to finish; returns the first non-nil error, if any.
	return errs.Wait()
}

func (r *Reconciler) updateBindingStatus(ctx context.Context, binding *fleetv1beta1.ClusterResourceBinding, rolloutStarted bool) error {
cond := metav1.Condition{
Type: string(fleetv1beta1.ResourceBindingRolloutStarted),
Expand Down Expand Up @@ -898,7 +966,7 @@ func (r *Reconciler) processApplyStrategyUpdates(
ctx context.Context,
crp *fleetv1beta1.ClusterResourcePlacement,
allBindings []*fleetv1beta1.ClusterResourceBinding,
) error {
) (applyStrategyUpdated bool, err error) {
applyStrategy := crp.Spec.Strategy.ApplyStrategy
if applyStrategy == nil {
// Initialize the apply strategy with default values; normally this would not happen
Expand Down Expand Up @@ -934,6 +1002,7 @@ func (r *Reconciler) processApplyStrategyUpdates(
// controller; to avoid unnecessary conflicts, Fleet will patch the field directly.
updatedBinding := binding.DeepCopy()
updatedBinding.Spec.ApplyStrategy = applyStrategy
applyStrategyUpdated = true

errs.Go(func() error {
if err := r.Client.Patch(childCtx, updatedBinding, client.MergeFrom(binding)); err != nil {
Expand All @@ -947,7 +1016,7 @@ func (r *Reconciler) processApplyStrategyUpdates(

// The patches are issued in parallel; wait for all of them to complete (or the first error
// to return).
return errs.Wait()
return applyStrategyUpdated, errs.Wait()
}

// handleCRP handles the update event of a ClusterResourcePlacement, which the rollout controller
Expand Down
80 changes: 79 additions & 1 deletion pkg/controllers/rollout/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/pkg/controllers/work"
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/condition"
)

const (
Expand All @@ -37,6 +38,7 @@ const (
var (
ignoreCRBTypeMetaAndStatusFields = cmpopts.IgnoreFields(fleetv1beta1.ClusterResourceBinding{}, "TypeMeta", "Status")
ignoreObjectMetaAutoGenFields = cmpopts.IgnoreFields(metav1.ObjectMeta{}, "CreationTimestamp", "Generation", "ResourceVersion", "SelfLink", "UID", "ManagedFields")
ignoreCondLTTAndMessageFields = cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime", "Message")
)

var testCRPName string
Expand Down Expand Up @@ -103,7 +105,7 @@ var _ = Describe("Test the rollout Controller", func() {
}, timeout, interval).Should(BeTrue(), "rollout controller should roll all the bindings to Bound state")
})

It("should push apply strategy changes to all the bindings (if applicable)", func() {
It("should push apply strategy changes to all the bindings (if applicable) and refresh their status", func() {
// Create a CRP.
targetClusterCount := int32(3)
rolloutCRP = clusterResourcePlacementForTest(
Expand Down Expand Up @@ -176,6 +178,44 @@ var _ = Describe("Test the rollout Controller", func() {
return nil
}, timeout, interval).Should(Succeed(), "Failed to verify that all the bindings are bound")

// Verify that all bindings have their status refreshed (i.e., have fresh RolloutStarted
// conditions).
Eventually(func() error {
for _, binding := range bindings {
gotBinding := &fleetv1beta1.ClusterResourceBinding{}
if err := k8sClient.Get(ctx, types.NamespacedName{Name: binding.GetName()}, gotBinding); err != nil {
return fmt.Errorf("failed to get binding %s: %w", binding.Name, err)
}

wantBindingStatus := &fleetv1beta1.ResourceBindingStatus{
Conditions: []metav1.Condition{
{
Type: string(fleetv1beta1.ResourceBindingRolloutStarted),
Status: metav1.ConditionTrue,
Reason: condition.RolloutStartedReason,
ObservedGeneration: gotBinding.Generation,
},
},
}
// The scheduled binding will be set to the Bound state with the RolloutStarted
// condition set to True; the bound binding will receive a True RolloutStarted
// condition; the unscheduled binding will have no RolloutStarted condition update.
if binding.Spec.State == fleetv1beta1.BindingStateUnscheduled {
wantBindingStatus = &fleetv1beta1.ResourceBindingStatus{
Conditions: []metav1.Condition{},
}
}
if diff := cmp.Diff(
&gotBinding.Status, wantBindingStatus,
ignoreCondLTTAndMessageFields,
cmpopts.EquateEmpty(),
); diff != "" {
return fmt.Errorf("binding status diff (%v/%v) (-got, +want):\n%s", binding.Spec.State, gotBinding.Spec.State, diff)
}
}
return nil
}, timeout, interval).Should(Succeed(), "Failed to verify that all the bindings have their status refreshed")

// Update the CRP with a new apply strategy.
rolloutCRP.Spec.Strategy.ApplyStrategy = &fleetv1beta1.ApplyStrategy{
ComparisonOption: fleetv1beta1.ComparisonOptionTypeFullComparison,
Expand Down Expand Up @@ -231,6 +271,44 @@ var _ = Describe("Test the rollout Controller", func() {
}
return nil
}, timeout, interval).Should(Succeed(), "Failed to update all bindings with the new apply strategy")

// Verify that all bindings have their status refreshed (i.e., have fresh RolloutStarted
// conditions).
Eventually(func() error {
for _, binding := range bindings {
gotBinding := &fleetv1beta1.ClusterResourceBinding{}
if err := k8sClient.Get(ctx, types.NamespacedName{Name: binding.GetName()}, gotBinding); err != nil {
return fmt.Errorf("failed to get binding %s: %w", binding.Name, err)
}

wantBindingStatus := &fleetv1beta1.ResourceBindingStatus{
Conditions: []metav1.Condition{
{
Type: string(fleetv1beta1.ResourceBindingRolloutStarted),
Status: metav1.ConditionTrue,
Reason: condition.RolloutStartedReason,
ObservedGeneration: gotBinding.Generation,
},
},
}
// The scheduled binding will be set to the Bound state with the RolloutStarted
// condition set to True; the bound binding will receive a True RolloutStarted
// condition; the unscheduled binding will have no RolloutStarted condition update.
if binding.Spec.State == fleetv1beta1.BindingStateUnscheduled {
wantBindingStatus = &fleetv1beta1.ResourceBindingStatus{
Conditions: []metav1.Condition{},
}
}
if diff := cmp.Diff(
&gotBinding.Status, wantBindingStatus,
ignoreCondLTTAndMessageFields,
cmpopts.EquateEmpty(),
); diff != "" {
return fmt.Errorf("binding status diff (%v/%v) (-got, +want):\n%s", binding.Spec.State, gotBinding.Spec.State, diff)
}
}
return nil
}, timeout, interval).Should(Succeed(), "Failed to verify that all the bindings have their status refreshed")
})

It("Should rollout all the selected bindings when the rollout strategy is not set", func() {
Expand Down
Loading
Loading