|
20 | 20 |
|
21 | 21 | import org.apache.hudi.avro.model.HoodieInstantInfo; |
22 | 22 | import org.apache.hudi.avro.model.HoodieRollbackPlan; |
| 23 | +import org.apache.hudi.avro.model.HoodieRollbackRequest; |
23 | 24 | import org.apache.hudi.common.config.HoodieMetadataConfig; |
24 | 25 | import org.apache.hudi.common.fs.FSUtils; |
25 | 26 | import org.apache.hudi.common.model.HoodieBaseFile; |
|
61 | 62 | import java.util.stream.Collectors; |
62 | 63 | import java.util.stream.Stream; |
63 | 64 |
|
| 65 | +import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING; |
64 | 66 | import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; |
65 | 67 | import static org.junit.jupiter.api.Assertions.assertEquals; |
66 | 68 | import static org.junit.jupiter.api.Assertions.assertFalse; |
| 69 | +import static org.junit.jupiter.api.Assertions.assertNotEquals; |
67 | 70 | import static org.junit.jupiter.api.Assertions.assertThrows; |
68 | 71 | import static org.junit.jupiter.api.Assertions.assertTrue; |
69 | 72 |
|
@@ -481,4 +484,107 @@ public void testAutoRollbackInflightCommit() throws Exception { |
481 | 484 | assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3)); |
482 | 485 | } |
483 | 486 | } |
| 487 | + |
| 488 | + private static Stream<Arguments> testRollbackWithRequestedRollbackPlanParams() { |
| 489 | + return Arrays.stream(new Boolean[][] { |
| 490 | + {true, true}, {true, false}, {false, true}, {false, false}, |
| 491 | + }).map(Arguments::of); |
| 492 | + } |
| 493 | + |
| 494 | + @ParameterizedTest |
| 495 | + @MethodSource("testRollbackWithRequestedRollbackPlanParams") |
| 496 | + public void testRollbackWithRequestedRollbackPlan(boolean enableMetadataTable, boolean isRollbackPlanCorrupted) throws Exception { |
| 497 | + // Let's create some commit files and base files |
| 498 | + final String p1 = "2022/04/05"; |
| 499 | + final String p2 = "2022/04/06"; |
| 500 | + final String commitTime1 = "20220406010101002"; |
| 501 | + final String commitTime2 = "20220406020601002"; |
| 502 | + final String commitTime3 = "20220406030611002"; |
| 503 | + final String rollbackInstantTime = "20220406040611002"; |
| 504 | + Map<String, String> partitionAndFileId1 = new HashMap<String, String>() { |
| 505 | + { |
| 506 | + put(p1, "id11"); |
| 507 | + put(p2, "id12"); |
| 508 | + } |
| 509 | + }; |
| 510 | + Map<String, String> partitionAndFileId2 = new HashMap<String, String>() { |
| 511 | + { |
| 512 | + put(p1, "id21"); |
| 513 | + put(p2, "id22"); |
| 514 | + } |
| 515 | + }; |
| 516 | + Map<String, String> partitionAndFileId3 = new HashMap<String, String>() { |
| 517 | + { |
| 518 | + put(p1, "id31"); |
| 519 | + put(p2, "id32"); |
| 520 | + } |
| 521 | + }; |
| 522 | + |
| 523 | + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) |
| 524 | + .withRollbackUsingMarkers(false) |
| 525 | + .withMetadataConfig( |
| 526 | + HoodieMetadataConfig.newBuilder() |
| 527 | + // Column Stats Index is disabled, since these tests construct tables which are |
| 528 | + // not valid (empty commit metadata, invalid parquet files) |
| 529 | + .withMetadataIndexColumnStats(false) |
| 530 | + .enable(enableMetadataTable) |
| 531 | + .build() |
| 532 | + ) |
| 533 | + .withCompactionConfig(HoodieCompactionConfig.newBuilder() |
| 534 | + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()) |
| 535 | + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); |
| 536 | + |
| 537 | + HoodieTestTable testTable = enableMetadataTable |
| 538 | + ? HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create( |
| 539 | + metaClient.getHadoopConf(), config, context)) |
| 540 | + : HoodieTestTable.of(metaClient); |
| 541 | + |
| 542 | + testTable.withPartitionMetaFiles(p1, p2) |
| 543 | + .addCommit(commitTime1) |
| 544 | + .withBaseFilesInPartitions(partitionAndFileId1) |
| 545 | + .addCommit(commitTime2) |
| 546 | + .withBaseFilesInPartitions(partitionAndFileId2) |
| 547 | + .addInflightCommit(commitTime3) |
| 548 | + .withBaseFilesInPartitions(partitionAndFileId3); |
| 549 | + |
| 550 | + try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { |
| 551 | + if (isRollbackPlanCorrupted) { |
| 552 | + // Add a corrupted requested rollback plan |
| 553 | + FileCreateUtils.createRequestedRollbackFile(metaClient.getBasePath(), rollbackInstantTime, new byte[] {0, 1, 2}); |
| 554 | + } else { |
| 555 | + // Add a valid requested rollback plan to roll back commitTime3 |
| 556 | + HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(); |
| 557 | + List<HoodieRollbackRequest> rollbackRequestList = partitionAndFileId3.keySet().stream() |
| 558 | + .map(partition -> new HoodieRollbackRequest(partition, EMPTY_STRING, EMPTY_STRING, |
| 559 | + Collections.singletonList(metaClient.getBasePath() + "/" + partition + "/" |
| 560 | + + FileCreateUtils.baseFileName(commitTime3, partitionAndFileId3.get(p1))), |
| 561 | + Collections.emptyMap())) |
| 562 | + .collect(Collectors.toList()); |
| 563 | + rollbackPlan.setRollbackRequests(rollbackRequestList); |
| 564 | + rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitTime3, HoodieTimeline.COMMIT_ACTION)); |
| 565 | + FileCreateUtils.createRequestedRollbackFile(metaClient.getBasePath(), rollbackInstantTime, rollbackPlan); |
| 566 | + } |
| 567 | + |
| 568 | + // Rollback commit3 |
| 569 | + client.rollback(commitTime3); |
| 570 | + assertFalse(testTable.inflightCommitExists(commitTime3)); |
| 571 | + assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3)); |
| 572 | + assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2)); |
| 573 | + |
| 574 | + metaClient.reloadActiveTimeline(); |
| 575 | + List<HoodieInstant> rollbackInstants = metaClient.getActiveTimeline().getRollbackTimeline().getInstants().collect(Collectors.toList()); |
| 576 | + // Corrupted requested rollback plan should be deleted before scheduling a new one |
| 577 | + assertEquals(rollbackInstants.size(), 1); |
| 578 | + HoodieInstant rollbackInstant = rollbackInstants.get(0); |
| 579 | + assertTrue(rollbackInstant.isCompleted()); |
| 580 | + |
| 581 | + if (isRollbackPlanCorrupted) { |
| 582 | + // Should create a new rollback instant |
| 583 | + assertNotEquals(rollbackInstantTime, rollbackInstant.getTimestamp()); |
| 584 | + } else { |
| 585 | + // Should reuse the rollback instant |
| 586 | + assertEquals(rollbackInstantTime, rollbackInstant.getTimestamp()); |
| 587 | + } |
| 588 | + } |
| 589 | + } |
484 | 590 | } |
0 commit comments