aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/io/ut/end_of_stream_ut.cpp
blob: 449c966a3bad91909b6ec761eef134c75ffa42c5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#include <yt/cpp/mapreduce/io/node_table_reader.h>

#include <library/cpp/testing/gtest/gtest.h>

using namespace NYT;

////////////////////////////////////////////////////////////////////////////////

// Minimal TRawTableReader implementation that serves its data from an
// in-memory string. The tests below use it to feed hand-written input
// to TNodeTableReader.
//
// Retries are not supported: Retry() always returns false and
// ResetRetries() does nothing. HasRangeIndices() always returns false.
class TStringRawTableReader
    : public TRawTableReader
{
public:
    // explicit: a plain string must not silently convert into a reader.
    explicit TStringRawTableReader(const TString& string)
        : String_(string)
        , Stream_(String_)
    { }

    // Always refuses to retry; all arguments are ignored.
    bool Retry(const TMaybe<ui32>&, const TMaybe<ui64>&, const std::exception_ptr&) override
    {
        return false;
    }

    // Nothing to reset: this reader keeps no retry state.
    void ResetRetries() override
    { }

    bool HasRangeIndices() const override
    {
        return false;
    }

private:
    // Forwards the read to the string stream and returns its result.
    size_t DoRead(void* buf, size_t len) override
    {
        return Stream_.Read(buf, len);
    }

private:
    // String_ is declared before Stream_, so it is already initialized
    // when Stream_ is constructed from it in the constructor.
    const TString String_;
    TStringStream Stream_;
};

////////////////////////////////////////////////////////////////////////////////

// Value-parameterized fixture for the end-of-stream tests. The bool
// parameter is read by the test body through GetParam().
class TEndOfStreamTest : public ::testing::TestWithParam<bool>
{
};

// Feeds TNodeTableReader a hand-written stream consisting of two rows,
// a <key_switch=%true># entity, one more row and, when the test
// parameter is true, a trailing <end_of_stream=%true># entity.
//
// The test expects that:
//   - the first two rows are readable, after which the reader is no longer
//     valid but reports neither end-of-stream nor raw reader exhaustion;
//   - after NextKey() + Next() the third row is readable;
//   - once all rows are consumed the raw reader is reported as exhausted and
//     IsEndOfStream() is true exactly when the end-of-stream entity was added.
TEST_P(TEndOfStreamTest, Eos)
{
    const bool addEos = GetParam();
    auto proxy = ::MakeIntrusive<TStringRawTableReader>(TString::Join(
        "{a=13;b = \"string\"}; {c = {d=12}};",
        "<key_switch=%true>#; {e = 42};",
        addEos ? "<end_of_stream=%true>#" : ""
    ));

    TNodeTableReader reader(proxy);

    // Rows located before the key switch.
    const TVector<TNode> firstKeyRows = {TNode()("a", 13)("b", "string"), TNode()("c", TNode()("d", 12))};
    for (const auto& expectedRow : firstKeyRows) {
        // Fatal on purpose: comparing the current row makes no sense
        // once the reader reports that it has no valid row.
        ASSERT_TRUE(reader.IsValid());
        EXPECT_FALSE(reader.IsEndOfStream());
        EXPECT_FALSE(reader.IsRawReaderExhausted());
        EXPECT_EQ(reader.GetRow(), expectedRow);
        reader.Next();
    }

    // The reader stopped at the key switch: no current row, but the stream
    // is not over and the underlying raw reader still has data.
    EXPECT_FALSE(reader.IsValid());
    EXPECT_FALSE(reader.IsEndOfStream());
    EXPECT_FALSE(reader.IsRawReaderExhausted());

    // Step over the key switch to reach the row that follows it.
    reader.NextKey();
    reader.Next();

    const TNode lastRow = TNode()("e", 42);
    ASSERT_TRUE(reader.IsValid());
    EXPECT_FALSE(reader.IsEndOfStream());
    EXPECT_FALSE(reader.IsRawReaderExhausted());
    EXPECT_EQ(reader.GetRow(), lastRow);
    reader.Next();

    // All input is consumed. End-of-stream must be reported only when the
    // explicit <end_of_stream=%true># entity was present in the input.
    EXPECT_FALSE(reader.IsValid());
    EXPECT_EQ(reader.IsEndOfStream(), addEos);
    EXPECT_TRUE(reader.IsRawReaderExhausted());
}

// Run every TEndOfStreamTest case twice, under two separately named
// instantiations:
//   WithEos    - parameter is true, the input ends with <end_of_stream=%true>#;
//   WithoutEos - parameter is false, the input has no end-of-stream entity.
INSTANTIATE_TEST_SUITE_P(WithEos, TEndOfStreamTest, ::testing::Values(true));
INSTANTIATE_TEST_SUITE_P(WithoutEos, TEndOfStreamTest, ::testing::Values(false));

////////////////////////////////////////////////////////////////////////////////