Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package machinehealthcheck
import (
"context"
"fmt"
"maps"
"reflect"
"sort"
"strconv"
"strings"
Expand All @@ -39,6 +41,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -88,8 +91,9 @@ type Reconciler struct {
// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string

controller controller.Controller
recorder record.EventRecorder
controller controller.Controller
recorder record.EventRecorder
overrideRateLimit time.Duration

predicateLog *logr.Logger
}
Expand All @@ -99,12 +103,18 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
return errors.New("Client and ClusterCache must not be nil")
}

rateLimit := 15 * time.Second
if r.overrideRateLimit != time.Duration(0) {
rateLimit = r.overrideRateLimit
}

r.predicateLog = ptr.To(ctrl.LoggerFrom(ctx).WithValues("controller", "machinehealthcheck"))
c, err := capicontrollerutil.NewControllerManagedBy(mgr, *r.predicateLog).
For(&clusterv1.MachineHealthCheck{}).
Watches(
&clusterv1.Machine{},
handler.EnqueueRequestsFromMapFunc(r.machineToMachineHealthCheck),
machineIsChangedPredicate(),
).
WithOptions(options).
WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), *r.predicateLog, r.WatchFilterValue)).
Expand All @@ -116,6 +126,10 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
predicates.ResourceHasFilterLabel(mgr.GetScheme(), *r.predicateLog, r.WatchFilterValue),
).
WatchesRawSource(r.ClusterCache.GetClusterSource("machinehealthcheck", r.clusterToMachineHealthCheck)).
// Intentionally increasing the rate limit interval for MHC, because during stress tests we observed
// that this reconciler gets invoked very frequently due to watches on Machines and workload cluster Nodes,
// but in most cases observed Machines are ok or remediation must be deferred until timeouts will expire.
WithRateLimitInterval(rateLimit).
Build(r)
if err != nil {
return errors.Wrap(err, "failed setting up with a controller manager")
Expand All @@ -126,6 +140,38 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
return nil
}

// machineIsChangedPredicate returns a predicate that passes all create and
// delete events, drops generic events, and for update events only passes
// changes the MachineHealthCheck controller actually reacts to.
func machineIsChangedPredicate() predicate.Funcs {
	relevantMachineUpdate := func(e event.UpdateEvent) bool {
		oldMachine, oldOK := e.ObjectOld.(*clusterv1.Machine)
		newMachine, newOK := e.ObjectNew.(*clusterv1.Machine)
		if !oldOK || !newOK {
			// Not a Machine pair; nothing for MHC to do.
			return false
		}

		// MHC only cares about changes on Machine conditions.
		// It also uses Machine's NodeRef to fetch nodes, Machine's labels to select Machines to monitor,
		// and also checks for paused, skip remediation, manual remediation annotations in Machine's annotations.
		switch {
		case newMachine.Status.NodeRef.Name != oldMachine.Status.NodeRef.Name:
			return true
		case !maps.Equal(newMachine.Labels, oldMachine.Labels):
			return true
		case !maps.Equal(newMachine.Annotations, oldMachine.Annotations):
			return true
		case !reflect.DeepEqual(newMachine.Status.Conditions, oldMachine.Status.Conditions):
			return true
		default:
			return false
		}
	}

	return predicate.Funcs{
		UpdateFunc:  relevantMachineUpdate,
		CreateFunc:  func(event.CreateEvent) bool { return true },
		DeleteFunc:  func(event.DeleteEvent) bool { return true },
		GenericFunc: func(event.GenericEvent) bool { return false },
	}
}

func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
log := ctrl.LoggerFrom(ctx)

Expand Down Expand Up @@ -611,10 +657,40 @@ func (r *Reconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.C
Watcher: r.controller,
Kind: &corev1.Node{},
EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachineHealthCheck),
Predicates: []predicate.TypedPredicate[client.Object]{predicates.TypedResourceIsChanged[client.Object](r.Client.Scheme(), *r.predicateLog)},
Predicates: []predicate.TypedPredicate[client.Object]{
predicates.TypedResourceIsChanged[client.Object](r.Client.Scheme(), *r.predicateLog),
nodeIsChangedPredicate(),
},
}))
}

// nodeIsChangedPredicate returns a predicate that passes all create and
// delete events, drops generic events, and for update events only passes
// changes to the Node's status conditions — the only Node field MHC inspects.
func nodeIsChangedPredicate() predicate.Funcs {
	return predicate.Funcs{
		CreateFunc:  func(event.CreateEvent) bool { return true },
		DeleteFunc:  func(event.DeleteEvent) bool { return true },
		GenericFunc: func(event.GenericEvent) bool { return false },
		UpdateFunc: func(e event.UpdateEvent) bool {
			oldNode, oldOK := e.ObjectOld.(*corev1.Node)
			newNode, newOK := e.ObjectNew.(*corev1.Node)
			if !oldOK || !newOK {
				// Not a Node pair; ignore.
				return false
			}
			// MHC only cares about changes on Node conditions.
			return !reflect.DeepEqual(newNode.Status.Conditions, oldNode.Status.Conditions)
		},
	}
}

