Add sandbox plugin for composite indexing execution engine #20909

Open

alchemist51 wants to merge 6 commits into opensearch-project:main from alchemist51:ciee
Conversation

@alchemist51 (Contributor) commented Mar 18, 2026

Description

This PR introduces the composite-engine sandbox plugin, which implements CompositeIndexingExecutionEngine, the orchestration layer for multi-format indexing described in RFC #20644.

The composite engine enables an index to write documents to multiple storage formats (e.g., Lucene + Parquet) simultaneously through a single IndexingExecutionEngine interface. Format plugins register via the ExtensiblePlugin SPI, and the composite engine delegates writes, refresh, and file management to each per-format engine.
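The delegation pattern described above can be sketched as follows. This is an illustrative fan-out, not the actual OpenSearch interfaces: FormatEngine, CompositeFanOutSketch, and the write method names are hypothetical stand-ins for the per-format engines the PR wires together.

```java
import java.util.List;

// Hypothetical stand-in for a per-format indexing engine (e.g., Lucene or Parquet).
interface FormatEngine {
    void write(String doc);
}

public class CompositeFanOutSketch {
    private final FormatEngine primary;
    private final List<FormatEngine> secondaries;

    public CompositeFanOutSketch(FormatEngine primary, List<FormatEngine> secondaries) {
        this.primary = primary;
        this.secondaries = secondaries;
    }

    /** A single write is delegated to the primary format first, then each secondary. */
    public void write(String doc) {
        primary.write(doc);
        for (FormatEngine engine : secondaries) {
            engine.write(doc);
        }
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        FormatEngine lucene = d -> log.append("lucene:").append(d).append(' ');
        FormatEngine parquet = d -> log.append("parquet:").append(d).append(' ');
        new CompositeFanOutSketch(lucene, List.of(parquet)).write("doc1");
        System.out.println(log.toString().trim());
    }
}
```

The same fan-out applies to refresh and file management, with the primary always handled before the secondaries.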

Note: the ExtensiblePlugin SPI model is a temporary measure. Once the DataFormat Registry is introduced, this model can be removed.

What's included

New sandbox plugin: sandbox/plugins/composite-engine

  • CompositeEnginePlugin — ExtensiblePlugin entry point that discovers DataFormatPlugin implementations at node bootstrap, validates index settings, and creates the composite engine. Registers three index settings:
    • index.composite.enabled (default false)
    • index.composite.primary_data_format (default "lucene")
    • index.composite.secondary_data_formats (default [])
  • CompositeIndexingExecutionEngine — Orchestrates indexing across a primary and zero or more secondary per-format engines. Handles writer creation, refresh (flush all writers → build segments → delegate per-format refresh), file deletion, and document input creation.
  • CompositeDataFormat — A DataFormat wrapper over the constituent formats. Uses Long.MIN_VALUE priority so concrete formats take precedence.
  • CompositeDocumentInput — Broadcasts addField, setRowId, and other metadata operations to all per-format DocumentInput instances. Releases the writer back to the pool on close().
  • CompositeWriter — Delegates addDoc, flush, sync, and close to each per-format writer (primary first, then secondaries). Implements Lock for pool checkout semantics.
  • CompositeDataFormatWriterPool — Thread-safe pool of CompositeWriter instances with lock-based checkout/release and a checkoutAll for flush.
  • RowIdGenerator — Generates monotonically increasing row IDs for cross-format document synchronization within a writer's segment scope.
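Of the components above, RowIdGenerator has the simplest contract: unique, monotonically increasing IDs within a writer's segment scope. A minimal sketch, assuming an AtomicLong-based sequence that resets per segment (the class shape and reset method are illustrative, not the PR's actual code):

```java
import java.util.concurrent.atomic.AtomicLong;

public class RowIdGeneratorSketch {
    private final AtomicLong nextRowId = new AtomicLong(0);

    /** Each concurrent caller gets a unique, monotonically increasing row ID. */
    public long nextRowId() {
        return nextRowId.getAndIncrement();
    }

    /** Restart the sequence when a new segment scope begins. */
    public void reset() {
        nextRowId.set(0);
    }

    public static void main(String[] args) {
        RowIdGeneratorSketch gen = new RowIdGeneratorSketch();
        System.out.println(gen.nextRowId());
        System.out.println(gen.nextRowId());
    }
}
```

Because every per-format DocumentInput receives the same row ID for a given document, the formats can later be joined row-for-row within a segment.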

New sandbox lib: sandbox/libs/composite-engine-lib

  • ConcurrentQueue — Striped concurrent queue using thread-affinity hashing to reduce contention across concurrent indexing threads.
  • LockableConcurrentQueue — Extends ConcurrentQueue with tryLock-based polling so writers can be checked out without blocking.
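The striping idea behind ConcurrentQueue can be illustrated as below. This is a simplified sketch, not the library's implementation: each thread hashes to a "home" stripe, so concurrent indexing threads mostly touch different underlying queues, and poll falls back to scanning other stripes.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class StripedQueueSketch<T> {
    private final ConcurrentLinkedQueue<T>[] stripes;

    @SuppressWarnings("unchecked")
    public StripedQueueSketch(int stripeCount) {
        stripes = new ConcurrentLinkedQueue[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ConcurrentLinkedQueue<>();
        }
    }

    // Thread-affinity hashing: the same thread keeps hitting the same stripe,
    // so threads rarely contend on a shared queue head.
    private ConcurrentLinkedQueue<T> localStripe() {
        return stripes[(int) (Thread.currentThread().getId() % stripes.length)];
    }

    public void add(T item) {
        localStripe().add(item);
    }

    public T poll() {
        // Try the local stripe first, then steal from the others.
        T item = localStripe().poll();
        if (item != null) return item;
        for (ConcurrentLinkedQueue<T> stripe : stripes) {
            item = stripe.poll();
            if (item != null) return item;
        }
        return null;
    }

    public static void main(String[] args) {
        StripedQueueSketch<String> q = new StripedQueueSketch<>(4);
        q.add("a");
        System.out.println(q.poll());
        System.out.println(q.poll());
    }
}
```

LockableConcurrentQueue layers tryLock-based polling on top of this so a writer that is already checked out is simply skipped rather than blocked on.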

How format plugins integrate

Format plugins (e.g., Parquet) extend this plugin by:

  1. Declaring extendedPlugins = ['composite-engine'] in their build.gradle
  2. Implementing DataFormatPlugin
  3. The ExtensiblePlugin SPI discovers them automatically during node bootstrap
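The ExtensiblePlugin SPI discovery in step 3 follows the same pattern as Java's built-in ServiceLoader: providers listed in META-INF/services on the classpath are found at load time. A minimal, self-contained illustration of that mechanism (DataFormatPluginDemo is a hypothetical interface; no providers are registered here, so the loader is empty):

```java
import java.util.ServiceLoader;

// Hypothetical SPI contract, standing in for DataFormatPlugin.
interface DataFormatPluginDemo {
    String formatName();
}

public class SpiDiscoveryDemo {
    public static void main(String[] args) {
        // ServiceLoader scans META-INF/services entries on the classpath.
        // With no provider files present, iteration yields nothing.
        ServiceLoader<DataFormatPluginDemo> loader = ServiceLoader.load(DataFormatPluginDemo.class);
        int count = 0;
        for (DataFormatPluginDemo plugin : loader) {
            count++;
        }
        System.out.println("discovered plugins: " + count);
    }
}
```

In the real plugin, OpenSearch's ExtensiblePlugin machinery performs the equivalent scan across installed plugins during node bootstrap, so a Parquet plugin is picked up without any explicit registration call.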

Related issues

Resolves part of #20876

Check List

  • New functionality includes testing
  • New functionality has been documented
  • All classes are annotated with @ExperimentalApi
  • No BWC tests required (sandbox/experimental)
  • Commits are signed off (DCO)

github-actions bot (Contributor) commented Mar 18, 2026

PR Reviewer Guide 🔍

(Review updated until commit 80dd650)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
📝 TODO sections

🔀 Multiple PR themes

Sub-PR theme: Add concurrent queue library with lockable pool support

Relevant files:

  • libs/concurrent-queue/src/main/java/org/opensearch/common/queue/Lockable.java
  • libs/concurrent-queue/src/main/java/org/opensearch/common/queue/ConcurrentQueue.java
  • libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockableConcurrentQueue.java
  • libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockablePool.java
  • libs/concurrent-queue/src/test/java/org/opensearch/common/queue/ConcurrentQueueTests.java
  • libs/concurrent-queue/src/test/java/org/opensearch/common/queue/LockableConcurrentQueueTests.java
  • libs/concurrent-queue/src/test/java/org/opensearch/common/queue/LockablePoolTests.java
  • benchmarks/src/main/java/org/opensearch/benchmark/queue/LockableConcurrentQueueBenchmark.java

Sub-PR theme: Convert DataFormat interface to abstract class with equals/hashCode

Relevant files:

  • server/src/main/java/org/opensearch/index/engine/dataformat/DataFormat.java
  • server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java

⚡ Recommended focus areas for review

Race Condition

In the checkoutAll() method, items are locked outside the synchronized block (lines 92-95), then unlocked inside the synchronized block (lines 104-105). Between unlocking and the synchronized block check, another thread could modify the items set or the item could be re-added to the queue, causing isRegistered(item) to return false and the item to be skipped. This breaks the atomicity of the checkout operation and can leak items from the pool.

public List<T> checkoutAll() {
    ensureOpen();
    List<T> lockedItems = new ArrayList<>();
    List<T> checkedOutItems = new ArrayList<>();
    for (T item : this) {
        item.lock();
        lockedItems.add(item);
    }
    synchronized (this) {
        for (T item : lockedItems) {
            try {
                if (isRegistered(item) && items.remove(item)) {
                    availableItems.remove(item);
                    checkedOutItems.add(item);
                }
            } finally {
                item.unlock();
            }
        }
    }
    return Collections.unmodifiableList(checkedOutItems);
}
Resource Leak

In the newDocumentInput() method (line 253), if an exception occurs after writerPool.getAndLock() but before the CompositeDocumentInput is successfully constructed, the locked writer is never released back to the pool. The onClose callback is only invoked when CompositeDocumentInput.close() is called, but if construction fails, the callback is never registered.

public CompositeDocumentInput newDocumentInput() {
    CompositeWriter writer = writerPool.getAndLock();
    DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
    Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
    for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
        secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
    }
    return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
        assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
        writerPool.releaseAndUnlock(writer);
    });
}
Potential NPE

In the refresh() method (line 220), RefreshInput.builder().build() is called to create an empty input. If RefreshInput.builder() or build() returns null, the subsequent calls to primaryEngine.refresh(emptyInput) and engine.refresh(emptyInput) will throw a NullPointerException. The code should validate that emptyInput is not null before use.

RefreshInput emptyInput = RefreshInput.builder().build();
primaryEngine.refresh(emptyInput);
for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
    engine.refresh(emptyInput);
}

github-actions bot (Contributor) commented Mar 18, 2026

PR Code Suggestions ✨

Latest suggestions up to 80dd650

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Handle resource cleanup on secondary input creation failure

If an exception occurs while creating secondary document inputs, the primary input
and writer are never cleaned up, causing resource leaks. Wrap the secondary input
creation in a try-catch block to ensure proper cleanup of the primary input and
writer release on failure.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [252-263]

 @Override
 public CompositeDocumentInput newDocumentInput() {
     CompositeWriter writer = writerPool.getAndLock();
     DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
-    Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
-    for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
-        secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+    try {
+        Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
+        for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
+            secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+        }
+        return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
+            assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
+            writerPool.releaseAndUnlock(writer);
+        });
+    } catch (Exception e) {
+        try {
+            primaryInput.close();
+        } catch (Exception suppressed) {
+            e.addSuppressed(suppressed);
+        }
+        writerPool.releaseAndUnlock(writer);
+        throw e;
     }
-    return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
-        assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
-        writerPool.releaseAndUnlock(writer);
-    });
 }
Suggestion importance[1-10]: 8


Why: This is a valid error handling suggestion that addresses a real resource leak vulnerability. If secondary input creation fails, the primaryInput and writer would not be cleaned up, causing resource exhaustion. The suggested try-catch with proper cleanup is appropriate and important for production robustness.

Impact: Medium
General
Handle writer close failures gracefully

If writer.close() throws an exception, the writer is left in an inconsistent state
and subsequent operations may fail. Wrap the close operation in a try-catch to log
the error and continue flushing remaining writers, or ensure all writers are
properly closed even if one fails.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [191-209]

 // Flush each writer to disk and build segments from the file infos
     for (CompositeWriter writer : dataFormatWriters) {
         FileInfos fileInfos = writer.flush();
         Segment.Builder segmentBuilder = Segment.builder(writer.getWriterGeneration());
         boolean hasFiles = false;
         for (Map.Entry<DataFormat, WriterFileSet> entry : fileInfos.writerFilesMap().entrySet()) {
             logger.debug(
                 "Writer gen={} flushed format=[{}] files={}",
                 writer.getWriterGeneration(),
                 entry.getKey().name(),
                 entry.getValue().files()
             );
             segmentBuilder.addSearchableFiles(entry.getKey(), entry.getValue());
             hasFiles = true;
         }
-        writer.close();
+        try {
+            writer.close();
+        } catch (Exception e) {
+            logger.warn("Failed to close writer gen={}", writer.getWriterGeneration(), e);
+        }
         if (hasFiles) {
             newSegmentList.add(segmentBuilder.build());
         }
     }
Suggestion importance[1-10]: 6


Why: This is a reasonable error handling suggestion that prevents a single writer close failure from disrupting the entire refresh operation. Wrapping the close in try-catch with logging allows remaining writers to be processed, improving resilience. However, error handling suggestions typically score lower than functional improvements.

Impact: Low

Previous suggestions

Suggestions up to commit 2081844
Category | Suggestion | Impact
Possible issue
Prevent writer pool resource leak on exception

If primaryEngine.newDocumentInput() or any secondary engine's newDocumentInput()
throws an exception, the writer is never released back to the pool, causing a
resource leak. Wrap the document input creation in a try-catch to ensure the writer
is always released on failure.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [251-262]

 @Override
 public CompositeDocumentInput newDocumentInput() {
     CompositeWriter writer = writerPool.getAndLock();
-    DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
-    Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
-    for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
-        secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+    try {
+        DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
+        Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
+        for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
+            secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+        }
+        return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
+            assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
+            writerPool.releaseAndUnlock(writer);
+        });
+    } catch (Exception e) {
+        writerPool.releaseAndUnlock(writer);
+        throw e;
     }
-    return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
-        assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
-        writerPool.releaseAndUnlock(writer);
-    });
 }
Suggestion importance[1-10]: 8


Why: This is a critical resource management issue. If primaryEngine.newDocumentInput() or any secondary engine's newDocumentInput() throws an exception, the writer is never released back to the pool, causing a permanent resource leak. The suggested try-catch-finally pattern correctly ensures the writer is always released, which is essential for maintaining pool integrity and preventing thread starvation in production.

Impact: Medium
Add null safety to equals and hashCode

The equals() and hashCode() methods rely on name() which could be null or blank,
potentially causing inconsistent behavior when used as Map keys. Add null/blank
checks to ensure robust equality semantics and prevent silent bugs in collections.

server/src/main/java/org/opensearch/index/engine/dataformat/DataFormat.java [49-59]

 @Override
 public final boolean equals(Object o) {
     if (this == o) return true;
     if (o instanceof DataFormat == false) return false;
-    return Objects.equals(name(), ((DataFormat) o).name());
+    String thisName = name();
+    String otherName = ((DataFormat) o).name();
+    if (thisName == null || otherName == null) return false;
+    return thisName.equals(otherName);
 }
 
 @Override
 public final int hashCode() {
-    return Objects.hashCode(name());
+    String n = name();
+    return n == null ? 0 : n.hashCode();
 }
Suggestion importance[1-10]: 6


Why: The suggestion correctly identifies a potential issue where name() could return null, causing NullPointerException or inconsistent behavior when DataFormat is used as a Map key. However, the PR code already shows that DataFormat is used as a Map key in CompositeWriter and CompositeDocumentInput, and the codebase appears to enforce non-null names (e.g., validation in CompositeEnginePlugin.loadExtensions). The improvement is defensive but may be unnecessary given the existing validation patterns.

Impact: Low
General
Ensure all writers are processed despite close failures

If writer.close() throws an exception, the writer is not properly cleaned up and
subsequent writers may not be flushed. Wrap the close operation in a try-finally
block to ensure all writers are processed even if one fails.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [189-208]

 // Flush each writer to disk and build segments from the file infos
     for (CompositeWriter writer : dataFormatWriters) {
         FileInfos fileInfos = writer.flush();
         Segment.Builder segmentBuilder = Segment.builder(writer.getWriterGeneration());
         boolean hasFiles = false;
         for (Map.Entry<DataFormat, WriterFileSet> entry : fileInfos.writerFilesMap().entrySet()) {
             logger.debug(
                 "Writer gen={} flushed format=[{}] files={}",
                 writer.getWriterGeneration(),
                 entry.getKey().name(),
                 entry.getValue().files()
             );
             segmentBuilder.addSearchableFiles(entry.getKey(), entry.getValue());
             hasFiles = true;
         }
-        writer.close();
-        if (hasFiles) {
-            newSegmentList.add(segmentBuilder.build());
+        try {
+            writer.close();
+        } finally {
+            if (hasFiles) {
+                newSegmentList.add(segmentBuilder.build());
+            }
         }
     }
Suggestion importance[1-10]: 5


Why: While the suggestion identifies a potential issue where writer.close() could throw an exception, the proposed fix has a logic flaw. Moving the segment addition into a finally block means segments would be added even if the close fails, which could result in corrupted segment metadata. A better approach would be to ensure all writers are closed (possibly with exception suppression) before adding segments, rather than the suggested try-finally structure.

Impact: Low
Suggestions up to commit 0267a41
Category | Suggestion | Impact
Possible issue
Fix race condition in counter increment ordering

The entry is unlocked before the counter is incremented, creating a race window
where a concurrent lockAndPoll thread can acquire the entry and observe a stale
counter value, causing it to exit the retry loop prematurely and return null even
though an entry was just added. The counter should be incremented before unlocking
the entry so that any thread spinning in lockAndPoll sees the updated count before
the entry becomes acquirable.

libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockableConcurrentQueue.java [74-78]

 public void addAndUnlock(T entry) {
     queue.add(entry);
+    addAndUnlockCounter.incrementAndGet();
     entry.unlock();
-    addAndUnlockCounter.incrementAndGet();
 }
Suggestion importance[1-10]: 8


Why: This is a genuine race condition: unlocking the entry before incrementing the counter means a lockAndPoll thread could acquire the entry and see the old counter value, causing it to exit the retry loop prematurely and return null. Incrementing the counter before unlock() closes this window.

Impact: Medium
Fix checked-out items being incorrectly unlocked

Items that are locked but fail the isRegistered check (e.g., concurrently removed)
are unlocked in the finally block, but items that are successfully checked out are
also unlocked in the same finally block — meaning checked-out items are returned to
callers in an unlocked state. The item.unlock() in the finally block should only be
called for items that were NOT successfully checked out.

libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockablePool.java [88-109]

 public List<T> checkoutAll() {
     ensureOpen();
     List<T> lockedItems = new ArrayList<>();
     List<T> checkedOutItems = new ArrayList<>();
     for (T item : this) {
         item.lock();
         lockedItems.add(item);
     }
     synchronized (this) {
         for (T item : lockedItems) {
+            boolean checkedOut = false;
             try {
                 if (isRegistered(item) && items.remove(item)) {
                     availableItems.remove(item);
                     checkedOutItems.add(item);
+                    checkedOut = true;
                 }
             } finally {
-                item.unlock();
+                if (!checkedOut) {
+                    item.unlock();
+                }
             }
         }
     }
     return Collections.unmodifiableList(checkedOutItems);
 }
Suggestion importance[1-10]: 8


Why: The finally block unconditionally calls item.unlock() for all items, including those successfully checked out. This means callers receive items in an unlocked state, violating the contract that checkoutAll returns locked items. The fix correctly only unlocks items that were not checked out.

Impact: Medium
Prevent writer pool leak on document input creation failure

If primaryEngine.newDocumentInput() or any secondary engine's newDocumentInput()
throws an exception, the checked-out writer is never returned to the pool, causing a
permanent pool leak. The document input creation should be wrapped in a try-catch
that releases the writer back to the pool on failure.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [251-262]

 public CompositeDocumentInput newDocumentInput() {
     CompositeWriter writer = writerPool.getAndLock();
-    DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
-    Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
-    for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
-        secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+    try {
+        DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
+        Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
+        for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
+            secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+        }
+        return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
+            assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
+            writerPool.releaseAndUnlock(writer);
+        });
+    } catch (Exception e) {
+        writerPool.releaseAndUnlock(writer);
+        throw e;
     }
-    return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
-        assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
-        writerPool.releaseAndUnlock(writer);
-    });
 }
Suggestion importance[1-10]: 7


Why: If primaryEngine.newDocumentInput() or any secondary engine call throws, the checked-out writer is never returned to the pool. The suggested try-catch correctly releases the writer on failure, preventing a pool resource leak.

Impact: Medium
General
Remove duplicate Javadoc comment block

The Javadoc comment for DataFormatPlugin is duplicated — the same comment block
appears twice consecutively. The duplicate comment block should be removed to keep
the file clean.

server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java [16-27]

 /**
  * Plugin interface for providing custom data format implementations.
  * Plugins implement this to register their data format (e.g., Parquet, Lucene)
  * with the DataFormatRegistry during node bootstrap.
  *
  * @opensearch.experimental
  */
-/**
- * Plugin interface for providing custom data format implementations.
- * Plugins implement this to register their data format (e.g., Parquet, Lucene)
- * with the DataFormatRegistry during node bootstrap.
Suggestion importance[1-10]: 3


Why: The Javadoc comment is indeed duplicated in the PR diff (lines 16-22 and 23-27 contain the same content). This is a minor code quality issue with no functional impact.

Impact: Low
Suggestions up to commit 5c20337
Category | Suggestion | Impact
Possible issue
Fix checked-out items being unlocked before return

Items that are locked but fail the isRegistered check (e.g., concurrently removed)
are unlocked in the finally block, but items that were successfully checked out are
also unlocked there — meaning all checked-out items are unlocked before being
returned to the caller. The caller receives items that are no longer locked,
defeating the purpose of checkoutAll. Items added to checkedOutItems should not be
unlocked.

libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockablePool.java [88-109]

 public List<T> checkoutAll() {
     ensureOpen();
     List<T> lockedItems = new ArrayList<>();
     List<T> checkedOutItems = new ArrayList<>();
     for (T item : this) {
         item.lock();
         lockedItems.add(item);
     }
     synchronized (this) {
         for (T item : lockedItems) {
-            try {
-                if (isRegistered(item) && items.remove(item)) {
-                    availableItems.remove(item);
-                    checkedOutItems.add(item);
-                }
-            } finally {
+            if (isRegistered(item) && items.remove(item)) {
+                availableItems.remove(item);
+                checkedOutItems.add(item);
+            } else {
                 item.unlock();
             }
         }
     }
     return Collections.unmodifiableList(checkedOutItems);
 }
Suggestion importance[1-10]: 9


Why: This is a critical correctness bug — the finally block unconditionally unlocks all items including those added to checkedOutItems, so the caller receives items that are already unlocked, completely defeating the purpose of checkoutAll. The improved code correctly only unlocks items that were not checked out.

Impact: High
Prevent writer pool leak on document input creation failure

If primaryEngine.newDocumentInput() or any secondary newDocumentInput() throws an
exception, the checked-out writer is never returned to the pool, causing a permanent
pool leak. The writer checkout and document input creation should be wrapped in a
try/finally block to ensure the writer is released on failure.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [251-262]

 public CompositeDocumentInput newDocumentInput() {
     CompositeWriter writer = writerPool.getAndLock();
-    DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
-    Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
-    for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
-        secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+    try {
+        DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
+        Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
+        for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
+            secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+        }
+        return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
+            assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
+            writerPool.releaseAndUnlock(writer);
+        });
+    } catch (Exception e) {
+        writerPool.releaseAndUnlock(writer);
+        throw e;
     }
-    return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
-        assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
-        writerPool.releaseAndUnlock(writer);
-    });
 }
Suggestion importance[1-10]: 8


Why: This is a valid and important bug fix — if primaryEngine.newDocumentInput() or any secondary engine's newDocumentInput() throws, the writer checked out from writerPool is never returned, causing a permanent pool leak that would degrade or halt indexing.

Impact: Medium
Fix race condition in counter increment ordering

The entry is unlocked before the counter is incremented. A thread in lockAndPoll
could observe the entry as available (via tryLock), poll it, and then the counter
increment would cause a spurious retry loop. The counter should be incremented
before unlocking so that any waiting lockAndPoll caller sees the updated count only
after the entry is truly available.

libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockableConcurrentQueue.java [74-78]

 public void addAndUnlock(T entry) {
     queue.add(entry);
+    addAndUnlockCounter.incrementAndGet();
     entry.unlock();
-    addAndUnlockCounter.incrementAndGet();
 }
Suggestion importance[1-10]: 7


Why: The suggestion correctly identifies that incrementing the counter before unlocking ensures lockAndPoll callers see the updated count only after the entry is truly available, preventing a subtle race condition where the entry is visible but the counter hasn't been updated yet.

Impact: Medium
General
Remove duplicate Javadoc comment

The Javadoc comment for DataFormatPlugin is duplicated — the same block comment
appears twice consecutively. The duplicate should be removed to keep the
documentation clean.

server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java [16-27]

 /**
  * Plugin interface for providing custom data format implementations.
  * Plugins implement this to register their data format (e.g., Parquet, Lucene)
  * with the DataFormatRegistry during node bootstrap.
  *
  * @opensearch.experimental
  */
-/**
- * Plugin interface for providing custom data format implementations.
- * Plugins implement this to register their data format (e.g., Parquet, Lucene)
- * with the DataFormatRegistry during node bootstrap.
Suggestion importance[1-10]: 3


Why: The duplicate Javadoc block is a real issue in the PR diff, but it's a minor documentation problem that doesn't affect functionality.

Impact: Low
Suggestions up to commit ff770b5
Category | Suggestion | Impact
Possible issue
Keep checked-out writers locked after checkout

In checkoutAll, writers that are locked but fail the isRegistered check (e.g.,
already removed) are unlocked in the finally block, which is correct. However,
writers that are successfully checked out are also unlocked in the finally block,
meaning they are returned to callers in an unlocked state. The callers (in refresh)
expect checked-out writers to remain locked for the duration of the flush. The
unlock should only happen for writers that are NOT checked out.

server/src/main/java/org/opensearch/index/engine/dataformat/DataformatAwareLockableWriterPool.java [110-131]

 public List<W> checkoutAll() {
     ensureOpen();
     List<W> lockedWriters = new ArrayList<>();
     List<W> checkedOutWriters = new ArrayList<>();
     for (W writer : this) {
         writer.lock();
         lockedWriters.add(writer);
     }
     synchronized (this) {
         for (W writer : lockedWriters) {
-            try {
-                if (isRegistered(writer) && writers.remove(writer)) {
-                    availableWriters.remove(writer);
-                    checkedOutWriters.add(writer);
-                }
-            } finally {
+            if (isRegistered(writer) && writers.remove(writer)) {
+                availableWriters.remove(writer);
+                checkedOutWriters.add(writer);
+            } else {
                 writer.unlock();
             }
         }
     }
     return Collections.unmodifiableList(checkedOutWriters);
 }
Suggestion importance[1-10]: 9


Why: This is a critical correctness bug. The finally block unconditionally unlocks all writers including successfully checked-out ones, but the refresh method in CompositeIndexingExecutionEngine expects writers returned by checkoutAll to remain locked. The fix correctly only unlocks writers that were NOT checked out.

High
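The lock-ownership contract in this suggestion can be shown with a minimal, self-contained sketch. The class and method names below (MiniWriter, checkoutAll) are illustrative stand-ins for the PR's pool classes, not the actual OpenSearch code: writers that pass the registration check stay locked for the caller, while skipped writers are unlocked again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified pool illustrating the intended checkoutAll contract:
// successfully checked-out writers remain locked; skipped writers are unlocked.
public class CheckoutDemo {
    public static class MiniWriter {
        public final ReentrantLock lock = new ReentrantLock();
        public boolean registered = true;
    }

    public static List<MiniWriter> checkoutAll(List<MiniWriter> pool) {
        List<MiniWriter> checkedOut = new ArrayList<>();
        for (MiniWriter w : pool) {
            w.lock.lock();
            if (w.registered) {
                checkedOut.add(w);   // checked out: caller now owns the lock
            } else {
                w.lock.unlock();     // skipped: release immediately
            }
        }
        return checkedOut;
    }
}
```

With this shape, a caller such as a refresh loop can flush each checked-out writer while still holding its lock, then release it explicitly when done.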
Unlock entry before adding to queue

The addAndUnlock method adds the entry to the queue and then unlocks it. However,
between queue.add(entry) and entry.unlock(), another thread calling lockAndPoll
could see the entry in the queue but fail to lock it (since it's still locked),
causing it to miss the entry. The unlock should happen before the add so the entry
is lockable as soon as it's visible in the queue.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/LockableConcurrentQueue.java [74-78]

 public void addAndUnlock(T entry) {
+    entry.unlock();
     queue.add(entry);
-    entry.unlock();
     addAndUnlockCounter.incrementAndGet();
 }
Suggestion importance[1-10]: 8


Why: This is a valid race condition concern. If the entry is added to the queue while still locked, a polling thread could see it but fail tryLock, potentially missing it. Unlocking before adding ensures the entry is immediately acquirable. However, the addAndUnlockCounter increment after queue.add in the suggested code means the counter is still incremented after the add, which is the correct ordering for the retry loop in lockAndPoll.

Medium
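The unlock-before-publish ordering can be sketched in isolation. This is a hypothetical, simplified version of the queue under discussion (Entry and the method names are illustrative): because the entry is unlocked before it becomes visible in the queue, a poller that sees it can always acquire it with tryLock, avoiding spurious retries.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the corrected ordering: unlock first, then publish,
// so any published entry is immediately acquirable by a polling thread.
public class QueueDemo {
    public static class Entry {
        public final ReentrantLock lock = new ReentrantLock();
    }

    private final ConcurrentLinkedQueue<Entry> queue = new ConcurrentLinkedQueue<>();

    public void addAndUnlock(Entry entry) {
        entry.lock.unlock();   // make the entry acquirable first...
        queue.add(entry);      // ...then make it visible to pollers
    }

    public Entry lockAndPoll() {
        Entry entry = queue.peek();
        if (entry != null && entry.lock.tryLock()) {
            queue.remove(entry);
            return entry;
        }
        return null;           // nothing acquirable right now; caller retries
    }
}
```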
Release writer on document input creation failure

If primaryEngine.newDocumentInput() or any secondary newDocumentInput() throws an
exception, the writer that was locked via writerPool.getAndLock() will never be
released back to the pool, causing a resource leak. The writer checkout should be
wrapped in a try-catch to ensure it is released on failure.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [257-271]

 public CompositeDocumentInput newDocumentInput() {
     CompositeWriter writer = writerPool.getAndLock();
-    DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
-    Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
-    for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
-        secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+    try {
+        DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
+        Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
+        for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
+            secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+        }
+        return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
+            assert writer.isFlushPending() == false && writer.isAborted() == false : "CompositeWriter has pending flush: "
+                + writer.isFlushPending()
+                + " aborted="
+                + writer.isAborted();
+            writerPool.releaseAndUnlock(writer);
+        });
+    } catch (Exception e) {
+        writerPool.releaseAndUnlock(writer);
+        throw e;
     }
-    return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
-        assert writer.isFlushPending() == false && writer.isAborted() == false : "CompositeWriter has pending flush: "
-            + writer.isFlushPending()
-            + " aborted="
-            + writer.isAborted();
-        writerPool.releaseAndUnlock(writer);
-    });
 }
Suggestion importance[1-10]: 7


Why: This is a valid resource leak concern. If primaryEngine.newDocumentInput() or any secondary engine's newDocumentInput() throws, the locked writer from writerPool.getAndLock() would never be released, causing the pool to be permanently depleted of that writer slot.

Medium
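The general acquire/try/release pattern this suggestion applies can be reduced to a small sketch. Everything here (PoolSafety, acquireAndBuild, the StringBuilder stand-in for a writer) is illustrative, not the PR's code: if setup after the checkout throws, the resource goes back to the pool before the exception propagates, so the pool is never depleted by a failed construction.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of release-on-failure around a pooled checkout.
public class PoolSafety {
    private final Deque<StringBuilder> pool = new ArrayDeque<>();

    public PoolSafety() {
        pool.add(new StringBuilder());
    }

    public StringBuilder acquireAndBuild(boolean failDuringSetup) {
        StringBuilder writer = pool.poll();   // checkout, analogous to getAndLock()
        try {
            if (failDuringSetup) {
                throw new IllegalStateException("document input creation failed");
            }
            return writer;                    // success: caller releases later
        } catch (RuntimeException e) {
            pool.add(writer);                 // failure: return to pool, then rethrow
            throw e;
        }
    }

    public int available() {
        return pool.size();
    }
}
```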
General
Prevent race condition in pool initialization

The initialize method has a race condition: two threads could both pass the null
check before either sets writerSupplier, resulting in double initialization. Since
writerSupplier is a volatile field, a synchronized block or a SetOnce-style atomic
check-and-set is needed to make this thread-safe.

server/src/main/java/org/opensearch/index/engine/dataformat/DataformatAwareLockableWriterPool.java [68-73]

-public void initialize(Supplier<W> writerSupplier) {
+public synchronized void initialize(Supplier<W> writerSupplier) {
     if (this.writerSupplier != null) {
         throw new IllegalStateException("DataformatAwareLockableWriterPool is already initialized");
     }
     this.writerSupplier = Objects.requireNonNull(writerSupplier, "writerSupplier must not be null");
 }
Suggestion importance[1-10]: 6


Why: The race condition in initialize is a valid concern since two threads could both pass the null check before either sets writerSupplier. Adding synchronized is a simple and correct fix, though in practice initialize is typically called once during construction.

Low
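Besides the synchronized fix shown above, a lock-free variant is possible. This is a hypothetical sketch (InitOnce is an illustrative name, not the PR's class): AtomicReference.compareAndSet makes the check-and-set atomic, so exactly one initialize() call can win even under concurrent callers.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical lock-free initialize-once holder using compareAndSet.
public class InitOnce<W> {
    private final AtomicReference<Supplier<W>> writerSupplier = new AtomicReference<>();

    public void initialize(Supplier<W> supplier) {
        Objects.requireNonNull(supplier, "writerSupplier must not be null");
        if (!writerSupplier.compareAndSet(null, supplier)) {
            throw new IllegalStateException("pool is already initialized");
        }
    }

    public W newWriter() {
        Supplier<W> s = writerSupplier.get();
        if (s == null) {
            throw new IllegalStateException("pool is not initialized");
        }
        return s.get();
    }
}
```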
Suggestions up to commit ea0817e
Category | Suggestion | Impact
Possible issue
Preserve lock on successfully checked-out writers

In checkoutAll, writers that are successfully checked out are unlocked in the
finally block, but the callers (e.g., refresh) expect to hold the lock on the
checked-out writers. Writers that are not checked out (e.g., already unregistered)
should be unlocked, but writers that are successfully checked out should remain
locked until the caller explicitly releases them.

server/src/main/java/org/opensearch/index/engine/dataformat/DataformatAwareLockableWriterPool.java [110-131]

 public List<W> checkoutAll() {
     ensureOpen();
     List<W> lockedWriters = new ArrayList<>();
     List<W> checkedOutWriters = new ArrayList<>();
     for (W writer : this) {
         writer.lock();
         lockedWriters.add(writer);
     }
     synchronized (this) {
         for (W writer : lockedWriters) {
-            try {
-                if (isRegistered(writer) && writers.remove(writer)) {
-                    availableWriters.remove(writer);
-                    checkedOutWriters.add(writer);
-                }
-            } finally {
+            if (isRegistered(writer) && writers.remove(writer)) {
+                availableWriters.remove(writer);
+                checkedOutWriters.add(writer);
+            } else {
                 writer.unlock();
             }
         }
     }
     return Collections.unmodifiableList(checkedOutWriters);
 }
Suggestion importance[1-10]: 8


Why: This is a real bug: the finally block unconditionally unlocks all writers including those successfully checked out, but callers like refresh in CompositeIndexingExecutionEngine expect to hold the lock on checked-out writers. The improved code correctly only unlocks writers that were not checked out.

Medium
Unlock entry before adding to queue

The addAndUnlock method adds the entry to the queue and then unlocks it. However,
between queue.add(entry) and entry.unlock(), another thread calling lockAndPoll
could see the entry in the queue but fail to lock it (since it's still locked),
causing a spurious retry loop. The entry should be unlocked before being added to
the queue so that it is immediately acquirable by polling threads.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/LockableConcurrentQueue.java [74-78]

 public void addAndUnlock(T entry) {
+    entry.unlock();
     queue.add(entry);
-    entry.unlock();
     addAndUnlockCounter.incrementAndGet();
 }
Suggestion importance[1-10]: 7


Why: The suggestion points to a real ordering issue: if an entry is added to the queue while still locked, a polling thread could find it but fail tryLock, causing unnecessary retries. However, the file path in the suggestion is wrong (the class is in LockableConcurrentQueue.java under libs/concurrent-queue), and the actual code lines are in that file at lines 74-78. The logic concern is valid but the window is very small since unlock immediately follows add.

Medium
Fix race condition in initialization check

The initialize method has a race condition: two threads could both pass the null
check before either sets writerSupplier, resulting in double initialization. Use a
synchronized block or SetOnce (already imported) to make this check-and-set atomic.

server/src/main/java/org/opensearch/index/engine/dataformat/DataformatAwareLockableWriterPool.java [68-73]

-public void initialize(Supplier<W> writerSupplier) {
+public synchronized void initialize(Supplier<W> writerSupplier) {
     if (this.writerSupplier != null) {
         throw new IllegalStateException("DataformatAwareLockableWriterPool is already initialized");
     }
     this.writerSupplier = Objects.requireNonNull(writerSupplier, "writerSupplier must not be null");
 }
Suggestion importance[1-10]: 6


Why: The race condition in initialize is a real concurrency issue since two threads could both pass the null check before either sets writerSupplier. Adding synchronized is a straightforward fix, though in practice initialize is typically called once during construction.

Low
General
Return actual inputs instead of null

getFinalInput() always returns null, which may cause NullPointerException in callers
that expect a valid list of per-format document inputs. It should return a list
containing the primary and all secondary document inputs so callers can access the
finalized inputs for each format.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java [76-78]

 @Override
 public List<? extends DocumentInput<?>> getFinalInput() {
-    return null;
+    List<DocumentInput<?>> inputs = new ArrayList<>();
+    inputs.add(primaryDocumentInput);
+    inputs.addAll(secondaryDocumentInputs.values());
+    return Collections.unmodifiableList(inputs);
 }
Suggestion importance[1-10]: 4


Why: While returning null from getFinalInput() could be problematic for callers, the existing code and tests explicitly assert null is returned, suggesting this is intentional for now (with a TODO-like design). The suggestion may be premature without knowing the full contract of getFinalInput.

Low

@github-actions
Contributor

❌ Gradle check result for 392e1fd: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

Persistent review updated to latest commit b3f4e8a

@github-actions
Contributor

❌ Gradle check result for b3f4e8a: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

Persistent review updated to latest commit ae4560a

@github-actions
Contributor

Persistent review updated to latest commit 03e124c

@github-actions
Contributor

❌ Gradle check result for 03e124c: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

Persistent review updated to latest commit 82ffb04

@github-actions
Contributor

Persistent review updated to latest commit dc8d029

@github-actions
Contributor

❌ Gradle check result for dc8d029: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

Persistent review updated to latest commit 6687b5d

@github-actions
Contributor

❌ Gradle check result for 6687b5d: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

github-actions bot commented Mar 19, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 8f87d7e.

Path | Line | Severity | Description
server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java | 13 | low | Duplicate Javadoc comment block — the same plugin interface documentation appears twice in the file. Likely a copy-paste artifact with no security impact, but anomalous enough to note as it could obscure intentional modifications to the interface contract.

The table above displays the top 10 most important findings.

Total: 1 | Critical: 0 | High: 0 | Medium: 0 | Low: 1


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions
Contributor

Persistent review updated to latest commit 238416a

@alchemist51 alchemist51 added the Indexing label (Indexing, Bulk Indexing and anything related to indexing) Mar 19, 2026
@github-actions
Contributor

Persistent review updated to latest commit 61cd8e4

@github-actions
Contributor

❌ Gradle check result for 61cd8e4: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

Persistent review updated to latest commit 2e5d2fa

@github-actions
Contributor

Persistent review updated to latest commit f685976

@github-actions
Contributor

Persistent review updated to latest commit 5214646

@github-actions
Contributor

Persistent review updated to latest commit 166e609

@github-actions
Contributor

Failed to generate code suggestions for PR

@github-actions
Contributor

❌ Gradle check result for 8f87d7e: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

Failed to generate code suggestions for PR

@github-actions
Contributor

❌ Gradle check result for c7cdd18: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@alchemist51
Contributor Author

REPRODUCE WITH: ./gradlew ':server:internalClusterTest' --tests 'org.opensearch.recovery.RecoveryWhileUnderLoadIT' -Dtests.method='testRecoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest {p0={"cluster.indices.replication.strategy":"SEGMENT"}}' -Dtests.seed=FA2A6A0B41ACE982 -Dtests.security.manager=true -Dtests.jvm.argline="-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m" -Dtests.locale=en-NZ -Dtests.timezone=Atlantic/Madeira -Druntime.java=25

RecoveryWhileUnderLoadIT > testRecoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest > testRecoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest {p0={"cluster.indices.replication.strategy":"SEGMENT"}} FAILED
    java.lang.RuntimeException: java.util.NoSuchElementException: No value present
        at __randomizedtesting.SeedInfo.seed([FA2A6A0B41ACE982:2303C8CAE5033328]:0)
        at org.opensearch.test.OpenSearchIntegTestCase.waitForReplication(OpenSearchIntegTestCase.java:2590)
        at org.opensearch.recovery.RecoveryWhileUnderLoadIT.assertAfterRefreshAndWaitForReplication(RecoveryWhileUnderLoadIT.java:504)
        at org.opensearch.recovery.RecoveryWhileUnderLoadIT.testRecoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest(RecoveryWhileUnderLoadIT.java:236)

        Caused by:
        java.util.NoSuchElementException: No value present
            at java.****/java.util.Optional.get(Optional.java:143)
            at org.opensearch.test.OpenSearchIntegTestCase.getIndexShard(OpenSearchIntegTestCase.java:2613)
            at org.opensearch.test.OpenSearchIntegTestCase.getIndexShard(OpenSearchIntegTestCase.java:2602)
            at org.opensearch.test.OpenSearchIntegTestCase.lambda$waitForReplication$1(OpenSearchIntegTestCase.java:2564)
            at org.opensearch.test.OpenSearchTestCase.assertBusy(OpenSearchTestCase.java:1181)
            at org.opensearch.test.OpenSearchIntegTestCase.waitForReplication(OpenSearchIntegTestCase.java:2558)
            ... 2 more

@github-actions
Contributor

Failed to generate code suggestions for PR

@github-actions
Contributor

Failed to generate code suggestions for PR

@github-actions
Contributor

Failed to generate code suggestions for PR

@github-actions
Contributor

✅ Gradle check result for 3a9ddb3: SUCCESS

alchemist51 and others added 5 commits March 29, 2026 00:22
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Co-authored-by: Bukhtawar Khan <bukhtawa@amazon.com>
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
@github-actions
Contributor

Failed to generate code suggestions for PR

@github-actions
Contributor

❌ Gradle check result for 2404098: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
Co-authored-by: Arpit Bandejiya <abandeji@amazon.com>
@github-actions
Contributor

Failed to generate code suggestions for PR

@github-actions
Contributor

✅ Gradle check result for e93d27a: SUCCESS


Labels

Indexing (Indexing, Bulk Indexing and anything related to indexing), lucene


4 participants