Skip to content

Commit 48baf8f

Browse files
committed
Add 'ignore_if_missing' to DataprocDeleteClusterOperator
1 parent c2e003f commit 48baf8f

2 files changed

Lines changed: 30 additions & 12 deletions

File tree

providers/google/src/airflow/providers/google/cloud/operators/dataproc.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,8 @@ class DataprocDeleteClusterOperator(GoogleCloudBaseOperator):
949949
account from the list granting this role to the originating account (templated).
950950
:param deferrable: Run operator in the deferrable mode.
951951
:param polling_interval_seconds: Time (seconds) to wait between calls to check the cluster status.
952+
:param ignore_if_missing: If True, the operator will not raise an exception if the cluster does not exist.
953+
Defaults to False.
952954
"""
953955

954956
template_fields: Sequence[str] = (
@@ -974,11 +976,14 @@ def __init__(
974976
impersonation_chain: str | Sequence[str] | None = None,
975977
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
976978
polling_interval_seconds: int = 10,
979+
ignore_if_missing: bool = False,
977980
**kwargs,
978981
):
979982
super().__init__(**kwargs)
980983
if deferrable and polling_interval_seconds <= 0:
981984
raise ValueError("Invalid value for polling_interval_seconds. Expected value greater than 0")
985+
if deferrable and ignore_if_missing:
986+
raise ValueError("Cannot set both deferrable and ignore_if_missing to True")
982987
self.project_id = project_id
983988
self.region = region
984989
self.cluster_name = cluster_name
@@ -991,19 +996,22 @@ def __init__(
991996
self.impersonation_chain = impersonation_chain
992997
self.deferrable = deferrable
993998
self.polling_interval_seconds = polling_interval_seconds
999+
self.ignore_if_missing = ignore_if_missing
9941000

9951001
def execute(self, context: Context) -> None:
9961002
hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
9971003
try:
9981004
op: operation.Operation = self._delete_cluster(hook)
9991005

10001006
except NotFound:
1001-
self.log.info(
1002-
"Cluster %s not found in region %s. might have been deleted already.",
1003-
self.cluster_name,
1004-
self.region,
1005-
)
1006-
return
1007+
if self.ignore_if_missing:
1008+
self.log.info(
1009+
"Cluster %s not found in region %s. Ignoring.",
1010+
self.cluster_name,
1011+
self.region,
1012+
)
1013+
return
1014+
raise
10071015

10081016
except Exception as e:
10091017
raise AirflowException(str(e))

providers/google/tests/unit/google/cloud/operators/test_dataproc.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1289,6 +1289,7 @@ def test_execute_cluster_not_found(self, mock_hook):
12891289
retry=RETRY,
12901290
timeout=TIMEOUT,
12911291
metadata=METADATA,
1292+
ignore_if_missing=True,
12921293
)
12931294

12941295
delete_cluster_op.execute(context=mock.MagicMock())
@@ -1304,9 +1305,7 @@ def test_execute_cluster_not_found(self, mock_hook):
13041305
)
13051306

13061307
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
1307-
@mock.patch(DATAPROC_TRIGGERS_PATH.format("DataprocAsyncHook"))
1308-
def test_execute_cluster_not_found_deffered(self, mock_deffer, mock_hook):
1309-
mock_hook.return_value.create_cluster.return_value = None
1308+
def test_execute_cluster_not_found_raises_when_ignore_if_missing_false(self, mock_hook):
13101309
mock_hook.return_value.delete_cluster.side_effect = NotFound("test")
13111310
delete_cluster_op = DataprocDeleteClusterOperator(
13121311
task_id="test_task",
@@ -1318,10 +1317,12 @@ def test_execute_cluster_not_found_deffered(self, mock_deffer, mock_hook):
13181317
retry=RETRY,
13191318
timeout=TIMEOUT,
13201319
metadata=METADATA,
1321-
deferrable=True,
1320+
ignore_if_missing=False,
13221321
)
13231322

1324-
delete_cluster_op.execute(context=mock.MagicMock())
1323+
with pytest.raises(NotFound):
1324+
delete_cluster_op.execute(context=mock.MagicMock())
1325+
13251326
mock_hook.return_value.delete_cluster.assert_called_once_with(
13261327
project_id=GCP_PROJECT,
13271328
region=GCP_REGION,
@@ -1333,7 +1334,16 @@ def test_execute_cluster_not_found_deffered(self, mock_deffer, mock_hook):
13331334
metadata=METADATA,
13341335
)
13351336

1336-
assert not mock_deffer.called
1337+
def test_init_with_deferrable_and_ignore_if_missing_raises(self):
1338+
with pytest.raises(ValueError, match="Cannot set both deferrable and ignore_if_missing to True"):
1339+
DataprocDeleteClusterOperator(
1340+
task_id="test_task",
1341+
region=GCP_REGION,
1342+
cluster_name=CLUSTER_NAME,
1343+
project_id=GCP_PROJECT,
1344+
deferrable=True,
1345+
ignore_if_missing=True,
1346+
)
13371347

13381348

13391349
class TestDataprocSubmitJobOperator(DataprocJobTestBase):

0 commit comments

Comments
 (0)