diff options
| author | Anton Samokhvalov <[email protected]> | 2022-02-10 16:45:15 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:45:15 +0300 | 
| commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
| tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/synchandler.cpp | |
| parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
Restoring authorship annotation for Anton Samokhvalov <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/synchandler.cpp')
| -rw-r--r-- | library/cpp/messagebus/synchandler.cpp | 150 | 
1 files changed, 75 insertions, 75 deletions
| diff --git a/library/cpp/messagebus/synchandler.cpp b/library/cpp/messagebus/synchandler.cpp index 8e891d66b30..3498efbb215 100644 --- a/library/cpp/messagebus/synchandler.cpp +++ b/library/cpp/messagebus/synchandler.cpp @@ -13,43 +13,43 @@ using namespace NBus::NPrivate;  /// 3. Message reply  /// 4. Reply status  struct TBusSyncMessageData { -    TCondVar ReplyEvent; -    TMutex ReplyLock; -    TBusMessage* Reply; +    TCondVar ReplyEvent;  +    TMutex ReplyLock;  +    TBusMessage* Reply;       EMessageStatus ReplyStatus;      TBusSyncMessageData()          : Reply(nullptr)          , ReplyStatus(MESSAGE_DONT_ASK) -    { -    } +    {  +    }   }; -class TSyncHandler: public IBusClientHandler { +class TSyncHandler: public IBusClientHandler {   public:      TSyncHandler(bool expectReply = true)          : ExpectReply(expectReply)          , Session(nullptr) -    { -    } -    ~TSyncHandler() override { -    } +    {  +    }  +    ~TSyncHandler() override {  +    }       void OnReply(TAutoPtr<TBusMessage> pMessage0, TAutoPtr<TBusMessage> pReply0) override {          TBusMessage* pMessage = pMessage0.Release();          TBusMessage* pReply = pReply0.Release(); -        if (!ExpectReply) { // Maybe need VERIFY, but it will be better to support backward compatibility here. +        if (!ExpectReply) { // Maybe need VERIFY, but it will be better to support backward compatibility here.               return;          } -        TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data); +        TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data);           SignalResult(data, pReply, MESSAGE_OK);      }      void OnError(TAutoPtr<TBusMessage> pMessage0, EMessageStatus status) override {          TBusMessage* pMessage = pMessage0.Release(); -        TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data); +        TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data);           if (!data) {              return;          } @@ -64,7 +64,7 @@ public:      void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override {          Y_ASSERT(!ExpectReply); -        TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage.Release()->Data); +        TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage.Release()->Data);           SignalResult(data, /*pReply=*/nullptr, MESSAGE_OK);      } @@ -75,7 +75,7 @@ public:      }  private: -    void SignalResult(TBusSyncMessageData* data, TBusMessage* pReply, EMessageStatus status) const { +    void SignalResult(TBusSyncMessageData* data, TBusMessage* pReply, EMessageStatus status) const {           Y_VERIFY(data, "Message data is set to NULL.");          TGuard<TMutex> G(data->ReplyLock);          data->Reply = pReply; @@ -89,76 +89,76 @@ private:      TRemoteClientSession* Session;  }; -namespace NBus { -    namespace NPrivate { +namespace NBus {  +    namespace NPrivate {   #ifdef _MSC_VER  #pragma warning(push) -#pragma warning(disable : 4250) //  'NBus::NPrivate::TRemoteClientSession' : inherits 'NBus::NPrivate::TBusSessionImpl::NBus::NPrivate::TBusSessionImpl::GetConfig' via dominance +#pragma warning(disable : 4250) //  'NBus::NPrivate::TRemoteClientSession' : inherits 'NBus::NPrivate::TBusSessionImpl::NBus::NPrivate::TBusSessionImpl::GetConfig' via dominance   #endif -        /////////////////////////////////////////////////////////////////////////// -        class TBusSyncSourceSessionImpl -           : private TSyncHandler -            // TODO: do not extend TRemoteClientSession -            , -              public TRemoteClientSession { -        private: -            bool NeedReply; - -        public: -            TBusSyncSourceSessionImpl(TBusMessageQueue* queue, TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply, const TString& name) -                : TSyncHandler(needReply) -                , TRemoteClientSession(queue, proto, this, config, name) -                , NeedReply(needReply) -            { -                SetSession(this); -            } - -            TBusMessage* SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr = nullptr) { -                Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(), -                         "SendSyncMessage must not be called from executor thread"); - -                TBusMessage* reply = nullptr; -                THolder<TBusSyncMessageData> data(new TBusSyncMessageData()); - -                pMessage->Data = data.Get(); - -                { -                    TGuard<TMutex> G(data->ReplyLock); -                    if (NeedReply) { -                        status = SendMessage(pMessage, addr, false); // probably should be true -                    } else { -                        status = SendMessageOneWay(pMessage, addr); -                    } - -                    if (status == MESSAGE_OK) { -                        data->ReplyEvent.Wait(data->ReplyLock); -                        TBusSyncMessageData* rdata = static_cast<TBusSyncMessageData*>(pMessage->Data); -                        Y_VERIFY(rdata == data.Get(), "Message data pointer should not be modified."); -                        reply = rdata->Reply; -                        status = rdata->ReplyStatus; -                    } -                } - -                // deletion of message and reply is a job of application. -                pMessage->Data = nullptr; - -                return reply; -            } -        }; - +        ///////////////////////////////////////////////////////////////////////////  +        class TBusSyncSourceSessionImpl  +           : private TSyncHandler  +            // TODO: do not extend TRemoteClientSession  +            ,  +              public TRemoteClientSession {  +        private:  +            bool NeedReply;  + +        public:  +            TBusSyncSourceSessionImpl(TBusMessageQueue* queue, TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply, const TString& name)  +                : TSyncHandler(needReply)  +                , TRemoteClientSession(queue, proto, this, config, name)  +                , NeedReply(needReply)  +            {  +                SetSession(this);  +            }  + +            TBusMessage* SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr = nullptr) {  +                Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(),  +                         "SendSyncMessage must not be called from executor thread");  + +                TBusMessage* reply = nullptr;  +                THolder<TBusSyncMessageData> data(new TBusSyncMessageData());  + +                pMessage->Data = data.Get();  + +                {  +                    TGuard<TMutex> G(data->ReplyLock);  +                    if (NeedReply) {  +                        status = SendMessage(pMessage, addr, false); // probably should be true  +                    } else {  +                        status = SendMessageOneWay(pMessage, addr);  +                    }  + +                    if (status == MESSAGE_OK) {  +                        data->ReplyEvent.Wait(data->ReplyLock);  +                        TBusSyncMessageData* rdata = static_cast<TBusSyncMessageData*>(pMessage->Data);  +                        Y_VERIFY(rdata == data.Get(), "Message data pointer should not be modified.");  +                        reply = rdata->Reply;  +                        status = rdata->ReplyStatus;  +                    }  +                }  + +                // deletion of message and reply is a job of application.  +                pMessage->Data = nullptr;  + +                return reply;  +            }  +        };  +   #ifdef _MSC_VER  #pragma warning(pop)  #endif -    } -} +    }  +}   TBusSyncSourceSession::TBusSyncSourceSession(TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> session)      : Session(session) -{ -} +{  +}  -TBusSyncSourceSession::~TBusSyncSourceSession() { +TBusSyncSourceSession::~TBusSyncSourceSession() {       Shutdown();  } @@ -170,7 +170,7 @@ TBusMessage* TBusSyncSourceSession::SendSyncMessage(TBusMessage* pMessage, EMess      return Session->SendSyncMessage(pMessage, status, addr);  } -int TBusSyncSourceSession::RegisterService(const char* hostname, TBusKey start, TBusKey end, EIpVersion ipVersion) { +int TBusSyncSourceSession::RegisterService(const char* hostname, TBusKey start, TBusKey end, EIpVersion ipVersion) {       return Session->RegisterService(hostname, start, end, ipVersion);  } | 
