-
Notifications
You must be signed in to change notification settings - Fork 727
cloud_storage: include revision_id in spillover manifest cache key #29068
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -1542,6 +1542,92 @@ TEST_F(ManualFixture, TestSpilloverWithTruncationRetainsStartOffset) { | |||||||
| archiver.manifest().get_start_kafka_offset_override(), kafka::offset{48}); | ||||||||
| } | ||||||||
|
|
||||||||
| // Test a scenario where after a topic recreation the spillover manifests from | ||||||||
| // the previous incarnation could be applied to the new topic, causing errors | ||||||||
| // or reads of wrong data. | ||||||||
| TEST_F(ManualFixture, TestSpilloverCacheCollision) { | ||||||||
| test_local_cfg.get("log_compaction_interval_ms") | ||||||||
| .set_value(std::chrono::duration_cast<std::chrono::milliseconds>(1s)); | ||||||||
| test_local_cfg.get("cloud_storage_disable_upload_loop_for_tests") | ||||||||
| .set_value(true); | ||||||||
| test_local_cfg.get("cloud_storage_spillover_manifest_max_segments") | ||||||||
| .set_value(std::make_optional<size_t>(2)); | ||||||||
| test_local_cfg.get("cloud_storage_spillover_manifest_size") | ||||||||
| .set_value(std::optional<size_t>{}); | ||||||||
|
|
||||||||
| const model::topic topic_name("spillover_truncate_test"); | ||||||||
| model::ntp ntp(model::kafka_namespace, topic_name, 0); | ||||||||
|
|
||||||||
| cluster::topic_properties props; | ||||||||
| props.shadow_indexing = model::shadow_indexing_mode::full; | ||||||||
| props.cleanup_policy_bitflags = model::cleanup_policy_bitflags::deletion; | ||||||||
| props.retention_bytes = tristate<size_t>(disable_tristate_t{}); | ||||||||
| props.retention_duration = tristate<std::chrono::milliseconds>( | ||||||||
| disable_tristate_t{}); | ||||||||
|
|
||||||||
| // Run two iterations of topic creation, data production, spillover, and | ||||||||
| // deletion. The second iteration will reuse the topic name, and we want to | ||||||||
| // verify that the first incarnation does not affect the | ||||||||
| // second incarnation. | ||||||||
| for (int iteration = 0; iteration < 2; ++iteration) { | ||||||||
| vlog(e2e_test_log.info, "Running iteration {}", iteration); | ||||||||
|
|
||||||||
| add_topic({model::kafka_namespace, topic_name}, 1, props).get(); | ||||||||
| wait_for_leader(ntp).get(); | ||||||||
|
||||||||
| wait_for_leader(ntp).get(); | |
| SCOPED_TRACE( | |
| fmt::format("Seeding partition data for iteration {}", iteration)); |
oleiman marked this conversation as resolved.
Show resolved
Hide resolved
Copilot
AI
Dec 20, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test verifies only that the first record's key matches the expected iteration value. Consider also checking the last record or a sample of records to ensure all consumed data is from the correct iteration and not mixed with data from the previous topic incarnation.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,7 +25,10 @@ class remote_segment_generator { | |
| kafka::client::transport transport, cluster::partition& partition) | ||
| : _producer(std::move(transport)) | ||
| , _partition(partition) {} | ||
|
|
||
| remote_segment_generator& start_ix(size_t ix) { | ||
| _current_ix = ix; | ||
| return *this; | ||
| } | ||
|
Comment on lines
+28
to
+31
|
||
| remote_segment_generator& num_segments(size_t n) { | ||
| _num_remote_segs = n; | ||
| return *this; | ||
|
|
@@ -65,14 +68,13 @@ class remote_segment_generator { | |
| auto log = _partition.log(); | ||
| auto& archiver = _partition.archiver().value().get(); | ||
|
|
||
| size_t total_records = 0; | ||
| auto cur_timestamp = _base_timestamp; | ||
| while (_partition.archival_meta_stm()->manifest().size() | ||
| < _num_remote_segs) { | ||
| for (size_t i = 0; i < _batches_per_seg; i++) { | ||
| std::vector<kv_t> records = kv_t::sequence( | ||
| total_records, _records_per_batch); | ||
| total_records += _records_per_batch; | ||
| _current_ix, _records_per_batch); | ||
| _current_ix += _records_per_batch; | ||
| co_await _producer.produce_to_partition( | ||
| _partition.ntp().tp.topic, | ||
| _partition.ntp().tp.partition, | ||
|
|
@@ -114,8 +116,8 @@ class remote_segment_generator { | |
| for (size_t i = 0; i < _num_local_segs; i++) { | ||
| for (size_t i = 0; i < _batches_per_seg; i++) { | ||
| std::vector<kv_t> records = kv_t::sequence( | ||
| total_records, _records_per_batch); | ||
| total_records += _records_per_batch; | ||
| _current_ix, _records_per_batch); | ||
| _current_ix += _records_per_batch; | ||
| co_await _producer.produce_to_partition( | ||
| _partition.ntp().tp.topic, | ||
| _partition.ntp().tp.partition, | ||
|
|
@@ -129,7 +131,7 @@ class remote_segment_generator { | |
| co_await log->flush(); | ||
| co_await log->force_roll(); | ||
| } | ||
| co_return total_records; | ||
| co_return _current_ix; | ||
| } | ||
|
|
||
| private: | ||
|
|
@@ -143,6 +145,7 @@ class remote_segment_generator { | |
| size_t _batches_per_seg{1}; | ||
| std::optional<model::timestamp> _base_timestamp{std::nullopt}; | ||
| int _batch_time_delta_ms{1}; | ||
| size_t _current_ix{0}; | ||
| }; | ||
|
|
||
| } // namespace tests | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for posterity, this test would consistently fail before your fix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Every single time. The test was written before the fix.
I need to add an assert, though, to make sure that spillover did actually happen, to future-proof it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😞
good idea 👍