diff options
author | prateek <prateek@yandex-team.ru> | 2022-02-10 16:50:32 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:32 +0300 |
commit | 06e925754c8de946ff79d538bde1e6424cbd4cbb (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/messagebus/synchandler.cpp | |
parent | 30744531a4767f053be08b22b325594d7ed8ffb3 (diff) | |
download | ydb-06e925754c8de946ff79d538bde1e6424cbd4cbb.tar.gz |
Restoring authorship annotation for <prateek@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/synchandler.cpp')
-rw-r--r-- | library/cpp/messagebus/synchandler.cpp | 74 |
1 files changed, 37 insertions, 37 deletions
diff --git a/library/cpp/messagebus/synchandler.cpp b/library/cpp/messagebus/synchandler.cpp index c95ada8039..8e891d66b3 100644 --- a/library/cpp/messagebus/synchandler.cpp +++ b/library/cpp/messagebus/synchandler.cpp @@ -1,32 +1,32 @@ #include "remote_client_session.h" #include "remote_connection.h" #include "ybus.h" - + using namespace NBus; using namespace NBus::NPrivate; -///////////////////////////////////////////////////////////////// -/// Object that encapsulates all messgae data required for sending -/// a message synchronously and receiving a reply. It includes: -/// 1. ConditionVariable to wait on message reply -/// 2. Lock used by condition variable -/// 3. Message reply -/// 4. Reply status -struct TBusSyncMessageData { +///////////////////////////////////////////////////////////////// +/// Object that encapsulates all messgae data required for sending +/// a message synchronously and receiving a reply. It includes: +/// 1. ConditionVariable to wait on message reply +/// 2. Lock used by condition variable +/// 3. Message reply +/// 4. Reply status +struct TBusSyncMessageData { TCondVar ReplyEvent; TMutex ReplyLock; TBusMessage* Reply; - EMessageStatus ReplyStatus; - - TBusSyncMessageData() + EMessageStatus ReplyStatus; + + TBusSyncMessageData() : Reply(nullptr) - , ReplyStatus(MESSAGE_DONT_ASK) + , ReplyStatus(MESSAGE_DONT_ASK) { } -}; - +}; + class TSyncHandler: public IBusClientHandler { -public: +public: TSyncHandler(bool expectReply = true) : ExpectReply(expectReply) , Session(nullptr) @@ -34,7 +34,7 @@ public: } ~TSyncHandler() override { } - + void OnReply(TAutoPtr<TBusMessage> pMessage0, TAutoPtr<TBusMessage> pReply0) override { TBusMessage* pMessage = pMessage0.Release(); TBusMessage* pReply = pReply0.Release(); @@ -45,15 +45,15 @@ public: TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data); SignalResult(data, pReply, MESSAGE_OK); - } - + } + void OnError(TAutoPtr<TBusMessage> pMessage0, EMessageStatus status) override { TBusMessage* pMessage = pMessage0.Release(); TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data); - if (!data) { - return; - } - + if (!data) { + return; + } + SignalResult(data, /*pReply=*/nullptr, status); } @@ -77,18 +77,18 @@ public: private: void SignalResult(TBusSyncMessageData* data, TBusMessage* pReply, EMessageStatus status) const { Y_VERIFY(data, "Message data is set to NULL."); - TGuard<TMutex> G(data->ReplyLock); + TGuard<TMutex> G(data->ReplyLock); data->Reply = pReply; - data->ReplyStatus = status; - data->ReplyEvent.Signal(); - } + data->ReplyStatus = status; + data->ReplyEvent.Signal(); + } private: // This is weird, because in regular client one-way-ness is selected per call, not per session. bool ExpectReply; TRemoteClientSession* Session; -}; - +}; + namespace NBus { namespace NPrivate { #ifdef _MSC_VER @@ -168,31 +168,31 @@ void TBusSyncSourceSession::Shutdown() { TBusMessage* TBusSyncSourceSession::SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr) { return Session->SendSyncMessage(pMessage, status, addr); -} - +} + int TBusSyncSourceSession::RegisterService(const char* hostname, TBusKey start, TBusKey end, EIpVersion ipVersion) { return Session->RegisterService(hostname, start, end, ipVersion); } - + int TBusSyncSourceSession::GetInFlight() { return Session->GetInFlight(); } - + const TBusProtocol* TBusSyncSourceSession::GetProto() const { return Session->GetProto(); } - + const TBusClientSession* TBusSyncSourceSession::GetBusClientSessionWorkaroundDoNotUse() const { return Session.Get(); } - + TBusSyncClientSessionPtr TBusMessageQueue::CreateSyncSource(TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply, const TString& name) { TIntrusivePtr<TBusSyncSourceSessionImpl> session = new TBusSyncSourceSessionImpl(this, proto, config, needReply, name); Add(session.Get()); return new TBusSyncSourceSession(session); } - + void TBusMessageQueue::Destroy(TBusSyncClientSessionPtr session) { Destroy(session->Session.Get()); Y_UNUSED(session->Session.Release()); -} +} |