diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2023-09-18 19:28:07 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2023-09-18 20:09:05 +0300 |
commit | 636c751bf747f51954ebecb915b5b2a3dce6a7eb (patch) | |
tree | b6143bdadb0a8a5b11a75bb06fe4d5f07e6360a8 | |
parent | 6cb5179992596006085e79a336f1cdca9fa2b994 (diff) | |
download | ydb-636c751bf747f51954ebecb915b5b2a3dce6a7eb.tar.gz |
YQL-15891 LLVM for WideFromBlocks.
15 files changed, 371 insertions, 154 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp index 5baa4ab6bd..7280012da8 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp @@ -143,7 +143,7 @@ IComputationNode* WrapBlockCoalesce(TCallable& callable, const TComputationNodeF auto firstCompute = LocateNode(ctx.NodeLocator, callable, 0); auto secondCompute = LocateNode(ctx.NodeLocator, callable, 1); - TVector<IComputationNode*> argsNodes = { firstCompute, secondCompute }; + TComputationNodePtrVector argsNodes = { firstCompute, secondCompute }; TVector<TType*> argsTypes = { firstType, secondType }; auto kernel = MakeBlockCoalesceKernel(argsTypes, secondType, needUnwrapFirst); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp index 6fb4f88fdf..489fdde70b 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp @@ -152,7 +152,7 @@ private: } private: - IComputationWideFlowNode* Flow_; + IComputationWideFlowNode *const Flow_; const ui32 BitmapIndex_; const ui32 Width_; }; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp index c933b114d9..f308372ec1 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp @@ -66,7 +66,7 @@ IComputationNode* WrapBlockFunc(TCallable& callable, const TComputationNodeFacto MKQL_ENSURE(callable.GetInputsCount() >= 1, "Expected at least 1 arg"); const auto funcNameData = AS_VALUE(TDataLiteral, callable.GetInput(0)); const auto funcName = TString(funcNameData->AsValue().AsStringRef()); - TVector<IComputationNode*> argsNodes; + TComputationNodePtrVector argsNodes; TVector<TType*> argsTypes; const auto callableType = callable.GetType(); for (ui32 i = 1; i < callable.GetInputsCount(); ++i) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp index 8fda91715d..61b2e08dc0 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp @@ -235,7 +235,7 @@ IComputationNode* WrapBlockIf(TCallable& callable, const TComputationNodeFactory thenIsScalar, elseIsScalar, argsTypes); } - TVector<IComputationNode*> argsNodes = { predCompute, thenCompute, elseCompute }; + TComputationNodePtrVector argsNodes = { predCompute, thenCompute, elseCompute }; std::shared_ptr<arrow::compute::ScalarKernel> kernel; if (thenIsScalar && elseIsScalar) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_just.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_just.cpp index 131047fe1a..7144668079 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_just.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_just.cpp @@ -71,7 +71,7 @@ IComputationNode* WrapBlockJust(TCallable& callable, const TComputationNodeFacto auto dataCompute = LocateNode(ctx.NodeLocator, callable, 0); - TVector<IComputationNode*> argsNodes = { dataCompute }; + TComputationNodePtrVector argsNodes = { dataCompute }; TVector<TType*> argsTypes = { dataType }; std::shared_ptr<arrow::compute::ScalarKernel> kernel; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp index 39954fb989..50eb362ae6 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp @@ -456,7 +456,7 @@ IComputationNode* WrapBlockLogical(std::string_view name, TCallable& callable, c auto compute1 = LocateNode(ctx.NodeLocator, callable, 0); auto compute2 = LocateNode(ctx.NodeLocator, callable, 1); - TVector<IComputationNode*> argsNodes = { compute1, compute2 }; + TComputationNodePtrVector argsNodes = { compute1, compute2 }; TVector<TType*> argsTypes = { callable.GetInput(0).GetStaticType(), callable.GetInput(1).GetStaticType() }; std::shared_ptr<arrow::compute::ScalarKernel> kernel; @@ -494,7 +494,7 @@ IComputationNode* WrapBlockNot(TCallable& callable, const TComputationNodeFactor "Requires boolean args."); auto compute = LocateNode(ctx.NodeLocator, callable, 0); - TVector<IComputationNode*> argsNodes = { compute }; + TComputationNodePtrVector argsNodes = { compute }; TVector<TType*> argsTypes = { callable.GetInput(0).GetStaticType() }; auto kernel = MakeKernel<TNotBlockExec>(argsTypes, argsTypes[0]); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_tuple.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_tuple.cpp index 9c59e96abe..494b6ad222 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_tuple.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_tuple.cpp @@ -148,7 +148,7 @@ std::shared_ptr<arrow::compute::ScalarKernel> MakeBlockNthKernel(const TVector<T } // namespace IComputationNode* WrapBlockAsTuple(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - TVector<IComputationNode*> argsNodes; + TComputationNodePtrVector argsNodes; TVector<TType*> argsTypes; for (ui32 i = 0; i < callable.GetInputsCount(); ++i) { argsNodes.push_back(LocateNode(ctx.NodeLocator, callable, i)); @@ -173,7 +173,7 @@ IComputationNode* WrapBlockNth(TCallable& callable, const TComputationNodeFactor auto tuple = LocateNode(ctx.NodeLocator, callable, 0); - TVector<IComputationNode*> argsNodes = { tuple }; + TComputationNodePtrVector argsNodes = { tuple }; TVector<TType*> argsTypes = { blockType }; auto kernel = MakeBlockNthKernel(argsTypes, callable.GetType()->GetReturnType(), index, isOptional, needExternalOptional); return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), std::move(argsNodes), argsTypes, *kernel, kernel); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp index 47c565e9c7..613c667a07 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp @@ -1,4 +1,5 @@ #include "mkql_blocks.h" +#include "mkql_llvm_base.h" #include <ydb/library/yql/minikql/computation/mkql_block_reader.h> #include <ydb/library/yql/minikql/computation/mkql_block_builder.h> @@ -7,7 +8,7 @@ #include <ydb/library/yql/minikql/arrow/arrow_defs.h> #include <ydb/library/yql/minikql/arrow/arrow_util.h> #include <ydb/library/yql/minikql/mkql_type_builder.h> -#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> #include <ydb/library/yql/minikql/mkql_node_builder.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> @@ -17,6 +18,10 @@ #include <arrow/array.h> #include <arrow/datum.h> +extern "C" size_t GetCount(const NYql::NUdf::TUnboxedValuePod data) { + return NKikimr::NMiniKQL::TArrowBlock::From(data).GetDatum().scalar_as<arrow::UInt64Scalar>().value; +} + namespace NKikimr { namespace NMiniKQL { @@ -53,7 +58,6 @@ private: void RegisterDependencies() const final { FlowDependsOn(Flow_); } - private: IComputationNode* const Flow_; TType* ItemType_; @@ -112,7 +116,6 @@ public: s.Rows_ = 0; return EFetchResult::One; } - private: struct TState : public TComputationValue<TState> { std::vector<NUdf::TUnboxedValue> Values_; @@ -253,95 +256,269 @@ private: const ui32 StateIndex_; }; -class TWideFromBlocksWrapper : public TStatefulWideFlowComputationNode<TWideFromBlocksWrapper> { +class TWideFromBlocksWrapper : public TStatefulWideFlowCodegeneratorNode<TWideFromBlocksWrapper> { +using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideFromBlocksWrapper>; public: TWideFromBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TVector<TType*>&& types) - : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any) + : TBaseComputation(mutables, flow, EValueRepresentation::Boxed) , Flow_(flow) , Types_(std::move(types)) - , Width_(Types_.size()) - { - } + , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(Types_.size() + 1U)) + {} EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { auto& s = GetState(state, ctx); - while (s.Index_ == s.Count_) { - auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data()); - if (result != EFetchResult::One) { + const auto fields = ctx.WideFields.data() + WideFieldsIndex_; + if (s.Index_ == s.Count_) do { + if (const auto result = Flow_->FetchValues(ctx, fields); result != EFetchResult::One) return result; - } s.Index_ = 0; - s.Count_ = TArrowBlock::From(s.Values_[Width_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + s.Count_ = GetCount(s.Values_.back()); + } while (!s.Count_); + + s.Current_ = s.Index_; + ++s.Index_; + for (size_t i = 0; i < Types_.size(); ++i) + if (const auto out = output[i]) + *out = s.Get(ctx.HolderFactory, i); + + return EFetchResult::One; + } +#ifndef MKQL_DISABLE_CODEGEN + ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const { + auto& context = ctx.Codegen.GetContext(); + + const auto valueType = Type::getInt128Ty(context); + const auto ptrValueType = PointerType::getUnqual(valueType); + const auto statusType = Type::getInt32Ty(context); + const auto indexType = Type::getInt64Ty(context); + + TLLVMFieldsStructureState stateFields(context); + const auto stateType = StructType::get(context, stateFields.GetFieldsArray()); + const auto statePtrType = PointerType::getUnqual(stateType); + + const auto getFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Get)); + const auto getType = FunctionType::get(valueType, {statePtrType, ctx.GetFactory()->getType(), indexType}, false); + const auto getPtr = CastInst::Create(Instruction::IntToPtr, getFunc, PointerType::getUnqual(getType), "get", &ctx.Func->getEntryBlock().back()); + const auto stateOnStack = new AllocaInst(statePtrType, 0U, "state_on_stack", &ctx.Func->getEntryBlock().back()); + new StoreInst(ConstantPointerNull::get(statePtrType), stateOnStack, &ctx.Func->getEntryBlock().back()); + + const auto name = "GetCount"; + ctx.Codegen.AddGlobalMapping(name, reinterpret_cast<const void*>(&GetCount)); + const auto getCountType = NYql::NCodegen::ETarget::Windows != ctx.Codegen.GetEffectiveTarget() ? + FunctionType::get(indexType, { valueType }, false): + FunctionType::get(indexType, { ptrValueType }, false); + const auto getCount = ctx.Codegen.GetModule().getOrInsertFunction(name, getCountType); + + const auto make = BasicBlock::Create(context, "make", ctx.Func); + const auto main = BasicBlock::Create(context, "main", ctx.Func); + const auto more = BasicBlock::Create(context, "more", ctx.Func); + const auto good = BasicBlock::Create(context, "good", ctx.Func); + const auto save = BasicBlock::Create(context, "save", ctx.Func); + const auto work = BasicBlock::Create(context, "work", ctx.Func); + const auto over = BasicBlock::Create(context, "over", ctx.Func); + + BranchInst::Create(main, make, HasValue(statePtr, block), block); + block = make; + + const auto ptrType = PointerType::getUnqual(StructType::get(context)); + const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block); + const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TWideFromBlocksWrapper::MakeState)); + const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false); + const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block); + CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block); + BranchInst::Create(main, block); + + block = main; + + const auto state = new LoadInst(valueType, statePtr, "state", block); + const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block); + const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block); + + const auto countPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetCount() }, "count_ptr", block); + const auto indexPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetIndex() }, "index_ptr", block); + + const auto count = new LoadInst(indexType, countPtr, "count", block); + const auto index = new LoadInst(indexType, indexPtr, "index", block); + + const auto next = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, count, index, "next", block); + + BranchInst::Create(more, work, next, block); + + block = more; + + const auto pointerPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetPointer() }, "pointer_ptr", block); + const auto pointer = new LoadInst(ptrValueType, pointerPtr, "pointer", block); + + std::vector<Value*> pointers(Types_.size()); + for (size_t idx = 0U; idx < pointers.size(); ++idx) { + pointers[idx] = GetElementPtrInst::CreateInBounds(valueType, pointer, { ConstantInt::get(Type::getInt32Ty(context), idx) }, (TString("ptr_") += ToString(idx)).c_str(), block); + SafeUnRefUnboxed(pointers[idx], ctx, block); } - for (size_t i = 0; i < Width_; ++i) { - if (!output[i]) { - continue; - } + const auto getres = GetNodeValues(Flow_, ctx, block); - const auto& datum = TArrowBlock::From(s.Values_[i]).GetDatum(); - TBlockItem item; - if (datum.is_scalar()) { - item = s.Readers_[i]->GetScalarItem(*datum.scalar()); - } else { - MKQL_ENSURE(datum.is_array(), "Expecting array"); - item = s.Readers_[i]->GetItem(*datum.array(), s.Index_); - } + const auto special = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SLE, getres.first, ConstantInt::get(getres.first->getType(), static_cast<i32>(EFetchResult::Yield)), "special", block); + + const auto result = PHINode::Create(statusType, 2U, "result", over); + result->addIncoming(getres.first, block); + + BranchInst::Create(over, good, special, block); + + block = good; + + const auto countValue = getres.second.back()(ctx, block); + const auto height = CallInst::Create(getCount, { WrapArgumentForWindows(countValue, ctx, block) }, "height", block); + CleanupBoxed(countValue, ctx, block); + + new StoreInst(height, countPtr, block); + new StoreInst(ConstantInt::get(indexType, 0), indexPtr, block); + + const auto empty = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, ConstantInt::get(indexType, 0), height, "empty", block); - *(output[i]) = s.Converters_[i]->MakeValue(item, ctx.HolderFactory); + BranchInst::Create(more, save, empty, block); + + block = save; + + for (size_t idx = 0U; idx < pointers.size(); ++idx) { + const auto value = getres.second[idx](ctx, block); + AddRefBoxed(value, ctx, block); + new StoreInst(value, pointers[idx], block); } - ++s.Index_; - return EFetchResult::One; - } + BranchInst::Create(work, block); + + block = work; + const auto current = new LoadInst(indexType, indexPtr, "current", block); + const auto currentPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetCurrent() }, "current_ptr", block); + new StoreInst(current, currentPtr, block); + const auto increment = BinaryOperator::CreateAdd(current, ConstantInt::get(indexType, 1), "increment", block); + new StoreInst(increment, indexPtr, block); + new StoreInst(stateArg, stateOnStack, block); + + result->addIncoming(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::One)), block); + + BranchInst::Create(over, block); + + block = over; + + ICodegeneratorInlineWideNode::TGettersList getters(Types_.size()); + for (size_t idx = 0U; idx < getters.size(); ++idx) { + getters[idx] = [idx, getType, getPtr, indexType, statePtrType, stateOnStack](const TCodegenContext& ctx, BasicBlock*& block) { + const auto stateArg = new LoadInst(statePtrType, stateOnStack, "state", block); + return CallInst::Create(getType, getPtr, {stateArg, ctx.GetFactory(), ConstantInt::get(indexType, idx)}, "get", block); + }; + } + return {result, std::move(getters)}; + } +#endif private: struct TState : public TComputationValue<TState> { - TVector<NUdf::TUnboxedValue> Values_; - TVector<NUdf::TUnboxedValue*> ValuePointers_; - TVector<std::unique_ptr<IBlockReader>> Readers_; - TVector<std::unique_ptr<IBlockItemConverter>> Converters_; size_t Count_ = 0; size_t Index_ = 0; + size_t Current_ = 0; + NUdf::TUnboxedValue* Pointer_ = nullptr; + TUnboxedValueVector Values_; + std::vector<std::unique_ptr<IBlockReader>> Readers_; + std::vector<std::unique_ptr<IBlockItemConverter>> Converters_; - TState(TMemoryUsageInfo* memInfo, const TVector<TType*>& types, const NUdf::IPgBuilder& pgBuilder) + TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, size_t wideFieldsIndex, const TVector<TType*>& types) : TComputationValue(memInfo) , Values_(types.size() + 1) - , ValuePointers_(types.size() + 1) { + Pointer_ = Values_.data(); + auto**const fields = ctx.WideFields.data() + wideFieldsIndex; for (size_t i = 0; i < types.size() + 1; ++i) { - ValuePointers_[i] = &Values_[i]; + fields[i] = &Values_[i]; } + const auto& pgBuilder = ctx.Builder->GetPgBuilder(); for (size_t i = 0; i < types.size(); ++i) { Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), types[i])); Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), types[i], pgBuilder)); } } + + NUdf::TUnboxedValuePod Get(const THolderFactory& holderFactory, size_t idx) const { + TBlockItem item; + if (const auto& datum = TArrowBlock::From(Values_[idx]).GetDatum(); datum.is_scalar()) { + item = Readers_[idx]->GetScalarItem(*datum.scalar()); + } else { + MKQL_ENSURE(datum.is_array(), "Expecting array"); + item = Readers_[idx]->GetItem(*datum.array(), Current_); + } + return Converters_[idx]->MakeValue(item, holderFactory); + } }; +#ifndef MKQL_DISABLE_CODEGEN + class TLLVMFieldsStructureState: public TLLVMFieldsStructure<TComputationValue<TState>> { + private: + using TBase = TLLVMFieldsStructure<TComputationValue<TState>>; + llvm::IntegerType*const CountType; + llvm::IntegerType*const IndexType; + llvm::IntegerType*const CurrentType; + llvm::PointerType*const PointerType; + protected: + using TBase::Context; + public: + std::vector<llvm::Type*> GetFieldsArray() { + std::vector<llvm::Type*> result = TBase::GetFields(); + result.emplace_back(CountType); + result.emplace_back(IndexType); + result.emplace_back(CurrentType); + result.emplace_back(PointerType); + return result; + } -private: + llvm::Constant* GetCount() { + return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 0); + } + + llvm::Constant* GetIndex() { + return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 1); + } + + llvm::Constant* GetCurrent() { + return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 2); + } + + llvm::Constant* GetPointer() { + return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 3); + } + + TLLVMFieldsStructureState(llvm::LLVMContext& context) + : TBase(context) + , CountType(Type::getInt64Ty(Context)) + , IndexType(Type::getInt64Ty(Context)) + , CurrentType(Type::getInt64Ty(Context)) + , PointerType(PointerType::getUnqual(Type::getInt128Ty(Context))) + {} + }; +#endif void RegisterDependencies() const final { FlowDependsOn(Flow_); } + void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const { + state = ctx.HolderFactory.Create<TState>(ctx, WideFieldsIndex_, Types_); + } + TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { - if (!state.HasValue()) { - state = ctx.HolderFactory.Create<TState>(Types_, ctx.Builder->GetPgBuilder()); - } + if (!state.HasValue()) + MakeState(ctx, state); return *static_cast<TState*>(state.AsBoxed().Get()); } -private: - IComputationWideFlowNode* Flow_; + IComputationWideFlowNode* const Flow_; const TVector<TType*> Types_; - const size_t Width_; + const size_t WideFieldsIndex_; }; class TPrecomputedArrowNode : public IArrowKernelComputationNode { @@ -373,7 +550,6 @@ public: Y_UNUSED(index); ythrow yexception() << "No input arguments"; } - private: arrow::compute::ScalarKernel Kernel_; const TStringBuf KernelName_; @@ -477,10 +653,9 @@ private: FlowDependsOn(Flow_); } - IComputationWideFlowNode* Flow_; + IComputationWideFlowNode* const Flow_; }; - } // namespace IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp index d4e67db3a0..579dec75a9 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp @@ -432,8 +432,7 @@ public: ValueAddRef(Representations[i], item, ctx, block); new StoreInst(item, placeholders[i], block); } - - } else { + } else { for (auto i = 0U; i < Keys.size(); ++i) { const auto item = getres.second[Indexes[i]](ctx, block); new StoreInst(item, placeholders[i], block); @@ -463,7 +462,6 @@ public: new StoreInst(item, placeholders[i], block); } - BranchInst::Create(loop, block); block = skip; diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp index d8940a82e8..c968bc7941 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp @@ -33,7 +33,7 @@ namespace NMiniKQL { namespace { arrow::Datum ExecuteOneKernel(const IArrowKernelComputationNode* kernelNode, const std::vector<arrow::Datum>& argDatums, arrow::compute::ExecContext& execContext) { - const auto& kernel = kernelNode->GetArrowKernel(); + const auto& kernel = kernelNode->GetArrowKernel(); arrow::compute::KernelContext kernelContext(&execContext); std::unique_ptr<arrow::compute::KernelState> state; if (kernel.init) { @@ -58,7 +58,7 @@ namespace { arrow::Datum output = ExecuteOneKernel(topology->Items[i].Node.get(), argDatums, execContext); datums[i + topology->InputArgsCount] = output; - } + } } } @@ -84,9 +84,10 @@ Y_UNIT_TEST(TestSimple) { TSetup<false> setup; auto& pb = *setup.PgmBuilder; - auto data = TVector<TRuntimeNode>(Reserve(dataCount)); - for (size_t i = 0; i < dataCount; ++i) { - data.push_back(pb.NewDataLiteral<ui64>(i)); + TRuntimeNode::TList data; + data.reserve(dataCount); + for (ui64 i = 0ULL; i < dataCount; ++i) { + data.push_back(pb.NewDataLiteral(i)); } const auto type = pb.NewDataType(NUdf::TDataType<ui64>::Id); const auto list = pb.NewList(type, data); @@ -108,7 +109,7 @@ Y_UNIT_TEST(TestSimple) { Y_UNIT_TEST(TestWideToBlocks) { TSetup<false> setup; - auto& pb = *setup.PgmBuilder; + TProgramBuilder& pb = *setup.PgmBuilder; const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id); const auto tupleType = pb.NewTupleType({ui64Type, ui64Type}); @@ -156,7 +157,7 @@ void TestChunked(bool withBlockExpand) { const auto tupleType = pb.NewTupleType({ui64Type, boolType, stringType, utf8Type}); - TVector<TRuntimeNode> items; + TRuntimeNode::TList items; const size_t bigStrSize = 1024 * 1024 + 100; const size_t smallStrSize = 256 * 1024; for (size_t i = 0; i < 20; ++i) { @@ -513,7 +514,41 @@ Y_UNIT_TEST(TestBlockFuncWithScalar) { UNIT_ASSERT(!iterator.Next(item)); } -Y_UNIT_TEST(TestWideFromBlocks) { +Y_UNIT_TEST_LLVM(TestWideFromBlocks) { + TSetup<LLVM> setup; + TProgramBuilder& pb = *setup.PgmBuilder; + + const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id); + + const auto data1 = pb.NewDataLiteral<ui64>(10); + const auto data2 = pb.NewDataLiteral<ui64>(20); + const auto data3 = pb.NewDataLiteral<ui64>(30); + + const auto list = pb.NewList(ui64Type, {data1, data2, data3}); + const auto flow = pb.ToFlow(list); + + const auto blocksFlow = pb.ToBlocks(flow); + const auto wideFlow = pb.ExpandMap(blocksFlow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item, pb.AsScalar(pb.NewDataLiteral<ui64>(3ULL))}; }); + const auto wideFlow2 = pb.WideFromBlocks(wideFlow); + const auto narrowFlow = pb.NarrowMap(wideFlow2, [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); }); + + const auto pgmReturn = pb.Collect(narrowFlow); + + const auto graph = setup.BuildGraph(pgmReturn); + const auto iterator = graph->GetValue().GetListIterator(); + + NUdf::TUnboxedValue item; + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 10); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 20); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 30); +} + +Y_UNIT_TEST(TestWideToAndFromBlocks) { TSetup<false> setup; auto& pb = *setup.PgmBuilder; diff --git a/ydb/library/yql/minikql/computation/mkql_block_impl.cpp b/ydb/library/yql/minikql/computation/mkql_block_impl.cpp index c54f76b7bc..8bd1647514 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_impl.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_impl.cpp @@ -151,7 +151,7 @@ arrow::compute::OutputType ConvertToOutputType(TType* output) { return arrow::compute::OutputType(ToValueDescr(output)); } -TBlockFuncNode::TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TVector<IComputationNode*>&& argsNodes, +TBlockFuncNode::TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TComputationNodePtrVector&& argsNodes, const TVector<TType*>& argsTypes, const arrow::compute::ScalarKernel& kernel, std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder, const arrow::compute::FunctionOptions* functionOptions) diff --git a/ydb/library/yql/minikql/computation/mkql_block_impl.h b/ydb/library/yql/minikql/computation/mkql_block_impl.h index 7f7f001551..cb5904bf5a 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_impl.h +++ b/ydb/library/yql/minikql/computation/mkql_block_impl.h @@ -26,7 +26,7 @@ arrow::compute::OutputType ConvertToOutputType(TType* output); class TBlockFuncNode : public TMutableComputationNode<TBlockFuncNode> { public: - TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TVector<IComputationNode*>&& argsNodes, + TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TComputationNodePtrVector&& argsNodes, const TVector<TType*>& argsTypes, const arrow::compute::ScalarKernel& kernel, std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder = {}, const arrow::compute::FunctionOptions* functionOptions = nullptr); @@ -74,7 +74,7 @@ private: private: const ui32 StateIndex; - const TVector<IComputationNode*> ArgsNodes; + const TComputationNodePtrVector ArgsNodes; const std::vector<arrow::ValueDescr> ArgsValuesDescr; const arrow::compute::ScalarKernel& Kernel; const std::shared_ptr<arrow::compute::ScalarKernel> KernelHolder; @@ -83,6 +83,79 @@ private: const TString Name; }; +struct TBlockState : public TComputationValue<TBlockState> { + using TBase = TComputationValue<TBlockState>; + + ui64 Count = 0; + TUnboxedValueVector Values; + std::vector<std::deque<std::shared_ptr<arrow::ArrayData>>> Arrays; + + TBlockState(TMemoryUsageInfo* memInfo, size_t width) + : TBase(memInfo), Values(width), Arrays(width - 1ULL) + {} + + void FillArrays(NUdf::TUnboxedValue*const* output) { + auto& counterDatum = TArrowBlock::From(Values.back()).GetDatum(); + MKQL_ENSURE(counterDatum.is_scalar(), "Unexpected block length type (expecting scalar)"); + Count = counterDatum.scalar_as<arrow::UInt64Scalar>().value; + if (!Count) { + return; + } + for (size_t i = 0U; i < Arrays.size(); ++i) { + Arrays[i].clear(); + if (!output[i]) { + return; + } + auto& datum = TArrowBlock::From(Values[i]).GetDatum(); + if (datum.is_scalar()) { + return; + } + MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)"); + if (datum.is_array()) { + Arrays[i].push_back(datum.array()); + } else { + for (auto& chunk : datum.chunks()) { + Arrays[i].push_back(chunk->data()); + } + } + } + } + + void FillOutputs(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) { + auto sliceSize = Count; + for (size_t i = 0; i < Arrays.size(); ++i) { + const auto& arr = Arrays[i]; + if (arr.empty()) { + continue; + } + + MKQL_ENSURE(ui64(arr.front()->length) <= Count, "Unexpected array length at column #" << i); + sliceSize = std::min<ui64>(sliceSize, arr.front()->length); + } + + for (size_t i = 0; i < Arrays.size(); ++i) { + if (const auto out = output[i]) { + if (Arrays[i].empty()) { + *out = Values[i]; + continue; + } + + if (auto& array = Arrays[i].front(); ui64(array->length) == sliceSize) { + *out = ctx.HolderFactory.CreateArrowBlock(std::move(array)); + Arrays[i].pop_front(); + } else { + *out = ctx.HolderFactory.CreateArrowBlock(Chop(array, sliceSize)); + } + } + } + + if (const auto out = output[Values.size() - 1U]) { + *out = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize))); + } + Count -= sliceSize; + } +}; + template <typename TDerived> class TStatefulWideFlowBlockComputationNode: public TWideFlowBaseComputationNode<TDerived> { @@ -90,109 +163,46 @@ protected: TStatefulWideFlowBlockComputationNode(TComputationMutables& mutables, const IComputationNode* source, ui32 width) : TWideFlowBaseComputationNode<TDerived>(source) , StateIndex(mutables.CurValueIndex++) - , StateKind(EValueRepresentation::Any) + , StateKind(EValueRepresentation::Boxed) , Width(width) + , WideFieldsIndex(mutables.IncrementWideFieldsIndex(width)) { MKQL_ENSURE(width > 0, "Wide flow blocks should have at least one column (block length)"); } - private: - struct TState : public TComputationValue<TState> { - using TBase = TComputationValue<TState>; - - TVector<NUdf::TUnboxedValue> Values; - TVector<NUdf::TUnboxedValue*> ValuePointers; + struct TState : public TBlockState { NUdf::TUnboxedValue ChildState; - TVector<TDeque<std::shared_ptr<arrow::ArrayData>>> Arrays; - ui64 Count = 0; - - TState(TMemoryUsageInfo* memInfo, size_t width, NUdf::TUnboxedValue*const* values, TComputationContext&) - : TBase(memInfo) - , Values(width) - , ValuePointers(width) - , Arrays(width - 1) + + TState(TMemoryUsageInfo* memInfo, size_t width, ui32 wideFieldsIndex, NUdf::TUnboxedValue*const* values, TComputationContext& ctx) + : TBlockState(memInfo, width) { + auto**const fields = ctx.WideFields.data() + wideFieldsIndex; for (size_t i = 0; i < width - 1; ++i) { - ValuePointers[i] = values[i] ? &Values[i] : nullptr; + fields[i] = values[i] ? &Values[i] : nullptr; } - ValuePointers.back() = &Values.back(); + fields[width - 1] = &Values.back(); } }; TState& GetState(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { auto& state = ctx.MutableValues[GetIndex()]; if (!state.HasValue()) { - state = ctx.HolderFactory.Create<TState>(Width, output, ctx); + state = ctx.HolderFactory.Create<TState>(Width, WideFieldsIndex, output, ctx); } return *static_cast<TState*>(state.AsBoxed().Get()); } EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const final { - TState& s = GetState(ctx, output); + auto& s = GetState(ctx, output); + const auto fields = ctx.WideFields.data() + WideFieldsIndex; while (s.Count == 0) { - auto result = static_cast<const TDerived*>(this)->DoCalculate(s.ChildState, ctx, s.ValuePointers.data()); - if (result != EFetchResult::One) { + if (const auto result = static_cast<const TDerived*>(this)->DoCalculate(s.ChildState, ctx, fields); result != EFetchResult::One) { return result; } - - auto& counterDatum = TArrowBlock::From(s.Values.back()).GetDatum(); - MKQL_ENSURE(counterDatum.is_scalar(), "Unexpected block length type (expecting scalar)"); - s.Count = counterDatum.template scalar_as<arrow::UInt64Scalar>().value; - if (!s.Count) { - continue; - } - for (size_t i = 0; i < Width - 1; ++i) { - s.Arrays[i].clear(); - if (!output[i]) { - continue; - } - auto& datum = TArrowBlock::From(s.Values[i]).GetDatum(); - if (datum.is_scalar()) { - continue; - } - MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)"); - if (datum.is_array()) { - s.Arrays[i].push_back(datum.array()); - } else { - for (auto& chunk : datum.chunks()) { - s.Arrays[i].push_back(chunk->data()); - } - } - } + s.FillArrays(output); } - ui64 sliceSize = s.Count; - for (size_t i = 0; i < s.Arrays.size(); ++i) { - const auto& arr = s.Arrays[i]; - if (arr.empty()) { - continue; - } - MKQL_ENSURE(ui64(arr.front()->length) <= s.Count, "Unexpected array length at column #" << i); - sliceSize = std::min<ui64>(sliceSize, arr.front()->length); - } - - for (size_t i = 0; i < s.Arrays.size(); ++i) { - if (!output[i]) { - continue; - } - if (s.Arrays[i].empty()) { - *(output[i]) = s.Values[i]; - continue; - } - - auto& array = s.Arrays[i].front(); - if (ui64(array->length) == sliceSize) { - *(output[i]) = ctx.HolderFactory.CreateArrowBlock(std::move(array)); - s.Arrays[i].pop_front(); - } else { - *(output[i]) = ctx.HolderFactory.CreateArrowBlock(Chop(array, sliceSize)); - } - } - - if (output[Width - 1]) { - *(output[Width - 1]) = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize))); - } - s.Count -= sliceSize; + s.FillOutputs(ctx, output); return EFetchResult::One; } @@ -209,10 +219,11 @@ private: this->Dependence->CollectDependentIndexes(owner, dependencies); } } - +protected: const ui32 StateIndex; const EValueRepresentation StateKind; const ui32 Width; + const ui32 WideFieldsIndex; }; } //namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h b/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h index 1b978734bc..0dd7703f71 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h @@ -639,8 +639,6 @@ private: THashMap<TTypeBase, TValuePtr, THasherTType, TEqualTType> Registry; }; - - ////////////////////////////////////////////////////////////////////////////// // THolderFactory ////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp index abab15a9f8..71573fc155 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.cpp +++ b/ydb/library/yql/minikql/mkql_type_builder.cpp @@ -1372,7 +1372,7 @@ public: } auto cmpKeys = CompareKey_->Compare( - static_cast<const NUdf::TUnboxedValuePod&>(lhsCurr->first), + static_cast<const NUdf::TUnboxedValuePod&>(lhsCurr->first), static_cast<const NUdf::TUnboxedValuePod&>(rhsCurr->first) ); @@ -1381,7 +1381,7 @@ public: } auto cmpPayloads = ComparePayload_->Compare( - static_cast<const NUdf::TUnboxedValuePod&>(lhsCurr->second), + static_cast<const NUdf::TUnboxedValuePod&>(lhsCurr->second), static_cast<const NUdf::TUnboxedValuePod&>(rhsCurr->second) ); @@ -2288,7 +2288,7 @@ NUdf::ICompare::TPtr MakeCompareImpl(const NMiniKQL::TType* type) { case NMiniKQL::TType::EKind::List: return new TCompare<NMiniKQL::TType::EKind::List>(type); case NMiniKQL::TType::EKind::Dict: - return new TCompare<NMiniKQL::TType::EKind::Dict>(type); + return new TCompare<NMiniKQL::TType::EKind::Dict>(type); case NMiniKQL::TType::EKind::Pg: return MakePgCompare((const TPgType*)type); case NMiniKQL::TType::EKind::Tagged: { diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index 0e1da3cf8d..c897505018 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -1759,7 +1759,7 @@ TComputationNodeFactory GetPgFactory() { const auto idData = AS_VALUE(TDataLiteral, callable.GetInput(1)); auto name = nameData->AsValue().AsStringRef(); auto id = idData->AsValue().Get<ui32>(); - TVector<IComputationNode*> argNodes; + TComputationNodePtrVector argNodes; TVector<TType*> argTypes; for (ui32 i = 2; i < callable.GetInputsCount(); ++i) { argNodes.emplace_back(LocateNode(ctx.NodeLocator, callable, i)); |