Skip to content

Commit bef3d62

Browse files
committed
[HUDI-5066] Support flink hoodie source metaclient cache
1 parent 3452876 commit bef3d62

2 files changed

Lines changed: 20 additions & 4 deletions

File tree

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public HoodieTableSource(
136136
List<String> partitionKeys,
137137
String defaultPartName,
138138
Configuration conf) {
139-
this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null);
139+
this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null, null);
140140
}
141141

142142
public HoodieTableSource(
@@ -148,7 +148,8 @@ public HoodieTableSource(
148148
@Nullable List<Map<String, String>> requiredPartitions,
149149
@Nullable int[] requiredPos,
150150
@Nullable Long limit,
151-
@Nullable List<ResolvedExpression> filters) {
151+
@Nullable List<ResolvedExpression> filters,
152+
@Nullable HoodieTableMetaClient metaClient) {
152153
this.schema = schema;
153154
this.tableRowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
154155
this.path = path;
@@ -162,7 +163,7 @@ public HoodieTableSource(
162163
this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
163164
this.filters = filters == null ? Collections.emptyList() : filters;
164165
this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
165-
this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
166+
this.metaClient = Optional.ofNullable(metaClient).orElse(StreamerUtil.metaClientForReader(conf, hadoopConf));
166167
this.fileIndex = FileIndex.instance(this.path, this.conf, this.tableRowType);
167168
this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
168169
}
@@ -212,7 +213,7 @@ public ChangelogMode getChangelogMode() {
212213
@Override
213214
public DynamicTableSource copy() {
214215
return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
215-
conf, requiredPartitions, requiredPos, limit, filters);
216+
conf, requiredPartitions, requiredPos, limit, filters, metaClient);
216217
}
217218

218219
@Override

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hudi.table;
2020

21+
import org.apache.hudi.common.table.HoodieTableMetaClient;
2122
import org.apache.hudi.configuration.FlinkOptions;
2223
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
2324
import org.apache.hudi.utils.TestConfigurations;
@@ -137,4 +138,18 @@ void testGetTableAvroSchema() {
137138
+ "uuid,name,age,ts,partition";
138139
assertThat(schemaFields, is(expected));
139140
}
141+
142+
@Test
143+
void testHoodieSourceCachedMetaClient() throws Exception {
144+
beforeEach();
145+
HoodieTableSource tableSource = new HoodieTableSource(
146+
TestConfigurations.TABLE_SCHEMA,
147+
new Path(tempFile.getPath()),
148+
Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
149+
"default-par",
150+
conf);
151+
HoodieTableMetaClient metaClient = tableSource.getMetaClient();
152+
HoodieTableSource tableSourceCopy = (HoodieTableSource) tableSource.copy();
153+
assertThat(metaClient, is(tableSourceCopy.getMetaClient()));
154+
}
140155
}

0 commit comments

Comments (0)