aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/message.h
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/message.h
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
downloadydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/message.h')
-rw-r--r--library/cpp/messagebus/message.h436
1 files changed, 218 insertions, 218 deletions
diff --git a/library/cpp/messagebus/message.h b/library/cpp/messagebus/message.h
index 275f1859df..005ca10c65 100644
--- a/library/cpp/messagebus/message.h
+++ b/library/cpp/messagebus/message.h
@@ -18,98 +18,98 @@
#include <typeinfo>
namespace NBus {
- ///////////////////////////////////////////////////////////////////
- /// \brief Structure to preserve identity from message to reply
- struct TBusIdentity : TNonCopyable {
- friend class TBusMessage;
- friend class NPrivate::TRemoteServerSession;
- friend struct NPrivate::TClientRequestImpl;
- friend class TOnMessageContext;
-
- // TODO: make private
- TBusKey MessageId;
-
- private:
- ui32 Size;
- TIntrusivePtr<NPrivate::TRemoteServerConnection> Connection;
- ui16 Flags;
- ui32 LocalFlags;
- TInstant RecvTime;
+ ///////////////////////////////////////////////////////////////////
+ /// \brief Structure to preserve identity from message to reply
+ struct TBusIdentity : TNonCopyable {
+ friend class TBusMessage;
+ friend class NPrivate::TRemoteServerSession;
+ friend struct NPrivate::TClientRequestImpl;
+ friend class TOnMessageContext;
+
+ // TODO: make private
+ TBusKey MessageId;
+
+ private:
+ ui32 Size;
+ TIntrusivePtr<NPrivate::TRemoteServerConnection> Connection;
+ ui16 Flags;
+ ui32 LocalFlags;
+ TInstant RecvTime;
#ifndef NDEBUG
std::optional<TString> MessageType;
#endif
private:
- // TODO: drop
- TNetAddr GetNetAddr() const;
-
- public:
- void Pack(char* dest);
- void Unpack(const char* src);
-
- bool IsInWork() const {
- return LocalFlags & NPrivate::MESSAGE_IN_WORK;
- }
-
- // for internal use only
- void BeginWork() {
- SetInWork(true);
- }
-
- // for internal use only
- void EndWork() {
- SetInWork(false);
- }
-
- TBusIdentity();
- ~TBusIdentity();
-
- void Swap(TBusIdentity& that) {
- DoSwap(MessageId, that.MessageId);
- DoSwap(Size, that.Size);
- DoSwap(Connection, that.Connection);
- DoSwap(Flags, that.Flags);
- DoSwap(LocalFlags, that.LocalFlags);
- DoSwap(RecvTime, that.RecvTime);
+ // TODO: drop
+ TNetAddr GetNetAddr() const;
+
+ public:
+ void Pack(char* dest);
+ void Unpack(const char* src);
+
+ bool IsInWork() const {
+ return LocalFlags & NPrivate::MESSAGE_IN_WORK;
+ }
+
+ // for internal use only
+ void BeginWork() {
+ SetInWork(true);
+ }
+
+ // for internal use only
+ void EndWork() {
+ SetInWork(false);
+ }
+
+ TBusIdentity();
+ ~TBusIdentity();
+
+ void Swap(TBusIdentity& that) {
+ DoSwap(MessageId, that.MessageId);
+ DoSwap(Size, that.Size);
+ DoSwap(Connection, that.Connection);
+ DoSwap(Flags, that.Flags);
+ DoSwap(LocalFlags, that.LocalFlags);
+ DoSwap(RecvTime, that.RecvTime);
#ifndef NDEBUG
- DoSwap(MessageType, that.MessageType);
+ DoSwap(MessageType, that.MessageType);
#endif
- }
-
- TString ToString() const;
-
- private:
- void SetInWork(bool inWork) {
- if (LocalFlags == 0 && inWork) {
- LocalFlags = NPrivate::MESSAGE_IN_WORK;
- } else if (LocalFlags == NPrivate::MESSAGE_IN_WORK && !inWork) {
- LocalFlags = 0;
- } else {
- Y_FAIL("impossible combination of flag and parameter: %s %d",
- inWork ? "true" : "false", unsigned(LocalFlags));
- }
+ }
+
+ TString ToString() const;
+
+ private:
+ void SetInWork(bool inWork) {
+ if (LocalFlags == 0 && inWork) {
+ LocalFlags = NPrivate::MESSAGE_IN_WORK;
+ } else if (LocalFlags == NPrivate::MESSAGE_IN_WORK && !inWork) {
+ LocalFlags = 0;
+ } else {
+ Y_FAIL("impossible combination of flag and parameter: %s %d",
+ inWork ? "true" : "false", unsigned(LocalFlags));
+ }
}
void SetMessageType(const std::type_info& messageTypeInfo) {
#ifndef NDEBUG
- Y_VERIFY(!MessageType, "state check");
+ Y_VERIFY(!MessageType, "state check");
MessageType = TypeName(messageTypeInfo);
#else
Y_UNUSED(messageTypeInfo);
#endif
- }
- };
+ }
+ };
- static const size_t BUS_IDENTITY_PACKED_SIZE = sizeof(TBusIdentity);
+ static const size_t BUS_IDENTITY_PACKED_SIZE = sizeof(TBusIdentity);
- ///////////////////////////////////////////////////////////////
- /// \brief Message flags in TBusHeader.Flags
- enum EMessageFlags {
- MESSAGE_COMPRESS_INTERNAL = 0x8000, ///< message is compressed
- MESSAGE_COMPRESS_RESPONSE = 0x4000, ///< message prefers compressed response
- MESSAGE_VERSION_INTERNAL = 0x00F0, ///< these bits are used as version
- };
+ ///////////////////////////////////////////////////////////////
+ /// \brief Message flags in TBusHeader.Flags
+ enum EMessageFlags {
+ MESSAGE_COMPRESS_INTERNAL = 0x8000, ///< message is compressed
+ MESSAGE_COMPRESS_RESPONSE = 0x4000, ///< message prefers compressed response
+ MESSAGE_VERSION_INTERNAL = 0x00F0, ///< these bits are used as version
+ };
//////////////////////////////////////////////////////////
/// \brief Message header present in all message send and received
@@ -117,8 +117,8 @@ namespace NBus {
/// This header is send into the wire.
/// \todo fix for low/high end, 32/64bit some day
#pragma pack(1)
- struct TBusHeader {
- friend class TBusMessage;
+ struct TBusHeader {
+ friend class TBusMessage;
TBusKey Id = 0; ///< unique message ID
ui32 Size = 0; ///< total size of the message
@@ -126,147 +126,147 @@ namespace NBus {
ui16 FlagsInternal = 0; ///< TRACE is one of the flags
ui16 Type = 0; ///< to be used by TBusProtocol
- int GetVersionInternal() {
- return (FlagsInternal & MESSAGE_VERSION_INTERNAL) >> 4;
- }
- void SetVersionInternal(unsigned ver = YBUS_VERSION) {
- FlagsInternal |= (ver << 4);
- }
-
- public:
- TBusHeader() {
- }
- TBusHeader(TArrayRef<const char> data) {
- ReadHeader(data);
- }
-
- private:
- /// function for serialization/deserialization of the header
- /// returns number of bytes written/read
- int ReadHeader(TArrayRef<const char> data);
-
- void GenerateId();
- };
+ int GetVersionInternal() {
+ return (FlagsInternal & MESSAGE_VERSION_INTERNAL) >> 4;
+ }
+ void SetVersionInternal(unsigned ver = YBUS_VERSION) {
+ FlagsInternal |= (ver << 4);
+ }
+
+ public:
+ TBusHeader() {
+ }
+ TBusHeader(TArrayRef<const char> data) {
+ ReadHeader(data);
+ }
+
+ private:
+ /// function for serialization/deserialization of the header
+ /// returns number of bytes written/read
+ int ReadHeader(TArrayRef<const char> data);
+
+ void GenerateId();
+ };
#pragma pack()
-#define TBUSMAX_MESSAGE 26 * 1024 * 1024 + sizeof(NBus::TBusHeader) ///< is't it enough?
-#define TBUSMIN_MESSAGE sizeof(NBus::TBusHeader) ///< can't be less then header
-
- inline bool IsVersionNegotiation(const NBus::TBusHeader& header) {
- return header.Id == 0 && header.Size == sizeof(TBusHeader);
- }
-
- //////////////////////////////////////////////////////////
- /// \brief Base class for all messages passed in the system
-
- enum ECreateUninitialized {
- MESSAGE_CREATE_UNINITIALIZED,
- };
-
- class TBusMessage
- : protected TBusHeader,
- public TRefCounted<TBusMessage, TAtomicCounter, TDelete>,
- private TNonCopyable {
- friend class TLocalSession;
- friend struct ::NBus::NPrivate::TBusSessionImpl;
- friend class ::NBus::NPrivate::TRemoteServerSession;
- friend class ::NBus::NPrivate::TRemoteClientSession;
- friend class ::NBus::NPrivate::TRemoteConnection;
- friend class ::NBus::NPrivate::TRemoteClientConnection;
- friend class ::NBus::NPrivate::TRemoteServerConnection;
- friend struct ::NBus::NPrivate::TBusMessagePtrAndHeader;
-
- private:
- ui32 LocalFlags;
-
- /// connection identity for reply set by PushMessage()
- NPrivate::TBusSocketAddr ReplyTo;
- // server-side response only, hack
- ui32 RequestSize;
-
- TInstant RecvTime;
-
- public:
- /// constructor to create messages on sending end
- TBusMessage(ui16 type, int approxsize = sizeof(TBusHeader));
-
- /// constructor with serialzed data to examine the header
- TBusMessage(ECreateUninitialized);
-
- // slow, for diagnostics only
- virtual TString Describe() const;
-
- // must be called if this message object needs to be reused
- void Reset();
-
- void CheckClean() const;
-
- void SetCompressed(bool);
- void SetCompressedResponse(bool);
-
- private:
- bool IsCompressed() const {
- return FlagsInternal & MESSAGE_COMPRESS_INTERNAL;
- }
- bool IsCompressedResponse() const {
- return FlagsInternal & MESSAGE_COMPRESS_RESPONSE;
- }
-
- public:
- /// can have private data to destroy
- virtual ~TBusMessage();
-
- /// returns header of the message
- TBusHeader* GetHeader() {
- return this;
- }
- const TBusHeader* GetHeader() const {
- return this;
- }
-
- /// helper to return type for protocol object to unpack object
- static ui16 GetType(TArrayRef<const char> data) {
- return TBusHeader(data).Type;
- }
-
- /// returns payload data
- static TArrayRef<const char> GetPayload(TArrayRef<const char> data) {
- return data.Slice(sizeof(TBusHeader));
- }
-
- private:
- void DoReset();
-
- /// serialize message identity to be used to construct reply message
- void GetIdentity(TBusIdentity& ident) const;
-
- /// set message identity from serialized form
- void SetIdentity(const TBusIdentity& ident);
-
- public:
- TNetAddr GetReplyTo() const {
- return ReplyTo.ToNetAddr();
- }
-
- /// store of application specific data, never serialized into wire
- void* Data;
- };
-
- class TBusMessageAutoPtr: public TAutoPtr<TBusMessage> {
- public:
- TBusMessageAutoPtr() {
- }
-
- TBusMessageAutoPtr(TBusMessage* message)
- : TAutoPtr<TBusMessage>(message)
- {
- }
-
- template <typename T1>
- TBusMessageAutoPtr(const TAutoPtr<T1>& that)
- : TAutoPtr<TBusMessage>(that.Release())
- {
- }
- };
-
-}
+#define TBUSMAX_MESSAGE 26 * 1024 * 1024 + sizeof(NBus::TBusHeader) ///< is't it enough?
+#define TBUSMIN_MESSAGE sizeof(NBus::TBusHeader) ///< can't be less then header
+
+ inline bool IsVersionNegotiation(const NBus::TBusHeader& header) {
+ return header.Id == 0 && header.Size == sizeof(TBusHeader);
+ }
+
+ //////////////////////////////////////////////////////////
+ /// \brief Base class for all messages passed in the system
+
+ enum ECreateUninitialized {
+ MESSAGE_CREATE_UNINITIALIZED,
+ };
+
+ class TBusMessage
+ : protected TBusHeader,
+ public TRefCounted<TBusMessage, TAtomicCounter, TDelete>,
+ private TNonCopyable {
+ friend class TLocalSession;
+ friend struct ::NBus::NPrivate::TBusSessionImpl;
+ friend class ::NBus::NPrivate::TRemoteServerSession;
+ friend class ::NBus::NPrivate::TRemoteClientSession;
+ friend class ::NBus::NPrivate::TRemoteConnection;
+ friend class ::NBus::NPrivate::TRemoteClientConnection;
+ friend class ::NBus::NPrivate::TRemoteServerConnection;
+ friend struct ::NBus::NPrivate::TBusMessagePtrAndHeader;
+
+ private:
+ ui32 LocalFlags;
+
+ /// connection identity for reply set by PushMessage()
+ NPrivate::TBusSocketAddr ReplyTo;
+ // server-side response only, hack
+ ui32 RequestSize;
+
+ TInstant RecvTime;
+
+ public:
+ /// constructor to create messages on sending end
+ TBusMessage(ui16 type, int approxsize = sizeof(TBusHeader));
+
+ /// constructor with serialzed data to examine the header
+ TBusMessage(ECreateUninitialized);
+
+ // slow, for diagnostics only
+ virtual TString Describe() const;
+
+ // must be called if this message object needs to be reused
+ void Reset();
+
+ void CheckClean() const;
+
+ void SetCompressed(bool);
+ void SetCompressedResponse(bool);
+
+ private:
+ bool IsCompressed() const {
+ return FlagsInternal & MESSAGE_COMPRESS_INTERNAL;
+ }
+ bool IsCompressedResponse() const {
+ return FlagsInternal & MESSAGE_COMPRESS_RESPONSE;
+ }
+
+ public:
+ /// can have private data to destroy
+ virtual ~TBusMessage();
+
+ /// returns header of the message
+ TBusHeader* GetHeader() {
+ return this;
+ }
+ const TBusHeader* GetHeader() const {
+ return this;
+ }
+
+ /// helper to return type for protocol object to unpack object
+ static ui16 GetType(TArrayRef<const char> data) {
+ return TBusHeader(data).Type;
+ }
+
+ /// returns payload data
+ static TArrayRef<const char> GetPayload(TArrayRef<const char> data) {
+ return data.Slice(sizeof(TBusHeader));
+ }
+
+ private:
+ void DoReset();
+
+ /// serialize message identity to be used to construct reply message
+ void GetIdentity(TBusIdentity& ident) const;
+
+ /// set message identity from serialized form
+ void SetIdentity(const TBusIdentity& ident);
+
+ public:
+ TNetAddr GetReplyTo() const {
+ return ReplyTo.ToNetAddr();
+ }
+
+ /// store of application specific data, never serialized into wire
+ void* Data;
+ };
+
+ class TBusMessageAutoPtr: public TAutoPtr<TBusMessage> {
+ public:
+ TBusMessageAutoPtr() {
+ }
+
+ TBusMessageAutoPtr(TBusMessage* message)
+ : TAutoPtr<TBusMessage>(message)
+ {
+ }
+
+ template <typename T1>
+ TBusMessageAutoPtr(const TAutoPtr<T1>& that)
+ : TAutoPtr<TBusMessage>(that.Release())
+ {
+ }
+ };
+
+}