// getMachineFromNode retrieves the machine with a nodeRef to nodeName
// There should at most one machine with a given nodeRef, returns an error otherwise.
func getMachineFromNode(ctx context.Context, c client.Client, nodeName string) (*clusterv1.Machine, error) {
Expand Down
2 changes: 2 additions & 0 deletions internal/controllers/machinehealthcheck/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func TestMain(m *testing.M) {
if err := (&Reconciler{
Client: mgr.GetClient(),
ClusterCache: clusterCache,
// use a shorter rate limit for testing.
overrideRateLimit: 1 * time.Second,
}).SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: 1}); err != nil {
panic(fmt.Sprintf("Failed to start Reconciler : %v", err))
}
Expand Down
33 changes: 24 additions & 9 deletions util/controller/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ const (

// Builder is a wrapper around controller-runtime's builder.Builder.
type Builder struct {
builder *builder.Builder
mgr manager.Manager
predicateLog logr.Logger
options controller.TypedOptions[reconcile.Request]
forObject client.Object
controllerName string
builder *builder.Builder
mgr manager.Manager
predicateLog logr.Logger
options controller.TypedOptions[reconcile.Request]
forObject client.Object
controllerName string
rateLimitInterval time.Duration
}

// NewControllerManagedBy returns a new controller builder that will be started by the provided Manager.
Expand Down Expand Up @@ -116,6 +117,13 @@ func (blder *Builder) WithOptions(options controller.TypedOptions[reconcile.Requ
return blder
}

// WithRateLimitInterval sets a custom rate limit interval for the controller,
// replacing the 1s default.
// Note: intervals lower than 1s will be ignored.
func (blder *Builder) WithRateLimitInterval(interval time.Duration) *Builder {
	blder.rateLimitInterval = interval

	return blder
}

// WithEventFilter sets the event filters, to filter which create/update/delete/generic events eventually
// trigger reconciliations. For example, filtering on whether the resource version has changed.
func (blder *Builder) WithEventFilter(p predicate.Predicate) *Builder {
Expand Down Expand Up @@ -200,16 +208,23 @@ func (blder *Builder) Build(r reconcile.TypedReconciler[reconcile.Request]) (Con
}
}

// Sets the rate limit interval.
rateLimitInterval := time.Second
if blder.rateLimitInterval > time.Second {
rateLimitInterval = blder.rateLimitInterval
}

// Passing the options to the underlying builder here because we modified them above.
blder.builder.WithOptions(blder.options)

// Create reconcileCache.
reconcileCache := cache.New[reconcileCacheEntry](cache.DefaultTTL)

c, err := blder.builder.Build(reconcilerWrapper{
name: controllerName,
reconciler: r,
reconcileCache: reconcileCache,
name: controllerName,
reconciler: r,
reconcileCache: reconcileCache,
rateLimitInterval: rateLimitInterval,
})
if err != nil {
return nil, err
Expand Down
9 changes: 5 additions & 4 deletions util/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ import (
)

type reconcilerWrapper struct {
name string
reconcileCache cache.Cache[reconcileCacheEntry]
reconciler reconcile.Reconciler
name string
reconcileCache cache.Cache[reconcileCacheEntry]
reconciler reconcile.Reconciler
rateLimitInterval time.Duration
}

func (r reconcilerWrapper) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
Expand All @@ -52,7 +53,7 @@ func (r reconcilerWrapper) Reconcile(ctx context.Context, req reconcile.Request)
// Under certain circumstances the ReconcileAfter time will be set to a later time via DeferNextReconcile /
// DeferNextReconcileForObject, e.g. when we're waiting for Pods to terminate during node drain or
// volumes to detach. This is done to ensure we're not spamming the workload cluster API server.
r.reconcileCache.Add(reconcileCacheEntry{Request: req, ReconcileAfter: reconcileStartTime.Add(1 * time.Second)})
r.reconcileCache.Add(reconcileCacheEntry{Request: req, ReconcileAfter: reconcileStartTime.Add(r.rateLimitInterval)})
}

// Update metrics after processing each item
Expand Down
6 changes: 4 additions & 2 deletions util/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestReconcile(t *testing.T) {
reconcileCounter++
return reconcile.Result{}, nil
}),
rateLimitInterval: 1 * time.Second,
}
c := controllerWrapper{
reconcileCache: reconcileCache,
Expand Down Expand Up @@ -149,8 +150,9 @@ func TestReconcileMetrics(t *testing.T) {
g := NewWithT(t)

r := reconcilerWrapper{
name: "cluster",
reconcileCache: reconcileCache,
name: "cluster",
reconcileCache: reconcileCache,
rateLimitInterval: 1 * time.Second,
}

req := reconcile.Request{
Expand Down
Loading