Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/v/cloud_storage/async_manifest_materializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ ss::future<
result<async_manifest_materializer::materialized_manifest_ptr, error_outcome>>
async_manifest_materializer::materialize_manifest(const segment_meta& meta) {
auto res = _manifest_cache->get(
std::make_tuple(_stm_manifest->get_ntp(), meta.base_offset), _ctxlog);
std::make_tuple(
_stm_manifest->get_ntp(),
_stm_manifest->get_revision_id(),
meta.base_offset),
_ctxlog);
if (res) {
return ss::make_ready_future<
result<materialized_manifest_ptr, error_outcome>>(std::move(res));
Expand Down Expand Up @@ -110,6 +114,7 @@ ss::future<> async_manifest_materializer::run_bg_loop() {
if (!_manifest_cache->contains(
std::make_tuple(
_stm_manifest->get_ntp(),
_stm_manifest->get_revision_id(),
front.search_vec.base_offset))) {
// Manifest is not cached and has to be hydrated and/or
// materialized.
Expand Down Expand Up @@ -185,7 +190,9 @@ ss::future<> async_manifest_materializer::run_bg_loop() {
}
auto cached = _manifest_cache->get(
std::make_tuple(
_stm_manifest->get_ntp(), front.search_vec.base_offset),
_stm_manifest->get_ntp(),
_stm_manifest->get_revision_id(),
front.search_vec.base_offset),
_ctxlog);
front.promise.set_value(cached);
vlog(
Expand Down
8 changes: 5 additions & 3 deletions src/v/cloud_storage/materialized_manifest_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ template<>
struct fmt::formatter<cloud_storage::manifest_cache_key>
: public fmt::formatter<std::string_view> {
auto format(const cloud_storage::manifest_cache_key& key, auto& ctx) const {
const auto& [ntp, o] = key;
const auto& [ntp, revision, o] = key;
return formatter<std::string_view>::format(
fmt::format("[{}:{}]", ntp, o), ctx);
fmt::format("[{}:{}:{}]", ntp, revision, o), ctx);
}
};

Expand Down Expand Up @@ -169,7 +169,9 @@ void materialized_manifest_cache::put(
manifest.get_ntp());
const model::ntp& ntp = manifest.get_ntp();
auto key = manifest_cache_key(
ntp, manifest.get_start_offset().value_or(model::offset{}));
ntp,
manifest.get_revision_id(),
manifest.get_start_offset().value_or(model::offset{}));
vlog(ctxlog.debug, "Cache PUT key {}, {} units", key, s.count());
if (!_eviction_rollback.empty()) {
auto it = lookup_eviction_rollback_list(key);
Expand Down
4 changes: 3 additions & 1 deletion src/v/cloud_storage/materialized_manifest_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

namespace cloud_storage {

using manifest_cache_key = std::tuple<model::ntp, model::offset>;
using manifest_cache_key
= std::tuple<model::ntp, model::initial_revision_id, model::offset>;

/// Materialized spillover manifest
///
Expand All @@ -47,6 +48,7 @@ struct materialized_manifest
manifest_cache_key get_key() const {
return std::make_tuple(
manifest.get_ntp(),
manifest.get_revision_id(),
manifest.get_start_offset().value_or(model::offset{}));
}

Expand Down
86 changes: 86 additions & 0 deletions src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1542,6 +1542,92 @@ TEST_F(ManualFixture, TestSpilloverWithTruncationRetainsStartOffset) {
archiver.manifest().get_start_kafka_offset_override(), kafka::offset{48});
}

// Test a scenario where, after a topic recreation, the spillover manifests
// cached for the previous incarnation of the topic could be applied to the new
// topic, causing errors or reads of wrong data. The manifest cache key
// includes the topic's initial revision id specifically to disambiguate
// incarnations that share an ntp.
TEST_F(ManualFixture, TestSpilloverCacheCollision) {
    // Frequent compaction plus a disabled upload loop lets the test drive
    // uploads and spillover deterministically via the archiver.
    test_local_cfg.get("log_compaction_interval_ms")
      .set_value(std::chrono::duration_cast<std::chrono::milliseconds>(1s));
    test_local_cfg.get("cloud_storage_disable_upload_loop_for_tests")
      .set_value(true);
    // Spill after every 2 segments (and disable the size-based trigger) so a
    // small amount of data yields multiple spillover manifests.
    test_local_cfg.get("cloud_storage_spillover_manifest_max_segments")
      .set_value(std::make_optional<size_t>(2));
    test_local_cfg.get("cloud_storage_spillover_manifest_size")
      .set_value(std::optional<size_t>{});

    const model::topic topic_name("spillover_truncate_test");
    model::ntp ntp(model::kafka_namespace, topic_name, 0);

    // Tiered storage fully enabled; retention disabled so segments are only
    // removed by the explicit local-log eviction below.
    cluster::topic_properties props;
    props.shadow_indexing = model::shadow_indexing_mode::full;
    props.cleanup_policy_bitflags = model::cleanup_policy_bitflags::deletion;
    props.retention_bytes = tristate<size_t>(disable_tristate_t{});
    props.retention_duration = tristate<std::chrono::milliseconds>(
      disable_tristate_t{});

    // Run two iterations of topic creation, data production, spillover, and
    // deletion. The second iteration reuses the topic name, and we want to
    // verify that cached state from the first incarnation does not affect the
    // second incarnation.
    for (int iteration = 0; iteration < 2; ++iteration) {
        vlog(e2e_test_log.info, "Running iteration {}", iteration);

        add_topic({model::kafka_namespace, topic_name}, 1, props).get();
        wait_for_leader(ntp).get();

        auto partition = app.partition_manager.local().get(ntp);
        auto* archiver = &partition->archiver().value().get();
        archiver->initialize_probe();

        vlog(e2e_test_log.info, "Seeding partition data");

        tests::remote_segment_generator gen(
          make_kafka_client().get(), *partition);
        auto deferred_g_close = ss::defer([&gen] { gen.stop().get(); });

        // Start record keys at `iteration` so the two incarnations produce
        // distinguishable data; a stale cached manifest from iteration 0
        // would surface as the wrong first key in iteration 1.
        auto total_records = gen.num_segments(6)
                               .batches_per_segment(5)
                               .records_per_batch(1)
                               .start_ix(iteration)
                               .produce()
                               .get();
        ASSERT_GE(total_records, 30);

        ASSERT_TRUE(archiver->sync_for_tests().get());

        // Evict local log to force reads from tiered storage.
        vlog(e2e_test_log.info, "Setting cloud_gc to evict local log");
        partition->log()->set_cloud_gc_offset(
          archiver->manifest().get_last_offset());

        RPTEST_REQUIRE_EVENTUALLY(10s, [log = partition->log()] {
            return log->segments().size() == 1;
        });

        vlog(e2e_test_log.info, "Applying spillover");
        archiver->apply_spillover().get();

        // Verify that spillover happened as we expect.
        ASSERT_EQ(archiver->manifest().get_spillover_map().size(), 2);

        // Consume all data from offset 0, which must be served through the
        // spillover manifests hydrated from cloud storage.
        tests::kafka_consume_transport consumer(make_kafka_client().get());
        auto deferred_c_close = ss::defer(
          [&consumer] { consumer.stop().get(); });
        consumer.start().get();
        auto consumed = consumer
                          .consume_from_partition(
                            topic_name, ntp.tp.partition, model::offset{0})
                          .get();
        ASSERT_EQ(consumed.size(), 30);
        // First key reflects this incarnation's start index — proves we did
        // not read the previous incarnation's data.
        ASSERT_EQ(consumed.front().key, fmt::format("key{}", iteration));

        // Delete topic; the next iteration recreates it under the same name
        // with a new revision id.
        vlog(e2e_test_log.info, "Deleting topic");
        delete_topic({model::kafka_namespace, topic_name}).get();
    }
}

INSTANTIATE_TEST_SUITE_P(WithOverride, EndToEndFixture, ::testing::Bool());
INSTANTIATE_TEST_SUITE_P(
WithOverride, CloudStorageEndToEndManualTest, ::testing::Bool());
17 changes: 5 additions & 12 deletions src/v/cloud_storage/tests/materialized_manifest_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,11 @@
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "bytes/iostream.h"
#include "cloud_io/tests/s3_imposter.h"
#include "cloud_storage/materialized_manifest_cache.h"
#include "cloud_storage/spillover_manifest.h"
#include "cloud_storage/tests/cloud_storage_fixture.h"
#include "cloud_storage/tests/util.h"
#include "cloud_storage/tests/common_def.h"
#include "cloud_storage/types.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/timeout_clock.h"
#include "utils/retry_chain_node.h"

#include <seastar/core/abort_source.hh>
Expand All @@ -27,10 +23,7 @@
#include <boost/test/tools/old/interface.hpp>
#include <boost/test/unit_test.hpp>

#include <chrono>
#include <iterator>
#include <numeric>

using namespace std::chrono_literals;
using namespace cloud_storage;

static ss::logger test_log("async_manifest_view_log");
Expand All @@ -48,11 +41,11 @@ static spillover_manifest make_manifest(model::offset base) {
}

manifest_cache_key make_key(int64_t off) {
return std::make_tuple(manifest_ntp, model::offset{off});
return std::make_tuple(manifest_ntp, manifest_rev, model::offset{off});
}

manifest_cache_key make_key(model::offset off) {
return std::make_tuple(manifest_ntp, off);
return std::make_tuple(manifest_ntp, manifest_rev, off);
}

// Add elements to an empty cache and verify that they are added correctly.
Expand Down
17 changes: 10 additions & 7 deletions src/v/cloud_storage/tests/produce_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ class remote_segment_generator {
kafka::client::transport transport, cluster::partition& partition)
: _producer(std::move(transport))
, _partition(partition) {}

// Set the starting index for generated record keys/values: kv_t
// sequences produced by this generator begin at `ix` (default 0).
// Returns *this for chaining with the other builder-style setters.
remote_segment_generator& start_ix(size_t ix) {
    _current_ix = ix;
    return *this;
}
remote_segment_generator& num_segments(size_t n) {
_num_remote_segs = n;
return *this;
Expand Down Expand Up @@ -65,14 +68,13 @@ class remote_segment_generator {
auto log = _partition.log();
auto& archiver = _partition.archiver().value().get();

size_t total_records = 0;
auto cur_timestamp = _base_timestamp;
while (_partition.archival_meta_stm()->manifest().size()
< _num_remote_segs) {
for (size_t i = 0; i < _batches_per_seg; i++) {
std::vector<kv_t> records = kv_t::sequence(
total_records, _records_per_batch);
total_records += _records_per_batch;
_current_ix, _records_per_batch);
_current_ix += _records_per_batch;
co_await _producer.produce_to_partition(
_partition.ntp().tp.topic,
_partition.ntp().tp.partition,
Expand Down Expand Up @@ -114,8 +116,8 @@ class remote_segment_generator {
for (size_t i = 0; i < _num_local_segs; i++) {
for (size_t i = 0; i < _batches_per_seg; i++) {
std::vector<kv_t> records = kv_t::sequence(
total_records, _records_per_batch);
total_records += _records_per_batch;
_current_ix, _records_per_batch);
_current_ix += _records_per_batch;
co_await _producer.produce_to_partition(
_partition.ntp().tp.topic,
_partition.ntp().tp.partition,
Expand All @@ -129,7 +131,7 @@ class remote_segment_generator {
co_await log->flush();
co_await log->force_roll();
}
co_return total_records;
co_return _current_ix;
}

private:
Expand All @@ -143,6 +145,7 @@ class remote_segment_generator {
size_t _batches_per_seg{1};
std::optional<model::timestamp> _base_timestamp{std::nullopt};
int _batch_time_delta_ms{1};
size_t _current_ix{0};
};

} // namespace tests