diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:17 +0300 |
commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/message.h | |
parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
download | ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/message.h')
-rw-r--r-- | library/cpp/messagebus/message.h | 436 |
1 files changed, 218 insertions, 218 deletions
diff --git a/library/cpp/messagebus/message.h b/library/cpp/messagebus/message.h index 275f1859df..005ca10c65 100644 --- a/library/cpp/messagebus/message.h +++ b/library/cpp/messagebus/message.h @@ -18,98 +18,98 @@ #include <typeinfo> namespace NBus { - /////////////////////////////////////////////////////////////////// - /// \brief Structure to preserve identity from message to reply - struct TBusIdentity : TNonCopyable { - friend class TBusMessage; - friend class NPrivate::TRemoteServerSession; - friend struct NPrivate::TClientRequestImpl; - friend class TOnMessageContext; - - // TODO: make private - TBusKey MessageId; - - private: - ui32 Size; - TIntrusivePtr<NPrivate::TRemoteServerConnection> Connection; - ui16 Flags; - ui32 LocalFlags; - TInstant RecvTime; + /////////////////////////////////////////////////////////////////// + /// \brief Structure to preserve identity from message to reply + struct TBusIdentity : TNonCopyable { + friend class TBusMessage; + friend class NPrivate::TRemoteServerSession; + friend struct NPrivate::TClientRequestImpl; + friend class TOnMessageContext; + + // TODO: make private + TBusKey MessageId; + + private: + ui32 Size; + TIntrusivePtr<NPrivate::TRemoteServerConnection> Connection; + ui16 Flags; + ui32 LocalFlags; + TInstant RecvTime; #ifndef NDEBUG std::optional<TString> MessageType; #endif private: - // TODO: drop - TNetAddr GetNetAddr() const; - - public: - void Pack(char* dest); - void Unpack(const char* src); - - bool IsInWork() const { - return LocalFlags & NPrivate::MESSAGE_IN_WORK; - } - - // for internal use only - void BeginWork() { - SetInWork(true); - } - - // for internal use only - void EndWork() { - SetInWork(false); - } - - TBusIdentity(); - ~TBusIdentity(); - - void Swap(TBusIdentity& that) { - DoSwap(MessageId, that.MessageId); - DoSwap(Size, that.Size); - DoSwap(Connection, that.Connection); - DoSwap(Flags, that.Flags); - DoSwap(LocalFlags, that.LocalFlags); - DoSwap(RecvTime, that.RecvTime); + // TODO: drop + TNetAddr GetNetAddr() const; + + public: + void Pack(char* dest); + void Unpack(const char* src); + + bool IsInWork() const { + return LocalFlags & NPrivate::MESSAGE_IN_WORK; + } + + // for internal use only + void BeginWork() { + SetInWork(true); + } + + // for internal use only + void EndWork() { + SetInWork(false); + } + + TBusIdentity(); + ~TBusIdentity(); + + void Swap(TBusIdentity& that) { + DoSwap(MessageId, that.MessageId); + DoSwap(Size, that.Size); + DoSwap(Connection, that.Connection); + DoSwap(Flags, that.Flags); + DoSwap(LocalFlags, that.LocalFlags); + DoSwap(RecvTime, that.RecvTime); #ifndef NDEBUG - DoSwap(MessageType, that.MessageType); + DoSwap(MessageType, that.MessageType); #endif - } - - TString ToString() const; - - private: - void SetInWork(bool inWork) { - if (LocalFlags == 0 && inWork) { - LocalFlags = NPrivate::MESSAGE_IN_WORK; - } else if (LocalFlags == NPrivate::MESSAGE_IN_WORK && !inWork) { - LocalFlags = 0; - } else { - Y_FAIL("impossible combination of flag and parameter: %s %d", - inWork ? "true" : "false", unsigned(LocalFlags)); - } + } + + TString ToString() const; + + private: + void SetInWork(bool inWork) { + if (LocalFlags == 0 && inWork) { + LocalFlags = NPrivate::MESSAGE_IN_WORK; + } else if (LocalFlags == NPrivate::MESSAGE_IN_WORK && !inWork) { + LocalFlags = 0; + } else { + Y_FAIL("impossible combination of flag and parameter: %s %d", + inWork ? "true" : "false", unsigned(LocalFlags)); + } } void SetMessageType(const std::type_info& messageTypeInfo) { #ifndef NDEBUG - Y_VERIFY(!MessageType, "state check"); + Y_VERIFY(!MessageType, "state check"); MessageType = TypeName(messageTypeInfo); #else Y_UNUSED(messageTypeInfo); #endif - } - }; + } + }; - static const size_t BUS_IDENTITY_PACKED_SIZE = sizeof(TBusIdentity); + static const size_t BUS_IDENTITY_PACKED_SIZE = sizeof(TBusIdentity); - /////////////////////////////////////////////////////////////// - /// \brief Message flags in TBusHeader.Flags - enum EMessageFlags { - MESSAGE_COMPRESS_INTERNAL = 0x8000, ///< message is compressed - MESSAGE_COMPRESS_RESPONSE = 0x4000, ///< message prefers compressed response - MESSAGE_VERSION_INTERNAL = 0x00F0, ///< these bits are used as version - }; + /////////////////////////////////////////////////////////////// + /// \brief Message flags in TBusHeader.Flags + enum EMessageFlags { + MESSAGE_COMPRESS_INTERNAL = 0x8000, ///< message is compressed + MESSAGE_COMPRESS_RESPONSE = 0x4000, ///< message prefers compressed response + MESSAGE_VERSION_INTERNAL = 0x00F0, ///< these bits are used as version + }; ////////////////////////////////////////////////////////// /// \brief Message header present in all message send and received @@ -117,8 +117,8 @@ namespace NBus { /// This header is send into the wire. /// \todo fix for low/high end, 32/64bit some day #pragma pack(1) - struct TBusHeader { - friend class TBusMessage; + struct TBusHeader { + friend class TBusMessage; TBusKey Id = 0; ///< unique message ID ui32 Size = 0; ///< total size of the message @@ -126,147 +126,147 @@ namespace NBus { ui16 FlagsInternal = 0; ///< TRACE is one of the flags ui16 Type = 0; ///< to be used by TBusProtocol - int GetVersionInternal() { - return (FlagsInternal & MESSAGE_VERSION_INTERNAL) >> 4; - } - void SetVersionInternal(unsigned ver = YBUS_VERSION) { - FlagsInternal |= (ver << 4); - } - - public: - TBusHeader() { - } - TBusHeader(TArrayRef<const char> data) { - ReadHeader(data); - } - - private: - /// function for serialization/deserialization of the header - /// returns number of bytes written/read - int ReadHeader(TArrayRef<const char> data); - - void GenerateId(); - }; + int GetVersionInternal() { + return (FlagsInternal & MESSAGE_VERSION_INTERNAL) >> 4; + } + void SetVersionInternal(unsigned ver = YBUS_VERSION) { + FlagsInternal |= (ver << 4); + } + + public: + TBusHeader() { + } + TBusHeader(TArrayRef<const char> data) { + ReadHeader(data); + } + + private: + /// function for serialization/deserialization of the header + /// returns number of bytes written/read + int ReadHeader(TArrayRef<const char> data); + + void GenerateId(); + }; #pragma pack() -#define TBUSMAX_MESSAGE 26 * 1024 * 1024 + sizeof(NBus::TBusHeader) ///< is't it enough? -#define TBUSMIN_MESSAGE sizeof(NBus::TBusHeader) ///< can't be less then header - - inline bool IsVersionNegotiation(const NBus::TBusHeader& header) { - return header.Id == 0 && header.Size == sizeof(TBusHeader); - } - - ////////////////////////////////////////////////////////// - /// \brief Base class for all messages passed in the system - - enum ECreateUninitialized { - MESSAGE_CREATE_UNINITIALIZED, - }; - - class TBusMessage - : protected TBusHeader, - public TRefCounted<TBusMessage, TAtomicCounter, TDelete>, - private TNonCopyable { - friend class TLocalSession; - friend struct ::NBus::NPrivate::TBusSessionImpl; - friend class ::NBus::NPrivate::TRemoteServerSession; - friend class ::NBus::NPrivate::TRemoteClientSession; - friend class ::NBus::NPrivate::TRemoteConnection; - friend class ::NBus::NPrivate::TRemoteClientConnection; - friend class ::NBus::NPrivate::TRemoteServerConnection; - friend struct ::NBus::NPrivate::TBusMessagePtrAndHeader; - - private: - ui32 LocalFlags; - - /// connection identity for reply set by PushMessage() - NPrivate::TBusSocketAddr ReplyTo; - // server-side response only, hack - ui32 RequestSize; - - TInstant RecvTime; - - public: - /// constructor to create messages on sending end - TBusMessage(ui16 type, int approxsize = sizeof(TBusHeader)); - - /// constructor with serialzed data to examine the header - TBusMessage(ECreateUninitialized); - - // slow, for diagnostics only - virtual TString Describe() const; - - // must be called if this message object needs to be reused - void Reset(); - - void CheckClean() const; - - void SetCompressed(bool); - void SetCompressedResponse(bool); - - private: - bool IsCompressed() const { - return FlagsInternal & MESSAGE_COMPRESS_INTERNAL; - } - bool IsCompressedResponse() const { - return FlagsInternal & MESSAGE_COMPRESS_RESPONSE; - } - - public: - /// can have private data to destroy - virtual ~TBusMessage(); - - /// returns header of the message - TBusHeader* GetHeader() { - return this; - } - const TBusHeader* GetHeader() const { - return this; - } - - /// helper to return type for protocol object to unpack object - static ui16 GetType(TArrayRef<const char> data) { - return TBusHeader(data).Type; - } - - /// returns payload data - static TArrayRef<const char> GetPayload(TArrayRef<const char> data) { - return data.Slice(sizeof(TBusHeader)); - } - - private: - void DoReset(); - - /// serialize message identity to be used to construct reply message - void GetIdentity(TBusIdentity& ident) const; - - /// set message identity from serialized form - void SetIdentity(const TBusIdentity& ident); - - public: - TNetAddr GetReplyTo() const { - return ReplyTo.ToNetAddr(); - } - - /// store of application specific data, never serialized into wire - void* Data; - }; - - class TBusMessageAutoPtr: public TAutoPtr<TBusMessage> { - public: - TBusMessageAutoPtr() { - } - - TBusMessageAutoPtr(TBusMessage* message) - : TAutoPtr<TBusMessage>(message) - { - } - - template <typename T1> - TBusMessageAutoPtr(const TAutoPtr<T1>& that) - : TAutoPtr<TBusMessage>(that.Release()) - { - } - }; - -} +#define TBUSMAX_MESSAGE 26 * 1024 * 1024 + sizeof(NBus::TBusHeader) ///< is't it enough? +#define TBUSMIN_MESSAGE sizeof(NBus::TBusHeader) ///< can't be less then header + + inline bool IsVersionNegotiation(const NBus::TBusHeader& header) { + return header.Id == 0 && header.Size == sizeof(TBusHeader); + } + + ////////////////////////////////////////////////////////// + /// \brief Base class for all messages passed in the system + + enum ECreateUninitialized { + MESSAGE_CREATE_UNINITIALIZED, + }; + + class TBusMessage + : protected TBusHeader, + public TRefCounted<TBusMessage, TAtomicCounter, TDelete>, + private TNonCopyable { + friend class TLocalSession; + friend struct ::NBus::NPrivate::TBusSessionImpl; + friend class ::NBus::NPrivate::TRemoteServerSession; + friend class ::NBus::NPrivate::TRemoteClientSession; + friend class ::NBus::NPrivate::TRemoteConnection; + friend class ::NBus::NPrivate::TRemoteClientConnection; + friend class ::NBus::NPrivate::TRemoteServerConnection; + friend struct ::NBus::NPrivate::TBusMessagePtrAndHeader; + + private: + ui32 LocalFlags; + + /// connection identity for reply set by PushMessage() + NPrivate::TBusSocketAddr ReplyTo; + // server-side response only, hack + ui32 RequestSize; + + TInstant RecvTime; + + public: + /// constructor to create messages on sending end + TBusMessage(ui16 type, int approxsize = sizeof(TBusHeader)); + + /// constructor with serialzed data to examine the header + TBusMessage(ECreateUninitialized); + + // slow, for diagnostics only + virtual TString Describe() const; + + // must be called if this message object needs to be reused + void Reset(); + + void CheckClean() const; + + void SetCompressed(bool); + void SetCompressedResponse(bool); + + private: + bool IsCompressed() const { + return FlagsInternal & MESSAGE_COMPRESS_INTERNAL; + } + bool IsCompressedResponse() const { + return FlagsInternal & MESSAGE_COMPRESS_RESPONSE; + } + + public: + /// can have private data to destroy + virtual ~TBusMessage(); + + /// returns header of the message + TBusHeader* GetHeader() { + return this; + } + const TBusHeader* GetHeader() const { + return this; + } + + /// helper to return type for protocol object to unpack object + static ui16 GetType(TArrayRef<const char> data) { + return TBusHeader(data).Type; + } + + /// returns payload data + static TArrayRef<const char> GetPayload(TArrayRef<const char> data) { + return data.Slice(sizeof(TBusHeader)); + } + + private: + void DoReset(); + + /// serialize message identity to be used to construct reply message + void GetIdentity(TBusIdentity& ident) const; + + /// set message identity from serialized form + void SetIdentity(const TBusIdentity& ident); + + public: + TNetAddr GetReplyTo() const { + return ReplyTo.ToNetAddr(); + } + + /// store of application specific data, never serialized into wire + void* Data; + }; + + class TBusMessageAutoPtr: public TAutoPtr<TBusMessage> { + public: + TBusMessageAutoPtr() { + } + + TBusMessageAutoPtr(TBusMessage* message) + : TAutoPtr<TBusMessage>(message) + { + } + + template <typename T1> + TBusMessageAutoPtr(const TAutoPtr<T1>& that) + : TAutoPtr<TBusMessage>(that.Release()) + { + } + }; + +} |