diff --git a/src/v/cloud_storage/async_manifest_materializer.cc b/src/v/cloud_storage/async_manifest_materializer.cc index 39e741dcf8425..c4f78a1ee44ef 100644 --- a/src/v/cloud_storage/async_manifest_materializer.cc +++ b/src/v/cloud_storage/async_manifest_materializer.cc @@ -61,7 +61,11 @@ ss::future< result> async_manifest_materializer::materialize_manifest(const segment_meta& meta) { auto res = _manifest_cache->get( - std::make_tuple(_stm_manifest->get_ntp(), meta.base_offset), _ctxlog); + std::make_tuple( + _stm_manifest->get_ntp(), + _stm_manifest->get_revision_id(), + meta.base_offset), + _ctxlog); if (res) { return ss::make_ready_future< result>(std::move(res)); @@ -110,6 +114,7 @@ ss::future<> async_manifest_materializer::run_bg_loop() { if (!_manifest_cache->contains( std::make_tuple( _stm_manifest->get_ntp(), + _stm_manifest->get_revision_id(), front.search_vec.base_offset))) { // Manifest is not cached and has to be hydrated and/or // materialized. @@ -185,7 +190,9 @@ ss::future<> async_manifest_materializer::run_bg_loop() { } auto cached = _manifest_cache->get( std::make_tuple( - _stm_manifest->get_ntp(), front.search_vec.base_offset), + _stm_manifest->get_ntp(), + _stm_manifest->get_revision_id(), + front.search_vec.base_offset), _ctxlog); front.promise.set_value(cached); vlog( diff --git a/src/v/cloud_storage/materialized_manifest_cache.cc b/src/v/cloud_storage/materialized_manifest_cache.cc index 4404865104c15..13c5d3631c83f 100644 --- a/src/v/cloud_storage/materialized_manifest_cache.cc +++ b/src/v/cloud_storage/materialized_manifest_cache.cc @@ -38,9 +38,9 @@ template<> struct fmt::formatter : public fmt::formatter { auto format(const cloud_storage::manifest_cache_key& key, auto& ctx) const { - const auto& [ntp, o] = key; + const auto& [ntp, revision, o] = key; return formatter::format( - fmt::format("[{}:{}]", ntp, o), ctx); + fmt::format("[{}:{}:{}]", ntp, revision, o), ctx); } }; @@ -169,7 +169,9 @@ void materialized_manifest_cache::put( manifest.get_ntp()); const model::ntp& ntp = manifest.get_ntp(); auto key = manifest_cache_key( - ntp, manifest.get_start_offset().value_or(model::offset{})); + ntp, + manifest.get_revision_id(), + manifest.get_start_offset().value_or(model::offset{})); vlog(ctxlog.debug, "Cache PUT key {}, {} units", key, s.count()); if (!_eviction_rollback.empty()) { auto it = lookup_eviction_rollback_list(key); diff --git a/src/v/cloud_storage/materialized_manifest_cache.h b/src/v/cloud_storage/materialized_manifest_cache.h index 823f1de55c183..5ebc8f4b48e5c 100644 --- a/src/v/cloud_storage/materialized_manifest_cache.h +++ b/src/v/cloud_storage/materialized_manifest_cache.h @@ -25,7 +25,8 @@ namespace cloud_storage { -using manifest_cache_key = std::tuple; +using manifest_cache_key + = std::tuple; /// Materialized spillover manifest /// @@ -47,6 +48,7 @@ struct materialized_manifest manifest_cache_key get_key() const { return std::make_tuple( manifest.get_ntp(), + manifest.get_revision_id(), manifest.get_start_offset().value_or(model::offset{})); } diff --git a/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc b/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc index fa7f8a914d229..b17bfd2153968 100644 --- a/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc +++ b/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc @@ -1542,6 +1542,92 @@ TEST_F(ManualFixture, TestSpilloverWithTruncationRetainsStartOffset) { archiver.manifest().get_start_kafka_offset_override(), kafka::offset{48}); } +// Test a scenario where after a topic recreation the spillover manifests from +// the previous incarnation could be applied to the new topic, causing errors or +// reading wrong data. +TEST_F(ManualFixture, TestSpilloverCacheCollision) { + test_local_cfg.get("log_compaction_interval_ms") + .set_value(std::chrono::duration_cast(1s)); + test_local_cfg.get("cloud_storage_disable_upload_loop_for_tests") + .set_value(true); + test_local_cfg.get("cloud_storage_spillover_manifest_max_segments") + .set_value(std::make_optional(2)); + test_local_cfg.get("cloud_storage_spillover_manifest_size") + .set_value(std::optional{}); + + const model::topic topic_name("spillover_truncate_test"); + model::ntp ntp(model::kafka_namespace, topic_name, 0); + + cluster::topic_properties props; + props.shadow_indexing = model::shadow_indexing_mode::full; + props.cleanup_policy_bitflags = model::cleanup_policy_bitflags::deletion; + props.retention_bytes = tristate(disable_tristate_t{}); + props.retention_duration = tristate( + disable_tristate_t{}); + + // Run two iterations of topic creation, data production, spillover, and + // deletion. The second iteration will reuse the topic name, and we want to + // verify that the first incarnation does not affect the + // second incarnation. + for (int iteration = 0; iteration < 2; ++iteration) { + vlog(e2e_test_log.info, "Running iteration {}", iteration); + + add_topic({model::kafka_namespace, topic_name}, 1, props).get(); + wait_for_leader(ntp).get(); + + auto partition = app.partition_manager.local().get(ntp); + auto* archiver = &partition->archiver().value().get(); + archiver->initialize_probe(); + + vlog(e2e_test_log.info, "Seeding partition data"); + + tests::remote_segment_generator gen( + make_kafka_client().get(), *partition); + auto deferred_g_close = ss::defer([&gen] { gen.stop().get(); }); + + auto total_records = gen.num_segments(6) + .batches_per_segment(5) + .records_per_batch(1) + .start_ix(iteration) + .produce() + .get(); + ASSERT_GE(total_records, 30); + + ASSERT_TRUE(archiver->sync_for_tests().get()); + + // Evict local log to force reads from tiered storage. + vlog(e2e_test_log.info, "Setting cloud_gc to evict local log"); + partition->log()->set_cloud_gc_offset( + archiver->manifest().get_last_offset()); + + RPTEST_REQUIRE_EVENTUALLY(10s, [log = partition->log()] { + return log->segments().size() == 1; + }); + + vlog(e2e_test_log.info, "Applying spillover"); + archiver->apply_spillover().get(); + + // Verify that spillover happened as we expect. + ASSERT_EQ(archiver->manifest().get_spillover_map().size(), 2); + + // Consume all data. + tests::kafka_consume_transport consumer(make_kafka_client().get()); + auto deferred_c_close = ss::defer( + [&consumer] { consumer.stop().get(); }); + consumer.start().get(); + auto consumed = consumer + .consume_from_partition( + topic_name, ntp.tp.partition, model::offset{0}) + .get(); + ASSERT_EQ(consumed.size(), 30); + ASSERT_EQ(consumed.front().key, fmt::format("key{}", iteration)); + + // Delete topic. + vlog(e2e_test_log.info, "Deleting topic"); + delete_topic({model::kafka_namespace, topic_name}).get(); + } +} + INSTANTIATE_TEST_SUITE_P(WithOverride, EndToEndFixture, ::testing::Bool()); INSTANTIATE_TEST_SUITE_P( WithOverride, CloudStorageEndToEndManualTest, ::testing::Bool()); diff --git a/src/v/cloud_storage/tests/materialized_manifest_cache_test.cc b/src/v/cloud_storage/tests/materialized_manifest_cache_test.cc index 126de8de7053c..9e6a70dc9c6fa 100644 --- a/src/v/cloud_storage/tests/materialized_manifest_cache_test.cc +++ b/src/v/cloud_storage/tests/materialized_manifest_cache_test.cc @@ -8,15 +8,11 @@ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ -#include "bytes/iostream.h" -#include "cloud_io/tests/s3_imposter.h" +#include "cloud_storage/materialized_manifest_cache.h" #include "cloud_storage/spillover_manifest.h" -#include "cloud_storage/tests/cloud_storage_fixture.h" -#include "cloud_storage/tests/util.h" +#include "cloud_storage/tests/common_def.h" #include "cloud_storage/types.h" #include "model/fundamental.h" -#include "model/metadata.h" -#include "model/timeout_clock.h" #include "utils/retry_chain_node.h" #include @@ -27,10 +23,7 @@ #include #include -#include -#include -#include - +using namespace std::chrono_literals; using namespace cloud_storage; static ss::logger test_log("async_manifest_view_log"); @@ -48,11 +41,11 @@ static spillover_manifest make_manifest(model::offset base) { } manifest_cache_key make_key(int64_t off) { - return std::make_tuple(manifest_ntp, model::offset{off}); + return std::make_tuple(manifest_ntp, manifest_rev, model::offset{off}); } manifest_cache_key make_key(model::offset off) { - return std::make_tuple(manifest_ntp, off); + return std::make_tuple(manifest_ntp, manifest_rev, off); } // Add elements to an empty cache and verify that they are added correctly. diff --git a/src/v/cloud_storage/tests/produce_utils.h b/src/v/cloud_storage/tests/produce_utils.h index 24be25e5728fa..6f307f4e225d3 100644 --- a/src/v/cloud_storage/tests/produce_utils.h +++ b/src/v/cloud_storage/tests/produce_utils.h @@ -25,7 +25,10 @@ class remote_segment_generator { kafka::client::transport transport, cluster::partition& partition) : _producer(std::move(transport)) , _partition(partition) {} - + remote_segment_generator& start_ix(size_t ix) { + _current_ix = ix; + return *this; + } remote_segment_generator& num_segments(size_t n) { _num_remote_segs = n; return *this; @@ -65,14 +68,13 @@ class remote_segment_generator { auto log = _partition.log(); auto& archiver = _partition.archiver().value().get(); - size_t total_records = 0; auto cur_timestamp = _base_timestamp; while (_partition.archival_meta_stm()->manifest().size() < _num_remote_segs) { for (size_t i = 0; i < _batches_per_seg; i++) { std::vector records = kv_t::sequence( - total_records, _records_per_batch); - total_records += _records_per_batch; + _current_ix, _records_per_batch); + _current_ix += _records_per_batch; co_await _producer.produce_to_partition( _partition.ntp().tp.topic, _partition.ntp().tp.partition, @@ -114,8 +116,8 @@ class remote_segment_generator { for (size_t i = 0; i < _num_local_segs; i++) { for (size_t i = 0; i < _batches_per_seg; i++) { std::vector records = kv_t::sequence( - total_records, _records_per_batch); - total_records += _records_per_batch; + _current_ix, _records_per_batch); + _current_ix += _records_per_batch; co_await _producer.produce_to_partition( _partition.ntp().tp.topic, _partition.ntp().tp.partition, @@ -129,7 +131,7 @@ class remote_segment_generator { co_await log->flush(); co_await log->force_roll(); } - co_return total_records; + co_return _current_ix; } private: @@ -143,6 +145,7 @@ class remote_segment_generator { size_t _batches_per_seg{1}; std::optional _base_timestamp{std::nullopt}; int _batch_time_delta_ms{1}; + size_t _current_ix{0}; }; } // namespace tests