summaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/synchandler.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <[email protected]>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <[email protected]>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/synchandler.cpp
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
Restoring authorship annotation for Anton Samokhvalov <[email protected]>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/synchandler.cpp')
-rw-r--r--library/cpp/messagebus/synchandler.cpp150
1 files changed, 75 insertions, 75 deletions
diff --git a/library/cpp/messagebus/synchandler.cpp b/library/cpp/messagebus/synchandler.cpp
index 3498efbb215..8e891d66b30 100644
--- a/library/cpp/messagebus/synchandler.cpp
+++ b/library/cpp/messagebus/synchandler.cpp
@@ -13,43 +13,43 @@ using namespace NBus::NPrivate;
/// 3. Message reply
/// 4. Reply status
struct TBusSyncMessageData {
- TCondVar ReplyEvent;
- TMutex ReplyLock;
- TBusMessage* Reply;
+ TCondVar ReplyEvent;
+ TMutex ReplyLock;
+ TBusMessage* Reply;
EMessageStatus ReplyStatus;
TBusSyncMessageData()
: Reply(nullptr)
, ReplyStatus(MESSAGE_DONT_ASK)
- {
- }
+ {
+ }
};
-class TSyncHandler: public IBusClientHandler {
+class TSyncHandler: public IBusClientHandler {
public:
TSyncHandler(bool expectReply = true)
: ExpectReply(expectReply)
, Session(nullptr)
- {
- }
- ~TSyncHandler() override {
- }
+ {
+ }
+ ~TSyncHandler() override {
+ }
void OnReply(TAutoPtr<TBusMessage> pMessage0, TAutoPtr<TBusMessage> pReply0) override {
TBusMessage* pMessage = pMessage0.Release();
TBusMessage* pReply = pReply0.Release();
- if (!ExpectReply) { // Maybe need VERIFY, but it will be better to support backward compatibility here.
+ if (!ExpectReply) { // Maybe need VERIFY, but it will be better to support backward compatibility here.
return;
}
- TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data);
+ TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data);
SignalResult(data, pReply, MESSAGE_OK);
}
void OnError(TAutoPtr<TBusMessage> pMessage0, EMessageStatus status) override {
TBusMessage* pMessage = pMessage0.Release();
- TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data);
+ TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data);
if (!data) {
return;
}
@@ -64,7 +64,7 @@ public:
void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override {
Y_ASSERT(!ExpectReply);
- TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage.Release()->Data);
+ TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage.Release()->Data);
SignalResult(data, /*pReply=*/nullptr, MESSAGE_OK);
}
@@ -75,7 +75,7 @@ public:
}
private:
- void SignalResult(TBusSyncMessageData* data, TBusMessage* pReply, EMessageStatus status) const {
+ void SignalResult(TBusSyncMessageData* data, TBusMessage* pReply, EMessageStatus status) const {
Y_VERIFY(data, "Message data is set to NULL.");
TGuard<TMutex> G(data->ReplyLock);
data->Reply = pReply;
@@ -89,76 +89,76 @@ private:
TRemoteClientSession* Session;
};
-namespace NBus {
- namespace NPrivate {
+namespace NBus {
+ namespace NPrivate {
#ifdef _MSC_VER
#pragma warning(push)
-#pragma warning(disable : 4250) // 'NBus::NPrivate::TRemoteClientSession' : inherits 'NBus::NPrivate::TBusSessionImpl::NBus::NPrivate::TBusSessionImpl::GetConfig' via dominance
+#pragma warning(disable : 4250) // 'NBus::NPrivate::TRemoteClientSession' : inherits 'NBus::NPrivate::TBusSessionImpl::NBus::NPrivate::TBusSessionImpl::GetConfig' via dominance
#endif
- ///////////////////////////////////////////////////////////////////////////
- class TBusSyncSourceSessionImpl
- : private TSyncHandler
- // TODO: do not extend TRemoteClientSession
- ,
- public TRemoteClientSession {
- private:
- bool NeedReply;
-
- public:
- TBusSyncSourceSessionImpl(TBusMessageQueue* queue, TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply, const TString& name)
- : TSyncHandler(needReply)
- , TRemoteClientSession(queue, proto, this, config, name)
- , NeedReply(needReply)
- {
- SetSession(this);
- }
-
- TBusMessage* SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr = nullptr) {
- Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(),
- "SendSyncMessage must not be called from executor thread");
-
- TBusMessage* reply = nullptr;
- THolder<TBusSyncMessageData> data(new TBusSyncMessageData());
-
- pMessage->Data = data.Get();
-
- {
- TGuard<TMutex> G(data->ReplyLock);
- if (NeedReply) {
- status = SendMessage(pMessage, addr, false); // probably should be true
- } else {
- status = SendMessageOneWay(pMessage, addr);
- }
-
- if (status == MESSAGE_OK) {
- data->ReplyEvent.Wait(data->ReplyLock);
- TBusSyncMessageData* rdata = static_cast<TBusSyncMessageData*>(pMessage->Data);
- Y_VERIFY(rdata == data.Get(), "Message data pointer should not be modified.");
- reply = rdata->Reply;
- status = rdata->ReplyStatus;
- }
- }
-
- // deletion of message and reply is a job of application.
- pMessage->Data = nullptr;
-
- return reply;
- }
- };
-
+ ///////////////////////////////////////////////////////////////////////////
+ class TBusSyncSourceSessionImpl
+ : private TSyncHandler
+ // TODO: do not extend TRemoteClientSession
+ ,
+ public TRemoteClientSession {
+ private:
+ bool NeedReply;
+
+ public:
+ TBusSyncSourceSessionImpl(TBusMessageQueue* queue, TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply, const TString& name)
+ : TSyncHandler(needReply)
+ , TRemoteClientSession(queue, proto, this, config, name)
+ , NeedReply(needReply)
+ {
+ SetSession(this);
+ }
+
+ TBusMessage* SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr = nullptr) {
+ Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(),
+ "SendSyncMessage must not be called from executor thread");
+
+ TBusMessage* reply = nullptr;
+ THolder<TBusSyncMessageData> data(new TBusSyncMessageData());
+
+ pMessage->Data = data.Get();
+
+ {
+ TGuard<TMutex> G(data->ReplyLock);
+ if (NeedReply) {
+ status = SendMessage(pMessage, addr, false); // probably should be true
+ } else {
+ status = SendMessageOneWay(pMessage, addr);
+ }
+
+ if (status == MESSAGE_OK) {
+ data->ReplyEvent.Wait(data->ReplyLock);
+ TBusSyncMessageData* rdata = static_cast<TBusSyncMessageData*>(pMessage->Data);
+ Y_VERIFY(rdata == data.Get(), "Message data pointer should not be modified.");
+ reply = rdata->Reply;
+ status = rdata->ReplyStatus;
+ }
+ }
+
+ // deletion of message and reply is a job of application.
+ pMessage->Data = nullptr;
+
+ return reply;
+ }
+ };
+
#ifdef _MSC_VER
#pragma warning(pop)
#endif
- }
-}
+ }
+}
TBusSyncSourceSession::TBusSyncSourceSession(TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> session)
: Session(session)
-{
-}
+{
+}
-TBusSyncSourceSession::~TBusSyncSourceSession() {
+TBusSyncSourceSession::~TBusSyncSourceSession() {
Shutdown();
}
@@ -170,7 +170,7 @@ TBusMessage* TBusSyncSourceSession::SendSyncMessage(TBusMessage* pMessage, EMess
return Session->SendSyncMessage(pMessage, status, addr);
}
-int TBusSyncSourceSession::RegisterService(const char* hostname, TBusKey start, TBusKey end, EIpVersion ipVersion) {
+int TBusSyncSourceSession::RegisterService(const char* hostname, TBusKey start, TBusKey end, EIpVersion ipVersion) {
return Session->RegisterService(hostname, start, end, ipVersion);
}