aboutsummaryrefslogtreecommitdiffstats
path: root/kikimr/yndx/grpc_services/persqueue/protocol_compatibility_ut.cpp
blob: f69080ea6b839e1c4c5bf11ef2ab232efe7d9671 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h>
#include <ydb/core/client/server/msgbus_server_pq_metacache.h>
#include <kikimr/persqueue/sdk/deprecated/cpp/v2/persqueue.h>
#include <kikimr/yndx/grpc_services/persqueue/persqueue.h>
#include <kikimr/yndx/persqueue/msgbus_server/read_session_info.h>


namespace NKikimr {
namespace NPersQueueTests {

Y_UNIT_TEST_SUITE(TPersQueueProtocolCompatibility) {
    // Opens a locking consumer session through the deprecated PQLib client and then
    // queries the V1 PersQueueService (GetReadSessionsInfo) for the same consumer and
    // topic. The test passes when the V1 call succeeds, its result unpacks into a
    // ReadInfoResult, and that result holds exactly one topic with status SUCCESS.
    Y_UNIT_TEST(GetReadInfoFromV1AboutV0Session) {
        // Server setup: register the gRPC PersQueue service under the name "pq" and
        // install the read-sessions-info worker factory whose name says it is PQv0-aware.
        NKikimr::Tests::TServerSettings serverSettings = PQSettings(0);
        serverSettings.RegisterGrpcService<NKikimr::NGRpcService::TGRpcPersQueueService>(
            "pq",
            NKikimr::NMsgBusProxy::CreatePersQueueMetaCacheV2Id()
        );
        serverSettings.SetPersQueueGetReadSessionsInfoWorkerFactory(
            std::make_shared<NKikimr::NMsgBusProxy::TPersQueueGetReadSessionsInfoWorkerWithPQv0Factory>()
        );

        NPersQueue::TTestServer server(serverSettings);
        server.EnableLogs({ NKikimrServices::PERSQUEUE, NKikimrServices::PQ_READ_PROXY });
        server.AnnoyingClient->CreateTopic("rt3.dc1--topic1", 1);

        // Client library instance that logs to stderr at debug level.
        NPersQueue::TPQLibSettings pqSettings;
        pqSettings.DefaultLogger = new NPersQueue::TCerrLogger(NPersQueue::DEBUG_LOG_LEVEL);
        THolder<NPersQueue::TPQLib> pqLib = MakeHolder<NPersQueue::TPQLib>(pqSettings);

        // Consumer "user" reading "topic1" with lock sessions enabled.
        NPersQueue::TConsumerSettings consumerSettings;
        consumerSettings.Server = NPersQueue::TServerSetting{"localhost", server.GrpcPort};
        consumerSettings.ClientId = "user";
        consumerSettings.Topics = {"topic1"};
        consumerSettings.UseLockSession = true;

        auto consumer = pqLib->CreateConsumer(consumerSettings);
        auto startResponse = consumer->Start().GetValueSync();
        UNIT_ASSERT_C(startResponse.Response.HasInit(), startResponse.Response);

        // The first message after init is expected to be a Lock; acknowledge it by
        // fulfilling ReadyToRead with a default TLockInfo.
        auto lockMessage = consumer->GetNextMessage().ExtractValueSync();
        Cerr << lockMessage.Response << "\n";
        UNIT_ASSERT(lockMessage.Response.HasLock());
        lockMessage.ReadyToRead.SetValue(NPersQueue::TLockInfo{});
        const auto& lock = lockMessage.Response.GetLock();
        Cout << lock.DebugString() << Endl;

        {
            // Plain (insecure) gRPC channel to the same server, used for the V1 API.
            std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(
                "localhost:" + ToString(server.GrpcPort),
                grpc::InsecureChannelCredentials()
            );
            std::unique_ptr<Ydb::PersQueue::V1::PersQueueService::Stub> stub =
                Ydb::PersQueue::V1::PersQueueService::NewStub(channel);

            // NOTE(review): fixed delay before querying; presumably it lets the server
            // register the session opened above -- confirm whether it can be replaced
            // by polling on a concrete condition.
            Sleep(TDuration::Seconds(10));

            Ydb::PersQueue::V1::ReadInfoRequest readInfoRequest;
            readInfoRequest.mutable_consumer()->set_path("user");
            readInfoRequest.set_get_only_original(true);
            readInfoRequest.add_topics()->set_path("topic1");

            Ydb::PersQueue::V1::ReadInfoResponse readInfoResponse;
            grpc::ClientContext rcontext;
            const auto status = stub->GetReadSessionsInfo(&rcontext, readInfoRequest, &readInfoResponse);
            // Report the gRPC error details on failure instead of a bare assertion.
            UNIT_ASSERT_C(
                status.ok(),
                "GetReadSessionsInfo failed, code " << static_cast<int>(status.error_code())
                    << ": " << status.error_message()
            );

            // UnpackTo returns false when the Any payload is not a ReadInfoResult; fail
            // right here rather than later on an empty result.
            Ydb::PersQueue::V1::ReadInfoResult res;
            const bool unpacked = readInfoResponse.operation().result().UnpackTo(&res);
            Cerr << "Read info response: " << readInfoResponse << Endl << res << Endl;
            UNIT_ASSERT_C(unpacked, "cannot unpack ReadInfoResult from " << readInfoResponse);

            UNIT_ASSERT_VALUES_EQUAL(res.topics_size(), 1);
            UNIT_ASSERT(res.topics(0).status() == Ydb::StatusIds::SUCCESS);
        }
    }

} // Y_UNIT_TEST_SUITE(TPersQueueProtocolCompatibility)

} // namespace NPersQueueTests
} // namespace NKikimr