aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/google-auth/py3/tests
diff options
context:
space:
mode:
authorAlexSm <alex@ydb.tech>2023-12-27 23:31:58 +0100
committerGitHub <noreply@github.com>2023-12-27 23:31:58 +0100
commitd67bfb4b4b7549081543e87a31bc6cb5c46ac973 (patch)
tree8674f2f1570877cb653e7ddcff37ba00288de15a /contrib/python/google-auth/py3/tests
parent1f6bef05ed441c3aa2d565ac792b26cded704ac7 (diff)
downloadydb-d67bfb4b4b7549081543e87a31bc6cb5c46ac973.tar.gz
Import libs 4 (#758)
Diffstat (limited to 'contrib/python/google-auth/py3/tests')
-rw-r--r--contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py79
-rw-r--r--contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py24
-rw-r--r--contrib/python/google-auth/py3/tests/conftest.py11
-rw-r--r--contrib/python/google-auth/py3/tests/crypt/test__cryptography_rsa.py19
-rw-r--r--contrib/python/google-auth/py3/tests/crypt/test__python_rsa.py4
-rw-r--r--contrib/python/google-auth/py3/tests/crypt/test_crypt.py4
-rw-r--r--contrib/python/google-auth/py3/tests/crypt/test_es256.py17
-rw-r--r--contrib/python/google-auth/py3/tests/data/enterprise_cert_valid_provider.json6
-rw-r--r--contrib/python/google-auth/py3/tests/oauth2/test__client.py4
-rw-r--r--contrib/python/google-auth/py3/tests/oauth2/test_credentials.py33
-rw-r--r--contrib/python/google-auth/py3/tests/oauth2/test_gdch_credentials.py4
-rw-r--r--contrib/python/google-auth/py3/tests/oauth2/test_id_token.py10
-rw-r--r--contrib/python/google-auth/py3/tests/oauth2/test_service_account.py29
-rw-r--r--contrib/python/google-auth/py3/tests/test__cloud_sdk.py7
-rw-r--r--contrib/python/google-auth/py3/tests/test__default.py4
-rw-r--r--contrib/python/google-auth/py3/tests/test__helpers.py26
-rw-r--r--contrib/python/google-auth/py3/tests/test__oauth2client.py4
-rw-r--r--contrib/python/google-auth/py3/tests/test__service_account_info.py4
-rw-r--r--contrib/python/google-auth/py3/tests/test_aws.py4
-rw-r--r--contrib/python/google-auth/py3/tests/test_credentials.py18
-rw-r--r--contrib/python/google-auth/py3/tests/test_external_account.py51
-rw-r--r--contrib/python/google-auth/py3/tests/test_identity_pool.py6
-rw-r--r--contrib/python/google-auth/py3/tests/test_impersonated_credentials.py4
-rw-r--r--contrib/python/google-auth/py3/tests/test_jwt.py4
-rw-r--r--contrib/python/google-auth/py3/tests/transport/test__custom_tls_signer.py108
-rw-r--r--contrib/python/google-auth/py3/tests/transport/test__mtls_helper.py11
-rw-r--r--contrib/python/google-auth/py3/tests/transport/test_grpc.py4
-rw-r--r--contrib/python/google-auth/py3/tests/transport/test_requests.py5
-rw-r--r--contrib/python/google-auth/py3/tests/ya.make5
29 files changed, 368 insertions, 141 deletions
diff --git a/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py b/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py
index ddf84596af..5e037a940b 100644
--- a/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py
+++ b/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py
@@ -63,6 +63,7 @@ def make_request(data, status=http_client.OK, headers=None, retry=False):
return request
+@pytest.mark.xfail
def test_detect_gce_residency_linux_success():
_metadata._GCE_PRODUCT_NAME_FILE = SMBIOS_PRODUCT_NAME_FILE
assert _metadata.detect_gce_residency_linux()
@@ -89,6 +90,7 @@ def test_is_on_gce_windows_success():
assert not _metadata.is_on_gce(request)
+@pytest.mark.xfail
@mock.patch("os.name", new="posix")
def test_is_on_gce_linux_success():
request = make_request("", headers={_metadata._METADATA_FLAVOR_HEADER: "meep"})
@@ -176,6 +178,24 @@ def test_get_success_json():
assert result[key] == value
+def test_get_success_json_content_type_charset():
+ key, value = "foo", "bar"
+
+ data = json.dumps({key: value})
+ request = make_request(
+ data, headers={"content-type": "application/json; charset=UTF-8"}
+ )
+
+ result = _metadata.get(request, PATH)
+
+ request.assert_called_once_with(
+ method="GET",
+ url=_metadata._METADATA_ROOT + PATH,
+ headers=_metadata._METADATA_HEADERS,
+ )
+ assert result[key] == value
+
+
def test_get_success_retry():
key, value = "foo", "bar"
@@ -307,6 +327,18 @@ def test_get_failure():
)
+def test_get_return_none_for_not_found_error():
+ request = make_request("Metadata error", status=http_client.NOT_FOUND)
+
+ assert _metadata.get(request, PATH, return_none_for_not_found_error=True) is None
+
+ request.assert_called_once_with(
+ method="GET",
+ url=_metadata._METADATA_ROOT + PATH,
+ headers=_metadata._METADATA_HEADERS,
+ )
+
+
def test_get_failure_connection_failed():
request = make_request("")
request.side_effect = exceptions.TransportError()
@@ -353,6 +385,53 @@ def test_get_project_id():
assert project_id == project
+def test_get_universe_domain_success():
+ request = make_request(
+ "fake_universe_domain", headers={"content-type": "text/plain"}
+ )
+
+ universe_domain = _metadata.get_universe_domain(request)
+
+ request.assert_called_once_with(
+ method="GET",
+ url=_metadata._METADATA_ROOT + "universe/universe_domain",
+ headers=_metadata._METADATA_HEADERS,
+ )
+ assert universe_domain == "fake_universe_domain"
+
+
+def test_get_universe_domain_not_found():
+ # Test that if the universe domain endpoint returns 404 error, we should
+ # use googleapis.com as the universe domain
+ request = make_request("not found", status=http_client.NOT_FOUND)
+
+ universe_domain = _metadata.get_universe_domain(request)
+
+ request.assert_called_once_with(
+ method="GET",
+ url=_metadata._METADATA_ROOT + "universe/universe_domain",
+ headers=_metadata._METADATA_HEADERS,
+ )
+ assert universe_domain == "googleapis.com"
+
+
+def test_get_universe_domain_other_error():
+ # Test that if the universe domain endpoint returns an error other than 404
+ # we should throw the error
+ request = make_request("unauthorized", status=http_client.UNAUTHORIZED)
+
+ with pytest.raises(exceptions.TransportError) as excinfo:
+ _metadata.get_universe_domain(request)
+
+ assert excinfo.match(r"unauthorized")
+
+ request.assert_called_once_with(
+ method="GET",
+ url=_metadata._METADATA_ROOT + "universe/universe_domain",
+ headers=_metadata._METADATA_HEADERS,
+ )
+
+
@mock.patch(
"google.auth.metrics.token_request_access_token_mds",
return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
diff --git a/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py b/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py
index 507fea9fcc..5d6ccdcdec 100644
--- a/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py
+++ b/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py
@@ -208,6 +208,30 @@ class TestCredentials(object):
assert headers["authorization"] == "Bearer token"
assert headers["x-goog-api-client"] == "cred-type/mds"
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_universe_domain",
+ return_value="fake_universe_domain",
+ )
+ def test_universe_domain(self, get_universe_domain):
+ self.credentials._universe_domain_cached = False
+ self.credentials._universe_domain = "googleapis.com"
+
+ # calling the universe_domain property should trigger a call to
+ # get_universe_domain to fetch the value. The value should be cached.
+ assert self.credentials.universe_domain == "fake_universe_domain"
+ assert self.credentials._universe_domain == "fake_universe_domain"
+ assert self.credentials._universe_domain_cached
+ get_universe_domain.assert_called_once_with(
+ self.credentials._universe_domain_request
+ )
+
+ # calling the universe_domain property the second time should use the
+ # cached value instead of calling get_universe_domain
+ assert self.credentials.universe_domain == "fake_universe_domain"
+ get_universe_domain.assert_called_once_with(
+ self.credentials._universe_domain_request
+ )
+
class TestIDTokenCredentials(object):
credentials = None
diff --git a/contrib/python/google-auth/py3/tests/conftest.py b/contrib/python/google-auth/py3/tests/conftest.py
index 08896b0f82..7658d8f456 100644
--- a/contrib/python/google-auth/py3/tests/conftest.py
+++ b/contrib/python/google-auth/py3/tests/conftest.py
@@ -21,9 +21,14 @@ import pytest # type: ignore
def pytest_configure():
"""Load public certificate and private key."""
- import __res
- pytest.private_key_bytes = __res.find("data/privatekey.pem")
- pytest.public_cert_bytes = __res.find("data/public_cert.pem")
+ import yatest.common as yc
+ pytest.data_dir = os.path.join(os.path.dirname(yc.source_path("contrib/python/google-auth/py3/tests/conftest.py")), "data")
+
+ with open(os.path.join(pytest.data_dir, "privatekey.pem"), "rb") as fh:
+ pytest.private_key_bytes = fh.read()
+
+ with open(os.path.join(pytest.data_dir, "public_cert.pem"), "rb") as fh:
+ pytest.public_cert_bytes = fh.read()
@pytest.fixture
diff --git a/contrib/python/google-auth/py3/tests/crypt/test__cryptography_rsa.py b/contrib/python/google-auth/py3/tests/crypt/test__cryptography_rsa.py
index d19154b61b..2c4cebe0d7 100644
--- a/contrib/python/google-auth/py3/tests/crypt/test__cryptography_rsa.py
+++ b/contrib/python/google-auth/py3/tests/crypt/test__cryptography_rsa.py
@@ -14,6 +14,7 @@
import json
import os
+import pickle
from cryptography.hazmat.primitives.asymmetric import rsa
import pytest # type: ignore
@@ -23,8 +24,8 @@ from google.auth.crypt import _cryptography_rsa
from google.auth.crypt import base
-import yatest.common
-DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+import yatest.common as yc
+DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "..", "data")
# To generate privatekey.pem, privatekey.pub, and public_cert.pem:
# $ openssl req -new -newkey rsa:1024 -x509 -nodes -out public_cert.pem \
@@ -160,3 +161,17 @@ class TestRSASigner(object):
assert signer.key_id == SERVICE_ACCOUNT_INFO[base._JSON_FILE_PRIVATE_KEY_ID]
assert isinstance(signer._key, rsa.RSAPrivateKey)
+
+ def test_pickle(self):
+ signer = _cryptography_rsa.RSASigner.from_service_account_file(
+ SERVICE_ACCOUNT_JSON_FILE
+ )
+
+ assert signer.key_id == SERVICE_ACCOUNT_INFO[base._JSON_FILE_PRIVATE_KEY_ID]
+ assert isinstance(signer._key, rsa.RSAPrivateKey)
+
+ pickled_signer = pickle.dumps(signer)
+ signer = pickle.loads(pickled_signer)
+
+ assert signer.key_id == SERVICE_ACCOUNT_INFO[base._JSON_FILE_PRIVATE_KEY_ID]
+ assert isinstance(signer._key, rsa.RSAPrivateKey)
diff --git a/contrib/python/google-auth/py3/tests/crypt/test__python_rsa.py b/contrib/python/google-auth/py3/tests/crypt/test__python_rsa.py
index 592b523d92..75dcb314f7 100644
--- a/contrib/python/google-auth/py3/tests/crypt/test__python_rsa.py
+++ b/contrib/python/google-auth/py3/tests/crypt/test__python_rsa.py
@@ -26,8 +26,8 @@ from google.auth.crypt import _python_rsa
from google.auth.crypt import base
-import yatest.common
-DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+import yatest.common as yc
+DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "..", "data")
# To generate privatekey.pem, privatekey.pub, and public_cert.pem:
# $ openssl req -new -newkey rsa:1024 -x509 -nodes -out public_cert.pem \
diff --git a/contrib/python/google-auth/py3/tests/crypt/test_crypt.py b/contrib/python/google-auth/py3/tests/crypt/test_crypt.py
index 97c2abc257..30de18a5dd 100644
--- a/contrib/python/google-auth/py3/tests/crypt/test_crypt.py
+++ b/contrib/python/google-auth/py3/tests/crypt/test_crypt.py
@@ -17,8 +17,8 @@ import os
from google.auth import crypt
-import yatest.common
-DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+import yatest.common as yc
+DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "..", "data")
# To generate privatekey.pem, privatekey.pub, and public_cert.pem:
# $ openssl req -new -newkey rsa:1024 -x509 -nodes -out public_cert.pem \
diff --git a/contrib/python/google-auth/py3/tests/crypt/test_es256.py b/contrib/python/google-auth/py3/tests/crypt/test_es256.py
index 1a43a2f01b..3ba5b64fad 100644
--- a/contrib/python/google-auth/py3/tests/crypt/test_es256.py
+++ b/contrib/python/google-auth/py3/tests/crypt/test_es256.py
@@ -15,6 +15,7 @@
import base64
import json
import os
+import pickle
from cryptography.hazmat.primitives.asymmetric import ec
import pytest # type: ignore
@@ -24,8 +25,8 @@ from google.auth.crypt import base
from google.auth.crypt import es256
-import yatest.common
-DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+import yatest.common as yc
+DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "..", "data")
# To generate es256_privatekey.pem, es256_privatekey.pub, and
# es256_public_cert.pem:
@@ -142,3 +143,15 @@ class TestES256Signer(object):
assert signer.key_id == SERVICE_ACCOUNT_INFO[base._JSON_FILE_PRIVATE_KEY_ID]
assert isinstance(signer._key, ec.EllipticCurvePrivateKey)
+
+ def test_pickle(self):
+ signer = es256.ES256Signer.from_service_account_file(SERVICE_ACCOUNT_JSON_FILE)
+
+ assert signer.key_id == SERVICE_ACCOUNT_INFO[base._JSON_FILE_PRIVATE_KEY_ID]
+ assert isinstance(signer._key, ec.EllipticCurvePrivateKey)
+
+ pickled_signer = pickle.dumps(signer)
+ signer = pickle.loads(pickled_signer)
+
+ assert signer.key_id == SERVICE_ACCOUNT_INFO[base._JSON_FILE_PRIVATE_KEY_ID]
+ assert isinstance(signer._key, ec.EllipticCurvePrivateKey)
diff --git a/contrib/python/google-auth/py3/tests/data/enterprise_cert_valid_provider.json b/contrib/python/google-auth/py3/tests/data/enterprise_cert_valid_provider.json
new file mode 100644
index 0000000000..9b7adf8bc3
--- /dev/null
+++ b/contrib/python/google-auth/py3/tests/data/enterprise_cert_valid_provider.json
@@ -0,0 +1,6 @@
+{
+ "libs": {
+ "ecp_client": "/path/to/signer/lib",
+ "ecp_provider": "/path/to/provider/lib"
+ }
+}
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test__client.py b/contrib/python/google-auth/py3/tests/oauth2/test__client.py
index 54179269bd..444232f396 100644
--- a/contrib/python/google-auth/py3/tests/oauth2/test__client.py
+++ b/contrib/python/google-auth/py3/tests/oauth2/test__client.py
@@ -29,8 +29,8 @@ from google.auth import transport
from google.oauth2 import _client
-import yatest.common
-DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+import yatest.common as yc
+DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "..", "data")
with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
PRIVATE_KEY_BYTES = fh.read()
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_credentials.py b/contrib/python/google-auth/py3/tests/oauth2/test_credentials.py
index f2604a5f18..d6a1915862 100644
--- a/contrib/python/google-auth/py3/tests/oauth2/test_credentials.py
+++ b/contrib/python/google-auth/py3/tests/oauth2/test_credentials.py
@@ -27,8 +27,8 @@ from google.auth import transport
from google.oauth2 import credentials
-import yatest.common
-DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+import yatest.common as yc
+DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "..", "data")
AUTH_USER_JSON_FILE = os.path.join(DATA_DIR, "authorized_user.json")
@@ -123,6 +123,17 @@ class TestCredentials(object):
assert excinfo.match("The provided refresh_handler is not a callable or None.")
+ def test_refresh_with_non_default_universe_domain(self):
+ creds = credentials.Credentials(
+ token="token", universe_domain="dummy_universe.com"
+ )
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ creds.refresh(mock.Mock())
+
+ assert excinfo.match(
+ "refresh is only supported in the default googleapis.com universe domain"
+ )
+
@mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
@mock.patch(
"google.auth._helpers.utcnow",
@@ -775,6 +786,12 @@ class TestCredentials(object):
creds.apply(headers)
assert "x-goog-user-project" in headers
+ def test_with_universe_domain(self):
+ creds = credentials.Credentials(token="token")
+ assert creds.universe_domain == "googleapis.com"
+ new_creds = creds.with_universe_domain("dummy_universe.com")
+ assert new_creds.universe_domain == "dummy_universe.com"
+
def test_with_token_uri(self):
info = AUTH_USER_INFO.copy()
@@ -869,6 +886,7 @@ class TestCredentials(object):
assert json_asdict.get("scopes") == creds.scopes
assert json_asdict.get("client_secret") == creds.client_secret
assert json_asdict.get("expiry") == info["expiry"]
+ assert json_asdict.get("universe_domain") == creds.universe_domain
# Test with a `strip` arg
json_output = creds.to_json(strip=["client_secret"])
@@ -896,6 +914,17 @@ class TestCredentials(object):
for attr in list(creds.__dict__):
assert getattr(creds, attr) == getattr(unpickled, attr)
+ def test_pickle_and_unpickle_universe_domain(self):
+ # old version of auth lib doesn't have _universe_domain, so the pickled
+ # cred doesn't have such a field.
+ creds = self.make_credentials()
+ del creds._universe_domain
+
+ unpickled = pickle.loads(pickle.dumps(creds))
+
+ # make sure the unpickled cred sets _universe_domain to default.
+ assert unpickled.universe_domain == "googleapis.com"
+
def test_pickle_and_unpickle_with_refresh_handler(self):
expected_expiry = _helpers.utcnow() + datetime.timedelta(seconds=2800)
refresh_handler = mock.Mock(return_value=("TOKEN", expected_expiry))
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_gdch_credentials.py b/contrib/python/google-auth/py3/tests/oauth2/test_gdch_credentials.py
index 1ff61d8683..9a67a07345 100644
--- a/contrib/python/google-auth/py3/tests/oauth2/test_gdch_credentials.py
+++ b/contrib/python/google-auth/py3/tests/oauth2/test_gdch_credentials.py
@@ -27,7 +27,7 @@ import google.auth.transport.requests
from google.oauth2 import gdch_credentials
from google.oauth2.gdch_credentials import ServiceAccountCredentials
-import yatest.common
+import yatest.common as yc
class TestServiceAccountCredentials(object):
@@ -39,7 +39,7 @@ class TestServiceAccountCredentials(object):
TOKEN_URI = "https://service-identity.<Domain>/authenticate"
JSON_PATH = os.path.join(
- yatest.common.test_source_path(), "data", "gdch_service_account.json"
+ os.path.dirname(yc.source_path(__file__)), "..", "data", "gdch_service_account.json"
)
with open(JSON_PATH, "rb") as fh:
INFO = json.load(fh)
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_id_token.py b/contrib/python/google-auth/py3/tests/oauth2/test_id_token.py
index 861f76ce4f..8657bdfb7e 100644
--- a/contrib/python/google-auth/py3/tests/oauth2/test_id_token.py
+++ b/contrib/python/google-auth/py3/tests/oauth2/test_id_token.py
@@ -24,9 +24,9 @@ from google.auth import transport
from google.oauth2 import id_token
from google.oauth2 import service_account
-import yatest.common
+import yatest.common as yc
SERVICE_ACCOUNT_FILE = os.path.join(
- yatest.common.test_source_path(), "data/service_account.json"
+ os.path.dirname(yc.source_path(__file__)), "../data/service_account.json"
)
ID_TOKEN_AUDIENCE = "https://pubsub.googleapis.com"
@@ -263,7 +263,7 @@ def test_fetch_id_token_credentials_no_cred_exists(monkeypatch):
def test_fetch_id_token_credentials_invalid_cred_file_type(monkeypatch):
user_credentials_file = os.path.join(
- yatest.common.test_source_path(), "data/authorized_user.json"
+ os.path.dirname(yc.source_path(__file__)), "../data/authorized_user.json"
)
monkeypatch.setenv(environment_vars.CREDENTIALS, user_credentials_file)
@@ -276,7 +276,7 @@ def test_fetch_id_token_credentials_invalid_cred_file_type(monkeypatch):
def test_fetch_id_token_credentials_invalid_json(monkeypatch):
- not_json_file = os.path.join(yatest.common.test_source_path(), "data/public_cert.pem")
+ not_json_file = os.path.join(os.path.dirname(yc.source_path(__file__)), "../data/public_cert.pem")
monkeypatch.setenv(environment_vars.CREDENTIALS, not_json_file)
with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
@@ -287,7 +287,7 @@ def test_fetch_id_token_credentials_invalid_json(monkeypatch):
def test_fetch_id_token_credentials_invalid_cred_path(monkeypatch):
- not_json_file = os.path.join(yatest.common.test_source_path(), "data/not_exists.json")
+ not_json_file = os.path.join(os.path.dirname(yc.source_path(__file__)), "../data/not_exists.json")
monkeypatch.setenv(environment_vars.CREDENTIALS, not_json_file)
with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py b/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py
index c474c90e6b..8dd5f219be 100644
--- a/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py
+++ b/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py
@@ -27,8 +27,8 @@ from google.auth import transport
from google.oauth2 import service_account
-import yatest.common
-DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+import yatest.common as yc
+DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "..", "data")
with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
PRIVATE_KEY_BYTES = fh.read()
@@ -206,6 +206,17 @@ class TestCredentials(object):
creds_with_new_token_uri = credentials.with_token_uri(new_token_uri)
assert creds_with_new_token_uri._token_uri == new_token_uri
+ def test_with_universe_domain(self):
+ credentials = self.make_credentials()
+
+ new_credentials = credentials.with_universe_domain("dummy_universe.com")
+ assert new_credentials.universe_domain == "dummy_universe.com"
+ assert new_credentials._always_use_jwt_access
+
+ new_credentials = credentials.with_universe_domain("googleapis.com")
+ assert new_credentials.universe_domain == "googleapis.com"
+ assert not new_credentials._always_use_jwt_access
+
def test__with_always_use_jwt_access(self):
credentials = self.make_credentials()
assert not credentials._always_use_jwt_access
@@ -558,12 +569,16 @@ class TestCredentials(object):
assert jwt_grant.called
assert not self_signed_jwt_refresh.called
- def test_refresh_non_gdu_missing_jwt_credentials(self):
- credentials = self.make_credentials(universe_domain="foo")
+ def test_refresh_missing_jwt_credentials(self):
+ credentials = self.make_credentials()
+ credentials = credentials.with_scopes(["foo", "bar"])
+ credentials = credentials.with_always_use_jwt_access(True)
+ assert not credentials._jwt_credentials
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.refresh(None)
- assert excinfo.match("self._jwt_credentials is missing")
+ credentials.refresh(mock.Mock())
+
+ # jwt credentials should have been automatically created with scopes
+ assert credentials._jwt_credentials is not None
def test_refresh_non_gdu_domain_wide_delegation_not_supported(self):
credentials = self.make_credentials(universe_domain="foo")
diff --git a/contrib/python/google-auth/py3/tests/test__cloud_sdk.py b/contrib/python/google-auth/py3/tests/test__cloud_sdk.py
index 18ac18fa35..d46621a7f3 100644
--- a/contrib/python/google-auth/py3/tests/test__cloud_sdk.py
+++ b/contrib/python/google-auth/py3/tests/test__cloud_sdk.py
@@ -26,8 +26,8 @@ from google.auth import environment_vars
from google.auth import exceptions
-import yatest.common
-DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+import yatest.common as yc
+DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "data")
AUTHORIZED_USER_FILE = os.path.join(DATA_DIR, "authorized_user.json")
with io.open(AUTHORIZED_USER_FILE, "rb") as fh:
@@ -66,8 +66,7 @@ def test_get_project_id_call_error(check_output):
assert check_output.called
-@pytest.mark.xfail
-def test__run_subprocess_ignore_stderr():
+def _test__run_subprocess_ignore_stderr():
command = [
sys.executable,
"-c",
diff --git a/contrib/python/google-auth/py3/tests/test__default.py b/contrib/python/google-auth/py3/tests/test__default.py
index 29904ec7aa..d619614790 100644
--- a/contrib/python/google-auth/py3/tests/test__default.py
+++ b/contrib/python/google-auth/py3/tests/test__default.py
@@ -36,8 +36,8 @@ from google.oauth2 import service_account
import google.oauth2.credentials
-import yatest.common
-DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+import yatest.common as yc
+DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "data")
AUTHORIZED_USER_FILE = os.path.join(DATA_DIR, "authorized_user.json")
with open(AUTHORIZED_USER_FILE) as fh:
diff --git a/contrib/python/google-auth/py3/tests/test__helpers.py b/contrib/python/google-auth/py3/tests/test__helpers.py
index c1f1d812e5..c9a3847ac4 100644
--- a/contrib/python/google-auth/py3/tests/test__helpers.py
+++ b/contrib/python/google-auth/py3/tests/test__helpers.py
@@ -51,6 +51,32 @@ def test_copy_docstring_non_existing():
_helpers.copy_docstring(SourceClass)(func2)
+def test_parse_content_type_plain():
+ assert _helpers.parse_content_type("text/html") == "text/html"
+ assert _helpers.parse_content_type("application/xml") == "application/xml"
+ assert _helpers.parse_content_type("application/json") == "application/json"
+
+
+def test_parse_content_type_with_parameters():
+ content_type_html = "text/html; charset=UTF-8"
+ content_type_xml = "application/xml; charset=UTF-16; version=1.0"
+ content_type_json = "application/json; charset=UTF-8; indent=2"
+ assert _helpers.parse_content_type(content_type_html) == "text/html"
+ assert _helpers.parse_content_type(content_type_xml) == "application/xml"
+ assert _helpers.parse_content_type(content_type_json) == "application/json"
+
+
+def test_parse_content_type_missing_or_broken():
+ content_type_foo = None
+ content_type_bar = ""
+ content_type_baz = "1234"
+ content_type_qux = " ; charset=UTF-8"
+ assert _helpers.parse_content_type(content_type_foo) == "text/plain"
+ assert _helpers.parse_content_type(content_type_bar) == "text/plain"
+ assert _helpers.parse_content_type(content_type_baz) == "text/plain"
+ assert _helpers.parse_content_type(content_type_qux) == "text/plain"
+
+
def test_utcnow():
assert isinstance(_helpers.utcnow(), datetime.datetime)
diff --git a/contrib/python/google-auth/py3/tests/test__oauth2client.py b/contrib/python/google-auth/py3/tests/test__oauth2client.py
index 72db6535bc..1db595fd9a 100644
--- a/contrib/python/google-auth/py3/tests/test__oauth2client.py
+++ b/contrib/python/google-auth/py3/tests/test__oauth2client.py
@@ -33,8 +33,8 @@ except ImportError: # pragma: NO COVER
from google.auth import _oauth2client
-import yatest.common
-DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+import yatest.common as yc
+DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "data")
SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
diff --git a/contrib/python/google-auth/py3/tests/test__service_account_info.py b/contrib/python/google-auth/py3/tests/test__service_account_info.py
index db8106081c..2335765bb4 100644
--- a/contrib/python/google-auth/py3/tests/test__service_account_info.py
+++ b/contrib/python/google-auth/py3/tests/test__service_account_info.py
@@ -21,8 +21,8 @@ from google.auth import _service_account_info
from google.auth import crypt
-import yatest.common
-DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+import yatest.common as yc
+DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "data")
SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
GDCH_SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "gdch_service_account.json")
diff --git a/contrib/python/google-auth/py3/tests/test_aws.py b/contrib/python/google-auth/py3/tests/test_aws.py
index 39138ab12e..db2e984100 100644
--- a/contrib/python/google-auth/py3/tests/test_aws.py
+++ b/contrib/python/google-auth/py3/tests/test_aws.py
@@ -1969,7 +1969,7 @@ class TestCredentials(object):
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
"x-goog-user-project": QUOTA_PROJECT_ID,
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
@@ -2066,7 +2066,7 @@ class TestCredentials(object):
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
"x-goog-user-project": QUOTA_PROJECT_ID,
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
diff --git a/contrib/python/google-auth/py3/tests/test_credentials.py b/contrib/python/google-auth/py3/tests/test_credentials.py
index 99235cda61..d64f3abb50 100644
--- a/contrib/python/google-auth/py3/tests/test_credentials.py
+++ b/contrib/python/google-auth/py3/tests/test_credentials.py
@@ -55,9 +55,7 @@ def test_expired_and_valid():
# Set the expiration to one second more than now plus the clock skew
# accomodation. These credentials should be valid.
credentials.expiry = (
- datetime.datetime.utcnow()
- + _helpers.REFRESH_THRESHOLD
- + datetime.timedelta(seconds=1)
+ _helpers.utcnow() + _helpers.REFRESH_THRESHOLD + datetime.timedelta(seconds=1)
)
assert credentials.valid
@@ -65,7 +63,7 @@ def test_expired_and_valid():
# Set the credentials expiration to now. Because of the clock skew
# accomodation, these credentials should report as expired.
- credentials.expiry = datetime.datetime.utcnow()
+ credentials.expiry = _helpers.utcnow()
assert not credentials.valid
assert credentials.expired
@@ -81,7 +79,7 @@ def test_before_request():
assert credentials.valid
assert credentials.token == "token"
assert headers["authorization"] == "Bearer token"
- assert "x-identity-trust-boundary" not in headers
+ assert "x-allowed-locations" not in headers
request = "token2"
headers = {}
@@ -91,13 +89,13 @@ def test_before_request():
assert credentials.valid
assert credentials.token == "token"
assert headers["authorization"] == "Bearer token"
- assert "x-identity-trust-boundary" not in headers
+ assert "x-allowed-locations" not in headers
def test_before_request_with_trust_boundary():
- DUMMY_BOUNDARY = "00110101"
+ DUMMY_BOUNDARY = "0xA30"
credentials = CredentialsImpl()
- credentials._trust_boundary = DUMMY_BOUNDARY
+ credentials._trust_boundary = {"locations": [], "encoded_locations": DUMMY_BOUNDARY}
request = "token"
headers = {}
@@ -106,7 +104,7 @@ def test_before_request_with_trust_boundary():
assert credentials.valid
assert credentials.token == "token"
assert headers["authorization"] == "Bearer token"
- assert headers["x-identity-trust-boundary"] == DUMMY_BOUNDARY
+ assert headers["x-allowed-locations"] == DUMMY_BOUNDARY
request = "token2"
headers = {}
@@ -116,7 +114,7 @@ def test_before_request_with_trust_boundary():
assert credentials.valid
assert credentials.token == "token"
assert headers["authorization"] == "Bearer token"
- assert headers["x-identity-trust-boundary"] == DUMMY_BOUNDARY
+ assert headers["x-allowed-locations"] == DUMMY_BOUNDARY
def test_before_request_metrics():
diff --git a/contrib/python/google-auth/py3/tests/test_external_account.py b/contrib/python/google-auth/py3/tests/test_external_account.py
index 0b165bc70b..5225dcf342 100644
--- a/contrib/python/google-auth/py3/tests/test_external_account.py
+++ b/contrib/python/google-auth/py3/tests/test_external_account.py
@@ -505,6 +505,11 @@ class TestCredentials(object):
credentials = self.make_credentials()
assert credentials.universe_domain == external_account._DEFAULT_UNIVERSE_DOMAIN
+ def test_with_universe_domain(self):
+ credentials = self.make_credentials()
+ new_credentials = credentials.with_universe_domain("dummy_universe.com")
+ assert new_credentials.universe_domain == "dummy_universe.com"
+
def test_info_workforce_pool(self):
credentials = self.make_workforce_pool_credentials(
workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
@@ -833,7 +838,7 @@ class TestCredentials(object):
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
@@ -915,7 +920,7 @@ class TestCredentials(object):
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
@@ -1134,7 +1139,7 @@ class TestCredentials(object):
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
@@ -1218,7 +1223,7 @@ class TestCredentials(object):
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
@@ -1274,7 +1279,7 @@ class TestCredentials(object):
assert headers == {
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
def test_apply_workforce_without_quota_project_id(self):
@@ -1291,7 +1296,7 @@ class TestCredentials(object):
assert headers == {
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
def test_apply_impersonation_without_quota_project_id(self):
@@ -1323,7 +1328,7 @@ class TestCredentials(object):
assert headers == {
"authorization": "Bearer {}".format(impersonation_response["accessToken"]),
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
def test_apply_with_quota_project_id(self):
@@ -1340,7 +1345,7 @@ class TestCredentials(object):
"other": "header-value",
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
"x-goog-user-project": self.QUOTA_PROJECT_ID,
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
def test_apply_impersonation_with_quota_project_id(self):
@@ -1375,7 +1380,7 @@ class TestCredentials(object):
"other": "header-value",
"authorization": "Bearer {}".format(impersonation_response["accessToken"]),
"x-goog-user-project": self.QUOTA_PROJECT_ID,
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
def test_before_request(self):
@@ -1391,7 +1396,7 @@ class TestCredentials(object):
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
# Second call shouldn't call refresh.
@@ -1400,7 +1405,7 @@ class TestCredentials(object):
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
def test_before_request_workforce(self):
@@ -1418,7 +1423,7 @@ class TestCredentials(object):
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
# Second call shouldn't call refresh.
@@ -1427,7 +1432,7 @@ class TestCredentials(object):
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
def test_before_request_impersonation(self):
@@ -1458,7 +1463,7 @@ class TestCredentials(object):
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(impersonation_response["accessToken"]),
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
# Second call shouldn't call refresh.
@@ -1467,7 +1472,7 @@ class TestCredentials(object):
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(impersonation_response["accessToken"]),
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
@mock.patch("google.auth._helpers.utcnow")
@@ -1495,7 +1500,7 @@ class TestCredentials(object):
# Cached token should be used.
assert headers == {
"authorization": "Bearer token",
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
# Next call should simulate 1 second passed.
@@ -1509,7 +1514,7 @@ class TestCredentials(object):
# New token should be retrieved.
assert headers == {
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
@mock.patch("google.auth._helpers.utcnow")
@@ -1552,7 +1557,7 @@ class TestCredentials(object):
# Cached token should be used.
assert headers == {
"authorization": "Bearer token",
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
# Next call should simulate 1 second passed. This will trigger the expiration
@@ -1567,7 +1572,7 @@ class TestCredentials(object):
# New token should be retrieved.
assert headers == {
"authorization": "Bearer {}".format(impersonation_response["accessToken"]),
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
@pytest.mark.parametrize(
@@ -1666,7 +1671,7 @@ class TestCredentials(object):
"x-goog-user-project": self.QUOTA_PROJECT_ID,
"authorization": "Bearer {}".format(token_response["access_token"]),
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
@@ -1720,7 +1725,7 @@ class TestCredentials(object):
"authorization": "Bearer {}".format(
impersonation_response["accessToken"]
),
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
},
)
@@ -1792,7 +1797,7 @@ class TestCredentials(object):
"authorization": "Bearer {}".format(
self.SUCCESS_RESPONSE["access_token"]
),
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
},
)
@@ -1842,7 +1847,7 @@ class TestCredentials(object):
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
diff --git a/contrib/python/google-auth/py3/tests/test_identity_pool.py b/contrib/python/google-auth/py3/tests/test_identity_pool.py
index d126a579bd..2d10a5d268 100644
--- a/contrib/python/google-auth/py3/tests/test_identity_pool.py
+++ b/contrib/python/google-auth/py3/tests/test_identity_pool.py
@@ -45,8 +45,8 @@ SERVICE_ACCOUNT_IMPERSONATION_URL = (
QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
SCOPES = ["scope1", "scope2"]
-import yatest.common
-DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+import yatest.common as yc
+DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "data")
SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt")
SUBJECT_TOKEN_JSON_FILE = os.path.join(DATA_DIR, "external_subject_token.json")
SUBJECT_TOKEN_FIELD_NAME = "access_token"
@@ -320,7 +320,7 @@ class TestCredentials(object):
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
"x-goog-api-client": metrics_header_value,
- "x-identity-trust-boundary": "0",
+ "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
diff --git a/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py b/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py
index d63d2d5d3b..9696e823ff 100644
--- a/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py
+++ b/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py
@@ -29,8 +29,8 @@ from google.auth.impersonated_credentials import Credentials
from google.oauth2 import credentials
from google.oauth2 import service_account
-import yatest.common
-DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+import yatest.common as yc
+DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "data")
with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
PRIVATE_KEY_BYTES = fh.read()
diff --git a/contrib/python/google-auth/py3/tests/test_jwt.py b/contrib/python/google-auth/py3/tests/test_jwt.py
index 62f310606d..ff8fd67da6 100644
--- a/contrib/python/google-auth/py3/tests/test_jwt.py
+++ b/contrib/python/google-auth/py3/tests/test_jwt.py
@@ -26,8 +26,8 @@ from google.auth import exceptions
from google.auth import jwt
-import yatest.common
-DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+import yatest.common as yc
+DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "data")
with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
PRIVATE_KEY_BYTES = fh.read()
diff --git a/contrib/python/google-auth/py3/tests/transport/test__custom_tls_signer.py b/contrib/python/google-auth/py3/tests/transport/test__custom_tls_signer.py
index 5836b325ad..d2907bad29 100644
--- a/contrib/python/google-auth/py3/tests/transport/test__custom_tls_signer.py
+++ b/contrib/python/google-auth/py3/tests/transport/test__custom_tls_signer.py
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import base64
import ctypes
import os
@@ -30,11 +29,19 @@ FAKE_ENTERPRISE_CERT_FILE_PATH = "/path/to/enterprise/cert/file"
ENTERPRISE_CERT_FILE = os.path.join(
os.path.dirname(__file__), "../data/enterprise_cert_valid.json"
)
+ENTERPRISE_CERT_FILE_PROVIDER = os.path.join(
+ os.path.dirname(__file__), "../data/enterprise_cert_valid_provider.json"
+)
INVALID_ENTERPRISE_CERT_FILE = os.path.join(
os.path.dirname(__file__), "../data/enterprise_cert_invalid.json"
)
+def test_load_provider_lib():
+ with mock.patch("ctypes.CDLL", return_value=mock.MagicMock()):
+ _custom_tls_signer.load_provider_lib("/path/to/provider/lib")
+
+
def test_load_offload_lib():
with mock.patch("ctypes.CDLL", return_value=mock.MagicMock()):
lib = _custom_tls_signer.load_offload_lib("/path/to/offload/lib")
@@ -173,62 +180,81 @@ def test_custom_tls_signer():
) as load_offload_lib:
load_offload_lib.return_value = offload_lib
load_signer_lib.return_value = signer_lib
- signer_object = _custom_tls_signer.CustomTlsSigner(ENTERPRISE_CERT_FILE)
- signer_object.load_libraries()
- assert signer_object._cert is None
+ with mock.patch(
+ "google.auth.transport._custom_tls_signer.get_cert"
+ ) as get_cert:
+ with mock.patch(
+ "google.auth.transport._custom_tls_signer.get_sign_callback"
+ ) as get_sign_callback:
+ get_cert.return_value = b"mock_cert"
+ signer_object = _custom_tls_signer.CustomTlsSigner(
+ ENTERPRISE_CERT_FILE
+ )
+ signer_object.load_libraries()
+ signer_object.attach_to_ssl_context(create_urllib3_context())
+ get_cert.assert_called_once()
+ get_sign_callback.assert_called_once()
+ offload_lib.ConfigureSslContext.assert_called_once()
assert signer_object._enterprise_cert_file_path == ENTERPRISE_CERT_FILE
assert signer_object._offload_lib == offload_lib
assert signer_object._signer_lib == signer_lib
load_signer_lib.assert_called_with("/path/to/signer/lib")
load_offload_lib.assert_called_with("/path/to/offload/lib")
- # Test set_up_custom_key and set_up_ssl_context methods
- with mock.patch("google.auth.transport._custom_tls_signer.get_cert") as get_cert:
- with mock.patch(
- "google.auth.transport._custom_tls_signer.get_sign_callback"
- ) as get_sign_callback:
- get_cert.return_value = b"mock_cert"
- signer_object.set_up_custom_key()
- signer_object.attach_to_ssl_context(create_urllib3_context())
- get_cert.assert_called_once()
- get_sign_callback.assert_called_once()
- offload_lib.ConfigureSslContext.assert_called_once()
+def test_custom_tls_signer_provider():
+ provider_lib = mock.MagicMock()
-def test_custom_tls_signer_failed_to_load_libraries():
# Test load_libraries method
+ with mock.patch(
+ "google.auth.transport._custom_tls_signer.load_provider_lib"
+ ) as load_provider_lib:
+ load_provider_lib.return_value = provider_lib
+ signer_object = _custom_tls_signer.CustomTlsSigner(
+ ENTERPRISE_CERT_FILE_PROVIDER
+ )
+ signer_object.load_libraries()
+ signer_object.attach_to_ssl_context(mock.MagicMock())
+
+ assert signer_object._enterprise_cert_file_path == ENTERPRISE_CERT_FILE_PROVIDER
+ assert signer_object._provider_lib == provider_lib
+ load_provider_lib.assert_called_with("/path/to/provider/lib")
+
+
+def test_custom_tls_signer_failed_to_load_libraries():
with pytest.raises(exceptions.MutualTLSChannelError) as excinfo:
signer_object = _custom_tls_signer.CustomTlsSigner(INVALID_ENTERPRISE_CERT_FILE)
signer_object.load_libraries()
assert excinfo.match("enterprise cert file is invalid")
-def test_custom_tls_signer_fail_to_offload():
- offload_lib = mock.MagicMock()
- signer_lib = mock.MagicMock()
+def test_custom_tls_signer_failed_to_attach():
+ with pytest.raises(exceptions.MutualTLSChannelError) as excinfo:
+ signer_object = _custom_tls_signer.CustomTlsSigner(ENTERPRISE_CERT_FILE)
+ signer_object._offload_lib = mock.MagicMock()
+ signer_object._signer_lib = mock.MagicMock()
+ signer_object._sign_callback = mock.MagicMock()
+ signer_object._cert = b"mock cert"
+ signer_object._offload_lib.ConfigureSslContext.return_value = False
+ signer_object.attach_to_ssl_context(mock.MagicMock())
+ assert excinfo.match("failed to configure ECP Offload SSL context")
- with mock.patch(
- "google.auth.transport._custom_tls_signer.load_signer_lib"
- ) as load_signer_lib:
- with mock.patch(
- "google.auth.transport._custom_tls_signer.load_offload_lib"
- ) as load_offload_lib:
- load_offload_lib.return_value = offload_lib
- load_signer_lib.return_value = signer_lib
- signer_object = _custom_tls_signer.CustomTlsSigner(ENTERPRISE_CERT_FILE)
- signer_object.load_libraries()
- # set the return value to be 0 which indicts offload fails
- offload_lib.ConfigureSslContext.return_value = 0
+def test_custom_tls_signer_failed_to_attach_provider():
+ with pytest.raises(exceptions.MutualTLSChannelError) as excinfo:
+ signer_object = _custom_tls_signer.CustomTlsSigner(
+ ENTERPRISE_CERT_FILE_PROVIDER
+ )
+ signer_object._provider_lib = mock.MagicMock()
+ signer_object._provider_lib.ECP_attach_to_ctx.return_value = False
+ signer_object.attach_to_ssl_context(mock.MagicMock())
+ assert excinfo.match("failed to configure ECP Provider SSL context")
+
+def test_custom_tls_signer_failed_to_attach_no_libs():
with pytest.raises(exceptions.MutualTLSChannelError) as excinfo:
- with mock.patch(
- "google.auth.transport._custom_tls_signer.get_cert"
- ) as get_cert:
- with mock.patch(
- "google.auth.transport._custom_tls_signer.get_sign_callback"
- ):
- get_cert.return_value = b"mock_cert"
- signer_object.set_up_custom_key()
- signer_object.attach_to_ssl_context(create_urllib3_context())
- assert excinfo.match("failed to configure SSL context")
+ signer_object = _custom_tls_signer.CustomTlsSigner(ENTERPRISE_CERT_FILE)
+ signer_object._offload_lib = None
+ signer_object._signer_lib = None
+ signer_object.attach_to_ssl_context(mock.MagicMock())
+ assert excinfo.match("Invalid ECP configuration.")
diff --git a/contrib/python/google-auth/py3/tests/transport/test__mtls_helper.py b/contrib/python/google-auth/py3/tests/transport/test__mtls_helper.py
index 642283a5c5..1621a05302 100644
--- a/contrib/python/google-auth/py3/tests/transport/test__mtls_helper.py
+++ b/contrib/python/google-auth/py3/tests/transport/test__mtls_helper.py
@@ -22,9 +22,6 @@ import pytest # type: ignore
from google.auth import exceptions
from google.auth.transport import _mtls_helper
-import yatest.common
-DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
-
CONTEXT_AWARE_METADATA = {"cert_provider_command": ["some command"]}
ENCRYPTED_EC_PRIVATE_KEY = b"""-----BEGIN ENCRYPTED PRIVATE KEY-----
@@ -116,26 +113,26 @@ class TestCertAndKeyRegex(object):
class TestCheckaMetadataPath(object):
def test_success(self):
- metadata_path = os.path.join(DATA_DIR, "context_aware_metadata.json")
+ metadata_path = os.path.join(pytest.data_dir, "context_aware_metadata.json")
returned_path = _mtls_helper._check_dca_metadata_path(metadata_path)
assert returned_path is not None
def test_failure(self):
- metadata_path = os.path.join(DATA_DIR, "not_exists.json")
+ metadata_path = os.path.join(pytest.data_dir, "not_exists.json")
returned_path = _mtls_helper._check_dca_metadata_path(metadata_path)
assert returned_path is None
class TestReadMetadataFile(object):
def test_success(self):
- metadata_path = os.path.join(DATA_DIR, "context_aware_metadata.json")
+ metadata_path = os.path.join(pytest.data_dir, "context_aware_metadata.json")
metadata = _mtls_helper._read_dca_metadata_file(metadata_path)
assert "cert_provider_command" in metadata
def test_file_not_json(self):
# read a file which is not json format.
- metadata_path = os.path.join(DATA_DIR, "privatekey.pem")
+ metadata_path = os.path.join(pytest.data_dir, "privatekey.pem")
with pytest.raises(exceptions.ClientCertError):
_mtls_helper._read_dca_metadata_file(metadata_path)
diff --git a/contrib/python/google-auth/py3/tests/transport/test_grpc.py b/contrib/python/google-auth/py3/tests/transport/test_grpc.py
index 05dc5fad0e..29fae4cdf6 100644
--- a/contrib/python/google-auth/py3/tests/transport/test_grpc.py
+++ b/contrib/python/google-auth/py3/tests/transport/test_grpc.py
@@ -35,8 +35,8 @@ try:
except ImportError: # pragma: NO COVER
HAS_GRPC = False
-import yatest.common
-DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+import yatest.common as yc
+DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "..", "data")
METADATA_PATH = os.path.join(DATA_DIR, "context_aware_metadata.json")
with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
PRIVATE_KEY_BYTES = fh.read()
diff --git a/contrib/python/google-auth/py3/tests/transport/test_requests.py b/contrib/python/google-auth/py3/tests/transport/test_requests.py
index d962814346..aadc1ddbfd 100644
--- a/contrib/python/google-auth/py3/tests/transport/test_requests.py
+++ b/contrib/python/google-auth/py3/tests/transport/test_requests.py
@@ -545,16 +545,12 @@ class TestMutualTlsOffloadAdapter(object):
google.auth.transport._custom_tls_signer.CustomTlsSigner, "load_libraries"
)
@mock.patch.object(
- google.auth.transport._custom_tls_signer.CustomTlsSigner, "set_up_custom_key"
- )
- @mock.patch.object(
google.auth.transport._custom_tls_signer.CustomTlsSigner,
"attach_to_ssl_context",
)
def test_success(
self,
mock_attach_to_ssl_context,
- mock_set_up_custom_key,
mock_load_libraries,
mock_proxy_manager_for,
mock_init_poolmanager,
@@ -565,7 +561,6 @@ class TestMutualTlsOffloadAdapter(object):
)
mock_load_libraries.assert_called_once()
- mock_set_up_custom_key.assert_called_once()
assert mock_attach_to_ssl_context.call_count == 2
adapter.init_poolmanager()
diff --git a/contrib/python/google-auth/py3/tests/ya.make b/contrib/python/google-auth/py3/tests/ya.make
index e7a1b3b272..dfcabf5bfb 100644
--- a/contrib/python/google-auth/py3/tests/ya.make
+++ b/contrib/python/google-auth/py3/tests/ya.make
@@ -67,11 +67,6 @@ TEST_SRCS(
# transport/test_urllib3.py
)
-RESOURCE(
- data/privatekey.pem data/privatekey.pem
- data/public_cert.pem data/public_cert.pem
-)
-
NO_LINT()
END()