Skip to content

Commit 5a3c7ab

Browse files
committed
[HUDI-3669] Add a remote request retry mechanism for Remotehoodietablefilesystemview.
1 parent 59527c3 commit 5a3c7ab

7 files changed

Lines changed: 195 additions & 37 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,11 @@ public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() {
117117
.withRemoteServerHost(hostAddr)
118118
.withRemoteServerPort(serverPort)
119119
.withRemoteTimelineClientTimeoutSecs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientTimeoutSecs())
120+
.withRemoteTimelineClientRetry(writeConfig.getClientSpecifiedViewStorageConfig().isRemoteTimelineClientRetryEnabled())
121+
.withRemoteTimelineClientMaxRetryNumbers(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryNumbers())
122+
.withRemoteTimelineInitialRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineInitialRetryIntervalMs())
123+
.withRemoteTimelineClientMaxRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryIntervalMs())
124+
.withRemoteTimelineClientRetryExceptions(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientRetryExceptions())
120125
.build();
121126
}
122127

hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,7 @@ private static RemoteHoodieTableFileSystemView createRemoteFileSystemView(Serial
214214
LOG.info("Creating remote view for basePath " + metaClient.getBasePath() + ". Server="
215215
+ viewConf.getRemoteViewServerHost() + ":" + viewConf.getRemoteViewServerPort() + ", Timeout="
216216
+ viewConf.getRemoteTimelineClientTimeoutSecs());
217-
return new RemoteHoodieTableFileSystemView(viewConf.getRemoteViewServerHost(), viewConf.getRemoteViewServerPort(),
218-
metaClient, viewConf.getRemoteTimelineClientTimeoutSecs());
217+
return new RemoteHoodieTableFileSystemView(metaClient, viewConf);
219218
}
220219

