aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/google-auth/py3/google/auth/identity_pool.py
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-04-04 07:45:46 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-04-04 07:53:28 +0300
commit51958dfd22674e02052c8a292ab70fc2d52a07fc (patch)
tree42d2b859c555e9045203791ed3f60fa16e1b267e /contrib/python/google-auth/py3/google/auth/identity_pool.py
parent7e62114667a5059c6b6a617d4cd9076928818478 (diff)
downloadydb-51958dfd22674e02052c8a292ab70fc2d52a07fc.tar.gz
Intermediate changes
Diffstat (limited to 'contrib/python/google-auth/py3/google/auth/identity_pool.py')
-rw-r--r--contrib/python/google-auth/py3/google/auth/identity_pool.py275
1 files changed, 194 insertions, 81 deletions
diff --git a/contrib/python/google-auth/py3/google/auth/identity_pool.py b/contrib/python/google-auth/py3/google/auth/identity_pool.py
index a515353c37..a9ec577334 100644
--- a/contrib/python/google-auth/py3/google/auth/identity_pool.py
+++ b/contrib/python/google-auth/py3/google/auth/identity_pool.py
@@ -26,11 +26,13 @@ long-live service account private keys.
Identity Pool Credentials are initialized using external_account
arguments which are typically loaded from an external credentials file or
-an external credentials URL. Unlike other Credentials that can be initialized
-with a list of explicit arguments, secrets or credentials, external account
-clients use the environment and hints/guidelines provided by the
-external_account JSON file to retrieve credentials and exchange them for Google
-access tokens.
+an external credentials URL.
+
+This module also provides a definition for an abstract subject token supplier.
+This supplier can be implemented to return a valid OIDC or SAML2.0 subject token
+and used to create Identity Pool credentials. The credentials will then call the
+supplier instead of using pre-defined methods such as reading a local file or
+calling a URL.
"""
try:
@@ -38,15 +40,129 @@ try:
# Python 2.7 compatibility
except ImportError: # pragma: NO COVER
from collections import Mapping
-import io
+import abc
import json
import os
+from typing import NamedTuple
from google.auth import _helpers
from google.auth import exceptions
from google.auth import external_account
+class SubjectTokenSupplier(metaclass=abc.ABCMeta):
+ """Base class for subject token suppliers. This can be implemented with custom logic to retrieve
+ a subject token to exchange for a Google Cloud access token when using Workload or
+ Workforce Identity Federation. The identity pool credential does not cache the subject token,
+ so caching logic should be added in the implementation.
+ """
+
+ @abc.abstractmethod
+ def get_subject_token(self, context, request):
+ """Returns the requested subject token. The subject token must be valid.
+
+ .. warning: This is not cached by the calling Google credential, so caching logic should be implemented in the supplier.
+
+ Args:
+ context (google.auth.externalaccount.SupplierContext): The context object
+ containing information about the requested audience and subject token type.
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If an error is encountered during
+ subject token retrieval logic.
+
+ Returns:
+ str: The requested subject token string.
+ """
+ raise NotImplementedError("")
+
+
+class _TokenContent(NamedTuple):
+ """Models the token content response from file and url internal suppliers.
+ Attributes:
+ content (str): The string content of the file or URL response.
+ location (str): The location the content was retrieved from. This will either be a file location or a URL.
+ """
+
+ content: str
+ location: str
+
+
+class _FileSupplier(SubjectTokenSupplier):
+ """ Internal implementation of subject token supplier which supports reading a subject token from a file."""
+
+ def __init__(self, path, format_type, subject_token_field_name):
+ self._path = path
+ self._format_type = format_type
+ self._subject_token_field_name = subject_token_field_name
+
+ @_helpers.copy_docstring(SubjectTokenSupplier)
+ def get_subject_token(self, context, request):
+ if not os.path.exists(self._path):
+ raise exceptions.RefreshError("File '{}' was not found.".format(self._path))
+
+ with open(self._path, "r", encoding="utf-8") as file_obj:
+ token_content = _TokenContent(file_obj.read(), self._path)
+
+ return _parse_token_data(
+ token_content, self._format_type, self._subject_token_field_name
+ )
+
+
+class _UrlSupplier(SubjectTokenSupplier):
+ """ Internal implementation of subject token supplier which supports retrieving a subject token by calling a URL endpoint."""
+
+ def __init__(self, url, format_type, subject_token_field_name, headers):
+ self._url = url
+ self._format_type = format_type
+ self._subject_token_field_name = subject_token_field_name
+ self._headers = headers
+
+ @_helpers.copy_docstring(SubjectTokenSupplier)
+ def get_subject_token(self, context, request):
+ response = request(url=self._url, method="GET", headers=self._headers)
+
+ # support both string and bytes type response.data
+ response_body = (
+ response.data.decode("utf-8")
+ if hasattr(response.data, "decode")
+ else response.data
+ )
+
+ if response.status != 200:
+ raise exceptions.RefreshError(
+ "Unable to retrieve Identity Pool subject token", response_body
+ )
+ token_content = _TokenContent(response_body, self._url)
+ return _parse_token_data(
+ token_content, self._format_type, self._subject_token_field_name
+ )
+
+
+def _parse_token_data(token_content, format_type="text", subject_token_field_name=None):
+ if format_type == "text":
+ token = token_content.content
+ else:
+ try:
+ # Parse file content as JSON.
+ response_data = json.loads(token_content.content)
+ # Get the subject_token.
+ token = response_data[subject_token_field_name]
+ except (KeyError, ValueError):
+ raise exceptions.RefreshError(
+ "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
+ token_content.location, subject_token_field_name
+ )
+ )
+ if not token:
+ raise exceptions.RefreshError(
+ "Missing subject_token in the credential_source file"
+ )
+ return token
+
+
class Credentials(external_account.Credentials):
"""External account credentials sourced from files and URLs."""
@@ -54,8 +170,9 @@ class Credentials(external_account.Credentials):
self,
audience,
subject_token_type,
- token_url,
- credential_source,
+ token_url=external_account._DEFAULT_TOKEN_URL,
+ credential_source=None,
+ subject_token_supplier=None,
*args,
**kwargs
):
@@ -63,11 +180,18 @@ class Credentials(external_account.Credentials):
Args:
audience (str): The STS audience field.
- subject_token_type (str): The subject token type.
- token_url (str): The STS endpoint URL.
- credential_source (Mapping): The credential source dictionary used to
+ subject_token_type (str): The subject token type based on the Oauth2.0 token exchange spec.
+ Expected values include::
+
+ “urn:ietf:params:oauth:token-type:jwt”
+ “urn:ietf:params:oauth:token-type:id-token”
+ “urn:ietf:params:oauth:token-type:saml2”
+
+ token_url (Optional [str]): The STS endpoint URL. If not provided, will default to "https://sts.googleapis.com/v1/token".
+ credential_source (Optional [Mapping]): The credential source dictionary used to
provide instructions on how to retrieve external credential to be
- exchanged for Google access tokens.
+ exchanged for Google access tokens. Either a credential source or
+ a subject token supplier must be provided.
Example credential_source for url-sourced credential::
@@ -85,6 +209,10 @@ class Credentials(external_account.Credentials):
{
"file": "/path/to/token/file.txt"
}
+ subject_token_supplier (Optional [SubjectTokenSupplier]): Optional subject token supplier.
+ This will be called to supply a valid subject token which will then
+ be exchanged for Google access tokens. Either a subject token supplier
+ or a credential source must be provided.
args (List): Optional positional arguments passed into the underlying :meth:`~external_account.Credentials.__init__` method.
kwargs (Mapping): Optional keyword arguments passed into the underlying :meth:`~external_account.Credentials.__init__` method.
@@ -106,10 +234,25 @@ class Credentials(external_account.Credentials):
*args,
**kwargs
)
- if not isinstance(credential_source, Mapping):
+ if credential_source is None and subject_token_supplier is None:
+ raise exceptions.InvalidValue(
+ "A valid credential source or a subject token supplier must be provided."
+ )
+ if credential_source is not None and subject_token_supplier is not None:
+ raise exceptions.InvalidValue(
+ "Identity pool credential cannot have both a credential source and a subject token supplier."
+ )
+
+ if subject_token_supplier is not None:
+ self._subject_token_supplier = subject_token_supplier
self._credential_source_file = None
self._credential_source_url = None
else:
+ if not isinstance(credential_source, Mapping):
+ self._credential_source_executable = None
+ raise exceptions.MalformedError(
+ "Invalid credential_source. The credential_source is not a dict."
+ )
self._credential_source_file = credential_source.get("file")
self._credential_source_url = credential_source.get("url")
self._credential_source_headers = credential_source.get("headers")
@@ -143,79 +286,35 @@ class Credentials(external_account.Credentials):
else:
self._credential_source_field_name = None
- if self._credential_source_file and self._credential_source_url:
- raise exceptions.MalformedError(
- "Ambiguous credential_source. 'file' is mutually exclusive with 'url'."
- )
- if not self._credential_source_file and not self._credential_source_url:
- raise exceptions.MalformedError(
- "Missing credential_source. A 'file' or 'url' must be provided."
- )
+ if self._credential_source_file and self._credential_source_url:
+ raise exceptions.MalformedError(
+ "Ambiguous credential_source. 'file' is mutually exclusive with 'url'."
+ )
+ if not self._credential_source_file and not self._credential_source_url:
+ raise exceptions.MalformedError(
+ "Missing credential_source. A 'file' or 'url' must be provided."
+ )
+
+ if self._credential_source_file:
+ self._subject_token_supplier = _FileSupplier(
+ self._credential_source_file,
+ self._credential_source_format_type,
+ self._credential_source_field_name,
+ )
+ else:
+ self._subject_token_supplier = _UrlSupplier(
+ self._credential_source_url,
+ self._credential_source_format_type,
+ self._credential_source_field_name,
+ self._credential_source_headers,
+ )
@_helpers.copy_docstring(external_account.Credentials)
def retrieve_subject_token(self, request):
- return self._parse_token_data(
- self._get_token_data(request),
- self._credential_source_format_type,
- self._credential_source_field_name,
- )
-
- def _get_token_data(self, request):
- if self._credential_source_file:
- return self._get_file_data(self._credential_source_file)
- else:
- return self._get_url_data(
- request, self._credential_source_url, self._credential_source_headers
- )
-
- def _get_file_data(self, filename):
- if not os.path.exists(filename):
- raise exceptions.RefreshError("File '{}' was not found.".format(filename))
-
- with io.open(filename, "r", encoding="utf-8") as file_obj:
- return file_obj.read(), filename
-
- def _get_url_data(self, request, url, headers):
- response = request(url=url, method="GET", headers=headers)
-
- # support both string and bytes type response.data
- response_body = (
- response.data.decode("utf-8")
- if hasattr(response.data, "decode")
- else response.data
+ return self._subject_token_supplier.get_subject_token(
+ self._supplier_context, request
)
- if response.status != 200:
- raise exceptions.RefreshError(
- "Unable to retrieve Identity Pool subject token", response_body
- )
-
- return response_body, url
-
- def _parse_token_data(
- self, token_content, format_type="text", subject_token_field_name=None
- ):
- content, filename = token_content
- if format_type == "text":
- token = content
- else:
- try:
- # Parse file content as JSON.
- response_data = json.loads(content)
- # Get the subject_token.
- token = response_data[subject_token_field_name]
- except (KeyError, ValueError):
- raise exceptions.RefreshError(
- "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
- filename, subject_token_field_name
- )
- )
- if not token:
- raise exceptions.RefreshError(
- "Missing subject_token in the credential_source file"
- )
- return token
-
def _create_default_metrics_options(self):
metrics_options = super(Credentials, self)._create_default_metrics_options()
# Check that credential source is a dict before checking for file vs url. This check needs to be done
@@ -226,8 +325,20 @@ class Credentials(external_account.Credentials):
metrics_options["source"] = "file"
else:
metrics_options["source"] = "url"
+ else:
+ metrics_options["source"] = "programmatic"
return metrics_options
+ def _has_custom_supplier(self):
+ return self._credential_source is None
+
+ def _constructor_args(self):
+ args = super(Credentials, self)._constructor_args()
+ # If a custom supplier was used, append it to the args dict.
+ if self._has_custom_supplier():
+ args.update({"subject_token_supplier": self._subject_token_supplier})
+ return args
+
@classmethod
def from_info(cls, info, **kwargs):
"""Creates an Identity Pool Credentials instance from parsed external account info.
@@ -244,6 +355,8 @@ class Credentials(external_account.Credentials):
Raises:
ValueError: For invalid parameters.
"""
+ subject_token_supplier = info.get("subject_token_supplier")
+ kwargs.update({"subject_token_supplier": subject_token_supplier})
return super(Credentials, cls).from_info(info, **kwargs)
@classmethod