Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,17 @@ allprojects {
compile.options.compilerArgs << '-Xdoclint:html'
compile.options.compilerArgs << '-Xdoclint:reference'
compile.options.compilerArgs << '-Xdoclint:syntax'

// ScopedValue is preview in JDK 21-24; modules targeting release 21+ need --enable-preview to load :server classes
if (JavaVersion.current().majorVersion.toInteger() < 25) {
compile.doFirst {
def rel = compile.options.release.getOrNull()
if (rel == null || rel >= 21) {
compile.options.compilerArgs << '--enable-preview'
compile.options.compilerArgs << '-Xlint:-preview'
}
}
}
}

// ignore missing javadocs
Expand All @@ -325,6 +336,9 @@ allprojects {
}
javadoc.options.tags = ["opensearch.internal", "opensearch.api", "opensearch.experimental"]
javadoc.options.addStringOption("-release", java.targetCompatibility.majorVersion)
if (JavaVersion.current().majorVersion.toInteger() < 25) {
javadoc.options.addBooleanOption('-enable-preview', true)
}
}

// support for reproducible builds
Expand Down
3 changes: 3 additions & 0 deletions distribution/src/config/jvm.options
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ ${error.file}
23:-XX:CompileCommand=dontinline,java/lang/invoke/MethodHandle.setAsTypeCache
23:-XX:CompileCommand=dontinline,java/lang/invoke/MethodHandle.asTypeUncached

# ScopedValue is preview in JDK 21-24, final in JDK 25+
21-24:--enable-preview

21-:-javaagent:agent/opensearch-agent.jar
21-:--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED

Expand Down
5 changes: 4 additions & 1 deletion gradle/missing-javadoc.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,10 @@ class MissingJavadocTask extends DefaultTask {
opts << [ '--missing-method', String.join(',', javadocMissingMethod) ]
}
opts << [ '-quiet' ]
opts << [ '--release', 21 ]
opts << [ '--release', JavaVersion.current().majorVersion.toInteger() ]
if (JavaVersion.current().majorVersion.toInteger() < 25) {
opts << '--enable-preview'
}
opts << '-Xdoclint:all,-missing'

// Temporary file that holds all javadoc options for the current task.
Expand Down
23 changes: 23 additions & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,25 @@ base {
archivesName = 'opensearch'
}

// ScopedValue is preview in JDK 21-24, final in JDK 25.
// Match release to compiler JDK so ScopedValue is available.
java.sourceCompatibility = JavaVersion.current()
java.targetCompatibility = JavaVersion.current()
if (JavaVersion.current().majorVersion.toInteger() < 25) {
tasks.withType(Test).configureEach {
jvmArgs += ['--enable-preview']
}
}

// Advertise JVM 21 compatibility so downstream modules can depend on :server.
['apiElements', 'runtimeElements'].each { name ->
configurations.named(name) { conf ->
conf.attributes {
attribute(TargetJvmVersion.TARGET_JVM_VERSION_ATTRIBUTE, 21)
}
}
}

