Skip to content

Commit be2a156

Browse files
committed
Dynamic Mapping support for Pluggable Data formats
Signed-off-by: rayshrey <rayshrey@amazon.com>
1 parent c20b3bd commit be2a156

36 files changed

Lines changed: 1447 additions & 164 deletions

File tree

libs/concurrent-queue/src/main/java/org/opensearch/common/queue/ConcurrentQueue.java

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ public final class ConcurrentQueue<T> {
3131
private final int concurrency;
3232
private final Lock[] locks;
3333
private final Queue<T>[] queues;
34-
private final Supplier<Queue<T>> queueSupplier;
3534

3635
ConcurrentQueue(Supplier<Queue<T>> queueSupplier, int concurrency) {
3736
if (concurrency < MIN_CONCURRENCY || concurrency > MAX_CONCURRENCY) {
@@ -40,7 +39,6 @@ public final class ConcurrentQueue<T> {
4039
);
4140
}
4241
this.concurrency = concurrency;
43-
this.queueSupplier = queueSupplier;
4442
locks = new Lock[concurrency];
4543
@SuppressWarnings({ "rawtypes", "unchecked" })
4644
Queue<T>[] queues = new Queue[concurrency];
@@ -81,21 +79,19 @@ void add(T entry) {
8179
}
8280

8381
T poll(Predicate<T> predicate) {
82+
return pollAndDropIncompatible(e -> true, predicate);
83+
}
84+
85+
T pollAndDropIncompatible(Predicate<T> isCompatible, Predicate<T> predicate) {
8486
final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
8587
for (int i = 0; i < concurrency; ++i) {
8688
final int index = (threadHash + i) % concurrency;
8789
final Lock lock = locks[index];
8890
final Queue<T> queue = queues[index];
8991
if (lock.tryLock()) {
9092
try {
91-
Iterator<T> it = queue.iterator();
92-
while (it.hasNext()) {
93-
T entry = it.next();
94-
if (predicate.test(entry)) {
95-
it.remove();
96-
return entry;
97-
}
98-
}
93+
T matched = scanAndDropIncompatible(queue, isCompatible, predicate);
94+
if (matched != null) return matched;
9995
} finally {
10096
lock.unlock();
10197
}
@@ -107,14 +103,8 @@ T poll(Predicate<T> predicate) {
107103
final Queue<T> queue = queues[index];
108104
lock.lock();
109105
try {
110-
Iterator<T> it = queue.iterator();
111-
while (it.hasNext()) {
112-
T entry = it.next();
113-
if (predicate.test(entry)) {
114-
it.remove();
115-
return entry;
116-
}
117-
}
106+
T matched = scanAndDropIncompatible(queue, isCompatible, predicate);
107+
if (matched != null) return matched;
118108
} finally {
119109
lock.unlock();
120110
}
@@ -137,4 +127,18 @@ boolean remove(T entry) {
137127
}
138128
return false;
139129
}
130+
131+
private T scanAndDropIncompatible(Queue<T> queue, Predicate<T> isCompatible, Predicate<T> predicate) {
132+
Iterator<T> it = queue.iterator();
133+
while (it.hasNext()) {
134+
T entry = it.next();
135+
if (isCompatible.test(entry) == false) {
136+
it.remove();
137+
} else if (predicate.test(entry)) {
138+
it.remove();
139+
return entry;
140+
}
141+
}
142+
return null;
143+
}
140144
}

libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockableConcurrentQueue.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import java.util.Queue;
1212
import java.util.concurrent.atomic.AtomicInteger;
13+
import java.util.function.Predicate;
1314
import java.util.function.Supplier;
1415

1516
/**
@@ -56,6 +57,26 @@ public T lockAndPoll() {
5657
return null;
5758
}
5859

60+
/**
61+
* Poll with compatibility filter. Scans the queue for a compatible entry that can be locked.
62+
* Incompatible entries are removed from the queue.
63+
* Compatible entries that cannot be locked (held by another thread) are skipped.
64+
*
65+
* @param isCompatible predicate to test compatibility — failing entries are removed
66+
* @return the locked matched entry, or null if none found
67+
*/
68+
public T lockAndPollWithRejects(Predicate<T> isCompatible) {
69+
int addAndUnlockCount;
70+
do {
71+
addAndUnlockCount = addAndUnlockCounter.get();
72+
T entry = queue.pollAndDropIncompatible(isCompatible, Lockable::tryLock);
73+
if (entry != null) {
74+
return entry;
75+
}
76+
} while (addAndUnlockCount != addAndUnlockCounter.get());
77+
return null;
78+
}
79+
5980
/**
6081
* Remove an entry from the queue.
6182
*

libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockablePool.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,20 @@
1818
import java.util.Objects;
1919
import java.util.Queue;
2020
import java.util.Set;
21+
import java.util.function.Predicate;
2122
import java.util.function.Supplier;
2223

2324
/**
2425
* A thread-safe pool of {@link Lockable} items backed by a {@link LockableConcurrentQueue}.
2526
* Items are locked on checkout and unlocked on release, ensuring safe reuse across threads.
2627
* <p>
27-
* The pool is created with a supplier that produces new items on demand when the pool
28-
* is empty. Items are tracked in a set for registration checks and iteration.
28+
* The pool supports two modes of checkout:
29+
* <ul>
30+
* <li>{@link #getAndLock()} — returns any available item, or creates a new one</li>
31+
* <li>{@link #getAndLock(Predicate)} — returns a compatible item (per predicate),
32+
* rejecting incompatible ones. Rejected items are removed from the available queue
33+
* but remain tracked by the pool and included in {@link #checkoutAll()}.</li>
34+
* </ul>
2935
*
3036
* @param <T> the pooled item type, must implement {@link Lockable}
3137
*/
@@ -56,9 +62,21 @@ public LockablePool(Supplier<T> itemSupplier, Supplier<Queue<T>> queueSupplier,
5662
* @throws IllegalStateException if the pool is closed
5763
*/
5864
public T getAndLock() {
65+
return getAndLock(e -> true);
66+
}
67+
68+
/**
69+
* Locks and polls a compatible item from the pool in a single pass. Items that fail
70+
* the predicate are removed from the available queue but remain tracked by the pool.
71+
* If no compatible item is found, a new one is created using the constructor's item supplier.
72+
*
73+
* @param isCompatible predicate to test each polled item
74+
* @return a locked, compatible item
75+
* @throws IllegalStateException if the pool is closed
76+
*/
77+
public T getAndLock(Predicate<T> isCompatible) {
5978
ensureOpen();
60-
T item = availableItems.lockAndPoll();
61-
return Objects.requireNonNullElseGet(item, this::fetchItem);
79+
return Objects.requireNonNullElseGet(availableItems.lockAndPollWithRejects(isCompatible), this::fetchItem);
6280
}
6381

6482
private synchronized T fetchItem() {

libs/concurrent-queue/src/test/java/org/opensearch/common/queue/ConcurrentQueueTests.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,4 +160,73 @@ public void testConcurrentAddAndPoll() throws Exception {
160160
pollLatch.await();
161161
assertEquals(numThreads * itemsPerThread, totalPolled.get());
162162
}
163+
164+
// --- Tests for pollWithRejects ---
165+
166+
public void testPollAndDropIncompatibleReturnsCompatibleAndDropsIncompatible() {
167+
ConcurrentQueue<Integer> queue = new ConcurrentQueue<>(LinkedList::new, 1);
168+
queue.add(1);
169+
queue.add(2);
170+
queue.add(3);
171+
172+
// Compatible: even numbers. canSelect: always true.
173+
Integer result = queue.pollAndDropIncompatible(n -> n % 2 == 0, n -> true);
174+
assertEquals(Integer.valueOf(2), result);
175+
// 1 was rejected (removed), 3 still in queue (after the match, not scanned)
176+
assertEquals(Integer.valueOf(3), queue.poll(e -> true));
177+
assertNull(queue.poll(e -> true)); // 1 was removed
178+
}
179+
180+
public void testPollAndDropIncompatibleAllIncompatible() {
181+
ConcurrentQueue<Integer> queue = new ConcurrentQueue<>(LinkedList::new, 1);
182+
queue.add(1);
183+
queue.add(3);
184+
queue.add(5);
185+
186+
Integer result = queue.pollAndDropIncompatible(n -> n % 2 == 0, n -> true);
187+
assertNull(result);
188+
// All removed as incompatible
189+
assertNull(queue.poll(e -> true));
190+
}
191+
192+
public void testPollAndDropIncompatibleEmptyQueue() {
193+
ConcurrentQueue<Integer> queue = new ConcurrentQueue<>(LinkedList::new, 1);
194+
Integer result = queue.pollAndDropIncompatible(n -> true, n -> true);
195+
assertNull(result);
196+
}
197+
198+
public void testPollAndDropIncompatibleSkipsCompatibleButUnselectable() {
199+
ConcurrentQueue<Integer> queue = new ConcurrentQueue<>(LinkedList::new, 1);
200+
queue.add(1); // incompatible
201+
queue.add(2); // compatible but canSelect=false
202+
queue.add(4); // compatible and canSelect=true
203+
204+
Integer result = queue.pollAndDropIncompatible(
205+
n -> n % 2 == 0, // compatible: even
206+
n -> n > 3 // canSelect: > 3
207+
);
208+
assertEquals(Integer.valueOf(4), result);
209+
// 1 was rejected (removed), 2 should still be in queue (compatible but not selectable)
210+
assertEquals(Integer.valueOf(2), queue.poll(e -> true));
211+
assertNull(queue.poll(e -> true));
212+
}
213+
214+
public void testPollAndDropIncompatibleMultipleStripes() {
215+
ConcurrentQueue<Integer> queue = new ConcurrentQueue<>(LinkedList::new, 4);
216+
for (int i = 0; i < 20; i++) {
217+
queue.add(i);
218+
}
219+
220+
// Drop 0-9 as incompatible, select first compatible entry (>= 10)
221+
Integer first = queue.pollAndDropIncompatible(n -> n >= 10, n -> true);
222+
assertNotNull(first);
223+
assertTrue(first >= 10);
224+
225+
// Poll all remaining entries
226+
int count = 1; // counting the first result
227+
while (queue.poll(e -> true) != null) {
228+
count++;
229+
}
230+
assertEquals(10, count);
231+
}
163232
}

libs/concurrent-queue/src/test/java/org/opensearch/common/queue/LockableConcurrentQueueTests.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,4 +215,36 @@ public void testConcurrentLockAndPollAndAddAndUnlock() throws Exception {
215215
}
216216
assertEquals(numEntries, remaining);
217217
}
218+
219+
public void testLockAndPollWithRejectsReturnsCompatibleEntry() {
220+
LockableConcurrentQueue<LockableEntry> queue = new LockableConcurrentQueue<>(LinkedList::new, 1);
221+
LockableEntry e1 = new LockableEntry("old");
222+
LockableEntry e2 = new LockableEntry("old");
223+
LockableEntry e3 = new LockableEntry("current");
224+
seedEntry(queue, e1);
225+
seedEntry(queue, e2);
226+
seedEntry(queue, e3);
227+
228+
LockableEntry result = queue.lockAndPollWithRejects(e -> e.id.equals("current"));
229+
assertSame(e3, result);
230+
assertTrue(result.isHeldByCurrentThread());
231+
// e1 and e2 were incompatible and should have been dropped
232+
assertNull(queue.lockAndPoll());
233+
result.unlock();
234+
}
235+
236+
public void testLockAndPollWithRejectsAllIncompatible() {
237+
LockableConcurrentQueue<LockableEntry> queue = new LockableConcurrentQueue<>(LinkedList::new, 1);
238+
seedEntry(queue, new LockableEntry("old"));
239+
seedEntry(queue, new LockableEntry("old"));
240+
241+
assertNull(queue.lockAndPollWithRejects(e -> e.id.equals("new")));
242+
// Queue should be empty after all incompatible entries were dropped
243+
assertNull(queue.lockAndPoll());
244+
}
245+
246+
public void testLockAndPollWithRejectsEmptyQueue() {
247+
LockableConcurrentQueue<LockableEntry> queue = new LockableConcurrentQueue<>(LinkedList::new, 1);
248+
assertNull(queue.lockAndPollWithRejects(e -> true));
249+
}
218250
}

libs/concurrent-queue/src/test/java/org/opensearch/common/queue/LockablePoolTests.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,4 +143,57 @@ public void testClosedPoolThrowsOnCheckoutAll() throws IOException {
143143
IllegalStateException ex = expectThrows(IllegalStateException.class, pool::checkoutAll);
144144
assertEquals("LockablePool is already closed", ex.getMessage());
145145
}
146+
147+
// --- Tests for filtered getAndLock with rejection ---
148+
149+
public void testFilteredGetAndLockReturnsCompatibleItem() {
150+
LockablePool<LockableEntry> pool = createPool();
151+
LockableEntry item = pool.getAndLock();
152+
pool.releaseAndUnlock(item);
153+
154+
LockableEntry result = pool.getAndLock(e -> true);
155+
assertSame(item, result);
156+
pool.releaseAndUnlock(result);
157+
}
158+
159+
public void testFilteredGetAndLockRejectsIncompatibleItem() {
160+
LockablePool<LockableEntry> pool = createPool();
161+
LockableEntry item = pool.getAndLock();
162+
pool.releaseAndUnlock(item);
163+
164+
// Reject the existing item — predicate fails, pool creates new via supplier
165+
LockableEntry result = pool.getAndLock(e -> !e.id.equals(item.id));
166+
assertNotSame(item, result);
167+
pool.releaseAndUnlock(result);
168+
}
169+
170+
public void testCheckoutAllIncludesRejectedItems() {
171+
LockablePool<LockableEntry> pool = createPool();
172+
LockableEntry item = pool.getAndLock();
173+
pool.releaseAndUnlock(item);
174+
175+
// Reject the existing item
176+
LockableEntry fresh = pool.getAndLock(e -> !e.id.equals(item.id));
177+
pool.releaseAndUnlock(fresh);
178+
179+
List<LockableEntry> all = pool.checkoutAll();
180+
assertEquals(2, all.size());
181+
assertTrue(all.contains(item));
182+
assertTrue(all.contains(fresh));
183+
}
184+
185+
public void testRejectedItemNotReturnedBySubsequentPoll() {
186+
LockablePool<LockableEntry> pool = createPool();
187+
LockableEntry item = pool.getAndLock();
188+
pool.releaseAndUnlock(item);
189+
190+
// Reject it
191+
LockableEntry fresh = pool.getAndLock(e -> !e.id.equals(item.id));
192+
pool.releaseAndUnlock(fresh);
193+
194+
// Next poll should return fresh, not the rejected item
195+
LockableEntry next = pool.getAndLock();
196+
assertSame(fresh, next);
197+
pool.releaseAndUnlock(next);
198+
}
146199
}

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngine.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.opensearch.index.engine.dataformat.RefreshInput;
3232
import org.opensearch.index.engine.dataformat.RefreshResult;
3333
import org.opensearch.index.engine.dataformat.Writer;
34+
import org.opensearch.index.engine.dataformat.WriterConfig;
3435
import org.opensearch.index.engine.exec.Segment;
3536
import org.opensearch.index.engine.exec.WriterFileSet;
3637
import org.opensearch.index.engine.exec.commit.IndexStoreProvider;
@@ -52,7 +53,7 @@
5253
* Lucene-specific {@link IndexingExecutionEngine} that manages per-writer Lucene segments
5354
* and incorporates them into the shared {@link LuceneCommitter} writer during refresh.
5455
*
55-
* Write path: Each call to {@link #createWriter(long)} creates a {@link LuceneWriter} with its own
56+
* Write path: Each call to {@link #createWriter(WriterConfig)} creates a {@link LuceneWriter} with its own
5657
* {@link IndexWriter} in an isolated temp directory. Documents are indexed into this
5758
* per-writer segment. On flush, the writer force-merges to exactly 1 segment.
5859
*
@@ -145,17 +146,17 @@ public FormatStore getStore(DataFormat dataFormat) {
145146
* Creates a new {@link LuceneWriter} for the given generation in an isolated temp directory
146147
* under the shard's Lucene base directory.
147148
*
148-
* @param writerGeneration the generation number for the new writer
149+
* @param config the writer configuration
149150
* @return a new writer
150151
* @throws RuntimeException wrapping an {@link IOException} if writer creation fails
151152
*/
152153
@Override
153-
public Writer<LuceneDocumentInput> createWriter(long writerGeneration) {
154+
public Writer<LuceneDocumentInput> createWriter(WriterConfig config) {
154155
assert sharedWriter.isOpen() : "Cannot create writer — shared IndexWriter is closed";
155156
try {
156-
return new LuceneWriter(writerGeneration, dataFormat, baseDirectory, analyzer, codec);
157+
return new LuceneWriter(config.writerGeneration(), dataFormat, baseDirectory, analyzer, codec);
157158
} catch (IOException e) {
158-
throw new RuntimeException("Failed to create LuceneWriter for generation " + writerGeneration, e);
159+
throw new RuntimeException("Failed to create LuceneWriter for generation " + config.writerGeneration(), e);
159160
}
160161
}
161162

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,19 @@ public long generation() {
196196
return writerGeneration;
197197
}
198198

199+
@Override
200+
public boolean isSchemaMutable() {
201+
return true;
202+
}
203+
204+
@Override
205+
public long mappingVersion() {
206+
return 0;
207+
}
208+
209+
@Override
210+
public void updateMappingVersion(long newVersion) {}
211+
199212
/** Acquires the writer's reentrant lock. Used by the writer pool to serialize access. */
200213
@Override
201214
public void lock() {

0 commit comments

Comments
 (0)