Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -695,11 +695,18 @@ public void testConsumerReinitializationWithNoInitialMessages() throws Exception
// Verify no messages processed
verify(processor, never()).process(any(), any());

// Request consumer reinitialization
// Request consumer reinitialization and wait for it to complete before adding messages
IngestionShardConsumer oldConsumer = poller.getConsumer();
IngestionSource mockIngestionSource = new IngestionSource.Builder("test").build();
poller.requestConsumerReinitialization(mockIngestionSource);

// Add a message
assertBusy(() -> {
IngestionShardConsumer currentConsumer = poller.getConsumer();
assertNotNull(currentConsumer);
assertNotSame(oldConsumer, currentConsumer);
}, 30, TimeUnit.SECONDS);

// Add a message after reinitialization is complete
messages.add("{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}".getBytes(StandardCharsets.UTF_8));

// Wait for the message to be processed
Expand Down
Loading