Skip to content

Commit c50f95a

Browse files
authored
Merge pull request #8575 from vinted/endpointsetipport
thanos/query: add IP/port to endpointset metrics
2 parents 075fb99 + 23a1c66 commit c50f95a

File tree

3 files changed

+188
-74
lines changed

3 files changed

+188
-74
lines changed

cmd/thanos/query.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func registerQuery(app *extkingpin.App) {
103103

104104
queryConnMetricLabels := cmd.Flag("query.conn-metric.label", "Optional selection of query connection metric labels to be collected from endpoint set").
105105
Default(string(query.ExternalLabels), string(query.StoreType)).
106-
Enums(string(query.ExternalLabels), string(query.StoreType))
106+
Enums(string(query.ExternalLabels), string(query.StoreType), string(query.IPPort))
107107

108108
deduplicationFunc := cmd.Flag("deduplication.func", "Experimental. Deduplication algorithm for merging overlapping series. "+
109109
"Possible values are: \"penalty\", \"chain\". If no value is specified, penalty based deduplication algorithm will be used. "+

pkg/query/endpointset.go

Lines changed: 123 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ import (
99
"fmt"
1010
"math"
1111
"sort"
12+
"strings"
1213
"sync"
1314
"time"
1415
"unicode/utf8"
1516

17+
"github.com/cespare/xxhash/v2"
1618
"github.com/go-kit/log"
1719
"github.com/go-kit/log/level"
1820
"github.com/pkg/errors"
@@ -46,6 +48,7 @@ type queryConnMetricLabel string
4648
const (
4749
ExternalLabels queryConnMetricLabel = "external_labels"
4850
StoreType queryConnMetricLabel = "store_type"
51+
IPPort queryConnMetricLabel = "ip_port"
4952
)
5053

5154
type GRPCEndpointSpec struct {
@@ -117,28 +120,41 @@ type EndpointStatus struct {
117120
// A Collector is required as we want atomic updates for all 'thanos_store_nodes_grpc_connections' series.
118121
// TODO(hitanshu-mehta) Currently,only collecting metrics of storeEndpoints. Make this struct generic.
119122
type endpointSetNodeCollector struct {
120-
mtx sync.Mutex
121-
storeNodes map[string]map[string]int
122-
storePerExtLset map[string]int
123+
mtx sync.Mutex
124+
storeNodes endpointStats
123125

124126
logger log.Logger
125127
connectionsDesc *prometheus.Desc
126128
labels []string
129+
labelsMap map[string]struct{}
130+
131+
hasherPool sync.Pool
127132
}
128133

129134
func newEndpointSetNodeCollector(logger log.Logger, labels ...string) *endpointSetNodeCollector {
130135
if len(labels) == 0 {
131136
labels = []string{string(ExternalLabels), string(StoreType)}
132137
}
138+
139+
labelsMap := make(map[string]struct{})
140+
for _, lbl := range labels {
141+
labelsMap[lbl] = struct{}{}
142+
}
133143
return &endpointSetNodeCollector{
134144
logger: logger,
135-
storeNodes: map[string]map[string]int{},
145+
storeNodes: endpointStats{},
136146
connectionsDesc: prometheus.NewDesc(
137147
"thanos_store_nodes_grpc_connections",
138148
"Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier.",
139149
labels, nil,
140150
),
141-
labels: labels,
151+
labels: labels,
152+
labelsMap: labelsMap,
153+
hasherPool: sync.Pool{
154+
New: func() any {
155+
return xxhash.New()
156+
},
157+
},
142158
}
143159
}
144160

@@ -155,52 +171,66 @@ func truncateExtLabels(s string, threshold int) string {
155171
}
156172
return s
157173
}
158-
func (c *endpointSetNodeCollector) Update(nodes map[string]map[string]int) {
159-
storeNodes := make(map[string]map[string]int, len(nodes))
160-
storePerExtLset := map[string]int{}
161-
162-
for storeType, occurrencesPerExtLset := range nodes {
163-
storeNodes[storeType] = make(map[string]int, len(occurrencesPerExtLset))
164-
for externalLabels, occurrences := range occurrencesPerExtLset {
165-
externalLabels = truncateExtLabels(externalLabels, externalLabelLimit)
166-
storePerExtLset[externalLabels] += occurrences
167-
storeNodes[storeType][externalLabels] = occurrences
168-
}
169-
}
170-
174+
func (c *endpointSetNodeCollector) Update(stats endpointStats) {
171175
c.mtx.Lock()
172176
defer c.mtx.Unlock()
173-
c.storeNodes = storeNodes
174-
c.storePerExtLset = storePerExtLset
177+
c.storeNodes = stats
175178
}
176179

177180
func (c *endpointSetNodeCollector) Describe(ch chan<- *prometheus.Desc) {
178181
ch <- c.connectionsDesc
179182
}
180183

184+
func (c *endpointSetNodeCollector) hash(e endpointStat) uint64 {
185+
h := c.hasherPool.Get().(*xxhash.Digest)
186+
defer func() {
187+
h.Reset()
188+
c.hasherPool.Put(h)
189+
}()
190+
191+
if _, ok := c.labelsMap[string(IPPort)]; ok {
192+
_, _ = h.Write([]byte(e.ip))
193+
}
194+
if _, ok := c.labelsMap[string(ExternalLabels)]; ok {
195+
_, _ = h.Write([]byte(e.extLset))
196+
}
197+
if _, ok := c.labelsMap[string(StoreType)]; ok {
198+
_, _ = h.Write([]byte(e.component))
199+
}
200+
201+
return h.Sum64()
202+
}
203+
181204
func (c *endpointSetNodeCollector) Collect(ch chan<- prometheus.Metric) {
182205
c.mtx.Lock()
183206
defer c.mtx.Unlock()
184207

185-
for k, occurrencesPerExtLset := range c.storeNodes {
186-
for externalLabels, occurrences := range occurrencesPerExtLset {
187-
// Select only required labels.
188-
lbls := []string{}
189-
for _, lbl := range c.labels {
190-
switch lbl {
191-
case string(ExternalLabels):
192-
lbls = append(lbls, externalLabels)
193-
case string(StoreType):
194-
lbls = append(lbls, k)
195-
}
196-
}
197-
select {
198-
case ch <- prometheus.MustNewConstMetric(c.connectionsDesc, prometheus.GaugeValue, float64(occurrences), lbls...):
199-
case <-time.After(1 * time.Second):
200-
level.Warn(c.logger).Log("msg", "failed to collect endpointset metrics", "timeout", 1*time.Second)
201-
return
208+
var occurrences = make(map[uint64]int)
209+
for _, e := range c.storeNodes {
210+
h := c.hash(e)
211+
occurrences[h]++
212+
}
213+
214+
for _, n := range c.storeNodes {
215+
h := c.hash(n)
216+
lbls := make([]string, 0, len(c.labels))
217+
for _, lbl := range c.labels {
218+
switch lbl {
219+
case string(ExternalLabels):
220+
lbls = append(lbls, n.extLset)
221+
case string(StoreType):
222+
lbls = append(lbls, n.component)
223+
case string(IPPort):
224+
lbls = append(lbls, n.ip)
202225
}
203226
}
227+
228+
select {
229+
case ch <- prometheus.MustNewConstMetric(c.connectionsDesc, prometheus.GaugeValue, float64(occurrences[h]), lbls...):
230+
case <-time.After(1 * time.Second):
231+
level.Warn(c.logger).Log("msg", "failed to collect endpointset metrics", "timeout", 1*time.Second)
232+
return
233+
}
204234
}
205235
}
206236

@@ -374,9 +404,21 @@ func (e *EndpointSet) Update(ctx context.Context) {
374404
}
375405
level.Debug(e.logger).Log("msg", "updated endpoints", "activeEndpoints", len(e.endpoints))
376406

407+
nodes := make(map[string]map[string]int, len(component.All))
408+
for _, comp := range component.All {
409+
nodes[comp.String()] = map[string]int{}
410+
}
411+
377412
// Update stats.
378413
stats := newEndpointAPIStats()
379-
for addr, er := range e.endpoints {
414+
415+
endpointIPs := make([]string, 0, len(e.endpoints))
416+
for addr := range e.endpoints {
417+
endpointIPs = append(endpointIPs, addr)
418+
}
419+
sort.Strings(endpointIPs)
420+
for _, addr := range endpointIPs {
421+
er := e.endpoints[addr]
380422
if !er.isQueryable() {
381423
continue
382424
}
@@ -385,12 +427,14 @@ func (e *EndpointSet) Update(ctx context.Context) {
385427

386428
// All producers that expose StoreAPI should have unique external labels. Check all which connect to our Querier.
387429
if er.HasStoreAPI() && (er.ComponentType() == component.Sidecar || er.ComponentType() == component.Rule) &&
388-
stats[component.Sidecar.String()][extLset]+stats[component.Rule.String()][extLset] > 0 {
430+
nodes[component.Sidecar.String()][extLset]+nodes[component.Rule.String()][extLset] > 0 {
389431

390432
level.Warn(e.logger).Log("msg", "found duplicate storeEndpoints producer (sidecar or ruler). This is not advised as it will malform data in in the same bucket",
391-
"address", addr, "extLset", extLset, "duplicates", fmt.Sprintf("%v", stats[component.Sidecar.String()][extLset]+stats[component.Rule.String()][extLset]+1))
433+
"address", addr, "extLset", extLset, "duplicates", fmt.Sprintf("%v", nodes[component.Sidecar.String()][extLset]+nodes[component.Rule.String()][extLset]+1))
392434
}
393-
stats[er.ComponentType().String()][extLset]++
435+
nodes[er.ComponentType().String()][extLset]++
436+
437+
stats = stats.append(er.addr, extLset, er.ComponentType().String())
394438
}
395439

396440
e.endpointsMetric.Update(stats)
@@ -861,12 +905,44 @@ type endpointMetadata struct {
861905
*infopb.InfoResponse
862906
}
863907

864-
func newEndpointAPIStats() map[string]map[string]int {
865-
nodes := make(map[string]map[string]int, len(component.All))
866-
for _, comp := range component.All {
867-
nodes[comp.String()] = map[string]int{}
868-
}
869-
return nodes
908+
type endpointStat struct {
909+
ip string
910+
extLset string
911+
component string
912+
}
913+
914+
func newEndpointAPIStats() endpointStats {
915+
return []endpointStat{}
916+
}
917+
918+
type endpointStats []endpointStat
919+
920+
func (s *endpointStats) Sort() endpointStats {
921+
sort.Slice(*s, func(i, j int) bool {
922+
ipc := strings.Compare((*s)[i].ip, (*s)[j].ip)
923+
if ipc != 0 {
924+
return ipc < 0
925+
}
926+
927+
extLsetc := strings.Compare((*s)[i].extLset, (*s)[j].extLset)
928+
if extLsetc != 0 {
929+
return extLsetc < 0
930+
}
931+
932+
return strings.Compare((*s)[i].component, (*s)[j].component) < 0
933+
})
934+
935+
return *s
936+
}
937+
938+
func (es *endpointStats) append(ip, extLset, component string) endpointStats {
939+
truncatedExtLabels := truncateExtLabels(extLset, externalLabelLimit)
940+
941+
return append(*es, endpointStat{
942+
ip: ip,
943+
extLset: truncatedExtLabels,
944+
component: component,
945+
})
870946
}
871947

872948
func maxRangeStoreMetadata() *endpointMetadata {

pkg/query/endpointset_test.go

Lines changed: 64 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -708,11 +708,13 @@ func TestEndpointSetUpdate_AvailabilityScenarios(t *testing.T) {
708708

709709
// Check stats.
710710
expected := newEndpointAPIStats()
711-
expected[component.Sidecar.String()] = map[string]int{
712-
fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredEndpointAddr[0]): 1,
713-
fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredEndpointAddr[1]): 1,
714-
}
715-
testutil.Equals(t, expected, endpointSet.endpointsMetric.storeNodes)
711+
expected = expected.append(
712+
discoveredEndpointAddr[0], fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredEndpointAddr[0]), component.Sidecar.String(),
713+
)
714+
expected = expected.append(
715+
discoveredEndpointAddr[1], fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredEndpointAddr[1]), component.Sidecar.String(),
716+
)
717+
testutil.Equals(t, expected.Sort(), endpointSet.endpointsMetric.storeNodes.Sort())
716718

717719
// Remove address from discovered and reset last check, which should ensure cleanup of status on next update.
718720
now = now.Add(3 * time.Minute)
@@ -721,7 +723,7 @@ func TestEndpointSetUpdate_AvailabilityScenarios(t *testing.T) {
721723
testutil.Equals(t, 2, len(endpointSet.endpoints))
722724

723725
endpoints.CloseOne(discoveredEndpointAddr[0])
724-
delete(expected[component.Sidecar.String()], fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredEndpointAddr[0]))
726+
expected = expected[1:]
725727

726728
// We expect Update to tear down store client for closed store server.
727729
endpointSet.Update(context.Background())
@@ -736,7 +738,6 @@ func TestEndpointSetUpdate_AvailabilityScenarios(t *testing.T) {
736738
testutil.Equals(t, 2, len(lset))
737739
testutil.Equals(t, addr, lset[0].Get("addr"))
738740
testutil.Equals(t, "b", lset[1].Get("a"))
739-
testutil.Equals(t, expected, endpointSet.endpointsMetric.storeNodes)
740741

741742
// New big batch of endpoints.
742743
endpoint2, err := startTestEndpoints([]testEndpointMeta{
@@ -967,26 +968,63 @@ func TestEndpointSetUpdate_AvailabilityScenarios(t *testing.T) {
967968

968969
// Check stats.
969970
expected = newEndpointAPIStats()
970-
expected[component.Query.String()] = map[string]int{
971-
"{l1=\"v2\", l2=\"v3\"}": 1,
972-
"{l1=\"v2\", l2=\"v3\"},{l3=\"v4\"}": 2,
973-
}
974-
expected[component.Rule.String()] = map[string]int{
975-
"{l1=\"v2\", l2=\"v3\"}": 2,
976-
}
977-
expected[component.Sidecar.String()] = map[string]int{
978-
fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredEndpointAddr[1]): 1,
979-
"{l1=\"v2\", l2=\"v3\"}": 2,
971+
expected = expected.append(
972+
discoveredEndpointAddr[6], "{l1=\"v2\", l2=\"v3\"}", component.Query.String(),
973+
)
974+
expected = expected.append(
975+
discoveredEndpointAddr[2], "{l1=\"v2\", l2=\"v3\"},{l3=\"v4\"}", component.Query.String(),
976+
)
977+
expected = expected.append(
978+
discoveredEndpointAddr[3], "{l1=\"v2\", l2=\"v3\"},{l3=\"v4\"}", component.Query.String(),
979+
)
980+
expected = expected.append(
981+
discoveredEndpointAddr[8], "{l1=\"v2\", l2=\"v3\"}", component.Rule.String(),
982+
)
983+
expected = expected.append(
984+
discoveredEndpointAddr[7], "{l1=\"v2\", l2=\"v3\"}", component.Rule.String(),
985+
)
986+
987+
expected = expected.append(
988+
discoveredEndpointAddr[1], fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredEndpointAddr[1]), component.Sidecar.String(),
989+
)
990+
expected = expected.append(
991+
discoveredEndpointAddr[4], "{l1=\"v2\", l2=\"v3\"}", component.Sidecar.String(),
992+
)
993+
expected = expected.append(
994+
discoveredEndpointAddr[5], "{l1=\"v2\", l2=\"v3\"}", component.Sidecar.String(),
995+
)
996+
997+
expected = expected.append(
998+
discoveredEndpointAddr[12], "{l1=\"v2\", l2=\"v3\"},{l3=\"v4\"}", component.Store.String(),
999+
)
1000+
expected = expected.append(
1001+
discoveredEndpointAddr[11], "{l1=\"v2\", l2=\"v3\"},{l3=\"v4\"}", component.Store.String(),
1002+
)
1003+
expected = expected.append(
1004+
discoveredEndpointAddr[13], "{l1=\"v2\", l2=\"v3\"},{l3=\"v4\"}", component.Store.String(),
1005+
)
1006+
1007+
expected = expected.append(
1008+
discoveredEndpointAddr[10], "", component.Store.String(),
1009+
)
1010+
expected = expected.append(
1011+
discoveredEndpointAddr[9], "", component.Store.String(),
1012+
)
1013+
1014+
expected = expected.append(
1015+
discoveredEndpointAddr[14], "{l1=\"v2\", l2=\"v3\"},{l3=\"v4\"}", component.Receive.String(),
1016+
)
1017+
expected = expected.append(
1018+
discoveredEndpointAddr[15], "{l1=\"v2\", l2=\"v3\"},{l3=\"v4\"}", component.Receive.String(),
1019+
)
1020+
1021+
expected = expected.Sort()
1022+
endpointSet.endpointsMetric.storeNodes = endpointSet.endpointsMetric.storeNodes.Sort()
1023+
testutil.Equals(t, len(expected), len(endpointSet.endpointsMetric.storeNodes))
1024+
for i := range expected {
1025+
t.Log(i)
1026+
testutil.Equals(t, expected[i], endpointSet.endpointsMetric.storeNodes[i])
9801027
}
981-
expected[component.Store.String()] = map[string]int{
982-
"": 2,
983-
"{l1=\"v2\", l2=\"v3\"},{l3=\"v4\"}": 3,
984-
}
985-
expected[component.Receive.String()] = map[string]int{
986-
"{l1=\"v2\", l2=\"v3\"},{l3=\"v4\"}": 2,
987-
}
988-
testutil.Equals(t, expected, endpointSet.endpointsMetric.storeNodes)
989-
9901028
// Close remaining endpoint from previous batch
9911029
endpoints.CloseOne(discoveredEndpointAddr[1])
9921030
endpointSet.Update(context.Background())

0 commit comments

Comments (0)