diff --git a/chart/templates/NOTES.txt b/chart/templates/NOTES.txt index fc79327ddf10d..4cf3844a49965 100644 --- a/chart/templates/NOTES.txt +++ b/chart/templates/NOTES.txt @@ -332,6 +332,14 @@ DEPRECATION WARNING: {{- end }} +{{- if not (empty .Values.workers.podManagementPolicy) }} + + DEPRECATION WARNING: + `workers.podManagementPolicy` has been renamed to `workers.celery.podManagementPolicy`. + Please change your values as support for the old name will be dropped in a future release. + +{{- end }} + {{ if (semverCompare ">=3.0.0" .Values.airflowVersion) }} ##################################################### # WARNING: You should set a static API secret key # diff --git a/chart/templates/workers/worker-deployment.yaml b/chart/templates/workers/worker-deployment.yaml index be7eb614dcf22..c0701a85a2f49 100644 --- a/chart/templates/workers/worker-deployment.yaml +++ b/chart/templates/workers/worker-deployment.yaml @@ -76,8 +76,8 @@ spec: tier: airflow component: worker release: {{ .Release.Name }} - {{- if and $persistence .Values.workers.podManagementPolicy }} - podManagementPolicy: {{ .Values.workers.podManagementPolicy }} + {{- if and $persistence (or .Values.workers.celery.podManagementPolicy .Values.workers.podManagementPolicy) }} + podManagementPolicy: {{ .Values.workers.celery.podManagementPolicy | default .Values.workers.podManagementPolicy }} {{- end }} {{- if and $persistence (or .Values.workers.celery.updateStrategy .Values.workers.updateStrategy) }} updateStrategy: {{- toYaml (.Values.workers.celery.updateStrategy | default .Values.workers.updateStrategy) | nindent 4 }} diff --git a/chart/values.schema.json b/chart/values.schema.json index f3132d6a00648..696a70d4de4e1 100644 --- a/chart/values.schema.json +++ b/chart/values.schema.json @@ -1725,7 +1725,7 @@ "default": null }, "podManagementPolicy": { - "description": "Specifies the policy for managing pods within the Airflow Celery worker. Only applicable to StatefulSet.", + "description": "Specifies the policy for managing pods within the Airflow Celery worker (deprecated, use `workers.celery.podManagementPolicy` instead). Only applicable to StatefulSet.", "type": [ "null", "string" @@ -2682,6 +2682,18 @@ } } }, + "podManagementPolicy": { + "description": "Specifies the policy for managing pods within the Airflow Celery worker. Only applicable to StatefulSet.", + "type": [ + "null", + "string" + ], + "default": null, + "enum": [ + "OrderedReady", + "Parallel" + ] + }, "serviceAccount": { "description": "Create ServiceAccount.", "type": "object", diff --git a/chart/values.yaml b/chart/values.yaml index be3bb953bebfe..d657485df4745 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -687,6 +687,7 @@ workers: maxUnavailable: "50%" # Allow relaxing ordering guarantees for Airflow Celery worker while preserving its uniqueness and identity + # (deprecated, use `workers.celery.podManagementPolicy` instead) # podManagementPolicy: Parallel # When not set, the values defined in the global securityContext will @@ -1065,6 +1066,10 @@ workers: maxSurge: "100%" maxUnavailable: "50%" + # Allow relaxing ordering guarantees for Airflow Celery worker + # while preserving its uniqueness and identity + # podManagementPolicy: Parallel + # Create ServiceAccount for Airflow Celery workers serviceAccount: # default value is true diff --git a/helm-tests/tests/helm_tests/airflow_core/test_worker.py b/helm-tests/tests/helm_tests/airflow_core/test_worker.py index 01d090dd16ba8..c00e2de01025b 100644 --- a/helm-tests/tests/helm_tests/airflow_core/test_worker.py +++ b/helm-tests/tests/helm_tests/airflow_core/test_worker.py @@ -18,7 +18,7 @@ import jmespath import pytest -from chart_utils.helm_template_generator import render_chart +from chart_utils.helm_template_generator import HelmFailedError, render_chart from chart_utils.log_groomer import LogGroomerTestBase @@ -1593,6 +1593,43 @@ def test_init_container_volume_permissions_not_exist(self, workers_values): == 0 ) + def test_pod_management_policy_default(self): + docs = render_chart(show_only=["templates/workers/worker-deployment.yaml"]) + assert jmespath.search("spec.podManagementPolicy", docs[0]) is None + + @pytest.mark.parametrize( + ("workers_values", "expected"), + [ + ({"podManagementPolicy": "Parallel"}, "Parallel"), + ({"podManagementPolicy": "OrderedReady"}, "OrderedReady"), + ({"celery": {"podManagementPolicy": "Parallel"}}, "Parallel"), + ({"celery": {"podManagementPolicy": "OrderedReady"}}, "OrderedReady"), + ( + {"podManagementPolicy": "OrderedReady", "celery": {"podManagementPolicy": "Parallel"}}, + "Parallel", + ), + ( + {"podManagementPolicy": "Parallel", "celery": {"podManagementPolicy": "OrderedReady"}}, + "OrderedReady", + ), + ], + ) + def test_pod_management_policy(self, workers_values, expected): + docs = render_chart( + values={"workers": workers_values}, show_only=["templates/workers/worker-deployment.yaml"] + ) + + assert jmespath.search("spec.podManagementPolicy", docs[0]) == expected + + @pytest.mark.parametrize( + "workers_values", [{"podManagementPolicy": "Test"}, {"celery": {"podManagementPolicy": "Test"}}] + ) + def test_pod_management_policy_not_valid_value(self, workers_values): + with pytest.raises(HelmFailedError): + render_chart( + values={"workers": workers_values}, show_only=["templates/workers/worker-deployment.yaml"] + ) + class TestWorkerLogGroomer(LogGroomerTestBase): """Worker groomer."""