Consumer group message acknowledgement support#538
Consumer group message acknowledgement support #538 — jfallows merged 62 commits into aklivity:develop from akrambek:feature/consumer-group-cont
Conversation
…artial data frame while computing crc32c value
| KafkaHeader[] headers; // INIT + FIN (produce), INIT only (fetch) | ||
| } | ||
|
|
||
| struct KafkaMergedConsumerDataEx |
| union KafkaMergedFlushEx switch (uint8) | ||
| { | ||
| case 252: kafka::stream::KafkaMergedConsumerFlushEx consumer; | ||
| case 8: kafka::stream::KafkaMergedOffsetCommitFlushEx offsetCommit; |
There was a problem hiding this comment.
This should stay as KafkaMergedConsumerFlushEx for consistency with KafkaMergedConsumerDataEx, right?
| struct KafkaConsumerDataEx | ||
| union KafkaConsumerDataEx switch (uint8) | ||
| { | ||
| case 253: kafka::stream::KafkaMergedConsumerDataEx group; |
There was a problem hiding this comment.
Merged should not be on Consumer, right?
|
|
||
| struct KafkaOffsetCommitDataEx | ||
| { | ||
| string16 topic; |
There was a problem hiding this comment.
The idea is to have one offset commit stream per group instead of one per topic?
| int64 timestamp = 0; // INIT only | ||
| int64 filters = -1; // INIT only | ||
| KafkaOffset partition; // INIT only | ||
| KafkaOffset[] progress; // INIT only |
There was a problem hiding this comment.
Is progress used by produce?
| { | ||
| int32 partitionId; | ||
| int64 partitionOffset; | ||
| KafkaOffset progress; |
There was a problem hiding this comment.
progress or partition? (actual question)
| struct KafkaConsumerOffsetCommitDataEx | ||
| { | ||
| KafkaOffset progress; | ||
| int32 committedLeaderEpoch; |
There was a problem hiding this comment.
Perhaps just leaderEpoch as we are in the context of Commit already here?
| int32 partitionId; | ||
| int64 partitionOffset; | ||
| KafkaOffset progress; | ||
| int32 committedLeaderEpoch; |
There was a problem hiding this comment.
Perhaps just leaderEpoch as we are in the context of Commit already here?
| { | ||
| case 253: kafka::stream::KafkaGroupFlushEx group; | ||
| case 255: kafka::stream::KafkaMergedFlushEx merged; | ||
| case 8: kafka::stream::KafkaOffsetCommitFlushEx offsetCommit; |
There was a problem hiding this comment.
Perhaps offset is sufficient here instead of offsetCommit?
| union KafkaConsumerDataEx switch (uint8) | ||
| { | ||
| case 253: kafka::stream::KafkaConsumerGroupDataEx group; | ||
| case 8: kafka::stream::KafkaConsumerOffsetCommitDataEx offsetCommit; |
There was a problem hiding this comment.
Perhaps offset is sufficient here instead of offsetCommit?
| read zilla:data.ext ${kafka:dataEx() | ||
| .typeId(zilla:id("kafka")) | ||
| .consumer() | ||
| .offsetCommit() |
There was a problem hiding this comment.
I think offset would read better here, as in
consumer()
.offset()
...
| write zilla:data.empty | ||
| write flush | ||
|
|
||
| read option zilla:ack 0 |
There was a problem hiding this comment.
My concern here is that reserved is 0 when sending the empty DATA frame, so WINDOW ack does not need to elevate the initialAck, making it difficult to tell the difference before or after ack.
Do you think it's an issue?
| .typeId(zilla:id("kafka")) | ||
| .offsetCommit() | ||
| .progress(0, 2) | ||
| .committedLeaderEpoch(0) |
There was a problem hiding this comment.
.leaderEpoch(0) reads better here.
| { | ||
| string16 topic; | ||
| KafkaOffset[] offsets; | ||
| KafkaTopicPartitionOffset[] partitions; |
There was a problem hiding this comment.
Since KafkaOffsetFetchBeginEx now includes topic, then KafkaOffsetFetchDataEx is already topic specific and doesn't need topic again, right?
I think that means we should use KafkaTopicPartition[] partitions on KafkaOffsetFetchDataEx and remove KafkaOffsetFetchTopicOffsets from kafka.idl, agree?
| KafkaKey hashKey; // INIT only | ||
| KafkaDelta delta; // INIT + FIN | ||
| KafkaHeader[] headers; // INIT + FIN (produce), INIT only (fetch) | ||
| KafkaHeader[] headers; // INIT only |
There was a problem hiding this comment.
I think the comment before might have been incorrect for fetch, since headers are at the end of a kafka message in the wire protocol, so merged directly over client fetch would send headers with FIN, whereas cache always sends headers with INIT, so this comment should probably be INIT + FIN.
| format("[merged] (%d) %d %s %d %d %d", | ||
| merged.deferred(), merged.timestamp(), asString(key.value()), | ||
| partition.partitionId(), partition.partitionOffset(), partition.latestOffset())); | ||
| format("[merged] (%d) %d %s %d %d %d", |
There was a problem hiding this comment.
Perhaps [merged] [fetch] ...?
| final KafkaOffsetFW partition = produce.partition(); | ||
|
|
||
| out.printf(verboseFormat, index, offset, timestamp, | ||
| format("[merged] (%d) %d %s %d %d %d", |
There was a problem hiding this comment.
Perhaps [merged] [produce] ...?
| { | ||
| final KafkaMergedDataExFW kafkaMergedDataEx = kafkaDataEx.merged(); | ||
| final int contentLength = payload.sizeof() + kafkaMergedDataEx.deferred(); | ||
| final KafkaMergedFetchDataExFW fetch = kafkaDataEx.merged().fetch(); |
There was a problem hiding this comment.
Please rename fetch to kafkaMergedFetchDataEx.
| kafkaMergedDataEx.fetch().progress() : null; | ||
| key = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().key().value() : null; | ||
| final Array32FW<KafkaHeaderFW> headers = kafkaMergedDataEx != null ? | ||
| kafkaMergedDataEx.fetch().headers() : null; |
There was a problem hiding this comment.
Define kafkaMergedFetchDataEx to avoid calling kafkaMergedDataEx.fetch() repeatedly.
| @@ -176,7 +176,7 @@ read 20 # size | |||
| -1s # authentication bytes | |||
| 0L # session lifetime | |||
|
|
|||
| write 119 # size | |||
| write 82 # size | |||
There was a problem hiding this comment.
Comment has vertical misalignment
(also for other similar scripts with ${instanceId} replaced by "zilla")
| @Specification({ | ||
| "${app}/topic.offset.info/client", | ||
| "${app}/topic.offset.info/server"}) | ||
| public void shouldFetchPartitionOffsetInfo() throws Exception |
There was a problem hiding this comment.
Test scenario name does not match script scenario name.
| @Specification({ | ||
| "${net}/topic.offset.info/client", | ||
| "${net}/topic.offset.info/server"}) | ||
| public void shouldFetchPartitionOffsetInfo() throws Exception |
There was a problem hiding this comment.
Please check test scenario name.
| import org.kaazing.k3po.junit.annotation.Specification; | ||
| import org.kaazing.k3po.junit.rules.K3poRule; | ||
|
|
||
| public class OffsetFetchSaslIT |
There was a problem hiding this comment.
Please check test scenario names.
Also typo Sals should be Sasl, and Scarm should be Scram
|
|
||
|
|
| format("%s: %s", asString(h.name()), asString(h.value())))); | ||
| } | ||
|
|
||
|
|
| traceId, authorization, initialBud, reserved, extension); | ||
| } | ||
|
|
||
|
|
| final KafkaBeginExFW groupBeginEx = beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::wrap) : null; | ||
| final KafkaGroupBeginExFW kafkaGroupBeginEx = groupBeginEx != null ? groupBeginEx.group() : null; |
There was a problem hiding this comment.
| final KafkaBeginExFW groupBeginEx = beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::wrap) : null; | |
| final KafkaGroupBeginExFW kafkaGroupBeginEx = groupBeginEx != null ? groupBeginEx.group() : null; | |
| final KafkaBeginExFW kafkaBeginEx = beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::wrap) : null; | |
| final KafkaGroupBeginExFW kafkaGroupBeginEx = groupBeginEx != null ? kafkaBeginEx.group() : null; |
| client.errorCode = errorCode; | ||
| client.decoder = decodeReject; | ||
| } | ||
|
|
There was a problem hiding this comment.
Suggest switch instead of if-else-if-else.
Description
This feature gives other kafka mappings the ability to start fetching from the last observed message offset
Fixes #588