From 5cea86ec6bc29ed382255ecaf1535c762d42a679 Mon Sep 17 00:00:00 2001 From: fabriziopandini Date: Wed, 4 Mar 2026 20:56:03 +0100 Subject: [PATCH] Increase MHC rate limiting and filter events --- .../machinehealthcheck_controller.go | 82 ++++++++++++++++++- .../machinehealthcheck/suite_test.go | 2 + util/controller/builder.go | 33 ++++++-- util/controller/controller.go | 9 +- util/controller/controller_test.go | 6 +- 5 files changed, 114 insertions(+), 18 deletions(-) diff --git a/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go b/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go index e489707f323b..17ca355e94db 100644 --- a/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go +++ b/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go @@ -19,6 +19,8 @@ package machinehealthcheck import ( "context" "fmt" + "maps" + "reflect" "sort" "strconv" "strings" @@ -39,6 +41,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -88,8 +91,9 @@ type Reconciler struct { // WatchFilterValue is the label value used to filter events prior to reconciliation. WatchFilterValue string - controller controller.Controller - recorder record.EventRecorder + controller controller.Controller + recorder record.EventRecorder + overrideRateLimit time.Duration predicateLog *logr.Logger } @@ -99,12 +103,18 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt return errors.New("Client and ClusterCache must not be nil") } + rateLimit := 15 * time.Second + if r.overrideRateLimit != time.Duration(0) { + rateLimit = r.overrideRateLimit + } + r.predicateLog = ptr.To(ctrl.LoggerFrom(ctx).WithValues("controller", "machinehealthcheck")) c, err := capicontrollerutil.NewControllerManagedBy(mgr, *r.predicateLog). For(&clusterv1.MachineHealthCheck{}). Watches( &clusterv1.Machine{}, handler.EnqueueRequestsFromMapFunc(r.machineToMachineHealthCheck), + machineIsChangedPredicate(), ). WithOptions(options). WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), *r.predicateLog, r.WatchFilterValue)). @@ -116,6 +126,10 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt predicates.ResourceHasFilterLabel(mgr.GetScheme(), *r.predicateLog, r.WatchFilterValue), ). WatchesRawSource(r.ClusterCache.GetClusterSource("machinehealthcheck", r.clusterToMachineHealthCheck)). + // Intentionally increasing the rate limit interval for MHC, because during stress tests we observed + // that this reconciler gets invoked very frequently due to watches on Machines and workload cluster Nodes, + // but in most cases observed Machines are ok or remediation must be deferred until timeouts will expire. + WithRateLimitInterval(rateLimit). Build(r) if err != nil { return errors.Wrap(err, "failed setting up with a controller manager") @@ -126,6 +140,38 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt return nil } +func machineIsChangedPredicate() predicate.Funcs { + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + mNew, ok := e.ObjectNew.(*clusterv1.Machine) + if !ok { + return false + } + mOld, ok := e.ObjectOld.(*clusterv1.Machine) + if !ok { + return false + } + + // MHC only cares about changes on Machine conditions. + // It also uses Machine's NodeRef to fetch nodes, Machine's labels to select Machines to monitor, + // and also checks for paused, skip remediation, manual remediation annotations in Machine's annotations. + return mNew.Status.NodeRef.Name != mOld.Status.NodeRef.Name || + !maps.Equal(mNew.Labels, mOld.Labels) || + !maps.Equal(mNew.Annotations, mOld.Annotations) || + !reflect.DeepEqual(mNew.Status.Conditions, mOld.Status.Conditions) + }, + CreateFunc: func(_ event.CreateEvent) bool { + return true + }, + DeleteFunc: func(_ event.DeleteEvent) bool { + return true + }, + GenericFunc: func(_ event.GenericEvent) bool { + return false + }, + } +} + func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) { log := ctrl.LoggerFrom(ctx) @@ -611,10 +657,40 @@ func (r *Reconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.C Watcher: r.controller, Kind: &corev1.Node{}, EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachineHealthCheck), - Predicates: []predicate.TypedPredicate[client.Object]{predicates.TypedResourceIsChanged[client.Object](r.Client.Scheme(), *r.predicateLog)}, + Predicates: []predicate.TypedPredicate[client.Object]{ + predicates.TypedResourceIsChanged[client.Object](r.Client.Scheme(), *r.predicateLog), + nodeIsChangedPredicate(), + }, })) } +func nodeIsChangedPredicate() predicate.Funcs { + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + nNew, ok := e.ObjectNew.(*corev1.Node) + if !ok { + return false + } + nOld, ok := e.ObjectOld.(*corev1.Node) + if !ok { + return false + } + + // MHC only cares about changes on Node conditions. + return !reflect.DeepEqual(nNew.Status.Conditions, nOld.Status.Conditions) + }, + CreateFunc: func(_ event.CreateEvent) bool { + return true + }, + DeleteFunc: func(_ event.DeleteEvent) bool { + return true + }, + GenericFunc: func(_ event.GenericEvent) bool { + return false + }, + } +} + // getMachineFromNode retrieves the machine with a nodeRef to nodeName // There should at most one machine with a given nodeRef, returns an error otherwise. func getMachineFromNode(ctx context.Context, c client.Client, nodeName string) (*clusterv1.Machine, error) { diff --git a/internal/controllers/machinehealthcheck/suite_test.go b/internal/controllers/machinehealthcheck/suite_test.go index 40551c4222a1..b909de133105 100644 --- a/internal/controllers/machinehealthcheck/suite_test.go +++ b/internal/controllers/machinehealthcheck/suite_test.go @@ -91,6 +91,8 @@ func TestMain(m *testing.M) { if err := (&Reconciler{ Client: mgr.GetClient(), ClusterCache: clusterCache, + // use a shorter rate limit for testing. + overrideRateLimit: 1 * time.Second, }).SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: 1}); err != nil { panic(fmt.Sprintf("Failed to start Reconciler : %v", err)) } diff --git a/util/controller/builder.go b/util/controller/builder.go index a6aa186e52b4..319abc9faea2 100644 --- a/util/controller/builder.go +++ b/util/controller/builder.go @@ -46,12 +46,13 @@ const ( // Builder is a wrapper around controller-runtime's builder.Builder. type Builder struct { - builder *builder.Builder - mgr manager.Manager - predicateLog logr.Logger - options controller.TypedOptions[reconcile.Request] - forObject client.Object - controllerName string + builder *builder.Builder + mgr manager.Manager + predicateLog logr.Logger + options controller.TypedOptions[reconcile.Request] + forObject client.Object + controllerName string + rateLimitInterval time.Duration } // NewControllerManagedBy returns a new controller builder that will be started by the provided Manager. @@ -116,6 +117,13 @@ func (blder *Builder) WithOptions(options controller.TypedOptions[reconcile.Requ return blder } +// WithRateLimitInterval overrides the default rate limit interval, 1s. +// Note: intervals lower than 1s will be ignored. +func (blder *Builder) WithRateLimitInterval(interval time.Duration) *Builder { + blder.rateLimitInterval = interval + return blder +} + // WithEventFilter sets the event filters, to filter which create/update/delete/generic events eventually // trigger reconciliations. For example, filtering on whether the resource version has changed. func (blder *Builder) WithEventFilter(p predicate.Predicate) *Builder { @@ -200,6 +208,12 @@ func (blder *Builder) Build(r reconcile.TypedReconciler[reconcile.Request]) (Con } } + // Sets the rate limit interval. + rateLimitInterval := time.Second + if blder.rateLimitInterval > time.Second { + rateLimitInterval = blder.rateLimitInterval + } + // Passing the options to the underlying builder here because we modified them above. blder.builder.WithOptions(blder.options) @@ -207,9 +221,10 @@ func (blder *Builder) Build(r reconcile.TypedReconciler[reconcile.Request]) (Con reconcileCache := cache.New[reconcileCacheEntry](cache.DefaultTTL) c, err := blder.builder.Build(reconcilerWrapper{ - name: controllerName, - reconciler: r, - reconcileCache: reconcileCache, + name: controllerName, + reconciler: r, + reconcileCache: reconcileCache, + rateLimitInterval: rateLimitInterval, }) if err != nil { return nil, err diff --git a/util/controller/controller.go b/util/controller/controller.go index 041f188dba0f..f38faabc5d2e 100644 --- a/util/controller/controller.go +++ b/util/controller/controller.go @@ -32,9 +32,10 @@ import ( ) type reconcilerWrapper struct { - name string - reconcileCache cache.Cache[reconcileCacheEntry] - reconciler reconcile.Reconciler + name string + reconcileCache cache.Cache[reconcileCacheEntry] + reconciler reconcile.Reconciler + rateLimitInterval time.Duration } func (r reconcilerWrapper) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { @@ -52,7 +53,7 @@ func (r reconcilerWrapper) Reconcile(ctx context.Context, req reconcile.Request) // Under certain circumstances the ReconcileAfter time will be set to a later time via DeferNextReconcile / // DeferNextReconcileForObject, e.g. when we're waiting for Pods to terminate during node drain or // volumes to detach. This is done to ensure we're not spamming the workload cluster API server. - r.reconcileCache.Add(reconcileCacheEntry{Request: req, ReconcileAfter: reconcileStartTime.Add(1 * time.Second)}) + r.reconcileCache.Add(reconcileCacheEntry{Request: req, ReconcileAfter: reconcileStartTime.Add(r.rateLimitInterval)}) } // Update metrics after processing each item diff --git a/util/controller/controller_test.go b/util/controller/controller_test.go index 25fcbee21934..e620e5df0398 100644 --- a/util/controller/controller_test.go +++ b/util/controller/controller_test.go @@ -58,6 +58,7 @@ func TestReconcile(t *testing.T) { reconcileCounter++ return reconcile.Result{}, nil }), + rateLimitInterval: 1 * time.Second, } c := controllerWrapper{ reconcileCache: reconcileCache, @@ -149,8 +150,9 @@ func TestReconcileMetrics(t *testing.T) { g := NewWithT(t) r := reconcilerWrapper{ - name: "cluster", - reconcileCache: reconcileCache, + name: "cluster", + reconcileCache: reconcileCache, + rateLimitInterval: 1 * time.Second, } req := reconcile.Request{