#pragma once #include <library/cpp/testing/unittest/registar.h> #include "alloc_counter.h" #include "message_handler_error.h" #include <library/cpp/messagebus/ybus.h> #include <library/cpp/messagebus/misc/test_sync.h> #include <util/system/event.h> namespace NBus { namespace NTest { class TExampleRequest: public TBusMessage { friend class TExampleProtocol; private: TAllocCounter AllocCounter; public: TString Data; public: TExampleRequest(TAtomic* counterPtr, size_t payloadSize = 320); TExampleRequest(ECreateUninitialized, TAtomic* counterPtr); }; class TExampleResponse: public TBusMessage { friend class TExampleProtocol; private: TAllocCounter AllocCounter; public: TString Data; TExampleResponse(TAtomic* counterPtr, size_t payloadSize = 320); TExampleResponse(ECreateUninitialized, TAtomic* counterPtr); }; class TExampleProtocol: public TBusProtocol { public: TAtomic RequestCount; TAtomic ResponseCount; TAtomic RequestCountDeserialized; TAtomic ResponseCountDeserialized; TAtomic StartCount; TExampleProtocol(int port = 0); ~TExampleProtocol() override; void Serialize(const TBusMessage* message, TBuffer& buffer) override; TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override; }; class TExampleClient: private TBusClientHandlerError { public: TExampleProtocol Proto; bool UseCompression; bool CrashOnError; size_t DataSize; ssize_t MessageCount; TAtomic RepliesCount; TAtomic Errors; EMessageStatus LastError; TSystemEvent WorkDone; TBusMessageQueuePtr Bus; TBusClientSessionPtr Session; public: TExampleClient(const TBusClientSessionConfig sessionConfig = TBusClientSessionConfig(), int port = 0); ~TExampleClient() override; EMessageStatus SendMessage(const TNetAddr* addr = nullptr); void SendMessages(size_t count, const TNetAddr* addr = nullptr); void SendMessages(size_t count, const TNetAddr& addr); void ResetCounters(); void WaitReplies(); EMessageStatus WaitForError(); void WaitForError(EMessageStatus status); void SendMessagesWaitReplies(size_t count, const TNetAddr* addr = nullptr); void SendMessagesWaitReplies(size_t count, const TNetAddr& addr); void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override; void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus) override; }; class TExampleServer: private TBusServerHandlerError { public: TExampleProtocol Proto; bool UseCompression; bool AckMessageBeforeSendReply; TMaybe<size_t> DataSize; // Nothing means use request size bool ForgetRequest; TTestSync TestSync; TBusMessageQueuePtr Bus; TBusServerSessionPtr Session; public: TExampleServer( const char* name = "TExampleServer", const TBusServerSessionConfig& sessionConfig = TBusServerSessionConfig()); TExampleServer(unsigned port, const char* name = "TExampleServer"); ~TExampleServer() override; public: size_t GetInFlight() const; unsigned GetActualListenPort() const; // any of TNetAddr GetActualListenAddr() const; void WaitForOnMessageCount(unsigned n); protected: void OnMessage(TOnMessageContext& mess) override; }; } }