Skip to content

Commit f3fdd67

Browse files
committed
[HUDI-3558] consistent bucket index: support bucket num resizing & concurrent write
1 parent 61030d8 commit f3fdd67

41 files changed

Lines changed: 1899 additions & 106 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,14 @@ public class HoodieClusteringConfig extends HoodieConfig {
5151
public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy.";
5252
public static final String SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
5353
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
54+
public static final String SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY =
55+
"org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy";
5456
public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
5557
"org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";
5658
public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY =
5759
"org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy";
60+
public static final String SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY =
61+
"org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy";
5862
public static final String JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY =
5963
"org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy";
6064
public static final String PLAN_PARTITION_FILTER_MODE =

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
import org.apache.hudi.index.HoodieIndex;
3131
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
3232

33+
import org.apache.log4j.LogManager;
34+
import org.apache.log4j.Logger;
35+
3336
import javax.annotation.concurrent.Immutable;
3437

3538
import java.io.File;
@@ -55,6 +58,8 @@
5558
+ "which tags incoming records as either inserts or updates to older records.")
5659
public class HoodieIndexConfig extends HoodieConfig {
5760

61+
private static final Logger LOG = LogManager.getLogger(HoodieIndexConfig.class);
62+
5863
public static final ConfigProperty<String> INDEX_TYPE = ConfigProperty
5964
.key("hoodie.index.type")
6065
.noDefaultValue()
@@ -252,12 +257,37 @@ public class HoodieIndexConfig extends HoodieConfig {
252257
.withDocumentation("Only applies if index type is BUCKET. Determine the number of buckets in the hudi table, "
253258
+ "and each partition is divided to N buckets.");
254259

260+
public static final ConfigProperty<String> BUCKET_INDEX_MAX_NUM_BUCKETS = ConfigProperty
261+
.key("hoodie.bucket.index.max.num.buckets")
262+
.noDefaultValue()
263+
.withDocumentation("Only applies if bucket index engine is dynamic (e.g., CONSISTENT_HASHING). Determine the upper bound of "
264+
+ "the number of buckets in the hudi table. Resizing is not allowed to split a bucket if this number is reached.");
265+
266+
public static final ConfigProperty<String> BUCKET_INDEX_MIN_NUM_BUCKETS = ConfigProperty
267+
.key("hoodie.bucket.index.min.num.buckets")
268+
.noDefaultValue()
269+
.withDocumentation("Only applies if bucket index engine is dynamic (e.g., CONSISTENT_HASHING). Determine the lower bound of "
270+
+ "the number of buckets in the hudi table. Resizing is not allowed to merge buckets if this number is reached.");
271+
255272
public static final ConfigProperty<String> BUCKET_INDEX_HASH_FIELD = ConfigProperty
256273
.key("hoodie.bucket.index.hash.field")
257274
.noDefaultValue()
258275
.withDocumentation("Index key. It is used to index the record and find its file group. "
259276
+ "If not set, use record key field as default");
260277

278+
public static final ConfigProperty<Double> BUCKET_SPLIT_THRESHOLD = ConfigProperty
279+
.key("hoodie.bucket.index.split.threshold")
280+
.defaultValue(2.0)
281+
.withDocumentation("Control if the bucket should be split when using dynamic bucket index (i.e., CONSISTENT HASHING)."
282+
+ " Specifically, if a file slice size reaches `hoodie.xxxx.max.file.size` * threshold, then split will be carried out.");
283+
284+
public static final ConfigProperty<Double> BUCKET_MERGE_THRESHOLD = ConfigProperty
285+
.key("hoodie.bucket.index.merge.threshold")
286+
.defaultValue(0.2)
287+
.withDocumentation("Control if buckets should be merged when using dynamic bucket index (i.e., CONSISTENT HASHING)."
288+
+ " Specifically, if a file slice size is smaller than `hoodie.xxxx.max.file.size` * threshold, then it will be considered"
289+
+ " as a merge candidate.");
290+
261291
/**
262292
* Deprecated configs. These are now part of {@link HoodieHBaseIndexConfig}.
263293
*/
@@ -589,6 +619,16 @@ public Builder withBucketNum(String bucketNum) {
589619
return this;
590620
}
591621

622+
public Builder withBucketMaxNum(int bucketMaxNum) {
623+
hoodieIndexConfig.setValue(BUCKET_INDEX_MAX_NUM_BUCKETS, String.valueOf(bucketMaxNum));
624+
return this;
625+
}
626+
627+
public Builder withBucketMinNum(int bucketMinNum) {
628+
hoodieIndexConfig.setValue(BUCKET_INDEX_MIN_NUM_BUCKETS, String.valueOf(bucketMinNum));
629+
return this;
630+
}
631+
592632
public Builder withIndexKeyField(String keyField) {
593633
hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD, keyField);
594634
return this;
@@ -639,6 +679,20 @@ private void validateBucketIndexConfig() {
639679
if (hoodieIndexConfig.getIntOrDefault(BUCKET_INDEX_NUM_BUCKETS) <= 0) {
640680
throw new HoodieIndexException("When using bucket index, hoodie.bucket.index.num.buckets cannot be negative.");
641681
}
682+
int bucketNum = hoodieIndexConfig.getInt(BUCKET_INDEX_NUM_BUCKETS);
683+
if (StringUtils.isNullOrEmpty(hoodieIndexConfig.getString(BUCKET_INDEX_MAX_NUM_BUCKETS))) {
684+
hoodieIndexConfig.setValue(BUCKET_INDEX_MAX_NUM_BUCKETS, Integer.toString(bucketNum));
685+
} else if (hoodieIndexConfig.getInt(BUCKET_INDEX_MAX_NUM_BUCKETS) < bucketNum) {
686+
LOG.warn("Maximum bucket number is smaller than bucket number, maximum: " + hoodieIndexConfig.getInt(BUCKET_INDEX_MAX_NUM_BUCKETS) + ", bucketNum: " + bucketNum);
687+
hoodieIndexConfig.setValue(BUCKET_INDEX_MAX_NUM_BUCKETS, Integer.toString(bucketNum));
688+
}
689+
690+
if (StringUtils.isNullOrEmpty(hoodieIndexConfig.getString(BUCKET_INDEX_MIN_NUM_BUCKETS))) {
691+
hoodieIndexConfig.setValue(BUCKET_INDEX_MIN_NUM_BUCKETS, Integer.toString(bucketNum));
692+
} else if (hoodieIndexConfig.getInt(BUCKET_INDEX_MIN_NUM_BUCKETS) > bucketNum) {
693+
LOG.warn("Minimum bucket number is larger than the bucket number, minimum: " + hoodieIndexConfig.getInt(BUCKET_INDEX_MIN_NUM_BUCKETS) + ", bucketNum: " + bucketNum);
694+
hoodieIndexConfig.setValue(BUCKET_INDEX_MIN_NUM_BUCKETS, Integer.toString(bucketNum));
695+
}
642696
}
643697
}
644698
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
public class HoodieWriteConfig extends HoodieConfig {
9999

100100
private static final Logger LOG = LogManager.getLogger(HoodieWriteConfig.class);
101+
101102
private static final long serialVersionUID = 0L;
102103

103104
// This is a constant as is should never be changed via config (will invalidate previous commits)
@@ -1633,13 +1634,42 @@ public int getBucketIndexNumBuckets() {
16331634
return getIntOrDefault(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS);
16341635
}
16351636

1637+
public int getBucketIndexMaxNumBuckets() {
1638+
return getInt(HoodieIndexConfig.BUCKET_INDEX_MAX_NUM_BUCKETS);
1639+
}
1640+
1641+
public int getBucketIndexMinNumBuckets() {
1642+
return getInt(HoodieIndexConfig.BUCKET_INDEX_MIN_NUM_BUCKETS);
1643+
}
1644+
1645+
public double getBucketSplitThreshold() {
1646+
return getDouble(HoodieIndexConfig.BUCKET_SPLIT_THRESHOLD);
1647+
}
1648+
1649+
public double getBucketMergeThreshold() {
1650+
return getDouble(HoodieIndexConfig.BUCKET_MERGE_THRESHOLD);
1651+
}
1652+
16361653
public String getBucketIndexHashField() {
16371654
return getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD);
16381655
}
16391656

16401657
/**
16411658
* storage properties.
16421659
*/
1660+
public long getMaxFileSize(HoodieFileFormat format) {
1661+
switch (format) {
1662+
case PARQUET:
1663+
return getParquetMaxFileSize();
1664+
case HFILE:
1665+
return getHFileMaxFileSize();
1666+
case ORC:
1667+
return getOrcMaxFileSize();
1668+
default:
1669+
throw new HoodieNotSupportedException("Unknown file format: " + format);
1670+
}
1671+
}
1672+
16431673
public long getParquetMaxFileSize() {
16441674
return getLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE);
16451675
}
@@ -2590,6 +2620,18 @@ private void validate() {
25902620
ValidationUtils.checkArgument(!writeConfig.getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY)
25912621
.equals(HoodieFailedWritesCleaningPolicy.EAGER.name()), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY");
25922622
}
2623+
2624+
if (writeConfig.getIndexType() == HoodieIndex.IndexType.BUCKET && writeConfig.getBucketIndexEngineType() == HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING) {
2625+
if (!writeConfig.getClusteringPlanStrategyClass().equals(HoodieClusteringConfig.SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY)) {
2626+
LOG.warn("Force setting clustering plan strategy class to '" + HoodieClusteringConfig.SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY + "' because of Consistent bucket index");
2627+
writeConfig.setValue(HoodieClusteringConfig.PLAN_STRATEGY_CLASS_NAME, HoodieClusteringConfig.SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
2628+
}
2629+
2630+
if (!writeConfig.getClusteringExecutionStrategyClass().equals(HoodieClusteringConfig.SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY)) {
2631+
LOG.warn("Force setting clustering execution strategy class to '" + HoodieClusteringConfig.SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY + "' because of Consistent bucket index");
2632+
writeConfig.setValue(HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME, HoodieClusteringConfig.SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY);
2633+
}
2634+
}
25932635
}
25942636

