@@ -1569,7 +1569,7 @@ public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positi
15691569 Set <Position > alreadyAcknowledgedPositions = new HashSet <>();
15701570 lock .readLock ().lock ();
15711571 try {
1572- positions .stream ().filter (this ::isMessageDeleted ).forEach (alreadyAcknowledgedPositions ::add );
1572+ positions .stream ().filter (this ::internalIsMessageDeleted ).forEach (alreadyAcknowledgedPositions ::add );
15731573 } finally {
15741574 lock .readLock ().unlock ();
15751575 }
@@ -2345,7 +2345,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
23452345 return ;
23462346 }
23472347
2348- if (isMessageDeleted (position )) {
2348+ if (internalIsMessageDeleted (position )) {
23492349 if (getConfig ().isDeletionAtBatchIndexLevelEnabled ()) {
23502350 BitSetRecyclable bitSetRecyclable = batchDeletedIndexes .remove (position );
23512351 if (bitSetRecyclable != null ) {
@@ -3543,13 +3543,19 @@ public Position processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(
35433543 public boolean isMessageDeleted (Position position ) {
35443544 lock .readLock ().lock ();
35453545 try {
3546- return position .compareTo (markDeletePosition ) <= 0
3547- || individualDeletedMessages .contains (position .getLedgerId (), position .getEntryId ());
3546+ return internalIsMessageDeleted (position );
35483547 } finally {
35493548 lock .readLock ().unlock ();
35503549 }
35513550 }
35523551
3552+ // When this method is called while the external has already acquired a write lock or a read lock,
3553+ // it avoids unnecessary lock nesting.
3554+ private boolean internalIsMessageDeleted (Position position ) {
3555+ return position .compareTo (markDeletePosition ) <= 0
3556+ || individualDeletedMessages .contains (position .getLedgerId (), position .getEntryId ());
3557+ }
3558+
35533559 //this method will return a copy of the position's ack set
35543560 @ Override
35553561 public long [] getBatchPositionAckSet (Position position ) {
0 commit comments