diff options
author | vvvv <vvvv@ydb.tech> | 2023-06-09 19:34:57 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-06-09 19:34:57 +0300 |
commit | ca86bc92d4c212d4b0eedca237fba606da157eb5 (patch) | |
tree | c305973b893eb88062ae9d409b59d1770bb9cb42 | |
parent | cfc79c6e4c0150a0e1239b28e2ff5195d6ef5059 (diff) | |
download | ydb-ca86bc92d4c212d4b0eedca237fba606da157eb5.tar.gz |
Kernel for String::LevensteinDistance + BinaryKernel helper
8 files changed, 142 insertions, 42 deletions
diff --git a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h index ed0755f46df..c68da466a85 100644 --- a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h +++ b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h @@ -322,6 +322,88 @@ struct TUnaryKernelExec { } }; +template <typename TDerived> +struct TBinaryKernelExec { + static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { + auto& state = dynamic_cast<TUdfKernelState&>(*ctx->state()); + auto& reader1 = state.GetReader(0); + auto& reader2 = state.GetReader(1); + const auto& arg1 = batch.values[0]; + const auto& arg2 = batch.values[1]; + if (arg1.is_scalar() && arg2.is_scalar()) { + auto& builder = state.GetScalarBuilder(); + auto item1 = reader1.GetScalarItem(*arg1.scalar()); + auto item2 = reader2.GetScalarItem(*arg2.scalar()); + TDerived::Process(item1, item2, [&](TBlockItem out) { + *res = builder.Build(out); + }); + } + else if (arg1.is_scalar() && arg2.is_array()) { + auto item1 = reader1.GetScalarItem(*arg1.scalar()); + auto& array2 = *arg2.array(); + auto& builder = state.GetArrayBuilder(); + size_t maxBlockLength = builder.MaxLength(); + Y_ENSURE(maxBlockLength > 0); + TVector<std::shared_ptr<arrow::ArrayData>> outputArrays; + for (int64_t i = 0; i < array2.length;) { + for (size_t j = 0; j < maxBlockLength && i < array2.length; ++j, ++i) { + auto item2 = reader2.GetItem(array2, i); + TDerived::Process(item1, item2, [&](TBlockItem out) { + builder.Add(out); + }); + } + auto outputDatum = builder.Build(false); + ForEachArrayData(outputDatum, [&](const auto& arr) { outputArrays.push_back(arr); }); + } + + *res = MakeArray(outputArrays); + } else if (arg1.is_array() && arg2.is_scalar()) { + auto& array1 = *arg1.array(); + auto item2 = reader2.GetScalarItem(*arg2.scalar()); + auto& builder = state.GetArrayBuilder(); + size_t maxBlockLength = builder.MaxLength(); + Y_ENSURE(maxBlockLength > 0); + TVector<std::shared_ptr<arrow::ArrayData>> outputArrays; + for (int64_t i = 0; i < array1.length;) { + for (size_t j = 0; j < maxBlockLength && i < array1.length; ++j, ++i) { + auto item1 = reader1.GetItem(array1, i); + TDerived::Process(item1, item2, [&](TBlockItem out) { + builder.Add(out); + }); + } + auto outputDatum = builder.Build(false); + ForEachArrayData(outputDatum, [&](const auto& arr) { outputArrays.push_back(arr); }); + } + + *res = MakeArray(outputArrays); + } else { + Y_ENSURE(arg1.is_array() && arg2.is_array()); + auto& array1 = *arg1.array(); + auto& array2 = *arg2.array(); + auto& builder = state.GetArrayBuilder(); + size_t maxBlockLength = builder.MaxLength(); + Y_ENSURE(maxBlockLength > 0); + TVector<std::shared_ptr<arrow::ArrayData>> outputArrays; + Y_ENSURE(array1.length == array2.length); + for (int64_t i = 0; i < array1.length;) { + for (size_t j = 0; j < maxBlockLength && i < array1.length; ++j, ++i) { + auto item1 = reader1.GetItem(array1, i); + auto item2 = reader2.GetItem(array2, i); + TDerived::Process(item1, item2, [&](TBlockItem out) { + builder.Add(out); + }); + } + auto outputDatum = builder.Build(false); + ForEachArrayData(outputDatum, [&](const auto& arr) { outputArrays.push_back(arr); }); + } + + *res = MakeArray(outputArrays); + } + + return arrow::Status::OK(); + } +}; + template <typename TInput, typename TOutput, TOutput(*Core)(TInput)> arrow::Status UnaryPreallocatedExecImpl(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { Y_UNUSED(ctx); @@ -373,11 +455,11 @@ public: #define BEGIN_SIMPLE_ARROW_UDF(udfName, signatureFunc) \ BEGIN_ARROW_UDF(udfName##_BlocksImpl, signatureFunc) \ - UDF(udfName, builder.SimpleSignature<signatureFunc>().SupportsBlocks();) + UDF_IMPL(udfName, builder.SimpleSignature<signatureFunc>().SupportsBlocks();, ;, ;, "", "", udfName##_BlocksImpl) #define BEGIN_SIMPLE_STRICT_ARROW_UDF(udfName, signatureFunc) \ BEGIN_ARROW_UDF(udfName##_BlocksImpl, signatureFunc) \ - UDF(udfName, builder.SimpleSignature<signatureFunc>().SupportsBlocks().IsStrict();) + UDF_IMPL(udfName, builder.SimpleSignature<signatureFunc>().SupportsBlocks().IsStrict();, ;, ;, "", "", udfName##_BlocksImpl) #define END_ARROW_UDF(udfNameBlocks, exec) \ inline bool udfNameBlocks::DeclareSignature(\ @@ -393,9 +475,4 @@ public: } #define END_SIMPLE_ARROW_UDF(udfName, exec) \ - END_ARROW_UDF(udfName##_BlocksImpl, exec) \ - template<> \ - struct ::NYql::NUdf::TUdfTraits<udfName> { \ - static constexpr bool SupportsBlocks = true; \ - using TBlockUdf = udfName##_BlocksImpl; \ - }; + END_ARROW_UDF(udfName##_BlocksImpl, exec) diff --git a/ydb/library/yql/public/udf/udf_helpers.h b/ydb/library/yql/public/udf/udf_helpers.h index 6e12a51c91a..d240e31117e 100644 --- a/ydb/library/yql/public/udf/udf_helpers.h +++ b/ydb/library/yql/public/udf/udf_helpers.h @@ -42,9 +42,10 @@ namespace NUdf { } } -#define UDF_IMPL_EX(udfName, typeBody, members, init, irResourceId, irFunctionName, create_impl) \ +#define UDF_IMPL_EX(udfName, typeBody, members, init, irResourceId, irFunctionName, blockType, create_impl) \ class udfName: public ::NYql::NUdf::TBoxedValue { \ - public: \ + public: \ + using TBlockType = blockType; \ explicit udfName(::NYql::NUdf::IFunctionTypeInfoBuilder& builder) \ : Pos_(GetSourcePosition(builder)) \ { \ @@ -93,10 +94,10 @@ namespace NUdf { const ::NYql::NUdf::IValueBuilder* valueBuilder, \ const ::NYql::NUdf::TUnboxedValuePod* args) const -#define UDF_IMPL(udfName, typeBody, members, init, irResourceId, irFunctionName) \ - UDF_IMPL_EX(udfName, typeBody, members, init, irResourceId, irFunctionName, builder.Implementation(new udfName(builder));) +#define UDF_IMPL(udfName, typeBody, members, init, irResourceId, irFunctionName, blockType) \ + UDF_IMPL_EX(udfName, typeBody, members, init, irResourceId, irFunctionName, blockType, builder.Implementation(new udfName(builder));) -#define UDF(udfName, typeBody) UDF_IMPL(udfName, typeBody, ;, ;, "", "") +#define UDF(udfName, typeBody) UDF_IMPL(udfName, typeBody, ;, ;, "", "", void) #define UDF_RUN_IMPL(udfName, typeBody, members, init, irResourceId, irFunctionName) \ struct udfName##Members { \ @@ -181,10 +182,10 @@ namespace NUdf { UDF(udfName, builder.SimpleSignature<signature>().IsStrict();) #define SIMPLE_UDF_WITH_IR(udfName, signature, irResourceId, irFunctionName) \ - UDF_IMPL(udfName, builder.SimpleSignature<signature>();, ;, ;, irResourceId, irFunctionName) + UDF_IMPL(udfName, builder.SimpleSignature<signature>();, ;, ;, irResourceId, irFunctionName, void) #define SIMPLE_UDF_WITH_CREATE_IMPL(udfName, signature, create_impl) \ - UDF_IMPL_EX(udfName, builder.SimpleSignature<signature>();, ;, ;, "", "", create_impl) + UDF_IMPL_EX(udfName, builder.SimpleSignature<signature>();, ;, ;, "", "", void, create_impl) #define SIMPLE_UDF_OPTIONS(udfName, signature, options) \ UDF(udfName, builder.SimpleSignature<signature>(); options;) @@ -309,16 +310,11 @@ public: } }; -template <typename TUdf> -struct TUdfTraits { - static constexpr bool SupportsBlocks = false; - using TBlockUdf = void; -}; - template<typename... TUdfs> class TSimpleUdfModuleHelper : public IUdfModule { Y_HAS_SUBTYPE(TTypeAwareMarker); + Y_HAS_SUBTYPE(TBlockType); public: void CleanupOnTerminate() const override { @@ -331,9 +327,11 @@ public: r->SetTypeAwareness(); } - if constexpr (TUdfTraits<TUdfType>::SupportsBlocks) { - auto rBlocks = names.Add(TUdfTraits<TUdfType>::TBlockUdf::Name()); - rBlocks->SetTypeAwareness(); + if constexpr (THasTBlockType<TUdfType>::value) { + if constexpr (!std::is_same_v<typename TUdfType::TBlockType, void>) { + auto rBlocks = names.Add(TUdfType::TBlockType::Name()); + rBlocks->SetTypeAwareness(); + } } } @@ -355,9 +353,11 @@ public: bool typesOnly = (flags & TFlags::TypesOnly); bool found = TUdfType::DeclareSignature(name, userType, builder, typesOnly); if (!found) { - if constexpr (TUdfTraits<TUdfType>::SupportsBlocks) { - found = TUdfTraits<TUdfType>::TBlockUdf::DeclareSignature(name, userType, builder, typesOnly); - } + if constexpr (THasTBlockType<TUdfType>::value) { + if constexpr (!std::is_same_v<typename TUdfType::TBlockType, void>) { + found = TUdfType::TBlockType::DeclareSignature(name, userType, builder, typesOnly); + } + } } return found; diff --git a/ydb/library/yql/udfs/common/math/math_udf.cpp b/ydb/library/yql/udfs/common/math/math_udf.cpp index 394b8cdf7f8..fc5806e290a 100644 --- a/ydb/library/yql/udfs/common/math/math_udf.cpp +++ b/ydb/library/yql/udfs/common/math/math_udf.cpp @@ -50,18 +50,18 @@ XX(Rem, TOptional<i64>(TAutoMap<i64>, i64), ;) \ XXL(Round, double(TAutoMap<double>, TPrecision), builder.OptionalArgs(1)) -#define MATH_UDF_IMPL(name, signature, options) \ - UDF_IMPL(T##name, builder.SimpleSignature<signature>(); options;, ;, ;, "/llvm_bc/Math", #name "IR") { \ - TUnboxedValuePod res; \ - name##IR(this, &res, valueBuilder, args); \ - return res; \ +#define MATH_UDF_IMPL(name, signature, options) \ + UDF_IMPL(T##name, builder.SimpleSignature<signature>(); options;, ;, ;, "/llvm_bc/Math", #name "IR", void) { \ + TUnboxedValuePod res; \ + name##IR(this, &res, valueBuilder, args); \ + return res; \ } -#define MATH_STRICT_UDF_IMPL(name, signature, options) \ - UDF_IMPL(T##name, builder.SimpleSignature<signature>().IsStrict(); options;, ;, ;, "/llvm_bc/Math", #name "IR") { \ - TUnboxedValuePod res; \ - name##IR(this, &res, valueBuilder, args); \ - return res; \ +#define MATH_STRICT_UDF_IMPL(name, signature, options) \ + UDF_IMPL(T##name, builder.SimpleSignature<signature>().IsStrict(); options;, ;, ;, "/llvm_bc/Math", #name "IR", void) { \ + TUnboxedValuePod res; \ + name##IR(this, &res, valueBuilder, args); \ + return res; \ } #define REGISTER_MATH_UDF(udfName, ...) T##udfName, diff --git a/ydb/library/yql/udfs/common/string/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/udfs/common/string/CMakeLists.darwin-x86_64.txt index 443f7fdf0ca..fc63c16d947 100644 --- a/ydb/library/yql/udfs/common/string/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/udfs/common/string/CMakeLists.darwin-x86_64.txt @@ -13,6 +13,7 @@ target_link_libraries(string_udf INTERFACE yutil yql-public-udf public-udf-support + public-udf-arrow library-cpp-charset cpp-deprecated-split cpp-html-pcdata @@ -24,7 +25,7 @@ target_link_libraries(string_udf INTERFACE add_global_library_for(string_udf.global string_udf) target_compile_options(string_udf.global PRIVATE -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=28 + -DUDF_ABI_VERSION_MINOR=33 -DUDF_ABI_VERSION_PATCH=0 ) target_link_libraries(string_udf.global PUBLIC @@ -32,6 +33,7 @@ target_link_libraries(string_udf.global PUBLIC yutil yql-public-udf public-udf-support + public-udf-arrow library-cpp-charset cpp-deprecated-split cpp-html-pcdata diff --git a/ydb/library/yql/udfs/common/string/CMakeLists.linux-aarch64.txt b/ydb/library/yql/udfs/common/string/CMakeLists.linux-aarch64.txt index 2548da1b236..8213bf95ba9 100644 --- a/ydb/library/yql/udfs/common/string/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/udfs/common/string/CMakeLists.linux-aarch64.txt @@ -14,6 +14,7 @@ target_link_libraries(string_udf INTERFACE yutil yql-public-udf public-udf-support + public-udf-arrow library-cpp-charset cpp-deprecated-split cpp-html-pcdata @@ -25,7 +26,7 @@ target_link_libraries(string_udf INTERFACE add_global_library_for(string_udf.global string_udf) target_compile_options(string_udf.global PRIVATE -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=28 + -DUDF_ABI_VERSION_MINOR=33 -DUDF_ABI_VERSION_PATCH=0 ) target_link_libraries(string_udf.global PUBLIC @@ -34,6 +35,7 @@ target_link_libraries(string_udf.global PUBLIC yutil yql-public-udf public-udf-support + public-udf-arrow library-cpp-charset cpp-deprecated-split cpp-html-pcdata diff --git a/ydb/library/yql/udfs/common/string/CMakeLists.linux-x86_64.txt b/ydb/library/yql/udfs/common/string/CMakeLists.linux-x86_64.txt index 2548da1b236..8213bf95ba9 100644 --- a/ydb/library/yql/udfs/common/string/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/udfs/common/string/CMakeLists.linux-x86_64.txt @@ -14,6 +14,7 @@ target_link_libraries(string_udf INTERFACE yutil yql-public-udf public-udf-support + public-udf-arrow library-cpp-charset cpp-deprecated-split cpp-html-pcdata @@ -25,7 +26,7 @@ target_link_libraries(string_udf INTERFACE add_global_library_for(string_udf.global string_udf) target_compile_options(string_udf.global PRIVATE -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=28 + -DUDF_ABI_VERSION_MINOR=33 -DUDF_ABI_VERSION_PATCH=0 ) target_link_libraries(string_udf.global PUBLIC @@ -34,6 +35,7 @@ target_link_libraries(string_udf.global PUBLIC yutil yql-public-udf public-udf-support + public-udf-arrow library-cpp-charset cpp-deprecated-split cpp-html-pcdata diff --git a/ydb/library/yql/udfs/common/string/CMakeLists.windows-x86_64.txt b/ydb/library/yql/udfs/common/string/CMakeLists.windows-x86_64.txt index 443f7fdf0ca..fc63c16d947 100644 --- a/ydb/library/yql/udfs/common/string/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/udfs/common/string/CMakeLists.windows-x86_64.txt @@ -13,6 +13,7 @@ target_link_libraries(string_udf INTERFACE yutil yql-public-udf public-udf-support + public-udf-arrow library-cpp-charset cpp-deprecated-split cpp-html-pcdata @@ -24,7 +25,7 @@ target_link_libraries(string_udf INTERFACE add_global_library_for(string_udf.global string_udf) target_compile_options(string_udf.global PRIVATE -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=28 + -DUDF_ABI_VERSION_MINOR=33 -DUDF_ABI_VERSION_PATCH=0 ) target_link_libraries(string_udf.global PUBLIC @@ -32,6 +33,7 @@ target_link_libraries(string_udf.global PUBLIC yutil yql-public-udf public-udf-support + public-udf-arrow library-cpp-charset cpp-deprecated-split cpp-html-pcdata diff --git a/ydb/library/yql/udfs/common/string/string_udf.cpp b/ydb/library/yql/udfs/common/string/string_udf.cpp index c2bce4ae420..479d9fc82e6 100644 --- a/ydb/library/yql/udfs/common/string/string_udf.cpp +++ b/ydb/library/yql/udfs/common/string/string_udf.cpp @@ -9,6 +9,8 @@ #include <library/cpp/string_utils/levenshtein_diff/levenshtein_diff.h> #include <library/cpp/string_utils/quote/quote.h> +#include <ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h> + #include <util/charset/wide.h> #include <util/generic/vector.h> #include <util/stream/format.h> @@ -27,6 +29,7 @@ using namespace NKikimr; using namespace NUdf; namespace { + #define STRING_UDF(udfName, function) \ SIMPLE_STRICT_UDF(T##udfName, char*(TAutoMap<char*>)) { \ const TString input(args[0].AsStringRef()); \ @@ -369,7 +372,7 @@ namespace { return valueBuilder->NewString(JoinSeq(delimeter, items)); } - SIMPLE_STRICT_UDF(TLevensteinDistance, ui64(TAutoMap<char*>, TAutoMap<char*>)) { + BEGIN_SIMPLE_STRICT_ARROW_UDF(TLevensteinDistance, ui64(TAutoMap<char*>, TAutoMap<char*>)) { Y_UNUSED(valueBuilder); const TStringBuf left(args[0].AsStringRef()); const TStringBuf right(args[1].AsStringRef()); @@ -377,6 +380,18 @@ namespace { return TUnboxedValuePod(result); } + struct TLevensteinDistanceKernelExec : public TBinaryKernelExec<TLevensteinDistanceKernelExec> { + template <typename TSink> + static void Process(TBlockItem arg1, TBlockItem arg2, const TSink& sink) { + const std::string_view left(arg1.AsStringRef()); + const std::string_view right(arg2.AsStringRef()); + const ui64 result = NLevenshtein::Distance(left, right); + sink(TBlockItem(result)); + } + }; + + END_SIMPLE_ARROW_UDF(TLevensteinDistance, TLevensteinDistanceKernelExec::Do); + static constexpr ui64 padLim = 1000000; SIMPLE_UDF_OPTIONS(TRightPad, char*(TAutoMap<char*>, ui64, TOptional<char*>), builder.OptionalArgs(1)) { |