sourceSets {
main {
java {
Expand Down Expand Up @@ -139,6 +158,10 @@ tasks.withType(JavaCompile).configureEach {
options.compilerArgs -= '-Xlint:cast'
options.compilerArgs -= '-Xlint:rawtypes'
options.compilerArgs -= '-Xlint:unchecked'
options.compilerArgs -= '-Xlint:removal'
options.compilerArgs += ['-Xlint:-removal']
options.compilerArgs -= '-Xlint:preview'
options.compilerArgs += ['-Xlint:-preview']
}

compileJava {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util.concurrent;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* Holds references to IndexInput clones/slices created during a thread pool task execution.
* Bound via {@link ScopedValue} in {@link OpenSearchThreadPoolExecutor} so that all
* registered IndexInputs are closed when the task completes.
*
* <p>For slices, {@code close()} only unpins the current block without setting
* {@code isOpen=false}, so merge threads that hold references can still use them.
*
* @opensearch.internal
*/
public final class IndexInputScope {

private static final Logger logger = LogManager.getLogger(IndexInputScope.class);

public static final ScopedValue<IndexInputScope> SCOPE = ScopedValue.newInstance();

private final List<Closeable> inputs = new ArrayList<>();

/**
* Register an IndexInput (clone or slice) for cleanup when this scope ends.
*/
public void register(Closeable input) {
inputs.add(input);
}

/**
* Close all registered IndexInputs. Called in the finally block of task execution.
*/
public void closeAll() {
for (int i = inputs.size() - 1; i >= 0; i--) {
try {
inputs.get(i).close();
} catch (IOException e) {
logger.trace("failed to close IndexInput in scope", e);
}
}
inputs.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,19 @@ public interface ShutdownListener {

@Override
public void execute(Runnable command) {
command = wrapRunnable(command);
// Bind IndexInputScope per task. Wrap innermost so TimedRunnable/preserveContext
// layering is preserved for afterExecute unwrap.
final Runnable inner = command;
command = wrapRunnable(() -> {
final IndexInputScope scope = new IndexInputScope();
ScopedValue.where(IndexInputScope.SCOPE, scope).run(() -> {
try {
inner.run();
} finally {
scope.closeAll();
}
});
});
try {
super.execute(command);
} catch (OpenSearchRejectedExecutionException ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util.concurrent;

import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchTestCase;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class IndexInputScopeTests extends OpenSearchTestCase {

public void testRegisterAndCloseAll() {
IndexInputScope scope = new IndexInputScope();
AtomicInteger closeCount = new AtomicInteger();
Closeable input1 = closeCount::incrementAndGet;
Closeable input2 = closeCount::incrementAndGet;
Closeable input3 = closeCount::incrementAndGet;

scope.register(input1);
scope.register(input2);
scope.register(input3);
scope.closeAll();

assertEquals(3, closeCount.get());
}

public void testCloseAllOnEmptyScope() {
IndexInputScope scope = new IndexInputScope();
scope.closeAll(); // should not throw
}

public void testCloseAllSuppressesIOException() {
IndexInputScope scope = new IndexInputScope();
AtomicInteger closeCount = new AtomicInteger();

scope.register(() -> { throw new IOException("simulated failure"); });
scope.register(closeCount::incrementAndGet);

scope.closeAll(); // should not throw, should continue closing remaining inputs
assertEquals(1, closeCount.get());
}

public void testCloseAllClearsList() {
IndexInputScope scope = new IndexInputScope();
AtomicInteger closeCount = new AtomicInteger();
scope.register(closeCount::incrementAndGet);

scope.closeAll();
assertEquals(1, closeCount.get());

// second closeAll should not close again
scope.closeAll();
assertEquals(1, closeCount.get());
}

public void testScopedValueBoundDuringRun() {
assertFalse(IndexInputScope.SCOPE.isBound());

AtomicBoolean wasBound = new AtomicBoolean();
IndexInputScope scope = new IndexInputScope();
ScopedValue.where(IndexInputScope.SCOPE, scope).run(() -> {
wasBound.set(IndexInputScope.SCOPE.isBound());
assertSame(scope, IndexInputScope.SCOPE.get());
});

assertTrue(wasBound.get());
assertFalse(IndexInputScope.SCOPE.isBound());
}

public void testScopedValueNotBoundOutsideTask() {
assertFalse(IndexInputScope.SCOPE.isBound());
}

public void testScopeIsolationBetweenTasks() throws Exception {
AtomicInteger closeCount = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(2);

Thread t1 = new Thread(() -> {
IndexInputScope scope = new IndexInputScope();
ScopedValue.where(IndexInputScope.SCOPE, scope).run(() -> {
scope.register(closeCount::incrementAndGet);
scope.register(closeCount::incrementAndGet);
scope.closeAll();
latch.countDown();
});
});

Thread t2 = new Thread(() -> {
IndexInputScope scope = new IndexInputScope();
ScopedValue.where(IndexInputScope.SCOPE, scope).run(() -> {
scope.register(closeCount::incrementAndGet);
scope.closeAll();
latch.countDown();
});
});

t1.start();
t2.start();
latch.await();

assertEquals(3, closeCount.get());
}

public void testScopeInThreadPoolExecutor() throws Exception {
AtomicBoolean scopeBound = new AtomicBoolean();
AtomicInteger closeCount = new AtomicInteger();
CountDownLatch done = new CountDownLatch(1);

OpenSearchThreadPoolExecutor executor = OpenSearchExecutors.newFixed(
"test-scope",
1,
10,
OpenSearchExecutors.daemonThreadFactory("test-scope"),
new ThreadContext(Settings.EMPTY)
);

try {
executor.execute(() -> {
scopeBound.set(IndexInputScope.SCOPE.isBound());
if (IndexInputScope.SCOPE.isBound()) {
IndexInputScope.SCOPE.get().register(closeCount::incrementAndGet);
IndexInputScope.SCOPE.get().register(closeCount::incrementAndGet);
}
done.countDown();
});

done.await();
assertBusy(() -> assertEquals(2, closeCount.get()));
assertTrue(scopeBound.get());
} finally {
executor.shutdown();
}
}

public void testScopeClosesOnTaskException() throws Exception {
AtomicInteger closeCount = new AtomicInteger();
CountDownLatch done = new CountDownLatch(1);

OpenSearchThreadPoolExecutor executor = OpenSearchExecutors.newFixed(
"test-scope-ex",
1,
10,
OpenSearchExecutors.daemonThreadFactory("test-scope-ex"),
new ThreadContext(Settings.EMPTY)
);

try {
executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
// expected
}

@Override
protected void doRun() {
if (IndexInputScope.SCOPE.isBound()) {
IndexInputScope.SCOPE.get().register(closeCount::incrementAndGet);
}
throw new RuntimeException("simulated task failure");
}

@Override
public void onAfter() {
done.countDown();
}
});

done.await();
assertBusy(() -> assertEquals(1, closeCount.get()));
} finally {
executor.shutdown();
}
}

public void testScopeNoOpWithoutRegistration() throws Exception {
// Simulates running without the plugin — scope is created but nothing registers
CountDownLatch done = new CountDownLatch(1);
AtomicBoolean scopeBound = new AtomicBoolean();

OpenSearchThreadPoolExecutor executor = OpenSearchExecutors.newFixed(
"test-no-plugin",
1,
10,
OpenSearchExecutors.daemonThreadFactory("test-no-plugin"),
new ThreadContext(Settings.EMPTY)
);

try {
executor.execute(() -> {
scopeBound.set(IndexInputScope.SCOPE.isBound());
// Don't register anything — simulating no plugin installed
done.countDown();
});

done.await();
assertTrue("Scope should be bound even without plugin", scopeBound.get());
// closeAll() ran on empty list — no errors, no side effects
} finally {
executor.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ public void append(LogEvent event) {
Configurator.shutdown(context);
}));

BootstrapForTesting.ensureInitialized();
TransportService.ensureClassloaded(); // ensure server streamables are registered
// BootstrapForTesting.ensureInitialized();
// TransportService.ensureClassloaded(); // ensure server streamables are registered

// filter out joda timezones that are deprecated for the java time migration
List<String> jodaTZIds = DateTimeZone.getAvailableIDs()
Expand Down
Loading