diff options
author | vvvv <vvvv@ydb.tech> | 2023-06-01 14:24:06 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-06-01 14:24:06 +0300 |
commit | 7cfdf6d2778a8cfaa4c22b6c2dbde8ce6af58895 (patch) | |
tree | 94a6d3f847a9af044d1ab5302097b91e75672eed | |
parent | 3192203c5b698d14f82e5ffe551878694a52204d (diff) | |
download | ydb-7cfdf6d2778a8cfaa4c22b6c2dbde8ce6af58895.tar.gz |
Expose arrow kernel from Apply
10 files changed, 248 insertions, 5 deletions
diff --git a/ydb/library/yql/core/arrow_kernels/registry/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/core/arrow_kernels/registry/ut/CMakeLists.darwin-x86_64.txt index ae4c3e50f47..183c38d9e47 100644 --- a/ydb/library/yql/core/arrow_kernels/registry/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/core/arrow_kernels/registry/ut/CMakeLists.darwin-x86_64.txt @@ -26,6 +26,7 @@ target_link_libraries(ydb-library-yql-core-arrow_kernels-registry-ut PUBLIC yql-sql-pg_dummy yql-minikql-invoke_builtins yql-minikql-comp_nodes + url_udf ) target_link_options(ydb-library-yql-core-arrow_kernels-registry-ut PRIVATE -Wl,-platform_version,macos,11.0,11.0 diff --git a/ydb/library/yql/core/arrow_kernels/registry/ut/CMakeLists.linux-aarch64.txt b/ydb/library/yql/core/arrow_kernels/registry/ut/CMakeLists.linux-aarch64.txt index 6bf2afe1c24..dfb0e15b5e2 100644 --- a/ydb/library/yql/core/arrow_kernels/registry/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/core/arrow_kernels/registry/ut/CMakeLists.linux-aarch64.txt @@ -26,6 +26,7 @@ target_link_libraries(ydb-library-yql-core-arrow_kernels-registry-ut PUBLIC yql-sql-pg_dummy yql-minikql-invoke_builtins yql-minikql-comp_nodes + url_udf ) target_link_options(ydb-library-yql-core-arrow_kernels-registry-ut PRIVATE -ldl diff --git a/ydb/library/yql/core/arrow_kernels/registry/ut/CMakeLists.linux-x86_64.txt b/ydb/library/yql/core/arrow_kernels/registry/ut/CMakeLists.linux-x86_64.txt index f1a3e5291d3..3bf9c98e99c 100644 --- a/ydb/library/yql/core/arrow_kernels/registry/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/core/arrow_kernels/registry/ut/CMakeLists.linux-x86_64.txt @@ -27,6 +27,7 @@ target_link_libraries(ydb-library-yql-core-arrow_kernels-registry-ut PUBLIC yql-sql-pg_dummy yql-minikql-invoke_builtins yql-minikql-comp_nodes + url_udf ) target_link_options(ydb-library-yql-core-arrow_kernels-registry-ut PRIVATE -ldl diff --git a/ydb/library/yql/core/arrow_kernels/registry/ut/CMakeLists.windows-x86_64.txt b/ydb/library/yql/core/arrow_kernels/registry/ut/CMakeLists.windows-x86_64.txt index 55f8a282d87..e99e5f48d83 100644 --- a/ydb/library/yql/core/arrow_kernels/registry/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/core/arrow_kernels/registry/ut/CMakeLists.windows-x86_64.txt @@ -26,6 +26,7 @@ target_link_libraries(ydb-library-yql-core-arrow_kernels-registry-ut PUBLIC yql-sql-pg_dummy yql-minikql-invoke_builtins yql-minikql-comp_nodes + url_udf ) target_sources(ydb-library-yql-core-arrow_kernels-registry-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp diff --git a/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp b/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp index 1acaa8637ba..d3cf78a273f 100644 --- a/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp +++ b/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp @@ -11,7 +11,8 @@ using namespace NKikimr::NMiniKQL; template <typename F> void TestOne(F&& f) { TExprContext ctx; - auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry()); + auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); + FillStaticModules(*functionRegistry); auto nodeFactory = GetBuiltinFactory(); TKernelRequestBuilder b(*functionRegistry); auto index = f(b, ctx); @@ -81,4 +82,14 @@ Y_UNIT_TEST_SUITE(TKernelRegistryTest) { return b.AddBinaryOp(TKernelRequestBuilder::EBinaryOp::Div, blockInt32Type, blockInt32Type, blockOptInt32Type); }); } + + Y_UNIT_TEST(TestUdf) { + TestOne([](auto& b,auto& ctx) { + auto blockOptStringType = ctx.template MakeType<TBlockExprType>( + ctx.template MakeType<TOptionalExprType>( + ctx.template MakeType<TDataExprType>(EDataSlot::String))); + return b.Udf("Url.GetHost", false, { blockOptStringType }, blockOptStringType) ; + }); + } + } diff --git a/ydb/library/yql/core/arrow_kernels/request/request.cpp b/ydb/library/yql/core/arrow_kernels/request/request.cpp index 8450acbce97..a8f5acc538f 100644 --- a/ydb/library/yql/core/arrow_kernels/request/request.cpp +++ b/ydb/library/yql/core/arrow_kernels/request/request.cpp @@ -54,6 +54,32 @@ ui32 TKernelRequestBuilder::AddBinaryOp(EBinaryOp op, const TTypeAnnotationNode* return Items_.size() - 1; } +ui32 TKernelRequestBuilder::Udf(const TString& name, bool isPolymorphic, const std::vector<const TTypeAnnotationNode*>& argTypes, + const TTypeAnnotationNode* retType) { + TGuard<NKikimr::NMiniKQL::TScopedAlloc> allocGuard(Alloc_); + std::vector<NKikimr::NMiniKQL::TType*> inputTypes; + for (const auto& type : argTypes) { + inputTypes.emplace_back(MakeType(type)); + } + + const auto userType = Pb_.NewTupleType({ + Pb_.NewTupleType(inputTypes), + Pb_.NewEmptyStructType(), + Pb_.NewEmptyTupleType()}); + + auto udf = Pb_.Udf(isPolymorphic ? name : (name + "_BlocksImpl"), Pb_.NewVoid(), userType); + std::vector<NKikimr::NMiniKQL::TRuntimeNode> args; + for (const auto& type : argTypes) { + args.emplace_back(MakeArg(type)); + } + + auto apply = Pb_.Apply(udf, args); + auto outType = MakeType(retType); + Y_ENSURE(outType->IsSameType(*apply.GetStaticType())); + Items_.emplace_back(apply); + return Items_.size() - 1; +} + TString TKernelRequestBuilder::Serialize() { TGuard<NKikimr::NMiniKQL::TScopedAlloc> allocGuard(Alloc_); auto tuple = Items_.empty() ? Pb_.AsScalar(Pb_.NewEmptyTuple()) : Pb_.BlockAsTuple(Items_); @@ -70,13 +96,18 @@ NKikimr::NMiniKQL::TRuntimeNode TKernelRequestBuilder::MakeArg(const TTypeAnnota } NKikimr::NMiniKQL::TBlockType* TKernelRequestBuilder::MakeType(const TTypeAnnotationNode* type) { + auto [it, inserted] = CachedTypes_.emplace(type, nullptr); + if (!inserted) { + return it->second; + } + TStringStream err; auto ret = NCommon::BuildType(*type, Pb_, err); if (!ret) { ythrow yexception() << err.Str(); } - return AS_TYPE(NKikimr::NMiniKQL::TBlockType, ret); + return it->second = AS_TYPE(NKikimr::NMiniKQL::TBlockType, ret); } } diff --git a/ydb/library/yql/core/arrow_kernels/request/request.h b/ydb/library/yql/core/arrow_kernels/request/request.h index f3eb860a0d4..20c0e9babe3 100644 --- a/ydb/library/yql/core/arrow_kernels/request/request.h +++ b/ydb/library/yql/core/arrow_kernels/request/request.h @@ -24,6 +24,7 @@ public: ui32 AddUnaryOp(EUnaryOp op, const TTypeAnnotationNode* arg1Type, const TTypeAnnotationNode* retType); ui32 AddBinaryOp(EBinaryOp op, const TTypeAnnotationNode* arg1Type, const TTypeAnnotationNode* arg2Type, const TTypeAnnotationNode* retType); + ui32 Udf(const TString& name, bool isPolymorphic, const std::vector<const TTypeAnnotationNode*>& argTypes, const TTypeAnnotationNode* retType); TString Serialize(); private: @@ -34,6 +35,7 @@ private: NKikimr::NMiniKQL::TTypeEnvironment Env_; NKikimr::NMiniKQL::TProgramBuilder Pb_; TVector<NKikimr::NMiniKQL::TRuntimeNode> Items_; + std::unordered_map<const TTypeAnnotationNode*, NKikimr::NMiniKQL::TBlockType*> CachedTypes_; std::unordered_map<const TTypeAnnotationNode*, NKikimr::NMiniKQL::TRuntimeNode> CachedArgs_; }; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_apply.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_apply.cpp index 7e2dabce4c9..cf6216d9499 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_apply.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_apply.cpp @@ -1,9 +1,12 @@ #include "mkql_apply.h" +#include "mkql_block_impl.h" #include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> #include <library/cpp/containers/stack_array/stack_array.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/computation/mkql_value_builder.h> namespace NKikimr { namespace NMiniKQL { @@ -11,19 +14,118 @@ namespace NMiniKQL { namespace { class TApplyWrapper: public TMutableCodegeneratorPtrNode<TApplyWrapper> { +friend class TArrowNode; typedef TMutableCodegeneratorPtrNode<TApplyWrapper> TBaseComputation; public: + struct TKernelState : public arrow::compute::KernelState { + TKernelState(ui32 argsCount) + : Alloc(__LOCATION__) + , MemInfo("Apply") + , HolderFactory(Alloc.Ref(), MemInfo) + , ValueBuilder(HolderFactory, NUdf::EValidatePolicy::Exception) + , Args(argsCount) + { + Alloc.Release(); + } + + ~TKernelState() + { + Alloc.Acquire(); + } + + TScopedAlloc Alloc; + TMemoryUsageInfo MemInfo; + THolderFactory HolderFactory; + TDefaultValueBuilder ValueBuilder; + TVector<NUdf::TUnboxedValue> Args; + }; + + class TArrowNode : public IArrowKernelComputationNode { + public: + TArrowNode(const TApplyWrapper* parent, const NUdf::TUnboxedValue& callable, TType* returnType, const TVector<TType*>& argsTypes) + : Parent_(parent) + , Callable_(callable) + , ArgsValuesDescr_(ToValueDescr(argsTypes)) + , Kernel_(ConvertToInputTypes(argsTypes), ConvertToOutputType(returnType), [parent, this](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { + auto& state = dynamic_cast<TKernelState&>(*ctx->state()); + auto guard = Guard(state.Alloc); + Y_ENSURE(batch.values.size() == state.Args.size()); + for (ui32 i = 0; i < batch.values.size(); ++i) { + state.Args[i] = state.HolderFactory.CreateArrowBlock(arrow::Datum(batch.values[i])); + } + + const auto ret = Callable_.Run(&state.ValueBuilder, state.Args.data()); + *res = TArrowBlock::From(ret).GetDatum(); + return arrow::Status::OK(); + }) + { + Kernel_.null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE; + Kernel_.mem_allocation = arrow::compute::MemAllocation::NO_PREALLOCATE; + Kernel_.init = [argsCount = argsTypes.size()](arrow::compute::KernelContext*, const arrow::compute::KernelInitArgs&) { + auto state = std::make_unique<TKernelState>(argsCount); + return arrow::Result(std::move(state)); + }; + } + + TStringBuf GetKernelName() const final { + return "Apply"; + } + + const arrow::compute::ScalarKernel& GetArrowKernel() const { + return Kernel_; + } + + const std::vector<arrow::ValueDescr>& GetArgsDesc() const { + return ArgsValuesDescr_; + } + + const IComputationNode* GetArgument(ui32 index) const { + return Parent_->ArgNodes[index]; + } + + private: + const TApplyWrapper* Parent_; + const NUdf::TUnboxedValue Callable_; + const std::vector<arrow::ValueDescr> ArgsValuesDescr_; + arrow::compute::ScalarKernel Kernel_; + }; + TApplyWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationNode* callableNode, - TComputationNodePtrVector&& argNodes, ui32 usedArgs, const NUdf::TSourcePosition& pos) + TComputationNodePtrVector&& argNodes, ui32 usedArgs, const NUdf::TSourcePosition& pos, TCallableType* callableType) : TBaseComputation(mutables, kind) , CallableNode(callableNode) , ArgNodes(std::move(argNodes)) , UsedArgs(usedArgs) , Position(pos) + , CallableType(callableType) { Stateless = false; } + std::unique_ptr<IArrowKernelComputationNode> PrepareArrowKernelComputationNode(TComputationContext& ctx) const final { + if (UsedArgs != CallableType->GetArgumentsCount()) { + return {}; + } + + std::shared_ptr<arrow::DataType> t; + if (!CallableType->GetReturnType()->IsBlock() || + !ConvertArrowType(AS_TYPE(TBlockType, CallableType->GetReturnType())->GetItemType(), t)) { + return {}; + } + + TVector<TType*> argsTypes; + for (ui32 i = 0; i < CallableType->GetArgumentsCount(); ++i) { + argsTypes.push_back(CallableType->GetArgumentType(i)); + if (!CallableType->GetArgumentType(i)->IsBlock() || + !ConvertArrowType(AS_TYPE(TBlockType, CallableType->GetArgumentType(i))->GetItemType(), t)) { + return {}; + } + } + + const auto callable = CallableNode->GetValue(ctx); + return std::make_unique<TArrowNode>(this, callable, CallableType->GetReturnType(), argsTypes); + } + NUdf::TUnboxedValue DoCalculate(TComputationContext& ctx) const { NStackArray::TStackArray<NUdf::TUnboxedValue> values(ALLOC_ON_STACK(NUdf::TUnboxedValue, UsedArgs)); for (size_t i = 0; i < UsedArgs; ++i) { @@ -97,6 +199,7 @@ private: const TComputationNodePtrVector ArgNodes; const ui32 UsedArgs; const NUdf::TSourcePosition Position; + TCallableType* CallableType; }; } @@ -140,7 +243,7 @@ IComputationNode* WrapApply(TCallable& callable, const TComputationNodeFactoryCo auto functionNode = LocateNode(ctx.NodeLocator, callable, 0); return new TApplyWrapper(ctx.Mutables, GetValueRepresentation(callable.GetType()->GetReturnType()), functionNode, std::move(argNodes), - callableType->GetArgumentsCount(), NUdf::TSourcePosition(row, column, file)); + callableType->GetArgumentsCount(), NUdf::TSourcePosition(row, column, file), callableType); } } diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp index b409a523a68..da310dedfc4 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp @@ -5,6 +5,28 @@ #include <arrow/compute/exec_internal.h> #include <arrow/array/builder_primitive.h> +#include <ydb/library/yql/public/udf/udf_helpers.h> +#include <ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h> + +BEGIN_SIMPLE_ARROW_UDF(TInc, i32(i32)) { + Y_UNUSED(valueBuilder); + const i32 arg = args[0].Get<i32>(); + return NYql::NUdf::TUnboxedValuePod(arg + 1); +} + +struct TIncKernelExec : public NYql::NUdf::TUnaryKernelExec<TIncKernelExec> { + template <typename TSink> + static void Process(NYql::NUdf::TBlockItem arg, const TSink& sink) { + sink(NYql::NUdf::TBlockItem(arg.As<i32>() + 1)); + } +}; + +END_SIMPLE_ARROW_UDF(TInc, TIncKernelExec::Do); + +SIMPLE_MODULE(TBlockUTModule, + TInc +) + namespace NKikimr { namespace NMiniKQL { @@ -14,6 +36,11 @@ namespace { const auto& kernel = kernelNode->GetArrowKernel(); arrow::compute::KernelContext kernelContext(&execContext); std::unique_ptr<arrow::compute::KernelState> state; + if (kernel.init) { + state = ARROW_RESULT(kernel.init(&kernelContext, { &kernel, kernelNode->GetArgsDesc(), nullptr })); + kernelContext.SetState(state.get()); + } + auto executor = arrow::compute::detail::KernelExecutor::MakeScalar(); ARROW_OK(executor->Init(&kernelContext, { &kernel, kernelNode->GetArgsDesc(), nullptr })); auto listener = std::make_shared<arrow::compute::detail::DatumAccumulator>(); @@ -604,6 +631,56 @@ Y_UNIT_TEST(WithScalars) { } } +Y_UNIT_TEST(Udf) { + TVector<TUdfModuleInfo> modules; + modules.emplace_back(TUdfModuleInfo{"", "BlockUT", new TBlockUTModule()}); + TSetup<false> setup({}, std::move(modules)); + + auto& pb = *setup.PgmBuilder; + + const auto i32Type = pb.NewDataType(NUdf::TDataType<i32>::Id); + const auto i32BlocksType = pb.NewBlockType(i32Type, TBlockType::EShape::Many); + const auto arg1 = pb.Arg(i32BlocksType); + const auto userType = pb.NewTupleType({ + pb.NewTupleType({i32BlocksType}), + pb.NewEmptyStructType(), + pb.NewEmptyTupleType()}); + const auto udf = pb.Udf("BlockUT.Inc_BlocksImpl", pb.NewVoid(), userType); + const auto apply = pb.Apply(udf, {arg1}); + + const auto graph = setup.BuildGraph(apply, {arg1.GetNode() }); + const auto topology = graph->GetKernelsTopology(); + UNIT_ASSERT(topology); + UNIT_ASSERT_VALUES_EQUAL(topology->InputArgsCount, 1); + UNIT_ASSERT_VALUES_EQUAL(topology->Items.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(topology->Items[0].Node->GetKernelName(), "Apply"); + const std::vector<ui32> expectedInputs1{0}; + UNIT_ASSERT_VALUES_EQUAL(topology->Items[0].Inputs, expectedInputs1); + + arrow::compute::ExecContext execContext; + const size_t blockSize = 100000; + std::vector<arrow::Datum> datums(topology->InputArgsCount + topology->Items.size()); + { + arrow::Int32Builder builder1(execContext.memory_pool()); + ARROW_OK(builder1.Reserve(blockSize)); + for (size_t i = 0; i < blockSize; ++i) { + builder1.UnsafeAppend(i); + } + + std::shared_ptr<arrow::ArrayData> data1; + ARROW_OK(builder1.FinishInternal(&data1)); + datums[0] = data1; + } + + ExecuteAllKernels(datums, topology, execContext); + + auto res = datums.back().array()->GetValues<i32>(1); + for (size_t i = 0; i < blockSize; ++i) { + auto expected = i + 1; + UNIT_ASSERT_VALUES_EQUAL(res[i], expected); + } +} + } } diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.h b/ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.h index 0c3eb242122..41eb68dd0c8 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.h +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.h @@ -73,13 +73,28 @@ NUdf::TUnboxedValuePod ToValue(T value) { return NUdf::TUnboxedValuePod(value); } +struct TUdfModuleInfo { + TString LibraryPath; + TString ModuleName; + NUdf::TUniquePtr<NUdf::IUdfModule> Module; +}; + template<bool UseLLVM> struct TSetup { - TSetup(TComputationNodeFactory nodeFactory = {}) + TSetup(TComputationNodeFactory nodeFactory = {}, TVector<TUdfModuleInfo>&& modules = {}) : Alloc(__LOCATION__) { NodeFactory = nodeFactory; FunctionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry()); + if (!modules.empty()) { + auto mutableRegistry = FunctionRegistry->Clone(); + for (auto& m : modules) { + mutableRegistry->AddModule(m.LibraryPath, m.ModuleName, std::move(m.Module)); + } + + FunctionRegistry = mutableRegistry; + } + RandomProvider = CreateDeterministicRandomProvider(1); TimeProvider = CreateDeterministicTimeProvider(10000000); |