diff options
author | vvvv <vvvv@ydb.tech> | 2023-01-24 22:06:28 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-01-24 22:06:28 +0300 |
commit | 1da809860cf3fdcd31cae0108ca6970ffd76af2f (patch) | |
tree | 8aebdf350c8345b6d11b9ce401c99c1ad5946418 | |
parent | 4c5fb919548a5a124691ae4283475595786b72fe (diff) | |
download | ydb-1da809860cf3fdcd31cae0108ca6970ffd76af2f.tar.gz |
simple block udfs - initial infrastructure. Implemented BlockJust.
31 files changed, 783 insertions, 250 deletions
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp index 82578b3931..24ad460afe 100644 --- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -4861,7 +4861,7 @@ bool CollectBlockRewrites(const TMultiExprType* multiInputType, bool keepInputCo TExprNode::TListType funcArgs; std::string_view arrowFunctionName; - if (node->IsCallable({"And", "Or", "Xor", "Not", "Coalesce", "If"})) + if (node->IsCallable({"And", "Or", "Xor", "Not", "Coalesce", "If", "Just"})) { for (auto& child : node->ChildrenList()) { if (child->IsComplete()) { @@ -4919,19 +4919,25 @@ bool CollectBlockRewrites(const TMultiExprType* multiInputType, bool keepInputCo YQL_ENSURE(!node->IsComplete() && hasBlockArg); const TTypeAnnotationNode* outType = ctx.MakeType<TBlockExprType>(node->GetTypeAnn()); if (isUdf) { + TExprNode::TPtr extraTypes; + bool renameFunc = false; + if (node->Head().Child(2)->IsCallable("TupleType")) { + extraTypes = node->Head().Child(2)->ChildPtr(2); + } else { + renameFunc = true; + extraTypes = ctx.NewCallable(node->Head().Pos(), "TupleType", {}); + } + funcArgs.push_back(ctx.Builder(node->Head().Pos()) .Callable("Udf") - .Add(0, node->Head().ChildPtr(0)) + .Atom(0, TString(node->Head().Child(0)->Content()) + (renameFunc ? "_BlocksImpl" : "")) .Add(1, node->Head().ChildPtr(1)) .Callable(2, "TupleType") .Callable(0, "TupleType") .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { for (ui32 i = 1; i < node->ChildrenSize(); ++i) { - auto child = node->Child(i); - auto originalTypeNode = node->Head().Child(2)->Head().Child(i - 1); - parent.Callable(i - 1, child->IsComplete() ? "ScalarType" : "BlockType") - .Add(0, originalTypeNode) - .Seal(); + auto type = argTypes[i - 1]; + parent.Add(i - 1, ExpandType(node->Head().Pos(), *type, ctx)); } return parent; @@ -4939,7 +4945,7 @@ bool CollectBlockRewrites(const TMultiExprType* multiInputType, bool keepInputCo .Seal() .Callable(1, "StructType") .Seal() - .Add(2, node->Head().Child(2)->ChildPtr(2)) + .Add(2, extraTypes) .Seal() .Add(3, node->Head().ChildPtr(3)) .Seal() diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp index 9984ed3766..b2db1032cd 100644 --- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp @@ -232,6 +232,29 @@ IGraphTransformer::TStatus BlockIfWrapper(const TExprNode::TPtr& input, TExprNod return IGraphTransformer::TStatus::Ok; } +IGraphTransformer::TStatus BlockJustWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + Y_UNUSED(output); + if (!EnsureArgsCount(*input, 1, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + auto child = input->Child(0); + if (!EnsureBlockOrScalarType(*child, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + bool isScalar; + const TTypeAnnotationNode* blockItemType = GetBlockItemType(*child->GetTypeAnn(), isScalar); + const TTypeAnnotationNode* resultType = ctx.Expr.MakeType<TOptionalExprType>(blockItemType); + + if (isScalar) { + resultType = ctx.Expr.MakeType<TScalarExprType>(resultType); + } else { + resultType = ctx.Expr.MakeType<TBlockExprType>(resultType); + } + input->SetTypeAnn(resultType); + return IGraphTransformer::TStatus::Ok; +} + IGraphTransformer::TStatus BlockFuncWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { Y_UNUSED(output); if (!EnsureMinArgsCount(*input, 2U, ctx.Expr)) { diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.h b/ydb/library/yql/core/type_ann/type_ann_blocks.h index 07fed8ae83..39b99c7079 100644 --- a/ydb/library/yql/core/type_ann/type_ann_blocks.h +++ b/ydb/library/yql/core/type_ann/type_ann_blocks.h @@ -14,6 +14,7 @@ namespace NTypeAnnImpl { IGraphTransformer::TStatus BlockCoalesceWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus BlockLogicalWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus BlockIfWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + IGraphTransformer::TStatus BlockJustWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus BlockFuncWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); IGraphTransformer::TStatus BlockBitCastWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index 47778ac052..66499ddeeb 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -11857,6 +11857,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["BlockXor"] = &BlockLogicalWrapper; Functions["BlockNot"] = &BlockLogicalWrapper; Functions["BlockIf"] = &BlockIfWrapper; + Functions["BlockJust"] = &BlockJustWrapper; ExtFunctions["BlockFunc"] = &BlockFuncWrapper; ExtFunctions["BlockBitCast"] = &BlockBitCastWrapper; diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt index 4ded4d6924..1167e76e2c 100644 --- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt +++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt @@ -45,6 +45,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_just.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt index b7a73edfa8..fe47fb92fa 100644 --- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt @@ -46,6 +46,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_just.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt index b7a73edfa8..fe47fb92fa 100644 --- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt +++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt @@ -46,6 +46,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_just.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_just.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_just.cpp new file mode 100644 index 0000000000..3dac133b81 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_just.cpp @@ -0,0 +1,88 @@ +#include "mkql_block_just.h" +#include "mkql_block_impl.h" + +#include <ydb/library/yql/minikql/arrow/arrow_defs.h> +#include <ydb/library/yql/minikql/arrow/arrow_util.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/mkql_node_cast.h> + +namespace NKikimr { +namespace NMiniKQL { + +namespace { + +template<bool Trivial> +class TJustBlockExec { +public: + TJustBlockExec(const std::shared_ptr<arrow::DataType>& returnArrowType) + : ReturnArrowType(returnArrowType) + {} + + arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const { + arrow::Datum inputDatum = batch.values[0]; + if (Trivial) { + *res = inputDatum; + return arrow::Status::OK(); + } + + if (inputDatum.is_scalar()) { + std::vector<std::shared_ptr<arrow::Scalar>> arrowValue; + arrowValue.emplace_back(inputDatum.scalar()); + *res = arrow::Datum(std::make_shared<arrow::StructScalar>(arrowValue, ReturnArrowType)); + } else { + auto array = inputDatum.array(); + auto newArrayData = arrow::ArrayData::Make(ReturnArrowType, array->length, { nullptr }, 0, 0); + newArrayData->child_data.push_back(array); + *res = arrow::Datum(newArrayData); + } + + return arrow::Status::OK(); + } + +private: + const std::shared_ptr<arrow::DataType> ReturnArrowType; +}; + +template<bool Trivial> +std::shared_ptr<arrow::compute::ScalarKernel> MakeBlockJustKernel(const TVector<TType*>& argTypes, TType* resultType) { + using TExec = TJustBlockExec<Trivial>; + + std::shared_ptr<arrow::DataType> returnArrowType; + MKQL_ENSURE(ConvertArrowType(AS_TYPE(TBlockType, resultType)->GetItemType(), returnArrowType), "Unsupported arrow type"); + auto exec = std::make_shared<TExec>(returnArrowType); + auto kernel = std::make_shared<arrow::compute::ScalarKernel>(ConvertToInputTypes(argTypes), ConvertToOutputType(resultType), + [exec](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { + return exec->Exec(ctx, batch, res); + }); + + kernel->null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE; + return kernel; +} + +} // namespace + +IComputationNode* WrapBlockJust(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args"); + + auto data = callable.GetInput(0); + + auto dataType = AS_TYPE(TBlockType, data.GetStaticType()); + auto itemType = dataType->GetItemType(); + + auto dataCompute = LocateNode(ctx.NodeLocator, callable, 0); + + TVector<IComputationNode*> argsNodes = { dataCompute }; + TVector<TType*> argsTypes = { dataType }; + + std::shared_ptr<arrow::compute::ScalarKernel> kernel; + if (itemType->IsOptional() || itemType->IsVariant()) { + kernel = MakeBlockJustKernel<false>(argsTypes, callable.GetType()->GetReturnType()); + } else { + kernel = MakeBlockJustKernel<true>(argsTypes, callable.GetType()->GetReturnType()); + } + + return new TBlockFuncNode(ctx.Mutables, std::move(argsNodes), argsTypes, *kernel, kernel); +} + +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_just.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_just.h new file mode 100644 index 0000000000..bc0dd9a237 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_just.h @@ -0,0 +1,10 @@ +#pragma once +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> + +namespace NKikimr { +namespace NMiniKQL { + +IComputationNode* WrapBlockJust(TCallable& callable, const TComputationNodeFactoryContext& ctx); + +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp index 63df468c8e..38e76c0aa4 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp @@ -9,6 +9,7 @@ #include "mkql_block_agg.h" #include "mkql_block_coalesce.h" #include "mkql_block_if.h" +#include "mkql_block_just.h" #include "mkql_block_logical.h" #include "mkql_block_compress.h" #include "mkql_block_skiptake.h" @@ -283,6 +284,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"BlockOr", &WrapBlockOr}, {"BlockXor", &WrapBlockXor}, {"BlockNot", &WrapBlockNot}, + {"BlockJust", &WrapBlockJust}, {"BlockCompress", &WrapBlockCompress}, {"BlockExpandChunked", &WrapBlockExpandChunked}, {"BlockCombineAll", &WrapBlockCombineAll}, diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index fd6f946229..1f98018a67 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -5358,6 +5358,14 @@ TRuntimeNode TProgramBuilder::BlockIf(TRuntimeNode condition, TRuntimeNode thenB return TRuntimeNode(callableBuilder.Build(), false); } +TRuntimeNode TProgramBuilder::BlockJust(TRuntimeNode data) { + const auto initialType = AS_TYPE(TBlockType, data.GetStaticType()); + auto returnType = NewBlockType(NewOptionalType(initialType->GetItemType()), initialType->GetShape()); + TCallableBuilder callableBuilder(Env, __func__, returnType); + callableBuilder.Add(data); + return TRuntimeNode(callableBuilder.Build(), false); +} + TRuntimeNode TProgramBuilder::BlockFunc(const std::string_view& funcName, TType* returnType, const TArrayRef<const TRuntimeNode>& args) { for (const auto& arg : args) { MKQL_ENSURE(arg.GetStaticType()->IsBlock(), "Expected Block type"); diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index d8012bd063..7f861c60c0 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -258,6 +258,8 @@ public: TRuntimeNode BlockXor(TRuntimeNode first, TRuntimeNode second); TRuntimeNode BlockIf(TRuntimeNode condition, TRuntimeNode thenBranch, TRuntimeNode elseBranch); + TRuntimeNode BlockJust(TRuntimeNode data); + TRuntimeNode BlockFunc(const std::string_view& funcName, TType* returnType, const TArrayRef<const TRuntimeNode>& args); TRuntimeNode BlockBitCast(TRuntimeNode value, TType* targetType); TRuntimeNode BlockCombineAll(TRuntimeNode flow, std::optional<ui32> filterColumn, diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index f5519c07be..0230ba2659 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -394,6 +394,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() { {"Dec", &TProgramBuilder::Decrement}, {"Not", &TProgramBuilder::Not}, {"BlockNot", &TProgramBuilder::BlockNot}, + {"BlockJust", &TProgramBuilder::BlockJust}, {"BitNot", &TProgramBuilder::BitNot}, diff --git a/ydb/library/yql/public/udf/CMakeLists.darwin.txt b/ydb/library/yql/public/udf/CMakeLists.darwin.txt index 61c5fb48ca..fbdaa1df7c 100644 --- a/ydb/library/yql/public/udf/CMakeLists.darwin.txt +++ b/ydb/library/yql/public/udf/CMakeLists.darwin.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(arrow) add_subdirectory(service) add_subdirectory(support) add_subdirectory(tz) diff --git a/ydb/library/yql/public/udf/CMakeLists.linux-aarch64.txt b/ydb/library/yql/public/udf/CMakeLists.linux-aarch64.txt index d35c91df0d..32b31cde2d 100644 --- a/ydb/library/yql/public/udf/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/public/udf/CMakeLists.linux-aarch64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(arrow) add_subdirectory(service) add_subdirectory(support) add_subdirectory(tz) diff --git a/ydb/library/yql/public/udf/CMakeLists.linux.txt b/ydb/library/yql/public/udf/CMakeLists.linux.txt index d35c91df0d..32b31cde2d 100644 --- a/ydb/library/yql/public/udf/CMakeLists.linux.txt +++ b/ydb/library/yql/public/udf/CMakeLists.linux.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(arrow) add_subdirectory(service) add_subdirectory(support) add_subdirectory(tz) diff --git a/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt b/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt new file mode 100644 index 0000000000..62f5a45d7f --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(public-udf-arrow) +target_compile_options(public-udf-arrow PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(public-udf-arrow PUBLIC + contrib-libs-cxxsupp + yutil + yql-public-udf + libs-apache-arrow +) +target_sources(public-udf-arrow PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp +) diff --git a/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt b/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..d3d13ca78a --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(public-udf-arrow) +target_compile_options(public-udf-arrow PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(public-udf-arrow PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + yql-public-udf + libs-apache-arrow +) +target_sources(public-udf-arrow PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp +) diff --git a/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt b/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt new file mode 100644 index 0000000000..d3d13ca78a --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(public-udf-arrow) +target_compile_options(public-udf-arrow PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(public-udf-arrow PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + yql-public-udf + libs-apache-arrow +) +target_sources(public-udf-arrow PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp +) diff --git a/ydb/library/yql/public/udf/arrow/CMakeLists.txt b/ydb/library/yql/public/udf/arrow/CMakeLists.txt new file mode 100644 index 0000000000..bede1861df --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE) + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp new file mode 100644 index 0000000000..dea156b206 --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp @@ -0,0 +1,7 @@ +#include "udf_arrow_helpers.h" + +namespace NYql { +namespace NUdf { + +} +} diff --git a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h new file mode 100644 index 0000000000..80c3525f24 --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h @@ -0,0 +1,245 @@ +#include <ydb/library/yql/public/udf/udf_type_builder.h> +#include <ydb/library/yql/public/udf/udf_value.h> +#include <ydb/library/yql/public/udf/udf_helpers.h> +#include <ydb/library/yql/public/udf/udf_data_type.h> +#include <ydb/library/yql/public/udf/udf_type_inspection.h> + +#include <arrow/array/array_base.h> +#include <arrow/array/util.h> +#include <arrow/c/bridge.h> +#include <arrow/chunked_array.h> +#include <arrow/compute/kernel.h> + +namespace NYql { +namespace NUdf { + +using TExec = arrow::Status(*)(arrow::compute::KernelContext*, const arrow::compute::ExecBatch&, arrow::Datum*); + +class TSimpleArrowUdfImpl : public TBoxedValue { +public: + TSimpleArrowUdfImpl(const TVector<std::shared_ptr<arrow::DataType>>& argTypes, bool onlyScalars, IArrowType::TPtr&& returnType, + TExec exec, IFunctionTypeInfoBuilder& builder, const TString& name) + : ArgTypes_(argTypes) + , OnlyScalars_(onlyScalars) + , ReturnType_(std::move(returnType)) + , Exec_(exec) + , Pos_(GetSourcePosition(builder)) + , Name_(name) + , KernelContext_(&ExecContext_) + {} + + TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final { + try { + TVector<arrow::Datum> datums(ArgTypes_.size()); + for (ui32 i = 0; i < ArgTypes_.size(); ++i) { + bool isScalar; + ui64 length; + ui32 chunkCount = valueBuilder->GetArrowBlockChunks(args[i], isScalar, length); + if (isScalar) { + ArrowArray a; + valueBuilder->ExportArrowBlock(args[i], 0, &a); + auto res = arrow::ImportArray(&a, ArgTypes_[i]); + if (!res.status().ok()) { + throw yexception() << res.status().ToString(); + } + + auto arr = std::move(res).ValueOrDie(); + auto scalarRes = arr->GetScalar(0); + if (!scalarRes.status().ok()) { + throw yexception() << scalarRes.status().ToString(); + } + + auto scalar = std::move(scalarRes).ValueOrDie(); + datums[i] = scalar; + } else { + TVector<std::shared_ptr<arrow::Array>> imported(chunkCount); + for (ui32 i = 0; i < chunkCount; ++i) { + ArrowArray a; + valueBuilder->ExportArrowBlock(args[i], i, &a); + auto arrRes = arrow::ImportArray(&a, ArgTypes_[i]); + if (!arrRes.status().ok()) { + UdfTerminate(arrRes.status().ToString().c_str()); + } + + imported[i] = std::move(arrRes).ValueOrDie(); + } + + if (chunkCount == 1) { + datums[i] = imported.front(); + } else { + datums[i] = arrow::ChunkedArray::Make(std::move(imported), ArgTypes_[i]).ValueOrDie(); + } + } + } + + // TODO dechunking, scalar executor + Y_ENSURE(false); + Y_UNUSED(Exec_); + Y_UNUSED(KernelContext_); + arrow::Datum res; + if (OnlyScalars_) { + auto arrRes = arrow::MakeArrayFromScalar(*res.scalar(), 1); + if (!arrRes.status().ok()) { + throw yexception() << arrRes.status().ToString(); + } + + auto arr = std::move(arrRes).ValueOrDie(); + ArrowArray a; + auto status = arrow::ExportArray(*arr, &a); + if (!status.ok()) { + throw yexception() << status.ToString(); + } + + return valueBuilder->ImportArrowBlock(&a, 1, true, *ReturnType_); + } else { + TVector<ArrowArray> a; + if (res.is_array()) { + a.resize(1); + auto status = arrow::ExportArray(*res.make_array(), &a[0]); + if (!status.ok()) { + throw yexception() << status.ToString(); + } + } else { + Y_ENSURE(res.is_arraylike()); + a.resize(res.chunks().size()); + for (ui32 i = 0; i < res.chunks().size(); ++i) { + auto status = arrow::ExportArray(*res.chunks()[i], &a[i]); + if (!status.ok()) { + throw yexception() << status.ToString(); + } + } + } + + return valueBuilder->ImportArrowBlock(a.data(), a.size(), false, *ReturnType_); + } + } catch (const std::exception&) { + TStringBuilder sb; + sb << Pos_ << " "; + sb << CurrentExceptionMessage(); + sb << Endl << "[" << Name_ << "]"; + UdfTerminate(sb.c_str()); + } + } + +private: + const TVector<std::shared_ptr<arrow::DataType>> ArgTypes_; + const bool OnlyScalars_; + IArrowType::TPtr ReturnType_; + const TExec Exec_; + TSourcePosition Pos_; + const TString Name_; + + arrow::compute::ExecContext ExecContext_; + mutable arrow::compute::KernelContext KernelContext_; +}; + +inline void PrepareSimpleArrowUdf(IFunctionTypeInfoBuilder& builder, TType* signature, TType* userType, TExec exec, bool typesOnly, + const TString& name) { + auto typeInfoHelper = builder.TypeInfoHelper(); + TCallableTypeInspector callableInspector(*typeInfoHelper, signature); + Y_ENSURE(callableInspector); + Y_ENSURE(callableInspector.GetArgsCount() > 0); + TTupleTypeInspector userTypeInspector(*typeInfoHelper, userType); + Y_ENSURE(userTypeInspector); + Y_ENSURE(userTypeInspector.GetElementsCount() == 3); + TTupleTypeInspector argsInspector(*typeInfoHelper, userTypeInspector.GetElementType(0)); + Y_ENSURE(argsInspector); + Y_ENSURE(argsInspector.GetElementsCount() == callableInspector.GetArgsCount()); + + bool hasBlocks = false; + bool onlyScalars = true; + for (ui32 i = 0; i < argsInspector.GetElementsCount(); ++i) { + TBlockTypeInspector blockInspector(*typeInfoHelper, argsInspector.GetElementType(i)); + if (blockInspector) { + if (i == 0) { + hasBlocks = true; + } else { + Y_ENSURE(hasBlocks); + } + + onlyScalars = onlyScalars && blockInspector.IsScalar(); + } + } + + builder.SupportsBlocks(); + builder.UserType(userType); + Y_ENSURE(hasBlocks); + + TVector<std::shared_ptr<arrow::DataType>> argTypes; + auto argsBuilder = builder.Args(callableInspector.GetArgsCount()); + for (ui32 i = 0; i < argsInspector.GetElementsCount(); ++i) { + TBlockTypeInspector blockInspector(*typeInfoHelper, argsInspector.GetElementType(i)); + auto initalType = callableInspector.GetArgType(i); + argsBuilder->Add(builder.Block(blockInspector.IsScalar())->Item(initalType).Build()); + if (callableInspector.GetArgumentName(i).Size() > 0) { + argsBuilder->Name(callableInspector.GetArgumentName(i)); + } + + if (callableInspector.GetArgumentFlags(i) != 0) { + argsBuilder->Flags(callableInspector.GetArgumentFlags(i)); + } + + // TODO fill argTypes + } + + builder.Returns(builder.Block(onlyScalars)->Item(callableInspector.GetReturnType()).Build()); + if (callableInspector.GetOptionalArgsCount() > 0) { + builder.OptionalArgs(callableInspector.GetOptionalArgsCount()); + } + + if (callableInspector.GetPayload().Size() > 0) { + builder.PayloadImpl(callableInspector.GetPayload()); + } + + if (!typesOnly) { + auto returnType = typeInfoHelper->MakeArrowType(callableInspector.GetReturnType()); + Y_ENSURE(returnType); + builder.Implementation(new TSimpleArrowUdfImpl(argTypes, onlyScalars, std::move(returnType), exec, builder, name)); + } +} + +} +} + +#define BEGIN_ARROW_UDF(udfNameBlocks, signatureFunc) \ + class udfNameBlocks { \ + public: \ + typedef bool TTypeAwareMarker; \ + static const ::NYql::NUdf::TStringRef& Name() { \ + static auto name = ::NYql::NUdf::TStringRef::Of(#udfNameBlocks).Substring(1, 256); \ + return name; \ + } \ + static ::NYql::NUdf::TType* GetSignatureType(::NYql::NUdf::IFunctionTypeInfoBuilder& builder) { \ + return builder.SimpleSignatureType<signatureFunc>(); \ + } \ + static bool DeclareSignature(\ + const ::NYql::NUdf::TStringRef& name, \ + ::NYql::NUdf::TType* userType, \ + ::NYql::NUdf::IFunctionTypeInfoBuilder& builder, \ + bool typesOnly); \ + }; + +#define BEGIN_SIMPLE_ARROW_UDF(udfName, signatureFunc) \ + BEGIN_ARROW_UDF(udfName##_BlocksImpl, signatureFunc) \ + UDF(udfName, builder.SimpleSignature<signatureFunc>().SupportsBlocks();) + +#define END_ARROW_UDF(udfNameBlocks, exec) \ + inline bool udfNameBlocks::DeclareSignature(\ + const ::NYql::NUdf::TStringRef& name, \ + ::NYql::NUdf::TType* userType, \ + ::NYql::NUdf::IFunctionTypeInfoBuilder& builder, \ + bool typesOnly) { \ + if (Name() == name) { \ + PrepareSimpleArrowUdf(builder, GetSignatureType(builder), userType, exec, typesOnly, TString(name)); \ + return true; \ + } \ + return false; \ + } + +#define END_SIMPLE_ARROW_UDF(udfName, exec) \ + END_ARROW_UDF(udfName##_BlocksImpl, exec) \ + template<> \ + struct ::NYql::NUdf::TUdfTraits<udfName> { \ + static constexpr bool SupportsBlocks = true; \ + using TBlockUdf = udfName##_BlocksImpl; \ + }; diff --git a/ydb/library/yql/public/udf/udf_helpers.h b/ydb/library/yql/public/udf/udf_helpers.h index 5112ce22fe..6e12a51c91 100644 --- a/ydb/library/yql/public/udf/udf_helpers.h +++ b/ydb/library/yql/public/udf/udf_helpers.h @@ -309,6 +309,12 @@ public: } }; +template <typename TUdf> +struct TUdfTraits { + static constexpr bool SupportsBlocks = false; + using TBlockUdf = void; +}; + template<typename... TUdfs> class TSimpleUdfModuleHelper : public IUdfModule { @@ -324,6 +330,11 @@ public: if (THasTTypeAwareMarker<TUdfType>::value) { r->SetTypeAwareness(); } + + if constexpr (TUdfTraits<TUdfType>::SupportsBlocks) { + auto rBlocks = names.Add(TUdfTraits<TUdfType>::TBlockUdf::Name()); + rBlocks->SetTypeAwareness(); + } } template<typename THead1, typename THead2, typename... TTail> @@ -342,7 +353,14 @@ public: { Y_UNUSED(typeConfig); bool typesOnly = (flags & TFlags::TypesOnly); - return TUdfType::DeclareSignature(name, userType, builder, typesOnly); + bool found = TUdfType::DeclareSignature(name, userType, builder, typesOnly); + if (!found) { + if constexpr (TUdfTraits<TUdfType>::SupportsBlocks) { + found = TUdfTraits<TUdfType>::TBlockUdf::DeclareSignature(name, userType, builder, typesOnly); + } + } + + return found; } template<typename THead1, typename THead2, typename... TTail> diff --git a/ydb/library/yql/public/udf/udf_type_builder.h b/ydb/library/yql/public/udf/udf_type_builder.h index d38e716368..30597ed836 100644 --- a/ydb/library/yql/public/udf/udf_type_builder.h +++ b/ydb/library/yql/public/udf/udf_type_builder.h @@ -451,6 +451,7 @@ UDF_ASSERT_TYPE_SIZE(IListTypeBuilder, 8); namespace NImpl { template <typename T> struct TSimpleSignatureHelper; +template <typename T> struct TSimpleSignatureTypeHelper; template <typename T> struct TTypeBuilderHelper; template <typename... TArgs> struct TArgsHelper; template <typename... TArgs> struct TTupleHelper; @@ -716,6 +717,11 @@ public: return *this; } + template <typename T> + TType* SimpleSignatureType() const { + return NImpl::TSimpleSignatureTypeHelper<T>::Build(*this); + } + IFunctionTypeInfoBuilder& RunConfig(TDataTypeId type) { RunConfigImpl(type); return *this; @@ -1011,6 +1017,16 @@ struct TSimpleSignatureHelper<TReturn(TArgs...)> { } }; +template <typename TReturn, typename... TArgs> +struct TSimpleSignatureTypeHelper<TReturn(TArgs...)> { + static TType* Build(const IFunctionTypeInfoBuilder& builder) { + auto callableBuilder = builder.Callable(sizeof...(TArgs)); + callableBuilder->Returns(TTypeBuilderHelper<TReturn>::Build(builder)); + TCallableArgsHelper<TArgs...>::Arg(*callableBuilder, builder); + return callableBuilder->Build(); + } +}; + } // namspace NImpl template <typename T> diff --git a/ydb/library/yql/udfs/common/url_base/CMakeLists.darwin.txt b/ydb/library/yql/udfs/common/url_base/CMakeLists.darwin.txt index 58c5f62d5c..a6ac71608b 100644 --- a/ydb/library/yql/udfs/common/url_base/CMakeLists.darwin.txt +++ b/ydb/library/yql/udfs/common/url_base/CMakeLists.darwin.txt @@ -20,7 +20,7 @@ target_link_libraries(url_udf INTERFACE add_global_library_for(url_udf.global url_udf) target_compile_options(url_udf.global PRIVATE -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=27 + -DUDF_ABI_VERSION_MINOR=28 -DUDF_ABI_VERSION_PATCH=0 ) target_link_libraries(url_udf.global PUBLIC diff --git a/ydb/library/yql/udfs/common/url_base/CMakeLists.linux-aarch64.txt b/ydb/library/yql/udfs/common/url_base/CMakeLists.linux-aarch64.txt index ae152cf9e0..ad561a195c 100644 --- a/ydb/library/yql/udfs/common/url_base/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/udfs/common/url_base/CMakeLists.linux-aarch64.txt @@ -21,7 +21,7 @@ target_link_libraries(url_udf INTERFACE add_global_library_for(url_udf.global url_udf) target_compile_options(url_udf.global PRIVATE -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=27 + -DUDF_ABI_VERSION_MINOR=28 -DUDF_ABI_VERSION_PATCH=0 ) target_link_libraries(url_udf.global PUBLIC diff --git a/ydb/library/yql/udfs/common/url_base/CMakeLists.linux.txt b/ydb/library/yql/udfs/common/url_base/CMakeLists.linux.txt index ae152cf9e0..ad561a195c 100644 --- a/ydb/library/yql/udfs/common/url_base/CMakeLists.linux.txt +++ b/ydb/library/yql/udfs/common/url_base/CMakeLists.linux.txt @@ -21,7 +21,7 @@ target_link_libraries(url_udf INTERFACE add_global_library_for(url_udf.global url_udf) target_compile_options(url_udf.global PRIVATE -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=27 + -DUDF_ABI_VERSION_MINOR=28 -DUDF_ABI_VERSION_PATCH=0 ) target_link_libraries(url_udf.global PUBLIC diff --git a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.darwin.txt b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.darwin.txt index 75cbdbd4af..c56c96c1fb 100644 --- a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.darwin.txt +++ b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.darwin.txt @@ -10,7 +10,7 @@ add_library(common-url_base-lib) target_compile_options(common-url_base-lib PRIVATE -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=27 + -DUDF_ABI_VERSION_MINOR=28 -DUDF_ABI_VERSION_PATCH=0 ) target_link_libraries(common-url_base-lib PUBLIC @@ -23,6 +23,8 @@ target_link_libraries(common-url_base-lib PUBLIC cpp-unicode-punycode library-cpp-uri yql-public-udf + public-udf-arrow + libs-apache-arrow ) target_sources(common-url_base-lib PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.cpp diff --git a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux-aarch64.txt b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux-aarch64.txt index 1af705d435..8346301c6c 100644 --- a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux-aarch64.txt @@ -10,7 +10,7 @@ add_library(common-url_base-lib) target_compile_options(common-url_base-lib PRIVATE -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=27 + -DUDF_ABI_VERSION_MINOR=28 -DUDF_ABI_VERSION_PATCH=0 ) target_link_libraries(common-url_base-lib PUBLIC @@ -24,6 +24,8 @@ target_link_libraries(common-url_base-lib PUBLIC cpp-unicode-punycode library-cpp-uri yql-public-udf + public-udf-arrow + libs-apache-arrow ) target_sources(common-url_base-lib PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.cpp diff --git a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux.txt b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux.txt index 1af705d435..8346301c6c 100644 --- a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux.txt +++ b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux.txt @@ -10,7 +10,7 @@ add_library(common-url_base-lib) target_compile_options(common-url_base-lib PRIVATE -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=27 + -DUDF_ABI_VERSION_MINOR=28 -DUDF_ABI_VERSION_PATCH=0 ) target_link_libraries(common-url_base-lib PUBLIC @@ -24,6 +24,8 @@ target_link_libraries(common-url_base-lib PUBLIC cpp-unicode-punycode library-cpp-uri yql-public-udf + public-udf-arrow + libs-apache-arrow ) target_sources(common-url_base-lib PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.cpp diff --git a/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h b/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h index 29e757976d..832b61a228 100644 --- a/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h +++ b/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h @@ -5,6 +5,8 @@ #include <ydb/library/yql/public/udf/udf_helpers.h> +#include <ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h> + #include <library/cpp/tld/tld.h> #include <library/cpp/charset/wide.h> #include <library/cpp/unicode/punycode/punycode.h> @@ -19,284 +21,292 @@ using namespace NUdf; using namespace NTld; using namespace NUrlUdf; -namespace { - inline bool PrepareUrl(const std::string_view& keyStr, TUri& parser) { - const NUri::TParseFlags& parseFlags(TUri::FeaturesRecommended); - return parser.ParseAbs(keyStr, parseFlags) == TUri::ParsedOK; - } +inline bool PrepareUrl(const std::string_view& keyStr, TUri& parser) { + const NUri::TParseFlags& parseFlags(TUri::FeaturesRecommended); + return parser.ParseAbs(keyStr, parseFlags) == TUri::ParsedOK; +} - SIMPLE_UDF(TNormalize, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - TUri url; - const bool success = PrepareUrl(args[0].AsStringRef(), url); - return success - ? valueBuilder->NewString(url.PrintS(TUri::FlagNoFrag)) - : TUnboxedValue(); - } +SIMPLE_UDF(TNormalize, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + TUri url; + const bool success = PrepareUrl(args[0].AsStringRef(), url); + return success + ? valueBuilder->NewString(url.PrintS(TUri::FlagNoFrag)) + : TUnboxedValue(); +} - SIMPLE_UDF(TGetScheme, char*(TAutoMap<char*>)) { - const std::string_view url(args[0].AsStringRef()); - const std::string_view prefix(GetSchemePrefix(url)); - return valueBuilder->SubString(args[0], std::distance(url.begin(), prefix.begin()), prefix.size()); - } +SIMPLE_UDF(TGetScheme, char*(TAutoMap<char*>)) { + const std::string_view url(args[0].AsStringRef()); + const std::string_view prefix(GetSchemePrefix(url)); + return valueBuilder->SubString(args[0], std::distance(url.begin(), prefix.begin()), prefix.size()); +} - SIMPLE_UDF(TGetHost, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view host(GetOnlyHost(url)); - return host.empty() ? TUnboxedValue() : - valueBuilder->SubString(args[0], std::distance(url.begin(), host.begin()), host.size()); - } +inline arrow::Status GetHostKernelExec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { + Y_UNUSED(ctx); + Y_UNUSED(batch); + Y_UNUSED(res); + Y_ENSURE(false); +} - SIMPLE_UDF(TGetHostPort, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view host(GetHostAndPort(CutSchemePrefix(url))); - return host.empty() ? TUnboxedValue() : - valueBuilder->SubString(args[0], std::distance(url.begin(), host.begin()), host.size()); - } +BEGIN_SIMPLE_ARROW_UDF(TGetHost, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view host(GetOnlyHost(url)); + return host.empty() ? TUnboxedValue() : + valueBuilder->SubString(args[0], std::distance(url.begin(), host.begin()), host.size()); +} - SIMPLE_UDF(TGetSchemeHost, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view host(GetSchemeHost(url, /* trimHttp */ false)); - return host.empty() ? TUnboxedValue() : - valueBuilder->SubString(args[0], 0U, std::distance(url.begin(), host.end())); - } +END_SIMPLE_ARROW_UDF(TGetHost, GetHostKernelExec); - SIMPLE_UDF(TGetSchemeHostPort, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view host(GetSchemeHostAndPort(url, /* trimHttp */ false, /* trimDefaultPort */ false)); - return host.empty() ? TUnboxedValue() : - valueBuilder->SubString(args[0], 0U, std::distance(url.begin(), host.end())); - } +SIMPLE_UDF(TGetHostPort, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view host(GetHostAndPort(CutSchemePrefix(url))); + return host.empty() ? TUnboxedValue() : + valueBuilder->SubString(args[0], std::distance(url.begin(), host.begin()), host.size()); +} - SIMPLE_UDF(TGetPort, TOptional<ui64>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - Y_UNUSED(valueBuilder); - ui16 port = 0; - TStringBuf scheme, host; - TString lowerUri(args[0].AsStringRef()); - std::transform(lowerUri.cbegin(), lowerUri.cbegin() + GetSchemePrefixSize(lowerUri), - lowerUri.begin(), [](unsigned char c){ return std::tolower(c); }); - return TryGetSchemeHostAndPort(lowerUri, scheme, host, port) && port - ? TUnboxedValuePod(port) - : TUnboxedValuePod(); - } +SIMPLE_UDF(TGetSchemeHost, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view host(GetSchemeHost(url, /* trimHttp */ false)); + return host.empty() ? TUnboxedValue() : + valueBuilder->SubString(args[0], 0U, std::distance(url.begin(), host.end())); +} - SIMPLE_UDF(TGetTail, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const TStringBuf url(args[0].AsStringRef()); - TStringBuf host, tail; - SplitUrlToHostAndPath(url, host, tail); - return tail.StartsWith('/') - ? valueBuilder->NewString(tail) - : valueBuilder->NewString(TString('/').append(tail)); - } +SIMPLE_UDF(TGetSchemeHostPort, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view host(GetSchemeHostAndPort(url, /* trimHttp */ false, /* trimDefaultPort */ false)); + return host.empty() ? TUnboxedValue() : + valueBuilder->SubString(args[0], 0U, std::distance(url.begin(), host.end())); +} - SIMPLE_UDF(TGetPath, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - std::string_view cut(CutSchemePrefix(url)); - const auto s = cut.find('/'); - if (s == std::string_view::npos) { - return valueBuilder->NewString("/"); - } +SIMPLE_UDF(TGetPort, TOptional<ui64>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + Y_UNUSED(valueBuilder); + ui16 port = 0; + TStringBuf scheme, host; + TString lowerUri(args[0].AsStringRef()); + std::transform(lowerUri.cbegin(), lowerUri.cbegin() + GetSchemePrefixSize(lowerUri), + lowerUri.begin(), [](unsigned char c){ return std::tolower(c); }); + return TryGetSchemeHostAndPort(lowerUri, scheme, host, port) && port + ? TUnboxedValuePod(port) + : TUnboxedValuePod(); +} - cut.remove_prefix(s); - const auto end = cut.find_first_of("?#"); - if (std::string_view::npos != end) { - cut.remove_suffix(cut.size() - end); - } +SIMPLE_UDF(TGetTail, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const TStringBuf url(args[0].AsStringRef()); + TStringBuf host, tail; + SplitUrlToHostAndPath(url, host, tail); + return tail.StartsWith('/') + ? valueBuilder->NewString(tail) + : valueBuilder->NewString(TString('/').append(tail)); +} - return valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); +SIMPLE_UDF(TGetPath, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + std::string_view cut(CutSchemePrefix(url)); + const auto s = cut.find('/'); + if (s == std::string_view::npos) { + return valueBuilder->NewString("/"); } - SIMPLE_UDF(TGetFragment, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const auto pos = url.find('#'); - return pos == std::string_view::npos ? TUnboxedValue() : - valueBuilder->SubString(args[0], pos + 1U, url.length() - pos - 1U); + cut.remove_prefix(s); + const auto end = cut.find_first_of("?#"); + if (std::string_view::npos != end) { + cut.remove_suffix(cut.size() - end); } - SIMPLE_UDF(TGetDomain, TOptional<char*>(TOptional<char*>, ui8)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view host(GetOnlyHost(url)); - const ui8 level = args[1].Get<ui8>(); - std::vector<std::string_view> parts; - StringSplitter(host).Split('.').AddTo(&parts); - if (level && parts.size() >= level) { - const auto& result = host.substr(std::distance(host.begin(), parts[parts.size() - level].begin())); - return result.empty() ? TUnboxedValue() : - valueBuilder->SubString(args[0], std::distance(url.begin(), result.begin()), result.size()); - } + return valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); +} - return TUnboxedValue(); - } +SIMPLE_UDF(TGetFragment, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const auto pos = url.find('#'); + return pos == std::string_view::npos ? TUnboxedValue() : + valueBuilder->SubString(args[0], pos + 1U, url.length() - pos - 1U); +} - SIMPLE_UDF(TGetTLD, char*(TAutoMap<char*>)) { - const TStringBuf url(args[0].AsStringRef()); - return valueBuilder->NewString(GetZone(GetOnlyHost(url))); +SIMPLE_UDF(TGetDomain, TOptional<char*>(TOptional<char*>, ui8)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view host(GetOnlyHost(url)); + const ui8 level = args[1].Get<ui8>(); + std::vector<std::string_view> parts; + StringSplitter(host).Split('.').AddTo(&parts); + if (level && parts.size() >= level) { + const auto& result = host.substr(std::distance(host.begin(), parts[parts.size() - level].begin())); + return result.empty() ? TUnboxedValue() : + valueBuilder->SubString(args[0], std::distance(url.begin(), result.begin()), result.size()); } - SIMPLE_UDF(TGetDomainLevel, ui64(TAutoMap<char*>)) { - Y_UNUSED(valueBuilder); - std::vector<std::string_view> parts; - StringSplitter(GetOnlyHost(args[0].AsStringRef())).Split('.').AddTo(&parts); - return TUnboxedValuePod(ui64(parts.size())); - } + return TUnboxedValue(); +} + +SIMPLE_UDF(TGetTLD, char*(TAutoMap<char*>)) { + const TStringBuf url(args[0].AsStringRef()); + return valueBuilder->NewString(GetZone(GetOnlyHost(url))); +} - SIMPLE_UDF_OPTIONS(TGetSignificantDomain, char*(TAutoMap<char*>, TOptional<TListType<char*>>), - builder.OptionalArgs(1)) { - const std::string_view url(args[0].AsStringRef()); - const std::string_view host(GetOnlyHost(url)); - std::vector<std::string_view> parts; - StringSplitter(host).Split('.').AddTo(&parts); - if (parts.size() > 2) { - const auto& secondLevel = parts.at(parts.size() - 2); - bool secondLevelIsZone = false; - - if (args[1]) { - const auto& zonesIterator = args[1].GetListIterator(); - for (TUnboxedValue item; zonesIterator.Next(item);) { - if (secondLevel == item.AsStringRef()) { - secondLevelIsZone = true; - break; - } +SIMPLE_UDF(TGetDomainLevel, ui64(TAutoMap<char*>)) { + Y_UNUSED(valueBuilder); + std::vector<std::string_view> parts; + StringSplitter(GetOnlyHost(args[0].AsStringRef())).Split('.').AddTo(&parts); + return TUnboxedValuePod(ui64(parts.size())); +} + +SIMPLE_UDF_OPTIONS(TGetSignificantDomain, char*(TAutoMap<char*>, TOptional<TListType<char*>>), + builder.OptionalArgs(1)) { + const std::string_view url(args[0].AsStringRef()); + const std::string_view host(GetOnlyHost(url)); + std::vector<std::string_view> parts; + StringSplitter(host).Split('.').AddTo(&parts); + if (parts.size() > 2) { + const auto& secondLevel = parts.at(parts.size() - 2); + bool secondLevelIsZone = false; + + if (args[1]) { + const auto& zonesIterator = args[1].GetListIterator(); + for (TUnboxedValue item; zonesIterator.Next(item);) { + if (secondLevel == item.AsStringRef()) { + secondLevelIsZone = true; + break; } - } else { - static const std::set<std::string_view> zones{"com", "net", "org", "co", "gov", "edu"}; - secondLevelIsZone = zones.count(secondLevel); } - - const auto from = parts[parts.size() - (secondLevelIsZone ? 3U : 2U)].begin(); - return valueBuilder->SubString(args[0], std::distance(url.begin(), from), std::distance(from, parts.back().end())); + } else { + static const std::set<std::string_view> zones{"com", "net", "org", "co", "gov", "edu"}; + secondLevelIsZone = zones.count(secondLevel); } - return valueBuilder->SubString(args[0], std::distance(url.begin(), host.begin()), host.length()); + + const auto from = parts[parts.size() - (secondLevelIsZone ? 3U : 2U)].begin(); + return valueBuilder->SubString(args[0], std::distance(url.begin(), from), std::distance(from, parts.back().end())); } + return valueBuilder->SubString(args[0], std::distance(url.begin(), host.begin()), host.length()); +} - SIMPLE_UDF(TGetCGIParam, TOptional<char*>(TOptional<char*>, char*)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view key(args[1].AsStringRef()); - const auto queryStart = url.find('?'); - if (queryStart != std::string_view::npos) { - const auto from = queryStart + 1U; - const auto anc = url.find('#', from); - const auto end = anc == std::string_view::npos ? url.length() : anc; - for (auto pos = from; pos && pos < end; ++pos) { - const auto equal = url.find('=', pos); - const auto amper = url.find('&', pos); - if (equal < amper) { - const auto& param = url.substr(pos, equal - pos); - if (param == key) { - return valueBuilder->SubString(args[0], equal + 1U, std::min(amper, end) - equal - 1U); - } +SIMPLE_UDF(TGetCGIParam, TOptional<char*>(TOptional<char*>, char*)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view key(args[1].AsStringRef()); + const auto queryStart = url.find('?'); + if (queryStart != std::string_view::npos) { + const auto from = queryStart + 1U; + const auto anc = url.find('#', from); + const auto end = anc == std::string_view::npos ? url.length() : anc; + for (auto pos = from; pos && pos < end; ++pos) { + const auto equal = url.find('=', pos); + const auto amper = url.find('&', pos); + if (equal < amper) { + const auto& param = url.substr(pos, equal - pos); + if (param == key) { + return valueBuilder->SubString(args[0], equal + 1U, std::min(amper, end) - equal - 1U); } - - pos = amper; } - } - return TUnboxedValue(); + pos = amper; + } } - SIMPLE_UDF(TCutScheme, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view cut(CutSchemePrefix(url)); - return cut.empty() ? TUnboxedValue() : - valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); - } + return TUnboxedValue(); +} - SIMPLE_UDF(TCutWWW, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view cut(CutWWWPrefix(url)); - return cut.empty() ? TUnboxedValue() : - valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); - } +SIMPLE_UDF(TCutScheme, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view cut(CutSchemePrefix(url)); + return cut.empty() ? TUnboxedValue() : + valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); +} - SIMPLE_UDF(TCutWWW2, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view cut(CutWWWNumberedPrefix(url)); - return cut.empty() ? TUnboxedValue() : - valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); - } +SIMPLE_UDF(TCutWWW, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view cut(CutWWWPrefix(url)); + return cut.empty() ? TUnboxedValue() : + valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); +} - SIMPLE_UDF(TCutQueryStringAndFragment, char*(TAutoMap<char*>)) { - const std::string_view input(args[0].AsStringRef()); - const auto cut = input.find_first_of("?#"); - return std::string_view::npos == cut ? NUdf::TUnboxedValue(args[0]) : valueBuilder->SubString(args[0], 0U, cut); - } +SIMPLE_UDF(TCutWWW2, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view cut(CutWWWNumberedPrefix(url)); + return cut.empty() ? TUnboxedValue() : + valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); +} - SIMPLE_UDF(TEncode, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view input(args[0].AsStringRef()); - if (input.empty()) { - return NUdf::TUnboxedValuePod(); - } - TString url(input); - UrlEscape(url); - return input == url ? NUdf::TUnboxedValue(args[0]) : valueBuilder->NewString(url); - } +SIMPLE_UDF(TCutQueryStringAndFragment, char*(TAutoMap<char*>)) { + const std::string_view input(args[0].AsStringRef()); + const auto cut = input.find_first_of("?#"); + return std::string_view::npos == cut ? NUdf::TUnboxedValue(args[0]) : valueBuilder->SubString(args[0], 0U, cut); +} - SIMPLE_UDF(TDecode, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view input(args[0].AsStringRef()); - if (input.empty()) { - return NUdf::TUnboxedValuePod(); - } - TString url(input); - SubstGlobal(url, '+', ' '); - UrlUnescape(url); - return input == url ? NUdf::TUnboxedValue(args[0]) : valueBuilder->NewString(url); +SIMPLE_UDF(TEncode, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view input(args[0].AsStringRef()); + if (input.empty()) { + return NUdf::TUnboxedValuePod(); } + TString url(input); + UrlEscape(url); + return input == url ? NUdf::TUnboxedValue(args[0]) : valueBuilder->NewString(url); +} - SIMPLE_UDF(TIsKnownTLD, bool(TAutoMap<char*>)) { - Y_UNUSED(valueBuilder); - return TUnboxedValuePod(IsTld(args[0].AsStringRef())); +SIMPLE_UDF(TDecode, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view input(args[0].AsStringRef()); + if (input.empty()) { + return NUdf::TUnboxedValuePod(); } + TString url(input); + SubstGlobal(url, '+', ' '); + UrlUnescape(url); + return input == url ? NUdf::TUnboxedValue(args[0]) : valueBuilder->NewString(url); +} - SIMPLE_UDF(TIsWellKnownTLD, bool(TAutoMap<char*>)) { - Y_UNUSED(valueBuilder); - return TUnboxedValuePod(IsVeryGoodTld(args[0].AsStringRef())); - } +SIMPLE_UDF(TIsKnownTLD, bool(TAutoMap<char*>)) { + Y_UNUSED(valueBuilder); + return TUnboxedValuePod(IsTld(args[0].AsStringRef())); +} - SIMPLE_UDF(THostNameToPunycode, TOptional<char*>(TAutoMap<char*>)) try { - const TUtf16String& input = UTF8ToWide(args[0].AsStringRef()); - return valueBuilder->NewString(HostNameToPunycode(input)); - } catch (TPunycodeError&) { - return TUnboxedValue(); - } +SIMPLE_UDF(TIsWellKnownTLD, bool(TAutoMap<char*>)) { + Y_UNUSED(valueBuilder); + return TUnboxedValuePod(IsVeryGoodTld(args[0].AsStringRef())); +} - SIMPLE_UDF(TForceHostNameToPunycode, char*(TAutoMap<char*>)) { - const TUtf16String& input = UTF8ToWide(args[0].AsStringRef()); - return valueBuilder->NewString(ForceHostNameToPunycode(input)); - } +SIMPLE_UDF(THostNameToPunycode, TOptional<char*>(TAutoMap<char*>)) try { + const TUtf16String& input = UTF8ToWide(args[0].AsStringRef()); + return valueBuilder->NewString(HostNameToPunycode(input)); +} catch (TPunycodeError&) { + return TUnboxedValue(); +} - SIMPLE_UDF(TPunycodeToHostName, TOptional<char*>(TAutoMap<char*>)) try { - const TStringRef& input = args[0].AsStringRef(); - const auto& result = WideToUTF8(PunycodeToHostName(input)); - return valueBuilder->NewString(result); - } catch (TPunycodeError&) { - return TUnboxedValue(); - } +SIMPLE_UDF(TForceHostNameToPunycode, char*(TAutoMap<char*>)) { + const TUtf16String& input = UTF8ToWide(args[0].AsStringRef()); + return valueBuilder->NewString(ForceHostNameToPunycode(input)); +} - SIMPLE_UDF(TForcePunycodeToHostName, char*(TAutoMap<char*>)) { - const TStringRef& input = args[0].AsStringRef(); - const auto& result = WideToUTF8(ForcePunycodeToHostName(input)); - return valueBuilder->NewString(result); - } +SIMPLE_UDF(TPunycodeToHostName, TOptional<char*>(TAutoMap<char*>)) try { + const TStringRef& input = args[0].AsStringRef(); + const auto& result = WideToUTF8(PunycodeToHostName(input)); + return valueBuilder->NewString(result); +} catch (TPunycodeError&) { + return TUnboxedValue(); +} - SIMPLE_UDF(TCanBePunycodeHostName, bool(TAutoMap<char*>)) { - Y_UNUSED(valueBuilder); - return TUnboxedValuePod(CanBePunycodeHostName(args[0].AsStringRef())); - } +SIMPLE_UDF(TForcePunycodeToHostName, char*(TAutoMap<char*>)) { + const TStringRef& input = args[0].AsStringRef(); + const auto& result = WideToUTF8(ForcePunycodeToHostName(input)); + return valueBuilder->NewString(result); +} + +SIMPLE_UDF(TCanBePunycodeHostName, bool(TAutoMap<char*>)) { + Y_UNUSED(valueBuilder); + return TUnboxedValuePod(CanBePunycodeHostName(args[0].AsStringRef())); +} #define EXPORTED_URL_BASE_UDF \ TNormalize, \ @@ -331,4 +341,4 @@ namespace { TQueryStringToList, \ TQueryStringToDict, \ TBuildQueryString -} + |