Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions controlplane/kubeadm/internal/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,8 @@ type KubeadmControlPlaneReconciler struct {

RemoteConditionsGracePeriod time.Duration

managementCluster internal.ManagementCluster
managementClusterUncached internal.ManagementCluster
ssaCache ssa.Cache
managementCluster internal.ManagementCluster
ssaCache ssa.Cache

// Only used for testing.
overrideTryInPlaceUpdateFunc func(ctx context.Context, controlPlane *internal.ControlPlane, machineToInPlaceUpdate *clusterv1.Machine, machineUpToDateResult internal.UpToDateResult) (bool, ctrl.Result, error)
Expand Down Expand Up @@ -171,10 +170,6 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
}
}

if r.managementClusterUncached == nil {
r.managementClusterUncached = &internal.Management{Client: mgr.GetAPIReader()}
}

return nil
}

Expand Down Expand Up @@ -308,7 +303,7 @@ func (r *KubeadmControlPlaneReconciler) initControlPlaneScope(ctx context.Contex
}

// Read control plane machines
controlPlaneMachines, err := r.managementClusterUncached.GetControlPlaneMachinesForCluster(ctx, cluster)
controlPlaneMachines, err := r.managementCluster.GetControlPlaneMachinesForCluster(ctx, cluster)
if err != nil {
log.Error(err, "Failed to retrieve control plane machines for cluster")
return nil, false, err
Expand Down Expand Up @@ -756,6 +751,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, con
continue
}

// Note: It's not critical that the next Reconcile observes the deletion, so we don't wait for the cache.
if err := r.Client.Delete(ctx, machineToDelete); err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, errors.Wrapf(err, "failed to delete control plane Machine %s", klog.KObj(machineToDelete)))
}
Expand Down Expand Up @@ -1542,7 +1538,7 @@ func (r *KubeadmControlPlaneReconciler) adoptMachines(ctx context.Context, kcp *
// We do an uncached full quorum read against the KCP to avoid re-adopting Machines the garbage collector just intentionally orphaned
// See https://github.com/kubernetes/kubernetes/issues/42639
uncached := controlplanev1.KubeadmControlPlane{}
err := r.managementClusterUncached.Get(ctx, client.ObjectKey{Namespace: kcp.Namespace, Name: kcp.Name}, &uncached)
err := r.APIReader.Get(ctx, client.ObjectKey{Namespace: kcp.Namespace, Name: kcp.Name}, &uncached)
if err != nil {
return errors.Wrapf(err, "failed to check whether %v/%v was deleted before adoption", kcp.GetNamespace(), kcp.GetName())
}
Expand Down
54 changes: 19 additions & 35 deletions controlplane/kubeadm/internal/controllers/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,10 +553,10 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
fakeClient := newFakeClient(objs...)
fmc.Reader = fakeClient
r := &KubeadmControlPlaneReconciler{
Client: fakeClient,
SecretCachingClient: fakeClient,
managementCluster: fmc,
managementClusterUncached: fmc,
Client: fakeClient,
APIReader: fakeClient,
SecretCachingClient: fakeClient,
managementCluster: fmc,
}

_, adoptableMachineFound, err := r.initControlPlaneScope(ctx, cluster, kcp)
Expand Down Expand Up @@ -645,10 +645,10 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
fakeClient := newFakeClient(objs...)
fmc.Reader = fakeClient
r := &KubeadmControlPlaneReconciler{
Client: fakeClient,
SecretCachingClient: fakeClient,
managementCluster: fmc,
managementClusterUncached: fmc,
Client: fakeClient,
APIReader: fakeClient,
SecretCachingClient: fakeClient,
managementCluster: fmc,
}

_, adoptableMachineFound, err := r.initControlPlaneScope(ctx, cluster, kcp)
Expand Down Expand Up @@ -731,10 +731,10 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
fakeClient := newFakeClient(objs...)
fmc.Reader = fakeClient
r := &KubeadmControlPlaneReconciler{
Client: fakeClient,
SecretCachingClient: fakeClient,
managementCluster: fmc,
managementClusterUncached: fmc,
Client: fakeClient,
APIReader: fakeClient,
SecretCachingClient: fakeClient,
managementCluster: fmc,
}

_, adoptableMachineFound, err := r.initControlPlaneScope(ctx, cluster, kcp)
Expand Down Expand Up @@ -784,11 +784,11 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
fmc.Reader = fakeClient
recorder := record.NewFakeRecorder(32)
r := &KubeadmControlPlaneReconciler{
Client: fakeClient,
SecretCachingClient: fakeClient,
recorder: recorder,
managementCluster: fmc,
managementClusterUncached: fmc,
Client: fakeClient,
APIReader: fakeClient,
SecretCachingClient: fakeClient,
recorder: recorder,
managementCluster: fmc,
}

_, adoptableMachineFound, err := r.initControlPlaneScope(ctx, cluster, kcp)
Expand Down Expand Up @@ -1343,6 +1343,7 @@ kubernetesVersion: metav1.16.1
expectedLabels := map[string]string{clusterv1.ClusterNameLabel: "foo"}
r := &KubeadmControlPlaneReconciler{
Client: env,
APIReader: env,
SecretCachingClient: secretCachingClient,
recorder: record.NewFakeRecorder(32),
managementCluster: &fakeManagementCluster{
Expand All @@ -1354,15 +1355,6 @@ kubernetesVersion: metav1.16.1
Status: internal.ClusterStatus{},
},
},
managementClusterUncached: &fakeManagementCluster{
Management: &internal.Management{Client: env},
Workload: &fakeWorkloadCluster{
Workload: &internal.Workload{
Client: env,
},
Status: internal.ClusterStatus{},
},
},
ssaCache: ssa.NewCache("test-controller"),
}

