Skip to content

Commit 5373f2d

Browse files
authored
KAFKA-15283:[2/2] Client support for OffsetFetch with topic ID (#19885)
Introduce Topic ID in existing OffsetFetch API, the patch only contains the client side changes. Reviewers: Lianet Magrans <[email protected]>
1 parent 289a010 commit 5373f2d

File tree

2 files changed

+124
-35
lines changed

2 files changed

+124
-35
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java

Lines changed: 38 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,38 +1004,37 @@ public boolean sameRequest(final OffsetFetchRequestState request) {
10041004
}
10051005

10061006
public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
1007-
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = requestedPartitions.stream()
1008-
.collect(Collectors.groupingBy(TopicPartition::topic))
1009-
.entrySet()
1010-
.stream()
1011-
.map(entry -> new OffsetFetchRequestData.OffsetFetchRequestTopics()
1007+
Map<String, Uuid> topicIds = metadata.topicIds();
1008+
boolean canUseTopicIds = true;
1009+
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = new ArrayList<>();
1010+
Map<String, List<TopicPartition>> tps = requestedPartitions.stream().collect(Collectors.groupingBy(TopicPartition::topic));
1011+
for (Map.Entry<String, List<TopicPartition>> entry : tps.entrySet()) {
1012+
String topic = entry.getKey();
1013+
Uuid topicId = topicIds.getOrDefault(topic, Uuid.ZERO_UUID);
1014+
if (Uuid.ZERO_UUID.equals(topicId)) {
1015+
canUseTopicIds = false;
1016+
}
1017+
topics.add(new OffsetFetchRequestData.OffsetFetchRequestTopics()
10121018
.setName(entry.getKey())
1019+
.setTopicId(topicId)
10131020
.setPartitionIndexes(entry.getValue().stream()
10141021
.map(TopicPartition::partition)
1015-
.collect(Collectors.toList())))
1016-
.collect(Collectors.toList());
1022+
.collect(Collectors.toList())));
1023+
}
10171024

1018-
OffsetFetchRequest.Builder builder = memberInfo.memberEpoch
1019-
.map(epoch -> OffsetFetchRequest.Builder.forTopicNames(
1020-
new OffsetFetchRequestData()
1021-
.setRequireStable(true)
1022-
.setGroups(List.of(
1023-
new OffsetFetchRequestData.OffsetFetchRequestGroup()
1024-
.setGroupId(groupId)
1025-
.setMemberId(memberInfo.memberId)
1026-
.setMemberEpoch(epoch)
1027-
.setTopics(topics))),
1028-
throwOnFetchStableOffsetUnsupported))
1029-
// Building request without passing member ID/epoch to leave the logic to choose
1030-
// default values when not present on the request builder.
1031-
.orElseGet(() -> OffsetFetchRequest.Builder.forTopicNames(
1032-
new OffsetFetchRequestData()
1033-
.setRequireStable(true)
1034-
.setGroups(List.of(
1035-
new OffsetFetchRequestData.OffsetFetchRequestGroup()
1036-
.setGroupId(groupId)
1037-
.setTopics(topics))),
1038-
throwOnFetchStableOffsetUnsupported));
1025+
OffsetFetchRequestData.OffsetFetchRequestGroup groupData = new OffsetFetchRequestData.OffsetFetchRequestGroup()
1026+
.setGroupId(groupId)
1027+
.setTopics(topics);
1028+
if (memberInfo.memberEpoch.isPresent()) {
1029+
groupData = groupData.setMemberId(memberInfo.memberId)
1030+
.setMemberEpoch(memberInfo.memberEpoch.get());
1031+
}
1032+
OffsetFetchRequestData data = new OffsetFetchRequestData()
1033+
.setRequireStable(true)
1034+
.setGroups(List.of(groupData));
1035+
OffsetFetchRequest.Builder builder = canUseTopicIds
1036+
? OffsetFetchRequest.Builder.forTopicIdsOrNames(data, throwOnFetchStableOffsetUnsupported, true)
1037+
: OffsetFetchRequest.Builder.forTopicNames(data, throwOnFetchStableOffsetUnsupported);
10391038
return buildRequestWithResponseHandling(builder);
10401039
}
10411040

