Skip to content

Commit dd574b5

Browse files
committed
[HUDI-4145] Archives the metadata file in HoodieInstant.State sequence (part2)
1 parent eb21901 commit dd574b5

1 file changed

Lines changed: 30 additions & 16 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -506,13 +506,7 @@ private Stream<HoodieInstant> getInstantsToArchive() {
506506
List<HoodieInstant> instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
507507
HoodieInstant.getComparableAction(hoodieInstant.getAction())));
508508
if (instantsToStream != null) {
509-
// sorts the instants in natural order to make sure the metadata files be removed
510-
// in HoodieInstant.State sequence: requested -> inflight -> completed,
511-
// this is important because when a COMPLETED metadata file is removed first,
512-
// other monitors on the timeline(such as the compaction or clustering services) would
513-
// mistakenly recognize the pending file as a pending operation,
514-
// then all kinds of weird bugs occur.
515-
return instantsToStream.stream().sorted();
509+
return instantsToStream.stream();
516510
} else {
517511
// if a concurrent writer archived the instant
518512
return Stream.empty();
@@ -522,19 +516,29 @@ private Stream<HoodieInstant> getInstantsToArchive() {
522516

523517
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
524518
LOG.info("Deleting instants " + archivedInstants);
525-
boolean success = true;
526-
List<String> instantFiles = archivedInstants.stream().map(archivedInstant ->
527-
new Path(metaClient.getMetaPath(), archivedInstant.getFileName())
528-
).map(Path::toString).collect(Collectors.toList());
529519

530-
context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName());
531-
Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, instantFiles, context, false);
520+
List<String> pendingInstantFiles = new ArrayList<>();
521+
List<String> completedInstantFiles = new ArrayList<>();
532522

533-
for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
534-
LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
535-
success &= result.getValue();
523+
for (HoodieInstant instant : archivedInstants) {
524+
String filePath = new Path(metaClient.getMetaPath(), instant.getFileName()).toString();
525+
if (instant.isCompleted()) {
526+
completedInstantFiles.add(filePath);
527+
} else {
528+
pendingInstantFiles.add(filePath);
529+
}
536530
}
537531

532+
context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName());
533+
// Delete the metadata files
534+
// in HoodieInstant.State sequence: requested -> inflight -> completed,
535+
// this is important because when a COMPLETED metadata file is removed first,
536+
// other monitors on the timeline(such as the compaction or clustering services) would
537+
// mistakenly recognize the pending file as a pending operation,
538+
// then all kinds of weird bugs occur.
539+
boolean success = deleteArchivedInstantFiles(context, true, pendingInstantFiles);
540+
success &= deleteArchivedInstantFiles(context, success, completedInstantFiles);
541+
538542
// Remove older meta-data from auxiliary path too
539543
Option<HoodieInstant> latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION)
540544
|| (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp)));
@@ -545,6 +549,16 @@ private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, Hoo
545549
return success;
546550
}
547551

552+
private boolean deleteArchivedInstantFiles(HoodieEngineContext context, boolean success, List<String> files) {
553+
Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, files, context, false);
554+
555+
for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
556+
LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
557+
success &= result.getValue();
558+
}
559+
return success;
560+
}
561+
548562
/**
549563
* Remove older instants from auxiliary meta folder.
550564
*

0 commit comments

Comments
 (0)