From 240dad180ccd8c9a4dfc49e9800df233498f0198 Mon Sep 17 00:00:00 2001 From: Yeonguk Date: Wed, 2 Jul 2025 04:26:33 +0900 Subject: [PATCH 1/3] Fix inconsistent queued DAG filtering between dashboard and list page --- airflow-core/src/airflow/api_fastapi/common/db/dags.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/common/db/dags.py b/airflow-core/src/airflow/api_fastapi/common/db/dags.py index cdc4bf91be9da..d19cffe725bb6 100644 --- a/airflow-core/src/airflow/api_fastapi/common/db/dags.py +++ b/airflow-core/src/airflow/api_fastapi/common/db/dags.py @@ -19,7 +19,7 @@ from typing import TYPE_CHECKING -from sqlalchemy import func, null, select +from sqlalchemy import func, null, or_, select from airflow.api_fastapi.common.db.common import ( apply_filters_to_select, @@ -27,6 +27,7 @@ from airflow.api_fastapi.common.parameters import BaseParam, RangeFilter, SortParam from airflow.models import DagModel from airflow.models.dagrun import DagRun +from airflow.utils.state import DagRunState if TYPE_CHECKING: from sqlalchemy.sql import Select @@ -37,7 +38,7 @@ def generate_dag_with_latest_run_query(max_run_filters: list[BaseParam], order_b max_run_id_query = ( # ordering by id will not always be "latest run", but it's a simplifying assumption select(DagRun.dag_id, func.max(DagRun.id).label("max_dag_run_id")) - .where(DagRun.start_date.is_not(null())) + .where(or_(DagRun.start_date.is_not(null()), DagRun.state == DagRunState.QUEUED)) .group_by(DagRun.dag_id) .subquery(name="mrq") ) From d9ab9360e7efdcff042e2aab83ea8d818f786fa2 Mon Sep 17 00:00:00 2001 From: Yeonguk Date: Sat, 5 Jul 2025 02:40:10 +0900 Subject: [PATCH 2/3] Fix Dag list filtering to include DagRuns with null start_date --- .../src/airflow/api_fastapi/common/db/dags.py | 4 +- .../unit/api_fastapi/common/db/__init__.py | 16 + .../unit/api_fastapi/common/db/test_dags.py | 273 ++++++++++++++++++ 3 files changed, 290 insertions(+), 3 deletions(-) create mode 100644 airflow-core/tests/unit/api_fastapi/common/db/__init__.py create mode 100644 airflow-core/tests/unit/api_fastapi/common/db/test_dags.py diff --git a/airflow-core/src/airflow/api_fastapi/common/db/dags.py b/airflow-core/src/airflow/api_fastapi/common/db/dags.py index d19cffe725bb6..36283f86c55eb 100644 --- a/airflow-core/src/airflow/api_fastapi/common/db/dags.py +++ b/airflow-core/src/airflow/api_fastapi/common/db/dags.py @@ -19,7 +19,7 @@ from typing import TYPE_CHECKING -from sqlalchemy import func, null, or_, select +from sqlalchemy import func, select from airflow.api_fastapi.common.db.common import ( apply_filters_to_select, @@ -27,7 +27,6 @@ from airflow.api_fastapi.common.parameters import BaseParam, RangeFilter, SortParam from airflow.models import DagModel from airflow.models.dagrun import DagRun -from airflow.utils.state import DagRunState if TYPE_CHECKING: from sqlalchemy.sql import Select @@ -38,7 +37,6 @@ def generate_dag_with_latest_run_query(max_run_filters: list[BaseParam], order_b max_run_id_query = ( # ordering by id will not always be "latest run", but it's a simplifying assumption select(DagRun.dag_id, func.max(DagRun.id).label("max_dag_run_id")) - .where(or_(DagRun.start_date.is_not(null()), DagRun.state == DagRunState.QUEUED)) .group_by(DagRun.dag_id) .subquery(name="mrq") ) diff --git a/airflow-core/tests/unit/api_fastapi/common/db/__init__.py b/airflow-core/tests/unit/api_fastapi/common/db/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/common/db/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow-core/tests/unit/api_fastapi/common/db/test_dags.py b/airflow-core/tests/unit/api_fastapi/common/db/test_dags.py new file mode 100644 index 0000000000000..9db4ac40485ae --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/common/db/test_dags.py @@ -0,0 +1,273 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from datetime import datetime, timezone + +import pytest + +from airflow.api_fastapi.common.db.dags import generate_dag_with_latest_run_query +from airflow.api_fastapi.common.parameters import SortParam +from airflow.models import DagModel +from airflow.models.dagrun import DagRun +from airflow.utils.session import provide_session +from airflow.utils.state import DagRunState +from airflow.utils.timezone import utcnow + +from tests_common.test_utils.db import clear_db_dags, clear_db_runs + +pytestmark = pytest.mark.db_test + + +class TestGenerateDagWithLatestRunQuery: + """Unit tests for generate_dag_with_latest_run_query function.""" + + @staticmethod + def _clear_db(): + clear_db_runs() + clear_db_dags() + + @pytest.fixture(autouse=True) + def setup_teardown(self): + """Setup and teardown for each test.""" + self._clear_db() + yield + self._clear_db() + + @pytest.fixture + @provide_session + def dag_with_queued_run(self, session): + """Returns a DAG with a QUEUED DagRun and null start_date.""" + + dag_id = "dag_with_queued_run" + + # Create DagModel + dag_model = DagModel( + dag_id=dag_id, + is_stale=False, + is_paused=False, + fileloc="/tmp/dag.py", + ) + session.add(dag_model) + session.flush() + + # Create DagRun with start_date=None (QUEUED state) + dagrun = DagRun( + dag_id=dag_id, + run_id="manual__queued", + run_type="manual", + logical_date=utcnow(), + state=DagRunState.QUEUED, + start_date=None, + ) + session.add(dagrun) + session.commit() + + return dag_model, dagrun + + @pytest.fixture + @provide_session + def dag_with_running_run(self, session): + """Returns a DAG with a RUNNING DagRun and a valid start_date.""" + + dag_id = "dag_with_running_run" + + # Create DagModel + dag_model = DagModel( + dag_id=dag_id, + is_stale=False, + is_paused=False, + fileloc="/tmp/dag2.py", + ) + session.add(dag_model) + session.flush() + + # Create DagRun with start_date set (RUNNING state) + start_time = utcnow() + dagrun = DagRun( + dag_id=dag_id, + run_id="manual__running", + run_type="manual", + logical_date=start_time, + state=DagRunState.RUNNING, + start_date=start_time, + ) + session.add(dagrun) + session.commit() + + return dag_model, dagrun + + @provide_session + def test_includes_queued_run_without_start_date(self, dag_with_queued_run, session): + """DAGs with QUEUED runs and null start_date should be included when no filters are applied, and joined DagRun state must not be None.""" + dag_model, _ = dag_with_queued_run + query = generate_dag_with_latest_run_query( + max_run_filters=[], + order_by=SortParam(allowed_attrs=["dag_id"], model=DagModel).set_value("dag_id"), + ) + + # Also fetch joined DagRun's state and start_date + extended_query = query.add_columns(DagRun.state, DagRun.start_date) + result = session.execute(extended_query).fetchall() + dag_row = next((row for row in result if row[0].dag_id == dag_model.dag_id), None) + assert dag_row is not None + dagrun_state = dag_row[1] + assert dagrun_state is not None, "Joined DagRun state must not be None" + + @provide_session + def test_includes_queued_run_when_ordering_by_state( + self, dag_with_queued_run, dag_with_running_run, session + ): + """DAGs with QUEUED runs and null start_date, and RUNNING runs must all have joined DagRun info not None.""" + queued_dag_model, _ = dag_with_queued_run + running_dag_model, _ = dag_with_running_run + + query = generate_dag_with_latest_run_query( + max_run_filters=[], + order_by=SortParam(allowed_attrs=["last_run_state"], model=DagModel).set_value("last_run_state"), + ) + extended_query = query.add_columns(DagRun.state, DagRun.start_date) + result = session.execute(extended_query).fetchall() + + # QUEUED DAG + queued_row = next((row for row in result if row[0].dag_id == queued_dag_model.dag_id), None) + assert queued_row is not None + assert queued_row[1] is not None, "Joined DagRun state for QUEUED DAG must not be None" + # RUNNING DAG + running_row = next((row for row in result if row[0].dag_id == running_dag_model.dag_id), None) + assert running_row is not None + assert running_row[1] is not None, "Joined DagRun state for RUNNING DAG must not be None" + assert running_row[2] is not None, "Joined DagRun start_date for RUNNING DAG must not be None" + + @provide_session + def test_includes_queued_run_when_ordering_by_start_date( + self, dag_with_queued_run, dag_with_running_run, session + ): + """DAGs with QUEUED runs and RUNNING runs must all have joined DagRun info not None when ordering by start_date.""" + queued_dag_model, _ = dag_with_queued_run + running_dag_model, _ = dag_with_running_run + + query = generate_dag_with_latest_run_query( + max_run_filters=[], + order_by=SortParam(allowed_attrs=["last_run_start_date"], model=DagModel).set_value( + "last_run_start_date" + ), + ) + extended_query = query.add_columns(DagRun.state, DagRun.start_date) + result = session.execute(extended_query).fetchall() + + # QUEUED DAG + queued_row = next((row for row in result if row[0].dag_id == queued_dag_model.dag_id), None) + assert queued_row is not None + assert queued_row[1] is not None, "Joined DagRun state for QUEUED DAG must not be None" + # RUNNING DAG + running_row = next((row for row in result if row[0].dag_id == running_dag_model.dag_id), None) + assert running_row is not None + assert running_row[1] is not None, "Joined DagRun state for RUNNING DAG must not be None" + assert running_row[2] is not None, "Joined DagRun start_date for RUNNING DAG must not be None" + + @provide_session + def test_latest_queued_run_without_start_date_is_included(self, session): + """Even if the latest DagRun is QUEUED+start_date=None, joined DagRun state must not be None.""" + dag_id = "dag_with_multiple_runs" + dag_model = DagModel( + dag_id=dag_id, + is_stale=False, + is_paused=False, + fileloc="/tmp/dag3.py", + ) + session.add(dag_model) + session.flush() + older_run = DagRun( + dag_id=dag_id, + run_id="manual__older", + run_type="manual", + logical_date=datetime(2025, 1, 1, tzinfo=timezone.utc), + state=DagRunState.SUCCESS, + start_date=datetime(2025, 1, 1, tzinfo=timezone.utc), + ) + session.add(older_run) + newer_run = DagRun( + dag_id=dag_id, + run_id="manual__newer_queued", + run_type="manual", + logical_date=utcnow(), + state=DagRunState.QUEUED, + start_date=None, + ) + session.add(newer_run) + session.commit() + query = generate_dag_with_latest_run_query( + max_run_filters=[], + order_by=SortParam(allowed_attrs=["last_run_state"], model=DagModel).set_value("last_run_state"), + ) + extended_query = query.add_columns(DagRun.state, DagRun.start_date) + result = session.execute(extended_query).fetchall() + dag_row = next((row for row in result if row[0].dag_id == dag_id), None) + assert dag_row is not None + assert dag_row[1] is not None, ( + "Even if latest DagRun is QUEUED+start_date=None, state must not be None" + ) + + @provide_session + def test_queued_runs_with_null_start_date_are_properly_joined( + self, dag_with_queued_run, dag_with_running_run, session + ): + """ + Verifies that DAGs with null start_date are properly joined in the query. + + If a WHERE clause filters out null start_dates, these DAGs would be excluded. + This test ensures they are still present and joined correctly. + """ + + queued_dag_model, _ = dag_with_queued_run + running_dag_model, _ = dag_with_running_run + query = generate_dag_with_latest_run_query( + max_run_filters=[], + order_by=SortParam(allowed_attrs=["last_run_state"], model=DagModel).set_value("last_run_state"), + ) + extended_query = query.add_columns(DagRun.state, DagRun.start_date) + + result = session.execute(extended_query).fetchall() + + # Find results for each DAG + queued_dag_result = None + running_dag_result = None + + for row in result: + dag_model = row[0] + if dag_model.dag_id == queued_dag_model.dag_id: + queued_dag_result = row + elif dag_model.dag_id == running_dag_model.dag_id: + running_dag_result = row + + # Assert both DAGs are present + assert queued_dag_result is not None, f"Queued DAG {queued_dag_model.dag_id} should be in results" + assert running_dag_result is not None, f"Running DAG {running_dag_model.dag_id} should be in results" + + # if WHERE start_date IS NOT NULL is present, + # the queued DAG should have NO DagRun information joined (state=None, start_date=None) + # But the running DAG should have DagRun information joined + + queued_dagrun_state = queued_dag_result[1] + running_dagrun_state = running_dag_result[1] + assert queued_dagrun_state is not None, ( + "Queued DAG should have DagRun state joined, but got None. " + "This suggests the WHERE start_date IS NOT NULL condition is excluding it." + ) + assert running_dagrun_state is not None, "Running DAG should have DagRun state joined" From 394504f5b7558f3ba2be92158e82614fc882522c Mon Sep 17 00:00:00 2001 From: Yeonguk Date: Mon, 7 Jul 2025 22:59:44 +0900 Subject: [PATCH 3/3] Remove unnecessary @provide_session decorators from test_dags.py --- .../tests/unit/api_fastapi/common/db/test_dags.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/common/db/test_dags.py b/airflow-core/tests/unit/api_fastapi/common/db/test_dags.py index 9db4ac40485ae..b674ddb9a8b2e 100644 --- a/airflow-core/tests/unit/api_fastapi/common/db/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/common/db/test_dags.py @@ -25,7 +25,6 @@ from airflow.api_fastapi.common.parameters import SortParam from airflow.models import DagModel from airflow.models.dagrun import DagRun -from airflow.utils.session import provide_session from airflow.utils.state import DagRunState from airflow.utils.timezone import utcnow @@ -50,7 +49,6 @@ def setup_teardown(self): self._clear_db() @pytest.fixture - @provide_session def dag_with_queued_run(self, session): """Returns a DAG with a QUEUED DagRun and null start_date.""" @@ -81,7 +79,6 @@ def dag_with_queued_run(self, session): return dag_model, dagrun @pytest.fixture - @provide_session def dag_with_running_run(self, session): """Returns a DAG with a RUNNING DagRun and a valid start_date.""" @@ -112,7 +109,6 @@ def dag_with_running_run(self, session): return dag_model, dagrun - @provide_session def test_includes_queued_run_without_start_date(self, dag_with_queued_run, session): """DAGs with QUEUED runs and null start_date should be included when no filters are applied, and joined DagRun state must not be None.""" dag_model, _ = dag_with_queued_run @@ -129,7 +125,6 @@ def test_includes_queued_run_without_start_date(self, dag_with_queued_run, sessi dagrun_state = dag_row[1] assert dagrun_state is not None, "Joined DagRun state must not be None" - @provide_session def test_includes_queued_run_when_ordering_by_state( self, dag_with_queued_run, dag_with_running_run, session ): @@ -154,7 +149,6 @@ def test_includes_queued_run_when_ordering_by_state( assert running_row[1] is not None, "Joined DagRun state for RUNNING DAG must not be None" assert running_row[2] is not None, "Joined DagRun start_date for RUNNING DAG must not be None" - @provide_session def test_includes_queued_run_when_ordering_by_start_date( self, dag_with_queued_run, dag_with_running_run, session ): @@ -181,7 +175,6 @@ def test_includes_queued_run_when_ordering_by_start_date( assert running_row[1] is not None, "Joined DagRun state for RUNNING DAG must not be None" assert running_row[2] is not None, "Joined DagRun start_date for RUNNING DAG must not be None" - @provide_session def test_latest_queued_run_without_start_date_is_included(self, session): """Even if the latest DagRun is QUEUED+start_date=None, joined DagRun state must not be None.""" dag_id = "dag_with_multiple_runs" @@ -224,7 +217,6 @@ def test_latest_queued_run_without_start_date_is_included(self, session): "Even if latest DagRun is QUEUED+start_date=None, state must not be None" ) - @provide_session def test_queued_runs_with_null_start_date_are_properly_joined( self, dag_with_queued_run, dag_with_running_run, session ):