Skip to content

Commit f49c7b2

Browse files
berg223 and lhotari
authored and committed
[fix][client] Send all chunkMessageIds to broker for redelivery (apache#25229)
(cherry picked from commit 0a0ce6d)
1 parent 2a46c70 commit f49c7b2

File tree

3 files changed

+54
-2
lines changed

3 files changed

+54
-2
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,39 @@ public void testInterleavedChunks() throws Exception {
193193
assertEquals(receivedUuidList1, Arrays.asList("A-0", "B-0", "B-1", "A-1"));
194194
}
195195

196+
// Issue #25220
197+
@Test
198+
public void testNegativeAckChunkedMessage() throws Exception {
199+
final String topic = "persistent://my-property/my-ns/test-negative-acknowledge-with-chunk";
200+
201+
@Cleanup
202+
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
203+
.topic(topic)
204+
.subscriptionName("sub1")
205+
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
206+
.subscriptionType(SubscriptionType.Shared)
207+
.negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
208+
.subscribe();
209+
210+
@Cleanup
211+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
212+
.topic(topic)
213+
.enableBatching(false)
214+
.enableChunking(true)
215+
.chunkMaxMessageSize(1024) // 1KB max - forces chunking for larger messages
216+
.create();
217+
String longMessage = "X".repeat(10 * 1024);
218+
producer.sendAsync(longMessage);
219+
producer.flush();
220+
221+
// negative ack the first message
222+
consumer.negativeAcknowledge(consumer.receive());
223+
224+
// now 2s has passed, the first message should be redelivered 1s later.
225+
Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
226+
assertNotNull(msg1);
227+
}
228+
196229
private Producer<String> createProducer(String topic) throws PulsarClientException {
197230
return pulsarClient.newProducer(Schema.STRING)
198231
.topic(topic)

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1486,7 +1486,6 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien
14861486
// and return undecrypted payload
14871487
if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
14881488

1489-
// right now, chunked messages are only supported by non-shared subscription
14901489
if (isChunkedMessage) {
14911490
uncompressedPayload = processMessageChunk(uncompressedPayload, msgMetadata, msgId, messageId, cnx);
14921491
if (uncompressedPayload == null) {

pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class NegativeAcksTracker implements Closeable {
4747
// different timestamp, there will be multiple entries in the map
4848
// RB Tree -> LongOpenHashMap -> Roaring64Bitmap
4949
private Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = null;
50+
private final Long2ObjectMap<Long2ObjectMap<MessageId>> nackedMessageIds = new Long2ObjectOpenHashMap<>();
5051

5152
private final ConsumerBase<?> consumer;
5253
private final Timer timer;
@@ -89,7 +90,17 @@ private void triggerRedelivery(Timeout t) {
8990
long ledgerId = ledgerEntry.getLongKey();
9091
Roaring64Bitmap entrySet = ledgerEntry.getValue();
9192
entrySet.forEach(entryId -> {
92-
MessageId msgId = new MessageIdImpl(ledgerId, entryId, DUMMY_PARTITION_INDEX);
93+
MessageId msgId = null;
94+
Long2ObjectMap<MessageId> entryMap = nackedMessageIds.get(ledgerId);
95+
if (entryMap != null) {
96+
msgId = entryMap.remove(entryId);
97+
if (entryMap.isEmpty()) {
98+
nackedMessageIds.remove(ledgerId);
99+
}
100+
}
101+
if (msgId == null) {
102+
msgId = new MessageIdImpl(ledgerId, entryId, DUMMY_PARTITION_INDEX);
103+
}
93104
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
94105
messagesToRedeliver.add(msgId);
95106
});
@@ -143,6 +154,12 @@ static long trimLowerBit(long timestamp, int bits) {
143154
}
144155

145156
private synchronized void add(MessageId messageId, int redeliveryCount) {
157+
if (messageId instanceof ChunkMessageIdImpl) {
158+
MessageIdAdv msgId = (MessageIdAdv) messageId;
159+
nackedMessageIds.computeIfAbsent(msgId.getLedgerId(), k -> new Long2ObjectOpenHashMap<>())
160+
.put(msgId.getEntryId(), messageId);
161+
}
162+
146163
if (nackedMessages == null) {
147164
nackedMessages = new Long2ObjectAVLTreeMap<>();
148165
}
@@ -201,5 +218,8 @@ public synchronized void close() {
201218
nackedMessages.clear();
202219
nackedMessages = null;
203220
}
221+
if (nackedMessageIds != null) {
222+
nackedMessageIds.clear();
223+
}
204224
}
205225
}

0 commit comments

Comments (0)