Expand Down Expand Up @@ -1590,6 +1582,7 @@ kubernetesVersion: metav1.16.1`,

r := &KubeadmControlPlaneReconciler{
Client: env,
APIReader: env,
SecretCachingClient: secretCachingClient,
recorder: record.NewFakeRecorder(32),
managementCluster: &fakeManagementCluster{
Expand All @@ -1601,15 +1594,6 @@ kubernetesVersion: metav1.16.1`,
Status: internal.ClusterStatus{},
},
},
managementClusterUncached: &fakeManagementCluster{
Management: &internal.Management{Client: env},
Workload: &fakeWorkloadCluster{
Workload: &internal.Workload{
Client: env,
},
Status: internal.ClusterStatus{},
},
},
ssaCache: ssa.NewCache("test-controller"),
}

Expand Down
8 changes: 5 additions & 3 deletions controlplane/kubeadm/internal/controllers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"sigs.k8s.io/cluster-api/controllers/external"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/desiredstate"
clientutil "sigs.k8s.io/cluster-api/internal/util/client"
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/certs"
Expand Down Expand Up @@ -301,7 +302,8 @@ func (r *KubeadmControlPlaneReconciler) createMachine(ctx context.Context, kcp *
// Remove the annotation tracking that a remediation is in progress (the remediation completed when
// the replacement machine has been created above).
delete(kcp.Annotations, controlplanev1.RemediationInProgressAnnotation)
return nil

return clientutil.WaitForObjectsToBeAddedToTheCache(ctx, r.Client, "Machine creation", machine)
}

func (r *KubeadmControlPlaneReconciler) updateMachine(ctx context.Context, machine *clusterv1.Machine, kcp *controlplanev1.KubeadmControlPlane, cluster *clusterv1.Cluster) (*clusterv1.Machine, error) {
Expand All @@ -310,8 +312,8 @@ func (r *KubeadmControlPlaneReconciler) updateMachine(ctx context.Context, machi
return nil, errors.Wrap(err, "failed to apply Machine")
}

err = ssa.Patch(ctx, r.Client, kcpManagerName, updatedMachine, ssa.WithCachingProxy{Cache: r.ssaCache, Original: machine})
if err != nil {
// Note: It's not critical that the next Reconcile observes the changes applied here, so we don't wait for the cache.
if err := ssa.Patch(ctx, r.Client, kcpManagerName, updatedMachine, ssa.WithCachingProxy{Cache: r.ssaCache, Original: machine}); err != nil {
return nil, err
}
return updatedMachine, nil
Expand Down
8 changes: 7 additions & 1 deletion controlplane/kubeadm/internal/controllers/remediation.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd/util"
"sigs.k8s.io/cluster-api/feature"
"sigs.k8s.io/cluster-api/internal/contract"
clientutil "sigs.k8s.io/cluster-api/internal/util/client"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/conditions"
Expand Down Expand Up @@ -333,7 +334,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.C
}
}

// Delete the machine
// Delete the machine (waiting for cache to observe the deletion at the end of this func, so everything in between is always executed)
if err := r.Client.Delete(ctx, machineToBeRemediated); err != nil {
v1beta1conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedV1Beta1Condition, clusterv1.RemediationFailedV1Beta1Reason, clusterv1.ConditionSeverityError, "%s", err.Error())

