aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/protobuf/ybusbuf.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/protobuf/ybusbuf.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/protobuf/ybusbuf.cpp')
-rw-r--r--library/cpp/messagebus/protobuf/ybusbuf.cpp88
1 files changed, 88 insertions, 0 deletions
diff --git a/library/cpp/messagebus/protobuf/ybusbuf.cpp b/library/cpp/messagebus/protobuf/ybusbuf.cpp
new file mode 100644
index 0000000000..63415b3737
--- /dev/null
+++ b/library/cpp/messagebus/protobuf/ybusbuf.cpp
@@ -0,0 +1,88 @@
+#include "ybusbuf.h"
+
+#include <library/cpp/messagebus/actor/what_thread_does.h>
+
+#include <google/protobuf/io/coded_stream.h>
+
+using namespace NBus;
+
+TBusBufferProtocol::TBusBufferProtocol(TBusService name, int port)
+ : TBusProtocol(name, port)
+{
+}
+
+TBusBufferProtocol::~TBusBufferProtocol() {
+ for (auto& type : Types) {
+ delete type;
+ }
+}
+
+TBusBufferBase* TBusBufferProtocol::FindType(int type) {
+ for (unsigned i = 0; i < Types.size(); i++) {
+ if (Types[i]->GetHeader()->Type == type) {
+ return Types[i];
+ }
+ }
+ return nullptr;
+}
+
+bool TBusBufferProtocol::IsRegisteredType(unsigned type) {
+ return TypeMask[type >> 5] & (1 << (type & ((1 << 5) - 1)));
+}
+
+void TBusBufferProtocol::RegisterType(TAutoPtr<TBusBufferBase> mess) {
+ ui32 type = mess->GetHeader()->Type;
+ TypeMask[type >> 5] |= 1 << (type & ((1 << 5) - 1));
+
+ Types.push_back(mess.Release());
+}
+
+TArrayRef<TBusBufferBase* const> TBusBufferProtocol::GetTypes() const {
+ return Types;
+}
+
+void TBusBufferProtocol::Serialize(const TBusMessage* mess, TBuffer& data) {
+ TWhatThreadDoesPushPop pp("serialize protobuf message");
+
+ const TBusHeader* header = mess->GetHeader();
+
+ if (!IsRegisteredType(header->Type)) {
+ Y_FAIL("unknown message type: %d", int(header->Type));
+ return;
+ }
+
+ // cast the base from real message
+ const TBusBufferBase* bmess = CheckedCast<const TBusBufferBase*>(mess);
+
+ unsigned size = bmess->GetRecord()->ByteSize();
+ data.Reserve(data.Size() + size);
+
+ char* after = (char*)bmess->GetRecord()->SerializeWithCachedSizesToArray((ui8*)data.Pos());
+ Y_VERIFY(after - data.Pos() == size);
+
+ data.Advance(size);
+}
+
+TAutoPtr<TBusMessage> TBusBufferProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) {
+ TWhatThreadDoesPushPop pp("deserialize protobuf message");
+
+ TBusBufferBase* messageTemplate = FindType(messageType);
+ if (messageTemplate == nullptr) {
+ return nullptr;
+ //Y_FAIL("unknown message type: %d", unsigned(messageType));
+ }
+
+ // clone the base
+ TAutoPtr<TBusBufferBase> bmess = messageTemplate->New();
+
+ // Need to override protobuf message size limit
+ // NOTE: the payload size has already been checked against session MaxMessageSize
+ google::protobuf::io::CodedInputStream input(reinterpret_cast<const ui8*>(payload.data()), payload.size());
+ input.SetTotalBytesLimit(payload.size());
+
+ bool ok = bmess->GetRecord()->ParseFromCodedStream(&input) && input.ConsumedEntireMessage();
+ if (!ok) {
+ return nullptr;
+ }
+ return bmess.Release();
+}