@@ -140,6 +140,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
140140 protected transient Timer .Context writeTimer = null ;
141141 protected transient Timer .Context compactionTimer ;
142142 protected transient Timer .Context clusteringTimer ;
143+ protected transient Timer .Context logCompactionTimer ;
143144
144145 protected transient AsyncCleanerService asyncCleanerService ;
145146 protected transient AsyncArchiveService asyncArchiveService ;
@@ -362,7 +363,7 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
362363 protected void rollbackFailedBootstrap () {
363364 LOG .info ("Rolling back pending bootstrap if present" );
364365 HoodieTable <T , I , K , O > table = createTable (config , hadoopConf );
365- HoodieTimeline inflightTimeline = table .getMetaClient ().getCommitsTimeline ().filterPendingExcludingCompaction ();
366+ HoodieTimeline inflightTimeline = table .getMetaClient ().getCommitsTimeline ().filterPendingExcludingMajorAndMinorCompaction ();
366367 Option <String > instant = Option .fromJavaOptional (
367368 inflightTimeline .getReverseOrderedInstants ().map (HoodieInstant ::getTimestamp ).findFirst ());
368369 if (instant .isPresent () && HoodieTimeline .compareTimestamps (instant .get (), HoodieTimeline .LESSER_THAN_OR_EQUALS ,
@@ -558,6 +559,15 @@ protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata me
558559 inlineScheduleCompaction (extraMetadata );
559560 }
560561
562+ // Do an inline log compaction if enabled
563+ if (config .inlineLogCompactionEnabled ()) {
564+ runAnyPendingLogCompactions (table );
565+ metadata .addMetadata (HoodieCompactionConfig .INLINE_LOG_COMPACT .key (), "true" );
566+ inlineLogCompact (extraMetadata );
567+ } else {
568+ metadata .addMetadata (HoodieCompactionConfig .INLINE_LOG_COMPACT .key (), "false" );
569+ }
570+
561571 // Do an inline clustering if enabled
562572 if (config .inlineClusteringEnabled ()) {
563573 runAnyPendingClustering (table );
@@ -585,6 +595,14 @@ protected void runAnyPendingCompactions(HoodieTable table) {
585595 });
586596 }
587597
/**
 * Re-attempts any log compactions that a previous run left pending (inflight/requested)
 * on this table's timeline, executing each one to completion before new work proceeds.
 *
 * @param table table whose active timeline is scanned for pending log compaction instants
 */
protected void runAnyPendingLogCompactions(HoodieTable table) {
  table.getActiveTimeline().getWriteTimeline().filterPendingLogCompactionTimeline().getInstants()
      .forEach(instant -> {
        LOG.info("Running previously failed inflight log compaction at instant " + instant);
        // shouldComplete=true: a retried pending instant must be driven all the way to commit
        logCompact(instant.getTimestamp(), true);
      });
}
605+
588606 protected void runAnyPendingClustering (HoodieTable table ) {
589607 table .getActiveTimeline ().filterPendingReplaceTimeline ().getInstants ().forEach (instant -> {
590608 Option <Pair <HoodieInstant , HoodieClusteringPlan >> instantPlan = ClusteringUtils .getClusteringPlan (table .getMetaClient (), instant );
@@ -1071,13 +1089,60 @@ public abstract void commitCompaction(String compactionInstantTime, HoodieCommit
10711089 */
10721090 protected abstract void completeCompaction (HoodieCommitMetadata metadata , HoodieTable table , String compactionCommitTime );
10731091
1092+ /**
1093+ * Schedules a new log compaction instant.
1094+ * @param extraMetadata Extra Metadata to be stored
1095+ */
1096+ public Option <String > scheduleLogCompaction (Option <Map <String , String >> extraMetadata ) throws HoodieIOException {
1097+ String instantTime = HoodieActiveTimeline .createNewInstantTime ();
1098+ return scheduleLogCompactionAtInstant (instantTime , extraMetadata ) ? Option .of (instantTime ) : Option .empty ();
1099+ }
1100+
1101+ /**
1102+ * Schedules a new log compaction instant with passed-in instant time.
1103+ * @param instantTime Log Compaction Instant Time
1104+ * @param extraMetadata Extra Metadata to be stored
1105+ */
1106+ public boolean scheduleLogCompactionAtInstant (String instantTime , Option <Map <String , String >> extraMetadata ) throws HoodieIOException {
1107+ return scheduleTableService (instantTime , extraMetadata , TableServiceType .LOG_COMPACT ).isPresent ();
1108+ }
1109+
1110+ /**
1111+ * Performs Log Compaction for the workload stored in instant-time.
1112+ *
1113+ * @param logCompactionInstantTime Log Compaction Instant Time
1114+ * @return Collection of WriteStatus to inspect errors and counts
1115+ */
1116+ public HoodieWriteMetadata <O > logCompact (String logCompactionInstantTime ) {
1117+ return logCompact (logCompactionInstantTime , config .shouldAutoCommit ());
1118+ }
1119+
/**
 * Commit a log compaction operation. Allow passing additional meta-data to be stored in commit instant file.
 *
 * <p>Placeholder in this base class: always throws. Presumably engine-specific subclasses
 * override this with a real implementation — TODO confirm against concrete write clients.
 *
 * @param logCompactionInstantTime Log Compaction Instant Time
 * @param metadata All the metadata that gets stored along with a commit
 * @param extraMetadata Extra Metadata to be stored
 * @throws UnsupportedOperationException always, in this base implementation
 */
public void commitLogCompaction(String logCompactionInstantTime, HoodieCommitMetadata metadata,
                                Option<Map<String, String>> extraMetadata) {
  throw new UnsupportedOperationException("Log compaction is not supported yet.");
}
1131+
/**
 * Commit Log Compaction and track metrics.
 *
 * <p>Placeholder in this base class: always throws. Presumably engine-specific subclasses
 * override this with a real implementation — TODO confirm against concrete write clients.
 *
 * @param metadata commit metadata produced by the log compaction
 * @param table table the log compaction ran against
 * @param logCompactionCommitTime instant time being completed
 * @throws UnsupportedOperationException always, in this base implementation
 */
protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable<T, I, K, O> table, String logCompactionCommitTime) {
  throw new UnsupportedOperationException("Log compaction is not supported yet.");
}
1138+
10741139 /**
10751140 * Get inflight time line exclude compaction and clustering.
10761141 * @param metaClient
10771142 * @return
10781143 */
10791144 private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering (HoodieTableMetaClient metaClient ) {
1080- HoodieTimeline inflightTimelineWithReplaceCommit = metaClient .getCommitsTimeline ().filterPendingExcludingCompaction ();
1145+ HoodieTimeline inflightTimelineWithReplaceCommit = metaClient .getCommitsTimeline ().filterPendingExcludingMajorAndMinorCompaction ();
10811146 HoodieTimeline inflightTimelineExcludeClusteringCommit = inflightTimelineWithReplaceCommit .filter (instant -> {
10821147 if (instant .getAction ().equals (HoodieTimeline .REPLACE_COMMIT_ACTION )) {
10831148 Option <Pair <HoodieInstant , HoodieClusteringPlan >> instantPlan = ClusteringUtils .getClusteringPlan (metaClient , instant );
@@ -1133,7 +1198,7 @@ protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos
11331198 try {
11341199 String action = rollbackPlan .getInstantToRollback ().getAction ();
11351200 if (ignoreCompactionAndClusteringInstants ) {
1136- if (!HoodieTimeline .COMPACTION_ACTION .equals (action )) {
1201+ if (!HoodieTimeline .COMPACTION_ACTION .equals (action ) && ! HoodieTimeline . LOG_COMPACTION_ACTION . equals ( action ) ) {
11371202 boolean isClustering = HoodieTimeline .REPLACE_COMMIT_ACTION .equals (action )
11381203 && ClusteringUtils .getClusteringPlan (metaClient , new HoodieInstant (true , rollbackPlan .getInstantToRollback ().getAction (),
11391204 rollbackPlan .getInstantToRollback ().getCommitTime ())).isPresent ();
@@ -1247,6 +1312,28 @@ protected Option<String> inlineScheduleCompaction(Option<Map<String, String>> ex
12471312 return scheduleCompaction (extraMetadata );
12481313 }
12491314
/**
 * Ensures the log compaction instant is in the expected state and performs log compaction
 * for the workload stored at the given instant time.
 *
 * <p>Placeholder in this base class: always throws. Presumably engine-specific subclasses
 * override this with a real implementation — TODO confirm against concrete write clients.
 *
 * @param compactionInstantTime Log Compaction Instant Time
 * @param shouldComplete whether the log compaction should be committed after it runs
 * @return Collection of Write Status
 * @throws UnsupportedOperationException always, in this base implementation
 */
protected HoodieWriteMetadata<O> logCompact(String compactionInstantTime, boolean shouldComplete) {
  throw new UnsupportedOperationException("Log compaction is not supported yet.");
}
1324+
1325+ /**
1326+ * Performs a log compaction operation on a table, serially before or after an insert/upsert action.
1327+ */
1328+ protected Option <String > inlineLogCompact (Option <Map <String , String >> extraMetadata ) {
1329+ Option <String > logCompactionInstantTimeOpt = scheduleLogCompaction (extraMetadata );
1330+ logCompactionInstantTimeOpt .ifPresent (logCompactInstantTime -> {
1331+ // inline log compaction should auto commit as the user is never given control
1332+ logCompact (logCompactInstantTime , true );
1333+ });
1334+ return logCompactionInstantTimeOpt ;
1335+ }
1336+
12501337 /**
12511338 * Schedules a new clustering instant.
12521339 * @param extraMetadata Extra Metadata to be stored
@@ -1342,6 +1429,11 @@ private Option<String> scheduleTableServiceInternal(String instantTime, Option<M
13421429 Option <HoodieCompactionPlan > compactionPlan = createTable (config , hadoopConf )
13431430 .scheduleCompaction (context , instantTime , extraMetadata );
13441431 return compactionPlan .isPresent () ? Option .of (instantTime ) : Option .empty ();
1432+ case LOG_COMPACT :
1433+ LOG .info ("Scheduling log compaction at instant time :" + instantTime );
1434+ Option <HoodieCompactionPlan > logCompactionPlan = createTable (config , hadoopConf )
1435+ .scheduleLogCompaction (context , instantTime , extraMetadata );
1436+ return logCompactionPlan .isPresent () ? Option .of (instantTime ) : Option .empty ();
13451437 case CLEAN :
13461438 LOG .info ("Scheduling cleaning at instant time :" + instantTime );
13471439 Option <HoodieCleanerPlan > cleanerPlan = createTable (config , hadoopConf )
0 commit comments