25952637
public HoodieWriteConfig build() {

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,18 @@ public abstract HoodieData<WriteStatus> updateLocation(
8888
HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
8989
HoodieTable hoodieTable) throws HoodieIndexException;
9090

91+
92+
/**
93+
* Extracts the location of written records, and updates the index.
94+
*/
95+
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
96+
public HoodieData<WriteStatus> updateLocation(
97+
HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
98+
HoodieTable hoodieTable, String instant) throws HoodieIndexException {
99+
return updateLocation(writeStatuses, context, hoodieTable);
100+
}
101+
102+
91103
/**
92104
* Rollback the effects of the commit made at instantTime.
93105
*/

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,22 @@
2222
import org.apache.hudi.common.model.ConsistentHashingNode;
2323
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
2424
import org.apache.hudi.common.model.HoodieKey;
25+
import org.apache.hudi.common.util.Option;
26+
import org.apache.hudi.common.util.ValidationUtils;
2527
import org.apache.hudi.common.util.hash.HashID;
28+
import org.apache.hudi.exception.HoodieClusteringException;
2629

30+
import org.jetbrains.annotations.NotNull;
31+
32+
import java.util.ArrayList;
33+
import java.util.Arrays;
2734
import java.util.Collection;
2835
import java.util.HashMap;
2936
import java.util.List;
3037
import java.util.Map;
3138
import java.util.SortedMap;
3239
import java.util.TreeMap;
40+
import java.util.stream.Collectors;
3341

3442
public class ConsistentBucketIdentifier extends BucketIdentifier {
3543

@@ -88,17 +96,113 @@ protected ConsistentHashingNode getBucket(int hashValue) {
8896
return tailMap.isEmpty() ? ring.firstEntry().getValue() : tailMap.get(tailMap.firstKey());
8997
}
9098

99+
/**
100+
* Get the node that precedes the given bucket node on the hash ring.
101+
*
102+
* @param fileId
103+
* @return
104+
*/
105+
public ConsistentHashingNode getFormerBucket(String fileId) {
106+
return getFormerBucket(getBucketByFileId(fileId).getValue());
107+
}
108+
109+
/**
110+
* Get the former node neighbouring the given hash value on the hash ring.
111+
*
112+
* @param hashValue
113+
* @return
114+
*/
115+
public ConsistentHashingNode getFormerBucket(int hashValue) {
116+
SortedMap<Integer, ConsistentHashingNode> headMap = ring.headMap(hashValue);
117+
return headMap.isEmpty() ? ring.lastEntry().getValue() : headMap.get(headMap.lastKey());
118+
}
119+
120+
public List<ConsistentHashingNode> mergeBucket(List<String> fileIds) {
121+
// Get nodes using fileIds
122+
List<ConsistentHashingNode> nodes = fileIds.stream().map(this::getBucketByFileId)
123+
.collect(Collectors.toList());
124+
125+
// Validate the input
126+
for (int i = 0; i < nodes.size() - 1; ++i) {
127+
ValidationUtils.checkState(getFormerBucket(nodes.get(i + 1).getValue()).getValue() == nodes.get(i).getValue(), "Cannot merge discontinuous hash range");
128+
}
129+
130+
// Create child nodes with proper tag (keep last and delete other nodes)
131+
List<ConsistentHashingNode> childNodes = new ArrayList<>(nodes.size());
132+
for (int i = 0; i < nodes.size() - 1; ++i) {
133+
childNodes.add(new ConsistentHashingNode(nodes.get(i).getValue(), null, ConsistentHashingNode.NodeTag.DELETE));
134+
}
135+
childNodes.add(new ConsistentHashingNode(nodes.get(nodes.size() - 1).getValue(), FSUtils.createNewFileIdPfx(), ConsistentHashingNode.NodeTag.REPLACE));
136+
return childNodes;
137+
}
138+
139+
public Option<List<ConsistentHashingNode>> splitBucket(String fileId) {
140+
ConsistentHashingNode bucket = getBucketByFileId(fileId);
141+
ValidationUtils.checkState(bucket != null, "FileId has no corresponding bucket");
142+
return splitBucket(bucket);
143+
}
144+
145+
/**
146+
* Split the bucket at the middle of its hash range, and generate the corresponding file ids.
147+
*
148+
* TODO support different split criteria, e.g., distributed records evenly using statistics
149+
*
150+
* @param bucket parent bucket
151+
* @return lists of children buckets
152+
*/
153+
public Option<List<ConsistentHashingNode>> splitBucket(@NotNull ConsistentHashingNode bucket) {
154+
ConsistentHashingNode formerBucket = getFormerBucket(bucket.getValue());
155+
156+
long mid = (long) formerBucket.getValue() + bucket.getValue()
157+
+ (formerBucket.getValue() < bucket.getValue() ? 0 : (HoodieConsistentHashingMetadata.HASH_VALUE_MASK + 1L));
158+
mid = (mid >> 1) & HoodieConsistentHashingMetadata.HASH_VALUE_MASK;
159+
160+
// Cannot split as it already is the smallest bucket range
161+
if (mid == formerBucket.getValue() || mid == bucket.getValue()) {
162+
return Option.empty();
163+
}
164+
165+
return Option.of(Arrays.asList(
166+
new ConsistentHashingNode((int) mid, FSUtils.createNewFileIdPfx(), ConsistentHashingNode.NodeTag.REPLACE),
167+
new ConsistentHashingNode(bucket.getValue(), FSUtils.createNewFileIdPfx(), ConsistentHashingNode.NodeTag.REPLACE))
168+
);
169+
}
170+
91171
/**
92172
* Initialize necessary data structure to facilitate bucket identifying.
93173
* Specifically, we construct:
94174
* - An in-memory tree (ring) to speed up range mapping searching.
95175
* - A hash table (fileIdToBucket) to allow lookup of bucket using fileId.
176+
* <p>
177+
* Children nodes are also considered, and will override the original nodes,
178+
* which is used during bucket resizing (i.e., children nodes take the place
179+
* of the original nodes)
96180
*/
97181
private void initialize() {
98182
for (ConsistentHashingNode p : metadata.getNodes()) {
99183
ring.put(p.getValue(), p);
100184
// One bucket has only one file group, so append 0 directly
101185
fileIdToBucket.put(FSUtils.createNewFileId(p.getFileIdPrefix(), 0), p);
102186
}
187+
188+
// Handle children nodes, i.e., replace or delete the original nodes
189+
ConsistentHashingNode tmp;
190+
for (ConsistentHashingNode p : metadata.getChildrenNodes()) {
191+
switch (p.getTag()) {
192+
case REPLACE:
193+
tmp = ring.put(p.getValue(), p);
194+
if (tmp != null) {
195+
fileIdToBucket.remove(FSUtils.createNewFileId(tmp.getFileIdPfx(), 0));
196+
}
197+
fileIdToBucket.put(FSUtils.createNewFileId(p.getFileIdPfx(), 0), p);
198+
break;
199+
case DELETE:
200+
tmp = ring.remove(p.getValue());
201+
fileIdToBucket.remove(FSUtils.createNewFileId(tmp.getFileIdPfx(), 0));
202+
break;
203+
default:
204+
throw new HoodieClusteringException("Children node is tagged as NORMAL or unknown tag: " + p);
205+
}
206+
}
103207
}
104208
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
5050
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
5151
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
52+
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
5253
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
5354
import org.apache.hudi.common.util.DefaultSizeEstimator;
5455
import org.apache.hudi.common.util.Option;
@@ -151,6 +152,11 @@ private void init(HoodieRecord record) {
151152
logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
152153
} else {
153154
baseInstantTime = instantTime;
155+
// Handle log file only case. This is necessary for the concurrent clustering and writer case.
156+
// NOTE: flink engine use instantTime to mark operation type, check BaseFlinkCommitActionExecutor::execute
157+
if (record.getCurrentLocation() != null && HoodieInstantTimeGenerator.isValidInstantTime(record.getCurrentLocation().getInstantTime())) {
158+
baseInstantTime = record.getCurrentLocation().getInstantTime();
159+
}
154160
// This means there is no base data file, start appending to a new log file
155161
fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime, this.fileId));
156162
LOG.info("New AppendHandle for partition :" + partitionPath);

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,9 @@ protected Option<HoodieClusteringPlan> createClusteringPlan() {
7878
}
7979

8080
LOG.info("Generating clustering plan for table " + config.getBasePath());
81-
ClusteringPlanStrategy strategy = (ClusteringPlanStrategy)
82-
ReflectionUtils.loadClass(ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config), table, context, config);
81+
ClusteringPlanStrategy strategy = (ClusteringPlanStrategy) ReflectionUtils.loadClass(
82+
ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config),
83+
new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config);
8384

8485
return strategy.generateClusteringPlan();
8586
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,14 @@ public ClusteringPlanStrategy(HoodieTable table, HoodieEngineContext engineConte
104104
*/
105105
public abstract Option<HoodieClusteringPlan> generateClusteringPlan();
106106

107+
/**
108+
* Check if the clustering can proceed. If not, empty plan will be returned to stop the scheduling.
109+
* @return
110+
*/
111+
public boolean checkPrecondition() {
112+
return true;
113+
}
114+
107115
/**
108116
* Return file slices eligible for clustering. FileIds in pending clustering/compaction are not eligible for clustering.
109117
*/

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ protected List<String> filterPartitionPaths(List<String> partitionPaths) {
6868

6969
@Override
7070
public Option<HoodieClusteringPlan> generateClusteringPlan() {
71+
if (!checkPrecondition()) {
72+
return Option.empty();
73+
}
74+
7175
HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
7276
LOG.info("Scheduling clustering for " + metaClient.getBasePath());
7377
HoodieWriteConfig config = getWriteConfig();

0 commit comments

Comments
 (0)