Kafka GRPC consumer Group Support#598
Kafka GRPC consumer Group Support#598jfallows merged 41 commits intoaklivity:developfrom akrambek:feature/kafka-grpc-group
Conversation
…artial data frame while computing crc32c value
| .merged(m -> m.consumer(mc -> mc | ||
| .progress(p -> p | ||
| .partitionId(partitionId) | ||
| .partitionOffset(partitionOffset + 1L)))) |
There was a problem hiding this comment.
Please hoist this as a local variable called nextPartitionOffset to better document the code.
| .merged() | ||
| .capabilities("FETCH_ONLY") | ||
| .topic("requests") | ||
| .groupId("zilla-kafka-grpc") |
There was a problem hiding this comment.
This needs to be configurable, I'm thinking as part of KafkaConfiguration, agree?
| .typeId(kafkaTypeId) | ||
| .merged(m -> m.capabilities(c -> c.set(FETCH_ONLY)) | ||
| .topic(condition.topic()) | ||
| .groupId("zilla-kafka-grpc") |
There was a problem hiding this comment.
This needs to be configurable, I'm thinking as part of KafkaConfiguration, agree?
| private static GroupIdSupplier defaultGroupId( | ||
| Configuration config) | ||
| { | ||
| return () -> String.format("zilla-%s-%s", "", ""); |
There was a problem hiding this comment.
Suggest zilla:%s-%s instead, probably more readable.
I'm thinking we'll want to do the same for mqtt-kafka and perhaps make it possible to prefix with mqtt: instead of zilla: if desired to make it easier to differentiate the consumer groups using standard kafka tools.
| newBinding.routes.forEach(r -> | ||
| r.when.forEach(c -> | ||
| { | ||
| final String groupId = String.format("zilla-%s-%s", supplyNamespace.apply(binding.id), |
There was a problem hiding this comment.
KafkaConfiguration changes look good for supplyGroupId(), but we want have it return the pattern and then substitute in the namespace and local name for the binding via String.format here.
The default pattern returned by KafkaConfiguration.supplyGroupId() would be "zilla:%s-%s".
Description
Kafka GRPC consumer Group Support
Fixes #597