summaryrefslogtreecommitdiffstats
path: root/contrib/python/google-auth/py3/tests
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2024-06-21 09:28:26 +0300
committerrobot-piglet <[email protected]>2024-06-21 09:36:40 +0300
commit0cb3f820fac6a243bcb7e4c4388700898660bfd0 (patch)
tree056f1b8bc5f72039fa422aac0af13bab0e966aa7 /contrib/python/google-auth/py3/tests
parent08049311fe5c42a97e8bb47a73fb6cd143c0bdb1 (diff)
Intermediate changes
Diffstat (limited to 'contrib/python/google-auth/py3/tests')
-rw-r--r--contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py4
-rw-r--r--contrib/python/google-auth/py3/tests/oauth2/test__client.py13
-rw-r--r--contrib/python/google-auth/py3/tests/oauth2/test_challenges.py153
-rw-r--r--contrib/python/google-auth/py3/tests/oauth2/test_service_account.py29
-rw-r--r--contrib/python/google-auth/py3/tests/oauth2/test_webauthn_handler.py148
-rw-r--r--contrib/python/google-auth/py3/tests/oauth2/test_webauthn_handler_factory.py29
-rw-r--r--contrib/python/google-auth/py3/tests/oauth2/test_webauthn_types.py237
-rw-r--r--contrib/python/google-auth/py3/tests/test_aws.py33
-rw-r--r--contrib/python/google-auth/py3/tests/test_identity_pool.py33
-rw-r--r--contrib/python/google-auth/py3/tests/transport/test__custom_tls_signer.py2
-rw-r--r--contrib/python/google-auth/py3/tests/transport/test_requests.py35
11 files changed, 711 insertions, 5 deletions
diff --git a/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py b/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py
index 9cca317924e..bb29f8c6e2b 100644
--- a/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py
+++ b/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py
@@ -499,7 +499,7 @@ class TestIDTokenCredentials(object):
responses.add(
responses.POST,
"https://iamcredentials.googleapis.com/v1/projects/-/"
- "serviceAccounts/[email protected]:signBlob?alt=json",
+ "serviceAccounts/[email protected]:signBlob",
status=200,
content_type="application/json",
json={"keyId": "some-key-id", "signedBlob": signature},
@@ -657,7 +657,7 @@ class TestIDTokenCredentials(object):
responses.add(
responses.POST,
"https://iamcredentials.googleapis.com/v1/projects/-/"
- "serviceAccounts/[email protected]:signBlob?alt=json",
+ "serviceAccounts/[email protected]:signBlob",
status=200,
content_type="application/json",
json={"keyId": "some-key-id", "signedBlob": signature},
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test__client.py b/contrib/python/google-auth/py3/tests/oauth2/test__client.py
index 444232f3967..f9a2d3aff49 100644
--- a/contrib/python/google-auth/py3/tests/oauth2/test__client.py
+++ b/contrib/python/google-auth/py3/tests/oauth2/test__client.py
@@ -24,6 +24,7 @@ import pytest # type: ignore
from google.auth import _helpers
from google.auth import crypt
from google.auth import exceptions
+from google.auth import iam
from google.auth import jwt
from google.auth import transport
from google.oauth2 import _client
@@ -319,7 +320,11 @@ def test_call_iam_generate_id_token_endpoint():
request = make_request({"token": id_token})
token, expiry = _client.call_iam_generate_id_token_endpoint(
- request, "fake_email", "fake_audience", "fake_access_token"
+ request,
+ iam._IAM_IDTOKEN_ENDPOINT,
+ "fake_email",
+ "fake_audience",
+ "fake_access_token",
)
assert (
@@ -352,7 +357,11 @@ def test_call_iam_generate_id_token_endpoint_no_id_token():
with pytest.raises(exceptions.RefreshError) as excinfo:
_client.call_iam_generate_id_token_endpoint(
- request, "fake_email", "fake_audience", "fake_access_token"
+ request,
+ iam._IAM_IDTOKEN_ENDPOINT,
+ "fake_email",
+ "fake_audience",
+ "fake_access_token",
)
assert excinfo.match("No ID token in response")
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_challenges.py b/contrib/python/google-auth/py3/tests/oauth2/test_challenges.py
index a06f5528377..4116b913abd 100644
--- a/contrib/python/google-auth/py3/tests/oauth2/test_challenges.py
+++ b/contrib/python/google-auth/py3/tests/oauth2/test_challenges.py
@@ -15,6 +15,7 @@
"""Tests for the reauth module."""
import base64
+import os
import sys
import mock
@@ -23,6 +24,13 @@ import pyu2f # type: ignore
from google.auth import exceptions
from google.oauth2 import challenges
+from google.oauth2.webauthn_types import (
+ AuthenticationExtensionsClientInputs,
+ AuthenticatorAssertionResponse,
+ GetRequest,
+ GetResponse,
+ PublicKeyCredentialDescriptor,
+)
def test_get_user_password():
@@ -54,6 +62,8 @@ def test_security_key():
# Test the case that security key challenge is passed with applicationId and
# relyingPartyId the same.
+ os.environ.pop('"GOOGLE_AUTH_WEBAUTHN_PLUGIN"', None)
+
with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key):
with mock.patch(
"pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
@@ -70,6 +80,19 @@ def test_security_key():
print_callback=sys.stderr.write,
)
+ # Test the case that webauthn plugin is available
+ os.environ["GOOGLE_AUTH_WEBAUTHN_PLUGIN"] = "plugin"
+
+ with mock.patch(
+ "google.oauth2.challenges.SecurityKeyChallenge._obtain_challenge_input_webauthn",
+ return_value={"securityKey": "security key response"},
+ ):
+
+ assert challenge.obtain_challenge_input(metadata) == {
+ "securityKey": "security key response"
+ }
+ os.environ.pop('"GOOGLE_AUTH_WEBAUTHN_PLUGIN"', None)
+
# Test the case that security key challenge is passed with applicationId and
# relyingPartyId different, first call works.
metadata["securityKey"]["relyingPartyId"] = "security_key_relying_party_id"
@@ -173,6 +196,136 @@ def test_security_key():
assert excinfo.match(r"pyu2f dependency is required")
+def test_security_key_webauthn():
+ metadata = {
+ "status": "READY",
+ "challengeId": 2,
+ "challengeType": "SECURITY_KEY",
+ "securityKey": {
+ "applicationId": "security_key_application_id",
+ "challenges": [
+ {
+ "keyHandle": "some_key",
+ "challenge": base64.urlsafe_b64encode(
+ "some_challenge".encode("ascii")
+ ).decode("ascii"),
+ }
+ ],
+ "relyingPartyId": "security_key_application_id",
+ },
+ }
+
+ challenge = challenges.SecurityKeyChallenge()
+
+ sk = metadata["securityKey"]
+ sk_challenges = sk["challenges"]
+
+ application_id = sk["applicationId"]
+
+ allow_credentials = []
+ for sk_challenge in sk_challenges:
+ allow_credentials.append(
+ PublicKeyCredentialDescriptor(id=sk_challenge["keyHandle"])
+ )
+
+ extension = AuthenticationExtensionsClientInputs(appid=application_id)
+
+ get_request = GetRequest(
+ origin=challenges.REAUTH_ORIGIN,
+ rpid=application_id,
+ challenge=challenge._unpadded_urlsafe_b64recode(sk_challenge["challenge"]),
+ timeout_ms=challenges.WEBAUTHN_TIMEOUT_MS,
+ allow_credentials=allow_credentials,
+ user_verification="required",
+ extensions=extension,
+ )
+
+ assertion_resp = AuthenticatorAssertionResponse(
+ client_data_json="clientDataJSON",
+ authenticator_data="authenticatorData",
+ signature="signature",
+ user_handle="userHandle",
+ )
+ get_response = GetResponse(
+ id="id",
+ response=assertion_resp,
+ authenticator_attachment="authenticatorAttachment",
+ client_extension_results="clientExtensionResults",
+ )
+ response = {
+ "clientData": get_response.response.client_data_json,
+ "authenticatorData": get_response.response.authenticator_data,
+ "signatureData": get_response.response.signature,
+ "applicationId": "security_key_application_id",
+ "keyHandle": get_response.id,
+ "securityKeyReplyType": 2,
+ }
+
+ mock_handler = mock.Mock()
+ mock_handler.get.return_value = get_response
+
+ # Test success case
+ assert challenge._obtain_challenge_input_webauthn(metadata, mock_handler) == {
+ "securityKey": response
+ }
+ mock_handler.get.assert_called_with(get_request)
+
+ # Test exceptions
+
+ # Missing Values
+ sk = metadata["securityKey"]
+ metadata["securityKey"] = None
+ with pytest.raises(exceptions.InvalidValue):
+ challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
+ metadata["securityKey"] = sk
+
+ c = metadata["securityKey"]["challenges"]
+ metadata["securityKey"]["challenges"] = None
+ with pytest.raises(exceptions.InvalidValue):
+ challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
+ metadata["securityKey"]["challenges"] = []
+ with pytest.raises(exceptions.InvalidValue):
+ challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
+ metadata["securityKey"]["challenges"] = c
+
+ aid = metadata["securityKey"]["applicationId"]
+ metadata["securityKey"]["applicationId"] = None
+ with pytest.raises(exceptions.InvalidValue):
+ challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
+ metadata["securityKey"]["applicationId"] = aid
+
+ rpi = metadata["securityKey"]["relyingPartyId"]
+ metadata["securityKey"]["relyingPartyId"] = None
+ with pytest.raises(exceptions.InvalidValue):
+ challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
+ metadata["securityKey"]["relyingPartyId"] = rpi
+
+ kh = metadata["securityKey"]["challenges"][0]["keyHandle"]
+ metadata["securityKey"]["challenges"][0]["keyHandle"] = None
+ with pytest.raises(exceptions.InvalidValue):
+ challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
+ metadata["securityKey"]["challenges"][0]["keyHandle"] = kh
+
+ ch = metadata["securityKey"]["challenges"][0]["challenge"]
+ metadata["securityKey"]["challenges"][0]["challenge"] = None
+ with pytest.raises(exceptions.InvalidValue):
+ challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
+ metadata["securityKey"]["challenges"][0]["challenge"] = ch
+
+ # Handler Exceptions
+ mock_handler.get.side_effect = exceptions.MalformedError
+ with pytest.raises(exceptions.MalformedError):
+ challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
+
+ mock_handler.get.side_effect = exceptions.InvalidResource
+ with pytest.raises(exceptions.InvalidResource):
+ challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
+
+ mock_handler.get.side_effect = exceptions.ReauthFailError
+ with pytest.raises(exceptions.ReauthFailError):
+ challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
+
+
@mock.patch("getpass.getpass", return_value="foo")
def test_password_challenge(getpass_mock):
challenge = challenges.PasswordChallenge()
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py b/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py
index ce0c72fa0ab..0dbe316a0f4 100644
--- a/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py
+++ b/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py
@@ -22,6 +22,7 @@ import pytest # type: ignore
from google.auth import _helpers
from google.auth import crypt
from google.auth import exceptions
+from google.auth import iam
from google.auth import jwt
from google.auth import transport
from google.auth.credentials import DEFAULT_UNIVERSE_DOMAIN
@@ -772,10 +773,36 @@ class TestIDTokenCredentials(object):
)
request = mock.Mock()
credentials.refresh(request)
- req, signer_email, target_audience, access_token = call_iam_generate_id_token_endpoint.call_args[
+ req, iam_endpoint, signer_email, target_audience, access_token = call_iam_generate_id_token_endpoint.call_args[
0
]
assert req == request
+ assert iam_endpoint == iam._IAM_IDTOKEN_ENDPOINT
+ assert signer_email == "[email protected]"
+ assert target_audience == "https://example.com"
+ decoded_access_token = jwt.decode(access_token, verify=False)
+ assert decoded_access_token["scope"] == "https://www.googleapis.com/auth/iam"
+
+ @mock.patch(
+ "google.oauth2._client.call_iam_generate_id_token_endpoint", autospec=True
+ )
+ def test_refresh_iam_flow_non_gdu(self, call_iam_generate_id_token_endpoint):
+ credentials = self.make_credentials(universe_domain="fake-universe")
+ token = "id_token"
+ call_iam_generate_id_token_endpoint.return_value = (
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ )
+ request = mock.Mock()
+ credentials.refresh(request)
+ req, iam_endpoint, signer_email, target_audience, access_token = call_iam_generate_id_token_endpoint.call_args[
+ 0
+ ]
+ assert req == request
+ assert (
+ iam_endpoint
+ == "https://iamcredentials.fake-universe/v1/projects/-/serviceAccounts/{}:generateIdToken"
+ )
assert signer_email == "[email protected]"
assert target_audience == "https://example.com"
decoded_access_token = jwt.decode(access_token, verify=False)
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_handler.py b/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_handler.py
new file mode 100644
index 00000000000..454e97cb61d
--- /dev/null
+++ b/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_handler.py
@@ -0,0 +1,148 @@
+import json
+import struct
+
+import mock
+import pytest # type: ignore
+
+from google.auth import exceptions
+from google.oauth2 import webauthn_handler
+from google.oauth2 import webauthn_types
+
+
+def os_get_stub():
+ with mock.patch.object(
+ webauthn_handler.os.environ,
+ "get",
+ return_value="gcloud_webauthn_plugin",
+ name="fake os.environ.get",
+ ) as mock_os_environ_get:
+ yield mock_os_environ_get
+
+
+def subprocess_run_stub():
+ with mock.patch.object(
+ webauthn_handler.subprocess, "run", name="fake subprocess.run"
+ ) as mock_subprocess_run:
+ yield mock_subprocess_run
+
+
+def test_PluginHandler_is_available(os_get_stub):
+ test_handler = webauthn_handler.PluginHandler()
+
+ assert test_handler.is_available() is True
+
+ os_get_stub.return_value = None
+ assert test_handler.is_available() is False
+
+
+GET_ASSERTION_REQUEST = webauthn_types.GetRequest(
+ origin="fake_origin",
+ rpid="fake_rpid",
+ challenge="fake_challenge",
+ allow_credentials=[webauthn_types.PublicKeyCredentialDescriptor(id="fake_id_1")],
+)
+
+
+def test_malformated_get_assertion_response(os_get_stub, subprocess_run_stub):
+ response_len = struct.pack("<I", 5)
+ response = "1234567890"
+ mock_response = mock.Mock()
+ mock_response.stdout = response_len + response.encode()
+ subprocess_run_stub.return_value = mock_response
+
+ test_handler = webauthn_handler.PluginHandler()
+ with pytest.raises(exceptions.MalformedError) as excinfo:
+ test_handler.get(GET_ASSERTION_REQUEST)
+ assert "Plugin response length" in str(excinfo.value)
+
+
+def test_failure_get_assertion(os_get_stub, subprocess_run_stub):
+ failure_response = {
+ "type": "getResponse",
+ "error": "fake_plugin_get_assertion_failure",
+ }
+ response_json = json.dumps(failure_response).encode()
+ response_len = struct.pack("<I", len(response_json))
+
+ # process returns get response in json
+ mock_response = mock.Mock()
+ mock_response.stdout = response_len + response_json
+ subprocess_run_stub.return_value = mock_response
+
+ test_handler = webauthn_handler.PluginHandler()
+ with pytest.raises(exceptions.ReauthFailError) as excinfo:
+ test_handler.get(GET_ASSERTION_REQUEST)
+ assert failure_response["error"] in str(excinfo.value)
+
+
+def test_success_get_assertion(os_get_stub, subprocess_run_stub):
+ success_response = {
+ "type": "public-key",
+ "id": "fake-id",
+ "authenticatorAttachment": "cross-platform",
+ "clientExtensionResults": {"appid": True},
+ "response": {
+ "clientDataJSON": "fake_client_data_json_base64",
+ "authenticatorData": "fake_authenticator_data_base64",
+ "signature": "fake_signature_base64",
+ "userHandle": "fake_user_handle_base64",
+ },
+ }
+ valid_plugin_response = {"type": "getResponse", "responseData": success_response}
+ valid_plugin_response_json = json.dumps(valid_plugin_response).encode()
+ valid_plugin_response_len = struct.pack("<I", len(valid_plugin_response_json))
+
+ # process returns get response in json
+ mock_response = mock.Mock()
+ mock_response.stdout = valid_plugin_response_len + valid_plugin_response_json
+ subprocess_run_stub.return_value = mock_response
+
+ # Call get()
+ test_handler = webauthn_handler.PluginHandler()
+ got_response = test_handler.get(GET_ASSERTION_REQUEST)
+
+ # Validate expected plugin request
+ os_get_stub.assert_called_once()
+ subprocess_run_stub.assert_called_once()
+
+ stdin_input = subprocess_run_stub.call_args.kwargs["input"]
+ input_json_len_le = stdin_input[:4]
+ input_json_len = struct.unpack("<I", input_json_len_le)[0]
+ input_json = stdin_input[4:]
+ assert len(input_json) == input_json_len
+
+ input_dict = json.loads(input_json.decode("utf8"))
+ assert input_dict == {
+ "type": "get",
+ "origin": "fake_origin",
+ "requestData": {
+ "rpid": "fake_rpid",
+ "challenge": "fake_challenge",
+ "allowCredentials": [{"type": "public-key", "id": "fake_id_1"}],
+ },
+ }
+
+ # Validate get assertion response
+ assert got_response.id == success_response["id"]
+ assert (
+ got_response.authenticator_attachment
+ == success_response["authenticatorAttachment"]
+ )
+ assert (
+ got_response.client_extension_results
+ == success_response["clientExtensionResults"]
+ )
+ assert (
+ got_response.response.client_data_json
+ == success_response["response"]["clientDataJSON"]
+ )
+ assert (
+ got_response.response.authenticator_data
+ == success_response["response"]["authenticatorData"]
+ )
+ assert got_response.response.signature == success_response["response"]["signature"]
+ assert (
+ got_response.response.user_handle == success_response["response"]["userHandle"]
+ )
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_handler_factory.py b/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_handler_factory.py
new file mode 100644
index 00000000000..47890ce4b46
--- /dev/null
+++ b/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_handler_factory.py
@@ -0,0 +1,29 @@
+import mock
+import pytest # type: ignore
+
+from google.oauth2 import webauthn_handler
+from google.oauth2 import webauthn_handler_factory
+
+
+def os_get_stub():
+ with mock.patch.object(
+ webauthn_handler.os.environ,
+ "get",
+ return_value="gcloud_webauthn_plugin",
+ name="fake os.environ.get",
+ ) as mock_os_environ_get:
+ yield mock_os_environ_get
+
+
+# Check that get_handler returns a value when env is set,
+# that type is PluginHandler, and that no value is returned
+# if env not set.
+def test_WebauthHandlerFactory_get(os_get_stub):
+ factory = webauthn_handler_factory.WebauthnHandlerFactory()
+ assert factory.get_handler() is not None
+
+ assert isinstance(factory.get_handler(), webauthn_handler.PluginHandler)
+
+ os_get_stub.return_value = None
+ assert factory.get_handler() is None
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_types.py b/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_types.py
new file mode 100644
index 00000000000..5231d21896a
--- /dev/null
+++ b/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_types.py
@@ -0,0 +1,237 @@
+import json
+
+import pytest # type: ignore
+
+from google.oauth2 import webauthn_types
+
+
+ "test_pub_key_cred,expected_dict",
+ [
+ (
+ webauthn_types.PublicKeyCredentialDescriptor(
+ id="fake_cred_id_base64", transports=None
+ ),
+ {"type": "public-key", "id": "fake_cred_id_base64"},
+ ),
+ (
+ webauthn_types.PublicKeyCredentialDescriptor(
+ id="fake_cred_id_base64", transports=[]
+ ),
+ {"type": "public-key", "id": "fake_cred_id_base64"},
+ ),
+ (
+ webauthn_types.PublicKeyCredentialDescriptor(
+ id="fake_cred_id_base64", transports=["usb"]
+ ),
+ {"type": "public-key", "id": "fake_cred_id_base64", "transports": ["usb"]},
+ ),
+ (
+ webauthn_types.PublicKeyCredentialDescriptor(
+ id="fake_cred_id_base64", transports=["usb", "internal"]
+ ),
+ {
+ "type": "public-key",
+ "id": "fake_cred_id_base64",
+ "transports": ["usb", "internal"],
+ },
+ ),
+ ],
+)
+def test_PublicKeyCredentialDescriptor(test_pub_key_cred, expected_dict):
+ assert test_pub_key_cred.to_dict() == expected_dict
+
+
+ "test_extension_input,expected_dict",
+ [
+ (webauthn_types.AuthenticationExtensionsClientInputs(), {}),
+ (webauthn_types.AuthenticationExtensionsClientInputs(appid=""), {}),
+ (
+ webauthn_types.AuthenticationExtensionsClientInputs(appid="fake_appid"),
+ {"appid": "fake_appid"},
+ ),
+ ],
+)
+def test_AuthenticationExtensionsClientInputs(test_extension_input, expected_dict):
+ assert test_extension_input.to_dict() == expected_dict
+
+
[email protected]("has_allow_credentials", [(False), (True)])
+def test_GetRequest(has_allow_credentials):
+ allow_credentials = [
+ webauthn_types.PublicKeyCredentialDescriptor(id="fake_id_1"),
+ webauthn_types.PublicKeyCredentialDescriptor(id="fake_id_2"),
+ ]
+ test_get_request = webauthn_types.GetRequest(
+ origin="fake_origin",
+ rpid="fake_rpid",
+ challenge="fake_challenge",
+ timeout_ms=123,
+ allow_credentials=allow_credentials if has_allow_credentials else None,
+ user_verification="preferred",
+ extensions=webauthn_types.AuthenticationExtensionsClientInputs(
+ appid="fake_appid"
+ ),
+ )
+ expected_allow_credentials = [
+ {"type": "public-key", "id": "fake_id_1"},
+ {"type": "public-key", "id": "fake_id_2"},
+ ]
+ exepcted_dict = {
+ "type": "get",
+ "origin": "fake_origin",
+ "requestData": {
+ "rpid": "fake_rpid",
+ "timeout": 123,
+ "challenge": "fake_challenge",
+ "userVerification": "preferred",
+ "extensions": {"appid": "fake_appid"},
+ },
+ }
+ if has_allow_credentials:
+ exepcted_dict["requestData"]["allowCredentials"] = expected_allow_credentials
+ assert json.loads(test_get_request.to_json()) == exepcted_dict
+
+
+ "has_user_handle,has_authenticator_attachment,has_client_extension_results",
+ [
+ (False, False, False),
+ (False, False, True),
+ (False, True, False),
+ (False, True, True),
+ (True, False, False),
+ (True, False, True),
+ (True, True, False),
+ (True, True, True),
+ ],
+)
+def test_GetResponse(
+ has_user_handle, has_authenticator_attachment, has_client_extension_results
+):
+ input_response_data = {
+ "type": "public-key",
+ "id": "fake-id",
+ "authenticatorAttachment": "cross-platform",
+ "clientExtensionResults": {"appid": True},
+ "response": {
+ "clientDataJSON": "fake_client_data_json_base64",
+ "authenticatorData": "fake_authenticator_data_base64",
+ "signature": "fake_signature_base64",
+ "userHandle": "fake_user_handle_base64",
+ },
+ }
+ if not has_authenticator_attachment:
+ input_response_data.pop("authenticatorAttachment")
+ if not has_client_extension_results:
+ input_response_data.pop("clientExtensionResults")
+ if not has_user_handle:
+ input_response_data["response"].pop("userHandle")
+
+ response = webauthn_types.GetResponse.from_json(
+ json.dumps({"type": "getResponse", "responseData": input_response_data})
+ )
+
+ assert response.id == input_response_data["id"]
+ assert response.authenticator_attachment == (
+ input_response_data["authenticatorAttachment"]
+ if has_authenticator_attachment
+ else None
+ )
+ assert response.client_extension_results == (
+ input_response_data["clientExtensionResults"]
+ if has_client_extension_results
+ else None
+ )
+ assert (
+ response.response.client_data_json
+ == input_response_data["response"]["clientDataJSON"]
+ )
+ assert (
+ response.response.authenticator_data
+ == input_response_data["response"]["authenticatorData"]
+ )
+ assert response.response.signature == input_response_data["response"]["signature"]
+ assert response.response.user_handle == (
+ input_response_data["response"]["userHandle"] if has_user_handle else None
+ )
+
+
+ "input_dict,expected_error",
+ [
+ ({"xyz_type": "wrong_type"}, "Invalid Get response type"),
+ ({"type": "wrong_type"}, "Invalid Get response type"),
+ ({"type": "getResponse"}, "Get response is empty"),
+ (
+ {"type": "getResponse", "error": "fake_get_response_error"},
+ "WebAuthn.get failure: fake_get_response_error",
+ ),
+ (
+ {"type": "getResponse", "responseData": {"xyz_type": "wrong_type"}},
+ "Invalid credential type",
+ ),
+ (
+ {"type": "getResponse", "responseData": {"type": "wrong_type"}},
+ "Invalid credential type",
+ ),
+ (
+ {
+ "type": "getResponse",
+ "responseData": {"type": "public-key", "response": {}},
+ },
+ "KeyError",
+ ),
+ (
+ {
+ "type": "getResponse",
+ "responseData": {
+ "type": "public-key",
+ "response": {"clientDataJSON": "fake_client_data_json_base64"},
+ },
+ },
+ "KeyError",
+ ),
+ (
+ {
+ "type": "getResponse",
+ "responseData": {
+ "type": "public-key",
+ "response": {
+ "clientDataJSON": "fake_client_data_json_base64",
+ "authenticatorData": "fake_authenticator_data_base64",
+ },
+ },
+ },
+ "KeyError",
+ ),
+ (
+ {
+ "type": "getResponse",
+ "responseData": {
+ "type": "public-key",
+ "response": {
+ "clientDataJSON": "fake_client_data_json_base64",
+ "authenticatorData": "fake_authenticator_data_base64",
+ "signature": "fake_signature_base64",
+ },
+ },
+ },
+ "KeyError",
+ ),
+ ],
+)
+def test_GetResponse_error(input_dict, expected_error):
+ with pytest.raises(Exception) as excinfo:
+ webauthn_types.GetResponse.from_json(json.dumps(input_dict))
+ if expected_error == "KeyError":
+ assert excinfo.type is KeyError
+ else:
+ assert expected_error in str(excinfo.value)
+
+
+def test_MalformatedJsonInput():
+ with pytest.raises(ValueError) as excinfo:
+ webauthn_types.GetResponse.from_json(")]}")
+ assert "Invalid Get JSON response" in str(excinfo.value)
diff --git a/contrib/python/google-auth/py3/tests/test_aws.py b/contrib/python/google-auth/py3/tests/test_aws.py
index 56148203128..df1f02e7d70 100644
--- a/contrib/python/google-auth/py3/tests/test_aws.py
+++ b/contrib/python/google-auth/py3/tests/test_aws.py
@@ -1220,6 +1220,39 @@ class TestCredentials(object):
url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE
)
+ def test_info_with_default_token_url(self):
+ credentials = aws.Credentials(
+ audience=AUDIENCE,
+ subject_token_type=SUBJECT_TOKEN_TYPE,
+ credential_source=self.CREDENTIAL_SOURCE.copy(),
+ )
+
+ assert credentials.info == {
+ "type": "external_account",
+ "audience": AUDIENCE,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "credential_source": self.CREDENTIAL_SOURCE.copy(),
+ "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
+ }
+
+ def test_info_with_default_token_url_with_universe_domain(self):
+ credentials = aws.Credentials(
+ audience=AUDIENCE,
+ subject_token_type=SUBJECT_TOKEN_TYPE,
+ credential_source=self.CREDENTIAL_SOURCE.copy(),
+ universe_domain="testdomain.org",
+ )
+
+ assert credentials.info == {
+ "type": "external_account",
+ "audience": AUDIENCE,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ "token_url": "https://sts.testdomain.org/v1/token",
+ "credential_source": self.CREDENTIAL_SOURCE.copy(),
+ "universe_domain": "testdomain.org",
+ }
+
def test_retrieve_subject_token_missing_region_url(self):
# When AWS_REGION envvar is not available, region_url is required for
# determining the current AWS region.
diff --git a/contrib/python/google-auth/py3/tests/test_identity_pool.py b/contrib/python/google-auth/py3/tests/test_identity_pool.py
index 0de711832f6..e4efe46c6bb 100644
--- a/contrib/python/google-auth/py3/tests/test_identity_pool.py
+++ b/contrib/python/google-auth/py3/tests/test_identity_pool.py
@@ -783,6 +783,39 @@ class TestCredentials(object):
"universe_domain": DEFAULT_UNIVERSE_DOMAIN,
}
+ def test_info_with_default_token_url(self):
+ credentials = identity_pool.Credentials(
+ audience=AUDIENCE,
+ subject_token_type=SUBJECT_TOKEN_TYPE,
+ credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy(),
+ )
+
+ assert credentials.info == {
+ "type": "external_account",
+ "audience": AUDIENCE,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
+ "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
+ }
+
+ def test_info_with_default_token_url_with_universe_domain(self):
+ credentials = identity_pool.Credentials(
+ audience=AUDIENCE,
+ subject_token_type=SUBJECT_TOKEN_TYPE,
+ credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy(),
+ universe_domain="testdomain.org",
+ )
+
+ assert credentials.info == {
+ "type": "external_account",
+ "audience": AUDIENCE,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ "token_url": "https://sts.testdomain.org/v1/token",
+ "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
+ "universe_domain": "testdomain.org",
+ }
+
def test_retrieve_subject_token_missing_subject_token(self, tmpdir):
# Provide empty text file.
empty_file = tmpdir.join("empty.txt")
diff --git a/contrib/python/google-auth/py3/tests/transport/test__custom_tls_signer.py b/contrib/python/google-auth/py3/tests/transport/test__custom_tls_signer.py
index d2907bad297..3a33c2c0215 100644
--- a/contrib/python/google-auth/py3/tests/transport/test__custom_tls_signer.py
+++ b/contrib/python/google-auth/py3/tests/transport/test__custom_tls_signer.py
@@ -195,6 +195,7 @@ def test_custom_tls_signer():
get_cert.assert_called_once()
get_sign_callback.assert_called_once()
offload_lib.ConfigureSslContext.assert_called_once()
+ assert not signer_object.should_use_provider()
assert signer_object._enterprise_cert_file_path == ENTERPRISE_CERT_FILE
assert signer_object._offload_lib == offload_lib
assert signer_object._signer_lib == signer_lib
@@ -216,6 +217,7 @@ def test_custom_tls_signer_provider():
signer_object.load_libraries()
signer_object.attach_to_ssl_context(mock.MagicMock())
+ assert signer_object.should_use_provider()
assert signer_object._enterprise_cert_file_path == ENTERPRISE_CERT_FILE_PROVIDER
assert signer_object._provider_lib == provider_lib
load_provider_lib.assert_called_with("/path/to/provider/lib")
diff --git a/contrib/python/google-auth/py3/tests/transport/test_requests.py b/contrib/python/google-auth/py3/tests/transport/test_requests.py
index aadc1ddbfd0..0da3e36d9a3 100644
--- a/contrib/python/google-auth/py3/tests/transport/test_requests.py
+++ b/contrib/python/google-auth/py3/tests/transport/test_requests.py
@@ -568,3 +568,38 @@ class TestMutualTlsOffloadAdapter(object):
adapter.proxy_manager_for()
mock_proxy_manager_for.assert_called_with(ssl_context=adapter._ctx_proxymanager)
+
+ @mock.patch.object(requests.adapters.HTTPAdapter, "init_poolmanager")
+ @mock.patch.object(requests.adapters.HTTPAdapter, "proxy_manager_for")
+ @mock.patch.object(
+ google.auth.transport._custom_tls_signer.CustomTlsSigner, "should_use_provider"
+ )
+ @mock.patch.object(
+ google.auth.transport._custom_tls_signer.CustomTlsSigner, "load_libraries"
+ )
+ @mock.patch.object(
+ google.auth.transport._custom_tls_signer.CustomTlsSigner,
+ "attach_to_ssl_context",
+ )
+ def test_success_should_use_provider(
+ self,
+ mock_attach_to_ssl_context,
+ mock_load_libraries,
+ mock_should_use_provider,
+ mock_proxy_manager_for,
+ mock_init_poolmanager,
+ ):
+ enterprise_cert_file_path = "/path/to/enterprise/cert/json"
+ adapter = google.auth.transport.requests._MutualTlsOffloadAdapter(
+ enterprise_cert_file_path
+ )
+
+ mock_should_use_provider.side_effect = True
+ mock_load_libraries.assert_called_once()
+ assert mock_attach_to_ssl_context.call_count == 2
+
+ adapter.init_poolmanager()
+ mock_init_poolmanager.assert_called_with(ssl_context=adapter._ctx_poolmanager)
+
+ adapter.proxy_manager_for()
+ mock_proxy_manager_for.assert_called_with(ssl_context=adapter._ctx_proxymanager)