aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/message.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/message.cpp
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
downloadydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/message.cpp')
-rw-r--r--library/cpp/messagebus/message.cpp324
1 files changed, 162 insertions, 162 deletions
diff --git a/library/cpp/messagebus/message.cpp b/library/cpp/messagebus/message.cpp
index a57e235977..bfa7ed8e9b 100644
--- a/library/cpp/messagebus/message.cpp
+++ b/library/cpp/messagebus/message.cpp
@@ -10,185 +10,185 @@
using namespace NBus;
namespace NBus {
- using namespace NBus::NPrivate;
-
- TBusIdentity::TBusIdentity()
- : MessageId(0)
- , Size(0)
- , Flags(0)
- , LocalFlags(0)
- {
- }
+ using namespace NBus::NPrivate;
+
+ TBusIdentity::TBusIdentity()
+ : MessageId(0)
+ , Size(0)
+ , Flags(0)
+ , LocalFlags(0)
+ {
+ }
- TBusIdentity::~TBusIdentity() {
+ TBusIdentity::~TBusIdentity() {
// TODO: print local flags
#ifndef NDEBUG
- Y_VERIFY(LocalFlags == 0, "local flags must be zero at this point; message type is %s",
+ Y_VERIFY(LocalFlags == 0, "local flags must be zero at this point; message type is %s",
MessageType.value_or("unknown").c_str());
#else
- Y_VERIFY(LocalFlags == 0, "local flags must be zero at this point");
+ Y_VERIFY(LocalFlags == 0, "local flags must be zero at this point");
#endif
- }
-
- TNetAddr TBusIdentity::GetNetAddr() const {
- if (!!Connection) {
- return Connection->GetAddr();
- } else {
- Y_FAIL();
- }
- }
-
- void TBusIdentity::Pack(char* dest) {
- memcpy(dest, this, sizeof(TBusIdentity));
- LocalFlags = 0;
-
- // prevent decref
- new (&Connection) TIntrusivePtr<TRemoteServerConnection>;
- }
-
- void TBusIdentity::Unpack(const char* src) {
- Y_VERIFY(LocalFlags == 0);
- Y_VERIFY(!Connection);
-
- memcpy(this, src, sizeof(TBusIdentity));
- }
-
- void TBusHeader::GenerateId() {
- for (;;) {
- Id = RandomNumber<TBusKey>();
- // Skip reserved ids
- if (IsBusKeyValid(Id))
- return;
- }
- }
-
- TBusMessage::TBusMessage(ui16 type, int approxsize)
- //: TCtr("BusMessage")
- : TRefCounted<TBusMessage, TAtomicCounter, TDelete>(1)
- , LocalFlags(0)
- , RequestSize(0)
- , Data(nullptr)
- {
- Y_UNUSED(approxsize);
- GetHeader()->Type = type;
- DoReset();
- }
-
- TBusMessage::TBusMessage(ECreateUninitialized)
- //: TCtr("BusMessage")
- : TRefCounted<TBusMessage, TAtomicCounter, TDelete>(1)
- , LocalFlags(0)
- , Data(nullptr)
- {
- }
-
- TString TBusMessage::Describe() const {
+ }
+
+ TNetAddr TBusIdentity::GetNetAddr() const {
+ if (!!Connection) {
+ return Connection->GetAddr();
+ } else {
+ Y_FAIL();
+ }
+ }
+
+ void TBusIdentity::Pack(char* dest) {
+ memcpy(dest, this, sizeof(TBusIdentity));
+ LocalFlags = 0;
+
+ // prevent decref
+ new (&Connection) TIntrusivePtr<TRemoteServerConnection>;
+ }
+
+ void TBusIdentity::Unpack(const char* src) {
+ Y_VERIFY(LocalFlags == 0);
+ Y_VERIFY(!Connection);
+
+ memcpy(this, src, sizeof(TBusIdentity));
+ }
+
+ void TBusHeader::GenerateId() {
+ for (;;) {
+ Id = RandomNumber<TBusKey>();
+ // Skip reserved ids
+ if (IsBusKeyValid(Id))
+ return;
+ }
+ }
+
+ TBusMessage::TBusMessage(ui16 type, int approxsize)
+ //: TCtr("BusMessage")
+ : TRefCounted<TBusMessage, TAtomicCounter, TDelete>(1)
+ , LocalFlags(0)
+ , RequestSize(0)
+ , Data(nullptr)
+ {
+ Y_UNUSED(approxsize);
+ GetHeader()->Type = type;
+ DoReset();
+ }
+
+ TBusMessage::TBusMessage(ECreateUninitialized)
+ //: TCtr("BusMessage")
+ : TRefCounted<TBusMessage, TAtomicCounter, TDelete>(1)
+ , LocalFlags(0)
+ , Data(nullptr)
+ {
+ }
+
+ TString TBusMessage::Describe() const {
return Sprintf("object type: %s, message type: %d", TypeName(*this).data(), int(GetHeader()->Type));
- }
+ }
- TBusMessage::~TBusMessage() {
+ TBusMessage::~TBusMessage() {
#ifndef NDEBUG
- Y_VERIFY(GetHeader()->Id != YBUS_KEYINVALID, "must not be invalid key, message type: %d, ", int(Type));
- GetHeader()->Id = YBUS_KEYINVALID;
- Data = (void*)17;
- CheckClean();
+ Y_VERIFY(GetHeader()->Id != YBUS_KEYINVALID, "must not be invalid key, message type: %d, ", int(Type));
+ GetHeader()->Id = YBUS_KEYINVALID;
+ Data = (void*)17;
+ CheckClean();
#endif
- }
-
- void TBusMessage::DoReset() {
- GetHeader()->SendTime = 0;
- GetHeader()->Size = 0;
- GetHeader()->FlagsInternal = 0;
- GetHeader()->GenerateId();
- GetHeader()->SetVersionInternal();
- }
-
- void TBusMessage::Reset() {
- CheckClean();
- DoReset();
- }
-
- void TBusMessage::CheckClean() const {
- if (Y_UNLIKELY(LocalFlags != 0)) {
- TString describe = Describe();
- TString localFlags = LocalFlagSetToString(LocalFlags);
+ }
+
+ void TBusMessage::DoReset() {
+ GetHeader()->SendTime = 0;
+ GetHeader()->Size = 0;
+ GetHeader()->FlagsInternal = 0;
+ GetHeader()->GenerateId();
+ GetHeader()->SetVersionInternal();
+ }
+
+ void TBusMessage::Reset() {
+ CheckClean();
+ DoReset();
+ }
+
+ void TBusMessage::CheckClean() const {
+ if (Y_UNLIKELY(LocalFlags != 0)) {
+ TString describe = Describe();
+ TString localFlags = LocalFlagSetToString(LocalFlags);
Y_FAIL("message local flags must be zero, got: %s, message: %s", localFlags.data(), describe.data());
- }
- }
-
- ///////////////////////////////////////////////////////
- /// \brief Unpacks header from network order
-
- /// \todo ntoh instead of memcpy
- int TBusHeader::ReadHeader(TArrayRef<const char> data) {
- Y_ASSERT(data.size() >= sizeof(TBusHeader));
- memcpy(this, data.data(), sizeof(TBusHeader));
- return sizeof(TBusHeader);
- }
-
- ///////////////////////////////////////////////////////
- /// \brief Packs header to network order
-
- //////////////////////////////////////////////////////////
- /// \brief serialize message identity to be used to construct reply message
-
- /// function stores messageid, flags and connection reply address into the buffer
- /// that can later be used to construct a reply to the message
- void TBusMessage::GetIdentity(TBusIdentity& data) const {
- data.MessageId = GetHeader()->Id;
- data.Size = GetHeader()->Size;
- data.Flags = GetHeader()->FlagsInternal;
- //data.LocalFlags = LocalFlags;
- }
-
- ////////////////////////////////////////////////////////////
- /// \brief set message identity from serialized form
-
- /// function restores messageid, flags and connection reply address from the buffer
- /// into the reply message
- void TBusMessage::SetIdentity(const TBusIdentity& data) {
- // TODO: wrong assertion: YBUS_KEYMIN is 0
- Y_ASSERT(data.MessageId != 0);
- bool compressed = IsCompressed();
- GetHeader()->Id = data.MessageId;
- GetHeader()->FlagsInternal = data.Flags;
- LocalFlags = data.LocalFlags & ~MESSAGE_IN_WORK;
- ReplyTo = data.Connection->PeerAddrSocketAddr;
- SetCompressed(compressed || IsCompressedResponse());
- }
-
- void TBusMessage::SetCompressed(bool v) {
- if (v) {
- GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_INTERNAL;
- } else {
- GetHeader()->FlagsInternal &= ~(MESSAGE_COMPRESS_INTERNAL);
- }
- }
-
- void TBusMessage::SetCompressedResponse(bool v) {
- if (v) {
- GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_RESPONSE;
- } else {
- GetHeader()->FlagsInternal &= ~(MESSAGE_COMPRESS_RESPONSE);
- }
- }
-
- TString TBusIdentity::ToString() const {
- TStringStream ss;
- ss << "msg-id=" << MessageId
- << " size=" << Size;
- if (!!Connection) {
- ss << " conn=" << Connection->GetAddr();
- }
- ss
+ }
+ }
+
+ ///////////////////////////////////////////////////////
+ /// \brief Unpacks header from network order
+
+ /// \todo ntoh instead of memcpy
+ int TBusHeader::ReadHeader(TArrayRef<const char> data) {
+ Y_ASSERT(data.size() >= sizeof(TBusHeader));
+ memcpy(this, data.data(), sizeof(TBusHeader));
+ return sizeof(TBusHeader);
+ }
+
+ ///////////////////////////////////////////////////////
+ /// \brief Packs header to network order
+
+ //////////////////////////////////////////////////////////
+ /// \brief serialize message identity to be used to construct reply message
+
+ /// function stores messageid, flags and connection reply address into the buffer
+ /// that can later be used to construct a reply to the message
+ void TBusMessage::GetIdentity(TBusIdentity& data) const {
+ data.MessageId = GetHeader()->Id;
+ data.Size = GetHeader()->Size;
+ data.Flags = GetHeader()->FlagsInternal;
+ //data.LocalFlags = LocalFlags;
+ }
+
+ ////////////////////////////////////////////////////////////
+ /// \brief set message identity from serialized form
+
+ /// function restores messageid, flags and connection reply address from the buffer
+ /// into the reply message
+ void TBusMessage::SetIdentity(const TBusIdentity& data) {
+ // TODO: wrong assertion: YBUS_KEYMIN is 0
+ Y_ASSERT(data.MessageId != 0);
+ bool compressed = IsCompressed();
+ GetHeader()->Id = data.MessageId;
+ GetHeader()->FlagsInternal = data.Flags;
+ LocalFlags = data.LocalFlags & ~MESSAGE_IN_WORK;
+ ReplyTo = data.Connection->PeerAddrSocketAddr;
+ SetCompressed(compressed || IsCompressedResponse());
+ }
+
+ void TBusMessage::SetCompressed(bool v) {
+ if (v) {
+ GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_INTERNAL;
+ } else {
+ GetHeader()->FlagsInternal &= ~(MESSAGE_COMPRESS_INTERNAL);
+ }
+ }
+
+ void TBusMessage::SetCompressedResponse(bool v) {
+ if (v) {
+ GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_RESPONSE;
+ } else {
+ GetHeader()->FlagsInternal &= ~(MESSAGE_COMPRESS_RESPONSE);
+ }
+ }
+
+ TString TBusIdentity::ToString() const {
+ TStringStream ss;
+ ss << "msg-id=" << MessageId
+ << " size=" << Size;
+ if (!!Connection) {
+ ss << " conn=" << Connection->GetAddr();
+ }
+ ss
<< " flags=" << Flags
<< " local-flags=" << LocalFlags
#ifndef NDEBUG
<< " msg-type= " << MessageType.value_or("unknown").c_str()
#endif
;
- return ss.Str();
- }
+ return ss.Str();
+ }
}