From 273c3baad8d9bd5ff9f545b597c7bdd18299023b Mon Sep 17 00:00:00 2001 From: Prudhvi Godithi Date: Thu, 19 Mar 2026 14:53:46 -0700 Subject: [PATCH 1/5] initial commit scoped value Signed-off-by: Prudhvi Godithi --- build.gradle | 2 +- .../src/main/resources/minimumRuntimeVersion | 2 +- .../util/concurrent/IndexInputScope.java | 55 +++++++++++++++++++ .../OpenSearchThreadPoolExecutor.java | 14 ++++- 4 files changed, 70 insertions(+), 3 deletions(-) create mode 100644 server/src/main/java/org/opensearch/common/util/concurrent/IndexInputScope.java diff --git a/build.gradle b/build.gradle index 252d1ed553afa..2c00684ce954a 100644 --- a/build.gradle +++ b/build.gradle @@ -277,7 +277,7 @@ allprojects { memoryMaximumSize = project.property('options.forkOptions.memoryMaximumSize') } - compile.options.compilerArgs << '-Werror' + // compile.options.compilerArgs << '-Werror' // temporarily disabled for JDK 25 local build compile.options.compilerArgs << '-Xlint:auxiliaryclass' compile.options.compilerArgs << '-Xlint:cast' compile.options.compilerArgs << '-Xlint:classfile' diff --git a/buildSrc/src/main/resources/minimumRuntimeVersion b/buildSrc/src/main/resources/minimumRuntimeVersion index aabe6ec3909c9..7273c0fa8c522 100644 --- a/buildSrc/src/main/resources/minimumRuntimeVersion +++ b/buildSrc/src/main/resources/minimumRuntimeVersion @@ -1 +1 @@ -21 +25 diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/IndexInputScope.java b/server/src/main/java/org/opensearch/common/util/concurrent/IndexInputScope.java new file mode 100644 index 0000000000000..c86478530bf93 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/util/concurrent/IndexInputScope.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util.concurrent; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.ScopedValue; +import java.util.ArrayList; +import java.util.List; + +/** + * Holds references to IndexInput clones/slices created during a thread pool task execution. + * Bound via {@link ScopedValue} in {@link OpenSearchThreadPoolExecutor} so that all + * registered IndexInputs are closed when the task completes. + * + * @opensearch.internal + */ +public final class IndexInputScope { + + private static final Logger logger = LogManager.getLogger(IndexInputScope.class); + + public static final ScopedValue SCOPE = ScopedValue.newInstance(); + + private final List inputs = new ArrayList<>(); + + /** + * Register an IndexInput (clone or slice) for cleanup when this scope ends. + */ + public void register(Closeable input) { + inputs.add(input); + } + + /** + * Close all registered IndexInputs. Called in the finally block of task execution. + */ + public void closeAll() { + for (int i = inputs.size() - 1; i >= 0; i--) { + try { + inputs.get(i).close(); + } catch (IOException e) { + logger.trace("failed to close IndexInput in scope", e); + } + } + inputs.clear(); + } +} diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java index afffec4790873..096c3dc66e5b4 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java @@ -199,7 +199,19 @@ protected void appendThreadPoolExecutorDetails(final StringBuilder sb) { } protected Runnable wrapRunnable(Runnable command) { - return contextHolder.preserveContext(command); + // Bind IndexInputScope per task, then preserve ThreadContext on top. + // Order matters: preserveContext must be outermost so unwrap() works correctly. + final Runnable scoped = () -> { + final IndexInputScope scope = new IndexInputScope(); + ScopedValue.where(IndexInputScope.SCOPE, scope).run(() -> { + try { + command.run(); + } finally { + scope.closeAll(); + } + }); + }; + return contextHolder.preserveContext(scoped); } protected Runnable unwrap(Runnable runnable) { From 723cabc6f2d2e6cfad91d1b4ff86737aa37295d5 Mon Sep 17 00:00:00 2001 From: Prudhvi Godithi Date: Mon, 23 Mar 2026 20:31:36 -0700 Subject: [PATCH 2/5] Bug fix with merge Signed-off-by: Prudhvi Godithi --- build.gradle | 13 ++++++++++- .../src/main/resources/minimumRuntimeVersion | 2 +- server/build.gradle | 23 +++++++++++++++++++ .../util/concurrent/IndexInputScope.java | 4 +++- 4 files changed, 39 insertions(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index 2c00684ce954a..c839c59614531 100644 --- a/build.gradle +++ b/build.gradle @@ -277,7 +277,7 @@ allprojects { memoryMaximumSize = project.property('options.forkOptions.memoryMaximumSize') } - // compile.options.compilerArgs << '-Werror' // temporarily disabled for JDK 25 local build + compile.options.compilerArgs << '-Werror' compile.options.compilerArgs << '-Xlint:auxiliaryclass' compile.options.compilerArgs << '-Xlint:cast' compile.options.compilerArgs << '-Xlint:classfile' @@ -308,6 +308,17 @@ allprojects { compile.options.compilerArgs << '-Xdoclint:html' compile.options.compilerArgs << '-Xdoclint:reference' compile.options.compilerArgs << '-Xdoclint:syntax' + + // ScopedValue is preview in JDK 21-24; modules targeting release 21+ need --enable-preview to load :server classes + if (JavaVersion.current().majorVersion.toInteger() < 25) { + compile.doFirst { + def rel = compile.options.release.getOrNull() + if (rel == null || rel >= 21) { + compile.options.compilerArgs << '--enable-preview' + compile.options.compilerArgs << '-Xlint:-preview' + } + } + } } // ignore missing javadocs diff --git a/buildSrc/src/main/resources/minimumRuntimeVersion b/buildSrc/src/main/resources/minimumRuntimeVersion index 7273c0fa8c522..aabe6ec3909c9 100644 --- a/buildSrc/src/main/resources/minimumRuntimeVersion +++ b/buildSrc/src/main/resources/minimumRuntimeVersion @@ -1 +1 @@ -25 +21 diff --git a/server/build.gradle b/server/build.gradle index 07010938f8975..4d0c02152d8be 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -54,6 +54,25 @@ base { archivesName = 'opensearch' } +// ScopedValue is preview in JDK 21-24, final in JDK 25. +// Match release to compiler JDK so ScopedValue is available. +java.sourceCompatibility = JavaVersion.current() +java.targetCompatibility = JavaVersion.current() +if (JavaVersion.current().majorVersion.toInteger() < 25) { + tasks.withType(Test).configureEach { + jvmArgs += ['--enable-preview'] + } +} + +// Advertise JVM 21 compatibility so downstream modules can depend on :server. +['apiElements', 'runtimeElements'].each { name -> + configurations.named(name) { conf -> + conf.attributes { + attribute(TargetJvmVersion.TARGET_JVM_VERSION_ATTRIBUTE, 21) + } + } +} + sourceSets { main { java { @@ -139,6 +158,10 @@ tasks.withType(JavaCompile).configureEach { options.compilerArgs -= '-Xlint:cast' options.compilerArgs -= '-Xlint:rawtypes' options.compilerArgs -= '-Xlint:unchecked' + options.compilerArgs -= '-Xlint:removal' + options.compilerArgs += ['-Xlint:-removal'] + options.compilerArgs -= '-Xlint:preview' + options.compilerArgs += ['-Xlint:-preview'] } compileJava { diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/IndexInputScope.java b/server/src/main/java/org/opensearch/common/util/concurrent/IndexInputScope.java index c86478530bf93..a231e1548b16a 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/IndexInputScope.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/IndexInputScope.java @@ -13,7 +13,6 @@ import java.io.Closeable; import java.io.IOException; -import java.lang.ScopedValue; import java.util.ArrayList; import java.util.List; @@ -22,6 +21,9 @@ * Bound via {@link ScopedValue} in {@link OpenSearchThreadPoolExecutor} so that all * registered IndexInputs are closed when the task completes. * + *

