diff --git a/CHANGELOG.md b/CHANGELOG.md index ed39b9d8cbc3c..32143b3d7f13f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement FieldMappingIngestionMessageMapper for pull-based ingestion ([#20729](https://github.com/opensearch-project/OpenSearch/pull/20729)) - Added support of WarmerRefreshListener in NRTReplicationEngine to trigger warmer after replication on replica shards ([#20650](https://github.com/opensearch-project/OpenSearch/pull/20650)) - WLM group custom search settings - groundwork and timeout ([#20536](https://github.com/opensearch-project/OpenSearch/issues/20536)) +- Expose JVM runtime metrics via telemetry framework ([#20844](https://github.com/opensearch-project/OpenSearch/pull/20844)) ### Changed - Make telemetry `Tags` immutable ([#20788](https://github.com/opensearch-project/OpenSearch/pull/20788)) diff --git a/server/src/main/java/org/opensearch/monitor/NodeRuntimeMetrics.java b/server/src/main/java/org/opensearch/monitor/NodeRuntimeMetrics.java new file mode 100644 index 0000000000000..d995ba28a44c1 --- /dev/null +++ b/server/src/main/java/org/opensearch/monitor/NodeRuntimeMetrics.java @@ -0,0 +1,427 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.monitor;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.monitor.jvm.JvmService;
+import org.opensearch.monitor.jvm.JvmStats;
+import org.opensearch.monitor.os.OsProbe;
+import org.opensearch.monitor.process.ProcessProbe;
+import org.opensearch.telemetry.metrics.MetricsRegistry;
+import org.opensearch.telemetry.metrics.tags.Tags;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Registers pull-based gauges for node-level runtime metrics covering JVM memory
+ * (heap, non-heap, per-pool), GC collectors, buffer pools, threads (including
+ * per-state counts), class loading, uptime, and CPU usage.
+ *
+ * <p>All JVM gauge suppliers read through {@link JvmService#stats()}, which caches
+ * the {@link JvmStats} snapshot with a 1-second TTL. A single collection sweep
+ * reuses one snapshot across all gauges with no redundant MXBean calls.
+ *
+ * <p>Memory pool, GC collector, and buffer pool gauges are discovered dynamically
+ * from the initial {@link JvmStats} snapshot and tagged by name, so they work
+ * identically across G1, Parallel, CMS, ZGC, and other collector implementations.
+ *
+ * @opensearch.internal
+ */
+public class NodeRuntimeMetrics implements Closeable {
+
+    private static final Logger logger = LogManager.getLogger(NodeRuntimeMetrics.class);
+
+    // Units
+    static final String UNIT_BYTES = "bytes";
+    static final String UNIT_SECONDS = "s";
+    static final String UNIT_1 = "1";
+
+    // Tag keys
+    static final String TAG_TYPE = "type";
+    static final String TAG_POOL = "pool";
+    static final String TAG_GC = "gc";
+    static final String TAG_STATE = "state";
+
+    // Memory
+    static final String JVM_MEMORY_USED = "jvm.memory.used";
+    static final String JVM_MEMORY_COMMITTED = "jvm.memory.committed";
+    static final String JVM_MEMORY_LIMIT = "jvm.memory.limit";
+    static final String JVM_MEMORY_USED_AFTER_LAST_GC = "jvm.memory.used_after_last_gc";
+
+    // GC
+    static final String JVM_GC_DURATION = "jvm.gc.duration";
+    static final String JVM_GC_COUNT = "jvm.gc.count";
+
+    // Buffer pools
+    static final String JVM_BUFFER_MEMORY_USED = "jvm.buffer.memory.used";
+    static final String JVM_BUFFER_MEMORY_LIMIT = "jvm.buffer.memory.limit";
+    static final String JVM_BUFFER_COUNT = "jvm.buffer.count";
+
+    // Threads
+    static final String JVM_THREAD_COUNT = "jvm.thread.count";
+
+    // Classes
+    static final String JVM_CLASS_COUNT = "jvm.class.count";
+    static final String JVM_CLASS_LOADED = "jvm.class.loaded";
+    static final String JVM_CLASS_UNLOADED = "jvm.class.unloaded";
+
+    // CPU
+    static final String JVM_CPU_RECENT_UTILIZATION = "jvm.cpu.recent_utilization";
+    static final String JVM_SYSTEM_CPU_UTILIZATION = "jvm.system.cpu.utilization";
+
+    // Uptime
+    static final String JVM_UPTIME = "jvm.uptime";
+
+    private final JvmService jvmService;
+    private final List<Closeable> gaugeHandles = new ArrayList<>();
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    // Thread state snapshot cache — shared across all per-state gauges.
+    // The three mutable fields below are only touched from synchronized methods.
+    private final ThreadMXBean threadMXBean;
+    private long[] threadStateCounts = new long[Thread.State.values().length];
+    // System.nanoTime() reading taken when the snapshot was last refreshed.
+    private long threadStateTimestampNanos;
+    // nanoTime has an arbitrary origin, so "never refreshed" needs its own flag rather than a 0 timestamp.
+    private boolean threadStateSnapshotTaken;
+    private static final long CACHE_TTL_NANOS = TimeUnit.SECONDS.toNanos(1);
+
+    /**
+     * Registers all runtime gauges using the platform {@link ThreadMXBean}.
+     *
+     * @param registry     registry the gauges are created on
+     * @param jvmService   source of {@link JvmStats} snapshots
+     * @param processProbe source of the process CPU percentage
+     * @param osProbe      source of the system CPU percentage
+     */
+    public NodeRuntimeMetrics(MetricsRegistry registry, JvmService jvmService, ProcessProbe processProbe, OsProbe osProbe) {
+        this(registry, jvmService, processProbe, osProbe, ManagementFactory.getThreadMXBean());
+    }
+
+    /**
+     * Registers all runtime gauges. If any registration throws, every handle created
+     * so far is closed before the exception is rethrown.
+     *
+     * @throws NullPointerException if any argument is {@code null}
+     */
+    NodeRuntimeMetrics(
+        MetricsRegistry registry,
+        JvmService jvmService,
+        ProcessProbe processProbe,
+        OsProbe osProbe,
+        ThreadMXBean threadMXBean
+    ) {
+        this.jvmService = Objects.requireNonNull(jvmService, "jvmService");
+        this.threadMXBean = Objects.requireNonNull(threadMXBean, "threadMXBean");
+        Objects.requireNonNull(registry, "registry");
+        Objects.requireNonNull(processProbe, "processProbe");
+        Objects.requireNonNull(osProbe, "osProbe");
+
+        try {
+            registerMemoryGauges(registry);
+            registerGcGauges(registry);
+            registerBufferPoolGauges(registry);
+            registerThreadGauges(registry);
+            registerClassGauges(registry);
+            registerUptimeGauge(registry);
+            registerCpuGauges(registry, processProbe, osProbe);
+        } catch (Exception e) {
+            // Release the partially registered gauges; closeQuietly never throws, so "e" is what propagates.
+            closeQuietly();
+            throw e;
+        }
+
+        logger.debug("Registered {} node runtime metric gauges", gaugeHandles.size());
+    }
+
+    // ---- Memory (heap aggregate, non-heap aggregate, per-pool) ----
+
+    private void registerMemoryGauges(MetricsRegistry registry) {
+        Tags heapTags = Tags.of(TAG_TYPE, "heap");
+        Tags nonHeapTags = Tags.of(TAG_TYPE, "non_heap");
+
+        gaugeHandles.add(
+            registry.createGauge(
+                JVM_MEMORY_USED,
+                "JVM heap memory used",
+                UNIT_BYTES,
+                () -> (double) jvmService.stats().getMem().getHeapUsed().getBytes(),
+                heapTags
+            )
+        );
+        gaugeHandles.add(
+            registry.createGauge(
+                JVM_MEMORY_COMMITTED,
+                "JVM heap memory committed",
+                UNIT_BYTES,
+                () -> (double) jvmService.stats().getMem().getHeapCommitted().getBytes(),
+                heapTags
+            )
+        );
+        gaugeHandles.add(
+            registry.createGauge(
+                JVM_MEMORY_LIMIT,
+                "JVM heap memory max",
+                UNIT_BYTES,
+                () -> (double) jvmService.stats().getMem().getHeapMax().getBytes(),
+                heapTags
+            )
+        );
+        gaugeHandles.add(
+            registry.createGauge(
+                JVM_MEMORY_USED,
+                "JVM non-heap memory used",
+                UNIT_BYTES,
+                () -> (double) jvmService.stats().getMem().getNonHeapUsed().getBytes(),
+                nonHeapTags
+            )
+        );
+        gaugeHandles.add(
+            registry.createGauge(
+                JVM_MEMORY_COMMITTED,
+                "JVM non-heap memory committed",
+                UNIT_BYTES,
+                () -> (double) jvmService.stats().getMem().getNonHeapCommitted().getBytes(),
+                nonHeapTags
+            )
+        );
+
+        // Per-pool gauges: the set of pools is fixed at registration time; values are looked up by name on each read.
+        JvmStats stats = jvmService.stats();
+        for (JvmStats.MemoryPool pool : stats.getMem()) {
+            String poolName = pool.getName();
+            Tags poolTags = Tags.of(TAG_POOL, poolName);
+            gaugeHandles.add(registry.createGauge(JVM_MEMORY_USED, "JVM memory pool used", UNIT_BYTES, () -> {
+                JvmStats.MemoryPool p = getPoolByName(poolName);
+                return p == null ? 0.0 : (double) p.getUsed().getBytes();
+            }, poolTags));
+            gaugeHandles.add(registry.createGauge(JVM_MEMORY_LIMIT, "JVM memory pool max", UNIT_BYTES, () -> {
+                JvmStats.MemoryPool p = getPoolByName(poolName);
+                if (p == null) {
+                    return 0.0;
+                }
+                // A negative max is reported as 0 rather than as a negative byte count.
+                long bytes = p.getMax().getBytes();
+                return bytes < 0 ? 0.0 : (double) bytes;
+            }, poolTags));
+            gaugeHandles.add(registry.createGauge(JVM_MEMORY_USED_AFTER_LAST_GC, "JVM memory pool used after last GC", UNIT_BYTES, () -> {
+                JvmStats.MemoryPool p = getPoolByName(poolName);
+                return p == null ? 0.0 : (double) p.getLastGcStats().getUsed().getBytes();
+            }, poolTags));
+        }
+    }
+
+    /** Returns the memory pool with the given name from the current stats snapshot, or {@code null} if absent. */
+    private JvmStats.MemoryPool getPoolByName(String poolName) {
+        for (JvmStats.MemoryPool pool : jvmService.stats().getMem()) {
+            if (pool.getName().equals(poolName)) {
+                return pool;
+            }
+        }
+        return null;
+    }
+
+    // ---- GC (cumulative gauges) ----
+    // TODO: Add event-driven GC pause duration histograms via GarbageCollectorMXBean
+    // notification listeners. Cumulative gauges give rate-of-GC-time; histograms would
+    // enable percentile analysis of individual pause durations (p50, p99).
+
+    private void registerGcGauges(MetricsRegistry registry) {
+        JvmStats stats = jvmService.stats();
+        for (JvmStats.GarbageCollector gc : stats.getGc()) {
+            String collectorName = gc.getName();
+            Tags gcTags = Tags.of(TAG_GC, collectorName);
+            gaugeHandles.add(registry.createGauge(JVM_GC_DURATION, "GC cumulative collection time", UNIT_SECONDS, () -> {
+                JvmStats.GarbageCollector c = getCollectorByName(collectorName);
+                return c == null ? 0.0 : c.getCollectionTime().getMillis() / 1000.0;
+            }, gcTags));
+            gaugeHandles.add(registry.createGauge(JVM_GC_COUNT, "GC collection count", UNIT_1, () -> {
+                JvmStats.GarbageCollector c = getCollectorByName(collectorName);
+                return c == null ? 0.0 : (double) c.getCollectionCount();
+            }, gcTags));
+        }
+    }
+
+    /** Returns the collector with the given name from the current stats snapshot, or {@code null} if absent. */
+    private JvmStats.GarbageCollector getCollectorByName(String collectorName) {
+        for (JvmStats.GarbageCollector gc : jvmService.stats().getGc()) {
+            if (gc.getName().equals(collectorName)) {
+                return gc;
+            }
+        }
+        return null;
+    }
+
+    // ---- Buffer pools ----
+
+    private void registerBufferPoolGauges(MetricsRegistry registry) {
+        JvmStats stats = jvmService.stats();
+        for (JvmStats.BufferPool bp : stats.getBufferPools()) {
+            String bpName = bp.getName();
+            Tags bpTags = Tags.of(TAG_POOL, bpName);
+            gaugeHandles.add(registry.createGauge(JVM_BUFFER_MEMORY_USED, "Buffer pool memory used", UNIT_BYTES, () -> {
+                JvmStats.BufferPool b = getBufferPoolByName(bpName);
+                return b == null ? 0.0 : (double) b.getUsed().getBytes();
+            }, bpTags));
+            gaugeHandles.add(registry.createGauge(JVM_BUFFER_MEMORY_LIMIT, "Buffer pool total capacity", UNIT_BYTES, () -> {
+                JvmStats.BufferPool b = getBufferPoolByName(bpName);
+                return b == null ? 0.0 : (double) b.getTotalCapacity().getBytes();
+            }, bpTags));
+            gaugeHandles.add(registry.createGauge(JVM_BUFFER_COUNT, "Buffer pool buffer count", UNIT_1, () -> {
+                JvmStats.BufferPool b = getBufferPoolByName(bpName);
+                return b == null ? 0.0 : (double) b.getCount();
+            }, bpTags));
+        }
+    }
+
+    /** Returns the buffer pool with the given name from the current stats snapshot, or {@code null} if absent. */
+    private JvmStats.BufferPool getBufferPoolByName(String bpName) {
+        for (JvmStats.BufferPool bp : jvmService.stats().getBufferPools()) {
+            if (bp.getName().equals(bpName)) {
+                return bp;
+            }
+        }
+        return null;
+    }
+
+    // ---- Threads ----
+
+    private void registerThreadGauges(MetricsRegistry registry) {
+        gaugeHandles.add(
+            registry.createGauge(
+                JVM_THREAD_COUNT,
+                "JVM thread count",
+                UNIT_1,
+                () -> (double) jvmService.stats().getThreads().getCount(),
+                Tags.EMPTY
+            )
+        );
+
+        for (Thread.State state : Thread.State.values()) {
+            String stateName = state.name().toLowerCase(Locale.ROOT);
+            Tags stateTags = Tags.of(TAG_STATE, stateName);
+            gaugeHandles.add(
+                registry.createGauge(
+                    JVM_THREAD_COUNT,
+                    "JVM threads in this state",
+                    UNIT_1,
+                    () -> (double) getThreadStateCount(state),
+                    stateTags
+                )
+            );
+        }
+    }
+
+    /**
+     * Returns the count of threads in the given state, using a cached snapshot
+     * that is refreshed at most once per second. This ensures that during a
+     * single collection sweep all per-state gauges share one getThreadInfo() call.
+     *
+     * <p>Staleness is measured with {@link System#nanoTime()} rather than the wall
+     * clock, so a backward clock adjustment cannot stall refreshes.
+     */
+    private synchronized long getThreadStateCount(Thread.State state) {
+        final long now = System.nanoTime();
+        // Subtract before comparing: nanoTime values may wrap, differences stay valid.
+        if (threadStateSnapshotTaken == false || now - threadStateTimestampNanos > CACHE_TTL_NANOS) {
+            refreshThreadStateCounts();
+            threadStateTimestampNanos = now;
+            threadStateSnapshotTaken = true;
+        }
+        return threadStateCounts[state.ordinal()];
+    }
+
+    /**
+     * Rebuilds the per-state thread counts from the {@link ThreadMXBean}. If collection
+     * fails, the failure is logged at debug level and whatever was counted so far is published.
+     * Callers must hold this object's monitor.
+     */
+    private void refreshThreadStateCounts() {
+        long[] counts = new long[Thread.State.values().length];
+        try {
+            long[] threadIds = threadMXBean.getAllThreadIds();
+            ThreadInfo[] infos = threadMXBean.getThreadInfo(threadIds);
+            for (ThreadInfo info : infos) {
+                // Null entries are skipped.
+                if (info != null) {
+                    counts[info.getThreadState().ordinal()]++;
+                }
+            }
+        } catch (Exception e) {
+            logger.debug("Failed to collect thread state counts", e);
+        }
+        threadStateCounts = counts;
+    }
+
+    // ---- Classes ----
+
+    private void registerClassGauges(MetricsRegistry registry) {
+        gaugeHandles.add(
+            registry.createGauge(
+                JVM_CLASS_COUNT,
+                "Currently loaded class count",
+                UNIT_1,
+                () -> (double) jvmService.stats().getClasses().getLoadedClassCount(),
+                Tags.EMPTY
+            )
+        );
+        gaugeHandles.add(
+            registry.createGauge(
+                JVM_CLASS_LOADED,
+                "Total loaded class count since JVM start",
+                UNIT_1,
+                () -> (double) jvmService.stats().getClasses().getTotalLoadedClassCount(),
+                Tags.EMPTY
+            )
+        );
+        gaugeHandles.add(
+            registry.createGauge(
+                JVM_CLASS_UNLOADED,
+                "Total unloaded class count since JVM start",
+                UNIT_1,
+                () -> (double) jvmService.stats().getClasses().getUnloadedClassCount(),
+                Tags.EMPTY
+            )
+        );
+    }
+
+    // ---- Uptime ----
+
+    private void registerUptimeGauge(MetricsRegistry registry) {
+        gaugeHandles.add(
+            registry.createGauge(
+                JVM_UPTIME,
+                "JVM uptime",
+                UNIT_SECONDS,
+                () -> jvmService.stats().getUptime().getMillis() / 1000.0,
+                Tags.EMPTY
+            )
+        );
+    }
+
+    // ---- CPU ----
+
+    private void registerCpuGauges(MetricsRegistry registry, ProcessProbe processProbe, OsProbe osProbe) {
+        gaugeHandles.add(
+            registry.createGauge(
+                JVM_CPU_RECENT_UTILIZATION,
+                "Recent JVM CPU utilization",
+                UNIT_1,
+                () -> clampCpuPercent(processProbe.getProcessCpuPercent()),
+                Tags.EMPTY
+            )
+        );
+        gaugeHandles.add(
+            registry.createGauge(
+                JVM_SYSTEM_CPU_UTILIZATION,
+                "System CPU utilization",
+                UNIT_1,
+                () -> clampCpuPercent(osProbe.getSystemCpuPercent()),
+                Tags.EMPTY
+            )
+        );
+    }
+
+    /** Converts a percentage to a ratio ({@code pct / 100}); negative inputs are reported as {@code 0.0}. */
+    private static double clampCpuPercent(short pct) {
+        return pct < 0 ? 0.0 : pct / 100.0;
+    }
+
+    /** Closes all gauge handles. Only the first call has any effect; later calls return immediately. */
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true) == false) {
+            return;
+        }
+        closeQuietly();
+    }
+
+    /**
+     * Closes every registered handle and clears the list. Failures are logged at debug
+     * level and swallowed, including unchecked ones, so that a single misbehaving handle
+     * cannot leave the remaining gauges registered or mask the exception that triggered
+     * cleanup in the constructor.
+     */
+    private void closeQuietly() {
+        for (Closeable handle : gaugeHandles) {
+            try {
+                handle.close();
+            } catch (Exception e) {
+                logger.debug("Failed to close gauge handle", e);
+            }
+        }
+        gaugeHandles.clear();
+    }
+}
diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java
index 445368507f506..f051abfffacf2 100644
--- a/server/src/main/java/org/opensearch/node/Node.java
+++ b/server/src/main/java/org/opensearch/node/Node.java
@@ -199,10 +199,13 @@
 import org.opensearch.ingest.IngestService;
 import org.opensearch.ingest.SystemIngestPipelineCache;
 import org.opensearch.monitor.MonitorService;
