-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Top N Queries by latency implementation #11904
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
1db4ab5
Top N Queries by latency implementation
ansjcy ae9e336
Increase JavaDoc coverage and update PR based comments
ansjcy c4fb7ae
Refactor record and service to make them generic
ansjcy 409fc92
refactor service for improving multithreading efficiency
ansjcy c7c7ef6
rebase from master to pick up query insights plugin changes
ansjcy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
274 changes: 274 additions & 0 deletions
274
...ternalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,274 @@ | ||
| /* | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| */ | ||
|
|
||
| package org.opensearch.plugin.insights; | ||
|
|
||
| import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; | ||
| import org.opensearch.action.admin.cluster.node.info.NodeInfo; | ||
| import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; | ||
| import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; | ||
| import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; | ||
| import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; | ||
| import org.opensearch.action.index.IndexResponse; | ||
| import org.opensearch.action.search.SearchResponse; | ||
| import org.opensearch.common.settings.Settings; | ||
| import org.opensearch.index.query.QueryBuilders; | ||
| import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; | ||
| import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; | ||
| import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; | ||
| import org.opensearch.plugin.insights.rules.model.MetricType; | ||
| import org.opensearch.plugins.Plugin; | ||
| import org.opensearch.plugins.PluginInfo; | ||
| import org.opensearch.test.OpenSearchIntegTestCase; | ||
| import org.junit.Assert; | ||
|
|
||
| import java.util.Arrays; | ||
| import java.util.Collection; | ||
| import java.util.List; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.function.Function; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
|
|
||
| import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED; | ||
| import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE; | ||
| import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE; | ||
| import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; | ||
|
|
||
| /** | ||
| * Transport Action tests for Query Insights Plugin | ||
| */ | ||
|
|
||
| @OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST) | ||
| public class QueryInsightsPluginTransportIT extends OpenSearchIntegTestCase { | ||
|
|
||
| private final int TOTAL_NUMBER_OF_NODES = 2; | ||
| private final int TOTAL_SEARCH_REQUESTS = 5; | ||
|
|
||
| @Override | ||
| protected Collection<Class<? extends Plugin>> nodePlugins() { | ||
| return Arrays.asList(QueryInsightsPlugin.class); | ||
| } | ||
|
|
||
| /** | ||
| * Test Query Insights Plugin is installed | ||
| */ | ||
| public void testQueryInsightPluginInstalled() { | ||
| NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); | ||
| nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); | ||
| NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet(); | ||
| List<PluginInfo> pluginInfos = nodesInfoResponse.getNodes() | ||
| .stream() | ||
| .flatMap( | ||
| (Function<NodeInfo, Stream<PluginInfo>>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream() | ||
| ) | ||
| .collect(Collectors.toList()); | ||
| Assert.assertTrue( | ||
| pluginInfos.stream().anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.plugin.insights.QueryInsightsPlugin")) | ||
| ); | ||
| } | ||
|
|
||
| /** | ||
| * Test get top queries when feature disabled | ||
| */ | ||
| public void testGetTopQueriesWhenFeatureDisabled() { | ||
| TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); | ||
| TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); | ||
| Assert.assertNotEquals(0, response.failures().size()); | ||
| Assert.assertEquals( | ||
| "Cannot get top n queries for [latency] when it is not enabled.", | ||
| response.failures().get(0).getCause().getCause().getMessage() | ||
| ); | ||
| } | ||
|
|
||
| /** | ||
| * Test update top query record when feature enabled | ||
| */ | ||
| public void testUpdateRecordWhenFeatureDisabledThenEnabled() throws ExecutionException, InterruptedException { | ||
| Settings commonSettings = Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "false").build(); | ||
|
|
||
| logger.info("--> starting nodes for query insight testing"); | ||
| List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); | ||
|
|
||
| logger.info("--> waiting for nodes to form a cluster"); | ||
| ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); | ||
| assertFalse(health.isTimedOut()); | ||
|
|
||
| assertAcked( | ||
| prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) | ||
| ); | ||
| ensureStableCluster(2); | ||
| logger.info("--> creating indices for query insight testing"); | ||
| for (int i = 0; i < 5; i++) { | ||
| IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); | ||
| assertEquals("CREATED", response.status().toString()); | ||
| } | ||
| // making search requests to get top queries | ||
| for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { | ||
| SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) | ||
| .prepareSearch() | ||
| .setQuery(QueryBuilders.matchAllQuery()) | ||
| .get(); | ||
| assertEquals(searchResponse.getFailedShards(), 0); | ||
| } | ||
|
|
||
| TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); | ||
| TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); | ||
| Assert.assertNotEquals(0, response.failures().size()); | ||
| Assert.assertEquals( | ||
| "Cannot get top n queries for [latency] when it is not enabled.", | ||
| response.failures().get(0).getCause().getCause().getMessage() | ||
| ); | ||
|
|
||
| ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().persistentSettings( | ||
| Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true").build() | ||
| ); | ||
| assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get()); | ||
| TopQueriesRequest request2 = new TopQueriesRequest(MetricType.LATENCY); | ||
| TopQueriesResponse response2 = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request2).actionGet(); | ||
| Assert.assertEquals(0, response2.failures().size()); | ||
| Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response2.getNodes().size()); | ||
| for (int i = 0; i < TOTAL_NUMBER_OF_NODES; i++) { | ||
| Assert.assertEquals(0, response2.getNodes().get(i).getTopQueriesRecord().size()); | ||
| } | ||
|
|
||
| internalCluster().stopAllNodes(); | ||
| } | ||
|
|
||
| /** | ||
| * Test get top queries when feature enabled | ||
| */ | ||
| public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException { | ||
| Settings commonSettings = Settings.builder() | ||
| .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") | ||
| .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") | ||
| .put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s") | ||
| .build(); | ||
|
|
||
| logger.info("--> starting nodes for query insight testing"); | ||
| List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); | ||
|
|
||
| logger.info("--> waiting for nodes to form a cluster"); | ||
| ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); | ||
| assertFalse(health.isTimedOut()); | ||
|
|
||
| assertAcked( | ||
| prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) | ||
| ); | ||
| ensureStableCluster(2); | ||
| logger.info("--> creating indices for query insight testing"); | ||
| for (int i = 0; i < 5; i++) { | ||
| IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); | ||
| assertEquals("CREATED", response.status().toString()); | ||
| } | ||
| // making search requests to get top queries | ||
| for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { | ||
| SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) | ||
| .prepareSearch() | ||
| .setQuery(QueryBuilders.matchAllQuery()) | ||
| .get(); | ||
| assertEquals(searchResponse.getFailedShards(), 0); | ||
| } | ||
| // Sleep to wait for queue drained to top queries store | ||
| Thread.sleep(6000); | ||
| TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); | ||
| TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); | ||
| Assert.assertEquals(0, response.failures().size()); | ||
| Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); | ||
| Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum()); | ||
|
|
||
| internalCluster().stopAllNodes(); | ||
| } | ||
|
|
||
| /** | ||
| * Test get top queries with small top n size | ||
| */ | ||
| public void testGetTopQueriesWithSmallTopN() throws InterruptedException { | ||
| Settings commonSettings = Settings.builder() | ||
| .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") | ||
| .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "1") | ||
| .put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s") | ||
| .build(); | ||
|
|
||
| logger.info("--> starting nodes for query insight testing"); | ||
| List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); | ||
|
|
||
| logger.info("--> waiting for nodes to form a cluster"); | ||
| ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); | ||
| assertFalse(health.isTimedOut()); | ||
|
|
||
| assertAcked( | ||
| prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) | ||
| ); | ||
| ensureStableCluster(2); | ||
| logger.info("--> creating indices for query insight testing"); | ||
| for (int i = 0; i < 5; i++) { | ||
| IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); | ||
| assertEquals("CREATED", response.status().toString()); | ||
| } | ||
| // making search requests to get top queries | ||
| for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { | ||
| SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) | ||
| .prepareSearch() | ||
| .setQuery(QueryBuilders.matchAllQuery()) | ||
| .get(); | ||
| assertEquals(searchResponse.getFailedShards(), 0); | ||
| } | ||
| Thread.sleep(6000); | ||
| TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); | ||
| TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); | ||
| Assert.assertEquals(0, response.failures().size()); | ||
| Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); | ||
| Assert.assertEquals(2, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum()); | ||
|
|
||
| internalCluster().stopAllNodes(); | ||
| } | ||
|
|
||
| /** | ||
| * Test get top queries with small window size | ||
| */ | ||
| public void testGetTopQueriesWithSmallWindowSize() throws InterruptedException { | ||
| Settings commonSettings = Settings.builder() | ||
| .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") | ||
| .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") | ||
| .put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "1m") | ||
| .build(); | ||
|
|
||
| logger.info("--> starting nodes for query insight testing"); | ||
| List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); | ||
|
|
||
| logger.info("--> waiting for nodes to form a cluster"); | ||
| ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); | ||
| assertFalse(health.isTimedOut()); | ||
|
|
||
| assertAcked( | ||
| prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) | ||
| ); | ||
| ensureStableCluster(2); | ||
| logger.info("--> creating indices for query insight testing"); | ||
| for (int i = 0; i < 5; i++) { | ||
| IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); | ||
| assertEquals("CREATED", response.status().toString()); | ||
| } | ||
| // making search requests to get top queries | ||
| for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { | ||
| SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) | ||
| .prepareSearch() | ||
| .setQuery(QueryBuilders.matchAllQuery()) | ||
| .get(); | ||
| assertEquals(searchResponse.getFailedShards(), 0); | ||
| } | ||
|
|
||
| TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); | ||
| TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); | ||
| Assert.assertEquals(0, response.failures().size()); | ||
| Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); | ||
| Thread.sleep(6000); | ||
| internalCluster().stopAllNodes(); | ||
| } | ||
| } | ||
107 changes: 107 additions & 0 deletions
107
...query-insights/src/javaRestTest/java/org/opensearch/plugin/insights/TopQueriesRestIT.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| /* | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| */ | ||
|
|
||
| package org.opensearch.plugin.insights; | ||
|
|
||
| import org.opensearch.client.Request; | ||
| import org.opensearch.client.Response; | ||
| import org.opensearch.common.xcontent.LoggingDeprecationHandler; | ||
| import org.opensearch.common.xcontent.json.JsonXContent; | ||
| import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
| import org.opensearch.test.rest.OpenSearchRestTestCase; | ||
| import org.junit.Assert; | ||
|
|
||
| import java.io.IOException; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| /** | ||
| * Rest Action tests for Query Insights | ||
| */ | ||
| public class TopQueriesRestIT extends OpenSearchRestTestCase { | ||
|
|
||
| /** | ||
| * test Query Insights is installed | ||
| * @throws IOException IOException | ||
| */ | ||
| @SuppressWarnings("unchecked") | ||
| public void testQueryInsightsPluginInstalled() throws IOException { | ||
| Request request = new Request("GET", "/_cat/plugins?s=component&h=name,component,version,description&format=json"); | ||
| Response response = client().performRequest(request); | ||
| List<Object> pluginsList = JsonXContent.jsonXContent.createParser( | ||
| NamedXContentRegistry.EMPTY, | ||
| LoggingDeprecationHandler.INSTANCE, | ||
| response.getEntity().getContent() | ||
| ).list(); | ||
| Assert.assertTrue( | ||
| pluginsList.stream().map(o -> (Map<String, Object>) o).anyMatch(plugin -> plugin.get("component").equals("query-insights")) | ||
| ); | ||
| } | ||
|
|
||
| /** | ||
| * test enabling top queries | ||
| * @throws IOException IOException | ||
| */ | ||
| public void testTopQueriesResponses() throws IOException { | ||
| // Enable Top N Queries feature | ||
| Request request = new Request("PUT", "/_cluster/settings"); | ||
| request.setJsonEntity(defaultTopQueriesSettings()); | ||
| Response response = client().performRequest(request); | ||
|
|
||
| Assert.assertEquals(200, response.getStatusLine().getStatusCode()); | ||
|
|
||
| // Create documents for search | ||
| request = new Request("POST", "/my-index-0/_doc"); | ||
| request.setJsonEntity(createDocumentsBody()); | ||
| response = client().performRequest(request); | ||
|
|
||
| Assert.assertEquals(201, response.getStatusLine().getStatusCode()); | ||
|
|
||
| // Do Search | ||
| request = new Request("GET", "/my-index-0/_search?size=20&pretty"); | ||
| request.setJsonEntity(searchBody()); | ||
| response = client().performRequest(request); | ||
| Assert.assertEquals(200, response.getStatusLine().getStatusCode()); | ||
| response = client().performRequest(request); | ||
| Assert.assertEquals(200, response.getStatusLine().getStatusCode()); | ||
|
|
||
| // Get Top Queries | ||
| request = new Request("GET", "/_insights/top_queries?pretty"); | ||
| response = client().performRequest(request); | ||
|
|
||
| Assert.assertEquals(200, response.getStatusLine().getStatusCode()); | ||
| String top_requests = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8); | ||
| Assert.assertTrue(top_requests.contains("top_queries")); | ||
| Assert.assertEquals(2, top_requests.split("searchType", -1).length - 1); | ||
| } | ||
|
|
||
| private String defaultTopQueriesSettings() { | ||
| return "{\n" | ||
| + " \"persistent\" : {\n" | ||
| + " \"search.top_n_queries.latency.enabled\" : \"true\",\n" | ||
| + " \"search.top_n_queries.latency.window_size\" : \"600s\",\n" | ||
| + " \"search.top_n_queries.latency.top_n_size\" : 5\n" | ||
| + " }\n" | ||
| + "}"; | ||
| } | ||
|
|
||
| private String createDocumentsBody() { | ||
| return "{\n" | ||
| + " \"@timestamp\": \"2099-11-15T13:12:00\",\n" | ||
| + " \"message\": \"this is document 1\",\n" | ||
| + " \"user\": {\n" | ||
| + " \"id\": \"cyji\"\n" | ||
| + " }\n" | ||
| + "}"; | ||
| } | ||
|
|
||
| private String searchBody() { | ||
| return "{}"; | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.