Skip to content

Commit a583373

Browse files
committed
draft for gbh:
- Re-name SPI contract to ReaderManagerProvider to load in DataFormatAwareEngineFactory. - Deletes EngineBridge - replace with SearchExecEngine - add delegation framework api - sketch with just filter for now - removes non reader functionality from DataFormatAwareEngineFactory/Engine. Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
1 parent 7f5f3e6 commit a583373

72 files changed

Lines changed: 3799 additions & 674 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
.claude
22
CLAUDE.md
33
.cursor*
4-
4+
.kiro/
5+
**/target/**
56
# intellij files
67
.idea/
78
*.iml

gradle/run.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ testClusters {
6060
for (String p : installedPlugins) {
6161
// check if its a local plugin first
6262
if (project.findProject(':plugins:' + p) != null) {
63-
plugin('plugins:' + p)
63+
plugin(':plugins:' + p)
64+
} else if (project.findProject(':sandbox:plugins:' + p) != null) {
65+
plugin(':sandbox:plugins:' + p)
6466
} else {
6567
// attempt to fetch it from maven
6668
project.repositories.mavenLocal()

sandbox/libs/analytics-framework/build.gradle

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,15 @@
1414

1515
def calciteVersion = '1.41.0'
1616

17+
// Guava comes transitively from calcite-core — forbidden on compile classpaths by OpenSearch.
18+
// Bypass via custom config for classes that extend Calcite types referencing ImmutableList.
19+
configurations {
20+
calciteCompile
21+
}
22+
sourceSets.main.compileClasspath += configurations.calciteCompile
23+
1724
dependencies {
25+
calciteCompile "com.google.guava:guava:${versions.guava}"
1826
compileOnly project(':server')
1927
api "org.apache.calcite:calcite-core:${calciteVersion}"
2028
// Calcite's expression tree and Enumerable runtime — required by calcite-core API
@@ -27,6 +35,30 @@ dependencies {
2735
// SLF4J — Calcite's logging facade
2836
runtimeOnly "org.slf4j:slf4j-api:${versions.slf4j}"
2937

38+
// Calcite optional deps required at runtime — BuiltInMethod.<clinit> reflectively loads ALL
39+
// methods which triggers class loading for every type referenced in Calcite's SqlFunctions.
40+
// Every single one of these is needed or the class initializer fails with NoClassDefFoundError.
41+
runtimeOnly "commons-codec:commons-codec:${versions.commonscodec}"
42+
runtimeOnly "org.codehaus.janino:janino:3.1.12"
43+
runtimeOnly "org.codehaus.janino:commons-compiler:3.1.12"
44+
runtimeOnly 'org.jooq:joou-java-6:0.9.4'
45+
runtimeOnly "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
46+
runtimeOnly "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
47+
runtimeOnly "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson_annotations}"
48+
runtimeOnly "org.apache.commons:commons-lang3:${versions.commonslang}"
49+
runtimeOnly 'org.apache.commons:commons-text:1.11.0'
50+
runtimeOnly 'org.apache.commons:commons-math3:3.6.1'
51+
runtimeOnly 'org.immutables:value-annotations:2.8.8'
52+
runtimeOnly 'com.jayway.jsonpath:json-path:2.9.0'
53+
runtimeOnly "net.minidev:json-smart:${versions.json_smart}"
54+
runtimeOnly 'net.minidev:accessors-smart:2.5.2'
55+
runtimeOnly 'org.ow2.asm:asm:9.7.1'
56+
runtimeOnly 'org.apache.calcite.avatica:avatica-metrics:1.27.0'
57+
runtimeOnly "org.locationtech.jts:jts-core:${versions.jts}"
58+
runtimeOnly 'org.locationtech.jts.io:jts-io-common:1.19.0'
59+
runtimeOnly 'org.locationtech.proj4j:proj4j:1.2.2'
60+
runtimeOnly 'com.google.uzaygezen:uzaygezen-core:0.2'
61+
3062
// Calcite bytecode references annotations from apiguardian (@API) and
3163
// checker-framework (@EnsuresNonNullIf). compileOnlyApi propagates to
3264
// consumers' compile/javadoc classpath without becoming a runtime dep.

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/EngineBridge.java

Lines changed: 0 additions & 57 deletions
This file was deleted.
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.analytics.backend;
10+
11+
import org.opensearch.analytics.delegation.DelegationContext;
12+
import org.opensearch.analytics.plan.ResolvedPlan;
13+
import org.opensearch.index.engine.DataFormatAwareEngine;
14+
import org.opensearch.plugins.ReaderManagerProvider;
15+
16+
import java.util.LinkedHashMap;
17+
import java.util.Map;
18+
19+
/**
20+
* Execution context carrying plan, reader, and delegation state through
21+
* the query execution lifecycle.
22+
*
23+
* @opensearch.internal
24+
*/
25+
public class ExecutionContext {
26+
27+
private final ResolvedPlan plan;
28+
private final String tableName;
29+
private DelegationContext delegationContext;
30+
private ReaderProvider readerProvider;
31+
32+
public ExecutionContext(ResolvedPlan plan, String tableName) {
33+
this.plan = plan;
34+
this.tableName = tableName;
35+
}
36+
37+
public ResolvedPlan plan() {
38+
return plan;
39+
}
40+
41+
public String getTableName() {
42+
return tableName;
43+
}
44+
45+
public void setDelegationContext(DelegationContext delegationContext) {
46+
this.delegationContext = delegationContext;
47+
}
48+
49+
public boolean hasDelegation() {
50+
return delegationContext != null && delegationContext.hasDelegation();
51+
}
52+
53+
public DelegationContext getDelegationContext() {
54+
return delegationContext;
55+
}
56+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.analytics.backend;
10+
11+
import org.opensearch.analytics.plan.ResolvedPlan;
12+
import org.opensearch.common.annotation.ExperimentalApi;
13+
14+
import java.io.Closeable;
15+
import java.io.IOException;
16+
17+
/**
18+
* Shard-level search execution engine interface.
19+
* @opensearch.experimental
20+
*/
21+
@ExperimentalApi
22+
public interface SearchExecEngine extends Closeable {
23+
24+
/**
25+
* Creates an execution context from a resolved plan.
26+
*
27+
* @param context ExecutionContext
28+
*/
29+
void prepare(ExecutionContext context);
30+
31+
/** Executes the context and returns a result stream. */
32+
EngineResultStream execute(ExecutionContext context) throws IOException;
33+
34+
@Override
35+
default void close() throws IOException {}
36+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.analytics.delegation;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.analytics.delegation.filter.FilterDelegationTarget;
14+
import org.opensearch.common.annotation.ExperimentalApi;
15+
16+
import java.util.concurrent.ConcurrentHashMap;
17+
import java.util.concurrent.atomic.AtomicLong;
18+
19+
/**
20+
* Registry mapping delegation context IDs to {@link DelegationTarget} instances.
21+
* Each target gets its own ID. A single query may register multiple targets.
22+
*
23+
* <p>Rust JNI callbacks resolve targets via the static {@link #delegateFilter} entry point.
24+
*
25+
* @opensearch.internal
26+
*/
27+
@ExperimentalApi
28+
public class DelegationBroker {
29+
30+
private static final Logger logger = LogManager.getLogger(DelegationBroker.class);
31+
private static final DelegationBroker INSTANCE = new DelegationBroker();
32+
33+
private final AtomicLong nextId = new AtomicLong(1);
34+
private final ConcurrentHashMap<Long, DelegationTarget> targets = new ConcurrentHashMap<>();
35+
36+
public static DelegationBroker getInstance() {
37+
return INSTANCE;
38+
}
39+
40+
/**
41+
* Registers a delegation target and returns its context ID.
42+
*/
43+
public long register(DelegationTarget target) {
44+
long id = nextId.getAndIncrement();
45+
targets.put(id, target);
46+
logger.info("[DelegationBroker] register: id={}, type={}", id, target.type());
47+
return id;
48+
}
49+
50+
/**
51+
* Releases a delegation context.
52+
*/
53+
public void release(long delegationContextId) {
54+
targets.remove(delegationContextId);
55+
}
56+
57+
/**
58+
* Resolves a {@link FilterDelegationTarget} by context ID.
59+
*/
60+
FilterDelegationTarget resolveFilterTarget(long delegationContextId) {
61+
DelegationTarget target = targets.get(delegationContextId);
62+
return target instanceof FilterDelegationTarget ? (FilterDelegationTarget) target : null;
63+
}
64+
65+
/**
66+
* Called from Rust via JNI to delegate a filter predicate.
67+
*
68+
* @param delegationContextId the context ID
69+
* @param targetBackend the backend name (for logging/routing)
70+
* @param segmentOrd 0-based segment ordinal
71+
* @param minDocId inclusive min doc ID
72+
* @param maxDocId exclusive max doc ID
73+
* @return matching doc IDs as BitSet.toLongArray(), or empty on error
74+
*/
75+
public static long[] delegateFilter(
76+
long delegationContextId, String targetBackend,
77+
int segmentOrd, int minDocId, int maxDocId) {
78+
logger.info("[DelegationBroker] delegateFilter: ctxId={}, backend={}, segment={}, docs=[{}, {})",
79+
delegationContextId, targetBackend, segmentOrd, minDocId, maxDocId);
80+
81+
FilterDelegationTarget target = INSTANCE.resolveFilterTarget(delegationContextId);
82+
if (target == null) {
83+
logger.warn("[DelegationBroker] No FilterDelegationTarget for ctxId={}", delegationContextId);
84+
return new long[0];
85+
}
86+
87+
long[] result = target.delegateFilter(targetBackend, segmentOrd, minDocId, maxDocId);
88+
logger.info("[DelegationBroker] delegateFilter result: segment={}, bitsetWords={}", segmentOrd, result.length);
89+
return result;
90+
}
91+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.analytics.delegation;
10+
11+
import java.util.List;
12+
13+
/**
14+
* Carries delegation state for a query. Holds the broker-assigned context IDs
15+
* for all registered delegation targets.
16+
*
17+
* @opensearch.internal
18+
*/
19+
public class DelegationContext {
20+
21+
public static final DelegationContext NONE = new DelegationContext(List.of());
22+
23+
private final List<Long> ids;
24+
25+
public DelegationContext(List<Long> ids) {
26+
this.ids = List.copyOf(ids);
27+
}
28+
29+
/** All delegation context IDs for this query. */
30+
public List<Long> getIds() {
31+
return ids;
32+
}
33+
34+
/** Returns true if this context carries active delegations. */
35+
public boolean hasDelegation() {
36+
return !ids.isEmpty();
37+
}
38+
39+
/** Releases all delegation targets from the broker. */
40+
public void release() {
41+
DelegationBroker broker = DelegationBroker.getInstance();
42+
for (long id : ids) {
43+
broker.release(id);
44+
}
45+
}
46+
}

0 commit comments

Comments
 (0)