datalake: clean up files after translation error#27737
datalake: clean up files after translation error#27737andrwng merged 2 commits intoredpanda-data:devfrom
Conversation
This will be useful to inject transient errors to the record multiplexer (e.g. schema registry errors) to fail translation, rather than DLQ records. Will be used in an upcoming test to ensure we clean up local files after such a failure.
bharathv
left a comment
There was a problem hiding this comment.
lgtm few nits, nice find.
src/v/datalake/record_multiplexer.h
Outdated
| * progress. | ||
| */ | ||
| ss::future<result<write_result, writer_error>> finish() &&; | ||
| ss::future<result<write_result, writer_error>> finish(finished_files*) &&; |
There was a problem hiding this comment.
any reason to pass a pointer vs passing a reference instead? (can never be null?).
There was a problem hiding this comment.
Nah not really, habit from an older code style where outparams were pointers. Will change
src/v/datalake/translation_task.cc
Outdated
| mux_result.error(), | ||
| files.data_files.size(), | ||
| files.dlq_files.size()); | ||
| [[maybe_unused]] auto del_res = co_await delete_data_and_dlq_files( |
src/v/datalake/translation_task.cc
Outdated
| ss::abort_source& as) && { | ||
| auto mux_result = co_await std::move(_multiplexer).finish(); | ||
| record_multiplexer::finished_files files; | ||
| auto mux_result = co_await std::move(_multiplexer).finish(&files); |
We previously didn't clean up files record_multiplexer::finish() returned errors. Since the returned success result is previously what held the local files, this commit updates the signature to collect the files in finish() (regardless of whether the operation succeeds or not), and cleans up the files in translation_task::finish() and ::discard().
f24f8fb to
fd496bc
Compare
There was a problem hiding this comment.
Pull Request Overview
This PR fixes a bug where on-disk files were not properly cleaned up when the datalake record multiplexer's finish() method returned errors. The fix modifies the signature to ensure files are collected and cleaned up regardless of operation success.
Key changes:
- Modified
record_multiplexer::finish()to accept afinished_filesoutput parameter for collecting files regardless of success/failure - Added cleanup logic in
translation_task::finish()and::discard()to delete local files when errors occur - Updated all test files to accommodate the new
finish()signature
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| src/v/datalake/translation_task.cc | Core fix: adds file cleanup on error and updates finish/discard methods to use new multiplexer signature |
| src/v/datalake/record_multiplexer.h | Changes multiplexer interface to accept finished_files output parameter |
| src/v/datalake/record_multiplexer.cc | Updates implementation to populate finished_files parameter instead of result struct |
| src/v/datalake/record_schema_resolver.h | Adds test resolver class for injecting errors in tests |
| src/v/datalake/record_schema_resolver.cc | Implements test resolver error injection functionality |
| src/v/datalake/tests/translation_task_test.cc | Adds comprehensive tests for cleanup behavior and updates existing test setup |
| src/v/datalake/tests/record_multiplexer_test.cc | Updates all test calls to use new finish() signature |
| src/v/datalake/tests/record_multiplexer_bench.cc | Updates benchmark code for new signature |
| src/v/datalake/tests/gtest_record_multiplexer_test.cc | Updates Google Test cases for new signature |
| } | ||
| if (dlq_result.has_error()) { | ||
| vlog( | ||
| log.warn, "error deleting local dlq files - {}", data_result.error()); |
There was a problem hiding this comment.
Wrong variable used in error message. Should be dlq_result.error() instead of data_result.error() since this is logging the DLQ file deletion error.
| log.warn, "error deleting local dlq files - {}", data_result.error()); | |
| log.warn, "error deleting local dlq files - {}", dlq_result.error()); |
CI test resultstest results on build#73007
|
|
/backport v25.2.x |
|
/backport v25.1.x |
|
Failed to create a backport PR to v25.2.x branch. I tried: |
|
Failed to create a backport PR to v25.1.x branch. I tried: |
We previously didn't clean up files record_multiplexer::finish()
returned errors. Since the returned success result is previously what
held the local files, this commit updates the signature to collect the
files in finish() (regardless of whether the operation succeeds or not),
and cleans up the files in translation_task::finish() and ::discard().
Backports Required
Release Notes
Bug Fixes