Skip to content

Commit 059bd7d

Browse files
committed
[FLINK-36779][table] Fix metric is incorrect and non-changing during the time in Rank
1 parent d8010df commit 059bd7d

8 files changed

Lines changed: 160 additions & 13 deletions

File tree

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.Comparator;
4444
import java.util.Objects;
4545
import java.util.function.Function;
46+
import java.util.function.LongSupplier;
4647

4748
/**
4849
* Base class for TopN Function.
@@ -209,23 +210,28 @@ protected boolean checkSortKeyInBufferRange(RowData sortKey, TopNBuffer buffer)
209210
return buffer.checkSortKeyInBufferRange(sortKey, getDefaultTopNSize());
210211
}
211212

212-
protected void registerMetric(long heapSize) {
213-
registerMetric(heapSize, requestCount, hitCount);
213+
protected void registerMetric(LongSupplier heapSizeSupplier) {
214+
registerMetric(heapSizeSupplier, () -> requestCount, () -> hitCount);
214215
}
215216

216-
protected void registerMetric(long heapSize, long requestCount, long hitCount) {
217+
protected void registerMetric(
218+
LongSupplier heapSizeSupplier,
219+
LongSupplier requestCountSupplier,
220+
LongSupplier hitCountSupplier) {
217221
getRuntimeContext()
218222
.getMetricGroup()
219223
.<Double, Gauge<Double>>gauge(
220224
"topn.cache.hitRate",
221-
() ->
222-
requestCount == 0
223-
? 1.0
224-
: Long.valueOf(hitCount).doubleValue() / requestCount);
225+
() -> {
226+
long requests = requestCountSupplier.getAsLong();
227+
return requests == 0
228+
? 1.0
229+
: (double) hitCountSupplier.getAsLong() / requests;
230+
});
225231

226232
getRuntimeContext()
227233
.getMetricGroup()
228-
.<Long, Gauge<Long>>gauge("topn.cache.size", () -> heapSize);
234+
.<Long, Gauge<Long>>gauge("topn.cache.size", heapSizeSupplier::getAsLong);
229235
}
230236

231237
protected void collectInsert(
@@ -386,8 +392,8 @@ public void accHitCount() {
386392
hitCount++;
387393
}
388394

389-
protected void registerMetric(long heapSize) {
390-
topNFunction.registerMetric(heapSize, requestCount, hitCount);
395+
protected void registerMetric(LongSupplier heapSizeSupplier) {
396+
topNFunction.registerMetric(heapSizeSupplier, () -> requestCount, () -> hitCount);
391397
}
392398
}
393399
}

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public void open(OpenContext openContext) throws Exception {
165165
dataState = getRuntimeContext().getMapState(mapStateDescriptor);
166166

167167
// metrics
168-
registerMetric(cacheSize);
168+
registerMetric(() -> kvRowKeyMap.size() * getDefaultTopNSize());
169169
}
170170

171171
@Override

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/AppendOnlyTopNHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public AppendOnlyTopNHelper(AbstractTopNFunction topNFunction, long cacheSize, l
7070
}
7171

7272
public void registerMetric() {
73-
registerMetric(kvSortedMap.size() * topNSize);
73+
registerMetric(() -> kvSortedMap.size() * topNSize);
7474
}
7575

7676
@Nullable

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/FastTop1Helper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,6 @@ public void flushAllCacheToState() throws Exception {
126126
public abstract void flushBufferToState(RowData currentKey, RowData value) throws Exception;
127127

128128
public void registerMetric() {
129-
registerMetric(kvCache.size() * topNSize);
129+
registerMetric(() -> kvCache.size() * topNSize);
130130
}
131131
}

flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunctionTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.table.runtime.operators.rank;
2020

21+
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
2122
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
2223
import org.apache.flink.table.data.RowData;
2324
import org.apache.flink.table.runtime.operators.rank.async.AsyncStateAppendOnlyTopNFunction;
@@ -29,6 +30,8 @@
2930

3031
import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
3132
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
33+
import static org.assertj.core.api.Assertions.assertThat;
34+
import static org.junit.jupiter.api.Assumptions.assumeFalse;
3235

3336
/** Tests for {@link AppendOnlyTopNFunction} and {@link AsyncStateAppendOnlyTopNFunction}. */
3437
class AppendOnlyTopNFunctionTest extends TopNFunctionTestBase {
@@ -95,4 +98,33 @@ void testVariableRankRange() throws Exception {
9598
assertorWithoutRowNumber.assertOutputEquals(
9699
"output wrong.", expectedOutput, testHarness.getOutput());
97100
}
101+
102+
/**
103+
* Verifies that {@code topn.cache.size} reflects the live cache state instead of staying at 0
104+
* for {@link AppendOnlyTopNFunction}.
105+
*/
106+
@TestTemplate
107+
void testCacheMetricsReflectLiveState() throws Exception {
108+
// async harness does not allow injecting a custom metric group; sync path covers the fix.
109+
assumeFalse(enableAsyncState);
110+
AbstractTopNFunction func =
111+
createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 2), true, false);
112+
InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup();
113+
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
114+
createTestHarnessWithMetrics(func, metricGroup);
115+
testHarness.open();
116+
// no partition cached yet -> cache size is 0.
117+
assertThat(readCacheSizeMetric(metricGroup)).isZero();
118+
// no requests issued yet -> hit rate falls back to 1.0.
119+
assertThat(readCacheHitRateMetric(metricGroup)).isEqualTo(1.0);
120+
121+
testHarness.processElement(insertRecord("book", 1L, 12));
122+
testHarness.processElement(insertRecord("book", 2L, 19));
123+
testHarness.processElement(insertRecord("fruit", 4L, 33));
124+
// 2 partitions cached -> live size = 2 * topN(2) = 4.
125+
assertThat(readCacheSizeMetric(metricGroup)).isEqualTo(4L);
126+
// hit rate is now driven by live counters and lies in [0.0, 1.0].
127+
assertThat(readCacheHitRateMetric(metricGroup)).isBetween(0.0, 1.0);
128+
testHarness.close();
129+
}
98130
}

flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/FastTop1FunctionTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.table.runtime.operators.rank;
2020

2121
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
22+
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
2223
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
2324
import org.apache.flink.table.data.RowData;
2425
import org.apache.flink.table.runtime.operators.rank.async.AsyncStateFastTop1Function;
@@ -31,6 +32,8 @@
3132
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
3233
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
3334
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
35+
import static org.assertj.core.api.Assertions.assertThat;
36+
import static org.junit.jupiter.api.Assumptions.assumeFalse;
3437

3538
/** Tests for {@link FastTop1Function} and {@link AsyncStateFastTop1Function}. */
3639
public class FastTop1FunctionTest extends TopNFunctionTestBase {
@@ -376,4 +379,32 @@ void testConstantRankRangeWithOffset() throws Exception {
376379
void testOutputRankNumberWithVariableRankRange() throws Exception {
377380
// skip
378381
}
382+
383+
/**
384+
* Verifies that {@code topn.cache.size} reflects the live cache state instead of staying at 0
385+
* for {@link FastTop1Function}.
386+
*/
387+
@TestTemplate
388+
void testCacheMetricsReflectLiveState() throws Exception {
389+
// async harness does not allow injecting a custom metric group; sync path covers the fix.
390+
assumeFalse(enableAsyncState);
391+
AbstractTopNFunction func =
392+
createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 1), true, false);
393+
InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup();
394+
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
395+
createTestHarnessWithMetrics(func, metricGroup);
396+
testHarness.open();
397+
// no partition cached yet -> cache size is 0.
398+
assertThat(readCacheSizeMetric(metricGroup)).isZero();
399+
// no requests issued yet -> hit rate falls back to 1.0.
400+
assertThat(readCacheHitRateMetric(metricGroup)).isEqualTo(1.0);
401+
402+
testHarness.processElement(insertRecord("book", 1L, 12));
403+
testHarness.processElement(insertRecord("fruit", 4L, 33));
404+
// 2 partitions cached -> live size = 2 * topN(1) = 2.
405+
assertThat(readCacheSizeMetric(metricGroup)).isEqualTo(2L);
406+
// hit rate is now driven by live counters and lies in [0.0, 1.0].
407+
assertThat(readCacheHitRateMetric(metricGroup)).isBetween(0.0, 1.0);
408+
testHarness.close();
409+
}
379410
}

flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,15 @@
1919
package org.apache.flink.table.runtime.operators.rank;
2020

2121
import org.apache.flink.api.common.state.StateTtlConfig;
22+
import org.apache.flink.metrics.Gauge;
2223
import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
2324
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
25+
import org.apache.flink.runtime.jobgraph.OperatorID;
26+
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
27+
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
28+
import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
29+
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
30+
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
2431
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
2532
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
2633
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -53,11 +60,13 @@
5360
import java.util.ArrayList;
5461
import java.util.Arrays;
5562
import java.util.List;
63+
import java.util.Map;
5664

5765
import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
5866
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
5967
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
6068
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
69+
import static org.assertj.core.api.Assertions.assertThat;
6170
import static org.assertj.core.api.Assertions.assertThatThrownBy;
6271
import static org.junit.jupiter.api.Assumptions.assumeFalse;
6372

