summaryrefslogtreecommitdiffstats
path: root/contrib/python/google-auth/py3/tests/oauth2/test__client.py
diff options
context:
space:
mode:
authoralexv-smirnov <[email protected]>2023-12-01 12:02:50 +0300
committeralexv-smirnov <[email protected]>2023-12-01 13:28:10 +0300
commit0e578a4c44d4abd539d9838347b9ebafaca41dfb (patch)
treea0c1969c37f818c830ebeff9c077eacf30be6ef8 /contrib/python/google-auth/py3/tests/oauth2/test__client.py
parent84f2d3d4cc985e63217cff149bd2e6d67ae6fe22 (diff)
Change "ya.make"
Diffstat (limited to 'contrib/python/google-auth/py3/tests/oauth2/test__client.py')
-rw-r--r--contrib/python/google-auth/py3/tests/oauth2/test__client.py622
1 files changed, 622 insertions, 0 deletions
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test__client.py b/contrib/python/google-auth/py3/tests/oauth2/test__client.py
new file mode 100644
index 00000000000..54179269bde
--- /dev/null
+++ b/contrib/python/google-auth/py3/tests/oauth2/test__client.py
@@ -0,0 +1,622 @@
+# Copyright 2016 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import http.client as http_client
+import json
+import os
+import urllib
+
+import mock
+import pytest # type: ignore
+
+from google.auth import _helpers
+from google.auth import crypt
+from google.auth import exceptions
+from google.auth import jwt
+from google.auth import transport
+from google.oauth2 import _client
+
+
+import yatest.common
+DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+
+with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
+ PRIVATE_KEY_BYTES = fh.read()
+
+SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
+
+SCOPES_AS_LIST = [
+ "https://www.googleapis.com/auth/pubsub",
+ "https://www.googleapis.com/auth/logging.write",
+]
+SCOPES_AS_STRING = (
+ "https://www.googleapis.com/auth/pubsub"
+ " https://www.googleapis.com/auth/logging.write"
+)
+
+ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
+ "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/sa"
+)
+ID_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
+ "gl-python/3.7 auth/1.1 auth-request-type/it cred-type/sa"
+)
+
+
[email protected]("retryable", [True, False])
+def test__handle_error_response(retryable):
+ response_data = {"error": "help", "error_description": "I'm alive"}
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ _client._handle_error_response(response_data, retryable)
+
+ assert excinfo.value.retryable == retryable
+ assert excinfo.match(r"help: I\'m alive")
+
+
+def test__handle_error_response_no_error():
+ response_data = {"foo": "bar"}
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ _client._handle_error_response(response_data, False)
+
+ assert not excinfo.value.retryable
+ assert excinfo.match(r"{\"foo\": \"bar\"}")
+
+
+def test__handle_error_response_not_json():
+ response_data = "this is an error message"
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ _client._handle_error_response(response_data, False)
+
+ assert not excinfo.value.retryable
+ assert excinfo.match(response_data)
+
+
+def test__can_retry_retryable():
+ retryable_codes = transport.DEFAULT_RETRYABLE_STATUS_CODES
+ for status_code in range(100, 600):
+ if status_code in retryable_codes:
+ assert _client._can_retry(status_code, {"error": "invalid_scope"})
+ else:
+ assert not _client._can_retry(status_code, {"error": "invalid_scope"})
+
+
+ "response_data", [{"error": "internal_failure"}, {"error": "server_error"}]
+)
+def test__can_retry_message(response_data):
+ assert _client._can_retry(http_client.OK, response_data)
+
+
+ "response_data",
+ [
+ {"error": "invalid_scope"},
+ {"error": {"foo": "bar"}},
+ {"error_description": {"foo", "bar"}},
+ ],
+)
+def test__can_retry_no_retry_message(response_data):
+ assert not _client._can_retry(http_client.OK, response_data)
+
+
[email protected]("mock_expires_in", [500, "500"])
[email protected]("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+def test__parse_expiry(unused_utcnow, mock_expires_in):
+ result = _client._parse_expiry({"expires_in": mock_expires_in})
+ assert result == datetime.datetime.min + datetime.timedelta(seconds=500)
+
+
+def test__parse_expiry_none():
+ assert _client._parse_expiry({}) is None
+
+
+def make_request(response_data, status=http_client.OK):
+ response = mock.create_autospec(transport.Response, instance=True)
+ response.status = status
+ response.data = json.dumps(response_data).encode("utf-8")
+ request = mock.create_autospec(transport.Request)
+ request.return_value = response
+ return request
+
+
+def test__token_endpoint_request():
+ request = make_request({"test": "response"})
+
+ result = _client._token_endpoint_request(
+ request, "http://example.com", {"test": "params"}
+ )
+
+ # Check request call
+ request.assert_called_with(
+ method="POST",
+ url="http://example.com",
+ headers={"Content-Type": "application/x-www-form-urlencoded"},
+ body="test=params".encode("utf-8"),
+ )
+
+ # Check result
+ assert result == {"test": "response"}
+
+
+def test__token_endpoint_request_use_json():
+ request = make_request({"test": "response"})
+
+ result = _client._token_endpoint_request(
+ request,
+ "http://example.com",
+ {"test": "params"},
+ access_token="access_token",
+ use_json=True,
+ )
+
+ # Check request call
+ request.assert_called_with(
+ method="POST",
+ url="http://example.com",
+ headers={
+ "Content-Type": "application/json",
+ "Authorization": "Bearer access_token",
+ },
+ body=b'{"test": "params"}',
+ )
+
+ # Check result
+ assert result == {"test": "response"}
+
+
+def test__token_endpoint_request_error():
+ request = make_request({}, status=http_client.BAD_REQUEST)
+
+ with pytest.raises(exceptions.RefreshError):
+ _client._token_endpoint_request(request, "http://example.com", {})
+
+
+def test__token_endpoint_request_internal_failure_error():
+ request = make_request(
+ {"error_description": "internal_failure"}, status=http_client.BAD_REQUEST
+ )
+
+ with pytest.raises(exceptions.RefreshError):
+ _client._token_endpoint_request(
+ request, "http://example.com", {"error_description": "internal_failure"}
+ )
+ # request should be called once and then with 3 retries
+ assert request.call_count == 4
+
+ request = make_request(
+ {"error": "internal_failure"}, status=http_client.BAD_REQUEST
+ )
+
+ with pytest.raises(exceptions.RefreshError):
+ _client._token_endpoint_request(
+ request, "http://example.com", {"error": "internal_failure"}
+ )
+ # request should be called once and then with 3 retries
+ assert request.call_count == 4
+
+
+def test__token_endpoint_request_internal_failure_and_retry_failure_error():
+ retryable_error = mock.create_autospec(transport.Response, instance=True)
+ retryable_error.status = http_client.BAD_REQUEST
+ retryable_error.data = json.dumps({"error_description": "internal_failure"}).encode(
+ "utf-8"
+ )
+
+ unretryable_error = mock.create_autospec(transport.Response, instance=True)
+ unretryable_error.status = http_client.BAD_REQUEST
+ unretryable_error.data = json.dumps({"error_description": "invalid_scope"}).encode(
+ "utf-8"
+ )
+
+ request = mock.create_autospec(transport.Request)
+
+ request.side_effect = [retryable_error, retryable_error, unretryable_error]
+
+ with pytest.raises(exceptions.RefreshError):
+ _client._token_endpoint_request(
+ request, "http://example.com", {"error_description": "invalid_scope"}
+ )
+ # request should be called three times. Two retryable errors and one
+ # unretryable error to break the retry loop.
+ assert request.call_count == 3
+
+
+def test__token_endpoint_request_internal_failure_and_retry_succeeds():
+ retryable_error = mock.create_autospec(transport.Response, instance=True)
+ retryable_error.status = http_client.BAD_REQUEST
+ retryable_error.data = json.dumps({"error_description": "internal_failure"}).encode(
+ "utf-8"
+ )
+
+ response = mock.create_autospec(transport.Response, instance=True)
+ response.status = http_client.OK
+ response.data = json.dumps({"hello": "world"}).encode("utf-8")
+
+ request = mock.create_autospec(transport.Request)
+
+ request.side_effect = [retryable_error, response]
+
+ _ = _client._token_endpoint_request(
+ request, "http://example.com", {"test": "params"}
+ )
+
+ assert request.call_count == 2
+
+
+def test__token_endpoint_request_string_error():
+ response = mock.create_autospec(transport.Response, instance=True)
+ response.status = http_client.BAD_REQUEST
+ response.data = "this is an error message"
+ request = mock.create_autospec(transport.Request)
+ request.return_value = response
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ _client._token_endpoint_request(request, "http://example.com", {})
+ assert excinfo.match("this is an error message")
+
+
+def verify_request_params(request, params):
+ request_body = request.call_args[1]["body"].decode("utf-8")
+ request_params = urllib.parse.parse_qs(request_body)
+
+ for key, value in params.items():
+ assert request_params[key][0] == value
+
+
[email protected]("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+def test_jwt_grant(utcnow):
+ request = make_request(
+ {"access_token": "token", "expires_in": 500, "extra": "data"}
+ )
+
+ token, expiry, extra_data = _client.jwt_grant(
+ request, "http://example.com", "assertion_value"
+ )
+
+ # Check request call
+ verify_request_params(
+ request, {"grant_type": _client._JWT_GRANT_TYPE, "assertion": "assertion_value"}
+ )
+
+ # Check result
+ assert token == "token"
+ assert expiry == utcnow() + datetime.timedelta(seconds=500)
+ assert extra_data["extra"] == "data"
+
+
+def test_jwt_grant_no_access_token():
+ request = make_request(
+ {
+ # No access token.
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ _client.jwt_grant(request, "http://example.com", "assertion_value")
+ assert not excinfo.value.retryable
+
+
+def test_call_iam_generate_id_token_endpoint():
+ now = _helpers.utcnow()
+ id_token_expiry = _helpers.datetime_to_secs(now)
+ id_token = jwt.encode(SIGNER, {"exp": id_token_expiry}).decode("utf-8")
+ request = make_request({"token": id_token})
+
+ token, expiry = _client.call_iam_generate_id_token_endpoint(
+ request, "fake_email", "fake_audience", "fake_access_token"
+ )
+
+ assert (
+ request.call_args[1]["url"]
+ == "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/fake_email:generateIdToken"
+ )
+ assert request.call_args[1]["headers"]["Content-Type"] == "application/json"
+ assert (
+ request.call_args[1]["headers"]["Authorization"] == "Bearer fake_access_token"
+ )
+ response_body = json.loads(request.call_args[1]["body"])
+ assert response_body["audience"] == "fake_audience"
+ assert response_body["includeEmail"] == "true"
+ assert response_body["useEmailAzp"] == "true"
+
+ # Check result
+ assert token == id_token
+ # JWT does not store microseconds
+ now = now.replace(microsecond=0)
+ assert expiry == now
+
+
+def test_call_iam_generate_id_token_endpoint_no_id_token():
+ request = make_request(
+ {
+ # No access token.
+ "error": "no token"
+ }
+ )
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ _client.call_iam_generate_id_token_endpoint(
+ request, "fake_email", "fake_audience", "fake_access_token"
+ )
+ assert excinfo.match("No ID token in response")
+
+
+def test_id_token_jwt_grant():
+ now = _helpers.utcnow()
+ id_token_expiry = _helpers.datetime_to_secs(now)
+ id_token = jwt.encode(SIGNER, {"exp": id_token_expiry}).decode("utf-8")
+ request = make_request({"id_token": id_token, "extra": "data"})
+
+ token, expiry, extra_data = _client.id_token_jwt_grant(
+ request, "http://example.com", "assertion_value"
+ )
+
+ # Check request call
+ verify_request_params(
+ request, {"grant_type": _client._JWT_GRANT_TYPE, "assertion": "assertion_value"}
+ )
+
+ # Check result
+ assert token == id_token
+ # JWT does not store microseconds
+ now = now.replace(microsecond=0)
+ assert expiry == now
+ assert extra_data["extra"] == "data"
+
+
+def test_id_token_jwt_grant_no_access_token():
+ request = make_request(
+ {
+ # No access token.
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ _client.id_token_jwt_grant(request, "http://example.com", "assertion_value")
+ assert not excinfo.value.retryable
+
+
[email protected]("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+def test_refresh_grant(unused_utcnow):
+ request = make_request(
+ {
+ "access_token": "token",
+ "refresh_token": "new_refresh_token",
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
+
+ token, refresh_token, expiry, extra_data = _client.refresh_grant(
+ request,
+ "http://example.com",
+ "refresh_token",
+ "client_id",
+ "client_secret",
+ rapt_token="rapt_token",
+ )
+
+ # Check request call
+ verify_request_params(
+ request,
+ {
+ "grant_type": _client._REFRESH_GRANT_TYPE,
+ "refresh_token": "refresh_token",
+ "client_id": "client_id",
+ "client_secret": "client_secret",
+ "rapt": "rapt_token",
+ },
+ )
+
+ # Check result
+ assert token == "token"
+ assert refresh_token == "new_refresh_token"
+ assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500)
+ assert extra_data["extra"] == "data"
+
+
[email protected]("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+def test_refresh_grant_with_scopes(unused_utcnow):
+ request = make_request(
+ {
+ "access_token": "token",
+ "refresh_token": "new_refresh_token",
+ "expires_in": 500,
+ "extra": "data",
+ "scope": SCOPES_AS_STRING,
+ }
+ )
+
+ token, refresh_token, expiry, extra_data = _client.refresh_grant(
+ request,
+ "http://example.com",
+ "refresh_token",
+ "client_id",
+ "client_secret",
+ SCOPES_AS_LIST,
+ )
+
+ # Check request call.
+ verify_request_params(
+ request,
+ {
+ "grant_type": _client._REFRESH_GRANT_TYPE,
+ "refresh_token": "refresh_token",
+ "client_id": "client_id",
+ "client_secret": "client_secret",
+ "scope": SCOPES_AS_STRING,
+ },
+ )
+
+ # Check result.
+ assert token == "token"
+ assert refresh_token == "new_refresh_token"
+ assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500)
+ assert extra_data["extra"] == "data"
+
+
+def test_refresh_grant_no_access_token():
+ request = make_request(
+ {
+ # No access token.
+ "refresh_token": "new_refresh_token",
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ _client.refresh_grant(
+ request, "http://example.com", "refresh_token", "client_id", "client_secret"
+ )
+ assert not excinfo.value.retryable
+
+
+ "google.auth.metrics.token_request_access_token_sa_assertion",
+ return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
+)
[email protected]("google.oauth2._client._parse_expiry", return_value=None)
[email protected](_client, "_token_endpoint_request", autospec=True)
+def test_jwt_grant_retry_default(
+ mock_token_endpoint_request, mock_expiry, mock_metrics_header_value
+):
+ _client.jwt_grant(mock.Mock(), mock.Mock(), mock.Mock())
+ mock_token_endpoint_request.assert_called_with(
+ mock.ANY,
+ mock.ANY,
+ mock.ANY,
+ can_retry=True,
+ headers={"x-goog-api-client": ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE},
+ )
+
+
[email protected]("can_retry", [True, False])
+ "google.auth.metrics.token_request_access_token_sa_assertion",
+ return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
+)
[email protected]("google.oauth2._client._parse_expiry", return_value=None)
[email protected](_client, "_token_endpoint_request", autospec=True)
+def test_jwt_grant_retry_with_retry(
+ mock_token_endpoint_request, mock_expiry, mock_metrics_header_value, can_retry
+):
+ _client.jwt_grant(mock.Mock(), mock.Mock(), mock.Mock(), can_retry=can_retry)
+ mock_token_endpoint_request.assert_called_with(
+ mock.ANY,
+ mock.ANY,
+ mock.ANY,
+ can_retry=can_retry,
+ headers={"x-goog-api-client": ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE},
+ )
+
+
+ "google.auth.metrics.token_request_id_token_sa_assertion",
+ return_value=ID_TOKEN_REQUEST_METRICS_HEADER_VALUE,
+)
[email protected]("google.auth.jwt.decode", return_value={"exp": 0})
[email protected](_client, "_token_endpoint_request", autospec=True)
+def test_id_token_jwt_grant_retry_default(
+ mock_token_endpoint_request, mock_jwt_decode, mock_metrics_header_value
+):
+ _client.id_token_jwt_grant(mock.Mock(), mock.Mock(), mock.Mock())
+ mock_token_endpoint_request.assert_called_with(
+ mock.ANY,
+ mock.ANY,
+ mock.ANY,
+ can_retry=True,
+ headers={"x-goog-api-client": ID_TOKEN_REQUEST_METRICS_HEADER_VALUE},
+ )
+
+
[email protected]("can_retry", [True, False])
+ "google.auth.metrics.token_request_id_token_sa_assertion",
+ return_value=ID_TOKEN_REQUEST_METRICS_HEADER_VALUE,
+)
[email protected]("google.auth.jwt.decode", return_value={"exp": 0})
[email protected](_client, "_token_endpoint_request", autospec=True)
+def test_id_token_jwt_grant_retry_with_retry(
+ mock_token_endpoint_request, mock_jwt_decode, mock_metrics_header_value, can_retry
+):
+ _client.id_token_jwt_grant(
+ mock.Mock(), mock.Mock(), mock.Mock(), can_retry=can_retry
+ )
+ mock_token_endpoint_request.assert_called_with(
+ mock.ANY,
+ mock.ANY,
+ mock.ANY,
+ can_retry=can_retry,
+ headers={"x-goog-api-client": ID_TOKEN_REQUEST_METRICS_HEADER_VALUE},
+ )
+
+
[email protected]("google.oauth2._client._parse_expiry", return_value=None)
[email protected](_client, "_token_endpoint_request", autospec=True)
+def test_refresh_grant_retry_default(mock_token_endpoint_request, mock_parse_expiry):
+ _client.refresh_grant(
+ mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock()
+ )
+ mock_token_endpoint_request.assert_called_with(
+ mock.ANY, mock.ANY, mock.ANY, can_retry=True
+ )
+
+
[email protected]("can_retry", [True, False])
[email protected]("google.oauth2._client._parse_expiry", return_value=None)
[email protected](_client, "_token_endpoint_request", autospec=True)
+def test_refresh_grant_retry_with_retry(
+ mock_token_endpoint_request, mock_parse_expiry, can_retry
+):
+ _client.refresh_grant(
+ mock.Mock(),
+ mock.Mock(),
+ mock.Mock(),
+ mock.Mock(),
+ mock.Mock(),
+ can_retry=can_retry,
+ )
+ mock_token_endpoint_request.assert_called_with(
+ mock.ANY, mock.ANY, mock.ANY, can_retry=can_retry
+ )
+
+
[email protected]("can_retry", [True, False])
+def test__token_endpoint_request_no_throw_with_retry(can_retry):
+ response_data = {"error": "help", "error_description": "I'm alive"}
+ body = "dummy body"
+
+ mock_response = mock.create_autospec(transport.Response, instance=True)
+ mock_response.status = http_client.INTERNAL_SERVER_ERROR
+ mock_response.data = json.dumps(response_data).encode("utf-8")
+
+ mock_request = mock.create_autospec(transport.Request)
+ mock_request.return_value = mock_response
+
+ _client._token_endpoint_request_no_throw(
+ mock_request, mock.Mock(), body, mock.Mock(), mock.Mock(), can_retry=can_retry
+ )
+
+ if can_retry:
+ assert mock_request.call_count == 4
+ else:
+ assert mock_request.call_count == 1