Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3132,6 +3132,40 @@ Meta-data

The operation returns no meta-data.

esql
~~~~~~~~~~~~~

With the operation type ``esql`` you can execute `ES|QL query <https://www.elastic.co/guide/en/elasticsearch/reference/master/esql.html>`_.

Properties
""""""""""

* ``query`` (mandatory): An ES|QL query starts with a source command followed processing commands.
* ``filter`` (optional): A query filter defined in `Elasticsearch query DSL <https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html>`_.
* ``body`` (optional): The query body.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The description here should indicate that this is used for anything additions to put into the body, since the query will already be placed there. For example, we use this for pragma.


Example::

{
"name": "default",
"operation-type": "esql",
"query": "FROM logs-* | STATS count=count(*) BY agent.hostname | SORT count DESC | LIMIT 20",
"filter": {
"range": {
"timestamp": {
"gte": "now-1d/d",
"lte": "now/d"
}
}
}
}

Meta-data
"""""""""

* ``weight``: "weight" of an operation, in this case the number of retrieved pages.
* ``unit``: The unit in which to interpret ``weight``, in this case ``pages``.
* ``success``: A boolean indicating whether the query has succeeded.

.. _track_dependencies:

Expand Down
23 changes: 23 additions & 0 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def register_default_runners():
register_runner(track.OperationType.ClosePointInTime, ClosePointInTime(), async_runner=True)
register_runner(track.OperationType.Sql, Sql(), async_runner=True)
register_runner(track.OperationType.FieldCaps, FieldCaps(), async_runner=True)
register_runner(track.OperationType.Esql, Esql(), async_runner=True)

# This is an administrative operation but there is no need for a retry here as we don't issue a request
register_runner(track.OperationType.Sleep, Sleep(), async_runner=True)
Expand Down Expand Up @@ -2829,6 +2830,28 @@ def __repr__(self, *args, **kwargs):
return "field-caps"


class Esql(Runner):
async def __call__(self, es, params):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a couple of high level properties that operations should expose, namely:

  • request-timeout
  • headers
  • opaque-id

You can add support for these by using the Runner._transport_request_params helper method:

def _transport_request_params(params):
"""
Takes all of a runner's params and splits out request parameters, transport
level parameters, and headers into their own respective dicts.
:param params: A hash with all the respective runner's parameters.
:return: A tuple of the specific runner's params, request level parameters, transport level parameters, and headers, respectively.
"""
transport_params = {}
request_params = params.get("request-params", {})
if request_timeout := params.pop("request-timeout", None):
transport_params["request_timeout"] = request_timeout
if (ignore_status := request_params.pop("ignore", None)) or (ignore_status := params.pop("ignore", None)):
transport_params["ignore_status"] = ignore_status
headers = params.pop("headers", None) or {}
if opaque_id := params.pop("opaque-id", None):
headers.update({"x-opaque-id": opaque_id})
return params, request_params, transport_params, headers

An example of usage:

params, request_params, transport_params, headers = self._transport_request_params(params)

params, request_params, transport_params, headers = self._transport_request_params(params)
es = es.options(**transport_params)
query = mandatory(params, "query", self)
body = params.get("body", {})
body["query"] = query
query_filter = params.get("filter")
if query_filter:
body["filter"] = query_filter
if not bool(headers):
# counter-intuitive, but preserves prior behavior
headers = None
# disable eager response parsing - responses might be huge thus skewing results
es.return_raw_response()
await es.perform_request(method="POST", path="/_query", headers=headers, body=body, params=request_params)
return {"success": True, "unit": "ops", "weight": 1}

def __repr__(self, *args, **kwargs):
return "esql"


class RequestTiming(Runner, Delegator):
def __init__(self, delegate):
super().__init__(delegate=delegate)
Expand Down
75 changes: 39 additions & 36 deletions esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,44 +701,45 @@ class OperationType(Enum):
CompositeAgg = (18, AdminStatus.No, serverless.Status.Public)
WaitForCurrentSnapshotsCreate = (19, AdminStatus.No, serverless.Status.Internal)
Downsample = (20, AdminStatus.No, serverless.Status.Internal)
Esql = (21, AdminStatus.No, serverless.Status.Blocked)

