aboutsummaryrefslogblamecommitdiffstats
path: root/contrib/python/google-auth/py3/tests/transport/test_urllib3.py
blob: e832300321859680a5af3128913af32825e3534b (plain) (tree)
































































































































































































































































































































                                                                                           
# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import http.client as http_client
import os
import sys

import mock
import OpenSSL
import pytest  # type: ignore
import urllib3  # type: ignore

from google.auth import environment_vars
from google.auth import exceptions
import google.auth.credentials
import google.auth.transport._mtls_helper
import google.auth.transport.urllib3
from google.oauth2 import service_account
from tests.transport import compliance


class TestRequestResponse(compliance.RequestResponseTests):
    def make_request(self):
        http = urllib3.PoolManager()
        return google.auth.transport.urllib3.Request(http)

    def test_timeout(self):
        http = mock.create_autospec(urllib3.PoolManager)
        request = google.auth.transport.urllib3.Request(http)
        request(url="http://example.com", method="GET", timeout=5)

        assert http.request.call_args[1]["timeout"] == 5


def test__make_default_http_with_certifi():
    http = google.auth.transport.urllib3._make_default_http()
    assert "cert_reqs" in http.connection_pool_kw


@mock.patch.object(google.auth.transport.urllib3, "certifi", new=None)
def test__make_default_http_without_certifi():
    http = google.auth.transport.urllib3._make_default_http()
    assert "cert_reqs" not in http.connection_pool_kw


class CredentialsStub(google.auth.credentials.Credentials):
    def __init__(self, token="token"):
        super(CredentialsStub, self).__init__()
        self.token = token

    def apply(self, headers, token=None):
        headers["authorization"] = self.token

    def before_request(self, request, method, url, headers):
        self.apply(headers)

    def refresh(self, request):
        self.token += "1"

    def with_quota_project(self, quota_project_id):
        raise NotImplementedError()


class HttpStub(object):
    def __init__(self, responses, headers=None):
        self.responses = responses
        self.requests = []
        self.headers = headers or {}

    def urlopen(self, method, url, body=None, headers=None, **kwargs):
        self.requests.append((method, url, body, headers, kwargs))
        return self.responses.pop(0)

    def clear(self):
        pass


class ResponseStub(object):
    def __init__(self, status=http_client.OK, data=None):
        self.status = status
        self.data = data


class TestMakeMutualTlsHttp(object):
    def test_success(self):
        http = google.auth.transport.urllib3._make_mutual_tls_http(
            pytest.public_cert_bytes, pytest.private_key_bytes
        )
        assert isinstance(http, urllib3.PoolManager)

    def test_crypto_error(self):
        with pytest.raises(OpenSSL.crypto.Error):
            google.auth.transport.urllib3._make_mutual_tls_http(
                b"invalid cert", b"invalid key"
            )

    @mock.patch.dict("sys.modules", {"OpenSSL.crypto": None})
    def test_import_error(self):
        with pytest.raises(ImportError):
            google.auth.transport.urllib3._make_mutual_tls_http(
                pytest.public_cert_bytes, pytest.private_key_bytes
            )


