Skip to content

Commit 3f82768

Browse files
author
xiaoxingstack
committed
[HUDI-4813] Fix issue where key generator inference does not work on the Spark SQL side
1 parent 5d9db86 commit 3f82768

2 files changed

Lines changed: 11 additions & 3 deletions

File tree

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -787,9 +787,13 @@ object DataSourceOptionsHelper {
787787

788788
def inferKeyGenClazz(props: TypedProperties): String = {
789789
val partitionFields = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), null)
790+
val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
791+
genKeyGenerator(recordsKeyFields, partitionFields)
792+
}
793+
794+
def genKeyGenerator(recordsKeyFields: String, partitionFields: String): String = {
790795
if (partitionFields != null) {
791796
val numPartFields = partitionFields.split(",").length
792-
val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
793797
val numRecordKeyFields = recordsKeyFields.split(",").length
794798
if (numPartFields == 1 && numRecordKeyFields == 1) {
795799
classOf[SimpleKeyGenerator].getName

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.catalyst.catalog
1919

20-
import org.apache.hudi.AvroConversionUtils
20+
import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper}
2121
import org.apache.hudi.DataSourceWriteOptions.OPERATION
2222
import org.apache.hudi.HoodieWriterUtils._
2323
import org.apache.hudi.common.config.DFSPropertiesConfiguration
@@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
3030
import org.apache.spark.sql.avro.SchemaConverters
3131
import org.apache.spark.sql.catalyst.TableIdentifier
3232
import org.apache.spark.sql.hudi.HoodieOptionConfig
33+
import org.apache.spark.sql.hudi.HoodieOptionConfig.SQL_KEY_TABLE_PRIMARY_KEY
3334
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
3435
import org.apache.spark.sql.types.{StructField, StructType}
3536
import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -288,7 +289,10 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
288289
HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
289290
originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
290291
} else {
291-
extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = classOf[ComplexKeyGenerator].getCanonicalName
292+
val primaryKeys = table.properties.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName).get
293+
val partitions = table.partitionColumnNames.mkString(",")
294+
extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
295+
DataSourceOptionsHelper.genKeyGenerator(primaryKeys, partitions)
292296
}
293297
extraConfig.toMap
294298
}

0 commit comments

Comments (0)