[v25.3.x] direct_consumer: move offset update logic to fetch_next#29467
Merged
Conversation
direct_consumer_fixture_test now only asserts on initial conditions in one test to reduce redundancy. The initial assert is changed to permit offset only updates, but restrict those updates to at max one per partition. direct_consumer_test is changed to no longer assert on empty fetches.
Adds utility functions to find subscriptions, returning the reference as an optional on reference wrapper. Updates usages of these to use the helpers for cleanliness
Add reasonable initializers to fetched_partition_data. This is not a required correctness change, instead this is meant to preempt a writer from forgetting to set a value when filling in the fields of fetched_partition_data. Adds are_offsets_equal to source_partition_offsets which will check if tracked offsets are the same or different over time.
This commit does four things 1. fetch data will now be added to the queue even if it has no batches 2. offsets will be updated at the point in time at which fetch_next is called 3. a new filter is applied to remove fetches which contain no new information (all offsets are the same) 4. fetch next will retry fetching from the data_queue if the filters have removed everything from the resultant fetch
The results of partitioning were incorrectly named in filter_stale_subscriptions. Fix the names and additionally pull the iterators from the subspan rather than doing wasteful (and dangerous) iterator math.
Clarifies why a vassert is firing in direct_consumer: Direct consumers filtering is order dependent to reduce the amount of code spent checking nullopt
Fetcher is remarkably error prone to work on. To mitigate this, this commit splits the logic for processing fetch responses into new delegated functions. 1. do_process_partition_response: a sync static method which is responsible for taking a given fetch and determining what should be done with it - retriable errors -> update metadata - out of bounds -> reset offsets - unknown error -> bubble to caller - data fetch -> return data - offset only fetch -> return offsets 2. process_partition_response: an async wrapper for do_process_partition_response which updates the fetcher local state and incorporates the results into the resultant fetch response
Adds fetcher unit tests to ensure the decision logic in do_process_partition_response is per expectations.
Adds clarity to the meaning and implication of consistent partitions in vasserts. Namely, the code is written to check whether a partition is consistent before operating on it, allowing us to skip most checks on iterators to the end of a collection and nullopts from helper getting methods. This was done to significantly cut down on invalid entry checks.
bharathv
approved these changes
Jan 30, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Backport of PR #28309
These changes have baked for over a month without issue in dev. Backporting