diff --git a/docs/track.rst b/docs/track.rst index 49e631093..a2caefab9 100644 --- a/docs/track.rst +++ b/docs/track.rst @@ -3132,6 +3132,40 @@ Meta-data The operation returns no meta-data. +esql +~~~~~~~~~~~~~ + +With the operation type ``esql`` you can execute `ES|QL query `_. + +Properties +"""""""""" + +* ``query`` (mandatory): An ES|QL query starts with a source command followed processing commands. +* ``filter`` (optional): A query filter defined in `Elasticsearch query DSL `_. +* ``body`` (optional): The query body. + +Example:: + + { + "name": "default", + "operation-type": "esql", + "query": "FROM logs-* | STATS count=count(*) BY agent.hostname | SORT count DESC | LIMIT 20", + "filter": { + "range": { + "timestamp": { + "gte": "now-1d/d", + "lte": "now/d" + } + } + } + } + +Meta-data +""""""""" + +* ``weight``: "weight" of an operation, in this case the number of retrieved pages. +* ``unit``: The unit in which to interpret ``weight``, in this case ``pages``. +* ``success``: A boolean indicating whether the query has succeeded. .. _track_dependencies: diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 6222e038a..de062791b 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -61,6 +61,7 @@ def register_default_runners(): register_runner(track.OperationType.ClosePointInTime, ClosePointInTime(), async_runner=True) register_runner(track.OperationType.Sql, Sql(), async_runner=True) register_runner(track.OperationType.FieldCaps, FieldCaps(), async_runner=True) + register_runner(track.OperationType.Esql, Esql(), async_runner=True) # This is an administrative operation but there is no need for a retry here as we don't issue a request register_runner(track.OperationType.Sleep, Sleep(), async_runner=True) @@ -2829,6 +2830,28 @@ def __repr__(self, *args, **kwargs): return "field-caps" +class Esql(Runner): + async def __call__(self, es, params): + params, request_params, transport_params, headers = self._transport_request_params(params) + es = es.options(**transport_params) + query = mandatory(params, "query", self) + body = params.get("body", {}) + body["query"] = query + query_filter = params.get("filter") + if query_filter: + body["filter"] = query_filter + if not bool(headers): + # counter-intuitive, but preserves prior behavior + headers = None + # disable eager response parsing - responses might be huge thus skewing results + es.return_raw_response() + await es.perform_request(method="POST", path="/_query", headers=headers, body=body, params=request_params) + return {"success": True, "unit": "ops", "weight": 1} + + def __repr__(self, *args, **kwargs): + return "esql" + + class RequestTiming(Runner, Delegator): def __init__(self, delegate): super().__init__(delegate=delegate) diff --git a/esrally/track/track.py b/esrally/track/track.py index 9257aec23..cac32802b 100644 --- a/esrally/track/track.py +++ b/esrally/track/track.py @@ -701,44 +701,45 @@ class OperationType(Enum): CompositeAgg = (18, AdminStatus.No, serverless.Status.Public) WaitForCurrentSnapshotsCreate = (19, AdminStatus.No, serverless.Status.Internal) Downsample = (20, AdminStatus.No, serverless.Status.Internal) + Esql = (21, AdminStatus.No, serverless.Status.Blocked) # administrative actions - ForceMerge = (21, AdminStatus.Yes, serverless.Status.Internal) - ClusterHealth = (22, AdminStatus.Yes, serverless.Status.Internal) - PutPipeline = (23, AdminStatus.Yes, serverless.Status.Public) - Refresh = (24, AdminStatus.Yes, serverless.Status.Public) - CreateIndex = (25, AdminStatus.Yes, serverless.Status.Public) - DeleteIndex = (26, AdminStatus.Yes, serverless.Status.Public) - CreateIndexTemplate = (27, AdminStatus.Yes, serverless.Status.Blocked) - DeleteIndexTemplate = (28, AdminStatus.Yes, serverless.Status.Blocked) - ShrinkIndex = (29, AdminStatus.Yes, serverless.Status.Blocked) - CreateMlDatafeed = (30, AdminStatus.Yes, serverless.Status.Public) - DeleteMlDatafeed = (31, AdminStatus.Yes, serverless.Status.Public) - StartMlDatafeed = (32, AdminStatus.Yes, serverless.Status.Public) - StopMlDatafeed = (33, AdminStatus.Yes, serverless.Status.Public) - CreateMlJob = (34, AdminStatus.Yes, serverless.Status.Public) - DeleteMlJob = (35, AdminStatus.Yes, serverless.Status.Public) - OpenMlJob = (36, AdminStatus.Yes, serverless.Status.Public) - CloseMlJob = (37, AdminStatus.Yes, serverless.Status.Public) - Sleep = (38, AdminStatus.Yes, serverless.Status.Public) - DeleteSnapshotRepository = (39, AdminStatus.Yes, serverless.Status.Internal) - CreateSnapshotRepository = (40, AdminStatus.Yes, serverless.Status.Internal) - CreateSnapshot = (41, AdminStatus.Yes, serverless.Status.Internal) - RestoreSnapshot = (42, AdminStatus.Yes, serverless.Status.Internal) - PutSettings = (43, AdminStatus.Yes, serverless.Status.Internal) - CreateTransform = (44, AdminStatus.Yes, serverless.Status.Public) - StartTransform = (45, AdminStatus.Yes, serverless.Status.Public) - WaitForTransform = (46, AdminStatus.Yes, serverless.Status.Public) - DeleteTransform = (47, AdminStatus.Yes, serverless.Status.Public) - CreateDataStream = (48, AdminStatus.Yes, serverless.Status.Public) - DeleteDataStream = (49, AdminStatus.Yes, serverless.Status.Public) - CreateComposableTemplate = (50, AdminStatus.Yes, serverless.Status.Public) - DeleteComposableTemplate = (51, AdminStatus.Yes, serverless.Status.Public) - CreateComponentTemplate = (52, AdminStatus.Yes, serverless.Status.Public) - DeleteComponentTemplate = (53, AdminStatus.Yes, serverless.Status.Public) - TransformStats = (54, AdminStatus.Yes, serverless.Status.Public) - CreateIlmPolicy = (55, AdminStatus.Yes, serverless.Status.Blocked) - DeleteIlmPolicy = (56, AdminStatus.Yes, serverless.Status.Blocked) + ForceMerge = (22, AdminStatus.Yes, serverless.Status.Internal) + ClusterHealth = (23, AdminStatus.Yes, serverless.Status.Internal) + PutPipeline = (24, AdminStatus.Yes, serverless.Status.Public) + Refresh = (25, AdminStatus.Yes, serverless.Status.Public) + CreateIndex = (26, AdminStatus.Yes, serverless.Status.Public) + DeleteIndex = (27, AdminStatus.Yes, serverless.Status.Public) + CreateIndexTemplate = (28, AdminStatus.Yes, serverless.Status.Blocked) + DeleteIndexTemplate = (29, AdminStatus.Yes, serverless.Status.Blocked) + ShrinkIndex = (30, AdminStatus.Yes, serverless.Status.Blocked) + CreateMlDatafeed = (31, AdminStatus.Yes, serverless.Status.Public) + DeleteMlDatafeed = (32, AdminStatus.Yes, serverless.Status.Public) + StartMlDatafeed = (33, AdminStatus.Yes, serverless.Status.Public) + StopMlDatafeed = (34, AdminStatus.Yes, serverless.Status.Public) + CreateMlJob = (35, AdminStatus.Yes, serverless.Status.Public) + DeleteMlJob = (36, AdminStatus.Yes, serverless.Status.Public) + OpenMlJob = (37, AdminStatus.Yes, serverless.Status.Public) + CloseMlJob = (38, AdminStatus.Yes, serverless.Status.Public) + Sleep = (39, AdminStatus.Yes, serverless.Status.Public) + DeleteSnapshotRepository = (40, AdminStatus.Yes, serverless.Status.Internal) + CreateSnapshotRepository = (41, AdminStatus.Yes, serverless.Status.Internal) + CreateSnapshot = (42, AdminStatus.Yes, serverless.Status.Internal) + RestoreSnapshot = (43, AdminStatus.Yes, serverless.Status.Internal) + PutSettings = (44, AdminStatus.Yes, serverless.Status.Internal) + CreateTransform = (45, AdminStatus.Yes, serverless.Status.Public) + StartTransform = (46, AdminStatus.Yes, serverless.Status.Public) + WaitForTransform = (47, AdminStatus.Yes, serverless.Status.Public) + DeleteTransform = (48, AdminStatus.Yes, serverless.Status.Public) + CreateDataStream = (49, AdminStatus.Yes, serverless.Status.Public) + DeleteDataStream = (50, AdminStatus.Yes, serverless.Status.Public) + CreateComposableTemplate = (51, AdminStatus.Yes, serverless.Status.Public) + DeleteComposableTemplate = (52, AdminStatus.Yes, serverless.Status.Public) + CreateComponentTemplate = (53, AdminStatus.Yes, serverless.Status.Public) + DeleteComponentTemplate = (54, AdminStatus.Yes, serverless.Status.Public) + TransformStats = (55, AdminStatus.Yes, serverless.Status.Public) + CreateIlmPolicy = (56, AdminStatus.Yes, serverless.Status.Blocked) + DeleteIlmPolicy = (57, AdminStatus.Yes, serverless.Status.Blocked) def __init__(self, id: int, admin_status: AdminStatus, serverless_status: serverless.Status): self.id = id @@ -870,6 +871,8 @@ def from_hyphenated_string(cls, v): return OperationType.FieldCaps elif v == "downsample": return OperationType.Downsample + elif v == "esql": + return OperationType.Esql else: raise KeyError(f"No enum value for [{v}]") diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 42833df31..a257b48e4 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -7228,3 +7228,41 @@ async def test_field_caps_with_index_filter(self, es): expected_body = {"index_filter": index_filter} es.field_caps.assert_awaited_once_with(index="_all", fields="time-*", body=expected_body, params=None) + + +class TestEsqlRunner: + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_esql_without_query_filter(self, es): + es.options.return_value = es + es.perform_request = mock.AsyncMock() + esql = runner.Esql() + result = await esql(es, params={"query": "from logs-* | stats c = count(*)"}) + assert result == {"weight": 1, "unit": "ops", "success": True} + expected_body = {"query": "from logs-* | stats c = count(*)"} + es.perform_request.assert_awaited_once_with(method="POST", path="/_query", headers=None, body=expected_body, params={}) + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_esql_with_query_filter(self, es): + es.options.return_value = es + es.perform_request = mock.AsyncMock() + esql = runner.Esql() + query_filter = {"range": {"@timestamp": {"gte": "2023"}}} + result = await esql(es, params={"query": "from * | limit 1", "filter": query_filter}) + assert result == {"weight": 1, "unit": "ops", "success": True} + expected_body = {"query": "from * | limit 1", "filter": query_filter} + es.perform_request.assert_awaited_once_with(method="POST", path="/_query", headers=None, body=expected_body, params={}) + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_esql_with_body(self, es): + es.options.return_value = es + es.perform_request = mock.AsyncMock() + esql = runner.Esql() + pragma = {"data_partitioning": "doc"} + result = await esql(es, params={"query": "from * | limit 1", "body": {"pragma": pragma}}) + assert result == {"weight": 1, "unit": "ops", "success": True} + + expected_body = {"pragma": pragma, "query": "from * | limit 1"} + es.perform_request.assert_awaited_once_with(method="POST", path="/_query", headers=None, body=expected_body, params={})