-
Notifications
You must be signed in to change notification settings - Fork 966
PrometheusHttpServer drops metrics with same name and different type #5078
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,8 @@ | |
|
|
||
| package io.opentelemetry.exporter.prometheus; | ||
|
|
||
| import static java.util.stream.Collectors.joining; | ||
|
|
||
| import com.sun.net.httpserver.HttpExchange; | ||
| import com.sun.net.httpserver.HttpHandler; | ||
| import com.sun.net.httpserver.HttpServer; | ||
|
|
@@ -34,11 +36,14 @@ | |
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.function.Predicate; | ||
| import java.util.function.Supplier; | ||
| import java.util.logging.Level; | ||
| import java.util.logging.Logger; | ||
| import java.util.zip.GZIPOutputStream; | ||
| import javax.annotation.Nullable; | ||
|
|
||
|
|
@@ -52,6 +57,7 @@ public final class PrometheusHttpServer implements Closeable, MetricReader { | |
|
|
||
| private static final DaemonThreadFactory THREAD_FACTORY = | ||
| new DaemonThreadFactory("prometheus-http"); | ||
| private static final Logger LOGGER = Logger.getLogger(PrometheusHttpServer.class.getName()); | ||
|
|
||
| private final HttpServer server; | ||
| private final ExecutorService executor; | ||
|
|
@@ -77,9 +83,10 @@ public static PrometheusHttpServerBuilder builder() { | |
| } catch (IOException e) { | ||
| throw new UncheckedIOException("Could not create Prometheus HTTP server", e); | ||
| } | ||
| server.createContext("/", new MetricsHandler(() -> getMetricProducer().collectAllMetrics())); | ||
| server.createContext( | ||
| "/metrics", new MetricsHandler(() -> getMetricProducer().collectAllMetrics())); | ||
| MetricsHandler metricsHandler = | ||
| new MetricsHandler(() -> getMetricProducer().collectAllMetrics()); | ||
| server.createContext("/", metricsHandler); | ||
| server.createContext("/metrics", metricsHandler); | ||
| server.createContext("/-/healthy", HealthHandler.INSTANCE); | ||
|
|
||
| executor = Executors.newFixedThreadPool(5, THREAD_FACTORY); | ||
|
|
@@ -159,6 +166,9 @@ InetSocketAddress getAddress() { | |
|
|
||
| private static class MetricsHandler implements HttpHandler { | ||
|
|
||
| private final Set<String> allConflictHeaderNames = | ||
| Collections.newSetFromMap(new ConcurrentHashMap<>()); | ||
|
|
||
| private final Supplier<Collection<MetricData>> metricsSupplier; | ||
|
|
||
| private MetricsHandler(Supplier<Collection<MetricData>> metricsSupplier) { | ||
|
|
@@ -190,7 +200,15 @@ public void handle(HttpExchange exchange) throws IOException { | |
| } else { | ||
| out = exchange.getResponseBody(); | ||
| } | ||
| serializer.write(metrics, out); | ||
| Set<String> conflictHeaderNames = serializer.write(metrics, out); | ||
| conflictHeaderNames.removeAll(allConflictHeaderNames); | ||
| if (conflictHeaderNames.size() > 0 && LOGGER.isLoggable(Level.WARNING)) { | ||
| LOGGER.log( | ||
| Level.WARNING, | ||
| "Metric conflict(s) detected. Multiple metrics with same name but different type: " | ||
| + conflictHeaderNames.stream().collect(joining(",", "[", "]"))); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Goal is to log each name conflict only once when it is first detected, to avoid spammy logs. |
||
| allConflictHeaderNames.addAll(conflictHeaderNames); | ||
| } | ||
| } | ||
| exchange.close(); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,23 +46,25 @@ | |
| import java.io.UncheckedIOException; | ||
| import java.io.Writer; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.HashSet; | ||
| import java.util.LinkedHashMap; | ||
| import java.util.LinkedHashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.function.BiConsumer; | ||
| import java.util.function.Predicate; | ||
| import java.util.stream.Collectors; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| /** Serializes metrics into Prometheus exposition formats. */ | ||
| // Adapted from | ||
| // https://github.com/prometheus/client_java/blob/master/simpleclient_common/src/main/java/io/prometheus/client/exporter/common/TextFormat.java | ||
| abstract class Serializer { | ||
|
|
||
| static Serializer create(@Nullable String acceptHeader, Predicate<String> filter) { | ||
| if (acceptHeader == null) { | ||
| return new Prometheus004Serializer(filter); | ||
|
|
@@ -100,61 +102,64 @@ abstract void writeExemplar( | |
|
|
||
| abstract void writeEof(Writer writer) throws IOException; | ||
|
|
||
| final void write(Collection<MetricData> metrics, OutputStream output) throws IOException { | ||
| Map<InstrumentationScopeInfo, List<MetricData>> metricsByScope = | ||
| metrics.stream() | ||
| // Not supported in specification yet. | ||
| .filter(metric -> metric.getType() != MetricDataType.EXPONENTIAL_HISTOGRAM) | ||
| // PrometheusHttpServer#getAggregationTemporality specifies cumulative temporality for | ||
| // all instruments, but non-SDK MetricProducers may not conform. We drop delta | ||
| // temporality metrics to avoid the complexity of stateful transformation to cumulative. | ||
| .filter(metric -> !isDeltaTemporality(metric)) | ||
| .filter(metric -> metricNameFilter.test(metricName(metric))) | ||
| .collect( | ||
| Collectors.groupingBy( | ||
| MetricData::getInstrumentationScopeInfo, | ||
| LinkedHashMap::new, | ||
| Collectors.toList())); | ||
| final Set<String> write(Collection<MetricData> metrics, OutputStream output) throws IOException { | ||
| Set<String> conflictMetricNames = new HashSet<>(); | ||
| Map<String, List<MetricData>> metricsByName = new LinkedHashMap<>(); | ||
| Set<InstrumentationScopeInfo> scopes = new LinkedHashSet<>(); | ||
| // Iterate through metrics, filtering and grouping by headerName | ||
| for (MetricData metric : metrics) { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A for loop is more appropriate here because we now need to track the unique scopes we've seen as well as metric name conflicts, which we return to the caller for logging purposes. |
||
| // Not supported in specification yet. | ||
| if (metric.getType() == MetricDataType.EXPONENTIAL_HISTOGRAM) { | ||
| continue; | ||
| } | ||
| // PrometheusHttpServer#getAggregationTemporality specifies cumulative temporality for | ||
| // all instruments, but non-SDK MetricProducers may not conform. We drop delta | ||
| // temporality metrics to avoid the complexity of stateful transformation to cumulative. | ||
| if (isDeltaTemporality(metric)) { | ||
| continue; | ||
| } | ||
| PrometheusType prometheusType = PrometheusType.forMetric(metric); | ||
| String metricName = metricName(metric.getName(), prometheusType); | ||
| // Skip metrics which do not pass metricNameFilter | ||
| if (!metricNameFilter.test(metricName)) { | ||
| continue; | ||
| } | ||
| List<MetricData> metricsWithHeaderName = | ||
| metricsByName.computeIfAbsent(metricName, unused -> new ArrayList<>()); | ||
| // Skip metrics with the same name but different type | ||
| if (metricsWithHeaderName.size() > 0 | ||
| && prometheusType != PrometheusType.forMetric(metricsWithHeaderName.get(0))) { | ||
| conflictMetricNames.add(metricName); | ||
| continue; | ||
| } | ||
|
|
||
| metricsWithHeaderName.add(metric); | ||
| scopes.add(metric.getInstrumentationScopeInfo()); | ||
| } | ||
|
|
||
| Optional<Resource> optResource = metrics.stream().findFirst().map(MetricData::getResource); | ||
| try (Writer writer = | ||
| new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8))) { | ||
| if (optResource.isPresent()) { | ||
| writeResource(optResource.get(), writer); | ||
| } | ||
| for (Map.Entry<InstrumentationScopeInfo, List<MetricData>> entry : | ||
| metricsByScope.entrySet()) { | ||
| for (InstrumentationScopeInfo scope : scopes) { | ||
| writeScopeInfo(scope, writer); | ||
| } | ||
| for (Map.Entry<String, List<MetricData>> entry : metricsByName.entrySet()) { | ||
| write(entry.getValue(), entry.getKey(), writer); | ||
| } | ||
| writeEof(writer); | ||
| } | ||
| return conflictMetricNames; | ||
| } | ||
|
|
||
| private void write( | ||
| List<MetricData> metrics, InstrumentationScopeInfo instrumentationScopeInfo, Writer writer) | ||
| throws IOException { | ||
| writeScopeInfo(instrumentationScopeInfo, writer); | ||
| // Group metrics with the scope, name, but different types. This is a semantic error which the | ||
| // SDK warns about but passes through to exporters to handle. | ||
| Map<String, List<MetricData>> metricsByName = | ||
| metrics.stream() | ||
| .collect( | ||
| Collectors.groupingBy( | ||
| metric -> | ||
| headerName( | ||
| NameSanitizer.INSTANCE.apply(metric.getName()), | ||
| PrometheusType.forMetric(metric)), | ||
| LinkedHashMap::new, | ||
| Collectors.toList())); | ||
|
|
||
| for (Map.Entry<String, List<MetricData>> entry : metricsByName.entrySet()) { | ||
| write(entry.getValue(), entry.getKey(), writer); | ||
| } | ||
| } | ||
|
|
||
| private void write(List<MetricData> metrics, String headerName, Writer writer) | ||
| private void write(List<MetricData> metrics, String metricName, Writer writer) | ||
| throws IOException { | ||
| // Write header based on first metric | ||
| PrometheusType type = PrometheusType.forMetric(metrics.get(0)); | ||
| MetricData first = metrics.get(0); | ||
| PrometheusType type = PrometheusType.forMetric(first); | ||
| String headerName = headerName(NameSanitizer.INSTANCE.apply(first.getName()), type); | ||
| String description = metrics.get(0).getDescription(); | ||
|
|
||
| writer.write("# TYPE "); | ||
|
|
@@ -171,21 +176,19 @@ private void write(List<MetricData> metrics, String headerName, Writer writer) | |
|
|
||
| // Then write the metrics. | ||
| for (MetricData metric : metrics) { | ||
| write(metric, writer); | ||
| write(metric, metricName, writer); | ||
| } | ||
| } | ||
|
|
||
| private void write(MetricData metric, Writer writer) throws IOException { | ||
| String name = metricName(metric); | ||
|
|
||
| private void write(MetricData metric, String metricName, Writer writer) throws IOException { | ||
| for (PointData point : getPoints(metric)) { | ||
| switch (metric.getType()) { | ||
| case DOUBLE_SUM: | ||
| case DOUBLE_GAUGE: | ||
| writePoint( | ||
| writer, | ||
| metric.getInstrumentationScopeInfo(), | ||
| name, | ||
| metricName, | ||
| ((DoublePointData) point).getValue(), | ||
| point.getAttributes(), | ||
| point.getEpochNanos()); | ||
|
|
@@ -195,18 +198,18 @@ private void write(MetricData metric, Writer writer) throws IOException { | |
| writePoint( | ||
| writer, | ||
| metric.getInstrumentationScopeInfo(), | ||
| name, | ||
| metricName, | ||
| (double) ((LongPointData) point).getValue(), | ||
| point.getAttributes(), | ||
| point.getEpochNanos()); | ||
| break; | ||
| case HISTOGRAM: | ||
| writeHistogram( | ||
| writer, metric.getInstrumentationScopeInfo(), name, (HistogramPointData) point); | ||
| writer, metric.getInstrumentationScopeInfo(), metricName, (HistogramPointData) point); | ||
| break; | ||
| case SUMMARY: | ||
| writeSummary( | ||
| writer, metric.getInstrumentationScopeInfo(), name, (SummaryPointData) point); | ||
| writer, metric.getInstrumentationScopeInfo(), metricName, (SummaryPointData) point); | ||
| break; | ||
| case EXPONENTIAL_HISTOGRAM: | ||
| throw new IllegalArgumentException("Can't happen"); | ||
|
|
@@ -648,9 +651,8 @@ static Collection<? extends PointData> getPoints(MetricData metricData) { | |
| return Collections.emptyList(); | ||
| } | ||
|
|
||
| private static String metricName(MetricData metric) { | ||
| PrometheusType type = PrometheusType.forMetric(metric); | ||
| String name = NameSanitizer.INSTANCE.apply(metric.getName()); | ||
| private static String metricName(String rawMetricName, PrometheusType type) { | ||
| String name = NameSanitizer.INSTANCE.apply(rawMetricName); | ||
| if (type == PrometheusType.COUNTER) { | ||
| name = name + "_total"; | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,4 +4,5 @@ | |
|
|
||
| requires transitive io.opentelemetry.sdk.metrics; | ||
| requires jdk.httpserver; | ||
| requires java.logging; | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.