Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 6 additions & 14 deletions google/auth/impersonated_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
'/serviceAccounts/{}:generateAccessToken')

_REFRESH_ERROR = 'Unable to acquire impersonated credentials'
_LIFETIME_ERROR = 'Credentials with lifetime set cannot be renewed'


def _make_iam_token_request(request, principal, headers, body):
Expand Down Expand Up @@ -122,6 +121,9 @@ class Credentials(credentials.Credentials):
token creator role on
`impersonated-account@_project_.iam.gserviceaccount.com`.

Enable the IAMCredentials API on the source project:
`gcloud services enable iamcredentials.googleapis.com`.

Initialize a source credential which does not have access to
list bucket::

Expand Down Expand Up @@ -156,7 +158,7 @@ class Credentials(credentials.Credentials):

def __init__(self, source_credentials, target_principal,
target_scopes, delegates=None,
lifetime=None):
lifetime=_DEFAULT_TOKEN_LIFETIME_SECS):
"""
Args:
source_credentials (google.auth.Credentials): The source credential
Expand All @@ -175,9 +177,7 @@ def __init__(self, source_credentials, target_principal,
If left unset, source_credential must have that role on
target_principal.
lifetime (int): Number of seconds the delegated credential should
be valid for (upto 3600). If set, the credentials will
**not** get refreshed after expiration. If not set, the
credentials will be refreshed every 3600s.
be valid for (upto 3600).
"""

super(Credentials, self).__init__()
Expand All @@ -193,10 +193,6 @@ def __init__(self, source_credentials, target_principal,

@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
if (self.token is not None and self._lifetime is not None):
self.expiry = _helpers.utcnow()
raise exceptions.RefreshError(_LIFETIME_ERROR)
self._source_credentials.refresh(request)
self._update_token(request)

@property
Expand All @@ -215,14 +211,10 @@ def _update_token(self, request):
# Refresh our source credentials.
self._source_credentials.refresh(request)

lifetime = self._lifetime
if (self._lifetime is None):
lifetime = _DEFAULT_TOKEN_LIFETIME_SECS

body = {
"delegates": self._delegates,
"scope": self._target_scopes,
"lifetime": str(lifetime) + "s"
"lifetime": str(self._lifetime) + "s"
}

headers = {
Expand Down
26 changes: 0 additions & 26 deletions tests/test_impersonated_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,32 +133,6 @@ def test_refresh_failure_malformed_expire_time(
assert not credentials.valid
assert credentials.expired

def test_refresh_failure_lifetime_specified(self, mock_donor_credentials):
credentials = self.make_credentials(lifetime=500)
token = 'token'

expire_time = (
_helpers.utcnow().replace(microsecond=0) +
datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
response_body = {
"accessToken": token,
"expireTime": expire_time
}

request = self.make_request(
data=json.dumps(response_body),
status=http_client.OK)

credentials.refresh(request)

with pytest.raises(exceptions.RefreshError) as excinfo:
credentials.refresh(request)

assert excinfo.match(impersonated_credentials._LIFETIME_ERROR)

assert not credentials.valid
assert credentials.expired

def test_refresh_failure_unauthorzed(self, mock_donor_credentials):
credentials = self.make_credentials(lifetime=None)

Expand Down