Skip to content

Commit a5e5c56

Browse files
committed
[HUDI-4529] Tweak some default config options for flink
1 parent 642f87c commit a5e5c56

4 files changed

Lines changed: 38 additions & 31 deletions

File tree

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@
2222
import org.apache.hudi.common.config.ConfigClassProperty;
2323
import org.apache.hudi.common.config.ConfigGroups;
2424
import org.apache.hudi.common.config.HoodieConfig;
25+
import org.apache.hudi.common.model.EventTimeAvroPayload;
2526
import org.apache.hudi.common.model.HoodieCleaningPolicy;
2627
import org.apache.hudi.common.model.HoodieTableType;
27-
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
2828
import org.apache.hudi.config.HoodieIndexConfig;
2929
import org.apache.hudi.config.HoodieWriteConfig;
3030
import org.apache.hudi.exception.HoodieException;
31-
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
31+
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
3232
import org.apache.hudi.index.HoodieIndex;
3333
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
3434
import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -287,7 +287,7 @@ private FlinkOptions() {
287287
public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions
288288
.key("write.payload.class")
289289
.stringType()
290-
.defaultValue(OverwriteWithLatestAvroPayload.class.getName())
290+
.defaultValue(EventTimeAvroPayload.class.getName())
291291
.withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
292292
+ "This will render any value set for the option in-effective");
293293

@@ -718,7 +718,7 @@ private FlinkOptions() {
718718
public static final ConfigOption<String> HIVE_SYNC_MODE = ConfigOptions
719719
.key("hive_sync.mode")
720720
.stringType()
721-
.defaultValue("jdbc")
721+
.defaultValue("hms")
722722
.withDescription("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql, default 'jdbc'");
723723

724724
public static final ConfigOption<String> HIVE_SYNC_USERNAME = ConfigOptions
@@ -754,7 +754,7 @@ private FlinkOptions() {
754754
public static final ConfigOption<String> HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME = ConfigOptions
755755
.key("hive_sync.partition_extractor_class")
756756
.stringType()
757-
.defaultValue(SlashEncodedDayPartitionValueExtractor.class.getCanonicalName())
757+
.defaultValue(MultiPartKeysValueExtractor.class.getName())
758758
.withDescription("Tool to extract the partition value from HDFS path, "
759759
+ "default 'SlashEncodedDayPartitionValueExtractor'");
760760

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.apache.hudi.common.model.WriteOperationType;
2525
import org.apache.hudi.common.util.StringUtils;
2626
import org.apache.hudi.configuration.FlinkOptions;
27-
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
27+
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
2828
import org.apache.hudi.keygen.constant.KeyGeneratorType;
2929
import org.apache.hudi.util.FlinkStateBackendConverter;
3030
import org.apache.hudi.util.StreamerUtil;
@@ -321,8 +321,8 @@ public class FlinkStreamerConfig extends Configuration {
321321
public String hiveSyncPartitionFields = "";
322322

323323
@Parameter(names = {"--hive-sync-partition-extractor-class"}, description = "Tool to extract the partition value from HDFS path, "
324-
+ "default 'SlashEncodedDayPartitionValueExtractor'")
325-
public String hiveSyncPartitionExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getCanonicalName();
324+
+ "default 'MultiPartKeysValueExtractor'")
325+
public String hiveSyncPartitionExtractorClass = MultiPartKeysValueExtractor.class.getCanonicalName();
326326

327327
@Parameter(names = {"--hive-sync-assume-date-partitioning"}, description = "Assume partitioning is yyyy/mm/dd, default false")
328328
public Boolean hiveSyncAssumeDatePartition = false;

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@
1919
package org.apache.hudi.table;
2020

2121
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
22-
import org.apache.hudi.common.model.EventTimeAvroPayload;
2322
import org.apache.hudi.common.util.StringUtils;
2423
import org.apache.hudi.configuration.FlinkOptions;
2524
import org.apache.hudi.configuration.OptionsResolver;
2625
import org.apache.hudi.exception.HoodieValidationException;
27-
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
2826
import org.apache.hudi.index.HoodieIndex;
2927
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
3028
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -38,6 +36,7 @@
3836
import org.apache.flink.table.api.ValidationException;
3937
import org.apache.flink.table.api.constraints.UniqueConstraint;
4038
import org.apache.flink.table.catalog.CatalogTable;
39+
import org.apache.flink.table.catalog.ObjectIdentifier;
4140
import org.apache.flink.table.catalog.ResolvedSchema;
4241
import org.apache.flink.table.connector.sink.DynamicTableSink;
4342
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -71,7 +70,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
7170
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
7271
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
7372
sanityCheck(conf, schema);
74-
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
73+
setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
7574

7675
Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
7776
new ValidationException("Option [path] should not be empty.")));
@@ -90,7 +89,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
9089
"Option [path] should not be empty.");
9190
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
9291
sanityCheck(conf, schema);
93-
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
92+
setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
9493
return new HoodieTableSink(conf, schema);
9594
}
9695

