Fix audit log client on upgrade#28852
Conversation
When building a list of authorized operations for a metadata response, describe cluster response or a describe groups response, make the authz checks quiet so they do not clutter the log at INFO level. Signed-off-by: Michael Boquard <michael@redpanda.com>
There was a problem hiding this comment.
Pull request overview
This PR fixes issues related to internal Kafka client authentication and authorization logging by reducing log noise and handling errors more gracefully during metadata operations. The changes specifically address problems encountered when the audit log client upgrades or performs metadata refreshes.
Key changes:
- Adds error mitigation support for metadata refresh failures in the Kafka client
- Disables authorized operations requests for internal (non-SL) clients to reduce authorization failures
- Reduces log severity of authorization checks from INFO to DEBUG
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/rptest/tests/audit_log_test.py | Enables kafka/client trace logging and reduces audit log partitions from 3 to 1 for testing |
| src/v/security/audit/client.cc | Adds metadata error mitigation callback to kafka_client_impl constructor |
| src/v/kafka/server/handlers/details/security.h | Changes authorized_operations to use quiet authorization checks |
| src/v/kafka/client/configuration.h | Adds include_authorized_operations flag to control whether clients request authorized operations |
| src/v/kafka/client/configuration.cc | Sets include_authorized_operations to false for internal clients |
| src/v/kafka/client/cluster.h | Adds external_metadata_mitigate callback support for handling metadata update failures |
| src/v/kafka/client/cluster.cc | Implements metadata error mitigation and conditionally includes authorized operations based on config |
| src/v/kafka/client/client.h | Adds metadata_mitigater parameter to client constructors |
| src/v/kafka/client/client.cc | Passes metadata_mitigater through to cluster constructor |
| [this](std::exception_ptr eptr) { return mitigate_error(eptr); }, | ||
| [this](std::exception_ptr eptr) { return mitigate_error(eptr); }) {} |
There was a problem hiding this comment.
The same lambda [this](std::exception_ptr eptr) { return mitigate_error(eptr); } is duplicated on consecutive lines. This suggests the constructor now takes two mitigation callbacks where previously it took one. Consider extracting this to a named variable or using a helper function to avoid duplication and make the intent clearer.
| [this](std::exception_ptr eptr) { return mitigate_error(eptr); }, | |
| [this](std::exception_ptr eptr) { return mitigate_error(eptr); }) {} | |
| error_mitigator, | |
| error_mitigator) | |
| { | |
| auto error_mitigator = [this](std::exception_ptr eptr) { return mitigate_error(eptr); }; | |
| } |
| metadata_update mu; | ||
| if (describe_cluster_version) { | ||
| auto describe_cluster_resp = co_await dispatch_describe_cluster_request( | ||
| broker); |
There was a problem hiding this comment.
[nitpick] The indentation change here appears inconsistent with the original formatting. The opening of the function call should align with the assignment or follow project style conventions.
| broker); | |
| broker); |
f033e8d to
e23848d
Compare
|
Force push:
|
e23848d to
98f6061
Compare
|
Force push:
|
| , _external_mitigate(std::move(mitigater)) | ||
| , _cluster(std::make_unique<cluster>(cfg.connection_cfg)) | ||
| , _cluster( | ||
| std::make_unique<cluster>( | ||
| cfg.connection_cfg, std::move(metadata_mitigater))) |
There was a problem hiding this comment.
Could we reuse the same mitigater instead of introducing a separate one?
There was a problem hiding this comment.
noncopyable_function
There was a problem hiding this comment.
We could wrap that in a ss::shared_ptr or switch to a std::function
|
|
||
| vlog(_logger.debug, "Starting cluster metadata dispatch and update"); | ||
|
|
||
| auto f = ss::now(); |
There was a problem hiding this comment.
I think now that this is not a coroutine, the gate holder on L146 no longer works.
Could we leave this as a coroutine instead of using continuation style? It's so much easier to reason about coroutines than continuation style. Maybe ss::coroutine::as_future is what we need here to make the code clear with coroutines + exceptions.
There was a problem hiding this comment.
oh right good call thanks
| : audit_client(sink, controller) | ||
| , _client( | ||
| config::to_yaml(client_config, config::redact_secrets::no), | ||
| [this](std::exception_ptr eptr) { return mitigate_error(eptr); }, |
There was a problem hiding this comment.
I think the SR client would have the same issue as well, right? I think it would be simpler to just reuse the existing mitigator function because I can't see why we wouldn't want to mitigate metadata update errors if we want to mitigate all other errors.
There was a problem hiding this comment.
+1 for reusing the existing mitigator instead of passing another one.
| security = AuditLogTestSecurityConfig.default_credentials() | ||
| audit_config = AuditLogConfig( | ||
| enabled=True, | ||
| num_partitions=3, |
There was a problem hiding this comment.
I think it would be worth extracting the num_partitions + num_brokers into constants with a note about the reasoning why we need a lower num_partitions than num_brokers for the completeness of this test.
Is the reasoning that the audit log client prefers to send data to the partitions that are local to the node, and so this auth issue only surfaces if there are no local partitions to produce to?
There was a problem hiding this comment.
I'm also wondering if it would be worth disabling the "prefer partitions led by the local node" optimization in debug builds to get more test coverage over the scenario when the data needs to go to a different node.
There was a problem hiding this comment.
Is the reasoning that the audit log client prefers to send data to the partitions that are local to the node, and so this auth issue only surfaces if there are no local partitions to produce to?
Partially - there really wasn't an issue in the first place - the logs were mostly scary and hard to reason about whether there was an issue or not. Reducing the partition count will help ensure that our mitigation efforts work and I like the idea of disabling the "preference" on debug builds.
Retry command for Build#77406please wait until all jobs are finished before running the slash command |
IoannisRP
left a comment
There was a problem hiding this comment.
A couple of question on the mitigate on the update_metadata path...
- Is it there just for the
informcall to happen earlier? i.e. we don't have to wait for a adispatchcall to end up callinginform dispatchcalls happen with a retry. The first fails, informs and then it "succeeds".
There is no retry aroundupdate_metadata, is there? Is the logic that the first fails and the next (either timer or mitigator call)update_metadatasucceeds?
| .include_cluster_authorized_operations | ||
| = _config.include_authorized_operations ? true : false, | ||
| .include_topic_authorized_operations | ||
| = _config.include_authorized_operations ? true : false}}, |
There was a problem hiding this comment.
nit:
| .include_cluster_authorized_operations | |
| = _config.include_authorized_operations ? true : false, | |
| .include_topic_authorized_operations | |
| = _config.include_authorized_operations ? true : false}}, | |
| .include_cluster_authorized_operations | |
| = _config.include_authorized_operations(), | |
| .include_topic_authorized_operations | |
| = _config.include_authorized_operations()}}, |
same below
| .handle_exception([this](std::exception_ptr ex) { | ||
| vlog( | ||
| _logger.warn, | ||
| "External metadata mitigation failed - {}", |
There was a problem hiding this comment.
if the external mitigator is set, we wouldn't reach the warning bellow. Good to have the context of what failed in the warning, as well.
| "External metadata mitigation failed - {}", | |
| "Failed to dispatch metadata request: External metadata mitigation failed - {}", |
| : audit_client(sink, controller) | ||
| , _client( | ||
| config::to_yaml(client_config, config::redact_secrets::no), | ||
| [this](std::exception_ptr eptr) { return mitigate_error(eptr); }, |
There was a problem hiding this comment.
+1 for reusing the existing mitigator instead of passing another one.
The issue is that brokers may shut down and come back and in that time they'll lose the ephemeral credentials they previous held, so we need to react to these situations with
Previously, the behavior would have been that the metadata request would have failed and thrown an exception and continued the retry.. The changes within |
I have seen the same behavior in SR, as well. It is normally resolved through the next
which path are we discussing? In the client, the The paths that I see that don't do mitigation (so no Is it the periodic metadata update that is the target of this fix? i.e. to not have logs full of sasl errors, even when not in the presence of other calls? |
Not all uses of the kafka client need metadata or describe_cluster requests to include the bitmask of authorized operations. If the authenticated principal used by the client doesn't have the appropriate level of authorization to get that information this clutters up the broker logs with failed authorization checks. This new option permits users of k/client to opt out of getting that information. Signed-off-by: Michael Boquard <michael@redpanda.com>
All uses of the kafka client outside of Shadow Linking do not require the authorized operations flag to be set. This change ensures that any use disables that flag. Signed-off-by: Michael Boquard <michael@redpanda.com>
98f6061 to
4984b58
Compare
that would be the desire |
|
Force push:
|
| cluster::cluster( | ||
| connection_configuration config, | ||
| std::optional<external_metadata_mitigate> mitigater) |
There was a problem hiding this comment.
Corrected spelling of 'mitigater' to 'mitigator'.
| // The mitigater can be used to perform actions when metadata update fails. | ||
| explicit cluster( | ||
| connection_configuration config, | ||
| std::optional<external_metadata_mitigate> mitigater = std::nullopt); | ||
|
|
||
| // The mitigater can be used to perform actions when metadata update fails. |
There was a problem hiding this comment.
Corrected spelling of 'mitigater' to 'mitigator'.
| // The mitigater can be used to perform actions when metadata update fails. | |
| explicit cluster( | |
| connection_configuration config, | |
| std::optional<external_metadata_mitigate> mitigater = std::nullopt); | |
| // The mitigater can be used to perform actions when metadata update fails. | |
| // The mitigator can be used to perform actions when metadata update fails. | |
| explicit cluster( | |
| connection_configuration config, | |
| std::optional<external_metadata_mitigate> mitigater = std::nullopt); | |
| // The mitigator can be used to perform actions when metadata update fails. |
| // The mitigater can be used to perform actions when metadata update fails. | ||
| explicit cluster( | ||
| connection_configuration config, | ||
| std::optional<external_metadata_mitigate> mitigater = std::nullopt); | ||
|
|
||
| // The mitigater can be used to perform actions when metadata update fails. |
There was a problem hiding this comment.
Corrected spelling of 'mitigater' to 'mitigator'.
| // The mitigater can be used to perform actions when metadata update fails. | |
| explicit cluster( | |
| connection_configuration config, | |
| std::optional<external_metadata_mitigate> mitigater = std::nullopt); | |
| // The mitigater can be used to perform actions when metadata update fails. | |
| // The mitigator can be used to perform actions when metadata update fails. | |
| explicit cluster( | |
| connection_configuration config, | |
| std::optional<external_metadata_mitigate> mitigater = std::nullopt); | |
| // The mitigator can be used to perform actions when metadata update fails. |
| : client( | ||
| client_configuration::from_config_store(configuration(cfg)), | ||
| std::move(mitigater)) {} |
There was a problem hiding this comment.
Corrected spelling of 'mitigater' to 'mitigator'.
Retry command for Build#77539please wait until all jobs are finished before running the slash command |
| auto dispatch_f = co_await ss::coroutine::as_future( | ||
| do_dispatch_and_apply_metadata_updates(std::move(topics_request_list))); | ||
|
|
||
| if (dispatch_f.failed()) { | ||
| auto err = dispatch_f.get_exception(); | ||
| if (_external_metadata_mitigate) { | ||
| auto external_f = co_await ss::coroutine::as_future( | ||
| _external_metadata_mitigate.value()(err)); | ||
| if (external_f.failed()) { | ||
| auto external_err = external_f.get_exception(); | ||
| vlog( | ||
| _logger.warn, | ||
| "Failed to dispatch metadata request: External metadata " | ||
| "mitigation failed - {}", | ||
| external_err); | ||
| } | ||
| } else { | ||
| // If describe cluster isn't supported, then dispatch a metadata | ||
| // request with no topics at API version 10 in order to get the | ||
| // cluster level authorized operations | ||
| auto metadata_resp = co_await dispatch_metadata_request( | ||
| broker, {}, api_version(10)); | ||
| mu = to_metadata_update( | ||
| std::move(metadata_resp), topic_metadata_included_t::no); | ||
| vlog(_logger.warn, "Failed to dispatch metadata request - {}", err); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
it's a shame we can't reuse some of the "client/utils" here. The closest would be retry_with_mitigation, but it only does mitigation on retry, and if you add a retry, it would retry on every failure (not just sasl).
Signed-off-by: Michael Boquard <michael@redpanda.com>
4984b58 to
d7f6816
Compare
|
Force push:
|
|
/backport v25.3.x |
Fixes: CORE-14903
This PR does the following:
INFOtoDEBUGkafka::client::clusterto provide an error mitigation function to be called if there is an error refreshing metadata - reduces prevlance of sasl authentication failed error messagesBackports Required
Release Notes
Bug Fixes