1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
|
# Copyright (C) 2010-2013 Sebastian Rahlf and others (see AUTHORS).
#
# This program is release under the MIT license. You can find the full text of
# the license in the LICENSE file.
import enum
import itertools
import json
import sys
import threading
from werkzeug.datastructures import Headers
from werkzeug.serving import make_server
from werkzeug.wrappers import Request
from werkzeug.wrappers import Response
class WSGIServer(threading.Thread):
"""
HTTP server running a WSGI application in its own thread.
"""
def __init__(self, host="127.0.0.1", port=0, application=None, **kwargs):
self.app = application
self._server = make_server(host, port, self.app, **kwargs)
self.server_address = self._server.server_address
super().__init__(name=self.__class__, target=self._server.serve_forever)
def __del__(self):
self.stop()
def stop(self):
try:
server = self._server
except AttributeError:
pass
else:
server.shutdown()
@property
def url(self):
host, port = self.server_address
proto = "http" if self._server.ssl_context is None else "https"
return "%s://%s:%i" % (proto, host, port)
class Chunked(enum.Enum):
NO = False
YES = True
AUTO = None
def __bool__(self):
return bool(self.value)
def _encode_chunk(chunk, charset):
if isinstance(chunk, str):
chunk = chunk.encode(charset)
return "{:x}".format(len(chunk)).encode(charset) + b"\r\n" + chunk + b"\r\n"
class ContentServer(WSGIServer):
"""
Small test server which can be taught which content (i.e. string) to serve
with which response code. Try the following snippet for testing API calls::
server = ContentServer(port=8080)
server.start()
print 'Test server running at http://%s:%i' % server.server_address
# any request to http://localhost:8080 will get a 503 response.
server.content = 'Hello World!'
server.code = 503
# ...
# we're done
server.stop()
"""
def __init__(self, host="127.0.0.1", port=0, ssl_context=None):
super().__init__(host, port, self, ssl_context=ssl_context)
self.content, self.code = ("", 204) # HTTP 204: No Content
self.headers = {}
self.show_post_vars = False
self.compress = None
self.requests = []
self.chunked = Chunked.NO
def __call__(self, environ, start_response):
"""
This is the WSGI application.
"""
request = Request(environ)
self.requests.append(request)
if (
request.content_type == "application/x-www-form-urlencoded"
and request.method == "POST"
and self.show_post_vars
):
content = json.dumps(request.form)
else:
content = self.content
if self.chunked == Chunked.YES or (
self.chunked == Chunked.AUTO and "chunked" in self.headers.get("Transfer-encoding", "")
):
# If the code below ever changes to allow setting the charset of
# the Response object, the charset used here should also be changed
# to match. But until that happens, use UTF-8 since it is Werkzeug's
# default.
charset = "utf-8"
if isinstance(content, (str, bytes)):
content = (_encode_chunk(content, charset), "0\r\n\r\n")
else:
content = itertools.chain((_encode_chunk(item, charset) for item in content), ["0\r\n\r\n"])
response = Response(response=content, status=self.code)
response.headers.clear()
response.headers.extend(self.headers)
# FIXME get compression working!
# if self.compress == 'gzip':
# content = gzip.compress(content.encode('utf-8'))
# response.content_encoding = 'gzip'
return response(environ, start_response)
def serve_content(self, content, code=200, headers=None, chunked=Chunked.NO):
"""
Serves string content (with specified HTTP error code) as response to
all subsequent request.
:param content: content to be displayed
:param code: HTTP status code
:param headers: HTTP headers to be returned
:param chunked: whether to apply chunked transfer encoding to the content
"""
if not isinstance(content, (str, bytes, list, tuple)):
# If content is an iterable which is not known to be a string,
# bytes, or sequence, it might be something that can only be iterated
# through once, in which case we need to cache it so it can be reused
# to handle multiple requests.
try:
content = tuple(iter(content))
except TypeError:
# this probably means that content is not iterable, so just go
# ahead in case it's some type that Response knows how to handle
pass
self.content = content
self.code = code
self.chunked = chunked
if headers:
self.headers = Headers(headers)
if __name__ == "__main__": # pragma: no cover
import os.path
import time
app = ContentServer()
server = WSGIServer(application=app)
server.start()
print("HTTP server is running at %s" % server.url)
print("Type <Ctrl-C> to stop")
try:
path = sys.argv[1]
except IndexError:
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "README.rst")
app.serve_content(open(path).read(), 302)
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\rstopping...")
server.stop()
|