Skip to content

Commit 64e8859

Browse files
committed
[HUDI-2938] Metadata table util to get latest file slices for reader/writers
- Readers would want to get the latest file slices merged with the last known committed instants just before the pending compaction if any, and Writers would want to get the latest file slices without merging with any. - Refactoring the code so that the readers and writers can use the right version
1 parent 1d4fb82 commit 64e8859

6 files changed

Lines changed: 54 additions & 21 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -696,7 +696,6 @@ protected void compactIfNecessary(AbstractHoodieWriteClient writeClient, String
696696
return;
697697
}
698698

699-
700699
// Trigger compaction with suffixes based on the same instant time. This ensures that any future
701700
// delta commits synced over will not have an instant time lesser than the last completed instant on the
702701
// metadata table.

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
154154
* The record is tagged with respective file slice's location based on its record key.
155155
*/
156156
private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
157-
List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, false);
157+
List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
158158
ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
159159

160160
return records.stream().map(r -> {

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
169169
* The record is tagged with respective file slice's location based on its record key.
170170
*/
171171
private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
172-
List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, false);
172+
List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
173173
ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
174174

175175
return recordsRDD.map(r -> {

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -414,10 +414,8 @@ public void testVirtualKeysInBaseFiles(boolean populateMetaFields) throws Except
414414
});
415415
}
416416

417-
418417
/**
419418
* Tests that virtual key configs are honored in base files after compaction in metadata table.
420-
*
421419
*/
422420
@ParameterizedTest
423421
@ValueSource(booleans = {true, false})

hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReadersI
245245

246246
// Metadata is in sync till the latest completed instant on the dataset
247247
HoodieTimer timer = new HoodieTimer().startTimer();
248-
List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, true);
248+
List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName);
249249
if (latestFileSlices.size() == 0) {
250250
// empty partition
251251
return Pair.of(null, null);

hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public static List<HoodieRecord> convertMetadataToRecords(HoodieCleanMetadata cl
157157
* @return a list of metadata table records
158158
*/
159159
public static List<HoodieRecord> convertMetadataToRecords(HoodieActiveTimeline metadataTableTimeline,
160-
HoodieRestoreMetadata restoreMetadata, String instantTime, Option<String> lastSyncTs) {
160+
HoodieRestoreMetadata restoreMetadata, String instantTime, Option<String> lastSyncTs) {
161161
Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
162162
Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
163163
restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
@@ -334,29 +334,65 @@ public static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGrou
334334
}
335335

336336
/**
337-
* Loads the list of file groups for a partition of the Metadata Table with latest file slices.
337+
* Get the latest file slices for a Metadata Table partition. If the file slice is
338+
* because of pending compaction instant, then merge the file slice with the one
339+
* just before the compaction instant time. The list of file slices returned is
340+
* sorted in the correct order of file group name.
338341
*
339-
* The list of file slices returned is sorted in the correct order of file group name.
340-
* @param metaClient instance of {@link HoodieTableMetaClient}.
341-
* @param partition The name of the partition whose file groups are to be loaded.
342-
* @param isReader true if reader code path, false otherwise.
342+
* @param metaClient - Instance of {@link HoodieTableMetaClient}.
343+
* @param partition - The name of the partition whose file groups are to be loaded.
343344
* @return List of latest file slices for all file groups in a given partition.
344345
*/
345-
public static List<FileSlice> loadPartitionFileGroupsWithLatestFileSlices(HoodieTableMetaClient metaClient, String partition, boolean isReader) {
346-
LOG.info("Loading file groups for metadata table partition " + partition);
346+
public static List<FileSlice> getPartitionLatestMergedFileSlices(HoodieTableMetaClient metaClient, String partition) {
347+
LOG.info("Loading latest merged file slices for metadata table partition " + partition);
348+
return getPartitionFileSlices(metaClient, partition, true);
349+
}
350+
351+
/**
352+
* Get the latest file slices for a Metadata Table partition. The list of file slices
353+
* returned is sorted in the correct order of file group name.
354+
*
355+
* @param metaClient - Instance of {@link HoodieTableMetaClient}.
356+
* @param partition - The name of the partition whose file groups are to be loaded.
357+
* @return List of latest file slices for all file groups in a given partition.
358+
*/
359+
public static List<FileSlice> getPartitionLatestFileSlices(HoodieTableMetaClient metaClient, String partition) {
360+
LOG.info("Loading latest file slices for metadata table partition " + partition);
361+
return getPartitionFileSlices(metaClient, partition, false);
362+
}
347363

348-
// If there are no commits on the metadata table then the table's default FileSystemView will not return any file
349-
// slices even though we may have initialized them.
364+
/**
365+
* Get the latest file slices for a given partition.
366+
*
367+
* @param metaClient - Instance of {@link HoodieTableMetaClient}.
368+
* @param partition - The name of the partition whose file groups are to be loaded.
369+
* @param mergeFileSlices - When enabled, will merge the latest file slices with the last known
370+
* completed instant. This is useful for readers when there are pending
371+
* compactions. MergeFileSlices when disabled, will return the latest file
372+
* slices without any merging, and this is needed for the writers.
373+
* @return List of latest file slices for all file groups in a given partition.
374+
*/
375+
private static List<FileSlice> getPartitionFileSlices(HoodieTableMetaClient metaClient, String partition,
376+
boolean mergeFileSlices) {
377+
// If there are no commits on the metadata table then the table's
378+
// default FileSystemView will not return any file slices even
379+
// though we may have initialized them.
350380
HoodieTimeline timeline = metaClient.getActiveTimeline();
351381
if (timeline.empty()) {
352-
final HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieActiveTimeline.createNewInstantTime());
382+
final HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION,
383+
HoodieActiveTimeline.createNewInstantTime());
353384
timeline = new HoodieDefaultTimeline(Arrays.asList(instant).stream(), metaClient.getActiveTimeline()::getInstantDetails);
354385
}
355386

356387
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline);
357-
Stream<FileSlice> fileSliceStream = isReader ? fsView.getLatestMergedFileSlicesBeforeOrOn(partition, timeline.filterCompletedInstants().lastInstant().get().getTimestamp()) :
358-
fsView.getLatestFileSlices(partition);
359-
return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId()))
360-
.collect(Collectors.toList());
388+
Stream<FileSlice> fileSliceStream;
389+
if (mergeFileSlices) {
390+
fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(
391+
partition, timeline.filterCompletedInstants().lastInstant().get().getTimestamp());
392+
} else {
393+
fileSliceStream = fsView.getLatestFileSlices(partition);
394+
}
395+
return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList());
361396
}
397+
362398
}

0 commit comments

Comments
 (0)