Skip to content

feat: Add GCP_GKE and WATCHER_GCP_GKE execution modes#2488

Open
vricciardulli wants to merge 35 commits intoastronomer:mainfrom
vricciardulli:gcp-modes
Open

feat: Add GCP_GKE and WATCHER_GCP_GKE execution modes#2488
vricciardulli wants to merge 35 commits intoastronomer:mainfrom
vricciardulli:gcp-modes

Conversation

@vricciardulli
Copy link
Copy Markdown
Contributor

@vricciardulli vricciardulli commented Mar 21, 2026

Description

Added GCP_GKE and WATCHER_GCP_GKE execution modes.

It's the same as KUBERNETES and WATCHER_KUBERNETES modes, but uses GKE operators (i.e. GKEStartPodOperator).

This enables a user to use cosmos in Cloud Composer 3 with a self-managed custom GKE cluster.

Tests:

  • Added unit tests
  • Tested in GCP Cloud Composer 3 with a custom GKE cluster. It works.
  • Did not add any integration test.

Implementation details:

  • Given the amount of duplication between the k8s modes and the new GKE ones, I've extracted all common code and placed it in helper functions in cosmos/operators/_k8s_common.py.
  • I've attempted to type hint as best as possible, but I had to ignore some type-checker errors. I've marked those with comments.
  • An alternative approach would have been to use a factory to generate the classes based on the pod operator provided, but I think that this approach would have been too opaque.

Docs:

  • I did not add separate pages documenting these modes.
  • I concisely added details of GKE modes in the k8s and watcher k8s pages.
  • Also added to the mermaid graph.

Co-Authored-By: Claude Opus 4.6 (1M context) noreply@anthropic.com

Related Issue(s)

closes #2487
closes #2379

Breaking Change?

There should not be a breaking change.

Checklist

  • I have made corresponding changes to the documentation (if required)
  • I have added tests that prove my fix is effective or that my feature works

vricciardulli and others added 3 commits March 19, 2026 11:02
Add two new execution modes enabling dbt execution on GKE clusters:
- GCP_GKE: standard mode using GKEStartPodOperator
- WATCHER_GCP_GKE: watcher mode with producer/consumer pattern

Includes all operators, watcher callbacks, graph.py watcher mappings,
and comprehensive unit/integration tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds new execution modes so Cosmos can run dbt tasks on self-managed GKE clusters (e.g., Cloud Composer 3 on custom GKE) by using GKEStartPodOperator and a watcher variant.

Changes:

  • Introduce ExecutionMode.GCP_GKE and ExecutionMode.WATCHER_GCP_GKE.
  • Add GKE-backed dbt operators (cosmos/operators/gcp_gke.py) and watcher counterparts (cosmos/operators/watcher_gcp_gke.py).
  • Wire new watcher mode into graph-building logic and add unit/integration tests.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
cosmos/constants.py Adds the two new execution modes.
cosmos/airflow/graph.py Ensures watcher graph building and AFTER_ALL test behavior work with WATCHER_GCP_GKE.
cosmos/operators/gcp_gke.py Implements dbt operators backed by GKEStartPodOperator.
cosmos/operators/watcher_gcp_gke.py Implements watcher producer/consumer behavior for GKE.
tests/operators/test_gcp_gke.py Unit tests for GCP GKE operator command construction and execute path.
tests/operators/test_watcher_gcp_gke_unit.py Unit tests for watcher GCP GKE retry/callback behavior.
tests/operators/test_watcher_gcp_gke_integration.py Integration test for DbtDag with WATCHER_GCP_GKE.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread tests/operators/test_gcp_gke.py
Comment thread cosmos/operators/gcp_gke.py Outdated
Comment thread cosmos/operators/watcher_gcp_gke.py Outdated
Copilot AI review requested due to automatic review settings March 21, 2026 10:41
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread tests/operators/test_watcher_gcp_gke_integration.py Outdated
Comment thread cosmos/operators/gcp_gke.py Outdated
Comment thread cosmos/operators/watcher_gcp_gke.py Outdated
Copilot AI review requested due to automatic review settings March 21, 2026 12:47
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/gcp_gke.py
Comment on lines +138 to +140
# image's ENTRYPOINT, so this works regardless of image configuration.
operator.cmds = [dbt_cmd[0]]
operator.arguments = dbt_cmd[1:]
Copy link

Copilot AI Mar 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

build_kube_args unconditionally overwrites operator.cmds. This prevents users from passing a custom cmds (a supported KubernetesPodOperator/GKEStartPodOperator kwarg via operator_args) to preserve an image ENTRYPOINT/wrapper script. Consider only setting cmds when it is not already set (or provide an opt-out flag), and adjust how you split dbt_cmd accordingly.

Suggested change
# image's ENTRYPOINT, so this works regardless of image configuration.
operator.cmds = [dbt_cmd[0]]
operator.arguments = dbt_cmd[1:]
# image's ENTRYPOINT by default; if cmds is already set, preserve it and pass
# the full dbt command as arguments to the existing entrypoint/wrapper.
if getattr(operator, "cmds", None):
# User has provided custom cmds or relies on the image ENTRYPOINT/wrapper;
# do not override cmds, just pass the dbt command as arguments.
operator.arguments = dbt_cmd
else:
# Default behavior: set cmds to the dbt executable and pass remaining
# tokens as arguments.
operator.cmds = [dbt_cmd[0]]
operator.arguments = dbt_cmd[1:]

