diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/ybus.h | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/ybus.h')
-rw-r--r-- | library/cpp/messagebus/ybus.h | 270 |
1 files changed, 135 insertions, 135 deletions
diff --git a/library/cpp/messagebus/ybus.h b/library/cpp/messagebus/ybus.h index de21ad8521..784fba4c1b 100644 --- a/library/cpp/messagebus/ybus.h +++ b/library/cpp/messagebus/ybus.h @@ -1,8 +1,8 @@ -#pragma once - +#pragma once + /// Asynchronous Messaging Library implements framework for sending and /// receiving messages between loosely connected processes. - + #include "coreconn.h" #include "defs.h" #include "handler.h" @@ -35,171 +35,171 @@ #include <util/system/mutex.h> namespace NBus { - //////////////////////////////////////////////////////// - /// \brief Common structure to store address information - - int CompareByHost(const IRemoteAddr& l, const IRemoteAddr& r) noexcept; - bool operator<(const TNetAddr& a1, const TNetAddr& a2); // compare by addresses - - ///////////////////////////////////////////////////////////////////////// - /// \brief Handles routing and data encoding to/from wire - - /// Protocol is stateless threadsafe singleton object that - /// encapsulates relationship between a message (TBusMessage) object - /// and destination server. Protocol object is reponsible for serializing in-memory - /// message and reply into the wire, retuning name of the service and resource - /// distribution key for given protocol. - - /// Protocol object should transparently handle messages and replies. - /// This is interface only class, actuall instances of the protocols - /// should be created using templates inhereted from this base class. - class TBusProtocol { - private: - TString ServiceName; - int ServicePort; - - public: - TBusProtocol(TBusService name = "UNKNOWN", int port = 0) - : ServiceName(name) - , ServicePort(port) - { - } - - /// returns service type for this protocol and message - TBusService GetService() const { + //////////////////////////////////////////////////////// + /// \brief Common structure to store address information + + int CompareByHost(const IRemoteAddr& l, const IRemoteAddr& r) noexcept; + bool operator<(const TNetAddr& a1, const TNetAddr& a2); // compare by addresses + + ///////////////////////////////////////////////////////////////////////// + /// \brief Handles routing and data encoding to/from wire + + /// Protocol is stateless threadsafe singleton object that + /// encapsulates relationship between a message (TBusMessage) object + /// and destination server. Protocol object is reponsible for serializing in-memory + /// message and reply into the wire, retuning name of the service and resource + /// distribution key for given protocol. + + /// Protocol object should transparently handle messages and replies. + /// This is interface only class, actuall instances of the protocols + /// should be created using templates inhereted from this base class. + class TBusProtocol { + private: + TString ServiceName; + int ServicePort; + + public: + TBusProtocol(TBusService name = "UNKNOWN", int port = 0) + : ServiceName(name) + , ServicePort(port) + { + } + + /// returns service type for this protocol and message + TBusService GetService() const { return ServiceName.data(); - } + } - /// returns port number for destination session to open socket - int GetPort() const { - return ServicePort; - } + /// returns port number for destination session to open socket + int GetPort() const { + return ServicePort; + } - virtual ~TBusProtocol() { - } + virtual ~TBusProtocol() { + } - /// \brief serialized protocol specific data into TBusData - /// \note buffer passed to the function (data) is not empty, use append functions - virtual void Serialize(const TBusMessage* mess, TBuffer& data) = 0; + /// \brief serialized protocol specific data into TBusData + /// \note buffer passed to the function (data) is not empty, use append functions + virtual void Serialize(const TBusMessage* mess, TBuffer& data) = 0; - /// deserialized TBusData into new instance of the message - virtual TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) = 0; + /// deserialized TBusData into new instance of the message + virtual TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) = 0; - /// returns key for messages of this protocol - virtual TBusKey GetKey(const TBusMessage*) { - return YBUS_KEYMIN; - } + /// returns key for messages of this protocol + virtual TBusKey GetKey(const TBusMessage*) { + return YBUS_KEYMIN; + } - /// default implementation of routing policy to allow overrides - virtual EMessageStatus GetDestination(const TBusClientSession* session, TBusMessage* mess, TBusLocator* locator, TNetAddr* addr); + /// default implementation of routing policy to allow overrides + virtual EMessageStatus GetDestination(const TBusClientSession* session, TBusMessage* mess, TBusLocator* locator, TNetAddr* addr); - /// codec for transport level compression - virtual NCodecs::TCodecPtr GetTransportCodec(void) const { - return NCodecs::ICodec::GetInstance("snappy"); - } - }; + /// codec for transport level compression + virtual NCodecs::TCodecPtr GetTransportCodec(void) const { + return NCodecs::ICodec::GetInstance("snappy"); + } + }; - class TBusSyncSourceSession: public TAtomicRefCount<TBusSyncSourceSession> { - friend class TBusMessageQueue; + class TBusSyncSourceSession: public TAtomicRefCount<TBusSyncSourceSession> { + friend class TBusMessageQueue; - public: - TBusSyncSourceSession(TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> session); - ~TBusSyncSourceSession(); + public: + TBusSyncSourceSession(TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> session); + ~TBusSyncSourceSession(); - void Shutdown(); + void Shutdown(); - TBusMessage* SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr = nullptr); + TBusMessage* SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr = nullptr); - int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4); + int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4); - int GetInFlight(); + int GetInFlight(); - const TBusProtocol* GetProto() const; + const TBusProtocol* GetProto() const; - const TBusClientSession* GetBusClientSessionWorkaroundDoNotUse() const; // It's for TLoadBalancedProtocol::GetDestination() function that really needs TBusClientSession* unlike all other protocols. Look at review 32425 (http://rb.yandex-team.ru/arc/r/32425/) for more information. - private: - TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> Session; - }; + const TBusClientSession* GetBusClientSessionWorkaroundDoNotUse() const; // It's for TLoadBalancedProtocol::GetDestination() function that really needs TBusClientSession* unlike all other protocols. Look at review 32425 (http://rb.yandex-team.ru/arc/r/32425/) for more information. + private: + TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> Session; + }; - using TBusSyncClientSessionPtr = TIntrusivePtr<TBusSyncSourceSession>; + using TBusSyncClientSessionPtr = TIntrusivePtr<TBusSyncSourceSession>; - /////////////////////////////////////////////////////////////////// - /// \brief Main message queue object, need one per application - class TBusMessageQueue: public TAtomicRefCount<TBusMessageQueue> { - /// allow mesage queue to be created only via factory - friend TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name); - friend class ::NBus::NPrivate::TRemoteConnection; - friend struct ::NBus::NPrivate::TBusSessionImpl; - friend class ::NBus::NPrivate::TAcceptor; - friend struct ::NBus::TBusServerSession; + /////////////////////////////////////////////////////////////////// + /// \brief Main message queue object, need one per application + class TBusMessageQueue: public TAtomicRefCount<TBusMessageQueue> { + /// allow mesage queue to be created only via factory + friend TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name); + friend class ::NBus::NPrivate::TRemoteConnection; + friend struct ::NBus::NPrivate::TBusSessionImpl; + friend class ::NBus::NPrivate::TAcceptor; + friend struct ::NBus::TBusServerSession; - private: - const TBusQueueConfig Config; - TMutex Lock; - TList<TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl>> Sessions; - TSimpleIntrusivePtr<TBusLocator> Locator; - NPrivate::TScheduler Scheduler; + private: + const TBusQueueConfig Config; + TMutex Lock; + TList<TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl>> Sessions; + TSimpleIntrusivePtr<TBusLocator> Locator; + NPrivate::TScheduler Scheduler; - ::NActor::TExecutorPtr WorkQueue; + ::NActor::TExecutorPtr WorkQueue; - TAtomic Running; + TAtomic Running; TSystemEvent ShutdownComplete; - private: - /// constructor is protected, used NBus::CreateMessageQueue() to create a instance - TBusMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name); + private: + /// constructor is protected, used NBus::CreateMessageQueue() to create a instance + TBusMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name); - public: - TString GetNameInternal() const; + public: + TString GetNameInternal() const; - ~TBusMessageQueue(); + ~TBusMessageQueue(); - void Stop(); - bool IsRunning(); + void Stop(); + bool IsRunning(); - public: - void EnqueueWork(TArrayRef< ::NActor::IWorkItem* const> w) { - WorkQueue->EnqueueWork(w); - } + public: + void EnqueueWork(TArrayRef< ::NActor::IWorkItem* const> w) { + WorkQueue->EnqueueWork(w); + } - ::NActor::TExecutor* GetExecutor() { - return WorkQueue.Get(); - } + ::NActor::TExecutor* GetExecutor() { + return WorkQueue.Get(); + } - TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) const; - // without sessions - NPrivate::TBusMessageQueueStatus GetStatusRecordInternal() const; - TString GetStatusSelf() const; - TString GetStatusSingleLine() const; + TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) const; + // without sessions + NPrivate::TBusMessageQueueStatus GetStatusRecordInternal() const; + TString GetStatusSelf() const; + TString GetStatusSingleLine() const; - TBusLocator* GetLocator() const { - return Locator.Get(); - } + TBusLocator* GetLocator() const { + return Locator.Get(); + } - TBusClientSessionPtr CreateSource(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, const TString& name = ""); - TBusSyncClientSessionPtr CreateSyncSource(TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply = true, const TString& name = ""); - TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TString& name = ""); - TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TVector<TBindResult>& bindTo, const TString& name = ""); + TBusClientSessionPtr CreateSource(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, const TString& name = ""); + TBusSyncClientSessionPtr CreateSyncSource(TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply = true, const TString& name = ""); + TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TString& name = ""); + TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TVector<TBindResult>& bindTo, const TString& name = ""); - private: - void Destroy(TBusSession* session); - void Destroy(TBusSyncClientSessionPtr session); + private: + void Destroy(TBusSession* session); + void Destroy(TBusSyncClientSessionPtr session); - public: - void Schedule(NPrivate::IScheduleItemAutoPtr i); + public: + void Schedule(NPrivate::IScheduleItemAutoPtr i); - private: - void DestroyAllSessions(); - void Add(TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl> session); - void Remove(TBusSession* session); - }; + private: + void DestroyAllSessions(); + void Add(TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl> session); + void Remove(TBusSession* session); + }; - ///////////////////////////////////////////////////////////////// - /// Factory methods to construct message queue - TBusMessageQueuePtr CreateMessageQueue(const char* name = ""); - TBusMessageQueuePtr CreateMessageQueue(NActor::TExecutorPtr executor, const char* name = ""); - TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, const char* name = ""); - TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, TBusLocator* locator, const char* name = ""); - TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name = ""); + ///////////////////////////////////////////////////////////////// + /// Factory methods to construct message queue + TBusMessageQueuePtr CreateMessageQueue(const char* name = ""); + TBusMessageQueuePtr CreateMessageQueue(NActor::TExecutorPtr executor, const char* name = ""); + TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, const char* name = ""); + TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, TBusLocator* locator, const char* name = ""); + TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name = ""); -} +} |