diff options
author | prateek <prateek@yandex-team.ru> | 2022-02-10 16:50:32 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:32 +0300 |
commit | 06e925754c8de946ff79d538bde1e6424cbd4cbb (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 | |
parent | 30744531a4767f053be08b22b325594d7ed8ffb3 (diff) | |
download | ydb-06e925754c8de946ff79d538bde1e6424cbd4cbb.tar.gz |
Restoring authorship annotation for <prateek@yandex-team.ru>. Commit 2 of 2.
-rw-r--r-- | library/cpp/messagebus/oldmodule/startsession.h | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/synchandler.cpp | 74 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/sync_client_ut.cpp | 22 | ||||
-rw-r--r-- | library/cpp/messagebus/ybus.h | 4 |
4 files changed, 51 insertions, 51 deletions
diff --git a/library/cpp/messagebus/oldmodule/startsession.h b/library/cpp/messagebus/oldmodule/startsession.h index cb31217dd8..5e26e7e1e5 100644 --- a/library/cpp/messagebus/oldmodule/startsession.h +++ b/library/cpp/messagebus/oldmodule/startsession.h @@ -27,7 +27,7 @@ namespace NBus { public: TBusStarter(TBusModule* module, const TBusSessionConfig& config); ~TBusStarter(); - + void Shutdown(); }; diff --git a/library/cpp/messagebus/synchandler.cpp b/library/cpp/messagebus/synchandler.cpp index c95ada8039..8e891d66b3 100644 --- a/library/cpp/messagebus/synchandler.cpp +++ b/library/cpp/messagebus/synchandler.cpp @@ -1,32 +1,32 @@ #include "remote_client_session.h" #include "remote_connection.h" #include "ybus.h" - + using namespace NBus; using namespace NBus::NPrivate; -///////////////////////////////////////////////////////////////// -/// Object that encapsulates all messgae data required for sending -/// a message synchronously and receiving a reply. It includes: -/// 1. ConditionVariable to wait on message reply -/// 2. Lock used by condition variable -/// 3. Message reply -/// 4. Reply status -struct TBusSyncMessageData { +///////////////////////////////////////////////////////////////// +/// Object that encapsulates all messgae data required for sending +/// a message synchronously and receiving a reply. It includes: +/// 1. ConditionVariable to wait on message reply +/// 2. Lock used by condition variable +/// 3. Message reply +/// 4. Reply status +struct TBusSyncMessageData { TCondVar ReplyEvent; TMutex ReplyLock; TBusMessage* Reply; - EMessageStatus ReplyStatus; - - TBusSyncMessageData() + EMessageStatus ReplyStatus; + + TBusSyncMessageData() : Reply(nullptr) - , ReplyStatus(MESSAGE_DONT_ASK) + , ReplyStatus(MESSAGE_DONT_ASK) { } -}; - +}; + class TSyncHandler: public IBusClientHandler { -public: +public: TSyncHandler(bool expectReply = true) : ExpectReply(expectReply) , Session(nullptr) @@ -34,7 +34,7 @@ public: } ~TSyncHandler() override { } - + void OnReply(TAutoPtr<TBusMessage> pMessage0, TAutoPtr<TBusMessage> pReply0) override { TBusMessage* pMessage = pMessage0.Release(); TBusMessage* pReply = pReply0.Release(); @@ -45,15 +45,15 @@ public: TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data); SignalResult(data, pReply, MESSAGE_OK); - } - + } + void OnError(TAutoPtr<TBusMessage> pMessage0, EMessageStatus status) override { TBusMessage* pMessage = pMessage0.Release(); TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data); - if (!data) { - return; - } - + if (!data) { + return; + } + SignalResult(data, /*pReply=*/nullptr, status); } @@ -77,18 +77,18 @@ public: private: void SignalResult(TBusSyncMessageData* data, TBusMessage* pReply, EMessageStatus status) const { Y_VERIFY(data, "Message data is set to NULL."); - TGuard<TMutex> G(data->ReplyLock); + TGuard<TMutex> G(data->ReplyLock); data->Reply = pReply; - data->ReplyStatus = status; - data->ReplyEvent.Signal(); - } + data->ReplyStatus = status; + data->ReplyEvent.Signal(); + } private: // This is weird, because in regular client one-way-ness is selected per call, not per session. bool ExpectReply; TRemoteClientSession* Session; -}; - +}; + namespace NBus { namespace NPrivate { #ifdef _MSC_VER @@ -168,31 +168,31 @@ void TBusSyncSourceSession::Shutdown() { TBusMessage* TBusSyncSourceSession::SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr) { return Session->SendSyncMessage(pMessage, status, addr); -} - +} + int TBusSyncSourceSession::RegisterService(const char* hostname, TBusKey start, TBusKey end, EIpVersion ipVersion) { return Session->RegisterService(hostname, start, end, ipVersion); } - + int TBusSyncSourceSession::GetInFlight() { return Session->GetInFlight(); } - + const TBusProtocol* TBusSyncSourceSession::GetProto() const { return Session->GetProto(); } - + const TBusClientSession* TBusSyncSourceSession::GetBusClientSessionWorkaroundDoNotUse() const { return Session.Get(); } - + TBusSyncClientSessionPtr TBusMessageQueue::CreateSyncSource(TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply, const TString& name) { TIntrusivePtr<TBusSyncSourceSessionImpl> session = new TBusSyncSourceSessionImpl(this, proto, config, needReply, name); Add(session.Get()); return new TBusSyncSourceSession(session); } - + void TBusMessageQueue::Destroy(TBusSyncClientSessionPtr session) { Destroy(session->Session.Get()); Y_UNUSED(session->Session.Release()); -} +} diff --git a/library/cpp/messagebus/test/ut/sync_client_ut.cpp b/library/cpp/messagebus/test/ut/sync_client_ut.cpp index b3d5e74652..400128193f 100644 --- a/library/cpp/messagebus/test/ut/sync_client_ut.cpp +++ b/library/cpp/messagebus/test/ut/sync_client_ut.cpp @@ -1,7 +1,7 @@ #include <library/cpp/messagebus/test/helper/example.h> #include <library/cpp/messagebus/test/helper/object_count_check.h> - -namespace NBus { + +namespace NBus { namespace NTest { using namespace std; @@ -9,34 +9,34 @@ namespace NBus { /// \brief Client for sending synchronous message to local server struct TSyncClient { TNetAddr ServerAddr; - + TExampleProtocol Proto; TBusMessageQueuePtr Bus; TBusSyncClientSessionPtr Session; int NumReplies; int NumMessages; - + /// constructor creates instances of queue, protocol and session TSyncClient(const TNetAddr& serverAddr) : ServerAddr(serverAddr) { /// create or get instance of message queue, need one per application Bus = CreateMessageQueue(); - + NumReplies = 0; NumMessages = 10; - + /// register source/client session TBusClientSessionConfig sessionConfig; Session = Bus->CreateSyncSource(&Proto, sessionConfig); Session->RegisterService("localhost"); } - + ~TSyncClient() { Session->Shutdown(); } - + /// dispatch of requests is done here void Work() { for (int i = 0; i < NumMessages; i++) { @@ -49,7 +49,7 @@ namespace NBus { } } }; - + Y_UNIT_TEST_SUITE(SyncClientTest) { Y_UNIT_TEST(TestSync) { TObjectCountCheck objectCountCheck; @@ -63,7 +63,7 @@ namespace NBus { UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0); UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0); } - } - + } + } } diff --git a/library/cpp/messagebus/ybus.h b/library/cpp/messagebus/ybus.h index b363899855..de21ad8521 100644 --- a/library/cpp/messagebus/ybus.h +++ b/library/cpp/messagebus/ybus.h @@ -29,10 +29,10 @@ #include <util/generic/ptr.h> #include <util/stream/input.h> #include <util/system/atomic.h> -#include <util/system/condvar.h> +#include <util/system/condvar.h> #include <util/system/type_name.h> #include <util/system/event.h> -#include <util/system/mutex.h> +#include <util/system/mutex.h> namespace NBus { //////////////////////////////////////////////////////// |