[MQTT-Kafka] Randomly closing the connection in the middle of produce triggers the exception #559

@akrambek

Description

Describe the bug
While running the load-test branch of zilla-demos/taxi, I periodically hit the exception below. Some occurrences happen on startup, and others happen when the taxi-service (the MQTT producer) is stopped or restarted.

2023-11-01 17:31:54 Caused by: java.lang.IndexOutOfBoundsException: index=0 length=12596 capacity=11
2023-11-01 17:31:54     at org.agrona.core/org.agrona.concurrent.UnsafeBuffer.boundsCheck0(UnsafeBuffer.java:1692)
2023-11-01 17:31:54     at org.agrona.core/org.agrona.concurrent.UnsafeBuffer.boundsCheck(UnsafeBuffer.java:1698)
2023-11-01 17:31:54     at org.agrona.core/org.agrona.concurrent.UnsafeBuffer.putBytes(UnsafeBuffer.java:946)
2023-11-01 17:31:54     at io.aklivity.zilla.runtime.binding.mqtt.kafka/io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.OctetsFW$Builder.set(OctetsFW.java:77)
2023-11-01 17:31:54     at io.aklivity.zilla.runtime.binding.mqtt.kafka/io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.KafkaKeyFW$Builder.value(KafkaKeyFW.java:147)
2023-11-01 17:31:54     at io.aklivity.zilla.runtime.binding.mqtt.kafka/io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.stream.MqttKafkaSessionFactory$KafkaSignalStream.lambda$onKafkaSessionExpirySignal$2(MqttKafkaSessionFactory.java:1065)
2023-11-01 17:31:54     at io.aklivity.zilla.runtime.binding.mqtt.kafka/io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.KafkaMergedDataExFW$Builder.key(KafkaMergedDataExFW.java:292)
2023-11-01 17:31:54     at io.aklivity.zilla.runtime.binding.mqtt.kafka/io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.stream.MqttKafkaSessionFactory$KafkaSignalStream.lambda$onKafkaSessionExpirySignal$4(MqttKafkaSessionFactory.java:1064)
2023-11-01 17:31:54     at io.aklivity.zilla.runtime.binding.mqtt.kafka/io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.KafkaDataExFW$Builder.merged(KafkaDataExFW.java:384)
2023-11-01 17:31:54     at io.aklivity.zilla.runtime.binding.mqtt.kafka/io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.stream.MqttKafkaSessionFactory$KafkaSignalStream.onKafkaSessionExpirySignal(MqttKafkaSessionFactory.java:1060)
2023-11-01 17:31:54     at io.aklivity.zilla.runtime.binding.mqtt.kafka/io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.stream.MqttKafkaSessionFactory$KafkaSignalStream.onSignal(MqttKafkaSessionFactory.java:1045)
2023-11-01 17:31:54     at io.aklivity.zilla.runtime.binding.mqtt.kafka/io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.stream.MqttKafkaSessionFactory$KafkaSignalStream.onSignalMessage(MqttKafkaSessionFactory.java:1033)
2023-11-01 17:31:54     at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.handleReadInitial(DispatchAgent.java:1119)
2023-11-01 17:31:54     at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.handleRead(DispatchAgent.java:1041)
2023-11-01 17:31:54     at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.concurent.ManyToOneRingBuffer.read(ManyToOneRingBuffer.java:181)
2023-11-01 17:31:54     at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.doWork(DispatchAgent.java:701)
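
For anyone reading the trace: the message `index=0 length=12596 capacity=11` reads as an attempt to copy 12596 bytes into a region that only has room for 11. Below is a minimal sketch of a bounds check that produces a message of the same shape. It is a stand-in written for illustration only, not Agrona's or Zilla's actual code; `BoundsCheckSketch`, `boundsCheck` and `putBytes` are hypothetical names, and it says nothing about why the key value and the destination capacity disagree in `onKafkaSessionExpirySignal`.

```java
public class BoundsCheckSketch
{
    // Hypothetical helper modeled on the message in the stack trace;
    // this is an assumption for illustration, not Agrona's implementation.
    static void boundsCheck(int index, int length, int capacity)
    {
        // Widen to long so that index + length cannot overflow an int.
        if (index < 0 || length < 0 || (long) index + length > capacity)
        {
            throw new IndexOutOfBoundsException(
                "index=" + index + " length=" + length + " capacity=" + capacity);
        }
    }

    // Copies the first `length` bytes of src into dst starting at `index`,
    // rejecting the copy up front if it would not fit.
    static void putBytes(byte[] dst, int index, byte[] src, int length)
    {
        boundsCheck(index, length, dst.length);
        System.arraycopy(src, 0, dst, index, length);
    }

    public static void main(String[] args)
    {
        byte[] destination = new byte[11];   // capacity reported in the trace
        byte[] source = new byte[12596];     // length reported in the trace

        try
        {
            putBytes(destination, 0, source, source.length);
        }
        catch (IndexOutOfBoundsException ex)
        {
            // prints "index=0 length=12596 capacity=11"
            System.out.println(ex.getMessage());
        }
    }
}
```

The numbers are taken straight from the exception message; everything else in the sketch is assumed.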

To Reproduce
Steps to reproduce the behavior:

  1. Check out zilla-demos/taxi@mqtt-load-test.
  2. Start the demo with ./startup.sh.
  3. Once the setup is running stably, the metric stream_active_received{namespace="zilla-taxi-demo",binding="mqtt_server"} should be 500.
  4. Let it run until the exception appears.

Desktop:

  • OS: macOS
  • Zilla: 0.9.55

Labels: bug
