Skip to content

Commit 103f3ef

Browse files
committed
[HUDI-5863] Fix HoodieMetadataFileSystemView serving stale view at the timeline server
1 parent bb240f7 commit 103f3ef

8 files changed

Lines changed: 407 additions & 40 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.hudi.avro.model.HoodieCompactionPlan;
2727
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
2828
import org.apache.hudi.avro.model.HoodieRollbackPlan;
29+
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
2930
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
3031
import org.apache.hudi.common.HoodiePendingRollbackInfo;
3132
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -87,8 +88,8 @@ public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient i
8788

8889
protected Set<String> pendingInflightAndRequestedInstants;
8990

90-
protected BaseHoodieTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
91-
super(context, clientConfig, Option.empty());
91+
protected BaseHoodieTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig, Option<EmbeddedTimelineService> timelineService) {
92+
super(context, clientConfig, timelineService);
9293
}
9394

9495
protected void startAsyncCleanerService(BaseHoodieWriteClient writeClient) {

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.hudi.avro.model.HoodieClusteringGroup;
2222
import org.apache.hudi.avro.model.HoodieClusteringPlan;
23+
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
2324
import org.apache.hudi.common.data.HoodieData;
2425
import org.apache.hudi.common.engine.HoodieEngineContext;
2526
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -58,8 +59,8 @@ public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<
5859

5960
private static final Logger LOG = LoggerFactory.getLogger(SparkRDDTableServiceClient.class);
6061

61-
protected SparkRDDTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
62-
super(context, clientConfig);
62+
protected SparkRDDTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig, Option<EmbeddedTimelineService> timelineService) {
63+
super(context, clientConfig, timelineService);
6364
}
6465

6566
@Override

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeC
8383
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
8484
Option<EmbeddedTimelineService> timelineService) {
8585
super(context, writeConfig, timelineService, SparkUpgradeDowngradeHelper.getInstance());
86-
this.tableServiceClient = new SparkRDDTableServiceClient<>(context, writeConfig);
86+
this.tableServiceClient = new SparkRDDTableServiceClient<>(context, writeConfig, timelineService);
8787
}
8888

8989
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hudi.client.functional;

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieBackedTestDelayedTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.timeline.service.TimelineService;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Tests the {@link RemoteHoodieTableFileSystemView} with metadata table enabled, using
 * {@link HoodieMetadataFileSystemView} on the timeline server.
 */
public class TestRemoteFileSystemViewWithMetadataTable extends HoodieClientTestHarness {
  private static final Logger LOG = LogManager.getLogger(TestRemoteFileSystemViewWithMetadataTable.class);

  /**
   * Initializes the Spark contexts, file system, meta client, and a long-running
   * timeline service shared by all writes in a test, plus a fixed-seed data generator
   * so each run produces the same records.
   */
  @BeforeEach
  public void setUp() throws Exception {
    initPath();
    initSparkContexts();
    initFileSystem();
    initMetaClient();
    initTimelineService();
    // Fixed seed keeps generated records deterministic across runs.
    dataGen = new HoodieTestDataGenerator(0x1f86);
  }

  /**
   * Tears down all resources created in {@link #setUp()} in reverse dependency order.
   */
  @AfterEach
  public void tearDown() throws Exception {
    cleanupTimelineService();
    cleanupClients();
    cleanupSparkContexts();
    cleanupFileSystem();
    cleanupExecutorService();
    dataGen = null;
    System.gc();
  }

  /**
   * Starts a timeline server that keeps running across multiple commits (instead of
   * one server per write client). The server is backed by
   * {@link HoodieBackedTestDelayedTableMetadata}, a test metadata implementation that
   * injects delays so that concurrent view requests can race with the server-side
   * file system view sync.
   */
  @Override
  public void initTimelineService() {
    HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());

