diff --git a/src/v/datalake/coordinator/coordinator.cc b/src/v/datalake/coordinator/coordinator.cc index 6087f6cf6a286..edf6bbfc24e1c 100644 --- a/src/v/datalake/coordinator/coordinator.cc +++ b/src/v/datalake/coordinator/coordinator.cc @@ -747,12 +747,17 @@ coordinator::update_lifecycle_state( model::topic_namespace_view{model::kafka_namespace, t}); if (tombstone_it != topic_table_.get_iceberg_tombstones().end()) { auto tombstone_rev = tombstone_it->second.last_deleted_revision; + // The Glue REST catalog doesn't support purging data; explicitly + // pass that down to the drop operations. + auto should_purge = using_glue_catalog() + ? file_committer::purge_data::no + : file_committer::purge_data::yes; if (tombstone_rev >= topic.revision) { // Drop the main table if it exists. { auto table_id = table_id_provider::table_id(t); auto drop_res = co_await file_committer_.drop_table( - table_id); + table_id, should_purge); if (drop_res.has_error()) { switch (drop_res.error()) { case file_committer::errc::shutting_down: @@ -771,7 +776,7 @@ coordinator::update_lifecycle_state( { auto dlq_table_id = table_id_provider::dlq_table_id(t); auto drop_res = co_await file_committer_.drop_table( - dlq_table_id); + dlq_table_id, should_purge); if (drop_res.has_error()) { switch (drop_res.error()) { case file_committer::errc::shutting_down: @@ -856,14 +861,8 @@ ss::sstring coordinator::get_effective_default_partition_spec( const std::optional& partition_spec) const { const auto& cfg = config::shard_local_cfg(); auto current_spec = partition_spec.value_or(default_partition_spec_()); - - bool is_glue = cfg.iceberg_catalog_type() - == config::datalake_catalog_type::rest - && cfg.iceberg_rest_catalog_authentication_mode() - == config::datalake_catalog_auth_mode::aws_sigv4 - && cfg.iceberg_rest_catalog_aws_service_name() == "glue"; if ( - is_glue + using_glue_catalog() && current_spec == cfg.iceberg_default_partition_spec.default_value()) { // Glue can't partition on nested fields like redpanda.timestamp. vlog( @@ -875,4 +874,13 @@ ss::sstring coordinator::get_effective_default_partition_spec( return current_spec; } + +bool coordinator::using_glue_catalog() const { + const auto& cfg = config::shard_local_cfg(); + return cfg.iceberg_catalog_type() == config::datalake_catalog_type::rest + && cfg.iceberg_rest_catalog_authentication_mode() + == config::datalake_catalog_auth_mode::aws_sigv4 + && cfg.iceberg_rest_catalog_aws_service_name() == "glue"; +} + } // namespace datalake::coordinator diff --git a/src/v/datalake/coordinator/coordinator.h b/src/v/datalake/coordinator/coordinator.h index a76d928c56af9..10bd75d53044b 100644 --- a/src/v/datalake/coordinator/coordinator.h +++ b/src/v/datalake/coordinator/coordinator.h @@ -148,6 +148,11 @@ class coordinator { ss::sstring get_effective_default_partition_spec( const std::optional& partition_spec) const; + // Return whether the underlying catalog is the Glue REST catalog. + // TODO: if the kludges start piling up, we should abstract some "catalog + // capabilities" out. + bool using_glue_catalog() const; + ss::shared_ptr stm_; cluster::topic_table& topic_table_; type_resolver& type_resolver_; diff --git a/src/v/datalake/coordinator/file_committer.h b/src/v/datalake/coordinator/file_committer.h index 1e948b09da18f..34a0da0cb60d7 100644 --- a/src/v/datalake/coordinator/file_committer.h +++ b/src/v/datalake/coordinator/file_committer.h @@ -28,8 +28,9 @@ class file_committer { checked, errc>> commit_topic_files_to_catalog(model::topic, const topics_state&) const = 0; + using purge_data = ss::bool_class; virtual ss::future> - drop_table(const iceberg::table_identifier&) const = 0; + drop_table(const iceberg::table_identifier&, purge_data) const = 0; virtual ~file_committer() = default; }; diff --git a/src/v/datalake/coordinator/iceberg_file_committer.cc b/src/v/datalake/coordinator/iceberg_file_committer.cc index 04f31062377ed..a16cc136794ed 100644 --- a/src/v/datalake/coordinator/iceberg_file_committer.cc +++ b/src/v/datalake/coordinator/iceberg_file_committer.cc @@ -596,7 +596,7 @@ iceberg_file_committer::commit_topic_files_to_catalog( ss::future> iceberg_file_committer::drop_table( - const iceberg::table_identifier& table_id) const { + const iceberg::table_identifier& table_id, purge_data should_purge) const { auto load_res = co_await catalog_.load_table(table_id); if (load_res.has_error()) { if (load_res.error() == iceberg::catalog::errc::not_found) { @@ -609,7 +609,8 @@ iceberg_file_committer::drop_table( "anyway", table_id)); } - auto drop_res = co_await catalog_.drop_table(table_id, true); + auto drop_res = co_await catalog_.drop_table( + table_id, should_purge == purge_data::yes); if ( drop_res.has_error() && drop_res.error() != iceberg::catalog::errc::not_found) { diff --git a/src/v/datalake/coordinator/iceberg_file_committer.h b/src/v/datalake/coordinator/iceberg_file_committer.h index 8ad59cb65b1c2..b576292916504 100644 --- a/src/v/datalake/coordinator/iceberg_file_committer.h +++ b/src/v/datalake/coordinator/iceberg_file_committer.h @@ -59,7 +59,7 @@ class iceberg_file_committer : public file_committer { model::topic, const topics_state&) const final; ss::future> - drop_table(const iceberg::table_identifier&) const final; + drop_table(const iceberg::table_identifier&, purge_data) const final; private: // Must outlive this committer. diff --git a/src/v/datalake/coordinator/tests/coordinator_test.cc b/src/v/datalake/coordinator/tests/coordinator_test.cc index 67ab1cbce9ff6..8a52bcc1e7fa2 100644 --- a/src/v/datalake/coordinator/tests/coordinator_test.cc +++ b/src/v/datalake/coordinator/tests/coordinator_test.cc @@ -43,7 +43,7 @@ class noop_file_committer : public file_committer { } ss::future> - drop_table(const iceberg::table_identifier&) const final { + drop_table(const iceberg::table_identifier&, purge_data) const final { co_return std::nullopt; } diff --git a/src/v/datalake/coordinator/tests/state_test_utils.h b/src/v/datalake/coordinator/tests/state_test_utils.h index 056cbfa7fad46..bdb22ba00c6bb 100644 --- a/src/v/datalake/coordinator/tests/state_test_utils.h +++ b/src/v/datalake/coordinator/tests/state_test_utils.h @@ -56,7 +56,7 @@ class simple_file_committer : public file_committer { } ss::future> - drop_table(const iceberg::table_identifier&) const final { + drop_table(const iceberg::table_identifier&, purge_data) const final { co_return std::nullopt; }