221220
public static FileSystemViewManager createViewManager(final HoodieEngineContext context,

hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,37 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
110110
.defaultValue(5 * 60) // 5 min
111111
.withDocumentation("Timeout in seconds, to wait for API requests against a remote file system view. e.g timeline server.");
112112

113+
public static final ConfigProperty<String> REMOTE_RETRY_ENABLE = ConfigProperty
114+
.key("hoodie.filesystem.view.remote.retry.enable")
115+
.defaultValue("false")
116+
.sinceVersion("0.12.0")
117+
.withDocumentation("Whether to enable API request retry for remote file system view.");
118+
119+
public static final ConfigProperty<Integer> REMOTE_MAX_RETRY_NUMBERS = ConfigProperty
120+
.key("hoodie.filesystem.view.remote.retry.max_numbers")
121+
.defaultValue(3) // 3 times
122+
.sinceVersion("0.12.0")
123+
.withDocumentation("Maximum number of retry for API requests against a remote file system view. e.g timeline server.");
124+
125+
public static final ConfigProperty<Long> REMOTE_INITIAL_RETRY_INTERVAL_MS = ConfigProperty
126+
.key("hoodie.filesystem.view.remote.retry.initial_interval_ms")
127+
.defaultValue(100L)
128+
.sinceVersion("0.12.0")
129+
.withDocumentation("Amount of time (in ms) to wait, before retry to do operations on storage.");
130+
131+
public static final ConfigProperty<Long> REMOTE_MAX_RETRY_INTERVAL_MS = ConfigProperty
132+
.key("hoodie.filesystem.view.remote.retry.max_interval_ms")
133+
.defaultValue(2000L)
134+
.sinceVersion("0.12.0")
135+
.withDocumentation("Maximum amount of time (in ms), to wait for next retry.");
136+
137+
public static final ConfigProperty<String> RETRY_EXCEPTIONS = ConfigProperty
138+
.key("hoodie.filesystem.view.remote.retry.exceptions")
139+
.defaultValue("")
140+
.sinceVersion("0.12.0")
141+
.withDocumentation("The class name of the Exception that needs to be re-tryed, separated by commas. "
142+
+ "Default is empty which means retry all the IOException and RuntimeException from Remote Request.");
143+
113144
public static final ConfigProperty<String> REMOTE_BACKUP_VIEW_ENABLE = ConfigProperty
114145
.key("hoodie.filesystem.remote.backup.view.enable")
115146
.defaultValue("true") // Need to be disabled only for tests.
@@ -144,6 +175,26 @@ public Integer getRemoteTimelineClientTimeoutSecs() {
144175
return getInt(REMOTE_TIMEOUT_SECS);
145176
}
146177

178+
public boolean isRemoteTimelineClientRetryEnabled() {
179+
return getBoolean(REMOTE_RETRY_ENABLE);
180+
}
181+
182+
public Integer getRemoteTimelineClientMaxRetryNumbers() {
183+
return getInt(REMOTE_MAX_RETRY_NUMBERS);
184+
}
185+
186+
public Long getRemoteTimelineInitialRetryIntervalMs() {
187+
return getLong(REMOTE_INITIAL_RETRY_INTERVAL_MS);
188+
}
189+
190+
public Long getRemoteTimelineClientMaxRetryIntervalMs() {
191+
return getLong(REMOTE_MAX_RETRY_INTERVAL_MS);
192+
}
193+
194+
public String getRemoteTimelineClientRetryExceptions() {
195+
return getString(RETRY_EXCEPTIONS);
196+
}
197+
147198
public long getMaxMemoryForFileGroupMap() {
148199
long totalMemory = getLong(SPILLABLE_MEMORY);
149200
return totalMemory - getMaxMemoryForPendingCompaction() - getMaxMemoryForBootstrapBaseFile();
@@ -245,6 +296,31 @@ public Builder withRemoteTimelineClientTimeoutSecs(Integer timelineClientTimeout
245296
return this;
246297
}
247298

299+
public Builder withRemoteTimelineClientRetry(boolean enableRetry) {
300+
fileSystemViewStorageConfig.setValue(REMOTE_RETRY_ENABLE, Boolean.toString(enableRetry));
301+
return this;
302+
}
303+
304+
public Builder withRemoteTimelineClientMaxRetryNumbers(Integer maxRetryNumbers) {
305+
fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_NUMBERS, maxRetryNumbers.toString());
306+
return this;
307+
}
308+
309+
public Builder withRemoteTimelineInitialRetryIntervalMs(Long initialRetryIntervalMs) {
310+
fileSystemViewStorageConfig.setValue(REMOTE_INITIAL_RETRY_INTERVAL_MS, initialRetryIntervalMs.toString());
311+
return this;
312+
}
313+
314+
public Builder withRemoteTimelineClientMaxRetryIntervalMs(Long maxRetryIntervalMs) {
315+
fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_INTERVAL_MS, maxRetryIntervalMs.toString());
316+
return this;
317+
}
318+
319+
public Builder withRemoteTimelineClientRetryExceptions(String retryExceptions) {
320+
fileSystemViewStorageConfig.setValue(RETRY_EXCEPTIONS, retryExceptions);
321+
return this;
322+
}
323+
248324
public Builder withMemFractionForPendingCompaction(Double memFractionForPendingCompaction) {
249325
fileSystemViewStorageConfig.setValue(SPILLABLE_COMPACTION_MEM_FRACTION, memFractionForPendingCompaction.toString());
250326
return this;

hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.hudi.common.table.timeline.dto.InstantDTO;
4040
import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
4141
import org.apache.hudi.common.util.Option;
42+
import org.apache.hudi.common.util.RetryHelper;
4243
import org.apache.hudi.common.util.StringUtils;
4344
import org.apache.hudi.common.util.ValidationUtils;
4445
import org.apache.hudi.common.util.collection.Pair;
@@ -132,22 +133,35 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
132133

133134
private boolean closed = false;
134135

136+
private RetryHelper<Response> retryHelper;
137+
138+
private final HttpRequestCheckedFunction urlCheckedFunc;
139+
135140
private enum RequestMethod {
136141
GET, POST
137142
}
138143

139144
public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient) {
140-
this(server, port, metaClient, 300);
145+
this(metaClient, FileSystemViewStorageConfig.newBuilder().withRemoteServerHost(server).withRemoteServerPort(port).build());
141146
}
142147

143-
public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient, int timeoutSecs) {
148+
public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, FileSystemViewStorageConfig viewConf) {
144149
this.basePath = metaClient.getBasePath();
145-
this.serverHost = server;
146-
this.serverPort = port;
147150
this.mapper = new ObjectMapper();
148151
this.metaClient = metaClient;
149152
this.timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
150-
this.timeoutSecs = timeoutSecs;
153+
this.serverHost = viewConf.getRemoteViewServerHost();
154+
this.serverPort = viewConf.getRemoteViewServerPort();
155+
this.timeoutSecs = viewConf.getRemoteTimelineClientTimeoutSecs();
156+
this.urlCheckedFunc = new HttpRequestCheckedFunction(this.timeoutSecs * 1000);
157+
if (viewConf.isRemoteTimelineClientRetryEnabled()) {
158+
retryHelper = new RetryHelper(
159+
viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
160+
viewConf.getRemoteTimelineClientMaxRetryNumbers(),
161+
viewConf.getRemoteTimelineInitialRetryIntervalMs(),
162+
viewConf.getRemoteTimelineClientRetryExceptions(),
163+
"Sending request");
164+
}
151165
}
152166

