Skip to content

Commit af1360f

Browse files
authored
[PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic (#17449)
Fixes #17354 PIP #19113 ### Motivation *Fixed the failure to use schema to create consumer after using AUTO-CONSUME consumer to subscribe an empty topic, and Broker returned the error message as `IncompatibleSchemaException("Topic does not have schema to check")`.* ### Modifications *In PersistentTopic::addSchemaIfIdleOrCheckCompatible, when there is an active consumer, but the consumer is using the AUTO_CONSUME schema to subscribe to the topic. Continuing to create a schema consumer to subscribe to the topic will fail.* - When `numActiveConsumers != 0`, and check the schema of the currently existing consumers is AUTO_CONSUME schema.
1 parent b2658af commit af1360f

File tree

10 files changed

+153
-30
lines changed

10 files changed

+153
-30
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
5959
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
6060
import org.apache.pulsar.common.protocol.Commands;
61+
import org.apache.pulsar.common.schema.SchemaType;
6162
import org.apache.pulsar.common.stats.Rate;
6263
import org.apache.pulsar.common.util.DateFormatter;
6364
import org.apache.pulsar.common.util.FutureUtil;
@@ -142,12 +143,24 @@ public class Consumer {
142143

143144
private long negtiveUnackedMsgsTimestamp;
144145

146+
@Getter
147+
private final SchemaType schemaType;
148+
145149
public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
146150
int priorityLevel, String consumerName,
147151
boolean isDurable, TransportCnx cnx, String appId,
148152
Map<String, String> metadata, boolean readCompacted,
149153
KeySharedMeta keySharedMeta, MessageId startMessageId, long consumerEpoch) {
154+
this(subscription, subType, topicName, consumerId, priorityLevel, consumerName, isDurable, cnx, appId,
155+
metadata, readCompacted, keySharedMeta, startMessageId, consumerEpoch, null);
156+
}
150157

158+
public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
159+
int priorityLevel, String consumerName,
160+
boolean isDurable, TransportCnx cnx, String appId,
161+
Map<String, String> metadata, boolean readCompacted,
162+
KeySharedMeta keySharedMeta, MessageId startMessageId,
163+
long consumerEpoch, SchemaType schemaType) {
151164
this.subscription = subscription;
152165
this.subType = subType;
153166
this.topicName = topicName;
@@ -204,6 +217,8 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
204217
this.consumerEpoch = consumerEpoch;
205218
this.isAcknowledgmentAtBatchIndexLevelEnabled = subscription.getTopic().getBrokerService()
206219
.getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled();
220+
221+
this.schemaType = schemaType;
207222
}
208223

209224
@VisibleForTesting
@@ -231,6 +246,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
231246
this.clientAddress = null;
232247
this.startMessageId = null;
233248
this.isAcknowledgmentAtBatchIndexLevelEnabled = false;
249+
this.schemaType = null;
234250
MESSAGE_PERMITS_UPDATER.set(this, availablePermits);
235251
}
236252

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1195,8 +1195,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
11951195
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
11961196
.subscriptionProperties(subscriptionProperties)
11971197
.consumerEpoch(consumerEpoch)
1198+
.schemaType(schema == null ? null : schema.getType())
11981199
.build();
1199-
if (schema != null) {
1200+
if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
12001201
return topic.addSchemaIfIdleOrCheckCompatible(schema)
12011202
.thenCompose(v -> topic.subscribe(option));
12021203
} else {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.pulsar.common.api.proto.CommandSubscribe;
3030
import org.apache.pulsar.common.api.proto.KeySharedMeta;
3131
import org.apache.pulsar.common.api.proto.KeyValue;
32+
import org.apache.pulsar.common.schema.SchemaType;
3233

3334
@Getter
3435
@Builder
@@ -49,6 +50,7 @@ public class SubscriptionOption {
4950
private KeySharedMeta keySharedMeta;
5051
private Optional<Map<String, String>> subscriptionProperties;
5152
private long consumerEpoch;
53+
private SchemaType schemaType;
5254

5355
public static Optional<Map<String, String>> getPropertiesMap(List<KeyValue> list) {
5456
if (list == null) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
8888
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
8989
import org.apache.pulsar.common.protocol.schema.SchemaData;
90+
import org.apache.pulsar.common.schema.SchemaType;
9091
import org.apache.pulsar.common.util.FutureUtil;
9192
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
9293
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -255,7 +256,8 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
255256
option.isDurable(), option.getStartMessageId(), option.getMetadata(),
256257
option.isReadCompacted(),
257258
option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(),
258-
option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null));
259+
option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null),
260+
option.getSchemaType());
259261
}
260262

