diff --git a/chart/templates/_helpers.yaml b/chart/templates/_helpers.yaml index f76d3f0aac6fd..042ac491a9e42 100644 --- a/chart/templates/_helpers.yaml +++ b/chart/templates/_helpers.yaml @@ -1147,3 +1147,18 @@ capabilities: app: keda-operator {{- end }} {{- end }} + +{{/* +Convert a Kubernetes CPU limit (e.g., "500m", "1.5", "2", "750m") into an integer number of CPU cores. +*/}} +{{- define "cpu_count" -}} + {{- $v := toString . -}} + {{- if hasSuffix "m" $v -}} + {{- /* millicores path: e.g. 500m, 1500m */ -}} + {{- $m := float64 (trimSuffix "m" $v) -}} + {{- int (ceil (divf $m 1000)) -}} + {{- else -}} + {{- /* plain cores: e.g. 0.5, 1, 1.5 */ -}} + {{- int (ceil (float64 $v)) -}} + {{- end -}} +{{- end }} diff --git a/chart/values.yaml b/chart/values.yaml index b4d8d8f15619c..128700c58831d 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -3011,6 +3011,7 @@ config: celery: flower_url_prefix: '{{ ternary "" .Values.ingress.flower.path (eq .Values.ingress.flower.path "/") }}' worker_concurrency: 16 + sync_parallelism: '{{ include "cpu_count" (((.Values.scheduler).resources).limits).cpu }}' scheduler: standalone_dag_processor: '{{ ternary "True" "False" (or (semverCompare ">=3.0.0" .Values.airflowVersion) (.Values.dagProcessor.enabled | default false)) }}' # statsd params included for Airflow 1.10 backward compatibility; moved to [metrics] in 2.0 diff --git a/helm-tests/tests/helm_tests/airflow_aux/test_configmap.py b/helm-tests/tests/helm_tests/airflow_aux/test_configmap.py index 42e302be345fb..0fc78be86040e 100644 --- a/helm-tests/tests/helm_tests/airflow_aux/test_configmap.py +++ b/helm-tests/tests/helm_tests/airflow_aux/test_configmap.py @@ -301,3 +301,30 @@ def test_execution_api_server_url( assert "execution_api_server_url" not in config, ( "execution_api_server_url should not be set for Airflow 2.x versions" ) + + @pytest.mark.parametrize( + ("scheduler_cpu_limit", "expected_sync_parallelism"), + [ + ("1m", "1"), + ("1000m", "1"), + ("1001m", "2"), + ("0.1", "1"), + ("1", "1"), + ("1.01", "2"), + (None, 0), + (0, 0), + ], + ) + def test_expected_celery_sync_parallelism(self, scheduler_cpu_limit, expected_sync_parallelism): + scheduler_resources_cpu_limit = {} + if scheduler_cpu_limit is not None: + scheduler_resources_cpu_limit = { + "scheduler": {"resources": {"limits": {"cpu": scheduler_cpu_limit}}} + } + + configmap = render_chart( + values=scheduler_resources_cpu_limit, + show_only=["templates/configmaps/configmap.yaml"], + ) + config = jmespath.search('data."airflow.cfg"', configmap[0]) + assert f"\nsync_parallelism = {expected_sync_parallelism}\n" in config