Commit 31bc565

[HUDI-6804] Fix hive read schema evolution MOR table (#9573)
1 parent a3eea2f commit 31bc565

2 files changed: 93 additions & 77 deletions


hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java

Lines changed: 6 additions & 5 deletions
@@ -82,7 +82,7 @@ public class SchemaEvolutionContext {
 
   private final InputSplit split;
   private final JobConf job;
-  private HoodieTableMetaClient metaClient;
+  private final HoodieTableMetaClient metaClient;
   public Option<InternalSchema> internalSchemaOption;
 
   public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException {
@@ -149,6 +149,7 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt
     realtimeRecordReader.setWriterSchema(writerSchema);
     realtimeRecordReader.setReaderSchema(readerSchema);
     realtimeRecordReader.setHiveSchema(hiveSchema);
+    internalSchemaOption = Option.of(prunedInternalSchema);
     RealtimeSplit realtimeSplit = (RealtimeSplit) split;
     LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
         realtimeSplit.getDeltaLogPaths(), realtimeSplit.getPath(), requiredColumns));
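
The one-line addition above is the behavioral core of the MOR fix: the pruned internal schema is now written back to internalSchemaOption, so whatever reads the option after evolution runs sees the pruned schema rather than the original full one. A minimal, self-contained sketch of that write-back pattern, with a hypothetical SchemaContext and plain java.util.Optional standing in for Hudi's SchemaEvolutionContext and Option:

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical stand-in for SchemaEvolutionContext: later consumers read
// schemaOption, so the pruned schema must be stored back onto the context.
class SchemaContext {
  Optional<List<String>> schemaOption =
      Optional.of(Arrays.asList("col0", "col1", "col2"));

  void doEvolution(List<String> requiredColumns) {
    List<String> pruned = schemaOption.get().stream()
        .filter(requiredColumns::contains)
        .collect(Collectors.toList());
    schemaOption = Optional.of(pruned); // the commit's one-line fix, in miniature
  }
}

public class StaleSchemaDemo {
  public static void main(String[] args) {
    SchemaContext ctx = new SchemaContext();
    ctx.doEvolution(Arrays.asList("col1"));
    System.out.println(ctx.schemaOption.get()); // [col1], not [col0, col1, col2]
  }
}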
@@ -171,7 +172,7 @@ public void doEvolutionForParquetFormat() {
     if (!disableSchemaEvolution) {
       prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
       InternalSchema querySchema = prunedSchema;
-      Long commitTime = Long.valueOf(FSUtils.getCommitTime(finalPath.getName()));
+      long commitTime = Long.parseLong(FSUtils.getCommitTime(finalPath.getName()));
       InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);
       InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true,
           true).mergeSchema();
@@ -258,10 +259,10 @@ private TypeInfo constructHiveSchemaFromType(Type type, TypeInfo typeInfo) {
       case DECIMAL:
         return typeInfo;
       case TIME:
-        throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", new Object[] { type }));
+        throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", type));
       default:
-        LOG.error(String.format("cannot convert unknown type: %s to Hive", new Object[] { type }));
-        throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", new Object[] { type }));
+        LOG.error(String.format("cannot convert unknown type: %s to Hive", type));
+        throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", type));
     }
   }
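
The remaining hunks in this file are behavior-preserving cleanups: under Java varargs, String.format("...", new Object[] { type }) and String.format("...", type) are the same call, and Long.valueOf differs from Long.parseLong only in returning a boxed wrapper. A small plain-JDK sketch of the boxing point (the commit-time string is a made-up example value):

public class CommitTimeParsing {
  public static void main(String[] args) {
    String commitTime = "20230905123000000"; // hypothetical Hudi commit timestamp

    Long boxed = Long.valueOf(commitTime);       // parses, then allocates a java.lang.Long wrapper
    long primitive = Long.parseLong(commitTime); // parses straight to a primitive long

    // Same parsed value either way; the diff above just drops the needless box.
    System.out.println(boxed.longValue() == primitive); // true
  }
}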

hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java

Lines changed: 87 additions & 72 deletions
@@ -19,39 +19,46 @@
 package org.apache.hudi.functional;
 
 import org.apache.hudi.HoodieSparkUtils;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
-import org.apache.hudi.hadoop.SchemaEvolutionContext;
-import org.apache.hudi.hadoop.realtime.HoodieEmptyRecordReader;
-import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
-import org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader;
-import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 
-import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Tag("functional")
 public class TestHiveTableSchemaEvolution {
 
-  private SparkSession sparkSession = null;
+  private SparkSession spark = null;
 
   @TempDir
   java.nio.file.Path basePath;
@@ -61,90 +68,98 @@ public void setUp() {
     initSparkContexts("HiveSchemaEvolution");
   }
 
+  @AfterEach
+  public void clean() {
+    if (spark != null) {
+      spark.close();
+    }
+  }
+
   private void initSparkContexts(String appName) {
     SparkConf sparkConf = getSparkConfForTest(appName);
 
-    sparkSession = SparkSession.builder()
+    spark = SparkSession.builder()
         .config("hoodie.support.write.lock", "false")
         .config("spark.sql.session.timeZone", "CTT")
        .config("spark.sql.hive.convertMetastoreParquet", "false")
        .config(sparkConf)
        .getOrCreate();
 
-    sparkSession.sparkContext().setLogLevel("ERROR");
+    spark.sparkContext().setLogLevel("ERROR");
   }
 
-  @Test
-  public void testCopyOnWriteTableForHive() throws Exception {
-    String tableName = "huditest" + new Date().getTime();
+  @ParameterizedTest
+  @ValueSource(strings = {"cow", "mor"})
+  public void testHiveReadSchemaEvolutionTable(String tableType) throws Exception {
     if (HoodieSparkUtils.gteqSpark3_1()) {
-      sparkSession.sql("set hoodie.schema.on.read.enable=true");
+      String tableName = "hudi_test" + new Date().getTime();
       String path = new Path(basePath.toAbsolutePath().toString()).toUri().toString();
-      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path + "'");
-      sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
-      sparkSession.sql("alter table " + tableName + " alter column col1 type double");
-      sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");
 
-      HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
+      spark.sql("set hoodie.schema.on.read.enable=true");
+      spark.sql(String.format("create table %s (col0 int, col1 float, col2 string) using hudi "
+          + "tblproperties (type='%s', primaryKey='col0', preCombineField='col1') location '%s'",
+          tableName, tableType, path));
+      spark.sql(String.format("insert into %s values(1, 1.1, 'text')", tableName));
+      spark.sql(String.format("update %s set col2 = 'text2' where col0 = 1", tableName));
+      spark.sql(String.format("alter table %s alter column col1 type double", tableName));
+      spark.sql(String.format("alter table %s rename column col2 to col2_new", tableName));
+
       JobConf jobConf = new JobConf();
-      inputFormat.setConf(jobConf);
+      jobConf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false");
+      jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,col2_new");
+      jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
+      jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"
+          + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2_new");
+      jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,double,string");
       FileInputFormat.setInputPaths(jobConf, path);
-      InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
-      assertEvolutionResult("cow", splits[0], jobConf);
-    }
-  }
-
-  @Test
-  public void testMergeOnReadTableForHive() throws Exception {
-    String tableName = "huditest" + new Date().getTime();
-    if (HoodieSparkUtils.gteqSpark3_1()) {
-      sparkSession.sql("set hoodie.schema.on.read.enable=true");
-      String path = new Path(basePath.toAbsolutePath().toString()).toUri().toString();
-      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path + "'");
-      sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
-      sparkSession.sql("insert into " + tableName + " values(2, 1.2, 'text2')");
-      sparkSession.sql("alter table " + tableName + " alter column col1 type double");
-      sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");
 
-      HoodieRealtimeInputFormat inputFormat = new HoodieRealtimeInputFormat();
-      JobConf jobConf = new JobConf();
+      HoodieParquetInputFormat inputFormat = "cow".equals(tableType) ? new HoodieParquetInputFormat()
+          : new HoodieParquetRealtimeInputFormat();
       inputFormat.setConf(jobConf);
-      FileInputFormat.setInputPaths(jobConf, path);
-      InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
-      assertEvolutionResult("mor", splits[0], jobConf);
-    }
-  }
 
-  private void assertEvolutionResult(String tableType, InputSplit split, JobConf jobConf) throws Exception {
-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,aaa");
-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
-    jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"
-        + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,aaa");
-    jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,double,string");
-
-    SchemaEvolutionContext schemaEvolutionContext = new SchemaEvolutionContext(split, jobConf);
-    if ("cow".equals(tableType)) {
-      schemaEvolutionContext.doEvolutionForParquetFormat();
-    } else {
-      // mor table
-      RealtimeSplit realtimeSplit = (RealtimeSplit) split;
-      RecordReader recordReader;
-      // for log only split, set the parquet reader as empty.
-      if (FSUtils.isLogFile(realtimeSplit.getPath())) {
-        recordReader = new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf));
+      InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
+      assertEquals(1, splits.length);
+
+      RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat.getRecordReader(splits[0], jobConf, null);
+      List<List<Writable>> records = getWritableList(recordReader);
+      assertEquals(1, records.size());
+      List<Writable> record1 = records.get(0);
+      if ("cow".equals(tableType)) {
+        // col1, col2_new
+        assertEquals(2, record1.size());
+
+        Writable c1 = record1.get(0);
+        assertTrue(c1 instanceof DoubleWritable);
+        assertEquals("1.1", c1.toString().substring(0, 3));
+
+        Writable c2 = record1.get(1);
+        assertTrue(c2 instanceof Text);
+        assertEquals("text2", c2.toString());
       } else {
-        // create a RecordReader to be used by HoodieRealtimeRecordReader
-        recordReader = new MapredParquetInputFormat().getRecordReader(realtimeSplit, jobConf, null);
+        // _hoodie_record_key,_hoodie_commit_time,_hoodie_partition_path, col1, col2_new
+        assertEquals(5, record1.size());
+
+        Writable c1 = record1.get(3);
+        assertTrue(c1 instanceof DoubleWritable);
+        assertEquals("1.1", c1.toString().substring(0, 3));
+
+        Writable c2 = record1.get(4);
+        assertTrue(c2 instanceof Text);
+        assertEquals("text2", c2.toString());
       }
-      RealtimeCompactedRecordReader realtimeCompactedRecordReader = new RealtimeCompactedRecordReader(realtimeSplit, jobConf, recordReader);
-      // mor table also run with doEvolutionForParquetFormat in HoodieParquetInputFormat
-      schemaEvolutionContext.doEvolutionForParquetFormat();
-      schemaEvolutionContext.doEvolutionForRealtimeInputFormat(realtimeCompactedRecordReader);
+      recordReader.close();
     }
+  }
 
-    assertEquals(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), "col1,col2");
-    assertEquals(jobConf.get(serdeConstants.LIST_COLUMNS), "_hoodie_commit_time,_hoodie_commit_seqno,"
-        + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2");
-    assertEquals(jobConf.get(serdeConstants.LIST_COLUMN_TYPES), "string,string,string,string,string,int,double,string");
+  private List<List<Writable>> getWritableList(RecordReader<NullWritable, ArrayWritable> recordReader) throws IOException {
+    List<List<Writable>> records = new ArrayList<>();
+    NullWritable key = recordReader.createKey();
+    ArrayWritable writable = recordReader.createValue();
+    while (writable != null && recordReader.next(key, writable)) {
+      records.add(Arrays.stream(writable.get())
+          .filter(Objects::nonNull)
+          .collect(Collectors.toList()));
+    }
+    return records;
   }
 }
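
A note on the projection setup in the new test: a Hive column id is the column's position in serdeConstants.LIST_COLUMNS, and Hudi tables prepend five _hoodie meta columns at ids 0-4, so projecting col1 and col2_new yields ids "6,7". A small plain-JDK sketch, with a hypothetical idsFor helper, that derives the ids from the column list:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ProjectionIds {
  // Hypothetical helper: a column id is simply the column's index in the
  // comma-separated "columns" (serdeConstants.LIST_COLUMNS) value.
  static String idsFor(String listColumns, List<String> projected) {
    List<String> all = Arrays.asList(listColumns.split(","));
    return projected.stream()
        .map(all::indexOf)
        .map(String::valueOf)
        .collect(Collectors.joining(","));
  }

  public static void main(String[] args) {
    String listColumns = "_hoodie_commit_time,_hoodie_commit_seqno,"
        + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2_new";
    // Prints "6,7" -- matching READ_COLUMN_IDS_CONF_STR in the test above.
    System.out.println(idsFor(listColumns, Arrays.asList("col1", "col2_new")));
  }
}

With a standard Surefire setup, something like mvn -pl hudi-spark-datasource/hudi-spark test -Dtest=TestHiveTableSchemaEvolution should run both the cow and mor parameterizations.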
