diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests.java index a1aceb68967b3..a06f9a0ac2b45 100644 --- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests.java +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests.java @@ -96,9 +96,9 @@ public void setUpCluster() { internalCluster().startMasterOnlyNode(); // Use a single thread pool for writes so we can enforce a consistent ordering internalCluster().startDataOnlyNode(Settings.builder().put("thread_pool.write.size", 1).build()); + internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/79300") public void testUpdateByQuery() throws Exception { final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); final boolean scriptEnabled = randomBoolean(); @@ -112,7 +112,6 @@ public void testUpdateByQuery() throws Exception { }); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/79300") public void testReindex() throws Exception { final String sourceIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); final String targetIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); @@ -135,7 +134,6 @@ public void testReindex() throws Exception { }); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/79300") public void testDeleteByQuery() throws Exception { final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); executeConcurrentUpdatesOnSubsetOfDocs(indexName, @@ -217,7 +215,12 @@ Self extends AbstractBulkByScrollRequestBuilder> void executeConcurrent } // The bulk request is enqueued before the update by query - final ActionFuture bulkFuture = client().bulk(conflictingUpdatesBulkRequest); + // Since #77731 TransportBulkAction is dispatched into the Write thread pool, + // this test makes use of a deterministic task order in the data node write + // thread pool. To ensure that ordering, execute the TransportBulkAction + // in a coordinator node preventing that additional tasks are scheduled into + // the data node write thread pool. + final ActionFuture bulkFuture = internalCluster().coordOnlyNodeClient().bulk(conflictingUpdatesBulkRequest); // Ensure that the concurrent writes are enqueued before the update by query request is sent assertBusy(() -> assertThat(writeThreadPool.getQueue().size(), equalTo(1)));