aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/protobuf
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/protobuf
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/protobuf')
-rw-r--r--library/cpp/messagebus/protobuf/ya.make15
-rw-r--r--library/cpp/messagebus/protobuf/ybusbuf.cpp88
-rw-r--r--library/cpp/messagebus/protobuf/ybusbuf.h233
3 files changed, 336 insertions, 0 deletions
diff --git a/library/cpp/messagebus/protobuf/ya.make b/library/cpp/messagebus/protobuf/ya.make
new file mode 100644
index 0000000000..64ff240b51
--- /dev/null
+++ b/library/cpp/messagebus/protobuf/ya.make
@@ -0,0 +1,15 @@
+LIBRARY(messagebus_protobuf)
+
+OWNER(g:messagebus)
+
+SRCS(
+ ybusbuf.cpp
+)
+
+PEERDIR(
+ contrib/libs/protobuf
+ library/cpp/messagebus
+ library/cpp/messagebus/actor
+)
+
+END()
diff --git a/library/cpp/messagebus/protobuf/ybusbuf.cpp b/library/cpp/messagebus/protobuf/ybusbuf.cpp
new file mode 100644
index 0000000000..63415b3737
--- /dev/null
+++ b/library/cpp/messagebus/protobuf/ybusbuf.cpp
@@ -0,0 +1,88 @@
+#include "ybusbuf.h"
+
+#include <library/cpp/messagebus/actor/what_thread_does.h>
+
+#include <google/protobuf/io/coded_stream.h>
+
+using namespace NBus;
+
+TBusBufferProtocol::TBusBufferProtocol(TBusService name, int port)
+ : TBusProtocol(name, port)
+{
+}
+
+TBusBufferProtocol::~TBusBufferProtocol() {
+ for (auto& type : Types) {
+ delete type;
+ }
+}
+
+TBusBufferBase* TBusBufferProtocol::FindType(int type) {
+ for (unsigned i = 0; i < Types.size(); i++) {
+ if (Types[i]->GetHeader()->Type == type) {
+ return Types[i];
+ }
+ }
+ return nullptr;
+}
+
+bool TBusBufferProtocol::IsRegisteredType(unsigned type) {
+ return TypeMask[type >> 5] & (1 << (type & ((1 << 5) - 1)));
+}
+
+void TBusBufferProtocol::RegisterType(TAutoPtr<TBusBufferBase> mess) {
+ ui32 type = mess->GetHeader()->Type;
+ TypeMask[type >> 5] |= 1 << (type & ((1 << 5) - 1));
+
+ Types.push_back(mess.Release());
+}
+
+TArrayRef<TBusBufferBase* const> TBusBufferProtocol::GetTypes() const {
+ return Types;
+}
+
+void TBusBufferProtocol::Serialize(const TBusMessage* mess, TBuffer& data) {
+ TWhatThreadDoesPushPop pp("serialize protobuf message");
+
+ const TBusHeader* header = mess->GetHeader();
+
+ if (!IsRegisteredType(header->Type)) {
+ Y_FAIL("unknown message type: %d", int(header->Type));
+ return;
+ }
+
+ // cast the base from real message
+ const TBusBufferBase* bmess = CheckedCast<const TBusBufferBase*>(mess);
+
+ unsigned size = bmess->GetRecord()->ByteSize();
+ data.Reserve(data.Size() + size);
+
+ char* after = (char*)bmess->GetRecord()->SerializeWithCachedSizesToArray((ui8*)data.Pos());
+ Y_VERIFY(after - data.Pos() == size);
+
+ data.Advance(size);
+}
+
+TAutoPtr<TBusMessage> TBusBufferProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) {
+ TWhatThreadDoesPushPop pp("deserialize protobuf message");
+
+ TBusBufferBase* messageTemplate = FindType(messageType);
+ if (messageTemplate == nullptr) {
+ return nullptr;
+ //Y_FAIL("unknown message type: %d", unsigned(messageType));
+ }
+
+ // clone the base
+ TAutoPtr<TBusBufferBase> bmess = messageTemplate->New();
+
+ // Need to override protobuf message size limit
+ // NOTE: the payload size has already been checked against session MaxMessageSize
+ google::protobuf::io::CodedInputStream input(reinterpret_cast<const ui8*>(payload.data()), payload.size());
+ input.SetTotalBytesLimit(payload.size());
+
+ bool ok = bmess->GetRecord()->ParseFromCodedStream(&input) && input.ConsumedEntireMessage();
+ if (!ok) {
+ return nullptr;
+ }
+ return bmess.Release();
+}
diff --git a/library/cpp/messagebus/protobuf/ybusbuf.h b/library/cpp/messagebus/protobuf/ybusbuf.h
new file mode 100644
index 0000000000..57b4267ea5
--- /dev/null
+++ b/library/cpp/messagebus/protobuf/ybusbuf.h
@@ -0,0 +1,233 @@
+#pragma once
+
+#include <library/cpp/messagebus/ybus.h>
+
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/message.h>
+
+#include <util/generic/cast.h>
+#include <util/generic/vector.h>
+#include <util/stream/mem.h>
+
+#include <array>
+
+namespace NBus {
+ using TBusBufferRecord = ::google::protobuf::Message;
+
+ template <class TBufferMessage>
+ class TBusBufferMessagePtr;
+ template <class TBufferMessage>
+ class TBusBufferMessageAutoPtr;
+
+ class TBusBufferBase: public TBusMessage {
+ public:
+ TBusBufferBase(int type)
+ : TBusMessage((ui16)type)
+ {
+ }
+ TBusBufferBase(ECreateUninitialized)
+ : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
+ {
+ }
+
+ ui16 GetType() const {
+ return GetHeader()->Type;
+ }
+
+ virtual TBusBufferRecord* GetRecord() const = 0;
+ virtual TBusBufferBase* New() = 0;
+ };
+
+ ///////////////////////////////////////////////////////////////////
+ /// \brief Template for all messages that have protobuf description
+
+ /// @param TBufferRecord is record described in .proto file with namespace
+ /// @param MessageFile is offset for .proto file message ids
+
+ /// \attention If you want one protocol NBus::TBusBufferProtocol to handle
+ /// messageges described in different .proto files, make sure that they have
+ /// unique values for MessageFile
+
+ template <class TBufferRecord, int MType>
+ class TBusBufferMessage: public TBusBufferBase {
+ public:
+ static const int MessageType = MType;
+
+ typedef TBusBufferMessagePtr<TBusBufferMessage<TBufferRecord, MType>> TPtr;
+ typedef TBusBufferMessageAutoPtr<TBusBufferMessage<TBufferRecord, MType>> TAutoPtr;
+
+ public:
+ typedef TBufferRecord RecordType;
+ TBufferRecord Record;
+
+ public:
+ TBusBufferMessage()
+ : TBusBufferBase(MessageType)
+ {
+ }
+ TBusBufferMessage(ECreateUninitialized)
+ : TBusBufferBase(MESSAGE_CREATE_UNINITIALIZED)
+ {
+ }
+ explicit TBusBufferMessage(const TBufferRecord& record)
+ : TBusBufferBase(MessageType)
+ , Record(record)
+ {
+ }
+ explicit TBusBufferMessage(TBufferRecord&& record)
+ : TBusBufferBase(MessageType)
+ , Record(std::move(record))
+ {
+ }
+
+ public:
+ TBusBufferRecord* GetRecord() const override {
+ return (TBusBufferRecord*)&Record;
+ }
+ TBusBufferBase* New() override {
+ return new TBusBufferMessage<TBufferRecord, MessageType>();
+ }
+ };
+
+ template <class TSelf, class TBufferMessage>
+ class TBusBufferMessagePtrBase {
+ public:
+ typedef typename TBufferMessage::RecordType RecordType;
+
+ private:
+ TSelf* GetSelf() {
+ return static_cast<TSelf*>(this);
+ }
+ const TSelf* GetSelf() const {
+ return static_cast<const TSelf*>(this);
+ }
+
+ public:
+ RecordType* operator->() {
+ Y_ASSERT(GetSelf()->Get());
+ return &(GetSelf()->Get()->Record);
+ }
+ const RecordType* operator->() const {
+ Y_ASSERT(GetSelf()->Get());
+ return &(GetSelf()->Get()->Record);
+ }
+ RecordType& operator*() {
+ Y_ASSERT(GetSelf()->Get());
+ return GetSelf()->Get()->Record;
+ }
+ const RecordType& operator*() const {
+ Y_ASSERT(GetSelf()->Get());
+ return GetSelf()->Get()->Record;
+ }
+
+ TBusHeader* GetHeader() {
+ return GetSelf()->Get()->GetHeader();
+ }
+ const TBusHeader* GetHeader() const {
+ return GetSelf()->Get()->GetHeader();
+ }
+ };
+
+ template <class TBufferMessage>
+ class TBusBufferMessagePtr: public TBusBufferMessagePtrBase<TBusBufferMessagePtr<TBufferMessage>, TBufferMessage> {
+ protected:
+ TBufferMessage* Holder;
+
+ public:
+ TBusBufferMessagePtr(TBufferMessage* mess)
+ : Holder(mess)
+ {
+ }
+ static TBusBufferMessagePtr<TBufferMessage> DynamicCast(TBusMessage* message) {
+ return dynamic_cast<TBufferMessage*>(message);
+ }
+ TBufferMessage* Get() {
+ return Holder;
+ }
+ const TBufferMessage* Get() const {
+ return Holder;
+ }
+
+ operator TBufferMessage*() {
+ return Holder;
+ }
+ operator const TBufferMessage*() const {
+ return Holder;
+ }
+
+ operator TAutoPtr<TBusMessage>() {
+ TAutoPtr<TBusMessage> r(Holder);
+ Holder = 0;
+ return r;
+ }
+ operator TBusMessageAutoPtr() {
+ TBusMessageAutoPtr r(Holder);
+ Holder = nullptr;
+ return r;
+ }
+ };
+
+ template <class TBufferMessage>
+ class TBusBufferMessageAutoPtr: public TBusBufferMessagePtrBase<TBusBufferMessageAutoPtr<TBufferMessage>, TBufferMessage> {
+ public:
+ TAutoPtr<TBufferMessage> AutoPtr;
+
+ public:
+ TBusBufferMessageAutoPtr() {
+ }
+ TBusBufferMessageAutoPtr(TBufferMessage* message)
+ : AutoPtr(message)
+ {
+ }
+
+ TBufferMessage* Get() {
+ return AutoPtr.Get();
+ }
+ const TBufferMessage* Get() const {
+ return AutoPtr.Get();
+ }
+
+ TBufferMessage* Release() const {
+ return AutoPtr.Release();
+ }
+
+ operator TAutoPtr<TBusMessage>() {
+ return AutoPtr.Release();
+ }
+ operator TBusMessageAutoPtr() {
+ return AutoPtr.Release();
+ }
+ };
+
+ /////////////////////////////////////////////
+ /// \brief Generic protocol object for messages descibed with protobuf
+
+ /// \attention If you mix messages in the same protocol from more than
+ /// .proto file make sure that they have different MessageFile parameter
+ /// in the NBus::TBusBufferMessage template
+
+ class TBusBufferProtocol: public TBusProtocol {
+ private:
+ TVector<TBusBufferBase*> Types;
+ std::array<ui32, ((1 << 16) >> 5)> TypeMask;
+
+ TBusBufferBase* FindType(int type);
+ bool IsRegisteredType(unsigned type);
+
+ public:
+ TBusBufferProtocol(TBusService name, int port);
+
+ ~TBusBufferProtocol() override;
+
+ /// register all the message that this protocol should handle
+ void RegisterType(TAutoPtr<TBusBufferBase> mess);
+
+ TArrayRef<TBusBufferBase* const> GetTypes() const;
+
+ /// serialized protocol specific data into TBusData
+ void Serialize(const TBusMessage* mess, TBuffer& data) override;
+
+ TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override;
+ };
+
+}