[OPIK-5647] [BE] fix: handle empty Redis stream messages in BaseRedisSubscriber #6161

Merged
andrescrz merged 2 commits into main from
andrescrz/OPIK-5647-fix-empty-redis-stream-message-crash
Apr 9, 2026

Member

@andrescrz andrescrz commented Apr 9, 2026

Details

Fixes a ClassCastException and a NullPointerException in BaseRedisSubscriber observed in production. Both errors caused ERROR-level log pollution and, in the case of the ClassCastException, left malformed messages stuck in the Redis stream pending list — retrying forever.

ClassCastException: Redisson's decoder chain can produce Collections.emptyList() as a stream entry's body value instead of the expected Map<String, M>. Java type erasure hides this at compile time; the bridge method checkcast fails at runtime. Fixed with a try-catch in processMessage() that returns FAILURE. ClassCastException is already in NON_RETRYABLE_EXCEPTIONS, so the existing failure path acks and removes the message without retry.
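
To illustrate the erasure problem described above, here is a minimal, self-contained sketch. It is not the actual BaseRedisSubscriber code: the class ErasureSketch, the Status enum, the decode() helper, and the String entry ids are all hypothetical stand-ins, and the real cast site in the PR is a bridge method rather than a local assignment. The sketch only shows how a value typed as a Map at compile time can turn out to be an empty list at runtime, and how a try-catch can turn that into a failure result.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class ErasureSketch {

    // Hypothetical result type; the PR only mentions that processMessage() returns FAILURE.
    enum Status { SUCCESS, FAILURE }

    // Stand-in for a decoder that is declared to return a map of entry bodies
    // but actually stores an empty list for one entry. The unchecked cast
    // compiles because generic type arguments are erased.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Map<String, Map<String, String>> decode() {
        Map raw = new LinkedHashMap();
        raw.put("1-0", Map.of("message", "hello"));
        raw.put("2-0", Collections.emptyList()); // malformed body
        return (Map<String, Map<String, String>>) raw;
    }

    static Status processMessage(Map<String, Map<String, String>> entries, String id) {
        try {
            // The compiler inserts a checkcast to Map on this assignment;
            // it throws ClassCastException when the value is an empty list.
            Map<String, String> body = entries.get(id);
            return body.containsKey("message") ? Status.SUCCESS : Status.FAILURE;
        } catch (ClassCastException e) {
            // Malformed entry: report failure instead of crashing the subscriber.
            return Status.FAILURE;
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> entries = decode();
        System.out.println(processMessage(entries, "1-0")); // prints SUCCESS
        System.out.println(processMessage(entries, "2-0")); // prints FAILURE
    }
}
```

In this sketch the failure is simply returned; per the description above, the real subscriber relies on ClassCastException being in NON_RETRYABLE_EXCEPTIONS so that the message is acked and removed.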

NullPointerException: Mono.doOnSuccess fires with null when the Mono completes empty (e.g., long-poll timeout with no messages). The lambda called messages.size() on null. Fixed with Objects.requireNonNullElse defaulting to an empty collection, so the gauge resets to 0 on empty Monos instead of NPE'ing or staying stale. Applied consistently across all doOnSuccess callbacks that dereference the parameter (readMessages, claimPendingMessages, removeConsumer, ackAndRemoveMessages). One location (listPending) uses doOnNext instead since logging "success" on empty would be misleading there.
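
The null-defaulting pattern described above can be sketched without Reactor on the classpath. In the example below, a plain java.util.function.Consumer stands in for the doOnSuccess callback, and an AtomicInteger stands in for the gauge; both, along with the class name NullSafeCallbackSketch, are assumptions made for illustration and are not taken from the PR.

```java
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class NullSafeCallbackSketch {

    // Hypothetical stand-in for the gauge that tracks the last batch size.
    static final AtomicInteger lastBatchSize = new AtomicInteger(-1);

    // Stand-in for a doOnSuccess callback: it may be invoked with null
    // when the upstream completes without emitting a value.
    static final Consumer<Collection<String>> onSuccess = messages -> {
        // Default to an empty collection so size() is always safe to call.
        messages = Objects.requireNonNullElse(messages, List.of());
        lastBatchSize.set(messages.size());
    };

    public static void main(String[] args) {
        onSuccess.accept(List.of("a", "b"));
        System.out.println(lastBatchSize.get()); // prints 2

        onSuccess.accept(null); // simulates an empty completion
        System.out.println(lastBatchSize.get()); // prints 0
    }
}
```

With the default in place, an empty completion resets the value to 0 rather than throwing or leaving the previous reading in place.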

Change checklist

  • User facing
  • Documentation update

Issues

  • OPIK-5647

AI-WATERMARK

AI-WATERMARK: yes

  • Tools: Claude Code
  • Model(s): Claude Opus 4.6
  • Scope: Full implementation with deep Redisson source analysis (decompiled decoder chain)
  • Human verification: Code review + iterative design feedback + manual edits

Testing

  • mvn test -Dtest="BaseRedisSubscriberUnitTest" — 17/17 pass (16 pre-existing + 1 new)
  • New test shouldHandleEmptyListEntryValue: mocks readGroup returning a valid outer Map with Collections.emptyList() as the entry body. Verifies subscriber survives and continues processing.
  • Empty Mono scenario (long-poll timeout) is implicitly covered by every integration test in BaseRedisSubscriberTest — the subscriber polls before messages are published.

Documentation

N/A

@github-actions github-actions bot added the java, Backend, and tests labels Apr 9, 2026
Contributor

github-actions bot commented Apr 9, 2026

Backend Tests - Integration Group 16

 12 files   12 suites   4m 37s ⏱️
184 tests 183 ✅ 0 💤 0 ❌ 1 🔥
183 runs  183 ✅ 0 💤 0 ❌

For more details on these errors, see this check.

Results for commit 480c451.

♻️ This comment has been updated with latest results.

@comet-ml comet-ml deleted a comment from baz-reviewer bot Apr 9, 2026
…Subscriber

Add runtime type check in processMessage() to guard against Redis delivering
empty messages where the value is a Collections.emptyList() instead of a Map.
Due to Java type erasure, this ClassCastException was not caught at compile time.
@andrescrz andrescrz force-pushed the andrescrz/OPIK-5647-fix-empty-redis-stream-message-crash branch from 6100c31 to 480c451 April 9, 2026 15:35
@andrescrz andrescrz marked this pull request as ready for review April 9, 2026 15:37
@andrescrz andrescrz requested a review from a team as a code owner April 9, 2026 15:37
Contributor

@ldaugusto ldaugusto left a comment

The fix looks correct and well scoped, good one!

My only comment is that reassigning lambda parameters, while not wrong, feels off; on a quick read it suggests we're changing values upstream. This repeats in many places in this PR. I believe it's better to replace them with more straightforward approaches, but it's not a blocker.

"Removed consumer '{}', from group '{}', pendingMessages '{}'",
consumerId, config.getConsumerGroupName(), pendingMessages))
.doOnSuccess(pendingMessages -> {
pendingMessages = Objects.requireNonNullElse(pendingMessages, 0L);
Contributor

Nit: I don't fancy reassigning a lambda parameter; it works, but it feels off. This test also looks overcomplicated. Aren't you just trying to do

var size = pendingMessages != null ? pendingMessages : 0;

Member Author

Fair point on the style — I went with reassigning the parameter intentionally so that all code below it uses the safe value, without leaving the original null in scope where someone could accidentally use it. That said, it's just a lambda parameter and a local var, so easy to change back if we prefer the other style later.
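
For readers comparing the two styles discussed in this thread, the following sketch puts them side by side. It is only an illustration: java.util.function.Function stands in for the reactive callback, and the class and field names are hypothetical. Both variants produce the same result; they differ only in whether the original, possibly null parameter stays in scope.

```java
import java.util.Objects;
import java.util.function.Function;

public class LambdaNullDefaultSketch {

    // Variant A: reassign the lambda parameter, so the null value is no
    // longer reachable by any code below the reassignment.
    static final Function<Long, String> reassign = pendingMessages -> {
        pendingMessages = Objects.requireNonNullElse(pendingMessages, 0L);
        return "pendingMessages '" + pendingMessages + "'";
    };

    // Variant B: keep the parameter untouched and introduce a local;
    // the original (possibly null) parameter remains in scope.
    static final Function<Long, String> local = pendingMessages -> {
        long size = pendingMessages != null ? pendingMessages : 0;
        return "pendingMessages '" + size + "'";
    };

    public static void main(String[] args) {
        System.out.println(reassign.apply(null)); // prints pendingMessages '0'
        System.out.println(local.apply(null));    // prints pendingMessages '0'
        System.out.println(reassign.apply(5L));   // prints pendingMessages '5'
        System.out.println(local.apply(5L));      // prints pendingMessages '5'
    }
}
```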

@andrescrz andrescrz merged commit 9fe16f6 into main Apr 9, 2026
81 checks passed
@andrescrz andrescrz deleted the andrescrz/OPIK-5647-fix-empty-redis-stream-message-crash branch April 9, 2026 16:13