@@ -19,10 +19,10 @@ package org.apache.hudi
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.DataSourceWriteOptions.{INSERT_DROP_DUPS, INSERT_OPERATION_OPT_VAL, KEYGENERATOR_CLASS_NAME, MOR_TABLE_TYPE_OPT_VAL, OPERATION, PAYLOAD_CLASS_NAME, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.config.HoodieConfig
-import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieTableType, WriteOperationType}
+import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
@@ -35,10 +35,12 @@ import org.apache.hudi.testutils.DataSourceTestUtils
 import org.apache.spark.SparkContext
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.functions.{expr, lit}
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.hudi.command.SqlKeyGenerator
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
-import org.junit.jupiter.api.{AfterEach, Assertions, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
 import org.mockito.ArgumentMatchers.any
@@ -47,15 +49,13 @@ import org.scalatest.Matchers.{assertResult, be, convertToAnyShouldWrapper, inte
 
 import java.time.Instant
 import java.util.{Collections, Date, UUID}
-
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters
-import scala.util.control.NonFatal
 
 /**
  * Test suite for SparkSqlWriter class.
  */
-class HoodieSparkSqlWriterSuite {
+class TestHoodieSparkSqlWriter {
   var spark: SparkSession = _
   var sqlContext: SQLContext = _
   var sc: SparkContext = _
@@ -70,8 +70,8 @@ class HoodieSparkSqlWriterSuite {
   /**
    * Setup method running before each test.
    */
   @BeforeEach
-  def setUp() {
+  def setUp(): Unit = {
     initSparkContext()
     tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     tempBootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
@@ -95,6 +95,7 @@ class HoodieSparkSqlWriterSuite {
     spark = SparkSession.builder()
       .appName(hoodieFooTableName)
       .master("local[2]")
+      .withExtensions(new HoodieSparkSessionExtension)
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .getOrCreate()
     sc = spark.sparkContext
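
The hunk above registers HoodieSparkSessionExtension on the test session. For reference, a minimal standalone sketch of the same wiring (the object name and app name are illustrative, not part of this change):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension

object HudiExtensionExample {
  def main(args: Array[String]): Unit = {
    // Same wiring as the test setup: the extension injects Hudi's SQL
    // parser and analysis rules into the session it builds.
    val spark = SparkSession.builder()
      .appName("hudi-extension-example") // illustrative app name
      .master("local[2]")
      .withExtensions(new HoodieSparkSessionExtension)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // Hudi-specific SQL statements can now be parsed by this session.
    spark.stop()
  }
}
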
@@ -250,12 +251,14 @@ class HoodieSparkSqlWriterSuite {
       "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4")
     val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
     val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableModifier, dataFrame2))
-    assert(tableAlreadyExistException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
+    assert(tableAlreadyExistException.getMessage.contains("Config conflict"))
+    assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
 
     // on the same path, try an append with the delete operation and a different table name ("hoodie_bar_tbl"), which should throw an exception
     val deleteTableModifier = barTableModifier ++ Map(OPERATION.key -> "delete")
     val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableModifier, dataFrame2))
-    assert(deleteCmdException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
+    assert(deleteCmdException.getMessage.contains("Config conflict"))
+    assert(deleteCmdException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
   }
 
   /**
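
The assertions in the hunk above pin down only two substrings of the error message. A sketch of a message builder that would satisfy them, assuming (not confirmed by this diff) a per-conflict line of key, incoming value, and existing value separated by tabs:

// Sketch only: the real message comes from Hudi's writer-config validation.
def conflictMessage(conflicts: Map[String, (String, String)]): String = {
  val lines = conflicts.map { case (key, (incoming, existing)) =>
    s"$key:\t$incoming\t$existing" // e.g. hoodie.table.name:\thoodie_bar_tbl\thoodie_foo_tbl
  }
  "Config conflict(key\tcurrent value\texisting value):\n" + lines.mkString("\n")
}
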
@@ -266,7 +269,7 @@ class HoodieSparkSqlWriterSuite {
   @ParameterizedTest
   @EnumSource(value = classOf[BulkInsertSortMode])
   def testBulkInsertForSortMode(sortMode: BulkInsertSortMode): Unit = {
-    testBulkInsertWithSortMode(sortMode, true)
+    testBulkInsertWithSortMode(sortMode, populateMetaFields = true)
   }
 
   /**
@@ -287,12 +290,13 @@ class HoodieSparkSqlWriterSuite {
   @Test
   def testDisableAndEnableMetaFields(): Unit = {
     try {
-      testBulkInsertWithSortMode(BulkInsertSortMode.NONE, false)
+      testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields = false)
       // create a new table
       val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
         .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
         .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
         .updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name())
+        .updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), "true")
 
       // generate the inserts
       val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -302,9 +306,10 @@ class HoodieSparkSqlWriterSuite {
       try {
         // write to Hudi
         HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df)
-        Assertions.fail("Should have thrown exception")
+        fail("Should have thrown exception")
       } catch {
-        case e: HoodieException => assertTrue(e.getMessage.contains("hoodie.populate.meta.fields already disabled for the table. Can't be re-enabled back"))
+        case e: HoodieException => assertTrue(e.getMessage.startsWith("Config conflict"))
+        case e: Exception => fail(e)
       }
     }
   }
@@ -439,7 +444,7 @@ class HoodieSparkSqlWriterSuite {
     val records = DataSourceTestUtils.generateRandomRows(100)
     val recordsSeq = convertRowListToSeq(records)
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
-    initializeMetaClientForBootstrap(fooTableParams, tableType, false)
+    initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false)
     val client = spy(DataSourceUtils.createHoodieClient(
       new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName,
       mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
@@ -496,7 +501,7 @@ class HoodieSparkSqlWriterSuite {
       DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
       HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getCanonicalName)
     val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
-    initializeMetaClientForBootstrap(fooTableParams, tableType, true)
+    initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true)
 
     val client = spy(DataSourceUtils.createHoodieClient(
       new JavaSparkContext(sc),
@@ -526,7 +531,8 @@ class HoodieSparkSqlWriterSuite {
       .setTableType(tableType)
       .setTableName(hoodieFooTableName)
       .setRecordKeyFields(fooTableParams(DataSourceWriteOptions.RECORDKEY_FIELD.key))
-      .setBaseFileFormat(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name())
+      .setBaseFileFormat(fooTableParams.getOrElse(HoodieWriteConfig.BASE_FILE_FORMAT.key,
+        HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name))
       .setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
       .setPayloadClassName(fooTableParams(PAYLOAD_CLASS_NAME.key))
       .setPreCombineField(fooTableParams(PRECOMBINE_FIELD.key))
@@ -873,18 +879,15 @@ class HoodieSparkSqlWriterSuite {
 
     val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
     // raise an exception when the params do not match the existing HoodieTableConfig
-    try {
+    val configConflictException = intercept[HoodieException] {
       df2.write.format("hudi")
         .options(options)
         .option(HoodieWriteConfig.TBL_NAME.key, tableName2)
         .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
         .mode(SaveMode.Append).save(tablePath2)
-    } catch {
-      case NonFatal(e) =>
-        assert(e.getMessage.contains("Config conflict"))
-        assert(e.getMessage.contains(
-          s"${HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key}\t${classOf[ComplexKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}"))
     }
+    assert(configConflictException.getMessage.contains("Config conflict"))
+    assert(configConflictException.getMessage.contains(s"KeyGenerator:\t${classOf[ComplexKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}"))
 
     // do not need to specify hiveStylePartitioning/urlEncodePartitioning/KeyGenerator params
     df2.write.format("hudi")
@@ -893,6 +896,24 @@ class HoodieSparkSqlWriterSuite {
       .mode(SaveMode.Append).save(tablePath2)
     val data = spark.read.format("hudi").load(tablePath2 + "/*")
     assert(data.count() == 2)
-    assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "dt=2021-10-16")
+    assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "2021-10-16")
+  }
+
+  @Test
+  def testGetOriginKeyGenerator(): Unit = {
+    // for dataframe write
+    val m1 = Map(
+      HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[ComplexKeyGenerator].getName
+    )
+    val kg1 = HoodieWriterUtils.getOriginKeyGenerator(m1)
+    assertTrue(kg1 == classOf[ComplexKeyGenerator].getName)
+
+    // for sql write
+    val m2 = Map(
+      HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getName,
+      SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> classOf[SimpleKeyGenerator].getName
+    )
+    val kg2 = HoodieWriterUtils.getOriginKeyGenerator(m2)
+    assertTrue(kg2 == classOf[SimpleKeyGenerator].getName)
   }
 }
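
The new testGetOriginKeyGenerator covers HoodieWriterUtils.getOriginKeyGenerator, whose implementation is not part of this diff. A minimal sketch of the behavior the two cases imply, assuming SQL writes wrap the user's key generator in SqlKeyGenerator and carry the original class under SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME:

// Sketch of the resolution implied by the test; the real implementation
// lives in HoodieWriterUtils and may differ in details.
def getOriginKeyGeneratorSketch(parameters: Map[String, String]): String = {
  val keyGen = parameters.getOrElse(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, null)
  if (classOf[SqlKeyGenerator].getName == keyGen) {
    // For SQL writes, recover the original key generator class.
    parameters.getOrElse(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, keyGen)
  } else {
    keyGen
  }
}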