261263
@Override
@@ -268,7 +270,7 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
268270
KeySharedMeta keySharedMeta) {
269271
return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName,
270272
isDurable, startMessageId, metadata, readCompacted, resetStartMessageBackInSec,
271-
replicateSubscriptionState, keySharedMeta, null);
273+
replicateSubscriptionState, keySharedMeta, null, null);
272274
}
273275

274276
private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
@@ -279,7 +281,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
279281
long resetStartMessageBackInSec,
280282
boolean replicateSubscriptionState,
281283
KeySharedMeta keySharedMeta,
282-
Map<String, String> subscriptionProperties) {
284+
Map<String, String> subscriptionProperties,
285+
SchemaType schemaType) {
283286

284287
return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
285288
final CompletableFuture<Consumer> future = new CompletableFuture<>();
@@ -321,8 +324,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
321324
name -> new NonPersistentSubscription(this, subscriptionName, isDurable, subscriptionProperties));
322325

323326
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
324-
false, cnx, cnx.getAuthRole(), metadata, readCompacted, keySharedMeta,
325-
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
327+
false, cnx, cnx.getAuthRole(), metadata, readCompacted, keySharedMeta, MessageId.latest,
328+
DEFAULT_CONSUMER_EPOCH, schemaType);
326329
if (isMigrated()) {
327330
consumer.topicMigrated(getClusterMigrationUrl());
328331
}
@@ -1162,12 +1165,14 @@ public CompletableFuture<MessageId> getLastMessageId() {
11621165
@Override
11631166
public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
11641167
return hasSchema().thenCompose((hasSchema) -> {
1165-
int numActiveConsumers = subscriptions.values().stream()
1166-
.mapToInt(subscription -> subscription.getConsumers().size())
1168+
int numActiveConsumersWithoutAutoSchema = subscriptions.values().stream()
1169+
.mapToInt(subscription -> subscription.getConsumers().stream()
1170+
.filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME)
1171+
.toList().size())
11671172
.sum();
11681173
if (hasSchema
11691174
|| (!producers.isEmpty())
1170-
|| (numActiveConsumers != 0)
1175+
|| (numActiveConsumersWithoutAutoSchema != 0)
11711176
|| ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
11721177
return checkSchemaCompatibleForConsumer(schema);
11731178
} else {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@
155155
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
156156
import org.apache.pulsar.common.protocol.Commands;
157157
import org.apache.pulsar.common.protocol.schema.SchemaData;
158+
import org.apache.pulsar.common.schema.SchemaType;
158159
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
159160
import org.apache.pulsar.common.util.Codec;
160161
import org.apache.pulsar.common.util.DateFormatter;
@@ -727,7 +728,8 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
727728
option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(),
728729
option.getInitialPosition(), option.getStartMessageRollbackDurationSec(),
729730
option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(),
730-
option.getSubscriptionProperties().orElse(Collections.emptyMap()), option.getConsumerEpoch());
731+
option.getSubscriptionProperties().orElse(Collections.emptyMap()),
732+
option.getConsumerEpoch(), option.getSchemaType());
731733
}
732734

