Skip to content

Commit 73308cf

Browse files
committed
GCP: Use catalog endpoint as base when refreshing OAuth2 token
1 parent 03ff41c commit 73308cf

File tree

3 files changed

+49
-27
lines changed

3 files changed

+49
-27
lines changed

gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.List;
2727
import java.util.Map;
2828
import java.util.stream.Collectors;
29+
import org.apache.iceberg.CatalogProperties;
2930
import org.apache.iceberg.gcp.GCPProperties;
3031
import org.apache.iceberg.io.CloseableGroup;
3132
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -41,6 +42,9 @@
4142
public class OAuth2RefreshCredentialsHandler
4243
implements OAuth2CredentialsWithRefresh.OAuth2RefreshHandler, AutoCloseable {
4344
private final Map<String, String> properties;
45+
private final String credentialsEndpoint;
46+
// will be used to refresh the OAuth2 token
47+
private final String catalogEndpoint;
4448
private volatile HTTPClient client;
4549
private AuthManager authManager;
4650
private AuthSession authSession;
@@ -49,6 +53,11 @@ private OAuth2RefreshCredentialsHandler(Map<String, String> properties) {
4953
Preconditions.checkArgument(
5054
null != properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT),
5155
"Invalid credentials endpoint: null");
56+
Preconditions.checkArgument(
57+
null != properties.get(CatalogProperties.URI), "Invalid catalog endpoint: null");
58+
this.credentialsEndpoint =
59+
properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT);
60+
this.catalogEndpoint = properties.get(CatalogProperties.URI);
5261
this.properties = properties;
5362
}
5463