153167
private <T> T executeRequest(String requestPath, Map<String, String> queryParameters, TypeReference reference,
@@ -165,17 +179,9 @@ private <T> T executeRequest(String requestPath, Map<String, String> queryParame
165179

166180
String url = builder.toString();
167181
LOG.info("Sending request : (" + url + ")");
168-
Response response;
169-
int timeout = this.timeoutSecs * 1000; // msec
170-
switch (method) {
171-
case GET:
172-
response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
173-
break;
174-
case POST:
175-
default:
176-
response = Request.Post(url).connectTimeout(timeout).socketTimeout(timeout).execute();
177-
break;
178-
}
182+
// Reset url and method, to avoid repeatedly instantiating objects.
183+
urlCheckedFunc.setUrlAndMethod(url, method);
184+
Response response = retryHelper != null ? retryHelper.tryWith(urlCheckedFunc).start() : urlCheckedFunc.get();
179185
String content = response.returnContent().asString();
180186
return (T) mapper.readValue(content, reference);
181187
}
@@ -495,4 +501,33 @@ public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String fil
495501
throw new HoodieRemoteException(e);
496502
}
497503
}
504+
505+
/**
506+
* For remote HTTP requests, to avoid repeatedly instantiating objects.
507+
*/
508+
private class HttpRequestCheckedFunction implements RetryHelper.CheckedFunction<Response> {
509+
private String url;
510+
private RequestMethod method;
511+
private final int timeout;
512+
513+
public void setUrlAndMethod(String url, RequestMethod method) {
514+
this.method = method;
515+
this.url = url;
516+
}
517+
518+
public HttpRequestCheckedFunction(int timeout) {
519+
this.timeout = timeout;
520+
}
521+
522+
@Override
523+
public Response get() throws IOException {
524+
switch (method) {
525+
case GET:
526+
return Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
527+
case POST:
528+
default:
529+
return Request.Post(url).connectTimeout(timeout).socketTimeout(timeout).execute();
530+
}
531+
}
532+
}
498533
}

hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,47 +18,52 @@
1818

1919
package org.apache.hudi.common.util;
2020

21+
import org.apache.hudi.exception.HoodieException;
2122
import org.apache.log4j.LogManager;
2223
import org.apache.log4j.Logger;
2324

2425
import java.io.IOException;
26+
import java.io.Serializable;
2527
import java.util.ArrayList;
2628
import java.util.Arrays;
2729
import java.util.List;
2830
import java.util.Random;
2931
import java.util.stream.Collectors;
3032

31-
public class RetryHelper<T> {
33+
public class RetryHelper<T> implements Serializable {
3234
private static final Logger LOG = LogManager.getLogger(RetryHelper.class);
33-
private CheckedFunction<T> func;
34-
private int num;
35-
private long maxIntervalTime;
36-
private long initialIntervalTime = 100L;
35+
private transient CheckedFunction<T> func;
36+
private final int num;
37+
private final long maxIntervalTime;
38+
private final long initialIntervalTime;
3739
private String taskInfo = "N/A";
3840
private List<? extends Class<? extends Exception>> retryExceptionsClasses;
3941

40-
public RetryHelper() {
41-
}
42-
4342
public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) {
4443
this.num = maxRetryNumbers;
4544
this.initialIntervalTime = initialRetryIntervalMs;
4645
this.maxIntervalTime = maxRetryIntervalMs;
4746
if (StringUtils.isNullOrEmpty(retryExceptions)) {
4847
this.retryExceptionsClasses = new ArrayList<>();
4948
} else {
50-
this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
51-
.map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
52-
.map(Exception::getClass)
53-
.collect(Collectors.toList());
49+
try {
50+
this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
51+
.map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
52+
.map(Exception::getClass)
53+
.collect(Collectors.toList());
54+
} catch (HoodieException e) {
55+
LOG.error("Exception while loading retry exceptions classes '" + retryExceptions + "'.", e);
56+
this.retryExceptionsClasses = new ArrayList<>();
57+
}
5458
}
5559
}
5660

57-
public RetryHelper(String taskInfo) {
61+
public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions, String taskInfo) {
62+
this(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptions);
5863
this.taskInfo = taskInfo;
5964
}
6065

61-
public RetryHelper tryWith(CheckedFunction<T> func) {
66+
public RetryHelper<T> tryWith(CheckedFunction<T> func) {
6267
this.func = func;
6368
return this;
6469
}
@@ -77,21 +82,25 @@ public T start() throws IOException {
7782
throw e;
7883
}
7984
if (retries++ >= num) {
80-
LOG.error("Still failed to " + taskInfo + " after retried " + num + " times.", e);
81-
throw e;
85+
String message = "Still failed to " + taskInfo + " after retried " + num + " times.";
86+
LOG.error(message, e);
87+
if (e instanceof IOException) {
88+
throw new IOException(message, e);
89+
}
8290
}
83-
LOG.warn("Catch Exception " + taskInfo + ", will retry after " + waitTime + " ms.", e);
91+
LOG.warn("Catch Exception for " + taskInfo + ", will retry after " + waitTime + " ms.", e);
8492
try {
8593
Thread.sleep(waitTime);
8694
} catch (InterruptedException ex) {
87-
// ignore InterruptedException here
95+
// ignore InterruptedException here
8896
}
8997
}
9098
}
9199

