Skip to content

Kafka GRPC consumer Group Support#598

Merged
jfallows merged 41 commits intoaklivity:developfrom
akrambek:feature/kafka-grpc-group
Dec 9, 2023
Merged

Kafka GRPC consumer Group Support#598
jfallows merged 41 commits intoaklivity:developfrom
akrambek:feature/kafka-grpc-group

Conversation

@akrambek
Copy link
Contributor

Description

Kafka GRPC consumer Group Support

Fixes #597

.merged(m -> m.consumer(mc -> mc
.progress(p -> p
.partitionId(partitionId)
.partitionOffset(partitionOffset + 1L))))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please hoist this as a local variable called nextPartitionOffset to better document the code.

.merged()
.capabilities("FETCH_ONLY")
.topic("requests")
.groupId("zilla-kafka-grpc")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be configurable, I'm thinking as part of KafkaConfiguration, agree?

.typeId(kafkaTypeId)
.merged(m -> m.capabilities(c -> c.set(FETCH_ONLY))
.topic(condition.topic())
.groupId("zilla-kafka-grpc")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be configurable, I'm thinking as part of KafkaConfiguration, agree?

private static GroupIdSupplier defaultGroupId(
Configuration config)
{
return () -> String.format("zilla-%s-%s", "", "");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest zilla:%s-%s instead, probably more readable.
I'm thinking we'll want to do the same for mqtt-kafka and perhaps make it possible to prefix with mqtt: instead of zilla: if desired to make it easier to differentiate the consumer groups using standard kafka tools.

newBinding.routes.forEach(r ->
r.when.forEach(c ->
{
final String groupId = String.format("zilla-%s-%s", supplyNamespace.apply(binding.id),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KafkaConfiguration changes look good for supplyGroupId(), but we want have it return the pattern and then substitute in the namespace and local name for the binding via String.format here.

The default pattern returned by KafkaConfiguration.supplyGroupId() would be "zilla:%s-%s".

jfallows
jfallows previously approved these changes Dec 9, 2023
@jfallows jfallows merged commit 7e89117 into aklivity:develop Dec 9, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Kafka GRPC consumer Group Support

2 participants