diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-04-04 07:45:46 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-04-04 07:53:28 +0300 |
commit | 51958dfd22674e02052c8a292ab70fc2d52a07fc (patch) | |
tree | 42d2b859c555e9045203791ed3f60fa16e1b267e /contrib/python/google-auth/py3/google/auth/identity_pool.py | |
parent | 7e62114667a5059c6b6a617d4cd9076928818478 (diff) | |
download | ydb-51958dfd22674e02052c8a292ab70fc2d52a07fc.tar.gz |
Intermediate changes
Diffstat (limited to 'contrib/python/google-auth/py3/google/auth/identity_pool.py')
-rw-r--r-- | contrib/python/google-auth/py3/google/auth/identity_pool.py | 275 |
1 files changed, 194 insertions, 81 deletions
diff --git a/contrib/python/google-auth/py3/google/auth/identity_pool.py b/contrib/python/google-auth/py3/google/auth/identity_pool.py index a515353c37..a9ec577334 100644 --- a/contrib/python/google-auth/py3/google/auth/identity_pool.py +++ b/contrib/python/google-auth/py3/google/auth/identity_pool.py @@ -26,11 +26,13 @@ long-live service account private keys. Identity Pool Credentials are initialized using external_account arguments which are typically loaded from an external credentials file or -an external credentials URL. Unlike other Credentials that can be initialized -with a list of explicit arguments, secrets or credentials, external account -clients use the environment and hints/guidelines provided by the -external_account JSON file to retrieve credentials and exchange them for Google -access tokens. +an external credentials URL. + +This module also provides a definition for an abstract subject token supplier. +This supplier can be implemented to return a valid OIDC or SAML2.0 subject token +and used to create Identity Pool credentials. The credentials will then call the +supplier instead of using pre-defined methods such as reading a local file or +calling a URL. """ try: @@ -38,15 +40,129 @@ try: # Python 2.7 compatibility except ImportError: # pragma: NO COVER from collections import Mapping -import io +import abc import json import os +from typing import NamedTuple from google.auth import _helpers from google.auth import exceptions from google.auth import external_account +class SubjectTokenSupplier(metaclass=abc.ABCMeta): + """Base class for subject token suppliers. This can be implemented with custom logic to retrieve + a subject token to exchange for a Google Cloud access token when using Workload or + Workforce Identity Federation. The identity pool credential does not cache the subject token, + so caching logic should be added in the implementation. + """ + + @abc.abstractmethod + def get_subject_token(self, context, request): + """Returns the requested subject token. The subject token must be valid. + + .. warning: This is not cached by the calling Google credential, so caching logic should be implemented in the supplier. + + Args: + context (google.auth.externalaccount.SupplierContext): The context object + containing information about the requested audience and subject token type. + request (google.auth.transport.Request): The object used to make + HTTP requests. + + Raises: + google.auth.exceptions.RefreshError: If an error is encountered during + subject token retrieval logic. + + Returns: + str: The requested subject token string. + """ + raise NotImplementedError("") + + +class _TokenContent(NamedTuple): + """Models the token content response from file and url internal suppliers. + Attributes: + content (str): The string content of the file or URL response. + location (str): The location the content was retrieved from. This will either be a file location or a URL. + """ + + content: str + location: str + + +class _FileSupplier(SubjectTokenSupplier): + """ Internal implementation of subject token supplier which supports reading a subject token from a file.""" + + def __init__(self, path, format_type, subject_token_field_name): + self._path = path + self._format_type = format_type + self._subject_token_field_name = subject_token_field_name + + @_helpers.copy_docstring(SubjectTokenSupplier) + def get_subject_token(self, context, request): + if not os.path.exists(self._path): + raise exceptions.RefreshError("File '{}' was not found.".format(self._path)) + + with open(self._path, "r", encoding="utf-8") as file_obj: + token_content = _TokenContent(file_obj.read(), self._path) + + return _parse_token_data( + token_content, self._format_type, self._subject_token_field_name + ) + + +class _UrlSupplier(SubjectTokenSupplier): + """ Internal implementation of subject token supplier which supports retrieving a subject token by calling a URL endpoint.""" + + def __init__(self, url, format_type, subject_token_field_name, headers): + self._url = url + self._format_type = format_type + self._subject_token_field_name = subject_token_field_name + self._headers = headers + + @_helpers.copy_docstring(SubjectTokenSupplier) + def get_subject_token(self, context, request): + response = request(url=self._url, method="GET", headers=self._headers) + + # support both string and bytes type response.data + response_body = ( + response.data.decode("utf-8") + if hasattr(response.data, "decode") + else response.data + ) + + if response.status != 200: + raise exceptions.RefreshError( + "Unable to retrieve Identity Pool subject token", response_body + ) + token_content = _TokenContent(response_body, self._url) + return _parse_token_data( + token_content, self._format_type, self._subject_token_field_name + ) + + +def _parse_token_data(token_content, format_type="text", subject_token_field_name=None): + if format_type == "text": + token = token_content.content + else: + try: + # Parse file content as JSON. + response_data = json.loads(token_content.content) + # Get the subject_token. + token = response_data[subject_token_field_name] + except (KeyError, ValueError): + raise exceptions.RefreshError( + "Unable to parse subject_token from JSON file '{}' using key '{}'".format( + token_content.location, subject_token_field_name + ) + ) + if not token: + raise exceptions.RefreshError( + "Missing subject_token in the credential_source file" + ) + return token + + class Credentials(external_account.Credentials): """External account credentials sourced from files and URLs.""" @@ -54,8 +170,9 @@ class Credentials(external_account.Credentials): self, audience, subject_token_type, - token_url, - credential_source, + token_url=external_account._DEFAULT_TOKEN_URL, + credential_source=None, + subject_token_supplier=None, *args, **kwargs ): @@ -63,11 +180,18 @@ class Credentials(external_account.Credentials): Args: audience (str): The STS audience field. - subject_token_type (str): The subject token type. - token_url (str): The STS endpoint URL. - credential_source (Mapping): The credential source dictionary used to + subject_token_type (str): The subject token type based on the Oauth2.0 token exchange spec. + Expected values include:: + + “urn:ietf:params:oauth:token-type:jwt” + “urn:ietf:params:oauth:token-type:id-token” + “urn:ietf:params:oauth:token-type:saml2” + + token_url (Optional [str]): The STS endpoint URL. If not provided, will default to "https://sts.googleapis.com/v1/token". + credential_source (Optional [Mapping]): The credential source dictionary used to provide instructions on how to retrieve external credential to be - exchanged for Google access tokens. + exchanged for Google access tokens. Either a credential source or + a subject token supplier must be provided. Example credential_source for url-sourced credential:: @@ -85,6 +209,10 @@ class Credentials(external_account.Credentials): { "file": "/path/to/token/file.txt" } + subject_token_supplier (Optional [SubjectTokenSupplier]): Optional subject token supplier. + This will be called to supply a valid subject token which will then + be exchanged for Google access tokens. Either a subject token supplier + or a credential source must be provided. args (List): Optional positional arguments passed into the underlying :meth:`~external_account.Credentials.__init__` method. kwargs (Mapping): Optional keyword arguments passed into the underlying :meth:`~external_account.Credentials.__init__` method. @@ -106,10 +234,25 @@ class Credentials(external_account.Credentials): *args, **kwargs ) - if not isinstance(credential_source, Mapping): + if credential_source is None and subject_token_supplier is None: + raise exceptions.InvalidValue( + "A valid credential source or a subject token supplier must be provided." + ) + if credential_source is not None and subject_token_supplier is not None: + raise exceptions.InvalidValue( + "Identity pool credential cannot have both a credential source and a subject token supplier." + ) + + if subject_token_supplier is not None: + self._subject_token_supplier = subject_token_supplier self._credential_source_file = None self._credential_source_url = None else: + if not isinstance(credential_source, Mapping): + self._credential_source_executable = None + raise exceptions.MalformedError( + "Invalid credential_source. The credential_source is not a dict." + ) self._credential_source_file = credential_source.get("file") self._credential_source_url = credential_source.get("url") self._credential_source_headers = credential_source.get("headers") @@ -143,79 +286,35 @@ class Credentials(external_account.Credentials): else: self._credential_source_field_name = None - if self._credential_source_file and self._credential_source_url: - raise exceptions.MalformedError( - "Ambiguous credential_source. 'file' is mutually exclusive with 'url'." - ) - if not self._credential_source_file and not self._credential_source_url: - raise exceptions.MalformedError( - "Missing credential_source. A 'file' or 'url' must be provided." - ) + if self._credential_source_file and self._credential_source_url: + raise exceptions.MalformedError( + "Ambiguous credential_source. 'file' is mutually exclusive with 'url'." + ) + if not self._credential_source_file and not self._credential_source_url: + raise exceptions.MalformedError( + "Missing credential_source. A 'file' or 'url' must be provided." + ) + + if self._credential_source_file: + self._subject_token_supplier = _FileSupplier( + self._credential_source_file, + self._credential_source_format_type, + self._credential_source_field_name, + ) + else: + self._subject_token_supplier = _UrlSupplier( + self._credential_source_url, + self._credential_source_format_type, + self._credential_source_field_name, + self._credential_source_headers, + ) @_helpers.copy_docstring(external_account.Credentials) def retrieve_subject_token(self, request): - return self._parse_token_data( - self._get_token_data(request), - self._credential_source_format_type, - self._credential_source_field_name, - ) - - def _get_token_data(self, request): - if self._credential_source_file: - return self._get_file_data(self._credential_source_file) - else: - return self._get_url_data( - request, self._credential_source_url, self._credential_source_headers - ) - - def _get_file_data(self, filename): - if not os.path.exists(filename): - raise exceptions.RefreshError("File '{}' was not found.".format(filename)) - - with io.open(filename, "r", encoding="utf-8") as file_obj: - return file_obj.read(), filename - - def _get_url_data(self, request, url, headers): - response = request(url=url, method="GET", headers=headers) - - # support both string and bytes type response.data - response_body = ( - response.data.decode("utf-8") - if hasattr(response.data, "decode") - else response.data + return self._subject_token_supplier.get_subject_token( + self._supplier_context, request ) - if response.status != 200: - raise exceptions.RefreshError( - "Unable to retrieve Identity Pool subject token", response_body - ) - - return response_body, url - - def _parse_token_data( - self, token_content, format_type="text", subject_token_field_name=None - ): - content, filename = token_content - if format_type == "text": - token = content - else: - try: - # Parse file content as JSON. - response_data = json.loads(content) - # Get the subject_token. - token = response_data[subject_token_field_name] - except (KeyError, ValueError): - raise exceptions.RefreshError( - "Unable to parse subject_token from JSON file '{}' using key '{}'".format( - filename, subject_token_field_name - ) - ) - if not token: - raise exceptions.RefreshError( - "Missing subject_token in the credential_source file" - ) - return token - def _create_default_metrics_options(self): metrics_options = super(Credentials, self)._create_default_metrics_options() # Check that credential source is a dict before checking for file vs url. This check needs to be done @@ -226,8 +325,20 @@ class Credentials(external_account.Credentials): metrics_options["source"] = "file" else: metrics_options["source"] = "url" + else: + metrics_options["source"] = "programmatic" return metrics_options + def _has_custom_supplier(self): + return self._credential_source is None + + def _constructor_args(self): + args = super(Credentials, self)._constructor_args() + # If a custom supplier was used, append it to the args dict. + if self._has_custom_supplier(): + args.update({"subject_token_supplier": self._subject_token_supplier}) + return args + @classmethod def from_info(cls, info, **kwargs): """Creates an Identity Pool Credentials instance from parsed external account info. @@ -244,6 +355,8 @@ class Credentials(external_account.Credentials): Raises: ValueError: For invalid parameters. """ + subject_token_supplier = info.get("subject_token_supplier") + kwargs.update({"subject_token_supplier": subject_token_supplier}) return super(Credentials, cls).from_info(info, **kwargs) @classmethod |