diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 195601df8b552..c5d461da04029 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -1569,45 +1569,52 @@ void op_context::create_response_placeholders() { } } -bool update_fetch_partition( +// Determines if a partition should be included in an incremental fetch +// response per KIP-227. +bool partition_has_changes( const fetch_response::partition_response& resp, - fetch_session_partition& partition) { - bool include = false; + const fetch_session_partition& session_partition) { if (resp.records && resp.records->size_bytes() > 0) { - // Partitions with new data are always included in the response. - include = true; + return true; } - if (partition.high_watermark != resp.high_watermark) { - include = true; - partition.high_watermark = model::offset(resp.high_watermark); + if (session_partition.high_watermark != resp.high_watermark) { + return true; } - if (partition.last_stable_offset != resp.last_stable_offset) { - include = true; - partition.last_stable_offset = model::offset(resp.last_stable_offset); + if (session_partition.last_stable_offset != resp.last_stable_offset) { + return true; } - if (partition.start_offset != resp.log_start_offset) { - include = true; - partition.start_offset = model::offset(resp.log_start_offset); + if (session_partition.start_offset != resp.log_start_offset) { + return true; } /** * Always include partition in a response if it contains information about * the preferred replica */ if (resp.preferred_read_replica != -1) { - include = true; - } - if (include) { - return include; + return true; } if (resp.error_code != error_code::none) { // Partitions with errors are always included in the response. - // We also set the cached highWatermark to an invalid offset, -1. - // This ensures that when the error goes away, we re-send the - // partition. 
- partition.high_watermark = model::offset{-1}; - include = true; + return true; + } + return false; +} + +// Updates the fetch session's partition with the response. Called in +// send_response() when committing the response, not during fetch iteration (to +// avoid premature updates on retries). +void update_session_partition( + const fetch_response::partition_response& resp, + fetch_session_partition& session_partition) { + session_partition.high_watermark = model::offset(resp.high_watermark); + session_partition.last_stable_offset = model::offset( + resp.last_stable_offset); + session_partition.start_offset = model::offset(resp.log_start_offset); + if (resp.error_code != error_code::none) { + // Set high_watermark to -1 so we re-send this partition once the error + // clears. + session_partition.high_watermark = model::offset{-1}; + } - return include; } ss::future<response_ptr> op_context::send_response() && { @@ -1641,7 +1648,24 @@ ss::future<response_ptr> op_context::send_response() && { } // bellow we handle incremental fetches, set response session id response.data.session_id = session_ctx.session()->id(); + + auto& session_partitions = session_ctx.session()->partitions(); + auto update_session = [&session_partitions](const auto& resp_it) { + auto key = model::kitp_view( + resp_it->partition->topic_id, + resp_it->partition->topic, + resp_it->partition_response->partition_index); + if (auto sp_it = session_partitions.find(key); + sp_it != session_partitions.end()) { + update_session_partition( + *resp_it->partition_response, sp_it->second->partition); + } + }; + if (session_ctx.is_full_fetch()) { + for (auto it = response.begin(false); it != response.end(); ++it) { + update_session(it); + } return rctx.respond(std::move(response)); } @@ -1651,6 +1675,8 @@ ss::future<response_ptr> op_context::send_response() && { final_response.internal_topic_bytes = response.internal_topic_bytes; for (auto it = response.begin(true); it != response.end(); ++it) { + update_session(it); + if (it->is_new_topic) {
final_response.data.responses.emplace_back( fetchable_topic_response{ @@ -1762,7 +1788,7 @@ void op_context::response_placeholder::set( if (auto it = session_partitions.find(key); it != session_partitions.end()) { - auto has_to_be_included = update_fetch_partition( + auto has_to_be_included = partition_has_changes( *_it->partition_response, it->second->partition); /** * From KIP-227 diff --git a/src/v/kafka/server/tests/fetch_test.cc b/src/v/kafka/server/tests/fetch_test.cc index 9784ab24d281a..98e5023d023b8 100644 --- a/src/v/kafka/server/tests/fetch_test.cc +++ b/src/v/kafka/server/tests/fetch_test.cc @@ -1209,3 +1209,113 @@ FIXTURE_TEST(fetch_response_bytes_eq_units, redpanda_thread_fixture) { BOOST_REQUIRE(octx.response_size > 0); BOOST_REQUIRE(octx.response_size == octx.total_response_memory_units()); } + +// Regression test for CORE-14617: When a fetch is retried internally (due to +// min_bytes not being satisfied), partitions with changed metadata (like +// log_start_offset) must still be included in the final response. 
+FIXTURE_TEST( + fetch_session_propagates_log_start_offset, redpanda_thread_fixture) { + model::topic topic("foo"); + model::partition_id pid(0); + auto ntp = make_default_ntp(topic, pid); + + wait_for_controller_leadership().get(); + add_topic(model::topic_namespace_view(ntp)).get(); + wait_for_partition_offset(ntp, model::offset(0)).get(); + + // Produce some data + auto shard = app.shard_table.local().shard_for(ntp); + app.partition_manager + .invoke_on( + *shard, + [ntp](cluster::partition_manager& mgr) { + return model::test::make_random_batches(model::offset(0), 20) + .then([ntp, &mgr](auto batches) { + auto partition = mgr.get(ntp); + return partition->raft()->replicate( + chunked_vector( + std::from_range, + std::move(batches) | std::views::as_rvalue), + raft::replicate_options( + raft::consistency_level::quorum_ack)); + }); + }) + .get(); + + auto client = make_kafka_client().get(); + client.connect().get(); + + // Full fetch to establish session (session_epoch=0, invalid session_id) + kafka::fetch_request req1; + req1.data.max_bytes = std::numeric_limits<int32_t>::max(); + req1.data.min_bytes = 1; + req1.data.max_wait_ms = 1000ms; + req1.data.session_id = kafka::invalid_fetch_session_id; + req1.data.session_epoch = kafka::initial_fetch_session_epoch; + req1.data.topics.emplace_back( + kafka::fetch_topic{ + .topic = topic, + .partitions = {{ + .partition = pid, + .fetch_offset = model::offset(5), + }}, + }); + + auto resp1 = client.dispatch(std::move(req1), kafka::api_version(12)).get(); + BOOST_REQUIRE_EQUAL(resp1.data.responses.size(), 1); + BOOST_REQUIRE_EQUAL( + resp1.data.responses[0].partitions[0].error_code, + kafka::error_code::none); + BOOST_REQUIRE_NE(resp1.data.session_id, kafka::invalid_fetch_session_id); + + auto session_id = resp1.data.session_id; + auto initial_log_start + = resp1.data.responses[0].partitions[0].log_start_offset; + + // Prefix truncate to change log_start_offset + auto trunc_err = app.partition_manager + .invoke_on( + *shard, + 
[ntp](cluster::partition_manager& mgr) { + auto partition = mgr.get(ntp); + auto k_trunc_offset = kafka::offset(5); + auto rp_trunc_offset + = partition->log()->to_log_offset( + model::offset(k_trunc_offset)); + return partition->prefix_truncate( + rp_trunc_offset, + k_trunc_offset, + ss::lowres_clock::time_point::max()); + }) + .get(); + BOOST_REQUIRE(!trunc_err); + + // Incremental fetch with min_bytes=1 - will retry internally waiting for + // data, but should still include the partition due to log_start_offset + // change even if no new data arrives during the retry window. + kafka::fetch_request req2; + req2.data.max_bytes = std::numeric_limits<int32_t>::max(); + req2.data.min_bytes = 1; + req2.data.max_wait_ms = 1000ms; + req2.data.session_id = session_id; + req2.data.session_epoch = kafka::fetch_session_epoch(1); + req2.data.topics.emplace_back( + kafka::fetch_topic{ + .topic = topic, + .partitions = {{ + .partition = pid, + .fetch_offset = model::offset(20), + }}, + }); + + auto resp2 = client.dispatch(std::move(req2), kafka::api_version(12)).get(); + client.stop().then([&client] { client.shutdown(); }).get(); + + // The partition must be included even though no new data arrived, because + // log_start_offset changed. This is the regression test for CORE-14617. 
+ BOOST_REQUIRE_EQUAL(resp2.data.responses.size(), 1); + BOOST_REQUIRE_EQUAL(resp2.data.responses[0].partitions.size(), 1); + auto new_log_start = resp2.data.responses[0].partitions[0].log_start_offset; + BOOST_REQUIRE_GT(new_log_start, initial_log_start); + BOOST_REQUIRE_EQUAL(new_log_start, model::offset(5)); +} diff --git a/tests/rptest/tests/cluster_linking_e2e_test.py b/tests/rptest/tests/cluster_linking_e2e_test.py index 88de6b8cc03b5..d31b77fcf14c7 100644 --- a/tests/rptest/tests/cluster_linking_e2e_test.py +++ b/tests/rptest/tests/cluster_linking_e2e_test.py @@ -1665,10 +1665,8 @@ def _check_partitions_match( self.logger.debug( f"Partition {partition_id}: source hwm={hwm}, shadow_hwm{p_info.source_high_watermark}, last_update={p_info.source_last_updated_timestamp}" ) - # TODO: Re-enable once CORE-14617 is addressed - # TODO: CORE-14653 - # if p_info.source_high_watermark != hwm: - # return False + if p_info.source_high_watermark != hwm: + return False return True def _fetch_shadow_topic_and_compare_results(