diff --git a/CHANGELOG.md b/CHANGELOG.md index 41e7d2e12c564..395a6914ba9b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Onboarding new maven snapshots publishing to s3 ([#19619](https://github.com/opensearch-project/OpenSearch/pull/19619)) - Remove MultiCollectorWrapper and use MultiCollector in Lucene instead ([#19595](https://github.com/opensearch-project/OpenSearch/pull/19595)) - Change implementation for `percentiles` aggregation for latency improvement ([#19648](https://github.com/opensearch-project/OpenSearch/pull/19648)) +- Refactor the ThreadPoolStats.Stats class to use the Builder pattern instead of constructors ([#19306](https://github.com/opensearch-project/OpenSearch/pull/19306)) ### Fixed - Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012)) @@ -42,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `stefanzweifel/git-auto-commit-action` from 6 to 7 ([#19689](https://github.com/opensearch-project/OpenSearch/pull/19689)) ### Deprecated +- Deprecated existing constructors in ThreadPoolStats.Stats in favor of the new Builder ([#19306](https://github.com/opensearch-project/OpenSearch/pull/19306)) ### Removed diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 5cc962e8992f1..20426d09b6207 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -567,7 +567,18 @@ public ThreadPoolStats stats() { rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected(); } } - stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed, waitTimeNanos, parallelism)); + stats.add( + new ThreadPoolStats.Stats.Builder().name(name) + .threads(threads) + .queue(queue) + .active(active) + .rejected(rejected) + .largest(largest) + .completed(completed) + .waitTimeNanos(waitTimeNanos) + .parallelism(parallelism) + .build() + ); } return new ThreadPoolStats(stats); } @@ -606,14 +617,14 @@ public ExecutorService executor(String name) { /** * Schedules a one-shot command to run after a given delay. The command is run in the context of the calling thread. * - * @param command the command to run - * @param delay delay before the task executes + * @param command the command to run + * @param delay delay before the task executes * @param executor the name of the thread pool on which to execute this task. SAME means "execute on the scheduler thread" which changes - * the meaning of the ScheduledFuture returned by this method. In that case the ScheduledFuture will complete only when the - * command completes. + * the meaning of the ScheduledFuture returned by this method. In that case the ScheduledFuture will complete only when the + * command completes. * @return a ScheduledFuture who's get will return when the task is has been added to its target thread pool and throw an exception if - * the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool - * the ScheduledFuture will cannot interact with it. + * the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool + * the ScheduledFuture will cannot interact with it. * @throws OpenSearchRejectedExecutionException if the task cannot be scheduled for execution */ @Override diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java b/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java index 92e2766109ef5..9ee058bed9d16 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java @@ -73,6 +73,28 @@ public static class Stats implements Writeable, ToXContentFragment, Comparable stats; diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 86ea238866a85..58b21a1e1b270 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -734,17 +734,16 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) throws IOExcep List threadPoolStatsList = new ArrayList<>(); for (int i = 0; i < numThreadPoolStats; i++) { threadPoolStatsList.add( - new ThreadPoolStats.Stats( - randomAlphaOfLengthBetween(3, 10), - randomIntBetween(1, 1000), - randomIntBetween(1, 1000), - randomIntBetween(1, 1000), - randomNonNegativeLong(), - randomIntBetween(1, 1000), - randomIntBetween(1, 1000), - randomIntBetween(-1, 10), - -1 // Non-ForkJoinPool: use -1 - ) + new ThreadPoolStats.Stats.Builder().name(randomAlphaOfLengthBetween(3, 10)) + .threads(randomIntBetween(1, 1000)) + .queue(randomIntBetween(1, 1000)) + .active(randomIntBetween(1, 1000)) + .rejected(randomNonNegativeLong()) + .largest(randomIntBetween(1, 1000)) + .completed(randomIntBetween(1, 1000)) + .waitTimeNanos(randomIntBetween(-1, 10)) + .parallelism(-1) // Non-ForkJoinPool: use -1 + .build() ); } threadPoolStats = new ThreadPoolStats(threadPoolStatsList); diff --git a/server/src/test/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerTests.java b/server/src/test/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerTests.java index 7ca32bccf045b..34019fb42a802 100644 --- a/server/src/test/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerTests.java +++ b/server/src/test/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerTests.java @@ -240,9 +240,18 @@ public void testNodeValidatorWithHealthyResources() { when(cpu.getPercent()).thenReturn((short) 50); when(jvm.getHeapUsedPercent()).thenReturn((short) 60); ThreadPoolStats stats = new ThreadPoolStats( - Arrays.asList(new ThreadPoolStats.Stats( - ThreadPool.Names.FORCE_MERGE, 1, 0, 0, 0, 1, 0, 0, -1 - )) + Arrays.asList( + new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE) + .threads(1) + .queue(0) + .active(0) + .rejected(0) + .largest(1) + .completed(0) + .waitTimeNanos(0) + .parallelism(-1) + .build() + ) ); when(threadPool.stats()).thenReturn(stats); @@ -256,9 +265,17 @@ public void testNodeValidatorWithFeatureSwitch() { when(cpu.getPercent()).thenReturn((short) 50); when(jvm.getHeapUsedPercent()).thenReturn((short) 60); ThreadPoolStats stats = new ThreadPoolStats( - Arrays.asList(new ThreadPoolStats.Stats( - ThreadPool.Names.FORCE_MERGE, 1, 0, 0, 0, 1, 0, 0, -1 - )) + Arrays.asList(new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE) + .threads(1) + .queue(0) + .active(0) + .rejected(0) + .largest(1) + .completed(0) + .waitTimeNanos(0) + .parallelism(-1) + .build() + ) ); when(threadPool.stats()).thenReturn(stats); Settings settings = getConfiguredClusterSettings(false, false, Collections.emptyMap()); @@ -335,9 +352,18 @@ public void testNodeValidatorWithInsufficientForceMergeThreads() { when(cpu.getPercent()).thenReturn((short) 50); when(jvm.getHeapUsedPercent()).thenReturn((short) 50); ThreadPoolStats stats = new ThreadPoolStats( - Arrays.asList(new ThreadPoolStats.Stats( - ThreadPool.Names.FORCE_MERGE, 1, 1, 1, 0, 1, 0, -1, -1 - )) + Arrays.asList( + new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE) + .threads(1) + .queue(1) + .active(1) + .rejected(0) + .largest(1) + .completed(0) + .waitTimeNanos(-1) + .parallelism(-1) + .build() + ) ); when(threadPool.stats()).thenReturn(stats); AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode(getConfiguredClusterSettings(true, true, Collections.emptyMap()), getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE))); @@ -474,7 +500,18 @@ public void testForceMergeOperationOnDataNodeWithFailingMerges() throws IOExcept ExecutorService executorService = Executors.newFixedThreadPool(forceMergeThreads); when(threadPool.executor(ThreadPool.Names.FORCE_MERGE)).thenReturn(executorService); ThreadPoolStats stats = new ThreadPoolStats( - Arrays.asList(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, forceMergeThreads, 0, 0, 0, forceMergeThreads, 0, -1, -1)) + Arrays.asList( + new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE) + .threads(forceMergeThreads) + .queue(0) + .active(0) + .rejected(0) + .largest(forceMergeThreads) + .completed(0) + .waitTimeNanos(-1) + .parallelism(-1) + .build() + ) ); when(threadPool.stats()).thenReturn(stats); @@ -524,7 +561,18 @@ public void testForceMergeOperationOnDataNodeOfWarmEnabledCluster() throws IOExc ExecutorService executorService = Executors.newFixedThreadPool(forceMergeThreads); when(threadPool.executor(ThreadPool.Names.FORCE_MERGE)).thenReturn(executorService); ThreadPoolStats stats = new ThreadPoolStats( - Arrays.asList(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, forceMergeThreads, 0, 0, 0, forceMergeThreads, 0, -1, -1)) + Arrays.asList( + new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE) + .threads(forceMergeThreads) + .queue(0) + .active(0) + .rejected(0) + .largest(forceMergeThreads) + .completed(0) + .waitTimeNanos(-1) + .parallelism(-1) + .build() + ) ); when(threadPool.stats()).thenReturn(stats); IndexService indexService1 = mock(IndexService.class); @@ -580,7 +628,18 @@ public void testForceMergeOperationOnDataNodeWithThreadInterruption() throws Int ExecutorService executorService = Executors.newFixedThreadPool(forceMergeThreads); when(threadPool.executor(ThreadPool.Names.FORCE_MERGE)).thenReturn(executorService); ThreadPoolStats stats = new ThreadPoolStats( - Arrays.asList(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, forceMergeThreads, 0, 0, 0, forceMergeThreads, 0, -1, -1)) + Arrays.asList( + new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE) + .threads(forceMergeThreads) + .queue(0) + .active(0) + .rejected(0) + .largest(forceMergeThreads) + .completed(0) + .waitTimeNanos(-1) + .parallelism(-1) + .build() + ) ); when(threadPool.stats()).thenReturn(stats); diff --git a/server/src/test/java/org/opensearch/threadpool/ThreadPoolStatsTests.java b/server/src/test/java/org/opensearch/threadpool/ThreadPoolStatsTests.java index 8323ecc5096a7..f5d32a58f3797 100644 --- a/server/src/test/java/org/opensearch/threadpool/ThreadPoolStatsTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ThreadPoolStatsTests.java @@ -54,13 +54,20 @@ public class ThreadPoolStatsTests extends OpenSearchTestCase { public void testThreadPoolStatsSort() throws IOException { List stats = new ArrayList<>(); - stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L, 0L, -1)); - stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L, 0L, -1)); - stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L, 0L, -1)); - stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L, 0L, -1)); - stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L, 0L, -1)); - stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L, 0L, -1)); - stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L, 0L, -1)); + ThreadPoolStats.Stats.Builder defaultStats = new ThreadPoolStats.Stats.Builder().queue(0) + .active(0) + .rejected(0) + .largest(0) + .completed(0L) + .waitTimeNanos(0L) + .parallelism(-1); + stats.add(defaultStats.name("z").threads(-1).build()); + stats.add(defaultStats.name("m").threads(3).build()); + stats.add(defaultStats.name("m").threads(1).build()); + stats.add(defaultStats.name("d").threads(-1).build()); + stats.add(defaultStats.name("m").threads(2).build()); + stats.add(defaultStats.name("t").threads(-1).build()); + stats.add(defaultStats.name("a").threads(-1).build()); List copy = new ArrayList<>(stats); Collections.sort(copy); @@ -131,7 +138,16 @@ public void testStatsCompareToWithParallelism() { } public void testStatsGetters() { - ThreadPoolStats.Stats stats = new ThreadPoolStats.Stats("test", 1, 2, 3, 4L, 5, 6L, 7L, 8); + ThreadPoolStats.Stats stats = new ThreadPoolStats.Stats.Builder().name("test") + .threads(1) + .queue(2) + .active(3) + .rejected(4L) + .largest(5) + .completed(6L) + .waitTimeNanos(7L) + .parallelism(8) + .build(); assertEquals("test", stats.getName()); assertEquals(1, stats.getThreads()); assertEquals(2, stats.getQueue()); @@ -147,11 +163,18 @@ public void testThreadPoolStatsToXContent() throws IOException { try (BytesStreamOutput os = new BytesStreamOutput()) { List stats = new ArrayList<>(); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L, 0L, -1)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L, -1L, -1)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L, -1L, -1)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L, -1L, -1)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L, -1L, -1)); + ThreadPoolStats.Stats.Builder defaultStats = new ThreadPoolStats.Stats.Builder().threads(-1) + .queue(0) + .active(0) + .rejected(0) + .largest(0) + .completed(0L) + .parallelism(-1); + stats.add(defaultStats.name(ThreadPool.Names.SEARCH).waitTimeNanos(0L).build()); + stats.add(defaultStats.name(ThreadPool.Names.WARMER).waitTimeNanos(-1L).build()); + stats.add(defaultStats.name(ThreadPool.Names.GENERIC).waitTimeNanos(-1L).build()); + stats.add(defaultStats.name(ThreadPool.Names.FORCE_MERGE).waitTimeNanos(-1L).build()); + stats.add(defaultStats.name(ThreadPool.Names.SAME).waitTimeNanos(-1L).build()); ThreadPoolStats threadPoolStats = new ThreadPoolStats(stats); try (XContentBuilder builder = new XContentBuilder(MediaTypeRegistry.JSON.xContent(), os)) {