class TestAuthorizedHttp(object):
    TEST_URL = "http://example.com"

    def test_authed_http_defaults(self):
        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            mock.sentinel.credentials
        )

        assert authed_http.credentials == mock.sentinel.credentials
        assert isinstance(authed_http.http, urllib3.PoolManager)

    def test_urlopen_no_refresh(self):
        credentials = mock.Mock(wraps=CredentialsStub())
        response = ResponseStub()
        http = HttpStub([response])

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials, http=http
        )

        result = authed_http.urlopen("GET", self.TEST_URL)

        assert result == response
        assert credentials.before_request.called
        assert not credentials.refresh.called
        assert http.requests == [
            ("GET", self.TEST_URL, None, {"authorization": "token"}, {})
        ]

    def test_urlopen_refresh(self):
        credentials = mock.Mock(wraps=CredentialsStub())
        final_response = ResponseStub(status=http_client.OK)
        # First request will 401, second request will succeed.
        http = HttpStub([ResponseStub(status=http_client.UNAUTHORIZED), final_response])

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials, http=http
        )

        authed_http = authed_http.urlopen("GET", "http://example.com")

        assert authed_http == final_response
        assert credentials.before_request.call_count == 2
        assert credentials.refresh.called
        assert http.requests == [
            ("GET", self.TEST_URL, None, {"authorization": "token"}, {}),
            ("GET", self.TEST_URL, None, {"authorization": "token1"}, {}),
        ]

    def test_urlopen_no_default_host(self):
        credentials = mock.create_autospec(service_account.Credentials)

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(credentials)

        authed_http.credentials._create_self_signed_jwt.assert_called_once_with(None)

    def test_urlopen_with_default_host(self):
        default_host = "pubsub.googleapis.com"
        credentials = mock.create_autospec(service_account.Credentials)

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials, default_host=default_host
        )

        authed_http.credentials._create_self_signed_jwt.assert_called_once_with(
            "https://{}/".format(default_host)
        )

    def test_proxies(self):
        http = mock.create_autospec(urllib3.PoolManager)
        authed_http = google.auth.transport.urllib3.AuthorizedHttp(None, http=http)

        with authed_http:
            pass

        assert http.__enter__.called
        assert http.__exit__.called

        authed_http.headers = mock.sentinel.headers
        assert authed_http.headers == http.headers

    @mock.patch("google.auth.transport.urllib3._make_mutual_tls_http", autospec=True)
    def test_configure_mtls_channel_with_callback(self, mock_make_mutual_tls_http):
        callback = mock.Mock()
        callback.return_value = (pytest.public_cert_bytes, pytest.private_key_bytes)

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials=mock.Mock(), http=mock.Mock()
        )

        with pytest.warns(UserWarning):
            with mock.patch.dict(
                os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"}
            ):
                is_mtls = authed_http.configure_mtls_channel(callback)

        assert is_mtls
        mock_make_mutual_tls_http.assert_called_once_with(
            cert=pytest.public_cert_bytes, key=pytest.private_key_bytes
        )

    @mock.patch("google.auth.transport.urllib3._make_mutual_tls_http", autospec=True)
    @mock.patch(
        "google.auth.transport._mtls_helper.get_client_cert_and_key", autospec=True
    )
    def test_configure_mtls_channel_with_metadata(
        self, mock_get_client_cert_and_key, mock_make_mutual_tls_http
    ):
        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials=mock.Mock()
        )

        mock_get_client_cert_and_key.return_value = (
            True,
            pytest.public_cert_bytes,
            pytest.private_key_bytes,
        )
        with mock.patch.dict(
            os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"}
        ):
            is_mtls = authed_http.configure_mtls_channel()

        assert is_mtls
        mock_get_client_cert_and_key.assert_called_once()
        mock_make_mutual_tls_http.assert_called_once_with(
            cert=pytest.public_cert_bytes, key=pytest.private_key_bytes
        )

    @mock.patch("google.auth.transport.urllib3._make_mutual_tls_http", autospec=True)
    @mock.patch(
        "google.auth.transport._mtls_helper.get_client_cert_and_key", autospec=True
    )
    def test_configure_mtls_channel_non_mtls(
        self, mock_get_client_cert_and_key, mock_make_mutual_tls_http
    ):
        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials=mock.Mock()
        )

        mock_get_client_cert_and_key.return_value = (False, None, None)
        with mock.patch.dict(
            os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"}
        ):
            is_mtls = authed_http.configure_mtls_channel()

        assert not is_mtls
        mock_get_client_cert_and_key.assert_called_once()
        mock_make_mutual_tls_http.assert_not_called()

    @mock.patch(
        "google.auth.transport._mtls_helper.get_client_cert_and_key", autospec=True
    )
    def test_configure_mtls_channel_exceptions(self, mock_get_client_cert_and_key):
        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials=mock.Mock()
        )

        mock_get_client_cert_and_key.side_effect = exceptions.ClientCertError()
        with pytest.raises(exceptions.MutualTLSChannelError):
            with mock.patch.dict(
                os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"}
            ):
                authed_http.configure_mtls_channel()

        mock_get_client_cert_and_key.return_value = (False, None, None)
        with mock.patch.dict("sys.modules"):
            sys.modules["OpenSSL"] = None
            with pytest.raises(exceptions.MutualTLSChannelError):
                with mock.patch.dict(
                    os.environ,
                    {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"},
                ):
                    authed_http.configure_mtls_channel()

    @mock.patch(
        "google.auth.transport._mtls_helper.get_client_cert_and_key", autospec=True
    )
    def test_configure_mtls_channel_without_client_cert_env(
        self, get_client_cert_and_key
    ):
        callback = mock.Mock()

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials=mock.Mock(), http=mock.Mock()
        )

        # Test the callback is not called if GOOGLE_API_USE_CLIENT_CERTIFICATE is not set.
        is_mtls = authed_http.configure_mtls_channel(callback)
        assert not is_mtls
        callback.assert_not_called()

        # Test ADC client cert is not used if GOOGLE_API_USE_CLIENT_CERTIFICATE is not set.
        is_mtls = authed_http.configure_mtls_channel(callback)
        assert not is_mtls
        get_client_cert_and_key.assert_not_called()

    def test_clear_pool_on_del(self):
        http = mock.create_autospec(urllib3.PoolManager)
        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            mock.sentinel.credentials, http=http
        )
        authed_http.__del__()
        http.clear.assert_called_with()

        authed_http.http = None
        authed_http.__del__()
        # Expect it to not crash