
Zilla crashes when it tries to send flush on retain stream #770

@akrambek


Describe the bug
Running the taxi-demo with the load_test.sh script simulates a large number of connected MQTT clients, producing ~100k messages within a few minutes. Zilla then crashes with the following error:

2024-01-31 11:21:43 Caused by: java.lang.NullPointerException: Cannot invoke "io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory$KafkaUnmergedProduceStream.doProduceInitialFlush(long, int, io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaMergedFlushExFW)" because "producer" is null
2024-01-31 11:21:43     at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory$KafkaMergedStream.onMergedFetchFlush(KafkaMergedFactory.java:1443)
2024-01-31 11:21:43     at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory$KafkaMergedStream.onMergedInitialFlush(KafkaMergedFactory.java:1381)
2024-01-31 11:21:43     at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory$KafkaMergedStream.onMergedMessage(KafkaMergedFactory.java:1136)
2024-01-31 11:21:43     at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory.lambda$newStream$4(KafkaMergedFactory.java:248)
2024-01-31 11:21:43     at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleReadInitial(EngineWorker.java:1085)
2024-01-31 11:21:43     at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleRead(EngineWorker.java:1041)
2024-01-31 11:21:43     at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.concurent.ManyToOneRingBuffer.read(ManyToOneRingBuffer.java:181)
2024-01-31 11:21:43     at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:701)
2024-01-31 11:21:43     ... 3 more
2024-01-31 11:21:43     Suppressed: java.lang.Exception: [engine/data#7]        [0x07070000000000cf] streams=[consumeAt=0x00018c38 (0x0000000000018c38), produceAt=0x0001a050 (0x000000000001a050)]
2024-01-31 11:21:43             at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:705)
2024-01-31 11:21:43             ... 3 more
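The trace shows `onMergedFetchFlush` calling `doProduceInitialFlush` on a `producer` reference that is still null, i.e. the flush looks up a per-partition producer stream that has not been created yet. A minimal model of that failure mode follows, assuming producers are tracked in a map keyed by partition id. `MergedStreamModel`, `ProducerStub`, `onProducerReady` and `onFlush` are hypothetical names for illustration, not Zilla's internal API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the failure mode; not Zilla's actual classes.
public class MergedStreamModel {
    // Stand-in for an unmerged produce stream: counts the flushes it receives.
    static final class ProducerStub {
        int flushes;

        void doProduceInitialFlush() {
            flushes++;
        }
    }

    // Producer streams created so far, keyed by partition id.
    final Map<Integer, ProducerStub> producers = new HashMap<>();

    void onProducerReady(int partitionId) {
        producers.put(partitionId, new ProducerStub());
    }

    // Unguarded lookup, as in the trace: Map.get returns null for a
    // partition whose producer has not been created yet.
    void onFlush(int partitionId) {
        ProducerStub producer = producers.get(partitionId);
        producer.doProduceInitialFlush(); // throws NullPointerException if producer is null
    }

    public static void main(String[] args) {
        MergedStreamModel stream = new MergedStreamModel();
        stream.onProducerReady(0);
        stream.onFlush(0); // partition 0 is ready, so this succeeds
        try {
            stream.onFlush(1); // partition 1 has no producer yet
        } catch (NullPointerException e) {
            System.out.println("flush arrived before producer for partition 1 existed");
        }
    }
}
```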

To Reproduce
Steps to reproduce the behavior:

  1. Go to taxi-demo on the load-test branch
  2. Run docker compose build
  3. Follow the demo instructions to start the demo
  4. Review the load testing instructions
  5. With replication set to 300, run the load_test.sh script 2-5 times, or until Zilla throws an error.

Expected behavior
Zilla should handle this number of clients without crashing.

Analysis

Initial analysis shows that KafkaRetainedProxy sends a flush before it has received a window, because the merged stream has not yet finished creating the unmerged producer streams for all partitions.
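One possible mitigation, sketched under the assumption that the merged stream can buffer early flushes, is to hold any flush that arrives before every partition has a producer stream and replay it once the last producer is created. `DeferredFlushModel`, `pendingFlushes` and `onProducerReady` are hypothetical names, and this is not necessarily how the issue was or should be fixed in Zilla (waiting for the window, as the analysis suggests, is another option):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Zilla's code: buffer flushes until every
// partition has a producer stream, then replay them in arrival order.
public class DeferredFlushModel {
    static final class ProducerStub {
        int flushes;

        void doProduceInitialFlush() {
            flushes++;
        }
    }

    private final int partitionCount;
    final Map<Integer, ProducerStub> producers = new HashMap<>();
    // Partition ids of flushes that arrived before all producers were ready.
    final Deque<Integer> pendingFlushes = new ArrayDeque<>();

    DeferredFlushModel(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    private boolean allProducersReady() {
        return producers.size() == partitionCount;
    }

    void onProducerReady(int partitionId) {
        producers.put(partitionId, new ProducerStub());
        if (allProducersReady()) {
            // Every lookup now succeeds, so replay the buffered flushes.
            while (!pendingFlushes.isEmpty()) {
                producers.get(pendingFlushes.poll()).doProduceInitialFlush();
            }
        }
    }

    // Assumes partitionId is in [0, partitionCount).
    void onFlush(int partitionId) {
        if (allProducersReady()) {
            producers.get(partitionId).doProduceInitialFlush();
        } else {
            pendingFlushes.add(partitionId); // defer instead of dereferencing a null producer
        }
    }
}
```

Buffering preserves the order of early flushes; the trade-off is that they are delayed until the slowest partition's producer stream has been created.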

Labels: bug (Something isn't working)