diff options
author | alexv-smirnov <alex@ydb.tech> | 2023-09-19 12:14:42 +0300 |
---|---|---|
committer | alexv-smirnov <alex@ydb.tech> | 2023-09-19 12:43:02 +0300 |
commit | 4662db373a1785a836134a4b2342e8471342bdda (patch) | |
tree | 2df6840dc71039dbcb4192a4b6b2edbf5f26248d | |
parent | 0364421e62126bcf971c82084d3c923be85c77c7 (diff) | |
download | ydb-4662db373a1785a836134a4b2342e8471342bdda.tar.gz |
Clone protobuf_udf to ydb/library
48 files changed, 3429 insertions, 0 deletions
diff --git a/library/cpp/protobuf/CMakeLists.txt b/library/cpp/protobuf/CMakeLists.txt index cfd4a1733f..ab3580519b 100644 --- a/library/cpp/protobuf/CMakeLists.txt +++ b/library/cpp/protobuf/CMakeLists.txt @@ -6,6 +6,8 @@ # original buildsystem will not be accepted. +add_subdirectory(dynamic_prototype) add_subdirectory(interop) add_subdirectory(json) add_subdirectory(util) +add_subdirectory(yql) diff --git a/library/cpp/protobuf/dynamic_prototype/CMakeLists.darwin-x86_64.txt b/library/cpp/protobuf/dynamic_prototype/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..f17724c5b1 --- /dev/null +++ b/library/cpp/protobuf/dynamic_prototype/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-protobuf-dynamic_prototype) +target_link_libraries(cpp-protobuf-dynamic_prototype PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf +) +target_sources(cpp-protobuf-dynamic_prototype PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/protobuf/dynamic_prototype/dynamic_prototype.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/protobuf/dynamic_prototype/generate_file_descriptor_set.cpp +) diff --git a/library/cpp/protobuf/dynamic_prototype/CMakeLists.linux-aarch64.txt b/library/cpp/protobuf/dynamic_prototype/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..1d956f9a86 --- /dev/null +++ b/library/cpp/protobuf/dynamic_prototype/CMakeLists.linux-aarch64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. 
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-protobuf-dynamic_prototype) +target_link_libraries(cpp-protobuf-dynamic_prototype PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf +) +target_sources(cpp-protobuf-dynamic_prototype PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/protobuf/dynamic_prototype/dynamic_prototype.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/protobuf/dynamic_prototype/generate_file_descriptor_set.cpp +) diff --git a/library/cpp/protobuf/dynamic_prototype/CMakeLists.linux-x86_64.txt b/library/cpp/protobuf/dynamic_prototype/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..1d956f9a86 --- /dev/null +++ b/library/cpp/protobuf/dynamic_prototype/CMakeLists.linux-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(cpp-protobuf-dynamic_prototype) +target_link_libraries(cpp-protobuf-dynamic_prototype PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf +) +target_sources(cpp-protobuf-dynamic_prototype PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/protobuf/dynamic_prototype/dynamic_prototype.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/protobuf/dynamic_prototype/generate_file_descriptor_set.cpp +) diff --git a/library/cpp/protobuf/dynamic_prototype/CMakeLists.txt b/library/cpp/protobuf/dynamic_prototype/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/library/cpp/protobuf/dynamic_prototype/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/library/cpp/protobuf/dynamic_prototype/CMakeLists.windows-x86_64.txt b/library/cpp/protobuf/dynamic_prototype/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..f17724c5b1 --- /dev/null +++ b/library/cpp/protobuf/dynamic_prototype/CMakeLists.windows-x86_64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(cpp-protobuf-dynamic_prototype) +target_link_libraries(cpp-protobuf-dynamic_prototype PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf +) +target_sources(cpp-protobuf-dynamic_prototype PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/protobuf/dynamic_prototype/dynamic_prototype.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/protobuf/dynamic_prototype/generate_file_descriptor_set.cpp +) diff --git a/library/cpp/protobuf/dynamic_prototype/dynamic_prototype.cpp b/library/cpp/protobuf/dynamic_prototype/dynamic_prototype.cpp new file mode 100644 index 0000000000..692331fbbe --- /dev/null +++ b/library/cpp/protobuf/dynamic_prototype/dynamic_prototype.cpp @@ -0,0 +1,63 @@ +#include "dynamic_prototype.h" + +#include <util/generic/yexception.h> + +TDynamicPrototype::TDynamicMessage::TDynamicMessage(THolder<NProtoBuf::Message> message, TIntrusivePtr<TDynamicPrototype> prototype) + : Prototype(std::move(prototype)) + , Message(std::move(message)) +{ +} + +NProtoBuf::Message& TDynamicPrototype::TDynamicMessage::operator*() { + return *Message; +} + +NProtoBuf::Message* TDynamicPrototype::TDynamicMessage::operator->() { + return Get(); +} + +NProtoBuf::Message* TDynamicPrototype::TDynamicMessage::Get() { + return Message.Get(); +} + +TDynamicPrototypePtr TDynamicPrototype::Create(const NProtoBuf::FileDescriptorSet& fds, const TString& messageName, bool yqlHack) { + return new TDynamicPrototype(fds, messageName, yqlHack); +} + +TDynamicPrototype::TDynamicPrototype(const NProtoBuf::FileDescriptorSet& fileDescriptorSet, const TString& messageName, bool yqlHack) +{ + const NProtoBuf::FileDescriptor* fileDescriptor; + + for (int i = 0; i < fileDescriptorSet.file_size(); ++i) { + fileDescriptor = Pool.BuildFile(fileDescriptorSet.file(i)); + if (fileDescriptor == nullptr) { + ythrow yexception() << "can't build file descriptor"; + } + } + + Descriptor = Pool.FindMessageTypeByName(messageName); + + // Первоначальный вариант поведения, когда тип определялся + // по имени 
сообщения верхнего уровня в заданном файле. + if (yqlHack && !Descriptor) { + Descriptor = fileDescriptor->FindMessageTypeByName(messageName); + } + + if (!Descriptor) { + ythrow yexception() << "no descriptor for " << messageName; + } + + Prototype = Factory.GetPrototype(Descriptor); +} + +TDynamicPrototype::TDynamicMessage TDynamicPrototype::Create() { + return TDynamicMessage(CreateUnsafe(), this); +} + +THolder<NProtoBuf::Message> TDynamicPrototype::CreateUnsafe() const { + return THolder<NProtoBuf::Message>(Prototype->New()); +} + +const NProtoBuf::Descriptor* TDynamicPrototype::GetDescriptor() const { + return Descriptor; +} diff --git a/library/cpp/protobuf/dynamic_prototype/dynamic_prototype.h b/library/cpp/protobuf/dynamic_prototype/dynamic_prototype.h new file mode 100644 index 0000000000..f2afcd5f86 --- /dev/null +++ b/library/cpp/protobuf/dynamic_prototype/dynamic_prototype.h @@ -0,0 +1,43 @@ +#pragma once + +#include <google/protobuf/dynamic_message.h> +#include <google/protobuf/descriptor.pb.h> + +#include <util/generic/ptr.h> + +class TDynamicPrototype; + +using TDynamicPrototypePtr = TIntrusivePtr<TDynamicPrototype>; + +class TDynamicPrototype : public TThrRefBase { +public: + class TDynamicMessage { + public: + TDynamicMessage() = default; + TDynamicMessage(THolder<NProtoBuf::Message> message, TIntrusivePtr<TDynamicPrototype> prototype); + NProtoBuf::Message& operator*(); + NProtoBuf::Message* operator->(); + NProtoBuf::Message* Get(); + private: + TIntrusivePtr<TDynamicPrototype> Prototype; + THolder<NProtoBuf::Message> Message; + }; + + static TDynamicPrototypePtr Create(const NProtoBuf::FileDescriptorSet& fds, const TString& messageName, bool yqlHack = false); + + // https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.dynamic_message#DynamicMessageFactory + // Must outlive created messages + TDynamicMessage Create(); + + THolder<NProtoBuf::Message> CreateUnsafe() const; + + const NProtoBuf::Descriptor* 
GetDescriptor() const; + +private: + TDynamicPrototype(const NProtoBuf::FileDescriptorSet& fds, const TString& messageName, bool yqlHack); + + NProtoBuf::DescriptorPool Pool; + const NProtoBuf::Descriptor* Descriptor; + NProtoBuf::DynamicMessageFactory Factory; + const NProtoBuf::Message* Prototype; +}; diff --git a/library/cpp/protobuf/dynamic_prototype/generate_file_descriptor_set.cpp b/library/cpp/protobuf/dynamic_prototype/generate_file_descriptor_set.cpp new file mode 100644 index 0000000000..b3fc68e545 --- /dev/null +++ b/library/cpp/protobuf/dynamic_prototype/generate_file_descriptor_set.cpp @@ -0,0 +1,33 @@ +#include "generate_file_descriptor_set.h" + +#include <util/generic/queue.h> +#include <util/generic/set.h> +#include <util/generic/vector.h> + +using namespace NProtoBuf; + +FileDescriptorSet GenerateFileDescriptorSet(const Descriptor* desc) { + TVector<const FileDescriptor*> flat; + TQueue<const FileDescriptor*> descriptors; + + for (descriptors.push(desc->file()); !descriptors.empty(); descriptors.pop()) { + const FileDescriptor* file = descriptors.front(); + + for (int i = 0; i < file->dependency_count(); ++i) { + descriptors.push(file->dependency(i)); + } + + flat.push_back(file); + } + + FileDescriptorSet result; + TSet<TString> visited; + + for (auto ri = flat.rbegin(); ri != flat.rend(); ++ri) { + if (visited.insert((*ri)->name()).second == true) { + (*ri)->CopyTo(result.add_file()); + } + } + + return result; +} diff --git a/library/cpp/protobuf/dynamic_prototype/generate_file_descriptor_set.h b/library/cpp/protobuf/dynamic_prototype/generate_file_descriptor_set.h new file mode 100644 index 0000000000..8e1fb33a08 --- /dev/null +++ b/library/cpp/protobuf/dynamic_prototype/generate_file_descriptor_set.h @@ -0,0 +1,5 @@ +#pragma once + +#include <google/protobuf/descriptor.pb.h> + +NProtoBuf::FileDescriptorSet GenerateFileDescriptorSet(const NProtoBuf::Descriptor* desc); diff --git 
a/library/cpp/protobuf/dynamic_prototype/ut/dynamic_prototype_ut.cpp b/library/cpp/protobuf/dynamic_prototype/ut/dynamic_prototype_ut.cpp new file mode 100644 index 0000000000..bf3bfca4aa --- /dev/null +++ b/library/cpp/protobuf/dynamic_prototype/ut/dynamic_prototype_ut.cpp @@ -0,0 +1,27 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <library/cpp/protobuf/dynamic_prototype/dynamic_prototype.h> +#include <library/cpp/protobuf/dynamic_prototype/generate_file_descriptor_set.h> +#include <library/cpp/protobuf/dynamic_prototype/ut/my_message.pb.h> + + +Y_UNIT_TEST_SUITE(TDynamicPrototype) { + Y_UNIT_TEST(Create) { + const auto* descriptor = TMyMessage().GetDescriptor(); + const auto& fileDescriptorSet = GenerateFileDescriptorSet(descriptor); + + auto prototype = TDynamicPrototype::Create(fileDescriptorSet, "TMyMessage"); + auto myMessage = prototype->Create(); + + TMyMessage reference; + reference.SetFloat(12.1); + reference.MutableInnerMessage()->SetInt32(42); + reference.MutableInnerMessage()->SetString("string"); + + TString serialized; + Y_PROTOBUF_SUPPRESS_NODISCARD reference.SerializeToString(&serialized); + Y_PROTOBUF_SUPPRESS_NODISCARD myMessage->ParseFromString(serialized); + + UNIT_ASSERT_STRINGS_EQUAL(reference.DebugString(), myMessage->DebugString()); + } +} diff --git a/library/cpp/protobuf/dynamic_prototype/ut/my_inner_message.proto b/library/cpp/protobuf/dynamic_prototype/ut/my_inner_message.proto new file mode 100644 index 0000000000..d893d7b4d9 --- /dev/null +++ b/library/cpp/protobuf/dynamic_prototype/ut/my_inner_message.proto @@ -0,0 +1,4 @@ +message TMyInnerMessage { + required int32 Int32 = 1; + required string String = 2; +} diff --git a/library/cpp/protobuf/dynamic_prototype/ut/my_message.proto b/library/cpp/protobuf/dynamic_prototype/ut/my_message.proto new file mode 100644 index 0000000000..c548985530 --- /dev/null +++ b/library/cpp/protobuf/dynamic_prototype/ut/my_message.proto @@ -0,0 +1,6 @@ +import 
"library/cpp/protobuf/dynamic_prototype/ut/my_inner_message.proto"; + +message TMyMessage { + required TMyInnerMessage InnerMessage = 1; + required float Float = 2; +}
\ No newline at end of file diff --git a/library/cpp/protobuf/dynamic_prototype/ut/ya.make b/library/cpp/protobuf/dynamic_prototype/ut/ya.make new file mode 100644 index 0000000000..1bb570f959 --- /dev/null +++ b/library/cpp/protobuf/dynamic_prototype/ut/ya.make @@ -0,0 +1,13 @@ +UNITTEST() + +SRCS( + dynamic_prototype_ut.cpp + my_inner_message.proto + my_message.proto +) + +PEERDIR( + library/cpp/protobuf/dynamic_prototype +) + +END() diff --git a/library/cpp/protobuf/dynamic_prototype/ya.make b/library/cpp/protobuf/dynamic_prototype/ya.make new file mode 100644 index 0000000000..3fd883b6a2 --- /dev/null +++ b/library/cpp/protobuf/dynamic_prototype/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + dynamic_prototype.cpp + generate_file_descriptor_set.cpp +) + +PEERDIR( + contrib/libs/protobuf +) + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/library/cpp/protobuf/yql/CMakeLists.darwin-x86_64.txt b/library/cpp/protobuf/yql/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..112f6e94e7 --- /dev/null +++ b/library/cpp/protobuf/yql/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(cpp-protobuf-yql) +target_link_libraries(cpp-protobuf-yql PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf + library-cpp-json + cpp-protobuf-dynamic_prototype + cpp-protobuf-json + cpp-string_utils-base64 +) +target_sources(cpp-protobuf-yql PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/protobuf/yql/descriptor.cpp +) diff --git a/library/cpp/protobuf/yql/CMakeLists.linux-aarch64.txt b/library/cpp/protobuf/yql/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..de09662231 --- /dev/null +++ b/library/cpp/protobuf/yql/CMakeLists.linux-aarch64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-protobuf-yql) +target_link_libraries(cpp-protobuf-yql PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf + library-cpp-json + cpp-protobuf-dynamic_prototype + cpp-protobuf-json + cpp-string_utils-base64 +) +target_sources(cpp-protobuf-yql PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/protobuf/yql/descriptor.cpp +) diff --git a/library/cpp/protobuf/yql/CMakeLists.linux-x86_64.txt b/library/cpp/protobuf/yql/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..de09662231 --- /dev/null +++ b/library/cpp/protobuf/yql/CMakeLists.linux-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. 
Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-protobuf-yql) +target_link_libraries(cpp-protobuf-yql PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf + library-cpp-json + cpp-protobuf-dynamic_prototype + cpp-protobuf-json + cpp-string_utils-base64 +) +target_sources(cpp-protobuf-yql PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/protobuf/yql/descriptor.cpp +) diff --git a/library/cpp/protobuf/yql/CMakeLists.txt b/library/cpp/protobuf/yql/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/library/cpp/protobuf/yql/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/library/cpp/protobuf/yql/CMakeLists.windows-x86_64.txt b/library/cpp/protobuf/yql/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..112f6e94e7 --- /dev/null +++ b/library/cpp/protobuf/yql/CMakeLists.windows-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-protobuf-yql) +target_link_libraries(cpp-protobuf-yql PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf + library-cpp-json + cpp-protobuf-dynamic_prototype + cpp-protobuf-json + cpp-string_utils-base64 +) +target_sources(cpp-protobuf-yql PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/protobuf/yql/descriptor.cpp +) diff --git a/library/cpp/protobuf/yql/README.md b/library/cpp/protobuf/yql/README.md new file mode 100644 index 0000000000..622bfce841 --- /dev/null +++ b/library/cpp/protobuf/yql/README.md @@ -0,0 +1,26 @@ +Функции данной библиотеки, позволяют генерировать метаинформацию, необходимую YQL для того, чтобы работать с содержимым protobuf-сообщения как со структурой. 
+ +Пример задания атрибута для таблицы в YT: +```c++ +#include <proto/person.pb.h> +#include <library/cpp/protobuf/yql/descriptor.h> + +auto client = NYT::CreateClient(ServerName); +client->Create(OutputPath, NT_TABLE, + TCreateOptions().Attributes(NYT::TNode() + ("_yql_proto_field_Data", GenerateProtobufTypeConfig<TPersonProto>())) +); +``` +Далее, в YQL можно будет написать следующий запрос: +```sql +SELECT t.Data.Name, t.Data.Age FROM [table] AS t; +``` + +Для того, чтобы работать с protobuf-сообщениями, сохранёнными в разных представлениях, есть специальные опции. На данный момент поддерживается три формата представления данных - binary-protobuf, text-protobuf и json. + +Пример использования опций: +```c++ +GenerateProtobufTypeConfig<TPersonProto>( + TProtoTypeConfigOptions().SetProtoFormat(PF_PROTOTEXT) +); +``` diff --git a/library/cpp/protobuf/yql/descriptor.cpp b/library/cpp/protobuf/yql/descriptor.cpp new file mode 100644 index 0000000000..2d5a154fc1 --- /dev/null +++ b/library/cpp/protobuf/yql/descriptor.cpp @@ -0,0 +1,314 @@ +#include "descriptor.h" + +#include <library/cpp/json/json_reader.h> +#include <library/cpp/json/json_writer.h> +#include <library/cpp/protobuf/dynamic_prototype/dynamic_prototype.h> +#include <library/cpp/protobuf/dynamic_prototype/generate_file_descriptor_set.h> +#include <library/cpp/protobuf/json/json2proto.h> +#include <library/cpp/protobuf/json/proto2json.h> +#include <library/cpp/string_utils/base64/base64.h> + +#include <util/generic/hash.h> +#include <util/generic/queue.h> +#include <util/generic/set.h> +#include <util/generic/vector.h> +#include <util/stream/mem.h> +#include <util/stream/str.h> +#include <util/stream/zlib.h> +#include <util/string/cast.h> + +#include <google/protobuf/text_format.h> +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> + +using namespace NProtoBuf; + +static TString SerializeFileDescriptorSet(const FileDescriptorSet& proto) { + const auto size = proto.ByteSize(); + TTempBuf data(size); 
+ proto.SerializeWithCachedSizesToArray((ui8*)data.Data()); + + TStringStream str; + { + TZLibCompress comp(&str, ZLib::GZip); + comp.Write(data.Data(), size); + } + return str.Str(); +} + +static bool ParseFileDescriptorSet(const TStringBuf& data, FileDescriptorSet* proto) { + TMemoryInput input(data.data(), data.size()); + TString buf = TZLibDecompress(&input).ReadAll(); + + if (!proto->ParseFromArray(buf.data(), buf.size())) { + return false; + } + return true; +} + +TDynamicInfo::TDynamicInfo(TDynamicPrototypePtr dynamicPrototype) + : DynamicPrototype(dynamicPrototype) + , SkipBytes_(0) +{ +} + +TDynamicInfo::~TDynamicInfo() { +} + +TDynamicInfoRef TDynamicInfo::Create(const TStringBuf& typeConfig) { + auto data = ParseTypeConfig(typeConfig); + const TString& meta = Base64Decode(data.Metadata); + const TString& name = data.MessageName; + FileDescriptorSet set; + + if (!ParseFileDescriptorSet(meta, &set)) { + ythrow yexception() << "can't parse metadata"; + } + + auto info = MakeIntrusive<TDynamicInfo>(TDynamicPrototype::Create(set, name, true)); + + info->EnumFormat_ = data.EnumFormat; + info->ProtoFormat_ = data.ProtoFormat; + info->Recursion_ = data.Recursion; + info->YtMode_ = data.YtMode; + info->SkipBytes_ = data.SkipBytes; + info->OptionalLists_ = data.OptionalLists; + info->SyntaxAware_ = data.SyntaxAware; + return info; +} + +const Descriptor* TDynamicInfo::Descriptor() const { + return DynamicPrototype->GetDescriptor(); +} + +EEnumFormat TDynamicInfo::GetEnumFormat() const { + return EnumFormat_; +} + +ERecursionTraits TDynamicInfo::GetRecursionTraits() const { + return Recursion_; +} + +bool TDynamicInfo::GetYtMode() const { + return YtMode_; +} + +bool TDynamicInfo::GetOptionalLists() const { + return OptionalLists_; +} + +bool TDynamicInfo::GetSyntaxAware() const { + return SyntaxAware_; +} + +TAutoPtr<Message> TDynamicInfo::MakeProto() { + return DynamicPrototype->CreateUnsafe(); +} + +TAutoPtr<Message> TDynamicInfo::Parse(const TStringBuf& data) { 
+ auto mut = MakeProto(); + TStringBuf tmp(data); + + if (SkipBytes_) { + tmp = TStringBuf(tmp.data() + SkipBytes_, tmp.size() - SkipBytes_); + } + + switch (ProtoFormat_) { + case PF_PROTOBIN: { + if (!mut->ParseFromArray(tmp.data(), tmp.size())) { + ythrow yexception() << "can't parse protobin message"; + } + break; + } + case PF_PROTOTEXT: { + io::ArrayInputStream si(tmp.data(), tmp.size()); + if (!TextFormat::Parse(&si, mut.Get())) { + ythrow yexception() << "can't parse prototext message"; + } + break; + } + case PF_JSON: { + NJson::TJsonValue value; + + if (NJson::ReadJsonFastTree(tmp, &value)) { + NProtobufJson::Json2Proto(value, *mut); + } else { + ythrow yexception() << "can't parse json value"; + } + break; + } + } + + return mut; +} + +TString TDynamicInfo::Serialize(const Message& proto) { + TString result; + switch (ProtoFormat_) { + case PF_PROTOBIN: { + result.ReserveAndResize(proto.ByteSize()); + if (!proto.SerializeToArray(result.begin(), result.size())) { + ythrow yexception() << "can't serialize protobin message"; + } + break; + } + case PF_PROTOTEXT: { + if (!TextFormat::PrintToString(proto, &result)) { + ythrow yexception() << "can't serialize prototext message"; + } + break; + } + case PF_JSON: { + NJson::TJsonValue value; + NProtobufJson::Proto2Json(proto, value); + result = NJson::WriteJson(value); + break; + } + } + return result; +} + +TString GenerateProtobufTypeConfig( + const Descriptor* descriptor, + const TProtoTypeConfigOptions& options) { + NJson::TJsonValue ret(NJson::JSON_MAP); + + ret["name"] = descriptor->full_name(); + ret["meta"] = Base64Encode( + SerializeFileDescriptorSet(GenerateFileDescriptorSet(descriptor))); + + if (options.SkipBytes > 0) { + ret["skip"] = options.SkipBytes; + } + + switch (options.ProtoFormat) { + case PF_PROTOBIN: + break; + case PF_PROTOTEXT: + ret["format"] = "prototext"; + break; + case PF_JSON: + ret["format"] = "json"; + break; + } + + if (!options.OptionalLists) { + ret["lists"]["optional"] = 
false; + } + + if (options.SyntaxAware) { + ret["syntax"]["aware"] = options.SyntaxAware; + } + + switch (options.EnumFormat) { + case EEnumFormat::Number: + break; + case EEnumFormat::Name: + ret["view"]["enum"] = "name"; + break; + case EEnumFormat::FullName: + ret["view"]["enum"] = "full_name"; + break; + } + + switch (options.Recursion) { + case ERecursionTraits::Fail: + break; + case ERecursionTraits::Ignore: + ret["view"]["recursion"] = "ignore"; + break; + case ERecursionTraits::Bytes: + ret["view"]["recursion"] = "bytes"; + break; + } + + if (options.YtMode) { + ret["view"]["yt_mode"] = true; + } + + return NJson::WriteJson(ret, false); +} + +TProtoTypeConfig ParseTypeConfig(const TStringBuf& config) { + if (config.empty()) { + ythrow yexception() << "empty metadata"; + } + + switch (config[0]) { + case '#': { + auto plus = config.find('+'); + + if (config[0] != '#') { + ythrow yexception() << "unknown version of metadata format"; + } + if (plus == TStringBuf::npos) { + ythrow yexception() << "invalid metadata"; + } + + TProtoTypeConfig result; + + result.MessageName = TStringBuf(config.begin() + 1, plus - 1); + result.Metadata = TStringBuf(config.begin() + 1 + plus, config.size() - plus - 1); + result.SkipBytes = 0; + + return result; + } + + case '{': { + NJson::TJsonValue value; + + if (NJson::ReadJsonFastTree(config, &value)) { + TProtoTypeConfig result; + TString protoFormat = value["format"].GetStringSafe("protobin"); + TString enumFormat = value["view"]["enum"].GetStringSafe("number"); + TString recursion = value["view"]["recursion"].GetStringSafe("fail"); + + result.MessageName = value["name"].GetString(); + result.Metadata = value["meta"].GetString(); + result.SkipBytes = value["skip"].GetIntegerSafe(0); + result.OptionalLists = value["lists"]["optional"].GetBooleanSafe(true); + result.SyntaxAware = value["syntax"]["aware"].GetBooleanSafe(false); + result.YtMode = value["view"]["yt_mode"].GetBooleanSafe(false); + + if (protoFormat == "protobin") { 
+ result.ProtoFormat = PF_PROTOBIN; + } else if (protoFormat == "prototext") { + result.ProtoFormat = PF_PROTOTEXT; + } else if (protoFormat == "json") { + result.ProtoFormat = PF_JSON; + } else { + ythrow yexception() << "unsupported format " << protoFormat; + } + + if (enumFormat == "number") { + result.EnumFormat = EEnumFormat::Number; + } else if (enumFormat == "name") { + result.EnumFormat = EEnumFormat::Name; + } else if (enumFormat == "full_name") { + result.EnumFormat = EEnumFormat::FullName; + } else { + ythrow yexception() << "unsupported enum representation " + << enumFormat; + } + + if (recursion == "fail") { + result.Recursion = ERecursionTraits::Fail; + } else if (recursion == "ignore") { + result.Recursion = ERecursionTraits::Ignore; + } else if (recursion == "bytes") { + result.Recursion = ERecursionTraits::Bytes; + } else { + ythrow yexception() << "unsupported recursion trait " + << recursion; + } + + return result; + } else { + ythrow yexception() << "can't parse json metadata"; + } + } + + default: + ythrow yexception() << "invalid control char " + << TStringBuf(config.data(), 1); + } +} diff --git a/library/cpp/protobuf/yql/descriptor.h b/library/cpp/protobuf/yql/descriptor.h new file mode 100644 index 0000000000..bbed6850ec --- /dev/null +++ b/library/cpp/protobuf/yql/descriptor.h @@ -0,0 +1,161 @@ +#pragma once + +#include <google/protobuf/descriptor.h> +#include <google/protobuf/dynamic_message.h> +#include <google/protobuf/descriptor.pb.h> + +#include <library/cpp/protobuf/dynamic_prototype/dynamic_prototype.h> + +#include <util/generic/ptr.h> + +enum EProtoFormat { + PF_PROTOBIN = 0, + PF_PROTOTEXT = 1, + PF_JSON = 2, +}; + +enum class EEnumFormat { + Number = 0, + Name = 1, + FullName = 2, +}; + +enum class ERecursionTraits { + //! Падать, если на входе имеется рекурсивно определённый тип. + Fail = 0, + //! Игнорировать все поля с рекурсивно определённым типом. + Ignore = 1, + //! 
//! Decoded form of a protobuf type config (the result of ParseTypeConfig).
struct TProtoTypeConfig {
    //! Message name.
    TString MessageName;
    //! Serialized metadata.
    TString Metadata;
    //! Number of bytes to skip
    //! from the beginning of every data block.
    ui32 SkipBytes = 0;
    //! Serialization format of the proto messages.
    EProtoFormat ProtoFormat = PF_PROTOBIN;
    //! Representation format of Enum values.
    EEnumFormat EnumFormat = EEnumFormat::Number;
    //! How recursively defined types are interpreted.
    ERecursionTraits Recursion = ERecursionTraits::Fail;
    //! Emit as binary strings if SERIALIZATION_PROTOBUF.
    //! If SERIALIZATION_YT, the behaviour for recursive structures depends on Recursion.
    bool YtMode = false;
    //! Whether list types are wrapped into Optional.
    bool OptionalLists = false;
    //! Whether empty Optional types are filled with the default value (proto3 only).
    bool SyntaxAware = false;
};
+ bool SyntaxAware = false; + + TProtoTypeConfigOptions& SetProtoFormat(EProtoFormat value) { + ProtoFormat = value; + return *this; + } + + TProtoTypeConfigOptions& SetSkipBytes(ui32 value) { + SkipBytes = value; + return *this; + } + + TProtoTypeConfigOptions& SetEnumFormat(EEnumFormat value) { + EnumFormat = value; + return *this; + } + + TProtoTypeConfigOptions& SetRecursionTraits(ERecursionTraits value) { + Recursion = value; + return *this; + } + + TProtoTypeConfigOptions& SetYtMode(bool value) { + YtMode = value; + return *this; + } + + TProtoTypeConfigOptions& SetOptionalLists(bool value) { + OptionalLists = value; + return *this; + } + + TProtoTypeConfigOptions& SetSyntaxAware(bool value) { + SyntaxAware = value; + return *this; + } +}; + +TString GenerateProtobufTypeConfig( + const NProtoBuf::Descriptor* descriptor, + const TProtoTypeConfigOptions& options = TProtoTypeConfigOptions()); + +template <typename T> +inline TString GenerateProtobufTypeConfig( + const TProtoTypeConfigOptions& options = TProtoTypeConfigOptions()) { + return GenerateProtobufTypeConfig(T::descriptor(), options); +} + +TProtoTypeConfig ParseTypeConfig(const TStringBuf& config); + +using TDynamicInfoRef = TIntrusivePtr<class TDynamicInfo>; + +class TDynamicInfo: public TSimpleRefCount<TDynamicInfo> { +public: + TDynamicInfo(TDynamicPrototypePtr); + ~TDynamicInfo(); + + static TDynamicInfoRef Create(const TStringBuf& typeConfig); + + const NProtoBuf::Descriptor* Descriptor() const; + + //! Текущий формат представление значений Enum. + EEnumFormat GetEnumFormat() const; + + //! Текущий способ интерпретации рекурсивно определённых типов. + ERecursionTraits GetRecursionTraits() const; + + bool GetYtMode() const; + + bool GetOptionalLists() const; + + bool GetSyntaxAware() const; + + TAutoPtr<NProtoBuf::Message> MakeProto(); + + //! \param data сериализованное protobuf-сообщение. + TAutoPtr<NProtoBuf::Message> Parse(const TStringBuf& data); + + //! \param proto protobuf-сообщение. 
+ TString Serialize(const NProtoBuf::Message& proto); + +private: + TDynamicPrototypePtr DynamicPrototype; + EEnumFormat EnumFormat_; + EProtoFormat ProtoFormat_; + ERecursionTraits Recursion_; + bool YtMode_; + ui32 SkipBytes_; + bool OptionalLists_; + bool SyntaxAware_; +}; diff --git a/library/cpp/protobuf/yql/ya.make b/library/cpp/protobuf/yql/ya.make new file mode 100644 index 0000000000..29c21da402 --- /dev/null +++ b/library/cpp/protobuf/yql/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + descriptor.cpp +) + +PEERDIR( + contrib/libs/protobuf + library/cpp/json + library/cpp/protobuf/dynamic_prototype + library/cpp/protobuf/json + library/cpp/string_utils/base64 +) + +END() diff --git a/ydb/library/yql/CMakeLists.txt b/ydb/library/yql/CMakeLists.txt index 7f41afad8e..32d85b66b2 100644 --- a/ydb/library/yql/CMakeLists.txt +++ b/ydb/library/yql/CMakeLists.txt @@ -11,6 +11,7 @@ add_subdirectory(core) add_subdirectory(dq) add_subdirectory(minikql) add_subdirectory(parser) +add_subdirectory(protobuf_udf) add_subdirectory(protos) add_subdirectory(providers) add_subdirectory(public) diff --git a/ydb/library/yql/protobuf_udf/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/protobuf_udf/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..5291f23d84 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,33 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +add_subdirectory(ut) + +add_library(library-yql-protobuf_udf) +target_compile_options(library-yql-protobuf_udf PRIVATE + -DUDF_ABI_VERSION_MAJOR=2 + -DUDF_ABI_VERSION_MINOR=9 + -DUDF_ABI_VERSION_PATCH=0 +) +target_link_libraries(library-yql-protobuf_udf PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf + cpp-protobuf-yql + yql-public-udf + library-yql-minikql + library-yql-utils + cpp-mapreduce-interface + yt_proto-yt-formats +) +target_sources(library-yql-protobuf_udf PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/proto_builder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/module.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/type_builder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/value_builder.cpp +) diff --git a/ydb/library/yql/protobuf_udf/CMakeLists.linux-aarch64.txt b/ydb/library/yql/protobuf_udf/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..1d5e018520 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/CMakeLists.linux-aarch64.txt @@ -0,0 +1,34 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +add_subdirectory(ut) + +add_library(library-yql-protobuf_udf) +target_compile_options(library-yql-protobuf_udf PRIVATE + -DUDF_ABI_VERSION_MAJOR=2 + -DUDF_ABI_VERSION_MINOR=9 + -DUDF_ABI_VERSION_PATCH=0 +) +target_link_libraries(library-yql-protobuf_udf PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf + cpp-protobuf-yql + yql-public-udf + library-yql-minikql + library-yql-utils + cpp-mapreduce-interface + yt_proto-yt-formats +) +target_sources(library-yql-protobuf_udf PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/proto_builder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/module.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/type_builder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/value_builder.cpp +) diff --git a/ydb/library/yql/protobuf_udf/CMakeLists.linux-x86_64.txt b/ydb/library/yql/protobuf_udf/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..1d5e018520 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/CMakeLists.linux-x86_64.txt @@ -0,0 +1,34 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +add_subdirectory(ut) + +add_library(library-yql-protobuf_udf) +target_compile_options(library-yql-protobuf_udf PRIVATE + -DUDF_ABI_VERSION_MAJOR=2 + -DUDF_ABI_VERSION_MINOR=9 + -DUDF_ABI_VERSION_PATCH=0 +) +target_link_libraries(library-yql-protobuf_udf PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf + cpp-protobuf-yql + yql-public-udf + library-yql-minikql + library-yql-utils + cpp-mapreduce-interface + yt_proto-yt-formats +) +target_sources(library-yql-protobuf_udf PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/proto_builder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/module.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/type_builder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/value_builder.cpp +) diff --git a/ydb/library/yql/protobuf_udf/CMakeLists.txt b/ydb/library/yql/protobuf_udf/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/protobuf_udf/CMakeLists.windows-x86_64.txt b/ydb/library/yql/protobuf_udf/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..5291f23d84 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/CMakeLists.windows-x86_64.txt @@ -0,0 +1,33 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +add_subdirectory(ut) + +add_library(library-yql-protobuf_udf) +target_compile_options(library-yql-protobuf_udf PRIVATE + -DUDF_ABI_VERSION_MAJOR=2 + -DUDF_ABI_VERSION_MINOR=9 + -DUDF_ABI_VERSION_PATCH=0 +) +target_link_libraries(library-yql-protobuf_udf PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf + cpp-protobuf-yql + yql-public-udf + library-yql-minikql + library-yql-utils + cpp-mapreduce-interface + yt_proto-yt-formats +) +target_sources(library-yql-protobuf_udf PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/proto_builder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/module.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/type_builder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/value_builder.cpp +) diff --git a/ydb/library/yql/protobuf_udf/module.cpp b/ydb/library/yql/protobuf_udf/module.cpp new file mode 100644 index 0000000000..95befe2220 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/module.cpp @@ -0,0 +1,79 @@ +#include "module.h" + +namespace NYql { +namespace NUdf { + +using namespace NProtoBuf; + +void TProtobufBase::CleanupOnTerminate() const { +} + +void TProtobufBase::GetAllFunctions(IFunctionsSink& sink) const { + sink.Add(TStringRef::Of("Parse")); + sink.Add(TStringRef::Of("ParseText")); + sink.Add(TStringRef::Of("Serialize")); + sink.Add(TStringRef::Of("SerializeText")); +} + +void TProtobufBase::BuildFunctionTypeInfo( + const TStringRef& name, + TType* userType, + const TStringRef& typeConfig, + ui32 flags, + IFunctionTypeInfoBuilder& builder) const +{ + Y_UNUSED(userType); + Y_UNUSED(typeConfig); + + try { + TProtoInfo typeInfo; + ProtoTypeBuild(GetDescriptor(), + EEnumFormat::Number, + ERecursionTraits::Fail, true, builder, &typeInfo); + + auto stringType = builder.SimpleType<char*>(); + + if ((TStringRef::Of("Serialize") == name) || (TStringRef::Of("SerializeText") == name)) { + // function signature: + // String Serialize(Protobuf value) + builder.Returns(stringType) + 
.Args()->Add(typeInfo.StructType) + .Flags(ICallablePayload::TArgumentFlags::AutoMap) + .Done(); + } else { + // function signature: + // Protobuf Parse(String value) + builder.Returns(typeInfo.StructType) + .Args()->Add(stringType) + .Flags(ICallablePayload::TArgumentFlags::AutoMap) + .Done(); + } + + + if (TStringRef::Of("Serialize") == name) { + if ((flags & TFlags::TypesOnly) == 0) { + builder.Implementation(this->CreateSerialize(typeInfo, false)); + } + } + if (TStringRef::Of("SerializeText") == name) { + if ((flags & TFlags::TypesOnly) == 0) { + builder.Implementation(this->CreateSerialize(typeInfo, true)); + } + } + if (TStringRef::Of("Parse") == name) { + if ((flags & TFlags::TypesOnly) == 0) { + builder.Implementation(this->CreateValue(typeInfo, false)); + } + } + if (TStringRef::Of("ParseText") == name) { + if ((flags & TFlags::TypesOnly) == 0) { + builder.Implementation(this->CreateValue(typeInfo, true)); + } + } + } catch (...) { + builder.SetError(CurrentExceptionMessage()); + } +} + +} // namespace NUdf +} // namespace NYql diff --git a/ydb/library/yql/protobuf_udf/module.h b/ydb/library/yql/protobuf_udf/module.h new file mode 100644 index 0000000000..c6fb373e51 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/module.h @@ -0,0 +1,113 @@ +#pragma once + +#include "type_builder.h" +#include "value_builder.h" + +#include <ydb/library/yql/public/udf/udf_value.h> +#include <ydb/library/yql/public/udf/udf_registrator.h> + +#include <google/protobuf/message.h> +#include <google/protobuf/text_format.h> +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> + +namespace NYql { +namespace NUdf { + +class TProtobufBase : public IUdfModule { +public: + void CleanupOnTerminate() const override; + + void GetAllFunctions(IFunctionsSink& sink) const override; + + void BuildFunctionTypeInfo( + const TStringRef& name, + TType* userType, + const TStringRef& typeConfig, + ui32 flags, + IFunctionTypeInfoBuilder& builder) const override; + +protected: + virtual const 
NProtoBuf::Descriptor* GetDescriptor() const = 0; + + virtual TProtobufValue* CreateValue(const TProtoInfo& info, bool asText) const = 0; + + virtual TProtobufSerialize* CreateSerialize(const TProtoInfo& info, bool asText) const = 0; +}; + + +template <typename T> +class TProtobufModule : public TProtobufBase { + + class TValue : public TProtobufValue { + public: + TValue(const TProtoInfo& info, bool asText) + : TProtobufValue(info) + , AsText_(asText) + { + } + + TAutoPtr<NProtoBuf::Message> Parse(const TStringBuf& data) const override { + TAutoPtr<T> proto(new T); + if (AsText_) { + NProtoBuf::io::ArrayInputStream si(data.data(), data.size()); + if (!NProtoBuf::TextFormat::Parse(&si, proto.Get())) { + ythrow yexception() << "can't parse text protobuf"; + } + } else { + if (!proto->ParseFromArray(data.data(), data.size())) { + ythrow yexception() << "can't parse binary protobuf"; + } + } + return proto.Release(); + } + + private: + const bool AsText_; + }; + + class TSerialize : public TProtobufSerialize { + public: + TSerialize(const TProtoInfo& info, bool asText) + : TProtobufSerialize(info) + , AsText_(asText) + { + } + + TMaybe<TString> Serialize(const NProtoBuf::Message& proto) const override { + TString result; + if (AsText_) { + if (!NProtoBuf::TextFormat::PrintToString(proto, &result)) { + ythrow yexception() << "can't serialize prototext message"; + } + } else { + result.ReserveAndResize(proto.ByteSize()); + if (!proto.SerializeToArray(result.begin(), result.size())) { + ythrow yexception() << "can't serialize protobin message"; + } + } + return result; + } + + TAutoPtr<NProtoBuf::Message> MakeProto() const override { + return TAutoPtr<NProtoBuf::Message>(new T); + } + private: + const bool AsText_; + }; + +private: + const NProtoBuf::Descriptor* GetDescriptor() const override { + return T::descriptor(); + } + + TProtobufValue* CreateValue(const TProtoInfo& info, bool asText) const override { + return new TValue(info, asText); + } + + TProtobufSerialize* 
CreateSerialize(const TProtoInfo& info, bool asText) const override { + return new TSerialize(info, asText); + } +}; + +} // namespace NUdf +} // namespace NYql diff --git a/ydb/library/yql/protobuf_udf/proto_builder.cpp b/ydb/library/yql/protobuf_udf/proto_builder.cpp new file mode 100644 index 0000000000..883101d12c --- /dev/null +++ b/ydb/library/yql/protobuf_udf/proto_builder.cpp @@ -0,0 +1,276 @@ +#include "proto_builder.h" + +#include <ydb/library/yql/public/udf/udf_value_builder.h> +#include <ydb/library/yql/minikql/mkql_string_util.h> + +#include <util/generic/singleton.h> + +using namespace google::protobuf; + +namespace { + using namespace NYql::NUdf; + + const EnumValueDescriptor& GetEnumValue(const TUnboxedValuePod& source, const FieldDescriptor& field, + const TProtoInfo& info, TFlags<EFieldFlag> flags) { + const auto* enumDescriptor = field.enum_type(); + Y_ENSURE(enumDescriptor); + if (flags.HasFlags(EFieldFlag::EnumInt)) { + const auto number = source.Get<i64>(); + const auto* result = enumDescriptor->FindValueByNumber(number); + if (!result) { + ythrow yexception() << "unknown value " << number + << " for enum type " << enumDescriptor->full_name() + << ", field " << field.full_name(); + } + return *result; + } else if (flags.HasFlags(EFieldFlag::EnumString)) { + const TStringBuf name = source.AsStringRef(); + for (int i = 0; i < enumDescriptor->value_count(); ++i) { + const auto& value = *enumDescriptor->value(i); + if (value.name() == name) { + return value; + } + } + ythrow yexception() << "unknown value " << name + << " for enum type " << enumDescriptor->full_name() + << ", field " << field.full_name(); + } + if (info.EnumFormat == EEnumFormat::Number) { + const auto number = source.Get<i32>(); + const auto* result = enumDescriptor->FindValueByNumber(number); + if (!result) { + ythrow yexception() << "unknown value " << number + << " for enum type " << enumDescriptor->full_name() + << ", field " << field.full_name(); + } + return *result; + } + 
const TStringBuf name = source.AsStringRef(); + for (int i = 0; i < enumDescriptor->value_count(); ++i) { + const auto& value = *enumDescriptor->value(i); + const auto& valueName = info.EnumFormat == EEnumFormat::Name ? value.name() : value.full_name(); + if (valueName == name) { + return value; + } + } + ythrow yexception() << "unknown value " << name + << " for enum type " << enumDescriptor->full_name() + << ", field " << field.full_name(); + } + + void FillRepeatedField(const TUnboxedValuePod& source, Message& target, + const FieldDescriptor& field, const TProtoInfo& info, TFlags<EFieldFlag> flags) { + const auto& reflection = *target.GetReflection(); + const auto iter = source.GetListIterator(); + reflection.ClearField(&target, &field); + for (TUnboxedValue item; iter.Next(item);) { + switch (field.type()) { + case FieldDescriptor::TYPE_DOUBLE: + reflection.AddDouble(&target, &field, item.Get<double>()); + break; + + case FieldDescriptor::TYPE_FLOAT: + reflection.AddFloat(&target, &field, item.Get<float>()); + break; + + case FieldDescriptor::TYPE_INT64: + case FieldDescriptor::TYPE_SFIXED64: + case FieldDescriptor::TYPE_SINT64: + reflection.AddInt64(&target, &field, item.Get<i64>()); + break; + + case FieldDescriptor::TYPE_ENUM: + { + const auto& enumValue = GetEnumValue(item, field, info, flags); + reflection.AddEnum(&target, &field, &enumValue); + } + break; + case FieldDescriptor::TYPE_UINT64: + case FieldDescriptor::TYPE_FIXED64: + reflection.AddUInt64(&target, &field, item.Get<ui64>()); + break; + + case FieldDescriptor::TYPE_INT32: + case FieldDescriptor::TYPE_SFIXED32: + case FieldDescriptor::TYPE_SINT32: + reflection.AddInt32(&target, &field, item.Get<i32>()); + break; + + case FieldDescriptor::TYPE_UINT32: + case FieldDescriptor::TYPE_FIXED32: + reflection.AddUInt32(&target, &field, item.Get<ui32>()); + break; + + case FieldDescriptor::TYPE_BOOL: + reflection.AddBool(&target, &field, item.Get<bool>()); + break; + + case FieldDescriptor::TYPE_STRING: + 
reflection.AddString(&target, &field, TString(item.AsStringRef())); + break; + + case FieldDescriptor::TYPE_BYTES: + reflection.AddString(&target, &field, TString(item.AsStringRef())); + break; + + case FieldDescriptor::TYPE_MESSAGE: + { + auto* nestedMessage = reflection.AddMessage(&target, &field); + if (flags.HasFlags(EFieldFlag::Binary)) { + const auto& bytes = TStringBuf(item.AsStringRef()); + Y_ENSURE(nestedMessage->ParseFromArray(bytes.data(), bytes.size())); + } else { + FillProtoFromValue(item, *nestedMessage, info); + } + } + break; + + default: + ythrow yexception() << "Unsupported protobuf type: " + << field.type_name() << ", field: " << field.name(); + } + } + } + + void FillSingleField(const TUnboxedValuePod& source, Message& target, + const FieldDescriptor& field, const TProtoInfo& info, TFlags<EFieldFlag> flags) { + const auto& reflection = *target.GetReflection(); + switch (field.type()) { + case FieldDescriptor::TYPE_DOUBLE: + reflection.SetDouble(&target, &field, source.Get<double>()); + break; + + case FieldDescriptor::TYPE_FLOAT: + reflection.SetFloat(&target, &field, source.Get<float>()); + break; + + case FieldDescriptor::TYPE_INT64: + case FieldDescriptor::TYPE_SFIXED64: + case FieldDescriptor::TYPE_SINT64: + reflection.SetInt64(&target, &field, source.Get<i64>()); + break; + + case FieldDescriptor::TYPE_ENUM: + { + const auto& enumValue = GetEnumValue(source, field, info, flags); + reflection.SetEnum(&target, &field, &enumValue); + } + break; + + case FieldDescriptor::TYPE_UINT64: + case FieldDescriptor::TYPE_FIXED64: + reflection.SetUInt64(&target, &field, source.Get<ui64>()); + break; + + case FieldDescriptor::TYPE_INT32: + case FieldDescriptor::TYPE_SFIXED32: + case FieldDescriptor::TYPE_SINT32: + reflection.SetInt32(&target, &field, source.Get<i32>()); + break; + + case FieldDescriptor::TYPE_UINT32: + case FieldDescriptor::TYPE_FIXED32: + reflection.SetUInt32(&target, &field, source.Get<ui32>()); + break; + + case 
FieldDescriptor::TYPE_BOOL: + reflection.SetBool(&target, &field, source.Get<bool>()); + break; + + case FieldDescriptor::TYPE_STRING: + reflection.SetString(&target, &field, TString(source.AsStringRef())); + break; + + case FieldDescriptor::TYPE_BYTES: + reflection.SetString(&target, &field, TString(source.AsStringRef())); + break; + + case FieldDescriptor::TYPE_MESSAGE: + { + auto* nestedMessage = reflection.MutableMessage(&target, &field); + if (flags.HasFlags(EFieldFlag::Binary)) { + const auto& bytes = TStringBuf(source.AsStringRef()); + Y_ENSURE(nestedMessage->ParseFromArray(bytes.data(), bytes.size())); + } else { + FillProtoFromValue(source, *nestedMessage, info); + } + } + break; + + default: + ythrow yexception() << "Unsupported protobuf type: " + << field.type_name() << ", field: " << field.name(); + } + } + + void FillMapField(const TUnboxedValuePod& source, Message& target, const FieldDescriptor& field, const TProtoInfo& info, TFlags<EFieldFlag> flags) { + const auto& reflection = *target.GetReflection(); + reflection.ClearField(&target, &field); + if (source) { + const auto noBinaryFlags = TFlags<EFieldFlag>(flags).RemoveFlags(EFieldFlag::Binary); + const auto iter = source.GetDictIterator(); + for (TUnboxedValue key, value; iter.NextPair(key, value);) { + auto* nestedMessage = reflection.AddMessage(&target, &field); + const auto& descriptor = *nestedMessage->GetDescriptor(); + FillSingleField(key, *nestedMessage, *descriptor.map_key(), info, noBinaryFlags); + FillSingleField(value, *nestedMessage, *descriptor.map_value(), info, flags); + } + } + } +} + +namespace NYql::NUdf { + +void FillProtoFromValue(const TUnboxedValuePod& source, Message& target, const TProtoInfo& info) { + const auto& descriptor = *target.GetDescriptor(); + TMessageInfo* messageInfo; + { + const auto it = info.Messages.find(descriptor.full_name()); + if (it == info.Messages.end()) { + ythrow yexception() << "unknown message " << descriptor.full_name(); + } + messageInfo = 
it->second.get(); + } + + const auto& reflection = *target.GetReflection(); + for (int i = 0; i < descriptor.field_count(); ++i) { + const auto& field = *descriptor.field(i); + const auto it = messageInfo->Fields.find(field.number()); + Y_ENSURE(it != messageInfo->Fields.end()); + auto pos = it->second.Pos; + TFlags<EFieldFlag> flags = it->second.Flags; + auto fieldValue = source.GetElement(pos); + if (field.containing_oneof() && flags.HasFlags(EFieldFlag::Variant)) { + const ui32* varIndex = messageInfo->VariantIndicies.FindPtr(field.number()); + Y_ENSURE(varIndex); + if (fieldValue && fieldValue.GetVariantIndex() == *varIndex) { + fieldValue = fieldValue.GetVariantItem(); + } else { + reflection.ClearField(&target, &field); + continue; + } + } + if (flags.HasFlags(EFieldFlag::Void)) { + reflection.ClearField(&target, &field); + continue; + } + + if (!fieldValue) { + if (field.is_required()) { + ythrow yexception() << "required field " << field.name() << " has no value"; + } + reflection.ClearField(&target, &field); + continue; + } + if (field.is_map() && flags.HasFlags(EFieldFlag::Dict)) { + FillMapField(fieldValue, target, field, info, flags); + } else if (field.is_repeated()) { + FillRepeatedField(fieldValue, target, field, info, flags); + } else { + FillSingleField(fieldValue, target, field, info, flags); + } + } +} + +} // namespace NYql::NUdf + diff --git a/ydb/library/yql/protobuf_udf/proto_builder.h b/ydb/library/yql/protobuf_udf/proto_builder.h new file mode 100644 index 0000000000..dab934ad98 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/proto_builder.h @@ -0,0 +1,20 @@ +#pragma once + +#include <google/protobuf/descriptor.h> +#include <google/protobuf/message.h> +#include <ydb/library/yql/protobuf_udf/type_builder.h> +#include <ydb/library/yql/public/udf/udf_type_builder.h> + +#include <util/generic/vector.h> +#include <util/generic/hash.h> +#include <util/system/mutex.h> + +namespace NYql { +namespace NUdf { + +void FillProtoFromValue(const 
NKikimr::NUdf::TUnboxedValuePod& source, NProtoBuf::Message& target, + const NKikimr::NUdf::TProtoInfo& info); + + +} // namespace NUdf +} // namespace NYql diff --git a/ydb/library/yql/protobuf_udf/type_builder.cpp b/ydb/library/yql/protobuf_udf/type_builder.cpp new file mode 100644 index 0000000000..9817cd55a3 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/type_builder.cpp @@ -0,0 +1,463 @@ +#include "type_builder.h" + +#include <ydb/library/yql/public/udf/udf_value.h> +#include <ydb/library/yql/public/udf/udf_type_inspection.h> +#include <ydb/library/yql/utils/yql_panic.h> + +#include <yt/cpp/mapreduce/interface/protobuf_format.h> +#include <yt/yt_proto/yt/formats/extension.pb.h> + +#include <util/generic/set.h> + +#include <optional> + +namespace NYql { +namespace NUdf { +namespace { + +using namespace NProtoBuf; + +class TTypeBuilder { +public: + TTypeBuilder(EEnumFormat enumFormat, + ERecursionTraits recursion, + bool ytMode, + bool optionalLists, + bool syntaxAware, + bool useJsonName, + EProtoStringYqlType stringType, + IFunctionTypeInfoBuilder& builder); + ~TTypeBuilder(); + + void Build(const Descriptor* descriptor, TProtoInfo* info); + + TType* GenerateTypeInfo(const Descriptor* descriptor, bool defaultYtSerialize); + +private: + TType* GetBytesType(); + TType* GetInt64Type(); + TType* GetYsonType(); + TType* GetType(const FieldDescriptor* fd, bool defaultYtSerialize); + TType* GetOptionalType(TType* type); + TType* GetListType(TType* type); + +private: + using TTypeMap = THashMap<TType*, TType*>; + + EEnumFormat EnumFormat_; + ERecursionTraits Recursion_; + bool YtMode_; + bool OptionalLists_; + bool SyntaxAware_; + bool UseJsonName_; + EProtoStringYqlType StringType_; + IFunctionTypeInfoBuilder& + Builder_; + TProtoInfo* Info_; + TType* BasicTypes_[FieldDescriptor::Type::MAX_TYPE + 1]; + TType* YsonType; + TSet<TString> KnownMessages_; + TTypeMap Optionals_; + TTypeMap Lists_; +}; + +TTypeBuilder::TTypeBuilder(EEnumFormat enumFormat, + 
ERecursionTraits recursion, + bool ytMode, + bool optionalLists, + bool syntaxAware, + bool useJsonName, + EProtoStringYqlType stringType, + IFunctionTypeInfoBuilder& builder) + : EnumFormat_(enumFormat) + , Recursion_(recursion) + , YtMode_(ytMode) + , OptionalLists_(optionalLists) + , SyntaxAware_(syntaxAware) + , UseJsonName_(useJsonName) + , StringType_(stringType) + , Builder_(builder) + , Info_(nullptr) + , YsonType(nullptr) +{ + for (size_t i = 0; i < Y_ARRAY_SIZE(BasicTypes_); ++i) { + BasicTypes_[i] = nullptr; + } +} + +TTypeBuilder::~TTypeBuilder() +{ } + +void TTypeBuilder::Build(const Descriptor* descriptor, TProtoInfo* info) { + Info_ = info; + Info_->EnumFormat = EnumFormat_; + Info_->Recursion = Recursion_; + Info_->YtMode = YtMode_; + Info_->StructType = GenerateTypeInfo(descriptor, false); + Info_->OptionalLists = OptionalLists_; + Info_->SyntaxAware = SyntaxAware_; + Info_->StringType = StringType_; + Info_->UseJsonName = UseJsonName_; +} + +TType* TTypeBuilder::GenerateTypeInfo(const Descriptor* descriptor, bool defaultYtSerialize) { + auto fullName = descriptor->full_name(); + + if (KnownMessages_.find(fullName) != KnownMessages_.end()) { + auto mi = Info_->Messages.find(fullName); + if (mi == Info_->Messages.end()) { + switch (Recursion_) { + case ERecursionTraits::Fail: + ythrow yexception() << "can't handle recursive types: " + << fullName; + case ERecursionTraits::Bytes: + case ERecursionTraits::Ignore: + return nullptr; + } + } + return mi->second->StructType; + } else { + KnownMessages_.insert(fullName); + } + + std::shared_ptr<TMessageInfo> message = std::make_shared<TMessageInfo>(); + + auto makeField = [&](ui32& pos, TFlags<EFieldFlag>& flags, const IStructTypeBuilder::TPtr& structType, const FieldDescriptor* fd, std::optional<bool> fromVariant) { + bool isMessageField = fd->cpp_type() == FieldDescriptor::CPPTYPE_MESSAGE; + + auto name = fd->name(); + if (UseJsonName_) { + Y_ASSERT(!fd->json_name().empty()); + name = fd->json_name(); + 
} else if (YtMode_) { + name = NYT::NDetail::GetColumnName(*fd); + } + + // Создаём тип поля + TType* type = nullptr; + std::optional<NYT::NDetail::TProtobufFieldOptions> ytOpts; + + auto wrapRecursiveType = [&](TType* type, TFlags<EFieldFlag>& flags) { + if (type) { + return type; + } + if (Recursion_ == ERecursionTraits::Ignore) { + flags |= EFieldFlag::Void; + return Builder_.Void(); + } + Y_ENSURE(Recursion_ == ERecursionTraits::Bytes); + flags |= EFieldFlag::Binary; + type = GetBytesType(); + if (fromVariant && !*fromVariant) { + type = GetOptionalType(type); + } + return type; + }; + + if (YtMode_) { + ytOpts = NYT::NDetail::GetFieldOptions(fd, + defaultYtSerialize + ? MakeMaybe<NYT::NDetail::TProtobufFieldOptions>(NYT::NDetail::TProtobufFieldOptions{.SerializationMode = NYT::NDetail::EProtobufSerializationMode::Yt}) + : Nothing()); + if (fd->is_map()) { + auto mapMessage = fd->message_type(); + switch (ytOpts->MapMode) { + case NYT::NDetail::EProtobufMapMode::ListOfStructsLegacy: + case NYT::NDetail::EProtobufMapMode::ListOfStructs: + type = GenerateTypeInfo(mapMessage, NYT::NDetail::EProtobufMapMode::ListOfStructs == ytOpts->MapMode); + break; + case NYT::NDetail::EProtobufMapMode::Dict: + case NYT::NDetail::EProtobufMapMode::OptionalDict: + Y_ENSURE(mapMessage->field_count() == 2); + flags |= EFieldFlag::Dict; + type = Builder_.Dict() + ->Key(GetType(mapMessage->map_key(), false)) + .Value(wrapRecursiveType(GetType(mapMessage->map_value(), true), flags)) + .Build(); + message->DictTypes[fd->number()] = type; + if (NYT::NDetail::EProtobufMapMode::OptionalDict == ytOpts->MapMode) { + flags |= EFieldFlag::OptionalContainer; + type = GetOptionalType(type); + } + break; + } + } else if (isMessageField && ytOpts->SerializationMode == NYT::NDetail::EProtobufSerializationMode::Protobuf) { + type = GetBytesType(); + flags |= EFieldFlag::Binary; + } else if (ytOpts->Type) { + switch (*ytOpts->Type) { + case NYT::NDetail::EProtobufType::Any: + if (fd->type() != 
FieldDescriptor::TYPE_BYTES) { + ythrow yexception() << "Expected 'string' type of 'ANY' field: " << fd->name(); + } + type = GetYsonType(); + break; + case NYT::NDetail::EProtobufType::EnumInt: + if (fd->type() != FieldDescriptor::TYPE_ENUM) { + ythrow yexception() << "Expected 'enum' type of 'ENUM_INT' field: " << fd->name(); + } + flags |= EFieldFlag::EnumInt; + type = GetInt64Type(); + break; + case NYT::NDetail::EProtobufType::EnumString: + if (fd->type() != FieldDescriptor::TYPE_ENUM) { + ythrow yexception() << "Expected 'enum' type of 'ENUM_STRING' field: " << fd->name(); + } + flags |= EFieldFlag::EnumString; + type = GetBytesType(); + break; + default: + ythrow yexception() << "Unsupported YT extension tag: " << *ytOpts->Type + << ", field: " << fd->name(); + } + } else { + type = GetType(fd, false); + } + } else { + type = GetType(fd, false); + } + + if (!flags.HasFlags(EFieldFlag::Dict)) { + if (type) { + if (fd->is_repeated()) { + // Преобразуем базовый тип к списку + type = GetListType(type); + // и к nullable, если это необходимо. 
+ if (OptionalLists_ || (ytOpts && NYT::NDetail::EProtobufListMode::Optional == ytOpts->ListMode)) { + flags |= EFieldFlag::OptionalContainer; + type = GetOptionalType(type); + } + } else { + if (fromVariant) { + if (!*fromVariant) { + // For 'variant as separate fields' always make optional type + // Otherwise always ignore optionality + type = GetOptionalType(type); + } + } else if (fd->is_optional() && (isMessageField || !AvoidOptionalScalars(SyntaxAware_, fd))) { + type = GetOptionalType(type); + } + } + } else { + type = wrapRecursiveType(type, flags); + } + } + + // Добавляем поле в текущую структуру + structType->AddField(name, type, &pos); + }; + + auto structType = Builder_.Struct(descriptor->field_count()); + message->Fields.reserve(descriptor->field_count()); + + THashMap<const OneofDescriptor*, ui32> visitedOneofs; + for (int i = 0, end = descriptor->field_count(); i < end; ++i) { + const FieldDescriptor* fd = descriptor->field(i); + if (auto oneofDescriptor = fd->containing_oneof(); YtMode_ && oneofDescriptor) { + if (!visitedOneofs.contains(oneofDescriptor)) { + auto oneofOptions = NYT::NDetail::GetOneofOptions(oneofDescriptor); + switch (oneofOptions.Mode) { + case NYT::NDetail::EProtobufOneofMode::SeparateFields: + for (int i = 0; i < oneofDescriptor->field_count(); ++i) { + auto& field = message->Fields[oneofDescriptor->field(i)->number()]; + makeField(field.Pos, field.Flags, structType, oneofDescriptor->field(i), false); + } + visitedOneofs.emplace(oneofDescriptor, Max<ui32>()); + break; + case NYT::NDetail::EProtobufOneofMode::Variant: { + auto varStructType = Builder_.Struct(oneofDescriptor->field_count()); + for (int i = 0; i < oneofDescriptor->field_count(); ++i) { + auto fdOneof = oneofDescriptor->field(i); + auto& field = message->Fields[fdOneof->number()]; + field.Flags |= EFieldFlag::Variant; + makeField(message->VariantIndicies[fdOneof->number()], field.Flags, varStructType, fdOneof, true); + } + 
structType->AddField(oneofOptions.VariantFieldName, Builder_.Optional()->Item(Builder_.Variant()->Over(varStructType->Build()).Build()).Build(), &visitedOneofs[oneofDescriptor]); + break; + } + } + } + } else { + // Запоминаем поле по соответствующему тегу из proto + auto& field = message->Fields[fd->number()]; + makeField(field.Pos, field.Flags, structType, fd, std::nullopt); + } + } + + // Завершаем создание текушей структуры + message->StructType = structType->Build(); + + auto typeHelper = Builder_.TypeInfoHelper(); + auto structTypeInspector = TStructTypeInspector(*typeHelper, message->StructType); + if (!structTypeInspector) { + ythrow yexception() << "invalid struct type of " << fullName << " descriptor"; + } + message->FieldsCount = structTypeInspector.GetMembersCount(); + + // Позиции становятся известны после вызова Build() + for (auto [oneofDescriptor, pos]: visitedOneofs) { + if (pos != Max<ui32>()) { + for (int i = 0; i < oneofDescriptor->field_count(); ++i) { + message->Fields[oneofDescriptor->field(i)->number()].Pos = pos; + } + } + } + + // Зарегистрируем созданное сообщение в дескрипторе типа. 
+ Info_->Messages.insert( + std::make_pair(descriptor->full_name(), message) + ); + + return message->StructType; +} + +TType* TTypeBuilder::GetBytesType() { + if (BasicTypes_[FieldDescriptor::TYPE_BYTES] == nullptr) { + BasicTypes_[FieldDescriptor::TYPE_BYTES] = Builder_.SimpleType<char*>(); + } + return BasicTypes_[FieldDescriptor::TYPE_BYTES]; +} + +TType* TTypeBuilder::GetInt64Type() { + if (BasicTypes_[FieldDescriptor::TYPE_INT64] == nullptr) { + BasicTypes_[FieldDescriptor::TYPE_INT64] = Builder_.SimpleType<i64>(); + } + return BasicTypes_[FieldDescriptor::TYPE_INT64]; +} + +TType* TTypeBuilder::GetYsonType() { + if (YsonType == nullptr) { + YsonType = Builder_.SimpleType<TYson>(); + } + return YsonType; +} + +TType* TTypeBuilder::GetType(const FieldDescriptor* fd, bool defaultYtSerialize) { + FieldDescriptor::Type type = fd->type(); + + // Unify types + switch (type) { + case FieldDescriptor::TYPE_SFIXED32: + case FieldDescriptor::TYPE_SINT32: + type = FieldDescriptor::TYPE_INT32; + break; + case FieldDescriptor::TYPE_SFIXED64: + case FieldDescriptor::TYPE_SINT64: + type = FieldDescriptor::TYPE_INT64; + break; + case FieldDescriptor::TYPE_FIXED32: + type = FieldDescriptor::TYPE_UINT32; + break; + case FieldDescriptor::TYPE_FIXED64: + type = FieldDescriptor::TYPE_UINT64; + break; + case FieldDescriptor::TYPE_FLOAT: + if (YtMode_) { + type = FieldDescriptor::TYPE_DOUBLE; + } + break; + case FieldDescriptor::TYPE_STRING: + if (StringType_ == EProtoStringYqlType::Bytes) { + type = FieldDescriptor::TYPE_BYTES; + } + break; + case FieldDescriptor::TYPE_MESSAGE: + return GenerateTypeInfo(fd->message_type(), defaultYtSerialize); + default: + ; + } + + if (BasicTypes_[type] == nullptr) { + switch (type) { + case FieldDescriptor::TYPE_INT32: + BasicTypes_[type] = Builder_.SimpleType<i32>(); + break; + case FieldDescriptor::TYPE_INT64: + BasicTypes_[type] = Builder_.SimpleType<i64>(); + break; + case FieldDescriptor::TYPE_UINT32: + BasicTypes_[type] = 
Builder_.SimpleType<ui32>(); + break; + case FieldDescriptor::TYPE_UINT64: + BasicTypes_[type] = Builder_.SimpleType<ui64>(); + break; + case FieldDescriptor::TYPE_FLOAT: + BasicTypes_[type] = Builder_.SimpleType<float>(); + break; + case FieldDescriptor::TYPE_DOUBLE: + BasicTypes_[type] = Builder_.SimpleType<double>(); + break; + case FieldDescriptor::TYPE_BOOL: + BasicTypes_[type] = Builder_.SimpleType<bool>(); + break; + case FieldDescriptor::TYPE_ENUM: + switch (EnumFormat_) { + case EEnumFormat::Number: + BasicTypes_[type] = Builder_.SimpleType<i32>(); + break; + case EEnumFormat::Name: + case EEnumFormat::FullName: + BasicTypes_[type] = Builder_.SimpleType<char*>(); + break; + } + break; + case FieldDescriptor::TYPE_STRING: + BasicTypes_[type] = Builder_.SimpleType<TUtf8>(); + break; + case FieldDescriptor::TYPE_BYTES: + BasicTypes_[type] = Builder_.SimpleType<char*>(); + break; + default: + ythrow yexception() << "Unsupported protobuf type: " << fd->type_name() + << ", field: " << fd->name() << ", " << int(fd->type()); + } + } + + return BasicTypes_[type]; +} + +TType* TTypeBuilder::GetOptionalType(TType* type) { + auto ti = Optionals_.find(type); + if (ti != Optionals_.end()) { + return ti->second; + } else { + auto optionalType = Builder_.Optional()->Item(type).Build(); + Optionals_.insert(std::make_pair(type, optionalType)); + return optionalType; + } +} + +TType* TTypeBuilder::GetListType(TType* type) { + auto ti = Lists_.find(type); + if (ti != Lists_.end()) { + return ti->second; + } else { + auto listType = Builder_.List()->Item(type).Build(); + Lists_.insert(std::make_pair(type, listType)); + return listType; + } +} + +} // namespace + +void ProtoTypeBuild(const NProtoBuf::Descriptor* descriptor, + const EEnumFormat enumFormat, + const ERecursionTraits recursion, + const bool optionalLists, + IFunctionTypeInfoBuilder& builder, + TProtoInfo* info, + EProtoStringYqlType stringType, + const bool syntaxAware, + const bool useJsonName, + const bool 
ytMode) +{ + TTypeBuilder(enumFormat, recursion, ytMode, optionalLists, syntaxAware, useJsonName, + stringType, builder).Build(descriptor, info); +} + +bool AvoidOptionalScalars(bool syntaxAware, const FieldDescriptor* fd) { + return syntaxAware && fd->file()->syntax() == FileDescriptor::SYNTAX_PROTO3; +} + +} // namespace NUdf +} // namespace NYql diff --git a/ydb/library/yql/protobuf_udf/type_builder.h b/ydb/library/yql/protobuf_udf/type_builder.h new file mode 100644 index 0000000000..d2e5543e5f --- /dev/null +++ b/ydb/library/yql/protobuf_udf/type_builder.h @@ -0,0 +1,93 @@ +#pragma once + +#include <ydb/library/yql/public/udf/udf_type_builder.h> +#include <library/cpp/protobuf/yql/descriptor.h> + +#include <util/generic/hash.h> +#include <util/generic/hash_set.h> +#include <util/generic/string.h> +#include <util/generic/ptr.h> +#include <util/generic/flags.h> + +#include <google/protobuf/message.h> + +namespace NYql { +namespace NUdf { + +enum class EFieldFlag: ui16 { + Void = 1 << 0, + Binary = 1 << 1, + OptionalContainer = 1 << 2, + Variant = 1 << 3, + Dict = 1 << 4, + EnumInt = 1 << 5, + EnumString = 1 << 6, +}; + +struct TMessageInfo { + struct TFieldInfo { + ui32 Pos; + TFlags<EFieldFlag> Flags; + }; + TType* StructType = nullptr; + ui32 FieldsCount = 0; + THashMap<ui64, TFieldInfo> Fields; + THashMap<ui64, ui32> VariantIndicies; + THashMap<ui64, TType*> DictTypes; +}; + +enum class EProtoStringYqlType { + Bytes, + Utf8 +}; + +// Don't reuse this structure between UDF calls. It caches TType*, which are valid only in specific scope. 
+struct TProtoInfo { + using TMessageMap = THashMap<TString, std::shared_ptr<TMessageInfo>>; + + TType* StructType = nullptr; + TMessageMap Messages; + EEnumFormat EnumFormat = EEnumFormat::Number; + ERecursionTraits Recursion = ERecursionTraits::Fail; + bool YtMode = false; + bool OptionalLists = false; + bool SyntaxAware = false; + EProtoStringYqlType StringType = EProtoStringYqlType::Bytes; + bool UseJsonName = false; + + #define SET_VALUE(type, name) \ + TProtoInfo& With##name(const type value) { \ + name = value; \ + return static_cast<TProtoInfo&>(*this); \ + } + SET_VALUE(EEnumFormat, EnumFormat); + SET_VALUE(ERecursionTraits, Recursion); + SET_VALUE(bool, YtMode); + SET_VALUE(bool, OptionalLists); + SET_VALUE(bool, SyntaxAware); + SET_VALUE(EProtoStringYqlType, StringType); + SET_VALUE(bool, UseJsonName); +}; + +void ProtoTypeBuild(const NProtoBuf::Descriptor* descriptor, + const EEnumFormat enumFormat, + const ERecursionTraits recursion, + const bool optionalLists, + IFunctionTypeInfoBuilder& builder, + TProtoInfo* info, + EProtoStringYqlType stringType = EProtoStringYqlType::Bytes, + const bool syntaxAware = false, + const bool useJsonName = false, + const bool ytMode = false); + +template<class T> +void ProtoTypeBuild(IFunctionTypeInfoBuilder& builder, TProtoInfo* info) { + ProtoTypeBuild(T::GetDescriptor(), info->EnumFormat, info->Recursion, + info->OptionalLists, builder, info, info->StringType, info->SyntaxAware, + info->UseJsonName, info->YtMode); +} + +bool AvoidOptionalScalars(bool syntaxAware, const NProtoBuf::FieldDescriptor* fd); + +} // namespace NUdf +} // namespace NYql diff --git a/ydb/library/yql/protobuf_udf/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/protobuf_udf/ut/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..7a6463cd05 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/ut/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,106 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. 
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) + +add_executable(ydb-library-yql-protobuf_udf-ut) +target_compile_options(ydb-library-yql-protobuf_udf-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-library-yql-protobuf_udf-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf +) +target_link_libraries(ydb-library-yql-protobuf_udf-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + library-yql-protobuf_udf + yt-lib-schema + providers-yt-common + yql-parser-pg_wrapper + udf-service-exception_policy + library-yql-minikql + yql-public-udf + common-schema-mkql + contrib-libs-protobuf +) +target_link_options(ydb-library-yql-protobuf_udf-ut PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_proto_messages(ydb-library-yql-protobuf_udf-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/ut/protobuf_ut.proto +) +target_sources(ydb-library-yql-protobuf_udf-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/ut/type_builder_ut.cpp +) +set_property( + TARGET + ydb-library-yql-protobuf_udf-ut + PROPERTY + SPLIT_FACTOR + 1 +) +add_yunittest( + NAME + ydb-library-yql-protobuf_udf-ut + TEST_TARGET + ydb-library-yql-protobuf_udf-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-library-yql-protobuf_udf-ut + 
PROPERTY + LABELS + SMALL +) +set_yunittest_property( + TEST + ydb-library-yql-protobuf_udf-ut + PROPERTY + PROCESSORS + 1 +) +target_allocator(ydb-library-yql-protobuf_udf-ut + system_allocator +) +target_proto_addincls(ydb-library-yql-protobuf_udf-ut + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(ydb-library-yql-protobuf_udf-ut + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) +vcs_info(ydb-library-yql-protobuf_udf-ut) diff --git a/ydb/library/yql/protobuf_udf/ut/CMakeLists.linux-aarch64.txt b/ydb/library/yql/protobuf_udf/ut/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..b6b44236a9 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/ut/CMakeLists.linux-aarch64.txt @@ -0,0 +1,109 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) + +add_executable(ydb-library-yql-protobuf_udf-ut) +target_compile_options(ydb-library-yql-protobuf_udf-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-library-yql-protobuf_udf-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf +) +target_link_libraries(ydb-library-yql-protobuf_udf-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-testing-unittest_main + library-yql-protobuf_udf + yt-lib-schema + providers-yt-common + yql-parser-pg_wrapper + udf-service-exception_policy + library-yql-minikql + yql-public-udf + common-schema-mkql + contrib-libs-protobuf +) +target_link_options(ydb-library-yql-protobuf_udf-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_proto_messages(ydb-library-yql-protobuf_udf-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/ut/protobuf_ut.proto +) +target_sources(ydb-library-yql-protobuf_udf-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/ut/type_builder_ut.cpp +) +set_property( + TARGET + ydb-library-yql-protobuf_udf-ut + PROPERTY + SPLIT_FACTOR + 1 +) +add_yunittest( + NAME + ydb-library-yql-protobuf_udf-ut + TEST_TARGET + ydb-library-yql-protobuf_udf-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-library-yql-protobuf_udf-ut + PROPERTY + LABELS + SMALL +) +set_yunittest_property( + TEST + ydb-library-yql-protobuf_udf-ut + PROPERTY + PROCESSORS + 1 +) +target_allocator(ydb-library-yql-protobuf_udf-ut + cpp-malloc-jemalloc +) +target_proto_addincls(ydb-library-yql-protobuf_udf-ut + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + 
${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(ydb-library-yql-protobuf_udf-ut + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) +vcs_info(ydb-library-yql-protobuf_udf-ut) diff --git a/ydb/library/yql/protobuf_udf/ut/CMakeLists.linux-x86_64.txt b/ydb/library/yql/protobuf_udf/ut/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..456b5e3d4d --- /dev/null +++ b/ydb/library/yql/protobuf_udf/ut/CMakeLists.linux-x86_64.txt @@ -0,0 +1,111 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) + +add_executable(ydb-library-yql-protobuf_udf-ut) +target_compile_options(ydb-library-yql-protobuf_udf-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-library-yql-protobuf_udf-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf +) +target_link_libraries(ydb-library-yql-protobuf_udf-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + library-yql-protobuf_udf + yt-lib-schema + providers-yt-common + yql-parser-pg_wrapper + udf-service-exception_policy + library-yql-minikql + yql-public-udf + common-schema-mkql + contrib-libs-protobuf +) +target_link_options(ydb-library-yql-protobuf_udf-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + 
-fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_proto_messages(ydb-library-yql-protobuf_udf-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/ut/protobuf_ut.proto +) +target_sources(ydb-library-yql-protobuf_udf-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/ut/type_builder_ut.cpp +) +set_property( + TARGET + ydb-library-yql-protobuf_udf-ut + PROPERTY + SPLIT_FACTOR + 1 +) +add_yunittest( + NAME + ydb-library-yql-protobuf_udf-ut + TEST_TARGET + ydb-library-yql-protobuf_udf-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-library-yql-protobuf_udf-ut + PROPERTY + LABELS + SMALL +) +set_yunittest_property( + TEST + ydb-library-yql-protobuf_udf-ut + PROPERTY + PROCESSORS + 1 +) +target_allocator(ydb-library-yql-protobuf_udf-ut + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache +) +target_proto_addincls(ydb-library-yql-protobuf_udf-ut + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(ydb-library-yql-protobuf_udf-ut + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) +vcs_info(ydb-library-yql-protobuf_udf-ut) diff --git a/ydb/library/yql/protobuf_udf/ut/CMakeLists.txt b/ydb/library/yql/protobuf_udf/ut/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/ut/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/protobuf_udf/ut/CMakeLists.windows-x86_64.txt b/ydb/library/yql/protobuf_udf/ut/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..e86e7bd371 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/ut/CMakeLists.windows-x86_64.txt @@ -0,0 +1,99 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) + +add_executable(ydb-library-yql-protobuf_udf-ut) +target_compile_options(ydb-library-yql-protobuf_udf-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-library-yql-protobuf_udf-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf +) +target_link_libraries(ydb-library-yql-protobuf_udf-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + library-yql-protobuf_udf + yt-lib-schema + providers-yt-common + yql-parser-pg_wrapper + udf-service-exception_policy + library-yql-minikql + yql-public-udf + common-schema-mkql + contrib-libs-protobuf +) +target_proto_messages(ydb-library-yql-protobuf_udf-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/ut/protobuf_ut.proto +) +target_sources(ydb-library-yql-protobuf_udf-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/protobuf_udf/ut/type_builder_ut.cpp +) +set_property( + TARGET + ydb-library-yql-protobuf_udf-ut + PROPERTY + SPLIT_FACTOR + 1 +) +add_yunittest( + NAME + ydb-library-yql-protobuf_udf-ut + TEST_TARGET + ydb-library-yql-protobuf_udf-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-library-yql-protobuf_udf-ut + PROPERTY + LABELS + SMALL +) +set_yunittest_property( + TEST + ydb-library-yql-protobuf_udf-ut + PROPERTY + PROCESSORS + 1 +) +target_allocator(ydb-library-yql-protobuf_udf-ut + system_allocator +) +target_proto_addincls(ydb-library-yql-protobuf_udf-ut + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) 
+target_proto_outs(ydb-library-yql-protobuf_udf-ut + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) +vcs_info(ydb-library-yql-protobuf_udf-ut) diff --git a/ydb/library/yql/protobuf_udf/ut/protobuf_ut.proto b/ydb/library/yql/protobuf_udf/ut/protobuf_ut.proto new file mode 100644 index 0000000000..ed1c283420 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/ut/protobuf_ut.proto @@ -0,0 +1,299 @@ +import "yt/yt_proto/yt/formats/extension.proto"; + +package NYql.NProtoTest; + +message TIntegral +{ + optional double DoubleField = 1; + optional float FloatField = 2; + optional int32 Int32Field = 3; + optional int64 Int64Field = 4; + optional uint32 Uint32Field = 5; + optional uint64 Uint64Field = 6; + optional sint32 Sint32Field = 7; + optional sint64 Sint64Field = 8; + optional fixed32 Fixed32Field = 9; + optional fixed64 Fixed64Field = 10; + optional sfixed32 Sfixed32Field = 11; + optional sfixed64 Sfixed64Field = 12; + optional bool BoolField = 13; + enum TriBool + { + TRI_FALSE = 0; + TRI_TRUE = 1; + TRI_UNDEF = -1; + } + optional TriBool EnumField = 14; +} + +message TRepeated +{ + repeated int32 Int32Field = 1; +} + +message TWithTypeOptions +{ + enum Color + { + WHITE = 0; + BLUE = 1; + RED = -1; + } + + message TEmbedded + { + option (NYT.default_field_flags) = SERIALIZATION_YT; + + optional Color ColorIntField = 1 [(NYT.flags) = ENUM_INT]; + optional Color ColorStringField = 2 [(NYT.flags) = ENUM_STRING]; + optional bytes AnyField = 3 [(NYT.flags) = ANY]; + required bytes RequiredAnyField = 4 [(NYT.flags) = ANY]; + } + + optional Color ColorIntField = 1 [(NYT.flags) = ENUM_INT]; + optional Color ColorStringField = 2 [(NYT.flags) = ENUM_STRING]; + optional bytes AnyField = 3 [(NYT.flags) = ANY]; + required bytes RequiredAnyField = 4 [(NYT.flags) = ANY]; + // Not supported: + // optional bytes OtherColumnsField = 5 [(NYT.flags) = OTHER_COLUMNS]; + optional TEmbedded EmbeddedField = 6 [(NYT.flags) = SERIALIZATION_YT]; + repeated Color 
RepeatedEnumIntField = 7 [(NYT.flags) = SERIALIZATION_YT, (NYT.flags) = ENUM_INT]; +} + +message TOneOf +{ + oneof Chooser + { + double DoubleField = 1; + int32 Int32Field = 2; + } + optional bool BoolField = 3; +} + +message TAggregated +{ + optional string StringField = 1; + optional bytes BytesField = 2; + optional TIntegral NestedField = 3; + optional TRepeated NestedRepeatedField = 4; + optional TOneOf NestedOneOfField = 5; + optional TAggregated NestedRecursiveField = 6; +} + + +//////////////////////////////////////////////////////////////////////////////// + +message TUrlRow +{ + optional string Host = 1 [(NYT.column_name) = "Host"]; + optional string Path = 2 [(NYT.column_name) = "Path"]; + optional sint32 HttpCode = 3 [(NYT.column_name) = "HttpCode"]; +} + +message TRowFieldSerializationOption +{ + optional TUrlRow UrlRow_1 = 1 [(NYT.flags) = SERIALIZATION_YT]; + optional TUrlRow UrlRow_2 = 2; +} + +message TRowMessageSerializationOption +{ + option (NYT.default_field_flags) = SERIALIZATION_YT; + optional TUrlRow UrlRow_1 = 1; + optional TUrlRow UrlRow_2 = 2; +} + +message TRowMixedSerializationOptions +{ + option (NYT.default_field_flags) = SERIALIZATION_YT; + optional TUrlRow UrlRow_1 = 1; + optional TUrlRow UrlRow_2 = 2 [(NYT.flags) = SERIALIZATION_PROTOBUF]; +} + +message TRowSerializedRepeatedFields +{ + option (NYT.default_field_flags) = SERIALIZATION_YT; + repeated int64 Ints = 1; + repeated TUrlRow UrlRows = 2; +} + +message TUrlRowWithColumnNames +{ + optional string Host = 1 [(NYT.column_name) = "Host_ColumnName", (NYT.key_column_name) = "Host_KeyColumnName"]; + optional string Path = 2 [(NYT.key_column_name) = "Path_KeyColumnName"]; + optional sint32 HttpCode = 3; +} + +message TRowMixedSerializationOptions_ColumnNames +{ + option (NYT.default_field_flags) = SERIALIZATION_YT; + optional TUrlRowWithColumnNames UrlRow_1 = 1; + optional TUrlRowWithColumnNames UrlRow_2 = 2 [(NYT.flags) = SERIALIZATION_PROTOBUF]; +} + +message TNoOptionInheritance 
+{ + message TDeepestEmbedded + { + optional int64 x = 1; + } + + message TEmbedded + { + optional TDeepestEmbedded embedded = 1; + } + + message TEmbeddedYt + { + option (NYT.default_field_flags) = SERIALIZATION_YT; + + optional TDeepestEmbedded embedded = 1; + } + + message TEmbeddedProtobuf + { + option (NYT.default_field_flags) = SERIALIZATION_PROTOBUF; + + optional TDeepestEmbedded embedded = 1; + } + + optional TEmbeddedYt EmbeddedYt_YtOption = 1 [(NYT.flags) = SERIALIZATION_YT]; + optional TEmbeddedYt EmbeddedYt_ProtobufOption = 2 [(NYT.flags) = SERIALIZATION_PROTOBUF]; + optional TEmbeddedYt EmbeddedYt_NoOption = 3; + optional TEmbeddedProtobuf EmbeddedProtobuf_YtOption = 4 [(NYT.flags) = SERIALIZATION_YT]; + optional TEmbeddedProtobuf EmbeddedProtobuf_ProtobufOption = 5 [(NYT.flags) = SERIALIZATION_PROTOBUF]; + optional TEmbeddedProtobuf EmbeddedProtobuf_NoOption = 6; + optional TEmbedded Embedded_YtOption = 7 [(NYT.flags) = SERIALIZATION_YT]; + optional TEmbedded Embedded_ProtobufOption = 8 [(NYT.flags) = SERIALIZATION_PROTOBUF]; + optional TEmbedded Embedded_NoOption = 9; +} + +message TOptionalList +{ + repeated int64 OptionalListInt64 = 1 [(NYT.flags) = OPTIONAL_LIST, (NYT.flags) = SERIALIZATION_YT]; +} + +message TWithMap +{ + option (NYT.default_field_flags) = SERIALIZATION_YT; + + message TEmbedded { + optional int64 x = 1; + optional string y = 2; + } + + message TEmbeddedProtobuf { + option (NYT.default_field_flags) = SERIALIZATION_PROTOBUF; + optional int64 x = 1; + optional string y = 2; + } + + message TEmbeddedSubfieldProtobuf { + option (NYT.default_field_flags) = SERIALIZATION_YT; + optional int64 x = 1; + optional string y = 2; + + message TSubField { + repeated string List = 1 [(NYT.column_name) = "list"]; + } + + required TSubField SubField = 3 [(NYT.column_name) = "sub_field", (NYT.flags) = SERIALIZATION_PROTOBUF]; + } + + map<int64, TEmbedded> MapDefault = 1; + map<int64, TEmbedded> MapListOfStructsLegacy = 2 [(NYT.flags) = 
MAP_AS_LIST_OF_STRUCTS_LEGACY]; + map<int64, TEmbedded> MapListOfStructs = 3 [(NYT.flags) = MAP_AS_LIST_OF_STRUCTS]; + map<int64, TEmbedded> MapOptionalDict = 4 [(NYT.flags) = MAP_AS_OPTIONAL_DICT]; + map<int64, TEmbedded> MapDict = 5 [(NYT.flags) = MAP_AS_DICT]; + + map<int64, TEmbeddedProtobuf> Embedded_MapDefault = 6; + map<int64, TEmbeddedProtobuf> Embedded_MapListOfStructsLegacy = 7 [(NYT.flags) = MAP_AS_LIST_OF_STRUCTS_LEGACY]; + map<int64, TEmbeddedProtobuf> Embedded_MapListOfStructs = 8 [(NYT.flags) = MAP_AS_LIST_OF_STRUCTS]; + map<int64, TEmbeddedProtobuf> Embedded_MapOptionalDict = 9 [(NYT.flags) = MAP_AS_OPTIONAL_DICT]; + map<int64, TEmbeddedProtobuf> Embedded_MapDict = 10 [(NYT.flags) = MAP_AS_DICT]; + + map<int64, TEmbeddedSubfieldProtobuf> EmbeddedSubfield_MapDefault = 11; + map<int64, TEmbeddedSubfieldProtobuf> EmbeddedSubfield_MapListOfStructsLegacy = 12 [(NYT.flags) = MAP_AS_LIST_OF_STRUCTS_LEGACY]; + map<int64, TEmbeddedSubfieldProtobuf> EmbeddedSubfield_MapListOfStructs = 13 [(NYT.flags) = MAP_AS_LIST_OF_STRUCTS]; + map<int64, TEmbeddedSubfieldProtobuf> EmbeddedSubfield_MapOptionalDict = 14 [(NYT.flags) = MAP_AS_OPTIONAL_DICT]; + map<int64, TEmbeddedSubfieldProtobuf> EmbeddedSubfield_MapDict = 15 [(NYT.flags) = MAP_AS_DICT]; +} + +message TWithOneof +{ + option (NYT.default_field_flags) = SERIALIZATION_YT; + + message TEmbedded + { + option (NYT.default_field_flags) = SERIALIZATION_YT; + oneof Oneof { + int64 x = 1; + string y = 2; + } + } + + message TDefaultSeparateFields + { + option (NYT.default_oneof_flags) = SEPARATE_FIELDS; + option (NYT.default_field_flags) = SERIALIZATION_YT; + + optional string field = 1; + + oneof Oneof2 + { + option (NYT.variant_field_name) = "variant_field_name"; + option (NYT.oneof_flags) = VARIANT; + string y2 = 4; + TEmbedded z2 = 6; + int64 x2 = 2; + } + + oneof Oneof1 + { + int64 x1 = 10; + string y1 = 3; + TEmbedded z1 = 5; + } + } + + message TNoDefault + { + option (NYT.default_field_flags) = 
SERIALIZATION_YT; + + optional string field = 1; + + oneof Oneof2 + { + string y2 = 4; + TEmbedded z2 = 6; + int64 x2 = 2; + } + + oneof Oneof1 + { + option (NYT.oneof_flags) = SEPARATE_FIELDS; + int64 x1 = 10; + string y1 = 3; + TEmbedded z1 = 5; + } + } + + message TSerializationProtobuf + { + oneof Oneof + { + int64 x1 = 2; + string y1 = 1; + TEmbedded z1 = 3; + } + } + + optional TDefaultSeparateFields DefaultSeparateFields = 1; + optional TNoDefault NoDefault = 2; + optional TSerializationProtobuf SerializationProtobuf = 3; + + oneof TopLevelOneof + { + int64 MemberOfTopLevelOneof = 4; + } +} diff --git a/ydb/library/yql/protobuf_udf/ut/type_builder_ut.cpp b/ydb/library/yql/protobuf_udf/ut/type_builder_ut.cpp new file mode 100644 index 0000000000..cf88426205 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/ut/type_builder_ut.cpp @@ -0,0 +1,126 @@ +#include "type_builder.h" + +//#include <alice/wonderlogs/protos/wonderlogs.pb.h> +#include <ydb/library/yql/protobuf_udf/ut/protobuf_ut.pb.h> +#include <ydb/library/yql/providers/yt/lib/schema/schema.h> +#include <ydb/library/yql/providers/yt/common/yql_names.h> + +#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h> +#include <ydb/library/yql/minikql/mkql_alloc.h> +#include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/minikql/mkql_type_builder.h> +#include <ydb/library/yql/minikql/mkql_program_builder.h> +#include <ydb/library/yql/minikql/mkql_function_registry.h> +#include <ydb/library/yql/public/udf/udf_types.h> +#include <yt/cpp/mapreduce/interface/format.h> + +#include <library/cpp/yson/node/node_io.h> +#include <library/cpp/testing/unittest/registar.h> + +#include <util/generic/string.h> + +#include <algorithm> + +using namespace NYql; +using namespace NKikimr::NMiniKQL; + +namespace { + +struct TSetup { + TSetup() + : Alloc(__LOCATION__) + , Env(Alloc) + , FunctionRegistry(CreateFunctionRegistry(IBuiltinFunctionRegistry::TPtr())) + , TypeInfoHelper(new 
TTypeInfoHelper()) + , FunctionTypeInfoBuilder(Env, TypeInfoHelper, "", nullptr, {}) + , PgmBuilder(Env, *FunctionRegistry) + { + } + + TScopedAlloc Alloc; + TTypeEnvironment Env; + IFunctionRegistry::TPtr FunctionRegistry; + NUdf::ITypeInfoHelper::TPtr TypeInfoHelper; + TFunctionTypeInfoBuilder FunctionTypeInfoBuilder; + TProgramBuilder PgmBuilder; +}; + +} // unnamed + +Y_UNIT_TEST_SUITE(TProtobufTypeBuilderTests) { + + template <typename TProto, EEnumFormat EnumFormat = EEnumFormat::Number, ERecursionTraits Recursion = ERecursionTraits::Fail> + void CheckYtSchemaCompatibility () { + TSetup setup; + + NUdf::TProtoInfo info; + info.YtMode = true; + info.EnumFormat = EnumFormat; + info.Recursion = Recursion; + + NUdf::ProtoTypeBuild<TProto>(setup.FunctionTypeInfoBuilder, &info); + auto protoType = NCommon::TypeToYsonNode(static_cast<const NKikimr::NMiniKQL::TStructType*>(info.StructType)); + + auto ytSchema = NYT::CreateTableSchema(*TProto::GetDescriptor(), true); + auto ytSchemaType = YTSchemaToRowSpec(ytSchema.ToNode())[RowSpecAttrType]; + // Yt schema has struct fields in protobuf-order. 
Make them in name ascending order + ytSchemaType = NCommon::TypeToYsonNode(NCommon::ParseTypeFromYson(ytSchemaType, setup.PgmBuilder, Cerr)); + + UNIT_ASSERT_EQUAL_C( + NYT::NodeToYsonString(protoType, ::NYson::EYsonFormat::Text), + NYT::NodeToYsonString(ytSchemaType, ::NYson::EYsonFormat::Text), + "\nUdfType: " << NYT::NodeToYsonString(protoType, ::NYson::EYsonFormat::Pretty) + << "\nYtSchemaType: " << NYT::NodeToYsonString(ytSchemaType, ::NYson::EYsonFormat::Pretty) + ); + } + + Y_UNIT_TEST(YtMode_Map) { + CheckYtSchemaCompatibility<NYql::NProtoTest::TWithMap>(); + } + + Y_UNIT_TEST(YtMode_OneOf) { + CheckYtSchemaCompatibility<NYql::NProtoTest::TWithOneof>(); + } + + Y_UNIT_TEST(YtMode_OptionalList) { + CheckYtSchemaCompatibility<NYql::NProtoTest::TOptionalList>(); + } + + Y_UNIT_TEST(YtMode_NoOptionInheritance) { + CheckYtSchemaCompatibility<NYql::NProtoTest::TNoOptionInheritance>(); + } + + Y_UNIT_TEST(YtMode_RowMixedSerializationOptions_ColumnNames) { + CheckYtSchemaCompatibility<NYql::NProtoTest::TRowMixedSerializationOptions_ColumnNames>(); + } + + Y_UNIT_TEST(YtMode_RowSerializedRepeatedFields) { + CheckYtSchemaCompatibility<NYql::NProtoTest::TRowSerializedRepeatedFields>(); + } + + Y_UNIT_TEST(YtMode_RowMixedSerializationOptions) { + CheckYtSchemaCompatibility<NYql::NProtoTest::TRowMixedSerializationOptions>(); + } + + Y_UNIT_TEST(YtMode_RowMessageSerializationOption) { + CheckYtSchemaCompatibility<NYql::NProtoTest::TRowMessageSerializationOption>(); + } + + Y_UNIT_TEST(YtMode_RowFieldSerializationOption) { + CheckYtSchemaCompatibility<NYql::NProtoTest::TRowFieldSerializationOption>(); + } + + Y_UNIT_TEST(YtMode_WithTypeOptions) { + CheckYtSchemaCompatibility<NYql::NProtoTest::TWithTypeOptions>(); + } + + Y_UNIT_TEST(YtMode_Aggregated) { + CheckYtSchemaCompatibility<NYql::NProtoTest::TAggregated>(); + } + +/* + Y_UNIT_TEST(YtMode_Wonderlog) { + CheckYtSchemaCompatibility<NAlice::NWonderlogs::TWonderlog, EEnumFormat::Name, ERecursionTraits::Bytes>(); + } 
+*/ +}; diff --git a/ydb/library/yql/protobuf_udf/ut/ya.make b/ydb/library/yql/protobuf_udf/ut/ya.make new file mode 100644 index 0000000000..b61ae76a6d --- /dev/null +++ b/ydb/library/yql/protobuf_udf/ut/ya.make @@ -0,0 +1,23 @@ +UNITTEST_FOR(ydb/library/yql/protobuf_udf) + +SRCS( + type_builder_ut.cpp + protobuf_ut.proto +) + +PEERDIR( + ydb/library/yql/providers/yt/lib/schema + ydb/library/yql/providers/yt/common + ydb/library/yql/parser/pg_wrapper + ydb/library/yql/public/udf/service/exception_policy + ydb/library/yql/minikql + ydb/library/yql/public/udf + ydb/library/yql/providers/common/schema/mkql + contrib/libs/protobuf + + #alice/wonderlogs/protos +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/library/yql/protobuf_udf/value_builder.cpp b/ydb/library/yql/protobuf_udf/value_builder.cpp new file mode 100644 index 0000000000..0d7575332a --- /dev/null +++ b/ydb/library/yql/protobuf_udf/value_builder.cpp @@ -0,0 +1,335 @@ +#include "type_builder.h" +#include "value_builder.h" + +#include <ydb/library/yql/public/udf/udf_value.h> +#include <ydb/library/yql/public/udf/udf_value_builder.h> +#include <ydb/library/yql/public/udf/udf_terminator.h> +#include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/minikql/mkql_node.h> + +namespace NYql { +namespace NUdf { + +using namespace NProtoBuf; + +TProtobufValue::TProtobufValue(const TProtoInfo& info) + : Info_(info) +{ +} + +TProtobufValue::~TProtobufValue() +{ } + +TUnboxedValue TProtobufValue::Run( + const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const +{ + auto blob = args[0].AsStringRef(); + + try { + auto result = this->Parse(TStringBuf(blob.Data(), blob.Size())); + if (result == nullptr) { + return TUnboxedValue(); + } + auto proto(result); + return FillValueFromProto(*proto.Get(), valueBuilder, Info_); + } catch (const std::exception& e) { + UdfTerminate(e.what()); + } +} + +TProtobufSerialize::TProtobufSerialize(const TProtoInfo& info) + : Info_(info) +{ +} + 
+TProtobufSerialize::~TProtobufSerialize() +{ } + +TUnboxedValue TProtobufSerialize::Run( + const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const +{ + try { + TAutoPtr<Message> proto = MakeProto(); + FillProtoFromValue(args[0], *proto, Info_); + TMaybe<TString> result = this->Serialize(*proto); + if (!result) { + return TUnboxedValue(); + } + return valueBuilder->NewString(*result); + } catch (const std::exception& e) { + UdfTerminate(e.what()); + } +} + +namespace { + +static TUnboxedValuePod CreateEnumValue( + const IValueBuilder* valueBuilder, + const NProtoBuf::EnumValueDescriptor* desc, + const EEnumFormat format, + TFlags<EFieldFlag> fieldFlags) +{ + if (fieldFlags.HasFlags(EFieldFlag::EnumInt)) { + return TUnboxedValuePod((i64)desc->number()); + } else if (fieldFlags.HasFlags(EFieldFlag::EnumString)) { + return valueBuilder->NewString(desc->name()).Release(); + } + switch (format) { + case EEnumFormat::Number: + return TUnboxedValuePod((i32)desc->number()); + case EEnumFormat::Name: + return valueBuilder->NewString(desc->name()).Release(); + case EEnumFormat::FullName: + return valueBuilder->NewString(desc->full_name()).Release(); + } + + Y_UNREACHABLE(); +} + +static TUnboxedValuePod CreateSingleField( + const IValueBuilder* valueBuilder, + const Message& proto, + const FieldDescriptor* fd, + const TProtoInfo& info, + TFlags<EFieldFlag> fieldFlags) +{ + auto r = proto.GetReflection(); + +#define FIELD_TO_VALUE(EProtoCppType, ProtoGet) \ +case FieldDescriptor::EProtoCppType: { \ + return TUnboxedValuePod(r->ProtoGet(proto, fd)); \ +} + + switch (fd->cpp_type()) { + FIELD_TO_VALUE(CPPTYPE_INT32, GetInt32); + FIELD_TO_VALUE(CPPTYPE_INT64, GetInt64); + FIELD_TO_VALUE(CPPTYPE_UINT32, GetUInt32); + FIELD_TO_VALUE(CPPTYPE_UINT64, GetUInt64); + FIELD_TO_VALUE(CPPTYPE_DOUBLE, GetDouble); + FIELD_TO_VALUE(CPPTYPE_FLOAT, GetFloat); + FIELD_TO_VALUE(CPPTYPE_BOOL, GetBool); + + case FieldDescriptor::CPPTYPE_ENUM: { + return 
CreateEnumValue(valueBuilder, r->GetEnum(proto, fd), info.EnumFormat, fieldFlags); + } + case FieldDescriptor::CPPTYPE_STRING: { + return valueBuilder->NewString(r->GetString(proto, fd)).Release(); + } + case FieldDescriptor::CPPTYPE_MESSAGE: { + const auto& protoField = r->GetMessage(proto, fd); + if (fieldFlags.HasFlags(EFieldFlag::Binary)) { + return valueBuilder->NewString(protoField.SerializeAsString()).Release(); + } else { + auto msg = FillValueFromProto(protoField, valueBuilder, info); + return fd->is_optional() ? msg.Release().MakeOptional() : msg.Release(); + } + } + } +#undef FIELD_TO_VALUE + + return TUnboxedValuePod(); +} + +static TUnboxedValuePod CreateDefaultValue( + const IValueBuilder* valueBuilder, + const FieldDescriptor* fd, + const TProtoInfo& info, + TFlags<EFieldFlag> fieldFlags) +{ +#define DEFAULT_TO_VALUE(EProtoCppType, ValueGet) \ +case FieldDescriptor::EProtoCppType: { \ + return TUnboxedValuePod(fd->ValueGet()); \ + break; \ +} + + switch (fd->cpp_type()) { + DEFAULT_TO_VALUE(CPPTYPE_INT32, default_value_int32); + DEFAULT_TO_VALUE(CPPTYPE_INT64, default_value_int64); + DEFAULT_TO_VALUE(CPPTYPE_UINT32, default_value_uint32); + DEFAULT_TO_VALUE(CPPTYPE_UINT64, default_value_uint64); + DEFAULT_TO_VALUE(CPPTYPE_DOUBLE, default_value_double); + DEFAULT_TO_VALUE(CPPTYPE_FLOAT, default_value_float); + DEFAULT_TO_VALUE(CPPTYPE_BOOL, default_value_bool); + + case FieldDescriptor::CPPTYPE_ENUM: + return CreateEnumValue(valueBuilder, fd->default_value_enum(), info.EnumFormat, fieldFlags); + + case FieldDescriptor::CPPTYPE_STRING: + return valueBuilder->NewString(fd->default_value_string()).Release(); + default: + return TUnboxedValuePod(); +} +#undef DEFAULT_TO_VALUE +} + +static TUnboxedValuePod CreateRepeatedField( + const IValueBuilder* valueBuilder, + const Message& proto, + const FieldDescriptor* fd, + const TProtoInfo& info, + TFlags<EFieldFlag> fieldFlags) +{ + auto r = proto.GetReflection(); + +#define 
REPEATED_FIELD_TO_VALUE(EProtoCppType, ProtoGet) \ +case FieldDescriptor::EProtoCppType: { \ + for (int i = 0; i < endI; ++i) { \ + *items++ = TUnboxedValuePod(r->ProtoGet(proto, fd, i)); \ + } \ + break; \ +} + + const auto endI = r->FieldSize(proto, fd); + NUdf::TUnboxedValue *items = nullptr; + auto list = valueBuilder->NewArray(endI, items); + switch (fd->cpp_type()) { + REPEATED_FIELD_TO_VALUE(CPPTYPE_INT32, GetRepeatedInt32); + REPEATED_FIELD_TO_VALUE(CPPTYPE_INT64, GetRepeatedInt64); + REPEATED_FIELD_TO_VALUE(CPPTYPE_UINT32, GetRepeatedUInt32); + REPEATED_FIELD_TO_VALUE(CPPTYPE_UINT64, GetRepeatedUInt64); + REPEATED_FIELD_TO_VALUE(CPPTYPE_DOUBLE, GetRepeatedDouble); + REPEATED_FIELD_TO_VALUE(CPPTYPE_FLOAT, GetRepeatedFloat); + REPEATED_FIELD_TO_VALUE(CPPTYPE_BOOL, GetRepeatedBool); + + case FieldDescriptor::CPPTYPE_ENUM: + for (int i = 0; i < endI; ++i) { + *items++ = CreateEnumValue(valueBuilder, r->GetRepeatedEnum(proto, fd, i), info.EnumFormat, fieldFlags); + } + break; + case FieldDescriptor::CPPTYPE_STRING: + for (int i = 0; i < endI; ++i) { + *items++ = valueBuilder->NewString(r->GetRepeatedString(proto, fd, i)); + } + break; + case FieldDescriptor::CPPTYPE_MESSAGE: + for (int i = 0; i < endI; ++i) { + const auto& protoFieldElement = r->GetRepeatedMessage(proto, fd, i); + if (fieldFlags.HasFlags(EFieldFlag::Binary)) { + *items++ = valueBuilder->NewString(protoFieldElement.SerializeAsString()); + } else { + *items++ = FillValueFromProto(protoFieldElement, valueBuilder, info); + } + } + break; + } +#undef REPEATED_FIELD_TO_VALUE + + return list.Release(); +} + +static TUnboxedValuePod CreateMapField( + const IValueBuilder* valueBuilder, + const Message& proto, + const FieldDescriptor* fd, + const TProtoInfo& info, + const TMessageInfo& msgInfo, + TFlags<EFieldFlag> fieldFlags) +{ + auto r = proto.GetReflection(); + + auto dictType = msgInfo.DictTypes.Value(fd->number(), nullptr); + Y_ENSURE(dictType); + auto dictBuilder = valueBuilder->NewDict(dictType, 
TDictFlags::Hashed); + + const auto noBinaryFlags = TFlags<EFieldFlag>(fieldFlags).RemoveFlags(EFieldFlag::Binary); + for (int i = 0, end = r->FieldSize(proto, fd); i < end; ++i) { + const auto& protoDictElement = r->GetRepeatedMessage(proto, fd, i); + dictBuilder->Add( + TUnboxedValue(CreateSingleField(valueBuilder, protoDictElement, fd->message_type()->map_key(), info, noBinaryFlags)), + TUnboxedValue(CreateSingleField(valueBuilder, protoDictElement, fd->message_type()->map_value(), info, fieldFlags)) + ); + } + + return dictBuilder->Build().Release(); +} + +} + +TUnboxedValue FillValueFromProto( + const Message& proto, + const IValueBuilder* valueBuilder, + const TProtoInfo& info) +{ + const auto d = proto.GetDescriptor(); + const auto r = proto.GetReflection(); + const auto mi = info.Messages.find(d->full_name()); + + if (mi == info.Messages.end()) { + ythrow yexception() << "unknown message " << d->full_name(); + } + + const auto msgInfo = mi->second; + TUnboxedValue* items = nullptr; + auto value = valueBuilder->NewArray(msgInfo->FieldsCount, items); + + auto makeValue = [&](const FieldDescriptor* fd, const TMessageInfo::TFieldInfo& fInfo) -> TUnboxedValuePod { + if (fInfo.Flags.HasFlags(EFieldFlag::Void)) { + return TUnboxedValuePod::Void(); + } + + if (fd->is_map() && fInfo.Flags.HasFlags(EFieldFlag::Dict)) { + if (r->FieldSize(proto, fd) == 0 && fInfo.Flags.HasFlags(EFieldFlag::OptionalContainer)) { + return TUnboxedValuePod(); + } else { + return CreateMapField(valueBuilder, proto, fd, info, *msgInfo, fInfo.Flags); + } + } else if (fd->is_optional()) { + if (r->HasField(proto, fd)) { + return CreateSingleField(valueBuilder, proto, fd, info, fInfo.Flags); + } else if (fd->has_default_value() || AvoidOptionalScalars(info.SyntaxAware, fd)) { + return CreateDefaultValue(valueBuilder, fd, info, fInfo.Flags); + } else { + return TUnboxedValuePod(); + } + } else if (fd->is_repeated()) { + if (r->FieldSize(proto, fd) > 0) { + return 
CreateRepeatedField(valueBuilder, proto, fd, info, fInfo.Flags); + } else { + if (info.OptionalLists || fInfo.Flags.HasFlags(EFieldFlag::OptionalContainer)) { + return TUnboxedValuePod(); + } else { + return valueBuilder->NewEmptyList().Release(); + } + } + } else if (fd->is_required()) { + if (r->HasField(proto, fd)) { + return CreateSingleField(valueBuilder, proto, fd, info, fInfo.Flags); + } else { + ythrow yexception() << "required field " << fd->name() << " has no value"; + } + } + return TUnboxedValuePod(); + }; + + THashSet<const OneofDescriptor*> visitedOneofs; + for (int i = 0, end = d->field_count(); i < end; ++i) { + const FieldDescriptor* fd = d->field(i); + const auto& fInfo = msgInfo->Fields[fd->number()]; + + if (auto oneofDescriptor = fd->containing_oneof(); info.YtMode && oneofDescriptor && fInfo.Flags.HasFlags(EFieldFlag::Variant)) { + if (visitedOneofs.insert(oneofDescriptor).second) { + items[fInfo.Pos] = TUnboxedValuePod(); + if (auto ofd = r->GetOneofFieldDescriptor(proto, oneofDescriptor)) { + const auto& ofInfo = msgInfo->Fields[ofd->number()]; + if (fInfo.Pos != ofInfo.Pos) { + ythrow yexception() << "mismatch of oneof field " << ofd->name() << " position"; + } + const ui32* varIndex = msgInfo->VariantIndicies.FindPtr(ofd->number()); + if (!varIndex) { + ythrow yexception() << "missing oneof field " << ofd->name() << " index"; + } + items[ofInfo.Pos] = valueBuilder->NewVariant(*varIndex, TUnboxedValue(makeValue(ofd, ofInfo))).Release().MakeOptional(); + } + } + } else { + items[fInfo.Pos] = makeValue(fd, fInfo); + } + } + + return value; +} + +} // namespace NUdf +} // namespace NYql diff --git a/ydb/library/yql/protobuf_udf/value_builder.h b/ydb/library/yql/protobuf_udf/value_builder.h new file mode 100644 index 0000000000..5362638d23 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/value_builder.h @@ -0,0 +1,50 @@ +#pragma once + +#include "type_builder.h" +#include "proto_builder.h" +#include <ydb/library/yql/public/udf/udf_value.h> + 
+#include <google/protobuf/message.h> + +namespace NYql { +namespace NUdf { + +class TProtobufValue : public TBoxedValue { +public: + TProtobufValue(const TProtoInfo& info); + ~TProtobufValue() override; + + TUnboxedValue Run( + const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const override; + + virtual TAutoPtr<NProtoBuf::Message> Parse(const TStringBuf& data) const = 0; + +protected: + const TProtoInfo Info_; +}; + +class TProtobufSerialize : public TBoxedValue { +public: + TProtobufSerialize(const TProtoInfo& info); + ~TProtobufSerialize() override; + + TUnboxedValue Run( + const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const override; + + virtual TMaybe<TString> Serialize(const NProtoBuf::Message& proto) const = 0; + + virtual TAutoPtr<NProtoBuf::Message> MakeProto() const = 0; + +protected: + const TProtoInfo Info_; +}; + +TUnboxedValue FillValueFromProto( + const NProtoBuf::Message& proto, + const IValueBuilder* valueBuilder, + const TProtoInfo& info); + +} // namespace NUdf +} // namespace NYql diff --git a/ydb/library/yql/protobuf_udf/ya.make b/ydb/library/yql/protobuf_udf/ya.make new file mode 100644 index 0000000000..578a42a2e8 --- /dev/null +++ b/ydb/library/yql/protobuf_udf/ya.make @@ -0,0 +1,27 @@ +LIBRARY() + +YQL_ABI_VERSION(2 9 0) + +SRCS( + proto_builder.cpp + module.cpp + type_builder.cpp + value_builder.cpp +) + +PEERDIR( + contrib/libs/protobuf + library/cpp/protobuf/yql + ydb/library/yql/public/udf + ydb/library/yql/minikql + ydb/library/yql/utils + yt/cpp/mapreduce/interface + yt/yt_proto/yt/formats + yt/yt_proto/yt/formats +) + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/ydb/library/yql/ya.make b/ydb/library/yql/ya.make index 3aafe2834a..89825313e1 100644 --- a/ydb/library/yql/ya.make +++ b/ydb/library/yql/ya.make @@ -4,6 +4,7 @@ RECURSE( dq minikql parser + protobuf_udf protos providers public |