Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Onboarding new maven snapshots publishing to s3 ([#19619](https://github.com/opensearch-project/OpenSearch/pull/19619))
- Remove MultiCollectorWrapper and use MultiCollector in Lucene instead ([#19595](https://github.com/opensearch-project/OpenSearch/pull/19595))
- Change implementation for `percentiles` aggregation for latency improvement ([#19648](https://github.com/opensearch-project/OpenSearch/pull/19648))
- Refactor the ThreadPoolStats.Stats class to use the Builder pattern instead of constructors ([#19306](https://github.com/opensearch-project/OpenSearch/pull/19306))

### Fixed
- Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))
Expand All @@ -42,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `stefanzweifel/git-auto-commit-action` from 6 to 7 ([#19689](https://github.com/opensearch-project/OpenSearch/pull/19689))

### Deprecated
- Deprecated existing constructors in ThreadPoolStats.Stats in favor of the new Builder ([#19306](https://github.com/opensearch-project/OpenSearch/pull/19306))

### Removed

Expand Down
25 changes: 18 additions & 7 deletions server/src/main/java/org/opensearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,18 @@ public ThreadPoolStats stats() {
rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected();
}
}
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed, waitTimeNanos, parallelism));
stats.add(
new ThreadPoolStats.Stats.Builder().name(name)
.threads(threads)
.queue(queue)
.active(active)
.rejected(rejected)
.largest(largest)
.completed(completed)
.waitTimeNanos(waitTimeNanos)
.parallelism(parallelism)
.build()
);
}
return new ThreadPoolStats(stats);
}
Expand Down Expand Up @@ -606,14 +617,14 @@ public ExecutorService executor(String name) {
/**
* Schedules a one-shot command to run after a given delay. The command is run in the context of the calling thread.
*
* @param command the command to run
* @param delay delay before the task executes
* @param command the command to run
* @param delay delay before the task executes
* @param executor the name of the thread pool on which to execute this task. SAME means "execute on the scheduler thread" which changes
* the meaning of the ScheduledFuture returned by this method. In that case the ScheduledFuture will complete only when the
* command completes.
* the meaning of the ScheduledFuture returned by this method. In that case the ScheduledFuture will complete only when the
* command completes.
* @return a ScheduledFuture who's get will return when the task is has been added to its target thread pool and throw an exception if
* the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool
* the ScheduledFuture will cannot interact with it.
* the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool
* the ScheduledFuture will cannot interact with it.
* @throws OpenSearchRejectedExecutionException if the task cannot be scheduled for execution
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,28 @@ public static class Stats implements Writeable, ToXContentFragment, Comparable<S
private final long waitTimeNanos;
private final int parallelism;

/**
* Private constructor that takes a builder.
* This is the sole entry point for creating a new Stats object.
* @param builder The builder instance containing all the values.
*/
private Stats(Builder builder) {
this.name = builder.name;
this.threads = builder.threads;
this.queue = builder.queue;
this.active = builder.active;
this.rejected = builder.rejected;
this.largest = builder.largest;
this.completed = builder.completed;
this.waitTimeNanos = builder.waitTimeNanos;
this.parallelism = builder.parallelism;
}

/**
* This constructor will be deprecated starting in version 3.3.0.
* Use {@link Builder} instead.
*/
@Deprecated
public Stats(String name, int threads, int queue, int active, long rejected, int largest, long completed, long waitTimeNanos) {
this.name = name;
this.threads = threads;
Expand Down Expand Up @@ -226,6 +248,77 @@ public int compareTo(Stats other) {
return compare;
}
}

/**
* Builder for the {@link Stats} class.
* Provides a fluent API for constructing a Stats object.
*/
public static class Builder {
private String name = "";
private int threads = 0;
private int queue = 0;
private int active = 0;
private long rejected = 0;
private int largest = 0;
private long completed = 0;
private long waitTimeNanos = 0;
private int parallelism = 0;

public Builder() {}

public Builder name(String name) {
this.name = name;
return this;
}

public Builder threads(int threads) {
this.threads = threads;
return this;
}

public Builder queue(int queue) {
this.queue = queue;
return this;
}

public Builder active(int active) {
this.active = active;
return this;
}

public Builder rejected(long rejected) {
this.rejected = rejected;
return this;
}

public Builder largest(int largest) {
this.largest = largest;
return this;
}

public Builder completed(long completed) {
this.completed = completed;
return this;
}

public Builder waitTimeNanos(long waitTimeNanos) {
this.waitTimeNanos = waitTimeNanos;
return this;
}

public Builder parallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}

/**
* Creates a {@link Stats} object from the builder's current state.
* @return A new Stats instance.
*/
public Stats build() {
return new Stats(this);
}
}
}

private List<Stats> stats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,17 +734,16 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) throws IOExcep
List<ThreadPoolStats.Stats> threadPoolStatsList = new ArrayList<>();
for (int i = 0; i < numThreadPoolStats; i++) {
threadPoolStatsList.add(
new ThreadPoolStats.Stats(
randomAlphaOfLengthBetween(3, 10),
randomIntBetween(1, 1000),
randomIntBetween(1, 1000),
randomIntBetween(1, 1000),
randomNonNegativeLong(),
randomIntBetween(1, 1000),
randomIntBetween(1, 1000),
randomIntBetween(-1, 10),
-1 // Non-ForkJoinPool: use -1
)
new ThreadPoolStats.Stats.Builder().name(randomAlphaOfLengthBetween(3, 10))
.threads(randomIntBetween(1, 1000))
.queue(randomIntBetween(1, 1000))
.active(randomIntBetween(1, 1000))
.rejected(randomNonNegativeLong())
.largest(randomIntBetween(1, 1000))
.completed(randomIntBetween(1, 1000))
.waitTimeNanos(randomIntBetween(-1, 10))
.parallelism(-1) // Non-ForkJoinPool: use -1
.build()
);
}
threadPoolStats = new ThreadPoolStats(threadPoolStatsList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,18 @@ public void testNodeValidatorWithHealthyResources() {
when(cpu.getPercent()).thenReturn((short) 50);
when(jvm.getHeapUsedPercent()).thenReturn((short) 60);
ThreadPoolStats stats = new ThreadPoolStats(
Arrays.asList(new ThreadPoolStats.Stats(
ThreadPool.Names.FORCE_MERGE, 1, 0, 0, 0, 1, 0, 0, -1
))
Arrays.asList(
new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE)
.threads(1)
.queue(0)
.active(0)
.rejected(0)
.largest(1)
.completed(0)
.waitTimeNanos(0)
.parallelism(-1)
.build()
)
);
when(threadPool.stats()).thenReturn(stats);

Expand All @@ -256,9 +265,17 @@ public void testNodeValidatorWithFeatureSwitch() {
when(cpu.getPercent()).thenReturn((short) 50);
when(jvm.getHeapUsedPercent()).thenReturn((short) 60);
ThreadPoolStats stats = new ThreadPoolStats(
Arrays.asList(new ThreadPoolStats.Stats(
ThreadPool.Names.FORCE_MERGE, 1, 0, 0, 0, 1, 0, 0, -1
))
Arrays.asList(new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE)
.threads(1)
.queue(0)
.active(0)
.rejected(0)
.largest(1)
.completed(0)
.waitTimeNanos(0)
.parallelism(-1)
.build()
)
);
when(threadPool.stats()).thenReturn(stats);
Settings settings = getConfiguredClusterSettings(false, false, Collections.emptyMap());
Expand Down Expand Up @@ -335,9 +352,18 @@ public void testNodeValidatorWithInsufficientForceMergeThreads() {
when(cpu.getPercent()).thenReturn((short) 50);
when(jvm.getHeapUsedPercent()).thenReturn((short) 50);
ThreadPoolStats stats = new ThreadPoolStats(
Arrays.asList(new ThreadPoolStats.Stats(
ThreadPool.Names.FORCE_MERGE, 1, 1, 1, 0, 1, 0, -1, -1
))
Arrays.asList(
new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE)
.threads(1)
.queue(1)
.active(1)
.rejected(0)
.largest(1)
.completed(0)
.waitTimeNanos(-1)
.parallelism(-1)
.build()
)
);
when(threadPool.stats()).thenReturn(stats);
AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode(getConfiguredClusterSettings(true, true, Collections.emptyMap()), getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)));
Expand Down Expand Up @@ -474,7 +500,18 @@ public void testForceMergeOperationOnDataNodeWithFailingMerges() throws IOExcept
ExecutorService executorService = Executors.newFixedThreadPool(forceMergeThreads);
when(threadPool.executor(ThreadPool.Names.FORCE_MERGE)).thenReturn(executorService);
ThreadPoolStats stats = new ThreadPoolStats(
Arrays.asList(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, forceMergeThreads, 0, 0, 0, forceMergeThreads, 0, -1, -1))
Arrays.asList(
new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE)
.threads(forceMergeThreads)
.queue(0)
.active(0)
.rejected(0)
.largest(forceMergeThreads)
.completed(0)
.waitTimeNanos(-1)
.parallelism(-1)
.build()
)
);
when(threadPool.stats()).thenReturn(stats);

