// path: root/library/cpp/messagebus/message.cpp
// blob: 01e3b5440b7a14b6c15bc3cbafb4fbed7da14e8c
#include "remote_server_connection.h" 
#include "ybus.h" 

#include <util/random/random.h> 
#include <util/string/printf.h> 
#include <util/system/atomic.h>

#include <string.h> 

using namespace NBus;

namespace NBus {
    using namespace NBus::NPrivate;

    /// Constructs an empty identity: MessageId, Size, Flags and LocalFlags
    /// all start at zero. Members not named in the initializer list
    /// (e.g. Connection) are default-initialized.
    TBusIdentity::TBusIdentity()
        : MessageId(0)
        , Size(0)
        , Flags(0)
        , LocalFlags(0)
    {
    }

    /// Checks with Y_VERIFY that LocalFlags is zero when the identity is destroyed.
    /// When NDEBUG is not defined the failure message additionally reports
    /// MessageType, substituting "unknown" when it holds no value.
    TBusIdentity::~TBusIdentity() {
        // TODO: print local flags
#ifndef NDEBUG
        Y_VERIFY(LocalFlags == 0, "local flags must be zero at this point; message type is %s",
                 MessageType.value_or("unknown").c_str());
#else
        Y_VERIFY(LocalFlags == 0, "local flags must be zero at this point");
#endif
    }

    /// Returns the address reported by the attached connection.
    /// Calls Y_FAIL() when no connection is attached.
    TNetAddr TBusIdentity::GetNetAddr() const {
        if (!Connection) {
            Y_FAIL();
        }
        return Connection->GetAddr();
    }

    /// Copies the raw bytes of this identity into \p dest and then clears this
    /// object's LocalFlags and Connection.
    ///
    /// \param dest destination buffer; exactly sizeof(TBusIdentity) bytes are
    ///             written, so it must be at least that large.
    ///
    /// NOTE(review): the byte image in \p dest still carries the Connection
    /// pointer value without an extra reference being taken; presumably a
    /// matching Unpack() is expected to adopt it exactly once -- confirm
    /// against callers.
    void TBusIdentity::Pack(char* dest) {
        memcpy(dest, this, sizeof(TBusIdentity));
        // The destructor verifies that LocalFlags is zero.
        LocalFlags = 0;

        // prevent decref: Connection is re-constructed in place as a
        // default (empty) pointer, so the old pointer's destructor never runs.
        new (&Connection) TIntrusivePtr<TRemoteServerConnection>;
    }

    /// Overwrites this identity with the raw bytes found at \p src.
    ///
    /// \param src source buffer; exactly sizeof(TBusIdentity) bytes are read.
    ///
    /// The object must currently have zero LocalFlags and no Connection;
    /// both conditions are checked with Y_VERIFY before anything is copied.
    /// NOTE(review): presumably \p src holds an image produced by Pack() --
    /// confirm against callers.
    void TBusIdentity::Unpack(const char* src) {
        Y_VERIFY(LocalFlags == 0);
        Y_VERIFY(!Connection);

        memcpy(this, src, sizeof(TBusIdentity));
    }

    /// Assigns a random value to Id, drawing again until IsBusKeyValid()
    /// accepts it (reserved ids are skipped).
    void TBusHeader::GenerateId() {
        do {
            Id = RandomNumber<TBusKey>();
        } while (!IsBusKeyValid(Id));
    }

    /// Creates a message of the given type.
    ///
    /// \param type       value stored into the header's Type field.
    /// \param approxsize ignored (marked Y_UNUSED).
    ///
    /// The TRefCounted base is constructed with 1; LocalFlags and RequestSize
    /// start at zero and Data at nullptr. The remaining header fields are
    /// initialized by DoReset().
    TBusMessage::TBusMessage(ui16 type, int approxsize)
        //: TCtr("BusMessage")
        : TRefCounted<TBusMessage, TAtomicCounter, TDelete>(1)
        , LocalFlags(0)
        , RequestSize(0)
        , Data(nullptr)
    {
        Y_UNUSED(approxsize);
        GetHeader()->Type = type;
        DoReset();
    }

    /// Creates a message without initializing its header: unlike the
    /// (type, approxsize) constructor, neither the header Type is set nor
    /// DoReset() is called.
    ///
    /// The message's own bookkeeping members are still given defined values:
    /// the TRefCounted base is constructed with 1, LocalFlags and RequestSize
    /// are zero and Data is nullptr. RequestSize is initialized here so that
    /// both constructors leave it in the same state instead of indeterminate.
    TBusMessage::TBusMessage(ECreateUninitialized)
        //: TCtr("BusMessage")
        : TRefCounted<TBusMessage, TAtomicCounter, TDelete>(1)
        , LocalFlags(0)
        , RequestSize(0)
        , Data(nullptr)
    {
    }

    /// Builds a short description of this message: the name TypeName()
    /// reports for the object plus the numeric Type from the header.
    TString TBusMessage::Describe() const {
        const auto objectType = TypeName(*this);
        const int messageType = int(GetHeader()->Type);
        return Sprintf("object type: %s, message type: %d", objectType.data(), messageType);
    }

