aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/google-auth/py3/tests
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-07-23 07:47:34 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-07-23 07:56:54 +0300
commite9497f65d8bbe478a583fa31b74e03a8d342ff5f (patch)
tree48793f79ab80ae9a82e77744ed89f82aeee8e59a /contrib/python/google-auth/py3/tests
parent973312112362cf03ce0f7d4e5fdbcb63d1166670 (diff)
downloadydb-e9497f65d8bbe478a583fa31b74e03a8d342ff5f.tar.gz
Intermediate changes
Diffstat (limited to 'contrib/python/google-auth/py3/tests')
-rw-r--r--contrib/python/google-auth/py3/tests/test_external_account.py161
-rw-r--r--contrib/python/google-auth/py3/tests/test_identity_pool.py129
2 files changed, 286 insertions, 4 deletions
diff --git a/contrib/python/google-auth/py3/tests/test_external_account.py b/contrib/python/google-auth/py3/tests/test_external_account.py
index c458b21b643..3c372e6291c 100644
--- a/contrib/python/google-auth/py3/tests/test_external_account.py
+++ b/contrib/python/google-auth/py3/tests/test_external_account.py
@@ -235,10 +235,16 @@ class TestCredentials(object):
return request
@classmethod
- def assert_token_request_kwargs(cls, request_kwargs, headers, request_data):
+ def assert_token_request_kwargs(
+ cls, request_kwargs, headers, request_data, cert=None
+ ):
assert request_kwargs["url"] == cls.TOKEN_URL
assert request_kwargs["method"] == "POST"
assert request_kwargs["headers"] == headers
+ if cert is not None:
+ assert request_kwargs["cert"] == cert
+ else:
+ assert "cert" not in request_kwargs
assert request_kwargs["body"] is not None
body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
for (k, v) in body_tuples:
@@ -246,10 +252,16 @@ class TestCredentials(object):
assert len(body_tuples) == len(request_data.keys())
@classmethod
- def assert_impersonation_request_kwargs(cls, request_kwargs, headers, request_data):
+ def assert_impersonation_request_kwargs(
+ cls, request_kwargs, headers, request_data, cert=None
+ ):
assert request_kwargs["url"] == cls.SERVICE_ACCOUNT_IMPERSONATION_URL
assert request_kwargs["method"] == "POST"
assert request_kwargs["headers"] == headers
+ if cert is not None:
+ assert request_kwargs["cert"] == cert
+ else:
+ assert "cert" not in request_kwargs
assert request_kwargs["body"] is not None
body_json = json.loads(request_kwargs["body"].decode("utf-8"))
assert body_json == request_data
@@ -670,6 +682,56 @@ class TestCredentials(object):
return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
)
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+ @mock.patch(
+ "google.auth.external_account.Credentials._mtls_required", return_value=True
+ )
+ @mock.patch(
+ "google.auth.external_account.Credentials._get_mtls_cert_and_key_paths",
+ return_value=("path/to/cert.pem", "path/to/key.pem"),
+ )
+ def test_refresh_with_mtls(
+ self,
+ mock_get_mtls_cert_and_key_paths,
+ mock_mtls_required,
+ unused_utcnow,
+ mock_auth_lib_value,
+ ):
+ response = self.SUCCESS_RESPONSE.copy()
+ # Test custom expiration to confirm expiry is set correctly.
+ response["expires_in"] = 2800
+ expected_expiry = datetime.datetime.min + datetime.timedelta(
+ seconds=response["expires_in"]
+ )
+ headers = {
+ "Content-Type": "application/x-www-form-urlencoded",
+ "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
+ }
+ request_data = {
+ "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
+ "audience": self.AUDIENCE,
+ "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
+ "subject_token": "subject_token_0",
+ "subject_token_type": self.SUBJECT_TOKEN_TYPE,
+ }
+ request = self.make_mock_request(status=http_client.OK, data=response)
+ credentials = self.make_credentials()
+
+ credentials.refresh(request)
+
+ expected_cert_path = ("path/to/cert.pem", "path/to/key.pem")
+ self.assert_token_request_kwargs(
+ request.call_args[1], headers, request_data, expected_cert_path
+ )
+ assert credentials.valid
+ assert credentials.expiry == expected_expiry
+ assert not credentials.expired
+ assert credentials.token == response["access_token"]
+
+ @mock.patch(
+ "google.auth.metrics.python_and_auth_lib_version",
+ return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
+ )
+ @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_workforce_without_client_auth_success(
self, unused_utcnow, test_auth_lib_value
):
@@ -877,6 +939,101 @@ class TestCredentials(object):
"google.auth.metrics.python_and_auth_lib_version",
return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
)
+ @mock.patch(
+ "google.auth.external_account.Credentials._mtls_required", return_value=True
+ )
+ @mock.patch(
+ "google.auth.external_account.Credentials._get_mtls_cert_and_key_paths",
+ return_value=("path/to/cert.pem", "path/to/key.pem"),
+ )
+ def test_refresh_impersonation_with_mtls_success(
+ self,
+ mock_get_mtls_cert_and_key_paths,
+ mock_mtls_required,
+ mock_metrics_header_value,
+ mock_auth_lib_value,
+ ):
+ # Simulate service account access token expires in 2800 seconds.
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
+ ).isoformat("T") + "Z"
+ expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
+ # STS token exchange request/response.
+ token_response = self.SUCCESS_RESPONSE.copy()
+ token_headers = {
+ "Content-Type": "application/x-www-form-urlencoded",
+ "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
+ }
+ token_request_data = {
+ "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
+ "audience": self.AUDIENCE,
+ "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
+ "subject_token": "subject_token_0",
+ "subject_token_type": self.SUBJECT_TOKEN_TYPE,
+ "scope": "https://www.googleapis.com/auth/iam",
+ }
+ # Service account impersonation request/response.
+ impersonation_response = {
+ "accessToken": "SA_ACCESS_TOKEN",
+ "expireTime": expire_time,
+ }
+ impersonation_headers = {
+ "Content-Type": "application/json",
+ "authorization": "Bearer {}".format(token_response["access_token"]),
+ "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
+ "x-allowed-locations": "0x0",
+ }
+ impersonation_request_data = {
+ "delegates": None,
+ "scope": self.SCOPES,
+ "lifetime": "3600s",
+ }
+ # Initialize mock request to handle token exchange and service account
+ # impersonation request.
+ request = self.make_mock_request(
+ status=http_client.OK,
+ data=token_response,
+ impersonation_status=http_client.OK,
+ impersonation_data=impersonation_response,
+ )
+ # Initialize credentials with service account impersonation.
+ credentials = self.make_credentials(
+ service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
+ scopes=self.SCOPES,
+ )
+
+ credentials.refresh(request)
+
+ # Only 2 requests should be processed.
+ assert len(request.call_args_list) == 2
+ # Verify token exchange request parameters.
+ expected_cert_paths = ("path/to/cert.pem", "path/to/key.pem")
+ self.assert_token_request_kwargs(
+ request.call_args_list[0][1],
+ token_headers,
+ token_request_data,
+ expected_cert_paths,
+ )
+ # Verify service account impersonation request parameters.
+ self.assert_impersonation_request_kwargs(
+ request.call_args_list[1][1],
+ impersonation_headers,
+ impersonation_request_data,
+ expected_cert_paths,
+ )
+ assert credentials.valid
+ assert credentials.expiry == expected_expiry
+ assert not credentials.expired
+ assert credentials.token == impersonation_response["accessToken"]
+
+ @mock.patch(
+ "google.auth.metrics.token_request_access_token_impersonate",
+ return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
+ )
+ @mock.patch(
+ "google.auth.metrics.python_and_auth_lib_version",
+ return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
+ )
def test_refresh_workforce_impersonation_without_client_auth_success(
self, mock_metrics_header_value, mock_auth_lib_value
):
diff --git a/contrib/python/google-auth/py3/tests/test_identity_pool.py b/contrib/python/google-auth/py3/tests/test_identity_pool.py
index e4efe46c6bb..cc6cbf08827 100644
--- a/contrib/python/google-auth/py3/tests/test_identity_pool.py
+++ b/contrib/python/google-auth/py3/tests/test_identity_pool.py
@@ -180,6 +180,12 @@ class TestCredentials(object):
"url": CREDENTIAL_URL,
"format": {"type": "json", "subject_token_field_name": "access_token"},
}
+ CREDENTIAL_SOURCE_CERTIFICATE = {
+ "certificate": {"use_default_certificate_config": "true"}
+ }
+ CREDENTIAL_SOURCE_CERTIFICATE_NOT_DEFAULT = {
+ "certificate": {"certificate_config_location": "path/to/config"}
+ }
SUCCESS_RESPONSE = {
"access_token": "ACCESS_TOKEN",
"issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
@@ -678,6 +684,40 @@ class TestCredentials(object):
assert excinfo.match(r"Ambiguous credential_source")
+ def test_constructor_invalid_options_url_and_certificate(self):
+ credential_source = {
+ "url": self.CREDENTIAL_URL,
+ "certificate": {"certificate": {"use_default_certificate_config": True}},
+ }
+
+ with pytest.raises(ValueError) as excinfo:
+ self.make_credentials(credential_source=credential_source)
+
+ assert excinfo.match(r"Ambiguous credential_source")
+
+ def test_constructor_invalid_options_file_and_certificate(self):
+ credential_source = {
+ "file": SUBJECT_TOKEN_TEXT_FILE,
+ "certificate": {"certificate": {"use_default_certificate": True}},
+ }
+
+ with pytest.raises(ValueError) as excinfo:
+ self.make_credentials(credential_source=credential_source)
+
+ assert excinfo.match(r"Ambiguous credential_source")
+
+ def test_constructor_invalid_options_url_file_and_certificate(self):
+ credential_source = {
+ "file": SUBJECT_TOKEN_TEXT_FILE,
+ "url": self.CREDENTIAL_URL,
+ "certificate": {"certificate": {"use_default_certificate": True}},
+ }
+
+ with pytest.raises(ValueError) as excinfo:
+ self.make_credentials(credential_source=credential_source)
+
+ assert excinfo.match(r"Ambiguous credential_source")
+
def test_constructor_invalid_options_environment_id(self):
credential_source = {"url": self.CREDENTIAL_URL, "environment_id": "aws1"}
@@ -717,7 +757,7 @@ class TestCredentials(object):
)
def test_constructor_invalid_credential_source_format_type(self):
- credential_source = {"format": {"type": "xml"}}
+ credential_source = {"file": "test.txt", "format": {"type": "xml"}}
with pytest.raises(ValueError) as excinfo:
self.make_credentials(credential_source=credential_source)
@@ -725,7 +765,7 @@ class TestCredentials(object):
assert excinfo.match(r"Invalid credential_source format 'xml'")
def test_constructor_missing_subject_token_field_name(self):
- credential_source = {"format": {"type": "json"}}
+ credential_source = {"file": "test.txt", "format": {"type": "json"}}
with pytest.raises(ValueError) as excinfo:
self.make_credentials(credential_source=credential_source)
@@ -734,6 +774,27 @@ class TestCredentials(object):
r"Missing subject_token_field_name for JSON credential_source format"
)
+ def test_constructor_default_and_file_location_certificate(self):
+ credential_source = {
+ "certificate": {
+ "use_default_certificate_config": True,
+ "certificate_config_location": "test",
+ }
+ }
+
+ with pytest.raises(ValueError) as excinfo:
+ self.make_credentials(credential_source=credential_source)
+
+ assert excinfo.match(r"Invalid certificate configuration")
+
+ def test_constructor_no_default_or_file_location_certificate(self):
+ credential_source = {"certificate": {"use_default_certificate_config": False}}
+
+ with pytest.raises(ValueError) as excinfo:
+ self.make_credentials(credential_source=credential_source)
+
+ assert excinfo.match(r"Invalid certificate configuration")
+
def test_info_with_workforce_pool_user_project(self):
credentials = self.make_credentials(
audience=WORKFORCE_AUDIENCE,
@@ -783,6 +844,36 @@ class TestCredentials(object):
"universe_domain": DEFAULT_UNIVERSE_DOMAIN,
}
+ def test_info_with_certificate_credential_source(self):
+ credentials = self.make_credentials(
+ credential_source=self.CREDENTIAL_SOURCE_CERTIFICATE.copy()
+ )
+
+ assert credentials.info == {
+ "type": "external_account",
+ "audience": AUDIENCE,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "token_info_url": TOKEN_INFO_URL,
+ "credential_source": self.CREDENTIAL_SOURCE_CERTIFICATE,
+ "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
+ }
+
+ def test_info_with_non_default_certificate_credential_source(self):
+ credentials = self.make_credentials(
+ credential_source=self.CREDENTIAL_SOURCE_CERTIFICATE_NOT_DEFAULT.copy()
+ )
+
+ assert credentials.info == {
+ "type": "external_account",
+ "audience": AUDIENCE,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "token_info_url": TOKEN_INFO_URL,
+ "credential_source": self.CREDENTIAL_SOURCE_CERTIFICATE_NOT_DEFAULT,
+ "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
+ }
+
def test_info_with_default_token_url(self):
credentials = identity_pool.Credentials(
audience=AUDIENCE,
@@ -846,6 +937,15 @@ class TestCredentials(object):
assert subject_token == JSON_FILE_SUBJECT_TOKEN
+ def test_retrieve_subject_token_certificate(self):
+ credentials = self.make_credentials(
+ credential_source=self.CREDENTIAL_SOURCE_CERTIFICATE
+ )
+
+ subject_token = credentials.retrieve_subject_token(None)
+
+ assert subject_token == ""
+
def test_retrieve_subject_token_json_file_invalid_field_name(self):
credential_source = {
"file": SUBJECT_TOKEN_JSON_FILE,
@@ -1486,3 +1586,28 @@ class TestCredentials(object):
scopes=SCOPES,
default_scopes=None,
)
+
+ @mock.patch(
+ "google.auth.transport._mtls_helper._get_workload_cert_and_key_paths",
+ return_value=("cert", "key"),
+ )
+ def test_get_mtls_certs(self, mock_get_workload_cert_and_key_paths):
+ credentials = self.make_credentials(
+ credential_source=self.CREDENTIAL_SOURCE_CERTIFICATE.copy()
+ )
+
+ cert, key = credentials._get_mtls_cert_and_key_paths()
+ assert cert == "cert"
+ assert key == "key"
+
+ def test_get_mtls_certs_invalid(self):
+ credentials = self.make_credentials(
+ credential_source=self.CREDENTIAL_SOURCE_TEXT.copy()
+ )
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ credentials._get_mtls_cert_and_key_paths()
+
+ assert excinfo.match(
+ 'The credential is not configured to use mtls requests. The credential should include a "certificate" section in the credential source.'
+ )