Skip to content

Commit ff70e66

Browse files
Denovo1998lhotari
authored andcommitted
[fix][broker] Flaky-test: ExtensibleLoadManagerImplTest.testDisableBroker (apache#24770)
(cherry picked from commit e44e084)
1 parent e371e66 commit ff70e66

File tree

1 file changed

+26
-0
lines changed

1 file changed

+26
-0
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,32 @@ private CompletableFuture<Optional<String>> getActiveOwnerAsync(
491491
String serviceUnit,
492492
ServiceUnitState state,
493493
Optional<String> owner) {
494+
// When the channel is closed, do not perform liveness verification, return according to the status:
495+
if (channelState == Closed) {
496+
switch (state) {
497+
// Owned/Splitting: Directly return owner (for isOwner judgment as true)
498+
case Owned:
499+
case Splitting:
500+
return CompletableFuture.completedFuture(owner);
501+
case Assigning:
502+
case Releasing:
503+
if (owner.isPresent()) {
504+
if (isTargetBroker(owner.get())) {
505+
// This machine is the target taker,
506+
// return an unfinished future with "waiting for ownership"
507+
return dedupeGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable);
508+
} else {
509+
// The target is another broker, return directly so that the upper layer can redirect
510+
return CompletableFuture.completedFuture(owner);
511+
}
512+
} else {
513+
return CompletableFuture.completedFuture(Optional.empty());
514+
}
515+
// Other status: return empty
516+
default:
517+
return CompletableFuture.completedFuture(Optional.empty());
518+
}
519+
}
494520
return dedupeGetOwnerRequest(serviceUnit)
495521
.thenCompose(newOwner -> {
496522
if (newOwner == null) {

0 commit comments

Comments
 (0)