Description
Apache Airflow Provider(s)
cncf-kubernetes
Versions of Apache Airflow Providers
See the deployment details below; for reference, the cncf-kubernetes provider version is 10.8.1.
Apache Airflow version
3.1.0
Operating System
Official Airflow image: docker.io/apache/airflow:3.1.0-python3.12
Deployment
Official Apache Airflow Helm Chart
Deployment details
I use Helm with a custom image built from this Dockerfile:
FROM docker.io/apache/airflow:3.1.0-python3.12
USER root
# Copy requirements to working directory
COPY requirements.txt /var/airflow/requirements.txt
# Set the working directory in the container
WORKDIR /var/airflow
USER airflow
RUN pip install --upgrade pip
# Install the necessary dependencies
RUN pip install \
    --no-cache-dir \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.1.0/constraints-3.12.txt" \
    -r /var/airflow/requirements.txt
The requirements.txt file is:
apache-airflow[amazon,google,postgres,async,cncf.kubernetes,celery,slack,http,fab,standard,openlineage]==3.1.0
What happened
I am using the SparkKubernetesOperator as follows:
full_reload_job = SparkKubernetesOperator(
    task_id="full_reload_job",
    namespace="spark-operator",
    application_file="spark_app/full_reload_job/spark_application_config.yml",
    kubernetes_conn_id="kubernetes_default",
    random_name_suffix=True,
    get_logs=True,
    reattach_on_restart=True,
    delete_on_termination=True,
    do_xcom_push=False,
    deferrable=True,
    retries=0,
    on_execute_callback=upload_spark_config_to_gcs,
)
The job succeeds when deferrable=False, but fails when it is True.
The error I get is:
ERROR - Task failed with exception
AttributeError: 'SparkKubernetesOperator' object has no attribute 'launcher'
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 920 in run
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1307 in _execute_task
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py", line 1632 in resume_execution
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 956 in trigger_reentry
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 978 in _clean
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 1013 in post_complete_action
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 1056 in cleanup
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 265 in process_pod_deletion
Any idea why this is happening? It was working fine in Airflow 2.11.0 and before.
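Based on the traceback, my guess is that self.launcher is created in execute(), which never runs on the fresh operator instance that handles the trigger re-entry in deferrable mode, so process_pod_deletion crashes when it reaches for it. As a stopgap I am considering a subclass along these lines (only a sketch under that assumption; the hasattr guard and the skip-with-warning are my own, not the provider's API, and skipping means the Spark job resources may need cleanup by other means):

from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

class SafeSparkKubernetesOperator(SparkKubernetesOperator):
    # Hypothetical workaround: assume the crash happens because execute()
    # (which sets self.launcher) never ran on the resumed instance.
    def process_pod_deletion(self, pod, *, reraise=True):
        if not hasattr(self, "launcher"):
            # Resumed from a trigger: no launcher was ever created, so skip
            # the launcher-based deletion instead of raising AttributeError.
            self.log.warning("launcher not initialized; skipping Spark job deletion")
            return
        super().process_pod_deletion(pod, reraise=reraise)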
What you think should happen instead
The task should succeed, just as it does with deferrable=False.
How to reproduce
Run a SparkKubernetesOperator task in deferrable mode.
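A minimal DAG sketch (the dag_id, namespace, connection id, and manifest path are placeholders; any valid SparkApplication manifest should reproduce it):

from datetime import datetime

from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.sdk import DAG

with DAG(
    dag_id="spark_deferrable_repro",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
):
    SparkKubernetesOperator(
        task_id="repro_job",
        namespace="spark-operator",                       # placeholder
        application_file="spark_application_config.yml",  # placeholder manifest
        kubernetes_conn_id="kubernetes_default",
        delete_on_termination=True,
        deferrable=True,  # the task succeeds with False and fails with True
    )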
Anything else
Issue #55747 describes the same behavior, but for a Snowflake operator. I suspect the problem is not tied to a specific provider but is more generic (perhaps linked to how tasks are deferred); a rough illustration of what I mean follows.
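Here is a self-contained sketch of the pattern I think is at play (my own illustration, not actual Airflow code): any attribute assigned in execute() before deferring is lost, because the resume method runs on a brand-new operator instance rebuilt from the serialized DAG.

class MyOperator:
    def execute(self):
        self.launcher = "created in execute()"
        # ... the task defers here and this process exits ...

    def execute_complete(self):
        # Runs later on a *new* instance, where execute() never ran.
        print(self.launcher)

first = MyOperator()
first.execute()  # sets .launcher, then "defers"

resumed = MyOperator()  # fresh instance after the trigger fires
try:
    resumed.execute_complete()
except AttributeError as err:
    print(err)  # 'MyOperator' object has no attribute 'launcher'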
Another remark that might be important: I am using KEDA to autoscale the triggerer pod (as well as the worker pods), and both the triggerer and the workers scale down to 0 when there is nothing to execute.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct