3333import org .apache .hudi .client .embedded .EmbeddedTimelineService ;
3434import org .apache .hudi .client .heartbeat .HeartbeatUtils ;
3535import org .apache .hudi .client .transaction .TransactionManager ;
36+ import org .apache .hudi .client .utils .TransactionUtils ;
3637import org .apache .hudi .common .engine .HoodieEngineContext ;
3738import org .apache .hudi .common .model .HoodieCommitMetadata ;
3839import org .apache .hudi .common .model .HoodieFailedWritesCleaningPolicy ;
@@ -107,7 +108,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
107108 private transient HoodieWriteCommitCallback commitCallback ;
108109 protected transient AsyncCleanerService asyncCleanerService ;
109110 protected final TransactionManager txnManager ;
110- protected Option <HoodieInstant > lastCompletedTxn ;
111+ protected Option <Pair < HoodieInstant , Map < String , String >>> lastCompletedTxnAndMetadata = Option . empty () ;
111112
112113 /**
113114 * Create a write client, with new hudi index.
@@ -179,20 +180,20 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
179180 finalizeWrite (table , instantTime , stats );
180181 HeartbeatUtils .abortIfHeartbeatExpired (instantTime , table , heartbeatClient , config );
181182 this .txnManager .beginTransaction (Option .of (new HoodieInstant (State .INFLIGHT , table .getMetaClient ().getCommitActionType (), instantTime )),
182- lastCompletedTxn );
183+ lastCompletedTxnAndMetadata . isPresent () ? Option . of ( lastCompletedTxnAndMetadata . get (). getLeft ()) : Option . empty () );
183184 try {
184185 preCommit (instantTime , metadata );
185186 commit (table , commitActionType , instantTime , metadata , stats );
186187 postCommit (table , metadata , instantTime , extraMetadata );
187- emitCommitMetrics (instantTime , metadata , commitActionType );
188188 LOG .info ("Committed " + instantTime );
189189 } catch (IOException e ) {
190- throw new HoodieCommitException ("Failed to complete commit " + config .getBasePath () + " at time " + instantTime ,
191- e );
190+ throw new HoodieCommitException ("Failed to complete commit " + config .getBasePath () + " at time " + instantTime , e );
192191 } finally {
193192 this .txnManager .endTransaction ();
194193 }
195-
194+ // do this outside of lock since compaction, clustering can be time taking and we don't need a lock for the entire execution period
195+ runTableServicesInline (table , metadata , extraMetadata );
196+ emitCommitMetrics (instantTime , metadata , commitActionType );
196197 // callback if needed.
197198 if (config .writeCommitCallbackOn ()) {
198199 if (null == commitCallback ) {
@@ -392,9 +393,10 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan
392393 protected void preWrite (String instantTime , WriteOperationType writeOperationType ,
393394 HoodieTableMetaClient metaClient ) {
394395 setOperationType (writeOperationType );
395- this .lastCompletedTxn = metaClient .getActiveTimeline ().getCommitsTimeline ().filterCompletedInstants ()
396- .lastInstant ();
397- this .txnManager .beginTransaction (Option .of (new HoodieInstant (State .INFLIGHT , metaClient .getCommitActionType (), instantTime )), lastCompletedTxn );
396+ this .lastCompletedTxnAndMetadata = TransactionUtils .getLastCompletedTxnInstantAndMetadata (metaClient );
397+ this .txnManager .beginTransaction (Option .of (new HoodieInstant (State .INFLIGHT , metaClient .getCommitActionType (), instantTime )), lastCompletedTxnAndMetadata
398+ .isPresent ()
399+ ? Option .of (lastCompletedTxnAndMetadata .get ().getLeft ()) : Option .empty ());
398400 try {
399401 syncTableMetadata ();
400402 } finally {
@@ -422,29 +424,8 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
422424 */
423425 protected void postCommit (HoodieTable <T , I , K , O > table , HoodieCommitMetadata metadata , String instantTime , Option <Map <String , String >> extraMetadata ) {
424426 try {
425-
426427 // Delete the marker directory for the instant.
427428 new MarkerFiles (table , instantTime ).quietDeleteMarkerDir (context , config .getMarkersDeleteParallelism ());
428-
429- if (config .inlineTableServices ()) {
430- // Do an inline compaction if enabled
431- if (config .inlineCompactionEnabled ()) {
432- runAnyPendingCompactions (table );
433- metadata .addMetadata (HoodieCompactionConfig .INLINE_COMPACT_PROP , "true" );
434- inlineCompact (extraMetadata );
435- } else {
436- metadata .addMetadata (HoodieCompactionConfig .INLINE_COMPACT_PROP , "false" );
437- }
438-
439- // Do an inline clustering if enabled
440- if (config .inlineClusteringEnabled ()) {
441- runAnyPendingClustering (table );
442- metadata .addMetadata (HoodieClusteringConfig .INLINE_CLUSTERING_PROP , "true" );
443- inlineCluster (extraMetadata );
444- } else {
445- metadata .addMetadata (HoodieClusteringConfig .INLINE_CLUSTERING_PROP , "false" );
446- }
447- }
448429 // We cannot have unbounded commit files. Archive commits if we have to archive
449430 HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog (config , table );
450431 archiveLog .archiveIfRequired (context );
@@ -457,6 +438,28 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
457438 }
458439 }
459440
441+ protected void runTableServicesInline (HoodieTable <T , I , K , O > table , HoodieCommitMetadata metadata , Option <Map <String , String >> extraMetadata ) {
442+ if (config .inlineTableServices ()) {
443+ // Do an inline compaction if enabled
444+ if (config .inlineCompactionEnabled ()) {
445+ runAnyPendingCompactions (table );
446+ metadata .addMetadata (HoodieCompactionConfig .INLINE_COMPACT_PROP , "true" );
447+ inlineCompact (extraMetadata );
448+ } else {
449+ metadata .addMetadata (HoodieCompactionConfig .INLINE_COMPACT_PROP , "false" );
450+ }
451+
452+ // Do an inline clustering if enabled
453+ if (config .inlineClusteringEnabled ()) {
454+ runAnyPendingClustering (table );
455+ metadata .addMetadata (HoodieClusteringConfig .INLINE_CLUSTERING_PROP , "true" );
456+ inlineCluster (extraMetadata );
457+ } else {
458+ metadata .addMetadata (HoodieClusteringConfig .INLINE_CLUSTERING_PROP , "false" );
459+ }
460+ }
461+ }
462+
460463 protected void runAnyPendingCompactions (HoodieTable <T , I , K , O > table ) {
461464 table .getActiveTimeline ().getWriteTimeline ().filterPendingCompactionTimeline ().getInstants ()
462465 .forEach (instant -> {
@@ -719,7 +722,7 @@ private void startCommit(String instantTime, String actionType, HoodieTableMetaC
719722 */
720723 public Option <String > scheduleCompaction (Option <Map <String , String >> extraMetadata ) throws HoodieIOException {
721724 String instantTime = HoodieActiveTimeline .createNewInstantTime ();
722- return scheduleTableService (instantTime , extraMetadata , TableServiceType . COMPACT );
725+ return scheduleCompactionAtInstant (instantTime , extraMetadata ) ? Option . of ( instantTime ) : Option . empty ( );
723726 }
724727
725728 /**
@@ -728,10 +731,7 @@ public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetada
728731 * @param extraMetadata Extra Metadata to be stored
729732 */
730733 public boolean scheduleCompactionAtInstant (String instantTime , Option <Map <String , String >> extraMetadata ) throws HoodieIOException {
731- LOG .info ("Scheduling compaction at instant time :" + instantTime );
732- Option <HoodieCompactionPlan > plan = createTable (config , hadoopConf )
733- .scheduleCompaction (context , instantTime , extraMetadata );
734- return plan .isPresent ();
734+ return scheduleTableService (instantTime , extraMetadata , TableServiceType .COMPACT ).isPresent ();
735735 }
736736
737737 /**
@@ -850,9 +850,7 @@ protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, H
850850 * Performs a compaction operation on a table, serially before or after an insert/upsert action.
851851 */
852852 protected Option <String > inlineCompact (Option <Map <String , String >> extraMetadata ) {
853- String compactionInstantTime = HoodieActiveTimeline .createNewInstantTime ();
854- Option <String > compactionInstantTimeOpt = scheduleTableServiceInternal (compactionInstantTime ,
855- extraMetadata , TableServiceType .COMPACT );
853+ Option <String > compactionInstantTimeOpt = scheduleCompaction (extraMetadata );
856854 compactionInstantTimeOpt .ifPresent (compactInstantTime -> {
857855 // inline compaction should auto commit as the user is never given control
858856 compact (compactInstantTime , true );
@@ -866,7 +864,7 @@ protected Option<String> inlineCompact(Option<Map<String, String>> extraMetadata
866864 */
867865 public Option <String > scheduleClustering (Option <Map <String , String >> extraMetadata ) throws HoodieIOException {
868866 String instantTime = HoodieActiveTimeline .createNewInstantTime ();
869- return scheduleTableService (instantTime , extraMetadata , TableServiceType . CLUSTER );
867+ return scheduleClusteringAtInstant (instantTime , extraMetadata ) ? Option . of ( instantTime ) : Option . empty ( );
870868 }
871869
872870 /**
@@ -875,10 +873,7 @@ public Option<String> scheduleClustering(Option<Map<String, String>> extraMetada
875873 * @param extraMetadata Extra Metadata to be stored
876874 */
877875 public boolean scheduleClusteringAtInstant (String instantTime , Option <Map <String , String >> extraMetadata ) throws HoodieIOException {
878- LOG .info ("Scheduling clustering at instant time :" + instantTime );
879- Option <HoodieClusteringPlan > plan = createTable (config , hadoopConf )
880- .scheduleClustering (context , instantTime , extraMetadata );
881- return plan .isPresent ();
876+ return scheduleTableService (instantTime , extraMetadata , TableServiceType .CLUSTER ).isPresent ();
882877 }
883878
884879 /**
@@ -887,7 +882,7 @@ public boolean scheduleClusteringAtInstant(String instantTime, Option<Map<String
887882 */
888883 protected Option <String > scheduleCleaning (Option <Map <String , String >> extraMetadata ) throws HoodieIOException {
889884 String instantTime = HoodieActiveTimeline .createNewInstantTime ();
890- return scheduleTableService (instantTime , extraMetadata , TableServiceType . CLEAN );
885+ return scheduleCleaningAtInstant (instantTime , extraMetadata ) ? Option . of ( instantTime ) : Option . empty ( );
891886 }
892887
893888 /**
@@ -896,15 +891,11 @@ protected Option<String> scheduleCleaning(Option<Map<String, String>> extraMetad
896891 * @param extraMetadata Extra Metadata to be stored
897892 */
898893 protected boolean scheduleCleaningAtInstant (String instantTime , Option <Map <String , String >> extraMetadata ) throws HoodieIOException {
899- LOG .info ("Scheduling clustering at instant time :" + instantTime );
900- Option <HoodieCleanerPlan > plan = createTable (config , hadoopConf )
901- .scheduleCleaning (context , instantTime , extraMetadata );
902- return plan .isPresent ();
894+ return scheduleTableService (instantTime , extraMetadata , TableServiceType .CLEAN ).isPresent ();
903895 }
904896
905897 /**
906898 * Ensures clustering instant is in expected state and performs clustering for the plan stored in metadata.
907- *
908899 * @param clusteringInstant Clustering Instant Time
909900 * @return Collection of Write Status
910901 */
@@ -946,11 +937,20 @@ private Option<String> scheduleTableServiceInternal(String instantTime, Option<M
946937 TableServiceType tableServiceType ) {
947938 switch (tableServiceType ) {
948939 case CLUSTER :
949- return scheduleClusteringAtInstant (instantTime , extraMetadata ) ? Option .of (instantTime ) : Option .empty ();
940+ LOG .info ("Scheduling clustering at instant time :" + instantTime );
941+ Option <HoodieClusteringPlan > clusteringPlan = createTable (config , hadoopConf )
942+ .scheduleClustering (context , instantTime , extraMetadata );
943+ return clusteringPlan .isPresent () ? Option .of (instantTime ) : Option .empty ();
950944 case COMPACT :
951- return scheduleCompactionAtInstant (instantTime , extraMetadata ) ? Option .of (instantTime ) : Option .empty ();
945+ LOG .info ("Scheduling compaction at instant time :" + instantTime );
946+ Option <HoodieCompactionPlan > compactionPlan = createTable (config , hadoopConf )
947+ .scheduleCompaction (context , instantTime , extraMetadata );
948+ return compactionPlan .isPresent () ? Option .of (instantTime ) : Option .empty ();
952949 case CLEAN :
953- return scheduleCleaningAtInstant (instantTime , extraMetadata ) ? Option .of (instantTime ) : Option .empty ();
950+ LOG .info ("Scheduling cleaning at instant time :" + instantTime );
951+ Option <HoodieCleanerPlan > cleanerPlan = createTable (config , hadoopConf )
952+ .scheduleCleaning (context , instantTime , extraMetadata );
953+ return cleanerPlan .isPresent () ? Option .of (instantTime ) : Option .empty ();
954954 default :
955955 throw new IllegalArgumentException ("Invalid TableService " + tableServiceType );
956956 }
@@ -960,9 +960,7 @@ private Option<String> scheduleTableServiceInternal(String instantTime, Option<M
960960 * Executes a clustering plan on a table, serially before or after an insert/upsert action.
961961 */
962962 protected Option <String > inlineCluster (Option <Map <String , String >> extraMetadata ) {
963- String clusteringInstantTme = HoodieActiveTimeline .createNewInstantTime ();
964- Option <String > clusteringInstantOpt = scheduleTableServiceInternal (clusteringInstantTme ,
965- extraMetadata , TableServiceType .CLUSTER );
963+ Option <String > clusteringInstantOpt = scheduleClustering (extraMetadata );
966964 clusteringInstantOpt .ifPresent (clusteringInstant -> {
967965 // inline cluster should auto commit as the user is never given control
968966 cluster (clusteringInstant , true );
0 commit comments