@@ -58,7 +67,7 @@ public AccessToken refreshAccessToken() {
5867
LoadCredentialsResponse response =
5968
httpClient()
6069
.get(
61-
properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT),
70+
credentialsEndpoint,
6271
null,
6372
LoadCredentialsResponse.class,
6473
Map.of(),
@@ -99,10 +108,7 @@ private RESTClient httpClient() {
99108
synchronized (this) {
100109
if (null == client) {
101110
authManager = AuthManagers.loadAuthManager("gcs-credentials-refresh", properties);
102-
HTTPClient httpClient =
103-
HTTPClient.builder(properties)
104-
.uri(properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT))
105-
.build();
111+
HTTPClient httpClient = HTTPClient.builder(properties).uri(catalogEndpoint).build();
106112
authSession = authManager.catalogSession(httpClient, properties);
107113
client = httpClient.withAuthSession(authSession);
108114
}

gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.Random;
4545
import java.util.stream.StreamSupport;
4646
import org.apache.hadoop.conf.Configuration;
47+
import org.apache.iceberg.CatalogProperties;
4748
import org.apache.iceberg.TestHelpers;
4849
import org.apache.iceberg.common.DynMethods;
4950
import org.apache.iceberg.gcp.GCPProperties;
@@ -238,6 +239,8 @@ public void refreshCredentialsEndpointSet() {
238239
try (GCSFileIO fileIO = new GCSFileIO()) {
239240
fileIO.initialize(
240241
ImmutableMap.of(
242+
CatalogProperties.URI,
243+
"http://catalog-endpoint",
241244
GCS_OAUTH2_TOKEN,
242245
"gcsToken",
243246
GCS_OAUTH2_TOKEN_EXPIRES_AT,

gcp/src/test/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandlerTest.java

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import com.google.auth.oauth2.AccessToken;
2626
import java.time.Instant;
2727
import java.time.temporal.ChronoUnit;
28+
import java.util.Map;
29+
import org.apache.iceberg.CatalogProperties;
2830
import org.apache.iceberg.exceptions.BadRequestException;
2931
import org.apache.iceberg.exceptions.RESTException;
3032
import org.apache.iceberg.gcp.GCPProperties;
@@ -45,7 +47,15 @@
4547

4648
public class OAuth2RefreshCredentialsHandlerTest {
4749
private static final int PORT = 3333;
48-
private static final String URI = String.format("http://127.0.0.1:%d/v1/credentials", PORT);
50+
private static final String CREDENTIALS_URI =
51+
String.format("http://127.0.0.1:%d/v1/credentials", PORT);
52+
private static final String CATALOG_URI = String.format("http://127.0.0.1:%d/v1/", PORT);
53+
private static final Map<String, String> PROPERTIES =
54+
ImmutableMap.of(
55+
GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT,
56+
CREDENTIALS_URI,
57+
CatalogProperties.URI,
58+
CATALOG_URI);
4959
private static ClientAndServer mockServer;
5060

5161
@BeforeAll
@@ -65,18 +75,33 @@ public void before() {
6575

6676
@Test
6777
public void invalidOrMissingUri() {
68-
assertThatThrownBy(() -> OAuth2RefreshCredentialsHandler.create(ImmutableMap.of()))
78+
assertThatThrownBy(
79+
() ->
80+
OAuth2RefreshCredentialsHandler.create(
81+
ImmutableMap.of(CatalogProperties.URI, CATALOG_URI)))
6982
.isInstanceOf(IllegalArgumentException.class)
7083
.hasMessage("Invalid credentials endpoint: null");
7184

85+
assertThatThrownBy(
86+
() ->
87+
OAuth2RefreshCredentialsHandler.create(
88+
ImmutableMap.of(
89+
GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, CREDENTIALS_URI)))
90+
.isInstanceOf(IllegalArgumentException.class)
91+
.hasMessage("Invalid catalog endpoint: null");
92+
7293
assertThatThrownBy(
7394
() ->
7495
OAuth2RefreshCredentialsHandler.create(
7596
ImmutableMap.of(
76-
GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, "invalid uri"))
97+
GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT,
98+
"invalid uri",
99+
CatalogProperties.URI,
100+
CATALOG_URI))
77101
.refreshAccessToken())
78102
.isInstanceOf(RESTException.class)
79-
.hasMessageStartingWith("Failed to create request URI from base invalid uri");
103+
.hasMessageStartingWith(
104+
"Failed to create request URI from base %sinvalid uri", CATALOG_URI);
80105
}
81106

82107
@Test
@@ -87,9 +112,7 @@ public void badRequest() {
87112
HttpResponse mockResponse = HttpResponse.response().withStatusCode(400);
88113
mockServer.when(mockRequest).respond(mockResponse);
89114

90-
OAuth2RefreshCredentialsHandler handler =
91-
OAuth2RefreshCredentialsHandler.create(
92-
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
115+
OAuth2RefreshCredentialsHandler handler = OAuth2RefreshCredentialsHandler.create(PROPERTIES);
93116

94117
assertThatThrownBy(handler::refreshAccessToken)
95118
.isInstanceOf(BadRequestException.class)
@@ -108,9 +131,7 @@ public void noGcsCredentialInResponse() {
108131
.withStatusCode(200);
109132
mockServer.when(mockRequest).respond(mockResponse);
110133

111-
OAuth2RefreshCredentialsHandler handler =
112-
OAuth2RefreshCredentialsHandler.create(
113-
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
134+
OAuth2RefreshCredentialsHandler handler = OAuth2RefreshCredentialsHandler.create(PROPERTIES);
114135

115136
assertThatThrownBy(handler::refreshAccessToken)
116137
.isInstanceOf(IllegalStateException.class)
@@ -134,9 +155,7 @@ public void noGcsToken() {
134155
.withStatusCode(200);
135156
mockServer.when(mockRequest).respond(mockResponse);
136157

137-
OAuth2RefreshCredentialsHandler handler =
138-
OAuth2RefreshCredentialsHandler.create(
139-
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
158+
OAuth2RefreshCredentialsHandler handler = OAuth2RefreshCredentialsHandler.create(PROPERTIES);
140159

141160
assertThatThrownBy(handler::refreshAccessToken)
142161
.isInstanceOf(IllegalStateException.class)
@@ -160,9 +179,7 @@ public void tokenWithoutExpiration() {
160179
.withStatusCode(200);
161180
mockServer.when(mockRequest).respond(mockResponse);
162181

163-
OAuth2RefreshCredentialsHandler handler =
164-
OAuth2RefreshCredentialsHandler.create(
165-
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
182+
OAuth2RefreshCredentialsHandler handler = OAuth2RefreshCredentialsHandler.create(PROPERTIES);
166183

167184
assertThatThrownBy(handler::refreshAccessToken)
168185
.isInstanceOf(IllegalStateException.class)
@@ -191,9 +208,7 @@ public void tokenWithExpiration() {
191208
.withStatusCode(200);
192209
mockServer.when(mockRequest).respond(mockResponse);
193210

194-
OAuth2RefreshCredentialsHandler handler =
195-
OAuth2RefreshCredentialsHandler.create(
196-
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
211+
OAuth2RefreshCredentialsHandler handler = OAuth2RefreshCredentialsHandler.create(PROPERTIES);
197212

198213
AccessToken accessToken = handler.refreshAccessToken();
199214
assertThat(accessToken.getTokenValue())
@@ -253,9 +268,7 @@ public void multipleGcsCredentials() {
253268
.withStatusCode(200);
254269
mockServer.when(mockRequest).respond(mockResponse);
255270

256-
OAuth2RefreshCredentialsHandler handler =
257-
OAuth2RefreshCredentialsHandler.create(
258-
ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI));
271+
OAuth2RefreshCredentialsHandler handler = OAuth2RefreshCredentialsHandler.create(PROPERTIES);
259272

260273
assertThatThrownBy(handler::refreshAccessToken)
261274
.isInstanceOf(IllegalStateException.class)

0 commit comments

Comments
 (0)