Skip to content

Commit bb240f7

Browse files
authored
[HUDI-5777] Support Metrics for Multiple Tables Simultaneously (#7934)
- Allows usage of metrics for multiple tables. This is useful for RFC-20 as well as HoodieMultiTableDeltaStreamer. --------- Co-authored-by: Jonathan Vexler <=>
1 parent b6490c1 commit bb240f7

16 files changed

Lines changed: 314 additions & 209 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,15 @@ public class HoodieLockMetrics {
4747
private transient Timer lockDuration;
4848
private transient Timer lockApiRequestDuration;
4949
private static final Object REGISTRY_LOCK = new Object();
50+
private Metrics metrics;
5051

5152
public HoodieLockMetrics(HoodieWriteConfig writeConfig) {
5253
this.isMetricsEnabled = writeConfig.isLockingMetricsEnabled();
5354
this.writeConfig = writeConfig;
5455

5556
if (isMetricsEnabled) {
56-
MetricRegistry registry = Metrics.getInstance().getRegistry();
57+
metrics = Metrics.getInstance(writeConfig);
58+
MetricRegistry registry = metrics.getRegistry();
5759

5860
lockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_ATTEMPTS_COUNTER_NAME));
5961
successfulLockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_SUCCESS_COUNTER_NAME));

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java

Lines changed: 48 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
public class HoodieMetrics {
3636

3737
private static final Logger LOG = LogManager.getLogger(HoodieMetrics.class);
38+
39+
private Metrics metrics;
3840
// Some timers
3941
public String rollbackTimerName = null;
4042
public String cleanTimerName = null;
@@ -67,7 +69,7 @@ public HoodieMetrics(HoodieWriteConfig config) {
6769
this.config = config;
6870
this.tableName = config.getTableName();
6971
if (config.isMetricsOn()) {
70-
Metrics.init(config);
72+
metrics = Metrics.getInstance(config);
7173
this.rollbackTimerName = getMetricsName("timer", HoodieTimeline.ROLLBACK_ACTION);
7274
this.cleanTimerName = getMetricsName("timer", HoodieTimeline.CLEAN_ACTION);
7375
this.commitTimerName = getMetricsName("timer", HoodieTimeline.COMMIT_ACTION);
@@ -84,7 +86,11 @@ public HoodieMetrics(HoodieWriteConfig config) {
8486
}
8587

8688
private Timer createTimer(String name) {
87-
return config.isMetricsOn() ? Metrics.getInstance().getRegistry().timer(name) : null;
89+
return config.isMetricsOn() ? metrics.getRegistry().timer(name) : null;
90+
}
91+
92+
public Metrics getMetrics() {
93+
return metrics;
8894
}
8995

9096
public Timer.Context getRollbackCtx() {
@@ -162,20 +168,20 @@ public void updateMetricsForEmptyData(String actionType) {
162168
// No-op if metrics are not of type PROMETHEUS_PUSHGATEWAY.
163169
return;
164170
}
165-
Metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), 0);
166-
Metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), 0);
167-
Metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), 0);
168-
Metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), 0);
169-
Metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), 0);
170-
Metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), 0);
171-
Metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"), 0);
172-
Metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), 0);
173-
Metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), 0);
174-
Metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), 0);
175-
Metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), 0);
176-
Metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), 0);
177-
Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), 0);
178-
Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), 0);
171+
metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), 0);
172+
metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), 0);
173+
metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), 0);
174+
metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), 0);
175+
metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), 0);
176+
metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), 0);
177+
metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"), 0);
178+
metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), 0);
179+
metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), 0);
180+
metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), 0);
181+
metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), 0);
182+
metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), 0);
183+
metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), 0);
184+
metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), 0);
179185
}
180186

181187
public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata,
@@ -196,20 +202,20 @@ public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, Hoo
196202
long totalCompactedRecordsUpdated = metadata.getTotalCompactedRecordsUpdated();
197203
long totalLogFilesCompacted = metadata.getTotalLogFilesCompacted();
198204
long totalLogFilesSize = metadata.getTotalLogFilesSize();
199-
Metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), totalPartitionsWritten);
200-
Metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), totalFilesInsert);
201-
Metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), totalFilesUpdate);
202-
Metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), totalRecordsWritten);
203-
Metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), totalUpdateRecordsWritten);
204-
Metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), totalInsertRecordsWritten);
205-
Metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), totalBytesWritten);
206-
Metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), totalTimeTakenByScanner);
207-
Metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), totalTimeTakenForInsert);
208-
Metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), totalTimeTakenForUpsert);
209-
Metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), totalCompactedRecordsUpdated);
210-
Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), totalLogFilesCompacted);
211-
Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), totalLogFilesSize);
212-
Metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"), totalRecordsDeleted);
205+
metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), totalPartitionsWritten);
206+
metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), totalFilesInsert);
207+
metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), totalFilesUpdate);
208+
metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), totalRecordsWritten);
209+
metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), totalUpdateRecordsWritten);
210+
metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), totalInsertRecordsWritten);
211+
metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), totalBytesWritten);
212+
metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), totalTimeTakenByScanner);
213+
metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), totalTimeTakenForInsert);
214+
metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), totalTimeTakenForUpsert);
215+
metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), totalCompactedRecordsUpdated);
216+
metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), totalLogFilesCompacted);
217+
metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), totalLogFilesSize);
218+
metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"), totalRecordsDeleted);
213219
}
214220
}
215221

