feat: Add GCP_GKE and WATCHER_GCP_GKE execution modes#2488
feat: Add GCP_GKE and WATCHER_GCP_GKE execution modes#2488vricciardulli wants to merge 35 commits intoastronomer:mainfrom
GCP_GKE and WATCHER_GCP_GKE execution modes#2488Conversation
Add two new execution modes enabling dbt execution on GKE clusters: - GCP_GKE: standard mode using GKEStartPodOperator - WATCHER_GCP_GKE: watcher mode with producer/consumer pattern Includes all operators, watcher callbacks, graph.py watcher mappings, and comprehensive unit/integration tests. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Adds new execution modes so Cosmos can run dbt tasks on self-managed GKE clusters (e.g., Cloud Composer 3 on custom GKE) by using GKEStartPodOperator and a watcher variant.
Changes:
- Introduce
ExecutionMode.GCP_GKEandExecutionMode.WATCHER_GCP_GKE. - Add GKE-backed dbt operators (
cosmos/operators/gcp_gke.py) and watcher counterparts (cosmos/operators/watcher_gcp_gke.py). - Wire new watcher mode into graph-building logic and add unit/integration tests.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
cosmos/constants.py |
Adds the two new execution modes. |
cosmos/airflow/graph.py |
Ensures watcher graph building and AFTER_ALL test behavior work with WATCHER_GCP_GKE. |
cosmos/operators/gcp_gke.py |
Implements dbt operators backed by GKEStartPodOperator. |
cosmos/operators/watcher_gcp_gke.py |
Implements watcher producer/consumer behavior for GKE. |
tests/operators/test_gcp_gke.py |
Unit tests for GCP GKE operator command construction and execute path. |
tests/operators/test_watcher_gcp_gke_unit.py |
Unit tests for watcher GCP GKE retry/callback behavior. |
tests/operators/test_watcher_gcp_gke_integration.py |
Integration test for DbtDag with WATCHER_GCP_GKE. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # image's ENTRYPOINT, so this works regardless of image configuration. | ||
| operator.cmds = [dbt_cmd[0]] | ||
| operator.arguments = dbt_cmd[1:] |
There was a problem hiding this comment.
build_kube_args unconditionally overwrites operator.cmds. This prevents users from passing a custom cmds (a supported KubernetesPodOperator/GKEStartPodOperator kwarg via operator_args) to preserve an image ENTRYPOINT/wrapper script. Consider only setting cmds when it is not already set (or provide an opt-out flag), and adjust how you split dbt_cmd accordingly.
| # image's ENTRYPOINT, so this works regardless of image configuration. | |
| operator.cmds = [dbt_cmd[0]] | |
| operator.arguments = dbt_cmd[1:] | |
| # image's ENTRYPOINT by default; if cmds is already set, preserve it and pass | |
| # the full dbt command as arguments to the existing entrypoint/wrapper. | |
| if getattr(operator, "cmds", None): | |
| # User has provided custom cmds or relies on the image ENTRYPOINT/wrapper; | |
| # do not override cmds, just pass the dbt command as arguments. | |
| operator.arguments = dbt_cmd | |
| else: | |
| # Default behavior: set cmds to the dbt executable and pass remaining | |
| # tokens as arguments. | |
| operator.cmds = [dbt_cmd[0]] | |
| operator.arguments = dbt_cmd[1:] |
| operator_kwargs: dict[str, Any] = {} | ||
| operator_args: set[str] = set() | ||
| for clazz in pod_operator_class.__mro__: | ||
| operator_args.update(inspect.signature(clazz.__init__).parameters.keys()) # type: ignore[misc] |
There was a problem hiding this comment.
Type checker error:
error: Accessing "__init__" on an instance is unsound, since instance.__init__ could be from an incompatible subclass [misc]
ignored because the code was like this also before my changes
| def _build_env_vars(env: dict[str, str | bytes | PathLike[Any]], existing_env_vars: list[Any]) -> list[k8s.V1EnvVar]: | ||
| """Merge an env dict with existing K8s env vars and return the combined list.""" | ||
| env_vars_dict = {k: str(v) for k, v in env.items()} | ||
| for ev in existing_env_vars: | ||
| env_vars_dict[ev.name] = ev.value | ||
| return convert_env_vars(env_vars_dict) # type: ignore[no-any-return] |
There was a problem hiding this comment.
Type checker error:
error: Returning Any from function declared to return "list[Any]" [no-any-return]
|
|
||
| # Get the logs from the pod | ||
| logs = [] | ||
| for log in task.pod_manager.read_pod_logs(pod, "base"): # type: ignore[attr-defined] |
There was a problem hiding this comment.
Type checker error:
error: "BaseOperator" has no attribute "pod_manager" [attr-defined]
was not getting this error before the refactoring
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 18 out of 18 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| except ImportError: | ||
| from airflow.models import BaseOperator # Airflow 2 | ||
|
|
||
| logger = get_logger(__name__) |
There was a problem hiding this comment.
logger = get_logger(__name__) is never used in this module, which will fail Ruff/Pyflakes (F841). Either remove it (and the get_logger import) or use it for module-level logging.
There was a problem hiding this comment.
Added because it was there also before the refactoring
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| .. versionadded:: 1.13.0 | ||
|
|
||
| The ``ExecutionMode.WATCHER_KUBERNETES`` combines the **speed of the** :ref:`watcher-execution-mode` **with the isolation of** :ref:`kubernetes`. | ||
|
|
||
| A GCP GKE variant is also available as ``ExecutionMode.WATCHER_GCP_GKE``, which uses | ||
| ``GKEStartPodOperator`` instead of ``KubernetesPodOperator``. See :ref:`watcher-gcp-gke` below. | ||
|
|
There was a problem hiding this comment.
The page-level .. versionadded:: 1.13.0 now applies to the newly documented ExecutionMode.WATCHER_GCP_GKE as well, but this mode is introduced in this PR (later than 1.13.0). Consider adding a separate .. versionadded:: <new version> note for WATCHER_GCP_GKE (or rewording the existing directive) to avoid implying it has been available since 1.13.0.
|
Thank you so much for these improvements, @vricciardulli ! They look really valuable and will definitely enhance the project. We’ve had a heavier workload than expected over the past few weeks, so we haven’t been able to review everything immediately. We’ll be reviewing this PR thoroughly in the upcoming month and are considering including these changes in the upcoming 1.15.0 release. Really appreciate your patience and contribution! In the meantime, please, could you address Copilot's feedback and also rebase with the latest changes on main? |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2488 +/- ##
==========================================
- Coverage 97.92% 97.89% -0.03%
==========================================
Files 103 106 +3
Lines 7312 7460 +148
==========================================
+ Hits 7160 7303 +143
- Misses 152 157 +5 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
@tatiana I'll rebase and clean up once #2543 is merged to main |
Description
Added
GCP_GKEandWATCHER_GCP_GKEexecution modes.It's the same as
KUBERNETESandWATCHER_KUBERNETESmodes, but uses GKE operators (i.e.GKEStartPodOperator).This enables a user to use cosmos in Cloud Composer 3 with a self-managed custom GKE cluster.
Tests:
Implementation details:
cosmos/operators/_k8s_common.py.Docs:
Co-Authored-By: Claude Opus 4.6 (1M context) noreply@anthropic.com
Related Issue(s)
closes #2487
closes #2379
Breaking Change?
There should not be a breaking change.
Checklist