Skip to content

Commit 1895076

Browse files
committed
[HUDI-2796] Metadata table support for Restore action to first commit
- Adding support for the metadata table to restore to first commit and take proper action for the bootstrap on subequent commits.
1 parent 4e067ca commit 1895076

4 files changed

Lines changed: 64 additions & 20 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -339,21 +339,11 @@ private <T extends SpecificRecordBase> boolean isBootstrapNeeded(Option<HoodieIn
339339
return false;
340340
}
341341

342-
boolean isRollbackAction = false;
343-
List<String> rollbackedTimestamps = Collections.emptyList();
344-
if (actionMetadata.isPresent() && actionMetadata.get() instanceof HoodieRollbackMetadata) {
345-
isRollbackAction = true;
346-
List<HoodieInstantInfo> rollbackedInstants =
347-
((HoodieRollbackMetadata) actionMetadata.get()).getInstantsRollback();
348-
rollbackedTimestamps = rollbackedInstants.stream().map(instant -> {
349-
return instant.getCommitTime().toString();
350-
}).collect(Collectors.toList());
351-
}
352-
342+
// Detect the commit gaps if any from the data and the metadata active timeline
353343
if (dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(
354344
latestMetadataInstant.get().getTimestamp())
355-
&& (!isRollbackAction || !rollbackedTimestamps.contains(latestMetadataInstantTimestamp))) {
356-
LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived."
345+
&& !isCommitRevertedByInFlightAction(actionMetadata, latestMetadataInstantTimestamp)) {
346+
LOG.error("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived."
357347
+ " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
358348
+ ", latestDataInstant=" + dataMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());
359349
return true;
@@ -362,10 +352,62 @@ private <T extends SpecificRecordBase> boolean isBootstrapNeeded(Option<HoodieIn
362352
return false;
363353
}
364354

355+
/**
356+
* Is the latest commit instant reverted by the in-flight instant action?
357+
*
358+
* @param actionMetadata - In-flight instant action metadata
359+
* @param latestMetadataInstantTimestamp - Metadata table latest instant timestamp
360+
* @param <T> - ActionMetadata type
361+
* @return True if the latest instant action is reverted by the action
362+
*/
363+
private <T extends SpecificRecordBase> boolean isCommitRevertedByInFlightAction(Option<T> actionMetadata,
364+
final String latestMetadataInstantTimestamp) {
365+
366+
if (!actionMetadata.isPresent()) {
367+
return false;
368+
}
369+
370+
final String INSTANT_ACTION = (actionMetadata.get() instanceof HoodieRollbackMetadata
371+
? HoodieTimeline.ROLLBACK_ACTION
372+
: (actionMetadata.get() instanceof HoodieRestoreMetadata ? HoodieTimeline.RESTORE_ACTION : ""));
373+
374+
List<String> affectedInstantTimestamps;
375+
switch (INSTANT_ACTION) {
376+
case HoodieTimeline.ROLLBACK_ACTION:
377+
List<HoodieInstantInfo> rollbackedInstants =
378+
((HoodieRollbackMetadata) actionMetadata.get()).getInstantsRollback();
379+
affectedInstantTimestamps = rollbackedInstants.stream().map(instant -> {
380+
return instant.getCommitTime().toString();
381+
}).collect(Collectors.toList());
382+
383+
if (affectedInstantTimestamps.contains(latestMetadataInstantTimestamp)) {
384+
return true;
385+
}
386+
break;
387+
388+
case HoodieTimeline.RESTORE_ACTION:
389+
List<HoodieInstantInfo> restoredInstants =
390+
((HoodieRestoreMetadata) actionMetadata.get()).getRestoreInstantInfo();
391+
affectedInstantTimestamps = restoredInstants.stream().map(instant -> {
392+
return instant.getCommitTime().toString();
393+
}).collect(Collectors.toList());
394+
395+
if (affectedInstantTimestamps.contains(latestMetadataInstantTimestamp)) {
396+
return true;
397+
}
398+
break;
399+
400+
default:
401+
return false;
402+
}
403+
404+
return false;
405+
}
406+
365407
/**
366408
* Initialize the Metadata Table by listing files and partitions from the file system.
367409
*
368-
* @param dataMetaClient {@code HoodieTableMetaClient} for the dataset.
410+
* @param dataMetaClient {@code HoodieTableMetaClient} for the dataset.
369411
* @param inflightInstantTimestamp
370412
*/
371413
private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient,

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,6 @@ protected final void writeTableMetadata(HoodieRollbackMetadata metadata) {
8181
* @param metadata restore metadata of interest.
8282
*/
8383
protected final void writeTableMetadata(HoodieRestoreMetadata metadata) {
84-
table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime));
84+
table.getMetadataWriter(Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime));
8585
}
8686
}

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ public void testRollbackOperations(HoodieTableType tableType) throws Exception {
361361
@Test
362362
public void testManualRollbacks() throws Exception {
363363
HoodieTableType tableType = COPY_ON_WRITE;
364-
init(tableType, false);
364+
init(tableType, true);
365365
// Setting to archive more aggressively on the Metadata Table than the Dataset
366366
final int maxDeltaCommitsBeforeCompaction = 4;
367367
final int minArchiveCommitsMetadata = 2;
@@ -871,7 +871,7 @@ public void testReader() throws Exception {
871871
*/
872872
@Test
873873
public void testCleaningArchivingAndCompaction() throws Exception {
874-
init(HoodieTableType.COPY_ON_WRITE, false);
874+
init(HoodieTableType.COPY_ON_WRITE, true);
875875
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
876876

877877
final int maxDeltaCommitsBeforeCompaction = 3;

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ void testCOWToMORConvertedTableRollback(boolean rollbackUsingMarkers) throws Exc
139139
@ParameterizedTest
140140
@ValueSource(booleans = {true, false})
141141
void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception {
142-
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE);
142+
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE)
143+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
143144
addConfigsForPopulateMetaFields(cfgBuilder, true);
144145
HoodieWriteConfig cfg = cfgBuilder.build();
145146

@@ -294,7 +295,8 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro
294295
@Test
295296
void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
296297
boolean populateMetaFields = true;
297-
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build());
298+
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false)
299+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
298300
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
299301
HoodieWriteConfig cfg = cfgBuilder.build();
300302

@@ -344,7 +346,7 @@ void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
344346
newCommitTime = "002";
345347
// WriteClient with custom config (disable small file handling)
346348
HoodieWriteConfig smallFileWriteConfig = getHoodieWriteConfigWithSmallFileHandlingOffBuilder(populateMetaFields)
347-
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
349+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
348350
try (SparkRDDWriteClient nClient = getHoodieWriteClient(smallFileWriteConfig)) {
349351
nClient.startCommitWithTime(newCommitTime);
350352

0 commit comments

Comments
 (0)