aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/message.h
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commitc2a1af049e9deca890e9923abe64fe6c59060348 (patch)
treeb222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/message.h
parent1f553f46fb4f3c5eec631352cdd900a0709016af (diff)
downloadydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/message.h')
-rw-r--r--library/cpp/messagebus/message.h134
1 files changed, 67 insertions, 67 deletions
diff --git a/library/cpp/messagebus/message.h b/library/cpp/messagebus/message.h
index 6fce2b1fac..005ca10c65 100644
--- a/library/cpp/messagebus/message.h
+++ b/library/cpp/messagebus/message.h
@@ -1,5 +1,5 @@
-#pragma once
-
+#pragma once
+
#include "base.h"
#include "local_flags.h"
#include "message_status.h"
@@ -8,16 +8,16 @@
#include <util/generic/array_ref.h>
#include <util/generic/noncopyable.h>
-#include <util/generic/ptr.h>
+#include <util/generic/ptr.h>
#include <util/generic/string.h>
#include <util/system/defaults.h>
#include <util/system/type_name.h>
-#include <util/system/yassert.h>
-
+#include <util/system/yassert.h>
+
#include <optional>
#include <typeinfo>
-namespace NBus {
+namespace NBus {
///////////////////////////////////////////////////////////////////
/// \brief Structure to preserve identity from message to reply
struct TBusIdentity : TNonCopyable {
@@ -25,33 +25,33 @@ namespace NBus {
friend class NPrivate::TRemoteServerSession;
friend struct NPrivate::TClientRequestImpl;
friend class TOnMessageContext;
-
+
// TODO: make private
TBusKey MessageId;
-
+
private:
ui32 Size;
TIntrusivePtr<NPrivate::TRemoteServerConnection> Connection;
ui16 Flags;
ui32 LocalFlags;
TInstant RecvTime;
-
-#ifndef NDEBUG
+
+#ifndef NDEBUG
std::optional<TString> MessageType;
-#endif
-
+#endif
+
private:
// TODO: drop
TNetAddr GetNetAddr() const;
-
+
public:
void Pack(char* dest);
void Unpack(const char* src);
-
+
bool IsInWork() const {
return LocalFlags & NPrivate::MESSAGE_IN_WORK;
}
-
+
// for internal use only
void BeginWork() {
SetInWork(true);
@@ -64,7 +64,7 @@ namespace NBus {
TBusIdentity();
~TBusIdentity();
-
+
void Swap(TBusIdentity& that) {
DoSwap(MessageId, that.MessageId);
DoSwap(Size, that.Size);
@@ -72,13 +72,13 @@ namespace NBus {
DoSwap(Flags, that.Flags);
DoSwap(LocalFlags, that.LocalFlags);
DoSwap(RecvTime, that.RecvTime);
-#ifndef NDEBUG
+#ifndef NDEBUG
DoSwap(MessageType, that.MessageType);
-#endif
+#endif
}
-
+
TString ToString() const;
-
+
private:
void SetInWork(bool inWork) {
if (LocalFlags == 0 && inWork) {
@@ -89,20 +89,20 @@ namespace NBus {
Y_FAIL("impossible combination of flag and parameter: %s %d",
inWork ? "true" : "false", unsigned(LocalFlags));
}
- }
-
+ }
+
void SetMessageType(const std::type_info& messageTypeInfo) {
-#ifndef NDEBUG
+#ifndef NDEBUG
Y_VERIFY(!MessageType, "state check");
MessageType = TypeName(messageTypeInfo);
-#else
+#else
Y_UNUSED(messageTypeInfo);
-#endif
+#endif
}
};
-
+
static const size_t BUS_IDENTITY_PACKED_SIZE = sizeof(TBusIdentity);
-
+
///////////////////////////////////////////////////////////////
/// \brief Message flags in TBusHeader.Flags
enum EMessageFlags {
@@ -110,59 +110,59 @@ namespace NBus {
MESSAGE_COMPRESS_RESPONSE = 0x4000, ///< message prefers compressed response
MESSAGE_VERSION_INTERNAL = 0x00F0, ///< these bits are used as version
};
-
-//////////////////////////////////////////////////////////
-/// \brief Message header present in all message send and received
-
-/// This header is send into the wire.
-/// \todo fix for low/high end, 32/64bit some day
-#pragma pack(1)
+
+//////////////////////////////////////////////////////////
+/// \brief Message header present in all message send and received
+
+/// This header is send into the wire.
+/// \todo fix for low/high end, 32/64bit some day
+#pragma pack(1)
struct TBusHeader {
friend class TBusMessage;
-
+
TBusKey Id = 0; ///< unique message ID
ui32 Size = 0; ///< total size of the message
TBusInstant SendTime = 0; ///< time the message was sent
ui16 FlagsInternal = 0; ///< TRACE is one of the flags
ui16 Type = 0; ///< to be used by TBusProtocol
-
+
int GetVersionInternal() {
return (FlagsInternal & MESSAGE_VERSION_INTERNAL) >> 4;
}
void SetVersionInternal(unsigned ver = YBUS_VERSION) {
FlagsInternal |= (ver << 4);
}
-
+
public:
TBusHeader() {
}
TBusHeader(TArrayRef<const char> data) {
ReadHeader(data);
}
-
+
private:
/// function for serialization/deserialization of the header
/// returns number of bytes written/read
int ReadHeader(TArrayRef<const char> data);
-
+
void GenerateId();
};
-#pragma pack()
-
+#pragma pack()
+
#define TBUSMAX_MESSAGE 26 * 1024 * 1024 + sizeof(NBus::TBusHeader) ///< is't it enough?
#define TBUSMIN_MESSAGE sizeof(NBus::TBusHeader) ///< can't be less then header
-
+
inline bool IsVersionNegotiation(const NBus::TBusHeader& header) {
return header.Id == 0 && header.Size == sizeof(TBusHeader);
}
//////////////////////////////////////////////////////////
/// \brief Base class for all messages passed in the system
-
+
enum ECreateUninitialized {
MESSAGE_CREATE_UNINITIALIZED,
};
-
+
class TBusMessage
: protected TBusHeader,
public TRefCounted<TBusMessage, TAtomicCounter, TDelete>,
@@ -175,35 +175,35 @@ namespace NBus {
friend class ::NBus::NPrivate::TRemoteClientConnection;
friend class ::NBus::NPrivate::TRemoteServerConnection;
friend struct ::NBus::NPrivate::TBusMessagePtrAndHeader;
-
+
private:
ui32 LocalFlags;
-
+
/// connection identity for reply set by PushMessage()
NPrivate::TBusSocketAddr ReplyTo;
// server-side response only, hack
ui32 RequestSize;
-
+
TInstant RecvTime;
-
+
public:
/// constructor to create messages on sending end
TBusMessage(ui16 type, int approxsize = sizeof(TBusHeader));
-
+
/// constructor with serialzed data to examine the header
TBusMessage(ECreateUninitialized);
-
+
// slow, for diagnostics only
virtual TString Describe() const;
-
+
// must be called if this message object needs to be reused
void Reset();
-
+
void CheckClean() const;
-
+
void SetCompressed(bool);
void SetCompressedResponse(bool);
-
+
private:
bool IsCompressed() const {
return FlagsInternal & MESSAGE_COMPRESS_INTERNAL;
@@ -211,11 +211,11 @@ namespace NBus {
bool IsCompressedResponse() const {
return FlagsInternal & MESSAGE_COMPRESS_RESPONSE;
}
-
+
public:
/// can have private data to destroy
virtual ~TBusMessage();
-
+
/// returns header of the message
TBusHeader* GetHeader() {
return this;
@@ -223,50 +223,50 @@ namespace NBus {
const TBusHeader* GetHeader() const {
return this;
}
-
+
/// helper to return type for protocol object to unpack object
static ui16 GetType(TArrayRef<const char> data) {
return TBusHeader(data).Type;
}
-
+
/// returns payload data
static TArrayRef<const char> GetPayload(TArrayRef<const char> data) {
return data.Slice(sizeof(TBusHeader));
}
-
+
private:
void DoReset();
-
+
/// serialize message identity to be used to construct reply message
void GetIdentity(TBusIdentity& ident) const;
-
+
/// set message identity from serialized form
void SetIdentity(const TBusIdentity& ident);
-
+
public:
TNetAddr GetReplyTo() const {
return ReplyTo.ToNetAddr();
}
-
+
/// store of application specific data, never serialized into wire
void* Data;
};
-
+
class TBusMessageAutoPtr: public TAutoPtr<TBusMessage> {
public:
TBusMessageAutoPtr() {
}
-
+
TBusMessageAutoPtr(TBusMessage* message)
: TAutoPtr<TBusMessage>(message)
{
}
-
+
template <typename T1>
TBusMessageAutoPtr(const TAutoPtr<T1>& that)
: TAutoPtr<TBusMessage>(that.Release())
{
}
};
-
+
}