diff --git a/controlplane/kubeadm/internal/controllers/controller.go b/controlplane/kubeadm/internal/controllers/controller.go
index 2f3f60cc1a3b..2b7d0c9b869d 100644
--- a/controlplane/kubeadm/internal/controllers/controller.go
+++ b/controlplane/kubeadm/internal/controllers/controller.go
@@ -100,9 +100,8 @@ type KubeadmControlPlaneReconciler struct {
 	RemoteConditionsGracePeriod time.Duration
 
-	managementCluster         internal.ManagementCluster
-	managementClusterUncached internal.ManagementCluster
-	ssaCache                  ssa.Cache
+	managementCluster internal.ManagementCluster
+	ssaCache          ssa.Cache
 
 	// Only used for testing.
 	overrideTryInPlaceUpdateFunc func(ctx context.Context, controlPlane *internal.ControlPlane, machineToInPlaceUpdate *clusterv1.Machine, machineUpToDateResult internal.UpToDateResult) (bool, ctrl.Result, error)
@@ -171,10 +170,6 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
 		}
 	}
 
-	if r.managementClusterUncached == nil {
-		r.managementClusterUncached = &internal.Management{Client: mgr.GetAPIReader()}
-	}
-
 	return nil
 }
@@ -308,7 +303,7 @@ func (r *KubeadmControlPlaneReconciler) initControlPlaneScope(ctx context.Contex
 	}
 
 	// Read control plane machines
-	controlPlaneMachines, err := r.managementClusterUncached.GetControlPlaneMachinesForCluster(ctx, cluster)
+	controlPlaneMachines, err := r.managementCluster.GetControlPlaneMachinesForCluster(ctx, cluster)
 	if err != nil {
 		log.Error(err, "Failed to retrieve control plane machines for cluster")
 		return nil, false, err
 	}
@@ -756,6 +751,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, con
 			continue
 		}
 
+		// Note: It's not critical that the next Reconcile observes the deletion, so we don't wait for the cache.
 		if err := r.Client.Delete(ctx, machineToDelete); err != nil && !apierrors.IsNotFound(err) {
 			errs = append(errs, errors.Wrapf(err, "failed to delete control plane Machine %s", klog.KObj(machineToDelete)))
 		}
@@ -1542,7 +1538,7 @@ func (r *KubeadmControlPlaneReconciler) adoptMachines(ctx context.Context, kcp *
 	// We do an uncached full quorum read against the KCP to avoid re-adopting Machines the garbage collector just intentionally orphaned
 	// See https://github.com/kubernetes/kubernetes/issues/42639
 	uncached := controlplanev1.KubeadmControlPlane{}
-	err := r.managementClusterUncached.Get(ctx, client.ObjectKey{Namespace: kcp.Namespace, Name: kcp.Name}, &uncached)
+	err := r.APIReader.Get(ctx, client.ObjectKey{Namespace: kcp.Namespace, Name: kcp.Name}, &uncached)
 	if err != nil {
 		return errors.Wrapf(err, "failed to check whether %v/%v was deleted before adoption", kcp.GetNamespace(), kcp.GetName())
 	}
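For context on the controller.go hunks above: the uncached full-quorum read in adoptMachines now goes through a new APIReader field instead of the removed managementClusterUncached, and SetupWithManager no longer constructs an uncached internal.Management. The diff does not show where APIReader is populated; the sketch below is a guess at the wiring via controller-runtime's manager (mgr.GetClient() and mgr.GetAPIReader() are real manager methods, but the construction site, the helper name setupKCPReconciler, and the elided fields are assumptions):

    // Hypothetical wiring sketch; the reconciler's other fields are elided.
    func setupKCPReconciler(ctx context.Context, mgr ctrl.Manager) error {
        r := &KubeadmControlPlaneReconciler{
            Client:    mgr.GetClient(),    // cached, informer-backed client
            APIReader: mgr.GetAPIReader(), // uncached reader, goes straight to the API server
        }
        return r.SetupWithManager(ctx, mgr, controller.Options{})
    }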
diff --git a/controlplane/kubeadm/internal/controllers/controller_test.go b/controlplane/kubeadm/internal/controllers/controller_test.go
index f6fea59c57d3..1957dbb4b2ff 100644
--- a/controlplane/kubeadm/internal/controllers/controller_test.go
+++ b/controlplane/kubeadm/internal/controllers/controller_test.go
@@ -553,10 +553,10 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
 			fakeClient := newFakeClient(objs...)
 			fmc.Reader = fakeClient
 			r := &KubeadmControlPlaneReconciler{
-				Client:                    fakeClient,
-				SecretCachingClient:       fakeClient,
-				managementCluster:         fmc,
-				managementClusterUncached: fmc,
+				Client:              fakeClient,
+				APIReader:           fakeClient,
+				SecretCachingClient: fakeClient,
+				managementCluster:   fmc,
 			}
 
 			_, adoptableMachineFound, err := r.initControlPlaneScope(ctx, cluster, kcp)
@@ -645,10 +645,10 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
 		fakeClient := newFakeClient(objs...)
 		fmc.Reader = fakeClient
 		r := &KubeadmControlPlaneReconciler{
-			Client:                    fakeClient,
-			SecretCachingClient:       fakeClient,
-			managementCluster:         fmc,
-			managementClusterUncached: fmc,
+			Client:              fakeClient,
+			APIReader:           fakeClient,
+			SecretCachingClient: fakeClient,
+			managementCluster:   fmc,
 		}
 
 		_, adoptableMachineFound, err := r.initControlPlaneScope(ctx, cluster, kcp)
@@ -731,10 +731,10 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
 		fakeClient := newFakeClient(objs...)
 		fmc.Reader = fakeClient
 		r := &KubeadmControlPlaneReconciler{
-			Client:                    fakeClient,
-			SecretCachingClient:       fakeClient,
-			managementCluster:         fmc,
-			managementClusterUncached: fmc,
+			Client:              fakeClient,
+			APIReader:           fakeClient,
+			SecretCachingClient: fakeClient,
+			managementCluster:   fmc,
 		}
 
 		_, adoptableMachineFound, err := r.initControlPlaneScope(ctx, cluster, kcp)
@@ -784,11 +784,11 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
 		fmc.Reader = fakeClient
 		recorder := record.NewFakeRecorder(32)
 		r := &KubeadmControlPlaneReconciler{
-			Client:                    fakeClient,
-			SecretCachingClient:       fakeClient,
-			recorder:                  recorder,
-			managementCluster:         fmc,
-			managementClusterUncached: fmc,
+			Client:              fakeClient,
+			APIReader:           fakeClient,
+			SecretCachingClient: fakeClient,
+			recorder:            recorder,
+			managementCluster:   fmc,
 		}
 
 		_, adoptableMachineFound, err := r.initControlPlaneScope(ctx, cluster, kcp)
@@ -1343,6 +1343,7 @@ kubernetesVersion: metav1.16.1
 	expectedLabels := map[string]string{clusterv1.ClusterNameLabel: "foo"}
 	r := &KubeadmControlPlaneReconciler{
 		Client:              env,
+		APIReader:           env,
 		SecretCachingClient: secretCachingClient,
 		recorder:            record.NewFakeRecorder(32),
 		managementCluster: &fakeManagementCluster{
@@ -1354,15 +1355,6 @@ kubernetesVersion: metav1.16.1
 				Status: internal.ClusterStatus{},
 			},
 		},
-		managementClusterUncached: &fakeManagementCluster{
-			Management: &internal.Management{Client: env},
-			Workload: &fakeWorkloadCluster{
-				Workload: &internal.Workload{
-					Client: env,
-				},
-				Status: internal.ClusterStatus{},
-			},
-		},
 		ssaCache: ssa.NewCache("test-controller"),
 	}
@@ -1590,6 +1582,7 @@ kubernetesVersion: metav1.16.1`,
 
 	r := &KubeadmControlPlaneReconciler{
 		Client:              env,
+		APIReader:           env,
 		SecretCachingClient: secretCachingClient,
 		recorder:            record.NewFakeRecorder(32),
 		managementCluster: &fakeManagementCluster{
@@ -1601,15 +1594,6 @@ kubernetesVersion: metav1.16.1`,
 				Status: internal.ClusterStatus{},
 			},
 		},
-		managementClusterUncached: &fakeManagementCluster{
-			Management: &internal.Management{Client: env},
-			Workload: &fakeWorkloadCluster{
-				Workload: &internal.Workload{
-					Client: env,
-				},
-				Status: internal.ClusterStatus{},
-			},
-		},
 		ssaCache: ssa.NewCache("test-controller"),
 	}
diff --git a/controlplane/kubeadm/internal/controllers/helpers.go b/controlplane/kubeadm/internal/controllers/helpers.go
index 0cbf38b57206..54e829095d71 100644
--- a/controlplane/kubeadm/internal/controllers/helpers.go
+++ b/controlplane/kubeadm/internal/controllers/helpers.go
@@ -36,6 +36,7 @@ import (
 	"sigs.k8s.io/cluster-api/controllers/external"
 	"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
 	"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/desiredstate"
+	clientutil "sigs.k8s.io/cluster-api/internal/util/client"
 	"sigs.k8s.io/cluster-api/internal/util/ssa"
 	"sigs.k8s.io/cluster-api/util"
 	"sigs.k8s.io/cluster-api/util/certs"
@@ -301,7 +302,8 @@ func (r *KubeadmControlPlaneReconciler) createMachine(ctx context.Context, kcp *
 	// Remove the annotation tracking that a remediation is in progress (the remediation completed when
 	// the replacement machine has been created above).
 	delete(kcp.Annotations, controlplanev1.RemediationInProgressAnnotation)
-	return nil
+
+	return clientutil.WaitForObjectsToBeAddedToTheCache(ctx, r.Client, "Machine creation", machine)
 }
 
 func (r *KubeadmControlPlaneReconciler) updateMachine(ctx context.Context, machine *clusterv1.Machine, kcp *controlplanev1.KubeadmControlPlane, cluster *clusterv1.Cluster) (*clusterv1.Machine, error) {
@@ -310,8 +312,8 @@ func (r *KubeadmControlPlaneReconciler) updateMachine(ctx context.Context, machi
 		return nil, errors.Wrap(err, "failed to apply Machine")
 	}
 
-	err = ssa.Patch(ctx, r.Client, kcpManagerName, updatedMachine, ssa.WithCachingProxy{Cache: r.ssaCache, Original: machine})
-	if err != nil {
+	// Note: It's not critical that the next Reconcile observes the changes applied here, so we don't wait for the cache.
+	if err := ssa.Patch(ctx, r.Client, kcpManagerName, updatedMachine, ssa.WithCachingProxy{Cache: r.ssaCache, Original: machine}); err != nil {
 		return nil, err
 	}
 	return updatedMachine, nil
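createMachine above now ends by calling clientutil.WaitForObjectsToBeAddedToTheCache: the Reconcile that created a Machine blocks until the informer cache can serve it, so the next Reconcile cannot read a stale Machine list and, for example, create a second replacement. The helper lives in internal/util/client and its implementation is not part of this diff; the following is a minimal sketch under the assumption that it polls the cached client until the object is readable (package name, interval, timeout, and the variadic signature are guesses inferred from the call sites):

    // Sketch only; the real helper may differ in details.
    package client // imported as clientutil by the callers above

    import (
        "context"
        "time"

        "github.com/pkg/errors"
        apierrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/util/wait"
        "sigs.k8s.io/controller-runtime/pkg/client"
    )

    // WaitForObjectsToBeAddedToTheCache blocks until the cached client can read
    // every given object, so the next Reconcile is guaranteed to observe them.
    func WaitForObjectsToBeAddedToTheCache(ctx context.Context, c client.Client, operation string, objs ...client.Object) error {
        for _, obj := range objs {
            key := client.ObjectKeyFromObject(obj)
            if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 5*time.Second, true,
                func(ctx context.Context) (bool, error) {
                    // Get overwrites obj; callers that still need the original pass a DeepCopy.
                    if err := c.Get(ctx, key, obj); err != nil {
                        if apierrors.IsNotFound(err) {
                            return false, nil // the cache has not observed the create yet
                        }
                        return false, err
                    }
                    return true, nil
                }); err != nil {
                return errors.Wrapf(err, "failed waiting for the cache to observe %s", operation)
            }
        }
        return nil
    }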
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal" "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/desiredstate" + clientutil "sigs.k8s.io/cluster-api/internal/util/client" "sigs.k8s.io/cluster-api/internal/util/ssa" "sigs.k8s.io/cluster-api/util" "sigs.k8s.io/cluster-api/util/certs" @@ -301,7 +302,8 @@ func (r *KubeadmControlPlaneReconciler) createMachine(ctx context.Context, kcp * // Remove the annotation tracking that a remediation is in progress (the remediation completed when // the replacement machine has been created above). delete(kcp.Annotations, controlplanev1.RemediationInProgressAnnotation) - return nil + + return clientutil.WaitForObjectsToBeAddedToTheCache(ctx, r.Client, "Machine creation", machine) } func (r *KubeadmControlPlaneReconciler) updateMachine(ctx context.Context, machine *clusterv1.Machine, kcp *controlplanev1.KubeadmControlPlane, cluster *clusterv1.Cluster) (*clusterv1.Machine, error) { @@ -310,8 +312,8 @@ func (r *KubeadmControlPlaneReconciler) updateMachine(ctx context.Context, machi return nil, errors.Wrap(err, "failed to apply Machine") } - err = ssa.Patch(ctx, r.Client, kcpManagerName, updatedMachine, ssa.WithCachingProxy{Cache: r.ssaCache, Original: machine}) - if err != nil { + // Note: It's not critical that the next Reconcile observes the changes applied here, so we don't wait for the cache. + if err := ssa.Patch(ctx, r.Client, kcpManagerName, updatedMachine, ssa.WithCachingProxy{Cache: r.ssaCache, Original: machine}); err != nil { return nil, err } return updatedMachine, nil diff --git a/controlplane/kubeadm/internal/controllers/remediation.go b/controlplane/kubeadm/internal/controllers/remediation.go index 9977c6dcecdc..00fe010c6ee4 100644 --- a/controlplane/kubeadm/internal/controllers/remediation.go +++ b/controlplane/kubeadm/internal/controllers/remediation.go @@ -42,6 +42,7 @@ import ( "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd/util" "sigs.k8s.io/cluster-api/feature" "sigs.k8s.io/cluster-api/internal/contract" + clientutil "sigs.k8s.io/cluster-api/internal/util/client" "sigs.k8s.io/cluster-api/util/annotations" "sigs.k8s.io/cluster-api/util/collections" "sigs.k8s.io/cluster-api/util/conditions" @@ -333,7 +334,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.C } } - // Delete the machine + // Delete the machine (waiting for cache to observe the deletion at the end of this func, so everything in between is always executed) if err := r.Client.Delete(ctx, machineToBeRemediated); err != nil { v1beta1conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedV1Beta1Condition, clusterv1.RemediationFailedV1Beta1Reason, clusterv1.ConditionSeverityError, "%s", err.Error()) @@ -372,6 +373,11 @@ func (r *KubeadmControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.C controlplanev1.RemediationInProgressAnnotation: remediationInProgressValue, }) + // Using DeepCopy() to avoid overwriting the Machine because we still need it to Patch the Machine in defer. + if err := clientutil.WaitForObjectsToBeDeletedFromTheCache(ctx, r.Client, "Machine deletion (remediating unhealthy Machine)", machineToBeRemediated.DeepCopy()); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{RequeueAfter: time.Millisecond}, nil // Technically there is no need to requeue here. Machine deletion above triggers reconciliation. But we have to return a non-zero Result so reconcile above returns. 
diff --git a/controlplane/kubeadm/internal/controllers/scale.go b/controlplane/kubeadm/internal/controllers/scale.go
index 6d1e59aaf6e7..d69d1756d673 100644
--- a/controlplane/kubeadm/internal/controllers/scale.go
+++ b/controlplane/kubeadm/internal/controllers/scale.go
@@ -35,6 +35,7 @@ import (
 	clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
 	"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
 	"sigs.k8s.io/cluster-api/feature"
+	clientutil "sigs.k8s.io/cluster-api/internal/util/client"
 	"sigs.k8s.io/cluster-api/util/collections"
 	"sigs.k8s.io/cluster-api/util/conditions"
 )
@@ -159,6 +160,10 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
 	log.WithValues(controlPlane.StatusToLogKeyAndValues(nil, machineToDelete)...).
 		Info(fmt.Sprintf("Machine %s deleting (scale down)", klog.KObj(machineToDelete)), "Machine", klog.KObj(machineToDelete), "desiredReplicas", ptr.Deref(controlPlane.KCP.Spec.Replicas, 0), "replicas", len(controlPlane.Machines))
 
+	if err := clientutil.WaitForObjectsToBeDeletedFromTheCache(ctx, r.Client, "Machine deletion (scale down)", machineToDelete); err != nil {
+		return ctrl.Result{}, err
+	}
+
 	return ctrl.Result{}, nil // No need to requeue here. Machine deletion above triggers reconciliation.
 }
diff --git a/controlplane/kubeadm/internal/controllers/scale_test.go b/controlplane/kubeadm/internal/controllers/scale_test.go
index 1bb3e1b3192e..b3fc3561082a 100644
--- a/controlplane/kubeadm/internal/controllers/scale_test.go
+++ b/controlplane/kubeadm/internal/controllers/scale_test.go
@@ -73,7 +73,7 @@ func TestKubeadmControlPlaneReconciler_initializeControlPlane(t *testing.T) {
 	r := &KubeadmControlPlaneReconciler{
 		Client:   env,
 		recorder: record.NewFakeRecorder(32),
-		managementClusterUncached: &fakeManagementCluster{
+		managementCluster: &fakeManagementCluster{
 			Management: &internal.Management{Client: env},
 			Workload:   &fakeWorkloadCluster{},
 		},
@@ -152,10 +152,9 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
 	}
 
 	r := &KubeadmControlPlaneReconciler{
-		Client:                    env,
-		managementCluster:         fmc,
-		managementClusterUncached: fmc,
-		recorder:                  record.NewFakeRecorder(32),
+		Client:            env,
+		managementCluster: fmc,
+		recorder:          record.NewFakeRecorder(32),
 	}
 	controlPlane := &internal.ControlPlane{
 		KCP: kcp,
@@ -223,12 +222,11 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
 
 	fc := capicontrollerutil.NewFakeController()
 	r := &KubeadmControlPlaneReconciler{
-		Client:                    env,
-		SecretCachingClient:       secretCachingClient,
-		controller:                fc,
-		managementCluster:         fmc,
-		managementClusterUncached: fmc,
-		recorder:                  record.NewFakeRecorder(32),
+		Client:              env,
+		SecretCachingClient: secretCachingClient,
+		controller:          fc,
+		managementCluster:   fmc,
+		recorder:            record.NewFakeRecorder(32),
 	}
 
 	controlPlane, adoptableMachineFound, err := r.initControlPlaneScope(ctx, cluster, kcp)
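Design note on the scale.go hunk: scale-down now uses the same delete-then-wait pattern as remediation. The likely rationale (not spelled out in the diff): if the next Reconcile ran against a cache that had not yet observed the Machine's deletionTimestamp, it could select the same Machine for deletion again or miscount replicas; waiting makes read-your-writes behavior explicit instead of relying on the deletion-triggered requeue racing the informer, which is also why the "No need to requeue here" comment can stay. By contrast, reconcileDelete and updateMachine deliberately skip the wait and say so inline, because a stale read is harmless on those paths.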
diff --git a/controlplane/kubeadm/internal/controllers/update_test.go b/controlplane/kubeadm/internal/controllers/update_test.go
index ef219fb22eb1..332eb6f17e61 100644
--- a/controlplane/kubeadm/internal/controllers/update_test.go
+++ b/controlplane/kubeadm/internal/controllers/update_test.go
@@ -100,12 +100,6 @@ func TestKubeadmControlPlaneReconciler_RolloutStrategy_ScaleUp(t *testing.T) {
 				Status: internal.ClusterStatus{Nodes: 1},
 			},
 		},
-		managementClusterUncached: &fakeManagementCluster{
-			Management: &internal.Management{Client: env},
-			Workload: &fakeWorkloadCluster{
-				Status: internal.ClusterStatus{Nodes: 1},
-			},
-		},
 		ssaCache: ssa.NewCache("test-controller"),
 	}
 	controlPlane := &internal.ControlPlane{
@@ -252,10 +246,9 @@ func TestKubeadmControlPlaneReconciler_RolloutStrategy_ScaleDown(t *testing.T) {
 	fakeClient := newFakeClient(objs...)
 	fmc.Reader = fakeClient
 	r := &KubeadmControlPlaneReconciler{
-		Client:                    fakeClient,
-		SecretCachingClient:       fakeClient,
-		managementCluster:         fmc,
-		managementClusterUncached: fmc,
+		Client:              fakeClient,
+		SecretCachingClient: fakeClient,
+		managementCluster:   fmc,
 	}
 	controlPlane := &internal.ControlPlane{