Skip to content

Commit 6ed149e

Browse files
author
Rick Boyd
authored
Remove buggy indices.exists() calls from code (#1580)
Recently we've had a few errors surface in Rally executions when indices/data-streams are deleted. The errors would be reported as a connection timeout for the DELETE /<index> call, but when inspected (via a packet capture) the server was shown to have responded correctly and in a timely fashion to Rally. It turns out that in our version of elasticsearch-py, indices.exists() calls were handled via GET rather than HEAD. It's unclear to me why this causes an issue, but any of the following fixes the issue when targeting very large indices with the delete-index operation: (1) reduce the body size of the index (e.g. reduce the number of mappings); (2) set only-if-exists to false in the operation (to avoid the exists() call); or (3) change the exists() call to a get() call. I'm led to believe the bytes from the body are left in the event loop and not dequeued properly by the client, hence somehow blocking the response from the very next transaction. This PR changes from using the exists() method to using the get() method to determine an index's existence.
1 parent aa6ccb8 commit 6ed149e

2 files changed

Lines changed: 45 additions & 25 deletions

File tree

esrally/driver/runner.py

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1371,10 +1371,15 @@ async def __call__(self, es, params):
13711371
if not only_if_exists:
13721372
await es.indices.delete(index=index_name, params=request_params)
13731373
ops += 1
1374-
elif only_if_exists and await es.indices.exists(index=index_name):
1375-
self.logger.info("Index [%s] already exists. Deleting it.", index_name)
1376-
await es.indices.delete(index=index_name, params=request_params)
1377-
ops += 1
1374+
elif only_if_exists:
1375+
# here we use .get() and check for 404 instead of exists due to a bug in some versions
1376+
# of elasticsearch-py/elastic-transport with HEAD calls.
1377+
# can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0
1378+
get_response = await es.indices.get(index=index_name, ignore=[404])
1379+
if not get_response.get("status") == 404:
1380+
self.logger.info("Index [%s] already exists. Deleting it.", index_name)
1381+
await es.indices.delete(index=index_name, params=request_params)
1382+
ops += 1
13781383
finally:
13791384
await set_destructive_requires_name(es, prior_destructive_setting)
13801385
return {
@@ -1403,10 +1408,15 @@ async def __call__(self, es, params):
14031408
if not only_if_exists:
14041409
await es.indices.delete_data_stream(name=data_stream, ignore=[404], params=request_params)
14051410
ops += 1
1406-
elif only_if_exists and await es.indices.exists(index=data_stream):
1407-
self.logger.info("Data stream [%s] already exists. Deleting it.", data_stream)
1408-
await es.indices.delete_data_stream(name=data_stream, params=request_params)
1409-
ops += 1
1411+
elif only_if_exists:
1412+
# here we use .get() and check for 404 instead of exists due to a bug in some versions
1413+
# of elasticsearch-py/elastic-transport with HEAD calls.
1414+
# can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0
1415+
get_response = await es.indices.get(index=data_stream, ignore=[404])
1416+
if not get_response.get("status") == 404:
1417+
self.logger.info("Data stream [%s] already exists. Deleting it.", data_stream)
1418+
await es.indices.delete_data_stream(name=data_stream, params=request_params)
1419+
ops += 1
14101420

14111421
return {
14121422
"weight": ops,
@@ -1455,10 +1465,15 @@ async def __call__(self, es, params):
14551465
if not only_if_exists:
14561466
await es.cluster.delete_component_template(name=template_name, params=request_params, ignore=[404])
14571467
ops_count += 1
1458-
elif only_if_exists and await es.cluster.exists_component_template(name=template_name):
1459-
self.logger.info("Component Index template [%s] already exists. Deleting it.", template_name)
1460-
await es.cluster.delete_component_template(name=template_name, params=request_params)
1461-
ops_count += 1
1468+
elif only_if_exists:
1469+
# here we use .get() and check for 404 instead of exists_component_template due to a bug in some versions
1470+
# of elasticsearch-py/elastic-transport with HEAD calls.
1471+
# can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0
1472+
component_template_exists = await es.cluster.get_component_template(name=template_name, ignore=[404])
1473+
if not component_template_exists.get("status") == 404:
1474+
self.logger.info("Component Index template [%s] already exists. Deleting it.", template_name)
1475+
await es.cluster.delete_component_template(name=template_name, params=request_params)
1476+
ops_count += 1
14621477
return {
14631478
"weight": ops_count,
14641479
"unit": "ops",
@@ -1505,10 +1520,15 @@ async def __call__(self, es, params):
15051520
if not only_if_exists:
15061521
await es.indices.delete_index_template(name=template_name, params=request_params, ignore=[404])
15071522
ops_count += 1
1508-
elif only_if_exists and await es.indices.exists_index_template(name=template_name):
1509-
self.logger.info("Composable Index template [%s] already exists. Deleting it.", template_name)
1510-
await es.indices.delete_index_template(name=template_name, params=request_params)
1511-
ops_count += 1
1523+
elif only_if_exists:
1524+
# here we use .get() and check for 404 instead of exists_index_template due to a bug in some versions
1525+
# of elasticsearch-py/elastic-transport with HEAD calls.
1526+
# can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0
1527+
index_template_exists = await es.indices.get_index_template(name=template_name, ignore=[404])
1528+
if not index_template_exists.get("status") == 404:
1529+
self.logger.info("Composable Index template [%s] already exists. Deleting it.", template_name)
1530+
await es.indices.delete_index_template(name=template_name, params=request_params)
1531+
ops_count += 1
15121532
# ensure that we do not provide an empty index pattern by accident
15131533
if delete_matching_indices and index_pattern:
15141534
await es.indices.delete(index=index_pattern)
@@ -1560,7 +1580,10 @@ async def __call__(self, es, params):
15601580
if not only_if_exists:
15611581
await es.indices.delete_template(name=template_name, params=request_params)
15621582
ops_count += 1
1563-
elif only_if_exists and await es.indices.exists_template(name=template_name):
1583+
# here we use .get_template() and check for empty instead of exists_template due to a bug in some versions
1584+
# of elasticsearch-py/elastic-transport with HEAD calls.
1585+
# can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0
1586+
elif only_if_exists and await es.indices.get_template(name=template_name, ignore=[404]):
15641587
self.logger.info("Index template [%s] already exists. Deleting it.", template_name)
15651588
await es.indices.delete_template(name=template_name, params=request_params)
15661589
ops_count += 1

tests/driver/runner_test.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2712,7 +2712,7 @@ class TestDeleteIndexRunner:
27122712
@mock.patch("elasticsearch.Elasticsearch")
27132713
@pytest.mark.asyncio
27142714
async def test_deletes_existing_indices(self, es):
2715-
es.indices.exists = mock.AsyncMock(side_effect=[False, True])
2715+
es.indices.get = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}])
27162716
es.indices.delete = mock.AsyncMock()
27172717
es.cluster.get_settings = mock.AsyncMock(return_value={"persistent": {}, "transient": {"action.destructive_requires_name": True}})
27182718
es.cluster.put_settings = mock.AsyncMock()
@@ -2777,7 +2777,7 @@ class TestDeleteDataStreamRunner:
27772777
@mock.patch("elasticsearch.Elasticsearch")
27782778
@pytest.mark.asyncio
27792779
async def test_deletes_existing_data_streams(self, es):
2780-
es.indices.exists = mock.AsyncMock(side_effect=[False, True])
2780+
es.indices.get = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}])
27812781
es.indices.delete_data_stream = mock.AsyncMock()
27822782

