aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest-localserver/py2/pytest_localserver/http.py
blob: 1ce71562ebda2623306180f0fafeb54b51222074 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Copyright (C) 2010-2013 Sebastian Rahlf and others (see AUTHORS).
#
# This program is release under the MIT license. You can find the full text of
# the license in the LICENSE file.

import json
import sys
import threading

from werkzeug.serving import make_server
from werkzeug.wrappers import Response, Request


class WSGIServer(threading.Thread):

    """
    HTTP server running a WSGI application in its own thread.
    """

    def __init__(self, host='127.0.0.1', port=0, application=None, **kwargs):
        self.app = application
        self._server = make_server(host, port, self.app, **kwargs)
        self.server_address = self._server.server_address

        super(WSGIServer, self).__init__(
            name=self.__class__,
            target=self._server.serve_forever)

    def __del__(self):
        self.stop()

    def stop(self):
        self._server.shutdown()

    @property
    def url(self):
        host, port = self.server_address
        proto = 'http' if self._server.ssl_context is None else 'https'
        return '%s://%s:%i' % (proto, host, port)


class ContentServer(WSGIServer):

    """
    Small test server which can be taught which content (i.e. string) to serve
    with which response code. Try the following snippet for testing API calls::

        server = ContentServer(port=8080)
        server.start()
        print 'Test server running at http://%s:%i' % server.server_address

        # any request to http://localhost:8080 will get a 503 response.
        server.content = 'Hello World!'
        server.code = 503

        # ...

        # we're done
        server.stop()

    """

    def __init__(self, host='127.0.0.1', port=0, ssl_context=None):
        super(ContentServer, self).__init__(host, port, self, ssl_context=ssl_context)
        self.content, self.code = ('', 204)  # HTTP 204: No Content
        self.headers = {}
        self.show_post_vars = False
        self.compress = None
        self.requests = []

    def __call__(self, environ, start_response):
        """
        This is the WSGI application.
        """
        request = Request(environ)
        self.requests.append(request)
        if (request.content_type == 'application/x-www-form-urlencoded'
        and request.method == 'POST' and self.show_post_vars):
            content = json.dumps(request.form)
        else:
            content = self.content

        response = Response(status=self.code)
        response.headers.clear()
        response.headers.extend(self.headers)

        # FIXME get compression working!
        # if self.compress == 'gzip':
        #     content = gzip.compress(content.encode('utf-8'))
        #     response.content_encoding = 'gzip'

        response.data = content
        return response(environ, start_response)

    def serve_content(self, content, code=200, headers=None):
        """
        Serves string content (with specified HTTP error code) as response to
        all subsequent request.

        :param content: content to be displayed
        :param code: HTTP status code
        :param headers: HTTP headers to be returned
        """
        self.content, self.code = (content, code)
        if headers:
            self.headers = headers


if __name__ == '__main__':  # pragma: no cover
    import os.path
    import time

    app = ContentServer()
    server = WSGIServer(application=app)
    server.start()

    print('HTTP server is running at %s' % server.url)
    print('Type <Ctrl-C> to stop')

    try:
        path = sys.argv[1]
    except IndexError:
        path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), '..', 'README.rst')

    app.serve_content(open(path).read(), 302)

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print('\rstopping...')
    server.stop()