Skip to content

Commit c180d9e

Browse files
committed
Clean up after extension give up
1 parent b9b4aff commit c180d9e

File tree

2 files changed

+23
-11
lines changed

2 files changed

+23
-11
lines changed

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class MessageDispatcher {
7878
private final MessageWaiter messagesWaiter;
7979

8080
// Maps ID to "total expiration time". If it takes longer than this, stop extending.
81-
private final ConcurrentMap<String, Instant> pendingMessages = new ConcurrentHashMap<>();
81+
private final ConcurrentMap<AckHandler, Instant> pendingMessages = new ConcurrentHashMap<>();
8282

8383
private final LinkedBlockingQueue<String> pendingAcks = new LinkedBlockingQueue<>();
8484
private final LinkedBlockingQueue<String> pendingNacks = new LinkedBlockingQueue<>();
@@ -141,7 +141,7 @@ private class AckHandler implements FutureCallback<AckReply> {
141141
}
142142

143143
private void onBoth(LinkedBlockingQueue<String> destination) {
144-
pendingMessages.remove(ackId);
144+
pendingMessages.remove(this);
145145
destination.add(ackId);
146146
flowController.release(1, outstandingBytes);
147147
messagesWaiter.incrementPendingMessages(-1);
@@ -329,17 +329,15 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
329329
}
330330
messagesWaiter.incrementPendingMessages(messages.size());
331331

332-
Instant totalExpiration = now().plus(maxAckExtensionPeriod);
333-
for (ReceivedMessage message : messages) {
334-
pendingReceipts.add(message.getAckId());
335-
pendingMessages.put(message.getAckId(), totalExpiration);
336-
}
337332

333+
Instant totalExpiration = now().plus(maxAckExtensionPeriod);
338334
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
339335
for (ReceivedMessage message : messages) {
340336
AckHandler ackHandler =
341337
new AckHandler(message.getAckId(), message.getMessage().getSerializedSize());
342338
outstandingBatch.addMessage(message, ackHandler);
339+
pendingReceipts.add(message.getAckId());
340+
pendingMessages.put(ackHandler, totalExpiration);
343341
}
344342
synchronized (outstandingMessageBatches) {
345343
outstandingMessageBatches.add(outstandingBatch);
@@ -436,10 +434,10 @@ void extendDeadlines() {
436434
Instant extendTo = now.plusSeconds(extendSeconds);
437435

438436
int count = 0;
439-
Iterator<Map.Entry<String, Instant>> it = pendingMessages.entrySet().iterator();
437+
Iterator<Map.Entry<AckHandler, Instant>> it = pendingMessages.entrySet().iterator();
440438
while (it.hasNext()) {
441-
Map.Entry<String, Instant> entry = it.next();
442-
String ackId = entry.getKey();
439+
Map.Entry<AckHandler, Instant> entry = it.next();
440+
String ackId = entry.getKey().ackId;
443441
Instant totalExpiration = entry.getValue();
444442
// TODO(pongad): PendingModifyAckDeadline is created to dance around polling pull,
445443
// since one modack RPC only takes one expiration.
@@ -455,6 +453,9 @@ void extendDeadlines() {
455453
int sec = Math.max(1, (int) now.until(totalExpiration, ChronoUnit.SECONDS));
456454
modacks.add(new PendingModifyAckDeadline(sec, ackId));
457455
count++;
456+
} else {
457+
flowController.release(1, entry.getKey().outstandingBytes);
458+
messagesWaiter.incrementPendingMessages(-1);
458459
}
459460
}
460461
modacks.add(modack);

google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public void run() {
5656
private List<String> sentAcks;
5757
private List<ModAckItem> sentModAcks;
5858
private FakeClock clock;
59+
private FlowController flowController;
5960

6061
@AutoValue
6162
abstract static class ModAckItem {
@@ -101,6 +102,12 @@ public void sendAckOperations(
101102
systemExecutor.shutdownNow();
102103

103104
clock = new FakeClock();
105+
flowController =
106+
new FlowController(
107+
FlowControlSettings.newBuilder()
108+
.setMaxOutstandingElementCount(1L)
109+
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
110+
.build());
104111

105112
dispatcher =
106113
new MessageDispatcher(
@@ -109,7 +116,7 @@ public void sendAckOperations(
109116
Duration.ofSeconds(5),
110117
Duration.ofMinutes(60),
111118
new Distribution(Subscriber.MAX_ACK_DEADLINE_SECONDS + 1),
112-
new FlowController(FlowControlSettings.newBuilder().build()),
119+
flowController,
113120
new LinkedList<MessageDispatcher.OutstandingMessageBatch>(),
114121
MoreExecutors.directExecutor(),
115122
systemExecutor,
@@ -182,6 +189,10 @@ public void testExtension_GiveUp() throws Exception {
182189
clock.advance(1, TimeUnit.DAYS);
183190
dispatcher.extendDeadlines();
184191
assertThat(sentModAcks).isEmpty();
192+
193+
// We should be able to reserve another item in the flow controller and not block shutdown
194+
flowController.reserve(1, 0);
195+
dispatcher.stop();
185196
}
186197

187198
@Test

0 commit comments

Comments
 (0)