    try {
      HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
          .withPath(basePath)
          .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
              .withRemoteServerPort(incrementTimelineServicePortToUse()).build())
          .build();
      timelineService = new TimelineService(localEngineContext, new Configuration(),
          TimelineService.Config.builder().enableMarkerRequests(true)
              .serverPort(config.getViewStorageConfig().getRemoteViewServerPort()).build(),
          FileSystem.get(new Configuration()),
          FileSystemViewManager.createViewManager(
              context, config.getMetadataConfig(), config.getViewStorageConfig(),
              config.getCommonConfig(),
              () -> new HoodieBackedTestDelayedTableMetadata(
                  context, config.getMetadataConfig(), basePath,
                  config.getViewStorageConfig().getSpillableDir(), true)));
      timelineService.startService();
      timelineServicePort = timelineService.getServerPort();
      LOG.info("Started timeline server on port: " + timelineServicePort);
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
  }

  /**
   * Verifies that concurrent getLatestFileSlice requests to the timeline server return
   * the post-compaction file slices (not a stale, pre-sync view), while the server-side
   * view sync is deliberately slowed down by the delayed test metadata.
   */
  @Test
  public void testMORGetLatestFileSliceWithMetadataTable() throws IOException {
    // This test utilizes the `HoodieBackedTestDelayedTableMetadata` to make sure the
    // synced file system view is always served.
    for (int i = 0; i < 3; i++) {
      writeToTable(i, timelineService);
    }

    // At this point, there are three deltacommits and one compaction commit in the Hudi timeline,
    // and the file system view of timeline server is not yet synced
    HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.builder()
        .setConf(metaClient.getHadoopConf())
        .setBasePath(basePath)
        .build();
    HoodieActiveTimeline timeline = newMetaClient.getActiveTimeline();
    HoodieInstant compactionCommit = timeline.lastInstant().get();
    assertTrue(timeline.lastInstant().get().getAction().equals(COMMIT_ACTION));

    // For all the file groups compacted by the compaction commit, the file system view
    // should return the latest file slices which is written by the latest commit
    HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
        timeline.getInstantDetails(compactionCommit).get(), HoodieCommitMetadata.class);
    List<Pair<String, String>> partitionFileIdPairList =
        commitMetadata.getPartitionToWriteStats().entrySet().stream().flatMap(
            entry -> {
              String partitionPath = entry.getKey();
              return entry.getValue().stream().map(
                  writeStat -> Pair.of(partitionPath, writeStat.getFileId()));
            }
        ).collect(Collectors.toList());
    List<Pair<String, String>> lookupList = new ArrayList<>();
    // Accumulate enough threads to test read-write concurrency while syncing the file system
    // view at the timeline server
    while (lookupList.size() < 128) {
      lookupList.addAll(partitionFileIdPairList);
    }

    LOG.info("Connecting to Timeline Server: " + timelineService.getServerPort());
    RemoteHoodieTableFileSystemView view = new RemoteHoodieTableFileSystemView(
        "localhost", timelineService.getServerPort(), metaClient);

    List<TestViewLookUpCallable> callableList = lookupList.stream()
        .map(pair -> new TestViewLookUpCallable(view, pair, compactionCommit.getTimestamp()))
        .collect(Collectors.toList());
    List<Future<Boolean>> resultList = new ArrayList<>();

    ExecutorService pool = Executors.newCachedThreadPool();
    callableList.forEach(callable -> {
      resultList.add(pool.submit(callable));
    });

    // Every concurrent lookup must have seen the fresh (post-compaction) view.
    assertTrue(resultList.stream().map(future -> {
      try {
        return future.get();
      } catch (Exception e) {
        LOG.error(e);
        return false;
      }
    }).reduce((a, b) -> a && b).get());
  }

  @Override
  protected HoodieTableType getTableType() {
    return HoodieTableType.MERGE_ON_READ;
  }

  /**
   * Performs one round of writes against the MOR table through a write client that
   * shares the given long-running timeline service: an insert in the first round,
   * updates afterwards, followed by scheduling and running any pending compaction
   * (compaction is configured to trigger after 3 delta commits).
   *
   * @param round           0-based write round; round 0 inserts, later rounds update.
   * @param timelineService the shared timeline service the write client connects to.
   * @throws IOException on write failures.
   */
  private void writeToTable(int round, TimelineService timelineService) throws IOException {
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
        .withParallelism(2, 2)
        .withBulkInsertParallelism(2)
        .withFinalizeWriteParallelism(2)
        .withDeleteParallelism(2)
        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
        .withMergeSmallFileGroupCandidatesLimit(0)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withMaxNumDeltaCommitsBeforeCompaction(3)
            .build())
        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
            .withStorageType(FileSystemViewStorageType.REMOTE_ONLY)
            .withRemoteServerPort(timelineService.getServerPort())
            .build())
        .withAutoCommit(false)
        .forTable("test_mor_table")
        .build();
    SparkRDDWriteClient writeClient =
        new SparkRDDWriteClient(context, writeConfig, Option.of(timelineService));
    String instantTime = HoodieActiveTimeline.createNewInstantTime();
    writeClient.startCommitWithTime(instantTime);
    List<HoodieRecord> records = round == 0
        ? dataGen.generateInserts(instantTime, 100)
        : dataGen.generateUpdates(instantTime, 100);

    JavaRDD<WriteStatus> writeStatusRDD = writeClient.upsert(jsc.parallelize(records, 1), instantTime);
    writeClient.commit(instantTime, writeStatusRDD, Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
    // Triggers compaction
    writeClient.scheduleCompaction(Option.empty());
    writeClient.runAnyPendingCompactions();
  }

  /**
   * Test callable to send lookup request to the timeline server for the latest file slice
   * based on the partition path and file ID.
   */
  class TestViewLookUpCallable implements Callable<Boolean> {
    // Remote view connected to the shared timeline server.
    private final RemoteHoodieTableFileSystemView view;
    // (partition path, file ID) pair to look up.
    private final Pair<String, String> partitionFileIdPair;
    // Commit time the returned base file is expected to carry (the compaction commit).
    private final String expectedCommitTime;

    public TestViewLookUpCallable(
        RemoteHoodieTableFileSystemView view,
        Pair<String, String> partitionFileIdPair,
        String expectedCommitTime) {
      this.view = view;
      this.partitionFileIdPair = partitionFileIdPair;
      this.expectedCommitTime = expectedCommitTime;
    }

    /**
     * Looks up the latest file slice for the configured partition/file ID and returns
     * {@code true} only when a slice is present and its base file's commit time matches
     * the expected (post-compaction) commit time; logs the mismatch otherwise.
     */
    @Override
    public Boolean call() throws Exception {
      Option<FileSlice> latestFileSlice = view.getLatestFileSlice(
          partitionFileIdPair.getLeft(), partitionFileIdPair.getRight());
      boolean result = latestFileSlice.isPresent() && expectedCommitTime.equals(
          FSUtils.getCommitTime(new Path(latestFileSlice.get().getBaseFile().get().getPath()).getName()));
      if (!result) {
        LOG.error("The timeline server does not return the correct result: latestFileSliceReturned="
            + latestFileSlice + " expectedCommitTime=" + expectedCommitTime);
      }
      return result;
    }
  }
}

hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,10 @@ public void close() {
269269

270270
/**
271271
* Clears the partition Map and reset view states.
272+
* <p>
273+
* NOTE: This method SHOULD NOT BE OVERRIDDEN which may cause stale file system view
274+
* to be served. Instead, override {@link AbstractTableFileSystemView#runReset} to
275+
* add custom logic.
272276
*/
273277
@Override
274278
public void reset() {
@@ -282,6 +286,20 @@ public void reset() {
282286
}
283287
}
284288

289+
/**
290+
* Resets the view states, which can be overridden by subclasses. This reset logic is guarded
291+
* by the write lock.
292+
* <p>
293+
* NOTE: This method SHOULD BE OVERRIDDEN for any custom logic. DO NOT OVERRIDE
294+
* {@link AbstractTableFileSystemView#reset} directly, which may cause stale file system view
295+
* to be served.
296+
*/
297+
protected void runReset() {
298+
clear();
299+
// Initialize with new Hoodie timeline.
300+
init(metaClient, getTimeline());
301+
}
302+
285303
/**
286304
* Clear the resource.
287305
*/
@@ -1388,6 +1406,13 @@ public HoodieTimeline getTimeline() {
13881406
return visibleCommitsAndCompactionTimeline;
13891407
}
13901408

1409+
/**
1410+
* Syncs the file system view from storage to memory.
1411+
* <p>
1412+
* NOTE: This method SHOULD NOT BE OVERRIDDEN which may cause stale file system view
1413+
* to be served. Instead, override {@link AbstractTableFileSystemView#runSync} to
1414+
* add custom logic.
1415+
*/
13911416
@Override
13921417
public void sync() {
13931418
HoodieTimeline oldTimeline = getTimeline();
@@ -1401,11 +1426,15 @@ public void sync() {
14011426
}
14021427

14031428
/**
1404-
* Performs complete reset of file-system view. Subsequent partition view calls will load file slices against latest
1405-
* timeline
1429+
* Performs complete reset of file-system view. Subsequent partition view calls will load file
1430+
* slices against the latest timeline. This sync logic is guarded by the write lock.
1431+
* <p>
1432+
* NOTE: This method SHOULD BE OVERRIDDEN for any custom logic. DO NOT OVERRIDE
1433+
* {@link AbstractTableFileSystemView#sync} directly, which may cause stale file system view
1434+
* to be served.
14061435
*
1407-
* @param oldTimeline Old Hoodie Timeline
1408-
* @param newTimeline New Hoodie Timeline
1436+
* @param oldTimeline Old Hudi Timeline
1437+
* @param newTimeline New Hudi Timeline
14091438
*/
14101439
protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) {
14111440
clear();

hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,14 @@ protected Map<Pair<String, Path>, FileStatus[]> listPartitions(List<Pair<String,
9090
}
9191

9292
@Override
93-
public void reset() {
94-
super.reset();
93+
public void runReset() {
94+
super.runReset();
9595
tableMetadata.reset();
9696
}
9797

9898
@Override
99-
public void sync() {
100-
super.sync();
99+
protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) {
100+
super.runSync(oldTimeline, newTimeline);
101101
tableMetadata.reset();
102102
}
103103

0 commit comments

Comments
 (0)