Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions src/v/storage/disk_log_appender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ ss::future<> disk_log_appender::initialize() {
}

bool disk_log_appender::segment_is_appendable(model::term_id batch_term) const {
if (!_seg || !_seg->has_appender() || _seg->is_tombstone()) {
if (
!_seg || !_seg->has_appender() || _seg->is_tombstone()
|| _seg->is_closed()) {
// The latest segment with which this log_appender has called
initialize() has been rolled and no longer has a segment appender
// (e.g. because segment.ms rolled onto a new segment). There is likely
Expand Down Expand Up @@ -114,12 +116,9 @@ disk_log_appender::operator()(model::record_batch& batch) {
co_return stop;
} catch (...) {
release_lock();
vlog(
stlog.info,
"Could not append batch: {} - {}",
std::current_exception(),
*this);
_log.get_probe().batch_write_error(std::current_exception());
auto e = std::current_exception();
vlog(stlog.error, "Could not append batch: {} - {}", e, *this);
_log.get_probe().batch_write_error();
throw;
}
}
Expand Down
92 changes: 60 additions & 32 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "model/timestamp.h"
#include "reflection/adl.h"
#include "ssx/future-util.h"
#include "ssx/semaphore.h"
#include "storage/api.h"
#include "storage/chunk_cache.h"
#include "storage/compacted_offset_list.h"
Expand Down Expand Up @@ -222,14 +223,21 @@ size_t disk_log_impl::compute_max_segment_size() {
ss::future<> disk_log_impl::remove() {
vassert(!_closed, "Invalid double closing of log - {}", *this);

// Request abort of compaction before obtaining rewrite lock holder.
_compaction_as.request_abort();

// To prevent a race between prefix truncation and closing, obtain the
// rewrite lock here and indicate it as broken for any future waiters.
auto rewrite_lock_holder = co_await _segment_rewrite_lock.get_units();
_segment_rewrite_lock.broken();

// To prevent racing with a new segment being rolled, obtain the mutex here,
// and indicate it as broken for any future waiters.
auto roll_lock_holder = co_await _segments_rolling_lock.get_units();
_segments_rolling_lock.broken();

_closed = true;
// wait for compaction to finish
_compaction_as.request_abort();
co_await _compaction_housekeeping_gate.close();
// gets all the futures started in the background
std::vector<ss::future<>> permanent_delete;
Expand Down Expand Up @@ -275,6 +283,14 @@ ss::future<> disk_log_impl::start(
ss::future<std::optional<ss::sstring>> disk_log_impl::close() {
vassert(!_closed, "Invalid double closing of log - {}", *this);

// Request abort of compaction before obtaining rewrite lock holder.
_compaction_as.request_abort();

// To prevent a race between prefix truncation and closing, obtain the
// rewrite lock here and indicate it as broken for any future waiters.
auto rewrite_lock_holder = co_await _segment_rewrite_lock.get_units();
_segment_rewrite_lock.broken();

// To prevent racing with a new segment being rolled, obtain the mutex here,
// and indicate it as broken for any future waiters.
auto roll_lock_holder = co_await _segments_rolling_lock.get_units();
Expand All @@ -287,10 +303,11 @@ ss::future<std::optional<ss::sstring>> disk_log_impl::close() {
&& !_eviction_monitor->promise.get_future().available()) {
_eviction_monitor->promise.set_exception(segment_closed_exception());
}

// wait for compaction to finish
vlog(stlog.trace, "waiting for {} compaction to finish", config().ntp());
_compaction_as.request_abort();
co_await _compaction_housekeeping_gate.close();

vlog(stlog.trace, "stopping {} readers cache", config().ntp());

// close() on the segments is not expected to fail, but it might
Expand Down Expand Up @@ -3136,39 +3153,50 @@ disk_log_impl::remove_prefix_full_segments(truncate_prefix_config cfg) {
// guaranteed that no other operation will remove the segment from the
// segment list head (front).
auto ptr = _segs.front();
return _readers_cache->evict_segment_readers(ptr).then(
[this, ptr, prefix_truncate_offset](
readers_cache::range_lock_holder cache_lock) {
return ptr->write_lock().then(
[this,
ptr,
prefix_truncate_offset,
cache_lock = std::move(cache_lock)](
ss::rwlock::holder lock_holder) {
// after the lock is acquired, check if the segment is
// still eligible for deletion as there might have been
// concurrent appends. If segments collection is empty we
// can skip prefix truncation as the segments were removed
if (
keep_segment_after_prefix_truncate(
ptr, prefix_truncate_offset)
|| _segs.empty()) {
return ss::make_ready_future<>();
}
_segs.pop_front();
_probe->add_bytes_prefix_truncated(ptr->file_size());
// first call the remove segments, then release the lock
// before waiting for future to finish
auto f = remove_segment_permanently(
ptr, "remove_prefix_full_segments");
lock_holder.return_all();

return f;
});
});
return do_remove_prefix_full_segment(
std::move(ptr), prefix_truncate_offset);
});
}

// Removes the segment at the head of `_segs` as part of prefix truncation,
// deleting its data permanently. Lock order: evict cached readers for the
// segment, optionally take `_segments_rolling_lock` (only if the segment is
// active, i.e. has an appender), then the segment's write lock. After locks
// are held, eligibility is re-checked — concurrent appends may have made the
// segment worth keeping — and the function bails out without deleting in
// that case.
ss::future<> disk_log_impl::do_remove_prefix_full_segment(
ss::lw_shared_ptr<segment> ptr, model::offset prefix_truncate_offset) {
auto cache_lock = co_await _readers_cache->evict_segment_readers(ptr);

// We will be calling `remove_segment_permanently()` below on what is
// potentially the active segment. If there is a concurrent append occurring,
// we may get into a race with the `disk_log_appender` since we return the
// units in `lock_holder` (i.e the segment's write lock) while the future
// for `remove_segment_permanently()` is still unresolved. In this case, we
// need to obtain units from `_segments_rolling_lock` to prevent any
// concurrency issues.
std::optional<ssx::semaphore_units> seg_rolling_units;
if (ptr->has_appender()) {
// This cannot throw due to a broken semaphore, as we are already
// holding `_segment_rewrite_lock`.
seg_rolling_units = co_await _segments_rolling_lock.get_units();
}

auto lock_holder = co_await ptr->write_lock();

// after the locks are acquired, check if the segment is
// still eligible for deletion as there might have been
// concurrent appends. If segments collection is empty we
// can skip prefix truncation as the segments were removed
if (
keep_segment_after_prefix_truncate(ptr, prefix_truncate_offset)
|| _segs.empty()) {
co_return;
}
// Past this point the segment is committed for deletion: detach it from
// the segment list and account the truncated bytes in the probe.
_segs.pop_front();
_probe->add_bytes_prefix_truncated(ptr->file_size());
// first call the remove segments, then release the lock
// before waiting for future to finish
auto f = remove_segment_permanently(ptr, "remove_prefix_full_segments");
lock_holder.return_all();

co_return co_await std::move(f);
}

ss::future<> disk_log_impl::truncate_prefix(truncate_prefix_config cfg) {
vassert(!_closed, "truncate_prefix() on closed log - {}", *this);
co_await _failure_probes.truncate_prefix().then([this, cfg]() mutable {
Expand Down
2 changes: 2 additions & 0 deletions src/v/storage/disk_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ class disk_log_impl final : public log {

ss::future<> do_truncate_prefix(truncate_prefix_config);
ss::future<> remove_prefix_full_segments(truncate_prefix_config);
ss::future<>
do_remove_prefix_full_segment(ss::lw_shared_ptr<segment>, model::offset);

// Propagate a request to the Raft layer to evict segments up until the
// specified offest.
Expand Down
5 changes: 1 addition & 4 deletions src/v/storage/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,7 @@ void probe::delete_segment(const segment& s) {
_partition_bytes -= s.file_size();
}

void probe::batch_write_error(const std::exception_ptr& e) {
stlog.error("Error writing record batch {}", e);
++_batch_write_errors;
}
// Record one failed batch write in the probe's error counter. Logging the
// failure itself is the caller's responsibility.
void probe::batch_write_error() {
    ++_batch_write_errors;
}

void readers_cache_probe::setup_metrics(const model::ntp& ntp) {
if (config::shard_local_cfg().disable_metrics()) {
Expand Down
2 changes: 1 addition & 1 deletion src/v/storage/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class probe {
_num_adjacent_segments_compacted += num_segments_compacted;
}

void batch_write_error(const std::exception_ptr& e);
void batch_write_error();

void add_batches_read(uint32_t batches) { _batches_read += batches; }
void add_cached_batches_read(uint32_t batches) {
Expand Down
100 changes: 96 additions & 4 deletions src/v/storage/tests/storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1300,7 +1300,7 @@ TEST_F(storage_test_fixture, test_compaction_preserve_state) {
ASSERT_EQ(log->offsets().dirty_offset, model::offset(2));
}

void append_single_record_batch(
ss::future<> append_single_record_batch_coro(
ss::shared_ptr<storage::log> log,
int cnt,
model::term_id term,
Expand Down Expand Up @@ -1335,12 +1335,20 @@ void append_single_record_batch(
.timeout = model::no_timeout,
};

std::move(reader)
.for_each_ref(log->make_appender(cfg), cfg.timeout)
.get();
co_await std::move(reader).for_each_ref(
log->make_appender(cfg), cfg.timeout);
}
}

// Synchronous convenience wrapper around the coroutine-based helper: kicks
// off the append and blocks until it completes.
void append_single_record_batch(
  ss::shared_ptr<storage::log> log,
  int cnt,
  model::term_id term,
  size_t val_size = 0,
  bool rand_key = false) {
    auto append_done = append_single_record_batch_coro(
      log, cnt, term, val_size, rand_key);
    append_done.get();
}

/**
* Test scenario:
* 1) append few single record batches in term 1
Expand Down Expand Up @@ -6501,3 +6509,87 @@ TEST_F(storage_test_fixture, adjacent_merge_compaction_advances_generation_id) {

ASSERT_EQ(gen_id_after(), gen_id_before() + 1);
}

// Stress test for the race between prefix truncation, concurrent appends,
// and log close: runs three fibers concurrently — a background appender, a
// delayed close (via builder stop), and a delayed truncate_prefix — all
// serialized through `log_mutex`. In the happy path truncation removes every
// segment; if truncation races with close and throws, the expected count is
// re-read from the log instead. Sleep windows are tuned so truncation
// (100-149ms) usually fires before close (150-200ms), but either order must
// be tolerated.
TEST_F(storage_test_fixture, truncate_prefix_append_and_close) {
using namespace storage;
disk_log_builder b;
b | start();
auto log = b.get_log();
auto& disk_log = b.get_disk_log_impl();
const auto num_segs = 5;
const auto start_offset = 0;
const auto records_per_seg = 150;
// Seed the log with several rolled segments so prefix truncation has
// full segments to remove.
for (int i = 0; i < num_segs; ++i) {
auto offset = start_offset + i * records_per_seg;
b | add_segment(offset)
| add_random_batch(
offset, records_per_seg, maybe_compress_batches::yes);
disk_log.force_roll().get();
}

ss::abort_source as;
mutex log_mutex{"e2e_test::log_mutex"};
// Sleep for a random duration in [min, max] milliseconds.
auto random_sleep = [](int min, int max) {
return ss::sleep(
std::chrono::milliseconds(random_generators::get_int(min, max)));
};
static constexpr auto append_sleep_min = 5;
static constexpr auto append_sleep_max = 20;
// Background appender: while no abort is requested, opportunistically
// take the mutex (skip the iteration if unavailable), append a few
// batches, then sleep briefly before retrying.
auto append_fut = ss::do_until(
[&]() { return as.abort_requested(); },
[&]() {
auto maybe_u = log_mutex.try_get_units();
if (!maybe_u.has_value()) {
return ss::now();
}
auto u = std::move(maybe_u).value();
return append_single_record_batch_coro(log, 10, model::term_id{0})
.then([&, u = std::move(u)]() mutable {
u.return_all();
return random_sleep(append_sleep_min, append_sleep_max);
});
});

static constexpr auto close_sleep_min = 150;
static constexpr auto close_sleep_max = 200;
// Closer: after a random delay, take the mutex for good (break it so no
// other fiber acquires it afterwards) and stop the builder.
auto close_fut = random_sleep(close_sleep_min, close_sleep_max).then([&]() {
return log_mutex.get_units().then([&](ssx::semaphore_units u) {
(void)u;
log_mutex.broken();
// Calling `builder.stop()` will stop the log_manager and therefore
// close the log.
return b.stop();
});
});
static constexpr auto prefix_truncate_sleep_min = 100;
static constexpr auto prefix_truncate_sleep_max = 149;
// Expect 0 segments after prefix truncation in the happy path.
size_t expected_segment_count = 0;
// Truncator: after a random delay (usually before the closer fires),
// request abort of the appender, then prefix-truncate everything up to
// and including the dirty offset.
auto prefix_truncate_fut
= random_sleep(prefix_truncate_sleep_min, prefix_truncate_sleep_max)
.then([&]() {
return log_mutex.get_units().then([&](ssx::semaphore_units u) {
as.request_abort();
u.return_all();
return log
->truncate_prefix(truncate_prefix_config(
model::next_offset(log->offsets().dirty_offset)))
.handle_exception([&](const std::exception_ptr&) {
// We may have raced with a close() operation when
// attempting to obtain locks within
// `truncate_prefix()`. In this case, we should expect
// that no segments were removed from the log.
expected_segment_count = log->segment_count();
return ss::now();
});
});
});

ss::when_all(
std::move(append_fut),
std::move(close_fut),
std::move(prefix_truncate_fut))
.get();

ASSERT_EQ(log->segment_count(), expected_segment_count);
}