Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 39 additions & 7 deletions src/v/cluster_link/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,9 @@ class default_link_config_provider
}

private:
ss::future<std::optional<kafka::offset>> fetch_offset_for_timestamp(
/// Dispatch a ListOffsets request for the given timestamp and return the
/// offset from the response. Returns std::nullopt on transient errors.
ss::future<std::optional<kafka::offset>> dispatch_list_offsets(
retry_chain_node& rcn,
const ::model::topic_partition& tp,
::model::timestamp ts) {
Expand Down Expand Up @@ -799,15 +801,45 @@ class default_link_config_provider
partition.error_code);
co_return std::nullopt;
}
vlog(
cllog.debug,
"[{}] Fetched offset {} for timestamp {}",
tp,
partition.offset,
ts);
co_return partition.offset;
}

/// Resolve the starting offset for the given timestamp.
///
/// Dispatches a ListOffsets request for `ts`; when the broker reports a
/// concrete offset (>= 0) it is returned as-is. When the broker answers
/// with offset -1 — the timestamp is past the end of the log, or the
/// partition is empty — we fall back to a second ListOffsets query for the
/// latest timestamp which, combined with our read_committed isolation
/// level, yields the last stable offset (LSO), so replication starts at
/// new committed data only.
///
/// Returns std::nullopt on transient errors (propagated from
/// dispatch_list_offsets).
ss::future<std::optional<kafka::offset>> fetch_offset_for_timestamp(
  retry_chain_node& rcn,
  const ::model::topic_partition& tp,
  ::model::timestamp ts) {
    const auto ts_offset = co_await dispatch_list_offsets(rcn, tp, ts);
    if (!ts_offset.has_value()) {
        // Transient failure while querying; let the caller retry.
        co_return std::nullopt;
    }
    if (*ts_offset < kafka::offset{0}) {
        // ListOffsets returns offset -1 when the timestamp is past the end
        // of the log (or the partition is empty). Fall back to the last
        // stable offset (LSO) so we start replicating only new committed
        // data. We query with latest_timestamp which, combined with our
        // read_committed isolation level, returns the LSO.
        auto lso = co_await dispatch_list_offsets(
          rcn, tp, kafka::list_offsets_request::latest_timestamp);
        if (lso.has_value()) {
            vlog(
              cllog.info,
              "[{}] Timestamp {} is past the end of the source log, "
              "falling back to last stable offset {}",
              tp,
              ts,
              *lso);
        }
        co_return lso;
    }
    vlog(
      cllog.debug,
      "[{}] Fetched offset {} for timestamp {}",
      tp,
      *ts_offset,
      ts);
    co_return ts_offset;
}

ss::future<std::optional<kafka::api_version>>
get_list_offset_api_version(retry_chain_node& rcn) {
rcn.check_abort();
Expand Down
153 changes: 152 additions & 1 deletion tests/rptest/tests/cluster_linking_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@
shadow_link_pb2,
)
from rptest.clients.kafka_cli_tools import KafkaCliToolsError
from rptest.clients.rpk import RpkTool, RPKACLInput, RpkException, RpkGroup
from rptest.clients.rpk import (
RpkPartition,
RpkTool,
RPKACLInput,
RpkException,
RpkGroup,
)
from rptest.clients.types import TopicSpec
from rptest.services.cluster import TestContext
from rptest.services.admin import Admin
Expand Down Expand Up @@ -3902,3 +3908,148 @@ def do_wait_for_hwm():
assert source_offsets == target_offsets, (
f"Expected source and target offsets to match, got {target_offsets} vs {source_offsets}"
)

