Skip to content

Commit 0d7af5b

Browse files
committed
grpc-kafka fetch changes
1 parent 2cbff6c commit 0d7af5b

File tree

12 files changed

+96
-3
lines changed

12 files changed

+96
-3
lines changed

runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,11 @@ protected void onKafkaAbort(
542542
long traceId,
543543
long authorization)
544544
{
545+
if (!GrpcKafkaState.replyOpening(state))
546+
{
547+
doGrpcBegin(traceId, authorization, 0L, emptyRO);
548+
}
549+
545550
cleanup(traceId, authorization);
546551
}
547552

@@ -569,6 +574,15 @@ protected void onKafkaData(
569574
int length,
570575
OctetsFW extension)
571576
{
577+
if (!GrpcKafkaState.replyOpening(state))
578+
{
579+
final ExtensionFW dataEx = extension.get(extensionRO::tryWrap);
580+
final KafkaDataExFW kafkaDataEx =
581+
dataEx != null && dataEx.typeId() == kafkaTypeId ? extension.get(kafkaDataExRO::tryWrap) : null;
582+
583+
doGrpcBegin(traceId, authorization, 0L, kafkaDataEx.merged().fetch().headers());
584+
}
585+
572586
doGrpcData(traceId, authorization, budgetId, reserved, flags, buffer, offset, length);
573587
}
574588

@@ -623,6 +637,50 @@ private void doGrpcBegin(
623637
traceId, authorization, affinity, extension);
624638
}
625639

640+
private void doGrpcBegin(
641+
long traceId,
642+
long authorization,
643+
long affinity,
644+
Array32FW<KafkaHeaderFW> headers)
645+
{
646+
state = GrpcKafkaState.openingReply(state);
647+
648+
Array32FW.Builder<GrpcMetadataFW.Builder, GrpcMetadataFW> builder =
649+
grpcMetadataRW.wrap(metaBuffer, 0, metaBuffer.capacity());
650+
651+
headers.forEach(h ->
652+
{
653+
final OctetsFW name = h.name();
654+
final DirectBuffer buffer = name.buffer();
655+
final int offset = name.offset();
656+
final int limit = name.limit();
657+
buffer.getBytes(offset, headerPrefix);
658+
buffer.getBytes(limit - BIN_SUFFIX.length, headerSuffix);
659+
if (Arrays.equals(META_PREFIX, headerPrefix))
660+
{
661+
final GrpcType type = Arrays.equals(BIN_SUFFIX, headerSuffix) ? BASE64 : TEXT;
662+
final int nameLen = type == BASE64
663+
? h.nameLen() - META_PREFIX_LENGTH - BIN_SUFFIX_LENGTH
664+
: h.nameLen() - META_PREFIX_LENGTH;
665+
builder.item(m -> m
666+
.type(t -> t.set(type))
667+
.nameLen(nameLen)
668+
.name(name.value(), META_PREFIX_LENGTH, nameLen)
669+
.valueLen(h.valueLen())
670+
.value(h.value()));
671+
}
672+
});
673+
674+
GrpcBeginExFW beginEx = grpcBeginExRW
675+
.wrap(extBuffer, 0, extBuffer.capacity())
676+
.typeId(grpcTypeId)
677+
.metadata(builder.build())
678+
.build();
679+
680+
doBegin(grpc, originId, routedId, replyId, replySeq, replyAck, replyMax,
681+
traceId, authorization, affinity, beginEx);
682+
}
683+
626684
private void doGrpcData(
627685
long traceId,
628686
long authorization,
@@ -865,8 +923,6 @@ private void onKafkaBegin(
865923
int lenSize = len.sizeof();
866924
replyPad = result.fieldId().sizeof() + lenSize + partitions.sizeof();
867925
}
868-
869-
delegate.onKafkaBegin(traceId, authorization, extension);
870926
}
871927

872928
private void onKafkaData(
@@ -939,7 +995,7 @@ private void onKafkaData(
939995
}
940996

941997
int length = encodeProgress - encodeOffset;
942-
delegate.onKafkaData(traceId, authorization, budgetId, reserved, flags,
998+
delegate.onKafkaData(traceId, authorization, budgetId, reserved + length, flags,
943999
encodeBuffer, encodeOffset, length, extension);
9441000
}
9451001
}

specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/fetch/client.sent.read.abort.unary.rpc/client.rpt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
connect "zilla://streams/grpc0"
1717
option zilla:window 8192
1818
option zilla:transmission "half-duplex"
19+
option zilla:update "proactive"
1920

