2323import static org .mockito .ArgumentMatchers .anyInt ;
2424import static org .mockito .ArgumentMatchers .anyString ;
2525import static org .mockito .ArgumentMatchers .eq ;
26+ import static org .mockito .Mockito .doAnswer ;
2627import static org .mockito .Mockito .doReturn ;
2728import static org .mockito .Mockito .doThrow ;
2829import static org .mockito .Mockito .mock ;
3637import java .util .Map ;
3738import java .util .concurrent .ArrayBlockingQueue ;
3839import java .util .concurrent .CompletableFuture ;
40+ import java .util .concurrent .CountDownLatch ;
3941import java .util .concurrent .TimeUnit ;
4042import lombok .extern .slf4j .Slf4j ;
4143import org .apache .pulsar .client .admin .PulsarAdmin ;
@@ -148,6 +150,17 @@ public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
148150 // test new assignment add functions
149151 FunctionRuntimeManager functionRuntimeManager = mock (FunctionRuntimeManager .class );
150152
153+ // Use CountDownLatch to park the background thread after first processing and before re-stubbing
154+ CountDownLatch firstProcessed = new java .util .concurrent .CountDownLatch (1 );
155+ CountDownLatch release = new java .util .concurrent .CountDownLatch (1 );
156+
157+ // On first processing of message1, block and wait for re-stubbing
158+ doAnswer (inv -> {
159+ firstProcessed .countDown ();
160+ release .await (5 , TimeUnit .SECONDS );
161+ return null ;
162+ }).when (functionRuntimeManager ).processAssignmentMessage (eq (message1 ));
163+
151164 FunctionAssignmentTailer functionAssignmentTailer =
152165 spy (new FunctionAssignmentTailer (functionRuntimeManager , readerBuilder , workerConfig , errorNotifier ));
153166
@@ -157,12 +170,17 @@ public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
157170 verify (errorNotifier , times (0 )).triggerError (any ());
158171
159172 messageList .add (message1 );
173+ Assert .assertTrue (firstProcessed .await (5 , TimeUnit .SECONDS ),
174+ "First processing did not reach the blocking point" );
160175
161176 verify (errorNotifier , times (0 )).triggerError (any ());
162177
163178 // trigger an error to be thrown
164179 doThrow (new RuntimeException ("test" )).when (functionRuntimeManager ).processAssignmentMessage (any ());
165180
181+ // Release the first processing
182+ release .countDown ();
183+
166184 messageList .add (message2 );
167185
168186 try {
0 commit comments