Skip to content

Commit 982d94d

Browse files
committed
do not reuse schemaMap in AutoConsumeSchema(close the client and create a new one)
1 parent 7009071 commit 982d94d

File tree

1 file changed

+38
-28
lines changed

1 file changed

+38
-28
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4329,6 +4329,10 @@ public static Object[] avroSchemaProvider() {
43294329
public void testAccessAvroSchemaMetadata(Schema<MyBean> schema) throws Exception {
43304330
log.info("-- Starting {} test --", methodName);
43314331

4332+
if (pulsarClient == null) {
4333+
pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
4334+
}
4335+
43324336
final String topic = "persistent://my-property/my-ns/accessSchema";
43334337
Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
43344338
.topic(topic)
@@ -4344,37 +4348,43 @@ public void testAccessAvroSchemaMetadata(Schema<MyBean> schema) throws Exception
43444348
producer.send(payload);
43454349
producer.close();
43464350

4347-
GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getValue();
4348-
consumer.close();
4349-
assertEquals(schema.getSchemaInfo().getType(), res.getSchemaType());
4350-
org.apache.avro.generic.GenericRecord nativeAvroRecord = null;
4351-
JsonNode nativeJsonRecord = null;
4352-
if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
4353-
nativeAvroRecord = (org.apache.avro.generic.GenericRecord) res.getNativeObject();
4354-
assertNotNull(nativeAvroRecord);
4355-
} else {
4356-
nativeJsonRecord = (JsonNode) res.getNativeObject();
4357-
assertNotNull(nativeJsonRecord);
4358-
}
4359-
for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
4360-
log.info("field {} {}", f.getName(), res.getField(f));
4361-
assertEquals("field", f.getName());
4362-
assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaa", res.getField(f));
4363-
4364-
if (nativeAvroRecord != null) {
4365-
// test that the native schema is accessible
4366-
org.apache.avro.Schema.Field fieldDetails = nativeAvroRecord.getSchema().getField(f.getName());
4367-
// a nullable string is an UNION
4368-
assertEquals(org.apache.avro.Schema.Type.UNION, fieldDetails.schema().getType());
4369-
assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.STRING));
4370-
assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.NULL));
4351+
try {
4352+
GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getValue();
4353+
consumer.close();
4354+
assertEquals(schema.getSchemaInfo().getType(), res.getSchemaType());
4355+
org.apache.avro.generic.GenericRecord nativeAvroRecord = null;
4356+
JsonNode nativeJsonRecord = null;
4357+
if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
4358+
nativeAvroRecord = (org.apache.avro.generic.GenericRecord) res.getNativeObject();
4359+
assertNotNull(nativeAvroRecord);
43714360
} else {
4372-
assertEquals(JsonNodeType.STRING, nativeJsonRecord.get("field").getNodeType());
4361+
nativeJsonRecord = (JsonNode) res.getNativeObject();
4362+
assertNotNull(nativeJsonRecord);
4363+
}
4364+
for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
4365+
log.info("field {} {}", f.getName(), res.getField(f));
4366+
assertEquals("field", f.getName());
4367+
assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaa", res.getField(f));
4368+
4369+
if (nativeAvroRecord != null) {
4370+
// test that the native schema is accessible
4371+
org.apache.avro.Schema.Field fieldDetails = nativeAvroRecord.getSchema().getField(f.getName());
4372+
// a nullable string is an UNION
4373+
assertEquals(org.apache.avro.Schema.Type.UNION, fieldDetails.schema().getType());
4374+
assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.STRING));
4375+
assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.NULL));
4376+
} else {
4377+
assertEquals(JsonNodeType.STRING, nativeJsonRecord.get("field").getNodeType());
4378+
}
43734379
}
4380+
assertEquals(1, res.getFields().size());
4381+
} catch (Exception e) {
4382+
fail();
4383+
} finally {
4384+
pulsarClient.shutdown();
4385+
pulsarClient = null;
4386+
admin.schemas().deleteSchema(topic);
43744387
}
4375-
assertEquals(1, res.getFields().size());
4376-
4377-
admin.schemas().deleteSchema(topic);
43784388
}
43794389

43804390
@Test(timeOut = 100000)

0 commit comments

Comments
 (0)