Description
Describe the bug
I use the following Avro schemas (key/value) for a topic:
{
  "type": "record",
  "namespace": "...",
  "name": "DeviceKey",
  "fields": [
    {
      "name": "region_id",
      "type": "string"
    },
    {
      "name": "device_id",
      "type": "string"
    }
  ]
}
{
  "type": "record",
  "namespace": "...",
  "name": "DeviceValue",
  "fields": [
    {
      "name": "properties",
      "type": {
        "type": "map",
        "values": "string"
      }
    }
  ]
}
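For reference, a key/value pair matching these schemas looks like the following (field contents are made up; for the crash only the entry count of the `properties` map matters):

```json
{"region_id": "r1", "device_id": "d1"}
{"properties": {"key0": "value0", "key1": "value1", "key9": "value9"}}
```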
When using the Schema Registry catalog with Avro model, the app ultimately crashes with the following exception:
org.agrona.concurrent.AgentTerminationException: java.lang.InternalError: a fault occurred in an unsafe memory access operation
    at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:890)
    at org.agrona.core/org.agrona.concurrent.AgentRunner.doWork(AgentRunner.java:304)
    at org.agrona.core/org.agrona.concurrent.AgentRunner.workLoop(AgentRunner.java:296)
    at org.agrona.core/org.agrona.concurrent.AgentRunner.run(AgentRunner.java:162)
    at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.lang.InternalError: a fault occurred in an unsafe memory access operation
    at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.cache.KafkaCacheFile.writeInt(KafkaCacheFile.java:194)
    at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.cache.KafkaCachePartition.lambda$writeEntryFinish$3(KafkaCachePartition.java:612)
    at io.aklivity.zilla.runtime.model.avro/io.aklivity.zilla.runtime.model.avro.internal.AvroReadConverterHandler.decodePayload(AvroReadConverterHandler.java:176)
    at io.aklivity.zilla.runtime.catalog.schema.registry/io.aklivity.zilla.runtime.catalog.schema.registry.internal.handler.SchemaRegistryCatalogHandler.decode(SchemaRegistryCatalogHandler.java:397)
    at io.aklivity.zilla.runtime.model.avro/io.aklivity.zilla.runtime.model.avro.internal.AvroReadConverterHandler.convert(AvroReadConverterHandler.java:111)
    at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.cache.KafkaCachePartition.writeEntryFinish(KafkaCachePartition.java:619)
    at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaCacheServerFetchFactory$KafkaCacheServerFetchFanout.onServerFanoutReplyData(KafkaCacheServerFetchFactory.java:913)
    at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaCacheServerFetchFactory$KafkaCacheServerFetchFanout.onServerFanoutMessage(KafkaCacheServerFetchFactory.java:685)
    at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleReadReply(EngineWorker.java:1542)
    at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleRead(EngineWorker.java:1321)
    at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.concurent.ManyToOneRingBuffer.read(ManyToOneRingBuffer.java:229)
    at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:884)
    ... 4 more
    Suppressed: java.lang.Exception: [engine/worker#15] [0x0f0f000000000244] streams=[consumeAt=0x0018a200 (0x000000000018a200), produceAt=0x003c32f8 (0x00000000003c32f8)]
        at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:888)
        ... 4 more
To Reproduce
Use the above schemas and zilla.yaml configuration, and keep producing messages whose map field (properties in this case) contains 10 entries.
After about 100 messages the app crashes.
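A minimal driver for the reproduction steps above, using the PUT route from the config below. The host/port, message count, and map contents are assumptions; only the number of `properties` entries matters.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ReproduceCrash
{
    // Build a DeviceValue JSON body whose "properties" map has the given number
    // of entries; key and value contents are made up.
    static String buildBody(int entries)
    {
        StringBuilder properties = new StringBuilder("{");
        for (int i = 0; i < entries; i++)
        {
            if (i > 0)
            {
                properties.append(',');
            }
            properties.append("\"key").append(i).append("\":\"value").append(i).append('"');
        }
        return "{\"properties\":" + properties.append('}') + "}";
    }

    public static void main(String[] args) throws Exception
    {
        // Assumed endpoint: wherever the http binding is exposed locally.
        HttpClient client = HttpClient.newHttpClient();
        String body = buildBody(10);

        // The crash was observed after roughly 100 messages.
        for (int n = 0; n < 200; n++)
        {
            HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:7114/api/v1/region/r1/device/d" + n))
                .header("content-type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
            client.send(request, HttpResponse.BodyHandlers.ofString());
        }
    }
}
```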
Expected behavior
App does not crash :)
Zilla Environment:
Running either in k8s or locally with zilla start --config zilla.yaml -e -v
Relevant sections of the zilla.yaml config file:
---
name: kafka-api
catalogs:
  schema-registry:
    ...
bindings:
  tcp_server:
    ...
  kafka_http_server:
    type: http
    kind: server
    routes:
      - when:
          - headers:
              :path: /api/v1/*
        exit: http_kafka_mapping
  http_kafka_mapping:
    type: http-kafka
    kind: proxy
    routes:
      - when:
          - method: PUT
            path: /api/v1/region/{regionId}/device/{deviceId}
        exit: kafka_cache_client
        with:
          capability: produce
          topic: devices
          key: '{"region_id": "${params.regionId}", "device_id": "${params.deviceId}"}'
  kafka_cache_client:
    type: kafka
    kind: cache_client
    options:
      topics:
        - name: devices
          key:
            model: avro
            view: json
            catalog:
              schema-registry:
                - strategy: topic
                  version: latest
          value:
            model: avro
            view: json
            catalog:
              schema-registry:
                - strategy: topic
                  version: latest
    exit: kafka_cache_server
  kafka_cache_server:
    type: kafka
    kind: cache_server
    options:
      bootstrap:
        - devices
      topics:
        - name: devices
          key:
            model: avro
            view: json
            catalog:
              schema-registry:
                - strategy: topic
                  version: latest
          value:
            model: avro
            view: json
            catalog:
              schema-registry:
                - strategy: topic
                  version: latest
    exit: kafka_client
  kafka_client:
    ...
  kafka_tls_client:
    ...
  kafka_tcp_client:
    ...
Additional context
I found the issue: there's a bug in the padding calculation here. When the schema contains a MAP, the calculation also needs to account for the quotation marks around each entry's key and value (4 extra bytes per entry). The same applies to the record field names, although I believe those are compensated for by the arbitrary padding = 10 defined for each schema.
I tried adding an arbitrarily high padding value and it solved the issue.
However, I don't see a proper way to fix this without computing the entire converted value (I don't understand why the padding needs to be calculated in advance, but I guess there is a reason for that).
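To make the size gap concrete, here is a back-of-the-envelope sketch (not Zilla's actual code) comparing the Avro binary size of a map of short strings with its JSON rendering. In Avro binary, each entry carries two one-byte length prefixes (for strings shorter than 64 bytes); in JSON, each entry instead carries four quotation marks plus a colon, so a padding estimate that ignores the quotes undershoots by several bytes per entry.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MapPaddingSketch
{
    // Avro binary size of a map<string,string> with short (<64 byte) strings:
    // one-byte block count, then per entry a one-byte length prefix plus the
    // bytes for key and value, then a zero byte ending the blocks.
    static int avroBinarySize(Map<String, String> map)
    {
        int size = 1; // block count varint (small maps fit in one byte)
        for (Map.Entry<String, String> e : map.entrySet())
        {
            size += 1 + e.getKey().length();   // key length prefix + key bytes
            size += 1 + e.getValue().length(); // value length prefix + value bytes
        }
        return size + 1; // end-of-blocks marker
    }

    // JSON size of the same map, {"k":"v",...}: per entry 4 quotes and a colon,
    // plus commas between entries and the surrounding braces.
    static int jsonSize(Map<String, String> map)
    {
        int size = 2; // '{' and '}'
        int i = 0;
        for (Map.Entry<String, String> e : map.entrySet())
        {
            if (i++ > 0)
            {
                size += 1; // ','
            }
            size += e.getKey().length() + e.getValue().length() + 4 + 1; // quotes + ':'
        }
        return size;
    }

    public static void main(String[] args)
    {
        // A 10-entry map like the one from the reproduction steps.
        Map<String, String> properties = new LinkedHashMap<>();
        for (int i = 0; i < 10; i++)
        {
            properties.put("key" + i, "value" + i);
        }
        int avro = avroBinarySize(properties);
        int json = jsonSize(properties);
        System.out.println("avro=" + avro + " json=" + json + " delta=" + (json - avro));
    }
}
```

For the 10-entry map above the JSON form is tens of bytes larger than the Avro binary form, which is roughly the shortfall the extra padding has to cover.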