@@ -425,4 +434,44 @@ abstract AbstractTopNFunction createFunction(
425434

426435
/** TODO remove this method after all rank function support async state. */
427436
abstract boolean supportedAsyncState();
437+
438+
// shared helpers for verifying cache metrics in subclasses.
439+
440+
static final String CACHE_SIZE_METRIC = "topn.cache.size";
441+
static final String CACHE_HIT_RATE_METRIC = "topn.cache.hitRate";
442+
443+
/** Creates a sync test harness that exposes the operator metric group for assertions. */
444+
OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarnessWithMetrics(
445+
AbstractTopNFunction rankFunction, InterceptingOperatorMetricGroup operatorMetricGroup)
446+
throws Exception {
447+
KeyedProcessOperator<RowData, RowData, RowData> operator =
448+
new KeyedProcessOperator<>(rankFunction);
449+
rankFunction.setKeyContext(operator);
450+
InterceptingTaskMetricGroup taskMetricGroup =
451+
new InterceptingTaskMetricGroup() {
452+
@Override
453+
public InternalOperatorMetricGroup getOrAddOperator(
454+
OperatorID id, String name, Map<String, String> additionalVariables) {
455+
return operatorMetricGroup;
456+
}
457+
};
458+
MockEnvironment environment =
459+
new MockEnvironmentBuilder().setMetricGroup(taskMetricGroup).build();
460+
return new KeyedOneInputStreamOperatorTestHarness<>(
461+
operator, keySelector, keySelector.getProducedType(), environment);
462+
}
463+
464+
@SuppressWarnings("unchecked")
465+
static long readCacheSizeMetric(InterceptingOperatorMetricGroup metricGroup) {
466+
Gauge<Long> gauge = (Gauge<Long>) metricGroup.get(CACHE_SIZE_METRIC);
467+
assertThat(gauge).as("topn.cache.size gauge should be registered").isNotNull();
468+
return gauge.getValue();
469+
}
470+
471+
@SuppressWarnings("unchecked")
472+
static double readCacheHitRateMetric(InterceptingOperatorMetricGroup metricGroup) {
473+
Gauge<Double> gauge = (Gauge<Double>) metricGroup.get(CACHE_HIT_RATE_METRIC);
474+
assertThat(gauge).as("topn.cache.hitRate gauge should be registered").isNotNull();
475+
return gauge.getValue();
476+
}
428477
}

flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunctionTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.table.runtime.operators.rank;
2020

21+
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
2122
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
2223
import org.apache.flink.table.data.RowData;
2324

@@ -30,6 +31,7 @@
3031
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
3132
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
3233
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
34+
import static org.assertj.core.api.Assertions.assertThat;
3335

3436
/** Tests for {@link UpdatableTopNFunction}. */
3537
class UpdatableTopNFunctionTest extends TopNFunctionTestBase {
@@ -251,4 +253,31 @@ void testSortKeyChangesWhenNotOutputRankNumberAndNotGenerateUpdateBefore() throw
251253
assertorWithRowNumber.assertOutputEquals(
252254
"output wrong.", expectedOutput, testHarness.getOutput());
253255
}
256+
257+
/**
258+
* Verifies that {@code topn.cache.size} reflects the live cache state instead of being stuck at
259+
* the configured {@code cacheSize} constant for {@link UpdatableTopNFunction}.
260+
*/
261+
@TestTemplate
262+
void testCacheMetricsReflectLiveState() throws Exception {
263+
AbstractTopNFunction func =
264+
createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 2), true, false);
265+
InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup();
266+
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
267+
createTestHarnessWithMetrics(func, metricGroup);
268+
testHarness.open();
269+
// no partition cached yet -> live size is 0 (not the configured cacheSize).
270+
assertThat(readCacheSizeMetric(metricGroup)).isZero();
271+
// no requests issued yet -> hit rate falls back to 1.0.
272+
assertThat(readCacheHitRateMetric(metricGroup)).isEqualTo(1.0);
273+
274+
testHarness.processElement(insertRecord("book", 2L, 19));
275+
testHarness.processElement(insertRecord("book", 3L, 16));
276+
testHarness.processElement(insertRecord("fruit", 1L, 33));
277+
// 2 partitions cached -> live size = 2 * topN(2) = 4.
278+
assertThat(readCacheSizeMetric(metricGroup)).isEqualTo(4L);
279+
// hit rate is now driven by live counters and lies in [0.0, 1.0].
280+
assertThat(readCacheHitRateMetric(metricGroup)).isBetween(0.0, 1.0);
281+
testHarness.close();
282+
}
254283
}

0 commit comments

Comments (0)