diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | c2a1af049e9deca890e9923abe64fe6c59060348 (patch) | |
tree | b222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/protobuf | |
parent | 1f553f46fb4f3c5eec631352cdd900a0709016af (diff) | |
download | ydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/protobuf')
-rw-r--r-- | library/cpp/messagebus/protobuf/ya.make | 22 | ||||
-rw-r--r-- | library/cpp/messagebus/protobuf/ybusbuf.cpp | 126 | ||||
-rw-r--r-- | library/cpp/messagebus/protobuf/ybusbuf.h | 38 |
3 files changed, 93 insertions, 93 deletions
diff --git a/library/cpp/messagebus/protobuf/ya.make b/library/cpp/messagebus/protobuf/ya.make index 35649705fe..64ff240b51 100644 --- a/library/cpp/messagebus/protobuf/ya.make +++ b/library/cpp/messagebus/protobuf/ya.make @@ -1,15 +1,15 @@ -LIBRARY(messagebus_protobuf) - +LIBRARY(messagebus_protobuf) + OWNER(g:messagebus) - -SRCS( - ybusbuf.cpp -) - -PEERDIR( + +SRCS( + ybusbuf.cpp +) + +PEERDIR( contrib/libs/protobuf library/cpp/messagebus library/cpp/messagebus/actor -) - -END() +) + +END() diff --git a/library/cpp/messagebus/protobuf/ybusbuf.cpp b/library/cpp/messagebus/protobuf/ybusbuf.cpp index e6fe79e2bd..63415b3737 100644 --- a/library/cpp/messagebus/protobuf/ybusbuf.cpp +++ b/library/cpp/messagebus/protobuf/ybusbuf.cpp @@ -1,88 +1,88 @@ #include "ybusbuf.h" - + #include <library/cpp/messagebus/actor/what_thread_does.h> #include <google/protobuf/io/coded_stream.h> - -using namespace NBus; - -TBusBufferProtocol::TBusBufferProtocol(TBusService name, int port) - : TBusProtocol(name, port) -{ -} - -TBusBufferProtocol::~TBusBufferProtocol() { + +using namespace NBus; + +TBusBufferProtocol::TBusBufferProtocol(TBusService name, int port) + : TBusProtocol(name, port) +{ +} + +TBusBufferProtocol::~TBusBufferProtocol() { for (auto& type : Types) { delete type; - } -} - -TBusBufferBase* TBusBufferProtocol::FindType(int type) { - for (unsigned i = 0; i < Types.size(); i++) { - if (Types[i]->GetHeader()->Type == type) { - return Types[i]; - } - } + } +} + +TBusBufferBase* TBusBufferProtocol::FindType(int type) { + for (unsigned i = 0; i < Types.size(); i++) { + if (Types[i]->GetHeader()->Type == type) { + return Types[i]; + } + } return nullptr; -} - -bool TBusBufferProtocol::IsRegisteredType(unsigned type) { - return TypeMask[type >> 5] & (1 << (type & ((1 << 5) - 1))); -} - -void TBusBufferProtocol::RegisterType(TAutoPtr<TBusBufferBase> mess) { - ui32 type = mess->GetHeader()->Type; - TypeMask[type >> 5] |= 1 << (type & ((1 << 5) - 1)); - - Types.push_back(mess.Release()); -} - +} + +bool TBusBufferProtocol::IsRegisteredType(unsigned type) { + return TypeMask[type >> 5] & (1 << (type & ((1 << 5) - 1))); +} + +void TBusBufferProtocol::RegisterType(TAutoPtr<TBusBufferBase> mess) { + ui32 type = mess->GetHeader()->Type; + TypeMask[type >> 5] |= 1 << (type & ((1 << 5) - 1)); + + Types.push_back(mess.Release()); +} + TArrayRef<TBusBufferBase* const> TBusBufferProtocol::GetTypes() const { - return Types; -} - + return Types; +} + void TBusBufferProtocol::Serialize(const TBusMessage* mess, TBuffer& data) { - TWhatThreadDoesPushPop pp("serialize protobuf message"); - + TWhatThreadDoesPushPop pp("serialize protobuf message"); + const TBusHeader* header = mess->GetHeader(); - - if (!IsRegisteredType(header->Type)) { + + if (!IsRegisteredType(header->Type)) { Y_FAIL("unknown message type: %d", int(header->Type)); - return; - } - - // cast the base from real message - const TBusBufferBase* bmess = CheckedCast<const TBusBufferBase*>(mess); - - unsigned size = bmess->GetRecord()->ByteSize(); - data.Reserve(data.Size() + size); - + return; + } + + // cast the base from real message + const TBusBufferBase* bmess = CheckedCast<const TBusBufferBase*>(mess); + + unsigned size = bmess->GetRecord()->ByteSize(); + data.Reserve(data.Size() + size); + char* after = (char*)bmess->GetRecord()->SerializeWithCachedSizesToArray((ui8*)data.Pos()); Y_VERIFY(after - data.Pos() == size); - - data.Advance(size); -} - + + data.Advance(size); +} + TAutoPtr<TBusMessage> TBusBufferProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) { - TWhatThreadDoesPushPop pp("deserialize protobuf message"); - + TWhatThreadDoesPushPop pp("deserialize protobuf message"); + TBusBufferBase* messageTemplate = FindType(messageType); if (messageTemplate == nullptr) { return nullptr; //Y_FAIL("unknown message type: %d", unsigned(messageType)); - } - - // clone the base - TAutoPtr<TBusBufferBase> bmess = messageTemplate->New(); - + } + + // clone the base + TAutoPtr<TBusBufferBase> bmess = messageTemplate->New(); + // Need to override protobuf message size limit // NOTE: the payload size has already been checked against session MaxMessageSize google::protobuf::io::CodedInputStream input(reinterpret_cast<const ui8*>(payload.data()), payload.size()); input.SetTotalBytesLimit(payload.size()); bool ok = bmess->GetRecord()->ParseFromCodedStream(&input) && input.ConsumedEntireMessage(); - if (!ok) { + if (!ok) { return nullptr; - } - return bmess.Release(); -} + } + return bmess.Release(); +} diff --git a/library/cpp/messagebus/protobuf/ybusbuf.h b/library/cpp/messagebus/protobuf/ybusbuf.h index bdba972aa3..57b4267ea5 100644 --- a/library/cpp/messagebus/protobuf/ybusbuf.h +++ b/library/cpp/messagebus/protobuf/ybusbuf.h @@ -10,7 +10,7 @@ #include <util/stream/mem.h> #include <array> - + namespace NBus { using TBusBufferRecord = ::google::protobuf::Message; @@ -29,25 +29,25 @@ namespace NBus { : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) { } - + ui16 GetType() const { return GetHeader()->Type; } - + virtual TBusBufferRecord* GetRecord() const = 0; virtual TBusBufferBase* New() = 0; }; - + /////////////////////////////////////////////////////////////////// /// \brief Template for all messages that have protobuf description - + /// @param TBufferRecord is record described in .proto file with namespace /// @param MessageFile is offset for .proto file message ids /// \attention If you want one protocol NBus::TBusBufferProtocol to handle /// messageges described in different .proto files, make sure that they have /// unique values for MessageFile - + template <class TBufferRecord, int MType> class TBusBufferMessage: public TBusBufferBase { public: @@ -93,7 +93,7 @@ namespace NBus { class TBusBufferMessagePtrBase { public: typedef typename TBufferMessage::RecordType RecordType; - + private: TSelf* GetSelf() { return static_cast<TSelf*>(this); @@ -132,7 +132,7 @@ namespace NBus { class TBusBufferMessagePtr: public TBusBufferMessagePtrBase<TBusBufferMessagePtr<TBufferMessage>, TBufferMessage> { protected: TBufferMessage* Holder; - + public: TBusBufferMessagePtr(TBufferMessage* mess) : Holder(mess) @@ -147,14 +147,14 @@ namespace NBus { const TBufferMessage* Get() const { return Holder; } - + operator TBufferMessage*() { return Holder; } operator const TBufferMessage*() const { return Holder; } - + operator TAutoPtr<TBusMessage>() { TAutoPtr<TBusMessage> r(Holder); Holder = 0; @@ -166,12 +166,12 @@ namespace NBus { return r; } }; - + template <class TBufferMessage> class TBusBufferMessageAutoPtr: public TBusBufferMessagePtrBase<TBusBufferMessageAutoPtr<TBufferMessage>, TBufferMessage> { public: TAutoPtr<TBufferMessage> AutoPtr; - + public: TBusBufferMessageAutoPtr() { } @@ -186,11 +186,11 @@ namespace NBus { const TBufferMessage* Get() const { return AutoPtr.Get(); } - + TBufferMessage* Release() const { return AutoPtr.Release(); } - + operator TAutoPtr<TBusMessage>() { return AutoPtr.Release(); } @@ -201,22 +201,22 @@ namespace NBus { ///////////////////////////////////////////// /// \brief Generic protocol object for messages descibed with protobuf - + /// \attention If you mix messages in the same protocol from more than /// .proto file make sure that they have different MessageFile parameter /// in the NBus::TBusBufferMessage template - + class TBusBufferProtocol: public TBusProtocol { private: TVector<TBusBufferBase*> Types; std::array<ui32, ((1 << 16) >> 5)> TypeMask; - + TBusBufferBase* FindType(int type); bool IsRegisteredType(unsigned type); - + public: TBusBufferProtocol(TBusService name, int port); - + ~TBusBufferProtocol() override; /// register all the message that this protocol should handle |