aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/http/ut/http_ut.cpp
blob: e41e83c5a041f105ec4bb815be4feffa8491437c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#include "simple_server.h"

#include <yt/cpp/mapreduce/http/http.h>

#include <yt/cpp/mapreduce/interface/config.h>

#include <library/cpp/testing/common/network.h>

#include <library/cpp/testing/gtest/gtest.h>

#include <util/system/byteorder.h>

using namespace NYT;

// Writes one data frame to |stream|: the tag byte 0x01, then the payload length
// as a 32-bit little-endian integer, then the payload bytes themselves.
void WriteDataFrame(TStringBuf string, IOutputStream* stream)
{
    // Prepare the wire representation of the length before touching the stream.
    const ui32 payloadSize = string.size();
    const auto encodedSize = HostToLittle(payloadSize);

    stream->Write("\x01");
    stream->Write(&encodedSize, sizeof(encodedSize));
    stream->Write(string);
}

// Starts a TSimpleServer on a free port that echoes the request body back as a
// framed response.
//
// The handler records a fatal gtest failure (and returns) when the request has
// no X-YT-Accept-Framing header. Otherwise it replies "200 OK" with
// "X-YT-Framing: 1" and sends the body as two data frames (the first 10 bytes,
// then the remainder), interleaved with KeepAlive frames and empty data frames.
THolder<TSimpleServer> CreateFramingEchoServer()
{
    auto port = NTesting::GetFreePort();

    auto handler = [] (IInputStream* input, IOutputStream* output) {
        try {
            THttpInput httpInput(input);
            if (!httpInput.Headers().FindHeader("X-YT-Accept-Framing")) {
                FAIL() << "X-YT-Accept-Framing header not found";
            }

            // Own the body in a separate variable and slice it through a view.
            const auto body = httpInput.ReadAll();
            const TStringBuf bodyView(body);

            THttpOutput httpOutput(output);
            httpOutput << "HTTP/1.1 200 OK\r\n";
            httpOutput << "X-YT-Framing: 1\r\n";
            httpOutput << "\r\n";

            httpOutput << "\x02\x02"; // Two KeepAlive frames.
            WriteDataFrame("", &httpOutput);
            WriteDataFrame(bodyView.SubString(0, 10), &httpOutput);

            httpOutput << "\x02"; // KeepAlive.
            WriteDataFrame("", &httpOutput);
            WriteDataFrame(bodyView.SubString(10, std::string::npos), &httpOutput);

            httpOutput << "\x02"; // KeepAlive.

            httpOutput.Flush();
        } catch (const std::exception&) {
            // Errors while serving the request are deliberately ignored here.
        }
    };

    return MakeHolder<TSimpleServer>(port, handler);
}

// Each call to AddMutationId() must leave a different "mutation_id" value in the
// header parameters.
TEST(THttpHeaderTest, AddParameter)
{
    THttpHeader header("POST", "/foo");

    header.AddMutationId();
    // Copy the value: the second AddMutationId() call below changes the parameter.
    const auto id1 = header.GetParameters()["mutation_id"].AsString();

    header.AddMutationId();
    const auto id2 = header.GetParameters()["mutation_id"].AsString();

    // EXPECT_NE prints both ids on failure, unlike EXPECT_TRUE(id1 != id2).
    EXPECT_NE(id1, id2);
}

// Posts a short payload to the framing echo server and expects the response
// stream to yield exactly that payload.
TEST(TFramingTest, FramingSimple)
{
    static constexpr char payload[] = "Some funny data";

    auto echoServer = CreateFramingEchoServer();

    THttpRequest request(
        "0-0-0-0",
        echoServer->GetAddress(),
        THttpHeader("POST", "concatenate"),
        TDuration::Zero());

    auto bodyStream = request.StartRequest();
    *bodyStream << payload;
    request.FinishRequest();

    const auto echoed = request.GetResponseStream()->ReadAll();
    EXPECT_EQ(echoed, payload);
}

// Same round trip as the simple case, but with a 100000-byte payload of 'x'.
TEST(TFramingTest, FramingLarge)
{
    const TString payload(100000, 'x');

    auto echoServer = CreateFramingEchoServer();

    THttpRequest request(
        "0-0-0-0",
        echoServer->GetAddress(),
        THttpHeader("POST", "concatenate"),
        TDuration::Zero());

    auto bodyStream = request.StartRequest();
    *bodyStream << payload;
    request.FinishRequest();

    const auto echoed = request.GetResponseStream()->ReadAll();
    EXPECT_EQ(echoed, payload);
}