Skip to content

Commit 836ce42

Browse files
committed
[HUDI-5066] Support flink hoodie source metaclient cache
1 parent b2a84ee commit 836ce42

2 files changed

Lines changed: 14 additions & 4 deletions

File tree

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public HoodieTableSource(
136136
List<String> partitionKeys,
137137
String defaultPartName,
138138
Configuration conf) {
139-
this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null);
139+
this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null, null);
140140
}
141141

142142
public HoodieTableSource(
@@ -148,7 +148,8 @@ public HoodieTableSource(
148148
@Nullable FileIndex fileIndex,
149149
@Nullable List<Map<String, String>> requiredPartitions,
150150
@Nullable int[] requiredPos,
151-
@Nullable Long limit) {
151+
@Nullable Long limit,
152+
@Nullable HoodieTableMetaClient metaClient) {
152153
this.schema = schema;
153154
this.tableRowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
154155
this.path = path;
@@ -164,7 +165,7 @@ public HoodieTableSource(
164165
: requiredPos;
165166
this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
166167
this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
167-
this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
168+
this.metaClient = metaClient == null ? StreamerUtil.metaClientForReader(conf, hadoopConf) : metaClient;
168169
this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
169170
}
170171

@@ -215,7 +216,7 @@ public ChangelogMode getChangelogMode() {
215216
@Override
216217
public DynamicTableSource copy() {
217218
return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
218-
conf, fileIndex, requiredPartitions, requiredPos, limit);
219+
conf, fileIndex, requiredPartitions, requiredPos, limit, metaClient);
219220
}
220221

221222
@Override

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hudi.table;
2020

21+
import org.apache.hudi.common.table.HoodieTableMetaClient;
2122
import org.apache.hudi.configuration.FlinkOptions;
2223
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
2324
import org.apache.hudi.utils.TestConfigurations;
@@ -148,6 +149,14 @@ void testDataSkippingFilterShouldBeNotNullWhenTableSourceIsCopied() {
148149
assertEquals(expectedFilters, actualFilters);
149150
}
150151

152+
@Test
153+
void testHoodieSourceCachedMetaClient() {
154+
HoodieTableSource tableSource = getEmptyStreamingSource();
155+
HoodieTableMetaClient metaClient = tableSource.getMetaClient();
156+
HoodieTableSource tableSourceCopy = (HoodieTableSource) tableSource.copy();
157+
assertThat(metaClient, is(tableSourceCopy.getMetaClient()));
158+
}
159+
151160
private HoodieTableSource getEmptyStreamingSource() {
152161
final String path = tempFile.getAbsolutePath();
153162
conf = TestConfigurations.getDefaultConf(path);

0 commit comments

Comments (0)