Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
ba92b33
Set the current version as 2.13.0.dev
fressi-elastic Mar 30, 2026
9cf495d
Improve integration tests stability
fressi-elastic Mar 30, 2026
6ea9439
Skip metrics store install when ES already runs on IT port
fressi-elastic Mar 30, 2026
62ca69a
Merge branch 'master' of github.com:elastic/rally into improve-integr…
fressi-elastic Mar 30, 2026
3ec3726
Refactor TestCluster to improve cluster detection
fressi-elastic Mar 30, 2026
7a3f2af
Isolate IT git global config in session fixture
fressi-elastic Mar 30, 2026
25ac645
Apply suggestion from @Copilot
fressi-elastic Mar 30, 2026
3e6e479
Merge branch 'master' of github.com:elastic/rally into improve-integr…
fressi-elastic Mar 31, 2026
633c8ac
Derive cluster health client timeout from server wait
fressi-elastic Mar 31, 2026
8e8d5a7
Address PR review: probe retries and protocol error cap
fressi-elastic Mar 31, 2026
1c714d4
Gate tar extractall filter on Python 3.12+
fressi-elastic Mar 31, 2026
934b5aa
Fix mypy TarFile.extractall typing in _do_tar_decompress
fressi-elastic Mar 31, 2026
5de4a3d
it: fail wait_for_cluster_health when cluster.health returns timed_out
fressi-elastic Mar 31, 2026
4705b22
Add docstrings, comments, and type hints for IT stability PR
fressi-elastic Mar 31, 2026
bc6066f
fix(it): stabilize metrics store bootstrap and git config path
fressi-elastic Mar 31, 2026
6f97292
Document tar extractall limitation on Python <3.12
fressi-elastic Mar 31, 2026
8803bed
Stabilize integration tests around benchmark HTTP port
fressi-elastic Mar 31, 2026
f620ade
Pass linters tests
fressi-elastic Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 58 additions & 6 deletions esrally/client/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import contextlib
import logging
import time
from typing import Any

import certifi
from urllib3.connection import is_ipaddress
Expand Down Expand Up @@ -275,10 +276,43 @@ async def on_request_end(session, trace_config_ctx, params):
return async_client


def wait_for_rest_layer(es, max_attempts=40):
# Fail fast on persistent protocol-style connection errors (e.g. HTTP/HTTPS mismatch) instead of
# burning the full max_attempts budget (~minutes with 3s sleeps).
_MAX_CONSECUTIVE_PROTOCOL_CONNECTION_ERRORS = 8


def _connection_error_is_protocol_like(e: Exception) -> bool:
"""
Return whether ``e`` represents a urllib3-style protocol failure.

Protocol errors are retried a limited number of times (startup transients) but abort early
if they persist, so HTTP/HTTPS mismatches do not consume the full ``max_attempts`` budget.

:param e: A ``ConnectionError`` from the Elasticsearch client transport layer.
:return: True if the error message or nested ``errors`` include ``ProtocolError``.
"""
if "ProtocolError" in str(e):
return True
for err in getattr(e, "errors", None) or []:
if err is None:
continue
# pylint: disable=import-outside-toplevel
import urllib3.exceptions

if isinstance(err, urllib3.exceptions.ProtocolError):
return True
return False