# administrative actions
ForceMerge = (21, AdminStatus.Yes, serverless.Status.Internal)
ClusterHealth = (22, AdminStatus.Yes, serverless.Status.Internal)
PutPipeline = (23, AdminStatus.Yes, serverless.Status.Public)
Refresh = (24, AdminStatus.Yes, serverless.Status.Public)
CreateIndex = (25, AdminStatus.Yes, serverless.Status.Public)
DeleteIndex = (26, AdminStatus.Yes, serverless.Status.Public)
CreateIndexTemplate = (27, AdminStatus.Yes, serverless.Status.Blocked)
DeleteIndexTemplate = (28, AdminStatus.Yes, serverless.Status.Blocked)
ShrinkIndex = (29, AdminStatus.Yes, serverless.Status.Blocked)
CreateMlDatafeed = (30, AdminStatus.Yes, serverless.Status.Public)
DeleteMlDatafeed = (31, AdminStatus.Yes, serverless.Status.Public)
StartMlDatafeed = (32, AdminStatus.Yes, serverless.Status.Public)
StopMlDatafeed = (33, AdminStatus.Yes, serverless.Status.Public)
CreateMlJob = (34, AdminStatus.Yes, serverless.Status.Public)
DeleteMlJob = (35, AdminStatus.Yes, serverless.Status.Public)
OpenMlJob = (36, AdminStatus.Yes, serverless.Status.Public)
CloseMlJob = (37, AdminStatus.Yes, serverless.Status.Public)
Sleep = (38, AdminStatus.Yes, serverless.Status.Public)
DeleteSnapshotRepository = (39, AdminStatus.Yes, serverless.Status.Internal)
CreateSnapshotRepository = (40, AdminStatus.Yes, serverless.Status.Internal)
CreateSnapshot = (41, AdminStatus.Yes, serverless.Status.Internal)
RestoreSnapshot = (42, AdminStatus.Yes, serverless.Status.Internal)
PutSettings = (43, AdminStatus.Yes, serverless.Status.Internal)
CreateTransform = (44, AdminStatus.Yes, serverless.Status.Public)
StartTransform = (45, AdminStatus.Yes, serverless.Status.Public)
WaitForTransform = (46, AdminStatus.Yes, serverless.Status.Public)
DeleteTransform = (47, AdminStatus.Yes, serverless.Status.Public)
CreateDataStream = (48, AdminStatus.Yes, serverless.Status.Public)
DeleteDataStream = (49, AdminStatus.Yes, serverless.Status.Public)
CreateComposableTemplate = (50, AdminStatus.Yes, serverless.Status.Public)
DeleteComposableTemplate = (51, AdminStatus.Yes, serverless.Status.Public)
CreateComponentTemplate = (52, AdminStatus.Yes, serverless.Status.Public)
DeleteComponentTemplate = (53, AdminStatus.Yes, serverless.Status.Public)
TransformStats = (54, AdminStatus.Yes, serverless.Status.Public)
CreateIlmPolicy = (55, AdminStatus.Yes, serverless.Status.Blocked)
DeleteIlmPolicy = (56, AdminStatus.Yes, serverless.Status.Blocked)
ForceMerge = (22, AdminStatus.Yes, serverless.Status.Internal)
ClusterHealth = (23, AdminStatus.Yes, serverless.Status.Internal)
PutPipeline = (24, AdminStatus.Yes, serverless.Status.Public)
Refresh = (25, AdminStatus.Yes, serverless.Status.Public)
CreateIndex = (26, AdminStatus.Yes, serverless.Status.Public)
DeleteIndex = (27, AdminStatus.Yes, serverless.Status.Public)
CreateIndexTemplate = (28, AdminStatus.Yes, serverless.Status.Blocked)
DeleteIndexTemplate = (29, AdminStatus.Yes, serverless.Status.Blocked)
ShrinkIndex = (30, AdminStatus.Yes, serverless.Status.Blocked)
CreateMlDatafeed = (31, AdminStatus.Yes, serverless.Status.Public)
DeleteMlDatafeed = (32, AdminStatus.Yes, serverless.Status.Public)
StartMlDatafeed = (33, AdminStatus.Yes, serverless.Status.Public)
StopMlDatafeed = (34, AdminStatus.Yes, serverless.Status.Public)
CreateMlJob = (35, AdminStatus.Yes, serverless.Status.Public)
DeleteMlJob = (36, AdminStatus.Yes, serverless.Status.Public)
OpenMlJob = (37, AdminStatus.Yes, serverless.Status.Public)
CloseMlJob = (38, AdminStatus.Yes, serverless.Status.Public)
Sleep = (39, AdminStatus.Yes, serverless.Status.Public)
DeleteSnapshotRepository = (40, AdminStatus.Yes, serverless.Status.Internal)
CreateSnapshotRepository = (41, AdminStatus.Yes, serverless.Status.Internal)
CreateSnapshot = (42, AdminStatus.Yes, serverless.Status.Internal)
RestoreSnapshot = (43, AdminStatus.Yes, serverless.Status.Internal)
PutSettings = (44, AdminStatus.Yes, serverless.Status.Internal)
CreateTransform = (45, AdminStatus.Yes, serverless.Status.Public)
StartTransform = (46, AdminStatus.Yes, serverless.Status.Public)
WaitForTransform = (47, AdminStatus.Yes, serverless.Status.Public)
DeleteTransform = (48, AdminStatus.Yes, serverless.Status.Public)
CreateDataStream = (49, AdminStatus.Yes, serverless.Status.Public)
DeleteDataStream = (50, AdminStatus.Yes, serverless.Status.Public)
CreateComposableTemplate = (51, AdminStatus.Yes, serverless.Status.Public)
DeleteComposableTemplate = (52, AdminStatus.Yes, serverless.Status.Public)
CreateComponentTemplate = (53, AdminStatus.Yes, serverless.Status.Public)
DeleteComponentTemplate = (54, AdminStatus.Yes, serverless.Status.Public)
TransformStats = (55, AdminStatus.Yes, serverless.Status.Public)
CreateIlmPolicy = (56, AdminStatus.Yes, serverless.Status.Blocked)
DeleteIlmPolicy = (57, AdminStatus.Yes, serverless.Status.Blocked)

