aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/google-auth/py3/google/auth/aws.py
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-04-04 07:45:46 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-04-04 07:53:28 +0300
commit51958dfd22674e02052c8a292ab70fc2d52a07fc (patch)
tree42d2b859c555e9045203791ed3f60fa16e1b267e /contrib/python/google-auth/py3/google/auth/aws.py
parent7e62114667a5059c6b6a617d4cd9076928818478 (diff)
downloadydb-51958dfd22674e02052c8a292ab70fc2d52a07fc.tar.gz
Intermediate changes
Diffstat (limited to 'contrib/python/google-auth/py3/google/auth/aws.py')
-rw-r--r--contrib/python/google-auth/py3/google/auth/aws.py618
1 files changed, 351 insertions, 267 deletions
diff --git a/contrib/python/google-auth/py3/google/auth/aws.py b/contrib/python/google-auth/py3/google/auth/aws.py
index 6e0e4e864f..28c065d3c7 100644
--- a/contrib/python/google-auth/py3/google/auth/aws.py
+++ b/contrib/python/google-auth/py3/google/auth/aws.py
@@ -21,10 +21,11 @@ of long-live service account private keys.
AWS Credentials are initialized using external_account arguments which are
typically loaded from the external credentials JSON file.
-Unlike other Credentials that can be initialized with a list of explicit
-arguments, secrets or credentials, external account clients use the
-environment and hints/guidelines provided by the external_account JSON
-file to retrieve credentials and exchange them for Google access tokens.
+
+This module also provides a definition for an abstract AWS security credentials supplier.
+This supplier can be implemented to return valid AWS security credentials and an AWS region
+and used to create AWS credentials. The credentials will then call the
+supplier instead of using pre-defined methods such as calling the EC2 metadata endpoints.
This module also provides a basic implementation of the
`AWS Signature Version 4`_ request signing algorithm.
@@ -37,6 +38,8 @@ via the GCP STS endpoint.
.. _AWS STS GetCallerIdentity: https://docs.aws.amazon.com/STS/latest/APIReference/API_GetCallerIdentity.html
"""
+import abc
+from dataclasses import dataclass
import hashlib
import hmac
import http.client as http_client
@@ -44,6 +47,7 @@ import json
import os
import posixpath
import re
+from typing import Optional
import urllib
from urllib.parse import urljoin
@@ -61,6 +65,12 @@ _AWS_REQUEST_TYPE = "aws4_request"
_AWS_SECURITY_TOKEN_HEADER = "x-amz-security-token"
# The AWS authorization header name for the auto-generated date.
_AWS_DATE_HEADER = "x-amz-date"
+# The default AWS regional credential verification URL.
+_DEFAULT_AWS_REGIONAL_CREDENTIAL_VERIFICATION_URL = (
+ "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15"
+)
+# IMDSV2 session token lifetime. This is set to a low value because the session token is used immediately.
+_IMDSV2_SESSION_TOKEN_TTL_SECONDS = "300"
class RequestSigner(object):
@@ -92,8 +102,7 @@ class RequestSigner(object):
https://docs.aws.amazon.com/general/latest/gr/sigv4_signing.html
Args:
- aws_security_credentials (Mapping[str, str]): A dictionary containing
- the AWS security credentials.
+ aws_security_credentials (AWSSecurityCredentials): The AWS security credentials.
url (str): The AWS service URL containing the canonical URI and
query string.
method (str): The HTTP method used to call this API.
@@ -105,10 +114,6 @@ class RequestSigner(object):
Returns:
Mapping[str, str]: The AWS signed request dictionary object.
"""
- # Get AWS credentials.
- access_key = aws_security_credentials.get("access_key_id")
- secret_key = aws_security_credentials.get("secret_access_key")
- security_token = aws_security_credentials.get("security_token")
additional_headers = additional_headers or {}
@@ -129,9 +134,7 @@ class RequestSigner(object):
canonical_querystring=_get_canonical_querystring(uri.query),
method=method,
region=self._region_name,
- access_key=access_key,
- secret_key=secret_key,
- security_token=security_token,
+ aws_security_credentials=aws_security_credentials,
request_payload=request_payload,
additional_headers=additional_headers,
)
@@ -147,8 +150,8 @@ class RequestSigner(object):
headers[key] = additional_headers[key]
# Add session token if available.
- if security_token is not None:
- headers[_AWS_SECURITY_TOKEN_HEADER] = security_token
+ if aws_security_credentials.session_token is not None:
+ headers[_AWS_SECURITY_TOKEN_HEADER] = aws_security_credentials.session_token
signed_request = {"url": url, "method": method, "headers": headers}
if request_payload:
@@ -233,9 +236,7 @@ def _generate_authentication_header_map(
canonical_querystring,
method,
region,
- access_key,
- secret_key,
- security_token,
+ aws_security_credentials,
request_payload="",
additional_headers={},
):
@@ -248,10 +249,7 @@ def _generate_authentication_header_map(
canonical_querystring (str): The AWS service URL query string.
method (str): The HTTP method used to call this API.
region (str): The AWS region.
- access_key (str): The AWS access key ID.
- secret_key (str): The AWS secret access key.
- security_token (Optional[str]): The AWS security session token. This is
- available for temporary sessions.
+ aws_security_credentials (AWSSecurityCredentials): The AWS security credentials.
request_payload (Optional[str]): The optional request payload if
available.
additional_headers (Optional[Mapping[str, str]]): The optional
@@ -274,8 +272,10 @@ def _generate_authentication_header_map(
for key in additional_headers:
full_headers[key.lower()] = additional_headers[key]
# Add AWS session token if available.
- if security_token is not None:
- full_headers[_AWS_SECURITY_TOKEN_HEADER] = security_token
+ if aws_security_credentials.session_token is not None:
+ full_headers[
+ _AWS_SECURITY_TOKEN_HEADER
+ ] = aws_security_credentials.session_token
# Required headers
full_headers["host"] = host
@@ -321,14 +321,20 @@ def _generate_authentication_header_map(
)
# https://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html
- signing_key = _get_signing_key(secret_key, date_stamp, region, service_name)
+ signing_key = _get_signing_key(
+ aws_security_credentials.secret_access_key, date_stamp, region, service_name
+ )
signature = hmac.new(
signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
).hexdigest()
# https://docs.aws.amazon.com/general/latest/gr/sigv4-add-signature-to-request.html
authorization_header = "{} Credential={}/{}, SignedHeaders={}, Signature={}".format(
- _AWS_ALGORITHM, access_key, credential_scope, signed_headers, signature
+ _AWS_ALGORITHM,
+ aws_security_credentials.access_key_id,
+ credential_scope,
+ signed_headers,
+ signature,
)
authentication_header = {"authorization_header": authorization_header}
@@ -338,211 +344,112 @@ def _generate_authentication_header_map(
return authentication_header
-class Credentials(external_account.Credentials):
- """AWS external account credentials.
- This is used to exchange serialized AWS signature v4 signed requests to
- AWS STS GetCallerIdentity service for Google access tokens.
- """
-
- def __init__(
- self,
- audience,
- subject_token_type,
- token_url,
- credential_source=None,
- *args,
- **kwargs
- ):
- """Instantiates an AWS workload external account credentials object.
-
- Args:
- audience (str): The STS audience field.
- subject_token_type (str): The subject token type.
- token_url (str): The STS endpoint URL.
- credential_source (Mapping): The credential source dictionary used
- to provide instructions on how to retrieve external credential
- to be exchanged for Google access tokens.
- args (List): Optional positional arguments passed into the underlying :meth:`~external_account.Credentials.__init__` method.
- kwargs (Mapping): Optional keyword arguments passed into the underlying :meth:`~external_account.Credentials.__init__` method.
-
- Raises:
- google.auth.exceptions.RefreshError: If an error is encountered during
- access token retrieval logic.
- ValueError: For invalid parameters.
-
- .. note:: Typically one of the helper constructors
- :meth:`from_file` or
- :meth:`from_info` are used instead of calling the constructor directly.
- """
- super(Credentials, self).__init__(
- audience=audience,
- subject_token_type=subject_token_type,
- token_url=token_url,
- credential_source=credential_source,
- *args,
- **kwargs
- )
- credential_source = credential_source or {}
- self._environment_id = credential_source.get("environment_id") or ""
- self._region_url = credential_source.get("region_url")
- self._security_credentials_url = credential_source.get("url")
- self._cred_verification_url = credential_source.get(
- "regional_cred_verification_url"
- )
- self._imdsv2_session_token_url = credential_source.get(
- "imdsv2_session_token_url"
- )
- self._region = None
- self._request_signer = None
- self._target_resource = audience
+@dataclass
+class AwsSecurityCredentials:
+ """A class that models AWS security credentials with an optional session token.
- # Get the environment ID. Currently, only one version supported (v1).
- matches = re.match(r"^(aws)([\d]+)$", self._environment_id)
- if matches:
- env_id, env_version = matches.groups()
- else:
- env_id, env_version = (None, None)
+ Attributes:
+ access_key_id (str): The AWS security credentials access key id.
+ secret_access_key (str): The AWS security credentials secret access key.
+ session_token (Optional[str]): The optional AWS security credentials session token. This should be set when using temporary credentials.
+ """
- if env_id != "aws" or self._cred_verification_url is None:
- raise exceptions.InvalidResource(
- "No valid AWS 'credential_source' provided"
- )
- elif int(env_version or "") != 1:
- raise exceptions.InvalidValue(
- "aws version '{}' is not supported in the current build.".format(
- env_version
- )
- )
+ access_key_id: str
+ secret_access_key: str
+ session_token: Optional[str] = None
- def retrieve_subject_token(self, request):
- """Retrieves the subject token using the credential_source object.
- The subject token is a serialized `AWS GetCallerIdentity signed request`_.
- The logic is summarized as:
+class AwsSecurityCredentialsSupplier(metaclass=abc.ABCMeta):
+ """Base class for AWS security credential suppliers. This can be implemented with custom logic to retrieve
+ AWS security credentials to exchange for a Google Cloud access token. The AWS external account credential does
+ not cache the AWS security credentials, so caching logic should be added in the implementation.
+ """
- Retrieve the AWS region from the AWS_REGION or AWS_DEFAULT_REGION
- environment variable or from the AWS metadata server availability-zone
- if not found in the environment variable.
+ @abc.abstractmethod
+ def get_aws_security_credentials(self, context, request):
+ """Returns the AWS security credentials for the requested context.
- Check AWS credentials in environment variables. If not found, retrieve
- from the AWS metadata server security-credentials endpoint.
+ .. warning: This is not cached by the calling Google credential, so caching logic should be implemented in the supplier.
- When retrieving AWS credentials from the metadata server
- security-credentials endpoint, the AWS role needs to be determined by
- calling the security-credentials endpoint without any argument. Then the
- credentials can be retrieved via: security-credentials/role_name
+ Args:
+ context (google.auth.externalaccount.SupplierContext): The context object
+ containing information about the requested audience and subject token type.
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests.
- Generate the signed request to AWS STS GetCallerIdentity action.
+ Raises:
+ google.auth.exceptions.RefreshError: If an error is encountered during
+ security credential retrieval logic.
- Inject x-goog-cloud-target-resource into header and serialize the
- signed request. This will be the subject-token to pass to GCP STS.
+ Returns:
+ AwsSecurityCredentials: The requested AWS security credentials.
+ """
+ raise NotImplementedError("")
- .. _AWS GetCallerIdentity signed request:
- https://cloud.google.com/iam/docs/access-resources-aws#exchange-token
+ @abc.abstractmethod
+ def get_aws_region(self, context, request):
+ """Returns the AWS region for the requested context.
Args:
- request (google.auth.transport.Request): A callable used to make
+ context (google.auth.externalaccount.SupplierContext): The context object
+ containing information about the requested audience and subject token type.
+ request (google.auth.transport.Request): The object used to make
HTTP requests.
- Returns:
- str: The retrieved subject token.
- """
- # Fetch the session token required to make meta data endpoint calls to aws.
- if (
- request is not None
- and self._imdsv2_session_token_url is not None
- and self._should_use_metadata_server()
- ):
- headers = {"X-aws-ec2-metadata-token-ttl-seconds": "300"}
- imdsv2_session_token_response = request(
- url=self._imdsv2_session_token_url, method="PUT", headers=headers
- )
+ Raises:
+ google.auth.exceptions.RefreshError: If an error is encountered during
+ region retrieval logic.
- if imdsv2_session_token_response.status != 200:
- raise exceptions.RefreshError(
- "Unable to retrieve AWS Session Token",
- imdsv2_session_token_response.data,
- )
+ Returns:
+ str: The AWS region.
+ """
+ raise NotImplementedError("")
- imdsv2_session_token = imdsv2_session_token_response.data
- else:
- imdsv2_session_token = None
- # Initialize the request signer if not yet initialized after determining
- # the current AWS region.
- if self._request_signer is None:
- self._region = self._get_region(
- request, self._region_url, imdsv2_session_token
- )
- self._request_signer = RequestSigner(self._region)
+class _DefaultAwsSecurityCredentialsSupplier(AwsSecurityCredentialsSupplier):
+ """Default implementation of AWS security credentials supplier. Supports retrieving
+ credentials and region via EC2 metadata endpoints and environment variables.
+ """
- # Retrieve the AWS security credentials needed to generate the signed
- # request.
- aws_security_credentials = self._get_security_credentials(
- request, imdsv2_session_token
- )
- # Generate the signed request to AWS STS GetCallerIdentity API.
- # Use the required regional endpoint. Otherwise, the request will fail.
- request_options = self._request_signer.get_request_options(
- aws_security_credentials,
- self._cred_verification_url.replace("{region}", self._region),
- "POST",
+ def __init__(self, credential_source):
+ self._region_url = credential_source.get("region_url")
+ self._security_credentials_url = credential_source.get("url")
+ self._imdsv2_session_token_url = credential_source.get(
+ "imdsv2_session_token_url"
)
- # The GCP STS endpoint expects the headers to be formatted as:
- # [
- # {key: 'x-amz-date', value: '...'},
- # {key: 'Authorization', value: '...'},
- # ...
- # ]
- # And then serialized as:
- # quote(json.dumps({
- # url: '...',
- # method: 'POST',
- # headers: [{key: 'x-amz-date', value: '...'}, ...]
- # }))
- request_headers = request_options.get("headers")
- # The full, canonical resource name of the workload identity pool
- # provider, with or without the HTTPS prefix.
- # Including this header as part of the signature is recommended to
- # ensure data integrity.
- request_headers["x-goog-cloud-target-resource"] = self._target_resource
- # Serialize AWS signed request.
- # Keeping inner keys in sorted order makes testing easier for Python
- # versions <=3.5 as the stringified JSON string would have a predictable
- # key order.
- aws_signed_req = {}
- aws_signed_req["url"] = request_options.get("url")
- aws_signed_req["method"] = request_options.get("method")
- aws_signed_req["headers"] = []
- # Reformat header to GCP STS expected format.
- for key in sorted(request_headers.keys()):
- aws_signed_req["headers"].append(
- {"key": key, "value": request_headers[key]}
- )
+ @_helpers.copy_docstring(AwsSecurityCredentialsSupplier)
+ def get_aws_security_credentials(self, context, request):
- return urllib.parse.quote(
- json.dumps(aws_signed_req, separators=(",", ":"), sort_keys=True)
+ # Check environment variables for permanent credentials first.
+ # https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html
+ env_aws_access_key_id = os.environ.get(environment_vars.AWS_ACCESS_KEY_ID)
+ env_aws_secret_access_key = os.environ.get(
+ environment_vars.AWS_SECRET_ACCESS_KEY
)
+ # This is normally not available for permanent credentials.
+ env_aws_session_token = os.environ.get(environment_vars.AWS_SESSION_TOKEN)
+ if env_aws_access_key_id and env_aws_secret_access_key:
+ return AwsSecurityCredentials(
+ env_aws_access_key_id, env_aws_secret_access_key, env_aws_session_token
+ )
- def _get_region(self, request, url, imdsv2_session_token):
- """Retrieves the current AWS region from either the AWS_REGION or
- AWS_DEFAULT_REGION environment variable or from the AWS metadata server.
+ imdsv2_session_token = self._get_imdsv2_session_token(request)
+ role_name = self._get_metadata_role_name(request, imdsv2_session_token)
- Args:
- request (google.auth.transport.Request): A callable used to make
- HTTP requests.
- url (str): The AWS metadata server region URL.
- imdsv2_session_token (str): The AWS IMDSv2 session token to be added as a
- header in the requests to AWS metadata endpoint.
+ # Get security credentials.
+ credentials = self._get_metadata_security_credentials(
+ request, role_name, imdsv2_session_token
+ )
- Returns:
- str: The current AWS region.
+ return AwsSecurityCredentials(
+ credentials.get("AccessKeyId"),
+ credentials.get("SecretAccessKey"),
+ credentials.get("Token"),
+ )
- Raises:
- google.auth.exceptions.RefreshError: If an error occurs while
- retrieving the AWS region.
- """
+ @_helpers.copy_docstring(AwsSecurityCredentialsSupplier)
+ def get_aws_region(self, context, request):
# The AWS metadata server is not available in some AWS environments
# such as AWS lambda. Instead, it is available via environment
# variable.
@@ -558,6 +465,7 @@ class Credentials(external_account.Credentials):
raise exceptions.RefreshError("Unable to determine AWS region")
headers = None
+ imdsv2_session_token = self._get_imdsv2_session_token(request)
if imdsv2_session_token is not None:
headers = {"X-aws-ec2-metadata-token": imdsv2_session_token}
@@ -570,62 +478,35 @@ class Credentials(external_account.Credentials):
else response.data
)
- if response.status != 200:
+ if response.status != http_client.OK:
raise exceptions.RefreshError(
- "Unable to retrieve AWS region", response_body
+ "Unable to retrieve AWS region: {}".format(response_body)
)
# This endpoint will return the region in format: us-east-2b.
# Only the us-east-2 part should be used.
return response_body[:-1]
- def _get_security_credentials(self, request, imdsv2_session_token):
- """Retrieves the AWS security credentials required for signing AWS
- requests from either the AWS security credentials environment variables
- or from the AWS metadata server.
-
- Args:
- request (google.auth.transport.Request): A callable used to make
- HTTP requests.
- imdsv2_session_token (str): The AWS IMDSv2 session token to be added as a
- header in the requests to AWS metadata endpoint.
-
- Returns:
- Mapping[str, str]: The AWS security credentials dictionary object.
-
- Raises:
- google.auth.exceptions.RefreshError: If an error occurs while
- retrieving the AWS security credentials.
- """
-
- # Check environment variables for permanent credentials first.
- # https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html
- env_aws_access_key_id = os.environ.get(environment_vars.AWS_ACCESS_KEY_ID)
- env_aws_secret_access_key = os.environ.get(
- environment_vars.AWS_SECRET_ACCESS_KEY
- )
- # This is normally not available for permanent credentials.
- env_aws_session_token = os.environ.get(environment_vars.AWS_SESSION_TOKEN)
- if env_aws_access_key_id and env_aws_secret_access_key:
- return {
- "access_key_id": env_aws_access_key_id,
- "secret_access_key": env_aws_secret_access_key,
- "security_token": env_aws_session_token,
+ def _get_imdsv2_session_token(self, request):
+ if request is not None and self._imdsv2_session_token_url is not None:
+ headers = {
+ "X-aws-ec2-metadata-token-ttl-seconds": _IMDSV2_SESSION_TOKEN_TTL_SECONDS
}
- # Get role name.
- role_name = self._get_metadata_role_name(request, imdsv2_session_token)
+ imdsv2_session_token_response = request(
+ url=self._imdsv2_session_token_url, method="PUT", headers=headers
+ )
- # Get security credentials.
- credentials = self._get_metadata_security_credentials(
- request, role_name, imdsv2_session_token
- )
+ if imdsv2_session_token_response.status != http_client.OK:
+ raise exceptions.RefreshError(
+ "Unable to retrieve AWS Session Token: {}".format(
+ imdsv2_session_token_response.data
+ )
+ )
- return {
- "access_key_id": credentials.get("AccessKeyId"),
- "secret_access_key": credentials.get("SecretAccessKey"),
- "security_token": credentials.get("Token"),
- }
+ return imdsv2_session_token_response.data
+ else:
+ return None
def _get_metadata_security_credentials(
self, request, role_name, imdsv2_session_token
@@ -669,7 +550,7 @@ class Credentials(external_account.Credentials):
if response.status != http_client.OK:
raise exceptions.RefreshError(
- "Unable to retrieve AWS security credentials", response_body
+ "Unable to retrieve AWS security credentials: {}".format(response_body)
)
credentials_response = json.loads(response_body)
@@ -717,35 +598,232 @@ class Credentials(external_account.Credentials):
if response.status != http_client.OK:
raise exceptions.RefreshError(
- "Unable to retrieve AWS role name", response_body
+ "Unable to retrieve AWS role name {}".format(response_body)
)
return response_body
- def _should_use_metadata_server(self):
- # The AWS region can be provided through AWS_REGION or AWS_DEFAULT_REGION.
- # The metadata server should be used if it cannot be retrieved from one of
- # these environment variables.
- if not os.environ.get(environment_vars.AWS_REGION) and not os.environ.get(
- environment_vars.AWS_DEFAULT_REGION
- ):
- return True
- # AWS security credentials can be retrieved from the AWS_ACCESS_KEY_ID
- # and AWS_SECRET_ACCESS_KEY environment variables. The metadata server
- # should be used if either of these are not available.
- if not os.environ.get(environment_vars.AWS_ACCESS_KEY_ID) or not os.environ.get(
- environment_vars.AWS_SECRET_ACCESS_KEY
+class Credentials(external_account.Credentials):
+ """AWS external account credentials.
+ This is used to exchange serialized AWS signature v4 signed requests to
+ AWS STS GetCallerIdentity service for Google access tokens.
+ """
+
+ def __init__(
+ self,
+ audience,
+ subject_token_type,
+ token_url=external_account._DEFAULT_TOKEN_URL,
+ credential_source=None,
+ aws_security_credentials_supplier=None,
+ *args,
+ **kwargs
+ ):
+ """Instantiates an AWS workload external account credentials object.
+
+ Args:
+ audience (str): The STS audience field.
+ subject_token_type (str): The subject token type based on the Oauth2.0 token exchange spec.
+ Expected values include::
+
+ “urn:ietf:params:aws:token-type:aws4_request”
+
+ token_url (Optional [str]): The STS endpoint URL. If not provided, will default to "https://sts.googleapis.com/v1/token".
+ credential_source (Optional [Mapping]): The credential source dictionary used
+ to provide instructions on how to retrieve external credential to be exchanged for Google access tokens.
+ Either a credential source or an AWS security credentials supplier must be provided.
+
+ Example credential_source for AWS credential::
+
+ {
+ "environment_id": "aws1",
+ "regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
+ "region_url": "http://169.254.169.254/latest/meta-data/placement/availability-zone",
+ "url": "http://169.254.169.254/latest/meta-data/iam/security-credentials",
+ imdsv2_session_token_url": "http://169.254.169.254/latest/api/token"
+ }
+
+ aws_security_credentials_supplier (Optional [AwsSecurityCredentialsSupplier]): Optional AWS security credentials supplier.
+ This will be called to supply valid AWS security credentails which will then
+ be exchanged for Google access tokens. Either an AWS security credentials supplier
+ or a credential source must be provided.
+ args (List): Optional positional arguments passed into the underlying :meth:`~external_account.Credentials.__init__` method.
+ kwargs (Mapping): Optional keyword arguments passed into the underlying :meth:`~external_account.Credentials.__init__` method.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If an error is encountered during
+ access token retrieval logic.
+ ValueError: For invalid parameters.
+
+ .. note:: Typically one of the helper constructors
+ :meth:`from_file` or
+ :meth:`from_info` are used instead of calling the constructor directly.
+ """
+ super(Credentials, self).__init__(
+ audience=audience,
+ subject_token_type=subject_token_type,
+ token_url=token_url,
+ credential_source=credential_source,
+ *args,
+ **kwargs
+ )
+ if credential_source is None and aws_security_credentials_supplier is None:
+ raise exceptions.InvalidValue(
+ "A valid credential source or AWS security credentials supplier must be provided."
+ )
+ if (
+ credential_source is not None
+ and aws_security_credentials_supplier is not None
):
- return True
+ raise exceptions.InvalidValue(
+ "AWS credential cannot have both a credential source and an AWS security credentials supplier."
+ )
+
+ if aws_security_credentials_supplier:
+ self._aws_security_credentials_supplier = aws_security_credentials_supplier
+ # The regional cred verification URL would normally be provided through the credential source. So set it to the default one here.
+ self._cred_verification_url = (
+ _DEFAULT_AWS_REGIONAL_CREDENTIAL_VERIFICATION_URL
+ )
+ else:
+ environment_id = credential_source.get("environment_id") or ""
+ self._aws_security_credentials_supplier = _DefaultAwsSecurityCredentialsSupplier(
+ credential_source
+ )
+ self._cred_verification_url = credential_source.get(
+ "regional_cred_verification_url"
+ )
+
+ # Get the environment ID, i.e. "aws1". Currently, only one version supported (1).
+ matches = re.match(r"^(aws)([\d]+)$", environment_id)
+ if matches:
+ env_id, env_version = matches.groups()
+ else:
+ env_id, env_version = (None, None)
+
+ if env_id != "aws" or self._cred_verification_url is None:
+ raise exceptions.InvalidResource(
+ "No valid AWS 'credential_source' provided"
+ )
+ elif env_version is None or int(env_version) != 1:
+ raise exceptions.InvalidValue(
+ "aws version '{}' is not supported in the current build.".format(
+ env_version
+ )
+ )
+
+ self._target_resource = audience
+ self._request_signer = None
+
+ def retrieve_subject_token(self, request):
+ """Retrieves the subject token using the credential_source object.
+ The subject token is a serialized `AWS GetCallerIdentity signed request`_.
+
+ The logic is summarized as:
+
+ Retrieve the AWS region from the AWS_REGION or AWS_DEFAULT_REGION
+ environment variable or from the AWS metadata server availability-zone
+ if not found in the environment variable.
+
+ Check AWS credentials in environment variables. If not found, retrieve
+ from the AWS metadata server security-credentials endpoint.
+
+ When retrieving AWS credentials from the metadata server
+ security-credentials endpoint, the AWS role needs to be determined by
+ calling the security-credentials endpoint without any argument. Then the
+ credentials can be retrieved via: security-credentials/role_name
+
+ Generate the signed request to AWS STS GetCallerIdentity action.
- return False
+ Inject x-goog-cloud-target-resource into header and serialize the
+ signed request. This will be the subject-token to pass to GCP STS.
+
+ .. _AWS GetCallerIdentity signed request:
+ https://cloud.google.com/iam/docs/access-resources-aws#exchange-token
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ Returns:
+ str: The retrieved subject token.
+ """
+
+ # Initialize the request signer if not yet initialized after determining
+ # the current AWS region.
+ if self._request_signer is None:
+ self._region = self._aws_security_credentials_supplier.get_aws_region(
+ self._supplier_context, request
+ )
+ self._request_signer = RequestSigner(self._region)
+
+ # Retrieve the AWS security credentials needed to generate the signed
+ # request.
+ aws_security_credentials = self._aws_security_credentials_supplier.get_aws_security_credentials(
+ self._supplier_context, request
+ )
+ # Generate the signed request to AWS STS GetCallerIdentity API.
+ # Use the required regional endpoint. Otherwise, the request will fail.
+ request_options = self._request_signer.get_request_options(
+ aws_security_credentials,
+ self._cred_verification_url.replace("{region}", self._region),
+ "POST",
+ )
+ # The GCP STS endpoint expects the headers to be formatted as:
+ # [
+ # {key: 'x-amz-date', value: '...'},
+ # {key: 'Authorization', value: '...'},
+ # ...
+ # ]
+ # And then serialized as:
+ # quote(json.dumps({
+ # url: '...',
+ # method: 'POST',
+ # headers: [{key: 'x-amz-date', value: '...'}, ...]
+ # }))
+ request_headers = request_options.get("headers")
+ # The full, canonical resource name of the workload identity pool
+ # provider, with or without the HTTPS prefix.
+ # Including this header as part of the signature is recommended to
+ # ensure data integrity.
+ request_headers["x-goog-cloud-target-resource"] = self._target_resource
+
+ # Serialize AWS signed request.
+ aws_signed_req = {}
+ aws_signed_req["url"] = request_options.get("url")
+ aws_signed_req["method"] = request_options.get("method")
+ aws_signed_req["headers"] = []
+ # Reformat header to GCP STS expected format.
+ for key in request_headers.keys():
+ aws_signed_req["headers"].append(
+ {"key": key, "value": request_headers[key]}
+ )
+
+ return urllib.parse.quote(
+ json.dumps(aws_signed_req, separators=(",", ":"), sort_keys=True)
+ )
def _create_default_metrics_options(self):
metrics_options = super(Credentials, self)._create_default_metrics_options()
metrics_options["source"] = "aws"
+ if self._has_custom_supplier():
+ metrics_options["source"] = "programmatic"
return metrics_options
+ def _has_custom_supplier(self):
+ return self._credential_source is None
+
+ def _constructor_args(self):
+ args = super(Credentials, self)._constructor_args()
+ # If a custom supplier was used, append it to the args dict.
+ if self._has_custom_supplier():
+ args.update(
+ {
+ "aws_security_credentials_supplier": self._aws_security_credentials_supplier
+ }
+ )
+ return args
+
@classmethod
def from_info(cls, info, **kwargs):
"""Creates an AWS Credentials instance from parsed external account info.
@@ -761,6 +839,12 @@ class Credentials(external_account.Credentials):
Raises:
ValueError: For invalid parameters.
"""
+ aws_security_credentials_supplier = info.get(
+ "aws_security_credentials_supplier"
+ )
+ kwargs.update(
+ {"aws_security_credentials_supplier": aws_security_credentials_supplier}
+ )
return super(Credentials, cls).from_info(info, **kwargs)
@classmethod