Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/v/cloud_storage/async_manifest_materializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ ss::future<
result<async_manifest_materializer::materialized_manifest_ptr, error_outcome>>
async_manifest_materializer::materialize_manifest(const segment_meta& meta) {
auto res = _manifest_cache->get(
std::make_tuple(_stm_manifest->get_ntp(), meta.base_offset), _ctxlog);
std::make_tuple(
_stm_manifest->get_ntp(),
_stm_manifest->get_revision_id(),
meta.base_offset),
_ctxlog);
if (res) {
return ss::make_ready_future<
result<materialized_manifest_ptr, error_outcome>>(std::move(res));
Expand Down Expand Up @@ -110,6 +114,7 @@ ss::future<> async_manifest_materializer::run_bg_loop() {
if (!_manifest_cache->contains(
std::make_tuple(
_stm_manifest->get_ntp(),
_stm_manifest->get_revision_id(),
front.search_vec.base_offset))) {
// Manifest is not cached and has to be hydrated and/or
// materialized.
Expand Down Expand Up @@ -185,7 +190,9 @@ ss::future<> async_manifest_materializer::run_bg_loop() {
}
auto cached = _manifest_cache->get(
std::make_tuple(
_stm_manifest->get_ntp(), front.search_vec.base_offset),
_stm_manifest->get_ntp(),
_stm_manifest->get_revision_id(),
front.search_vec.base_offset),
_ctxlog);
front.promise.set_value(cached);
vlog(
Expand Down
8 changes: 5 additions & 3 deletions src/v/cloud_storage/materialized_manifest_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ template<>
struct fmt::formatter<cloud_storage::manifest_cache_key>
: public fmt::formatter<std::string_view> {
auto format(const cloud_storage::manifest_cache_key& key, auto& ctx) const {
const auto& [ntp, o] = key;
const auto& [ntp, revision, o] = key;
return formatter<std::string_view>::format(
fmt::format("[{}:{}]", ntp, o), ctx);
fmt::format("[{}:{}:{}]", ntp, revision, o), ctx);
}
};

Expand Down Expand Up @@ -169,7 +169,9 @@ void materialized_manifest_cache::put(
manifest.get_ntp());
const model::ntp& ntp = manifest.get_ntp();
auto key = manifest_cache_key(
ntp, manifest.get_start_offset().value_or(model::offset{}));
ntp,
manifest.get_revision_id(),
manifest.get_start_offset().value_or(model::offset{}));
vlog(ctxlog.debug, "Cache PUT key {}, {} units", key, s.count());
if (!_eviction_rollback.empty()) {
auto it = lookup_eviction_rollback_list(key);
Expand Down
4 changes: 3 additions & 1 deletion src/v/cloud_storage/materialized_manifest_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

namespace cloud_storage {

using manifest_cache_key = std::tuple<model::ntp, model::offset>;
using manifest_cache_key
= std::tuple<model::ntp, model::initial_revision_id, model::offset>;

/// Materialized spillover manifest
///
Expand All @@ -47,6 +48,7 @@ struct materialized_manifest
// Cache key identifying this materialized spillover manifest:
// (ntp, initial revision id, start offset). Carrying the revision id
// keeps manifests from different incarnations of a recreated topic
// (same ntp, new revision) from colliding in the cache.
manifest_cache_key get_key() const {
    const auto start_offset = manifest.get_start_offset().value_or(
      model::offset{});
    return manifest_cache_key(
      manifest.get_ntp(), manifest.get_revision_id(), start_offset);
}

Expand Down
86 changes: 86 additions & 0 deletions src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1542,6 +1542,92 @@ TEST_F(ManualFixture, TestSpilloverWithTruncationRetainsStartOffset) {
archiver.manifest().get_start_kafka_offset_override(), kafka::offset{48});
}

// Test a scenario where, after a topic recreation, the spillover manifests
// from the previous incarnation could be applied to the new topic, causing
// errors or reads of wrong data.
Comment on lines +1545 to +1547
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for posterity, this test would consistently fail before your fix?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Every single time. The test was written before the fix.

I need to add an assert though to make sure that spillover did actually happen, to future-proof it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every single time

😞

assert ... to make sure that spillover did actually happen

good idea 👍

TEST_F(ManualFixture, TestSpilloverCacheCollision) {
test_local_cfg.get("log_compaction_interval_ms")
.set_value(std::chrono::duration_cast<std::chrono::milliseconds>(1s));
test_local_cfg.get("cloud_storage_disable_upload_loop_for_tests")
.set_value(true);
test_local_cfg.get("cloud_storage_spillover_manifest_max_segments")
.set_value(std::make_optional<size_t>(2));
test_local_cfg.get("cloud_storage_spillover_manifest_size")
.set_value(std::optional<size_t>{});

const model::topic topic_name("spillover_truncate_test");
model::ntp ntp(model::kafka_namespace, topic_name, 0);

cluster::topic_properties props;
props.shadow_indexing = model::shadow_indexing_mode::full;
props.cleanup_policy_bitflags = model::cleanup_policy_bitflags::deletion;
props.retention_bytes = tristate<size_t>(disable_tristate_t{});
props.retention_duration = tristate<std::chrono::milliseconds>(
disable_tristate_t{});

// Run two iterations of topic creation, data production, spillover, and
// deletion. The second iteration will reuse the topic name, and we want to
// verify that the first incarnation does not affect the
// second incarnation.
for (int iteration = 0; iteration < 2; ++iteration) {
vlog(e2e_test_log.info, "Running iteration {}", iteration);

add_topic({model::kafka_namespace, topic_name}, 1, props).get();
wait_for_leader(ntp).get();
Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment 'Seeding partition data' is correct, but consider making it more descriptive for this specific iteration context, such as 'Seeding partition data for iteration N' to improve test output clarity.

Suggested change
wait_for_leader(ntp).get();
SCOPED_TRACE(
fmt::format("Seeding partition data for iteration {}", iteration));

Copilot uses AI. Check for mistakes.

auto partition = app.partition_manager.local().get(ntp);
auto* archiver = &partition->archiver().value().get();
archiver->initialize_probe();

vlog(e2e_test_log.info, "Seeding partition data");

tests::remote_segment_generator gen(
make_kafka_client().get(), *partition);
auto deferred_g_close = ss::defer([&gen] { gen.stop().get(); });

auto total_records = gen.num_segments(6)
.batches_per_segment(5)
.records_per_batch(1)
.start_ix(iteration)
.produce()
.get();
ASSERT_GE(total_records, 30);

ASSERT_TRUE(archiver->sync_for_tests().get());

// Evict local log to force reads from tiered storage.
vlog(e2e_test_log.info, "Setting cloud_gc to evict local log");
partition->log()->set_cloud_gc_offset(
archiver->manifest().get_last_offset());

RPTEST_REQUIRE_EVENTUALLY(10s, [log = partition->log()] {
return log->segments().size() == 1;
});

vlog(e2e_test_log.info, "Applying spillover");
archiver->apply_spillover().get();

// Verify that spillover happened as we expect.
ASSERT_EQ(archiver->manifest().get_spillover_map().size(), 2);

// Consume all data.
tests::kafka_consume_transport consumer(make_kafka_client().get());
auto deferred_c_close = ss::defer(
[&consumer] { consumer.stop().get(); });
Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test verifies only the first record's key matches the expected iteration value. Consider also checking the last record or a sample of records to ensure all consumed data is from the correct iteration and not mixed with data from the previous topic incarnation.

Copilot uses AI. Check for mistakes.
consumer.start().get();
auto consumed = consumer
.consume_from_partition(
topic_name, ntp.tp.partition, model::offset{0})
.get();
ASSERT_EQ(consumed.size(), 30);
ASSERT_EQ(consumed.front().key, fmt::format("key{}", iteration));

// Delete topic.
vlog(e2e_test_log.info, "Deleting topic");
delete_topic({model::kafka_namespace, topic_name}).get();
}
}

INSTANTIATE_TEST_SUITE_P(WithOverride, EndToEndFixture, ::testing::Bool());
INSTANTIATE_TEST_SUITE_P(
WithOverride, CloudStorageEndToEndManualTest, ::testing::Bool());
17 changes: 5 additions & 12 deletions src/v/cloud_storage/tests/materialized_manifest_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,11 @@
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "bytes/iostream.h"
#include "cloud_io/tests/s3_imposter.h"
#include "cloud_storage/materialized_manifest_cache.h"
#include "cloud_storage/spillover_manifest.h"
#include "cloud_storage/tests/cloud_storage_fixture.h"
#include "cloud_storage/tests/util.h"
#include "cloud_storage/tests/common_def.h"
#include "cloud_storage/types.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/timeout_clock.h"
#include "utils/retry_chain_node.h"

#include <seastar/core/abort_source.hh>
Expand All @@ -27,10 +23,7 @@
#include <boost/test/tools/old/interface.hpp>
#include <boost/test/unit_test.hpp>

#include <chrono>
#include <iterator>
#include <numeric>

using namespace std::chrono_literals;
using namespace cloud_storage;

static ss::logger test_log("async_manifest_view_log");
Expand All @@ -48,11 +41,11 @@ static spillover_manifest make_manifest(model::offset base) {
}

// Build a cache key for the test's fixed ntp/revision at the given raw
// offset value. Convenience overload taking a plain int64_t.
manifest_cache_key make_key(int64_t off) {
    return std::make_tuple(manifest_ntp, manifest_rev, model::offset{off});
}

// Build a cache key for the test's fixed ntp/revision at the given offset.
manifest_cache_key make_key(model::offset off) {
    return std::make_tuple(manifest_ntp, manifest_rev, off);
}

// Add elements to an empty cache and verify that they are added correctly.
Expand Down
17 changes: 10 additions & 7 deletions src/v/cloud_storage/tests/produce_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ class remote_segment_generator {
kafka::client::transport transport, cluster::partition& partition)
: _producer(std::move(transport))
, _partition(partition) {}

// Set the starting index for generated records: the value passed to
// kv_t::sequence() for the first batch produced. Lets a later run (e.g. a
// new incarnation of a recreated topic) produce records distinguishable
// from those of a previous run. Returns *this for builder-style chaining.
remote_segment_generator& start_ix(size_t ix) {
    _current_ix = ix;
    return *this;
}
Comment on lines +28 to +31
Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new start_ix method lacks documentation explaining its purpose and when it should be used. Add a comment describing that this method sets the starting index for record generation, useful for continuing sequences across multiple invocations or topic incarnations.

Copilot uses AI. Check for mistakes.
remote_segment_generator& num_segments(size_t n) {
_num_remote_segs = n;
return *this;
Expand Down Expand Up @@ -65,14 +68,13 @@ class remote_segment_generator {
auto log = _partition.log();
auto& archiver = _partition.archiver().value().get();

size_t total_records = 0;
auto cur_timestamp = _base_timestamp;
while (_partition.archival_meta_stm()->manifest().size()
< _num_remote_segs) {
for (size_t i = 0; i < _batches_per_seg; i++) {
std::vector<kv_t> records = kv_t::sequence(
total_records, _records_per_batch);
total_records += _records_per_batch;
_current_ix, _records_per_batch);
_current_ix += _records_per_batch;
co_await _producer.produce_to_partition(
_partition.ntp().tp.topic,
_partition.ntp().tp.partition,
Expand Down Expand Up @@ -114,8 +116,8 @@ class remote_segment_generator {
for (size_t i = 0; i < _num_local_segs; i++) {
for (size_t i = 0; i < _batches_per_seg; i++) {
std::vector<kv_t> records = kv_t::sequence(
total_records, _records_per_batch);
total_records += _records_per_batch;
_current_ix, _records_per_batch);
_current_ix += _records_per_batch;
co_await _producer.produce_to_partition(
_partition.ntp().tp.topic,
_partition.ntp().tp.partition,
Expand All @@ -129,7 +131,7 @@ class remote_segment_generator {
co_await log->flush();
co_await log->force_roll();
}
co_return total_records;
co_return _current_ix;
}

private:
Expand All @@ -143,6 +145,7 @@ class remote_segment_generator {
size_t _batches_per_seg{1};
std::optional<model::timestamp> _base_timestamp{std::nullopt};
int _batch_time_delta_ms{1};
size_t _current_ix{0};
};

} // namespace tests