def wait_for_rest_layer(es: Any, max_attempts: int = 40) -> bool:
"""
Waits for ``max_attempts`` until Elasticsearch's REST API is available.

Repeated urllib3 protocol errors (common while nodes start, or when the URL scheme is wrong)
are retried up to :data:`_MAX_CONSECUTIVE_PROTOCOL_CONNECTION_ERRORS` times in a row before
raising :class:`~esrally.exceptions.SystemSetupError`, instead of sleeping through all
``max_attempts``.

:param es: Elasticsearch client to use for connecting.
:param max_attempts: The maximum number of attempts to check whether the REST API is available.
:return: True iff Elasticsearch's REST API is available.
Expand All @@ -289,6 +323,7 @@ def wait_for_rest_layer(es, max_attempts=40):
expected_node_count = len(es.transport.node_pool)
logger = logging.getLogger(__name__)
attempt = 0
consecutive_protocol_connection_errors = 0
while attempt <= max_attempts:
attempt += 1
# pylint: disable=import-outside-toplevel
Expand All @@ -313,32 +348,49 @@ def wait_for_rest_layer(es, max_attempts=40):
)

if attempt <= max_attempts:
consecutive_protocol_connection_errors = 0
logger.debug("Got serialization error [%s] on attempt [%s]. Sleeping...", e, attempt)
time.sleep(3)
else:
raise
except TlsError as e:
raise exceptions.SystemSetupError("Could not connect to cluster via HTTPS. Are you sure this is an HTTPS endpoint?", e)
except ConnectionError as e:
if "ProtocolError" in str(e):
is_protocol = _connection_error_is_protocol_like(e)
if is_protocol:
consecutive_protocol_connection_errors += 1
if consecutive_protocol_connection_errors > _MAX_CONSECUTIVE_PROTOCOL_CONNECTION_ERRORS:
raise exceptions.SystemSetupError(
"Received a protocol error. Are you sure you're using the correct scheme (HTTP or HTTPS)?", e
)
else:
consecutive_protocol_connection_errors = 0
if attempt <= max_attempts:
if is_protocol:
logger.debug(
"Got protocol error on attempt [%s] (often transient while Elasticsearch starts). Sleeping...",
attempt,
)
else:
logger.debug("Got connection error on attempt [%s]. Sleeping...", attempt)
time.sleep(3)
elif is_protocol:
raise exceptions.SystemSetupError(
"Received a protocol error. Are you sure you're using the correct scheme (HTTP or HTTPS)?", e
)

if attempt <= max_attempts:
logger.debug("Got connection error on attempt [%s]. Sleeping...", attempt)
time.sleep(3)
else:
raise
except TransportError as e:
if attempt <= max_attempts:
consecutive_protocol_connection_errors = 0
logger.debug("Got transport error on attempt [%s]. Sleeping...", attempt)
time.sleep(3)
else:
raise
except ApiError as e:
# cluster block, x-pack not initialized yet, our wait condition is not reached
if e.status_code in (503, 401, 408) and attempt <= max_attempts:
consecutive_protocol_connection_errors = 0
logger.debug("Got status code [%s] on attempt [%s]. Sleeping...", e.message, attempt)
time.sleep(3)
else:
Expand Down
1 change: 0 additions & 1 deletion esrally/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ def install_default_log_config():
io.ensure_dir(paths.logs())


# pylint: disable=unused-argument
def configure_file_handler(*, filename: str, encoding: str = "UTF-8", delay: bool = False, **kwargs: Any) -> logging.Handler:
"""
Configures the WatchedFileHandler supporting expansion of `~` and `${LOG_PATH}` to the user's home and the log path respectively.
Expand Down
16 changes: 15 additions & 1 deletion esrally/utils/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import os
import shutil
import subprocess
import sys
import tarfile
import zipfile
from collections.abc import Collection, Mapping, Sequence
Expand Down Expand Up @@ -396,8 +397,21 @@ def _do_decompress_manually_with_lib(target_directory: str, filename: str, compr


def _do_tar_decompress(target_directory: str, compressed_file: tarfile.TarFile) -> None:
"""
Extract a tar archive into ``target_directory`` and close the handle.

On Python 3.12+, use :meth:`tarfile.TarFile.extractall` ``filter="tar"`` (PEP 706) so extraction
follows the documented tar safety profile; older interpreters use the legacy default, which does
not apply PEP 706 member-path filtering.
"""
try:
compressed_file.extractall(path=target_directory, filter="tar")
if sys.version_info >= (3, 12):
compressed_file.extractall(path=target_directory, filter="tar")
else:
# ``extractall`` has no ``filter`` argument before Python 3.12. Without PEP 706's "tar"
# profile, member names can escape ``target_directory`` (e.g. via ``../``), which is
# path traversal if the archive is not from a trusted source.
compressed_file.extractall(path=target_directory)
except Exception:
raise RuntimeError(f"Could not decompress provided archive [{compressed_file.name!r}]. Please check if it is a valid tar file.")
finally:
Expand Down
Loading
Loading