2021
write zilla:begin.ext ${grpc:beginEx()
2122
.typeId(zilla:id("grpc"))

specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/fetch/client.sent.write.abort.unary.rpc/client.rpt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
connect "zilla://streams/grpc0"
1717
option zilla:window 8192
1818
option zilla:transmission "half-duplex"
19+
option zilla:update "proactive"
1920

2021
write zilla:begin.ext ${grpc:beginEx()
2122
.typeId(zilla:id("grpc"))

specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/fetch/reject.request.body.unary.rpc/client.rpt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
connect "zilla://streams/grpc0"
1717
option zilla:window 8192
1818
option zilla:transmission "half-duplex"
19+
option zilla:update "proactive"
1920

2021
write zilla:begin.ext ${grpc:beginEx()
2122
.typeId(zilla:id("grpc"))

specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/fetch/reliable.unary.rpc/client.rpt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
connect "zilla://streams/grpc0"
1717
option zilla:window 8192
1818
option zilla:transmission "half-duplex"
19+
option zilla:update "proactive"
1920

2021
write zilla:begin.ext ${grpc:beginEx()
2122
.typeId(zilla:id("grpc"))
@@ -32,6 +33,12 @@ write flush
3233

3334
write close
3435

36+
read zilla:begin.ext ${grpc:matchBeginEx()
37+
.typeId(zilla:id("grpc"))
38+
.metadata("custom", "value")
39+
.metadata("BASE64", "customProperty", "dGVzdA==")
40+
.build()}
41+
3542
read ${grpc:protobuf()
3643
.string(1, "Hello World")
3744
.bytes(32767, grpc_kafka:messageId()

specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/fetch/reliable.unary.rpc/server.rpt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ read zilla:data.empty
3232

3333
read closed
3434

35+
write zilla:begin.ext ${grpc:beginEx()
36+
.typeId(zilla:id("grpc"))
37+
.metadata("custom", "value")
38+
.metadata("BASE64", "customProperty", "dGVzdA==")
39+
.build()}
40+
3541
write ${grpc:protobuf()
3642
.string(1, "Hello World")
3743
.bytes(32767, grpc_kafka:messageId()

specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/fetch/unreliable.unary.rpc/client.rpt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
connect "zilla://streams/grpc0"
1717
option zilla:window 8192
1818
option zilla:transmission "half-duplex"
19+
option zilla:update "proactive"
1920

2021
write zilla:begin.ext ${grpc:beginEx()
2122
.typeId(zilla:id("grpc"))
@@ -31,6 +32,12 @@ write flush
3132

3233
write close
3334

35+
read zilla:begin.ext ${grpc:matchBeginEx()
36+
.typeId(zilla:id("grpc"))
37+
.metadata("custom", "value")
38+
.metadata("BASE64", "customProperty", "dGVzdA==")
39+
.build()}
40+
3441
read ${grpc:protobuf()
3542
.string(1, "Hello World")
3643
.bytes(32767, grpc_kafka:messageId()

specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/fetch/unreliable.unary.rpc/server.rpt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ read zilla:data.empty
3131

3232
read closed
3333

34+
write zilla:begin.ext ${grpc:beginEx()
35+
.typeId(zilla:id("grpc"))
36+
.metadata("custom", "value")
37+
.metadata("BASE64", "customProperty", "dGVzdA==")
38+
.build()}
39+
3440
write ${grpc:protobuf()
3541
.string(1, "Hello World")
3642
.bytes(32767, grpc_kafka:messageId()

specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/fetch/reliable.unary.rpc/client.rpt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ read zilla:data.ext ${kafka:matchDataEx()
5151
.progress(0, 2)
5252
.progress(1, 1)
5353
.key("test")
54+
.header("meta:custom", "value")
55+
.header("meta:customProperty-bin", "dGVzdA==")
5456
.build()
5557
.build()}
5658
read ${grpc:protobuf()

specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/fetch/reliable.unary.rpc/server.rpt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ write zilla:data.ext ${kafka:dataEx()
5454
.progress(0, 2)
5555
.progress(1, 1)
5656
.key("test")
57+
.header("meta:custom", "value")
58+
.header("meta:customProperty-bin", "dGVzdA==")
5759
.build()
5860
.build()}
5961
write ${grpc:protobuf()

0 commit comments

Comments
 (0)