@cluster(num_nodes=7)
@matrix(
    source_cluster_spec=[
        SecondaryClusterSpec(ServiceType.REDPANDA),
        SecondaryClusterSpec(
            ServiceType.KAFKA,
            kafka_version="3.8.0",
            kafka_quorum="COMBINED_KRAFT",
        ),
    ],
)
def test_start_at_future_timestamp(
    self,
    source_cluster_spec: SecondaryClusterSpec,
):
    """
    Verify that when a shadow link is configured with a start timestamp
    past the end of the source log, replication begins at the LSO rather
    than offset 0.

    ListOffsets returns offset -1 with error_code=none when the requested
    timestamp exceeds all data in the partition. The fix detects this and
    falls back to the LSO so only new data is replicated.
    """
    # The matrix parameter only selects the source cluster flavor
    # (Redpanda vs. Apache Kafka); the test body is identical for both.
    _ = source_cluster_spec
    topic = TopicSpec(name="source-topic", partition_count=1, replication_factor=3)
    self.source_default_client().create_topic(topic)

    # Produce historical data that must NOT be replicated to the target.
    initial_msg_count = 1000
    KgoVerifierProducer.oneshot(
        self.test_context,
        self.source_cluster.service,
        topic=topic.name,
        msg_size=4 * 1024,
        msg_count=initial_msg_count,
        custom_node=self.preallocated_nodes,
    )

    def get_partition_0_info(rpk: RpkTool) -> RpkPartition | None:
        # Best-effort lookup of partition 0's metadata; returns None on
        # any describe failure so callers can retry via wait_until.
        try:
            for part in rpk.describe_topic(topic.name, timeout=3):
                if part.id == 0:
                    return part
        except Exception as e:
            self.logger.debug(f"Failed to describe topic: {e}")
        return None

    def source_hwm_reached_msg_count() -> int | None:
        # Truthy (the HWM value) once all produced records are visible
        # on the source; None keeps wait_until_result polling.
        p_info = get_partition_0_info(self.source_cluster_rpk)
        if p_info and p_info.high_watermark >= initial_msg_count:
            return p_info.high_watermark
        return None

    source_orig_hwm = wait_until_result(
        source_hwm_reached_msg_count,
        timeout_sec=30,
        backoff_sec=1,
        err_msg="Timed out waiting for source HWM to reach expected count",
    )
    self.logger.info(f"Source HWM before link creation: {source_orig_hwm}")

    # Configure the link with a timestamp far in the future so that
    # ListOffsets returns offset -1 (no record at or after that timestamp).
    # NOTE(review): time.mktime interprets the struct_time in local time,
    # but any interpretation of 2100-01-01 is far enough in the future.
    req = self.create_default_link_request("test-link")
    timestamp_pb = google.protobuf.timestamp_pb2.Timestamp()
    timestamp_pb.FromMilliseconds(
        int(
            time.mktime(time.strptime("2100-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S"))
            * 1000
        )
    )
    req.shadow_link.configurations.topic_metadata_sync_options.start_at_timestamp.CopyFrom(
        timestamp_pb
    )
    self.create_link_with_request(req=req)

    # The link must first materialize the shadow topic on the target.
    self.target_cluster.service.wait_until(
        lambda: self.topic_partitions_exists_in_target(topic),
        timeout_sec=30,
        backoff_sec=1,
        err_msg=f"Topic {topic.name} not found in target cluster",
    )

    # Confirm that the link is not replicating historical data (both HWM and start offset stay 0 on the target).
    # The fixed sleep gives the replicator a window in which it WOULD have
    # copied historical records if the fallback were broken.
    sleep(5)
    prev_target_info = get_partition_0_info(self.target_cluster_rpk)
    assert prev_target_info is not None, "Failed to get target partition info"
    assert prev_target_info.high_watermark == 0, (
        f"Expected target HWM to be 0, got {prev_target_info.high_watermark}"
    )
    assert prev_target_info.start_offset == 0, (
        f"Expected target start offset to be 0, got {prev_target_info.start_offset}"
    )

    self.logger.info("Producing one new record to the source topic")
    self.source_cluster_rpk.produce(
        topic=topic.name, key="key", msg="value", partition=0
    )

    source_info = get_partition_0_info(self.source_cluster_rpk)
    assert source_info is not None, (
        "Failed to get source partition info after producing new record"
    )
    assert source_info.high_watermark == source_orig_hwm + 1, (
        f"Expected source HWM to advance by 1 after producing new record, "
        f"got {source_info.high_watermark} vs previous {source_orig_hwm}"
    )

    def target_has_new_data():
        # wait_until_result convention: return False to keep polling, or a
        # (condition, value) tuple — the value is returned once truthy.
        target_info = get_partition_0_info(self.target_cluster_rpk)
        if target_info is None:
            return False

        return (
            target_info.high_watermark == source_info.high_watermark,
            target_info,
        )

    target_info: RpkPartition = wait_until_result(
        target_has_new_data,
        timeout_sec=60,
        backoff_sec=2,
        err_msg="New data was not replicated to the target cluster",
        retry_on_exc=True,
    )

    assert target_info.high_watermark == source_info.high_watermark, (
        f"Expected target HWM to be {source_info.high_watermark} after replicating new record, got {target_info.high_watermark}"
    )

    # The single new record must be both the LAST record on the source and
    # the FIRST record on the target — proving replication started at the
    # LSO, not at offset 0.
    source_last_record = json.loads(
        self.source_cluster_rpk.consume(
            topic=topic.name, n=1, partition=0, offset=-1
        )
    )
    target_first_record = json.loads(
        self.target_cluster_rpk.consume(
            topic=topic.name, n=1, partition=0, offset="start"
        )
    )
    assert source_last_record == target_first_record, (
        f"Record mismatch: source={source_last_record}, target={target_first_record}"
    )
Loading