aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/synchandler.cpp
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commit1f553f46fb4f3c5eec631352cdd900a0709016af (patch)
treea231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/synchandler.cpp
parentc4de7efdedc25b49cbea74bd589eecb61b55b60a (diff)
downloadydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/synchandler.cpp')
-rw-r--r--library/cpp/messagebus/synchandler.cpp134
1 files changed, 67 insertions, 67 deletions
diff --git a/library/cpp/messagebus/synchandler.cpp b/library/cpp/messagebus/synchandler.cpp
index 8e891d66b3..4ea4eb1ee0 100644
--- a/library/cpp/messagebus/synchandler.cpp
+++ b/library/cpp/messagebus/synchandler.cpp
@@ -1,10 +1,10 @@
-#include "remote_client_session.h"
-#include "remote_connection.h"
+#include "remote_client_session.h"
+#include "remote_connection.h"
#include "ybus.h"
-using namespace NBus;
-using namespace NBus::NPrivate;
-
+using namespace NBus;
+using namespace NBus::NPrivate;
+
/////////////////////////////////////////////////////////////////
/// Object that encapsulates all messgae data required for sending
/// a message synchronously and receiving a reply. It includes:
@@ -27,8 +27,8 @@ struct TBusSyncMessageData {
class TSyncHandler: public IBusClientHandler {
public:
- TSyncHandler(bool expectReply = true)
- : ExpectReply(expectReply)
+ TSyncHandler(bool expectReply = true)
+ : ExpectReply(expectReply)
, Session(nullptr)
{
}
@@ -36,57 +36,57 @@ public:
}
void OnReply(TAutoPtr<TBusMessage> pMessage0, TAutoPtr<TBusMessage> pReply0) override {
- TBusMessage* pMessage = pMessage0.Release();
- TBusMessage* pReply = pReply0.Release();
-
+ TBusMessage* pMessage = pMessage0.Release();
+ TBusMessage* pReply = pReply0.Release();
+
if (!ExpectReply) { // Maybe need VERIFY, but it will be better to support backward compatibility here.
- return;
- }
-
+ return;
+ }
+
TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data);
- SignalResult(data, pReply, MESSAGE_OK);
+ SignalResult(data, pReply, MESSAGE_OK);
}
void OnError(TAutoPtr<TBusMessage> pMessage0, EMessageStatus status) override {
- TBusMessage* pMessage = pMessage0.Release();
+ TBusMessage* pMessage = pMessage0.Release();
TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data);
if (!data) {
return;
}
SignalResult(data, /*pReply=*/nullptr, status);
- }
-
+ }
+
void OnMessageSent(TBusMessage* pMessage) override {
Y_UNUSED(pMessage);
Y_ASSERT(ExpectReply);
- }
-
+ }
+
void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override {
Y_ASSERT(!ExpectReply);
TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage.Release()->Data);
SignalResult(data, /*pReply=*/nullptr, MESSAGE_OK);
- }
-
- void SetSession(TRemoteClientSession* session) {
- if (!ExpectReply) {
- Session = session;
- }
- }
-
-private:
+ }
+
+ void SetSession(TRemoteClientSession* session) {
+ if (!ExpectReply) {
+ Session = session;
+ }
+ }
+
+private:
void SignalResult(TBusSyncMessageData* data, TBusMessage* pReply, EMessageStatus status) const {
Y_VERIFY(data, "Message data is set to NULL.");
TGuard<TMutex> G(data->ReplyLock);
- data->Reply = pReply;
+ data->Reply = pReply;
data->ReplyStatus = status;
data->ReplyEvent.Signal();
}
-
-private:
- // This is weird, because in regular client one-way-ness is selected per call, not per session.
- bool ExpectReply;
- TRemoteClientSession* Session;
+
+private:
+ // This is weird, because in regular client one-way-ness is selected per call, not per session.
+ bool ExpectReply;
+ TRemoteClientSession* Session;
};
namespace NBus {
@@ -104,7 +104,7 @@ namespace NBus {
public TRemoteClientSession {
private:
bool NeedReply;
-
+
public:
TBusSyncSourceSessionImpl(TBusMessageQueue* queue, TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply, const TString& name)
: TSyncHandler(needReply)
@@ -113,16 +113,16 @@ namespace NBus {
{
SetSession(this);
}
-
+
TBusMessage* SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr = nullptr) {
Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(),
"SendSyncMessage must not be called from executor thread");
-
+
TBusMessage* reply = nullptr;
THolder<TBusSyncMessageData> data(new TBusSyncMessageData());
-
+
pMessage->Data = data.Get();
-
+
{
TGuard<TMutex> G(data->ReplyLock);
if (NeedReply) {
@@ -130,7 +130,7 @@ namespace NBus {
} else {
status = SendMessageOneWay(pMessage, addr);
}
-
+
if (status == MESSAGE_OK) {
data->ReplyEvent.Wait(data->ReplyLock);
TBusSyncMessageData* rdata = static_cast<TBusSyncMessageData*>(pMessage->Data);
@@ -139,7 +139,7 @@ namespace NBus {
status = rdata->ReplyStatus;
}
}
-
+
// deletion of message and reply is a job of application.
pMessage->Data = nullptr;
@@ -153,46 +153,46 @@ namespace NBus {
}
}
-TBusSyncSourceSession::TBusSyncSourceSession(TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> session)
- : Session(session)
+TBusSyncSourceSession::TBusSyncSourceSession(TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> session)
+ : Session(session)
{
}
-
+
TBusSyncSourceSession::~TBusSyncSourceSession() {
- Shutdown();
-}
-
-void TBusSyncSourceSession::Shutdown() {
- Session->Shutdown();
-}
-
-TBusMessage* TBusSyncSourceSession::SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr) {
- return Session->SendSyncMessage(pMessage, status, addr);
+ Shutdown();
+}
+
+void TBusSyncSourceSession::Shutdown() {
+ Session->Shutdown();
+}
+
+TBusMessage* TBusSyncSourceSession::SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr) {
+ return Session->SendSyncMessage(pMessage, status, addr);
}
int TBusSyncSourceSession::RegisterService(const char* hostname, TBusKey start, TBusKey end, EIpVersion ipVersion) {
- return Session->RegisterService(hostname, start, end, ipVersion);
-}
+ return Session->RegisterService(hostname, start, end, ipVersion);
+}
-int TBusSyncSourceSession::GetInFlight() {
- return Session->GetInFlight();
-}
+int TBusSyncSourceSession::GetInFlight() {
+ return Session->GetInFlight();
+}
-const TBusProtocol* TBusSyncSourceSession::GetProto() const {
- return Session->GetProto();
-}
+const TBusProtocol* TBusSyncSourceSession::GetProto() const {
+ return Session->GetProto();
+}
const TBusClientSession* TBusSyncSourceSession::GetBusClientSessionWorkaroundDoNotUse() const {
return Session.Get();
}
TBusSyncClientSessionPtr TBusMessageQueue::CreateSyncSource(TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply, const TString& name) {
- TIntrusivePtr<TBusSyncSourceSessionImpl> session = new TBusSyncSourceSessionImpl(this, proto, config, needReply, name);
- Add(session.Get());
- return new TBusSyncSourceSession(session);
-}
+ TIntrusivePtr<TBusSyncSourceSessionImpl> session = new TBusSyncSourceSessionImpl(this, proto, config, needReply, name);
+ Add(session.Get());
+ return new TBusSyncSourceSession(session);
+}
-void TBusMessageQueue::Destroy(TBusSyncClientSessionPtr session) {
- Destroy(session->Session.Get());
+void TBusMessageQueue::Destroy(TBusSyncClientSessionPtr session) {
+ Destroy(session->Session.Get());
Y_UNUSED(session->Session.Release());
}