diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/protobuf/ybusbuf.cpp | |
parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
download | ydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/protobuf/ybusbuf.cpp')
-rw-r--r-- | library/cpp/messagebus/protobuf/ybusbuf.cpp | 126 |
1 files changed, 63 insertions, 63 deletions
diff --git a/library/cpp/messagebus/protobuf/ybusbuf.cpp b/library/cpp/messagebus/protobuf/ybusbuf.cpp index 63415b3737..e6fe79e2bd 100644 --- a/library/cpp/messagebus/protobuf/ybusbuf.cpp +++ b/library/cpp/messagebus/protobuf/ybusbuf.cpp @@ -1,88 +1,88 @@ #include "ybusbuf.h" - + #include <library/cpp/messagebus/actor/what_thread_does.h> #include <google/protobuf/io/coded_stream.h> - -using namespace NBus; - -TBusBufferProtocol::TBusBufferProtocol(TBusService name, int port) - : TBusProtocol(name, port) -{ -} - -TBusBufferProtocol::~TBusBufferProtocol() { + +using namespace NBus; + +TBusBufferProtocol::TBusBufferProtocol(TBusService name, int port) + : TBusProtocol(name, port) +{ +} + +TBusBufferProtocol::~TBusBufferProtocol() { for (auto& type : Types) { delete type; - } -} - -TBusBufferBase* TBusBufferProtocol::FindType(int type) { - for (unsigned i = 0; i < Types.size(); i++) { - if (Types[i]->GetHeader()->Type == type) { - return Types[i]; - } - } + } +} + +TBusBufferBase* TBusBufferProtocol::FindType(int type) { + for (unsigned i = 0; i < Types.size(); i++) { + if (Types[i]->GetHeader()->Type == type) { + return Types[i]; + } + } return nullptr; -} - -bool TBusBufferProtocol::IsRegisteredType(unsigned type) { - return TypeMask[type >> 5] & (1 << (type & ((1 << 5) - 1))); -} - -void TBusBufferProtocol::RegisterType(TAutoPtr<TBusBufferBase> mess) { - ui32 type = mess->GetHeader()->Type; - TypeMask[type >> 5] |= 1 << (type & ((1 << 5) - 1)); - - Types.push_back(mess.Release()); -} - +} + +bool TBusBufferProtocol::IsRegisteredType(unsigned type) { + return TypeMask[type >> 5] & (1 << (type & ((1 << 5) - 1))); +} + +void TBusBufferProtocol::RegisterType(TAutoPtr<TBusBufferBase> mess) { + ui32 type = mess->GetHeader()->Type; + TypeMask[type >> 5] |= 1 << (type & ((1 << 5) - 1)); + + Types.push_back(mess.Release()); +} + TArrayRef<TBusBufferBase* const> TBusBufferProtocol::GetTypes() const { - return Types; -} - + return Types; +} + void TBusBufferProtocol::Serialize(const TBusMessage* mess, TBuffer& data) { - TWhatThreadDoesPushPop pp("serialize protobuf message"); - + TWhatThreadDoesPushPop pp("serialize protobuf message"); + const TBusHeader* header = mess->GetHeader(); - - if (!IsRegisteredType(header->Type)) { + + if (!IsRegisteredType(header->Type)) { Y_FAIL("unknown message type: %d", int(header->Type)); - return; - } - - // cast the base from real message - const TBusBufferBase* bmess = CheckedCast<const TBusBufferBase*>(mess); - - unsigned size = bmess->GetRecord()->ByteSize(); - data.Reserve(data.Size() + size); - + return; + } + + // cast the base from real message + const TBusBufferBase* bmess = CheckedCast<const TBusBufferBase*>(mess); + + unsigned size = bmess->GetRecord()->ByteSize(); + data.Reserve(data.Size() + size); + char* after = (char*)bmess->GetRecord()->SerializeWithCachedSizesToArray((ui8*)data.Pos()); Y_VERIFY(after - data.Pos() == size); - - data.Advance(size); -} - + + data.Advance(size); +} + TAutoPtr<TBusMessage> TBusBufferProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) { - TWhatThreadDoesPushPop pp("deserialize protobuf message"); - + TWhatThreadDoesPushPop pp("deserialize protobuf message"); + TBusBufferBase* messageTemplate = FindType(messageType); if (messageTemplate == nullptr) { return nullptr; //Y_FAIL("unknown message type: %d", unsigned(messageType)); - } - - // clone the base - TAutoPtr<TBusBufferBase> bmess = messageTemplate->New(); - + } + + // clone the base + TAutoPtr<TBusBufferBase> bmess = messageTemplate->New(); + // Need to override protobuf message size limit // NOTE: the payload size has already been checked against session MaxMessageSize google::protobuf::io::CodedInputStream input(reinterpret_cast<const ui8*>(payload.data()), payload.size()); input.SetTotalBytesLimit(payload.size()); bool ok = bmess->GetRecord()->ParseFromCodedStream(&input) && input.ConsumedEntireMessage(); - if (!ok) { + if (!ok) { return nullptr; - } - return bmess.Release(); -} + } + return bmess.Release(); +} |