1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
# Copyright (C) 2010-2013 Sebastian Rahlf and others (see AUTHORS).
#
# This program is released under the MIT license. You can find the full text of
# the license in the LICENSE file.
import json
import sys
import threading
from werkzeug.serving import make_server
from werkzeug.wrappers import Response, Request
class WSGIServer(threading.Thread):
    """
    HTTP server running a WSGI application in its own thread.

    The underlying server is created in the constructor via
    ``werkzeug.serving.make_server``; requests are only handled once the
    thread has been started with :meth:`start`.
    """

    def __init__(self, host='127.0.0.1', port=0, application=None, **kwargs):
        """
        :param host: host name or IP address handed to ``make_server``
        :param port: port handed to ``make_server``; the address reported by
            the created server is available as ``server_address``
        :param application: WSGI application to serve
        :param kwargs: additional keyword arguments passed through to
            ``make_server`` (e.g. ``ssl_context``)
        """
        self.app = application
        self._server = make_server(host, port, self.app, **kwargs)
        self.server_address = self._server.server_address

        # Thread names are strings: use the class *name*, not the class
        # object itself.
        super(WSGIServer, self).__init__(
            name=self.__class__.__name__,
            target=self._server.serve_forever)

    def __del__(self):
        self.stop()

    def stop(self):
        """
        Stop serving requests and close the listening socket.

        Safe to call more than once, on a server that was never started and
        on an instance whose constructor failed before the server existed.
        """
        server = getattr(self, '_server', None)
        if server is None:
            # make_server() raised in __init__; there is nothing to stop.
            return
        # shutdown() waits for the serve_forever() loop to finish. If the
        # thread is not running that loop, the wait could never be satisfied
        # and the caller would block forever, so only ask a live thread.
        if self.is_alive():
            server.shutdown()
        # Release the listening socket; closing it repeatedly is harmless.
        server.server_close()

    @property
    def url(self):
        """Base URL (scheme, host and port) under which the server listens."""
        host, port = self.server_address
        proto = 'http' if self._server.ssl_context is None else 'https'
        return '%s://%s:%i' % (proto, host, port)
class ContentServer(WSGIServer):
    """
    Small test server which can be taught which content (i.e. string) to serve
    with which response code. Try the following snippet for testing API calls::

        server = ContentServer(port=8080)
        server.start()
        print('Test server running at http://%s:%i' % server.server_address)

        # any request to http://localhost:8080 will get a 503 response.
        server.content = 'Hello World!'
        server.code = 503

        # ...

        # we're done
        server.stop()

    Every incoming request is recorded in ``requests``. If ``show_post_vars``
    is set, form-encoded POST requests are answered with a JSON dump of the
    submitted form fields instead of the configured content.
    """

    def __init__(self, host='127.0.0.1', port=0, ssl_context=None):
        """
        :param host: host name or IP address to bind to
        :param port: port to bind to
        :param ssl_context: passed on to the underlying server
        """
        # The instance itself is the WSGI application (see ``__call__``).
        super(ContentServer, self).__init__(host, port, self, ssl_context=ssl_context)
        self.content, self.code = ('', 204)  # HTTP 204: No Content
        self.headers = {}
        self.show_post_vars = False
        self.compress = None
        self.requests = []

    def __call__(self, environ, start_response):
        """
        This is the WSGI application.
        """
        request = Request(environ)
        self.requests.append(request)
        # Compare the bare mimetype so that a content type carrying
        # parameters (e.g. "...; charset=UTF-8") is still recognised.
        if (request.mimetype == 'application/x-www-form-urlencoded'
                and request.method == 'POST' and self.show_post_vars):
            content = json.dumps(request.form)
        else:
            content = self.content

        response = Response(status=self.code)
        # Drop the default headers so only the configured ones are sent
        # (plus whatever setting the body below adds).
        response.headers.clear()
        response.headers.extend(self.headers)
        # FIXME: compression is not implemented yet; ``self.compress`` is
        # currently ignored.
        response.data = content
        return response(environ, start_response)

    def serve_content(self, content, code=200, headers=None):
        """
        Serves string content (with specified HTTP status code) as response
        to all subsequent requests.

        :param content: content to be displayed
        :param code: HTTP status code
        :param headers: HTTP headers to be returned; ``None`` keeps the
            headers configured earlier, an empty mapping removes them
        """
        self.content, self.code = (content, code)
        # ``is not None`` rather than truthiness, so that an empty mapping
        # can be used to reset previously configured headers.
        if headers is not None:
            self.headers = headers
if __name__ == '__main__':  # pragma: no cover
    # Demo: serve the file named on the command line (or the project README
    # by default) until interrupted with Ctrl-C.
    import os.path
    import time

    # A ContentServer is both the WSGI application and the server thread, so
    # a single instance suffices. Wrapping it in a second WSGIServer would
    # leave a bound but never-started server behind.
    server = ContentServer()
    server.start()
    try:
        print('HTTP server is running at %s' % server.url)
        print('Type <Ctrl-C> to stop')

        try:
            path = sys.argv[1]
        except IndexError:
            path = os.path.join(
                os.path.dirname(os.path.abspath(__file__)), '..', 'README.rst')

        # Close the file deterministically instead of leaking the handle.
        with open(path) as source:
            server.serve_content(source.read(), 302)

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print('\rstopping...')
    finally:
        # Always stop the (non-daemon) server thread, even if reading the
        # file failed; otherwise the process would never exit.
        server.stop()
|