From ecbf16f4323f9905ea095b8c5594e98c4fedfbe1 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Wed, 3 Jan 2024 16:35:33 +0100 Subject: [PATCH 1/2] Optimize memory allocation for mqtt-kafka offset tracking --- .../stream/MqttKafkaSessionFactory.java | 1 - .../stream/MqttKafkaSubscribeFactory.java | 47 ++++++++++++------- .../binding/mqtt/internal/MqttFunctions.java | 33 +++++++++---- .../main/resources/META-INF/zilla/mqtt.idl | 11 ----- .../mqtt/internal/MqttFunctionsTest.java | 23 +++++---- 5 files changed, 66 insertions(+), 49 deletions(-) diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java index ee84c2f9d5..368e5ceaaf 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java @@ -142,7 +142,6 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory public static final int MQTT_NOT_AUTHORIZED = 0x87; public static final int MQTT_IMPLEMENTATION_SPECIFIC_ERROR = 0x83; public static final String MQTT_INVALID_SESSION_TIMEOUT_REASON = "Invalid session expiry interval"; - private static final String16FW EMPTY_STRING = new String16FW(""); static { diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java index 08ef044b0c..0682c10625 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java @@ -82,7 +82,6 @@ import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttBeginExFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttDataExFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttFlushExFW; -import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttOffsetMetadataFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttOffsetStateFlags; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttSubscribeBeginExFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttSubscribeFlushExFW; @@ -108,6 +107,7 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory private static final int DATA_FLAG_INIT = 0x02; private static final int DATA_FLAG_FIN = 0x01; private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(new UnsafeBuffer(new byte[0]), 0, 0); + private static final String16FW EMPTY_STRING = new String16FW(""); private final OctetsFW emptyRO = new OctetsFW().wrap(new UnsafeBuffer(0L, 0), 0, 0); private final BeginFW beginRO = new BeginFW(); @@ -128,7 +128,6 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory private final WindowFW.Builder windowRW = new WindowFW.Builder(); private final ResetFW.Builder resetRW = new ResetFW.Builder(); private final MqttSubscribeMessageFW.Builder mqttSubscribeMessageRW = new MqttSubscribeMessageFW.Builder(); - private final MqttOffsetMetadataFW.Builder mqttOffsetMetadataRW = new MqttOffsetMetadataFW.Builder(); private final ExtensionFW extensionRO = new ExtensionFW(); private final MqttBeginExFW mqttBeginExRO = new MqttBeginExFW(); @@ -138,7 +137,6 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory private final KafkaFlushExFW kafkaFlushExRO = new KafkaFlushExFW(); private final KafkaHeaderFW kafkaHeaderRO = new KafkaHeaderFW(); private final MqttSubscribeMessageFW mqttSubscribeMessageRO = new MqttSubscribeMessageFW(); - private final MqttOffsetMetadataFW mqttOffsetMetadataRO = new MqttOffsetMetadataFW(); private final MqttDataExFW.Builder mqttDataExRW = new MqttDataExFW.Builder(); private final MqttFlushExFW.Builder mqttFlushExRW = new MqttFlushExFW.Builder(); @@ -1200,8 +1198,8 @@ else if (state == MqttOffsetStateFlags.INCOMPLETE) { p.partitionId(offset.partitionId).partitionOffset(offset.offset + 1); final IntArrayList incomplete = incompletePacketIds.get(offset.partitionId); - final String partitionMetadata = - incomplete == null || incomplete.isEmpty() ? "" : offSetMetadataListToString(incomplete); + final String16FW partitionMetadata = incomplete == null || incomplete.isEmpty() ? + EMPTY_STRING : offsetMetadataListToString(incomplete); p.metadata(partitionMetadata); }); f.correlationId(correlationId); @@ -1826,26 +1824,39 @@ public void flushDataIfNecessary( } } - //TODO: how to make these more efficient while keeping the internal object easily modifieable (not using FW)? private IntArrayList stringToOffsetMetadataList( String16FW metadata) { final IntArrayList metadataList = new IntArrayList(); - UnsafeBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(metadata.asString())); - final MqttOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRO.wrap(buffer, 0, buffer.capacity()); - offsetMetadata.metadata().forEach(m -> metadataList.add(m.packetId())); + int offset = 0; + final DirectBuffer buffer = metadata.value(); + byte version = buffer.getByte(offset++); + for (; offset < buffer.capacity(); offset += BitUtil.SIZE_OF_SHORT) + { + metadataList.add((int) buffer.getShort(offset)); + } + return metadataList; } - private String offSetMetadataListToString( + private String16FW offsetMetadataListToString( IntArrayList metadataList) { - mqttOffsetMetadataRW.wrap(offsetBuffer, 0, offsetBuffer.capacity()); - metadataList.forEach(m -> mqttOffsetMetadataRW.metadataItem(mi -> mi.packetId(m))); - final MqttOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRW.build(); - final byte[] array = new byte[offsetMetadata.sizeof()]; - offsetMetadata.buffer().getBytes(offsetMetadata.offset(), array); - return BitUtil.toHex(array); + final int length = metadataList.size() * BitUtil.SIZE_OF_SHORT + 1; + final int capacity = BitUtil.SIZE_OF_SHORT + length; + int offset = 0; + + offsetBuffer.putShort(offset, (short) length); + offset += BitUtil.SIZE_OF_SHORT; + offsetBuffer.putByte(offset++, (byte) 1); + + for (int value : metadataList) + { + offsetBuffer.putShort(offset, (short) value); + offset += BitUtil.SIZE_OF_SHORT; + } + + return new String16FW().wrap(offsetBuffer, 0, capacity); } final class KafkaRetainedProxy extends KafkaProxy @@ -1972,8 +1983,8 @@ protected void doKafkaConsumerFlush( { p.partitionId(offset.partitionId).partitionOffset(offset.offset + 1); final IntArrayList incomplete = incompletePacketIds.get(offset.partitionId); - final String partitionMetadata = - incomplete == null || incomplete.isEmpty() ? "" : offSetMetadataListToString(incomplete); + final String16FW partitionMetadata = incomplete == null || incomplete.isEmpty() ? + EMPTY_STRING : offsetMetadataListToString(incomplete); p.metadata(partitionMetadata); }); f.correlationId(correlationId); diff --git a/specs/binding-mqtt.spec/src/main/java/io/aklivity/zilla/specs/binding/mqtt/internal/MqttFunctions.java b/specs/binding-mqtt.spec/src/main/java/io/aklivity/zilla/specs/binding/mqtt/internal/MqttFunctions.java index 61e1124078..def1c15c09 100644 --- a/specs/binding-mqtt.spec/src/main/java/io/aklivity/zilla/specs/binding/mqtt/internal/MqttFunctions.java +++ b/specs/binding-mqtt.spec/src/main/java/io/aklivity/zilla/specs/binding/mqtt/internal/MqttFunctions.java @@ -26,6 +26,7 @@ import org.agrona.BitUtil; import org.agrona.DirectBuffer; import org.agrona.MutableDirectBuffer; +import org.agrona.collections.IntArrayList; import org.agrona.concurrent.UnsafeBuffer; import org.kaazing.k3po.lang.el.BytesMatcher; import org.kaazing.k3po.lang.el.Function; @@ -53,7 +54,6 @@ import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttDataExFW; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttExtensionKind; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttFlushExFW; -import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttOffsetMetadataFW; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttOffsetStateFlags; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttPublishBeginExFW; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttPublishDataExFW; @@ -848,27 +848,42 @@ public byte[] build() public static final class MqttOffsetMetadataBuilder { - private final MqttOffsetMetadataFW.Builder offsetMetadataRW = new MqttOffsetMetadataFW.Builder(); + final MutableDirectBuffer writeBuffer; + final IntArrayList packetIds; + + byte version = 1; + private MqttOffsetMetadataBuilder() { - MutableDirectBuffer writeBuffer = new UnsafeBuffer(new byte[1024 * 8]); - offsetMetadataRW.wrap(writeBuffer, 0, writeBuffer.capacity()); + writeBuffer = new UnsafeBuffer(new byte[1024 * 8]); + packetIds = new IntArrayList(); } public MqttOffsetMetadataBuilder metadata( int packetId) { - offsetMetadataRW.metadataItem(f -> f.packetId(packetId)); + packetIds.add(packetId); return this; } public String build() { - final MqttOffsetMetadataFW offsetMetadata = offsetMetadataRW.build(); - final byte[] array = new byte[offsetMetadata.sizeof()]; - offsetMetadata.buffer().getBytes(offsetMetadata.offset(), array); - return BitUtil.toHex(array); + final int length = packetIds.size() * BitUtil.SIZE_OF_SHORT + 1; + final int capacity = BitUtil.SIZE_OF_SHORT + length; + int offset = 0; + + writeBuffer.putShort(offset, (short) length); + offset += BitUtil.SIZE_OF_SHORT; + writeBuffer.putByte(offset++, (byte) 1); + + for (int value : packetIds) + { + writeBuffer.putShort(offset, (short) value); + offset += BitUtil.SIZE_OF_SHORT; + } + + return new String16FW().wrap(writeBuffer, 0, capacity).asString(); } } diff --git a/specs/binding-mqtt.spec/src/main/resources/META-INF/zilla/mqtt.idl b/specs/binding-mqtt.spec/src/main/resources/META-INF/zilla/mqtt.idl index 9c44728e19..2aa9c95025 100644 --- a/specs/binding-mqtt.spec/src/main/resources/META-INF/zilla/mqtt.idl +++ b/specs/binding-mqtt.spec/src/main/resources/META-INF/zilla/mqtt.idl @@ -248,16 +248,5 @@ scope mqtt COMPLETE(0), INCOMPLETE(1) } - - struct MqttOffsetState - { - uint16 packetId; - } - - struct MqttOffsetMetadata - { - uint8 version = 1; - MqttOffsetState[] metadata; - } } } diff --git a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/internal/MqttFunctionsTest.java b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/internal/MqttFunctionsTest.java index b40abe4eae..51262d3af4 100644 --- a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/internal/MqttFunctionsTest.java +++ b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/internal/MqttFunctionsTest.java @@ -26,6 +26,7 @@ import org.agrona.BitUtil; import org.agrona.DirectBuffer; +import org.agrona.collections.IntArrayList; import org.agrona.concurrent.UnsafeBuffer; import org.junit.Test; import org.kaazing.k3po.lang.el.BytesMatcher; @@ -35,10 +36,10 @@ import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttSessionSignalType; import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttSessionStateFW; import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttWillMessageFW; +import io.aklivity.zilla.specs.binding.mqtt.internal.types.String16FW; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttBeginExFW; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttDataExFW; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttFlushExFW; -import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttOffsetMetadataFW; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttResetExFW; public class MqttFunctionsTest @@ -1272,16 +1273,18 @@ public void shouldEncodeMqttOffsetMetadata() .metadata(2) .build(); - DirectBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(state)); - MqttOffsetMetadataFW offsetMetadata = new MqttOffsetMetadataFW().wrap(buffer, 0, buffer.capacity()); + final IntArrayList metadataList = new IntArrayList(); + int offset = 0; + final DirectBuffer buffer = new String16FW(state).value(); + byte version = buffer.getByte(offset++); + for (; offset < buffer.capacity(); offset += BitUtil.SIZE_OF_SHORT) + { + metadataList.add((int) buffer.getShort(offset)); + } - assertNotNull(offsetMetadata.metadata() - .matchFirst(m -> - 1 == m.packetId())); - - assertNotNull(offsetMetadata.metadata() - .matchFirst(m -> - 2 == m.packetId())); + assertEquals(1, version); + assertEquals(1, (int) metadataList.get(0)); + assertEquals(2, (int) metadataList.get(1)); } @Test From babb66933b0492da5d79f3adf3d3d45cf74e1ea5 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Sun, 14 Jan 2024 12:58:20 +0100 Subject: [PATCH 2/2] Address feedback --- .../stream/MqttKafkaSubscribeFactory.java | 37 +++++++------------ .../binding/mqtt/internal/MqttFunctions.java | 29 ++++----------- .../main/resources/META-INF/zilla/mqtt.idl | 7 ++++ .../mqtt/internal/MqttFunctionsTest.java | 17 ++++----- 4 files changed, 36 insertions(+), 54 deletions(-) diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java index 0682c10625..c0b42eba14 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.function.IntConsumer; import java.util.function.LongFunction; import java.util.function.LongUnaryOperator; import java.util.function.Supplier; @@ -82,6 +83,7 @@ import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttBeginExFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttDataExFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttFlushExFW; +import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttOffsetMetadataFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttOffsetStateFlags; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttSubscribeBeginExFW; import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttSubscribeFlushExFW; @@ -108,6 +110,7 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory private static final int DATA_FLAG_FIN = 0x01; private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(new UnsafeBuffer(new byte[0]), 0, 0); private static final String16FW EMPTY_STRING = new String16FW(""); + private static final int OFFSET_METADATA_VERSION = 1; private final OctetsFW emptyRO = new OctetsFW().wrap(new UnsafeBuffer(0L, 0), 0, 0); private final BeginFW beginRO = new BeginFW(); @@ -128,6 +131,7 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory private final WindowFW.Builder windowRW = new WindowFW.Builder(); private final ResetFW.Builder resetRW = new ResetFW.Builder(); private final MqttSubscribeMessageFW.Builder mqttSubscribeMessageRW = new MqttSubscribeMessageFW.Builder(); + private final MqttOffsetMetadataFW.Builder mqttOffsetMetadataRW = new MqttOffsetMetadataFW.Builder(); private final ExtensionFW extensionRO = new ExtensionFW(); private final MqttBeginExFW mqttBeginExRO = new MqttBeginExFW(); @@ -137,6 +141,7 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory private final KafkaFlushExFW kafkaFlushExRO = new KafkaFlushExFW(); private final KafkaHeaderFW kafkaHeaderRO = new KafkaHeaderFW(); private final MqttSubscribeMessageFW mqttSubscribeMessageRO = new MqttSubscribeMessageFW(); + private final MqttOffsetMetadataFW mqttOffsetMetadataRO = new MqttOffsetMetadataFW(); private final MqttDataExFW.Builder mqttDataExRW = new MqttDataExFW.Builder(); private final MqttFlushExFW.Builder mqttFlushExRW = new MqttFlushExFW.Builder(); @@ -1828,35 +1833,21 @@ private IntArrayList stringToOffsetMetadataList( String16FW metadata) { final IntArrayList metadataList = new IntArrayList(); - int offset = 0; - final DirectBuffer buffer = metadata.value(); - byte version = buffer.getByte(offset++); - for (; offset < buffer.capacity(); offset += BitUtil.SIZE_OF_SHORT) - { - metadataList.add((int) buffer.getShort(offset)); - } - + UnsafeBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(metadata.asString())); + final MqttOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRO.wrap(buffer, 0, buffer.capacity()); + offsetMetadata.packetIds().forEachRemaining((IntConsumer) metadataList::add); return metadataList; } private String16FW offsetMetadataListToString( IntArrayList metadataList) { - final int length = metadataList.size() * BitUtil.SIZE_OF_SHORT + 1; - final int capacity = BitUtil.SIZE_OF_SHORT + length; - int offset = 0; - - offsetBuffer.putShort(offset, (short) length); - offset += BitUtil.SIZE_OF_SHORT; - offsetBuffer.putByte(offset++, (byte) 1); - - for (int value : metadataList) - { - offsetBuffer.putShort(offset, (short) value); - offset += BitUtil.SIZE_OF_SHORT; - } - - return new String16FW().wrap(offsetBuffer, 0, capacity); + mqttOffsetMetadataRW.wrap(offsetBuffer, 0, offsetBuffer.capacity()); + mqttOffsetMetadataRW.version(OFFSET_METADATA_VERSION); + metadataList.forEach(p -> mqttOffsetMetadataRW.appendPacketIds(p.shortValue())); + final MqttOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRW.build(); + return new String16FW(BitUtil.toHex(offsetMetadata.buffer().byteArray(), + offsetMetadata.offset(), offsetMetadata.limit())); } final class KafkaRetainedProxy extends KafkaProxy diff --git a/specs/binding-mqtt.spec/src/main/java/io/aklivity/zilla/specs/binding/mqtt/internal/MqttFunctions.java b/specs/binding-mqtt.spec/src/main/java/io/aklivity/zilla/specs/binding/mqtt/internal/MqttFunctions.java index def1c15c09..3330a6dc75 100644 --- a/specs/binding-mqtt.spec/src/main/java/io/aklivity/zilla/specs/binding/mqtt/internal/MqttFunctions.java +++ b/specs/binding-mqtt.spec/src/main/java/io/aklivity/zilla/specs/binding/mqtt/internal/MqttFunctions.java @@ -26,7 +26,6 @@ import org.agrona.BitUtil; import org.agrona.DirectBuffer; import org.agrona.MutableDirectBuffer; -import org.agrona.collections.IntArrayList; import org.agrona.concurrent.UnsafeBuffer; import org.kaazing.k3po.lang.el.BytesMatcher; import org.kaazing.k3po.lang.el.Function; @@ -54,6 +53,7 @@ import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttDataExFW; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttExtensionKind; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttFlushExFW; +import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttOffsetMetadataFW; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttOffsetStateFlags; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttPublishBeginExFW; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttPublishDataExFW; @@ -848,42 +848,29 @@ public byte[] build() public static final class MqttOffsetMetadataBuilder { - final MutableDirectBuffer writeBuffer; - final IntArrayList packetIds; + private final MqttOffsetMetadataFW.Builder offsetMetadataRW = new MqttOffsetMetadataFW.Builder(); byte version = 1; private MqttOffsetMetadataBuilder() { - writeBuffer = new UnsafeBuffer(new byte[1024 * 8]); - packetIds = new IntArrayList(); + MutableDirectBuffer writeBuffer = new UnsafeBuffer(new byte[1024 * 8]); + offsetMetadataRW.wrap(writeBuffer, 0, writeBuffer.capacity()); + offsetMetadataRW.version(version); } public MqttOffsetMetadataBuilder metadata( int packetId) { - packetIds.add(packetId); + offsetMetadataRW.appendPacketIds((short) packetId); return this; } public String build() { - final int length = packetIds.size() * BitUtil.SIZE_OF_SHORT + 1; - final int capacity = BitUtil.SIZE_OF_SHORT + length; - int offset = 0; - - writeBuffer.putShort(offset, (short) length); - offset += BitUtil.SIZE_OF_SHORT; - writeBuffer.putByte(offset++, (byte) 1); - - for (int value : packetIds) - { - writeBuffer.putShort(offset, (short) value); - offset += BitUtil.SIZE_OF_SHORT; - } - - return new String16FW().wrap(writeBuffer, 0, capacity).asString(); + final MqttOffsetMetadataFW offsetMetadata = offsetMetadataRW.build(); + return BitUtil.toHex(offsetMetadata.buffer().byteArray(), offsetMetadata.offset(), offsetMetadata.limit()); } } diff --git a/specs/binding-mqtt.spec/src/main/resources/META-INF/zilla/mqtt.idl b/specs/binding-mqtt.spec/src/main/resources/META-INF/zilla/mqtt.idl index 2aa9c95025..9640e967e8 100644 --- a/specs/binding-mqtt.spec/src/main/resources/META-INF/zilla/mqtt.idl +++ b/specs/binding-mqtt.spec/src/main/resources/META-INF/zilla/mqtt.idl @@ -248,5 +248,12 @@ scope mqtt COMPLETE(0), INCOMPLETE(1) } + + struct MqttOffsetMetadata + { + uint8 version = 1; + uint8 length; + int16[length] packetIds; + } } } diff --git a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/internal/MqttFunctionsTest.java b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/internal/MqttFunctionsTest.java index 51262d3af4..81094ab9a1 100644 --- a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/internal/MqttFunctionsTest.java +++ b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/internal/MqttFunctionsTest.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.Objects; +import java.util.function.IntConsumer; import org.agrona.BitUtil; import org.agrona.DirectBuffer; @@ -36,10 +37,10 @@ import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttSessionSignalType; import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttSessionStateFW; import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttWillMessageFW; -import io.aklivity.zilla.specs.binding.mqtt.internal.types.String16FW; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttBeginExFW; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttDataExFW; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttFlushExFW; +import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttOffsetMetadataFW; import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttResetExFW; public class MqttFunctionsTest @@ -1274,15 +1275,11 @@ public void shouldEncodeMqttOffsetMetadata() .build(); final IntArrayList metadataList = new IntArrayList(); - int offset = 0; - final DirectBuffer buffer = new String16FW(state).value(); - byte version = buffer.getByte(offset++); - for (; offset < buffer.capacity(); offset += BitUtil.SIZE_OF_SHORT) - { - metadataList.add((int) buffer.getShort(offset)); - } - - assertEquals(1, version); + UnsafeBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(state)); + MqttOffsetMetadataFW offsetMetadata = new MqttOffsetMetadataFW().wrap(buffer, 0, buffer.capacity()); + offsetMetadata.packetIds().forEachRemaining((IntConsumer) metadataList::add); + + assertEquals(1, offsetMetadata.version()); assertEquals(1, (int) metadataList.get(0)); assertEquals(2, (int) metadataList.get(1)); }