Skip to content

Commit a2eb2b0

Browse files
[HUDI-2480] FileSlice after pending compaction-requested instant-time is ignored by MOR snapshot reader (#3703)
* [HUDI-2480] FileSlice after pending compaction-requested instant-time is ignored by MOR snapshot reader * include file slice after a pending compaction for spark reader Co-authored-by: garyli1019 <yanjia.gary.li@gmail.com>
1 parent 88067f5 commit a2eb2b0

4 files changed

Lines changed: 32 additions & 5 deletions

File tree

hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,8 +303,8 @@ private List<MergeOnReadInputSplit> buildFileIndex() {
303303
}
304304

305305
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
306-
metaClient.getActiveTimeline().getCommitsTimeline()
307-
.filterCompletedInstants(), fileStatuses);
306+
// file-slice after pending compaction-requested instant-time is also considered valid
307+
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(), fileStatuses);
308308
String latestCommit = fsView.getLastInstant().get().getTimestamp();
309309
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
310310
final AtomicInteger cnt = new AtomicInteger(0);

hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,32 @@ void testReadWithWiderSchema(HoodieTableType tableType) throws Exception {
463463
TestData.assertRowDataEquals(result, TestData.DATA_SET_INSERT);
464464
}
465465

466+
/**
467+
* Test reading file groups with compaction plan scheduled and delta logs.
468+
* File-slice after pending compaction-requested instant-time should also be considered valid.
469+
*/
470+
@Test
471+
void testReadMORWithCompactionPlanScheduled() throws Exception {
472+
Map<String, String> options = new HashMap<>();
473+
// compact for each commit
474+
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
475+
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
476+
beforeEach(HoodieTableType.MERGE_ON_READ, options);
477+
478+
// write three commits
479+
for (int i = 0; i < 6; i += 2) {
480+
List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
481+
TestData.writeData(dataset, conf);
482+
}
483+
484+
InputFormat<RowData, ?> inputFormat1 = this.tableSource.getInputFormat();
485+
assertThat(inputFormat1, instanceOf(MergeOnReadInputFormat.class));
486+
487+
List<RowData> actual = readData(inputFormat1);
488+
final List<RowData> expected = TestData.dataSetInsert(1, 2, 3, 4, 5, 6);
489+
TestData.assertRowDataEquals(actual, expected);
490+
}
491+
466492
// -------------------------------------------------------------------------
467493
// Utilities
468494
// -------------------------------------------------------------------------

hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public static List<Pair<Option<HoodieBaseFile>, List<String>>> groupLogsByBaseFi
237237
try {
238238
// Both commit and delta-commits are included - pick the latest completed one
239239
Option<HoodieInstant> latestCompletedInstant =
240-
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
240+
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants().lastInstant();
241241

242242
Stream<FileSlice> latestFileSlices = latestCompletedInstant
243243
.map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,9 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
151151
// Load files from the global paths if it has defined to be compatible with the original mode
152152
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get)
153153
val fsView = new HoodieTableFileSystemView(metaClient,
154-
metaClient.getActiveTimeline.getCommitsTimeline
155-
.filterCompletedInstants, inMemoryFileIndex.allFiles().toArray)
154+
// file-slice after pending compaction-requested instant-time is also considered valid
155+
metaClient.getCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants,
156+
inMemoryFileIndex.allFiles().toArray)
156157
val partitionPaths = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus.getPath.getParent)
157158

158159

0 commit comments

Comments (0)