diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/protobuf/ybusbuf.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/protobuf/ybusbuf.cpp')
-rw-r--r-- | library/cpp/messagebus/protobuf/ybusbuf.cpp | 88 |
1 files changed, 88 insertions, 0 deletions
diff --git a/library/cpp/messagebus/protobuf/ybusbuf.cpp b/library/cpp/messagebus/protobuf/ybusbuf.cpp new file mode 100644 index 0000000000..63415b3737 --- /dev/null +++ b/library/cpp/messagebus/protobuf/ybusbuf.cpp @@ -0,0 +1,88 @@ +#include "ybusbuf.h" + +#include <library/cpp/messagebus/actor/what_thread_does.h> + +#include <google/protobuf/io/coded_stream.h> + +using namespace NBus; + +TBusBufferProtocol::TBusBufferProtocol(TBusService name, int port) + : TBusProtocol(name, port) +{ +} + +TBusBufferProtocol::~TBusBufferProtocol() { + for (auto& type : Types) { + delete type; + } +} + +TBusBufferBase* TBusBufferProtocol::FindType(int type) { + for (unsigned i = 0; i < Types.size(); i++) { + if (Types[i]->GetHeader()->Type == type) { + return Types[i]; + } + } + return nullptr; +} + +bool TBusBufferProtocol::IsRegisteredType(unsigned type) { + return TypeMask[type >> 5] & (1 << (type & ((1 << 5) - 1))); +} + +void TBusBufferProtocol::RegisterType(TAutoPtr<TBusBufferBase> mess) { + ui32 type = mess->GetHeader()->Type; + TypeMask[type >> 5] |= 1 << (type & ((1 << 5) - 1)); + + Types.push_back(mess.Release()); +} + +TArrayRef<TBusBufferBase* const> TBusBufferProtocol::GetTypes() const { + return Types; +} + +void TBusBufferProtocol::Serialize(const TBusMessage* mess, TBuffer& data) { + TWhatThreadDoesPushPop pp("serialize protobuf message"); + + const TBusHeader* header = mess->GetHeader(); + + if (!IsRegisteredType(header->Type)) { + Y_FAIL("unknown message type: %d", int(header->Type)); + return; + } + + // cast the base from real message + const TBusBufferBase* bmess = CheckedCast<const TBusBufferBase*>(mess); + + unsigned size = bmess->GetRecord()->ByteSize(); + data.Reserve(data.Size() + size); + + char* after = (char*)bmess->GetRecord()->SerializeWithCachedSizesToArray((ui8*)data.Pos()); + Y_VERIFY(after - data.Pos() == size); + + data.Advance(size); +} + +TAutoPtr<TBusMessage> TBusBufferProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) { + TWhatThreadDoesPushPop pp("deserialize protobuf message"); + + TBusBufferBase* messageTemplate = FindType(messageType); + if (messageTemplate == nullptr) { + return nullptr; + //Y_FAIL("unknown message type: %d", unsigned(messageType)); + } + + // clone the base + TAutoPtr<TBusBufferBase> bmess = messageTemplate->New(); + + // Need to override protobuf message size limit + // NOTE: the payload size has already been checked against session MaxMessageSize + google::protobuf::io::CodedInputStream input(reinterpret_cast<const ui8*>(payload.data()), payload.size()); + input.SetTotalBytesLimit(payload.size()); + + bool ok = bmess->GetRecord()->ParseFromCodedStream(&input) && input.ConsumedEntireMessage(); + if (!ok) { + return nullptr; + } + return bmess.Release(); +} |