diff --git a/esrally/metrics.py b/esrally/metrics.py index 84dcd4762..d36ef8225 100644 --- a/esrally/metrics.py +++ b/esrally/metrics.py @@ -46,6 +46,7 @@ def __init__(self, client, cluster_version=None): self._client = client self.logger = logging.getLogger(__name__) self._cluster_version = cluster_version + self.retryable_status_codes = [502, 503, 504, 429] # TODO #1335: Use version-specific support for metrics stores after 7.8.0. def probe_version(self): @@ -171,8 +172,29 @@ def guarded(self, target, *args, **kwargs): ) self.logger.exception(msg) raise exceptions.SystemSetupError(msg) + except elasticsearch.helpers.BulkIndexError as e: + for err in e.errors: + err_type = err.get("index", {}).get("error", {}).get("type", None) + if err.get("index", {}).get("status", None) not in self.retryable_status_codes: + msg = f"Unretryable error encountered when sending metrics to remote metrics store: [{err_type}]" + self.logger.exception("%s - Full error(s) [%s]", msg, str(e.errors)) + raise exceptions.RallyError(msg) + + if execution_count <= max_execution_count: + self.logger.debug( + "Error in sending metrics to remote metrics store [%s] in attempt [%d/%d]. Sleeping for [%f] seconds.", + e, + execution_count, + max_execution_count, + time_to_sleep, + ) + time.sleep(time_to_sleep) + else: + msg = f"Failed to send metrics to remote metrics store: [{e.errors}]" + self.logger.exception("%s - Full error(s) [%s]", msg, str(e.errors)) + raise exceptions.RallyError(msg) except ApiError as e: - if e.status_code in (502, 503, 504, 429) and execution_count <= max_execution_count: + if e.status_code in self.retryable_status_codes and execution_count <= max_execution_count: self.logger.debug( "%s (code: %d) in attempt [%d/%d]. Sleeping for [%f] seconds.", e.error, @@ -451,9 +473,9 @@ def close(self): metrics on close (in order to avoid additional latency during the benchmark). """ self.logger.info("Closing metrics store.") + self.opened = False self.flush() self._clear_meta_info() - self.opened = False def add_meta_info(self, scope, scope_key, key, value): """ diff --git a/tests/metrics_test.py b/tests/metrics_test.py index f94ca88cd..cf7dbcc93 100644 --- a/tests/metrics_test.py +++ b/tests/metrics_test.py @@ -28,6 +28,7 @@ from unittest import mock import elasticsearch.exceptions +import elasticsearch.helpers import pytest from esrally import config, exceptions, metrics, paths, track @@ -231,7 +232,59 @@ def raise_error(self): err = elasticsearch.exceptions.ApiError("unit-test", meta=TestEsClient.ApiResponseMeta(status=self.status_code), body={}) raise err - retriable_errors = [ApiError(429), ApiError(502), ApiError(503), ApiError(504), ConnectionError(), ConnectionTimeout()] + class BulkIndexError: + def __init__(self, errors): + self.errors = errors + self.error_message = f"{len(self.errors)} document(s) failed to index" + + def logging_statements(self, retries): + logging_statements = [] + for i, v in enumerate(range(retries)): + logging_statements.append( + "Error in sending metrics to remote metrics store [%s] in attempt [%d/%d]. Sleeping for [%f] seconds." + % ( + self.error_message, + i + 1, + max_retry, + sleep_slots[v], + ) + ) + logging_statements.append( + f"Failed to send metrics to remote metrics store: [{self.errors}] - Full error(s) [{self.errors}]" + ) + return logging_statements + + def raise_error(self): + raise elasticsearch.helpers.BulkIndexError(self.error_message, self.errors) + + bulk_index_errors = [ + { + "index": { + "_index": "rally-metrics-2023-04", + "_id": "dffAc4cBOnIJ2Omtflwg", + "status": 429, + "error": { + "type": "circuit_breaking_exception", + "reason": "[parent] Data too large, data for [] would be [123848638/118.1mb], " + "which is larger than the limit of [123273216/117.5mb], real usage: [120182112/114.6mb], " + "new bytes reserved: [3666526/3.4mb]", + "bytes_wanted": 123848638, + "bytes_limit": 123273216, + "durability": "TRANSIENT", + }, + } + }, + ] + + retryable_errors = [ + ApiError(429), + ApiError(502), + ApiError(503), + ApiError(504), + ConnectionError(), + ConnectionTimeout(), + BulkIndexError(bulk_index_errors), + ] max_retry = 10 @@ -247,7 +300,7 @@ def raise_error(self): exepcted_logger_calls = [] expected_sleep_calls = [] - for e in retriable_errors: + for e in retryable_errors: exepcted_logger_calls += e.logging_statements(max_retry) expected_sleep_calls += [mock.call(int(sleep_slots[i])) for i in range(0, max_retry)] @@ -302,6 +355,60 @@ def raise_unknown_error(): "store on host [127.0.0.1] at port [9243]." ) + def test_raises_rally_error_on_unretryable_bulk_indexing_errors(self): + bulk_index_errors = [ + { + "index": { + "_index": "rally-metrics-2023-04", + "_id": "dffAc4cBOnIJ2Omtflwg", + "status": 429, + "error": { + "type": "circuit_breaking_exception", + "reason": "[parent] Data too large, data for [] would be [123848638/118.1mb], " + "which is larger than the limit of [123273216/117.5mb], real usage: [120182112/114.6mb], " + "new bytes reserved: [3666526/3.4mb]", + "bytes_wanted": 123848638, + "bytes_limit": 123273216, + "durability": "TRANSIENT", + }, + } + }, + { + "index": { + "_id": "1", + "_index": "rally-metrics-2023-04", + "error": {"type": "version_conflict_engine_exception"}, + "status": 409, + } + }, + { + "index": { + "_index": "rally-metrics-2023-04", + "_id": "dffAc4cBOnIJ2Omtflwg", + "status": 400, + "error": { + "type": "mapper_parsing_exception", + "reason": "failed to parse field [meta.error-description] of type [keyword] in document with id " + "'dffAc4cBOnIJ2Omtflwg'. Preview of field's value: 'HTTP status: 400, message: failed to parse " + "field [@timestamp] of type [date] in document with id '-PXAc4cBOnIJ2OmtX33J'. Preview of " + "field's value: '1998-04-30T15:02:56-05:00'", + }, + } + }, + ] + + def raise_bulk_index_error(): + err = elasticsearch.helpers.BulkIndexError(f"{len(bulk_index_errors)} document(s) failed to index", bulk_index_errors) + raise err + + client = metrics.EsClient(self.ClientMock([{"host": "127.0.0.1", "port": "9243"}])) + + with pytest.raises( + exceptions.RallyError, + match=(r"Unretryable error encountered when sending metrics to remote metrics store: \[version_conflict_engine_exception\]"), + ): + client.guarded(raise_bulk_index_error) + class TestEsMetrics: RACE_TIMESTAMP = datetime.datetime(2016, 1, 31)