Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/v/datalake/record_multiplexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ ss::future<writer_error> record_multiplexer::flush_writers() {
}

ss::future<result<record_multiplexer::write_result, writer_error>>
record_multiplexer::finish() && {
record_multiplexer::finish(
record_multiplexer::finished_files& finished_files) && {
auto writers = std::move(_writers);
for (auto& [id, writer] : writers) {
auto res = co_await std::move(*writer).finish();
Expand All @@ -362,7 +363,7 @@ record_multiplexer::finish() && {
std::move(
files.begin(),
files.end(),
std::back_inserter(_result->data_files));
std::back_inserter(finished_files.data_files));
}
}
if (_invalid_record_writer) {
Expand All @@ -375,7 +376,7 @@ record_multiplexer::finish() && {
std::move(
files.begin(),
files.end(),
std::back_inserter(_result->dlq_files));
std::back_inserter(finished_files.dlq_files));
}
}
if (_error && !is_recoverable_error(_error.value())) {
Expand Down
14 changes: 8 additions & 6 deletions src/v/datalake/record_multiplexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ class record_multiplexer {
kafka::offset start_offset;
// last offset of the last translated batch (inclusive)
kafka::offset last_offset;
// vector containing a list of files that were written during
// translation.
chunked_vector<partitioning_writer::partitioned_file> data_files;
// files with invalid records
chunked_vector<partitioning_writer::partitioned_file> dlq_files;
// Total number of kafka bytes processed by the multiplexer
uint64_t kafka_bytes_processed{0};
};
Expand Down Expand Up @@ -104,12 +99,19 @@ class record_multiplexer {
*/
ss::future<writer_error> flush_writers();

struct finished_files {
// vector containing a list of files that were written during
// translation.
chunked_vector<partitioning_writer::partitioned_file> data_files;
// files with invalid records
chunked_vector<partitioning_writer::partitioned_file> dlq_files;
};
/**
* Cleanup and return the result. Should be the last operation to
* be called. May not be called in parallel while multiplexing is in
* progress.
*/
ss::future<result<write_result, writer_error>> finish() &&;
ss::future<result<write_result, writer_error>> finish(finished_files&) &&;

size_t buffered_bytes() const;

Expand Down
16 changes: 16 additions & 0 deletions src/v/datalake/record_schema_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,22 @@ binary_type_resolver::resolve_identifier(schema_identifier) const {
co_return type_resolver::errc::translation_error;
}

ss::future<checked<type_and_buf, type_resolver::errc>>
test_binary_type_resolver::resolve_buf_type(std::optional<iobuf> b) const {
if (injected_error_.has_value()) {
co_return *injected_error_;
}
co_return co_await binary_type_resolver::resolve_buf_type(std::move(b));
}

ss::future<checked<resolved_type, type_resolver::errc>>
test_binary_type_resolver::resolve_identifier(schema_identifier id) const {
if (injected_error_.has_value()) {
co_return *injected_error_;
}
co_return co_await binary_type_resolver::resolve_identifier(std::move(id));
}

ss::future<checked<type_and_buf, type_resolver::errc>>
record_schema_resolver::resolve_buf_type(std::optional<iobuf> b) const {
if (!b.has_value()) {
Expand Down
14 changes: 14 additions & 0 deletions src/v/datalake/record_schema_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,20 @@ class binary_type_resolver : public type_resolver {
~binary_type_resolver() override = default;
};

class test_binary_type_resolver : public binary_type_resolver {
public:
ss::future<checked<type_and_buf, type_resolver::errc>>
resolve_buf_type(std::optional<iobuf> b) const override;

ss::future<checked<resolved_type, errc>>
resolve_identifier(schema_identifier) const override;
~test_binary_type_resolver() override = default;
void set_fail_requests(type_resolver::errc e) { injected_error_ = e; }

private:
std::optional<type_resolver::errc> injected_error_{};
};

// record_schema_resolver uses the schema registry wire format
// to decode messages and resolve their schemas.
class record_schema_resolver : public type_resolver {
Expand Down
22 changes: 12 additions & 10 deletions src/v/datalake/tests/gtest_record_multiplexer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ TEST(DatalakeMultiplexerTest, TestMultiplexer) {
.multiplex(
std::move(reader), kafka::offset{start_offset}, model::no_timeout, as)
.get();
auto result = std::move(multiplexer).finish().get();
record_multiplexer::finished_files files;
auto result = std::move(multiplexer).finish(files).get();

ASSERT_TRUE(result.has_value());
ASSERT_EQ(result.value().data_files.size(), 1);
ASSERT_EQ(files.data_files.size(), 1);
EXPECT_EQ(
result.value().data_files[0].local_file.row_count,
record_count * batch_count);
files.data_files[0].local_file.row_count, record_count * batch_count);
EXPECT_EQ(result.value().start_offset(), start_offset);
// Subtract one since offsets end at 0, and this is an inclusive range.
EXPECT_EQ(
Expand Down Expand Up @@ -136,7 +136,8 @@ TEST(DatalakeMultiplexerTest, TestMultiplexerWriteError) {
multiplexer
.multiplex(std::move(reader), kafka::offset{0}, model::no_timeout, as)
.get();
auto res = std::move(multiplexer).finish().get();
record_multiplexer::finished_files files;
auto res = std::move(multiplexer).finish(files).get();
ASSERT_TRUE(res.has_error());
EXPECT_EQ(res.error(), datalake::writer_error::parquet_conversion_error);
}
Expand Down Expand Up @@ -190,13 +191,13 @@ TEST(DatalakeMultiplexerTest, WritesDataFiles) {
.multiplex(
std::move(reader), kafka::offset{start_offset}, model::no_timeout, as)
.get();
auto result = std::move(multiplexer).finish().get();
record_multiplexer::finished_files files;
auto result = std::move(multiplexer).finish(files).get();

ASSERT_TRUE(result.has_value());
ASSERT_EQ(result.value().data_files.size(), 1);
ASSERT_EQ(files.data_files.size(), 1);
EXPECT_EQ(
result.value().data_files[0].local_file.row_count,
record_count * batch_count);
files.data_files[0].local_file.row_count, record_count * batch_count);
EXPECT_EQ(result.value().start_offset(), start_offset);
// Subtract one since offsets end at 0, and this is an inclusive range.
EXPECT_EQ(
Expand Down Expand Up @@ -305,7 +306,8 @@ TEST_F(RecordMultiplexerParquetTest, TestSimple) {
.multiplex(
std::move(reader), kafka::offset{start_offset}, model::no_timeout, as)
.get();
auto res = std::move(mux).finish().get();
record_multiplexer::finished_files files;
auto res = std::move(mux).finish(files).get();
ASSERT_FALSE(res.has_error()) << res.error();
EXPECT_EQ(res.value().start_offset(), start_offset());

Expand Down
3 changes: 2 additions & 1 deletion src/v/datalake/tests/record_multiplexer_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ struct counting_consumer {
return mux.do_multiplex(std::move(batch), kafka::offset{}, as);
}
ss::future<counting_consumer> end_of_stream() {
auto res = co_await std::move(mux).finish();
datalake::record_multiplexer::finished_files files;
auto res = co_await std::move(mux).finish(files);
if (res.has_error()) [[unlikely]] {
throw std::runtime_error(
fmt::format("failed to end stream: {}", res.error()));
Expand Down
Loading