aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/messagebus/message.cpp
blob: bfa7ed8e9ba613a4966f58689986b722d61cfab7 (plain) (tree)
1
2
3
4
5
6
7
8
                                     
 
                               
                               
                   


                     







                                   
 
                                   
                              
                                                                                               
                                                          
                                                                            
      





















































                                                                 
                                                                                                            
     
 
                                 
              


                                                                                                               
      

















                                                                  
                                                                                                                 


































































                                                                                       




                                                                       
                        





                                                                                        
#include "remote_server_connection.h"
#include "ybus.h"

#include <util/random/random.h>
#include <util/string/printf.h>
#include <util/system/atomic.h>

#include <string.h>

using namespace NBus;

namespace NBus {
    using namespace NBus::NPrivate;

    TBusIdentity::TBusIdentity()
        : MessageId(0)
        , Size(0)
        , Flags(0)
        , LocalFlags(0)
    {
    }

    TBusIdentity::~TBusIdentity() {
    // TODO: print local flags
#ifndef NDEBUG
        Y_VERIFY(LocalFlags == 0, "local flags must be zero at this point; message type is %s",
                 MessageType.value_or("unknown").c_str());
#else
        Y_VERIFY(LocalFlags == 0, "local flags must be zero at this point");
#endif
    }

    TNetAddr TBusIdentity::GetNetAddr() const {
        if (!!Connection) {
            return Connection->GetAddr();
        } else {
            Y_FAIL();
        }
    }

    void TBusIdentity::Pack(char* dest) {
        memcpy(dest, this, sizeof(TBusIdentity));
        LocalFlags = 0;

        // prevent decref
        new (&Connection) TIntrusivePtr<TRemoteServerConnection>;
    }

    void TBusIdentity::Unpack(const char* src) {
        Y_VERIFY(LocalFlags == 0);
        Y_VERIFY(!Connection);

        memcpy(this, src, sizeof(TBusIdentity));
    }

    void TBusHeader::GenerateId() {
        for (;;) {
            Id = RandomNumber<TBusKey>();
            // Skip reserved ids
            if (IsBusKeyValid(Id))
                return;
        }
    }

    TBusMessage::TBusMessage(ui16 type, int approxsize)
        //: TCtr("BusMessage")
        : TRefCounted<TBusMessage, TAtomicCounter, TDelete>(1)
        , LocalFlags(0)
        , RequestSize(0)
        , Data(nullptr)
    {
        Y_UNUSED(approxsize);
        GetHeader()->Type = type;
        DoReset();
    }

    TBusMessage::TBusMessage(ECreateUninitialized)
        //: TCtr("BusMessage")
        : TRefCounted<TBusMessage, TAtomicCounter, TDelete>(1)
        , LocalFlags(0)
        , Data(nullptr)
    {
    }

    TString TBusMessage::Describe() const {
        return Sprintf("object type: %s, message type: %d", TypeName(*this).data(), int(GetHeader()->Type));
    }

    TBusMessage::~TBusMessage() {
#ifndef NDEBUG
        Y_VERIFY(GetHeader()->Id != YBUS_KEYINVALID, "must not be invalid key, message type: %d, ", int(Type));
        GetHeader()->Id = YBUS_KEYINVALID;
        Data = (void*)17;
        CheckClean();
#endif
    }

    void TBusMessage::DoReset() {
        GetHeader()->SendTime = 0;
        GetHeader()->Size = 0;
        GetHeader()->FlagsInternal = 0;
        GetHeader()->GenerateId();
        GetHeader()->SetVersionInternal();
    }

    void TBusMessage::Reset() {
        CheckClean();
        DoReset();
    }

    void TBusMessage::CheckClean() const {
        if (Y_UNLIKELY(LocalFlags != 0)) {
            TString describe = Describe();
            TString localFlags = LocalFlagSetToString(LocalFlags);
            Y_FAIL("message local flags must be zero, got: %s, message: %s", localFlags.data(), describe.data());
        }
    }

    ///////////////////////////////////////////////////////
    /// \brief Unpacks header from network order

    /// \todo ntoh instead of memcpy
    int TBusHeader::ReadHeader(TArrayRef<const char> data) {
        Y_ASSERT(data.size() >= sizeof(TBusHeader));
        memcpy(this, data.data(), sizeof(TBusHeader));
        return sizeof(TBusHeader);
    }

    ///////////////////////////////////////////////////////
    /// \brief Packs header to network order

    //////////////////////////////////////////////////////////
    /// \brief serialize message identity to be used to construct reply message

    /// function stores messageid, flags and connection reply address into the buffer
    /// that can later be used to construct a reply to the message
    void TBusMessage::GetIdentity(TBusIdentity& data) const {
        data.MessageId = GetHeader()->Id;
        data.Size = GetHeader()->Size;
        data.Flags = GetHeader()->FlagsInternal;
        //data.LocalFlags = LocalFlags;
    }

    ////////////////////////////////////////////////////////////
    /// \brief set message identity from serialized form

    /// function restores messageid, flags and connection reply address from the buffer
    /// into the reply message
    void TBusMessage::SetIdentity(const TBusIdentity& data) {
        // TODO: wrong assertion: YBUS_KEYMIN is 0
        Y_ASSERT(data.MessageId != 0);
        bool compressed = IsCompressed();
        GetHeader()->Id = data.MessageId;
        GetHeader()->FlagsInternal = data.Flags;
        LocalFlags = data.LocalFlags & ~MESSAGE_IN_WORK;
        ReplyTo = data.Connection->PeerAddrSocketAddr;
        SetCompressed(compressed || IsCompressedResponse());
    }

    void TBusMessage::SetCompressed(bool v) {
        if (v) {
            GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_INTERNAL;
        } else {
            GetHeader()->FlagsInternal &= ~(MESSAGE_COMPRESS_INTERNAL);
        }
    }

    void TBusMessage::SetCompressedResponse(bool v) {
        if (v) {
            GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_RESPONSE;
        } else {
            GetHeader()->FlagsInternal &= ~(MESSAGE_COMPRESS_RESPONSE);
        }
    }

    TString TBusIdentity::ToString() const {
        TStringStream ss;
        ss << "msg-id=" << MessageId
           << " size=" << Size;
        if (!!Connection) {
            ss << " conn=" << Connection->GetAddr();
        }
        ss
            << " flags=" << Flags
            << " local-flags=" << LocalFlags
#ifndef NDEBUG
            << " msg-type= " << MessageType.value_or("unknown").c_str()
#endif
            ;
        return ss.Str();
    }

}

template <>
void Out<TBusIdentity>(IOutputStream& os, TTypeTraits<TBusIdentity>::TFuncParam ident) {
    os << ident.ToString();
}