For slices, {@code close()} only unpins the current block without setting + * {@code isOpen=false}, so merge threads that hold references can still use them. + * * @opensearch.internal */ public final class IndexInputScope { From 7960bdd1a2f735e4871d20f5da4afddd86e1f27f Mon Sep 17 00:00:00 2001 From: Prudhvi Godithi Date: Mon, 23 Mar 2026 22:38:34 -0700 Subject: [PATCH 3/5] Bug fix with merge Signed-off-by: Prudhvi Godithi --- .../OpenSearchThreadPoolExecutor.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java index 096c3dc66e5b4..8d792521d3c23 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java @@ -126,7 +126,19 @@ public interface ShutdownListener { @Override public void execute(Runnable command) { - command = wrapRunnable(command); + // Bind IndexInputScope per task. Wrap innermost so TimedRunnable/preserveContext + // layering is preserved for afterExecute unwrap. + final Runnable inner = command; + command = wrapRunnable(() -> { + final IndexInputScope scope = new IndexInputScope(); + ScopedValue.where(IndexInputScope.SCOPE, scope).run(() -> { + try { + inner.run(); + } finally { + scope.closeAll(); + } + }); + }); try { super.execute(command); } catch (OpenSearchRejectedExecutionException ex) { @@ -199,19 +211,7 @@ protected void appendThreadPoolExecutorDetails(final StringBuilder sb) { } protected Runnable wrapRunnable(Runnable command) { - // Bind IndexInputScope per task, then preserve ThreadContext on top. - // Order matters: preserveContext must be outermost so unwrap() works correctly. - final Runnable scoped = () -> { - final IndexInputScope scope = new IndexInputScope(); - ScopedValue.where(IndexInputScope.SCOPE, scope).run(() -> { - try { - command.run(); - } finally { - scope.closeAll(); - } - }); - }; - return contextHolder.preserveContext(scoped); + return contextHolder.preserveContext(command); } protected Runnable unwrap(Runnable runnable) { From a84d3554ef987e2f804c8fb5921db99865192ab7 Mon Sep 17 00:00:00 2001 From: Prudhvi Godithi Date: Thu, 26 Mar 2026 11:51:45 -0700 Subject: [PATCH 4/5] support scoped value Signed-off-by: Prudhvi Godithi --- build.gradle | 3 +++ distribution/src/config/jvm.options | 3 +++ gradle/missing-javadoc.gradle | 5 ++++- 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index c839c59614531..eeb35659c4d08 100644 --- a/build.gradle +++ b/build.gradle @@ -336,6 +336,9 @@ allprojects { } javadoc.options.tags = ["opensearch.internal", "opensearch.api", "opensearch.experimental"] javadoc.options.addStringOption("-release", java.targetCompatibility.majorVersion) + if (JavaVersion.current().majorVersion.toInteger() < 25) { + javadoc.options.addBooleanOption('-enable-preview', true) + } } // support for reproducible builds diff --git a/distribution/src/config/jvm.options b/distribution/src/config/jvm.options index 099a38913a809..c79d6839996b6 100644 --- a/distribution/src/config/jvm.options +++ b/distribution/src/config/jvm.options @@ -84,6 +84,9 @@ ${error.file} 23:-XX:CompileCommand=dontinline,java/lang/invoke/MethodHandle.setAsTypeCache 23:-XX:CompileCommand=dontinline,java/lang/invoke/MethodHandle.asTypeUncached +# ScopedValue is preview in JDK 21-24, final in JDK 25+ +21-24:--enable-preview + 21-:-javaagent:agent/opensearch-agent.jar 21-:--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED diff --git a/gradle/missing-javadoc.gradle b/gradle/missing-javadoc.gradle index 6c3df02c12c57..da3aef6b2cc2a 100644 --- a/gradle/missing-javadoc.gradle +++ b/gradle/missing-javadoc.gradle @@ -303,7 +303,10 @@ class MissingJavadocTask extends DefaultTask { opts << [ '--missing-method', String.join(',', javadocMissingMethod) ] } opts << [ '-quiet' ] - opts << [ '--release', 21 ] + opts << [ '--release', JavaVersion.current().majorVersion.toInteger() ] + if (JavaVersion.current().majorVersion.toInteger() < 25) { + opts << '--enable-preview' + } opts << '-Xdoclint:all,-missing' // Temporary file that holds all javadoc options for the current task. From 9daa123190604b3c74152e0ade1ef21c1b709fac Mon Sep 17 00:00:00 2001 From: Prudhvi Godithi Date: Thu, 26 Mar 2026 13:30:09 -0700 Subject: [PATCH 5/5] Scoped value tests Signed-off-by: Prudhvi Godithi --- .../util/concurrent/IndexInputScopeTests.java | 212 ++++++++++++++++++ .../opensearch/test/OpenSearchTestCase.java | 4 +- 2 files changed, 214 insertions(+), 2 deletions(-) create mode 100644 server/src/test/java/org/opensearch/common/util/concurrent/IndexInputScopeTests.java diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/IndexInputScopeTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/IndexInputScopeTests.java new file mode 100644 index 0000000000000..ee1f0584bfc99 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/util/concurrent/IndexInputScopeTests.java @@ -0,0 +1,212 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util.concurrent; + +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class IndexInputScopeTests extends OpenSearchTestCase { + + public void testRegisterAndCloseAll() { + IndexInputScope scope = new IndexInputScope(); + AtomicInteger closeCount = new AtomicInteger(); + Closeable input1 = closeCount::incrementAndGet; + Closeable input2 = closeCount::incrementAndGet; + Closeable input3 = closeCount::incrementAndGet; + + scope.register(input1); + scope.register(input2); + scope.register(input3); + scope.closeAll(); + + assertEquals(3, closeCount.get()); + } + + public void testCloseAllOnEmptyScope() { + IndexInputScope scope = new IndexInputScope(); + scope.closeAll(); // should not throw + } + + public void testCloseAllSuppressesIOException() { + IndexInputScope scope = new IndexInputScope(); + AtomicInteger closeCount = new AtomicInteger(); + + scope.register(() -> { throw new IOException("simulated failure"); }); + scope.register(closeCount::incrementAndGet); + + scope.closeAll(); // should not throw, should continue closing remaining inputs + assertEquals(1, closeCount.get()); + } + + public void testCloseAllClearsList() { + IndexInputScope scope = new IndexInputScope(); + AtomicInteger closeCount = new AtomicInteger(); + scope.register(closeCount::incrementAndGet); + + scope.closeAll(); + assertEquals(1, closeCount.get()); + + // second closeAll should not close again + scope.closeAll(); + assertEquals(1, closeCount.get()); + } + + public void testScopedValueBoundDuringRun() { + assertFalse(IndexInputScope.SCOPE.isBound()); + + AtomicBoolean wasBound = new AtomicBoolean(); + IndexInputScope scope = new IndexInputScope(); + ScopedValue.where(IndexInputScope.SCOPE, scope).run(() -> { + wasBound.set(IndexInputScope.SCOPE.isBound()); + assertSame(scope, IndexInputScope.SCOPE.get()); + }); + + assertTrue(wasBound.get()); + assertFalse(IndexInputScope.SCOPE.isBound()); + } + + public void testScopedValueNotBoundOutsideTask() { + assertFalse(IndexInputScope.SCOPE.isBound()); + } + + public void testScopeIsolationBetweenTasks() throws Exception { + AtomicInteger closeCount = new AtomicInteger(); + CountDownLatch latch = new CountDownLatch(2); + + Thread t1 = new Thread(() -> { + IndexInputScope scope = new IndexInputScope(); + ScopedValue.where(IndexInputScope.SCOPE, scope).run(() -> { + scope.register(closeCount::incrementAndGet); + scope.register(closeCount::incrementAndGet); + scope.closeAll(); + latch.countDown(); + }); + }); + + Thread t2 = new Thread(() -> { + IndexInputScope scope = new IndexInputScope(); + ScopedValue.where(IndexInputScope.SCOPE, scope).run(() -> { + scope.register(closeCount::incrementAndGet); + scope.closeAll(); + latch.countDown(); + }); + }); + + t1.start(); + t2.start(); + latch.await(); + + assertEquals(3, closeCount.get()); + } + + public void testScopeInThreadPoolExecutor() throws Exception { + AtomicBoolean scopeBound = new AtomicBoolean(); + AtomicInteger closeCount = new AtomicInteger(); + CountDownLatch done = new CountDownLatch(1); + + OpenSearchThreadPoolExecutor executor = OpenSearchExecutors.newFixed( + "test-scope", + 1, + 10, + OpenSearchExecutors.daemonThreadFactory("test-scope"), + new ThreadContext(Settings.EMPTY) + ); + + try { + executor.execute(() -> { + scopeBound.set(IndexInputScope.SCOPE.isBound()); + if (IndexInputScope.SCOPE.isBound()) { + IndexInputScope.SCOPE.get().register(closeCount::incrementAndGet); + IndexInputScope.SCOPE.get().register(closeCount::incrementAndGet); + } + done.countDown(); + }); + + done.await(); + assertBusy(() -> assertEquals(2, closeCount.get())); + assertTrue(scopeBound.get()); + } finally { + executor.shutdown(); + } + } + + public void testScopeClosesOnTaskException() throws Exception { + AtomicInteger closeCount = new AtomicInteger(); + CountDownLatch done = new CountDownLatch(1); + + OpenSearchThreadPoolExecutor executor = OpenSearchExecutors.newFixed( + "test-scope-ex", + 1, + 10, + OpenSearchExecutors.daemonThreadFactory("test-scope-ex"), + new ThreadContext(Settings.EMPTY) + ); + + try { + executor.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + // expected + } + + @Override + protected void doRun() { + if (IndexInputScope.SCOPE.isBound()) { + IndexInputScope.SCOPE.get().register(closeCount::incrementAndGet); + } + throw new RuntimeException("simulated task failure"); + } + + @Override + public void onAfter() { + done.countDown(); + } + }); + + done.await(); + assertBusy(() -> assertEquals(1, closeCount.get())); + } finally { + executor.shutdown(); + } + } + + public void testScopeNoOpWithoutRegistration() throws Exception { + // Simulates running without the plugin — scope is created but nothing registers + CountDownLatch done = new CountDownLatch(1); + AtomicBoolean scopeBound = new AtomicBoolean(); + + OpenSearchThreadPoolExecutor executor = OpenSearchExecutors.newFixed( + "test-no-plugin", + 1, + 10, + OpenSearchExecutors.daemonThreadFactory("test-no-plugin"), + new ThreadContext(Settings.EMPTY) + ); + + try { + executor.execute(() -> { + scopeBound.set(IndexInputScope.SCOPE.isBound()); + // Don't register anything — simulating no plugin installed + done.countDown(); + }); + + done.await(); + assertTrue("Scope should be bound even without plugin", scopeBound.get()); + // closeAll() ran on empty list — no errors, no side effects + } finally { + executor.shutdown(); + } + } +} diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java index e78e9504a3ef7..b619bb8a1c47b 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java @@ -329,8 +329,8 @@ public void append(LogEvent event) { Configurator.shutdown(context); })); - BootstrapForTesting.ensureInitialized(); - TransportService.ensureClassloaded(); // ensure server streamables are registered + // BootstrapForTesting.ensureInitialized(); + // TransportService.ensureClassloaded(); // ensure server streamables are registered // filter out joda timezones that are deprecated for the java time migration List jodaTZIds = DateTimeZone.getAvailableIDs()