aboutsummaryrefslogtreecommitdiffstats
path: root/kikimr/persqueue/sdk/deprecated/cpp/v2/ut_utils/test_utils.h
blob: 13b4da27727b072c149d342edb9052ea66593564 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#pragma once
#include <util/generic/ptr.h>
#include <util/generic/size_literals.h>
#include <library/cpp/threading/chunk_queue/queue.h>
#include <util/generic/overloaded.h>
#include <library/cpp/testing/unittest/registar.h>

#include <kikimr/persqueue/sdk/deprecated/cpp/v2/ut_utils/sdk_test_setup.h>
#include <kikimr/persqueue/sdk/deprecated/cpp/v2/persqueue.h>

namespace NPersQueue {

using namespace NThreading;
using namespace NYdb::NPersQueue;
using namespace NKikimr;
using namespace NKikimr::NPersQueueTests;
//using namespace NPersQueue::V1;

template <class TPQLibObject>
void DestroyAndWait(THolder<TPQLibObject>& object) {
    if (object) {
        auto isDead = object->IsDead();
        object = nullptr;
        isDead.GetValueSync();
    }
}

inline bool GrpcV1EnabledByDefault() {
    static const bool enabled = std::getenv("PERSQUEUE_GRPC_API_V1_ENABLED");
    return enabled;
}

class TCallbackCredentialsProvider : public ICredentialsProvider {
    std::function<void(NPersQueue::TCredentials*)> Callback;
public:
    TCallbackCredentialsProvider(std::function<void(NPersQueue::TCredentials*)> callback)
    : Callback(std::move(callback))
    {}

    void FillAuthInfo(NPersQueue::TCredentials* authInfo) const {
        Callback(authInfo);
    }
};

struct TWriteResult {
    bool Ok = false;
    // No acknowledgement is expected from a writer under test
    bool NoWait = false;
    TString ResponseDebugString = TString();
};

struct TAcknowledgableMessage {
    TString Value;
    ui64 SequenceNumber;
    TInstant CreatedAt;
    TPromise<TWriteResult> AckPromise;
};

class IClientEventLoop {
protected:
    std::atomic_bool MayStop;
    std::atomic_bool MustStop;
    bool Stopped = false;
    std::unique_ptr<TThread> Thread;
    TLog Log;

public:
    IClientEventLoop()
    : MayStop()
    , MustStop()
    , MessageBuffer()
    {}

    void AllowStop() {
        MayStop = true;
    }

    void WaitForStop() {
        if (!Stopped) {
            Log << TLOG_INFO << "Wait for writer to die on itself";
            Thread->Join();
            Log << TLOG_INFO << "Client write event loop stopped";
        }
        Stopped = true;
    }

    virtual ~IClientEventLoop() {
        MustStop = true;
        if (!Stopped) {
            Log << TLOG_INFO << "Wait for client write event loop to stop";
            Thread->Join();
            Log << TLOG_INFO << "Client write event loop stopped";
        }
        Stopped = true;
    }

    TManyOneQueue<TAcknowledgableMessage> MessageBuffer;

};

} // namespace NPersQueue