733735
private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
@@ -740,7 +742,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
740742
boolean replicatedSubscriptionStateArg,
741743
KeySharedMeta keySharedMeta,
742744
Map<String, String> subscriptionProperties,
743-
long consumerEpoch) {
745+
long consumerEpoch,
746+
SchemaType schemaType) {
744747
if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) {
745748
return FutureUtil.failedFuture(new NotAllowedException(
746749
"readCompacted only allowed on failover or exclusive subscriptions"));
@@ -828,7 +831,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
828831
CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
829832
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
830833
consumerName, isDurable, cnx, cnx.getAuthRole(), metadata,
831-
readCompacted, keySharedMeta, startMessageId, consumerEpoch);
834+
readCompacted, keySharedMeta, startMessageId, consumerEpoch, schemaType);
832835

833836
return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {
834837
if (subscription instanceof PersistentSubscription persistentSubscription) {
@@ -907,7 +910,7 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
907910
KeySharedMeta keySharedMeta) {
908911
return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName,
909912
isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec,
910-
replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH);
913+
replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null);
911914
}
912915

913916
private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
@@ -3107,21 +3110,22 @@ public synchronized OffloadProcessStatus offloadStatus() {
31073110

31083111
@Override
31093112
public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
3110-
return hasSchema()
3111-
.thenCompose((hasSchema) -> {
3112-
int numActiveConsumers = subscriptions.values().stream()
3113-
.mapToInt(subscription -> subscription.getConsumers().size())
3114-
.sum();
3115-
if (hasSchema
3116-
|| (!producers.isEmpty())
3117-
|| (numActiveConsumers != 0)
3118-
|| (ledger.getTotalSize() != 0)) {
3119-
return checkSchemaCompatibleForConsumer(schema);
3120-
} else {
3121-
return addSchema(schema).thenCompose(schemaVersion ->
3122-
CompletableFuture.completedFuture(null));
3123-
}
3124-
});
3113+
return hasSchema().thenCompose((hasSchema) -> {
3114+
int numActiveConsumersWithoutAutoSchema = subscriptions.values().stream()
3115+
.mapToInt(subscription -> subscription.getConsumers().stream()
3116+
.filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME)
3117+
.toList().size())
3118+
.sum();
3119+
if (hasSchema
3120+
|| (!producers.isEmpty())
3121+
|| (numActiveConsumersWithoutAutoSchema != 0)
3122+
|| (ledger.getTotalSize() != 0)) {
3123+
return checkSchemaCompatibleForConsumer(schema);
3124+
} else {
3125+
return addSchema(schema).thenCompose(schemaVersion ->
3126+
CompletableFuture.completedFuture(null));
3127+
}
3128+
});
31253129
}
31263130

