Using Maps and Arrays with Avro causes the app to crash due to an unsafe memory access operation #1525

@Yoni-Weisberg

Description

Describe the bug
I use the following Avro schemas (key/value) for a topic:

{
  "type": "record",
  "namespace": "...",
  "name": "DeviceKey",
  "fields": [
    {
      "name": "region_id",
      "type": "string"
    },
    {
      "name": "device_id",
      "type": "string"
    }
  ]
}

{
  "type": "record",
  "namespace": "...",
  "name": "DeviceValue",
  "fields": [
    {
      "name": "properties",
      "type": {
        "type": "map",
        "values": "string"
      }
    }
  ]
} 

When using the Schema Registry catalog with the Avro model, the app ultimately crashes with the following exception:

org.agrona.concurrent.AgentTerminationException: java.lang.InternalError: a fault occurred in an unsafe memory access operation
        at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:890)
        at org.agrona.core/org.agrona.concurrent.AgentRunner.doWork(AgentRunner.java:304)
        at org.agrona.core/org.agrona.concurrent.AgentRunner.workLoop(AgentRunner.java:296)
        at org.agrona.core/org.agrona.concurrent.AgentRunner.run(AgentRunner.java:162)
        at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.lang.InternalError: a fault occurred in an unsafe memory access operation
        at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.cache.KafkaCacheFile.writeInt(KafkaCacheFile.java:194)
        at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.cache.KafkaCachePartition.lambda$writeEntryFinish$3(KafkaCachePartition.java:612)
        at io.aklivity.zilla.runtime.model.avro/io.aklivity.zilla.runtime.model.avro.internal.AvroReadConverterHandler.decodePayload(AvroReadConverterHandler.java:176)
        at io.aklivity.zilla.runtime.catalog.schema.registry/io.aklivity.zilla.runtime.catalog.schema.registry.internal.handler.SchemaRegistryCatalogHandler.decode(SchemaRegistryCatalogHandler.java:397)
        at io.aklivity.zilla.runtime.model.avro/io.aklivity.zilla.runtime.model.avro.internal.AvroReadConverterHandler.convert(AvroReadConverterHandler.java:111)
        at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.cache.KafkaCachePartition.writeEntryFinish(KafkaCachePartition.java:619)
        at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaCacheServerFetchFactory$KafkaCacheServerFetchFanout.onServerFanoutReplyData(KafkaCacheServerFetchFactory.java:913)
        at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaCacheServerFetchFactory$KafkaCacheServerFetchFanout.onServerFanoutMessage(KafkaCacheServerFetchFactory.java:685)
        at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleReadReply(EngineWorker.java:1542)
        at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleRead(EngineWorker.java:1321)
        at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.concurent.ManyToOneRingBuffer.read(ManyToOneRingBuffer.java:229)
        at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:884)
        ... 4 more
        Suppressed: java.lang.Exception: [engine/worker#15]     [0x0f0f000000000244] streams=[consumeAt=0x0018a200 (0x000000000018a200), produceAt=0x003c32f8 (0x00000000003c32f8)]
                at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:888)
                ... 4 more

To Reproduce
Use the above schemas and zilla.yaml configuration, and keep producing messages whose map field (properties in this case) contains 10 entries.
After about 100 messages the app crashes.
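
For reference, a value matching the DeviceValue schema with a 10-entry map might look like the following once converted to its JSON view (keys and values here are made up for illustration):

```json
{
  "properties": {
    "prop_0": "value_0",
    "prop_1": "value_1",
    "prop_2": "value_2",
    "prop_3": "value_3",
    "prop_4": "value_4",
    "prop_5": "value_5",
    "prop_6": "value_6",
    "prop_7": "value_7",
    "prop_8": "value_8",
    "prop_9": "value_9"
  }
}
```

Note that every key and every value gains a pair of quotation marks that does not exist in the Avro binary encoding.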

Expected behavior
App does not crash :)

Zilla Environment:
Running either in k8s or locally with zilla start --config zilla.yaml -e -v

Attach the zilla.yaml config file:
Relevant sections in zilla.yaml:

---
name: kafka-api

catalogs:
  schema-registry:
    ...
 
bindings:
  tcp_server:
    ...

  kafka_http_server:
    type: http
    kind: server
    routes:
      - when:
          - headers:
              :path: /api/v1/*
        exit: http_kafka_mapping

  http_kafka_mapping:
    type: http-kafka
    kind: proxy
    routes:
      - when:
          - method: PUT
            path: /api/v1/region/{regionId}/device/{deviceId}
        exit: kafka_cache_client
        with:
          capability: produce
          topic: devices
          key: '{"region_id": "${params.regionId}", "device_id": "${params.deviceId}"}'

  
  kafka_cache_client:
    type: kafka
    kind: cache_client
    options:
      topics:
        - name: devices
          key:
            model: avro
            view: json
            catalog:
              schema-registry:
                - strategy: topic
                  version: latest
          value:
            model: avro
            view: json
            catalog:
              schema-registry:
                - strategy: topic
                  version: latest
    exit: kafka_cache_server

  kafka_cache_server:
    type: kafka
    kind: cache_server
    options:
      bootstrap:
        - devices
      topics:
        - name: devices
          key:
            model: avro
            view: json
            catalog:
              schema-registry:
                - strategy: topic
                  version: latest
          value:
            model: avro
            view: json
            catalog:
              schema-registry:
                - strategy: topic
                  version: latest
    exit: kafka_client
  
  kafka_client:
    ...
  kafka_tls_client:
    ...
  kafka_tcp_client:
    ...

Additional context
I found the issue. There's a bug in the padding calculation here. When converting a MAP, the padding must also account for the quotation marks around each entry's key and value, which add 4 bytes per entry. The same applies to the record field names, although I believe that case is compensated by the arbitrary padding = 10 defined for each schema.
I tried adding an arbitrarily high padding value and it solved the issue.
However, I don't see a proper fix short of calculating the full size of the converted value (I don't understand why the padding needs to be calculated in advance, but I assume there is a reason for that).
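
The quotation-mark arithmetic above can be sketched as follows. This is an illustrative calculation only, not Zilla's actual padding code; the class and method names are hypothetical:

```java
// Sketch of why a fixed per-schema padding of 10 is not enough for maps:
// the JSON view adds characters per map entry that the Avro binary lacks.
public class MapPaddingSketch
{
    // Arbitrary per-schema padding mentioned in the report.
    static final int RECORD_PADDING = 10;

    // Extra bytes the JSON view needs beyond the raw key/value characters,
    // for a map with the given number of entries.
    public static int jsonPaddingForMap(int entries)
    {
        int quotes = 4 * entries;     // two quotes around each key, two around each value
        int separators = 2 * entries; // ':' per entry plus ',' between entries (over-counts by one; errs safe)
        int braces = 2;               // enclosing '{' and '}'
        return quotes + separators + braces;
    }

    public static void main(String[] args)
    {
        // A 10-entry map like "properties" needs at least 62 extra bytes,
        // far exceeding the fixed padding of 10.
        System.out.println(jsonPaddingForMap(10));
    }
}
```

With 10 entries this yields 62 bytes of overhead, which is why the fixed padding is exhausted after enough messages are cached.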

Labels: bug (Something isn't working)