27832783
r = runner.DeleteDataStream()
@@ -2907,7 +2907,7 @@ async def test_deletes_all_index_templates(self, es):
29072907
@mock.patch("elasticsearch.Elasticsearch")
29082908
@pytest.mark.asyncio
29092909
async def test_deletes_only_existing_index_templates(self, es):
2910-
es.indices.exists_template = mock.AsyncMock(side_effect=[False, True])
2910+
es.indices.get_template = mock.AsyncMock(side_effect=[False, True])
29112911
es.indices.delete_template = mock.AsyncMock()
29122912
es.indices.delete = mock.AsyncMock()
29132913

@@ -3038,10 +3038,7 @@ async def test_deletes_all_index_templates(self, es):
30383038
@mock.patch("elasticsearch.Elasticsearch")
30393039
@pytest.mark.asyncio
30403040
async def test_deletes_only_existing_index_templates(self, es):
3041-
async def _side_effect(name):
3042-
return name == "templateB"
3043-
3044-
es.cluster.exists_component_template = mock.AsyncMock(side_effect=_side_effect)
3041+
es.cluster.get_component_template = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}])
30453042
es.cluster.delete_component_template = mock.AsyncMock()
30463043

30473044
r = runner.DeleteComponentTemplate()
@@ -3201,7 +3198,7 @@ async def test_deletes_all_index_templates(self, es):
32013198
@mock.patch("elasticsearch.Elasticsearch")
32023199
@pytest.mark.asyncio
32033200
async def test_deletes_only_existing_index_templates(self, es):
3204-
es.indices.exists_index_template = mock.AsyncMock(side_effect=[False, True])
3201+
es.indices.get_index_template = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}])
32053202
es.indices.delete_index_template = mock.AsyncMock()
32063203

32073204
r = runner.DeleteComposableTemplate()

0 commit comments

Comments
 (0)