31273131
public synchronized void checkReplicatedSubscriptionControllerState() {

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1239,6 +1239,79 @@ public void testAutoCreatedSchema(String domain) throws Exception {
12391239
Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
12401240
}
12411241

1242+
@Test(dataProvider = "topicDomain")
1243+
public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain) throws Exception {
1244+
final String topic = domain + "my-property/my-ns/testSubscribeWithSchemaAfterAutoConsume-1";
1245+
1246+
@Cleanup
1247+
Consumer<GenericRecord> autoConsumer1 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
1248+
.topic(topic)
1249+
.subscriptionType(SubscriptionType.Shared)
1250+
.subscriptionName("sub0")
1251+
.consumerName("autoConsumer1")
1252+
.subscribe();
1253+
@Cleanup
1254+
Consumer<GenericRecord> autoConsumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
1255+
.topic(topic)
1256+
.subscriptionType(SubscriptionType.Shared)
1257+
.subscriptionName("sub0")
1258+
.consumerName("autoConsumer2")
1259+
.subscribe();
1260+
@Cleanup
1261+
Consumer<GenericRecord> autoConsumer3 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
1262+
.topic(topic)
1263+
.subscriptionType(SubscriptionType.Shared)
1264+
.subscriptionName("sub1")
1265+
.consumerName("autoConsumer3")
1266+
.subscribe();
1267+
@Cleanup
1268+
Consumer<GenericRecord> autoConsumer4 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
1269+
.topic(topic)
1270+
.subscriptionType(SubscriptionType.Shared)
1271+
.subscriptionName("sub1")
1272+
.consumerName("autoConsumer4")
1273+
.subscribe();
1274+
try {
1275+
log.info("The autoConsumer1 isConnected: " + autoConsumer1.isConnected());
1276+
log.info("The autoConsumer2 isConnected: " + autoConsumer2.isConnected());
1277+
log.info("The autoConsumer3 isConnected: " + autoConsumer3.isConnected());
1278+
log.info("The autoConsumer4 isConnected: " + autoConsumer4.isConnected());
1279+
admin.schemas().getSchemaInfo(topic);
1280+
fail("The schema of topic should not exist");
1281+
} catch (PulsarAdminException e) {
1282+
assertEquals(e.getStatusCode(), 404);
1283+
}
1284+
1285+
@Cleanup
1286+
Consumer<V1Data> consumerWithSchema1 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
1287+
.topic(topic)
1288+
.subscriptionType(SubscriptionType.Shared)
1289+
.subscriptionName("sub0")
1290+
.consumerName("consumerWithSchema-1")
1291+
.subscribe();
1292+
@Cleanup
1293+
Consumer<V1Data> consumerWithSchema2 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
1294+
.topic(topic)
1295+
.subscriptionType(SubscriptionType.Shared)
1296+
.subscriptionName("sub0")
1297+
.consumerName("consumerWithSchema-2")
1298+
.subscribe();
1299+
@Cleanup
1300+
Consumer<V1Data> consumerWithSchema3 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
1301+
.topic(topic)
1302+
.subscriptionType(SubscriptionType.Shared)
1303+
.subscriptionName("sub1")
1304+
.consumerName("consumerWithSchema-3")
1305+
.subscribe();
1306+
@Cleanup
1307+
Consumer<V1Data> consumerWithSchema4 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
1308+
.topic(topic)
1309+
.subscriptionType(SubscriptionType.Shared)
1310+
.subscriptionName("sub1")
1311+
.consumerName("consumerWithSchema-4")
1312+
.subscribe();
1313+
}
1314+
12421315
@DataProvider(name = "keyEncodingType")
12431316
public static Object[] keyEncodingType() {
12441317
return new Object[] { KeyValueEncodingType.SEPARATED, KeyValueEncodingType.INLINE };

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.apache.pulsar.client.api.transaction.TxnID;
8585
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
8686
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
87+
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
8788
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
8889
import org.apache.pulsar.client.util.ExecutorProvider;
8990
import org.apache.pulsar.client.util.RetryMessageUtil;
@@ -811,6 +812,11 @@ public void connectionOpened(final ClientCnx cnx) {
811812
if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) {
812813
// don't set schema for Schema.BYTES
813814
si = null;
815+
} else {
816+
if (schema instanceof AutoConsumeSchema
817+
&& Commands.peerSupportsCarryAutoConsumeSchemaToBroker(cnx.getRemoteEndpointProtocolVersion())) {
818+
si = AutoConsumeSchema.SCHEMA_INFO;
819+
}
814820
}
815821
// startMessageRollbackDurationInSec should be consider only once when consumer connects to first time
816822
long startMessageRollbackDuration = (startMessageRollbackDurationInSec > 0

pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
5757

5858
private SchemaInfoProvider schemaInfoProvider;
5959

60+
public static final SchemaInfo SCHEMA_INFO = SchemaInfoImpl.builder()
61+
.name("AutoConsume")
62+
.type(SchemaType.AUTO_CONSUME)
63+
.schema(new byte[0])
64+
.build();
65+
6066
private ConcurrentMap<SchemaVersion, Schema<?>> initSchemaMap() {
6167
ConcurrentMap<SchemaVersion, Schema<?>> schemaMap = new ConcurrentHashMap<>();
6268
// The Schema.BYTES will not be uploaded to the broker and store in the schema storage,

0 commit comments

Comments
 (0)