Copilot uses AI. Check for mistakes.
Comment thread cosmos/constants.py
Copilot AI review requested due to automatic review settings March 29, 2026 15:52
operator_kwargs: dict[str, Any] = {}
operator_args: set[str] = set()
for clazz in pod_operator_class.__mro__:
operator_args.update(inspect.signature(clazz.__init__).parameters.keys()) # type: ignore[misc]
Copy link
Copy Markdown
Contributor Author

@vricciardulli vricciardulli Mar 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type checker error:

error: Accessing "__init__" on an instance is unsound, since instance.__init__ could be from an incompatible subclass  [misc]

ignored because the code was like this also before my changes

Comment on lines +124 to +129
def _build_env_vars(env: dict[str, str | bytes | PathLike[Any]], existing_env_vars: list[Any]) -> list[k8s.V1EnvVar]:
"""Merge an env dict with existing K8s env vars and return the combined list."""
env_vars_dict = {k: str(v) for k, v in env.items()}
for ev in existing_env_vars:
env_vars_dict[ev.name] = ev.value
return convert_env_vars(env_vars_dict) # type: ignore[no-any-return]
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type checker error:

error: Returning Any from function declared to return "list[Any]"  [no-any-return]


# Get the logs from the pod
logs = []
for log in task.pod_manager.read_pod_logs(pod, "base"): # type: ignore[attr-defined]
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type checker error:

error: "BaseOperator" has no attribute "pod_manager"  [attr-defined]

was not getting this error before the refactoring

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 18 out of 18 changed files in this pull request and generated 6 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread docs/guides/run_dbt/container/kubernetes.rst Outdated
Comment thread docs/guides/run_dbt/container/kubernetes.rst Outdated
Comment thread docs/guides/run_dbt/container/watcher-kubernetes-execution-mode.rst Outdated
Comment thread tests/operators/test_watcher_gcp_gke_integration.py Outdated
Comment thread tests/operators/test_watcher_gcp_gke_integration.py Outdated
except ImportError:
from airflow.models import BaseOperator # Airflow 2

logger = get_logger(__name__)
Copy link

Copilot AI Mar 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger = get_logger(__name__) is never used in this module, which will fail Ruff/Pyflakes (F841). Either remove it (and the get_logger import) or use it for module-level logging.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added because it was there also before the refactoring

Copilot AI review requested due to automatic review settings April 1, 2026 16:52
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/_k8s_common.py
Comment thread tests/operators/test_k8s_common.py
Comment on lines 7 to +13
.. versionadded:: 1.13.0

The ``ExecutionMode.WATCHER_KUBERNETES`` combines the **speed of the** :ref:`watcher-execution-mode` **with the isolation of** :ref:`kubernetes`.

A GCP GKE variant is also available as ``ExecutionMode.WATCHER_GCP_GKE``, which uses
``GKEStartPodOperator`` instead of ``KubernetesPodOperator``. See :ref:`watcher-gcp-gke` below.

Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The page-level .. versionadded:: 1.13.0 now applies to the newly documented ExecutionMode.WATCHER_GCP_GKE as well, but this mode is introduced in this PR (later than 1.13.0). Consider adding a separate .. versionadded:: <new version> note for WATCHER_GCP_GKE (or rewording the existing directive) to avoid implying it has been available since 1.13.0.

Copilot uses AI. Check for mistakes.
@tatiana
Copy link
Copy Markdown
Collaborator

tatiana commented Apr 7, 2026

Thank you so much for these improvements, @vricciardulli ! They look really valuable and will definitely enhance the project.

We’ve had a heavier workload than expected over the past few weeks, so we haven’t been able to review everything immediately. We’ll be reviewing this PR thoroughly in the upcoming month and are considering including these changes in the upcoming 1.15.0 release.

Really appreciate your patience and contribution!

In the meantime, please, could you address Copilot's feedback and also rebase with the latest changes on main?

@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 7, 2026

Codecov Report

❌ Patch coverage is 95.45455% with 14 lines in your changes missing coverage. Please review.
✅ Project coverage is 97.89%. Comparing base (0ba7fde) to head (eb9c56e).
⚠️ Report is 8 commits behind head on main.

Files with missing lines Patch % Lines
cosmos/operators/watcher_gcp_gke.py 88.67% 6 Missing ⚠️
cosmos/operators/gcp_gke.py 91.52% 5 Missing ⚠️
cosmos/operators/_k8s_common.py 98.35% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2488      +/-   ##
==========================================
- Coverage   97.92%   97.89%   -0.03%     
==========================================
  Files         103      106       +3     
  Lines        7312     7460     +148     
==========================================
+ Hits         7160     7303     +143     
- Misses        152      157       +5     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@vricciardulli
Copy link
Copy Markdown
Contributor Author

Thank you so much for these improvements, @vricciardulli ! They look really valuable and will definitely enhance the project.

We’ve had a heavier workload than expected over the past few weeks, so we haven’t been able to review everything immediately. We’ll be reviewing this PR thoroughly in the upcoming month and are considering including these changes in the upcoming 1.15.0 release.

Really appreciate your patience and contribution!

In the meantime, please, could you address Copilot's feedback and also rebase with the latest changes on main?

@tatiana I'll rebase and clean up once #2543 is merged to main

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] Add GCP_GKE and WATCHER_GCP_GKE execution modes [Feature] Add support for GKEStartPodOperator (again)

3 participants