Skip to content

Commit c928c4b

Browse files
authored
GH-2959: Optimize the test case of parquet rewriter. (#2960)
1 parent 824b7d0 commit c928c4b

File tree

2 files changed

+91
-71
lines changed

2 files changed

+91
-71
lines changed

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java

Lines changed: 76 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,13 @@ public class ParquetRewriterTest {
107107
private final IndexCache.CacheStrategy indexCacheStrategy;
108108
private final boolean usingHadoop;
109109

110-
private List<EncryptionTestFile> inputFiles = null;
110+
private List<EncryptionTestFile> inputFiles = Lists.newArrayList();
111111
private String outputFile = null;
112112
private ParquetRewriter rewriter = null;
113113

114+
private final EncryptionTestFile gzipEncryptionTestFileWithoutBloomFilterColumn;
115+
private final EncryptionTestFile uncompressedEncryptionTestFileWithoutBloomFilterColumn;
116+
114117
@Parameterized.Parameters(name = "WriterVersion = {0}, IndexCacheStrategy = {1}, UsingHadoop = {2}")
115118
public static Object[][] parameters() {
116119
return new Object[][] {
@@ -121,10 +124,26 @@ public static Object[][] parameters() {
121124
};
122125
}
123126

124-
public ParquetRewriterTest(String writerVersion, String indexCacheStrategy, boolean usingHadoop) {
127+
public ParquetRewriterTest(String writerVersion, String indexCacheStrategy, boolean usingHadoop)
128+
throws IOException {
125129
this.writerVersion = ParquetProperties.WriterVersion.fromString(writerVersion);
126130
this.indexCacheStrategy = IndexCache.CacheStrategy.valueOf(indexCacheStrategy);
127131
this.usingHadoop = usingHadoop;
132+
133+
MessageType testSchema = createSchema();
134+
this.gzipEncryptionTestFileWithoutBloomFilterColumn = new TestFileBuilder(conf, testSchema)
135+
.withNumRecord(numRecord)
136+
.withCodec("GZIP")
137+
.withPageSize(1024)
138+
.withWriterVersion(this.writerVersion)
139+
.build();
140+
141+
this.uncompressedEncryptionTestFileWithoutBloomFilterColumn = new TestFileBuilder(conf, testSchema)
142+
.withNumRecord(numRecord)
143+
.withCodec("UNCOMPRESSED")
144+
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
145+
.withWriterVersion(this.writerVersion)
146+
.build();
128147
}
129148

130149
private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws Exception {
@@ -141,7 +160,7 @@ private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws E
141160
rewriter.processBlocks();
142161
rewriter.close();
143162

144-
// Verify the schema are not changed for the columns not pruned
163+
// Verify the schema is not changed for the columns not pruned
145164
validateSchema();
146165

147166
// Verify codec has been translated
@@ -179,7 +198,7 @@ public void setUp() {
179198

180199
@Test
181200
public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception {
182-
testSingleInputFileSetup("GZIP");
201+
ensureContainsGzipFile();
183202
List<Path> inputPaths = new ArrayList<Path>() {
184203
{
185204
add(new Path(inputFiles.get(0).getFileName()));
@@ -190,7 +209,8 @@ public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception {
190209

191210
@Test
192211
public void testPruneSingleColumnTranslateCodecTwoFiles() throws Exception {
193-
testMultipleInputFilesSetup();
212+
ensureContainsGzipFile();
213+
ensureContainsUncompressedFile();
194214
List<Path> inputPaths = new ArrayList<Path>() {
195215
{
196216
add(new Path(inputFiles.get(0).getFileName()));
@@ -249,7 +269,8 @@ private void testPruneNullifyTranslateCodec(List<Path> inputPaths) throws Except
249269

250270
@Test
251271
public void testPruneNullifyTranslateCodecSingleFile() throws Exception {
252-
testSingleInputFileSetup("GZIP");
272+
ensureContainsGzipFile();
273+
253274
List<Path> inputPaths = new ArrayList<Path>() {
254275
{
255276
add(new Path(inputFiles.get(0).getFileName()));
@@ -260,7 +281,9 @@ public void testPruneNullifyTranslateCodecSingleFile() throws Exception {
260281

261282
@Test
262283
public void testPruneNullifyTranslateCodecTwoFiles() throws Exception {
263-
testMultipleInputFilesSetup();
284+
ensureContainsGzipFile();
285+
ensureContainsUncompressedFile();
286+
264287
List<Path> inputPaths = new ArrayList<Path>() {
265288
{
266289
add(new Path(inputFiles.get(0).getFileName()));
@@ -294,7 +317,7 @@ private void testPruneEncryptTranslateCodec(List<Path> inputPaths) throws Except
294317
rewriter.processBlocks();
295318
rewriter.close();
296319

297-
// Verify the schema are not changed for the columns not pruned
320+
// Verify the schema is not changed for the columns not pruned
298321
validateSchema();
299322

300323
// Verify codec has been translated
@@ -331,7 +354,8 @@ private void testPruneEncryptTranslateCodec(List<Path> inputPaths) throws Except
331354

332355
@Test
333356
public void testPruneEncryptTranslateCodecSingleFile() throws Exception {
334-
testSingleInputFileSetup("GZIP");
357+
ensureContainsGzipFile();
358+
335359
List<Path> inputPaths = new ArrayList<Path>() {
336360
{
337361
add(new Path(inputFiles.get(0).getFileName()));
@@ -342,7 +366,9 @@ public void testPruneEncryptTranslateCodecSingleFile() throws Exception {
342366

343367
@Test
344368
public void testPruneEncryptTranslateCodecTwoFiles() throws Exception {
345-
testMultipleInputFilesSetup();
369+
ensureContainsGzipFile();
370+
ensureContainsUncompressedFile();
371+
346372
List<Path> inputPaths = new ArrayList<Path>() {
347373
{
348374
add(new Path(inputFiles.get(0).getFileName()));
@@ -383,7 +409,7 @@ public void testRewriteWithoutColumnIndexes() throws Exception {
383409
rewriter.processBlocks();
384410
rewriter.close();
385411

386-
// Verify the schema are not changed for the columns not pruned
412+
// Verify the schema is not changed for the columns not pruned
387413
ParquetMetadata pmd =
388414
ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
389415
MessageType schema = pmd.getFileMetaData().getSchema();
@@ -413,7 +439,7 @@ public void testRewriteWithoutColumnIndexes() throws Exception {
413439
assertEquals(inRead.getLong("id", 0), outRead.getLong("id", 0));
414440
assertEquals(inRead.getString("name", 0), outRead.getString("name", 0));
415441

416-
// location was nulled
442+
// location was null
417443
Group finalOutRead = outRead;
418444
assertThrows(
419445
RuntimeException.class,
@@ -422,7 +448,7 @@ public void testRewriteWithoutColumnIndexes() throws Exception {
422448
RuntimeException.class,
423449
() -> finalOutRead.getGroup("location", 0).getDouble("lon", 0));
424450

425-
// phonenumbers was pruned
451+
// phone numbers was pruned
426452
assertThrows(InvalidRecordException.class, () -> finalOutRead.getGroup("phoneNumbers", 0));
427453
}
428454
}
@@ -497,7 +523,8 @@ private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws Exception
497523

498524
@Test
499525
public void testNullifyEncryptSingleFile() throws Exception {
500-
testSingleInputFileSetup("GZIP");
526+
ensureContainsGzipFile();
527+
501528
List<Path> inputPaths = new ArrayList<Path>() {
502529
{
503530
add(new Path(inputFiles.get(0).getFileName()));
@@ -508,7 +535,9 @@ public void testNullifyEncryptSingleFile() throws Exception {
508535

509536
@Test
510537
public void testNullifyEncryptTwoFiles() throws Exception {
511-
testMultipleInputFilesSetup();
538+
ensureContainsGzipFile();
539+
ensureContainsUncompressedFile();
540+
512541
List<Path> inputPaths = new ArrayList<Path>() {
513542
{
514543
add(new Path(inputFiles.get(0).getFileName()));
@@ -520,7 +549,8 @@ public void testNullifyEncryptTwoFiles() throws Exception {
520549

521550
@Test
522551
public void testMergeTwoFilesOnly() throws Exception {
523-
testMultipleInputFilesSetup();
552+
ensureContainsGzipFile();
553+
ensureContainsUncompressedFile();
524554

525555
// Only merge two files but do not change anything.
526556
List<Path> inputPaths = new ArrayList<>();
@@ -534,7 +564,7 @@ public void testMergeTwoFilesOnly() throws Exception {
534564
rewriter.processBlocks();
535565
rewriter.close();
536566

537-
// Verify the schema are not changed
567+
// Verify the schema is not changed
538568
ParquetMetadata pmd =
539569
ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
540570
MessageType schema = pmd.getFileMetaData().getSchema();
@@ -615,7 +645,8 @@ public void testMergeTwoFilesWithDifferentSchema() throws Exception {
615645

616646
@Test
617647
public void testRewriteFileWithMultipleBlocks() throws Exception {
618-
testSingleInputFileSetup("GZIP", 1024L);
648+
ensureContainsGzipFile();
649+
619650
List<Path> inputPaths = new ArrayList<Path>() {
620651
{
621652
add(new Path(inputFiles.get(0).getFileName()));
@@ -626,7 +657,7 @@ public void testRewriteFileWithMultipleBlocks() throws Exception {
626657

627658
@Test
628659
public void testPruneSingleColumnTranslateCodecAndEnableBloomFilter() throws Exception {
629-
testSingleInputFileSetupWithBloomFilter("GZIP", "DocId");
660+
testSingleInputFileSetupWithBloomFilter("DocId");
630661
List<Path> inputPaths = new ArrayList<Path>() {
631662
{
632663
add(new Path(inputFiles.get(0).getFileName()));
@@ -635,14 +666,14 @@ public void testPruneSingleColumnTranslateCodecAndEnableBloomFilter() throws Exc
635666
testPruneSingleColumnTranslateCodec(inputPaths);
636667

637668
// Verify bloom filters
638-
Map<ColumnPath, List<BloomFilter>> inputBloomFilters = allInputBloomFilters(null);
669+
Map<ColumnPath, List<BloomFilter>> inputBloomFilters = allInputBloomFilters();
639670
Map<ColumnPath, List<BloomFilter>> outputBloomFilters = allOutputBloomFilters(null);
640671
assertEquals(inputBloomFilters, outputBloomFilters);
641672
}
642673

643674
@Test
644675
public void testPruneNullifyTranslateCodecAndEnableBloomFilter() throws Exception {
645-
testSingleInputFileSetupWithBloomFilter("GZIP", "DocId", "Links.Forward");
676+
testSingleInputFileSetupWithBloomFilter("DocId", "Links.Forward");
646677
List<Path> inputPaths = new ArrayList<Path>() {
647678
{
648679
add(new Path(inputFiles.get(0).getFileName()));
@@ -651,7 +682,7 @@ public void testPruneNullifyTranslateCodecAndEnableBloomFilter() throws Exceptio
651682
testPruneNullifyTranslateCodec(inputPaths);
652683

653684
// Verify bloom filters
654-
Map<ColumnPath, List<BloomFilter>> inputBloomFilters = allInputBloomFilters(null);
685+
Map<ColumnPath, List<BloomFilter>> inputBloomFilters = allInputBloomFilters();
655686
assertEquals(inputBloomFilters.size(), 2);
656687
assertTrue(inputBloomFilters.containsKey(ColumnPath.fromDotString("Links.Forward")));
657688
assertTrue(inputBloomFilters.containsKey(ColumnPath.fromDotString("DocId")));
@@ -666,7 +697,7 @@ public void testPruneNullifyTranslateCodecAndEnableBloomFilter() throws Exceptio
666697

667698
@Test
668699
public void testPruneEncryptTranslateCodecAndEnableBloomFilter() throws Exception {
669-
testSingleInputFileSetupWithBloomFilter("GZIP", "DocId", "Links.Forward");
700+
testSingleInputFileSetupWithBloomFilter("DocId", "Links.Forward");
670701
List<Path> inputPaths = new ArrayList<Path>() {
671702
{
672703
add(new Path(inputFiles.get(0).getFileName()));
@@ -675,7 +706,7 @@ public void testPruneEncryptTranslateCodecAndEnableBloomFilter() throws Exceptio
675706
testPruneEncryptTranslateCodec(inputPaths);
676707

677708
// Verify bloom filters
678-
Map<ColumnPath, List<BloomFilter>> inputBloomFilters = allInputBloomFilters(null);
709+
Map<ColumnPath, List<BloomFilter>> inputBloomFilters = allInputBloomFilters();
679710

680711
// Cannot read without FileDecryptionProperties
681712
assertThrows(ParquetCryptoRuntimeException.class, () -> allOutputBloomFilters(null));
@@ -685,42 +716,19 @@ public void testPruneEncryptTranslateCodecAndEnableBloomFilter() throws Exceptio
685716
assertEquals(inputBloomFilters, outputBloomFilters);
686717
}
687718

688-
private void testSingleInputFileSetup(String compression) throws IOException {
689-
testSingleInputFileSetup(compression, ParquetWriter.DEFAULT_BLOCK_SIZE);
690-
}
691-
692-
private void testSingleInputFileSetupWithBloomFilter(String compression, String... bloomFilterEnabledColumns)
693-
throws IOException {
694-
testSingleInputFileSetup(compression, ParquetWriter.DEFAULT_BLOCK_SIZE, bloomFilterEnabledColumns);
719+
private void testSingleInputFileSetupWithBloomFilter(String... bloomFilterEnabledColumns) throws IOException {
720+
testSingleInputFileSetup(bloomFilterEnabledColumns);
695721
}
696722

697-
private void testSingleInputFileSetup(String compression, long rowGroupSize, String... bloomFilterEnabledColumns)
698-
throws IOException {
699-
MessageType schema = createSchema();
700-
inputFiles = Lists.newArrayList();
701-
inputFiles.add(new TestFileBuilder(conf, schema)
702-
.withNumRecord(numRecord)
703-
.withCodec(compression)
704-
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
705-
.withRowGroupSize(rowGroupSize)
706-
.withBloomFilterEnabled(bloomFilterEnabledColumns)
707-
.withWriterVersion(writerVersion)
708-
.build());
709-
}
710-
711-
private void testMultipleInputFilesSetup() throws IOException {
723+
private void testSingleInputFileSetup(String... bloomFilterEnabledColumns) throws IOException {
712724
MessageType schema = createSchema();
713725
inputFiles = Lists.newArrayList();
714726
inputFiles.add(new TestFileBuilder(conf, schema)
715727
.withNumRecord(numRecord)
716728
.withCodec("GZIP")
717729
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
718-
.withWriterVersion(writerVersion)
719-
.build());
720-
inputFiles.add(new TestFileBuilder(conf, schema)
721-
.withNumRecord(numRecord)
722-
.withCodec("UNCOMPRESSED")
723-
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
730+
.withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
731+
.withBloomFilterEnabled(bloomFilterEnabledColumns)
724732
.withWriterVersion(writerVersion)
725733
.build());
726734
}
@@ -748,7 +756,7 @@ private void validateColumnData(
748756
.withDecryption(fileDecryptionProperties)
749757
.build();
750758

751-
// Get total number of rows from input files
759+
// Get the total number of rows from input files
752760
int totalRows = 0;
753761
for (EncryptionTestFile inputFile : inputFiles) {
754762
totalRows += inputFile.getFileContent().length;
@@ -821,7 +829,7 @@ private ParquetMetadata getFileMetaData(String file, FileDecryptionProperties fi
821829
ParquetReadOptions readOptions = ParquetReadOptions.builder()
822830
.withDecryption(fileDecryptionProperties)
823831
.build();
824-
ParquetMetadata pmd = null;
832+
ParquetMetadata pmd;
825833
InputFile inputFile = HadoopInputFile.fromPath(new Path(file), conf);
826834
try (SeekableInputStream in = inputFile.newStream()) {
827835
pmd = ParquetFileReader.readFooter(inputFile, readOptions, in);
@@ -995,12 +1003,10 @@ private void validateRowGroupRowCount() throws Exception {
9951003
assertEquals(inputRowCounts, outputRowCounts);
9961004
}
9971005

998-
private Map<ColumnPath, List<BloomFilter>> allInputBloomFilters(FileDecryptionProperties fileDecryptionProperties)
999-
throws Exception {
1006+
private Map<ColumnPath, List<BloomFilter>> allInputBloomFilters() throws Exception {
10001007
Map<ColumnPath, List<BloomFilter>> inputBloomFilters = new HashMap<>();
10011008
for (EncryptionTestFile inputFile : inputFiles) {
1002-
Map<ColumnPath, List<BloomFilter>> bloomFilters =
1003-
allBloomFilters(inputFile.getFileName(), fileDecryptionProperties);
1009+
Map<ColumnPath, List<BloomFilter>> bloomFilters = allBloomFilters(inputFile.getFileName(), null);
10041010
for (Map.Entry<ColumnPath, List<BloomFilter>> entry : bloomFilters.entrySet()) {
10051011
List<BloomFilter> bloomFilterList = inputBloomFilters.getOrDefault(entry.getKey(), new ArrayList<>());
10061012
bloomFilterList.addAll(entry.getValue());
@@ -1072,4 +1078,16 @@ private void validateSchema() throws IOException {
10721078
assertEquals(subFields.get(0).getName(), "Backward");
10731079
assertEquals(subFields.get(1).getName(), "Forward");
10741080
}
1081+
1082+
private void ensureContainsGzipFile() {
1083+
if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) {
1084+
inputFiles.add(this.gzipEncryptionTestFileWithoutBloomFilterColumn);
1085+
}
1086+
}
1087+
1088+
private void ensureContainsUncompressedFile() {
1089+
if (!inputFiles.contains(uncompressedEncryptionTestFileWithoutBloomFilterColumn)) {
1090+
inputFiles.add(uncompressedEncryptionTestFileWithoutBloomFilterColumn);
1091+
}
1092+
}
10751093
}

0 commit comments

Comments
 (0)