@@ -154,35 +153,30 @@ private void sanityCheck(Configuration conf, ResolvedSchema schema) {
154153
throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema."
155154
+ "Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
156155
}
157-
} else if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.PAYLOAD_CLASS_NAME)) {
158-
// if precombine field is specified but payload clazz is default,
159-
// use DefaultHoodieRecordPayload to make sure the precombine field is always taken for
160-
// comparing.
161-
conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, EventTimeAvroPayload.class.getName());
162156
}
163157
}
164158

165159
/**
166-
* Sets up the config options based on the table definition, for e.g the table name, primary key.
160+
* Sets up the config options based on the table definition, for e.g, the table name, primary key.
167161
*
168-
* @param conf The configuration to setup
169-
* @param tableName The table name
162+
* @param conf The configuration to set up
163+
* @param tablePath The table path
170164
* @param table The catalog table
171165
* @param schema The physical schema
172166
*/
173167
private static void setupConfOptions(
174168
Configuration conf,
175-
String tableName,
169+
ObjectIdentifier tablePath,
176170
CatalogTable table,
177171
ResolvedSchema schema) {
178172
// table name
179-
conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
173+
conf.setString(FlinkOptions.TABLE_NAME.key(), tablePath.getObjectName());
180174
// hoodie key about options
181175
setupHoodieKeyOptions(conf, table);
182176
// compaction options
183177
setupCompactionOptions(conf);
184178
// hive options
185-
setupHiveOptions(conf);
179+
setupHiveOptions(conf, tablePath);
186180
// read options
187181
setupReadOptions(conf);
188182
// write options
@@ -309,10 +303,12 @@ private static void setupCompactionOptions(Configuration conf) {
309303
/**
310304
* Sets up the hive options from the table definition.
311305
*/
312-
private static void setupHiveOptions(Configuration conf) {
313-
if (!conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)
314-
&& FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME)) {
315-
conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME, MultiPartKeysValueExtractor.class.getName());
306+
private static void setupHiveOptions(Configuration conf, ObjectIdentifier tablePath) {
307+
if (!conf.contains(FlinkOptions.HIVE_SYNC_DB)) {
308+
conf.setString(FlinkOptions.HIVE_SYNC_DB, tablePath.getDatabaseName());
309+
}
310+
if (!conf.contains(FlinkOptions.HIVE_SYNC_TABLE)) {
311+
conf.setString(FlinkOptions.HIVE_SYNC_TABLE, tablePath.getObjectName());
316312
}
317313
}
318314

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.hudi.configuration.FlinkOptions;
2424
import org.apache.hudi.exception.HoodieValidationException;
2525
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
26-
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
2726
import org.apache.hudi.index.HoodieIndex;
2827
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
2928
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -240,15 +239,21 @@ void testSetupHiveOptionsForSource() {
240239
final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
241240
final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1);
242241
final Configuration conf1 = tableSource1.getConf();
242+
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_DB), is("db1"));
243+
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t1"));
243244
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName()));
244245

245246
// set up hive style partitioning is true.
247+
this.conf.setString(FlinkOptions.HIVE_SYNC_DB, "db2");
248+
this.conf.setString(FlinkOptions.HIVE_SYNC_TABLE, "t2");
246249
this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
247250

248251
final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema1, "f2");
249252
final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2);
250253
final Configuration conf2 = tableSource2.getConf();
251-
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(SlashEncodedDayPartitionValueExtractor.class.getName()));
254+
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_DB), is("db2"));
255+
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t2"));
256+
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName()));
252257
}
253258

254259
@Test
@@ -430,15 +435,21 @@ void testSetupHiveOptionsForSink() {
430435
final MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2");
431436
final HoodieTableSink tableSink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext1);
432437
final Configuration conf1 = tableSink1.getConf();
438+
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_DB), is("db1"));
439+
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t1"));
433440
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName()));
434441

435442
// set up hive style partitioning is true.
443+
this.conf.setString(FlinkOptions.HIVE_SYNC_DB, "db2");
444+
this.conf.setString(FlinkOptions.HIVE_SYNC_TABLE, "t2");
436445
this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
437446

438447
final MockContext sinkContext2 = MockContext.getInstance(this.conf, schema1, "f2");
439448
final HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext2);
440449
final Configuration conf2 = tableSink2.getConf();
441-
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(SlashEncodedDayPartitionValueExtractor.class.getName()));
450+
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_DB), is("db2"));
451+
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t2"));
452+
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName()));
442453
}
443454

444455
@Test
@@ -542,7 +553,7 @@ static MockContext getInstance(Configuration conf, ResolvedSchema schema, List<S
542553

543554
@Override
544555
public ObjectIdentifier getObjectIdentifier() {
545-
return ObjectIdentifier.of("hudi", "default", "t1");
556+
return ObjectIdentifier.of("hudi", "db1", "t1");
546557
}
547558

548559
@Override

0 commit comments

Comments
 (0)