+import org.opensearch.monitor.NodeRuntimeMetrics;
 import org.opensearch.monitor.fs.FsHealthService;
 import org.opensearch.monitor.fs.FsProbe;
 import org.opensearch.monitor.fs.FsServiceProvider;
 import org.opensearch.monitor.jvm.JvmInfo;
+import org.opensearch.monitor.os.OsProbe;
+import org.opensearch.monitor.process.ProcessProbe;
 import org.opensearch.node.remotestore.RemoteStoreNodeService;
 import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
 import org.opensearch.node.resource.tracker.NodeResourceUsageTracker;
@@ -1035,6 +1038,14 @@ protected Node(final Environment initialEnvironment, Collection clas
         );
         final MonitorService monitorService = new MonitorService(settings, threadPool, fsServiceProvider);
 
+        final NodeRuntimeMetrics nodeRuntimeMetrics = new NodeRuntimeMetrics(
+            metricsRegistry,
+            monitorService.jvmService(),
+            ProcessProbe.getInstance(),
+            OsProbe.getInstance()
+        );
+        resourcesToClose.add(nodeRuntimeMetrics);
+
         final AliasValidator aliasValidator = new AliasValidator();
         final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
 
diff --git a/server/src/test/java/org/opensearch/monitor/NodeRuntimeMetricsTests.java b/server/src/test/java/org/opensearch/monitor/NodeRuntimeMetricsTests.java
new file mode 100644
index 0000000000000..44f9e95e9e56c
--- /dev/null
+++ b/server/src/test/java/org/opensearch/monitor/NodeRuntimeMetricsTests.java
@@ -0,0 +1,604 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.monitor;
+
+import org.opensearch.common.settings.Settings;
+import org.opensearch.monitor.jvm.JvmService;
+import org.opensearch.monitor.jvm.JvmStats;
+import org.opensearch.monitor.os.OsProbe;
+import org.opensearch.monitor.process.ProcessProbe;
+import org.opensearch.telemetry.metrics.MetricsRegistry;
+import org.opensearch.telemetry.metrics.tags.Tags;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.function.Supplier;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link NodeRuntimeMetrics}. The registry is a Mockito mock; the JVM
+ * service and the probes are real instances unless a test replaces them, so gauge
+ * suppliers are evaluated against the JVM running the test.
+ */
+public class NodeRuntimeMetricsTests extends OpenSearchTestCase {
+
+    private MetricsRegistry registry;
+    private JvmService jvmService;
+    private ProcessProbe processProbe;
+    private OsProbe osProbe;
+    // Every handle returned by the default createGauge stub, in creation order.
+    private List<Closeable> createdHandles;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        registry = mock(MetricsRegistry.class);
+        jvmService = new JvmService(Settings.EMPTY);
+        processProbe = ProcessProbe.getInstance();
+        osProbe = OsProbe.getInstance();
+        createdHandles = new ArrayList<>();
+
+        // Default stub: hand out a fresh mock handle per gauge and remember it for close() verification.
+        when(registry.createGauge(anyString(), anyString(), anyString(), any(Supplier.class), any(Tags.class))).thenAnswer(invocation -> {
+            Closeable handle = mock(Closeable.class);
+            createdHandles.add(handle);
+            return handle;
+        });
+    }
+
+    public void testRegistersMemoryGauges() {
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        Tags heapTags = Tags.of(NodeRuntimeMetrics.TAG_TYPE, "heap");
+        Tags nonHeapTags = Tags.of(NodeRuntimeMetrics.TAG_TYPE, "non_heap");
+
+        verify(registry).createGauge(
+            eq(NodeRuntimeMetrics.JVM_MEMORY_USED),
+            anyString(),
+            eq(NodeRuntimeMetrics.UNIT_BYTES),
+            any(),
+            eq(heapTags)
+        );
+        verify(registry).createGauge(
+            eq(NodeRuntimeMetrics.JVM_MEMORY_COMMITTED),
+            anyString(),
+            eq(NodeRuntimeMetrics.UNIT_BYTES),
+            any(),
+            eq(heapTags)
+        );
+        verify(registry).createGauge(
+            eq(NodeRuntimeMetrics.JVM_MEMORY_LIMIT),
+            anyString(),
+            eq(NodeRuntimeMetrics.UNIT_BYTES),
+            any(),
+            eq(heapTags)
+        );
+        verify(registry).createGauge(
+            eq(NodeRuntimeMetrics.JVM_MEMORY_USED),
+            anyString(),
+            eq(NodeRuntimeMetrics.UNIT_BYTES),
+            any(),
+            eq(nonHeapTags)
+        );
+        verify(registry).createGauge(
+            eq(NodeRuntimeMetrics.JVM_MEMORY_COMMITTED),
+            anyString(),
+            eq(NodeRuntimeMetrics.UNIT_BYTES),
+            any(),
+            eq(nonHeapTags)
+        );
+    }
+
+    public void testRegistersMemoryPoolGauges() {
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        JvmStats stats = jvmService.stats();
+        for (JvmStats.MemoryPool pool : stats.getMem()) {
+            Tags expectedTags = Tags.of(NodeRuntimeMetrics.TAG_POOL, pool.getName());
+            verify(registry).createGauge(
+                eq(NodeRuntimeMetrics.JVM_MEMORY_USED),
+                anyString(),
+                eq(NodeRuntimeMetrics.UNIT_BYTES),
+                any(),
+                eq(expectedTags)
+            );
+            verify(registry).createGauge(
+                eq(NodeRuntimeMetrics.JVM_MEMORY_LIMIT),
+                anyString(),
+                eq(NodeRuntimeMetrics.UNIT_BYTES),
+                any(),
+                eq(expectedTags)
+            );
+            verify(registry).createGauge(
+                eq(NodeRuntimeMetrics.JVM_MEMORY_USED_AFTER_LAST_GC),
+                anyString(),
+                eq(NodeRuntimeMetrics.UNIT_BYTES),
+                any(),
+                eq(expectedTags)
+            );
+        }
+    }
+
+    public void testRegistersGcGauges() {
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        JvmStats stats = jvmService.stats();
+        for (JvmStats.GarbageCollector gc : stats.getGc()) {
+            Tags expectedTags = Tags.of(NodeRuntimeMetrics.TAG_GC, gc.getName());
+            verify(registry).createGauge(
+                eq(NodeRuntimeMetrics.JVM_GC_DURATION),
+                anyString(),
+                eq(NodeRuntimeMetrics.UNIT_SECONDS),
+                any(),
+                eq(expectedTags)
+            );
+            verify(registry).createGauge(
+                eq(NodeRuntimeMetrics.JVM_GC_COUNT),
+                anyString(),
+                eq(NodeRuntimeMetrics.UNIT_1),
+                any(),
+                eq(expectedTags)
+            );
+        }
+    }
+
+    public void testRegistersBufferPoolGauges() {
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        JvmStats stats = jvmService.stats();
+        for (JvmStats.BufferPool bp : stats.getBufferPools()) {
+            Tags expectedTags = Tags.of(NodeRuntimeMetrics.TAG_POOL, bp.getName());
+            verify(registry).createGauge(
+                eq(NodeRuntimeMetrics.JVM_BUFFER_MEMORY_USED),
+                anyString(),
+                eq(NodeRuntimeMetrics.UNIT_BYTES),
+                any(),
+                eq(expectedTags)
+            );
+            verify(registry).createGauge(
+                eq(NodeRuntimeMetrics.JVM_BUFFER_MEMORY_LIMIT),
+                anyString(),
+                eq(NodeRuntimeMetrics.UNIT_BYTES),
+                any(),
+                eq(expectedTags)
+            );
+            verify(registry).createGauge(
+                eq(NodeRuntimeMetrics.JVM_BUFFER_COUNT),
+                anyString(),
+                eq(NodeRuntimeMetrics.UNIT_1),
+                any(),
+                eq(expectedTags)
+            );
+        }
+    }
+
+    public void testRegistersThreadGauges() {
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        verify(registry).createGauge(
+            eq(NodeRuntimeMetrics.JVM_THREAD_COUNT),
+            anyString(),
+            eq(NodeRuntimeMetrics.UNIT_1),
+            any(),
+            eq(Tags.EMPTY)
+        );
+
+        for (Thread.State state : Thread.State.values()) {
+            Tags expectedTags = Tags.of(NodeRuntimeMetrics.TAG_STATE, state.name().toLowerCase(Locale.ROOT));
+            verify(registry).createGauge(
+                eq(NodeRuntimeMetrics.JVM_THREAD_COUNT),
+                anyString(),
+                eq(NodeRuntimeMetrics.UNIT_1),
+                any(),
+                eq(expectedTags)
+            );
+        }
+    }
+
+    public void testRegistersClassGauges() {
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        verify(registry).createGauge(
+            eq(NodeRuntimeMetrics.JVM_CLASS_COUNT),
+            anyString(),
+            eq(NodeRuntimeMetrics.UNIT_1),
+            any(),
+            eq(Tags.EMPTY)
+        );
+        verify(registry).createGauge(
+            eq(NodeRuntimeMetrics.JVM_CLASS_LOADED),
+            anyString(),
+            eq(NodeRuntimeMetrics.UNIT_1),
+            any(),
+            eq(Tags.EMPTY)
+        );
+        verify(registry).createGauge(
+            eq(NodeRuntimeMetrics.JVM_CLASS_UNLOADED),
+            anyString(),
+            eq(NodeRuntimeMetrics.UNIT_1),
+            any(),
+            eq(Tags.EMPTY)
+        );
+    }
+
+    public void testRegistersUptimeGauge() {
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        verify(registry).createGauge(
+            eq(NodeRuntimeMetrics.JVM_UPTIME),
+            anyString(),
+            eq(NodeRuntimeMetrics.UNIT_SECONDS),
+            any(),
+            eq(Tags.EMPTY)
+        );
+    }
+
+    public void testRegistersCpuGauges() {
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        verify(registry).createGauge(
+            eq(NodeRuntimeMetrics.JVM_CPU_RECENT_UTILIZATION),
+            anyString(),
+            eq(NodeRuntimeMetrics.UNIT_1),
+            any(),
+            eq(Tags.EMPTY)
+        );
+        verify(registry).createGauge(
+            eq(NodeRuntimeMetrics.JVM_SYSTEM_CPU_UTILIZATION),
+            anyString(),
+            eq(NodeRuntimeMetrics.UNIT_1),
+            any(),
+            eq(Tags.EMPTY)
+        );
+    }
+
+    public void testTotalGaugeCount() {
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        JvmStats stats = jvmService.stats();
+        int memoryPools = 0;
+        for (JvmStats.MemoryPool ignored : stats.getMem()) {
+            memoryPools++;
+        }
+        int gcCollectors = stats.getGc().getCollectors().length;
+        int bufferPools = stats.getBufferPools().size();
+
+        int expected = 5 // memory aggregates (3 heap + 2 non-heap)
+            + (memoryPools * 3) // memory pools (used + limit + used_after_last_gc per pool)
+            + (gcCollectors * 2) // GC (duration + count per collector)
+            + (bufferPools * 3) // buffer pools (used + limit + count per pool)
+            + 1 + Thread.State.values().length // threads (total + per-state)
+            + 3 // classes
+            + 1 // uptime
+            + 2; // CPU
+
+        verify(registry, times(expected)).createGauge(anyString(), anyString(), anyString(), any(Supplier.class), any(Tags.class));
+    }
+
+    public void testCloseClosesAllHandles() throws Exception {
+        NodeRuntimeMetrics metrics = new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+        int handleCount = createdHandles.size();
+        assertTrue("Expected gauge handles to be created", handleCount > 0);
+
+        metrics.close();
+
+        for (Closeable handle : createdHandles) {
+            verify(handle).close();
+        }
+    }
+
+    public void testCloseHandlesErrors() throws Exception {
+        NodeRuntimeMetrics metrics = new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+        for (Closeable handle : createdHandles) {
+            doThrow(new IOException("test")).when(handle).close();
+        }
+
+        // Must not propagate the per-handle failures.
+        metrics.close();
+    }
+
+    public void testCloseIdempotent() throws Exception {
+        NodeRuntimeMetrics metrics = new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+        metrics.close();
+        metrics.close();
+
+        // verify() defaults to times(1): the second close() must not touch the handles again.
+        for (Closeable handle : createdHandles) {
+            verify(handle).close();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testHeapMemoryGaugeSupplierReturnsValidValue() {
+        Tags heapTags = Tags.of(NodeRuntimeMetrics.TAG_TYPE, "heap");
+        final Supplier<Double>[] captured = new Supplier[1];
+        when(registry.createGauge(eq(NodeRuntimeMetrics.JVM_MEMORY_USED), anyString(), anyString(), any(Supplier.class), eq(heapTags)))
+            .thenAnswer(invocation -> {
+                captured[0] = invocation.getArgument(3);
+                return mock(Closeable.class);
+            });
+
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        assertNotNull(captured[0]);
+        double value = captured[0].get();
+        assertTrue("Heap used should be positive", value > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testUptimeGaugeSupplierReturnsValidValue() {
+        final Supplier<Double>[] captured = new Supplier[1];
+        when(registry.createGauge(eq(NodeRuntimeMetrics.JVM_UPTIME), anyString(), anyString(), any(Supplier.class), any(Tags.class)))
+            .thenAnswer(invocation -> {
+                captured[0] = invocation.getArgument(3);
+                return mock(Closeable.class);
+            });
+
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        assertNotNull(captured[0]);
+        double value = captured[0].get();
+        assertTrue("Uptime should be positive", value > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testThreadStateGaugesReturnNonNegative() {
+        List<Supplier<Double>> stateSuppliers = new ArrayList<>();
+        when(registry.createGauge(eq(NodeRuntimeMetrics.JVM_THREAD_COUNT), anyString(), anyString(), any(Supplier.class), any(Tags.class)))
+            .thenAnswer(invocation -> {
+                Tags tags = invocation.getArgument(4);
+                // The untagged gauge is the total thread count; only the per-state ones are collected here.
+                if (!Tags.EMPTY.equals(tags)) {
+                    stateSuppliers.add(invocation.getArgument(3));
+                }
+                return mock(Closeable.class);
+            });
+
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        assertEquals(Thread.State.values().length, stateSuppliers.size());
+        double total = 0;
+        for (Supplier<Double> supplier : stateSuppliers) {
+            double count = supplier.get();
+            assertTrue("Thread state count should be >= 0", count >= 0);
+            total += count;
+        }
+        assertTrue("Total thread state count should be positive", total > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testGcDurationInSeconds() {
+        final Supplier<Double>[] captured = new Supplier[1];
+        when(
+            registry.createGauge(
+                eq(NodeRuntimeMetrics.JVM_GC_DURATION),
+                anyString(),
+                eq(NodeRuntimeMetrics.UNIT_SECONDS),
+                any(Supplier.class),
+                any(Tags.class)
+            )
+        ).thenAnswer(invocation -> {
+            captured[0] = invocation.getArgument(3);
+            return mock(Closeable.class);
+        });
+
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        // Nothing is captured when the JVM reports no collectors, hence the null guard.
+        if (captured[0] != null) {
+            double value = captured[0].get();
+            assertTrue("GC duration should be >= 0", value >= 0);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testCpuUtilizationIsRatio() {
+        final Supplier<Double>[] captured = new Supplier[1];
+        when(
+            registry.createGauge(
+                eq(NodeRuntimeMetrics.JVM_CPU_RECENT_UTILIZATION),
+                anyString(),
+                anyString(),
+                any(Supplier.class),
+                any(Tags.class)
+            )
+        ).thenAnswer(invocation -> {
+            captured[0] = invocation.getArgument(3);
+            return mock(Closeable.class);
+        });
+
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        assertNotNull(captured[0]);
+        double value = captured[0].get();
+        assertTrue("CPU utilization should be >= 0", value >= 0);
+        assertTrue("CPU utilization should be <= 1.0", value <= 1.0);
+    }
+
+    public void testDynamicPoolDiscovery() {
+        JvmStats stats = jvmService.stats();
+        int poolCount = 0;
+        for (JvmStats.MemoryPool ignored : stats.getMem()) {
+            poolCount++;
+        }
+
+        assertTrue("JVM should have at least one memory pool", poolCount > 0);
+
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        verify(registry, atLeastOnce()).createGauge(
+            eq(NodeRuntimeMetrics.JVM_MEMORY_USED),
+            anyString(),
+            anyString(),
+            any(),
+            any(Tags.class)
+        );
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testCpuGuardAgainstNegativeValues() {
+        ProcessProbe negativeCpuProbe = mock(ProcessProbe.class);
+        when(negativeCpuProbe.getProcessCpuPercent()).thenReturn((short) -1);
+        OsProbe negativeOsProbe = mock(OsProbe.class);
+        when(negativeOsProbe.getSystemCpuPercent()).thenReturn((short) -1);
+
+        final Supplier<Double>[] processCpu = new Supplier[1];
+        final Supplier<Double>[] systemCpu = new Supplier[1];
+        when(
+            registry.createGauge(
+                eq(NodeRuntimeMetrics.JVM_CPU_RECENT_UTILIZATION),
+                anyString(),
+                anyString(),
+                any(Supplier.class),
+                any(Tags.class)
+            )
+        ).thenAnswer(invocation -> {
+            processCpu[0] = invocation.getArgument(3);
+            return mock(Closeable.class);
+        });
+        when(
+            registry.createGauge(
+                eq(NodeRuntimeMetrics.JVM_SYSTEM_CPU_UTILIZATION),
+                anyString(),
+                anyString(),
+                any(Supplier.class),
+                any(Tags.class)
+            )
+        ).thenAnswer(invocation -> {
+            systemCpu[0] = invocation.getArgument(3);
+            return mock(Closeable.class);
+        });
+
+        new NodeRuntimeMetrics(registry, jvmService, negativeCpuProbe, negativeOsProbe);
+
+        assertNotNull(processCpu[0]);
+        assertNotNull(systemCpu[0]);
+        assertEquals(0.0, processCpu[0].get(), 0.0);
+        assertEquals(0.0, systemCpu[0].get(), 0.0);
+    }
+
+    public void testConstructorCleansUpOnFailure() throws Exception {
+        MetricsRegistry failingRegistry = mock(MetricsRegistry.class);
+        Closeable successHandle = mock(Closeable.class);
+        // Two registrations succeed (both returning the same handle), the third one fails.
+        when(failingRegistry.createGauge(anyString(), anyString(), anyString(), any(Supplier.class), any(Tags.class))).thenReturn(
+            successHandle
+        ).thenReturn(successHandle).thenThrow(new RuntimeException("registration failure"));
+
+        expectThrows(RuntimeException.class, () -> new NodeRuntimeMetrics(failingRegistry, jvmService, processProbe, osProbe));
+
+        // Both handles registered before the failure must have been released.
+        verify(successHandle, times(2)).close();
+    }
+
+    public void testDynamicBufferPoolDiscovery() {
+        JvmStats stats = jvmService.stats();
+        int bpCount = stats.getBufferPools().size();
+
+        assertTrue("JVM should have at least one buffer pool", bpCount > 0);
+
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        verify(registry, atLeastOnce()).createGauge(
+            eq(NodeRuntimeMetrics.JVM_BUFFER_MEMORY_USED),
+            anyString(),
+            anyString(),
+            any(),
+            any(Tags.class)
+        );
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testMemoryPoolSupplierReturnsValidValues() {
+        JvmStats stats = jvmService.stats();
+        JvmStats.MemoryPool firstPool = stats.getMem().iterator().next();
+        Tags poolTags = Tags.of(NodeRuntimeMetrics.TAG_POOL, firstPool.getName());
+
+        List<Supplier<Double>> poolSuppliers = new ArrayList<>();
+        when(registry.createGauge(anyString(), anyString(), anyString(), any(Supplier.class), eq(poolTags))).thenAnswer(invocation -> {
+            poolSuppliers.add(invocation.getArgument(3));
+            return mock(Closeable.class);
+        });
+
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        assertEquals("Expected 3 per-pool gauges (used, limit, used_after_last_gc)", 3, poolSuppliers.size());
+        for (Supplier<Double> supplier : poolSuppliers) {
+            double value = supplier.get();
+            assertTrue("Pool metric should be >= 0", value >= 0);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testGcSupplierReturnsValidValues() {
+        JvmStats stats = jvmService.stats();
+        JvmStats.GarbageCollector firstGc = stats.getGc().getCollectors()[0];
+        Tags gcTags = Tags.of(NodeRuntimeMetrics.TAG_GC, firstGc.getName());
+
+        List<Supplier<Double>> gcSuppliers = new ArrayList<>();
+        when(registry.createGauge(anyString(), anyString(), anyString(), any(Supplier.class), eq(gcTags))).thenAnswer(invocation -> {
+            gcSuppliers.add(invocation.getArgument(3));
+            return mock(Closeable.class);
+        });
+
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        assertEquals("Expected 2 per-collector gauges (duration, count)", 2, gcSuppliers.size());
+        for (Supplier<Double> supplier : gcSuppliers) {
+            double value = supplier.get();
+            assertTrue("GC metric should be >= 0", value >= 0);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testBufferPoolSupplierReturnsValidValues() {
+        JvmStats stats = jvmService.stats();
+        JvmStats.BufferPool firstBp = stats.getBufferPools().get(0);
+        Tags bpTags = Tags.of(NodeRuntimeMetrics.TAG_POOL, firstBp.getName());
+
+        List<Supplier<Double>> bpSuppliers = new ArrayList<>();
+        when(registry.createGauge(anyString(), anyString(), anyString(), any(Supplier.class), eq(bpTags))).thenAnswer(invocation -> {
+            bpSuppliers.add(invocation.getArgument(3));
+            return mock(Closeable.class);
+        });
+
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        assertEquals("Expected 3 per-buffer-pool gauges (used, limit, count)", 3, bpSuppliers.size());
+        for (Supplier<Double> supplier : bpSuppliers) {
+            double value = supplier.get();
+            assertTrue("Buffer pool metric should be >= 0", value >= 0);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testClassLoadingSupplierReturnsValidValues() {
+        List<Supplier<Double>> classSuppliers = new ArrayList<>();
+        when(registry.createGauge(eq(NodeRuntimeMetrics.JVM_CLASS_COUNT), anyString(), anyString(), any(Supplier.class), eq(Tags.EMPTY)))
+            .thenAnswer(invocation -> {
+                classSuppliers.add(invocation.getArgument(3));
+                return mock(Closeable.class);
+            });
+        when(registry.createGauge(eq(NodeRuntimeMetrics.JVM_CLASS_LOADED), anyString(), anyString(), any(Supplier.class), eq(Tags.EMPTY)))
+            .thenAnswer(invocation -> {
+                classSuppliers.add(invocation.getArgument(3));
+                return mock(Closeable.class);
+            });
+        when(registry.createGauge(eq(NodeRuntimeMetrics.JVM_CLASS_UNLOADED), anyString(), anyString(), any(Supplier.class), eq(Tags.EMPTY)))
+            .thenAnswer(invocation -> {
+                classSuppliers.add(invocation.getArgument(3));
+                return mock(Closeable.class);
+            });
+
+        new NodeRuntimeMetrics(registry, jvmService, processProbe, osProbe);
+
+        assertEquals("Expected 3 class loading gauges", 3, classSuppliers.size());
+        for (Supplier<Double> supplier : classSuppliers) {
+            double value = supplier.get();
+            assertTrue("Class loading metric should be >= 0", value >= 0);
+        }
+    }
+}