aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/message.h
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/message.h
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/message.h')
-rw-r--r--library/cpp/messagebus/message.h272
1 files changed, 272 insertions, 0 deletions
diff --git a/library/cpp/messagebus/message.h b/library/cpp/messagebus/message.h
new file mode 100644
index 0000000000..005ca10c65
--- /dev/null
+++ b/library/cpp/messagebus/message.h
@@ -0,0 +1,272 @@
+#pragma once
+
+#include "base.h"
+#include "local_flags.h"
+#include "message_status.h"
+#include "netaddr.h"
+#include "socket_addr.h"
+
+#include <util/generic/array_ref.h>
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/system/defaults.h>
+#include <util/system/type_name.h>
+#include <util/system/yassert.h>
+
+#include <optional>
+#include <typeinfo>
+
+namespace NBus {
+ ///////////////////////////////////////////////////////////////////
+ /// \brief Structure to preserve identity from message to reply
+ struct TBusIdentity : TNonCopyable {
+ friend class TBusMessage;
+ friend class NPrivate::TRemoteServerSession;
+ friend struct NPrivate::TClientRequestImpl;
+ friend class TOnMessageContext;
+
+ // TODO: make private
+ TBusKey MessageId;
+
+ private:
+ ui32 Size;
+ TIntrusivePtr<NPrivate::TRemoteServerConnection> Connection;
+ ui16 Flags;
+ ui32 LocalFlags;
+ TInstant RecvTime;
+
+#ifndef NDEBUG
+ std::optional<TString> MessageType;
+#endif
+
+ private:
+ // TODO: drop
+ TNetAddr GetNetAddr() const;
+
+ public:
+ void Pack(char* dest);
+ void Unpack(const char* src);
+
+ bool IsInWork() const {
+ return LocalFlags & NPrivate::MESSAGE_IN_WORK;
+ }
+
+ // for internal use only
+ void BeginWork() {
+ SetInWork(true);
+ }
+
+ // for internal use only
+ void EndWork() {
+ SetInWork(false);
+ }
+
+ TBusIdentity();
+ ~TBusIdentity();
+
+ void Swap(TBusIdentity& that) {
+ DoSwap(MessageId, that.MessageId);
+ DoSwap(Size, that.Size);
+ DoSwap(Connection, that.Connection);
+ DoSwap(Flags, that.Flags);
+ DoSwap(LocalFlags, that.LocalFlags);
+ DoSwap(RecvTime, that.RecvTime);
+#ifndef NDEBUG
+ DoSwap(MessageType, that.MessageType);
+#endif
+ }
+
+ TString ToString() const;
+
+ private:
+ void SetInWork(bool inWork) {
+ if (LocalFlags == 0 && inWork) {
+ LocalFlags = NPrivate::MESSAGE_IN_WORK;
+ } else if (LocalFlags == NPrivate::MESSAGE_IN_WORK && !inWork) {
+ LocalFlags = 0;
+ } else {
+ Y_FAIL("impossible combination of flag and parameter: %s %d",
+ inWork ? "true" : "false", unsigned(LocalFlags));
+ }
+ }
+
+ void SetMessageType(const std::type_info& messageTypeInfo) {
+#ifndef NDEBUG
+ Y_VERIFY(!MessageType, "state check");
+ MessageType = TypeName(messageTypeInfo);
+#else
+ Y_UNUSED(messageTypeInfo);
+#endif
+ }
+ };
+
+ static const size_t BUS_IDENTITY_PACKED_SIZE = sizeof(TBusIdentity);
+
+ ///////////////////////////////////////////////////////////////
+ /// \brief Message flags in TBusHeader.Flags
+ enum EMessageFlags {
+ MESSAGE_COMPRESS_INTERNAL = 0x8000, ///< message is compressed
+ MESSAGE_COMPRESS_RESPONSE = 0x4000, ///< message prefers compressed response
+ MESSAGE_VERSION_INTERNAL = 0x00F0, ///< these bits are used as version
+ };
+
+//////////////////////////////////////////////////////////
+/// \brief Message header present in all message send and received
+
+/// This header is send into the wire.
+/// \todo fix for low/high end, 32/64bit some day
+#pragma pack(1)
+ struct TBusHeader {
+ friend class TBusMessage;
+
+ TBusKey Id = 0; ///< unique message ID
+ ui32 Size = 0; ///< total size of the message
+ TBusInstant SendTime = 0; ///< time the message was sent
+ ui16 FlagsInternal = 0; ///< TRACE is one of the flags
+ ui16 Type = 0; ///< to be used by TBusProtocol
+
+ int GetVersionInternal() {
+ return (FlagsInternal & MESSAGE_VERSION_INTERNAL) >> 4;
+ }
+ void SetVersionInternal(unsigned ver = YBUS_VERSION) {
+ FlagsInternal |= (ver << 4);
+ }
+
+ public:
+ TBusHeader() {
+ }
+ TBusHeader(TArrayRef<const char> data) {
+ ReadHeader(data);
+ }
+
+ private:
+ /// function for serialization/deserialization of the header
+ /// returns number of bytes written/read
+ int ReadHeader(TArrayRef<const char> data);
+
+ void GenerateId();
+ };
+#pragma pack()
+
+#define TBUSMAX_MESSAGE 26 * 1024 * 1024 + sizeof(NBus::TBusHeader) ///< is't it enough?
+#define TBUSMIN_MESSAGE sizeof(NBus::TBusHeader) ///< can't be less then header
+
+ inline bool IsVersionNegotiation(const NBus::TBusHeader& header) {
+ return header.Id == 0 && header.Size == sizeof(TBusHeader);
+ }
+
+ //////////////////////////////////////////////////////////
+ /// \brief Base class for all messages passed in the system
+
+ enum ECreateUninitialized {
+ MESSAGE_CREATE_UNINITIALIZED,
+ };
+
+ class TBusMessage
+ : protected TBusHeader,
+ public TRefCounted<TBusMessage, TAtomicCounter, TDelete>,
+ private TNonCopyable {
+ friend class TLocalSession;
+ friend struct ::NBus::NPrivate::TBusSessionImpl;
+ friend class ::NBus::NPrivate::TRemoteServerSession;
+ friend class ::NBus::NPrivate::TRemoteClientSession;
+ friend class ::NBus::NPrivate::TRemoteConnection;
+ friend class ::NBus::NPrivate::TRemoteClientConnection;
+ friend class ::NBus::NPrivate::TRemoteServerConnection;
+ friend struct ::NBus::NPrivate::TBusMessagePtrAndHeader;
+
+ private:
+ ui32 LocalFlags;
+
+ /// connection identity for reply set by PushMessage()
+ NPrivate::TBusSocketAddr ReplyTo;
+ // server-side response only, hack
+ ui32 RequestSize;
+
+ TInstant RecvTime;
+
+ public:
+ /// constructor to create messages on sending end
+ TBusMessage(ui16 type, int approxsize = sizeof(TBusHeader));
+
+ /// constructor with serialzed data to examine the header
+ TBusMessage(ECreateUninitialized);
+
+ // slow, for diagnostics only
+ virtual TString Describe() const;
+
+ // must be called if this message object needs to be reused
+ void Reset();
+
+ void CheckClean() const;
+
+ void SetCompressed(bool);
+ void SetCompressedResponse(bool);
+
+ private:
+ bool IsCompressed() const {
+ return FlagsInternal & MESSAGE_COMPRESS_INTERNAL;
+ }
+ bool IsCompressedResponse() const {
+ return FlagsInternal & MESSAGE_COMPRESS_RESPONSE;
+ }
+
+ public:
+ /// can have private data to destroy
+ virtual ~TBusMessage();
+
+ /// returns header of the message
+ TBusHeader* GetHeader() {
+ return this;
+ }
+ const TBusHeader* GetHeader() const {
+ return this;
+ }
+
+ /// helper to return type for protocol object to unpack object
+ static ui16 GetType(TArrayRef<const char> data) {
+ return TBusHeader(data).Type;
+ }
+
+ /// returns payload data
+ static TArrayRef<const char> GetPayload(TArrayRef<const char> data) {
+ return data.Slice(sizeof(TBusHeader));
+ }
+
+ private:
+ void DoReset();
+
+ /// serialize message identity to be used to construct reply message
+ void GetIdentity(TBusIdentity& ident) const;
+
+ /// set message identity from serialized form
+ void SetIdentity(const TBusIdentity& ident);
+
+ public:
+ TNetAddr GetReplyTo() const {
+ return ReplyTo.ToNetAddr();
+ }
+
+ /// store of application specific data, never serialized into wire
+ void* Data;
+ };
+
+ class TBusMessageAutoPtr: public TAutoPtr<TBusMessage> {
+ public:
+ TBusMessageAutoPtr() {
+ }
+
+ TBusMessageAutoPtr(TBusMessage* message)
+ : TAutoPtr<TBusMessage>(message)
+ {
+ }
+
+ template <typename T1>
+ TBusMessageAutoPtr(const TAutoPtr<T1>& that)
+ : TAutoPtr<TBusMessage>(that.Release())
+ {
+ }
+ };
+
+}