apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/BaseRedisSubscriber.java
Backend Tests - Integration Group 16: 12 files, 12 suites, 4m 37s. Results for commit 480c451.
…Subscriber Add runtime type check in processMessage() to guard against Redis delivering empty messages where the value is a Collections.emptyList() instead of a Map. Due to Java type erasure, this ClassCastException was not caught at compile time.
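A minimal, stdlib-only sketch of the type-erasure failure this commit message describes. It is not the Opik code: `ErasureDemo`, `decodeEntries`, the `Result` enum and the entry ids are hypothetical names chosen for illustration, and the raw `Map` merely stands in for a decoder that hands back an empty list where a `Map` body was expected.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class ErasureDemo {

    enum Result { SUCCESS, FAILURE }

    // Hypothetical stand-in for a decoder: one entry body has the wrong runtime type.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Map<String, Map<String, String>> decodeEntries() {
        Map raw = new HashMap();
        raw.put("1-0", Collections.emptyList()); // a List, accepted because the map is raw
        raw.put("2-0", Map.of("key", "value"));  // a well-formed body
        return raw; // unchecked conversion: the compiler only warns
    }

    static Result processMessage(Map<String, Map<String, String>> entries, String id) {
        try {
            // The compiler inserts a cast to Map here; it fails at runtime for "1-0".
            Map<String, String> body = entries.get(id);
            return body != null && !body.isEmpty() ? Result.SUCCESS : Result.FAILURE;
        } catch (ClassCastException e) {
            // Treat the malformed entry as a failure instead of letting the exception escape.
            return Result.FAILURE;
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> entries = decodeEntries();
        System.out.println(processMessage(entries, "1-0"));
        System.out.println(processMessage(entries, "2-0"));
    }
}
```

In this sketch the entry `"1-0"` takes the catch branch and `"2-0"` takes the normal path; the generic signature compiles cleanly either way, which is why the mismatch only shows up at runtime.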
6100c31 to 480c451
ldaugusto left a comment
The fix looks correct and well scoped, good one!
My only comment is about reassigning lambda parameters: while not wrong, it feels off, and on a quick read it suggests we're changing values upstream. This repeats in many places in this PR. I believe it's better to replace these with more straightforward approaches, but it's not a blocker.
    "Removed consumer '{}', from group '{}', pendingMessages '{}'",
    consumerId, config.getConsumerGroupName(), pendingMessages))
    .doOnSuccess(pendingMessages -> {
        pendingMessages = Objects.requireNonNullElse(pendingMessages, 0L);
Nit: I don't fancy reassigning a lambda parameter; it works, but it feels off. This test also looks overcomplicated. Aren't you just trying to do:
var size = pendingMessages != null ? pendingMessages : 0;
Fair point on the style — I went with reassigning the parameter intentionally so that all code below it uses the safe value, without leaving the original null in scope where someone could accidentally use it. That said, it's just a lambda parameter and a local var, so easy to change back if we prefer the other style later.
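To make the two styles in this thread concrete, here is a small stdlib-only sketch. It uses a plain `Function<Long, String>` as a stand-in for the `doOnSuccess` callback so it runs without Reactor; `NullDefaultStyles`, `REASSIGN` and `LOCAL` are hypothetical names, and the returned string only mimics the log message from the diff.

```java
import java.util.Objects;
import java.util.function.Function;

final class NullDefaultStyles {

    // Style used in the PR: overwrite the parameter so the null is no longer reachable below.
    static final Function<Long, String> REASSIGN = pendingMessages -> {
        pendingMessages = Objects.requireNonNullElse(pendingMessages, 0L);
        return "pendingMessages '" + pendingMessages + "'";
    };

    // Style suggested in review: leave the parameter alone and introduce a local.
    static final Function<Long, String> LOCAL = pendingMessages -> {
        long size = pendingMessages != null ? pendingMessages : 0L;
        return "pendingMessages '" + size + "'";
    };

    public static void main(String[] args) {
        System.out.println(REASSIGN.apply(null));
        System.out.println(LOCAL.apply(null));
        System.out.println(LOCAL.apply(3L));
    }
}
```

Both variants return the same text for a null and a non-null input. The difference is only which name later code sees: the reassigned parameter hides the original null, while the local variable leaves it in scope.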
Details
Fixes a ClassCastException and a NullPointerException in BaseRedisSubscriber observed in production. Both errors caused ERROR-level log pollution and, in the case of the ClassCastException, left malformed messages stuck in the Redis stream pending list, retrying forever.
ClassCastException: Redisson's decoder chain can produce Collections.emptyList() as a stream entry's body value instead of the expected Map<String, M>. Java type erasure hides this at compile time; the bridge method checkcast fails at runtime. Fixed with a try-catch in processMessage() that returns FAILURE. ClassCastException is already in NON_RETRYABLE_EXCEPTIONS, so the existing failure path acks and removes the message without retry.
NullPointerException: Mono.doOnSuccess fires with null when the Mono completes empty (e.g., long-poll timeout with no messages). The lambda called messages.size() on null. Fixed with Objects.requireNonNullElse defaulting to an empty collection, so the gauge resets to 0 on empty Monos instead of NPE'ing or staying stale. Applied consistently across all doOnSuccess callbacks that dereference the parameter (readMessages, claimPendingMessages, removeConsumer, ackAndRemoveMessages). One location (listPending) uses doOnNext instead, since logging "success" on empty would be misleading there.
Change checklist
Issues
AI-WATERMARK
AI-WATERMARK: yes
Testing
mvn test -Dtest="BaseRedisSubscriberUnitTest": 17/17 pass (16 pre-existing + 1 new)
shouldHandleEmptyListEntryValue: mocks readGroup returning a valid outer Map with Collections.emptyList() as the entry body. Verifies subscriber survives and continues processing.
BaseRedisSubscriberTest: the subscriber polls before messages are published.
Documentation
N/A