diff --git a/.github/workflows/integ-tests-with-geo.yml b/.github/workflows/integ-tests-with-geo.yml new file mode 100644 index 00000000000..065c3b3e3b7 --- /dev/null +++ b/.github/workflows/integ-tests-with-geo.yml @@ -0,0 +1,89 @@ +name: GeoSpatial Plugin IT + +on: + pull_request: + push: + branches-ignore: + - 'dependabot/**' + paths: + - 'integ-test/**' + - '.github/workflows/integ-tests-with-geo.yml' + +jobs: + Get-CI-Image-Tag: + uses: opensearch-project/opensearch-build/.github/workflows/get-ci-image-tag.yml@main + with: + product: opensearch + + security-it-linux: + needs: Get-CI-Image-Tag + strategy: + fail-fast: false + matrix: + java: [21] + runs-on: ubuntu-latest + container: + # using the same image which is used by opensearch-build team to build the OpenSearch Distribution + # this image tag is subject to change as more dependencies and updates will arrive over time + image: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-version-linux }} + options: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-options }} + + steps: + - name: Run start commands + run: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-command }} + + - uses: actions/checkout@v4 + + - name: Set up JDK ${{ matrix.java }} + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: ${{ matrix.java }} + + - name: Build with Gradle + run: | + chown -R 1000:1000 `pwd` + su `id -un 1000` -c "./gradlew integTestWithGeo" + + - name: Upload test reports + if: ${{ always() }} + uses: actions/upload-artifact@v4 + continue-on-error: true + with: + name: test-reports-${{ matrix.os }}-${{ matrix.java }} + path: | + integ-test/build/reports/** + integ-test/build/testclusters/*/logs/* + integ-test/build/testclusters/*/config/* + + security-it-windows-macos: + strategy: + fail-fast: false + matrix: + os: [ windows-latest, macos-13 ] + java: [21] + + runs-on: ${{ matrix.os }} + + steps: + - uses: actions/checkout@v4 + + - name: Set up JDK ${{ matrix.java }} + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: ${{ matrix.java }} + + - name: Build with Gradle + run: ./gradlew integTestWithGeo + + - name: Upload test reports + if: ${{ always() }} + uses: actions/upload-artifact@v4 + continue-on-error: true + with: + name: test-reports-${{ matrix.os }}-${{ matrix.java }} + path: | + integ-test/build/reports/** + integ-test/build/testclusters/*/logs/* + integ-test/build/testclusters/*/config/* diff --git a/core/src/main/java/org/opensearch/sql/expression/DSL.java b/core/src/main/java/org/opensearch/sql/expression/DSL.java index 44ecc2bc867..a96fb7bc218 100644 --- a/core/src/main/java/org/opensearch/sql/expression/DSL.java +++ b/core/src/main/java/org/opensearch/sql/expression/DSL.java @@ -969,6 +969,10 @@ public static FunctionExpression utc_timestamp( return compile(functionProperties, BuiltinFunctionName.UTC_TIMESTAMP, args); } + public static FunctionExpression geoip(Expression... args) { + return compile(FunctionProperties.None, BuiltinFunctionName.GEOIP, args); + } + @SuppressWarnings("unchecked") private static T compile( FunctionProperties functionProperties, BuiltinFunctionName bfn, Expression... args) { diff --git a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java index f8e9cf7c5f7..ceb8eb5facc 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java @@ -204,6 +204,9 @@ public enum BuiltinFunctionName { TRIM(FunctionName.of("trim")), UPPER(FunctionName.of("upper")), + /** GEOSPATIAL Functions. */ + GEOIP(FunctionName.of("geoip")), + /** NULL Test. */ IS_NULL(FunctionName.of("is null")), IS_NOT_NULL(FunctionName.of("is not null")), diff --git a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionRepository.java b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionRepository.java index 79ea58b8608..a56a029ea83 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionRepository.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionRepository.java @@ -27,6 +27,7 @@ import org.opensearch.sql.expression.aggregation.AggregatorFunctions; import org.opensearch.sql.expression.datetime.DateTimeFunctions; import org.opensearch.sql.expression.datetime.IntervalClause; +import org.opensearch.sql.expression.ip.GeoIPFunctions; import org.opensearch.sql.expression.ip.IPFunctions; import org.opensearch.sql.expression.operator.arthmetic.ArithmeticFunctions; import org.opensearch.sql.expression.operator.arthmetic.MathematicalFunctions; @@ -83,6 +84,7 @@ public static synchronized BuiltinFunctionRepository getInstance() { SystemFunctions.register(instance); OpenSearchFunctions.register(instance); IPFunctions.register(instance); + GeoIPFunctions.register(instance); } return instance; } diff --git a/core/src/main/java/org/opensearch/sql/expression/ip/GeoIPFunctions.java b/core/src/main/java/org/opensearch/sql/expression/ip/GeoIPFunctions.java new file mode 100644 index 00000000000..19ab130ee02 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/expression/ip/GeoIPFunctions.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.expression.ip; + +import static org.opensearch.sql.data.type.ExprCoreType.BOOLEAN; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; +import static org.opensearch.sql.expression.function.FunctionDSL.define; + +import java.util.Arrays; +import java.util.List; +import lombok.experimental.UtilityClass; +import org.apache.commons.lang3.tuple.Pair; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.expression.function.BuiltinFunctionName; +import org.opensearch.sql.expression.function.BuiltinFunctionRepository; +import org.opensearch.sql.expression.function.DefaultFunctionResolver; +import org.opensearch.sql.expression.function.FunctionBuilder; +import org.opensearch.sql.expression.function.FunctionName; +import org.opensearch.sql.expression.function.FunctionSignature; +import org.opensearch.sql.expression.function.SerializableFunction; + +/** + * Utility class to register the method signature for geoip( ) expression, concreted reallocated to + * `opensearch` module, as this Ip location require GeoSpatial Plugin runtime support. + */ +@UtilityClass +public class GeoIPFunctions { + + public void register(BuiltinFunctionRepository repository) { + repository.register(geoIp()); + } + + /** + * To register all method signatures related to geoip( ) expression under eval. + * + * @return Resolver for geoip( ) expression. + */ + private DefaultFunctionResolver geoIp() { + return define( + BuiltinFunctionName.GEOIP.getName(), + openSearchImpl(BOOLEAN, Arrays.asList(STRING, STRING)), + openSearchImpl(BOOLEAN, Arrays.asList(STRING, STRING, STRING))); + } + + /** + * Util method to generate probe implementation with given list of argument types, with marker + * class `OpenSearchFunctionExpression` to annotate this is an OpenSearch specific expression. + * + * @param returnType return type. + * @return Binary Function Implementation. + */ + public static SerializableFunction> + openSearchImpl(ExprType returnType, List args) { + return functionName -> { + FunctionSignature functionSignature = new FunctionSignature(functionName, args); + FunctionBuilder functionBuilder = + (functionProperties, arguments) -> + new OpenSearchFunctionExpression(functionName, arguments, returnType); + return Pair.of(functionSignature, functionBuilder); + }; + } +} diff --git a/core/src/main/java/org/opensearch/sql/expression/ip/OpenSearchFunctionExpression.java b/core/src/main/java/org/opensearch/sql/expression/ip/OpenSearchFunctionExpression.java new file mode 100644 index 00000000000..151552a5f83 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/expression/ip/OpenSearchFunctionExpression.java @@ -0,0 +1,47 @@ +/* + * + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.expression.ip; + +import java.util.List; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.FunctionExpression; +import org.opensearch.sql.expression.env.Environment; +import org.opensearch.sql.expression.function.FunctionName; + +/** + * Marker class to identify functions only compatible with OpenSearch storage engine. Any attempt to + * invoke the method different from OpenSearch will result in UnsupportedOperationException. + */ +public class OpenSearchFunctionExpression extends FunctionExpression { + + private final ExprType returnType; + + public OpenSearchFunctionExpression( + FunctionName functionName, List arguments, ExprType returnType) { + super(functionName, arguments); + this.returnType = returnType; + } + + @Override + public ExprValue valueOf() { + return null; + } + + @Override + public ExprValue valueOf(Environment valueEnv) { + throw new UnsupportedOperationException( + "OpenSearch runtime specific function, no default implementation available"); + } + + @Override + public ExprType type() { + return returnType; + } +} diff --git a/core/src/test/java/org/opensearch/sql/expression/ip/GeoIPFunctionTest.java b/core/src/test/java/org/opensearch/sql/expression/ip/GeoIPFunctionTest.java new file mode 100644 index 00000000000..413cd4b2d5f --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/expression/ip/GeoIPFunctionTest.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.expression.ip; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.opensearch.sql.data.type.ExprCoreType.BOOLEAN; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.expression.DSL; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.env.Environment; + +@ExtendWith(MockitoExtension.class) +public class GeoIPFunctionTest { + + // Mock value environment for testing. + @Mock private Environment env; + + @Test + public void geoIpDefaultImplementation() { + UnsupportedOperationException exception = + assertThrows( + UnsupportedOperationException.class, + () -> + DSL.geoip(DSL.literal("HARDCODED_DATASOURCE_NAME"), DSL.ref("ip_address", STRING)) + .valueOf(env)); + assertTrue(exception.getMessage().matches(".*no default implementation available")); + } + + @Test + public void testGeoipFnctionSignature() { + var geoip = DSL.geoip(DSL.literal("HARDCODED_DATASOURCE_NAME"), DSL.ref("ip_address", STRING)); + assertEquals(BOOLEAN, geoip.type()); + } + + /** To make sure no logic being evaluated when no environment being passed. */ + @Test + public void testDefaultValueOf() { + var geoip = DSL.geoip(DSL.literal("HARDCODED_DATASOURCE_NAME"), DSL.ref("ip_address", STRING)); + assertNull(geoip.valueOf()); + } +} diff --git a/docs/user/ppl/index.rst b/docs/user/ppl/index.rst index ef8cff334ef..1e24fa3fb0a 100644 --- a/docs/user/ppl/index.rst +++ b/docs/user/ppl/index.rst @@ -106,6 +106,8 @@ The query start with search command and then flowing a set of command delimited - `IP Address Functions `_ + - `Geo IP Address Functions `_ + * **Optimization** - `Optimization <../../user/optimization/optimization.rst>`_ diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 798a0be5367..874a01996fb 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -49,6 +49,18 @@ String bwcVersion = baseVersion + ".0"; String baseName = "sqlBwcCluster" String bwcFilePath = "src/test/resources/bwc/" +Map dummyLoginConfig = [ + https: "false", + user: "admin", + password: "admin" +] + +Map dynamicLoginConfig = [ + https: System.getProperty("https"), + user: System.getProperty("user"), + password: System.getProperty("password") +] + repositories { mavenCentral() maven { url 'https://jitpack.io' } @@ -77,11 +89,24 @@ ext { return repo + "opensearch-security-${securitySnapshotVersion}.zip" } - var projectAbsPath = projectDir.getAbsolutePath() - File downloadedSecurityPlugin = Paths.get(projectAbsPath, 'bin', 'opensearch-security-snapshot.zip').toFile() + configureGeoPlugin = { OpenSearchCluster cluster -> + File downloadedGeoPlugin = Paths.get(projectAbsPath, 'bin', 'opensearch-geospatial-snapshot.zip').toFile() + if (!downloadedGeoPlugin.exists()) { + download.run { + src getPluginDownloadLink("geospatial") + dest downloadedGeoPlugin + } + } else { + println "Geo-spatial Plugin File Already Exists" + } + cluster.plugin provider {(RegularFile) (() -> downloadedGeoPlugin)} + } + configureSecurityPlugin = { OpenSearchCluster cluster -> + File downloadedSecurityPlugin = Paths.get(projectAbsPath, 'bin', 'opensearch-security-snapshot.zip').toFile() + cluster.getNodes().forEach { node -> var creds = node.getCredentials() if (creds.isEmpty()) { @@ -94,7 +119,7 @@ ext { // add a check to avoid re-downloading multiple times during single test run if (!downloadedSecurityPlugin.exists()) { download.run { - src getSecurityPluginDownloadLink() + src getPluginDownloadLink("opensearch-security") dest downloadedSecurityPlugin } } else { @@ -134,8 +159,7 @@ ext { ].forEach { name, value -> cluster.setting name, value } - - cluster.plugin provider((Callable) (() -> (RegularFile) (() -> downloadedSecurityPlugin))) + cluster.plugin provider {(RegularFile) (() -> downloadedSecurityPlugin)} } bwcOpenSearchJSDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + baseVersion + '/latest/linux/x64/tar/builds/' + @@ -224,21 +248,43 @@ testClusters.all { } def getJobSchedulerPlugin() { - provider(new Callable() { - @Override - RegularFile call() throws Exception { - return new RegularFile() { - @Override - File getAsFile() { - return configurations.zipArchive.asFileTree.matching { - include '**/opensearch-job-scheduler*' - }.singleFile - } - } - } - }) + provider { (RegularFile) (() -> + configurations.zipArchive.asFileTree.matching { + include '**/opensearch-job-scheduler*' + }.singleFile ) + } } +static def getAllHttpSocketURI(cluster) { + return cluster.nodes.stream() + .flatMap { node -> node.getAllHttpSocketURI().stream() } + .collect(Collectors.joining(",")) +} + +static def getAllTransportSocketURI(cluster) { + return cluster.nodes.stream() + .flatMap { node -> node.getAllTransportPortURI().stream() } + .collect(Collectors.joining(",")) +} + +static def getConcatClusterName(clusters) { + return clusters.stream().map(cluster -> cluster.getName()).collect(Collectors.joining(",")) +} + +def getPluginDownloadLink(pluginName) { + var repo = "https://aws.oss.sonatype.org/content/repositories/snapshots/org/opensearch/plugin/" + + pluginName + "/$opensearch_build_snapshot/" + var metadataFile = Paths.get(projectDir.toString(), "build", "maven-metadata.xml").toAbsolutePath().toFile() + download.run { + src repo + "maven-metadata.xml" + dest metadataFile + } + def metadata = new XmlParser().parse(metadataFile) + def PluginSnapshotVersion = metadata.versioning.snapshotVersions[0].snapshotVersion[0].value[0].text() + return repo + pluginName +"-${PluginSnapshotVersion}.zip" +} + + testClusters { integTest { testDistribution = 'archive' @@ -261,6 +307,11 @@ testClusters { plugin(getJobSchedulerPlugin()) plugin ":opensearch-sql-plugin" } + integTestWithGeo { + testDistribution = 'archive' + plugin(getJobSchedulerPlugin()) + plugin ":opensearch-sql-plugin" + } } task startPrometheus(type: SpawnProcessTask) { @@ -296,6 +347,7 @@ task stopPrometheus(type: KillProcessTask) { stopPrometheus.mustRunAfter startPrometheus task integJdbcTest(type: RestIntegTestTask) { + testClusters.findAll {c -> c.clusterName == "integJdbcTest"}.first(). plugin ":opensearch-sql-plugin" @@ -319,9 +371,7 @@ task integJdbcTest(type: RestIntegTestTask) { systemProperty 'tests.security.manager', 'false' systemProperty('project.root', project.projectDir.absolutePath) - systemProperty "https", System.getProperty("https") - systemProperty "user", System.getProperty("user") - systemProperty "password", System.getProperty("password") + systemProperties dynamicLoginConfig // Set default query size limit systemProperty 'defaultQuerySizeLimit', '10000' @@ -340,11 +390,11 @@ task integJdbcTest(type: RestIntegTestTask) { } task integTestWithSecurity(type: RestIntegTestTask) { + useCluster testClusters.integTestWithSecurity useCluster testClusters.remoteIntegTestWithSecurity - systemProperty "cluster.names", - getClusters().stream().map(cluster -> cluster.getName()).collect(Collectors.joining(",")) + systemProperty "cluster.names", getConcatClusterName(getClusters()) getClusters().forEach { cluster -> configureSecurityPlugin(cluster) @@ -370,21 +420,51 @@ task integTestWithSecurity(type: RestIntegTestTask) { doFirst { systemProperty 'cluster.debug', getDebug() getClusters().forEach { cluster -> + systemProperty "tests.rest.${cluster.name}.http_hosts", "${-> getAllHttpSocketURI(cluster)}" + systemProperty "tests.rest.${cluster.name}.transport_hosts", "${-> getAllTransportSocketURI(cluster)}" + } + systemProperties dummyLoginConfig + } - String allTransportSocketURI = cluster.nodes.stream().flatMap { node -> - node.getAllTransportPortURI().stream() - }.collect(Collectors.joining(",")) - String allHttpSocketURI = cluster.nodes.stream().flatMap { node -> - node.getAllHttpSocketURI().stream() - }.collect(Collectors.joining(",")) + if (System.getProperty("test.debug") != null) { + jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005' + } - systemProperty "tests.rest.${cluster.name}.http_hosts", "${-> allHttpSocketURI}" - systemProperty "tests.rest.${cluster.name}.transport_hosts", "${-> allTransportSocketURI}" - } + // NOTE: this IT config discovers only junit5 (jupiter) tests. + // https://github.com/opensearch-project/sql/issues/1974 + filter { + includeTestsMatching 'org.opensearch.sql.security.CrossClusterSearchIT' + } +} + + +task integTestWithGeo(type: RestIntegTestTask) { + useCluster testClusters.remoteCluster + + getClusters().forEach { cluster -> + configureGeoPlugin(cluster) + } + + useJUnitPlatform() + dependsOn ':opensearch-sql-plugin:bundlePlugin' + testLogging { + events "passed", "skipped", "failed" + } + + systemProperty 'tests.security.manager', 'false' + systemProperty 'project.root', project.projectDir.absolutePath + // Set default query size limit + systemProperty 'defaultQuerySizeLimit', '10000' - systemProperty "https", "false" - systemProperty "user", "admin" - systemProperty "password", "admin" + // Tell the test JVM if the cluster JVM is running under a debugger so that tests can use longer timeouts for + // requests. The 'doFirst' delays reading the debug setting on the cluster till execution time. + doFirst { + systemProperty 'cluster.debug', getDebug() + getClusters().forEach { cluster -> + systemProperty "tests.rest.${cluster.name}.http_hosts", "${-> getAllHttpSocketURI(cluster)}" + systemProperty "tests.rest.${cluster.name}.transport_hosts", "${-> getAllTransportSocketURI(cluster)}" + } + systemProperties dummyLoginConfig } if (System.getProperty("test.debug") != null) { @@ -394,10 +474,11 @@ task integTestWithSecurity(type: RestIntegTestTask) { // NOTE: this IT config discovers only junit5 (jupiter) tests. // https://github.com/opensearch-project/sql/issues/1974 filter { - includeTestsMatching 'org.opensearch.sql.security.CrossClusterSearchIT' + includeTestsMatching 'org.opensearch.sql.geo.PplIpEnrichmentIT' } } + // Run PPL ITs and new, legacy and comparison SQL ITs with new SQL engine enabled integTest { useCluster testClusters.remoteCluster @@ -405,15 +486,8 @@ integTest { // Set properties for connection to clusters and between clusters doFirst { getClusters().forEach { cluster -> - String allTransportSocketURI = cluster.nodes.stream().flatMap { node -> - node.getAllTransportPortURI().stream() - }.collect(Collectors.joining(",")) - String allHttpSocketURI = cluster.nodes.stream().flatMap { node -> - node.getAllHttpSocketURI().stream() - }.collect(Collectors.joining(",")) - - systemProperty "tests.rest.${cluster.name}.http_hosts", "${-> allHttpSocketURI}" - systemProperty "tests.rest.${cluster.name}.transport_hosts", "${-> allTransportSocketURI}" + systemProperty "tests.rest.${cluster.name}.http_hosts", "${-> getAllHttpSocketURI(cluster)}" + systemProperty "tests.rest.${cluster.name}.transport_hosts", "${-> getAllTransportSocketURI(cluster)}" } } @@ -429,9 +503,7 @@ integTest { systemProperty 'tests.security.manager', 'false' systemProperty('project.root', project.projectDir.absolutePath) - systemProperty "https", System.getProperty("https") - systemProperty "user", System.getProperty("user") - systemProperty "password", System.getProperty("password") + systemProperties dynamicLoginConfig // Set default query size limit systemProperty 'defaultQuerySizeLimit', '10000' @@ -445,7 +517,6 @@ integTest { systemProperty 'cluster.debug', getDebug() } - if (System.getProperty("test.debug") != null) { jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5006' } @@ -482,6 +553,9 @@ integTest { // Exclude this IT, because they executed in another task (:integTestWithSecurity) exclude 'org/opensearch/sql/security/**' + + // Exclude this IT, because they executed in another task (:integTestWithGeo) + exclude 'org/opensearch/sql/geo/**' } @@ -526,37 +600,22 @@ task comparisonTest(type: RestIntegTestTask) { testDistribution = "ARCHIVE" versions = [baseVersion, opensearch_version] numberOfNodes = 3 - plugin(provider(new Callable(){ - @Override - RegularFile call() throws Exception { - return new RegularFile() { - @Override - File getAsFile() { - if (new File("$project.rootDir/$bwcFilePath/job-scheduler/$bwcVersion").exists()) { - project.delete(files("$project.rootDir/$bwcFilePath/job-scheduler/$bwcVersion")) - } - project.mkdir bwcJobSchedulerPath + bwcVersion - ant.get(src: bwcOpenSearchJSDownload, - dest: bwcJobSchedulerPath + bwcVersion, - httpusecaches: false) - return fileTree(bwcJobSchedulerPath + bwcVersion).getSingleFile() - } - } - } - })) - plugin(provider(new Callable(){ - @Override - RegularFile call() throws Exception { - return new RegularFile() { - @Override - File getAsFile() { - return configurations.zipArchive.asFileTree.matching { - include '**/opensearch-sql-plugin*' - }.singleFile - } + plugin(provider { (RegularFile) ({ + if (new File("$project.rootDir/$bwcFilePath/job-scheduler/$bwcVersion").exists()) { + project.delete(files("$project.rootDir/$bwcFilePath/job-scheduler/$bwcVersion")) } - } - })) + project.mkdir bwcJobSchedulerPath + bwcVersion + ant.get(src: bwcOpenSearchJSDownload, + dest: bwcJobSchedulerPath + bwcVersion, + httpusecaches: false) + fileTree(bwcJobSchedulerPath + bwcVersion).getSingleFile() + }) + }) + plugin(provider { (RegularFile) ( + configurations.zipArchive.asFileTree.matching { + include '**/opensearch-sql-plugin*' + }.singleFile ) + }) setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}" setting 'http.content_type.required', 'true' } @@ -565,17 +624,9 @@ task comparisonTest(type: RestIntegTestTask) { List> plugins = [ getJobSchedulerPlugin(), - provider(new Callable() { - @Override - RegularFile call() throws Exception { - return new RegularFile() { - @Override - File getAsFile() { - return fileTree(bwcFilePath + project.version).getSingleFile() - } - } - } - }) + provider{ (RegularFile) ( + fileTree(bwcFilePath + project.version).getSingleFile()) + } ] // Creates 2 test clusters with 3 nodes of the old version. @@ -710,10 +761,7 @@ task integTestRemote(type: RestIntegTestTask) { systemProperty 'tests.security.manager', 'false' systemProperty('project.root', project.projectDir.absolutePath) systemProperty 'java.io.tmpdir', opensearch_tmp_dir.absolutePath - - systemProperty "https", System.getProperty("https") - systemProperty "user", System.getProperty("user") - systemProperty "password", System.getProperty("password") + systemProperties dynamicLoginConfig // Set default query size limit systemProperty 'defaultQuerySizeLimit', '10000' @@ -734,3 +782,9 @@ task integTestRemote(type: RestIntegTestTask) { exclude 'org/opensearch/sql/legacy/OrderIT.class' exclude 'org/opensearch/sql/jdbc/**' } + + + + + + diff --git a/integ-test/src/test/java/org/opensearch/sql/geo/PplIpEnrichmentIT.java b/integ-test/src/test/java/org/opensearch/sql/geo/PplIpEnrichmentIT.java new file mode 100644 index 00000000000..dc7db89ede6 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/geo/PplIpEnrichmentIT.java @@ -0,0 +1,158 @@ +/* + * + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.geo; + +import static org.opensearch.sql.legacy.TestUtils.getResponseBody; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GEOIP; +import static org.opensearch.sql.util.MatcherUtils.columnName; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.verifyColumn; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import lombok.SneakyThrows; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.json.JSONObject; +import org.junit.Assert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.Response; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +/** IP enrichment PPL request with OpenSearch Geo-sptial plugin */ +public class PplIpEnrichmentIT extends PPLIntegTestCase { + + private static boolean initialized = false; + + private static Map MANIFEST_LOCATION = + Map.of( + "endpoint", + "https://raw.githubusercontent.com/opensearch-project/geospatial/main/src/test/resources/ip2geo/server/city/manifest.json"); + + private static String DATASOURCE_NAME = "dummycityindex"; + + private static String PLUGIN_NAME = "opensearch-geospatial"; + + private static String GEO_SPATIAL_DATASOURCE_PATH = "/_plugins/geospatial/ip2geo/datasource/"; + + @SneakyThrows + @BeforeEach + public void initialize() { + if (!initialized) { + setUpIndices(); + initialized = true; + } + } + + @Override + protected void init() throws Exception { + loadIndex(Index.GEOIP); + // Create a new dataSource + createDatasource(DATASOURCE_NAME, MANIFEST_LOCATION); + waitForDatasourceToBeAvailable(DATASOURCE_NAME, Duration.ofSeconds(10)); + } + + @Test + public void testGeoPluginInstallation() throws IOException { + + Request request = new Request("GET", "/_cat/plugins?v"); + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + Response response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + Assert.assertTrue(getResponseBody(response, true).contains(PLUGIN_NAME)); + } + + @SneakyThrows + @Test + public void testGeoIpEnrichment() { + JSONObject resultGeoIp = + executeQuery( + String.format( + "search source=%s | eval enrichmentResult = geoip(\\\"%s\\\",%s)", + TEST_INDEX_GEOIP, "dummycityindex", "ip")); + + verifyColumn(resultGeoIp, columnName("name"), columnName("ip"), columnName("enrichmentResult")); + verifyDataRows( + resultGeoIp, + rows("Test user - USA", "10.1.1.1", Map.of("country", "USA", "city", "Seattle")), + rows("Test user - Canada", "127.1.1.1", Map.of("country", "Canada", "city", "Vancouver"))); + } + + /** + * Helper method to send a PUT request to create a dummy dataSource with provided endpoint for + * integration test. + * + * @param name Name of the dataSource + * @param properties Request payload in Json format + * @return Response for the create dataSource request. + * @throws IOException In case of network failure + */ + private Response createDatasource(final String name, Map properties) + throws IOException { + XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); + for (Map.Entry config : properties.entrySet()) { + builder.field(config.getKey(), config.getValue()); + } + builder.endObject(); + Request request = new Request("PUT", GEO_SPATIAL_DATASOURCE_PATH + name); + request.setJsonEntity(builder.toString()); + return client().performRequest(request); + } + + /** + * Helper method check the status of dataSource creation within the specific timeframe. + * + * @param name The name of the dataSource to assert + * @param timeout The timeout value in seconds + * @throws Exception Exception + */ + private void waitForDatasourceToBeAvailable(final String name, final Duration timeout) + throws Exception { + Instant start = Instant.now(); + while (!"AVAILABLE".equals(getDatasourceState(name))) { + if (Duration.between(start, Instant.now()).compareTo(timeout) > 0) { + throw new RuntimeException( + String.format( + Locale.ROOT, + "Datasource state didn't change to %s after %d seconds", + "AVAILABLE", + timeout.toSeconds())); + } + Thread.sleep(1000); + } + } + + /** + * Helper method to fetch the DataSource creation status via REST client. + * + * @param name dataSource name + * @return Status in String + * @throws Exception IO. + */ + private String getDatasourceState(final String name) throws Exception { + Request request = new Request("GET", GEO_SPATIAL_DATASOURCE_PATH + name); + Response response = client().performRequest(request); + var responseInMap = + createParser(XContentType.JSON.xContent(), EntityUtils.toString(response.getEntity())) + .map(); + var datasources = (List>) responseInMap.get("datasources"); + return (String) datasources.get(0).get("state"); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index 1728be74e6c..18ca0e33e6a 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -20,6 +20,7 @@ import static org.opensearch.sql.legacy.TestUtils.getDogs3IndexMapping; import static org.opensearch.sql.legacy.TestUtils.getEmployeeNestedTypeIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getGameOfThronesIndexMapping; +import static org.opensearch.sql.legacy.TestUtils.getGeoIpIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getGeopointIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getJoinTypeIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getLocationIndexMapping; @@ -625,6 +626,11 @@ public enum Index { "unexpandedObject", getUnexpandedObjectIndexMapping(), "src/test/resources/unexpanded_objects.json"), + GEOIP( + TestsConstants.TEST_INDEX_GEOIP, + "geoip", + getGeoIpIndexMapping(), + "src/test/resources/geoip.json"), BANK( TestsConstants.TEST_INDEX_BANK, "account", diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java index 195dda0cbdd..646e55def07 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java @@ -195,6 +195,11 @@ public static String getBankIndexMapping() { return getMappingFile(mappingFile); } + public static String getGeoIpIndexMapping() { + String mappingFile = "geoip_index_mapping.json"; + return getMappingFile(mappingFile); + } + public static String getBankWithNullValuesIndexMapping() { String mappingFile = "bank_with_null_values_index_mapping.json"; return getMappingFile(mappingFile); diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java index 1e336f544e9..1e56388dd3f 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java @@ -58,6 +58,7 @@ public class TestsConstants { public static final String TEST_INDEX_MULTI_NESTED_TYPE = TEST_INDEX + "_multi_nested"; public static final String TEST_INDEX_NESTED_WITH_NULLS = TEST_INDEX + "_nested_with_nulls"; public static final String TEST_INDEX_GEOPOINT = TEST_INDEX + "_geopoint"; + public static final String TEST_INDEX_GEOIP = TEST_INDEX + "_geoip"; public static final String DATASOURCES = ".ql-datasources"; public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; diff --git a/integ-test/src/test/java/org/opensearch/sql/security/CrossClusterSearchIT.java b/integ-test/src/test/java/org/opensearch/sql/security/CrossClusterSearchIT.java index cdf467706c6..7946fcd706d 100644 --- a/integ-test/src/test/java/org/opensearch/sql/security/CrossClusterSearchIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/security/CrossClusterSearchIT.java @@ -8,6 +8,7 @@ import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DOG; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GEOIP; import static org.opensearch.sql.util.MatcherUtils.columnName; import static org.opensearch.sql.util.MatcherUtils.rows; import static org.opensearch.sql.util.MatcherUtils.verifyColumn; @@ -41,6 +42,7 @@ public class CrossClusterSearchIT extends PPLIntegTestCase { private static final String TEST_INDEX_BANK_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_BANK; private static final String TEST_INDEX_DOG_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_DOG; + private static final String TEST_INDEX_GEOIP_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_GEOIP; private static final String TEST_INDEX_DOG_MATCH_ALL_REMOTE = MATCH_ALL_REMOTE_CLUSTER + ":" + TEST_INDEX_DOG; private static final String TEST_INDEX_ACCOUNT_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_ACCOUNT; diff --git a/integ-test/src/test/resources/geoip.json b/integ-test/src/test/resources/geoip.json new file mode 100644 index 00000000000..83e1cc72d60 --- /dev/null +++ b/integ-test/src/test/resources/geoip.json @@ -0,0 +1,4 @@ +{"index":{"_id":"1"}} +{"name":"Test user - USA","ip":"10.1.1.1"} +{"index":{"_id":"6"}} +{"name":"Test user - Canada","ip": "127.1.1.1"} diff --git a/integ-test/src/test/resources/indexDefinitions/geoip_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/geoip_index_mapping.json new file mode 100644 index 00000000000..aa059567896 --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/geoip_index_mapping.json @@ -0,0 +1,12 @@ +{ + "mappings": { + "properties": { + "name": { + "type": "text" + }, + "ip": { + "type": "text" + } + } + } +} diff --git a/opensearch/build.gradle b/opensearch/build.gradle index c47806b6bbf..b061e29a4da 100644 --- a/opensearch/build.gradle +++ b/opensearch/build.gradle @@ -40,6 +40,8 @@ dependencies { implementation group: 'org.json', name: 'json', version:'20231013' compileOnly group: 'org.opensearch.client', name: 'opensearch-rest-high-level-client', version: "${opensearch_version}" implementation group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}" + implementation group: 'org.opensearch', name:'geospatial-client', version: "${opensearch_build}" + testImplementation('org.junit.jupiter:junit-jupiter-api:5.9.3') testImplementation('org.junit.jupiter:junit-jupiter-params:5.9.3') diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java index 358bc10ab4b..235ddc00752 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java @@ -10,6 +10,7 @@ import org.opensearch.sql.opensearch.planner.physical.ADOperator; import org.opensearch.sql.opensearch.planner.physical.MLCommonsOperator; import org.opensearch.sql.opensearch.planner.physical.MLOperator; +import org.opensearch.sql.opensearch.planner.physical.OpenSearchEvalOperator; import org.opensearch.sql.planner.physical.AggregationOperator; import org.opensearch.sql.planner.physical.CursorCloseOperator; import org.opensearch.sql.planner.physical.DedupeOperator; @@ -96,6 +97,13 @@ public PhysicalPlan visitRemove(RemoveOperator node, Object context) { @Override public PhysicalPlan visitEval(EvalOperator node, Object context) { + if (node instanceof OpenSearchEvalOperator evalOperator) { + return doProtect( + new OpenSearchEvalOperator( + visitInput(evalOperator.getInput(), context), + evalOperator.getExpressionList(), + evalOperator.getNodeClient())); + } return new EvalOperator(visitInput(node.getInput(), context), node.getExpressionList()); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchEvalOperator.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchEvalOperator.java new file mode 100644 index 00000000000..0b4365e47b2 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchEvalOperator.java @@ -0,0 +1,140 @@ +/* + * + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.opensearch.planner.physical; + +import static org.opensearch.sql.data.type.ExprCoreType.STRUCT; +import static org.opensearch.sql.expression.env.Environment.extendEnv; + +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.Getter; +import org.apache.commons.lang3.tuple.Pair; +import org.opensearch.client.node.NodeClient; +import org.opensearch.geospatial.action.IpEnrichmentActionClient; +import org.opensearch.sql.common.utils.StringUtils; +import org.opensearch.sql.data.model.ExprStringValue; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.ReferenceExpression; +import org.opensearch.sql.expression.env.Environment; +import org.opensearch.sql.expression.ip.OpenSearchFunctionExpression; +import org.opensearch.sql.planner.physical.EvalOperator; +import org.opensearch.sql.planner.physical.PhysicalPlan; + +/** + * OpenSearch version of eval operator, which contains nodeClient, in order to perform OpenSearch + * related operation during the eval process. + */ +public class OpenSearchEvalOperator extends EvalOperator { + + @Getter private final NodeClient nodeClient; + + public OpenSearchEvalOperator( + PhysicalPlan input, + List> expressionList, + NodeClient nodeClient) { + super(input, expressionList); + this.nodeClient = nodeClient; + } + + @Override + public ExprValue next() { + ExprValue inputValue = this.getInput().next(); + Map evalMap = eval(inputValue.bindingTuples()); + if (STRUCT == inputValue.type()) { + ImmutableMap.Builder resultBuilder = new ImmutableMap.Builder<>(); + Map tupleValue = ExprValueUtils.getTupleValue(inputValue); + for (Map.Entry valueEntry : tupleValue.entrySet()) { + if (evalMap.containsKey(valueEntry.getKey())) { + resultBuilder.put(valueEntry.getKey(), evalMap.get(valueEntry.getKey())); + evalMap.remove(valueEntry.getKey()); + } else { + resultBuilder.put(valueEntry); + } + } + resultBuilder.putAll(evalMap); + return ExprTupleValue.fromExprValueMap(resultBuilder.build()); + } else { + return inputValue; + } + } + + /** + * Evaluate the expression in the {@link EvalOperator} with {@link Environment}. + * + * @param env {@link Environment} + * @return The mapping of reference and {@link ExprValue} for each expression. + */ + private Map eval(Environment env) { + Map evalResultMap = new LinkedHashMap<>(); + for (Pair pair : this.getExpressionList()) { + ReferenceExpression var = pair.getKey(); + Expression valueExpr = pair.getValue(); + ExprValue value; + if (valueExpr instanceof OpenSearchFunctionExpression openSearchFuncExpression) { + if ("geoip".equals(openSearchFuncExpression.getFunctionName().getFunctionName())) { + // Rewrite to encapsulate the try catch. + value = fetchIpEnrichment(openSearchFuncExpression.getArguments(), env); + } else { + return null; + } + } else { + value = pair.getValue().valueOf(env); + } + env = extendEnv(env, var, value); + evalResultMap.put(var.toString(), value); + } + return evalResultMap; + } + + private ExprValue fetchIpEnrichment( + List arguments, Environment env) { + final Set PERMITTED_OPTIONS = + Set.of( + "country_iso_code", + "country_name", + "continent_name", + "region_iso_code", + "region_name", + "city_name", + "time_zone", + "location"); + IpEnrichmentActionClient ipClient = new IpEnrichmentActionClient(nodeClient); + String dataSource = StringUtils.unquoteText(arguments.get(0).toString()); + String ipAddress = arguments.get(1).valueOf(env).stringValue(); + final Set options = new HashSet<>(); + if (arguments.size() > 2) { + String option = StringUtils.unquoteText(arguments.get(2).toString()); + // Convert the option into a set. + options.addAll( + Arrays.stream(option.split(",")) + .filter(PERMITTED_OPTIONS::contains) + .collect(Collectors.toSet())); + } + try { + Map geoLocationData = ipClient.getGeoLocationData(ipAddress, dataSource); + Map enrichmentResult = + geoLocationData.entrySet().stream() + .filter(entry -> options.isEmpty() || options.contains(entry.getKey())) + .collect( + Collectors.toMap( + Map.Entry::getKey, v -> new ExprStringValue(v.getValue().toString()))); + return ExprTupleValue.fromExprValueMap(enrichmentResult); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index b8822cd1e81..f93389c6657 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -21,6 +21,7 @@ import org.opensearch.sql.opensearch.planner.physical.ADOperator; import org.opensearch.sql.opensearch.planner.physical.MLCommonsOperator; import org.opensearch.sql.opensearch.planner.physical.MLOperator; +import org.opensearch.sql.opensearch.planner.physical.OpenSearchEvalOperator; import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.request.system.OpenSearchDescribeIndexRequest; @@ -28,6 +29,7 @@ import org.opensearch.sql.opensearch.storage.scan.OpenSearchIndexScanBuilder; import org.opensearch.sql.planner.DefaultImplementor; import org.opensearch.sql.planner.logical.LogicalAD; +import org.opensearch.sql.planner.logical.LogicalEval; import org.opensearch.sql.planner.logical.LogicalML; import org.opensearch.sql.planner.logical.LogicalMLCommons; import org.opensearch.sql.planner.logical.LogicalPlan; @@ -209,5 +211,11 @@ public PhysicalPlan visitAD(LogicalAD node, OpenSearchIndexScan context) { public PhysicalPlan visitML(LogicalML node, OpenSearchIndexScan context) { return new MLOperator(visitChild(node, context), node.getArguments(), client.getNodeClient()); } + + @Override + public PhysicalPlan visitEval(LogicalEval node, OpenSearchIndexScan context) { + return new OpenSearchEvalOperator( + visitChild(node, context), node.getExpressions(), client.getNodeClient()); + } } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java index 724178bd346..b2a151d2e61 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java @@ -47,6 +47,7 @@ import org.opensearch.sql.ast.tree.Trendline; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.model.ExprBooleanValue; +import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.expression.DSL; import org.opensearch.sql.expression.Expression; import org.opensearch.sql.expression.NamedExpression; @@ -58,10 +59,12 @@ import org.opensearch.sql.expression.window.ranking.RankFunction; import org.opensearch.sql.monitor.ResourceMonitor; import org.opensearch.sql.opensearch.client.OpenSearchClient; +import org.opensearch.sql.opensearch.data.type.OpenSearchTextType; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; import org.opensearch.sql.opensearch.planner.physical.ADOperator; import org.opensearch.sql.opensearch.planner.physical.MLCommonsOperator; import org.opensearch.sql.opensearch.planner.physical.MLOperator; +import org.opensearch.sql.opensearch.planner.physical.OpenSearchEvalOperator; import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; @@ -339,6 +342,24 @@ public void test_visitTrendline() { executionProtector.visitTrendline(trendlineOperator, null)); } + @Test + void test_visitOpenSearchEval() { + NodeClient nodeClient = mock(NodeClient.class); + OpenSearchEvalOperator evalOperator = + // ADOperator adOperator = + new OpenSearchEvalOperator( + values(emptyList()), + List.of( + ImmutablePair.of( + new ReferenceExpression("ageInAbs", OpenSearchTextType.of()), + DSL.abs(DSL.abs(new ReferenceExpression("age", ExprCoreType.LONG))))), + nodeClient); + + assertEquals( + executionProtector.doProtect(evalOperator), + executionProtector.visitEval(evalOperator, null)); + } + PhysicalPlan resourceMonitor(PhysicalPlan input) { return new ResourceMonitorPlan(input, resourceMonitor); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchEvalOperatorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchEvalOperatorTest.java new file mode 100644 index 00000000000..3a00494efae --- /dev/null +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchEvalOperatorTest.java @@ -0,0 +1,137 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.planner.physical; + +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.data.type.ExprCoreType.BOOLEAN; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import lombok.SneakyThrows; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.geospatial.action.IpEnrichmentAction; +import org.opensearch.geospatial.action.IpEnrichmentRequest; +import org.opensearch.geospatial.action.IpEnrichmentResponse; +import org.opensearch.sql.data.model.ExprLongValue; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.expression.DSL; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.ReferenceExpression; +import org.opensearch.sql.expression.function.BuiltinFunctionName; +import org.opensearch.sql.expression.ip.OpenSearchFunctionExpression; +import org.opensearch.sql.opensearch.data.type.OpenSearchTextType; +import org.opensearch.sql.opensearch.data.value.OpenSearchExprTextValue; +import org.opensearch.sql.planner.physical.PhysicalPlan; + +/** To assert the original behaviour of eval operator. */ +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +@RunWith(MockitoJUnitRunner.Silent.class) +public class OpenSearchEvalOperatorTest { + + @Mock private PhysicalPlan input; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private NodeClient nodeClient; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private ActionFuture actionFuture; + + private final ExprTupleValue DATE_ROW = + new ExprTupleValue( + new LinkedHashMap<>( + Map.of( + "firstname", + new OpenSearchExprTextValue("Amber"), + "age", + new ExprLongValue(32), + "email", + new OpenSearchExprTextValue("amberduke@pyrami.com"), + "ipInStr", + new OpenSearchExprTextValue("192.168.1.1")))); + + /** + * The test-case aim to assert OpenSearchEvalOperator behaviour when evaluating generic + * expression, which is not OpenSearch Engine specific. (Ex: ABS(age) ) + */ + @Test + public void testEvalOperatorOnGenericOperations() { + + // The input dataset + when(input.next()).thenReturn(DATE_ROW); + + // Expression to be evaluated + List> ipAddress = + List.of( + ImmutablePair.of( + new ReferenceExpression("ageInAbs", OpenSearchTextType.of()), + DSL.abs(DSL.abs(new ReferenceExpression("age", ExprCoreType.LONG))))); + + OpenSearchEvalOperator evalOperator = new OpenSearchEvalOperator(input, ipAddress, nodeClient); + + // Make sure generic Expression function as expected when wrapped with OpenSearchEvalOperator. + assertSame(32, evalOperator.next().keyValue("ageInAbs").integerValue()); + } + + /** + * The test-case aim to assert OpenSearchEvalOperator behaviour when evaluating + * geoipFunctionExpression, which is only available when executing on OpeSearch storage engine. + * (No option) + */ + @SneakyThrows + @Test + public void testEvalOperatorOnGeoIpExpression() { + + // The input dataset + when(input.next()).thenReturn(DATE_ROW); + when(nodeClient.execute( + eq(IpEnrichmentAction.INSTANCE), + argThat( + request -> + request instanceof IpEnrichmentRequest + && "192.168.1.1".equals(((IpEnrichmentRequest) request).getIpString())))) + .thenReturn(actionFuture); + when(actionFuture.get()).thenReturn(new IpEnrichmentResponse(Map.of("country_name", "Canada"))); + + // Expression to be evaluated + List> ipAddress = + List.of( + ImmutablePair.of( + new ReferenceExpression("ipEnrichmentResult", OpenSearchTextType.of()), + new OpenSearchFunctionExpression( + BuiltinFunctionName.GEOIP.getName(), + List.of( + DSL.literal("my-datasource"), + new ReferenceExpression("ipInStr", OpenSearchTextType.of())), + BOOLEAN))); + + OpenSearchEvalOperator evalOperator = new OpenSearchEvalOperator(input, ipAddress, nodeClient); + + // Make sure generic Expression function as expected when wrapped with OpenSearchEvalOperator. + Map ipEnrichmentResult = + evalOperator.next().keyValue("ipEnrichmentResult").tupleValue(); + assertSame("Canada", ipEnrichmentResult.get("country_name").stringValue()); + } +} diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 053ec530db4..a13dfc04998 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -87,6 +87,9 @@ ANOMALY_SCORE_THRESHOLD: 'ANOMALY_SCORE_THRESHOLD'; CASE: 'CASE'; IN: 'IN'; +// Geo IP eval function +GEOIP: 'GEOIP'; + // LOGICAL KEYWORDS NOT: 'NOT'; OR: 'OR'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 27f7e4014ba..f508732b68d 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -419,6 +419,7 @@ evalFunctionName | flowControlFunctionName | systemFunctionName | positionFunctionName + | goeipFunctionName ; functionArgs @@ -520,6 +521,10 @@ mathematicalFunctionName | trigonometricFunctionName ; +goeipFunctionName + : GEOIP + ; + trigonometricFunctionName : ACOS | ASIN @@ -863,6 +868,7 @@ keywordsCanBeId | mathematicalFunctionName | positionFunctionName | conditionFunctionName + | goeipFunctionName // commands | SEARCH | DESCRIBE