1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
import json
import struct
import mock
import pytest # type: ignore
from google.auth import exceptions
from google.oauth2 import webauthn_handler
from google.oauth2 import webauthn_types
@pytest.fixture
def os_get_stub():
with mock.patch.object(
webauthn_handler.os.environ,
"get",
return_value="gcloud_webauthn_plugin",
name="fake os.environ.get",
) as mock_os_environ_get:
yield mock_os_environ_get
@pytest.fixture
def subprocess_run_stub():
with mock.patch.object(
webauthn_handler.subprocess, "run", name="fake subprocess.run"
) as mock_subprocess_run:
yield mock_subprocess_run
def test_PluginHandler_is_available(os_get_stub):
test_handler = webauthn_handler.PluginHandler()
assert test_handler.is_available() is True
os_get_stub.return_value = None
assert test_handler.is_available() is False
GET_ASSERTION_REQUEST = webauthn_types.GetRequest(
origin="fake_origin",
rpid="fake_rpid",
challenge="fake_challenge",
allow_credentials=[webauthn_types.PublicKeyCredentialDescriptor(id="fake_id_1")],
)
def test_malformated_get_assertion_response(os_get_stub, subprocess_run_stub):
response_len = struct.pack("<I", 5)
response = "1234567890"
mock_response = mock.Mock()
mock_response.stdout = response_len + response.encode()
subprocess_run_stub.return_value = mock_response
test_handler = webauthn_handler.PluginHandler()
with pytest.raises(exceptions.MalformedError) as excinfo:
test_handler.get(GET_ASSERTION_REQUEST)
assert "Plugin response length" in str(excinfo.value)
def test_failure_get_assertion(os_get_stub, subprocess_run_stub):
failure_response = {
"type": "getResponse",
"error": "fake_plugin_get_assertion_failure",
}
response_json = json.dumps(failure_response).encode()
response_len = struct.pack("<I", len(response_json))
# process returns get response in json
mock_response = mock.Mock()
mock_response.stdout = response_len + response_json
subprocess_run_stub.return_value = mock_response
test_handler = webauthn_handler.PluginHandler()
with pytest.raises(exceptions.ReauthFailError) as excinfo:
test_handler.get(GET_ASSERTION_REQUEST)
assert failure_response["error"] in str(excinfo.value)
def test_success_get_assertion(os_get_stub, subprocess_run_stub):
success_response = {
"type": "public-key",
"id": "fake-id",
"authenticatorAttachment": "cross-platform",
"clientExtensionResults": {"appid": True},
"response": {
"clientDataJSON": "fake_client_data_json_base64",
"authenticatorData": "fake_authenticator_data_base64",
"signature": "fake_signature_base64",
"userHandle": "fake_user_handle_base64",
},
}
valid_plugin_response = {"type": "getResponse", "responseData": success_response}
valid_plugin_response_json = json.dumps(valid_plugin_response).encode()
valid_plugin_response_len = struct.pack("<I", len(valid_plugin_response_json))
# process returns get response in json
mock_response = mock.Mock()
mock_response.stdout = valid_plugin_response_len + valid_plugin_response_json
subprocess_run_stub.return_value = mock_response
# Call get()
test_handler = webauthn_handler.PluginHandler()
got_response = test_handler.get(GET_ASSERTION_REQUEST)
# Validate expected plugin request
os_get_stub.assert_called_once()
subprocess_run_stub.assert_called_once()
stdin_input = subprocess_run_stub.call_args.kwargs["input"]
input_json_len_le = stdin_input[:4]
input_json_len = struct.unpack("<I", input_json_len_le)[0]
input_json = stdin_input[4:]
assert len(input_json) == input_json_len
input_dict = json.loads(input_json.decode("utf8"))
assert input_dict == {
"type": "get",
"origin": "fake_origin",
"requestData": {
"rpid": "fake_rpid",
"challenge": "fake_challenge",
"allowCredentials": [{"type": "public-key", "id": "fake_id_1"}],
},
}
# Validate get assertion response
assert got_response.id == success_response["id"]
assert (
got_response.authenticator_attachment
== success_response["authenticatorAttachment"]
)
assert (
got_response.client_extension_results
== success_response["clientExtensionResults"]
)
assert (
got_response.response.client_data_json
== success_response["response"]["clientDataJSON"]
)
assert (
got_response.response.authenticator_data
== success_response["response"]["authenticatorData"]
)
assert got_response.response.signature == success_response["response"]["signature"]
assert (
got_response.response.user_handle == success_response["response"]["userHandle"]
)
|