92100
if (retries > 0) {
93101
LOG.info("Success to " + taskInfo + " after retried " + retries + " times.");
94102
}
103+
95104
return functionResult;
96105
}
97106

@@ -123,7 +132,7 @@ private long getWaitTimeExp(int retryCount) {
123132
}
124133

125134
@FunctionalInterface
126-
public interface CheckedFunction<T> {
135+
public interface CheckedFunction<T> extends Serializable {
127136
T get() throws IOException;
128137
}
129138
}

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,11 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throw
431431
.withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
432432
.withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort())
433433
.withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs())
434+
.withRemoteTimelineClientRetry(viewStorageConfig.isRemoteTimelineClientRetryEnabled())
435+
.withRemoteTimelineClientMaxRetryNumbers(viewStorageConfig.getRemoteTimelineClientMaxRetryNumbers())
436+
.withRemoteTimelineInitialRetryIntervalMs(viewStorageConfig.getRemoteTimelineInitialRetryIntervalMs())
437+
.withRemoteTimelineClientMaxRetryIntervalMs(viewStorageConfig.getRemoteTimelineClientMaxRetryIntervalMs())
438+
.withRemoteTimelineClientRetryExceptions(viewStorageConfig.getRemoteTimelineClientRetryExceptions())
434439
.build();
435440
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt, conf);
436441
return writeClient;

hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,14 @@
2828
import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
2929
import org.apache.hudi.common.table.view.SyncableFileSystemView;
3030
import org.apache.hudi.common.table.view.TestHoodieTableFileSystemView;
31+
import org.apache.hudi.exception.HoodieRemoteException;
3132
import org.apache.hudi.timeline.service.TimelineService;
3233

3334
import org.apache.hadoop.conf.Configuration;
3435
import org.apache.hadoop.fs.FileSystem;
3536
import org.apache.log4j.LogManager;
3637
import org.apache.log4j.Logger;
38+
import org.junit.jupiter.api.Test;
3739

3840
/**
3941
* Bring up a remote Timeline Server and run all test-cases of TestHoodieTableFileSystemView against it.
@@ -64,4 +66,31 @@ protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) {
6466
view = new RemoteHoodieTableFileSystemView("localhost", server.getServerPort(), metaClient);
6567
return view;
6668
}
69+
70+
@Test
71+
public void testRemoteHoodieTableFileSystemViewWithRetry() {
72+
// The server is normal.
73+
view.getLatestBaseFiles();
74+
// Shut down service.
75+
server.close();
76+
// Default behavior.
77+
try {
78+
view.getLatestBaseFiles();
79+
} catch (HoodieRemoteException e) {
80+
assert e.getMessage().contains("Connection refused (Connection refused)");
81+
}
82+
// Open remote request retry.
83+
view = new RemoteHoodieTableFileSystemView(metaClient, FileSystemViewStorageConfig
84+
.newBuilder()
85+
.withRemoteServerHost("localhost")
86+
.withRemoteServerPort(server.getServerPort())
87+
.withRemoteTimelineClientRetry(true)
88+
.withRemoteTimelineClientMaxRetryNumbers(4)
89+
.build());
90+
try {
91+
view.getLatestBaseFiles();
92+
} catch (HoodieRemoteException e) {
93+
assert e.getMessage().equalsIgnoreCase("Still failed to Sending request after retried 4 times.");
94+
}
95+
}
6796
}

0 commit comments

Comments
 (0)