Skip to content

Commit 22d6a06

Browse files
committed
add test for Fix the problem that using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic
1 parent dc602ef commit 22d6a06

File tree

1 file changed

+46
-0
lines changed

1 file changed

+46
-0
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1216,6 +1216,52 @@ public void testAutoCreatedSchema(String domain) throws Exception {
12161216
Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
12171217
}
12181218

1219+
@Test(dataProvider = "topicDomain")
1220+
public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain) throws Exception {
1221+
final String topic = domain + "my-property/my-ns/testSubscribeWithSchemaAfterAutoConsume-1";
1222+
1223+
Consumer autoConsumer1 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
1224+
.topic(topic)
1225+
.subscriptionType(SubscriptionType.Shared)
1226+
.subscriptionName("sub0")
1227+
.consumerName("autoConsumer1")
1228+
.subscribe();
1229+
1230+
Consumer autoConsumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
1231+
.topic(topic)
1232+
.subscriptionType(SubscriptionType.Shared)
1233+
.subscriptionName("sub0")
1234+
.consumerName("autoConsumer2")
1235+
.subscribe();
1236+
try {
1237+
log.info("The autoConsumer1 isConnected: " + autoConsumer1.isConnected());
1238+
log.info("The autoConsumer2 isConnected: " + autoConsumer2.isConnected());
1239+
admin.schemas().getSchemaInfo(topic);
1240+
fail("The schema of topic should not exist");
1241+
} catch (PulsarAdminException e) {
1242+
assertEquals(e.getStatusCode(), 404);
1243+
}
1244+
1245+
Consumer<V1Data> consumerWithSchema = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
1246+
.topic(topic)
1247+
.subscriptionType(SubscriptionType.Shared)
1248+
.subscriptionName("sub0")
1249+
.consumerName("consumerWithSchema")
1250+
.subscribe();
1251+
try {
1252+
log.info(admin.schemas().getSchemaInfo(topic).toString());
1253+
log.info("The autoConsumer1 isConnected: " + autoConsumer1.isConnected());
1254+
log.info("The autoConsumer2 isConnected: " + autoConsumer2.isConnected());
1255+
log.info("The consumerWithSchema isConnected: " + consumerWithSchema.isConnected());
1256+
} catch (PulsarAdminException e) {
1257+
assertEquals(e.getStatusCode(), 404);
1258+
}
1259+
1260+
autoConsumer1.close();
1261+
autoConsumer2.close();
1262+
consumerWithSchema.close();
1263+
}
1264+
12191265
@DataProvider(name = "keyEncodingType")
12201266
public static Object[] keyEncodingType() {
12211267
return new Object[] { KeyValueEncodingType.SEPARATED, KeyValueEncodingType.INLINE };

0 commit comments

Comments
 (0)