Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions proto/redpanda/core/admin/internal/datalake/v1/datalake.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ service DatalakeService {
authz: SUPERUSER
};
}

// Resets the datalake coordinator's state for a single topic. The request
// message describes which partitions are affected and which values are
// overridden. Restricted to superusers.
rpc CoordinatorResetTopicState(CoordinatorResetTopicStateRequest)
returns (CoordinatorResetTopicStateResponse) {
option (pbgen.rpc) = {
authz: SUPERUSER
};
}
}

message GetCoordinatorStateRequest {
Expand All @@ -44,6 +51,26 @@ message GetCoordinatorStateResponse {
CoordinatorState state = 1;
}

// Values to force onto one partition's coordinator state during a topic state
// reset. Used as the value type of
// CoordinatorResetTopicStateRequest.partition_overrides.
message PartitionStateOverride {
// Overrides last_committed for the partition. If not set, last_committed is
// not modified.
optional int64 last_committed = 1;
}

// Request to reset the coordinator's state for one topic.
//
// When reset_all_partitions is true, clears all per-partition state
// (pending entries, last_committed, etc.), then applies overrides.
// When false, clears pending entries only for partitions listed in
// partition_overrides and applies their optional overrides. The request is a
// no-op if reset_all_partitions is false and partition_overrides is empty.
message CoordinatorResetTopicStateRequest {
// Name of the topic whose coordinator state is reset.
string topic_name = 1;
// Topic revision the request targets.
// NOTE(review): presumably must match the revision the coordinator tracks
// for the topic, otherwise the request is rejected -- confirm.
int64 revision = 2;
// Selects between the two reset modes described on the message.
bool reset_all_partitions = 3;
// Per-partition overrides.
// NOTE(review): the key is presumably the partition id -- confirm.
map<int32, PartitionStateOverride> partition_overrides = 4;
}

// Response to CoordinatorResetTopicState. Carries no fields.
message CoordinatorResetTopicStateResponse {}

// Coordinator state as carried by GetCoordinatorStateResponse: a set of
// TopicState entries stored in a string-keyed map.
message CoordinatorState {
// NOTE(review): keys are presumably topic names -- confirm.
map<string, TopicState> topic_states = 1;
}
Expand Down
20 changes: 20 additions & 0 deletions src/v/datalake/coordinator/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ redpanda_cc_library(
],
deps = [
":file_committer",
":partition_state_override",
":snapshot_remover",
":state_update",
":stm",
"//src/v/cluster",
"//src/v/config",
"//src/v/container:chunked_hash_map",
"//src/v/container:chunked_vector",
"//src/v/model",
"@abseil-cpp//absl/hash",
Expand Down Expand Up @@ -255,6 +257,21 @@ redpanda_cc_library(
],
)

# Library providing the partition_state_override type (header + source).
redpanda_cc_library(
    name = "partition_state_override",
    srcs = ["partition_state_override.cc"],
    hdrs = ["partition_state_override.h"],
    deps = [
        "//src/v/model",
        "//src/v/serde",
        "@fmt",
    ],
)

redpanda_cc_library(
name = "state",
srcs = [
Expand Down Expand Up @@ -283,6 +300,7 @@ redpanda_cc_library(
],
visibility = ["//visibility:public"],
deps = [
":partition_state_override",
":translated_offset_range",
"//src/v/datalake:schema_identifier",
"//src/v/datalake:types",
Expand Down Expand Up @@ -335,9 +353,11 @@ redpanda_cc_library(
"state_update.h",
],
deps = [
":partition_state_override",
":state",
":translated_offset_range",
"//src/v/base",
"//src/v/container:chunked_hash_map",
"//src/v/container:chunked_vector",
"//src/v/iceberg:manifest_entry",
"//src/v/model",
Expand Down
50 changes: 50 additions & 0 deletions src/v/datalake/coordinator/coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,56 @@ coordinator::sync_get_topic_state(chunked_vector<model::topic> topics_filter) {
co_return result;
}

/// Resets the coordinator state for `topic` by replicating a
/// reset_topic_state_update through the coordinator STM.
///
/// Steps: take the gate, sync the STM, check the update against the current
/// STM state with can_apply(), then replicate the update and wait for it.
///
/// \param topic topic whose state is reset
/// \param topic_revision revision carried in the update
/// \param reset_all_partitions forwarded unchanged into the update
/// \param partition_overrides per-partition overrides, moved into the update
/// \return success, or: the maybe_gate() error; the converted STM error if
///         sync or replication fails; errc::stm_apply_error if can_apply()
///         rejects the update.
ss::future<checked<void, coordinator::errc>>
coordinator::sync_reset_topic_state(
  model::topic topic,
  model::revision_id topic_revision,
  bool reset_all_partitions,
  chunked_hash_map<model::partition_id, partition_state_override>
    partition_overrides) {
    // Bail out if maybe_gate() reports an error; otherwise its result stays in
    // scope for the remainder of the call.
    auto gate_guard = maybe_gate();
    if (gate_guard.has_error()) {
        co_return gate_guard.error();
    }

    vlog(datalake_log.debug, "Resetting coordinator state for topic {}", topic);
    auto synced = co_await stm_->sync(10s);
    if (synced.has_error()) {
        co_return convert_stm_errc(synced.error());
    }

    // `topic` is copied (not moved) because it is still needed for the
    // rejection log message below.
    reset_topic_state_update reset{
      .topic = topic,
      .topic_revision = topic_revision,
      .reset_all_partitions = reset_all_partitions,
      .partition_overrides = std::move(partition_overrides),
    };

    // Validate against the current state before replicating anything.
    auto verdict = reset.can_apply(stm_->state());
    if (verdict.has_error()) {
        vlog(
          datalake_log.debug,
          "Rejecting reset topic state request for {}: {}",
          topic,
          verdict.error());
        co_return errc::stm_apply_error;
    }

    storage::record_batch_builder batch(
      model::record_batch_type::datalake_coordinator, model::offset{0});
    batch.add_raw_kv(
      serde::to_iobuf(reset_topic_state_update::key),
      serde::to_iobuf(std::move(reset)));

    // The value produced by sync() is handed to replicate_and_wait().
    auto replicated = co_await stm_->replicate_and_wait(
      synced.value(), std::move(batch).build(), as_);
    if (replicated.has_error()) {
        auto err = convert_stm_errc(replicated.error());
        vlog(datalake_log.warn, "Replication failed {}", err);
        co_return err;
    }

    co_return outcome::success();
}

ss::sstring coordinator::get_effective_default_partition_spec(
const std::optional<ss::sstring>& partition_spec) const {
const auto& cfg = config::shard_local_cfg();
Expand Down
9 changes: 9 additions & 0 deletions src/v/datalake/coordinator/coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
#include "absl/hash/hash.h"
#include "cluster/fwd.h"
#include "config/property.h"
#include "container/chunked_hash_map.h"
#include "container/chunked_vector.h"
#include "datalake/coordinator/file_committer.h"
#include "datalake/coordinator/partition_state_override.h"
#include "datalake/coordinator/snapshot_remover.h"
#include "datalake/coordinator/state_machine.h"
#include "datalake/fwd.h"
Expand Down Expand Up @@ -89,6 +91,13 @@ class coordinator {
ss::future<checked<chunked_hash_map<model::topic, topic_state>, errc>>
sync_get_topic_state(chunked_vector<model::topic> topics);

/// Resets the coordinator state for `topic`.
///
/// The implementation syncs the STM, checks the update with can_apply()
/// against the current state, and then replicates a
/// reset_topic_state_update.
///
/// \param topic topic whose state is reset
/// \param topic_revision revision carried in the replicated update
/// \param reset_all_partitions forwarded unchanged into the update
/// \param partition_overrides per-partition overrides, keyed by partition id
/// \return success, or an errc describing the failure
///         (errc::stm_apply_error when the update is rejected by can_apply()).
ss::future<checked<void, errc>> sync_reset_topic_state(
  model::topic topic,
  model::revision_id topic_revision,
  bool reset_all_partitions,
  chunked_hash_map<model::partition_id, partition_state_override>
    partition_overrides);

void notify_leadership(std::optional<model::node_id>);

bool leader_loop_running() const { return term_as_.has_value(); }
Expand Down
45 changes: 45 additions & 0 deletions src/v/datalake/coordinator/frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ template auto frontend::process<
&frontend::get_topic_state_locally,
&frontend::client::get_topic_state>(get_topic_state_request, bool);

// Explicit instantiations for the reset_topic_state request path: the
// process<> entry point pairing the local handler with the RPC client method,
// and remote_dispatch<> for the same client method.
// NOTE(review): the remote_dispatch instantiation is presumably needed by a
// caller outside this translation unit -- confirm.
template auto frontend::process<
&frontend::reset_topic_state_locally,
&frontend::client::reset_topic_state>(reset_topic_state_request, bool);

template auto frontend::remote_dispatch<&frontend::client::reset_topic_state>(
reset_topic_state_request, model::node_id);

// -- explicit instantiations ---

frontend::frontend(
Expand Down Expand Up @@ -494,4 +501,42 @@ ss::future<get_topic_state_reply> frontend::get_topic_state(
&client::get_topic_state>(std::move(request), bool(local_only_exec));
}

/// Executes a reset_topic_state request on `shard` against the coordinator
/// registered for `coordinator_partition`.
///
/// Replies with errc::not_leader when the coordinator manager on that shard
/// has no coordinator for the partition. Otherwise the reply's errc is
/// errc::ok on success, or the coordinator error mapped through
/// to_rpc_errc().
ss::future<reset_topic_state_reply> frontend::reset_topic_state_locally(
  reset_topic_state_request request,
  const model::ntp& coordinator_partition,
  ss::shard_id shard) {
    auto gate_holder = _gate.hold();
    // `request` is captured by reference: it is a by-value parameter of this
    // coroutine and the invoke_on() future is awaited before returning, so it
    // outlives the lambda.
    // NOTE(review): partition_overrides is moved into the coordinator call on
    // the target shard; confirm the cross-shard ownership transfer is
    // acceptable.
    co_return co_await _coordinator_mgr->invoke_on(
      shard, [coordinator_partition, &request](coordinator_manager& mgr) {
          auto crd = mgr.get(coordinator_partition);
          if (!crd) {
              return ssx::now(reset_topic_state_reply{errc::not_leader});
          }
          return crd
            ->sync_reset_topic_state(
              request.topic,
              request.topic_revision,
              request.reset_all_partitions,
              std::move(request.partition_overrides))
            .then([](auto result) {
                reset_topic_state_reply reply{};
                if (!result.has_error()) {
                    reply.errc = errc::ok;
                } else {
                    reply.errc = to_rpc_errc(result.error());
                }
                return ssx::now(std::move(reply));
            });
      });
}

/// Entry point for reset_topic_state requests. Holds the frontend gate and
/// hands the request to process<>, parameterised with the local handler and
/// the RPC client method.
/// NOTE(review): process<> presumably runs the local handler when this node
/// hosts the coordinator and otherwise forwards over RPC unless
/// `local_only_exec` is set -- confirm against process<>.
ss::future<reset_topic_state_reply> frontend::reset_topic_state(
  reset_topic_state_request request, local_only local_only_exec) {
    auto gate_holder = _gate.hold();
    const auto force_local = static_cast<bool>(local_only_exec);
    co_return co_await process<
      &frontend::reset_topic_state_locally,
      &client::reset_topic_state>(std::move(request), force_local);
}

} // namespace datalake::coordinator
8 changes: 8 additions & 0 deletions src/v/datalake/coordinator/frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ class frontend : public ss::peering_sharded_service<frontend> {
ss::future<get_topic_state_reply>
get_topic_state(get_topic_state_request, local_only = local_only::no);

/**
 * Resets the coordinator state for the topic named in the request.
 * The RPC service handler calls this with local_only::yes; other callers
 * get local_only::no by default.
 */
ss::future<reset_topic_state_reply>
reset_topic_state(reset_topic_state_request, local_only = local_only::no);

/**
* Returns the partition of datalake coordinator topic that
* coordinates datalake tasks for this topic partitions.
Expand Down Expand Up @@ -144,6 +147,11 @@ class frontend : public ss::peering_sharded_service<frontend> {
const model::ntp& coordinator_partition,
ss::shard_id);

/**
 * Executes a reset_topic_state request on the given shard against the
 * coordinator for `coordinator_partition`. Replies with errc::not_leader
 * when no coordinator is found for that partition on the shard.
 */
ss::future<reset_topic_state_reply> reset_topic_state_locally(
reset_topic_state_request,
const model::ntp& coordinator_partition,
ss::shard_id);

model::node_id _self;
ss::sharded<coordinator_manager>* _coordinator_mgr;
ss::sharded<raft::group_manager>* _group_mgr;
Expand Down
26 changes: 26 additions & 0 deletions src/v/datalake/coordinator/partition_state_override.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2026 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "datalake/coordinator/partition_state_override.h"

#include <fmt/ostream.h>

namespace datalake::coordinator {

/// Prints the override as `{last_committed: <offset>}`, or
/// `{last_committed: nullopt}` when no value is set.
std::ostream& operator<<(std::ostream& o, const partition_state_override& p) {
    if (!p.last_committed.has_value()) {
        fmt::print(o, "{{last_committed: nullopt}}");
        return o;
    }
    fmt::print(o, "{{last_committed: {}}}", *p.last_committed);
    return o;
}

} // namespace datalake::coordinator
31 changes: 31 additions & 0 deletions src/v/datalake/coordinator/partition_state_override.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2026 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#pragma once

#include "model/fundamental.h"
#include "serde/envelope.h"

namespace datalake::coordinator {

/// Serde-serializable set of optional values to apply to a single
/// partition's coordinator state.
struct partition_state_override
  : serde::envelope<
      partition_state_override,
      serde::version<0>,
      serde::compat_version<0>> {
    /// Replacement for the partition's last_committed offset.
    /// NOTE(review): an empty optional presumably means "leave unchanged",
    /// mirroring the proto PartitionStateOverride field -- confirm.
    std::optional<kafka::offset> last_committed;

    /// Fields handed to serde for (de)serialization.
    auto serde_fields() { return std::tie(last_committed); }

    friend std::ostream&
    operator<<(std::ostream&, const partition_state_override&);
};

} // namespace datalake::coordinator
5 changes: 5 additions & 0 deletions src/v/datalake/coordinator/rpc.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
"name": "get_topic_state",
"input_type": "get_topic_state_request",
"output_type": "get_topic_state_reply"
},
{
"name": "reset_topic_state",
"input_type": "reset_topic_state_request",
"output_type": "reset_topic_state_reply"
}
]
}
6 changes: 6 additions & 0 deletions src/v/datalake/coordinator/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,10 @@ ss::future<get_topic_state_reply> service::get_topic_state(
std::move(request), frontend::local_only::yes);
}

/// RPC handler for reset_topic_state: forwards the request to this shard's
/// frontend with local_only::yes.
ss::future<reset_topic_state_reply> service::reset_topic_state(
  reset_topic_state_request request, ::rpc::streaming_context&) {
    auto& local_frontend = _frontend->local();
    return local_frontend.reset_topic_state(
      std::move(request), frontend::local_only::yes);
}

}; // namespace datalake::coordinator::rpc
3 changes: 3 additions & 0 deletions src/v/datalake/coordinator/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class service final : public impl::datalake_coordinator_rpc_service {
ss::future<get_topic_state_reply> get_topic_state(
get_topic_state_request, ::rpc::streaming_context&) override;

// RPC handler for reset_topic_state. The implementation forwards the request
// to the local frontend with local_only::yes.
ss::future<reset_topic_state_reply> reset_topic_state(
reset_topic_state_request, ::rpc::streaming_context&) override;

private:
ss::sharded<frontend>* _frontend;
};
Expand Down
7 changes: 7 additions & 0 deletions src/v/datalake/coordinator/state_machine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ ss::future<> coordinator_stm::do_apply(const model::record_batch& b) {
maybe_log_update_error(_log, key, o, res);
continue;
}
case update_key::reset_topic_state: {
auto update = serde::read<reset_topic_state_update>(val_p);
vlog(_log.debug, "Applying {} from offset {}: {}", key, o, update);
auto res = update.apply(state_);
maybe_log_update_error(_log, key, o, res);
continue;
}
}
vlog(
_log.error,
Expand Down
Loading
Loading