From df1147e2b2ebc36a6e1390282426760aced9206a Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Thu, 12 Feb 2026 16:03:37 +0000 Subject: [PATCH 1/2] datalake/coordinator: add reset topic state escape hatch Operators may need to clear pending files from the datalake coordinator when Iceberg catalog state becomes inconsistent (e.g. after manual catalog modifications) which can result in stuck coordinator. The new CoordinatorResetState RPC is exposed via the admin API with SUPERUSER authorization and plumbed through the coordinator frontend, RPC service, and state machine. (cherry picked from commit c6f56215bab7d5a4bd8a4bf3d1bd2ebfbef6fd3f) --- .../admin/internal/datalake/v1/datalake.proto | 15 +++ src/v/datalake/coordinator/coordinator.cc | 47 ++++++++ src/v/datalake/coordinator/coordinator.h | 5 + src/v/datalake/coordinator/frontend.cc | 44 ++++++++ src/v/datalake/coordinator/frontend.h | 8 ++ src/v/datalake/coordinator/rpc.json | 5 + src/v/datalake/coordinator/service.cc | 6 + src/v/datalake/coordinator/service.h | 3 + src/v/datalake/coordinator/state_machine.cc | 7 ++ src/v/datalake/coordinator/state_update.cc | 50 ++++++++- src/v/datalake/coordinator/state_update.h | 23 ++++ .../coordinator/tests/state_update_test.cc | 61 ++++++++++ src/v/datalake/coordinator/types.cc | 6 + src/v/datalake/coordinator/types.h | 61 ++++++++++ .../admin/services/datalake/datalake.cc | 38 +++++++ .../admin/services/datalake/datalake.h | 5 + .../internal/datalake/v1/datalake_pb2.py | 48 ++++---- .../internal/datalake/v1/datalake_pb2.pyi | 25 +++++ .../datalake/v1/datalake_pb2_connect.py | 34 ++++++ tests/rptest/clients/types.py | 1 + .../tests/datalake/datalake_e2e_test.py | 106 +++++++++++++++++- 21 files changed, 573 insertions(+), 25 deletions(-) diff --git a/proto/redpanda/core/admin/internal/datalake/v1/datalake.proto b/proto/redpanda/core/admin/internal/datalake/v1/datalake.proto index 9b04996ce35ea..97d741d0707b7 100644 --- a/proto/redpanda/core/admin/internal/datalake/v1/datalake.proto +++ b/proto/redpanda/core/admin/internal/datalake/v1/datalake.proto @@ -32,6 +32,13 @@ service DatalakeService { authz: SUPERUSER }; } + + rpc CoordinatorResetTopicState(CoordinatorResetTopicStateRequest) + returns (CoordinatorResetTopicStateResponse) { + option (pbgen.rpc) = { + authz: SUPERUSER + }; + } } message GetCoordinatorStateRequest { @@ -44,6 +51,14 @@ message GetCoordinatorStateResponse { CoordinatorState state = 1; } +message CoordinatorResetTopicStateRequest { + string topic_name = 1; + int64 revision = 2; + bool reset_all_partitions = 3; +} + +message CoordinatorResetTopicStateResponse {} + message CoordinatorState { map topic_states = 1; } diff --git a/src/v/datalake/coordinator/coordinator.cc b/src/v/datalake/coordinator/coordinator.cc index 0c307eecb40af..0b4a0276437c1 100644 --- a/src/v/datalake/coordinator/coordinator.cc +++ b/src/v/datalake/coordinator/coordinator.cc @@ -885,6 +885,53 @@ coordinator::sync_get_topic_state(chunked_vector topics_filter) { co_return result; } +ss::future> +coordinator::sync_reset_topic_state( + model::topic topic, + model::revision_id topic_revision, + bool reset_all_partitions) { + auto gate = maybe_gate(); + if (gate.has_error()) { + co_return gate.error(); + } + + vlog(datalake_log.debug, "Resetting coordinator state for topic {}", topic); + auto sync_res = co_await stm_->sync(10s); + if (sync_res.has_error()) { + co_return convert_stm_errc(sync_res.error()); + } + + reset_topic_state_update update{ + .topic = topic, + .topic_revision = topic_revision, + .reset_all_partitions = reset_all_partitions, + }; + auto check_res = update.can_apply(stm_->state()); + if (check_res.has_error()) { + vlog( + datalake_log.debug, + "Rejecting reset topic state request for {}: {}", + topic, + check_res.error()); + co_return errc::stm_apply_error; + } + storage::record_batch_builder builder( + model::record_batch_type::datalake_coordinator, model::offset{0}); + builder.add_raw_kv( + serde::to_iobuf(reset_topic_state_update::key), + serde::to_iobuf(std::move(update))); + + auto repl_res = co_await stm_->replicate_and_wait( + sync_res.value(), std::move(builder).build(), as_); + if (repl_res.has_error()) { + auto e = convert_stm_errc(repl_res.error()); + vlog(datalake_log.warn, "Replication failed {}", e); + co_return e; + } + + co_return outcome::success(); +} + ss::sstring coordinator::get_effective_default_partition_spec( const std::optional& partition_spec) const { const auto& cfg = config::shard_local_cfg(); diff --git a/src/v/datalake/coordinator/coordinator.h b/src/v/datalake/coordinator/coordinator.h index 5e5a8a20a37ac..04b89a4029a3b 100644 --- a/src/v/datalake/coordinator/coordinator.h +++ b/src/v/datalake/coordinator/coordinator.h @@ -89,6 +89,11 @@ class coordinator { ss::future, errc>> sync_get_topic_state(chunked_vector topics); + ss::future> sync_reset_topic_state( + model::topic topic, + model::revision_id topic_rev, + bool reset_all_partitions); + void notify_leadership(std::optional); bool leader_loop_running() const { return term_as_.has_value(); } diff --git a/src/v/datalake/coordinator/frontend.cc b/src/v/datalake/coordinator/frontend.cc index 33b4282a86d16..a4afc277383d1 100644 --- a/src/v/datalake/coordinator/frontend.cc +++ b/src/v/datalake/coordinator/frontend.cc @@ -215,6 +215,13 @@ template auto frontend::process< &frontend::get_topic_state_locally, &frontend::client::get_topic_state>(get_topic_state_request, bool); +template auto frontend::process< + &frontend::reset_topic_state_locally, + &frontend::client::reset_topic_state>(reset_topic_state_request, bool); + +template auto frontend::remote_dispatch<&frontend::client::reset_topic_state>( + reset_topic_state_request, model::node_id); + // -- explicit instantiations --- frontend::frontend( @@ -494,4 +501,41 @@ ss::future frontend::get_topic_state( &client::get_topic_state>(std::move(request), bool(local_only_exec)); } +ss::future frontend::reset_topic_state_locally( + reset_topic_state_request request, + const model::ntp& coordinator_partition, + ss::shard_id shard) { + auto holder = _gate.hold(); + co_return co_await _coordinator_mgr->invoke_on( + shard, + [coordinator_partition, &request](coordinator_manager& mgr) mutable { + auto partition = mgr.get(coordinator_partition); + if (!partition) { + return ssx::now(reset_topic_state_reply{errc::not_leader}); + } + return partition + ->sync_reset_topic_state( + request.topic, + request.topic_revision, + request.reset_all_partitions) + .then([](auto result) { + reset_topic_state_reply resp{}; + if (result.has_error()) { + resp.errc = to_rpc_errc(result.error()); + } else { + resp.errc = errc::ok; + } + return ssx::now(std::move(resp)); + }); + }); +} + +ss::future frontend::reset_topic_state( + reset_topic_state_request request, local_only local_only_exec) { + auto holder = _gate.hold(); + co_return co_await process< + &frontend::reset_topic_state_locally, + &client::reset_topic_state>(std::move(request), bool(local_only_exec)); +} + } // namespace datalake::coordinator diff --git a/src/v/datalake/coordinator/frontend.h b/src/v/datalake/coordinator/frontend.h index 9cd4c00bda5cd..dbf429a67f2be 100644 --- a/src/v/datalake/coordinator/frontend.h +++ b/src/v/datalake/coordinator/frontend.h @@ -77,6 +77,9 @@ class frontend : public ss::peering_sharded_service { ss::future get_topic_state(get_topic_state_request, local_only = local_only::no); + ss::future + reset_topic_state(reset_topic_state_request, local_only = local_only::no); + /** * Returns the partition of datalake coordinator topic that * coordinates datalake tasks for this topic partitions. @@ -144,6 +147,11 @@ class frontend : public ss::peering_sharded_service { const model::ntp& coordinator_partition, ss::shard_id); + ss::future reset_topic_state_locally( + reset_topic_state_request, + const model::ntp& coordinator_partition, + ss::shard_id); + model::node_id _self; ss::sharded* _coordinator_mgr; ss::sharded* _group_mgr; diff --git a/src/v/datalake/coordinator/rpc.json b/src/v/datalake/coordinator/rpc.json index 026f2a0bd3ea3..8d8fab5987bb0 100644 --- a/src/v/datalake/coordinator/rpc.json +++ b/src/v/datalake/coordinator/rpc.json @@ -34,6 +34,11 @@ "name": "get_topic_state", "input_type": "get_topic_state_request", "output_type": "get_topic_state_reply" + }, + { + "name": "reset_topic_state", + "input_type": "reset_topic_state_request", + "output_type": "reset_topic_state_reply" } ] } diff --git a/src/v/datalake/coordinator/service.cc b/src/v/datalake/coordinator/service.cc index 5fe082f08fc32..7948aa08eccd7 100644 --- a/src/v/datalake/coordinator/service.cc +++ b/src/v/datalake/coordinator/service.cc @@ -58,4 +58,10 @@ ss::future service::get_topic_state( std::move(request), frontend::local_only::yes); } +ss::future service::reset_topic_state( + reset_topic_state_request request, ::rpc::streaming_context&) { + return _frontend->local().reset_topic_state( + std::move(request), frontend::local_only::yes); +} + }; // namespace datalake::coordinator::rpc diff --git a/src/v/datalake/coordinator/service.h b/src/v/datalake/coordinator/service.h index b24f17340d8cd..5f0f79eba5b52 100644 --- a/src/v/datalake/coordinator/service.h +++ b/src/v/datalake/coordinator/service.h @@ -39,6 +39,9 @@ class service final : public impl::datalake_coordinator_rpc_service { ss::future get_topic_state( get_topic_state_request, ::rpc::streaming_context&) override; + ss::future reset_topic_state( + reset_topic_state_request, ::rpc::streaming_context&) override; + private: ss::sharded* _frontend; }; diff --git a/src/v/datalake/coordinator/state_machine.cc b/src/v/datalake/coordinator/state_machine.cc index be53a915c9be5..3322727f9c61e 100644 --- a/src/v/datalake/coordinator/state_machine.cc +++ b/src/v/datalake/coordinator/state_machine.cc @@ -133,6 +133,13 @@ ss::future<> coordinator_stm::do_apply(const model::record_batch& b) { maybe_log_update_error(_log, key, o, res); continue; } + case update_key::reset_topic_state: { + auto update = serde::read(val_p); + vlog(_log.debug, "Applying {} from offset {}: {}", key, o, update); + auto res = update.apply(state_); + maybe_log_update_error(_log, key, o, res); + continue; + } } vlog( _log.error, diff --git a/src/v/datalake/coordinator/state_update.cc b/src/v/datalake/coordinator/state_update.cc index 85d0c2a0df4a6..0d1741e0e9a74 100644 --- a/src/v/datalake/coordinator/state_update.cc +++ b/src/v/datalake/coordinator/state_update.cc @@ -12,8 +12,6 @@ #include "datalake/coordinator/state.h" #include "datalake/coordinator/translated_offset_range.h" -#include - namespace datalake::coordinator { std::ostream& operator<<(std::ostream& o, const update_key& u) { @@ -24,6 +22,8 @@ std::ostream& operator<<(std::ostream& o, const update_key& u) { return o << "update_key::mark_files_committed"; case update_key::topic_lifecycle_update: return o << "update_key::topic_lifecycle_update"; + case update_key::reset_topic_state: + return o << "update_key::reset_topic_state"; } } @@ -291,6 +291,42 @@ topic_lifecycle_update::apply(topics_state& state) { return true; } +checked +reset_topic_state_update::can_apply(const topics_state& state) { + auto topic_it = state.topic_to_state.find(topic); + if (topic_it == state.topic_to_state.end()) { + // No topic at all, the reset is a no-op. + return outcome::success(); + } + const auto& cur_topic = topic_it->second; + if (topic_revision != cur_topic.revision) { + return stm_update_error{fmt::format( + "topic {} revision mismatch: got {}, current rev {}", + topic, + topic_revision, + cur_topic.revision)}; + } + return outcome::success(); +} + +checked +reset_topic_state_update::apply(topics_state& state) { + auto allowed = can_apply(state); + if (allowed.has_error()) { + return allowed.error(); + } + + auto topic_it = state.topic_to_state.find(topic); + if (topic_it == state.topic_to_state.end()) { + return outcome::success(); + } + auto& t_state = topic_it->second; + if (reset_all_partitions) { + t_state.pid_to_pending_files.clear(); + } + return outcome::success(); +} + std::ostream& operator<<(std::ostream& o, const add_files_update& u) { fmt::print(o, "{{tp: {}, revision: {}, entries: [", u.tp, u.topic_revision); static constexpr size_t max_to_log = 6; @@ -329,4 +365,14 @@ std::ostream& operator<<(std::ostream& o, const topic_lifecycle_update& u) { return o; } +std::ostream& operator<<(std::ostream& o, const reset_topic_state_update& u) { + fmt::print( + o, + "{{topic: {}, revision: {}, reset_all_partitions: {}}}", + u.topic, + u.topic_revision, + u.reset_all_partitions); + return o; +} + } // namespace datalake::coordinator diff --git a/src/v/datalake/coordinator/state_update.h b/src/v/datalake/coordinator/state_update.h index d85b35984ce6c..d1d54fef3d513 100644 --- a/src/v/datalake/coordinator/state_update.h +++ b/src/v/datalake/coordinator/state_update.h @@ -26,6 +26,7 @@ enum class update_key : uint8_t { add_files = 0, mark_files_committed = 1, topic_lifecycle_update = 2, + reset_topic_state = 3, }; std::ostream& operator<<(std::ostream&, const update_key&); @@ -116,4 +117,26 @@ struct topic_lifecycle_update topic_state::lifecycle_state_t new_state; }; +struct reset_topic_state_update + : public serde::envelope< + reset_topic_state_update, + serde::version<0>, + serde::compat_version<0>> { + static constexpr auto key{update_key::reset_topic_state}; + + model::topic topic; + model::revision_id topic_revision; + bool reset_all_partitions{false}; + + auto serde_fields() { + return std::tie(topic, topic_revision, reset_all_partitions); + } + + checked can_apply(const topics_state&); + checked apply(topics_state&); + + friend std::ostream& + operator<<(std::ostream&, const reset_topic_state_update&); +}; + } // namespace datalake::coordinator diff --git a/src/v/datalake/coordinator/tests/state_update_test.cc b/src/v/datalake/coordinator/tests/state_update_test.cc index 82c6232546752..567304be12f7d 100644 --- a/src/v/datalake/coordinator/tests/state_update_test.cc +++ b/src/v/datalake/coordinator/tests/state_update_test.cc @@ -314,3 +314,64 @@ TEST(StateUpdateTest, TestLifecycle) { apply_lc_transition(state, rev3, topic_state::lifecycle_state_t::closed) .has_value()); } + +TEST(StateUpdateTest, TestResetTopicState) { + topics_state state; + ASSERT_FALSE( + apply_lc_transition(state, rev, topic_state::lifecycle_state_t::live) + .has_error()); + + auto res = add_files_update::build( + state, tp, rev, make_pending_files({{0, 100}})) + .value() + .apply(state, model::offset{}); + ASSERT_FALSE(res.has_error()); + ASSERT_NO_FATAL_FAILURE( + check_partition(state, tp, std::nullopt, {{0, 100}})); + + // Reset with reset_all_partitions clears pending files. + reset_topic_state_update update{ + .topic = topic, + .topic_revision = rev, + .reset_all_partitions = true, + }; + ASSERT_FALSE(update.apply(state).has_error()); + auto ps = state.partition_state(tp); + ASSERT_FALSE(ps.has_value()); + + // Reset with wrong revision fails. + reset_topic_state_update bad_rev{ + .topic = topic, + .topic_revision = model::revision_id{999}, + }; + ASSERT_TRUE(bad_rev.apply(state).has_error()); + + // Reset on nonexistent topic is a no-op. + reset_topic_state_update missing{ + .topic = model::topic{"no_such_topic"}, + .topic_revision = rev, + }; + ASSERT_FALSE(missing.apply(state).has_error()); +} + +TEST(StateUpdateTest, TestResetNoOp) { + topics_state state; + ASSERT_FALSE( + apply_lc_transition(state, rev, topic_state::lifecycle_state_t::live) + .has_error()); + + auto res = add_files_update::build( + state, tp, rev, make_pending_files({{0, 100}})) + .value() + .apply(state, model::offset{}); + ASSERT_FALSE(res.has_error()); + + // reset_all_partitions=false (default), no overrides: state is unchanged. + reset_topic_state_update update{ + .topic = topic, + .topic_revision = rev, + }; + ASSERT_FALSE(update.apply(state).has_error()); + ASSERT_NO_FATAL_FAILURE( + check_partition(state, tp, std::nullopt, {{0, 100}})); +} diff --git a/src/v/datalake/coordinator/types.cc b/src/v/datalake/coordinator/types.cc index 25579a347ed1c..bf3f944d48502 100644 --- a/src/v/datalake/coordinator/types.cc +++ b/src/v/datalake/coordinator/types.cc @@ -140,4 +140,10 @@ operator<<(std::ostream& o, const get_topic_state_request& request) { request.topics_filter); return o; } +std::ostream& +operator<<(std::ostream& o, const reset_topic_state_reply& reply) { + fmt::print(o, "{{errc: {}}}", reply.errc); + return o; +} + } // namespace datalake::coordinator diff --git a/src/v/datalake/coordinator/types.h b/src/v/datalake/coordinator/types.h index f923821c1bad7..95a4265edb369 100644 --- a/src/v/datalake/coordinator/types.h +++ b/src/v/datalake/coordinator/types.h @@ -382,4 +382,65 @@ struct get_topic_state_request } }; +struct reset_topic_state_reply + : serde::envelope< + reset_topic_state_reply, + serde::version<0>, + serde::compat_version<0>> { + reset_topic_state_reply() = default; + explicit reset_topic_state_reply(errc err) + : errc(err) {} + friend std::ostream& + operator<<(std::ostream&, const reset_topic_state_reply&); + errc errc; + auto serde_fields() { return std::tie(errc); } +}; + +struct reset_topic_state_request + : serde::envelope< + reset_topic_state_request, + serde::version<0>, + serde::compat_version<0>> { + using resp_t = reset_topic_state_reply; + + model::partition_id coordinator_partition; + model::topic topic; + model::revision_id topic_revision; + bool reset_all_partitions{false}; + + reset_topic_state_request() = default; + + explicit reset_topic_state_request( + model::partition_id coordinator_partition, + model::topic topic, + model::revision_id topic_revision, + bool reset_all_partitions = false) + : coordinator_partition(coordinator_partition) + , topic(std::move(topic)) + , topic_revision(topic_revision) + , reset_all_partitions(reset_all_partitions) {} + + model::partition_id get_coordinator_partition() const { + return coordinator_partition; + } + + friend std::ostream& + operator<<(std::ostream& o, const reset_topic_state_request& req) { + fmt::print( + o, + "{{coordinator_partition: {}, topic: {}, topic_revision: {}, " + "reset_all_partitions: {}}}", + req.coordinator_partition, + req.topic, + req.topic_revision, + req.reset_all_partitions); + return o; + } + + auto serde_fields() { + return std::tie( + coordinator_partition, topic, topic_revision, reset_all_partitions); + } +}; + } // namespace datalake::coordinator diff --git a/src/v/redpanda/admin/services/datalake/datalake.cc b/src/v/redpanda/admin/services/datalake/datalake.cc index 22dca55ae9561..fbb9722976e9d 100644 --- a/src/v/redpanda/admin/services/datalake/datalake.cc +++ b/src/v/redpanda/admin/services/datalake/datalake.cc @@ -195,4 +195,42 @@ datalake_service_impl::get_coordinator_state( co_return response; } +ss::future +datalake_service_impl::coordinator_reset_topic_state( + serde::pb::rpc::context, + proto::admin::coordinator_reset_topic_state_request req) { + if (!_coordinator_fe->local_is_initialized()) { + throw serde::pb::rpc::unavailable_exception( + "Datalake coordinator frontend not initialized"); + } + + model::topic topic{req.get_topic_name()}; + auto partition_opt = _coordinator_fe->local().coordinator_partition(topic); + if (!partition_opt.has_value()) { + throw serde::pb::rpc::unavailable_exception( + fmt::format( + "Datalake coordinator couldn't get coordinator partition " + "for {}", + topic)); + } + + model::revision_id topic_revision{req.get_revision()}; + + auto fe_res = co_await _coordinator_fe->local().reset_topic_state( + datalake::coordinator::reset_topic_state_request( + partition_opt.value(), + topic, + topic_revision, + req.get_reset_all_partitions())); + if (fe_res.errc != datalake::coordinator::errc::ok) { + throw serde::pb::rpc::internal_exception( + fmt::format( + "Datalake coordinator error for partition {}: {}", + partition_opt.value(), + fe_res.errc)); + } + + co_return proto::admin::coordinator_reset_topic_state_response{}; +} + } // namespace admin diff --git a/src/v/redpanda/admin/services/datalake/datalake.h b/src/v/redpanda/admin/services/datalake/datalake.h index 7a210645d98e3..a229d0fcb73da 100644 --- a/src/v/redpanda/admin/services/datalake/datalake.h +++ b/src/v/redpanda/admin/services/datalake/datalake.h @@ -28,6 +28,11 @@ class datalake_service_impl : public proto::admin::datalake_service { serde::pb::rpc::context, proto::admin::get_coordinator_state_request) override; + ss::future + coordinator_reset_topic_state( + serde::pb::rpc::context, + proto::admin::coordinator_reset_topic_state_request) override; + private: admin::proxy::client _proxy_client; diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.py b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.py index 85961aa8e92bc..ccedae4b0ba01 100644 --- a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.py +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.py @@ -8,7 +8,7 @@ _sym_db = _symbol_database.Default() from ........proto.redpanda.core.pbgen import options_pb2 as proto_dot_redpanda_dot_core_dot_pbgen_dot_options__pb2 from ........proto.redpanda.core.pbgen import rpc_pb2 as proto_dot_redpanda_dot_core_dot_pbgen_dot_rpc__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n=proto/redpanda/core/admin/internal/datalake/v1/datalake.proto\x12(redpanda.core.admin.internal.datalake.v1\x1a\'proto/redpanda/core/pbgen/options.proto\x1a#proto/redpanda/core/pbgen/rpc.proto"3\n\x1aGetCoordinatorStateRequest\x12\x15\n\rtopics_filter\x18\x01 \x03(\t"h\n\x1bGetCoordinatorStateResponse\x12I\n\x05state\x18\x01 \x01(\x0b2:.redpanda.core.admin.internal.datalake.v1.CoordinatorState"\xdf\x01\n\x10CoordinatorState\x12a\n\x0ctopic_states\x18\x01 \x03(\x0b2K.redpanda.core.admin.internal.datalake.v1.CoordinatorState.TopicStatesEntry\x1ah\n\x10TopicStatesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12C\n\x05value\x18\x02 \x01(\x0b24.redpanda.core.admin.internal.datalake.v1.TopicState:\x028\x01"\x96\x01\n\x08DataFile\x12\x13\n\x0bremote_path\x18\x01 \x01(\t\x12\x11\n\trow_count\x18\x02 \x01(\x04\x12\x17\n\x0ffile_size_bytes\x18\x03 \x01(\x04\x12\x17\n\x0ftable_schema_id\x18\x04 \x01(\x05\x12\x19\n\x11partition_spec_id\x18\x05 \x01(\x05\x12\x15\n\rpartition_key\x18\x06 \x03(\x0c"\xf0\x01\n\x15TranslatedOffsetRange\x12\x14\n\x0cstart_offset\x18\x01 \x01(\x03\x12\x13\n\x0blast_offset\x18\x02 \x01(\x03\x12F\n\ndata_files\x18\x03 \x03(\x0b22.redpanda.core.admin.internal.datalake.v1.DataFile\x12E\n\tdlq_files\x18\x04 \x03(\x0b22.redpanda.core.admin.internal.datalake.v1.DataFile\x12\x1d\n\x15kafka_processed_bytes\x18\x05 \x01(\x04"w\n\x0cPendingEntry\x12M\n\x04data\x18\x01 \x01(\x0b2?.redpanda.core.admin.internal.datalake.v1.TranslatedOffsetRange\x12\x18\n\x10added_pending_at\x18\x02 \x01(\x03"\x91\x01\n\x0ePartitionState\x12O\n\x0fpending_entries\x18\x01 \x03(\x0b26.redpanda.core.admin.internal.datalake.v1.PendingEntry\x12\x1b\n\x0elast_committed\x18\x02 \x01(\x03H\x00\x88\x01\x01B\x11\n\x0f_last_committed"\x91\x03\n\nTopicState\x12\x10\n\x08revision\x18\x01 \x01(\x03\x12c\n\x10partition_states\x18\x02 \x03(\x0b2I.redpanda.core.admin.internal.datalake.v1.TopicState.PartitionStatesEntry\x12Q\n\x0flifecycle_state\x18\x03 \x01(\x0e28.redpanda.core.admin.internal.datalake.v1.LifecycleState\x12#\n\x1btotal_kafka_processed_bytes\x18\x04 \x01(\x04\x12"\n\x1alast_committed_snapshot_id\x18\x05 \x01(\x04\x1ap\n\x14PartitionStatesEntry\x12\x0b\n\x03key\x18\x01 \x01(\x05\x12G\n\x05value\x18\x02 \x01(\x0b28.redpanda.core.admin.internal.datalake.v1.PartitionState:\x028\x01*\x83\x01\n\x0eLifecycleState\x12\x1f\n\x1bLIFECYCLE_STATE_UNSPECIFIED\x10\x00\x12\x18\n\x14LIFECYCLE_STATE_LIVE\x10\x01\x12\x1a\n\x16LIFECYCLE_STATE_CLOSED\x10\x02\x12\x1a\n\x16LIFECYCLE_STATE_PURGED\x10\x032\xbe\x01\n\x0fDatalakeService\x12\xaa\x01\n\x13GetCoordinatorState\x12D.redpanda.core.admin.internal.datalake.v1.GetCoordinatorStateRequest\x1aE.redpanda.core.admin.internal.datalake.v1.GetCoordinatorStateResponse"\x06\xea\x92\x19\x02\x10\x03B\x10\xea\x92\x19\x0cproto::adminb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n=proto/redpanda/core/admin/internal/datalake/v1/datalake.proto\x12(redpanda.core.admin.internal.datalake.v1\x1a\'proto/redpanda/core/pbgen/options.proto\x1a#proto/redpanda/core/pbgen/rpc.proto"3\n\x1aGetCoordinatorStateRequest\x12\x15\n\rtopics_filter\x18\x01 \x03(\t"h\n\x1bGetCoordinatorStateResponse\x12I\n\x05state\x18\x01 \x01(\x0b2:.redpanda.core.admin.internal.datalake.v1.CoordinatorState"g\n!CoordinatorResetTopicStateRequest\x12\x12\n\ntopic_name\x18\x01 \x01(\t\x12\x10\n\x08revision\x18\x02 \x01(\x03\x12\x1c\n\x14reset_all_partitions\x18\x03 \x01(\x08"$\n"CoordinatorResetTopicStateResponse"\xdf\x01\n\x10CoordinatorState\x12a\n\x0ctopic_states\x18\x01 \x03(\x0b2K.redpanda.core.admin.internal.datalake.v1.CoordinatorState.TopicStatesEntry\x1ah\n\x10TopicStatesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12C\n\x05value\x18\x02 \x01(\x0b24.redpanda.core.admin.internal.datalake.v1.TopicState:\x028\x01"\x96\x01\n\x08DataFile\x12\x13\n\x0bremote_path\x18\x01 \x01(\t\x12\x11\n\trow_count\x18\x02 \x01(\x04\x12\x17\n\x0ffile_size_bytes\x18\x03 \x01(\x04\x12\x17\n\x0ftable_schema_id\x18\x04 \x01(\x05\x12\x19\n\x11partition_spec_id\x18\x05 \x01(\x05\x12\x15\n\rpartition_key\x18\x06 \x03(\x0c"\xf0\x01\n\x15TranslatedOffsetRange\x12\x14\n\x0cstart_offset\x18\x01 \x01(\x03\x12\x13\n\x0blast_offset\x18\x02 \x01(\x03\x12F\n\ndata_files\x18\x03 \x03(\x0b22.redpanda.core.admin.internal.datalake.v1.DataFile\x12E\n\tdlq_files\x18\x04 \x03(\x0b22.redpanda.core.admin.internal.datalake.v1.DataFile\x12\x1d\n\x15kafka_processed_bytes\x18\x05 \x01(\x04"w\n\x0cPendingEntry\x12M\n\x04data\x18\x01 \x01(\x0b2?.redpanda.core.admin.internal.datalake.v1.TranslatedOffsetRange\x12\x18\n\x10added_pending_at\x18\x02 \x01(\x03"\x91\x01\n\x0ePartitionState\x12O\n\x0fpending_entries\x18\x01 \x03(\x0b26.redpanda.core.admin.internal.datalake.v1.PendingEntry\x12\x1b\n\x0elast_committed\x18\x02 \x01(\x03H\x00\x88\x01\x01B\x11\n\x0f_last_committed"\x91\x03\n\nTopicState\x12\x10\n\x08revision\x18\x01 \x01(\x03\x12c\n\x10partition_states\x18\x02 \x03(\x0b2I.redpanda.core.admin.internal.datalake.v1.TopicState.PartitionStatesEntry\x12Q\n\x0flifecycle_state\x18\x03 \x01(\x0e28.redpanda.core.admin.internal.datalake.v1.LifecycleState\x12#\n\x1btotal_kafka_processed_bytes\x18\x04 \x01(\x04\x12"\n\x1alast_committed_snapshot_id\x18\x05 \x01(\x04\x1ap\n\x14PartitionStatesEntry\x12\x0b\n\x03key\x18\x01 \x01(\x05\x12G\n\x05value\x18\x02 \x01(\x0b28.redpanda.core.admin.internal.datalake.v1.PartitionState:\x028\x01*\x83\x01\n\x0eLifecycleState\x12\x1f\n\x1bLIFECYCLE_STATE_UNSPECIFIED\x10\x00\x12\x18\n\x14LIFECYCLE_STATE_LIVE\x10\x01\x12\x1a\n\x16LIFECYCLE_STATE_CLOSED\x10\x02\x12\x1a\n\x16LIFECYCLE_STATE_PURGED\x10\x032\x80\x03\n\x0fDatalakeService\x12\xaa\x01\n\x13GetCoordinatorState\x12D.redpanda.core.admin.internal.datalake.v1.GetCoordinatorStateRequest\x1aE.redpanda.core.admin.internal.datalake.v1.GetCoordinatorStateResponse"\x06\xea\x92\x19\x02\x10\x03\x12\xbf\x01\n\x1aCoordinatorResetTopicState\x12K.redpanda.core.admin.internal.datalake.v1.CoordinatorResetTopicStateRequest\x1aL.redpanda.core.admin.internal.datalake.v1.CoordinatorResetTopicStateResponse"\x06\xea\x92\x19\x02\x10\x03B\x10\xea\x92\x19\x0cproto::adminb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2', _globals) @@ -21,27 +21,33 @@ _globals['_TOPICSTATE_PARTITIONSTATESENTRY']._serialized_options = b'8\x01' _globals['_DATALAKESERVICE'].methods_by_name['GetCoordinatorState']._loaded_options = None _globals['_DATALAKESERVICE'].methods_by_name['GetCoordinatorState']._serialized_options = b'\xea\x92\x19\x02\x10\x03' - _globals['_LIFECYCLESTATE']._serialized_start = 1640 - _globals['_LIFECYCLESTATE']._serialized_end = 1771 + _globals['_DATALAKESERVICE'].methods_by_name['CoordinatorResetTopicState']._loaded_options = None + _globals['_DATALAKESERVICE'].methods_by_name['CoordinatorResetTopicState']._serialized_options = b'\xea\x92\x19\x02\x10\x03' + _globals['_LIFECYCLESTATE']._serialized_start = 1783 + _globals['_LIFECYCLESTATE']._serialized_end = 1914 _globals['_GETCOORDINATORSTATEREQUEST']._serialized_start = 185 _globals['_GETCOORDINATORSTATEREQUEST']._serialized_end = 236 _globals['_GETCOORDINATORSTATERESPONSE']._serialized_start = 238 _globals['_GETCOORDINATORSTATERESPONSE']._serialized_end = 342 - _globals['_COORDINATORSTATE']._serialized_start = 345 - _globals['_COORDINATORSTATE']._serialized_end = 568 - _globals['_COORDINATORSTATE_TOPICSTATESENTRY']._serialized_start = 464 - _globals['_COORDINATORSTATE_TOPICSTATESENTRY']._serialized_end = 568 - _globals['_DATAFILE']._serialized_start = 571 - _globals['_DATAFILE']._serialized_end = 721 - _globals['_TRANSLATEDOFFSETRANGE']._serialized_start = 724 - _globals['_TRANSLATEDOFFSETRANGE']._serialized_end = 964 - _globals['_PENDINGENTRY']._serialized_start = 966 - _globals['_PENDINGENTRY']._serialized_end = 1085 - _globals['_PARTITIONSTATE']._serialized_start = 1088 - _globals['_PARTITIONSTATE']._serialized_end = 1233 - _globals['_TOPICSTATE']._serialized_start = 1236 - _globals['_TOPICSTATE']._serialized_end = 1637 - _globals['_TOPICSTATE_PARTITIONSTATESENTRY']._serialized_start = 1525 - _globals['_TOPICSTATE_PARTITIONSTATESENTRY']._serialized_end = 1637 - _globals['_DATALAKESERVICE']._serialized_start = 1774 - _globals['_DATALAKESERVICE']._serialized_end = 1964 \ No newline at end of file + _globals['_COORDINATORRESETTOPICSTATEREQUEST']._serialized_start = 344 + _globals['_COORDINATORRESETTOPICSTATEREQUEST']._serialized_end = 447 + _globals['_COORDINATORRESETTOPICSTATERESPONSE']._serialized_start = 449 + _globals['_COORDINATORRESETTOPICSTATERESPONSE']._serialized_end = 485 + _globals['_COORDINATORSTATE']._serialized_start = 488 + _globals['_COORDINATORSTATE']._serialized_end = 711 + _globals['_COORDINATORSTATE_TOPICSTATESENTRY']._serialized_start = 607 + _globals['_COORDINATORSTATE_TOPICSTATESENTRY']._serialized_end = 711 + _globals['_DATAFILE']._serialized_start = 714 + _globals['_DATAFILE']._serialized_end = 864 + _globals['_TRANSLATEDOFFSETRANGE']._serialized_start = 867 + _globals['_TRANSLATEDOFFSETRANGE']._serialized_end = 1107 + _globals['_PENDINGENTRY']._serialized_start = 1109 + _globals['_PENDINGENTRY']._serialized_end = 1228 + _globals['_PARTITIONSTATE']._serialized_start = 1231 + _globals['_PARTITIONSTATE']._serialized_end = 1376 + _globals['_TOPICSTATE']._serialized_start = 1379 + _globals['_TOPICSTATE']._serialized_end = 1780 + _globals['_TOPICSTATE_PARTITIONSTATESENTRY']._serialized_start = 1668 + _globals['_TOPICSTATE_PARTITIONSTATESENTRY']._serialized_end = 1780 + _globals['_DATALAKESERVICE']._serialized_start = 1917 + _globals['_DATALAKESERVICE']._serialized_end = 2301 \ No newline at end of file diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.pyi b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.pyi index dffc723d94f54..6d03b243e4428 100644 --- a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.pyi +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.pyi @@ -85,6 +85,31 @@ class GetCoordinatorStateResponse(google.protobuf.message.Message): ... Global___GetCoordinatorStateResponse: typing_extensions.TypeAlias = GetCoordinatorStateResponse +@typing.final +class CoordinatorResetTopicStateRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + TOPIC_NAME_FIELD_NUMBER: builtins.int + REVISION_FIELD_NUMBER: builtins.int + RESET_ALL_PARTITIONS_FIELD_NUMBER: builtins.int + topic_name: builtins.str + revision: builtins.int + reset_all_partitions: builtins.bool + + def __init__(self, *, topic_name: builtins.str=..., revision: builtins.int=..., reset_all_partitions: builtins.bool=...) -> None: + ... + + def ClearField(self, field_name: typing.Literal['reset_all_partitions', b'reset_all_partitions', 'revision', b'revision', 'topic_name', b'topic_name']) -> None: + ... +Global___CoordinatorResetTopicStateRequest: typing_extensions.TypeAlias = CoordinatorResetTopicStateRequest + +@typing.final +class CoordinatorResetTopicStateResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + def __init__(self) -> None: + ... +Global___CoordinatorResetTopicStateResponse: typing_extensions.TypeAlias = CoordinatorResetTopicStateResponse + @typing.final class CoordinatorState(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2_connect.py b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2_connect.py index 9eff710e1ecf0..d18960938c4d7 100644 --- a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2_connect.py +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2_connect.py @@ -49,6 +49,21 @@ def get_coordinator_state(self, req: proto.redpanda.core.admin.internal.datalake raise ConnectProtocolError('missing response message') return msg + def call_coordinator_reset_topic_state(self, req: proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2.CoordinatorResetTopicStateRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> UnaryOutput[proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2.CoordinatorResetTopicStateResponse]: + """Low-level method to call CoordinatorResetTopicState, granting access to errors and metadata""" + url = self.base_url + '/redpanda.core.admin.internal.datalake.v1.DatalakeService/CoordinatorResetTopicState' + return self._connect_client.call_unary(url, req, proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2.CoordinatorResetTopicStateResponse, extra_headers, timeout_seconds) + + def coordinator_reset_topic_state(self, req: proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2.CoordinatorResetTopicStateRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2.CoordinatorResetTopicStateResponse: + response = self.call_coordinator_reset_topic_state(req, extra_headers, timeout_seconds) + err = response.error() + if err is not None: + raise err + msg = response.message() + if msg is None: + raise ConnectProtocolError('missing response message') + return msg + class AsyncDatalakeServiceClient: def __init__(self, base_url: str, http_client: aiohttp.ClientSession, protocol: ConnectProtocol=ConnectProtocol.CONNECT_PROTOBUF): @@ -70,14 +85,33 @@ async def get_coordinator_state(self, req: proto.redpanda.core.admin.internal.da raise ConnectProtocolError('missing response message') return msg + async def call_coordinator_reset_topic_state(self, req: proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2.CoordinatorResetTopicStateRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> UnaryOutput[proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2.CoordinatorResetTopicStateResponse]: + """Low-level method to call CoordinatorResetTopicState, granting access to errors and metadata""" + url = self.base_url + '/redpanda.core.admin.internal.datalake.v1.DatalakeService/CoordinatorResetTopicState' + return await self._connect_client.call_unary(url, req, proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2.CoordinatorResetTopicStateResponse, extra_headers, timeout_seconds) + + async def coordinator_reset_topic_state(self, req: proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2.CoordinatorResetTopicStateRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2.CoordinatorResetTopicStateResponse: + response = await self.call_coordinator_reset_topic_state(req, extra_headers, timeout_seconds) + err = response.error() + if err is not None: + raise err + msg = response.message() + if msg is None: + raise ConnectProtocolError('missing response message') + return msg + @typing.runtime_checkable class DatalakeServiceProtocol(typing.Protocol): def get_coordinator_state(self, req: ClientRequest[proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2.GetCoordinatorStateRequest]) -> ServerResponse[proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2.GetCoordinatorStateResponse]: ... + + def coordinator_reset_topic_state(self, req: ClientRequest[proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2.CoordinatorResetTopicStateRequest]) -> ServerResponse[proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2.CoordinatorResetTopicStateResponse]: + ... DATALAKE_SERVICE_PATH_PREFIX = '/redpanda.core.admin.internal.datalake.v1.DatalakeService' def wsgi_datalake_service(implementation: DatalakeServiceProtocol) -> WSGIApplication: app = ConnectWSGI() app.register_unary_rpc('/redpanda.core.admin.internal.datalake.v1.DatalakeService/GetCoordinatorState', implementation.get_coordinator_state, proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2.GetCoordinatorStateRequest) + app.register_unary_rpc('/redpanda.core.admin.internal.datalake.v1.DatalakeService/CoordinatorResetTopicState', implementation.coordinator_reset_topic_state, proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2.CoordinatorResetTopicStateRequest) return app \ No newline at end of file diff --git a/tests/rptest/clients/types.py b/tests/rptest/clients/types.py index e3aecf74443a1..5c3b6fe33c22d 100644 --- a/tests/rptest/clients/types.py +++ b/tests/rptest/clients/types.py @@ -42,6 +42,7 @@ class TopicSpec: PROPERTY_DELETE_RETENTION_MS = "delete.retention.ms" PROPERTY_ICEBERG_INVALID_RECORD_ACTION = "redpanda.iceberg.invalid.record.action" PROPERTY_ICEBERG_TARGET_LAG_MS = "redpanda.iceberg.target.lag.ms" + PROPERTY_ICEBERG_PARTITION_SPEC = "redpanda.iceberg.partition.spec" PROPERTY_MIN_CLEANABLE_DIRTY_RATIO = "min.cleanable.dirty.ratio" PROPERTY_MIN_COMPACTION_LAG_MS = "min.compaction.lag.ms" PROPERTY_MAX_COMPACTION_LAG_MS = "max.compaction.lag.ms" diff --git a/tests/rptest/tests/datalake/datalake_e2e_test.py b/tests/rptest/tests/datalake/datalake_e2e_test.py index 8243b738dc297..1421748f6b0bc 100644 --- a/tests/rptest/tests/datalake/datalake_e2e_test.py +++ b/tests/rptest/tests/datalake/datalake_e2e_test.py @@ -12,7 +12,7 @@ import re import time from random import randint -from typing import Callable, Any +from typing import Any, Callable from confluent_kafka import Producer, avro from confluent_kafka.avro import AvroProducer @@ -20,10 +20,12 @@ from ducktape.utils.util import wait_until from google import protobuf from google.protobuf import json_format as pb_json_format -from google.protobuf import text_format as pb_text_format from google.protobuf import message_factory +from google.protobuf import text_format as pb_text_format +from rptest.clients.admin import v2 as admin_v2 from rptest.clients.rpk import RpkTool +from rptest.clients.types import TopicSpec from rptest.services.admin import Admin from rptest.services.catalog_service import CatalogType from rptest.services.cluster import cluster @@ -1770,3 +1772,103 @@ def test_custom_namespace(self, cloud_storage_type, catalog_type): msg_count=count, namespace=self.test_namespace, ) + + +class DatalakeCoordinatorResetTest(RedpandaTest): + def __init__(self, test_ctx, *args, **kwargs): + super().__init__( + test_ctx, + num_brokers=1, + si_settings=SISettings(test_context=test_ctx), + extra_rp_conf={ + "iceberg_enabled": "true", + "iceberg_catalog_commit_interval_ms": 5000, + }, + *args, + **kwargs, + ) + self.test_ctx = test_ctx + self.topic_name = "test" + + def setUp(self): + """Redpanda will be started by DatalakeServices.""" + pass + + def _get_topic_state(self): + dl_pb = admin_v2.datalake_pb + admin = admin_v2.Admin(self.redpanda) + resp = admin.datalake().get_coordinator_state( + dl_pb.GetCoordinatorStateRequest() + ) + return resp.state.topic_states[self.topic_name] + + def _reset_coordinator_state(self, reset_all_partitions: bool): + dl_pb = admin_v2.datalake_pb + admin = admin_v2.Admin(self.redpanda) + rev = self._get_topic_state().revision + admin.datalake().coordinator_reset_topic_state( + dl_pb.CoordinatorResetTopicStateRequest( + topic_name=self.topic_name, + revision=rev, + reset_all_partitions=reset_all_partitions, + ) + ) + + def _count_pending_entries(self): + ts = self._get_topic_state() + return sum(len(ps.pending_entries) for ps in ts.partition_states.values()) + + @cluster(num_nodes=4) + @matrix( + cloud_storage_type=supported_storage_types(), + catalog_type=[CatalogType.REST_JDBC], + ) + def test_coordinator_reset(self, cloud_storage_type, catalog_type): + with DatalakeServices( + self.test_ctx, + redpanda=self.redpanda, + include_query_engines=[QueryEngineType.SPARK], + catalog_type=catalog_type, + ) as dl: + dl.create_iceberg_enabled_topic( + self.topic_name, + partitions=3, + config={ + TopicSpec.PROPERTY_ICEBERG_PARTITION_SPEC: "()", + }, + ) + dl.produce_to_topic(self.topic_name, msg_size=1024, msg_count=10) + dl.wait_for_translation(self.topic_name, msg_count=10) + + # Increase the commit interval to accumulate pending commits. + self.redpanda.set_cluster_config( + {"iceberg_catalog_commit_interval_ms": 100000} + ) + # Sleep twice the original commit interval to ensure we will be + # waiting on the new interval. + time.sleep(10) + + dl.produce_to_topic(self.topic_name, msg_size=1024, msg_count=10) + + wait_until( + lambda: self._count_pending_entries() > 0, + timeout_sec=30, + backoff_sec=1, + err_msg="Expected pending entries to be present after producing records", + ) + + self._reset_coordinator_state(reset_all_partitions=False) + assert self._count_pending_entries() > 0, ( + "Expected pending entries to still be present after no-op reset" + ) + + self._reset_coordinator_state(reset_all_partitions=True) + assert self._count_pending_entries() == 0, ( + "Expected pending entries to be cleared after coordinator reset" + ) + + # After a plain reset, no partition should have last_committed. + for pid, ps in self._get_topic_state().partition_states.items(): + assert not ps.HasField("last_committed"), ( + f"Partition {pid} has unexpected last_committed" + ) From ab91a5c13221002567603bcda7ca38e123b51acb Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Thu, 12 Feb 2026 17:09:53 +0000 Subject: [PATCH 2/2] datalake/coordinator: add partition overrides to reset MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In some cases we want to manually choose from where translation should resume after a coordinator reset. Add per-partition last_committed overrides and a reset_all_partitions flag. By default reset_all_partitions is false and the request is a no-op unless partition_overrides are provided — only the listed partitions have their pending entries cleared and last_committed set. Setting reset_all_partitions to true clears all partitions first, then applies any overrides. This default-safe design minimizes surprise and makes it harder to accidentally wipe state. (cherry picked from commit f44ca9da9fcb88afababaabf471ab59c59c3f516) --- .../admin/internal/datalake/v1/datalake.proto | 12 +++ src/v/datalake/coordinator/BUILD | 20 +++++ src/v/datalake/coordinator/coordinator.cc | 5 +- src/v/datalake/coordinator/coordinator.h | 6 +- src/v/datalake/coordinator/frontend.cc | 3 +- .../coordinator/partition_state_override.cc | 26 ++++++ .../coordinator/partition_state_override.h | 31 +++++++ src/v/datalake/coordinator/state_update.cc | 13 ++- src/v/datalake/coordinator/state_update.h | 7 +- src/v/datalake/coordinator/tests/BUILD | 2 + .../coordinator/tests/state_update_test.cc | 83 +++++++++++++++++++ src/v/datalake/coordinator/types.cc | 1 + src/v/datalake/coordinator/types.h | 23 +++-- .../admin/services/datalake/datalake.cc | 15 +++- .../internal/datalake/v1/datalake_pb2.py | 56 +++++++------ .../internal/datalake/v1/datalake_pb2.pyi | 53 +++++++++++- .../tests/datalake/datalake_e2e_test.py | 25 +++++- 17 files changed, 341 insertions(+), 40 deletions(-) create mode 100644 src/v/datalake/coordinator/partition_state_override.cc create mode 100644 src/v/datalake/coordinator/partition_state_override.h diff --git a/proto/redpanda/core/admin/internal/datalake/v1/datalake.proto b/proto/redpanda/core/admin/internal/datalake/v1/datalake.proto index 97d741d0707b7..1e01575db5dc6 100644 --- a/proto/redpanda/core/admin/internal/datalake/v1/datalake.proto +++ b/proto/redpanda/core/admin/internal/datalake/v1/datalake.proto @@ -51,10 +51,22 @@ message GetCoordinatorStateResponse { CoordinatorState state = 1; } +message PartitionStateOverride { + // Overrides last_committed for the partition. If not set, last_committed is + // not modified. + optional int64 last_committed = 1; +} + +// When reset_all_partitions is true, clears all per partition state +// (pending entries, last_committed, etc), then applies overrides. +// When false, clears pending entries only for partitions in partition_overrides +// and applies optional overrides. No-op if reset_all_partitions is false and +// partition_overrides is empty. message CoordinatorResetTopicStateRequest { string topic_name = 1; int64 revision = 2; bool reset_all_partitions = 3; + map partition_overrides = 4; } message CoordinatorResetTopicStateResponse {} diff --git a/src/v/datalake/coordinator/BUILD b/src/v/datalake/coordinator/BUILD index 22156a7d5750f..0bd6260c311e2 100644 --- a/src/v/datalake/coordinator/BUILD +++ b/src/v/datalake/coordinator/BUILD @@ -51,11 +51,13 @@ redpanda_cc_library( ], deps = [ ":file_committer", + ":partition_state_override", ":snapshot_remover", ":state_update", ":stm", "//src/v/cluster", "//src/v/config", + "//src/v/container:chunked_hash_map", "//src/v/container:chunked_vector", "//src/v/model", "@abseil-cpp//absl/hash", @@ -255,6 +257,21 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "partition_state_override", + srcs = [ + "partition_state_override.cc", + ], + hdrs = [ + "partition_state_override.h", + ], + deps = [ + "//src/v/model", + "//src/v/serde", + "@fmt", + ], +) + redpanda_cc_library( name = "state", srcs = [ @@ -283,6 +300,7 @@ redpanda_cc_library( ], visibility = ["//visibility:public"], deps = [ + ":partition_state_override", ":translated_offset_range", "//src/v/datalake:schema_identifier", "//src/v/datalake:types", @@ -335,9 +353,11 @@ redpanda_cc_library( "state_update.h", ], deps = [ + ":partition_state_override", ":state", ":translated_offset_range", "//src/v/base", + "//src/v/container:chunked_hash_map", "//src/v/container:chunked_vector", "//src/v/iceberg:manifest_entry", "//src/v/model", diff --git a/src/v/datalake/coordinator/coordinator.cc b/src/v/datalake/coordinator/coordinator.cc index 0b4a0276437c1..3ce0573df6e1b 100644 --- a/src/v/datalake/coordinator/coordinator.cc +++ b/src/v/datalake/coordinator/coordinator.cc @@ -889,7 +889,9 @@ ss::future> coordinator::sync_reset_topic_state( model::topic topic, model::revision_id topic_revision, - bool reset_all_partitions) { + bool reset_all_partitions, + chunked_hash_map + partition_overrides) { auto gate = maybe_gate(); if (gate.has_error()) { co_return gate.error(); @@ -905,6 +907,7 @@ coordinator::sync_reset_topic_state( .topic = topic, .topic_revision = topic_revision, .reset_all_partitions = reset_all_partitions, + .partition_overrides = std::move(partition_overrides), }; auto check_res = update.can_apply(stm_->state()); if (check_res.has_error()) { diff --git a/src/v/datalake/coordinator/coordinator.h b/src/v/datalake/coordinator/coordinator.h index 04b89a4029a3b..ee9eefa51ce86 100644 --- a/src/v/datalake/coordinator/coordinator.h +++ b/src/v/datalake/coordinator/coordinator.h @@ -12,8 +12,10 @@ #include "absl/hash/hash.h" #include "cluster/fwd.h" #include "config/property.h" +#include "container/chunked_hash_map.h" #include "container/chunked_vector.h" #include "datalake/coordinator/file_committer.h" +#include "datalake/coordinator/partition_state_override.h" #include "datalake/coordinator/snapshot_remover.h" #include "datalake/coordinator/state_machine.h" #include "datalake/fwd.h" @@ -92,7 +94,9 @@ class coordinator { ss::future> sync_reset_topic_state( model::topic topic, model::revision_id topic_rev, - bool reset_all_partitions); + bool reset_all_partitions, + chunked_hash_map + partition_overrides); void notify_leadership(std::optional); diff --git a/src/v/datalake/coordinator/frontend.cc b/src/v/datalake/coordinator/frontend.cc index a4afc277383d1..ca395bef2d1f8 100644 --- a/src/v/datalake/coordinator/frontend.cc +++ b/src/v/datalake/coordinator/frontend.cc @@ -517,7 +517,8 @@ ss::future frontend::reset_topic_state_locally( ->sync_reset_topic_state( request.topic, request.topic_revision, - request.reset_all_partitions) + request.reset_all_partitions, + std::move(request.partition_overrides)) .then([](auto result) { reset_topic_state_reply resp{}; if (result.has_error()) { diff --git a/src/v/datalake/coordinator/partition_state_override.cc b/src/v/datalake/coordinator/partition_state_override.cc new file mode 100644 index 0000000000000..942fae8ef0ec3 --- /dev/null +++ b/src/v/datalake/coordinator/partition_state_override.cc @@ -0,0 +1,26 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "datalake/coordinator/partition_state_override.h" + +#include + +namespace datalake::coordinator { + +std::ostream& operator<<(std::ostream& o, const partition_state_override& p) { + if (p.last_committed.has_value()) { + fmt::print(o, "{{last_committed: {}}}", p.last_committed.value()); + } else { + fmt::print(o, "{{last_committed: nullopt}}"); + } + return o; +} + +} // namespace datalake::coordinator diff --git a/src/v/datalake/coordinator/partition_state_override.h b/src/v/datalake/coordinator/partition_state_override.h new file mode 100644 index 0000000000000..964bc4dd0aa59 --- /dev/null +++ b/src/v/datalake/coordinator/partition_state_override.h @@ -0,0 +1,31 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#pragma once + +#include "model/fundamental.h" +#include "serde/envelope.h" + +namespace datalake::coordinator { + +struct partition_state_override + : public serde::envelope< + partition_state_override, + serde::version<0>, + serde::compat_version<0>> { + std::optional last_committed; + + auto serde_fields() { return std::tie(last_committed); } + + friend std::ostream& + operator<<(std::ostream&, const partition_state_override&); +}; + +} // namespace datalake::coordinator diff --git a/src/v/datalake/coordinator/state_update.cc b/src/v/datalake/coordinator/state_update.cc index 0d1741e0e9a74..f565ed143f106 100644 --- a/src/v/datalake/coordinator/state_update.cc +++ b/src/v/datalake/coordinator/state_update.cc @@ -324,6 +324,13 @@ reset_topic_state_update::apply(topics_state& state) { if (reset_all_partitions) { t_state.pid_to_pending_files.clear(); } + for (auto& [pid, po] : partition_overrides) { + auto& ps = t_state.pid_to_pending_files[pid]; + ps.pending_entries.clear(); + if (po.last_committed.has_value()) { + ps.last_committed = po.last_committed.value(); + } + } return outcome::success(); } @@ -368,10 +375,12 @@ std::ostream& operator<<(std::ostream& o, const topic_lifecycle_update& u) { std::ostream& operator<<(std::ostream& o, const reset_topic_state_update& u) { fmt::print( o, - "{{topic: {}, revision: {}, reset_all_partitions: {}}}", + "{{topic: {}, revision: {}, reset_all_partitions: {}, " + "partition_overrides: {} entries}}", u.topic, u.topic_revision, - u.reset_all_partitions); + u.reset_all_partitions, + u.partition_overrides.size()); return o; } diff --git a/src/v/datalake/coordinator/state_update.h b/src/v/datalake/coordinator/state_update.h index d1d54fef3d513..0ea80931c739b 100644 --- a/src/v/datalake/coordinator/state_update.h +++ b/src/v/datalake/coordinator/state_update.h @@ -10,7 +10,9 @@ #pragma once #include "base/outcome.h" +#include "container/chunked_hash_map.h" #include "container/chunked_vector.h" +#include "datalake/coordinator/partition_state_override.h" #include "datalake/coordinator/state.h" #include "datalake/coordinator/translated_offset_range.h" #include "iceberg/manifest_entry.h" @@ -127,9 +129,12 @@ struct reset_topic_state_update model::topic topic; model::revision_id topic_revision; bool reset_all_partitions{false}; + chunked_hash_map + partition_overrides; auto serde_fields() { - return std::tie(topic, topic_revision, reset_all_partitions); + return std::tie( + topic, topic_revision, reset_all_partitions, partition_overrides); } checked can_apply(const topics_state&); diff --git a/src/v/datalake/coordinator/tests/BUILD b/src/v/datalake/coordinator/tests/BUILD index e71b6551e57ba..58b1de1fe37b7 100644 --- a/src/v/datalake/coordinator/tests/BUILD +++ b/src/v/datalake/coordinator/tests/BUILD @@ -122,7 +122,9 @@ redpanda_cc_gtest( ], deps = [ ":state_test_utils", + "//src/v/container:chunked_hash_map", "//src/v/container:chunked_vector", + "//src/v/datalake/coordinator:partition_state_override", "//src/v/datalake/coordinator:state", "//src/v/datalake/coordinator:state_update", "//src/v/datalake/coordinator:translated_offset_range", diff --git a/src/v/datalake/coordinator/tests/state_update_test.cc b/src/v/datalake/coordinator/tests/state_update_test.cc index 567304be12f7d..5dbc6c0a89ffd 100644 --- a/src/v/datalake/coordinator/tests/state_update_test.cc +++ b/src/v/datalake/coordinator/tests/state_update_test.cc @@ -7,7 +7,9 @@ * * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ +#include "container/chunked_hash_map.h" #include "container/chunked_vector.h" +#include "datalake/coordinator/partition_state_override.h" #include "datalake/coordinator/state.h" #include "datalake/coordinator/state_update.h" #include "datalake/coordinator/tests/state_test_utils.h" @@ -375,3 +377,84 @@ TEST(StateUpdateTest, TestResetNoOp) { ASSERT_NO_FATAL_FAILURE( check_partition(state, tp, std::nullopt, {{0, 100}})); } + +TEST(StateUpdateTest, TestResetAllWithOverrides) { + const model::partition_id pid1{1}; + const model::topic_partition tp1{topic, pid1}; + + topics_state state; + ASSERT_FALSE( + apply_lc_transition(state, rev, topic_state::lifecycle_state_t::live) + .has_error()); + + // Add pending files to two partitions. + for (const auto& t : {tp, tp1}) { + auto res = add_files_update::build( + state, t, rev, make_pending_files({{0, 100}})) + .value() + .apply(state, model::offset{}); + ASSERT_FALSE(res.has_error()); + } + + // Full reset with an override on pid 0 only. + chunked_hash_map overrides; + overrides[pid] = partition_state_override{ + .last_committed = kafka::offset{50}}; + + reset_topic_state_update update2{ + .topic = topic, + .topic_revision = rev, + .reset_all_partitions = true, + .partition_overrides = std::move(overrides), + }; + ASSERT_FALSE(update2.apply(state).has_error()); + + // pid 0: cleared, last_committed set to 50. + ASSERT_NO_FATAL_FAILURE(check_partition(state, tp, 50, {})); + // pid 1: cleared entirely (no override). + auto ps1 = state.partition_state(tp1); + ASSERT_FALSE(ps1.has_value()); +} + +TEST(StateUpdateTest, TestPartialReset) { + const model::partition_id pid1{1}; + const model::partition_id pid2{2}; + const model::topic_partition tp1{topic, pid1}; + const model::topic_partition tp2{topic, pid2}; + + topics_state state; + ASSERT_FALSE( + apply_lc_transition(state, rev, topic_state::lifecycle_state_t::live) + .has_error()); + + // Add pending files to pid 0 and pid 1. + for (const auto& t : {tp, tp1}) { + auto res = add_files_update::build( + state, t, rev, make_pending_files({{0, 100}})) + .value() + .apply(state, model::offset{}); + ASSERT_FALSE(res.has_error()); + } + + // Partial reset: override pid 0 and pid 2 (pid 2 doesn't exist yet). + chunked_hash_map overrides; + overrides[pid] = partition_state_override{ + .last_committed = kafka::offset{42}}; + overrides[pid2] = partition_state_override{ + .last_committed = kafka::offset{99}}; + + reset_topic_state_update update3{ + .topic = topic, + .topic_revision = rev, + .partition_overrides = std::move(overrides), + }; + ASSERT_FALSE(update3.apply(state).has_error()); + + // pid 0: pending cleared, last_committed set. + ASSERT_NO_FATAL_FAILURE(check_partition(state, tp, 42, {})); + // pid 1: untouched. + ASSERT_NO_FATAL_FAILURE( + check_partition(state, tp1, std::nullopt, {{0, 100}})); + // pid 2: created with last_committed. + ASSERT_NO_FATAL_FAILURE(check_partition(state, tp2, 99, {})); +} diff --git a/src/v/datalake/coordinator/types.cc b/src/v/datalake/coordinator/types.cc index bf3f944d48502..d1e031fc20f03 100644 --- a/src/v/datalake/coordinator/types.cc +++ b/src/v/datalake/coordinator/types.cc @@ -140,6 +140,7 @@ operator<<(std::ostream& o, const get_topic_state_request& request) { request.topics_filter); return o; } + std::ostream& operator<<(std::ostream& o, const reset_topic_state_reply& reply) { fmt::print(o, "{{errc: {}}}", reply.errc); diff --git a/src/v/datalake/coordinator/types.h b/src/v/datalake/coordinator/types.h index 95a4265edb369..e0ef5e4f27326 100644 --- a/src/v/datalake/coordinator/types.h +++ b/src/v/datalake/coordinator/types.h @@ -9,7 +9,9 @@ */ #pragma once +#include "container/chunked_hash_map.h" #include "container/chunked_vector.h" +#include "datalake/coordinator/partition_state_override.h" #include "datalake/coordinator/state.h" #include "datalake/coordinator/translated_offset_range.h" #include "datalake/errors.h" @@ -407,6 +409,8 @@ struct reset_topic_state_request model::topic topic; model::revision_id topic_revision; bool reset_all_partitions{false}; + chunked_hash_map + partition_overrides; reset_topic_state_request() = default; @@ -414,11 +418,15 @@ struct reset_topic_state_request model::partition_id coordinator_partition, model::topic topic, model::revision_id topic_revision, - bool reset_all_partitions = false) + bool reset_all_partitions = false, + chunked_hash_map + partition_overrides + = {}) : coordinator_partition(coordinator_partition) , topic(std::move(topic)) , topic_revision(topic_revision) - , reset_all_partitions(reset_all_partitions) {} + , reset_all_partitions(reset_all_partitions) + , partition_overrides(std::move(partition_overrides)) {} model::partition_id get_coordinator_partition() const { return coordinator_partition; @@ -429,17 +437,22 @@ struct reset_topic_state_request fmt::print( o, "{{coordinator_partition: {}, topic: {}, topic_revision: {}, " - "reset_all_partitions: {}}}", + "reset_all_partitions: {}, partition_overrides: {} entries}}", req.coordinator_partition, req.topic, req.topic_revision, - req.reset_all_partitions); + req.reset_all_partitions, + req.partition_overrides.size()); return o; } auto serde_fields() { return std::tie( - coordinator_partition, topic, topic_revision, reset_all_partitions); + coordinator_partition, + topic, + topic_revision, + reset_all_partitions, + partition_overrides); } }; diff --git a/src/v/redpanda/admin/services/datalake/datalake.cc b/src/v/redpanda/admin/services/datalake/datalake.cc index fbb9722976e9d..c956d66be9794 100644 --- a/src/v/redpanda/admin/services/datalake/datalake.cc +++ b/src/v/redpanda/admin/services/datalake/datalake.cc @@ -216,12 +216,25 @@ datalake_service_impl::coordinator_reset_topic_state( model::revision_id topic_revision{req.get_revision()}; + chunked_hash_map< + model::partition_id, + datalake::coordinator::partition_state_override> + partition_overrides; + for (const auto& [pid, po] : req.get_partition_overrides()) { + datalake::coordinator::partition_state_override o; + if (po.has_last_committed()) { + o.last_committed = kafka::offset{po.get_last_committed()}; + } + partition_overrides.emplace(model::partition_id{pid}, std::move(o)); + } + auto fe_res = co_await _coordinator_fe->local().reset_topic_state( datalake::coordinator::reset_topic_state_request( partition_opt.value(), topic, topic_revision, - req.get_reset_all_partitions())); + req.get_reset_all_partitions(), + std::move(partition_overrides))); if (fe_res.errc != datalake::coordinator::errc::ok) { throw serde::pb::rpc::internal_exception( fmt::format( diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.py b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.py index ccedae4b0ba01..a94a6f29b6477 100644 --- a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.py +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.py @@ -8,13 +8,15 @@ _sym_db = _symbol_database.Default() from ........proto.redpanda.core.pbgen import options_pb2 as proto_dot_redpanda_dot_core_dot_pbgen_dot_options__pb2 from ........proto.redpanda.core.pbgen import rpc_pb2 as proto_dot_redpanda_dot_core_dot_pbgen_dot_rpc__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n=proto/redpanda/core/admin/internal/datalake/v1/datalake.proto\x12(redpanda.core.admin.internal.datalake.v1\x1a\'proto/redpanda/core/pbgen/options.proto\x1a#proto/redpanda/core/pbgen/rpc.proto"3\n\x1aGetCoordinatorStateRequest\x12\x15\n\rtopics_filter\x18\x01 \x03(\t"h\n\x1bGetCoordinatorStateResponse\x12I\n\x05state\x18\x01 \x01(\x0b2:.redpanda.core.admin.internal.datalake.v1.CoordinatorState"g\n!CoordinatorResetTopicStateRequest\x12\x12\n\ntopic_name\x18\x01 \x01(\t\x12\x10\n\x08revision\x18\x02 \x01(\x03\x12\x1c\n\x14reset_all_partitions\x18\x03 \x01(\x08"$\n"CoordinatorResetTopicStateResponse"\xdf\x01\n\x10CoordinatorState\x12a\n\x0ctopic_states\x18\x01 \x03(\x0b2K.redpanda.core.admin.internal.datalake.v1.CoordinatorState.TopicStatesEntry\x1ah\n\x10TopicStatesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12C\n\x05value\x18\x02 \x01(\x0b24.redpanda.core.admin.internal.datalake.v1.TopicState:\x028\x01"\x96\x01\n\x08DataFile\x12\x13\n\x0bremote_path\x18\x01 \x01(\t\x12\x11\n\trow_count\x18\x02 \x01(\x04\x12\x17\n\x0ffile_size_bytes\x18\x03 \x01(\x04\x12\x17\n\x0ftable_schema_id\x18\x04 \x01(\x05\x12\x19\n\x11partition_spec_id\x18\x05 \x01(\x05\x12\x15\n\rpartition_key\x18\x06 \x03(\x0c"\xf0\x01\n\x15TranslatedOffsetRange\x12\x14\n\x0cstart_offset\x18\x01 \x01(\x03\x12\x13\n\x0blast_offset\x18\x02 \x01(\x03\x12F\n\ndata_files\x18\x03 \x03(\x0b22.redpanda.core.admin.internal.datalake.v1.DataFile\x12E\n\tdlq_files\x18\x04 \x03(\x0b22.redpanda.core.admin.internal.datalake.v1.DataFile\x12\x1d\n\x15kafka_processed_bytes\x18\x05 \x01(\x04"w\n\x0cPendingEntry\x12M\n\x04data\x18\x01 \x01(\x0b2?.redpanda.core.admin.internal.datalake.v1.TranslatedOffsetRange\x12\x18\n\x10added_pending_at\x18\x02 \x01(\x03"\x91\x01\n\x0ePartitionState\x12O\n\x0fpending_entries\x18\x01 \x03(\x0b26.redpanda.core.admin.internal.datalake.v1.PendingEntry\x12\x1b\n\x0elast_committed\x18\x02 \x01(\x03H\x00\x88\x01\x01B\x11\n\x0f_last_committed"\x91\x03\n\nTopicState\x12\x10\n\x08revision\x18\x01 \x01(\x03\x12c\n\x10partition_states\x18\x02 \x03(\x0b2I.redpanda.core.admin.internal.datalake.v1.TopicState.PartitionStatesEntry\x12Q\n\x0flifecycle_state\x18\x03 \x01(\x0e28.redpanda.core.admin.internal.datalake.v1.LifecycleState\x12#\n\x1btotal_kafka_processed_bytes\x18\x04 \x01(\x04\x12"\n\x1alast_committed_snapshot_id\x18\x05 \x01(\x04\x1ap\n\x14PartitionStatesEntry\x12\x0b\n\x03key\x18\x01 \x01(\x05\x12G\n\x05value\x18\x02 \x01(\x0b28.redpanda.core.admin.internal.datalake.v1.PartitionState:\x028\x01*\x83\x01\n\x0eLifecycleState\x12\x1f\n\x1bLIFECYCLE_STATE_UNSPECIFIED\x10\x00\x12\x18\n\x14LIFECYCLE_STATE_LIVE\x10\x01\x12\x1a\n\x16LIFECYCLE_STATE_CLOSED\x10\x02\x12\x1a\n\x16LIFECYCLE_STATE_PURGED\x10\x032\x80\x03\n\x0fDatalakeService\x12\xaa\x01\n\x13GetCoordinatorState\x12D.redpanda.core.admin.internal.datalake.v1.GetCoordinatorStateRequest\x1aE.redpanda.core.admin.internal.datalake.v1.GetCoordinatorStateResponse"\x06\xea\x92\x19\x02\x10\x03\x12\xbf\x01\n\x1aCoordinatorResetTopicState\x12K.redpanda.core.admin.internal.datalake.v1.CoordinatorResetTopicStateRequest\x1aL.redpanda.core.admin.internal.datalake.v1.CoordinatorResetTopicStateResponse"\x06\xea\x92\x19\x02\x10\x03B\x10\xea\x92\x19\x0cproto::adminb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n=proto/redpanda/core/admin/internal/datalake/v1/datalake.proto\x12(redpanda.core.admin.internal.datalake.v1\x1a\'proto/redpanda/core/pbgen/options.proto\x1a#proto/redpanda/core/pbgen/rpc.proto"3\n\x1aGetCoordinatorStateRequest\x12\x15\n\rtopics_filter\x18\x01 \x03(\t"h\n\x1bGetCoordinatorStateResponse\x12I\n\x05state\x18\x01 \x01(\x0b2:.redpanda.core.admin.internal.datalake.v1.CoordinatorState"H\n\x16PartitionStateOverride\x12\x1b\n\x0elast_committed\x18\x01 \x01(\x03H\x00\x88\x01\x01B\x11\n\x0f_last_committed"\xe7\x02\n!CoordinatorResetTopicStateRequest\x12\x12\n\ntopic_name\x18\x01 \x01(\t\x12\x10\n\x08revision\x18\x02 \x01(\x03\x12\x1c\n\x14reset_all_partitions\x18\x03 \x01(\x08\x12\x80\x01\n\x13partition_overrides\x18\x04 \x03(\x0b2c.redpanda.core.admin.internal.datalake.v1.CoordinatorResetTopicStateRequest.PartitionOverridesEntry\x1a{\n\x17PartitionOverridesEntry\x12\x0b\n\x03key\x18\x01 \x01(\x05\x12O\n\x05value\x18\x02 \x01(\x0b2@.redpanda.core.admin.internal.datalake.v1.PartitionStateOverride:\x028\x01"$\n"CoordinatorResetTopicStateResponse"\xdf\x01\n\x10CoordinatorState\x12a\n\x0ctopic_states\x18\x01 \x03(\x0b2K.redpanda.core.admin.internal.datalake.v1.CoordinatorState.TopicStatesEntry\x1ah\n\x10TopicStatesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12C\n\x05value\x18\x02 \x01(\x0b24.redpanda.core.admin.internal.datalake.v1.TopicState:\x028\x01"\x96\x01\n\x08DataFile\x12\x13\n\x0bremote_path\x18\x01 \x01(\t\x12\x11\n\trow_count\x18\x02 \x01(\x04\x12\x17\n\x0ffile_size_bytes\x18\x03 \x01(\x04\x12\x17\n\x0ftable_schema_id\x18\x04 \x01(\x05\x12\x19\n\x11partition_spec_id\x18\x05 \x01(\x05\x12\x15\n\rpartition_key\x18\x06 \x03(\x0c"\xf0\x01\n\x15TranslatedOffsetRange\x12\x14\n\x0cstart_offset\x18\x01 \x01(\x03\x12\x13\n\x0blast_offset\x18\x02 \x01(\x03\x12F\n\ndata_files\x18\x03 \x03(\x0b22.redpanda.core.admin.internal.datalake.v1.DataFile\x12E\n\tdlq_files\x18\x04 \x03(\x0b22.redpanda.core.admin.internal.datalake.v1.DataFile\x12\x1d\n\x15kafka_processed_bytes\x18\x05 \x01(\x04"w\n\x0cPendingEntry\x12M\n\x04data\x18\x01 \x01(\x0b2?.redpanda.core.admin.internal.datalake.v1.TranslatedOffsetRange\x12\x18\n\x10added_pending_at\x18\x02 \x01(\x03"\x91\x01\n\x0ePartitionState\x12O\n\x0fpending_entries\x18\x01 \x03(\x0b26.redpanda.core.admin.internal.datalake.v1.PendingEntry\x12\x1b\n\x0elast_committed\x18\x02 \x01(\x03H\x00\x88\x01\x01B\x11\n\x0f_last_committed"\x91\x03\n\nTopicState\x12\x10\n\x08revision\x18\x01 \x01(\x03\x12c\n\x10partition_states\x18\x02 \x03(\x0b2I.redpanda.core.admin.internal.datalake.v1.TopicState.PartitionStatesEntry\x12Q\n\x0flifecycle_state\x18\x03 \x01(\x0e28.redpanda.core.admin.internal.datalake.v1.LifecycleState\x12#\n\x1btotal_kafka_processed_bytes\x18\x04 \x01(\x04\x12"\n\x1alast_committed_snapshot_id\x18\x05 \x01(\x04\x1ap\n\x14PartitionStatesEntry\x12\x0b\n\x03key\x18\x01 \x01(\x05\x12G\n\x05value\x18\x02 \x01(\x0b28.redpanda.core.admin.internal.datalake.v1.PartitionState:\x028\x01*\x83\x01\n\x0eLifecycleState\x12\x1f\n\x1bLIFECYCLE_STATE_UNSPECIFIED\x10\x00\x12\x18\n\x14LIFECYCLE_STATE_LIVE\x10\x01\x12\x1a\n\x16LIFECYCLE_STATE_CLOSED\x10\x02\x12\x1a\n\x16LIFECYCLE_STATE_PURGED\x10\x032\x80\x03\n\x0fDatalakeService\x12\xaa\x01\n\x13GetCoordinatorState\x12D.redpanda.core.admin.internal.datalake.v1.GetCoordinatorStateRequest\x1aE.redpanda.core.admin.internal.datalake.v1.GetCoordinatorStateResponse"\x06\xea\x92\x19\x02\x10\x03\x12\xbf\x01\n\x1aCoordinatorResetTopicState\x12K.redpanda.core.admin.internal.datalake.v1.CoordinatorResetTopicStateRequest\x1aL.redpanda.core.admin.internal.datalake.v1.CoordinatorResetTopicStateResponse"\x06\xea\x92\x19\x02\x10\x03B\x10\xea\x92\x19\x0cproto::adminb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'proto.redpanda.core.admin.internal.datalake.v1.datalake_pb2', _globals) if not _descriptor._USE_C_DESCRIPTORS: _globals['DESCRIPTOR']._loaded_options = None _globals['DESCRIPTOR']._serialized_options = b'\xea\x92\x19\x0cproto::admin' + _globals['_COORDINATORRESETTOPICSTATEREQUEST_PARTITIONOVERRIDESENTRY']._loaded_options = None + _globals['_COORDINATORRESETTOPICSTATEREQUEST_PARTITIONOVERRIDESENTRY']._serialized_options = b'8\x01' _globals['_COORDINATORSTATE_TOPICSTATESENTRY']._loaded_options = None _globals['_COORDINATORSTATE_TOPICSTATESENTRY']._serialized_options = b'8\x01' _globals['_TOPICSTATE_PARTITIONSTATESENTRY']._loaded_options = None @@ -23,31 +25,35 @@ _globals['_DATALAKESERVICE'].methods_by_name['GetCoordinatorState']._serialized_options = b'\xea\x92\x19\x02\x10\x03' _globals['_DATALAKESERVICE'].methods_by_name['CoordinatorResetTopicState']._loaded_options = None _globals['_DATALAKESERVICE'].methods_by_name['CoordinatorResetTopicState']._serialized_options = b'\xea\x92\x19\x02\x10\x03' - _globals['_LIFECYCLESTATE']._serialized_start = 1783 - _globals['_LIFECYCLESTATE']._serialized_end = 1914 + _globals['_LIFECYCLESTATE']._serialized_start = 2114 + _globals['_LIFECYCLESTATE']._serialized_end = 2245 _globals['_GETCOORDINATORSTATEREQUEST']._serialized_start = 185 _globals['_GETCOORDINATORSTATEREQUEST']._serialized_end = 236 _globals['_GETCOORDINATORSTATERESPONSE']._serialized_start = 238 _globals['_GETCOORDINATORSTATERESPONSE']._serialized_end = 342 - _globals['_COORDINATORRESETTOPICSTATEREQUEST']._serialized_start = 344 - _globals['_COORDINATORRESETTOPICSTATEREQUEST']._serialized_end = 447 - _globals['_COORDINATORRESETTOPICSTATERESPONSE']._serialized_start = 449 - _globals['_COORDINATORRESETTOPICSTATERESPONSE']._serialized_end = 485 - _globals['_COORDINATORSTATE']._serialized_start = 488 - _globals['_COORDINATORSTATE']._serialized_end = 711 - _globals['_COORDINATORSTATE_TOPICSTATESENTRY']._serialized_start = 607 - _globals['_COORDINATORSTATE_TOPICSTATESENTRY']._serialized_end = 711 - _globals['_DATAFILE']._serialized_start = 714 - _globals['_DATAFILE']._serialized_end = 864 - _globals['_TRANSLATEDOFFSETRANGE']._serialized_start = 867 - _globals['_TRANSLATEDOFFSETRANGE']._serialized_end = 1107 - _globals['_PENDINGENTRY']._serialized_start = 1109 - _globals['_PENDINGENTRY']._serialized_end = 1228 - _globals['_PARTITIONSTATE']._serialized_start = 1231 - _globals['_PARTITIONSTATE']._serialized_end = 1376 - _globals['_TOPICSTATE']._serialized_start = 1379 - _globals['_TOPICSTATE']._serialized_end = 1780 - _globals['_TOPICSTATE_PARTITIONSTATESENTRY']._serialized_start = 1668 - _globals['_TOPICSTATE_PARTITIONSTATESENTRY']._serialized_end = 1780 - _globals['_DATALAKESERVICE']._serialized_start = 1917 - _globals['_DATALAKESERVICE']._serialized_end = 2301 \ No newline at end of file + _globals['_PARTITIONSTATEOVERRIDE']._serialized_start = 344 + _globals['_PARTITIONSTATEOVERRIDE']._serialized_end = 416 + _globals['_COORDINATORRESETTOPICSTATEREQUEST']._serialized_start = 419 + _globals['_COORDINATORRESETTOPICSTATEREQUEST']._serialized_end = 778 + _globals['_COORDINATORRESETTOPICSTATEREQUEST_PARTITIONOVERRIDESENTRY']._serialized_start = 655 + _globals['_COORDINATORRESETTOPICSTATEREQUEST_PARTITIONOVERRIDESENTRY']._serialized_end = 778 + _globals['_COORDINATORRESETTOPICSTATERESPONSE']._serialized_start = 780 + _globals['_COORDINATORRESETTOPICSTATERESPONSE']._serialized_end = 816 + _globals['_COORDINATORSTATE']._serialized_start = 819 + _globals['_COORDINATORSTATE']._serialized_end = 1042 + _globals['_COORDINATORSTATE_TOPICSTATESENTRY']._serialized_start = 938 + _globals['_COORDINATORSTATE_TOPICSTATESENTRY']._serialized_end = 1042 + _globals['_DATAFILE']._serialized_start = 1045 + _globals['_DATAFILE']._serialized_end = 1195 + _globals['_TRANSLATEDOFFSETRANGE']._serialized_start = 1198 + _globals['_TRANSLATEDOFFSETRANGE']._serialized_end = 1438 + _globals['_PENDINGENTRY']._serialized_start = 1440 + _globals['_PENDINGENTRY']._serialized_end = 1559 + _globals['_PARTITIONSTATE']._serialized_start = 1562 + _globals['_PARTITIONSTATE']._serialized_end = 1707 + _globals['_TOPICSTATE']._serialized_start = 1710 + _globals['_TOPICSTATE']._serialized_end = 2111 + _globals['_TOPICSTATE_PARTITIONSTATESENTRY']._serialized_start = 1999 + _globals['_TOPICSTATE_PARTITIONSTATESENTRY']._serialized_end = 2111 + _globals['_DATALAKESERVICE']._serialized_start = 2248 + _globals['_DATALAKESERVICE']._serialized_end = 2632 \ No newline at end of file diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.pyi b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.pyi index 6d03b243e4428..2d0478b4c7560 100644 --- a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.pyi +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.pyi @@ -85,20 +85,69 @@ class GetCoordinatorStateResponse(google.protobuf.message.Message): ... Global___GetCoordinatorStateResponse: typing_extensions.TypeAlias = GetCoordinatorStateResponse +@typing.final +class PartitionStateOverride(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + LAST_COMMITTED_FIELD_NUMBER: builtins.int + last_committed: builtins.int + + def __init__(self, *, last_committed: builtins.int | None=...) -> None: + ... + + def HasField(self, field_name: typing.Literal['_last_committed', b'_last_committed', 'last_committed', b'last_committed']) -> builtins.bool: + ... + + def ClearField(self, field_name: typing.Literal['_last_committed', b'_last_committed', 'last_committed', b'last_committed']) -> None: + ... + + def WhichOneof(self, oneof_group: typing.Literal['_last_committed', b'_last_committed']) -> typing.Literal['last_committed'] | None: + ... +Global___PartitionStateOverride: typing_extensions.TypeAlias = PartitionStateOverride + @typing.final class CoordinatorResetTopicStateRequest(google.protobuf.message.Message): + """When reset_all_partitions is true, clears pending entries and + last_committed for all partitions, then applies overrides. When false, + clears pending entries only for partitions in partition_overrides and + applies their overrides. No-op when false and partition_overrides is empty. + """ DESCRIPTOR: google.protobuf.descriptor.Descriptor + + @typing.final + class PartitionOverridesEntry(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + KEY_FIELD_NUMBER: builtins.int + VALUE_FIELD_NUMBER: builtins.int + key: builtins.int + + @property + def value(self) -> Global___PartitionStateOverride: + ... + + def __init__(self, *, key: builtins.int=..., value: Global___PartitionStateOverride | None=...) -> None: + ... + + def HasField(self, field_name: typing.Literal['value', b'value']) -> builtins.bool: + ... + + def ClearField(self, field_name: typing.Literal['key', b'key', 'value', b'value']) -> None: + ... TOPIC_NAME_FIELD_NUMBER: builtins.int REVISION_FIELD_NUMBER: builtins.int RESET_ALL_PARTITIONS_FIELD_NUMBER: builtins.int + PARTITION_OVERRIDES_FIELD_NUMBER: builtins.int topic_name: builtins.str revision: builtins.int reset_all_partitions: builtins.bool - def __init__(self, *, topic_name: builtins.str=..., revision: builtins.int=..., reset_all_partitions: builtins.bool=...) -> None: + @property + def partition_overrides(self) -> google.protobuf.internal.containers.MessageMap[builtins.int, Global___PartitionStateOverride]: + ... + + def __init__(self, *, topic_name: builtins.str=..., revision: builtins.int=..., reset_all_partitions: builtins.bool=..., partition_overrides: collections.abc.Mapping[builtins.int, Global___PartitionStateOverride] | None=...) -> None: ... - def ClearField(self, field_name: typing.Literal['reset_all_partitions', b'reset_all_partitions', 'revision', b'revision', 'topic_name', b'topic_name']) -> None: + def ClearField(self, field_name: typing.Literal['partition_overrides', b'partition_overrides', 'reset_all_partitions', b'reset_all_partitions', 'revision', b'revision', 'topic_name', b'topic_name']) -> None: ... Global___CoordinatorResetTopicStateRequest: typing_extensions.TypeAlias = CoordinatorResetTopicStateRequest diff --git a/tests/rptest/tests/datalake/datalake_e2e_test.py b/tests/rptest/tests/datalake/datalake_e2e_test.py index 1421748f6b0bc..776cee5da083c 100644 --- a/tests/rptest/tests/datalake/datalake_e2e_test.py +++ b/tests/rptest/tests/datalake/datalake_e2e_test.py @@ -1802,7 +1802,9 @@ def _get_topic_state(self): ) return resp.state.topic_states[self.topic_name] - def _reset_coordinator_state(self, reset_all_partitions: bool): + def _reset_coordinator_state( + self, reset_all_partitions: bool, partition_overrides=None + ): dl_pb = admin_v2.datalake_pb admin = admin_v2.Admin(self.redpanda) rev = self._get_topic_state().revision @@ -1811,6 +1813,7 @@ def _reset_coordinator_state(self, reset_all_partitions: bool): topic_name=self.topic_name, revision=rev, reset_all_partitions=reset_all_partitions, + partition_overrides=partition_overrides, ) ) @@ -1872,3 +1875,23 @@ def test_coordinator_reset(self, cloud_storage_type, catalog_type): assert not ps.HasField("last_committed"), ( f"Partition {pid} has unexpected last_committed" ) + + dl_pb = admin_v2.datalake_pb + + # Reset with per-partition last_committed overrides. + expected = {0: 5, 2: 7} + self._reset_coordinator_state( + reset_all_partitions=False, + partition_overrides={ + pid: dl_pb.PartitionStateOverride(last_committed=off) + for pid, off in expected.items() + }, + ) + + ts = self._get_topic_state() + for pid, off in expected.items(): + ps = ts.partition_states[pid] + assert ps.last_committed == off, ( + f"Partition {pid}: expected {off}, got {ps.last_committed}" + ) + assert not ts.partition_states[1].HasField("last_committed")