Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/telemetry.rst
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ Supported telemetry parameters:

* ``node-stats-sample-interval`` (default: 1): A positive number greater than zero denoting the sampling interval in seconds.
* ``node-stats-include-indices`` (default: ``false``): A boolean indicating whether index stats should be included.
* ``node-stats-include-indices-metrics`` (default: ``docs,store,indexing,search,merges,query_cache,fielddata,segments,translog,request_cache``): A comma-separated string specifying the Index stats metrics to include. This is useful, for example, to restrict the collected Index stats metrics. Specifying this parameter implicitly enables collection of Index stats, so you don't also need to specify ``node-stats-include-indices: true``.
* ``node-stats-include-indices-metrics`` (default: ``docs,store,indexing,search,merges,query_cache,fielddata,segments,translog,request_cache``): A comma-separated string or a list specifying the Index stats metrics to include. This is useful, for example, to restrict the collected Index stats metrics. Specifying this parameter implicitly enables collection of Index stats, so you don't also need to specify ``node-stats-include-indices: true``.

Example: ``--telemetry-params="node-stats-include-indices-metrics:'docs'"`` will **only** collect the ``docs`` metrics from Index stats. If you want to use multiple fields, pass a JSON file to ``telemetry-params`` (see the :ref:`command line reference <clr_telemetry_params>` for details).
* ``node-stats-include-thread-pools`` (default: ``true``): A boolean indicating whether thread pool stats should be included.
Expand Down Expand Up @@ -300,7 +300,7 @@ The disk-usage-stats telemetry device runs the `_disk_usage <https://www.elastic

Supported telemetry parameters:

* ``disk-usage-stats-indices`` (default all indices in the track): Comma separated list of indices who's disk usage to fetch.
* ``disk-usage-stats-indices`` (default all indices in the track): Comma separated string or a list of indices who's disk usage to fetch.

Example::

Expand Down
10 changes: 7 additions & 3 deletions esrally/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,13 +825,15 @@ def __init__(self, telemetry_params, cluster_name, client, metrics_store):
if self.include_indices_metrics:
if isinstance(self.include_indices_metrics, str):
self.include_indices_metrics_list = opts.csv_to_list(self.include_indices_metrics)
elif isinstance(self.include_indices_metrics, list):
self.include_indices_metrics_list = self.include_indices_metrics
else:
# we don't validate the allowable metrics as they may change across ES versions
raise exceptions.SystemSetupError(
"The telemetry parameter 'node-stats-include-indices-metrics' must be a comma-separated string but was {}".format(
type(self.include_indices_metrics)
)
"The telemetry parameter 'node-stats-include-indices-metrics' must be a comma-separated string"
" or a list but was {}".format(type(self.include_indices_metrics))
)
self.logger.debug("Including indices metrics: %s", self.include_indices_metrics_list)
else:
self.include_indices_metrics_list = [
"docs",
Expand Down Expand Up @@ -2345,6 +2347,8 @@ def on_benchmark_start(self):
)
self.logger.exception(msg)
raise exceptions.RallyError(msg)
if isinstance(self.indices, list):
self.indices = ",".join(self.indices)

def on_benchmark_stop(self):
# pylint: disable=import-outside-toplevel
Expand Down
47 changes: 44 additions & 3 deletions tests/telemetry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3025,7 +3025,7 @@ def test_stores_selected_indices_metrics_from_nodes_stats(self, metrics_store_pu
meta_data=metrics_store_meta_data,
)

def test_exception_when_include_indices_metrics_not_valid(self):
def test_exception_when_included_indices_metrics_not_valid(self):
node_stats_response = {}

client = Client(nodes=SubClient(stats=node_stats_response))
Expand All @@ -3034,10 +3034,33 @@ def test_exception_when_include_indices_metrics_not_valid(self):
telemetry_params = {"node-stats-include-indices-metrics": {"bad": "input"}}
with pytest.raises(
exceptions.SystemSetupError,
match="The telemetry parameter 'node-stats-include-indices-metrics' must be a comma-separated string but was <class 'dict'>",
match="The telemetry parameter 'node-stats-include-indices-metrics' must be a comma-separated string"
" or a list but was <class 'dict'>",
):
telemetry.NodeStatsRecorder(telemetry_params, cluster_name="remote", client=client, metrics_store=metrics_store)

def test_comma_seperated_string_of_included_indices_is_accepted(self):
logger = logging.getLogger("esrally.telemetry")
node_stats_response = {}
client = Client(nodes=SubClient(stats=node_stats_response))
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
telemetry_params = {"node-stats-include-indices-metrics": "my-index-one,my-index-two"}
with mock.patch.object(logger, "debug") as mocked_debug:
telemetry.NodeStatsRecorder(telemetry_params, cluster_name="remote", client=client, metrics_store=metrics_store)
mocked_debug.assert_called_once_with("Including indices metrics: %s", ["my-index-one", "my-index-two"])

def test_list_of_includes_indices_is_accepted(self):
logger = logging.getLogger("esrally.telemetry")
node_stats_response = {}
client = Client(nodes=SubClient(stats=node_stats_response))
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
telemetry_params = {"node-stats-include-indices-metrics": ["my-index-one", "my-index-two"]}
with mock.patch.object(logger, "debug") as mocked_debug:
telemetry.NodeStatsRecorder(telemetry_params, cluster_name="remote", client=client, metrics_store=metrics_store)
mocked_debug.assert_called_once_with("Including indices metrics: %s", ["my-index-one", "my-index-two"])

@mock.patch("esrally.metrics.EsMetricsStore.put_doc")
def test_logs_debug_on_missing_cgroup_stats(self, metrics_store_put_doc):
node_stats_response = {
Expand Down Expand Up @@ -4876,7 +4899,7 @@ def test_uses_indices_param_if_specified_instead_of_index_names(self, es):
)

@mock.patch("elasticsearch.Elasticsearch")
def test_uses_indices_param_if_specified_instead_of_data_stream_names(self, es):
def test_uses_indices_param_as_csv_if_specified_instead_of_data_stream_names(self, es):
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
es.indices.disk_usage.return_value = {"_shards": {"failed": 0}}
Expand All @@ -4893,6 +4916,24 @@ def test_uses_indices_param_if_specified_instead_of_data_stream_names(self, es):
]
)

@mock.patch("elasticsearch.Elasticsearch")
def test_uses_indices_param_as_list_if_specified_instead_of_data_stream_names(self, es):
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
es.indices.disk_usage.return_value = {"_shards": {"failed": 0}}
device = telemetry.DiskUsageStats(
{"disk-usage-stats-indices": ["foo", "bar"]}, es, metrics_store, index_names=[], data_stream_names=["baz"]
)
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
es.indices.disk_usage.assert_has_calls(
[
call(index="foo", run_expensive_tasks=True),
call(index="bar", run_expensive_tasks=True),
]
)

@mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level")
@mock.patch("elasticsearch.Elasticsearch")
def test_error_on_retrieval_does_not_store_metrics(self, es, metrics_store_cluster_level, caplog):
Expand Down