@@ -219,48 +225,48 @@ private void updateCommitTimingMetrics(long commitEpochTimeInMs, long durationIn
219225
Pair<Option<Long>, Option<Long>> eventTimePairMinMax = metadata.getMinAndMaxEventTime();
220226
if (eventTimePairMinMax.getLeft().isPresent()) {
221227
long commitLatencyInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getLeft().get();
222-
Metrics.registerGauge(getMetricsName(actionType, "commitLatencyInMs"), commitLatencyInMs);
228+
metrics.registerGauge(getMetricsName(actionType, "commitLatencyInMs"), commitLatencyInMs);
223229
}
224230
if (eventTimePairMinMax.getRight().isPresent()) {
225231
long commitFreshnessInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getRight().get();
226-
Metrics.registerGauge(getMetricsName(actionType, "commitFreshnessInMs"), commitFreshnessInMs);
232+
metrics.registerGauge(getMetricsName(actionType, "commitFreshnessInMs"), commitFreshnessInMs);
227233
}
228-
Metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
229-
Metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs);
234+
metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
235+
metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs);
230236
}
231237
}
232238

233239
public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) {
234240
if (config.isMetricsOn()) {
235241
LOG.info(
236242
String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
237-
Metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs);
238-
Metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted);
243+
metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs);
244+
metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted);
239245
}
240246
}
241247

242248
public void updateCleanMetrics(long durationInMs, int numFilesDeleted) {
243249
if (config.isMetricsOn()) {
244250
LOG.info(
245251
String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
246-
Metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs);
247-
Metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted);
252+
metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs);
253+
metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted);
248254
}
249255
}
250256

251257
public void updateFinalizeWriteMetrics(long durationInMs, long numFilesFinalized) {
252258
if (config.isMetricsOn()) {
253259
LOG.info(String.format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)", durationInMs,
254260
numFilesFinalized));
255-
Metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs);
256-
Metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized);
261+
metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs);
262+
metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized);
257263
}
258264
}
259265

260266
public void updateIndexMetrics(final String action, final long durationInMs) {
261267
if (config.isMetricsOn()) {
262268
LOG.info(String.format("Sending index metrics (%s.duration, %d)", action, durationInMs));
263-
Metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs);
269+
metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs);
264270
}
265271
}
266272

