diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java index 7979c2c86dec1..adc8582218ed5 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java @@ -695,11 +695,18 @@ public void testConsumerReinitializationWithNoInitialMessages() throws Exception // Verify no messages processed verify(processor, never()).process(any(), any()); - // Request consumer reinitialization + // Request consumer reinitialization and wait for it to complete before adding messages + IngestionShardConsumer oldConsumer = poller.getConsumer(); IngestionSource mockIngestionSource = new IngestionSource.Builder("test").build(); poller.requestConsumerReinitialization(mockIngestionSource); - // Add a message + assertBusy(() -> { + IngestionShardConsumer currentConsumer = poller.getConsumer(); + assertNotNull(currentConsumer); + assertNotSame(oldConsumer, currentConsumer); + }, 30, TimeUnit.SECONDS); + + // Add a message after reinitialization is complete messages.add("{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}".getBytes(StandardCharsets.UTF_8)); // Wait for the message to be processed