aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/protobuf/ybusbuf.cpp
blob: e6fe79e2bd14e8d42576479656b908a1cfc59b3e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#include "ybusbuf.h"
 
#include <library/cpp/messagebus/actor/what_thread_does.h>

#include <google/protobuf/io/coded_stream.h>
 
using namespace NBus; 
 
// Creates a protobuf-based bus protocol; both the service name and the port
// are passed straight through to the TBusProtocol base constructor.
TBusBufferProtocol::TBusBufferProtocol(TBusService name, int port)
    : TBusProtocol(name, port) {
}
 
// Frees every message template stored in Types (RegisterType() stores the
// raw pointers released from the TAutoPtr it receives).
TBusBufferProtocol::~TBusBufferProtocol() {
    for (TBusBufferBase* registered : Types) {
        delete registered;
    }
}
 
// Linear search over the registered templates.
// Returns the first template whose header type equals `type`,
// or nullptr when no such template has been registered.
TBusBufferBase* TBusBufferProtocol::FindType(int type) {
    for (TBusBufferBase* candidate : Types) {
        if (candidate->GetHeader()->Type == type) {
            return candidate;
        }
    }
    return nullptr;
}
 
bool TBusBufferProtocol::IsRegisteredType(unsigned type) { 
    return TypeMask[type >> 5] & (1 << (type & ((1 << 5) - 1))); 
} 
 
// Registers a message template: sets the bit for its header type in the
// TypeMask bitmap and appends the pointer released from `mess` to Types.
void TBusBufferProtocol::RegisterType(TAutoPtr<TBusBufferBase> mess) {
    const ui32 type = mess->GetHeader()->Type;

    // Word index is type >> 5, bit index is the low five bits. The mask is
    // built from an unsigned one so that bit index 31 never shifts a signed
    // int into its sign bit.
    TypeMask[type >> 5] |= ui32(1) << (type & 31u);

    Types.push_back(mess.Release());
}
 
// Exposes the registered message templates as an array view over Types.
TArrayRef<TBusBufferBase* const> TBusBufferProtocol::GetTypes() const {
    return TArrayRef<TBusBufferBase* const>(Types);
}
 
void TBusBufferProtocol::Serialize(const TBusMessage* mess, TBuffer& data) {
    TWhatThreadDoesPushPop pp("serialize protobuf message"); 
 
    const TBusHeader* header = mess->GetHeader();
 
    if (!IsRegisteredType(header->Type)) { 
        Y_FAIL("unknown message type: %d", int(header->Type));
        return; 
    } 
 
    // cast the base from real message 
    const TBusBufferBase* bmess = CheckedCast<const TBusBufferBase*>(mess); 
 
    unsigned size = bmess->GetRecord()->ByteSize(); 
    data.Reserve(data.Size() + size); 
 
    char* after = (char*)bmess->GetRecord()->SerializeWithCachedSizesToArray((ui8*)data.Pos());
    Y_VERIFY(after - data.Pos() == size);
 
    data.Advance(size); 
} 
 
// Builds a message of type `messageType` from the raw bytes in `payload`.
// Returns nullptr when the type has no registered template, when protobuf
// parsing fails, or when the parser did not consume the entire payload.
TAutoPtr<TBusMessage> TBusBufferProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) {
    TWhatThreadDoesPushPop pp("deserialize protobuf message");

    TBusBufferBase* const prototype = FindType(messageType);
    if (!prototype) {
        // An unknown type is reported to the caller as a null result, not a crash.
        return nullptr;
    }

    // Fresh instance created from the registered template.
    TAutoPtr<TBusBufferBase> parsed = prototype->New();

    // The protobuf total-bytes limit is overridden with the payload size.
    // NOTE: the payload size has already been checked against session MaxMessageSize
    const ui8* const bytes = reinterpret_cast<const ui8*>(payload.data());
    google::protobuf::io::CodedInputStream input(bytes, payload.size());
    input.SetTotalBytesLimit(payload.size());

    if (!parsed->GetRecord()->ParseFromCodedStream(&input)) {
        return nullptr;
    }
    if (!input.ConsumedEntireMessage()) {
        return nullptr;
    }
    return parsed.Release();
}