[HUDI-6098] Use bulk insert prepped for the initial write into MDT. #8493
prashantwason wants to merge 1 commit into apache:master
Conversation
```java
public static int getFileGroupIndexFromFileId(String fileId) {
  // 0.10 version MDT code added -0 (0th fileIndex) to the fileID
  int endIndex = fileId.endsWith("-0") ? fileId.length() - 2 : fileId.length();
  int fromIndex = fileId.lastIndexOf("-", endIndex);
```
Can we abstract this code as a separate method?

```java
// 0.10 version MDT code added -0 (0th fileIndex) to the fileID
int endIndex = fileId.endsWith("-0") ? fileId.length() - 2 : fileId.length();
```
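For context, the parsing logic under discussion can be sketched in plain Java. This is a hypothetical reconstruction, not the actual Hudi method: the real version lives in `HoodieTableMetadataUtil`, and the exact fileId layout assumed here (`"<prefix>-<zero-padded index>"`, plus the legacy `"-0"` suffix) is inferred from the comment in the diff.

```java
public class FileGroupIdParser {
    // Hypothetical sketch of the fileId parsing discussed above.
    // Assumed layout: "<prefix>-<numeric file group index>", optionally with a
    // legacy "-0" suffix added by the 0.10 MDT code, e.g. "files-0007-0".
    public static int getFileGroupIndexFromFileId(String fileId) {
        // Ignore the legacy "-0" suffix when present.
        int endIndex = fileId.endsWith("-0") ? fileId.length() - 2 : fileId.length();
        // The index is the run of digits after the last '-' before endIndex.
        int fromIndex = fileId.lastIndexOf("-", endIndex - 1);
        return Integer.parseInt(fileId.substring(fromIndex + 1, endIndex));
    }

    public static void main(String[] args) {
        System.out.println(getFileGroupIndexFromFileId("files-0007-0")); // 7
        System.out.println(getFileGroupIndexFromFileId("files-0003"));   // 3
    }
}
```

Note one deliberate difference from the quoted diff: this sketch searches backwards from `endIndex - 1`, so that when a trailing `"-0"` suffix has been stripped, the `'-'` sitting exactly at `endIndex` is not matched as the index separator.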
Oops, code conflicts with your previous change.
Rebased and fixed conflict.
@hudi-bot run azure
@prashantwason You need to rebase with the latest master to get the tests to pass.
```java
 *
 * This partitioner requires the records to be already tagged with location.
 */
public class SparkHoodieMetadataBulkInsertPartitioner implements BulkInsertPartitioner<JavaRDD<HoodieRecord>> {
```
Do we have a UT for this?
```java
// Partition the records by their file group
JavaRDD<HoodieRecord> partitionedRDD = records
    // key by <file group index, record key>. The file group index is used to partition
    // and the record key is used to sort within the partition.
```
From what I glean, partitioning is based on fileGroupIndex, disregarding the MDT partition. So tell me: if we have 2 file groups in col stats and 2 file groups for RLI, does the 1st file group of both col stats and RLI land in the same partition in this repartition call?

Should the partitioning be based on the fileId itself, with sorting within each partition based on the record keys? Or am I missing something?
The reason being, the `JavaRDD<HoodieRecord> records` argument to this method could contain records for any number of MDT partitions. Not sure if we are making any assumptions about that.
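To make the semantics concrete, here is a plain-Java sketch (no Spark dependency; all names here are hypothetical) of what repartition-and-sort over `<fileGroupIndex, recordKey>` keys does. It also illustrates the reviewer's concern: the bucketing is driven solely by the file group index, so nothing in it distinguishes which MDT partition a record came from.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RepartitionSketch {
    // Plain-Java analogue of repartitionAndSortWithinPartitions over
    // <fileGroupIndex, recordKey> keys: bucket by file group index, then sort
    // the record keys inside each bucket. Note the bucketing ignores which
    // MDT partition a record belongs to.
    public static Map<Integer, List<String>> partitionAndSort(List<Map.Entry<Integer, String>> keyed) {
        Map<Integer, List<String>> buckets = new TreeMap<>();
        for (Map.Entry<Integer, String> e : keyed) {
            buckets.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        for (List<String> keys : buckets.values()) {
            Collections.sort(keys);
        }
        return buckets;
    }
}
```

Under this model, records from col stats file group 0 and RLI file group 0 would indeed fall into the same bucket, which is why the maintainers' later resolution (one metadata partition initialized at a time) matters.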
```java
  fileIds.add(fileID);
} else {
  // Empty partition
  fileIds.add("");
```
Can you help me understand when we might hit this?
```java
 * @param index Index of the file group within the partition
 * @return The fileID
 */
public static String getFileIDForFileGroup(MetadataPartitionType metadataPartition, int index) {
```
Do we have UTs for these?
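A unit test along the following lines would answer this: round-trip the generated fileID back through the index parser. This is a hypothetical reconstruction for illustration only; the real method takes a `MetadataPartitionType`, and the `"<partition prefix>-<zero-padded index>-0"` layout is an assumption inferred from the parsing code quoted earlier.

```java
public class FileGroupIdSketch {
    // Hypothetical sketch of the helper under discussion. Assumed fileID
    // layout: "<partition prefix>-<zero-padded index>-0", e.g. "col-stats-0007-0".
    public static String getFileIDForFileGroup(String partitionPrefix, int index) {
        return String.format("%s-%04d-0", partitionPrefix, index);
    }

    // Inverse operation, mirroring getFileGroupIndexFromFileId.
    public static int getFileGroupIndexFromFileId(String fileId) {
        int endIndex = fileId.endsWith("-0") ? fileId.length() - 2 : fileId.length();
        int fromIndex = fileId.lastIndexOf("-", endIndex - 1);
        return Integer.parseInt(fileId.substring(fromIndex + 1, endIndex));
    }
}
```

A UT asserting `getFileGroupIndexFromFileId(getFileIDForFileGroup(p, i)) == i` for a range of indices and partition prefixes would cover both helpers at once.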
```java
int fileGroupIndex = HoodieTableMetadataUtil.getFileGroupIndexFromFileId(r.getCurrentLocation().getFileId());
return new Tuple2<Integer, String>(fileGroupIndex, r.getRecordKey());
})
.repartitionAndSortWithinPartitions(new FileGroupPartitioner(), keyComparator)
```
We know the total number of partitions is going to equal the total number of file groups. Can we override `numPartitions` for FileGroupPartitioner?
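The suggestion could look roughly like the sketch below. This is hypothetical plain Java: the real `FileGroupPartitioner` would extend Spark's `org.apache.spark.Partitioner`, and this only mirrors that contract (`numPartitions` / `getPartition`) without the Spark dependency.

```java
// Hypothetical sketch of a FileGroupPartitioner with an explicit partition
// count, per the review suggestion. The real class would extend Spark's
// org.apache.spark.Partitioner.
public class FileGroupPartitionerSketch {
    private final int numFileGroups;

    public FileGroupPartitionerSketch(int numFileGroups) {
        this.numFileGroups = numFileGroups;
    }

    // Partitioner contract: total number of output partitions, fixed up front
    // to the known number of file groups.
    public int numPartitions() {
        return numFileGroups;
    }

    // Keys are <fileGroupIndex, recordKey> tuples; route by the index.
    // The modulo is defensive; indices are expected to be < numFileGroups.
    public int getPartition(int fileGroupIndex) {
        return fileGroupIndex % numFileGroups;
    }
}
```

Fixing `numPartitions` to the file group count means each Spark partition maps one-to-one onto a file group, so no partition is empty or shared.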
Synced up directly. I am OK with the assumption that we will initialize one metadata partition at a time.
Closing this as I have added the changes in another PR: #8684
Change Logs
Impact
Massive increase in read performance after the initial creation of an index.
Reduces the large read/write IO requirement for the first compaction in MDT.
Avoids duplicate storage of the initial log files, which would otherwise keep redundant initial-commit data around until cleaned.
Faster initial commit, as bulk insert is more performant for billions of records than upsert, which requires a workload-profiling stage.
Risk level (write none, low, medium, or high below)
None
Already covered by existing unit tests.
Documentation Update
None
Contributor's checklist