diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-06-21 09:28:26 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-06-21 09:36:40 +0300 |
commit | 0cb3f820fac6a243bcb7e4c4388700898660bfd0 (patch) | |
tree | 056f1b8bc5f72039fa422aac0af13bab0e966aa7 /contrib/python/google-auth/py3/tests/oauth2 | |
parent | 08049311fe5c42a97e8bb47a73fb6cd143c0bdb1 (diff) | |
download | ydb-0cb3f820fac6a243bcb7e4c4388700898660bfd0.tar.gz |
Intermediate changes
Diffstat (limited to 'contrib/python/google-auth/py3/tests/oauth2')
6 files changed, 606 insertions, 3 deletions
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test__client.py b/contrib/python/google-auth/py3/tests/oauth2/test__client.py index 444232f396..f9a2d3aff4 100644 --- a/contrib/python/google-auth/py3/tests/oauth2/test__client.py +++ b/contrib/python/google-auth/py3/tests/oauth2/test__client.py @@ -24,6 +24,7 @@ import pytest # type: ignore from google.auth import _helpers from google.auth import crypt from google.auth import exceptions +from google.auth import iam from google.auth import jwt from google.auth import transport from google.oauth2 import _client @@ -319,7 +320,11 @@ def test_call_iam_generate_id_token_endpoint(): request = make_request({"token": id_token}) token, expiry = _client.call_iam_generate_id_token_endpoint( - request, "fake_email", "fake_audience", "fake_access_token" + request, + iam._IAM_IDTOKEN_ENDPOINT, + "fake_email", + "fake_audience", + "fake_access_token", ) assert ( @@ -352,7 +357,11 @@ def test_call_iam_generate_id_token_endpoint_no_id_token(): with pytest.raises(exceptions.RefreshError) as excinfo: _client.call_iam_generate_id_token_endpoint( - request, "fake_email", "fake_audience", "fake_access_token" + request, + iam._IAM_IDTOKEN_ENDPOINT, + "fake_email", + "fake_audience", + "fake_access_token", ) assert excinfo.match("No ID token in response") diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_challenges.py b/contrib/python/google-auth/py3/tests/oauth2/test_challenges.py index a06f552837..4116b913ab 100644 --- a/contrib/python/google-auth/py3/tests/oauth2/test_challenges.py +++ b/contrib/python/google-auth/py3/tests/oauth2/test_challenges.py @@ -15,6 +15,7 @@ """Tests for the reauth module.""" import base64 +import os import sys import mock @@ -23,6 +24,13 @@ import pyu2f # type: ignore from google.auth import exceptions from google.oauth2 import challenges +from google.oauth2.webauthn_types import ( + AuthenticationExtensionsClientInputs, + AuthenticatorAssertionResponse, + GetRequest, + GetResponse, + PublicKeyCredentialDescriptor, +) def test_get_user_password(): @@ -54,6 +62,8 @@ def test_security_key(): # Test the case that security key challenge is passed with applicationId and # relyingPartyId the same. + os.environ.pop('"GOOGLE_AUTH_WEBAUTHN_PLUGIN"', None) + with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key): with mock.patch( "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" @@ -70,6 +80,19 @@ def test_security_key(): print_callback=sys.stderr.write, ) + # Test the case that webauthn plugin is available + os.environ["GOOGLE_AUTH_WEBAUTHN_PLUGIN"] = "plugin" + + with mock.patch( + "google.oauth2.challenges.SecurityKeyChallenge._obtain_challenge_input_webauthn", + return_value={"securityKey": "security key response"}, + ): + + assert challenge.obtain_challenge_input(metadata) == { + "securityKey": "security key response" + } + os.environ.pop('"GOOGLE_AUTH_WEBAUTHN_PLUGIN"', None) + # Test the case that security key challenge is passed with applicationId and # relyingPartyId different, first call works. metadata["securityKey"]["relyingPartyId"] = "security_key_relying_party_id" @@ -173,6 +196,136 @@ def test_security_key(): assert excinfo.match(r"pyu2f dependency is required") +def test_security_key_webauthn(): + metadata = { + "status": "READY", + "challengeId": 2, + "challengeType": "SECURITY_KEY", + "securityKey": { + "applicationId": "security_key_application_id", + "challenges": [ + { + "keyHandle": "some_key", + "challenge": base64.urlsafe_b64encode( + "some_challenge".encode("ascii") + ).decode("ascii"), + } + ], + "relyingPartyId": "security_key_application_id", + }, + } + + challenge = challenges.SecurityKeyChallenge() + + sk = metadata["securityKey"] + sk_challenges = sk["challenges"] + + application_id = sk["applicationId"] + + allow_credentials = [] + for sk_challenge in sk_challenges: + allow_credentials.append( + PublicKeyCredentialDescriptor(id=sk_challenge["keyHandle"]) + ) + + extension = AuthenticationExtensionsClientInputs(appid=application_id) + + get_request = GetRequest( + origin=challenges.REAUTH_ORIGIN, + rpid=application_id, + challenge=challenge._unpadded_urlsafe_b64recode(sk_challenge["challenge"]), + timeout_ms=challenges.WEBAUTHN_TIMEOUT_MS, + allow_credentials=allow_credentials, + user_verification="required", + extensions=extension, + ) + + assertion_resp = AuthenticatorAssertionResponse( + client_data_json="clientDataJSON", + authenticator_data="authenticatorData", + signature="signature", + user_handle="userHandle", + ) + get_response = GetResponse( + id="id", + response=assertion_resp, + authenticator_attachment="authenticatorAttachment", + client_extension_results="clientExtensionResults", + ) + response = { + "clientData": get_response.response.client_data_json, + "authenticatorData": get_response.response.authenticator_data, + "signatureData": get_response.response.signature, + "applicationId": "security_key_application_id", + "keyHandle": get_response.id, + "securityKeyReplyType": 2, + } + + mock_handler = mock.Mock() + mock_handler.get.return_value = get_response + + # Test success case + assert challenge._obtain_challenge_input_webauthn(metadata, mock_handler) == { + "securityKey": response + } + mock_handler.get.assert_called_with(get_request) + + # Test exceptions + + # Missing Values + sk = metadata["securityKey"] + metadata["securityKey"] = None + with pytest.raises(exceptions.InvalidValue): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + metadata["securityKey"] = sk + + c = metadata["securityKey"]["challenges"] + metadata["securityKey"]["challenges"] = None + with pytest.raises(exceptions.InvalidValue): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + metadata["securityKey"]["challenges"] = [] + with pytest.raises(exceptions.InvalidValue): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + metadata["securityKey"]["challenges"] = c + + aid = metadata["securityKey"]["applicationId"] + metadata["securityKey"]["applicationId"] = None + with pytest.raises(exceptions.InvalidValue): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + metadata["securityKey"]["applicationId"] = aid + + rpi = metadata["securityKey"]["relyingPartyId"] + metadata["securityKey"]["relyingPartyId"] = None + with pytest.raises(exceptions.InvalidValue): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + metadata["securityKey"]["relyingPartyId"] = rpi + + kh = metadata["securityKey"]["challenges"][0]["keyHandle"] + metadata["securityKey"]["challenges"][0]["keyHandle"] = None + with pytest.raises(exceptions.InvalidValue): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + metadata["securityKey"]["challenges"][0]["keyHandle"] = kh + + ch = metadata["securityKey"]["challenges"][0]["challenge"] + metadata["securityKey"]["challenges"][0]["challenge"] = None + with pytest.raises(exceptions.InvalidValue): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + metadata["securityKey"]["challenges"][0]["challenge"] = ch + + # Handler Exceptions + mock_handler.get.side_effect = exceptions.MalformedError + with pytest.raises(exceptions.MalformedError): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + + mock_handler.get.side_effect = exceptions.InvalidResource + with pytest.raises(exceptions.InvalidResource): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + + mock_handler.get.side_effect = exceptions.ReauthFailError + with pytest.raises(exceptions.ReauthFailError): + challenge._obtain_challenge_input_webauthn(metadata, mock_handler) + + @mock.patch("getpass.getpass", return_value="foo") def test_password_challenge(getpass_mock): challenge = challenges.PasswordChallenge() diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py b/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py index ce0c72fa0a..0dbe316a0f 100644 --- a/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py +++ b/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py @@ -22,6 +22,7 @@ import pytest # type: ignore from google.auth import _helpers from google.auth import crypt from google.auth import exceptions +from google.auth import iam from google.auth import jwt from google.auth import transport from google.auth.credentials import DEFAULT_UNIVERSE_DOMAIN @@ -772,10 +773,36 @@ class TestIDTokenCredentials(object): ) request = mock.Mock() credentials.refresh(request) - req, signer_email, target_audience, access_token = call_iam_generate_id_token_endpoint.call_args[ + req, iam_endpoint, signer_email, target_audience, access_token = call_iam_generate_id_token_endpoint.call_args[ 0 ] assert req == request + assert iam_endpoint == iam._IAM_IDTOKEN_ENDPOINT + assert signer_email == "service-account@example.com" + assert target_audience == "https://example.com" + decoded_access_token = jwt.decode(access_token, verify=False) + assert decoded_access_token["scope"] == "https://www.googleapis.com/auth/iam" + + @mock.patch( + "google.oauth2._client.call_iam_generate_id_token_endpoint", autospec=True + ) + def test_refresh_iam_flow_non_gdu(self, call_iam_generate_id_token_endpoint): + credentials = self.make_credentials(universe_domain="fake-universe") + token = "id_token" + call_iam_generate_id_token_endpoint.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + ) + request = mock.Mock() + credentials.refresh(request) + req, iam_endpoint, signer_email, target_audience, access_token = call_iam_generate_id_token_endpoint.call_args[ + 0 + ] + assert req == request + assert ( + iam_endpoint + == "https://iamcredentials.fake-universe/v1/projects/-/serviceAccounts/{}:generateIdToken" + ) assert signer_email == "service-account@example.com" assert target_audience == "https://example.com" decoded_access_token = jwt.decode(access_token, verify=False) diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_handler.py b/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_handler.py new file mode 100644 index 0000000000..454e97cb61 --- /dev/null +++ b/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_handler.py @@ -0,0 +1,148 @@ +import json +import struct + +import mock +import pytest # type: ignore + +from google.auth import exceptions +from google.oauth2 import webauthn_handler +from google.oauth2 import webauthn_types + + +@pytest.fixture +def os_get_stub(): + with mock.patch.object( + webauthn_handler.os.environ, + "get", + return_value="gcloud_webauthn_plugin", + name="fake os.environ.get", + ) as mock_os_environ_get: + yield mock_os_environ_get + + +@pytest.fixture +def subprocess_run_stub(): + with mock.patch.object( + webauthn_handler.subprocess, "run", name="fake subprocess.run" + ) as mock_subprocess_run: + yield mock_subprocess_run + + +def test_PluginHandler_is_available(os_get_stub): + test_handler = webauthn_handler.PluginHandler() + + assert test_handler.is_available() is True + + os_get_stub.return_value = None + assert test_handler.is_available() is False + + +GET_ASSERTION_REQUEST = webauthn_types.GetRequest( + origin="fake_origin", + rpid="fake_rpid", + challenge="fake_challenge", + allow_credentials=[webauthn_types.PublicKeyCredentialDescriptor(id="fake_id_1")], +) + + +def test_malformated_get_assertion_response(os_get_stub, subprocess_run_stub): + response_len = struct.pack("<I", 5) + response = "1234567890" + mock_response = mock.Mock() + mock_response.stdout = response_len + response.encode() + subprocess_run_stub.return_value = mock_response + + test_handler = webauthn_handler.PluginHandler() + with pytest.raises(exceptions.MalformedError) as excinfo: + test_handler.get(GET_ASSERTION_REQUEST) + assert "Plugin response length" in str(excinfo.value) + + +def test_failure_get_assertion(os_get_stub, subprocess_run_stub): + failure_response = { + "type": "getResponse", + "error": "fake_plugin_get_assertion_failure", + } + response_json = json.dumps(failure_response).encode() + response_len = struct.pack("<I", len(response_json)) + + # process returns get response in json + mock_response = mock.Mock() + mock_response.stdout = response_len + response_json + subprocess_run_stub.return_value = mock_response + + test_handler = webauthn_handler.PluginHandler() + with pytest.raises(exceptions.ReauthFailError) as excinfo: + test_handler.get(GET_ASSERTION_REQUEST) + assert failure_response["error"] in str(excinfo.value) + + +def test_success_get_assertion(os_get_stub, subprocess_run_stub): + success_response = { + "type": "public-key", + "id": "fake-id", + "authenticatorAttachment": "cross-platform", + "clientExtensionResults": {"appid": True}, + "response": { + "clientDataJSON": "fake_client_data_json_base64", + "authenticatorData": "fake_authenticator_data_base64", + "signature": "fake_signature_base64", + "userHandle": "fake_user_handle_base64", + }, + } + valid_plugin_response = {"type": "getResponse", "responseData": success_response} + valid_plugin_response_json = json.dumps(valid_plugin_response).encode() + valid_plugin_response_len = struct.pack("<I", len(valid_plugin_response_json)) + + # process returns get response in json + mock_response = mock.Mock() + mock_response.stdout = valid_plugin_response_len + valid_plugin_response_json + subprocess_run_stub.return_value = mock_response + + # Call get() + test_handler = webauthn_handler.PluginHandler() + got_response = test_handler.get(GET_ASSERTION_REQUEST) + + # Validate expected plugin request + os_get_stub.assert_called_once() + subprocess_run_stub.assert_called_once() + + stdin_input = subprocess_run_stub.call_args.kwargs["input"] + input_json_len_le = stdin_input[:4] + input_json_len = struct.unpack("<I", input_json_len_le)[0] + input_json = stdin_input[4:] + assert len(input_json) == input_json_len + + input_dict = json.loads(input_json.decode("utf8")) + assert input_dict == { + "type": "get", + "origin": "fake_origin", + "requestData": { + "rpid": "fake_rpid", + "challenge": "fake_challenge", + "allowCredentials": [{"type": "public-key", "id": "fake_id_1"}], + }, + } + + # Validate get assertion response + assert got_response.id == success_response["id"] + assert ( + got_response.authenticator_attachment + == success_response["authenticatorAttachment"] + ) + assert ( + got_response.client_extension_results + == success_response["clientExtensionResults"] + ) + assert ( + got_response.response.client_data_json + == success_response["response"]["clientDataJSON"] + ) + assert ( + got_response.response.authenticator_data + == success_response["response"]["authenticatorData"] + ) + assert got_response.response.signature == success_response["response"]["signature"] + assert ( + got_response.response.user_handle == success_response["response"]["userHandle"] + ) diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_handler_factory.py b/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_handler_factory.py new file mode 100644 index 0000000000..47890ce4b4 --- /dev/null +++ b/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_handler_factory.py @@ -0,0 +1,29 @@ +import mock +import pytest # type: ignore + +from google.oauth2 import webauthn_handler +from google.oauth2 import webauthn_handler_factory + + +@pytest.fixture +def os_get_stub(): + with mock.patch.object( + webauthn_handler.os.environ, + "get", + return_value="gcloud_webauthn_plugin", + name="fake os.environ.get", + ) as mock_os_environ_get: + yield mock_os_environ_get + + +# Check that get_handler returns a value when env is set, +# that type is PluginHandler, and that no value is returned +# if env not set. +def test_WebauthHandlerFactory_get(os_get_stub): + factory = webauthn_handler_factory.WebauthnHandlerFactory() + assert factory.get_handler() is not None + + assert isinstance(factory.get_handler(), webauthn_handler.PluginHandler) + + os_get_stub.return_value = None + assert factory.get_handler() is None diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_types.py b/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_types.py new file mode 100644 index 0000000000..5231d21896 --- /dev/null +++ b/contrib/python/google-auth/py3/tests/oauth2/test_webauthn_types.py @@ -0,0 +1,237 @@ +import json + +import pytest # type: ignore + +from google.oauth2 import webauthn_types + + +@pytest.mark.parametrize( + "test_pub_key_cred,expected_dict", + [ + ( + webauthn_types.PublicKeyCredentialDescriptor( + id="fake_cred_id_base64", transports=None + ), + {"type": "public-key", "id": "fake_cred_id_base64"}, + ), + ( + webauthn_types.PublicKeyCredentialDescriptor( + id="fake_cred_id_base64", transports=[] + ), + {"type": "public-key", "id": "fake_cred_id_base64"}, + ), + ( + webauthn_types.PublicKeyCredentialDescriptor( + id="fake_cred_id_base64", transports=["usb"] + ), + {"type": "public-key", "id": "fake_cred_id_base64", "transports": ["usb"]}, + ), + ( + webauthn_types.PublicKeyCredentialDescriptor( + id="fake_cred_id_base64", transports=["usb", "internal"] + ), + { + "type": "public-key", + "id": "fake_cred_id_base64", + "transports": ["usb", "internal"], + }, + ), + ], +) +def test_PublicKeyCredentialDescriptor(test_pub_key_cred, expected_dict): + assert test_pub_key_cred.to_dict() == expected_dict + + +@pytest.mark.parametrize( + "test_extension_input,expected_dict", + [ + (webauthn_types.AuthenticationExtensionsClientInputs(), {}), + (webauthn_types.AuthenticationExtensionsClientInputs(appid=""), {}), + ( + webauthn_types.AuthenticationExtensionsClientInputs(appid="fake_appid"), + {"appid": "fake_appid"}, + ), + ], +) +def test_AuthenticationExtensionsClientInputs(test_extension_input, expected_dict): + assert test_extension_input.to_dict() == expected_dict + + +@pytest.mark.parametrize("has_allow_credentials", [(False), (True)]) +def test_GetRequest(has_allow_credentials): + allow_credentials = [ + webauthn_types.PublicKeyCredentialDescriptor(id="fake_id_1"), + webauthn_types.PublicKeyCredentialDescriptor(id="fake_id_2"), + ] + test_get_request = webauthn_types.GetRequest( + origin="fake_origin", + rpid="fake_rpid", + challenge="fake_challenge", + timeout_ms=123, + allow_credentials=allow_credentials if has_allow_credentials else None, + user_verification="preferred", + extensions=webauthn_types.AuthenticationExtensionsClientInputs( + appid="fake_appid" + ), + ) + expected_allow_credentials = [ + {"type": "public-key", "id": "fake_id_1"}, + {"type": "public-key", "id": "fake_id_2"}, + ] + exepcted_dict = { + "type": "get", + "origin": "fake_origin", + "requestData": { + "rpid": "fake_rpid", + "timeout": 123, + "challenge": "fake_challenge", + "userVerification": "preferred", + "extensions": {"appid": "fake_appid"}, + }, + } + if has_allow_credentials: + exepcted_dict["requestData"]["allowCredentials"] = expected_allow_credentials + assert json.loads(test_get_request.to_json()) == exepcted_dict + + +@pytest.mark.parametrize( + "has_user_handle,has_authenticator_attachment,has_client_extension_results", + [ + (False, False, False), + (False, False, True), + (False, True, False), + (False, True, True), + (True, False, False), + (True, False, True), + (True, True, False), + (True, True, True), + ], +) +def test_GetResponse( + has_user_handle, has_authenticator_attachment, has_client_extension_results +): + input_response_data = { + "type": "public-key", + "id": "fake-id", + "authenticatorAttachment": "cross-platform", + "clientExtensionResults": {"appid": True}, + "response": { + "clientDataJSON": "fake_client_data_json_base64", + "authenticatorData": "fake_authenticator_data_base64", + "signature": "fake_signature_base64", + "userHandle": "fake_user_handle_base64", + }, + } + if not has_authenticator_attachment: + input_response_data.pop("authenticatorAttachment") + if not has_client_extension_results: + input_response_data.pop("clientExtensionResults") + if not has_user_handle: + input_response_data["response"].pop("userHandle") + + response = webauthn_types.GetResponse.from_json( + json.dumps({"type": "getResponse", "responseData": input_response_data}) + ) + + assert response.id == input_response_data["id"] + assert response.authenticator_attachment == ( + input_response_data["authenticatorAttachment"] + if has_authenticator_attachment + else None + ) + assert response.client_extension_results == ( + input_response_data["clientExtensionResults"] + if has_client_extension_results + else None + ) + assert ( + response.response.client_data_json + == input_response_data["response"]["clientDataJSON"] + ) + assert ( + response.response.authenticator_data + == input_response_data["response"]["authenticatorData"] + ) + assert response.response.signature == input_response_data["response"]["signature"] + assert response.response.user_handle == ( + input_response_data["response"]["userHandle"] if has_user_handle else None + ) + + +@pytest.mark.parametrize( + "input_dict,expected_error", + [ + ({"xyz_type": "wrong_type"}, "Invalid Get response type"), + ({"type": "wrong_type"}, "Invalid Get response type"), + ({"type": "getResponse"}, "Get response is empty"), + ( + {"type": "getResponse", "error": "fake_get_response_error"}, + "WebAuthn.get failure: fake_get_response_error", + ), + ( + {"type": "getResponse", "responseData": {"xyz_type": "wrong_type"}}, + "Invalid credential type", + ), + ( + {"type": "getResponse", "responseData": {"type": "wrong_type"}}, + "Invalid credential type", + ), + ( + { + "type": "getResponse", + "responseData": {"type": "public-key", "response": {}}, + }, + "KeyError", + ), + ( + { + "type": "getResponse", + "responseData": { + "type": "public-key", + "response": {"clientDataJSON": "fake_client_data_json_base64"}, + }, + }, + "KeyError", + ), + ( + { + "type": "getResponse", + "responseData": { + "type": "public-key", + "response": { + "clientDataJSON": "fake_client_data_json_base64", + "authenticatorData": "fake_authenticator_data_base64", + }, + }, + }, + "KeyError", + ), + ( + { + "type": "getResponse", + "responseData": { + "type": "public-key", + "response": { + "clientDataJSON": "fake_client_data_json_base64", + "authenticatorData": "fake_authenticator_data_base64", + "signature": "fake_signature_base64", + }, + }, + }, + "KeyError", + ), + ], +) +def test_GetResponse_error(input_dict, expected_error): + with pytest.raises(Exception) as excinfo: + webauthn_types.GetResponse.from_json(json.dumps(input_dict)) + if expected_error == "KeyError": + assert excinfo.type is KeyError + else: + assert expected_error in str(excinfo.value) + + +def test_MalformatedJsonInput(): + with pytest.raises(ValueError) as excinfo: + webauthn_types.GetResponse.from_json(")]}") + assert "Invalid Get JSON response" in str(excinfo.value) |