aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/protobuf
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commitc2a1af049e9deca890e9923abe64fe6c59060348 (patch)
treeb222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/protobuf
parent1f553f46fb4f3c5eec631352cdd900a0709016af (diff)
downloadydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/protobuf')
-rw-r--r--library/cpp/messagebus/protobuf/ya.make22
-rw-r--r--library/cpp/messagebus/protobuf/ybusbuf.cpp126
-rw-r--r--library/cpp/messagebus/protobuf/ybusbuf.h38
3 files changed, 93 insertions, 93 deletions
diff --git a/library/cpp/messagebus/protobuf/ya.make b/library/cpp/messagebus/protobuf/ya.make
index 35649705fe..64ff240b51 100644
--- a/library/cpp/messagebus/protobuf/ya.make
+++ b/library/cpp/messagebus/protobuf/ya.make
@@ -1,15 +1,15 @@
-LIBRARY(messagebus_protobuf)
-
+LIBRARY(messagebus_protobuf)
+
OWNER(g:messagebus)
-
-SRCS(
- ybusbuf.cpp
-)
-
-PEERDIR(
+
+SRCS(
+ ybusbuf.cpp
+)
+
+PEERDIR(
contrib/libs/protobuf
library/cpp/messagebus
library/cpp/messagebus/actor
-)
-
-END()
+)
+
+END()
diff --git a/library/cpp/messagebus/protobuf/ybusbuf.cpp b/library/cpp/messagebus/protobuf/ybusbuf.cpp
index e6fe79e2bd..63415b3737 100644
--- a/library/cpp/messagebus/protobuf/ybusbuf.cpp
+++ b/library/cpp/messagebus/protobuf/ybusbuf.cpp
@@ -1,88 +1,88 @@
#include "ybusbuf.h"
-
+
#include <library/cpp/messagebus/actor/what_thread_does.h>
#include <google/protobuf/io/coded_stream.h>
-
-using namespace NBus;
-
-TBusBufferProtocol::TBusBufferProtocol(TBusService name, int port)
- : TBusProtocol(name, port)
-{
-}
-
-TBusBufferProtocol::~TBusBufferProtocol() {
+
+using namespace NBus;
+
+TBusBufferProtocol::TBusBufferProtocol(TBusService name, int port)
+ : TBusProtocol(name, port)
+{
+}
+
+TBusBufferProtocol::~TBusBufferProtocol() {
for (auto& type : Types) {
delete type;
- }
-}
-
-TBusBufferBase* TBusBufferProtocol::FindType(int type) {
- for (unsigned i = 0; i < Types.size(); i++) {
- if (Types[i]->GetHeader()->Type == type) {
- return Types[i];
- }
- }
+ }
+}
+
+TBusBufferBase* TBusBufferProtocol::FindType(int type) {
+ for (unsigned i = 0; i < Types.size(); i++) {
+ if (Types[i]->GetHeader()->Type == type) {
+ return Types[i];
+ }
+ }
return nullptr;
-}
-
-bool TBusBufferProtocol::IsRegisteredType(unsigned type) {
- return TypeMask[type >> 5] & (1 << (type & ((1 << 5) - 1)));
-}
-
-void TBusBufferProtocol::RegisterType(TAutoPtr<TBusBufferBase> mess) {
- ui32 type = mess->GetHeader()->Type;
- TypeMask[type >> 5] |= 1 << (type & ((1 << 5) - 1));
-
- Types.push_back(mess.Release());
-}
-
+}
+
+bool TBusBufferProtocol::IsRegisteredType(unsigned type) {
+ return TypeMask[type >> 5] & (1 << (type & ((1 << 5) - 1)));
+}
+
+void TBusBufferProtocol::RegisterType(TAutoPtr<TBusBufferBase> mess) {
+ ui32 type = mess->GetHeader()->Type;
+ TypeMask[type >> 5] |= 1 << (type & ((1 << 5) - 1));
+
+ Types.push_back(mess.Release());
+}
+
TArrayRef<TBusBufferBase* const> TBusBufferProtocol::GetTypes() const {
- return Types;
-}
-
+ return Types;
+}
+
void TBusBufferProtocol::Serialize(const TBusMessage* mess, TBuffer& data) {
- TWhatThreadDoesPushPop pp("serialize protobuf message");
-
+ TWhatThreadDoesPushPop pp("serialize protobuf message");
+
const TBusHeader* header = mess->GetHeader();
-
- if (!IsRegisteredType(header->Type)) {
+
+ if (!IsRegisteredType(header->Type)) {
Y_FAIL("unknown message type: %d", int(header->Type));
- return;
- }
-
- // cast the base from real message
- const TBusBufferBase* bmess = CheckedCast<const TBusBufferBase*>(mess);
-
- unsigned size = bmess->GetRecord()->ByteSize();
- data.Reserve(data.Size() + size);
-
+ return;
+ }
+
+ // cast the base from real message
+ const TBusBufferBase* bmess = CheckedCast<const TBusBufferBase*>(mess);
+
+ unsigned size = bmess->GetRecord()->ByteSize();
+ data.Reserve(data.Size() + size);
+
char* after = (char*)bmess->GetRecord()->SerializeWithCachedSizesToArray((ui8*)data.Pos());
Y_VERIFY(after - data.Pos() == size);
-
- data.Advance(size);
-}
-
+
+ data.Advance(size);
+}
+
TAutoPtr<TBusMessage> TBusBufferProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) {
- TWhatThreadDoesPushPop pp("deserialize protobuf message");
-
+ TWhatThreadDoesPushPop pp("deserialize protobuf message");
+
TBusBufferBase* messageTemplate = FindType(messageType);
if (messageTemplate == nullptr) {
return nullptr;
//Y_FAIL("unknown message type: %d", unsigned(messageType));
- }
-
- // clone the base
- TAutoPtr<TBusBufferBase> bmess = messageTemplate->New();
-
+ }
+
+ // clone the base
+ TAutoPtr<TBusBufferBase> bmess = messageTemplate->New();
+
// Need to override protobuf message size limit
// NOTE: the payload size has already been checked against session MaxMessageSize
google::protobuf::io::CodedInputStream input(reinterpret_cast<const ui8*>(payload.data()), payload.size());
input.SetTotalBytesLimit(payload.size());
bool ok = bmess->GetRecord()->ParseFromCodedStream(&input) && input.ConsumedEntireMessage();
- if (!ok) {
+ if (!ok) {
return nullptr;
- }
- return bmess.Release();
-}
+ }
+ return bmess.Release();
+}
diff --git a/library/cpp/messagebus/protobuf/ybusbuf.h b/library/cpp/messagebus/protobuf/ybusbuf.h
index bdba972aa3..57b4267ea5 100644
--- a/library/cpp/messagebus/protobuf/ybusbuf.h
+++ b/library/cpp/messagebus/protobuf/ybusbuf.h
@@ -10,7 +10,7 @@
#include <util/stream/mem.h>
#include <array>
-
+
namespace NBus {
using TBusBufferRecord = ::google::protobuf::Message;
@@ -29,25 +29,25 @@ namespace NBus {
: TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
{
}
-
+
ui16 GetType() const {
return GetHeader()->Type;
}
-
+
virtual TBusBufferRecord* GetRecord() const = 0;
virtual TBusBufferBase* New() = 0;
};
-
+
///////////////////////////////////////////////////////////////////
/// \brief Template for all messages that have protobuf description
-
+
/// @param TBufferRecord is record described in .proto file with namespace
/// @param MessageFile is offset for .proto file message ids
/// \attention If you want one protocol NBus::TBusBufferProtocol to handle
/// messageges described in different .proto files, make sure that they have
/// unique values for MessageFile
-
+
template <class TBufferRecord, int MType>
class TBusBufferMessage: public TBusBufferBase {
public:
@@ -93,7 +93,7 @@ namespace NBus {
class TBusBufferMessagePtrBase {
public:
typedef typename TBufferMessage::RecordType RecordType;
-
+
private:
TSelf* GetSelf() {
return static_cast<TSelf*>(this);
@@ -132,7 +132,7 @@ namespace NBus {
class TBusBufferMessagePtr: public TBusBufferMessagePtrBase<TBusBufferMessagePtr<TBufferMessage>, TBufferMessage> {
protected:
TBufferMessage* Holder;
-
+
public:
TBusBufferMessagePtr(TBufferMessage* mess)
: Holder(mess)
@@ -147,14 +147,14 @@ namespace NBus {
const TBufferMessage* Get() const {
return Holder;
}
-
+
operator TBufferMessage*() {
return Holder;
}
operator const TBufferMessage*() const {
return Holder;
}
-
+
operator TAutoPtr<TBusMessage>() {
TAutoPtr<TBusMessage> r(Holder);
Holder = 0;
@@ -166,12 +166,12 @@ namespace NBus {
return r;
}
};
-
+
template <class TBufferMessage>
class TBusBufferMessageAutoPtr: public TBusBufferMessagePtrBase<TBusBufferMessageAutoPtr<TBufferMessage>, TBufferMessage> {
public:
TAutoPtr<TBufferMessage> AutoPtr;
-
+
public:
TBusBufferMessageAutoPtr() {
}
@@ -186,11 +186,11 @@ namespace NBus {
const TBufferMessage* Get() const {
return AutoPtr.Get();
}
-
+
TBufferMessage* Release() const {
return AutoPtr.Release();
}
-
+
operator TAutoPtr<TBusMessage>() {
return AutoPtr.Release();
}
@@ -201,22 +201,22 @@ namespace NBus {
/////////////////////////////////////////////
/// \brief Generic protocol object for messages descibed with protobuf
-
+
/// \attention If you mix messages in the same protocol from more than
/// .proto file make sure that they have different MessageFile parameter
/// in the NBus::TBusBufferMessage template
-
+
class TBusBufferProtocol: public TBusProtocol {
private:
TVector<TBusBufferBase*> Types;
std::array<ui32, ((1 << 16) >> 5)> TypeMask;
-
+
TBusBufferBase* FindType(int type);
bool IsRegisteredType(unsigned type);
-
+
public:
TBusBufferProtocol(TBusService name, int port);
-
+
~TBusBufferProtocol() override;
/// register all the message that this protocol should handle