@@ -1124,22 +1123,28 @@ private void onSuccess(final long currentTimeMs,
11241123
var failedRequestRegistered = false;
11251124

11261125
for (var topic : response.topics()) {
1126+
// If the topic id is used, the topic name is empty in the response.
1127+
String topicName = topic.name().isEmpty() ? metadata.topicNames().get(topic.topicId()) : topic.name();
11271128
for (var partition : topic.partitions()) {
11281129
var tp = new TopicPartition(
1129-
topic.name(),
1130+
topicName,
11301131
partition.partitionIndex()
11311132
);
11321133
var error = Errors.forCode(partition.errorCode());
1133-
if (error != Errors.NONE) {
1134-
log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
1134+
if (error != Errors.NONE || topicName == null) {
1135+
if (error != Errors.NONE) {
1136+
log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
1137+
} else { // unknown topic name
1138+
log.debug("Failed to fetch offset, topic does not exist");
1139+
}
11351140

11361141
if (!failedRequestRegistered) {
11371142
onFailedAttempt(currentTimeMs);
11381143
failedRequestRegistered = true;
11391144
}
11401145

1141-
if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
1142-
future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does not exist"));
1146+
if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION || error == Errors.UNKNOWN_TOPIC_ID || topicName == null) {
1147+
future.completeExceptionally(new KafkaException("Topic does not exist"));
11431148
return;
11441149
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
11451150
unauthorizedTopics.add(tp.topic());

clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import static org.junit.jupiter.api.Assertions.assertNotEquals;
8989
import static org.junit.jupiter.api.Assertions.assertNotNull;
9090
import static org.junit.jupiter.api.Assertions.assertNull;
91+
import static org.junit.jupiter.api.Assertions.assertThrows;
9192
import static org.junit.jupiter.api.Assertions.assertTrue;
9293
import static org.junit.jupiter.api.Assertions.fail;
9394
import static org.mockito.ArgumentMatchers.any;
@@ -751,6 +752,61 @@ public void testOffsetFetchRequestEnsureDuplicatedRequestSucceed() {
751752
assertEmptyPendingRequests(commitRequestManager);
752753
}
753754

755+
@Test
756+
public void testOffsetFetchRequestShouldSucceedWithTopicId() {
757+
CommitRequestManager commitRequestManager = create(true, 100);
758+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
759+
Uuid topicId = Uuid.randomUuid();
760+
when(metadata.topicIds()).thenReturn(Map.of("t1", topicId));
761+
when(metadata.topicNames()).thenReturn(Map.of(topicId, "t1"));
762+
Set<TopicPartition> partitions = new HashSet<>();
763+
partitions.add(new TopicPartition("t1", 0));
764+
765+
List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests(
766+
commitRequestManager,
767+
partitions,
768+
2,
769+
Errors.NONE,
770+
true,
771+
topicId);
772+
futures.forEach(f -> {
773+
assertTrue(f.isDone());
774+
assertFalse(f.isCompletedExceptionally());
775+
});
776+
// expecting the buffers to be emptied after being completed successfully
777+
commitRequestManager.poll(0);
778+
assertEmptyPendingRequests(commitRequestManager);
779+
}
780+
781+
@Test
782+
public void testOffsetFetchRequestShouldFailWithTopicIdWhenMetadataUnknownResponseTopicId() {
783+
CommitRequestManager commitRequestManager = create(true, 100);
784+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
785+
Uuid topicId = Uuid.randomUuid();
786+
when(metadata.topicIds()).thenReturn(Map.of("t1", topicId));
787+
// Mock the scenario where the topicID from the response is not in the metadata.
788+
when(metadata.topicNames()).thenReturn(Map.of());
789+
Set<TopicPartition> partitions = new HashSet<>();
790+
partitions.add(new TopicPartition("t1", 0));
791+
792+
List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests(
793+
commitRequestManager,
794+
partitions,
795+
1,
796+
Errors.NONE,
797+
true,
798+
topicId);
799+
futures.forEach(f -> {
800+
assertTrue(f.isDone());
801+
assertTrue(f.isCompletedExceptionally());
802+
ExecutionException exception = assertThrows(ExecutionException.class, f::get);
803+
assertInstanceOf(KafkaException.class, exception.getCause());
804+
});
805+
// expecting the buffers to be emptied after being completed successfully
806+
commitRequestManager.poll(0);
807+
assertEmptyPendingRequests(commitRequestManager);
808+
}
809+
754810
@ParameterizedTest
755811
@MethodSource("offsetFetchExceptionSupplier")
756812
public void testOffsetFetchRequestErroredRequests(final Errors error) {
@@ -1423,6 +1479,7 @@ private static Stream<Arguments> offsetFetchExceptionSupplier() {
14231479
Arguments.of(Errors.REQUEST_TIMED_OUT, TimeoutException.class),
14241480
Arguments.of(Errors.UNSTABLE_OFFSET_COMMIT, TimeoutException.class),
14251481
Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, TimeoutException.class),
1482+
Arguments.of(Errors.UNKNOWN_TOPIC_ID, TimeoutException.class),
14261483

14271484
// Non-retriable errors should result in their specific exceptions
14281485
Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, GroupAuthorizationException.class),
@@ -1599,6 +1656,7 @@ private static Stream<Arguments> partitionDataErrorSupplier() {
15991656
return Stream.of(
16001657
Arguments.of(Errors.UNSTABLE_OFFSET_COMMIT, true),
16011658
Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false),
1659+
Arguments.of(Errors.UNKNOWN_TOPIC_ID, false),
16021660
Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED, false),
16031661
Arguments.of(Errors.UNKNOWN_SERVER_ERROR, false));
16041662
}
@@ -1608,6 +1666,16 @@ private List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> sendAndV
16081666
final Set<TopicPartition> partitions,
16091667
int numRequest,
16101668
final Errors error) {
1669+
return sendAndVerifyDuplicatedOffsetFetchRequests(commitRequestManager, partitions, numRequest, error, false, Uuid.ZERO_UUID);
1670+
}
1671+
1672+
private List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> sendAndVerifyDuplicatedOffsetFetchRequests(
1673+
final CommitRequestManager commitRequestManager,
1674+
final Set<TopicPartition> partitions,
1675+
int numRequest,
1676+
final Errors error,
1677+
final boolean shouldUseTopicIds,
1678+
final Uuid topicId) {
16111679
List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = new ArrayList<>();
16121680
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
16131681
for (int i = 0; i < numRequest; i++) {
@@ -1616,8 +1684,14 @@ private List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> sendAndV
16161684

16171685
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
16181686
assertEquals(1, res.unsentRequests.size());
1687+
1688+
assertEquals(shouldUseTopicIds, res.unsentRequests.get(0).requestBuilder().latestAllowedVersion() >= 10);
1689+
((OffsetFetchRequestData) res.unsentRequests.get(0).requestBuilder().build().data()).groups()
1690+
.forEach(group -> group.topics()
1691+
.forEach(topic -> assertEquals(shouldUseTopicIds, !topic.topicId().equals(Uuid.ZERO_UUID))));
1692+
16191693
res.unsentRequests.get(0).handler().onComplete(buildOffsetFetchClientResponse(res.unsentRequests.get(0),
1620-
partitions, error));
1694+
partitions, error, shouldUseTopicIds, topicId));
16211695
res = commitRequestManager.poll(time.milliseconds());
16221696
assertEquals(0, res.unsentRequests.size());
16231697
return futures;
@@ -1703,12 +1777,22 @@ private ClientResponse buildOffsetFetchClientResponse(
17031777
final NetworkClientDelegate.UnsentRequest request,
17041778
final Set<TopicPartition> topicPartitions,
17051779
final Errors error) {
1780+
return buildOffsetFetchClientResponse(request, topicPartitions, error, false, Uuid.ZERO_UUID);
1781+
}
1782+
1783+
private ClientResponse buildOffsetFetchClientResponse(
1784+
final NetworkClientDelegate.UnsentRequest request,
1785+
final Set<TopicPartition> topicPartitions,
1786+
final Errors error,
1787+
final boolean shouldUseTopicIds,
1788+
final Uuid topicId) {
17061789
OffsetFetchResponseData.OffsetFetchResponseGroup group = new OffsetFetchResponseData.OffsetFetchResponseGroup()
17071790
.setGroupId(DEFAULT_GROUP_ID)
17081791
.setErrorCode(error.code())
17091792
.setTopics(topicPartitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).entrySet().stream().map(entry ->
17101793
new OffsetFetchResponseData.OffsetFetchResponseTopics()
1711-
.setName(entry.getKey())
1794+
.setName(shouldUseTopicIds ? "" : entry.getKey())
1795+
.setTopicId(topicId)
17121796
.setPartitions(entry.getValue().stream().map(partition ->
17131797
new OffsetFetchResponseData.OffsetFetchResponsePartitions()
17141798
.setPartitionIndex(partition.partition())

0 commit comments

Comments
 (0)