@@ -293,7 +299,7 @@ public void emitConflictResolutionFailed() {
293299

294300
private Counter getCounter(Counter counter, String name) {
295301
if (counter == null) {
296-
return Metrics.getInstance().getRegistry().counter(name);
302+
return metrics.getRegistry().counter(name);
297303
}
298304
return counter;
299305
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java

Lines changed: 40 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
import org.apache.hudi.common.metrics.Registry;
2222
import org.apache.hudi.common.util.Option;
2323
import org.apache.hudi.config.HoodieWriteConfig;
24-
import org.apache.hudi.exception.HoodieException;
2524

2625
import com.codahale.metrics.MetricRegistry;
2726
import org.apache.log4j.LogManager;
2827
import org.apache.log4j.Logger;
2928

29+
import java.util.HashMap;
3030
import java.util.Map;
3131

3232
/**
@@ -36,14 +36,14 @@ public class Metrics {
3636

3737
private static final Logger LOG = LogManager.getLogger(Metrics.class);
3838

39-
private static volatile boolean initialized = false;
40-
private static Metrics instance = null;
39+
private static final Map<String, Metrics> METRICS_INSTANCE_PER_BASEPATH = new HashMap<>();
4140

4241
private final MetricRegistry registry;
4342
private MetricsReporter reporter;
4443
private final String commonMetricPrefix;
44+
private boolean initialized = false;
4545

46-
private Metrics(HoodieWriteConfig metricConfig) {
46+
public Metrics(HoodieWriteConfig metricConfig) {
4747
registry = new MetricRegistry();
4848
commonMetricPrefix = metricConfig.getMetricReporterMetricsNamePrefix();
4949
reporter = MetricsReporterFactory.createReporter(metricConfig, registry);
@@ -52,75 +52,60 @@ private Metrics(HoodieWriteConfig metricConfig) {
5252
}
5353
reporter.start();
5454

55-
Runtime.getRuntime().addShutdownHook(new Thread(Metrics::shutdown));
55+
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
56+
this.initialized = true;
5657
}
5758

58-
private void reportAndStopReporter() {
59+
private void registerHoodieCommonMetrics() {
60+
registerGauges(Registry.getAllMetrics(true, true), Option.of(commonMetricPrefix));
61+
}
62+
63+
public static synchronized Metrics getInstance(HoodieWriteConfig metricConfig) {
64+
String basePath = metricConfig.getBasePath();
65+
if (METRICS_INSTANCE_PER_BASEPATH.containsKey(basePath)) {
66+
return METRICS_INSTANCE_PER_BASEPATH.get(basePath);
67+
}
68+
69+
Metrics metrics = new Metrics(metricConfig);
70+
METRICS_INSTANCE_PER_BASEPATH.put(basePath, metrics);
71+
return metrics;
72+
}
73+
74+
public static synchronized void shutdownAllMetrics() {
75+
METRICS_INSTANCE_PER_BASEPATH.values().forEach(Metrics::shutdown);
76+
}
77+
78+
public synchronized void shutdown() {
5979
try {
6080
registerHoodieCommonMetrics();
6181
reporter.report();
6282
LOG.info("Stopping the metrics reporter...");
6383
reporter.stop();
6484
} catch (Exception e) {
6585
LOG.warn("Error while closing reporter", e);
86+
} finally {
87+
initialized = false;
6688
}
6789
}
6890

69-
private void reportAndFlushMetrics() {
91+
public synchronized void flush() {
7092
try {
7193
LOG.info("Reporting and flushing all metrics");
72-
this.registerHoodieCommonMetrics();
73-
this.reporter.report();
74-
this.registry.getNames().forEach(this.registry::remove);
94+
registerHoodieCommonMetrics();
95+
reporter.report();
96+
registry.getNames().forEach(this.registry::remove);
7597
} catch (Exception e) {
7698
LOG.error("Error while reporting and flushing metrics", e);
7799
}
78100
}
79-
80-
private void registerHoodieCommonMetrics() {
81-
registerGauges(Registry.getAllMetrics(true, true), Option.of(commonMetricPrefix));
82-
}
83-
84-
public static Metrics getInstance() {
85-
assert initialized;
86-
return instance;
87-
}
88-
89-
public static synchronized void init(HoodieWriteConfig metricConfig) {
90-
if (initialized) {
91-
return;
92-
}
93-
try {
94-
instance = new Metrics(metricConfig);
95-
} catch (Exception e) {
96-
throw new HoodieException(e);
97-
}
98-
initialized = true;
99-
}
100-
101-
public static synchronized void shutdown() {
102-
if (!initialized) {
103-
return;
104-
}
105-
instance.reportAndStopReporter();
106-
initialized = false;
107-
}
108-
109-
public static synchronized void flush() {
110-
if (!Metrics.initialized) {
111-
return;
112-
}
113-
instance.reportAndFlushMetrics();
114-
}
115-
116-
public static void registerGauges(Map<String, Long> metricsMap, Option<String> prefix) {
101+
102+
public void registerGauges(Map<String, Long> metricsMap, Option<String> prefix) {
117103
String metricPrefix = prefix.isPresent() ? prefix.get() + "." : "";
118104
metricsMap.forEach((k, v) -> registerGauge(metricPrefix + k, v));
119105
}
120106

121-
public static void registerGauge(String metricName, final long value) {
107+
public void registerGauge(String metricName, final long value) {
122108
try {
123-
MetricRegistry registry = Metrics.getInstance().getRegistry();
124109
HoodieGauge guage = (HoodieGauge) registry.gauge(metricName, () -> new HoodieGauge<>(value));
125110
guage.setValue(value);
126111
} catch (Exception e) {
@@ -134,7 +119,10 @@ public MetricRegistry getRegistry() {
134119
return registry;
135120
}
136121

137-
public static boolean isInitialized() {
138-
return initialized;
122+
public static boolean isInitialized(String basePath) {
123+
if (METRICS_INSTANCE_PER_BASEPATH.containsKey(basePath)) {
124+
return METRICS_INSTANCE_PER_BASEPATH.get(basePath).initialized;
125+
}
126+
return false;
139127
}
140128
}

hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
import org.mockito.Mock;
2727
import org.mockito.junit.jupiter.MockitoExtension;
2828

29-
import static org.apache.hudi.metrics.Metrics.registerGauge;
29+
import java.util.UUID;
30+
3031
import static org.junit.jupiter.api.Assertions.assertEquals;
3132
import static org.mockito.Mockito.when;
3233

@@ -35,23 +36,27 @@ public class TestHoodieConsoleMetrics {
3536

3637
@Mock
3738
HoodieWriteConfig config;
39+
HoodieMetrics hoodieMetrics;
40+
Metrics metrics;
3841

3942
@BeforeEach
4043
public void start() {
4144
when(config.getTableName()).thenReturn("console_metrics_test");
4245
when(config.isMetricsOn()).thenReturn(true);
4346
when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.CONSOLE);
44-
new HoodieMetrics(config);
47+
when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
48+
hoodieMetrics = new HoodieMetrics(config);
49+
metrics = hoodieMetrics.getMetrics();
4550
}
4651

4752
@AfterEach
4853
public void stop() {
49-
Metrics.shutdown();
54+
metrics.shutdown();
5055
}
5156

5257
@Test
5358
public void testRegisterGauge() {
54-
registerGauge("metric1", 123L);
55-
assertEquals("123", Metrics.getInstance().getRegistry().getGauges().get("metric1").getValue().toString());
59+
metrics.registerGauge("metric1", 123L);
60+
assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString());
5661
}
5762
}

0 commit comments

Comments
 (0)