Expand Down Expand Up @@ -524,7 +561,18 @@ public void testForceMergeOperationOnDataNodeOfWarmEnabledCluster() throws IOExc
ExecutorService executorService = Executors.newFixedThreadPool(forceMergeThreads);
when(threadPool.executor(ThreadPool.Names.FORCE_MERGE)).thenReturn(executorService);
ThreadPoolStats stats = new ThreadPoolStats(
Arrays.asList(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, forceMergeThreads, 0, 0, 0, forceMergeThreads, 0, -1, -1))
Arrays.asList(
new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE)
.threads(forceMergeThreads)
.queue(0)
.active(0)
.rejected(0)
.largest(forceMergeThreads)
.completed(0)
.waitTimeNanos(-1)
.parallelism(-1)
.build()
)
);
when(threadPool.stats()).thenReturn(stats);
IndexService indexService1 = mock(IndexService.class);
Expand Down Expand Up @@ -580,7 +628,18 @@ public void testForceMergeOperationOnDataNodeWithThreadInterruption() throws Int
ExecutorService executorService = Executors.newFixedThreadPool(forceMergeThreads);
when(threadPool.executor(ThreadPool.Names.FORCE_MERGE)).thenReturn(executorService);
ThreadPoolStats stats = new ThreadPoolStats(
Arrays.asList(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, forceMergeThreads, 0, 0, 0, forceMergeThreads, 0, -1, -1))
Arrays.asList(
new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE)
.threads(forceMergeThreads)
.queue(0)
.active(0)
.rejected(0)
.largest(forceMergeThreads)
.completed(0)
.waitTimeNanos(-1)
.parallelism(-1)
.build()
)
);
when(threadPool.stats()).thenReturn(stats);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,20 @@
public class ThreadPoolStatsTests extends OpenSearchTestCase {
public void testThreadPoolStatsSort() throws IOException {
List<ThreadPoolStats.Stats> stats = new ArrayList<>();
stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L, 0L, -1));
stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L, 0L, -1));
stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L, 0L, -1));
stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L, 0L, -1));
stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L, 0L, -1));
stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L, 0L, -1));
stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L, 0L, -1));
ThreadPoolStats.Stats.Builder defaultStats = new ThreadPoolStats.Stats.Builder().queue(0)
.active(0)
.rejected(0)
.largest(0)
.completed(0L)
.waitTimeNanos(0L)
.parallelism(-1);
stats.add(defaultStats.name("z").threads(-1).build());
stats.add(defaultStats.name("m").threads(3).build());
stats.add(defaultStats.name("m").threads(1).build());
stats.add(defaultStats.name("d").threads(-1).build());
stats.add(defaultStats.name("m").threads(2).build());
stats.add(defaultStats.name("t").threads(-1).build());
stats.add(defaultStats.name("a").threads(-1).build());

