Skip to content

Commit 3d4947e

Browse files
committed
Merge branch 'master' of https://github.com/apache/hudi into HUDI-4410
2 parents ce0b46f + 6c35780 commit 3d4947e

25 files changed

Lines changed: 2445 additions & 49 deletions

File tree

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.hudi.client.transaction.lock;
21+
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.fs.FileSystem;
24+
import org.apache.hadoop.fs.Path;
25+
import org.apache.hudi.common.config.LockConfiguration;
26+
import org.apache.hudi.common.fs.FSUtils;
27+
import org.apache.hudi.common.lock.LockProvider;
28+
import org.apache.hudi.common.lock.LockState;
29+
import org.apache.hudi.common.table.HoodieTableMetaClient;
30+
import org.apache.hudi.common.util.StringUtils;
31+
import org.apache.hudi.common.util.ValidationUtils;
32+
import org.apache.hudi.config.HoodieWriteConfig;
33+
import org.apache.hudi.exception.HoodieIOException;
34+
import org.apache.hudi.exception.HoodieLockException;
35+
import org.apache.log4j.LogManager;
36+
import org.apache.log4j.Logger;
37+
38+
import java.io.IOException;
39+
import java.io.Serializable;
40+
import java.util.concurrent.TimeUnit;
41+
42+
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
43+
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
44+
45+
/**
46+
* A FileSystem based lock. This {@link LockProvider} implementation allows to lock table operations
47+
* using DFS. Users might need to manually clean the Locker's path if writeClient crash and never run again.
48+
* NOTE: This only works for DFS with atomic create/delete operation
49+
*/
50+
public class FileSystemBasedLockProvider implements LockProvider<String>, Serializable {
51+
52+
private static final Logger LOG = LogManager.getLogger(FileSystemBasedLockProvider.class);
53+
54+
private static final String LOCK_FILE_NAME = "lock";
55+
56+
private final int lockTimeoutMinutes;
57+
private transient FileSystem fs;
58+
private transient Path lockFile;
59+
protected LockConfiguration lockConfiguration;
60+
61+
public FileSystemBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration configuration) {
62+
checkRequiredProps(lockConfiguration);
63+
this.lockConfiguration = lockConfiguration;
64+
String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null);
65+
if (StringUtils.isNullOrEmpty(lockDirectory)) {
66+
lockDirectory = lockConfiguration.getConfig().getString(HoodieWriteConfig.BASE_PATH.key())
67+
+ Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME;
68+
}
69+
this.lockTimeoutMinutes = lockConfiguration.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY);
70+
this.lockFile = new Path(lockDirectory + Path.SEPARATOR + LOCK_FILE_NAME);
71+
this.fs = FSUtils.getFs(this.lockFile.toString(), configuration);
72+
}
73+
74+
@Override
75+
public void close() {
76+
synchronized (LOCK_FILE_NAME) {
77+
try {
78+
fs.delete(this.lockFile, true);
79+
} catch (IOException e) {
80+
throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE), e);
81+
}
82+
}
83+
}
84+
85+
@Override
86+
public boolean tryLock(long time, TimeUnit unit) {
87+
try {
88+
synchronized (LOCK_FILE_NAME) {
89+
// Check whether lock is already expired, if so try to delete lock file
90+
if (fs.exists(this.lockFile) && checkIfExpired()) {
91+
fs.delete(this.lockFile, true);
92+
}
93+
acquireLock();
94+
return fs.exists(this.lockFile);
95+
}
96+
} catch (IOException | HoodieIOException e) {
97+
LOG.info(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
98+
return false;
99+
}
100+
}
101+
102+
@Override
103+
public void unlock() {
104+
synchronized (LOCK_FILE_NAME) {
105+
try {
106+
if (fs.exists(this.lockFile)) {
107+
fs.delete(this.lockFile, true);
108+
}
109+
} catch (IOException io) {
110+
throw new HoodieIOException(generateLogStatement(LockState.FAILED_TO_RELEASE), io);
111+
}
112+
}
113+
}
114+
115+
@Override
116+
public String getLock() {
117+
return this.lockFile.toString();
118+
}
119+
120+
private boolean checkIfExpired() {
121+
if (lockTimeoutMinutes == 0) {
122+
return false;
123+
}
124+
try {
125+
long modificationTime = fs.getFileStatus(this.lockFile).getModificationTime();
126+
if (System.currentTimeMillis() - modificationTime > lockTimeoutMinutes * 60 * 1000) {
127+
return true;
128+
}
129+
} catch (IOException | HoodieIOException e) {
130+
LOG.error(generateLogStatement(LockState.ALREADY_RELEASED) + " failed to get lockFile's modification time", e);
131+
}
132+
return false;
133+
}
134+
135+
private void acquireLock() {
136+
try {
137+
fs.create(this.lockFile, false).close();
138+
} catch (IOException e) {
139+
throw new HoodieIOException(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
140+
}
141+
}
142+
143+
protected String generateLogStatement(LockState state) {
144+
return StringUtils.join(state.name(), " lock at: ", getLock());
145+
}
146+
147+
private void checkRequiredProps(final LockConfiguration config) {
148+
ValidationUtils.checkArgument(config.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null) != null
149+
|| config.getConfig().getString(HoodieWriteConfig.BASE_PATH.key(), null) != null);
150+
ValidationUtils.checkArgument(config.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY) >= 0);
151+
}
152+
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ public void lock() {
7474
if (retryCount >= maxRetries) {
7575
throw new HoodieLockException("Unable to acquire lock, lock object ", e);
7676
}
77+
try {
78+
Thread.sleep(maxWaitTimeInMs);
79+
} catch (InterruptedException ex) {
80+
// ignore InterruptedException here
81+
}
7782
} finally {
7883
retryCount++;
7984
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
3737
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
3838
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
39+
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
3940
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
4041
import static org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY;
4142
import static org.apache.hudi.common.config.LockConfiguration.HIVE_METASTORE_URI_PROP_KEY;
@@ -106,7 +107,13 @@ public class HoodieLockConfig extends HoodieConfig {
106107
.key(FILESYSTEM_LOCK_PATH_PROP_KEY)
107108
.noDefaultValue()
108109
.sinceVersion("0.8.0")
109-
.withDocumentation("For DFS based lock providers, path to store the locks under.");
110+
.withDocumentation("For DFS based lock providers, path to store the locks under. use Table's meta path as default");
111+
112+
public static final ConfigProperty<Integer> FILESYSTEM_LOCK_EXPIRE = ConfigProperty
113+
.key(FILESYSTEM_LOCK_EXPIRE_PROP_KEY)
114+
.defaultValue(0)
115+
.sinceVersion("0.12.0")
116+
.withDocumentation("For DFS based lock providers, expire time in minutes, must be a nonnegative number, default means no expire");
110117

111118
public static final ConfigProperty<String> HIVE_DATABASE_NAME = ConfigProperty
112119
.key(HIVE_DATABASE_NAME_PROP_KEY)
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.hudi.client;
21+
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.fs.FileSystem;
24+
import org.apache.hadoop.fs.Path;
25+
import org.apache.hadoop.hdfs.MiniDFSCluster;
26+
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
27+
import org.apache.hudi.common.config.LockConfiguration;
28+
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
29+
import org.apache.hudi.config.HoodieWriteConfig;
30+
import org.apache.hudi.exception.HoodieLockException;
31+
import org.junit.jupiter.api.AfterAll;
32+
import org.junit.jupiter.api.AfterEach;
33+
import org.junit.jupiter.api.Assertions;
34+
import org.junit.jupiter.api.BeforeAll;
35+
import org.junit.jupiter.api.Test;
36+
37+
import java.io.IOException;
38+
import java.util.Properties;
39+
import java.util.concurrent.TimeUnit;
40+
41+
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
42+
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
43+
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
44+
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
45+
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
46+
47+
public class TestFileBasedLockProvider {
48+
private static HdfsTestService hdfsTestService;
49+
private static MiniDFSCluster dfsCluster;
50+
private static LockConfiguration lockConfiguration;
51+
private static Configuration hadoopConf;
52+
53+
@BeforeAll
54+
public static void setup() throws IOException {
55+
hdfsTestService = new HdfsTestService();
56+
dfsCluster = hdfsTestService.start(true);
57+
hadoopConf = dfsCluster.getFileSystem().getConf();
58+
59+
Properties properties = new Properties();
60+
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/");
61+
properties.setProperty(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "1");
62+
properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
63+
properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000");
64+
properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
65+
lockConfiguration = new LockConfiguration(properties);
66+
}
67+
68+
@AfterAll
69+
public static void cleanUpAfterAll() throws IOException {
70+
Path workDir = dfsCluster.getFileSystem().getWorkingDirectory();
71+
FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf());
72+
fs.delete(new Path("/tmp"), true);
73+
if (hdfsTestService != null) {
74+
hdfsTestService.stop();
75+
hdfsTestService = null;
76+
}
77+
}
78+
79+
@AfterEach
80+
public void cleanUpAfterEach() throws IOException {
81+
Path workDir = dfsCluster.getFileSystem().getWorkingDirectory();
82+
FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf());
83+
fs.delete(new Path("/tmp/lock"), true);
84+
}
85+
86+
@Test
87+
public void testAcquireLock() {
88+
FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
89+
Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
90+
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
91+
fileBasedLockProvider.unlock();
92+
}
93+
94+
@Test
95+
public void testAcquireLockWithDefaultPath() {
96+
lockConfiguration.getConfig().remove(FILESYSTEM_LOCK_PATH_PROP_KEY);
97+
lockConfiguration.getConfig().setProperty(HoodieWriteConfig.BASE_PATH.key(), "/tmp/");
98+
FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
99+
Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
100+
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
101+
fileBasedLockProvider.unlock();
102+
lockConfiguration.getConfig().setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/");
103+
}
104+
105+
@Test
106+
public void testUnLock() {
107+
FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
108+
Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
109+
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
110+
fileBasedLockProvider.unlock();
111+
Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
112+
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
113+
}
114+
115+
@Test
116+
public void testReentrantLock() {
117+
FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
118+
Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
119+
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
120+
Assertions.assertFalse(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
121+
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
122+
fileBasedLockProvider.unlock();
123+
}
124+
125+
@Test
126+
public void testUnlockWithoutLock() {
127+
try {
128+
FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
129+
fileBasedLockProvider.unlock();
130+
} catch (HoodieLockException e) {
131+
Assertions.fail();
132+
}
133+
}
134+
135+
}

0 commit comments

Comments
 (0)