// Origin: kikimr/yndx/grpc_services/persqueue/persqueue_compat_ut.cpp
// (blob 7b9b117bcde471895601bcf752a9e3f440783ae8)
#include <kikimr/persqueue/sdk/deprecated/cpp/v2/ut_utils/test_pqlib.h>
#include <kikimr/persqueue/sdk/deprecated/cpp/v2/ut_utils/data_writer.h>
#include <kikimr/persqueue/sdk/deprecated/cpp/v2/types.h>

namespace NKikimr::NPersQueueTests {

using namespace NPersQueue::NTests;

class TPQv0CompatTestBase {
public:
    THolder<TTestPQLib> PQLib;
    THolder<::NPersQueue::TTestServer> Server;
    TString OriginalLegacyName1;
    TString OriginalModernName1;
    TString MirroredLegacyName1;
    TString MirroredModernName1;
    TString ShortName1;

    TString OriginalLegacyName2;
    TString OriginalModernName2;
    TString MirroredLegacyName2;
    TString MirroredModernName2;
    TString ShortName2;

public:
    TPQv0CompatTestBase()
    {
        Server = MakeHolder<::NPersQueue::TTestServer>(false);
        Server->ServerSettings.PQConfig.MutablePQDiscoveryConfig()->SetLbUserDatabaseRoot("/Root/LB");
        Server->ServerSettings.PQConfig.SetCheckACL(false);
        Server->StartServer();
        Server->EnableLogs({ NKikimrServices::KQP_PROXY }, NActors::NLog::PRI_EMERG);
        Server->EnableLogs({ NKikimrServices::PERSQUEUE }, NActors::NLog::PRI_INFO);
        Server->EnableLogs({ NKikimrServices::PQ_METACACHE }, NActors::NLog::PRI_DEBUG);
        OriginalLegacyName1 = "rt3.dc1--account--topic1";
        MirroredLegacyName1 = "rt3.dc2--account--topic1";
        OriginalModernName1 = "/Root/LB/account/topic1";
        MirroredModernName1 = "/Root/LB/account/.topic2/mirrored-from-dc2";
        ShortName1 = "account/topic1";

        OriginalLegacyName2 = "rt3.dc1--account--topic2";
        MirroredLegacyName2 = "rt3.dc2--account--topic2";
        OriginalModernName2 = "/Root/LB/account/topic2";
        MirroredModernName2 = "/Root/LB/account/.topic2/mirrored-from-dc2";
        ShortName2 = "account/topic2";

        Server->AnnoyingClient->CreateTopicNoLegacy(OriginalLegacyName1, 1, false);
        Server->AnnoyingClient->CreateTopicNoLegacy(MirroredLegacyName1, 1, false);
        Server->AnnoyingClient->CreateTopicNoLegacy(OriginalModernName2, 1, true, true, "dc1");
        Server->AnnoyingClient->CreateTopicNoLegacy(MirroredModernName2, 1, true, true, "dc2");
        Server->AnnoyingClient->CreateConsumer("test-consumer");
        InitPQLib();
    }
    void InitPQLib() {
        PQLib = MakeHolder<TTestPQLib>(*Server);
        TPQDataWriter writer{OriginalLegacyName1, ShortName1, "test", *Server};
        writer.WaitWritePQServiceInitialization();
    };
};

Y_UNIT_TEST_SUITE(TPQCompatTest) {
    // Creates a producer by short topic name for topic2 and then for topic1, and
    // checks that each start response carries an Init section.
    Y_UNIT_TEST(DiscoverTopics) {
        TPQv0CompatTestBase fixture;
        auto& pqLib = *fixture.PQLib;

        Cerr << "Create producer\n";
        {
            // The producer handle is bound only to keep it alive for this scope.
            auto [producer, startResult] = pqLib.CreateProducer(
                fixture.ShortName2, "123", {}, ::NPersQueue::ECodec::RAW);
            Cerr << "Got response: " << startResult.Response.ShortDebugString() << Endl;
            UNIT_ASSERT(startResult.Response.HasInit());
        }

        Cerr << "Create producer(2)\n";
        {
            auto [producer, startResult] = pqLib.CreateProducer(
                fixture.ShortName1, "123", {}, ::NPersQueue::ECodec::RAW);
            UNIT_ASSERT(startResult.Response.HasInit());
        }
    }

    // Opens a consumer session on topic1 (by short name) and checks the Lock
    // messages it receives: first one lock for partition 0 of each of the original
    // and mirrored topics, then, after the mirrored topic is altered, a lock for
    // its partition 1.
    Y_UNIT_TEST(SetupLockSession) {
        TPQv0CompatTestBase fixture{};
        auto [consumer, startResult] = fixture.PQLib->CreateConsumer({fixture.ShortName1}, "test-consumer", 1, true);
        Cerr << startResult.Response << "\n";

        // Two initial locks are expected; their order is not fixed, so either
        // topic name is accepted on each iteration.
        for (ui32 lockIdx = 0; lockIdx < 2; ++lockIdx) {
            auto lockFuture = consumer->GetNextMessage();
            lockFuture.Wait();
            const auto& response = lockFuture.GetValue().Response;
            Cerr << "Response: " << response << "\n";
            UNIT_ASSERT(response.HasLock());
            const auto& lockedTopic = response.GetLock().GetTopic();
            UNIT_ASSERT(lockedTopic == fixture.OriginalLegacyName1
                        || lockedTopic == fixture.MirroredLegacyName1);
            UNIT_ASSERT(response.GetLock().GetPartition() == 0);
        }

        // No further message may arrive within one second while nothing changes.
        auto nextFuture = consumer->GetNextMessage();
        UNIT_ASSERT(!nextFuture.Wait(TDuration::Seconds(1)));

        // Presumably raises the mirrored topic's partition count to 2; the pending
        // read must then complete with a lock for the new partition.
        fixture.Server->AnnoyingClient->AlterTopic(fixture.MirroredLegacyName1, 2);
        nextFuture.Wait();
        const auto& newLockResponse = nextFuture.GetValue().Response;
        UNIT_ASSERT(newLockResponse.HasLock());
        UNIT_ASSERT(newLockResponse.GetLock().GetTopic() == fixture.MirroredLegacyName1);
        UNIT_ASSERT(newLockResponse.GetLock().GetPartition() == 1);
    }

    // Issues legacy-protocol requests by legacy topic name: queries partition
    // offsets for all four topics, sets a client offset on both topic2 variants,
    // then queries the topic2 offsets again.
    // NOTE(review): the numeric arguments of GetPartOffset / SetClientOffsetPQ are
    // interpreted by the test client, which is not visible here — confirm their
    // meaning (expected counts, partition, offset) against its declaration.
    Y_UNIT_TEST(LegacyRequests) {
        TPQv0CompatTestBase fixture{};
        auto& client = *fixture.Server->AnnoyingClient;

        client.GetPartOffset(
            {
                {fixture.OriginalLegacyName1, {0}},
                {fixture.MirroredLegacyName1, {0}},
                {fixture.OriginalLegacyName2, {0}},
                {fixture.MirroredLegacyName2, {0}},
            },
            4, 0, true
        );

        client.SetClientOffsetPQ(fixture.OriginalLegacyName2, 0, 5);
        client.SetClientOffsetPQ(fixture.MirroredLegacyName2, 0, 5);

        client.GetPartOffset(
            {
                {fixture.OriginalLegacyName2, {0}},
                {fixture.MirroredLegacyName2, {0}},
            },
            2, 2, true
        );
    }
}
} //namespace NKikimr::NPersQueueTests;