aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/google-auth/py3/tests
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-07-16 11:26:46 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-07-16 11:37:31 +0300
commit4f7ed63af6564380b49bfe854d3b8d5294dbbd5a (patch)
tree88396187d9355fe619023d504b99e75ddc56f344 /contrib/python/google-auth/py3/tests
parent0ba3062c6dde0f103ca69900448ba0ec477fbaa9 (diff)
downloadydb-4f7ed63af6564380b49bfe854d3b8d5294dbbd5a.tar.gz
Intermediate changes
Diffstat (limited to 'contrib/python/google-auth/py3/tests')
-rw-r--r--contrib/python/google-auth/py3/tests/transport/test__mtls_helper.py219
-rw-r--r--contrib/python/google-auth/py3/tests/transport/test_grpc.py36
2 files changed, 203 insertions, 52 deletions
diff --git a/contrib/python/google-auth/py3/tests/transport/test__mtls_helper.py b/contrib/python/google-auth/py3/tests/transport/test__mtls_helper.py
index 1621a053021..b195616dd57 100644
--- a/contrib/python/google-auth/py3/tests/transport/test__mtls_helper.py
+++ b/contrib/python/google-auth/py3/tests/transport/test__mtls_helper.py
@@ -126,7 +126,7 @@ class TestCheckaMetadataPath(object):
class TestReadMetadataFile(object):
def test_success(self):
metadata_path = os.path.join(pytest.data_dir, "context_aware_metadata.json")
- metadata = _mtls_helper._read_dca_metadata_file(metadata_path)
+ metadata = _mtls_helper._load_json_file(metadata_path)
assert "cert_provider_command" in metadata
@@ -134,7 +134,7 @@ class TestReadMetadataFile(object):
# read a file which is not json format.
metadata_path = os.path.join(pytest.data_dir, "privatekey.pem")
with pytest.raises(exceptions.ClientCertError):
- _mtls_helper._read_dca_metadata_file(metadata_path)
+ _mtls_helper._load_json_file(metadata_path)
class TestRunCertProviderCommand(object):
@@ -277,22 +277,18 @@ class TestGetClientSslCredentials(object):
@mock.patch(
"google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
)
- @mock.patch(
- "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
- )
+ @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
)
def test_success(
self,
mock_check_dca_metadata_path,
- mock_read_dca_metadata_file,
+ mock_load_json_file,
mock_run_cert_provider_command,
):
mock_check_dca_metadata_path.return_value = True
- mock_read_dca_metadata_file.return_value = {
- "cert_provider_command": ["command"]
- }
+ mock_load_json_file.return_value = {"cert_provider_command": ["command"]}
mock_run_cert_provider_command.return_value = (b"cert", b"key", None)
has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
assert has_cert
@@ -314,22 +310,18 @@ class TestGetClientSslCredentials(object):
@mock.patch(
"google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
)
- @mock.patch(
- "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
- )
+ @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
)
def test_success_with_encrypted_key(
self,
mock_check_dca_metadata_path,
- mock_read_dca_metadata_file,
+ mock_load_json_file,
mock_run_cert_provider_command,
):
mock_check_dca_metadata_path.return_value = True
- mock_read_dca_metadata_file.return_value = {
- "cert_provider_command": ["command"]
- }
+ mock_load_json_file.return_value = {"cert_provider_command": ["command"]}
mock_run_cert_provider_command.return_value = (b"cert", b"key", b"passphrase")
has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials(
generate_encrypted_key=True
@@ -342,40 +334,34 @@ class TestGetClientSslCredentials(object):
["command", "--with_passphrase"], expect_encrypted_key=True
)
- @mock.patch(
- "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
- )
+ @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
)
def test_missing_cert_command(
- self, mock_check_dca_metadata_path, mock_read_dca_metadata_file
+ self, mock_check_dca_metadata_path, mock_load_json_file
):
mock_check_dca_metadata_path.return_value = True
- mock_read_dca_metadata_file.return_value = {}
+ mock_load_json_file.return_value = {}
with pytest.raises(exceptions.ClientCertError):
_mtls_helper.get_client_ssl_credentials()
@mock.patch(
"google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
)
- @mock.patch(
- "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
- )
+ @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
)
def test_customize_context_aware_metadata_path(
self,
mock_check_dca_metadata_path,
- mock_read_dca_metadata_file,
+ mock_load_json_file,
mock_run_cert_provider_command,
):
context_aware_metadata_path = "/path/to/metata/data"
mock_check_dca_metadata_path.return_value = context_aware_metadata_path
- mock_read_dca_metadata_file.return_value = {
- "cert_provider_command": ["command"]
- }
+ mock_load_json_file.return_value = {"cert_provider_command": ["command"]}
mock_run_cert_provider_command.return_value = (b"cert", b"key", None)
has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials(
@@ -387,7 +373,182 @@ class TestGetClientSslCredentials(object):
assert key == b"key"
assert passphrase is None
mock_check_dca_metadata_path.assert_called_with(context_aware_metadata_path)
- mock_read_dca_metadata_file.assert_called_with(context_aware_metadata_path)
+ mock_load_json_file.assert_called_with(context_aware_metadata_path)
+
+
+class TestGetWorkloadCertAndKey(object):
+ @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
+ @mock.patch(
+ "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True
+ )
+ @mock.patch(
+ "google.auth.transport._mtls_helper._read_cert_and_key_files", autospec=True
+ )
+ def test_success(
+ self,
+ mock_read_cert_and_key_files,
+ mock_get_cert_config_path,
+ mock_load_json_file,
+ ):
+ cert_config_path = "/path/to/cert"
+ mock_get_cert_config_path.return_value = "/path/to/cert"
+ mock_load_json_file.return_value = {
+ "cert_configs": {
+ "workload": {"cert_path": "cert/path", "key_path": "key/path"}
+ }
+ }
+ mock_read_cert_and_key_files.return_value = (
+ pytest.public_cert_bytes,
+ pytest.private_key_bytes,
+ )
+
+ actual_cert, actual_key = _mtls_helper._get_workload_cert_and_key(
+ cert_config_path
+ )
+ assert actual_cert == pytest.public_cert_bytes
+ assert actual_key == pytest.private_key_bytes
+
+ @mock.patch(
+ "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True
+ )
+ def test_file_not_found_returns_none(self, mock_get_cert_config_path):
+ mock_get_cert_config_path.return_value = None
+
+ actual_cert, actual_key = _mtls_helper._get_workload_cert_and_key()
+ assert actual_cert is None
+ assert actual_key is None
+
+ @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
+ @mock.patch(
+ "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True
+ )
+ def test_no_cert_configs(self, mock_get_cert_config_path, mock_load_json_file):
+ mock_get_cert_config_path.return_value = "/path/to/cert"
+ mock_load_json_file.return_value = {}
+
+ with pytest.raises(exceptions.ClientCertError):
+ _mtls_helper._get_workload_cert_and_key("")
+
+ @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
+ @mock.patch(
+ "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True
+ )
+ def test_no_workload(self, mock_get_cert_config_path, mock_load_json_file):
+ mock_get_cert_config_path.return_value = "/path/to/cert"
+ mock_load_json_file.return_value = {"cert_configs": {}}
+
+ with pytest.raises(exceptions.ClientCertError):
+ _mtls_helper._get_workload_cert_and_key("")
+
+ @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
+ @mock.patch(
+ "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True
+ )
+ def test_no_cert_file(self, mock_get_cert_config_path, mock_load_json_file):
+ mock_get_cert_config_path.return_value = "/path/to/cert"
+ mock_load_json_file.return_value = {
+ "cert_configs": {"workload": {"key_path": "path/to/key"}}
+ }
+
+ with pytest.raises(exceptions.ClientCertError):
+ _mtls_helper._get_workload_cert_and_key("")
+
+ @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
+ @mock.patch(
+ "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True
+ )
+ def test_no_key_file(self, mock_get_cert_config_path, mock_load_json_file):
+ mock_get_cert_config_path.return_value = "/path/to/cert"
+ mock_load_json_file.return_value = {
+ "cert_configs": {"workload": {"cert_path": "path/to/key"}}
+ }
+
+ with pytest.raises(exceptions.ClientCertError):
+ _mtls_helper._get_workload_cert_and_key("")
+
+
+class TestReadCertAndKeyFile(object):
+ def test_success(self):
+ cert_path = os.path.join(pytest.data_dir, "public_cert.pem")
+ key_path = os.path.join(pytest.data_dir, "privatekey.pem")
+
+ actual_cert, actual_key = _mtls_helper._read_cert_and_key_files(
+ cert_path, key_path
+ )
+ assert actual_cert == pytest.public_cert_bytes
+ assert actual_key == pytest.private_key_bytes
+
+ def test_no_cert_file(self):
+ cert_path = "fake/file/path"
+ key_path = os.path.join(pytest.data_dir, "privatekey.pem")
+ with pytest.raises(FileNotFoundError):
+ _mtls_helper._read_cert_and_key_files(cert_path, key_path)
+
+ def test_no_key_file(self):
+ cert_path = os.path.join(pytest.data_dir, "public_cert.pem")
+ key_path = "fake/file/path"
+ with pytest.raises(FileNotFoundError):
+ _mtls_helper._read_cert_and_key_files(cert_path, key_path)
+
+ def test_invalid_cert_file(self):
+ cert_path = os.path.join(pytest.data_dir, "service_account.json")
+ key_path = os.path.join(pytest.data_dir, "privatekey.pem")
+ with pytest.raises(exceptions.ClientCertError):
+ _mtls_helper._read_cert_and_key_files(cert_path, key_path)
+
+ def test_invalid_key_file(self):
+ cert_path = os.path.join(pytest.data_dir, "public_cert.pem")
+ key_path = os.path.join(pytest.data_dir, "public_cert.pem")
+ with pytest.raises(exceptions.ClientCertError):
+ _mtls_helper._read_cert_and_key_files(cert_path, key_path)
+
+
+class TestGetCertConfigPath(object):
+ def test_success_with_override(self):
+ config_path = os.path.join(pytest.data_dir, "service_account.json")
+ returned_path = _mtls_helper._get_cert_config_path(config_path)
+ assert returned_path == config_path
+
+ def test_override_does_not_exist(self):
+ config_path = "fake/file/path"
+ returned_path = _mtls_helper._get_cert_config_path(config_path)
+ assert returned_path is None
+
+ @mock.patch.dict(os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": ""})
+ @mock.patch("os.path.exists", autospec=True)
+ def test_default(self, mock_path_exists):
+ mock_path_exists.return_value = True
+ returned_path = _mtls_helper._get_cert_config_path()
+ expected_path = os.path.expanduser(
+ _mtls_helper._CERTIFICATE_CONFIGURATION_DEFAULT_PATH
+ )
+ assert returned_path == expected_path
+
+ @mock.patch.dict(
+ os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": "path/to/config/file"}
+ )
+ @mock.patch("os.path.exists", autospec=True)
+ def test_env_variable(self, mock_path_exists):
+ mock_path_exists.return_value = True
+ returned_path = _mtls_helper._get_cert_config_path()
+ expected_path = "path/to/config/file"
+ assert returned_path == expected_path
+
+ @mock.patch.dict(os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": ""})
+ @mock.patch("os.path.exists", autospec=True)
+ def test_env_variable_file_does_not_exist(self, mock_path_exists):
+ mock_path_exists.return_value = False
+ returned_path = _mtls_helper._get_cert_config_path()
+ assert returned_path is None
+
+ @mock.patch.dict(
+ os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": "path/to/config/file"}
+ )
+ @mock.patch("os.path.exists", autospec=True)
+ def test_default_file_does_not_exist(self, mock_path_exists):
+ mock_path_exists.return_value = False
+ returned_path = _mtls_helper._get_cert_config_path()
+ assert returned_path is None
class TestGetClientCertAndKey(object):
diff --git a/contrib/python/google-auth/py3/tests/transport/test_grpc.py b/contrib/python/google-auth/py3/tests/transport/test_grpc.py
index 29fae4cdf65..9badb59b284 100644
--- a/contrib/python/google-auth/py3/tests/transport/test_grpc.py
+++ b/contrib/python/google-auth/py3/tests/transport/test_grpc.py
@@ -142,16 +142,14 @@ class TestAuthMetadataPlugin(object):
@mock.patch("grpc.ssl_channel_credentials", autospec=True)
@mock.patch("grpc.secure_channel", autospec=True)
class TestSecureAuthorizedChannel(object):
- @mock.patch(
- "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
- )
+ @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
)
def test_secure_authorized_channel_adc(
self,
check_dca_metadata_path,
- read_dca_metadata_file,
+ load_json_file,
secure_channel,
ssl_channel_credentials,
metadata_call_credentials,
@@ -165,9 +163,7 @@ class TestSecureAuthorizedChannel(object):
# Mock the context aware metadata and client cert/key so mTLS SSL channel
# will be used.
check_dca_metadata_path.return_value = METADATA_PATH
- read_dca_metadata_file.return_value = {
- "cert_provider_command": ["some command"]
- }
+ load_json_file.return_value = {"cert_provider_command": ["some command"]}
get_client_ssl_credentials.return_value = (
True,
PUBLIC_CERT_BYTES,
@@ -335,16 +331,14 @@ class TestSecureAuthorizedChannel(object):
ssl_channel_credentials.return_value, metadata_call_credentials.return_value
)
- @mock.patch(
- "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
- )
+ @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
)
def test_secure_authorized_channel_with_client_cert_callback_failure(
self,
check_dca_metadata_path,
- read_dca_metadata_file,
+ load_json_file,
secure_channel,
ssl_channel_credentials,
metadata_call_credentials,
@@ -406,7 +400,7 @@ class TestSecureAuthorizedChannel(object):
@mock.patch(
"google.auth.transport._mtls_helper.get_client_ssl_credentials", autospec=True
)
-@mock.patch("google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True)
+@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
)
@@ -414,7 +408,7 @@ class TestSslCredentials(object):
def test_no_context_aware_metadata(
self,
mock_check_dca_metadata_path,
- mock_read_dca_metadata_file,
+ mock_load_json_file,
mock_get_client_ssl_credentials,
mock_ssl_channel_credentials,
):
@@ -437,14 +431,12 @@ class TestSslCredentials(object):
def test_get_client_ssl_credentials_failure(
self,
mock_check_dca_metadata_path,
- mock_read_dca_metadata_file,
+ mock_load_json_file,
mock_get_client_ssl_credentials,
mock_ssl_channel_credentials,
):
mock_check_dca_metadata_path.return_value = METADATA_PATH
- mock_read_dca_metadata_file.return_value = {
- "cert_provider_command": ["some command"]
- }
+ mock_load_json_file.return_value = {"cert_provider_command": ["some command"]}
# Mock that client cert and key are not loaded and exception is raised.
mock_get_client_ssl_credentials.side_effect = exceptions.ClientCertError()
@@ -458,14 +450,12 @@ class TestSslCredentials(object):
def test_get_client_ssl_credentials_success(
self,
mock_check_dca_metadata_path,
- mock_read_dca_metadata_file,
+ mock_load_json_file,
mock_get_client_ssl_credentials,
mock_ssl_channel_credentials,
):
mock_check_dca_metadata_path.return_value = METADATA_PATH
- mock_read_dca_metadata_file.return_value = {
- "cert_provider_command": ["some command"]
- }
+ mock_load_json_file.return_value = {"cert_provider_command": ["some command"]}
mock_get_client_ssl_credentials.return_value = (
True,
PUBLIC_CERT_BYTES,
@@ -488,7 +478,7 @@ class TestSslCredentials(object):
def test_get_client_ssl_credentials_without_client_cert_env(
self,
mock_check_dca_metadata_path,
- mock_read_dca_metadata_file,
+ mock_load_json_file,
mock_get_client_ssl_credentials,
mock_ssl_channel_credentials,
):
@@ -498,6 +488,6 @@ class TestSslCredentials(object):
assert ssl_credentials.ssl_credentials is not None
assert not ssl_credentials.is_mtls
mock_check_dca_metadata_path.assert_not_called()
- mock_read_dca_metadata_file.assert_not_called()
+ mock_load_json_file.assert_not_called()
mock_get_client_ssl_credentials.assert_not_called()
mock_ssl_channel_credentials.assert_called_once()