Description
Our Kafka Avro producer code generates a schema such that new fields can't be added by default. For example, a table with a long and an int column gets generated as follows:
```json
{
  "type": "record",
  "name": "yellow",
  "fields": [
    {
      "name": "VendorID",
      "type": ["long", "null"]
    },
    {
      "name": "passenger_count",
      "type": ["int", "null"]
    }
  ]
}
```

The order of the union types is important; in the case above, we have the option to, but don't, set an int default. If instead we want to set null as the default, we need "null" to come first.
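For illustration, a schema in the shape described above - "null" first in each union, with an explicit null default on every field - can be sketched as follows. This is not what our producer currently emits; the field names are just taken from the example:

```python
import json

# Backward-compatible variant of the generated schema: "null" leads each
# union and every field carries an explicit null default, so a reader
# using a newer schema can fill in fields missing from old data.
schema = {
    "type": "record",
    "name": "yellow",
    "fields": [
        {"name": "VendorID", "type": ["null", "long"], "default": None},
        {"name": "passenger_count", "type": ["null", "int"], "default": None},
    ],
}

print(json.dumps(schema, indent=2))
```

In Avro, a field's default must match the first type in its union, which is why "null" has to come first here.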
If we create a new table that has an additional field and try to publish it to the topic / schema registry, we get:
```
RuntimeError: io.deephaven.UncheckedDeephavenException: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "{yellow}"; error code: 409
	at io.deephaven.kafka.AvroImpl$AvroProduce.ensureSchema(AvroImpl.java:253)
	at io.deephaven.kafka.AvroImpl$AvroProduce.getColumnNames(AvroImpl.java:194)
	at io.deephaven.kafka.KafkaTools.produceFromTable(KafkaTools.java:1432)
	...
caused by io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "{yellow}"; error code: 409
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:314)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:561)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:549)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:297)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:404)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:383)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:371)
	at io.deephaven.kafka.AvroImpl$AvroProduce.ensureSchema(AvroImpl.java:251)
	... 24 more
```
This is because the schema registry is backwards-compatible by default - consumers using the new schema must be able to read data written with the previous schema; but since no default is provided for the new field, they can't do that.
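To make the failure concrete, here is a toy simulation of the backward-compatibility rule (not Avro's actual resolution code): a reader resolving a record written with the old schema takes the writer's value if present, falls back to the field's default otherwise, and fails when there is neither.

```python
def read_with_schema(record, fields):
    """Toy sketch of Avro schema resolution for a reader's field list:
    use the writer's value if present, else the field's default,
    else fail - mirroring the backward-compatibility check."""
    out = {}
    for field in fields:
        name = field["name"]
        if name in record:
            out[name] = record[name]
        elif "default" in field:
            out[name] = field["default"]
        else:
            raise ValueError(f"no value and no default for field {name!r}")
    return out

# A record written with the original two-field schema.
old_record = {"VendorID": 1, "passenger_count": 2}

# New schema adds a field with no default: resolution fails.
fields_no_default = [
    {"name": "VendorID"},
    {"name": "passenger_count"},
    {"name": "tip_amount"},
]

# Same new field, but with a null default: resolution succeeds.
fields_with_default = [
    {"name": "VendorID"},
    {"name": "passenger_count"},
    {"name": "tip_amount", "default": None},
]

try:
    read_with_schema(old_record, fields_no_default)
except ValueError as e:
    print("incompatible:", e)

print(read_with_schema(old_record, fields_with_default))
```

The field names here ("tip_amount" etc.) are hypothetical; the point is only that the default is what lets the new reader cope with old data.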
We may either want to be more lenient in our Avro schema definitions by default, or give the user the ability to configure more finely how their data types are translated into an Avro schema.
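As a workaround today, a user can relax the compatibility level for the affected subject through the Schema Registry's REST API. The registry address and subject name below are assumptions based on the example:

```shell
# Set the subject's compatibility level to NONE so any new schema is
# accepted; http://localhost:8081 is an assumed registry address and
# "yellow" the subject from the example above.
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "NONE"}' \
  http://localhost:8081/config/yellow
```

This disables compatibility checking for the subject entirely, so it trades safety for convenience; fixing the generated schema to carry null defaults would be the more principled path.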