Skip to content

Commit 963d54a

Browse files
authored
KAFKA-19945: Always set status field in StreamsGroupHeartbeat response (#21031)
The status field should always set in the response, even when empty, to ensure that the status gets reset on the client. We are not doing this due to an oversight - it is defined as nullable, and it is null when not set. So status does not clear correctly on the client, which ignores the field if it's null. This can cause the streams application to incorrectly timeout, if the source topic does not exist when the application is first started, but it is created after the application started. Otherwise, there is no noticable difference. Reviewers: Lucas Brutschy <[email protected]>
1 parent 7c4ae09 commit 963d54a

File tree

4 files changed

+139
-39
lines changed

4 files changed

+139
-39
lines changed

clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@
5353
{ "name": "TaskOffsetIntervalMs", "type": "int32", "versions": "0+",
5454
"about": "The interval in which the task changelog offsets on a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed." },
5555

56-
{ "name": "Status", "type": "[]Status", "versions": "0+", "nullableVersions": "0+", "default": "null",
57-
"about": "Indicate zero or more status for the group. Null if unchanged since last heartbeat." },
56+
{ "name": "Status", "type": "[]Status", "versions": "0+", "nullableVersions": "0+",
57+
"about": "Indicate zero or more status for the group." },
5858

5959
// The streams app knows which partitions to fetch from given this information
6060
{ "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",

core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo
209209
assertNotNull(streamsGroupHeartbeatResponse, "StreamsGroupHeartbeatResponse should not be null")
210210
assertEquals(memberId, streamsGroupHeartbeatResponse.memberId())
211211
assertEquals(3, streamsGroupHeartbeatResponse.memberEpoch())
212-
assertEquals(null, streamsGroupHeartbeatResponse.status())
212+
assertEquals(List.empty.asJava, streamsGroupHeartbeatResponse.status())
213213
val expectedActiveTasks = List(
214214
new StreamsGroupHeartbeatResponseData.TaskIds()
215215
.setSubtopologyId("subtopology-1")

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2153,9 +2153,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
21532153
)
21542154
));
21552155

2156-
if (!returnedStatus.isEmpty()) {
2157-
response.setStatus(returnedStatus);
2158-
}
2156+
response.setStatus(returnedStatus);
21592157
return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
21602158
}
21612159

@@ -4215,7 +4213,8 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
42154213
}
42164214
StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
42174215
.setMemberId(memberId)
4218-
.setMemberEpoch(memberEpoch);
4216+
.setMemberEpoch(memberEpoch)
4217+
.setStatus(List.of());
42194218

42204219
if (instanceId == null) {
42214220
StreamsGroupMember member = group.getMemberOrThrow(memberId);

0 commit comments

Comments
 (0)