Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 47 additions & 10 deletions pkg/controllers/clusterresourceplacement/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"fmt"
"sort"
"strconv"
"time"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/klog/v2"
Expand All @@ -43,15 +45,50 @@ const (
resourcePlacementConditionScheduleSucceededWithScoreMessageFormat = "Successfully scheduled resources for placement in %s with clusterScore %+v: %s"
)

func (r *Reconciler) Reconcile(ctx context.Context, _ controller.QueueKey) (ctrl.Result, error) {
// TODO workaround to bypass lint check
return r.handleUpdate(ctx, nil)
func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) {
name, ok := key.(string)
if !ok {
err := fmt.Errorf("get place key %+v not of type string", key)
klog.ErrorS(controller.NewUnexpectedBehaviorError(err), "We have encountered a fatal error that can't be retried, requeue after a day")
return ctrl.Result{}, nil // ignore this unexpected error
}
startTime := time.Now()
klog.V(2).InfoS("ClusterResourcePlacement reconciliation starts", "clusterResourcePlacement", name)
defer func() {
latency := time.Since(startTime).Milliseconds()
klog.V(2).InfoS("ClusterResourcePlacement reconciliation ends", "clusterResourcePlacement", name, "latency", latency)
}()

crp := fleetv1beta1.ClusterResourcePlacement{}
if err := r.Client.Get(ctx, types.NamespacedName{Name: name}, &crp); err != nil {
if errors.IsNotFound(err) {
klog.V(4).InfoS("Ignoring NotFound clusterResourcePlacement", "clusterResourcePlacement", name)
return ctrl.Result{}, nil
}
klog.ErrorS(err, "Failed to get clusterResourcePlacement", "clusterResourcePlacement", name)
return ctrl.Result{}, controller.NewAPIServerError(true, err)
}

if crp.ObjectMeta.DeletionTimestamp != nil {
return r.handleDelete(ctx, &crp)
}

// register finalizer
if !controllerutil.ContainsFinalizer(&crp, fleetv1beta1.ClusterResourcePlacementCleanupFinalizer) {
controllerutil.AddFinalizer(&crp, fleetv1beta1.ClusterResourcePlacementCleanupFinalizer)
if err := r.Client.Update(ctx, &crp); err != nil {
klog.ErrorS(err, "Failed to add clusterResourcePlacement finalizer", "clusterResourcePlacement", name)
return ctrl.Result{}, controller.NewUpdateIgnoreConflictError(err)
}
}

return r.handleUpdate(ctx, &crp)
}

func (r *Reconciler) handleDelete(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (ctrl.Result, error) {
crpKObj := klog.KObj(crp)
if !controllerutil.ContainsFinalizer(crp, fleetv1beta1.ClusterResourcePlacementCleanupFinalizer) {
klog.V(4).InfoS("clusterResourcePlacement is being deleted and no cleanup work needs to be done", "clusterResourcePlacement", crpKObj)
klog.V(4).InfoS("clusterResourcePlacement is being deleted and no cleanup work needs to be done by the CRP controller, waiting for the scheduler to cleanup the bindings", "clusterResourcePlacement", crpKObj)
return ctrl.Result{}, nil
}
klog.V(2).InfoS("Removing snapshots created by clusterResourcePlacement", "clusterResourcePlacement", crpKObj)
Expand Down Expand Up @@ -175,7 +212,7 @@ func (r *Reconciler) getOrCreateClusterSchedulingPolicySnapshot(ctx context.Cont
latestPolicySnapshot.Labels[fleetv1beta1.IsLatestSnapshotLabel] = strconv.FormatBool(false)
if err := r.Client.Update(ctx, latestPolicySnapshot); err != nil {
klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterResourcePlacement", crpKObj, "clusterSchedulingPolicySnapshot", klog.KObj(latestPolicySnapshot))
return nil, controller.NewAPIServerError(false, err)
return nil, controller.NewUpdateIgnoreConflictError(err)
}
}

Expand Down Expand Up @@ -337,7 +374,7 @@ func (r *Reconciler) getOrCreateClusterResourceSnapshot(ctx context.Context, crp
latestResourceSnapshot.Labels[fleetv1beta1.IsLatestSnapshotLabel] = strconv.FormatBool(false)
if err := r.Client.Update(ctx, latestResourceSnapshot); err != nil {
klog.ErrorS(err, "Failed to set the isLatestSnapshot label to false", "clusterResourceSnapshot", klog.KObj(latestResourceSnapshot))
return nil, controller.NewAPIServerError(false, err)
return nil, controller.NewUpdateIgnoreConflictError(err)
}
}
// delete redundant snapshot revisions before creating a new snapshot to guarantee that the number of snapshots
Expand Down Expand Up @@ -411,7 +448,7 @@ func (r *Reconciler) ensureLatestPolicySnapshot(ctx context.Context, crp *fleetv
}
if err := r.Client.Update(ctx, latest); err != nil {
klog.ErrorS(err, "Failed to update the clusterSchedulingPolicySnapshot", "clusterSchedulingPolicySnapshot", klog.KObj(latest))
return controller.NewAPIServerError(false, err)
return controller.NewUpdateIgnoreConflictError(err)
}
return nil
}
Expand All @@ -428,7 +465,7 @@ func (r *Reconciler) ensureLatestResourceSnapshot(ctx context.Context, latest *f
latest.Labels[fleetv1beta1.IsLatestSnapshotLabel] = strconv.FormatBool(true)
if err := r.Client.Update(ctx, latest); err != nil {
klog.ErrorS(err, "Failed to update the clusterResourceSnapshot", "ClusterResourceSnapshot", klog.KObj(latest))
return controller.NewAPIServerError(false, err)
return controller.NewUpdateIgnoreConflictError(err)
}
return nil
}
Expand Down Expand Up @@ -536,7 +573,7 @@ func (r *Reconciler) lookupLatestResourceSnapshot(ctx context.Context, crp *flee
crpKObj := klog.KObj(crp)
if err := r.Client.List(ctx, snapshotList, latestSnapshotLabelMatcher); err != nil {
klog.ErrorS(err, "Failed to list active clusterResourceSnapshots", "clusterResourcePlacement", crpKObj)
return nil, -1, controller.NewAPIServerError(false, err)
return nil, -1, controller.NewAPIServerError(true, err)
}
if len(snapshotList.Items) == 1 {
resourceIndex, err := parseResourceIndexFromLabel(&snapshotList.Items[0])
Expand Down Expand Up @@ -579,7 +616,7 @@ func (r *Reconciler) listSortedResourceSnapshots(ctx context.Context, crp *fleet
crpKObj := klog.KObj(crp)
if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1beta1.CRPTrackingLabel: crp.Name}); err != nil {
klog.ErrorS(err, "Failed to list all clusterResourceSnapshots", "clusterResourcePlacement", crpKObj)
return nil, controller.NewAPIServerError(false, err)
return nil, controller.NewAPIServerError(true, err)
}
var errs []error
sort.Slice(snapshotList.Items, func(i, j int) bool {
Expand Down