def __init__(self, id: int, admin_status: AdminStatus, serverless_status: serverless.Status):
self.id = id
Expand Down Expand Up @@ -870,6 +871,8 @@ def from_hyphenated_string(cls, v):
return OperationType.FieldCaps
elif v == "downsample":
return OperationType.Downsample
elif v == "esql":
return OperationType.Esql
else:
raise KeyError(f"No enum value for [{v}]")

Expand Down
38 changes: 38 additions & 0 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7228,3 +7228,41 @@ async def test_field_caps_with_index_filter(self, es):

expected_body = {"index_filter": index_filter}
es.field_caps.assert_awaited_once_with(index="_all", fields="time-*", body=expected_body, params=None)


class TestEsqlRunner:
@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_esql_without_query_filter(self, es):
es.options.return_value = es
es.perform_request = mock.AsyncMock()
esql = runner.Esql()
result = await esql(es, params={"query": "from logs-* | stats c = count(*)"})
assert result == {"weight": 1, "unit": "ops", "success": True}
expected_body = {"query": "from logs-* | stats c = count(*)"}
es.perform_request.assert_awaited_once_with(method="POST", path="/_query", headers=None, body=expected_body, params={})

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_esql_with_query_filter(self, es):
es.options.return_value = es
es.perform_request = mock.AsyncMock()
esql = runner.Esql()
query_filter = {"range": {"@timestamp": {"gte": "2023"}}}
result = await esql(es, params={"query": "from * | limit 1", "filter": query_filter})
assert result == {"weight": 1, "unit": "ops", "success": True}
expected_body = {"query": "from * | limit 1", "filter": query_filter}
es.perform_request.assert_awaited_once_with(method="POST", path="/_query", headers=None, body=expected_body, params={})

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_esql_with_body(self, es):
es.options.return_value = es
es.perform_request = mock.AsyncMock()
esql = runner.Esql()
pragma = {"data_partitioning": "doc"}
result = await esql(es, params={"query": "from * | limit 1", "body": {"pragma": pragma}})
assert result == {"weight": 1, "unit": "ops", "success": True}

expected_body = {"pragma": pragma, "query": "from * | limit 1"}
es.perform_request.assert_awaited_once_with(method="POST", path="/_query", headers=None, body=expected_body, params={})