52 changes: 52 additions & 0 deletions .cursor/plans/fix_log.dirs_disk_removal_fe030094.plan.md
@@ -0,0 +1,52 @@
---
name: Fix log.dirs disk removal
overview: Identify why removed `additionalDisks` entries are still present in broker `log.dirs` and define a status-aware reconciliation strategy so `log.dirs` is updated once disk removal completes, with unit and e2e coverage.
todos:
- id: implement-effective-logdirs
content: Add status-aware helper for effective log.dirs and wire it into config generation in pkg/resources/kafka/configmap.go
status: completed
- id: stabilize-unit-tests
content: Finalize and run unit tests for effective log.dirs behavior in pkg/resources/kafka/configmap_test.go
status: completed
- id: validate-e2e-flow
content: Verify multidisk removal e2e assertions and sequencing in tests/e2e/test_multidisk_removal.go and tests/e2e/koperator_suite_test.go
status: completed
isProject: false
---

# Fix `log.dirs` After Disk Removal

## Problem Analysis

- Current config generation in [koperator/pkg/resources/kafka/configmap.go](koperator/pkg/resources/kafka/configmap.go) always merges old + new mount paths:
- `mountPathsMerged, isMountPathRemoved := mergeMountPaths(mountPathsOld, mountPathsNew)`
- This preserves removed paths indefinitely, even after `GracefulDiskRemovalSucceeded` and PVC deletion.
- Disk-removal lifecycle state is already tracked in [koperator/pkg/resources/kafka/kafka.go](koperator/pkg/resources/kafka/kafka.go) (`GracefulActionState.VolumeStates`) and state semantics are defined in [koperator/api/v1beta1/common_types.go](koperator/api/v1beta1/common_types.go).
- Your new tests capture the intended behavior: keep a removed path while removal/rebalance is active, and drop it once its state is missing or succeeded.
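
The failure mode above can be sketched as a standalone snippet. This is a simplified reconstruction of the old merge behavior, not the operator's exact code; the point is that any path present only in the old config is unconditionally re-appended, so a removed disk never leaves `log.dirs`:

```go
package main

import "fmt"

// mergeMountPaths mirrors the old behavior (simplified sketch): every path
// from the previous config survives into the merged result, so a disk that
// was removed from the CR never leaves log.dirs.
func mergeMountPaths(oldPaths, newPaths []string) ([]string, bool) {
	merged := append([]string{}, newPaths...)
	removed := false
	seen := make(map[string]struct{}, len(newPaths))
	for _, p := range newPaths {
		seen[p] = struct{}{}
	}
	for _, p := range oldPaths {
		if _, ok := seen[p]; !ok {
			merged = append(merged, p) // removed path is kept anyway
			removed = true
		}
	}
	return merged, removed
}

func main() {
	merged, removed := mergeMountPaths(
		[]string{"/kafka-logs/kafka", "/kafka-logs2/kafka"}, // old config
		[]string{"/kafka-logs/kafka"},                       // CR after disk removal
	)
	fmt.Println(merged, removed)
	// prints: [/kafka-logs/kafka /kafka-logs2/kafka] true
}
```

The `true` flag only triggers the error log; nothing ever drops the stale path.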

## Proposed Solution

- Replace unconditional old+new merge for `log.dirs` with a status-aware effective set:
- Keep all paths currently in spec (`mountPathsNew`).
- For paths present only in old config (`mountPathsOld - mountPathsNew`), keep **only** if broker volume state for that mount path is active:
- `CruiseControlVolumeState.IsDiskRemoval()` OR `CruiseControlVolumeState.IsDiskRebalance()`.
- Drop removed paths when state is absent or `IsDiskRemovalSucceeded()`.
- Implement helper in [koperator/pkg/resources/kafka/configmap.go](koperator/pkg/resources/kafka/configmap.go):
- `getEffectiveLogDirsMountPaths(mountPathsOld, mountPathsNew, brokerID, kafkaCluster)`
- Use this helper in `getConfigProperties()` when setting `log.dirs`.
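
The status-aware filtering above can be sketched as follows. This is a minimal standalone model, not the planned helper itself: `volumeState` and the flat `states` map stand in for `v1beta1.CruiseControlVolumeState` and `KafkaCluster.Status.BrokersState[brokerID].GracefulActionState.VolumeStates`, and the map is keyed by the full log dir path rather than the trimmed volume path:

```go
package main

import "fmt"

// volumeState is a stand-in for v1beta1.CruiseControlVolumeState.
type volumeState string

const (
	diskRemovalRequired  volumeState = "GracefulDiskRemovalRequired"
	diskRemovalSucceeded volumeState = "GracefulDiskRemovalSucceeded"
)

// effectiveLogDirs keeps every path in the current spec, and keeps a path
// present only in the old config while its removal is still in flight.
// Paths with no state, or a succeeded state, are dropped.
func effectiveLogDirs(oldPaths, newPaths []string, states map[string]volumeState) []string {
	effective := append([]string{}, newPaths...)
	inNew := make(map[string]struct{}, len(newPaths))
	for _, p := range newPaths {
		inNew[p] = struct{}{}
	}
	for _, p := range oldPaths {
		if _, ok := inNew[p]; ok {
			continue
		}
		if states[p] == diskRemovalRequired { // removal still active: keep the dir
			effective = append(effective, p)
		}
		// no state or succeeded: drop the dir
	}
	return effective
}

func main() {
	oldCfg := []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"}
	spec := []string{"/kafka-logs/kafka"}

	inFlight := map[string]volumeState{"/kafka-logs2/kafka": diskRemovalRequired}
	fmt.Println(effectiveLogDirs(oldCfg, spec, inFlight)) // path kept while removal runs

	done := map[string]volumeState{"/kafka-logs2/kafka": diskRemovalSucceeded}
	fmt.Println(effectiveLogDirs(oldCfg, spec, done)) // path dropped once succeeded
}
```

The real helper additionally treats rebalance-active states as "keep" and consults per-broker status, but the keep/drop decision is the same shape.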

## Test Plan

- Unit tests in [koperator/pkg/resources/kafka/configmap_test.go](koperator/pkg/resources/kafka/configmap_test.go):
- Keep your added `TestGetEffectiveLogDirsMountPaths` and ensure coverage includes:
- no state -> drop removed path
- removal/rebalance active -> keep removed path
- removal succeeded -> drop removed path
- E2E in [koperator/tests/e2e/test_multidisk_removal.go](koperator/tests/e2e/test_multidisk_removal.go):
  - Install the multidisk sample, then apply the single-disk sample and assert that broker configmaps no longer contain the removed path.
- Suite wiring in [koperator/tests/e2e/koperator_suite_test.go](koperator/tests/e2e/koperator_suite_test.go) is already aligned.

## Expected Outcome

- While removal is in progress, `log.dirs` remains stable and safe for Cruise Control workflows.
- After successful completion and cleanup, removed disk paths disappear from `log.dirs`, preventing writes to unintended/root filesystem paths.
4 changes: 2 additions & 2 deletions Makefile
@@ -6,7 +6,7 @@ BIN_DIR := $(PROJECT_DIR)/bin
 BOILERPLATE_DIR := $(PROJECT_DIR)/hack/boilerplate
 
 # Image URL to use all building/pushing image targets
-TAG ?= $(shell git describe --tags --abbrev=0 --match '[0-9].*[0-9].*[0-9]*' 2>/dev/null )
+TAG ?= $(shell git describe --tags --dirty --always --abbrev=0 --match '[0-9].*[0-9].*[0-9]*' 2>/dev/null)
 IMG ?= ghcr.io/adobe/kafka-operator:$(TAG)
 
 # Produce CRDs that work back to Kubernetes 1.11 (no version conversion)
@@ -171,7 +171,7 @@ test: generate fmt vet bin/setup-envtest
 	cd third_party/github.com/banzaicloud/go-cruise-control && \
 	go test -v -parallel 2 -failfast ./... -cover -covermode=count -coverprofile cover.out -test.v -test.paniconexit0
 
-# Run e2e tests
+# Run e2e tests. Set IMG_E2E to use a custom operator image; otherwise the chart default is used.
 test-e2e:
 	cd tests/e2e && IMG_E2E=${IMG_E2E} go test . \
 		-v \
281 changes: 281 additions & 0 deletions config/samples/simplekafkacluster_2disk.yaml

Large diffs are not rendered by default.

295 changes: 295 additions & 0 deletions config/samples/simplekafkacluster_4disk.yaml

Large diffs are not rendered by default.

70 changes: 44 additions & 26 deletions pkg/resources/kafka/configmap.go
@@ -88,13 +88,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v
 	}
 
 	mountPathsNew := generateStorageConfig(bConfig.StorageConfigs)
-	mountPathsMerged, isMountPathRemoved := mergeMountPaths(mountPathsOld, mountPathsNew)
-
-	if isMountPathRemoved {
-		log.Error(errors.New("removed storage is found in the KafkaCluster CR"),
-			"removing storage from broker is not supported", v1beta1.BrokerIdLabelKey, broker.Id, "mountPaths",
-			mountPathsOld, "mountPaths in kafkaCluster CR ", mountPathsNew)
-	}
+	mountPathsMerged := getEffectiveLogDirsMountPaths(mountPathsOld, mountPathsNew, fmt.Sprintf("%d", broker.Id), r.KafkaCluster)
 
 	if len(mountPathsMerged) != 0 {
 		if err := config.Set(kafkautils.KafkaConfigBrokerLogDirectory, strings.Join(mountPathsMerged, ",")); err != nil {
@@ -291,29 +285,53 @@ func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, c
 	}
 }
 
-// mergeMountPaths is merges the new mountPaths with the old.
-// It returns the merged []string and a bool which true or false depend on mountPathsNew contains or not all of the elements of the mountPathsOld
-func mergeMountPaths(mountPathsOld, mountPathsNew []string) ([]string, bool) {
-	var mountPathsMerged []string
-	mountPathsMerged = append(mountPathsMerged, mountPathsNew...)
-	isMountPathRemoved := false
-	// Merging the new mountPaths with the old. If any of them is removed we can check the difference in the mountPathsOldLen
-	for i := range mountPathsOld {
-		found := false
-		for k := range mountPathsNew {
-			if mountPathsOld[i] == mountPathsNew[k] {
-				found = true
-				break
-			}
+func getEffectiveLogDirsMountPaths(mountPathsOld, mountPathsNew []string, brokerID string, kafkaCluster *v1beta1.KafkaCluster) []string {
+	mountPathsEffective := append([]string{}, mountPathsNew...)
+	if len(mountPathsOld) == 0 {
+		return mountPathsEffective
+	}
+
+	newMountPathsSet := make(map[string]struct{}, len(mountPathsNew))
+	for _, path := range mountPathsNew {
+		newMountPathsSet[path] = struct{}{}
+	}
+
+	for _, oldPath := range mountPathsOld {
+		if _, found := newMountPathsSet[oldPath]; found {
+			continue
 		}
-		// if this is a new mountPath then add it to the current
-		if !found {
-			mountPathsMerged = append(mountPathsMerged, mountPathsOld[i])
-			isMountPathRemoved = true
+
+		if shouldKeepRemovedLogDirInConfig(oldPath, brokerID, kafkaCluster) {
+			mountPathsEffective = append(mountPathsEffective, oldPath)
 		}
 	}
 
-	return mountPathsMerged, isMountPathRemoved
+	return mountPathsEffective
 }
+
+func shouldKeepRemovedLogDirInConfig(logDirPath, brokerID string, kafkaCluster *v1beta1.KafkaCluster) bool {
+	if kafkaCluster == nil {
+		return false
+	}
+
+	brokerState, found := kafkaCluster.Status.BrokersState[brokerID]
+	if !found || brokerState.GracefulActionState.VolumeStates == nil {
+		return false
+	}
+
+	volumePath := strings.TrimSuffix(logDirPath, "/kafka")
+	volumeState, found := brokerState.GracefulActionState.VolumeStates[volumePath]
+	if !found {
+		return false
+	}
+
+	// Keep removed path until removal/rebalance is confirmed succeeded; drop only when state succeeded.
+	// On error or paused (unconfirmed success), keep the path to avoid data loss and allow retry.
+	s := volumeState.CruiseControlVolumeState
+	if s.IsDiskRemovalSucceeded() || s.IsDiskRebalanceSucceeded() {
+		return false
+	}
+	return s.IsDiskRemoval() || s.IsDiskRebalance()
+}
 
 func generateSuperUsers(users []string) (suStrings []string) {
171 changes: 135 additions & 36 deletions pkg/resources/kafka/configmap_test.go
@@ -73,56 +73,155 @@ zookeeper.connect=zookeeper-server-client.zookeeper:2181/
 	}
 }
 
-func TestMergeMountPaths(t *testing.T) {
+func TestGetEffectiveLogDirsMountPaths(t *testing.T) {
 	tests := []struct {
-		testName                string
-		mountPathNew            []string
-		mountPathOld            []string
-		expectedMergedMountPath []string
-		expectedRemoved         bool
+		testName          string
+		mountPathsOld     []string
+		mountPathsNew     []string
+		brokerID          string
+		kafkaCluster      *v1beta1.KafkaCluster
+		expectedEffective []string
 	}{
 		{
-			testName:                "no old mountPath",
-			mountPathNew:            []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
-			mountPathOld:            []string{},
-			expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
-			expectedRemoved:         false,
+			testName:          "no broker state - effective is mountPathsNew only",
+			mountPathsOld:     []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
+			mountPathsNew:     []string{"/kafka-logs/kafka"},
+			brokerID:          "0",
+			kafkaCluster:      nil,
+			expectedEffective: []string{"/kafka-logs/kafka"},
 		},
 		{
-			testName:                "same",
-			mountPathNew:            []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
-			mountPathOld:            []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
-			expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
-			expectedRemoved:         false,
+			testName:          "nil VolumeStates - effective is mountPathsNew only",
+			mountPathsOld:     []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
+			mountPathsNew:     []string{"/kafka-logs/kafka"},
+			brokerID:          "0",
+			kafkaCluster:      &v1beta1.KafkaCluster{Status: v1beta1.KafkaClusterStatus{BrokersState: map[string]v1beta1.BrokerState{"0": {GracefulActionState: v1beta1.GracefulActionState{VolumeStates: nil}}}}},
+			expectedEffective: []string{"/kafka-logs/kafka"},
 		},
 		{
-			testName:                "changed order",
-			mountPathNew:            []string{"/kafka-logs/kafka", "/kafka-logs3/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
-			mountPathOld:            []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
-			expectedMergedMountPath: []string{"/kafka-logs/kafka", "/kafka-logs3/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
-			expectedRemoved:         false,
+			testName:      "removed path with VolumeState in progress (GracefulDiskRemovalRequired) - path kept in effective",
+			mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
+			mountPathsNew: []string{"/kafka-logs/kafka"},
+			brokerID:      "0",
+			kafkaCluster: &v1beta1.KafkaCluster{
+				Status: v1beta1.KafkaClusterStatus{
+					BrokersState: map[string]v1beta1.BrokerState{
+						"0": {
+							GracefulActionState: v1beta1.GracefulActionState{
+								VolumeStates: map[string]v1beta1.VolumeState{
+									"/kafka-logs2": {CruiseControlVolumeState: v1beta1.GracefulDiskRemovalRequired},
+								},
+							},
+						},
+					},
+				},
+			},
+			expectedEffective: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
 		},
 		{
+			testName:      "removed path with state not found - path not in effective",
+			mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
+			mountPathsNew: []string{"/kafka-logs/kafka"},
+			brokerID:      "0",
+			kafkaCluster: &v1beta1.KafkaCluster{
+				Status: v1beta1.KafkaClusterStatus{
+					BrokersState: map[string]v1beta1.BrokerState{
+						"0": {
+							GracefulActionState: v1beta1.GracefulActionState{
+								VolumeStates: map[string]v1beta1.VolumeState{},
+							},
+						},
+					},
+				},
+			},
+			expectedEffective: []string{"/kafka-logs/kafka"},
+		},
+		{
+			testName:      "removed path with IsDiskRemovalSucceeded - path not in effective",
+			mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
+			mountPathsNew: []string{"/kafka-logs/kafka"},
+			brokerID:      "0",
+			kafkaCluster: &v1beta1.KafkaCluster{
+				Status: v1beta1.KafkaClusterStatus{
+					BrokersState: map[string]v1beta1.BrokerState{
+						"0": {
+							GracefulActionState: v1beta1.GracefulActionState{
+								VolumeStates: map[string]v1beta1.VolumeState{
+									"/kafka-logs2": {CruiseControlVolumeState: v1beta1.GracefulDiskRemovalSucceeded},
+								},
+							},
+						},
+					},
+				},
+			},
+			expectedEffective: []string{"/kafka-logs/kafka"},
+		},
+		{
+			testName:      "removed path with IsDiskRebalance - path kept in effective",
+			mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
+			mountPathsNew: []string{"/kafka-logs/kafka"},
+			brokerID:      "0",
+			kafkaCluster: &v1beta1.KafkaCluster{
+				Status: v1beta1.KafkaClusterStatus{
+					BrokersState: map[string]v1beta1.BrokerState{
+						"0": {
+							GracefulActionState: v1beta1.GracefulActionState{
+								VolumeStates: map[string]v1beta1.VolumeState{
+									"/kafka-logs2": {CruiseControlVolumeState: v1beta1.GracefulDiskRebalanceRequired},
+								},
+							},
+						},
+					},
+				},
+			},
+			expectedEffective: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
+		},
+		{
-			testName:                "removed one",
-			mountPathNew:            []string{"/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
-			mountPathOld:            []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
-			expectedMergedMountPath: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka", "/kafka-logs3/kafka"},
-			expectedRemoved:         true,
+			testName:      "removed path with disk removal completed with error - path kept in effective (unconfirmed success, avoid data loss)",
+			mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
+			mountPathsNew: []string{"/kafka-logs/kafka"},
+			brokerID:      "0",
+			kafkaCluster: &v1beta1.KafkaCluster{
+				Status: v1beta1.KafkaClusterStatus{
+					BrokersState: map[string]v1beta1.BrokerState{
+						"0": {
+							GracefulActionState: v1beta1.GracefulActionState{
+								VolumeStates: map[string]v1beta1.VolumeState{
+									"/kafka-logs2": {CruiseControlVolumeState: v1beta1.GracefulDiskRemovalCompletedWithError},
+								},
+							},
+						},
+					},
+				},
+			},
+			expectedEffective: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
 		},
 		{
-			testName:                "removed all",
-			mountPathNew:            []string{},
-			mountPathOld:            []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
-			expectedMergedMountPath: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
-			expectedRemoved:         true,
+			testName:      "removed path with disk rebalance paused - path kept in effective (unconfirmed success, avoid data loss)",
+			mountPathsOld: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
+			mountPathsNew: []string{"/kafka-logs/kafka"},
+			brokerID:      "0",
+			kafkaCluster: &v1beta1.KafkaCluster{
+				Status: v1beta1.KafkaClusterStatus{
+					BrokersState: map[string]v1beta1.BrokerState{
+						"0": {
+							GracefulActionState: v1beta1.GracefulActionState{
+								VolumeStates: map[string]v1beta1.VolumeState{
+									"/kafka-logs2": {CruiseControlVolumeState: v1beta1.GracefulDiskRebalancePaused},
+								},
+							},
+						},
+					},
+				},
+			},
+			expectedEffective: []string{"/kafka-logs/kafka", "/kafka-logs2/kafka"},
 		},
 	}
 	for _, test := range tests {
-		mergedMountPaths, isRemoved := mergeMountPaths(test.mountPathOld, test.mountPathNew)
-		if !reflect.DeepEqual(mergedMountPaths, test.expectedMergedMountPath) {
-			t.Errorf("testName: %s, expected: %s, got: %s", test.testName, test.expectedMergedMountPath, mergedMountPaths)
-		}
-		require.Equal(t, test.expectedRemoved, isRemoved)
+		t.Run(test.testName, func(t *testing.T) {
+			got := getEffectiveLogDirsMountPaths(test.mountPathsOld, test.mountPathsNew, test.brokerID, test.kafkaCluster)
+			require.Equal(t, test.expectedEffective, got, "effective log dirs")
+		})
 	}
 }

4 changes: 4 additions & 0 deletions tests/e2e/koperator_suite_test.go
Expand Up @@ -69,7 +69,11 @@ var _ = ginkgo.When("Testing e2e test altogether", ginkgo.Ordered, func() {
testProduceConsumeInternalSSL(defaultTLSSecretName)
testJmxExporter()
testUninstallKafkaCluster()
testInstallKafkaCluster("../../config/samples/simplekafkacluster_4disk.yaml")
testMultiDiskRemoval()
testUninstallKafkaCluster()
testUninstallZookeeperCluster()
// kraft tests
testInstallKafkaCluster("../../config/samples/kraft/simplekafkacluster_kraft.yaml")
testProduceConsumeInternal()
testJmxExporter()