diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/ybus.h | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/ybus.h')
-rw-r--r-- | library/cpp/messagebus/ybus.h | 205 |
1 files changed, 205 insertions, 0 deletions
diff --git a/library/cpp/messagebus/ybus.h b/library/cpp/messagebus/ybus.h new file mode 100644 index 0000000000..de21ad8521 --- /dev/null +++ b/library/cpp/messagebus/ybus.h @@ -0,0 +1,205 @@ +#pragma once + +/// Asynchronous Messaging Library implements framework for sending and +/// receiving messages between loosely connected processes. + +#include "coreconn.h" +#include "defs.h" +#include "handler.h" +#include "handler_impl.h" +#include "local_flags.h" +#include "locator.h" +#include "message.h" +#include "message_status.h" +#include "network.h" +#include "queue_config.h" +#include "remote_connection_status.h" +#include "session.h" +#include "session_config.h" +#include "socket_addr.h" + +#include <library/cpp/messagebus/actor/executor.h> +#include <library/cpp/messagebus/scheduler/scheduler.h> + +#include <library/cpp/codecs/codecs.h> + +#include <util/generic/array_ref.h> +#include <util/generic/buffer.h> +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/stream/input.h> +#include <util/system/atomic.h> +#include <util/system/condvar.h> +#include <util/system/type_name.h> +#include <util/system/event.h> +#include <util/system/mutex.h> + +namespace NBus { + //////////////////////////////////////////////////////// + /// \brief Common structure to store address information + + int CompareByHost(const IRemoteAddr& l, const IRemoteAddr& r) noexcept; + bool operator<(const TNetAddr& a1, const TNetAddr& a2); // compare by addresses + + ///////////////////////////////////////////////////////////////////////// + /// \brief Handles routing and data encoding to/from wire + + /// Protocol is stateless threadsafe singleton object that + /// encapsulates relationship between a message (TBusMessage) object + /// and destination server. Protocol object is reponsible for serializing in-memory + /// message and reply into the wire, retuning name of the service and resource + /// distribution key for given protocol. + + /// Protocol object should transparently handle messages and replies. + /// This is interface only class, actuall instances of the protocols + /// should be created using templates inhereted from this base class. + class TBusProtocol { + private: + TString ServiceName; + int ServicePort; + + public: + TBusProtocol(TBusService name = "UNKNOWN", int port = 0) + : ServiceName(name) + , ServicePort(port) + { + } + + /// returns service type for this protocol and message + TBusService GetService() const { + return ServiceName.data(); + } + + /// returns port number for destination session to open socket + int GetPort() const { + return ServicePort; + } + + virtual ~TBusProtocol() { + } + + /// \brief serialized protocol specific data into TBusData + /// \note buffer passed to the function (data) is not empty, use append functions + virtual void Serialize(const TBusMessage* mess, TBuffer& data) = 0; + + /// deserialized TBusData into new instance of the message + virtual TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) = 0; + + /// returns key for messages of this protocol + virtual TBusKey GetKey(const TBusMessage*) { + return YBUS_KEYMIN; + } + + /// default implementation of routing policy to allow overrides + virtual EMessageStatus GetDestination(const TBusClientSession* session, TBusMessage* mess, TBusLocator* locator, TNetAddr* addr); + + /// codec for transport level compression + virtual NCodecs::TCodecPtr GetTransportCodec(void) const { + return NCodecs::ICodec::GetInstance("snappy"); + } + }; + + class TBusSyncSourceSession: public TAtomicRefCount<TBusSyncSourceSession> { + friend class TBusMessageQueue; + + public: + TBusSyncSourceSession(TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> session); + ~TBusSyncSourceSession(); + + void Shutdown(); + + TBusMessage* SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr = nullptr); + + int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4); + + int GetInFlight(); + + const TBusProtocol* GetProto() const; + + const TBusClientSession* GetBusClientSessionWorkaroundDoNotUse() const; // It's for TLoadBalancedProtocol::GetDestination() function that really needs TBusClientSession* unlike all other protocols. Look at review 32425 (http://rb.yandex-team.ru/arc/r/32425/) for more information. + private: + TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> Session; + }; + + using TBusSyncClientSessionPtr = TIntrusivePtr<TBusSyncSourceSession>; + + /////////////////////////////////////////////////////////////////// + /// \brief Main message queue object, need one per application + class TBusMessageQueue: public TAtomicRefCount<TBusMessageQueue> { + /// allow mesage queue to be created only via factory + friend TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name); + friend class ::NBus::NPrivate::TRemoteConnection; + friend struct ::NBus::NPrivate::TBusSessionImpl; + friend class ::NBus::NPrivate::TAcceptor; + friend struct ::NBus::TBusServerSession; + + private: + const TBusQueueConfig Config; + TMutex Lock; + TList<TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl>> Sessions; + TSimpleIntrusivePtr<TBusLocator> Locator; + NPrivate::TScheduler Scheduler; + + ::NActor::TExecutorPtr WorkQueue; + + TAtomic Running; + TSystemEvent ShutdownComplete; + + private: + /// constructor is protected, used NBus::CreateMessageQueue() to create a instance + TBusMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name); + + public: + TString GetNameInternal() const; + + ~TBusMessageQueue(); + + void Stop(); + bool IsRunning(); + + public: + void EnqueueWork(TArrayRef< ::NActor::IWorkItem* const> w) { + WorkQueue->EnqueueWork(w); + } + + ::NActor::TExecutor* GetExecutor() { + return WorkQueue.Get(); + } + + TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) const; + // without sessions + NPrivate::TBusMessageQueueStatus GetStatusRecordInternal() const; + TString GetStatusSelf() const; + TString GetStatusSingleLine() const; + + TBusLocator* GetLocator() const { + return Locator.Get(); + } + + TBusClientSessionPtr CreateSource(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, const TString& name = ""); + TBusSyncClientSessionPtr CreateSyncSource(TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply = true, const TString& name = ""); + TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TString& name = ""); + TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TVector<TBindResult>& bindTo, const TString& name = ""); + + private: + void Destroy(TBusSession* session); + void Destroy(TBusSyncClientSessionPtr session); + + public: + void Schedule(NPrivate::IScheduleItemAutoPtr i); + + private: + void DestroyAllSessions(); + void Add(TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl> session); + void Remove(TBusSession* session); + }; + + ///////////////////////////////////////////////////////////////// + /// Factory methods to construct message queue + TBusMessageQueuePtr CreateMessageQueue(const char* name = ""); + TBusMessageQueuePtr CreateMessageQueue(NActor::TExecutorPtr executor, const char* name = ""); + TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, const char* name = ""); + TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, TBusLocator* locator, const char* name = ""); + TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name = ""); + +} |