Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions src/v/datalake/coordinator/coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -747,12 +747,17 @@ coordinator::update_lifecycle_state(
model::topic_namespace_view{model::kafka_namespace, t});
if (tombstone_it != topic_table_.get_iceberg_tombstones().end()) {
auto tombstone_rev = tombstone_it->second.last_deleted_revision;
// The Glue REST catalog doesn't support purging data; explicitly
// pass that down to the drop operations.
auto should_purge = using_glue_catalog()
? file_committer::purge_data::no
: file_committer::purge_data::yes;
if (tombstone_rev >= topic.revision) {
// Drop the main table if it exists.
{
auto table_id = table_id_provider::table_id(t);
auto drop_res = co_await file_committer_.drop_table(
table_id);
table_id, should_purge);
if (drop_res.has_error()) {
switch (drop_res.error()) {
case file_committer::errc::shutting_down:
Expand All @@ -771,7 +776,7 @@ coordinator::update_lifecycle_state(
{
auto dlq_table_id = table_id_provider::dlq_table_id(t);
auto drop_res = co_await file_committer_.drop_table(
dlq_table_id);
dlq_table_id, should_purge);
if (drop_res.has_error()) {
switch (drop_res.error()) {
case file_committer::errc::shutting_down:
Expand Down Expand Up @@ -856,14 +861,8 @@ ss::sstring coordinator::get_effective_default_partition_spec(
const std::optional<ss::sstring>& partition_spec) const {
const auto& cfg = config::shard_local_cfg();
auto current_spec = partition_spec.value_or(default_partition_spec_());

bool is_glue = cfg.iceberg_catalog_type()
== config::datalake_catalog_type::rest
&& cfg.iceberg_rest_catalog_authentication_mode()
== config::datalake_catalog_auth_mode::aws_sigv4
&& cfg.iceberg_rest_catalog_aws_service_name() == "glue";
if (
is_glue
using_glue_catalog()
&& current_spec == cfg.iceberg_default_partition_spec.default_value()) {
// Glue can't partition on nested fields like redpanda.timestamp.
vlog(
Expand All @@ -875,4 +874,13 @@ ss::sstring coordinator::get_effective_default_partition_spec(

return current_spec;
}

bool coordinator::using_glue_catalog() const {
const auto& cfg = config::shard_local_cfg();
return cfg.iceberg_catalog_type() == config::datalake_catalog_type::rest
&& cfg.iceberg_rest_catalog_authentication_mode()
== config::datalake_catalog_auth_mode::aws_sigv4
&& cfg.iceberg_rest_catalog_aws_service_name() == "glue";
}

} // namespace datalake::coordinator
5 changes: 5 additions & 0 deletions src/v/datalake/coordinator/coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ class coordinator {
ss::sstring get_effective_default_partition_spec(
const std::optional<ss::sstring>& partition_spec) const;

// Return whether the underlying catalog is the Glue REST catalog.
// TODO: if the kludges start piling up, we should abstract some "catalog
// capabilities" out.
bool using_glue_catalog() const;

ss::shared_ptr<coordinator_stm> stm_;
cluster::topic_table& topic_table_;
type_resolver& type_resolver_;
Expand Down
3 changes: 2 additions & 1 deletion src/v/datalake/coordinator/file_committer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ class file_committer {
checked<chunked_vector<mark_files_committed_update>, errc>>
commit_topic_files_to_catalog(model::topic, const topics_state&) const = 0;

using purge_data = ss::bool_class<struct purge_data_tag>;
virtual ss::future<checked<std::nullopt_t, errc>>
drop_table(const iceberg::table_identifier&) const = 0;
drop_table(const iceberg::table_identifier&, purge_data) const = 0;

virtual ~file_committer() = default;
};
Expand Down
5 changes: 3 additions & 2 deletions src/v/datalake/coordinator/iceberg_file_committer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ iceberg_file_committer::commit_topic_files_to_catalog(

ss::future<checked<std::nullopt_t, file_committer::errc>>
iceberg_file_committer::drop_table(
const iceberg::table_identifier& table_id) const {
const iceberg::table_identifier& table_id, purge_data should_purge) const {
auto load_res = co_await catalog_.load_table(table_id);
if (load_res.has_error()) {
if (load_res.error() == iceberg::catalog::errc::not_found) {
Expand All @@ -609,7 +609,8 @@ iceberg_file_committer::drop_table(
"anyway",
table_id));
}
auto drop_res = co_await catalog_.drop_table(table_id, true);
auto drop_res = co_await catalog_.drop_table(
table_id, should_purge == purge_data::yes);
if (
drop_res.has_error()
&& drop_res.error() != iceberg::catalog::errc::not_found) {
Expand Down
2 changes: 1 addition & 1 deletion src/v/datalake/coordinator/iceberg_file_committer.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class iceberg_file_committer : public file_committer {
model::topic, const topics_state&) const final;

ss::future<checked<std::nullopt_t, errc>>
drop_table(const iceberg::table_identifier&) const final;
drop_table(const iceberg::table_identifier&, purge_data) const final;

private:
// Must outlive this committer.
Expand Down
2 changes: 1 addition & 1 deletion src/v/datalake/coordinator/tests/coordinator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class noop_file_committer : public file_committer {
}

ss::future<checked<std::nullopt_t, errc>>
drop_table(const iceberg::table_identifier&) const final {
drop_table(const iceberg::table_identifier&, purge_data) const final {
co_return std::nullopt;
}

Expand Down
2 changes: 1 addition & 1 deletion src/v/datalake/coordinator/tests/state_test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class simple_file_committer : public file_committer {
}

ss::future<checked<std::nullopt_t, errc>>
drop_table(const iceberg::table_identifier&) const final {
drop_table(const iceberg::table_identifier&, purge_data) const final {
co_return std::nullopt;
}

Expand Down