aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test/helper/example.h
blob: 1566b3c53947604ff868c428cf2fce366d270ad3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#pragma once

#include <library/cpp/testing/unittest/registar.h>

#include "alloc_counter.h"
#include "message_handler_error.h"

#include <library/cpp/messagebus/ybus.h>
#include <library/cpp/messagebus/misc/test_sync.h>
 
#include <util/system/event.h> 
 
namespace NBus {
    namespace NTest {
        class TExampleRequest: public TBusMessage {
            friend class TExampleProtocol;

        private:
            TAllocCounter AllocCounter;

        public:
            TString Data;

        public:
            TExampleRequest(TAtomic* counterPtr, size_t payloadSize = 320);
            TExampleRequest(ECreateUninitialized, TAtomic* counterPtr);
        };

        class TExampleResponse: public TBusMessage {
            friend class TExampleProtocol;

        private:
            TAllocCounter AllocCounter;

        public:
            TString Data;
            TExampleResponse(TAtomic* counterPtr, size_t payloadSize = 320);
            TExampleResponse(ECreateUninitialized, TAtomic* counterPtr);
        };

        class TExampleProtocol: public TBusProtocol {
        public:
            TAtomic RequestCount;
            TAtomic ResponseCount;
            TAtomic RequestCountDeserialized;
            TAtomic ResponseCountDeserialized;
            TAtomic StartCount;

            TExampleProtocol(int port = 0);

            ~TExampleProtocol() override;

            void Serialize(const TBusMessage* message, TBuffer& buffer) override;

            TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override;
        };

        class TExampleClient: private TBusClientHandlerError {
        public:
            TExampleProtocol Proto;
            bool UseCompression;
            bool CrashOnError;
            size_t DataSize;

            ssize_t MessageCount;
            TAtomic RepliesCount;
            TAtomic Errors;
            EMessageStatus LastError;

            TSystemEvent WorkDone;

            TBusMessageQueuePtr Bus;
            TBusClientSessionPtr Session;

        public:
            TExampleClient(const TBusClientSessionConfig sessionConfig = TBusClientSessionConfig(), int port = 0);
            ~TExampleClient() override;

            EMessageStatus SendMessage(const TNetAddr* addr = nullptr);

            void SendMessages(size_t count, const TNetAddr* addr = nullptr);
            void SendMessages(size_t count, const TNetAddr& addr);

            void ResetCounters();
            void WaitReplies();
            EMessageStatus WaitForError();
            void WaitForError(EMessageStatus status);

            void SendMessagesWaitReplies(size_t count, const TNetAddr* addr = nullptr);
            void SendMessagesWaitReplies(size_t count, const TNetAddr& addr);

            void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override;

            void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus) override;
        };

        class TExampleServer: private TBusServerHandlerError {
        public:
            TExampleProtocol Proto;
            bool UseCompression;
            bool AckMessageBeforeSendReply;
            TMaybe<size_t> DataSize; // Nothing means use request size
            bool ForgetRequest;

            TTestSync TestSync;

            TBusMessageQueuePtr Bus;
            TBusServerSessionPtr Session;

        public:
            TExampleServer(
                const char* name = "TExampleServer",
                const TBusServerSessionConfig& sessionConfig = TBusServerSessionConfig());

            TExampleServer(unsigned port, const char* name = "TExampleServer");

            ~TExampleServer() override;

        public:
            size_t GetInFlight() const;
            unsigned GetActualListenPort() const;
            // any of
            TNetAddr GetActualListenAddr() const;

            void WaitForOnMessageCount(unsigned n);

        protected:
            void OnMessage(TOnMessageContext& mess) override;
        };

    }
}