aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/synchandler.cpp
diff options
context:
space:
mode:
authorprateek <prateek@yandex-team.ru>2022-02-10 16:50:32 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:32 +0300
commit06e925754c8de946ff79d538bde1e6424cbd4cbb (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/messagebus/synchandler.cpp
parent30744531a4767f053be08b22b325594d7ed8ffb3 (diff)
downloadydb-06e925754c8de946ff79d538bde1e6424cbd4cbb.tar.gz
Restoring authorship annotation for <prateek@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/synchandler.cpp')
-rw-r--r--library/cpp/messagebus/synchandler.cpp74
1 files changed, 37 insertions, 37 deletions
diff --git a/library/cpp/messagebus/synchandler.cpp b/library/cpp/messagebus/synchandler.cpp
index c95ada8039..8e891d66b3 100644
--- a/library/cpp/messagebus/synchandler.cpp
+++ b/library/cpp/messagebus/synchandler.cpp
@@ -1,32 +1,32 @@
#include "remote_client_session.h"
#include "remote_connection.h"
#include "ybus.h"
-
+
using namespace NBus;
using namespace NBus::NPrivate;
-/////////////////////////////////////////////////////////////////
-/// Object that encapsulates all messgae data required for sending
-/// a message synchronously and receiving a reply. It includes:
-/// 1. ConditionVariable to wait on message reply
-/// 2. Lock used by condition variable
-/// 3. Message reply
-/// 4. Reply status
-struct TBusSyncMessageData {
+/////////////////////////////////////////////////////////////////
+/// Object that encapsulates all messgae data required for sending
+/// a message synchronously and receiving a reply. It includes:
+/// 1. ConditionVariable to wait on message reply
+/// 2. Lock used by condition variable
+/// 3. Message reply
+/// 4. Reply status
+struct TBusSyncMessageData {
TCondVar ReplyEvent;
TMutex ReplyLock;
TBusMessage* Reply;
- EMessageStatus ReplyStatus;
-
- TBusSyncMessageData()
+ EMessageStatus ReplyStatus;
+
+ TBusSyncMessageData()
: Reply(nullptr)
- , ReplyStatus(MESSAGE_DONT_ASK)
+ , ReplyStatus(MESSAGE_DONT_ASK)
{
}
-};
-
+};
+
class TSyncHandler: public IBusClientHandler {
-public:
+public:
TSyncHandler(bool expectReply = true)
: ExpectReply(expectReply)
, Session(nullptr)
@@ -34,7 +34,7 @@ public:
}
~TSyncHandler() override {
}
-
+
void OnReply(TAutoPtr<TBusMessage> pMessage0, TAutoPtr<TBusMessage> pReply0) override {
TBusMessage* pMessage = pMessage0.Release();
TBusMessage* pReply = pReply0.Release();
@@ -45,15 +45,15 @@ public:
TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data);
SignalResult(data, pReply, MESSAGE_OK);
- }
-
+ }
+
void OnError(TAutoPtr<TBusMessage> pMessage0, EMessageStatus status) override {
TBusMessage* pMessage = pMessage0.Release();
TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data);
- if (!data) {
- return;
- }
-
+ if (!data) {
+ return;
+ }
+
SignalResult(data, /*pReply=*/nullptr, status);
}
@@ -77,18 +77,18 @@ public:
private:
void SignalResult(TBusSyncMessageData* data, TBusMessage* pReply, EMessageStatus status) const {
Y_VERIFY(data, "Message data is set to NULL.");
- TGuard<TMutex> G(data->ReplyLock);
+ TGuard<TMutex> G(data->ReplyLock);
data->Reply = pReply;
- data->ReplyStatus = status;
- data->ReplyEvent.Signal();
- }
+ data->ReplyStatus = status;
+ data->ReplyEvent.Signal();
+ }
private:
// This is weird, because in regular client one-way-ness is selected per call, not per session.
bool ExpectReply;
TRemoteClientSession* Session;
-};
-
+};
+
namespace NBus {
namespace NPrivate {
#ifdef _MSC_VER
@@ -168,31 +168,31 @@ void TBusSyncSourceSession::Shutdown() {
TBusMessage* TBusSyncSourceSession::SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr) {
return Session->SendSyncMessage(pMessage, status, addr);
-}
-
+}
+
int TBusSyncSourceSession::RegisterService(const char* hostname, TBusKey start, TBusKey end, EIpVersion ipVersion) {
return Session->RegisterService(hostname, start, end, ipVersion);
}
-
+
int TBusSyncSourceSession::GetInFlight() {
return Session->GetInFlight();
}
-
+
const TBusProtocol* TBusSyncSourceSession::GetProto() const {
return Session->GetProto();
}
-
+
const TBusClientSession* TBusSyncSourceSession::GetBusClientSessionWorkaroundDoNotUse() const {
return Session.Get();
}
-
+
TBusSyncClientSessionPtr TBusMessageQueue::CreateSyncSource(TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply, const TString& name) {
TIntrusivePtr<TBusSyncSourceSessionImpl> session = new TBusSyncSourceSessionImpl(this, proto, config, needReply, name);
Add(session.Get());
return new TBusSyncSourceSession(session);
}
-
+
void TBusMessageQueue::Destroy(TBusSyncClientSessionPtr session) {
Destroy(session->Session.Get());
Y_UNUSED(session->Session.Release());
-}
+}