diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java index af64d252a45d9..064949bf4bae5 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java @@ -384,7 +384,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) public ClusterState execute(ClusterState currentState) { invoked3.countDown(); try { - block2.await(); + assertTrue(block2.await(10, TimeUnit.SECONDS)); } catch (InterruptedException e) { fail(); } @@ -397,40 +397,48 @@ public void onFailure(Exception e) { fail(); } }); - invoked3.await(); - - for (int i = 2; i <= 5; i++) { - clusterService.submitUnbatchedStateUpdateTask(Integer.toString(i), new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - return currentState; - } + assertTrue(invoked3.await(10, TimeUnit.SECONDS)); + + try { + for (int i = 2; i <= 5; i++) { + clusterService.submitUnbatchedStateUpdateTask(Integer.toString(i), new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return currentState; + } + + @Override + public void onFailure(Exception e) { + fail(); + } + }); + } - @Override - public void onFailure(Exception e) { - fail(); - } - }); - } - Thread.sleep(100); + final var startNanoTime = System.nanoTime(); + while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= 0) { + // noinspection BusyWait + Thread.sleep(100); + } - pendingClusterTasks = clusterService.getMasterService().pendingTasks(); - assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5)); - controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5")); - for (PendingClusterTask task : pendingClusterTasks) { - controlSources.remove(task.getSource().string()); - } - assertTrue(controlSources.isEmpty()); + pendingClusterTasks = clusterService.getMasterService().pendingTasks(); + assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5)); + controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5")); + for (PendingClusterTask task : pendingClusterTasks) { + controlSources.remove(task.getSource().string()); + } + assertTrue(controlSources.isEmpty()); - response = internalCluster().coordOnlyNodeClient().admin().cluster().preparePendingClusterTasks().get(); - assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(5)); - controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5")); - for (PendingClusterTask task : response) { - if (controlSources.remove(task.getSource().string())) { - assertThat(task.getTimeInQueueInMillis(), greaterThan(0L)); + response = internalCluster().coordOnlyNodeClient().admin().cluster().preparePendingClusterTasks().get(); + assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(5)); + controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5")); + for (PendingClusterTask task : response) { + if (controlSources.remove(task.getSource().string())) { + assertThat(task.getTimeInQueueInMillis(), greaterThan(0L)); + } } + assertTrue(controlSources.isEmpty()); + } finally { + block2.countDown(); } - assertTrue(controlSources.isEmpty()); - block2.countDown(); } }