Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apis/placement/v1/binding_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (
// +kubebuilder:object:root=true
// +kubebuilder:resource:scope=Cluster,categories={fleet,fleet-placement},shortName=rb
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:JSONPath=`.status.conditions[?(@.type=="WorkSynchronized")].status`,name="WorkCreated",type=string
// +kubebuilder:printcolumn:JSONPath=`.status.conditions[?(@.type=="WorkSynchronized")].status`,name="WorkSynchronized",type=string
// +kubebuilder:printcolumn:JSONPath=`.status.conditions[?(@.type=="Applied")].status`,name="ResourcesApplied",type=string
// +kubebuilder:printcolumn:JSONPath=`.status.conditions[?(@.type=="Available")].status`,name="ResourceAvailable",priority=1,type=string
// +kubebuilder:printcolumn:JSONPath=`.metadata.creationTimestamp`,name="Age",type=date

// ClusterResourceBinding represents a scheduling decision that binds a group of resources to a cluster.
Expand Down
3 changes: 2 additions & 1 deletion apis/placement/v1beta1/binding_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ const (
// +kubebuilder:resource:scope=Cluster,categories={fleet,fleet-placement},shortName=rb
// +kubebuilder:subresource:status
// +kubebuilder:storageversion
// +kubebuilder:printcolumn:JSONPath=`.status.conditions[?(@.type=="WorkSynchronized")].status`,name="WorkCreated",type=string
// +kubebuilder:printcolumn:JSONPath=`.status.conditions[?(@.type=="WorkSynchronized")].status`,name="WorkSynchronized",type=string
// +kubebuilder:printcolumn:JSONPath=`.status.conditions[?(@.type=="Applied")].status`,name="ResourcesApplied",type=string
// +kubebuilder:printcolumn:JSONPath=`.status.conditions[?(@.type=="Available")].status`,name="ResourceAvailable",priority=1,type=string
// +kubebuilder:printcolumn:JSONPath=`.metadata.creationTimestamp`,name="Age",type=date

// ClusterResourceBinding represents a scheduling decision that binds a group of resources to a cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ spec:
versions:
- additionalPrinterColumns:
- jsonPath: .status.conditions[?(@.type=="WorkSynchronized")].status
name: WorkCreated
name: WorkSynchronized
type: string
- jsonPath: .status.conditions[?(@.type=="Applied")].status
name: ResourcesApplied
type: string
- jsonPath: .status.conditions[?(@.type=="Available")].status
name: ResourceAvailable
priority: 1
type: string
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
Expand Down Expand Up @@ -388,11 +392,15 @@ spec:
status: {}
- additionalPrinterColumns:
- jsonPath: .status.conditions[?(@.type=="WorkSynchronized")].status
name: WorkCreated
name: WorkSynchronized
type: string
- jsonPath: .status.conditions[?(@.type=="Applied")].status
name: ResourcesApplied
type: string
- jsonPath: .status.conditions[?(@.type=="Available")].status
name: ResourceAvailable
priority: 1
type: string
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ spec:
description: |-
Value defines the content to be applied on the target location.
Value should be empty when operator is `remove`.
We have reserved a few variables in this field that will be replaced by the actual values.
Those variables all start with `$` and are case sensitive.
Here is the list of currently supported variables:
`${MEMBER-CLUSTER-NAME}`: this will be replaced by the name of the memberCluster CR that represents this cluster.
x-kubernetes-preserve-unknown-fields: true
required:
- op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ spec:
description: |-
Value defines the content to be applied on the target location.
Value should be empty when operator is `remove`.
We have reserved a few variables in this field that will be replaced by the actual values.
Those variables all start with `$` and are case sensitive.
Here is the list of currently supported variables:
`${MEMBER-CLUSTER-NAME}`: this will be replaced by the name of the memberCluster CR that represents this cluster.
x-kubernetes-preserve-unknown-fields: true
required:
- op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ spec:
description: |-
Value defines the content to be applied on the target location.
Value should be empty when operator is `remove`.
We have reserved a few variables in this field that will be replaced by the actual values.
Those variables all start with `$` and are case sensitive.
Here is the list of currently supported variables:
`${MEMBER-CLUSTER-NAME}`: this will be replaced by the name of the memberCluster CR that represents this cluster.
x-kubernetes-preserve-unknown-fields: true
required:
- op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,10 @@ spec:
description: |-
Value defines the content to be applied on the target location.
Value should be empty when operator is `remove`.
We have reserved a few variables in this field that will be replaced by the actual values.
Those variables all start with `$` and are case sensitive.
Here is the list of currently supported variables:
`${MEMBER-CLUSTER-NAME}`: this will be replaced by the name of the memberCluster CR that represents this cluster.
x-kubernetes-preserve-unknown-fields: true
required:
- op
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/clusterresourcebindingwatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ type Reconciler struct {
PlacementController controller.Controller
}

// Reconcile reconciles the clusterResourceBinding.
// Reconcile watches the clusterResourceBinding and enqueues the corresponding CRP name for those bindings whose
// status has changed. This is for the CRP controller to update the corresponding placementStatuses.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
bindingRef := klog.KRef("", req.Name)

Expand Down
67 changes: 49 additions & 18 deletions pkg/controllers/rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
runtime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrl "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

fleetv1alpha1 "go.goms.io/fleet/apis/placement/v1alpha1"
Expand Down Expand Up @@ -161,9 +159,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
}
klog.V(2).InfoS("Picked the bindings to be updated", "clusterResourcePlacement", crpName, "numberOfBindings", len(toBeUpdatedBindings), "numberOfStaleBindings", len(staleBoundBindings))

// StaleBindings is the list that contains bindings that need to be updated but are blocked by the rollout strategy.
// Update the status first, so that if the rolling out (updateBindings func) fails in the middle, the controller will
// recompute the list and the result may be different.
// As far as now, these bindings are blocked by the rollout strategy.
if err := r.updateStaleBindingsStatus(ctx, staleBoundBindings); err != nil {
return runtime.Result{}, err
}
Expand Down Expand Up @@ -357,13 +355,13 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
switch binding.Spec.State {
case fleetv1beta1.BindingStateUnscheduled:
if bindingutils.HasBindingFailed(binding) {
klog.V(3).InfoS("Found a failed to be ready unscheduled binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
klog.V(2).InfoS("Found a failed to be ready unscheduled binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
} else {
canBeReadyBindings = append(canBeReadyBindings, binding)
}
waitTime, bindingReady := isBindingReady(binding, readyTimeCutOff)
if bindingReady {
klog.V(3).InfoS("Found a ready unscheduled binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
klog.V(2).InfoS("Found a ready unscheduled binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
readyBindings = append(readyBindings, binding)
} else {
allReady = false
Expand All @@ -373,7 +371,7 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
}
if binding.DeletionTimestamp.IsZero() {
// it's not been deleted yet, so it is a removal candidate
klog.V(3).InfoS("Found a not yet deleted unscheduled binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
klog.V(2).InfoS("Found a not yet deleted unscheduled binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
// The desired binding is nil for the removeCandidates.
removeCandidates = append(removeCandidates, toBeUpdatedBinding{currentBinding: binding})
} else if bindingReady {
Expand All @@ -395,7 +393,7 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
schedulerTargetedBinds = append(schedulerTargetedBinds, binding)
waitTime, bindingReady := isBindingReady(binding, readyTimeCutOff)
if bindingReady {
klog.V(3).InfoS("Found a ready bound binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
klog.V(2).InfoS("Found a ready bound binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
readyBindings = append(readyBindings, binding)
} else {
allReady = false
Expand All @@ -405,7 +403,7 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
}
// check if the binding is failed or still on going
if bindingutils.HasBindingFailed(binding) {
klog.V(3).InfoS("Found a failed to be ready bound binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
klog.V(2).InfoS("Found a failed to be ready bound binding", "clusterResourcePlacement", crpKObj, "binding", bindingKObj)
bindingFailed = true
} else {
canBeReadyBindings = append(canBeReadyBindings, binding)
Expand Down Expand Up @@ -443,7 +441,8 @@ func (r *Reconciler) pickBindingsToRoll(ctx context.Context, allBindings []*flee
klog.V(2).InfoS("Calculated the targetNumber", "clusterResourcePlacement", crpKObj,
"targetNumber", targetNumber, "readyBindingNumber", len(readyBindings), "canBeUnavailableBindingNumber", len(canBeUnavailableBindings),
"canBeReadyBindingNumber", len(canBeReadyBindings), "boundingCandidateNumber", len(boundingCandidates),
"removeCandidateNumber", len(removeCandidates), "updateCandidateNumber", len(updateCandidates), "applyFailedUpdateCandidateNumber", len(applyFailedUpdateCandidates))
"removeCandidateNumber", len(removeCandidates), "updateCandidateNumber", len(updateCandidates), "applyFailedUpdateCandidateNumber",
len(applyFailedUpdateCandidates), "minWaitTime", minWaitTime)

// the list of bindings that are to be updated by this rolling phase
toBeUpdatedBindingList := make([]toBeUpdatedBinding, 0)
Expand Down Expand Up @@ -496,7 +495,7 @@ func determineBindingsToUpdate(
for ; boundingCandidatesUnselectedIndex < maxNumberToAdd && boundingCandidatesUnselectedIndex < len(boundingCandidates); boundingCandidatesUnselectedIndex++ {
toBeUpdatedBindingList = append(toBeUpdatedBindingList, boundingCandidates[boundingCandidatesUnselectedIndex])
}

// Those are the bindings that are not up to date but not selected to be updated in this round because of the rollout constraints.
staleUnselectedBinding := make([]toBeUpdatedBinding, 0)
if updateCandidateUnselectedIndex < len(updateCandidates) {
staleUnselectedBinding = append(staleUnselectedBinding, updateCandidates[updateCandidateUnselectedIndex:]...)
Expand Down Expand Up @@ -580,6 +579,7 @@ func isBindingReady(binding *fleetv1beta1.ClusterResourceBinding, readyTimeCutOf
return waitTime, false
}
// we don't know when the current spec is available yet, return a negative wait time
// since we will reconcile again after the binding status changes
return -1, false
}

Expand Down Expand Up @@ -669,17 +669,16 @@ func (r *Reconciler) SetupWithManager(mgr runtime.Manager) error {
Watches(&fleetv1beta1.ClusterResourceBinding{}, handler.Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
klog.V(2).InfoS("Handling a resourceBinding create event", "resourceBinding", klog.KObj(e.Object))
handleResourceBinding(e.Object, q)
enqueueResourceBinding(e.Object, q)
},
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
klog.V(2).InfoS("Handling a resourceBinding update event", "resourceBinding", klog.KObj(e.ObjectNew))
handleResourceBinding(e.ObjectNew, q)
handleResourceBindingUpdated(e.ObjectNew, e.ObjectOld, q)
},
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
klog.V(2).InfoS("Handling a resourceBinding generic event", "resourceBinding", klog.KObj(e.Object))
handleResourceBinding(e.Object, q)
enqueueResourceBinding(e.Object, q)
},
}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
}).
// Aside from ClusterResourceSnapshot and ClusterResourceBinding objects, the rollout
// controller also watches ClusterResourcePlacement objects, so that it can push apply
// strategy updates to all bindings right away.
Expand Down Expand Up @@ -795,8 +794,8 @@ func handleResourceSnapshot(snapshot client.Object, q workqueue.RateLimitingInte
})
}

// handleResourceBinding parse the binding label and enqueue the CRP name associated with the resource binding
func handleResourceBinding(binding client.Object, q workqueue.RateLimitingInterface) {
// enqueueResourceBinding parse the binding label and enqueue the CRP name associated with the resource binding
func enqueueResourceBinding(binding client.Object, q workqueue.RateLimitingInterface) {
bindingRef := klog.KObj(binding)
// get the CRP name from the label
crp := binding.GetLabels()[fleetv1beta1.CRPTrackingLabel]
Expand All @@ -812,6 +811,38 @@ func handleResourceBinding(binding client.Object, q workqueue.RateLimitingInterf
})
}

// handleResourceBindingUpdated determines the action to take when a resource binding is updated.
// we only care about the Available and DiffReported condition change.
func handleResourceBindingUpdated(objectOld, objectNew client.Object, q workqueue.RateLimitingInterface) {
// Check if the update event is valid.
if objectOld == nil || objectNew == nil {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("update event is nil")), "Failed to process update event")
return
}
oldBinding, oldOk := objectOld.(*fleetv1beta1.ClusterResourceBinding)
newBinding, newOk := objectNew.(*fleetv1beta1.ClusterResourceBinding)
if !oldOk || !newOk {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("failed to cast runtime objects in update event to cluster resource binding objects")), "Failed to process update event")
}
if oldBinding.GetGeneration() != newBinding.GetGeneration() {
klog.V(2).InfoS("The binding spec have changed, need to notify rollout controller", "binding", klog.KObj(newBinding))
enqueueResourceBinding(newBinding, q)
return
}
// these are the conditions we care about
conditionsToMonitor := []string{string(fleetv1beta1.ResourceBindingDiffReported), string(fleetv1beta1.ResourceBindingAvailable)}
for _, conditionType := range conditionsToMonitor {
oldCond := oldBinding.GetCondition(conditionType)
newCond := newBinding.GetCondition(conditionType)
if !condition.EqualCondition(oldCond, newCond) {
klog.V(2).InfoS("The binding status have changed, need to notify rollout controller", "binding", klog.KObj(newBinding), "conditionType", conditionType)
enqueueResourceBinding(newBinding, q)
return
}
}
klog.V(2).InfoS("A resourceBinding is updated but we don't need to handle it", "resourceBinding", klog.KObj(newBinding))
}

// updateStaleBindingsStatus updates the status of the stale bindings to indicate that they are blocked by the rollout strategy.
// Note: the binding state should be "Scheduled" or "Bound".
// The desired binding will be ignored.
Expand Down Expand Up @@ -893,7 +924,7 @@ func (r *Reconciler) processApplyStrategyUpdates(
// Verify if the binding has the latest apply strategy set.
if equality.Semantic.DeepEqual(binding.Spec.ApplyStrategy, applyStrategy) {
// The binding already has the latest apply strategy set; no need to push the update.
klog.V(2).InfoS("The binding already has the latest apply strategy; skip the apply strategy update", "clusterResourceBinding", klog.KObj(binding))
klog.V(3).InfoS("The binding already has the latest apply strategy; skip the apply strategy update", "clusterResourceBinding", klog.KObj(binding))
continue
}

Expand Down
Loading
Loading