Skip to content

Commit 642f87c

Browse files
wuwenchi and 吴文池 authored
[HUDI-4601] Read error from MOR table after compaction with timestamp partitioning (#6365)
* read error from mor after compaction Co-authored-by: 吴文池 <wuwenchi@deepexi.com>
1 parent 8c02e90 commit 642f87c

3 files changed

Lines changed: 39 additions & 12 deletions

File tree

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -108,9 +108,14 @@ public void open(FileInputSplit fileSplit) throws IOException {
108108
LinkedHashMap<String, String> partSpec = PartitionPathUtils.extractPartitionSpecFromPath(
109109
fileSplit.getPath());
110110
LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
111-
partSpec.forEach((k, v) -> partObjects.put(k, DataTypeUtils.resolvePartition(
112-
partDefaultName.equals(v) ? null : v,
113-
fullFieldTypes[fieldNameList.indexOf(k)])));
111+
partSpec.forEach((k, v) -> {
112+
DataType fieldType = fullFieldTypes[fieldNameList.indexOf(k)];
113+
if (!DataTypeUtils.isDatetimeType(fieldType)) {
114+
// date time type partition field is formatted specifically,
115+
// read directly from the data file to avoid format mismatch or precision loss
116+
partObjects.put(k, DataTypeUtils.resolvePartition(partDefaultName.equals(v) ? null : v, fieldType));
117+
}
118+
});
114119

115120
this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
116121
utcTimestamp,

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -299,9 +299,14 @@ private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos)
299299
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
300300
FilePathUtils.extractPartitionKeys(this.conf));
301301
LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
302-
partSpec.forEach((k, v) -> partObjects.put(k, DataTypeUtils.resolvePartition(
303-
defaultPartName.equals(v) ? null : v,
304-
fieldTypes.get(fieldNames.indexOf(k)))));
302+
partSpec.forEach((k, v) -> {
303+
DataType fieldType = fieldTypes.get(fieldNames.indexOf(k));
304+
if (!DataTypeUtils.isDatetimeType(fieldType)) {
305+
// date time type partition field is formatted specifically,
306+
// read directly from the data file to avoid format mismatch or precision loss
307+
partObjects.put(k, DataTypeUtils.resolvePartition(defaultPartName.equals(v) ? null : v, fieldType));
308+
}
309+
});
305310

306311
return ParquetSplitReaderUtil.genPartColumnarRowReader(
307312
this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java

Lines changed: 23 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1041,15 +1041,12 @@ void testAppendWrite(boolean clustering) {
10411041
}
10421042

10431043
@ParameterizedTest
1044-
@EnumSource(value = ExecMode.class)
1045-
void testWriteAndReadWithTimestampPartitioning(ExecMode execMode) {
1046-
// can not read the hive style and timestamp based partitioning table
1047-
// in batch mode, the code path in CopyOnWriteInputFormat relies on
1048-
// the value on the partition path to recover the partition value,
1049-
// but the date format has changed(milliseconds switch to hours).
1044+
@MethodSource("executionModeAndPartitioningParams")
1045+
void testWriteAndReadWithTimestampPartitioning(ExecMode execMode, boolean hiveStylePartitioning) {
10501046
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
10511047
String hoodieTableDDL = sql("t1")
10521048
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
1049+
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
10531050
.partitionField("ts") // use timestamp as partition path field
10541051
.end();
10551052
tableEnv.executeSql(hoodieTableDDL);
@@ -1068,6 +1065,26 @@ void testWriteAndReadWithTimestampPartitioning(ExecMode execMode) {
10681065
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
10691066
}
10701067

1068+
@Test
1069+
void testMergeOnReadCompactionWithTimestampPartitioning() {
1070+
TableEnvironment tableEnv = batchTableEnv;
1071+
1072+
String hoodieTableDDL = sql("t1")
1073+
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
1074+
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
1075+
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
1076+
.option(FlinkOptions.COMPACTION_TASKS, 1)
1077+
.partitionField("ts")
1078+
.end();
1079+
tableEnv.executeSql(hoodieTableDDL);
1080+
execInsertSql(tableEnv, TestSQL.INSERT_T1);
1081+
1082+
List<Row> rows = CollectionUtil.iterableToList(
1083+
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
1084+
1085+
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
1086+
}
1087+
10711088
@ParameterizedTest
10721089
@ValueSource(strings = {FlinkOptions.PARTITION_FORMAT_DAY, FlinkOptions.PARTITION_FORMAT_DASHED_DAY})
10731090
void testWriteAndReadWithDatePartitioning(String partitionFormat) {

0 commit comments

Comments
 (0)