Commit 33aace3

[HUDI-3558] consistent hashing index support bulk insert & resolve conflicts
1 parent f3fdd67 commit 33aace3

17 files changed: 253 additions & 61 deletions


hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java

Lines changed: 2 additions & 2 deletions
@@ -28,8 +28,8 @@
 public interface BucketIndexLocationMapper extends Serializable {
 
   /**
-   * Get record location given hoodie key and partition path
+   * Get record location given hoodie key
    */
-  Option<HoodieRecordLocation> getRecordLocation(HoodieKey key, String partitionPath);
+  Option<HoodieRecordLocation> getRecordLocation(HoodieKey key);
 
 }
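
With the partition path available from the HoodieKey itself, implementations need only the key. A minimal sketch of a mapper against the revised interface (the modulo hashing, file id, and instant below are made up for illustration; the real bucket index computes these via BucketIdentifier and the hashing metadata):

import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;

// Hypothetical mapper: derives the location purely from the key, since
// HoodieKey already carries both the record key and the partition path.
public class ModuloBucketLocationMapper implements BucketIndexLocationMapper {

  private final int numBuckets;

  public ModuloBucketLocationMapper(int numBuckets) {
    this.numBuckets = numBuckets;
  }

  @Override
  public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
    // Simple modulo hash for illustration only.
    int bucketId = (key.getRecordKey().hashCode() & Integer.MAX_VALUE) % numBuckets;
    String fileId = "bucket-" + bucketId;                                 // hypothetical file id
    return Option.of(new HoodieRecordLocation("00000000000000", fileId)); // hypothetical instant
  }
}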

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java

Lines changed: 4 additions & 4 deletions
@@ -76,7 +76,7 @@ public int getNumBuckets() {
   /**
    * Get bucket of the given file group
    *
-   * @param fileId the file group id. NOTE: not filePfx (i.e., uuid)
+   * @param fileId the file group id. NOTE: not filePrefix (i.e., uuid)
    */
   public ConsistentHashingNode getBucketByFileId(String fileId) {
     return fileIdToBucket.get(fileId);
@@ -192,13 +192,13 @@ private void initialize() {
       case REPLACE:
         tmp = ring.put(p.getValue(), p);
         if (tmp != null) {
-          fileIdToBucket.remove(FSUtils.createNewFileId(tmp.getFileIdPfx(), 0));
+          fileIdToBucket.remove(FSUtils.createNewFileId(tmp.getFileIdPrefix(), 0));
         }
-        fileIdToBucket.put(FSUtils.createNewFileId(p.getFileIdPfx(), 0), p);
+        fileIdToBucket.put(FSUtils.createNewFileId(p.getFileIdPrefix(), 0), p);
         break;
       case DELETE:
         tmp = ring.remove(p.getValue());
-        fileIdToBucket.remove(FSUtils.createNewFileId(tmp.getFileIdPfx(), 0));
+        fileIdToBucket.remove(FSUtils.createNewFileId(tmp.getFileIdPrefix(), 0));
         break;
       default:
         throw new HoodieClusteringException("Children node is tagged as NORMAL or unknown tag: " + p);
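
The REPLACE/DELETE handling above maintains a hash ring keyed by each node's hash value: a record is routed to the first node whose value is at or above the key's hash, wrapping around to the lowest node otherwise. A self-contained sketch of that ring lookup (toy types, not Hudi's ConsistentBucketIdentifier API):

import java.util.Map;
import java.util.TreeMap;

// Toy consistent-hashing ring: maps a hashed key to the first node whose
// value is >= the hash, wrapping around to the lowest node if none exists.
class HashRing {
  private final TreeMap<Integer, String> ring = new TreeMap<>(); // hash value -> fileIdPrefix

  void addNode(int value, String fileIdPrefix) {
    ring.put(value, fileIdPrefix);
  }

  void removeNode(int value) {
    ring.remove(value);
  }

  String getBucket(String recordKey) {
    int hash = recordKey.hashCode() & Integer.MAX_VALUE;        // non-negative hash
    Map.Entry<Integer, String> entry = ring.ceilingEntry(hash); // first node at or above the hash
    return entry != null ? entry.getValue() : ring.firstEntry().getValue();
  }
}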

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java

Lines changed: 1 addition & 1 deletion
@@ -80,7 +80,7 @@ public <R> HoodieData<HoodieRecord<R>> tagLocation(
       protected HoodieRecord<R> computeNext() {
         // TODO maybe batch the operation to improve performance
         HoodieRecord record = inputItr.next();
-        Option<HoodieRecordLocation> loc = mapper.getRecordLocation(record.getKey(), record.getPartitionPath());
+        Option<HoodieRecordLocation> loc = mapper.getRecordLocation(record.getKey());
         return HoodieIndexUtils.getTaggedRecord(record, loc);
       }
     }

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java

Lines changed: 2 additions & 2 deletions
@@ -90,9 +90,9 @@ public SimpleBucketIndexLocationMapper(HoodieTable table, List<String> partition
     }
 
     @Override
-    public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key, String partitionPath) {
+    public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
       int bucketId = BucketIdentifier.getBucketId(key, indexKeyFields, numBuckets);
-      Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = partitionPathFileIDList.get(partitionPath);
+      Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = partitionPathFileIDList.get(key.getPartitionPath());
       return Option.ofNullable(bucketIdToFileIdMapping.getOrDefault(bucketId, null));
     }
   }

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java

Lines changed: 2 additions & 0 deletions
@@ -32,6 +32,8 @@ public class HoodieConsistentBucketLayout extends HoodieStorageLayout {
   public static final Set<WriteOperationType> SUPPORTED_OPERATIONS = CollectionUtils.createImmutableSet(
       WriteOperationType.INSERT,
       WriteOperationType.INSERT_PREPPED,
+      WriteOperationType.BULK_INSERT,
+      WriteOperationType.BULK_INSERT_PREPPED,
       WriteOperationType.UPSERT,
       WriteOperationType.UPSERT_PREPPED,
       WriteOperationType.INSERT_OVERWRITE,
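
With BULK_INSERT now in the supported set, a consistent-bucket table can be written with the bulk_insert operation. A minimal sketch of such a write through the Spark datasource, assuming the commonly documented bucket-index option keys (the exact keys and the columns id/dt are assumptions, not taken from this commit):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Hypothetical bulk insert into a table using the consistent-hashing bucket index.
public class BulkInsertExample {
  public static void write(Dataset<Row> df, String basePath) {
    df.write().format("hudi")
        .option("hoodie.table.name", "consistent_bucket_tbl")
        .option("hoodie.datasource.write.operation", "bulk_insert")
        .option("hoodie.datasource.write.recordkey.field", "id")      // assumed record key column
        .option("hoodie.datasource.write.partitionpath.field", "dt")  // assumed partition column
        .option("hoodie.index.type", "BUCKET")
        .option("hoodie.index.bucket.engine", "CONSISTENT_HASHING")
        .option("hoodie.bucket.index.num.buckets", "8")
        .mode(SaveMode.Append)
        .save(basePath);
  }
}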

hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestConsistentBucketIdIdentifier.java

Lines changed: 7 additions & 7 deletions
@@ -128,26 +128,26 @@ public void testMerge() {
     HoodieConsistentHashingMetadata meta = new HoodieConsistentHashingMetadata("partition", 8);
     List<ConsistentHashingNode> nodes = meta.getNodes();
 
-    List<String> fileIds = IntStream.range(0, 3).mapToObj(i -> FSUtils.createNewFileId(nodes.get(i).getFileIdPfx(), 0)).collect(Collectors.toList());
+    List<String> fileIds = IntStream.range(0, 3).mapToObj(i -> FSUtils.createNewFileId(nodes.get(i).getFileIdPrefix(), 0)).collect(Collectors.toList());
     List<ConsistentHashingNode> childNodes = new ConsistentBucketIdentifier(meta).mergeBucket(fileIds);
     Assertions.assertEquals(ConsistentHashingNode.NodeTag.DELETE, childNodes.get(0).getTag());
     Assertions.assertEquals(ConsistentHashingNode.NodeTag.DELETE, childNodes.get(1).getTag());
     Assertions.assertEquals(ConsistentHashingNode.NodeTag.REPLACE, childNodes.get(2).getTag());
     Assertions.assertEquals(nodes.get(2).getValue(), childNodes.get(2).getValue());
-    Assertions.assertNotEquals(nodes.get(2).getFileIdPfx(), childNodes.get(2).getFileIdPfx());
+    Assertions.assertNotEquals(nodes.get(2).getFileIdPrefix(), childNodes.get(2).getFileIdPrefix());
 
     fileIds = Arrays.asList(nodes.get(7), nodes.get(0), nodes.get(1)).stream()
-        .map(ConsistentHashingNode::getFileIdPfx).map(f -> FSUtils.createNewFileId(f, 0)).collect(Collectors.toList());
+        .map(ConsistentHashingNode::getFileIdPrefix).map(f -> FSUtils.createNewFileId(f, 0)).collect(Collectors.toList());
     childNodes = new ConsistentBucketIdentifier(meta).mergeBucket(fileIds);
     Assertions.assertEquals(ConsistentHashingNode.NodeTag.DELETE, childNodes.get(0).getTag());
     Assertions.assertEquals(ConsistentHashingNode.NodeTag.DELETE, childNodes.get(1).getTag());
     Assertions.assertEquals(ConsistentHashingNode.NodeTag.REPLACE, childNodes.get(2).getTag());
     Assertions.assertEquals(nodes.get(1).getValue(), childNodes.get(2).getValue());
-    Assertions.assertNotEquals(nodes.get(1).getFileIdPfx(), childNodes.get(2).getFileIdPfx());
+    Assertions.assertNotEquals(nodes.get(1).getFileIdPrefix(), childNodes.get(2).getFileIdPrefix());
 
     boolean exception = false;
     try {
-      fileIds = IntStream.range(0, 2).mapToObj(i -> FSUtils.createNewFileId(nodes.get(i * 2).getFileIdPfx(), 0)).collect(Collectors.toList());
+      fileIds = IntStream.range(0, 2).mapToObj(i -> FSUtils.createNewFileId(nodes.get(i * 2).getFileIdPrefix(), 0)).collect(Collectors.toList());
       new ConsistentBucketIdentifier(meta).mergeBucket(fileIds);
     } catch (Exception e) {
       exception = true;
@@ -167,9 +167,9 @@ public void testChildrenNodesInitialization() {
     ConsistentBucketIdentifier identifier = new ConsistentBucketIdentifier(metadata);
     List<ConsistentHashingNode> nodes = new ArrayList<>(identifier.getNodes());
     Assertions.assertEquals(1024, nodes.get(0).getValue());
-    Assertions.assertEquals("a1", nodes.get(0).getFileIdPfx());
+    Assertions.assertEquals("a1", nodes.get(0).getFileIdPrefix());
     Assertions.assertEquals(metadata.getNodes().get(1).getValue(), nodes.get(1).getValue());
-    Assertions.assertEquals("a2", nodes.get(1).getFileIdPfx());
+    Assertions.assertEquals("a2", nodes.get(1).getFileIdPrefix());
 
     childrenNodes = new ArrayList<>();
     childrenNodes.add(new ConsistentHashingNode(metadata.getNodes().get(0).getValue(), "d1", ConsistentHashingNode.NodeTag.NORMAL));

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkDuplicateUpdateStrategy.java

Lines changed: 9 additions & 8 deletions
@@ -21,6 +21,7 @@
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.ConsistentHashingNode;
@@ -45,8 +46,8 @@
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -61,7 +62,7 @@
  * If updates to file groups that are under clustering are identified, then generate
  * two same records for each update, routing into both old and new file groups
  */
-public class SparkDuplicateUpdateStrategy<T extends HoodieRecordPayload<T>> extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {
+public class SparkDuplicateUpdateStrategy<T extends HoodieRecordPayload<T>> extends UpdateStrategy<T, HoodieData<HoodieRecord<T>>> {
 
   private static final Logger LOG = LogManager.getLogger(SparkDuplicateUpdateStrategy.class);
 
@@ -70,12 +71,12 @@ public SparkDuplicateUpdateStrategy(HoodieEngineContext engineContext, HoodieTab
   }
 
   @Override
-  public Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
+  public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
     if (fileGroupsInPendingClustering.isEmpty()) {
       return Pair.of(taggedRecordsRDD, Collections.emptySet());
     }
 
-    JavaRDD<HoodieRecord<T>> filteredRecordsRDD = taggedRecordsRDD.filter(r -> {
+    HoodieData<HoodieRecord<T>> filteredRecordsRDD = taggedRecordsRDD.filter(r -> {
       ValidationUtils.checkState(r.getCurrentLocation() != null);
       return fileGroupsInPendingClustering.contains(new HoodieFileGroupId(r.getPartitionPath(), r.getCurrentLocation().getFileId()));
     });
@@ -92,7 +93,7 @@ public Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(JavaR
         .collect(Collectors.toList());
 
     // Construct child node for each partition & build the bucket identifier
-    final Set<String> partitions = new HashSet<>(filteredRecordsRDD.map(HoodieRecord::getPartitionPath).distinct().collect());
+    final Set<String> partitions = new HashSet<>(filteredRecordsRDD.map(HoodieRecord::getPartitionPath).distinct().collectAsList());
     Map<String, HoodieConsistentHashingMetadata> partitionToHashingMeta = new HashMap<>();
     Map<String, String> partitionToInstant = new HashMap<>();
     for (Pair<HoodieInstant, HoodieClusteringPlan> pair : instantPlanPairs) {
@@ -104,11 +105,11 @@ public Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(JavaR
         .collect(Collectors.toMap(Map.Entry::getKey, e -> new ConsistentBucketIdentifier(e.getValue())));
 
     // Produce records tagged with new record location
-    String indexKeyFields = table.getConfig().getBucketIndexHashField();
-    JavaRDD<HoodieRecord<T>> redirectedRecordsRDD = filteredRecordsRDD.map(r -> {
+    List<String> indexKeyFields = Arrays.asList(table.getConfig().getBucketIndexHashField().split(","));
+    HoodieData<HoodieRecord<T>> redirectedRecordsRDD = filteredRecordsRDD.map(r -> {
       ConsistentHashingNode node = partitionToIdentifier.get(r.getPartitionPath()).getBucket(r.getKey(), indexKeyFields);
       return HoodieIndexUtils.getTaggedRecord(new HoodieAvroRecord(r.getKey(), r.getData(), r.getOperation()),
-          Option.ofNullable(new HoodieRecordLocation(partitionToInstant.get(r.getPartitionPath()), FSUtils.createNewFileId(node.getFileIdPfx(), 0))));
+          Option.ofNullable(new HoodieRecordLocation(partitionToInstant.get(r.getPartitionPath()), FSUtils.createNewFileId(node.getFileIdPrefix(), 0))));
     });
 
     // Return combined iterator (the original and records with new location)
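
As the class comment above notes, updates that hit file groups under pending clustering are duplicated: the original record keeps its current location, and a copy is re-tagged with the new file group from the clustering plan. A toy, collections-based illustration of that dual routing (hypothetical types, not the HoodieData pipeline):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Toy model: a record tagged with a target file group.
class TaggedRecord {
  final String key;
  final String fileGroup;

  TaggedRecord(String key, String fileGroup) {
    this.key = key;
    this.fileGroup = fileGroup;
  }
}

class DualRouter {
  // For records whose current file group is being clustered, also emit a copy
  // routed to the new file group produced by the clustering plan.
  static List<TaggedRecord> route(List<TaggedRecord> updates, Map<String, String> oldToNewFileGroup) {
    List<TaggedRecord> out = new ArrayList<>(updates);      // original routing is kept
    for (TaggedRecord r : updates) {
      String newFileGroup = oldToNewFileGroup.get(r.fileGroup);
      if (newFileGroup != null) {
        out.add(new TaggedRecord(r.key, newFileGroup));      // duplicated routing
      }
    }
    return out;
  }
}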

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java

Lines changed: 23 additions & 8 deletions
@@ -38,11 +38,15 @@
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaRDD;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -58,9 +62,11 @@
  */
 public class RDDConsistentBucketPartitioner<T extends HoodieRecordPayload> extends RDDBucketIndexPartitioner<T> {
 
+  private static final Logger LOG = LogManager.getLogger(RDDConsistentBucketPartitioner.class);
+
   private final HoodieTable table;
   private final HoodieWriteConfig config;
-  private String indexKeyFields;
+  private List<String> indexKeyFields;
   private List<Boolean> doAppend;
   private List<String> fileIdPfxList;
   private Map<String, Map<String, Integer>> partitionToFileIdPfxIdxMap;
@@ -73,7 +79,7 @@ public class RDDConsistentBucketPartitioner<T extends HoodieRecordPayload> exten
   public RDDConsistentBucketPartitioner(HoodieTable table, HoodieWriteConfig config, Map<String, String> strategyParams, boolean preserveHoodieMetadata) {
     this(table, config);
     this.preserveHoodieMetadata = preserveHoodieMetadata;
-    this.indexKeyFields = config.getBucketIndexHashField();
+    this.indexKeyFields = Arrays.asList(config.getBucketIndexHashField().split(","));
 
     if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
       sortColumnNames = strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(",");
@@ -89,7 +95,7 @@ public RDDConsistentBucketPartitioner(HoodieTable table, HoodieWriteConfig confi
     this.table = table;
     this.config = config;
     this.hashingChildrenNodes = new HashMap<>();
-    this.indexKeyFields = config.getBucketIndexHashField();
+    this.indexKeyFields = Arrays.asList(config.getBucketIndexHashField().split(","));
     this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
   }
 
@@ -144,7 +150,7 @@ public int getPartition(Object key) {
         HoodieKey hoodieKey = (HoodieKey) key;
         String partition = hoodieKey.getPartitionPath();
         ConsistentHashingNode node = partitionToIdentifier.get(partition).getBucket(hoodieKey, indexKeyFields);
-        return partitionToFileIdPfxIdxMap.get(partition).get(node.getFileIdPfx());
+        return partitionToFileIdPfxIdxMap.get(partition).get(node.getFileIdPrefix());
       }
     });
   }
@@ -187,11 +193,16 @@ protected void generateFileIdPfx(int parallelism) {
     for (ConsistentBucketIdentifier identifier : partitionToIdentifier.values()) {
       Map<String, Integer> fileIdPfxToIdx = new HashMap();
       for (ConsistentHashingNode node : identifier.getNodes()) {
-        fileIdPfxToIdx.put(node.getFileIdPfx(), count++);
+        fileIdPfxToIdx.put(node.getFileIdPrefix(), count++);
+      }
+      fileIdPfxList.addAll(identifier.getNodes().stream().map(ConsistentHashingNode::getFileIdPrefix).collect(Collectors.toList()));
+      if (identifier.getMetadata().isFirstCreated()) {
+        // Create new file group when the hashing metadata is new (i.e., first write to the partition)
+        doAppend.addAll(Collections.nCopies(identifier.getNodes().size(), false));
+      } else {
+        // Child node requires generating a fresh new base file, rather than log file
+        doAppend.addAll(identifier.getNodes().stream().map(n -> n.getTag() == ConsistentHashingNode.NodeTag.NORMAL).collect(Collectors.toList()));
       }
-      fileIdPfxList.addAll(identifier.getNodes().stream().map(ConsistentHashingNode::getFileIdPfx).collect(Collectors.toList()));
-      // Child node requires generating a fresh new base file, rather than log file
-      doAppend.addAll(identifier.getNodes().stream().map(n -> n.getTag() == ConsistentHashingNode.NodeTag.NORMAL).collect(Collectors.toList()));
       partitionToFileIdPfxIdxMap.put(identifier.getMetadata().getPartitionPath(), fileIdPfxToIdx);
     }
 
@@ -260,6 +271,10 @@ public int getPartition(Object key) {
    * @return
    */
   private JavaRDD<HoodieRecord<T>> doPartitionAndSortByRecordKey(JavaRDD<HoodieRecord<T>> records, Partitioner partitioner) {
+    if (config.getBulkInsertSortMode() == BulkInsertSortMode.GLOBAL_SORT) {
+      LOG.warn("Consistent bucket does not support global sort mode, the sort will only be done within each data partition");
+    }
+
     Comparator<HoodieKey> comparator = (Comparator<HoodieKey> & Serializable) (t1, t2) -> {
       return t1.getRecordKey().compareTo(t2.getRecordKey());
    };
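
The partitioner's getPartition resolves each HoodieKey to a consistent-hashing node and then to a dense Spark partition index via partitionToFileIdPfxIdxMap. A stripped-down sketch of that pattern (hypothetical names, no Hudi types):

import java.util.Map;

import org.apache.spark.Partitioner;

// Sketch: map a record key to a Spark partition through a precomputed
// bucket -> partition-index table, mirroring the fileIdPfx -> idx mapping above.
class BucketPartitionerSketch extends Partitioner {
  private final Map<String, Integer> bucketToPartitionIdx;  // e.g. fileIdPrefix -> dense index
  private final HashRingLookup ring;                        // hypothetical: resolves key -> bucket

  BucketPartitionerSketch(Map<String, Integer> bucketToPartitionIdx, HashRingLookup ring) {
    this.bucketToPartitionIdx = bucketToPartitionIdx;
    this.ring = ring;
  }

  @Override
  public int numPartitions() {
    return bucketToPartitionIdx.size();
  }

  @Override
  public int getPartition(Object key) {
    String bucket = ring.getBucket((String) key);           // consistent-hash lookup
    return bucketToPartitionIdx.get(bucket);
  }
}

interface HashRingLookup {
  String getBucket(String recordKey);
}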

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java

Lines changed: 13 additions & 4 deletions
@@ -40,7 +40,6 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.table.HoodieTable;
 
@@ -73,6 +72,14 @@ public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
     super(config);
   }
 
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses,
+                                                HoodieEngineContext context,
+                                                HoodieTable hoodieTable)
+      throws HoodieIndexException {
+    throw new HoodieIndexException("Consistent hashing index does not support update location without the instant parameter");
+  }
+
   /**
    * Persist hashing metadata to storage. Only clustering operations will modify the metadata.
    * For example, splitting & merging buckets, or just sorting and producing a new bucket.
@@ -87,7 +94,8 @@ public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
   @Override
   public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses,
                                                 HoodieEngineContext context,
-                                                HoodieTable hoodieTable)
+                                                HoodieTable hoodieTable,
+                                                String instantTime)
       throws HoodieIndexException {
     HoodieInstant instant = hoodieTable.getMetaClient().getActiveTimeline().findInstantsAfterOrEquals(instantTime, 1).firstInstant().get();
     ValidationUtils.checkState(instant.getTimestamp().equals(instantTime), "Cannot get the same instant, instantTime: " + instantTime);
@@ -123,7 +131,7 @@ public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatu
     // Get new metadata and save
     meta.setChildrenNodes(childNodes);
     List<ConsistentHashingNode> newNodes = (new ConsistentBucketIdentifier(meta)).getNodes().stream()
-        .map(n -> new ConsistentHashingNode(n.getValue(), n.getFileIdPfx(), ConsistentHashingNode.NodeTag.NORMAL))
+        .map(n -> new ConsistentHashingNode(n.getValue(), n.getFileIdPrefix(), ConsistentHashingNode.NodeTag.NORMAL))
         .collect(Collectors.toList());
     HoodieConsistentHashingMetadata newMeta = new HoodieConsistentHashingMetadata(meta.getVersion(), meta.getPartitionPath(),
         instantTime, meta.getNumBuckets(), seqNo + 1, newNodes);
@@ -256,7 +264,8 @@ public ConsistentBucketIndexLocationMapper(HoodieTable table, List<String> parti
     }
 
     @Override
-    public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key, String partitionPath) {
+    public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
+      String partitionPath = key.getPartitionPath();
       ConsistentHashingNode node = partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
       if (!StringUtils.isNullOrEmpty(node.getFileIdPrefix())) {
         /**

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java

Lines changed: 2 additions & 2 deletions
@@ -112,7 +112,7 @@ public BaseSparkCommitActionExecutor(HoodieEngineContext context,
     }
   }
 
-  private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
+  private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
     context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering: " + config.getTableName());
     Set<HoodieFileGroupId> fileGroupsInPendingClustering =
         table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
@@ -266,7 +266,7 @@ protected HoodieData<WriteStatus> updateIndex(HoodieData<WriteStatus> writeStatu
     writeStatuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE));
     Instant indexStartTime = Instant.now();
     // Update the index back
-    HoodieData<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
+    HoodieData<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table, instantTime);
     result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
     result.setWriteStatuses(statuses);
     return statuses;