    /// Destructor. All of its checks are compiled only when NDEBUG is not
    /// defined; otherwise the body is empty.
    ///
    /// In checked builds it verifies that the header id is not already
    /// YBUS_KEYINVALID, then overwrites the id with YBUS_KEYINVALID and Data
    /// with a recognizable bogus pointer, and finally runs CheckClean().
    /// NOTE(review): the poisoning presumably exists to catch double
    /// destruction or use after destruction -- confirm.
    TBusMessage::~TBusMessage() {
#ifndef NDEBUG
        Y_VERIFY(GetHeader()->Id != YBUS_KEYINVALID, "must not be invalid key, message type: %d, ", int(Type));
        // Poison the id so a second pass through this destructor trips the check above.
        GetHeader()->Id = YBUS_KEYINVALID;
        // Poison Data with an obviously invalid pointer value.
        Data = (void*)17;
        CheckClean();
#endif
    }

    void TBusMessage::DoReset() {
        GetHeader()->SendTime = 0;
        GetHeader()->Size = 0;
        GetHeader()->FlagsInternal = 0;
        GetHeader()->GenerateId();
        GetHeader()->SetVersionInternal();
    }

    /// Prepares the message for reuse: first checks via CheckClean() that no
    /// local flags are set, then re-initializes the header with DoReset().
    void TBusMessage::Reset() {
        CheckClean();
        DoReset();
    }

    void TBusMessage::CheckClean() const {
        if (Y_UNLIKELY(LocalFlags != 0)) {
            TString describe = Describe();
            TString localFlags = LocalFlagSetToString(LocalFlags);
            Y_FAIL("message local flags must be zero, got: %s, message: %s", localFlags.data(), describe.data());
        }
    }

    ///////////////////////////////////////////////////////
    /// \brief Reads the header from a raw byte buffer
    ///
    /// Copies sizeof(TBusHeader) bytes from the start of \p data over this
    /// object verbatim; no byte-order conversion is performed.
    /// The buffer length is only checked with Y_ASSERT.
    ///
    /// \todo ntoh instead of memcpy
    /// \return number of bytes consumed, i.e. sizeof(TBusHeader)
    int TBusHeader::ReadHeader(TArrayRef<const char> data) {
        const size_t headerSize = sizeof(TBusHeader);
        Y_ASSERT(data.size() >= headerSize);
        memcpy(this, data.data(), headerSize);
        return headerSize;
    }

    ///////////////////////////////////////////////////////
    /// \brief Packs header to network order

    //////////////////////////////////////////////////////////
    /// \brief copy the reply-relevant header fields into an identity
    ///
    /// Fills data.MessageId, data.Size and data.Flags from this message's
    /// header so that a reply can later be matched to it.
    /// data.LocalFlags and data.Connection are not modified by this function.
    void TBusMessage::GetIdentity(TBusIdentity& data) const {
        const auto* header = GetHeader();

        data.Flags = header->FlagsInternal;
        data.Size = header->Size;
        data.MessageId = header->Id;
    }

    ////////////////////////////////////////////////////////////
    /// \brief set message identity from serialized form

    /// function restores messageid, flags and connection reply address from the buffer
    /// into the reply message
    void TBusMessage::SetIdentity(const TBusIdentity& data) {
        // TODO: wrong assertion: YBUS_KEYMIN is 0
        Y_ASSERT(data.MessageId != 0);
        bool compressed = IsCompressed();
        GetHeader()->Id = data.MessageId;
        GetHeader()->FlagsInternal = data.Flags;
        LocalFlags = data.LocalFlags & ~MESSAGE_IN_WORK;
        ReplyTo = data.Connection->PeerAddrSocketAddr;
        SetCompressed(compressed || IsCompressedResponse());
    }

    void TBusMessage::SetCompressed(bool v) {
        if (v) {
            GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_INTERNAL;
        } else {
            GetHeader()->FlagsInternal &= ~(MESSAGE_COMPRESS_INTERNAL);
        }
    }

    void TBusMessage::SetCompressedResponse(bool v) {
        if (v) {
            GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_RESPONSE;
        } else {
            GetHeader()->FlagsInternal &= ~(MESSAGE_COMPRESS_RESPONSE);
        }
    }

    /// Renders the identity as a single line of space-separated key=value
    /// pairs. The connection address is included only when a connection is
    /// attached; the message type is included only when NDEBUG is not defined.
    TString TBusIdentity::ToString() const {
        TStringStream out;

        out << "msg-id=" << MessageId;
        out << " size=" << Size;
        if (!!Connection) {
            out << " conn=" << Connection->GetAddr();
        }
        out << " flags=" << Flags;
        out << " local-flags=" << LocalFlags;
#ifndef NDEBUG
        out << " msg-type= " << MessageType.value_or("unknown").c_str();
#endif

        return out.Str();
    }

}

/// Stream output for TBusIdentity: writes the text produced by ToString().
template <>
void Out<TBusIdentity>(IOutputStream& os, TTypeTraits<TBusIdentity>::TFuncParam ident) {
    const TString text = ident.ToString();
    os << text;
}