Cannot run SparkKubernetesOperator in deferrable mode #56291

@Ferdinanddb

Description
Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

See deployment details; for reference, the cncf-kubernetes provider version is 10.8.1.

Apache Airflow version

3.1.0

Operating System

Official Airflow image: docker.io/apache/airflow:3.1.0-python3.12

Deployment

Official Apache Airflow Helm Chart

Deployment details

I deploy with Helm, using a custom image built from this Dockerfile:

FROM docker.io/apache/airflow:3.1.0-python3.12

USER root

# Copy requirements to working directory
COPY requirements.txt /var/airflow/requirements.txt

# Set the working directory in the container
WORKDIR /var/airflow


USER airflow

RUN pip install --upgrade pip

# Install the necessary dependencies
RUN pip install \
    --no-cache-dir \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.1.0/constraints-3.12.txt" \
    -r /var/airflow/requirements.txt

The requirements.txt file is:

apache-airflow[amazon,google,postgres,async,cncf.kubernetes,celery,slack,http,fab,standard,openlineage]==3.1.0

What happened

I am using the SparkKubernetesOperator as follows:

full_reload_job = SparkKubernetesOperator(
    task_id="full_reload_job",
    namespace="spark-operator",
    application_file="spark_app/full_reload_job/spark_application_config.yml",
    kubernetes_conn_id="kubernetes_default",
    random_name_suffix=True,
    get_logs=True,
    reattach_on_restart=True,
    delete_on_termination=True,
    do_xcom_push=False,
    deferrable=True,
    retries=0,
    on_execute_callback=upload_spark_config_to_gcs,
)

The job succeeds when deferrable=False, but fails when deferrable=True.

The error I get is:

ERROR - Task failed with exception
AttributeError: 'SparkKubernetesOperator' object has no attribute 'launcher'

File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 920 in run
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1307 in _execute_task
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py", line 1632 in resume_execution
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 956 in trigger_reentry
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 978 in _clean
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 1013 in post_complete_action
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 1056 in cleanup
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 265 in process_pod_deletion

Any idea why this is happening? It was working fine in Airflow 2.11.0 and before.
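My working theory (an assumption, not verified against the provider code) is that in deferrable mode the resume path runs on a freshly re-created operator instance, so attributes assigned inside execute() — like launcher, per the traceback — no longer exist when the cleanup code runs. A plain-Python sketch of that mechanism, with all class and method names hypothetical:

```python
# Plain-Python sketch of the suspected failure mode, no Airflow imports.
# FakeOperator, TaskDeferred, etc. are stand-ins, not real Airflow classes.

class TaskDeferred(Exception):
    """Stand-in for the exception an operator raises to defer itself."""


class FakeOperator:
    def execute(self):
        # Attribute set only on the pre-deferral code path.
        self.launcher = object()
        raise TaskDeferred()  # hand off to the triggerer; worker process exits

    def trigger_reentry(self):
        # Resume path: runs on a *new* instance after the trigger fires.
        self.process_pod_deletion()

    def process_pod_deletion(self):
        # Fails because `launcher` was set on the old, discarded instance.
        self.launcher.delete_pod()


first = FakeOperator()
try:
    first.execute()
except TaskDeferred:
    pass  # worker exits; instance state is not carried over

resumed = FakeOperator()  # re-created from the serialized DAG on resume
try:
    resumed.trigger_reentry()
except AttributeError as exc:
    print(exc)  # 'FakeOperator' object has no attribute 'launcher'
```

If that theory is right, the fix would be for the provider to lazily create (or guard access to) the launcher on the reentry path rather than relying on state from execute().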

What you think should happen instead

The task should succeed.

How to reproduce

Run a SparkKubernetesOperator task with deferrable=True, as in the snippet above.

Anything else

Issue #55747 describes the same behavior for a Snowflake operator. I suspect this issue is not tied to a specific provider but is more generic (perhaps linked to how tasks are deferred and resumed).

Another remark that might be important: I use KEDA to autoscale the triggerer pod (as well as the worker pods), and both scale down to 0 when there is nothing to execute.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
