about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author a-romanov <Anton.Romanov@ydb.tech> 2023-09-18 19:28:07 +0300
committer a-romanov <Anton.Romanov@ydb.tech> 2023-09-18 20:09:05 +0300
commit 636c751bf747f51954ebecb915b5b2a3dce6a7eb (patch)
tree b6143bdadb0a8a5b11a75bb06fe4d5f07e6360a8
parent 6cb5179992596006085e79a336f1cdca9fa2b994 (diff)
download ydb-636c751bf747f51954ebecb915b5b2a3dce6a7eb.tar.gz
YQL-15891 LLVM for WideFromBlocks.
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp 2
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp 2
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp 2
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp 2
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_just.cpp 2
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp 4
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_tuple.cpp 4
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp 267
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp 4
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp 51
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_block_impl.cpp 2
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_block_impl.h 173
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_node_holders.h 2
-rw-r--r-- ydb/library/yql/minikql/mkql_type_builder.cpp 6
-rw-r--r-- ydb/library/yql/parser/pg_wrapper/comp_factory.cpp 2
15 files changed, 371 insertions, 154 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp
index 5baa4ab6bd..7280012da8 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp
@@ -143,7 +143,7 @@ IComputationNode* WrapBlockCoalesce(TCallable& callable, const TComputationNodeF
auto firstCompute = LocateNode(ctx.NodeLocator, callable, 0);
auto secondCompute = LocateNode(ctx.NodeLocator, callable, 1);
- TVector<IComputationNode*> argsNodes = { firstCompute, secondCompute };
+ TComputationNodePtrVector argsNodes = { firstCompute, secondCompute };
TVector<TType*> argsTypes = { firstType, secondType };
auto kernel = MakeBlockCoalesceKernel(argsTypes, secondType, needUnwrapFirst);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
index 6fb4f88fdf..489fdde70b 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
@@ -152,7 +152,7 @@ private:
}
private:
- IComputationWideFlowNode* Flow_;
+ IComputationWideFlowNode *const Flow_;
const ui32 BitmapIndex_;
const ui32 Width_;
};
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
index c933b114d9..f308372ec1 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
@@ -66,7 +66,7 @@ IComputationNode* WrapBlockFunc(TCallable& callable, const TComputationNodeFacto
MKQL_ENSURE(callable.GetInputsCount() >= 1, "Expected at least 1 arg");
const auto funcNameData = AS_VALUE(TDataLiteral, callable.GetInput(0));
const auto funcName = TString(funcNameData->AsValue().AsStringRef());
- TVector<IComputationNode*> argsNodes;
+ TComputationNodePtrVector argsNodes;
TVector<TType*> argsTypes;
const auto callableType = callable.GetType();
for (ui32 i = 1; i < callable.GetInputsCount(); ++i) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp
index 8fda91715d..61b2e08dc0 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp
@@ -235,7 +235,7 @@ IComputationNode* WrapBlockIf(TCallable& callable, const TComputationNodeFactory
thenIsScalar, elseIsScalar, argsTypes);
}
- TVector<IComputationNode*> argsNodes = { predCompute, thenCompute, elseCompute };
+ TComputationNodePtrVector argsNodes = { predCompute, thenCompute, elseCompute };
std::shared_ptr<arrow::compute::ScalarKernel> kernel;
if (thenIsScalar && elseIsScalar) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_just.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_just.cpp
index 131047fe1a..7144668079 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_just.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_just.cpp
@@ -71,7 +71,7 @@ IComputationNode* WrapBlockJust(TCallable& callable, const TComputationNodeFacto
auto dataCompute = LocateNode(ctx.NodeLocator, callable, 0);
- TVector<IComputationNode*> argsNodes = { dataCompute };
+ TComputationNodePtrVector argsNodes = { dataCompute };
TVector<TType*> argsTypes = { dataType };
std::shared_ptr<arrow::compute::ScalarKernel> kernel;
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp
index 39954fb989..50eb362ae6 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp
@@ -456,7 +456,7 @@ IComputationNode* WrapBlockLogical(std::string_view name, TCallable& callable, c
auto compute1 = LocateNode(ctx.NodeLocator, callable, 0);
auto compute2 = LocateNode(ctx.NodeLocator, callable, 1);
- TVector<IComputationNode*> argsNodes = { compute1, compute2 };
+ TComputationNodePtrVector argsNodes = { compute1, compute2 };
TVector<TType*> argsTypes = { callable.GetInput(0).GetStaticType(), callable.GetInput(1).GetStaticType() };
std::shared_ptr<arrow::compute::ScalarKernel> kernel;
@@ -494,7 +494,7 @@ IComputationNode* WrapBlockNot(TCallable& callable, const TComputationNodeFactor
"Requires boolean args.");
auto compute = LocateNode(ctx.NodeLocator, callable, 0);
- TVector<IComputationNode*> argsNodes = { compute };
+ TComputationNodePtrVector argsNodes = { compute };
TVector<TType*> argsTypes = { callable.GetInput(0).GetStaticType() };
auto kernel = MakeKernel<TNotBlockExec>(argsTypes, argsTypes[0]);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_tuple.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_tuple.cpp
index 9c59e96abe..494b6ad222 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_tuple.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_tuple.cpp
@@ -148,7 +148,7 @@ std::shared_ptr<arrow::compute::ScalarKernel> MakeBlockNthKernel(const TVector<T
} // namespace
IComputationNode* WrapBlockAsTuple(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- TVector<IComputationNode*> argsNodes;
+ TComputationNodePtrVector argsNodes;
TVector<TType*> argsTypes;
for (ui32 i = 0; i < callable.GetInputsCount(); ++i) {
argsNodes.push_back(LocateNode(ctx.NodeLocator, callable, i));
@@ -173,7 +173,7 @@ IComputationNode* WrapBlockNth(TCallable& callable, const TComputationNodeFactor
auto tuple = LocateNode(ctx.NodeLocator, callable, 0);
- TVector<IComputationNode*> argsNodes = { tuple };
+ TComputationNodePtrVector argsNodes = { tuple };
TVector<TType*> argsTypes = { blockType };
auto kernel = MakeBlockNthKernel(argsTypes, callable.GetType()->GetReturnType(), index, isOptional, needExternalOptional);
return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), std::move(argsNodes), argsTypes, *kernel, kernel);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
index 47c565e9c7..613c667a07 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
@@ -1,4 +1,5 @@
#include "mkql_blocks.h"
+#include "mkql_llvm_base.h"
#include <ydb/library/yql/minikql/computation/mkql_block_reader.h>
#include <ydb/library/yql/minikql/computation/mkql_block_builder.h>
@@ -7,7 +8,7 @@
#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
#include <ydb/library/yql/minikql/arrow/arrow_util.h>
#include <ydb/library/yql/minikql/mkql_type_builder.h>
-#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
@@ -17,6 +18,10 @@
#include <arrow/array.h>
#include <arrow/datum.h>
+extern "C" size_t GetCount(const NYql::NUdf::TUnboxedValuePod data) {
+ return NKikimr::NMiniKQL::TArrowBlock::From(data).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+}
+
namespace NKikimr {
namespace NMiniKQL {
@@ -53,7 +58,6 @@ private:
void RegisterDependencies() const final {
FlowDependsOn(Flow_);
}
-
private:
IComputationNode* const Flow_;
TType* ItemType_;
@@ -112,7 +116,6 @@ public:
s.Rows_ = 0;
return EFetchResult::One;
}
-
private:
struct TState : public TComputationValue<TState> {
std::vector<NUdf::TUnboxedValue> Values_;
@@ -253,95 +256,269 @@ private:
const ui32 StateIndex_;
};
-class TWideFromBlocksWrapper : public TStatefulWideFlowComputationNode<TWideFromBlocksWrapper> {
+class TWideFromBlocksWrapper : public TStatefulWideFlowCodegeneratorNode<TWideFromBlocksWrapper> {
+using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideFromBlocksWrapper>;
public:
TWideFromBlocksWrapper(TComputationMutables& mutables,
IComputationWideFlowNode* flow,
TVector<TType*>&& types)
- : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+ : TBaseComputation(mutables, flow, EValueRepresentation::Boxed)
, Flow_(flow)
, Types_(std::move(types))
- , Width_(Types_.size())
- {
- }
+ , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(Types_.size() + 1U))
+ {}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state,
TComputationContext& ctx,
NUdf::TUnboxedValue*const* output) const
{
auto& s = GetState(state, ctx);
- while (s.Index_ == s.Count_) {
- auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data());
- if (result != EFetchResult::One) {
+ const auto fields = ctx.WideFields.data() + WideFieldsIndex_;
+ if (s.Index_ == s.Count_) do {
+ if (const auto result = Flow_->FetchValues(ctx, fields); result != EFetchResult::One)
return result;
- }
s.Index_ = 0;
- s.Count_ = TArrowBlock::From(s.Values_[Width_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+ s.Count_ = GetCount(s.Values_.back());
+ } while (!s.Count_);
+
+ s.Current_ = s.Index_;
+ ++s.Index_;
+ for (size_t i = 0; i < Types_.size(); ++i)
+ if (const auto out = output[i])
+ *out = s.Get(ctx.HolderFactory, i);
+
+ return EFetchResult::One;
+ }
+#ifndef MKQL_DISABLE_CODEGEN
+ ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
+ auto& context = ctx.Codegen.GetContext();
+
+ const auto valueType = Type::getInt128Ty(context);
+ const auto ptrValueType = PointerType::getUnqual(valueType);
+ const auto statusType = Type::getInt32Ty(context);
+ const auto indexType = Type::getInt64Ty(context);
+
+ TLLVMFieldsStructureState stateFields(context);
+ const auto stateType = StructType::get(context, stateFields.GetFieldsArray());
+ const auto statePtrType = PointerType::getUnqual(stateType);
+
+ const auto getFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Get));
+ const auto getType = FunctionType::get(valueType, {statePtrType, ctx.GetFactory()->getType(), indexType}, false);
+ const auto getPtr = CastInst::Create(Instruction::IntToPtr, getFunc, PointerType::getUnqual(getType), "get", &ctx.Func->getEntryBlock().back());
+ const auto stateOnStack = new AllocaInst(statePtrType, 0U, "state_on_stack", &ctx.Func->getEntryBlock().back());
+ new StoreInst(ConstantPointerNull::get(statePtrType), stateOnStack, &ctx.Func->getEntryBlock().back());
+
+ const auto name = "GetCount";
+ ctx.Codegen.AddGlobalMapping(name, reinterpret_cast<const void*>(&GetCount));
+ const auto getCountType = NYql::NCodegen::ETarget::Windows != ctx.Codegen.GetEffectiveTarget() ?
+ FunctionType::get(indexType, { valueType }, false):
+ FunctionType::get(indexType, { ptrValueType }, false);
+ const auto getCount = ctx.Codegen.GetModule().getOrInsertFunction(name, getCountType);
+
+ const auto make = BasicBlock::Create(context, "make", ctx.Func);
+ const auto main = BasicBlock::Create(context, "main", ctx.Func);
+ const auto more = BasicBlock::Create(context, "more", ctx.Func);
+ const auto good = BasicBlock::Create(context, "good", ctx.Func);
+ const auto save = BasicBlock::Create(context, "save", ctx.Func);
+ const auto work = BasicBlock::Create(context, "work", ctx.Func);
+ const auto over = BasicBlock::Create(context, "over", ctx.Func);
+
+ BranchInst::Create(main, make, HasValue(statePtr, block), block);
+ block = make;
+
+ const auto ptrType = PointerType::getUnqual(StructType::get(context));
+ const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
+ const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TWideFromBlocksWrapper::MakeState));
+ const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
+ const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
+ CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
+ BranchInst::Create(main, block);
+
+ block = main;
+
+ const auto state = new LoadInst(valueType, statePtr, "state", block);
+ const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
+ const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
+
+ const auto countPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetCount() }, "count_ptr", block);
+ const auto indexPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetIndex() }, "index_ptr", block);
+
+ const auto count = new LoadInst(indexType, countPtr, "count", block);
+ const auto index = new LoadInst(indexType, indexPtr, "index", block);
+
+ const auto next = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, count, index, "next", block);
+
+ BranchInst::Create(more, work, next, block);
+
+ block = more;
+
+ const auto pointerPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetPointer() }, "pointer_ptr", block);
+ const auto pointer = new LoadInst(ptrValueType, pointerPtr, "pointer", block);
+
+ std::vector<Value*> pointers(Types_.size());
+ for (size_t idx = 0U; idx < pointers.size(); ++idx) {
+ pointers[idx] = GetElementPtrInst::CreateInBounds(valueType, pointer, { ConstantInt::get(Type::getInt32Ty(context), idx) }, (TString("ptr_") += ToString(idx)).c_str(), block);
+ SafeUnRefUnboxed(pointers[idx], ctx, block);
}
- for (size_t i = 0; i < Width_; ++i) {
- if (!output[i]) {
- continue;
- }
+ const auto getres = GetNodeValues(Flow_, ctx, block);
- const auto& datum = TArrowBlock::From(s.Values_[i]).GetDatum();
- TBlockItem item;
- if (datum.is_scalar()) {
- item = s.Readers_[i]->GetScalarItem(*datum.scalar());
- } else {
- MKQL_ENSURE(datum.is_array(), "Expecting array");
- item = s.Readers_[i]->GetItem(*datum.array(), s.Index_);
- }
+ const auto special = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SLE, getres.first, ConstantInt::get(getres.first->getType(), static_cast<i32>(EFetchResult::Yield)), "special", block);
+
+ const auto result = PHINode::Create(statusType, 2U, "result", over);
+ result->addIncoming(getres.first, block);
+
+ BranchInst::Create(over, good, special, block);
+
+ block = good;
+
+ const auto countValue = getres.second.back()(ctx, block);
+ const auto height = CallInst::Create(getCount, { WrapArgumentForWindows(countValue, ctx, block) }, "height", block);
+ CleanupBoxed(countValue, ctx, block);
+
+ new StoreInst(height, countPtr, block);
+ new StoreInst(ConstantInt::get(indexType, 0), indexPtr, block);
+
+ const auto empty = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, ConstantInt::get(indexType, 0), height, "empty", block);
- *(output[i]) = s.Converters_[i]->MakeValue(item, ctx.HolderFactory);
+ BranchInst::Create(more, save, empty, block);
+
+ block = save;
+
+ for (size_t idx = 0U; idx < pointers.size(); ++idx) {
+ const auto value = getres.second[idx](ctx, block);
+ AddRefBoxed(value, ctx, block);
+ new StoreInst(value, pointers[idx], block);
}
- ++s.Index_;
- return EFetchResult::One;
- }
+ BranchInst::Create(work, block);
+
+ block = work;
+ const auto current = new LoadInst(indexType, indexPtr, "current", block);
+ const auto currentPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetCurrent() }, "current_ptr", block);
+ new StoreInst(current, currentPtr, block);
+ const auto increment = BinaryOperator::CreateAdd(current, ConstantInt::get(indexType, 1), "increment", block);
+ new StoreInst(increment, indexPtr, block);
+ new StoreInst(stateArg, stateOnStack, block);
+
+ result->addIncoming(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::One)), block);
+
+ BranchInst::Create(over, block);
+
+ block = over;
+
+ ICodegeneratorInlineWideNode::TGettersList getters(Types_.size());
+ for (size_t idx = 0U; idx < getters.size(); ++idx) {
+ getters[idx] = [idx, getType, getPtr, indexType, statePtrType, stateOnStack](const TCodegenContext& ctx, BasicBlock*& block) {
+ const auto stateArg = new LoadInst(statePtrType, stateOnStack, "state", block);
+ return CallInst::Create(getType, getPtr, {stateArg, ctx.GetFactory(), ConstantInt::get(indexType, idx)}, "get", block);
+ };
+ }
+ return {result, std::move(getters)};
+ }
+#endif
private:
struct TState : public TComputationValue<TState> {
- TVector<NUdf::TUnboxedValue> Values_;
- TVector<NUdf::TUnboxedValue*> ValuePointers_;
- TVector<std::unique_ptr<IBlockReader>> Readers_;
- TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
size_t Count_ = 0;
size_t Index_ = 0;
+ size_t Current_ = 0;
+ NUdf::TUnboxedValue* Pointer_ = nullptr;
+ TUnboxedValueVector Values_;
+ std::vector<std::unique_ptr<IBlockReader>> Readers_;
+ std::vector<std::unique_ptr<IBlockItemConverter>> Converters_;
- TState(TMemoryUsageInfo* memInfo, const TVector<TType*>& types, const NUdf::IPgBuilder& pgBuilder)
+ TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, size_t wideFieldsIndex, const TVector<TType*>& types)
: TComputationValue(memInfo)
, Values_(types.size() + 1)
- , ValuePointers_(types.size() + 1)
{
+ Pointer_ = Values_.data();
+ auto**const fields = ctx.WideFields.data() + wideFieldsIndex;
for (size_t i = 0; i < types.size() + 1; ++i) {
- ValuePointers_[i] = &Values_[i];
+ fields[i] = &Values_[i];
}
+ const auto& pgBuilder = ctx.Builder->GetPgBuilder();
for (size_t i = 0; i < types.size(); ++i) {
Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), types[i]));
Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), types[i], pgBuilder));
}
}
+
+ NUdf::TUnboxedValuePod Get(const THolderFactory& holderFactory, size_t idx) const {
+ TBlockItem item;
+ if (const auto& datum = TArrowBlock::From(Values_[idx]).GetDatum(); datum.is_scalar()) {
+ item = Readers_[idx]->GetScalarItem(*datum.scalar());
+ } else {
+ MKQL_ENSURE(datum.is_array(), "Expecting array");
+ item = Readers_[idx]->GetItem(*datum.array(), Current_);
+ }
+ return Converters_[idx]->MakeValue(item, holderFactory);
+ }
};
+#ifndef MKQL_DISABLE_CODEGEN
+ class TLLVMFieldsStructureState: public TLLVMFieldsStructure<TComputationValue<TState>> {
+ private:
+ using TBase = TLLVMFieldsStructure<TComputationValue<TState>>;
+ llvm::IntegerType*const CountType;
+ llvm::IntegerType*const IndexType;
+ llvm::IntegerType*const CurrentType;
+ llvm::PointerType*const PointerType;
+ protected:
+ using TBase::Context;
+ public:
+ std::vector<llvm::Type*> GetFieldsArray() {
+ std::vector<llvm::Type*> result = TBase::GetFields();
+ result.emplace_back(CountType);
+ result.emplace_back(IndexType);
+ result.emplace_back(CurrentType);
+ result.emplace_back(PointerType);
+ return result;
+ }
-private:
+ llvm::Constant* GetCount() {
+ return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 0);
+ }
+
+ llvm::Constant* GetIndex() {
+ return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 1);
+ }
+
+ llvm::Constant* GetCurrent() {
+ return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 2);
+ }
+
+ llvm::Constant* GetPointer() {
+ return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 3);
+ }
+
+ TLLVMFieldsStructureState(llvm::LLVMContext& context)
+ : TBase(context)
+ , CountType(Type::getInt64Ty(Context))
+ , IndexType(Type::getInt64Ty(Context))
+ , CurrentType(Type::getInt64Ty(Context))
+ , PointerType(PointerType::getUnqual(Type::getInt128Ty(Context)))
+ {}
+ };
+#endif
void RegisterDependencies() const final {
FlowDependsOn(Flow_);
}
+ void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
+ state = ctx.HolderFactory.Create<TState>(ctx, WideFieldsIndex_, Types_);
+ }
+
TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
- if (!state.HasValue()) {
- state = ctx.HolderFactory.Create<TState>(Types_, ctx.Builder->GetPgBuilder());
- }
+ if (!state.HasValue())
+ MakeState(ctx, state);
return *static_cast<TState*>(state.AsBoxed().Get());
}
-private:
- IComputationWideFlowNode* Flow_;
+ IComputationWideFlowNode* const Flow_;
const TVector<TType*> Types_;
- const size_t Width_;
+ const size_t WideFieldsIndex_;
};
class TPrecomputedArrowNode : public IArrowKernelComputationNode {
@@ -373,7 +550,6 @@ public:
Y_UNUSED(index);
ythrow yexception() << "No input arguments";
}
-
private:
arrow::compute::ScalarKernel Kernel_;
const TStringBuf KernelName_;
@@ -477,10 +653,9 @@ private:
FlowDependsOn(Flow_);
}
- IComputationWideFlowNode* Flow_;
+ IComputationWideFlowNode* const Flow_;
};
-
} // namespace
IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp
index d4e67db3a0..579dec75a9 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp
@@ -432,8 +432,7 @@ public:
ValueAddRef(Representations[i], item, ctx, block);
new StoreInst(item, placeholders[i], block);
}
-
- } else {
+ } else {
for (auto i = 0U; i < Keys.size(); ++i) {
const auto item = getres.second[Indexes[i]](ctx, block);
new StoreInst(item, placeholders[i], block);
@@ -463,7 +462,6 @@ public:
new StoreInst(item, placeholders[i], block);
}
-
BranchInst::Create(loop, block);
block = skip;
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
index d8940a82e8..c968bc7941 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
@@ -33,7 +33,7 @@ namespace NMiniKQL {
namespace {
arrow::Datum ExecuteOneKernel(const IArrowKernelComputationNode* kernelNode,
const std::vector<arrow::Datum>& argDatums, arrow::compute::ExecContext& execContext) {
- const auto& kernel = kernelNode->GetArrowKernel();
+ const auto& kernel = kernelNode->GetArrowKernel();
arrow::compute::KernelContext kernelContext(&execContext);
std::unique_ptr<arrow::compute::KernelState> state;
if (kernel.init) {
@@ -58,7 +58,7 @@ namespace {
arrow::Datum output = ExecuteOneKernel(topology->Items[i].Node.get(), argDatums, execContext);
datums[i + topology->InputArgsCount] = output;
- }
+ }
}
}
@@ -84,9 +84,10 @@ Y_UNIT_TEST(TestSimple) {
TSetup<false> setup;
auto& pb = *setup.PgmBuilder;
- auto data = TVector<TRuntimeNode>(Reserve(dataCount));
- for (size_t i = 0; i < dataCount; ++i) {
- data.push_back(pb.NewDataLiteral<ui64>(i));
+ TRuntimeNode::TList data;
+ data.reserve(dataCount);
+ for (ui64 i = 0ULL; i < dataCount; ++i) {
+ data.push_back(pb.NewDataLiteral(i));
}
const auto type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
const auto list = pb.NewList(type, data);
@@ -108,7 +109,7 @@ Y_UNIT_TEST(TestSimple) {
Y_UNIT_TEST(TestWideToBlocks) {
TSetup<false> setup;
- auto& pb = *setup.PgmBuilder;
+ TProgramBuilder& pb = *setup.PgmBuilder;
const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
const auto tupleType = pb.NewTupleType({ui64Type, ui64Type});
@@ -156,7 +157,7 @@ void TestChunked(bool withBlockExpand) {
const auto tupleType = pb.NewTupleType({ui64Type, boolType, stringType, utf8Type});
- TVector<TRuntimeNode> items;
+ TRuntimeNode::TList items;
const size_t bigStrSize = 1024 * 1024 + 100;
const size_t smallStrSize = 256 * 1024;
for (size_t i = 0; i < 20; ++i) {
@@ -513,7 +514,41 @@ Y_UNIT_TEST(TestBlockFuncWithScalar) {
UNIT_ASSERT(!iterator.Next(item));
}
-Y_UNIT_TEST(TestWideFromBlocks) {
+Y_UNIT_TEST_LLVM(TestWideFromBlocks) {
+ TSetup<LLVM> setup;
+ TProgramBuilder& pb = *setup.PgmBuilder;
+
+ const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
+
+ const auto data1 = pb.NewDataLiteral<ui64>(10);
+ const auto data2 = pb.NewDataLiteral<ui64>(20);
+ const auto data3 = pb.NewDataLiteral<ui64>(30);
+
+ const auto list = pb.NewList(ui64Type, {data1, data2, data3});
+ const auto flow = pb.ToFlow(list);
+
+ const auto blocksFlow = pb.ToBlocks(flow);
+ const auto wideFlow = pb.ExpandMap(blocksFlow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item, pb.AsScalar(pb.NewDataLiteral<ui64>(3ULL))}; });
+ const auto wideFlow2 = pb.WideFromBlocks(wideFlow);
+ const auto narrowFlow = pb.NarrowMap(wideFlow2, [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); });
+
+ const auto pgmReturn = pb.Collect(narrowFlow);
+
+ const auto graph = setup.BuildGraph(pgmReturn);
+ const auto iterator = graph->GetValue().GetListIterator();
+
+ NUdf::TUnboxedValue item;
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 10);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 20);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 30);
+}
+
+Y_UNIT_TEST(TestWideToAndFromBlocks) {
TSetup<false> setup;
auto& pb = *setup.PgmBuilder;
diff --git a/ydb/library/yql/minikql/computation/mkql_block_impl.cpp b/ydb/library/yql/minikql/computation/mkql_block_impl.cpp
index c54f76b7bc..8bd1647514 100644
--- a/ydb/library/yql/minikql/computation/mkql_block_impl.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_block_impl.cpp
@@ -151,7 +151,7 @@ arrow::compute::OutputType ConvertToOutputType(TType* output) {
return arrow::compute::OutputType(ToValueDescr(output));
}
-TBlockFuncNode::TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TVector<IComputationNode*>&& argsNodes,
+TBlockFuncNode::TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TComputationNodePtrVector&& argsNodes,
const TVector<TType*>& argsTypes, const arrow::compute::ScalarKernel& kernel,
std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder,
const arrow::compute::FunctionOptions* functionOptions)
diff --git a/ydb/library/yql/minikql/computation/mkql_block_impl.h b/ydb/library/yql/minikql/computation/mkql_block_impl.h
index 7f7f001551..cb5904bf5a 100644
--- a/ydb/library/yql/minikql/computation/mkql_block_impl.h
+++ b/ydb/library/yql/minikql/computation/mkql_block_impl.h
@@ -26,7 +26,7 @@ arrow::compute::OutputType ConvertToOutputType(TType* output);
class TBlockFuncNode : public TMutableComputationNode<TBlockFuncNode> {
public:
- TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TVector<IComputationNode*>&& argsNodes,
+ TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TComputationNodePtrVector&& argsNodes,
const TVector<TType*>& argsTypes, const arrow::compute::ScalarKernel& kernel,
std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder = {},
const arrow::compute::FunctionOptions* functionOptions = nullptr);
@@ -74,7 +74,7 @@ private:
private:
const ui32 StateIndex;
- const TVector<IComputationNode*> ArgsNodes;
+ const TComputationNodePtrVector ArgsNodes;
const std::vector<arrow::ValueDescr> ArgsValuesDescr;
const arrow::compute::ScalarKernel& Kernel;
const std::shared_ptr<arrow::compute::ScalarKernel> KernelHolder;
@@ -83,6 +83,79 @@ private:
const TString Name;
};
+// State shared by wide block flows: the input slots (one per data column plus the trailing
+// block-length scalar) and, for every data column, a queue of array chunks not yet emitted.
+struct TBlockState : public TComputationValue<TBlockState> {
+    using TBase = TComputationValue<TBlockState>;
+
+    // Rows of the current input block that still have to be emitted.
+    ui64 Count = 0;
+    // Input slots; the last one carries the block length as a UInt64 scalar.
+    TUnboxedValueVector Values;
+    // Pending chunks per data column; a queue stays empty for scalar or unrequested columns.
+    std::vector<std::deque<std::shared_ptr<arrow::ArrayData>>> Arrays;
+
+    // `width` is the total column count including the trailing block-length column.
+    TBlockState(TMemoryUsageInfo* memInfo, size_t width)
+        : TBase(memInfo), Values(width), Arrays(width - 1ULL)
+    {}
+
+    // Reads the block length from the last slot into Count and rebuilds the per-column chunk
+    // queues from Values. `output` tells which columns are requested (non-null entries).
+    void FillArrays(NUdf::TUnboxedValue*const* output) {
+        auto& counterDatum = TArrowBlock::From(Values.back()).GetDatum();
+        MKQL_ENSURE(counterDatum.is_scalar(), "Unexpected block length type (expecting scalar)");
+        Count = counterDatum.scalar_as<arrow::UInt64Scalar>().value;
+        if (!Count) {
+            return;
+        }
+        for (size_t i = 0U; i < Arrays.size(); ++i) {
+            Arrays[i].clear();
+            if (!output[i]) {
+                // Column is not requested: skip it, but keep processing the following columns.
+                continue;
+            }
+            auto& datum = TArrowBlock::From(Values[i]).GetDatum();
+            if (datum.is_scalar()) {
+                // Scalars are forwarded as is by FillOutputs; the following columns still need their chunks.
+                continue;
+            }
+            MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)");
+            if (datum.is_array()) {
+                Arrays[i].push_back(datum.array());
+            } else {
+                for (auto& chunk : datum.chunks()) {
+                    Arrays[i].push_back(chunk->data());
+                }
+            }
+        }
+    }
+
+    // Emits one slice: its length is the shortest front chunk among the queued columns
+    // (or Count when nothing is queued). Decreases Count by the emitted length.
+    void FillOutputs(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
+        auto sliceSize = Count;
+        for (size_t i = 0; i < Arrays.size(); ++i) {
+            const auto& arr = Arrays[i];
+            if (arr.empty()) {
+                continue;
+            }
+
+            MKQL_ENSURE(ui64(arr.front()->length) <= Count, "Unexpected array length at column #" << i);
+            sliceSize = std::min<ui64>(sliceSize, arr.front()->length);
+        }
+
+        for (size_t i = 0; i < Arrays.size(); ++i) {
+            if (const auto out = output[i]) {
+                if (Arrays[i].empty()) {
+                    // Nothing queued for this column: pass the stored value through unchanged.
+                    *out = Values[i];
+                    continue;
+                }
+
+                if (auto& array = Arrays[i].front(); ui64(array->length) == sliceSize) {
+                    // The front chunk is consumed entirely.
+                    *out = ctx.HolderFactory.CreateArrowBlock(std::move(array));
+                    Arrays[i].pop_front();
+                } else {
+                    // The front chunk is longer than the slice: emit the part returned by Chop.
+                    *out = ctx.HolderFactory.CreateArrowBlock(Chop(array, sliceSize));
+                }
+            }
+        }
+
+        // The last output column receives the length of the emitted slice.
+        if (const auto out = output[Values.size() - 1U]) {
+            *out = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize)));
+        }
+        Count -= sliceSize;
+    }
+};
+
+
template <typename TDerived>
class TStatefulWideFlowBlockComputationNode: public TWideFlowBaseComputationNode<TDerived>
{
@@ -90,109 +163,46 @@ protected:
TStatefulWideFlowBlockComputationNode(TComputationMutables& mutables, const IComputationNode* source, ui32 width)
: TWideFlowBaseComputationNode<TDerived>(source)
, StateIndex(mutables.CurValueIndex++)
- , StateKind(EValueRepresentation::Any)
+ , StateKind(EValueRepresentation::Boxed)
, Width(width)
+ , WideFieldsIndex(mutables.IncrementWideFieldsIndex(width))
{
MKQL_ENSURE(width > 0, "Wide flow blocks should have at least one column (block length)");
}
-
private:
- struct TState : public TComputationValue<TState> {
- using TBase = TComputationValue<TState>;
-
- TVector<NUdf::TUnboxedValue> Values;
- TVector<NUdf::TUnboxedValue*> ValuePointers;
+ struct TState : public TBlockState {
NUdf::TUnboxedValue ChildState;
- TVector<TDeque<std::shared_ptr<arrow::ArrayData>>> Arrays;
- ui64 Count = 0;
-
- TState(TMemoryUsageInfo* memInfo, size_t width, NUdf::TUnboxedValue*const* values, TComputationContext&)
- : TBase(memInfo)
- , Values(width)
- , ValuePointers(width)
- , Arrays(width - 1)
+
+ TState(TMemoryUsageInfo* memInfo, size_t width, ui32 wideFieldsIndex, NUdf::TUnboxedValue*const* values, TComputationContext& ctx)
+ : TBlockState(memInfo, width)
{
+ auto**const fields = ctx.WideFields.data() + wideFieldsIndex;
for (size_t i = 0; i < width - 1; ++i) {
- ValuePointers[i] = values[i] ? &Values[i] : nullptr;
+ fields[i] = values[i] ? &Values[i] : nullptr;
}
- ValuePointers.back() = &Values.back();
+ fields[width - 1] = &Values.back();
}
};
TState& GetState(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
auto& state = ctx.MutableValues[GetIndex()];
if (!state.HasValue()) {
- state = ctx.HolderFactory.Create<TState>(Width, output, ctx);
+ state = ctx.HolderFactory.Create<TState>(Width, WideFieldsIndex, output, ctx);
}
return *static_cast<TState*>(state.AsBoxed().Get());
}
EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const final {
- TState& s = GetState(ctx, output);
+ auto& s = GetState(ctx, output);
+ const auto fields = ctx.WideFields.data() + WideFieldsIndex;
while (s.Count == 0) {
- auto result = static_cast<const TDerived*>(this)->DoCalculate(s.ChildState, ctx, s.ValuePointers.data());
- if (result != EFetchResult::One) {
+ if (const auto result = static_cast<const TDerived*>(this)->DoCalculate(s.ChildState, ctx, fields); result != EFetchResult::One) {
return result;
}
-
- auto& counterDatum = TArrowBlock::From(s.Values.back()).GetDatum();
- MKQL_ENSURE(counterDatum.is_scalar(), "Unexpected block length type (expecting scalar)");
- s.Count = counterDatum.template scalar_as<arrow::UInt64Scalar>().value;
- if (!s.Count) {
- continue;
- }
- for (size_t i = 0; i < Width - 1; ++i) {
- s.Arrays[i].clear();
- if (!output[i]) {
- continue;
- }
- auto& datum = TArrowBlock::From(s.Values[i]).GetDatum();
- if (datum.is_scalar()) {
- continue;
- }
- MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)");
- if (datum.is_array()) {
- s.Arrays[i].push_back(datum.array());
- } else {
- for (auto& chunk : datum.chunks()) {
- s.Arrays[i].push_back(chunk->data());
- }
- }
- }
+ s.FillArrays(output);
}
- ui64 sliceSize = s.Count;
- for (size_t i = 0; i < s.Arrays.size(); ++i) {
- const auto& arr = s.Arrays[i];
- if (arr.empty()) {
- continue;
- }
- MKQL_ENSURE(ui64(arr.front()->length) <= s.Count, "Unexpected array length at column #" << i);
- sliceSize = std::min<ui64>(sliceSize, arr.front()->length);
- }
-
- for (size_t i = 0; i < s.Arrays.size(); ++i) {
- if (!output[i]) {
- continue;
- }
- if (s.Arrays[i].empty()) {
- *(output[i]) = s.Values[i];
- continue;
- }
-
- auto& array = s.Arrays[i].front();
- if (ui64(array->length) == sliceSize) {
- *(output[i]) = ctx.HolderFactory.CreateArrowBlock(std::move(array));
- s.Arrays[i].pop_front();
- } else {
- *(output[i]) = ctx.HolderFactory.CreateArrowBlock(Chop(array, sliceSize));
- }
- }
-
- if (output[Width - 1]) {
- *(output[Width - 1]) = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize)));
- }
- s.Count -= sliceSize;
+ s.FillOutputs(ctx, output);
return EFetchResult::One;
}
@@ -209,10 +219,11 @@ private:
this->Dependence->CollectDependentIndexes(owner, dependencies);
}
}
-
+protected:
const ui32 StateIndex;
const EValueRepresentation StateKind;
const ui32 Width;
+ const ui32 WideFieldsIndex;
};
} //namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h b/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h
index 1b978734bc..0dd7703f71 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h
@@ -639,8 +639,6 @@ private:
THashMap<TTypeBase, TValuePtr, THasherTType, TEqualTType> Registry;
};
-
-
//////////////////////////////////////////////////////////////////////////////
// THolderFactory
//////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp
index abab15a9f8..71573fc155 100644
--- a/ydb/library/yql/minikql/mkql_type_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_type_builder.cpp
@@ -1372,7 +1372,7 @@ public:
}
auto cmpKeys = CompareKey_->Compare(
- static_cast<const NUdf::TUnboxedValuePod&>(lhsCurr->first),
+ static_cast<const NUdf::TUnboxedValuePod&>(lhsCurr->first),
static_cast<const NUdf::TUnboxedValuePod&>(rhsCurr->first)
);
@@ -1381,7 +1381,7 @@ public:
}
auto cmpPayloads = ComparePayload_->Compare(
- static_cast<const NUdf::TUnboxedValuePod&>(lhsCurr->second),
+ static_cast<const NUdf::TUnboxedValuePod&>(lhsCurr->second),
static_cast<const NUdf::TUnboxedValuePod&>(rhsCurr->second)
);
@@ -2288,7 +2288,7 @@ NUdf::ICompare::TPtr MakeCompareImpl(const NMiniKQL::TType* type) {
case NMiniKQL::TType::EKind::List:
return new TCompare<NMiniKQL::TType::EKind::List>(type);
case NMiniKQL::TType::EKind::Dict:
- return new TCompare<NMiniKQL::TType::EKind::Dict>(type);
+ return new TCompare<NMiniKQL::TType::EKind::Dict>(type);
case NMiniKQL::TType::EKind::Pg:
return MakePgCompare((const TPgType*)type);
case NMiniKQL::TType::EKind::Tagged: {
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index 0e1da3cf8d..c897505018 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -1759,7 +1759,7 @@ TComputationNodeFactory GetPgFactory() {
const auto idData = AS_VALUE(TDataLiteral, callable.GetInput(1));
auto name = nameData->AsValue().AsStringRef();
auto id = idData->AsValue().Get<ui32>();
- TVector<IComputationNode*> argNodes;
+ TComputationNodePtrVector argNodes;
TVector<TType*> argTypes;
for (ui32 i = 2; i < callable.GetInputsCount(); ++i) {
argNodes.emplace_back(LocateNode(ctx.NodeLocator, callable, i));