aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/message.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/message.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/message.cpp')
-rw-r--r--library/cpp/messagebus/message.cpp198
1 files changed, 198 insertions, 0 deletions
diff --git a/library/cpp/messagebus/message.cpp b/library/cpp/messagebus/message.cpp
new file mode 100644
index 0000000000..bfa7ed8e9b
--- /dev/null
+++ b/library/cpp/messagebus/message.cpp
@@ -0,0 +1,198 @@
+#include "remote_server_connection.h"
+#include "ybus.h"
+
+#include <util/random/random.h>
+#include <util/string/printf.h>
+#include <util/system/atomic.h>
+
+#include <string.h>
+
+using namespace NBus;
+
+namespace NBus {
+ using namespace NBus::NPrivate;
+
+ TBusIdentity::TBusIdentity()
+ : MessageId(0)
+ , Size(0)
+ , Flags(0)
+ , LocalFlags(0)
+ {
+ }
+
+ TBusIdentity::~TBusIdentity() {
+ // TODO: print local flags
+#ifndef NDEBUG
+ Y_VERIFY(LocalFlags == 0, "local flags must be zero at this point; message type is %s",
+ MessageType.value_or("unknown").c_str());
+#else
+ Y_VERIFY(LocalFlags == 0, "local flags must be zero at this point");
+#endif
+ }
+
+ TNetAddr TBusIdentity::GetNetAddr() const {
+ if (!!Connection) {
+ return Connection->GetAddr();
+ } else {
+ Y_FAIL();
+ }
+ }
+
+ void TBusIdentity::Pack(char* dest) {
+ memcpy(dest, this, sizeof(TBusIdentity));
+ LocalFlags = 0;
+
+ // prevent decref
+ new (&Connection) TIntrusivePtr<TRemoteServerConnection>;
+ }
+
+ void TBusIdentity::Unpack(const char* src) {
+ Y_VERIFY(LocalFlags == 0);
+ Y_VERIFY(!Connection);
+
+ memcpy(this, src, sizeof(TBusIdentity));
+ }
+
+ void TBusHeader::GenerateId() {
+ for (;;) {
+ Id = RandomNumber<TBusKey>();
+ // Skip reserved ids
+ if (IsBusKeyValid(Id))
+ return;
+ }
+ }
+
+ TBusMessage::TBusMessage(ui16 type, int approxsize)
+ //: TCtr("BusMessage")
+ : TRefCounted<TBusMessage, TAtomicCounter, TDelete>(1)
+ , LocalFlags(0)
+ , RequestSize(0)
+ , Data(nullptr)
+ {
+ Y_UNUSED(approxsize);
+ GetHeader()->Type = type;
+ DoReset();
+ }
+
+ TBusMessage::TBusMessage(ECreateUninitialized)
+ //: TCtr("BusMessage")
+ : TRefCounted<TBusMessage, TAtomicCounter, TDelete>(1)
+ , LocalFlags(0)
+ , Data(nullptr)
+ {
+ }
+
+ TString TBusMessage::Describe() const {
+ return Sprintf("object type: %s, message type: %d", TypeName(*this).data(), int(GetHeader()->Type));
+ }
+
+ TBusMessage::~TBusMessage() {
+#ifndef NDEBUG
+ Y_VERIFY(GetHeader()->Id != YBUS_KEYINVALID, "must not be invalid key, message type: %d, ", int(Type));
+ GetHeader()->Id = YBUS_KEYINVALID;
+ Data = (void*)17;
+ CheckClean();
+#endif
+ }
+
+ void TBusMessage::DoReset() {
+ GetHeader()->SendTime = 0;
+ GetHeader()->Size = 0;
+ GetHeader()->FlagsInternal = 0;
+ GetHeader()->GenerateId();
+ GetHeader()->SetVersionInternal();
+ }
+
+ void TBusMessage::Reset() {
+ CheckClean();
+ DoReset();
+ }
+
+ void TBusMessage::CheckClean() const {
+ if (Y_UNLIKELY(LocalFlags != 0)) {
+ TString describe = Describe();
+ TString localFlags = LocalFlagSetToString(LocalFlags);
+ Y_FAIL("message local flags must be zero, got: %s, message: %s", localFlags.data(), describe.data());
+ }
+ }
+
+ ///////////////////////////////////////////////////////
+ /// \brief Unpacks header from network order
+
+ /// \todo ntoh instead of memcpy
+ int TBusHeader::ReadHeader(TArrayRef<const char> data) {
+ Y_ASSERT(data.size() >= sizeof(TBusHeader));
+ memcpy(this, data.data(), sizeof(TBusHeader));
+ return sizeof(TBusHeader);
+ }
+
+ ///////////////////////////////////////////////////////
+ /// \brief Packs header to network order
+
+ //////////////////////////////////////////////////////////
+ /// \brief serialize message identity to be used to construct reply message
+
+ /// function stores messageid, flags and connection reply address into the buffer
+ /// that can later be used to construct a reply to the message
+ void TBusMessage::GetIdentity(TBusIdentity& data) const {
+ data.MessageId = GetHeader()->Id;
+ data.Size = GetHeader()->Size;
+ data.Flags = GetHeader()->FlagsInternal;
+ //data.LocalFlags = LocalFlags;
+ }
+
+ ////////////////////////////////////////////////////////////
+ /// \brief set message identity from serialized form
+
+ /// function restores messageid, flags and connection reply address from the buffer
+ /// into the reply message
+ void TBusMessage::SetIdentity(const TBusIdentity& data) {
+ // TODO: wrong assertion: YBUS_KEYMIN is 0
+ Y_ASSERT(data.MessageId != 0);
+ bool compressed = IsCompressed();
+ GetHeader()->Id = data.MessageId;
+ GetHeader()->FlagsInternal = data.Flags;
+ LocalFlags = data.LocalFlags & ~MESSAGE_IN_WORK;
+ ReplyTo = data.Connection->PeerAddrSocketAddr;
+ SetCompressed(compressed || IsCompressedResponse());
+ }
+
+ void TBusMessage::SetCompressed(bool v) {
+ if (v) {
+ GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_INTERNAL;
+ } else {
+ GetHeader()->FlagsInternal &= ~(MESSAGE_COMPRESS_INTERNAL);
+ }
+ }
+
+ void TBusMessage::SetCompressedResponse(bool v) {
+ if (v) {
+ GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_RESPONSE;
+ } else {
+ GetHeader()->FlagsInternal &= ~(MESSAGE_COMPRESS_RESPONSE);
+ }
+ }
+
+ TString TBusIdentity::ToString() const {
+ TStringStream ss;
+ ss << "msg-id=" << MessageId
+ << " size=" << Size;
+ if (!!Connection) {
+ ss << " conn=" << Connection->GetAddr();
+ }
+ ss
+ << " flags=" << Flags
+ << " local-flags=" << LocalFlags
+#ifndef NDEBUG
+ << " msg-type= " << MessageType.value_or("unknown").c_str()
+#endif
+ ;
+ return ss.Str();
+ }
+
+}
+
+template <>
+void Out<TBusIdentity>(IOutputStream& os, TTypeTraits<TBusIdentity>::TFuncParam ident) {
+ os << ident.ToString();
+}