Expand Down Expand Up @@ -372,6 +373,11 @@ func (r *KubeadmControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.C
controlplanev1.RemediationInProgressAnnotation: remediationInProgressValue,
})

// Using DeepCopy() to avoid overwriting the Machine because we still need it to Patch the Machine in defer.
if err := clientutil.WaitForObjectsToBeDeletedFromTheCache(ctx, r.Client, "Machine deletion (remediating unhealthy Machine)", machineToBeRemediated.DeepCopy()); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{RequeueAfter: time.Millisecond}, nil // Technically there is no need to requeue here. Machine deletion above triggers reconciliation. But we have to return a non-zero Result so reconcile above returns.
}

Expand Down
5 changes: 5 additions & 0 deletions controlplane/kubeadm/internal/controllers/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
"sigs.k8s.io/cluster-api/feature"
clientutil "sigs.k8s.io/cluster-api/internal/util/client"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/conditions"
)
Expand Down Expand Up @@ -159,6 +160,10 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
log.WithValues(controlPlane.StatusToLogKeyAndValues(nil, machineToDelete)...).
Info(fmt.Sprintf("Machine %s deleting (scale down)", klog.KObj(machineToDelete)), "Machine", klog.KObj(machineToDelete), "desiredReplicas", ptr.Deref(controlPlane.KCP.Spec.Replicas, 0), "replicas", len(controlPlane.Machines))

if err := clientutil.WaitForObjectsToBeDeletedFromTheCache(ctx, r.Client, "Machine deletion (scale down)", machineToDelete); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, nil // No need to requeue here. Machine deletion above triggers reconciliation.
}

Expand Down
20 changes: 9 additions & 11 deletions controlplane/kubeadm/internal/controllers/scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestKubeadmControlPlaneReconciler_initializeControlPlane(t *testing.T) {
r := &KubeadmControlPlaneReconciler{
Client: env,
recorder: record.NewFakeRecorder(32),
managementClusterUncached: &fakeManagementCluster{
managementCluster: &fakeManagementCluster{
Management: &internal.Management{Client: env},
Workload: &fakeWorkloadCluster{},
},
Expand Down Expand Up @@ -152,10 +152,9 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
}

r := &KubeadmControlPlaneReconciler{
Client: env,
managementCluster: fmc,
managementClusterUncached: fmc,
recorder: record.NewFakeRecorder(32),
Client: env,
managementCluster: fmc,
recorder: record.NewFakeRecorder(32),
}
controlPlane := &internal.ControlPlane{
KCP: kcp,
Expand Down Expand Up @@ -223,12 +222,11 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
fc := capicontrollerutil.NewFakeController()

r := &KubeadmControlPlaneReconciler{
Client: env,
SecretCachingClient: secretCachingClient,
controller: fc,
managementCluster: fmc,
managementClusterUncached: fmc,
recorder: record.NewFakeRecorder(32),
Client: env,
SecretCachingClient: secretCachingClient,
controller: fc,
managementCluster: fmc,
recorder: record.NewFakeRecorder(32),
}

controlPlane, adoptableMachineFound, err := r.initControlPlaneScope(ctx, cluster, kcp)
Expand Down
13 changes: 3 additions & 10 deletions controlplane/kubeadm/internal/controllers/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,6 @@ func TestKubeadmControlPlaneReconciler_RolloutStrategy_ScaleUp(t *testing.T) {
Status: internal.ClusterStatus{Nodes: 1},
},
},
managementClusterUncached: &fakeManagementCluster{
Management: &internal.Management{Client: env},
Workload: &fakeWorkloadCluster{
Status: internal.ClusterStatus{Nodes: 1},
},
},
ssaCache: ssa.NewCache("test-controller"),
}
controlPlane := &internal.ControlPlane{
Expand Down Expand Up @@ -252,10 +246,9 @@ func TestKubeadmControlPlaneReconciler_RolloutStrategy_ScaleDown(t *testing.T) {
fakeClient := newFakeClient(objs...)
fmc.Reader = fakeClient
r := &KubeadmControlPlaneReconciler{
Client: fakeClient,
SecretCachingClient: fakeClient,
managementCluster: fmc,
managementClusterUncached: fmc,
Client: fakeClient,
SecretCachingClient: fakeClient,
managementCluster: fmc,
}

controlPlane := &internal.ControlPlane{
Expand Down
Loading