diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/protobuf | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/protobuf')
-rw-r--r-- | library/cpp/messagebus/protobuf/ya.make | 15 | ||||
-rw-r--r-- | library/cpp/messagebus/protobuf/ybusbuf.cpp | 88 | ||||
-rw-r--r-- | library/cpp/messagebus/protobuf/ybusbuf.h | 233 |
3 files changed, 336 insertions, 0 deletions
diff --git a/library/cpp/messagebus/protobuf/ya.make b/library/cpp/messagebus/protobuf/ya.make new file mode 100644 index 0000000000..64ff240b51 --- /dev/null +++ b/library/cpp/messagebus/protobuf/ya.make @@ -0,0 +1,15 @@ +LIBRARY(messagebus_protobuf) + +OWNER(g:messagebus) + +SRCS( + ybusbuf.cpp +) + +PEERDIR( + contrib/libs/protobuf + library/cpp/messagebus + library/cpp/messagebus/actor +) + +END() diff --git a/library/cpp/messagebus/protobuf/ybusbuf.cpp b/library/cpp/messagebus/protobuf/ybusbuf.cpp new file mode 100644 index 0000000000..63415b3737 --- /dev/null +++ b/library/cpp/messagebus/protobuf/ybusbuf.cpp @@ -0,0 +1,88 @@ +#include "ybusbuf.h" + +#include <library/cpp/messagebus/actor/what_thread_does.h> + +#include <google/protobuf/io/coded_stream.h> + +using namespace NBus; + +TBusBufferProtocol::TBusBufferProtocol(TBusService name, int port) + : TBusProtocol(name, port) +{ +} + +TBusBufferProtocol::~TBusBufferProtocol() { + for (auto& type : Types) { + delete type; + } +} + +TBusBufferBase* TBusBufferProtocol::FindType(int type) { + for (unsigned i = 0; i < Types.size(); i++) { + if (Types[i]->GetHeader()->Type == type) { + return Types[i]; + } + } + return nullptr; +} + +bool TBusBufferProtocol::IsRegisteredType(unsigned type) { + return TypeMask[type >> 5] & (1 << (type & ((1 << 5) - 1))); +} + +void TBusBufferProtocol::RegisterType(TAutoPtr<TBusBufferBase> mess) { + ui32 type = mess->GetHeader()->Type; + TypeMask[type >> 5] |= 1 << (type & ((1 << 5) - 1)); + + Types.push_back(mess.Release()); +} + +TArrayRef<TBusBufferBase* const> TBusBufferProtocol::GetTypes() const { + return Types; +} + +void TBusBufferProtocol::Serialize(const TBusMessage* mess, TBuffer& data) { + TWhatThreadDoesPushPop pp("serialize protobuf message"); + + const TBusHeader* header = mess->GetHeader(); + + if (!IsRegisteredType(header->Type)) { + Y_FAIL("unknown message type: %d", int(header->Type)); + return; + } + + // cast the base from real message + const TBusBufferBase* bmess = CheckedCast<const TBusBufferBase*>(mess); + + unsigned size = bmess->GetRecord()->ByteSize(); + data.Reserve(data.Size() + size); + + char* after = (char*)bmess->GetRecord()->SerializeWithCachedSizesToArray((ui8*)data.Pos()); + Y_VERIFY(after - data.Pos() == size); + + data.Advance(size); +} + +TAutoPtr<TBusMessage> TBusBufferProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) { + TWhatThreadDoesPushPop pp("deserialize protobuf message"); + + TBusBufferBase* messageTemplate = FindType(messageType); + if (messageTemplate == nullptr) { + return nullptr; + //Y_FAIL("unknown message type: %d", unsigned(messageType)); + } + + // clone the base + TAutoPtr<TBusBufferBase> bmess = messageTemplate->New(); + + // Need to override protobuf message size limit + // NOTE: the payload size has already been checked against session MaxMessageSize + google::protobuf::io::CodedInputStream input(reinterpret_cast<const ui8*>(payload.data()), payload.size()); + input.SetTotalBytesLimit(payload.size()); + + bool ok = bmess->GetRecord()->ParseFromCodedStream(&input) && input.ConsumedEntireMessage(); + if (!ok) { + return nullptr; + } + return bmess.Release(); +} diff --git a/library/cpp/messagebus/protobuf/ybusbuf.h b/library/cpp/messagebus/protobuf/ybusbuf.h new file mode 100644 index 0000000000..57b4267ea5 --- /dev/null +++ b/library/cpp/messagebus/protobuf/ybusbuf.h @@ -0,0 +1,233 @@ +#pragma once + +#include <library/cpp/messagebus/ybus.h> + +#include <google/protobuf/descriptor.h> +#include <google/protobuf/message.h> + +#include <util/generic/cast.h> +#include <util/generic/vector.h> +#include <util/stream/mem.h> + +#include <array> + +namespace NBus { + using TBusBufferRecord = ::google::protobuf::Message; + + template <class TBufferMessage> + class TBusBufferMessagePtr; + template <class TBufferMessage> + class TBusBufferMessageAutoPtr; + + class TBusBufferBase: public TBusMessage { + public: + TBusBufferBase(int type) + : TBusMessage((ui16)type) + { + } + TBusBufferBase(ECreateUninitialized) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + { + } + + ui16 GetType() const { + return GetHeader()->Type; + } + + virtual TBusBufferRecord* GetRecord() const = 0; + virtual TBusBufferBase* New() = 0; + }; + + /////////////////////////////////////////////////////////////////// + /// \brief Template for all messages that have protobuf description + + /// @param TBufferRecord is record described in .proto file with namespace + /// @param MessageFile is offset for .proto file message ids + + /// \attention If you want one protocol NBus::TBusBufferProtocol to handle + /// messageges described in different .proto files, make sure that they have + /// unique values for MessageFile + + template <class TBufferRecord, int MType> + class TBusBufferMessage: public TBusBufferBase { + public: + static const int MessageType = MType; + + typedef TBusBufferMessagePtr<TBusBufferMessage<TBufferRecord, MType>> TPtr; + typedef TBusBufferMessageAutoPtr<TBusBufferMessage<TBufferRecord, MType>> TAutoPtr; + + public: + typedef TBufferRecord RecordType; + TBufferRecord Record; + + public: + TBusBufferMessage() + : TBusBufferBase(MessageType) + { + } + TBusBufferMessage(ECreateUninitialized) + : TBusBufferBase(MESSAGE_CREATE_UNINITIALIZED) + { + } + explicit TBusBufferMessage(const TBufferRecord& record) + : TBusBufferBase(MessageType) + , Record(record) + { + } + explicit TBusBufferMessage(TBufferRecord&& record) + : TBusBufferBase(MessageType) + , Record(std::move(record)) + { + } + + public: + TBusBufferRecord* GetRecord() const override { + return (TBusBufferRecord*)&Record; + } + TBusBufferBase* New() override { + return new TBusBufferMessage<TBufferRecord, MessageType>(); + } + }; + + template <class TSelf, class TBufferMessage> + class TBusBufferMessagePtrBase { + public: + typedef typename TBufferMessage::RecordType RecordType; + + private: + TSelf* GetSelf() { + return static_cast<TSelf*>(this); + } + const TSelf* GetSelf() const { + return static_cast<const TSelf*>(this); + } + + public: + RecordType* operator->() { + Y_ASSERT(GetSelf()->Get()); + return &(GetSelf()->Get()->Record); + } + const RecordType* operator->() const { + Y_ASSERT(GetSelf()->Get()); + return &(GetSelf()->Get()->Record); + } + RecordType& operator*() { + Y_ASSERT(GetSelf()->Get()); + return GetSelf()->Get()->Record; + } + const RecordType& operator*() const { + Y_ASSERT(GetSelf()->Get()); + return GetSelf()->Get()->Record; + } + + TBusHeader* GetHeader() { + return GetSelf()->Get()->GetHeader(); + } + const TBusHeader* GetHeader() const { + return GetSelf()->Get()->GetHeader(); + } + }; + + template <class TBufferMessage> + class TBusBufferMessagePtr: public TBusBufferMessagePtrBase<TBusBufferMessagePtr<TBufferMessage>, TBufferMessage> { + protected: + TBufferMessage* Holder; + + public: + TBusBufferMessagePtr(TBufferMessage* mess) + : Holder(mess) + { + } + static TBusBufferMessagePtr<TBufferMessage> DynamicCast(TBusMessage* message) { + return dynamic_cast<TBufferMessage*>(message); + } + TBufferMessage* Get() { + return Holder; + } + const TBufferMessage* Get() const { + return Holder; + } + + operator TBufferMessage*() { + return Holder; + } + operator const TBufferMessage*() const { + return Holder; + } + + operator TAutoPtr<TBusMessage>() { + TAutoPtr<TBusMessage> r(Holder); + Holder = 0; + return r; + } + operator TBusMessageAutoPtr() { + TBusMessageAutoPtr r(Holder); + Holder = nullptr; + return r; + } + }; + + template <class TBufferMessage> + class TBusBufferMessageAutoPtr: public TBusBufferMessagePtrBase<TBusBufferMessageAutoPtr<TBufferMessage>, TBufferMessage> { + public: + TAutoPtr<TBufferMessage> AutoPtr; + + public: + TBusBufferMessageAutoPtr() { + } + TBusBufferMessageAutoPtr(TBufferMessage* message) + : AutoPtr(message) + { + } + + TBufferMessage* Get() { + return AutoPtr.Get(); + } + const TBufferMessage* Get() const { + return AutoPtr.Get(); + } + + TBufferMessage* Release() const { + return AutoPtr.Release(); + } + + operator TAutoPtr<TBusMessage>() { + return AutoPtr.Release(); + } + operator TBusMessageAutoPtr() { + return AutoPtr.Release(); + } + }; + + ///////////////////////////////////////////// + /// \brief Generic protocol object for messages descibed with protobuf + + /// \attention If you mix messages in the same protocol from more than + /// .proto file make sure that they have different MessageFile parameter + /// in the NBus::TBusBufferMessage template + + class TBusBufferProtocol: public TBusProtocol { + private: + TVector<TBusBufferBase*> Types; + std::array<ui32, ((1 << 16) >> 5)> TypeMask; + + TBusBufferBase* FindType(int type); + bool IsRegisteredType(unsigned type); + + public: + TBusBufferProtocol(TBusService name, int port); + + ~TBusBufferProtocol() override; + + /// register all the message that this protocol should handle + void RegisterType(TAutoPtr<TBusBufferBase> mess); + + TArrayRef<TBusBufferBase* const> GetTypes() const; + + /// serialized protocol specific data into TBusData + void Serialize(const TBusMessage* mess, TBuffer& data) override; + + TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override; + }; + +} |