List<ThreadPoolStats.Stats> copy = new ArrayList<>(stats);
Collections.sort(copy);
Expand Down Expand Up @@ -131,7 +138,16 @@ public void testStatsCompareToWithParallelism() {
}

public void testStatsGetters() {
ThreadPoolStats.Stats stats = new ThreadPoolStats.Stats("test", 1, 2, 3, 4L, 5, 6L, 7L, 8);
ThreadPoolStats.Stats stats = new ThreadPoolStats.Stats.Builder().name("test")
.threads(1)
.queue(2)
.active(3)
.rejected(4L)
.largest(5)
.completed(6L)
.waitTimeNanos(7L)
.parallelism(8)
.build();
assertEquals("test", stats.getName());
assertEquals(1, stats.getThreads());
assertEquals(2, stats.getQueue());
Expand All @@ -147,11 +163,18 @@ public void testThreadPoolStatsToXContent() throws IOException {
try (BytesStreamOutput os = new BytesStreamOutput()) {

List<ThreadPoolStats.Stats> stats = new ArrayList<>();
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L, 0L, -1));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L, -1L, -1));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L, -1L, -1));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L, -1L, -1));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L, -1L, -1));
ThreadPoolStats.Stats.Builder defaultStats = new ThreadPoolStats.Stats.Builder().threads(-1)
.queue(0)
.active(0)
.rejected(0)
.largest(0)
.completed(0L)
.parallelism(-1);
stats.add(defaultStats.name(ThreadPool.Names.SEARCH).waitTimeNanos(0L).build());
stats.add(defaultStats.name(ThreadPool.Names.WARMER).waitTimeNanos(-1L).build());
stats.add(defaultStats.name(ThreadPool.Names.GENERIC).waitTimeNanos(-1L).build());
stats.add(defaultStats.name(ThreadPool.Names.FORCE_MERGE).waitTimeNanos(-1L).build());
stats.add(defaultStats.name(ThreadPool.Names.SAME).waitTimeNanos(-1L).build());

ThreadPoolStats threadPoolStats = new ThreadPoolStats(stats);
try (XContentBuilder builder = new XContentBuilder(MediaTypeRegistry.JSON.xContent(), os)) {
Expand Down
Loading