diff --git a/src/v/cluster_link/replication/partition_replicator.cc b/src/v/cluster_link/replication/partition_replicator.cc index c759f6180c6da..961f2d461922b 100644 --- a/src/v/cluster_link/replication/partition_replicator.cc +++ b/src/v/cluster_link/replication/partition_replicator.cc @@ -319,6 +319,7 @@ ss::future<> partition_replicator::fetch_and_replicate() { } ss::future<> partition_replicator::maybe_synchronize_start_offset() { + auto shadow_partition_hwm = _sink->high_watermark(); auto shadow_partition_start_offset = _sink->start_offset(); auto source_offsets = _source->get_offsets(); @@ -333,6 +334,7 @@ ss::future<> partition_replicator::maybe_synchronize_start_offset() { } auto source_start_offset = source_offsets->source_start_offset; + auto source_lso = source_offsets->source_lso; if (source_start_offset <= shadow_partition_start_offset) { vlog( @@ -344,6 +346,29 @@ ss::future<> partition_replicator::maybe_synchronize_start_offset() { co_return; } + // The source partition may perform a prefix truncation that lands in the + // middle of a batch. Redpanda's data replicators will replicate the whole + // batch, including data that starts before the start offset. If we prefix + // truncate the shadow partition before that batch is replicated, this will + // interfere with our ability to replicate the entire batch, leading to data + // loss. To prevent being too eager, we only perform prefix truncation when + // we know that we have fully replicated all batches up to or past the + // source start offset. This means: source_start_offset <= + // shadow_partition_hwm OR source_start_offset == source_lso. + if ( + source_lso != source_start_offset + && source_start_offset > shadow_partition_hwm) { + vlog( + _log.debug, + "Shadow partition has not replicated up to the source start offset " + "yet, deferring prefix truncation. 
source_start_offset: {}, " + "source_lso: {}, shadow_partition_hwm: {}", + source_start_offset, + source_lso, + shadow_partition_hwm); + co_return; + } + auto truncate_offset = std::max(_start_offset, source_start_offset); vlog( _log.debug,