Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 51 additions & 25 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1569,45 +1569,52 @@ void op_context::create_response_placeholders() {
}
}

bool update_fetch_partition(
// Determines if a partition should be included in an incremental fetch
// response per KIP-227.
bool partition_has_changes(
const fetch_response::partition_response& resp,
fetch_session_partition& partition) {
bool include = false;
const fetch_session_partition& session_partition) {
if (resp.records && resp.records->size_bytes() > 0) {
// Partitions with new data are always included in the response.
include = true;
return true;
}
if (partition.high_watermark != resp.high_watermark) {
include = true;
partition.high_watermark = model::offset(resp.high_watermark);
if (session_partition.high_watermark != resp.high_watermark) {
return true;
}
if (partition.last_stable_offset != resp.last_stable_offset) {
include = true;
partition.last_stable_offset = model::offset(resp.last_stable_offset);
if (session_partition.last_stable_offset != resp.last_stable_offset) {
return true;
}
if (partition.start_offset != resp.log_start_offset) {
include = true;
partition.start_offset = model::offset(resp.log_start_offset);
if (session_partition.start_offset != resp.log_start_offset) {
return true;
}
/**
* Always include partition in a response if it contains information about
* the preferred replica
*/
if (resp.preferred_read_replica != -1) {
include = true;
}
if (include) {
return include;
return true;
}
if (resp.error_code != error_code::none) {
// Partitions with errors are always included in the response.
// We also set the cached highWatermark to an invalid offset, -1.
// This ensures that when the error goes away, we re-send the
// partition.
partition.high_watermark = model::offset{-1};
include = true;
return true;
}
return false;
}

// Updates the fetch session's cached partition state from the response.
// Called in send_response() when committing the response, not during fetch
// iteration, to avoid premature cache updates when a fetch is retried
// internally (e.g. while waiting for min_bytes to be satisfied).
void update_session_partition(
  const fetch_response::partition_response& resp,
  fetch_session_partition& session_partition) {
    session_partition.high_watermark = model::offset(resp.high_watermark);
    session_partition.last_stable_offset = model::offset(
      resp.last_stable_offset);
    session_partition.start_offset = model::offset(resp.log_start_offset);
    if (resp.error_code != error_code::none) {
        // Set the cached high watermark to an invalid offset (-1) so we
        // re-send this partition once the error clears.
        session_partition.high_watermark = model::offset{-1};
    }
}

ss::future<response_ptr> op_context::send_response() && {
Expand Down Expand Up @@ -1641,7 +1648,24 @@ ss::future<response_ptr> op_context::send_response() && {
}
// below we handle incremental fetches, set response session id
response.data.session_id = session_ctx.session()->id();

auto& session_partitions = session_ctx.session()->partitions();
auto update_session = [&session_partitions](const auto& resp_it) {
auto key = model::kitp_view(
resp_it->partition->topic_id,
resp_it->partition->topic,
resp_it->partition_response->partition_index);
if (auto sp_it = session_partitions.find(key);
sp_it != session_partitions.end()) {
update_session_partition(
*resp_it->partition_response, sp_it->second->partition);
}
};

if (session_ctx.is_full_fetch()) {
for (auto it = response.begin(false); it != response.end(); ++it) {
update_session(it);
}
return rctx.respond(std::move(response));
}

Expand All @@ -1651,6 +1675,8 @@ ss::future<response_ptr> op_context::send_response() && {
final_response.internal_topic_bytes = response.internal_topic_bytes;

for (auto it = response.begin(true); it != response.end(); ++it) {
update_session(it);

if (it->is_new_topic) {
final_response.data.responses.emplace_back(
fetchable_topic_response{
Expand Down Expand Up @@ -1762,7 +1788,7 @@ void op_context::response_placeholder::set(

if (auto it = session_partitions.find(key);
it != session_partitions.end()) {
auto has_to_be_included = update_fetch_partition(
auto has_to_be_included = partition_has_changes(
*_it->partition_response, it->second->partition);
/**
* From KIP-227
Expand Down
110 changes: 110 additions & 0 deletions src/v/kafka/server/tests/fetch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1209,3 +1209,113 @@ FIXTURE_TEST(fetch_response_bytes_eq_units, redpanda_thread_fixture) {
BOOST_REQUIRE(octx.response_size > 0);
BOOST_REQUIRE(octx.response_size == octx.total_response_memory_units());
}

// Regression test for CORE-14617: When a fetch is retried internally (due to
// min_bytes not being satisfied), partitions with changed metadata (like
// log_start_offset) must still be included in the final response.
FIXTURE_TEST(
  fetch_session_propagates_log_start_offset, redpanda_thread_fixture) {
    model::topic topic("foo");
    model::partition_id pid(0);
    auto ntp = make_default_ntp(topic, pid);

    wait_for_controller_leadership().get();
    add_topic(model::topic_namespace_view(ntp)).get();
    wait_for_partition_offset(ntp, model::offset(0)).get();

    // Produce some data (20 random batches replicated with quorum ack so the
    // high watermark advances before the first fetch).
    auto shard = app.shard_table.local().shard_for(ntp);
    app.partition_manager
      .invoke_on(
        *shard,
        [ntp](cluster::partition_manager& mgr) {
            return model::test::make_random_batches(model::offset(0), 20)
              .then([ntp, &mgr](auto batches) {
                  auto partition = mgr.get(ntp);
                  return partition->raft()->replicate(
                    chunked_vector<model::record_batch>(
                      std::from_range,
                      std::move(batches) | std::views::as_rvalue),
                    raft::replicate_options(
                      raft::consistency_level::quorum_ack));
              });
        })
      .get();

    auto client = make_kafka_client().get();
    client.connect().get();

    // Full fetch to establish session (session_epoch=0, invalid session_id)
    kafka::fetch_request req1;
    req1.data.max_bytes = std::numeric_limits<int32_t>::max();
    req1.data.min_bytes = 1;
    req1.data.max_wait_ms = 1000ms;
    req1.data.session_id = kafka::invalid_fetch_session_id;
    req1.data.session_epoch = kafka::initial_fetch_session_epoch;
    req1.data.topics.emplace_back(
      kafka::fetch_topic{
        .topic = topic,
        .partitions = {{
          .partition = pid,
          .fetch_offset = model::offset(5),
        }},
      });

    auto resp1 = client.dispatch(std::move(req1), kafka::api_version(12)).get();
    BOOST_REQUIRE_EQUAL(resp1.data.responses.size(), 1);
    BOOST_REQUIRE_EQUAL(
      resp1.data.responses[0].partitions[0].error_code,
      kafka::error_code::none);
    // A valid session id in the response means the broker created a fetch
    // session for us to use incrementally below.
    BOOST_REQUIRE_NE(resp1.data.session_id, kafka::invalid_fetch_session_id);

    auto session_id = resp1.data.session_id;
    auto initial_log_start
      = resp1.data.responses[0].partitions[0].log_start_offset;

    // Prefix truncate to change log_start_offset
    auto trunc_err = app.partition_manager
                       .invoke_on(
                         *shard,
                         [ntp](cluster::partition_manager& mgr) {
                             auto partition = mgr.get(ntp);
                             auto k_trunc_offset = kafka::offset(5);
                             auto rp_trunc_offset
                               = partition->log()->to_log_offset(
                                 model::offset(k_trunc_offset));
                             return partition->prefix_truncate(
                               rp_trunc_offset,
                               k_trunc_offset,
                               ss::lowres_clock::time_point::max());
                         })
                       .get();
    BOOST_REQUIRE(!trunc_err);

    // Incremental fetch with min_bytes=1 - will retry internally waiting for
    // data, but should still include the partition due to log_start_offset
    // change even if no new data arrives during the retry window.
    kafka::fetch_request req2;
    req2.data.max_bytes = std::numeric_limits<int32_t>::max();
    req2.data.min_bytes = 1;
    req2.data.max_wait_ms = 1000ms;
    req2.data.session_id = session_id;
    req2.data.session_epoch = kafka::fetch_session_epoch(1);
    req2.data.topics.emplace_back(
      kafka::fetch_topic{
        .topic = topic,
        .partitions = {{
          .partition = pid,
          // Fetch from the tail so no record data is returned; only the
          // metadata change should force inclusion.
          .fetch_offset = model::offset(20),
        }},
      });

    auto resp2 = client.dispatch(std::move(req2), kafka::api_version(12)).get();
    client.stop().then([&client] { client.shutdown(); }).get();

    // The partition must be included even though no new data arrived, because
    // log_start_offset changed. This is the regression test for CORE-14617.
    BOOST_REQUIRE_EQUAL(resp2.data.responses.size(), 1);
    BOOST_REQUIRE_EQUAL(resp2.data.responses[0].partitions.size(), 1);
    auto new_log_start = resp2.data.responses[0].partitions[0].log_start_offset;
    BOOST_REQUIRE_GT(new_log_start, initial_log_start);
    BOOST_REQUIRE_EQUAL(new_log_start, model::offset(5));
}
6 changes: 2 additions & 4 deletions tests/rptest/tests/cluster_linking_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1665,10 +1665,8 @@ def _check_partitions_match(
self.logger.debug(
f"Partition {partition_id}: source hwm={hwm}, shadow_hwm{p_info.source_high_watermark}, last_update={p_info.source_last_updated_timestamp}"
)
# TODO: Re-enable once CORE-14617 is addressed
# TODO: CORE-14653
# if p_info.source_high_watermark != hwm:
# return False
if p_info.source_high_watermark != hwm:
return False
return True

def _fetch_shadow_topic_and_compare_results(
Expand Down