diff options
author | vvvv <vvvv@ydb.tech> | 2022-12-08 22:45:32 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2022-12-08 22:45:32 +0300 |
commit | 904b24c0f6381f6212ca11248c92b05503e89ee5 (patch) | |
tree | 622a727ca346eb83e32d6d06104da9addd214959 | |
parent | 35e53e3bf1d4087b8c6fccfb6286001d94b5a68d (diff) | |
download | ydb-904b24c0f6381f6212ca11248c92b05503e89ee5.tar.gz |
initial implementation of hashed combiner over blocks
https://st.yandex-team.ru/#63913c808060fd3734409ed5
19 files changed, 1251 insertions, 154 deletions
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json index 8e35c09274..0e83c9c9c9 100644 --- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json +++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json @@ -2251,6 +2251,17 @@ {"Index": 2, "Name": "FilterColumn", "Type": "TExprBase"}, {"Index": 3, "Name": "Aggregations", "Type": "TExprList"} ] + }, + { + "Name": "TCoBlockCombineHashed", + "Base": "TCoInputBase", + "Match": {"Type": "Callable", "Name": "BlockCombineHashed"}, + "Children": [ + {"Index": 1, "Name": "CountColumn", "Type": "TCoAtom"}, + {"Index": 2, "Name": "FilterColumn", "Type": "TExprBase"}, + {"Index": 3, "Name": "Keys", "Type": "TCoAtomList"}, + {"Index": 4, "Name": "Aggregations", "Type": "TExprList"} + ] } ] } diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp index 1fc7cefbb7..c6c09dcd8e 100644 --- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp @@ -251,6 +251,48 @@ IGraphTransformer::TStatus BlockBitCastWrapper(const TExprNode::TPtr& input, TEx return IGraphTransformer::TStatus::Ok; } +bool ValidateBlockAggs(TPositionHandle pos, const TTypeAnnotationNode::TListType inputItems, const TExprNode& aggs, + TTypeAnnotationNode::TListType& retMultiType, TExprContext& ctx) { + if (!EnsureTuple(aggs, ctx)) { + return false; + } + + for (const auto& agg : aggs.Children()) { + if (!EnsureTupleMinSize(*agg, 1, ctx)) { + return false; + } + + if (!agg->Head().IsCallable("AggBlockApply")) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Expected AggBlockApply")); + return false; + } + + if (agg->ChildrenSize() != agg->Head().ChildrenSize()) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Different amount of input arguments")); + return false; + } + + for (ui32 i = 1; i < agg->ChildrenSize(); ++i) { + ui32 argColumnIndex; + if (!TryFromString(agg->Child(i)->Content(), argColumnIndex) || argColumnIndex >= inputItems.size()) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Bad arg column index")); + return false; + } + + auto applyArgType = agg->Head().Child(i)->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + if (!IsSameAnnotation(*inputItems[argColumnIndex], *applyArgType)) { + ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << + "Mismatch argument type, expected: " << *applyArgType << ", got: " << *inputItems[argColumnIndex])); + return false; + } + } + + retMultiType.push_back(AggApplySerializedStateType(agg->HeadPtr(), ctx)); + } + + return true; +} + IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { Y_UNUSED(output); if (!EnsureArgsCount(*input, 4U, ctx.Expr)) { @@ -287,6 +329,10 @@ IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, } if (!input->Child(2)->IsCallable("Void")) { + if (!EnsureAtom(*input->Child(2), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + ui32 filterColumnIndex; if (!TryFromString(input->Child(2)->Content(), filterColumnIndex) || filterColumnIndex >= inputItems.size()) { ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad filter column index")); @@ -298,44 +344,95 @@ IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, } } - if (!EnsureTuple(*input->Child(3), ctx.Expr)) { + TTypeAnnotationNode::TListType retMultiType; + if (!ValidateBlockAggs(input->Pos(), inputItems, *input->Child(3), retMultiType, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - TTypeAnnotationNode::TListType retMultiType; - for (const auto& agg : input->Child(3)->Children()) { - if (!EnsureTupleMinSize(*agg, 1, ctx.Expr)) { + auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType); + input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType)); + return IGraphTransformer::TStatus::Ok; +} + +IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { + Y_UNUSED(output); + if (!EnsureArgsCount(*input, 5U, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureWideFlowType(input->Head(), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + const auto multiType = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>(); + TTypeAnnotationNode::TListType inputItems; + for (const auto& type : multiType->GetItems()) { + if (!EnsureBlockOrScalarType(input->Pos(), *type, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - if (!agg->Head().IsCallable("AggBlockApply")) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Expected AggBlockApply")); + bool isScalar; + inputItems.push_back(GetBlockItemType(*type, isScalar)); + } + + if (!EnsureAtom(*input->Child(1), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + ui32 countColumnIndex; + if (!TryFromString(input->Child(1)->Content(), countColumnIndex) || countColumnIndex >= inputItems.size()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad count column index")); + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureSpecificDataType(input->Child(1)->Pos(), *inputItems[countColumnIndex], EDataSlot::Uint64, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (!input->Child(2)->IsCallable("Void")) { + if (!EnsureAtom(*input->Child(2), ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - if (agg->ChildrenSize() != agg->Head().ChildrenSize()) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Different amount of input arguments")); + ui32 filterColumnIndex; + if (!TryFromString(input->Child(2)->Content(), filterColumnIndex) || filterColumnIndex >= inputItems.size()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad filter column index")); return IGraphTransformer::TStatus::Error; } - for (ui32 i = 1; i < agg->ChildrenSize(); ++i) { - ui32 argColumnIndex; - if (!TryFromString(agg->Child(i)->Content(), argColumnIndex) || argColumnIndex >= inputItems.size()) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad arg column index")); - return IGraphTransformer::TStatus::Error; - } + if (!EnsureSpecificDataType(input->Child(2)->Pos(), *inputItems[filterColumnIndex], EDataSlot::Bool, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + } - auto applyArgType = agg->Head().Child(i)->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); - if (!IsSameAnnotation(*inputItems[argColumnIndex], *applyArgType)) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), TStringBuilder() << - "Mismatch argument type, expected: " << *applyArgType << ", got: " << *inputItems[argColumnIndex])); - return IGraphTransformer::TStatus::Error; - } + if (!EnsureTupleMinSize(*input->Child(3), 1, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + TTypeAnnotationNode::TListType retMultiType; + for (auto child : input->Child(3)->Children()) { + if (!EnsureAtom(*child, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + ui32 keyColumnIndex; + if (!TryFromString(child->Content(), keyColumnIndex) || keyColumnIndex >= inputItems.size()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad key column index")); + return IGraphTransformer::TStatus::Error; } - retMultiType.push_back(AggApplySerializedStateType(agg->HeadPtr(), ctx.Expr)); + retMultiType.push_back(inputItems[keyColumnIndex]); + } + + if (!ValidateBlockAggs(input->Pos(), inputItems, *input->Child(4), retMultiType, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + for (auto& t : retMultiType) { + t = ctx.Expr.MakeType<TBlockExprType>(t); } + retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64))); auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType); input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType)); return IGraphTransformer::TStatus::Ok; diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.h b/ydb/library/yql/core/type_ann/type_ann_blocks.h index cc81b6f895..b8fd6fb9b8 100644 --- a/ydb/library/yql/core/type_ann/type_ann_blocks.h +++ b/ydb/library/yql/core/type_ann/type_ann_blocks.h @@ -15,6 +15,7 @@ namespace NTypeAnnImpl { IGraphTransformer::TStatus BlockFuncWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); IGraphTransformer::TStatus BlockBitCastWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); + IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); } // namespace NTypeAnnImpl } // namespace NYql diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index 972c322fe9..edbf7734c0 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -11784,6 +11784,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> ExtFunctions["BlockFunc"] = &BlockFuncWrapper; ExtFunctions["BlockBitCast"] = &BlockBitCastWrapper; ExtFunctions["BlockCombineAll"] = &BlockCombineAllWrapper; + ExtFunctions["BlockCombineHashed"] = &BlockCombineHashedWrapper; Functions["AsRange"] = &AsRangeWrapper; Functions["RangeCreate"] = &RangeCreateWrapper; diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp index 159ce50af7..52d6ab2432 100644 --- a/ydb/library/yql/core/yql_aggregate_expander.cpp +++ b/ydb/library/yql/core/yql_aggregate_expander.cpp @@ -492,11 +492,13 @@ TExprNode::TPtr TAggregateExpander::GetFinalAggStateExtractor(ui32 i) { .Build(); } -TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAll() { +TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() { if (!TypesCtx.ArrowResolver) { return nullptr; } + const bool hashed = (KeyColumns->ChildrenSize() > 0); + auto streamArg = Ctx.NewArgument(Node->Pos(), "stream"); auto flow = Ctx.NewCallable(Node->Pos(), "ToFlow", { streamArg }); TVector<TString> inputColumns; @@ -514,6 +516,26 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAll() { TExprNode::TListType extractorRoots; TExprNode::TListType aggs; TVector<TString> outputColumns; + TExprNode::TListType keyIdxs; + TVector<const TTypeAnnotationNode*> allKeyTypes; + for (ui32 index = 0; index < KeyColumns->ChildrenSize(); ++index) { + auto keyName = KeyColumns->Child(index)->Content(); + auto rowIndex = RowType->FindItem(keyName); + YQL_ENSURE(rowIndex, "Unknown column: " << keyName); + auto type = RowType->GetItems()[*rowIndex]->GetItemType(); + extractorRoots.push_back(extractorArgs[*rowIndex]); + + allKeyTypes.push_back(type); + keyIdxs.push_back(Ctx.NewAtom(Node->Pos(), ToString(index))); + outputColumns.push_back(TString(keyName)); + } + + bool supported = false; + YQL_ENSURE(TypesCtx.ArrowResolver->AreTypesSupported(Ctx.GetPosition(Node->Pos()), allKeyTypes, supported, Ctx)); + if (!supported) { + return nullptr; + } + for (ui32 index = 0; index < AggregatedColumns->ChildrenSize(); ++index) { auto trait = AggregatedColumns->Child(index)->ChildPtr(1); if (trait->Child(0)->Content() == "count_all") { @@ -571,15 +593,31 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAll() { auto extractorLambda = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), std::move(extractorArgs)), std::move(extractorRoots)); auto mappedWideFlow = Ctx.NewCallable(Node->Pos(), "WideMap", { wideFlow, extractorLambda }); auto blocks = Ctx.NewCallable(Node->Pos(), "WideToBlocks", { mappedWideFlow }); - auto aggWideFlow = Ctx.Builder(Node->Pos()) - .Callable("BlockCombineAll") - .Add(0, blocks) - .Atom(1, ToString(mappedWidth)) - .Callable(2, "Void") + TExprNode::TPtr aggWideFlow; + if (hashed) { + aggWideFlow = Ctx.Builder(Node->Pos()) + .Callable("WideFromBlocks") + .Callable(0, "BlockCombineHashed") + .Add(0, blocks) + .Atom(1, ToString(mappedWidth)) + .Callable(2, "Void") + .Seal() + .Add(3, Ctx.NewList(Node->Pos(), std::move(keyIdxs))) + .Add(4, Ctx.NewList(Node->Pos(), std::move(aggs))) + .Seal() .Seal() - .Add(3, Ctx.NewList(Node->Pos(), std::move(aggs))) - .Seal() - .Build(); + .Build(); + } else { + aggWideFlow = Ctx.Builder(Node->Pos()) + .Callable("BlockCombineAll") + .Add(0, blocks) + .Atom(1, ToString(mappedWidth)) + .Callable(2, "Void") + .Seal() + .Add(3, Ctx.NewList(Node->Pos(), std::move(aggs))) + .Seal() + .Build(); + } auto finalFlow = MakeNarrowMap(Node->Pos(), outputColumns, aggWideFlow, Ctx); auto root = Ctx.NewCallable(Node->Pos(), "FromFlow", { finalFlow }); @@ -2189,18 +2227,14 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombine() { return nullptr; } - if (KeyColumns->ChildrenSize() == 0) { - for (const auto& x : AggregatedColumns->Children()) { - auto trait = x->ChildPtr(1); - if (!trait->IsCallable("AggApply")) { - return nullptr; - } + for (const auto& x : AggregatedColumns->Children()) { + auto trait = x->ChildPtr(1); + if (!trait->IsCallable("AggApply")) { + return nullptr; } - - return TryGenerateBlockCombineAll(); } - return nullptr; + return TryGenerateBlockCombineAllOrHashed(); } } // namespace NYql diff --git a/ydb/library/yql/core/yql_aggregate_expander.h b/ydb/library/yql/core/yql_aggregate_expander.h index e34b80328f..63f1cc9dfc 100644 --- a/ydb/library/yql/core/yql_aggregate_expander.h +++ b/ydb/library/yql/core/yql_aggregate_expander.h @@ -75,7 +75,7 @@ private: TExprNode::TPtr GeneratePhases(); void GenerateInitForDistinct(TExprNodeBuilder& parent, ui32& ndx, const TIdxSet& indicies, const TExprNode::TPtr& distinctField); TExprNode::TPtr GenerateJustOverStates(const TExprNode::TPtr& input, const TIdxSet& indicies); - TExprNode::TPtr TryGenerateBlockCombineAll(); + TExprNode::TPtr TryGenerateBlockCombineAllOrHashed(); TExprNode::TPtr TryGenerateBlockCombine(); private: diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp index a3418056e8..7524df9822 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp @@ -1,12 +1,18 @@ #include "mkql_block_agg.h" #include "mkql_block_agg_factory.h" +#include "mkql_rh_hash.h" #include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/minikql/mkql_node_builder.h> +#include <ydb/library/yql/minikql/arrow/arrow_defs.h> + +#include <arrow/scalar.h> #include <arrow/array/array_primitive.h> +#include <arrow/array/builder_primitive.h> namespace NKikimr { namespace NMiniKQL { @@ -19,6 +25,190 @@ struct TAggParams { std::vector<ui32> ArgColumns; }; +struct TKeyParams { + ui32 Index; + TType* Type; +}; + +class TInputBuffer { +public: + TInputBuffer(TStringBuf buf) + : Buf_(buf) + {} + + char PopChar() { + Ensure(1); + char c = Buf_.Data()[Pos_]; + ++Pos_; + return c; + } + + template <typename T> + T PopNumber() { + Ensure(sizeof(T)); + T t = *(const T*)(Buf_.Data() + Pos_); + Pos_ += sizeof(T); + return t; + } + +private: + void Ensure(size_t delta) { + MKQL_ENSURE(Pos_ + delta <= Buf_.Size(), "Unexpected end of buffer"); + } + +private: + size_t Pos_ = 0; + TStringBuf Buf_; +}; + +class TOutputBuffer { +public: + void PushChar(char c) { + Ensure(1); + Vec_[Pos_] = c; + ++Pos_; + } + + template <typename T> + void PushNumber(T t) { + Ensure(sizeof(T)); + *(T*)&Vec_[Pos_] = t; + Pos_ += sizeof(T); + } + + // fill with zeros + void Resize(size_t size) { + Pos_ = 0; + Vec_.clear(); + Vec_.resize(size); + } + + void Rewind() { + Pos_ = 0; + } + + TStringBuf Finish() const { + return TStringBuf(Vec_.data(), Vec_.data() + Pos_); + } + +private: + void Ensure(size_t delta) { + if (Pos_ + delta > Vec_.size()) { + Vec_.reserve(Max(2 * Vec_.capacity(), Pos_ + delta)); + Vec_.resize(Pos_ + delta); + } + } + +private: + size_t Pos_ = 0; + TVector<char> Vec_; +}; + +class IKeyColumnBuilder { +public: + virtual ~IKeyColumnBuilder() = default; + + // decode part of buffer and advances position + virtual void Add(TInputBuffer& in) = 0; + + virtual NUdf::TUnboxedValue Build() = 0; +}; + +class IKeySerializer { +public: + virtual ~IKeySerializer() = default; + + // handle scalar or array item + virtual void Serialize(const arrow::Datum& value, ui64 index, TOutputBuffer& out) const = 0; + + virtual std::unique_ptr<IKeyColumnBuilder> MakeBuilder(ui64 size, TComputationContext& ctx) const = 0; +}; + +template <typename T, typename TBuilder, bool IsOptional> +class TFixedSizeKeyColumnBuilder : public IKeyColumnBuilder { +public: + TFixedSizeKeyColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx) + : Builder_(dataType, &ctx.ArrowMemoryPool) + , Ctx_(ctx) + { + ARROW_OK(this->Builder_.Reserve(size)); + } + + void Add(TInputBuffer& in) final { + if constexpr (IsOptional) { + if (in.PopChar()) { + auto x = in.PopNumber<T>(); + this->Builder_.UnsafeAppend(x); + } else { + this->Builder_.UnsafeAppendNull(); + } + } else { + auto x = in.PopNumber<T>(); + this->Builder_.UnsafeAppend(x); + } + } + + NUdf::TUnboxedValue Build() final { + std::shared_ptr<arrow::ArrayData> result; + ARROW_OK(this->Builder_.FinishInternal(&result)); + return Ctx_.HolderFactory.CreateArrowBlock(result); + } + +private: + TBuilder Builder_; + TComputationContext& Ctx_; +}; + +template <typename T, typename TScalar, typename TBuilder, bool IsOptional> +class TFixedSizeKeySerializer : public IKeySerializer { +public: + TFixedSizeKeySerializer(const std::shared_ptr<arrow::DataType>& dataType) + : DataType_(dataType) + {} + + virtual void Serialize(const arrow::Datum& value, ui64 index, TOutputBuffer& out) const final { + T x; + if (value.is_scalar()) { + const auto& scalar = value.scalar_as<TScalar>(); + if constexpr (IsOptional) { + if (scalar.is_valid) { + out.PushChar(1); + x = scalar.value; + } else { + out.PushChar(0); + return; + } + + } else { + Y_ASSERT(scalar.is_valid); + x = scalar.value; + } + } else { + const auto& array = *value.array(); + if constexpr (IsOptional) { + if (array.GetNullCount() == 0 || arrow::BitUtil::GetBit(array.GetValues<uint8_t>(0, 0), index + array.offset)) { + out.PushChar(1); + x = array.GetValues<T>(1)[index]; + } else { + out.PushChar(0); + return; + } + } else { + x = array.GetValues<T>(1)[index]; + } + } + + out.PushNumber<T>(x); + } + + std::unique_ptr<IKeyColumnBuilder> MakeBuilder(ui64 size, TComputationContext& ctx) const final { + return std::make_unique<TFixedSizeKeyColumnBuilder<T, TBuilder, IsOptional>>(size, DataType_, ctx); + } + +private: + const std::shared_ptr<arrow::DataType> DataType_; +}; + class TBlockCombineAllWrapper : public TStatefulWideFlowComputationNode<TBlockCombineAllWrapper> { public: TBlockCombineAllWrapper(TComputationMutables& mutables, @@ -115,7 +305,7 @@ private: bool HasValues_ = false; TVector<char> AggStates_; - TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const TVector<TAggParams>& params, const THolderFactory& holderFactory) + TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const TVector<TAggParams>& params, TComputationContext& ctx) : TComputationValue(memInfo) , Values_(width) , ValuePointers_(width) @@ -126,7 +316,7 @@ private: ui32 totalStateSize = 0; for (const auto& p : params) { - Aggs_.emplace_back(MakeBlockAggregator(p.Name, p.TupleType, filterColumn, p.ArgColumns, holderFactory)); + Aggs_.emplace_back(MakeBlockAggregator(p.Name, p.TupleType, filterColumn, p.ArgColumns, ctx)); totalStateSize += Aggs_.back()->StateSize; } @@ -147,7 +337,291 @@ private: TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { if (!state.HasValue()) { - state = ctx.HolderFactory.Create<TState>(Width_, FilterColumn_, AggsParams_, ctx.HolderFactory); + state = ctx.HolderFactory.Create<TState>(Width_, FilterColumn_, AggsParams_, ctx); + } + return *static_cast<TState*>(state.AsBoxed().Get()); + } + + ui64 GetBatchLength(const NUdf::TUnboxedValue* columns) const { + return TArrowBlock::From(columns[CountColumn_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + } + +private: + IComputationWideFlowNode* Flow_; + const ui32 CountColumn_; + std::optional<ui32> FilterColumn_; + const size_t Width_; + const TVector<TAggParams> AggsParams_; +}; + +class TBlockCombineHashedWrapper : public TStatefulWideFlowComputationNode<TBlockCombineHashedWrapper> { +public: + TBlockCombineHashedWrapper(TComputationMutables& mutables, + IComputationWideFlowNode* flow, + ui32 countColumn, + std::optional<ui32> filterColumn, + size_t width, + const std::vector<TKeyParams>& keys, + TVector<TAggParams>&& aggsParams) + : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any) + , Flow_(flow) + , CountColumn_(countColumn) + , FilterColumn_(filterColumn) + , Width_(width) + , OutputWidth_(keys.size() + aggsParams.size() + 1) + , Keys_(keys) + , AggsParams_(std::move(aggsParams)) + { + for (const auto& k : Keys_) { + auto itemType = AS_TYPE(TBlockType, k.Type)->GetItemType(); + bool isOptional; + auto dataType = UnpackOptionalData(itemType, isOptional); + if (isOptional) { + TotalKeysSize_ += 1; + } + + switch (*dataType->GetDataSlot()) { + case NUdf::EDataSlot::Int8: + TotalKeysSize_ += 1; + if (isOptional) { + KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i8, arrow::Int8Scalar, arrow::Int8Builder, true>>(arrow::int8())); + } else { + KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i8, arrow::Int8Scalar, arrow::Int8Builder, false>>(arrow::int8())); + } + + break; + case NUdf::EDataSlot::Bool: + case NUdf::EDataSlot::Uint8: + TotalKeysSize_ += 1; + if (isOptional) { + KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, true>>(arrow::uint8())); + } else { + KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, false>>(arrow::uint8())); + } + + break; + case NUdf::EDataSlot::Int16: + TotalKeysSize_ += 2; + if (isOptional) { + KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i16, arrow::Int16Scalar, arrow::Int16Builder, true>>(arrow::int16())); + } else { + KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i16, arrow::Int16Scalar, arrow::Int16Builder, false>>(arrow::int16())); + } + + break; + case NUdf::EDataSlot::Uint16: + case NUdf::EDataSlot::Date: + TotalKeysSize_ += 2; + if (isOptional) { + KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, true>>(arrow::uint16())); + } else { + KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, false>>(arrow::uint16())); + } + + break; + case NUdf::EDataSlot::Int32: + TotalKeysSize_ += 4; + if (isOptional) { + KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i32, arrow::Int32Scalar, arrow::Int32Builder, true>>(arrow::int32())); + } else { + KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i32, arrow::Int32Scalar, arrow::Int32Builder, false>>(arrow::int32())); + } + + break; + case NUdf::EDataSlot::Uint32: + case NUdf::EDataSlot::Datetime: + TotalKeysSize_ += 4; + if (isOptional) { + KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, true>>(arrow::uint32())); + } else { + KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, false>>(arrow::uint32())); + } + + break; + case NUdf::EDataSlot::Int64: + case NUdf::EDataSlot::Interval: + TotalKeysSize_ += 8; + if (isOptional) { + KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i64, arrow::Int64Scalar, arrow::Int64Builder, true>>(arrow::int64())); + } else { + KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i64, arrow::Int64Scalar, arrow::Int64Builder, false>>(arrow::int64())); + } + + break; + case NUdf::EDataSlot::Uint64: + case NUdf::EDataSlot::Timestamp: + TotalKeysSize_ += 8; + if (isOptional) { + KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, true>>(arrow::uint64())); + } else { + KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, false>>(arrow::uint64())); + } + + break; + default: + throw yexception() << "Unsupported key type"; + } + } + + MKQL_ENSURE(TotalKeysSize_ <= 4, "TODO Support all lengths of keys"); + } + + EFetchResult DoCalculate(NUdf::TUnboxedValue& state, + TComputationContext& ctx, + NUdf::TUnboxedValue*const* output) const + { + auto& s = GetState(state, ctx); + if (s.IsFinished_) { + return EFetchResult::Finish; + } + + for (;;) { + auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data()); + if (result == EFetchResult::Yield) { + return result; + } else if (result == EFetchResult::One) { + ui64 batchLength = GetBatchLength(s.Values_.data()); + if (!batchLength) { + continue; + } + + s.HasValues_ = true; + TVector<arrow::Datum> keysDatum; + keysDatum.reserve(Keys_.size()); + for (ui32 i = 0; i < Keys_.size(); ++i) { + keysDatum.emplace_back(TArrowBlock::From(s.Values_[Keys_[i].Index]).GetDatum()); + } + + TOutputBuffer out; + out.Resize(sizeof(ui32)); + for (ui64 row = 0; row < batchLength; ++row) { + out.Rewind(); + // encode key + for (ui32 i = 0; i < keysDatum.size(); ++i) { + KeySerializers_[i]->Serialize(keysDatum[i], row, out); + } + + auto str = out.Finish(); + Y_ASSERT(str.Size() <= sizeof(ui32)); + ui32 key = *(const ui32*)str.Data(); + bool isNew; + auto iter = s.HashMap_->Insert(key, isNew); + char* ptr = (char*)s.HashMap_->GetPayload(iter); + if (isNew) { + for (size_t i = 0; i < s.Aggs_.size(); ++i) { + if (output[Keys_.size() + i]) { + s.Aggs_[i]->InitKey(ptr, s.Values_.data(), row); + } + + ptr += s.Aggs_[i]->StateSize; + } + + s.HashMap_->CheckGrow(); + } else { + for (size_t i = 0; i < s.Aggs_.size(); ++i) { + if (output[Keys_.size() + i]) { + s.Aggs_[i]->UpdateKey(ptr, s.Values_.data(), row); + } + + ptr += s.Aggs_[i]->StateSize; + } + } + } + } else { + s.IsFinished_ = true; + if (!s.HasValues_) { + return EFetchResult::Finish; + } + + // export results, TODO: split by batches + auto size = s.HashMap_->GetSize(); + TVector<std::unique_ptr<IKeyColumnBuilder>> keyBuilders; + for (const auto& ks : KeySerializers_) { + keyBuilders.emplace_back(ks->MakeBuilder(size, ctx)); + } + + TVector<std::unique_ptr<IAggColumnBuilder>> aggBuilders; + for (const auto& a : s.Aggs_) { + aggBuilders.emplace_back(a->MakeBuilder(size)); + } + + for (auto iter = s.HashMap_->Begin(); iter != s.HashMap_->End(); s.HashMap_->Advance(iter)) { + if (s.HashMap_->GetPSL(iter) < 0) { + continue; + } + + ui32 key = s.HashMap_->GetKey(iter); + auto ptr = (const char*)s.HashMap_->GetPayload(iter); + TInputBuffer in(TStringBuf((const char*)&key, sizeof(key))); + for (auto& kb : keyBuilders) { + kb->Add(in); + } + + for (size_t i = 0; i < s.Aggs_.size(); ++i) { + if (output[Keys_.size() + i]) { + aggBuilders[i]->Add(ptr); + } + + ptr += s.Aggs_[i]->StateSize; + } + } + + for (ui32 i = 0; i < Keys_.size(); ++i) { + if (output[i]) { + *output[i] = keyBuilders[i]->Build(); + } + } + + for (size_t i = 0; i < s.Aggs_.size(); ++i) { + if (output[Keys_.size() + i]) { + *output[Keys_.size() + i] = aggBuilders[i]->Build(); + } + } + + MKQL_ENSURE(output[OutputWidth_ - 1], "Block size should not be marked as unused"); + *output[OutputWidth_ - 1] = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(size))); + return EFetchResult::One; + } + } + } + +private: + struct TState : public TComputationValue<TState> { + TVector<NUdf::TUnboxedValue> Values_; + TVector<NUdf::TUnboxedValue*> ValuePointers_; + TVector<std::unique_ptr<IBlockAggregator>> Aggs_; + bool IsFinished_ = false; + bool HasValues_ = false; + ui32 TotalStateSize_ = 0; + std::unique_ptr<TRobinHoodHashMap<ui32>> HashMap_; + + TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const TVector<TAggParams>& params, TComputationContext& ctx) + : TComputationValue(memInfo) + , Values_(width) + , ValuePointers_(width) + { + for (size_t i = 0; i < width; ++i) { + ValuePointers_[i] = &Values_[i]; + } + + for (const auto& p : params) { + Aggs_.emplace_back(MakeBlockAggregator(p.Name, p.TupleType, filterColumn, p.ArgColumns, ctx)); + + TotalStateSize_ += Aggs_.back()->StateSize; + } + + HashMap_ = std::make_unique<TRobinHoodHashMap<ui32>>(TotalStateSize_); + } + }; + +private: + void RegisterDependencies() const final { + FlowDependsOn(Flow_); + } + + TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { + if (!state.HasValue()) { + state = ctx.HolderFactory.Create<TState>(Width_, FilterColumn_, AggsParams_, ctx); } return *static_cast<TState*>(state.AsBoxed().Get()); } @@ -161,9 +635,27 @@ private: const ui32 CountColumn_; std::optional<ui32> FilterColumn_; const size_t Width_; + const size_t OutputWidth_; + const std::vector<TKeyParams> Keys_; const TVector<TAggParams> AggsParams_; + ui32 TotalKeysSize_ = 0; + std::vector<std::unique_ptr<IKeySerializer>> KeySerializers_; }; +void FillAggParams(TTupleLiteral* aggsVal, TTupleType* tupleType, TVector<TAggParams>& aggsParams) { + for (ui32 i = 0; i < aggsVal->GetValuesCount(); ++i) { + auto aggVal = AS_VALUE(TTupleLiteral, aggsVal->GetValue(i)); + auto name = AS_VALUE(TDataLiteral, aggVal->GetValue(0))->AsValue().AsStringRef(); + + std::vector<ui32> argColumns; + for (ui32 j = 1; j < aggVal->GetValuesCount(); ++j) { + argColumns.push_back(AS_VALUE(TDataLiteral, aggVal->GetValue(j))->AsValue().Get<ui32>()); + } + + aggsParams.emplace_back(TAggParams{ TStringBuf(name), tupleType, argColumns }); + } +} + } IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNodeFactoryContext& ctx) { @@ -183,19 +675,36 @@ IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNod auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(3)); TVector<TAggParams> aggsParams; - for (ui32 i = 0; i < aggsVal->GetValuesCount(); ++i) { - auto aggVal = AS_VALUE(TTupleLiteral, aggsVal->GetValue(i)); - auto name = AS_VALUE(TDataLiteral, aggVal->GetValue(0))->AsValue().AsStringRef(); + FillAggParams(aggsVal, tupleType, aggsParams); + return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, countColumn, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams)); +} - std::vector<ui32> argColumns; - for (ui32 j = 1; j < aggVal->GetValuesCount(); ++j) { - argColumns.push_back(AS_VALUE(TDataLiteral, aggVal->GetValue(j))->AsValue().Get<ui32>()); - } +IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args"); + const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); + const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); - aggsParams.emplace_back(TAggParams{ TStringBuf(name), tupleType, argColumns }); + auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); + MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); + + ui32 countColumn = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>(); + auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(2)); + std::optional<ui32> filterColumn; + if (filterColumnVal->HasItem()) { + filterColumn = AS_VALUE(TDataLiteral, filterColumnVal->GetItem())->AsValue().Get<ui32>(); } - return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, countColumn, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams)); + auto keysVal = AS_VALUE(TTupleLiteral, callable.GetInput(3)); + std::vector<TKeyParams> keys; + for (ui32 i = 0; i < keysVal->GetValuesCount(); ++i) { + ui32 index = AS_VALUE(TDataLiteral, keysVal->GetValue(i))->AsValue().Get<ui32>(); + keys.emplace_back(TKeyParams{ index, tupleType->GetElementType(index) }); + } + + auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(4)); + TVector<TAggParams> aggsParams; + FillAggParams(aggsVal, tupleType, aggsParams); + return new TBlockCombineHashedWrapper(ctx.Mutables, wideFlow, countColumn, filterColumn, tupleType->GetElementsCount(), keys, std::move(aggsParams)); } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h index 8fbfee8f04..814514271d 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h @@ -6,6 +6,7 @@ namespace NKikimr { namespace NMiniKQL { IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx); } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp index e99e140ed2..5bf945f068 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp @@ -1,5 +1,9 @@ #include "mkql_block_agg_count.h" +#include <ydb/library/yql/minikql/arrow/arrow_defs.h> + +#include <arrow/array/builder_primitive.h> + namespace NKikimr { namespace NMiniKQL { @@ -9,8 +13,33 @@ public: ui64 Count_ = 0; }; - TCountAllBlockAggregator(std::optional<ui32> filterColumn) - : TBlockAggregatorBase(sizeof(TState), filterColumn) + class TColumnBuilder : public IAggColumnBuilder { + public: + TColumnBuilder(ui64 size, TComputationContext& ctx) + : Builder_(arrow::uint64(), &ctx.ArrowMemoryPool) + , Ctx_(ctx) + { + ARROW_OK(Builder_.Reserve(size)); + } + + void Add(const void* state) final { + auto typedState = static_cast<const TState*>(state); + Builder_.UnsafeAppend(typedState->Count_); + } + + NUdf::TUnboxedValue Build() final { + std::shared_ptr<arrow::ArrayData> result; + ARROW_OK(Builder_.FinishInternal(&result)); + return Ctx_.HolderFactory.CreateArrowBlock(result); + } + + private: + arrow::UInt64Builder Builder_; + TComputationContext& Ctx_; + }; + + TCountAllBlockAggregator(std::optional<ui32> filterColumn, TComputationContext& ctx) + : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx) { } @@ -32,6 +61,22 @@ public: auto typedState = static_cast<const TState*>(state); return NUdf::TUnboxedValuePod(typedState->Count_); } + + void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + new(state) TState(); + UpdateKey(state, columns, row); + } + + void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + Y_UNUSED(columns); + Y_UNUSED(row); + auto typedState = static_cast<TState*>(state); + typedState->Count_ += 1; + } + + std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final { + return std::make_unique<TColumnBuilder>(size, Ctx_); + } }; class TCountBlockAggregator : public TBlockAggregatorBase { @@ -40,8 +85,33 @@ public: ui64 Count_ = 0; }; - TCountBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn) - : TBlockAggregatorBase(sizeof(TState), filterColumn) + class TColumnBuilder : public IAggColumnBuilder { + public: + TColumnBuilder(ui64 size, TComputationContext& ctx) + : Builder_(arrow::uint64(), &ctx.ArrowMemoryPool) + , Ctx_(ctx) + { + ARROW_OK(Builder_.Reserve(size)); + } + + void Add(const void* state) final { + auto typedState = static_cast<const TState*>(state); + Builder_.UnsafeAppend(typedState->Count_); + } + + NUdf::TUnboxedValue Build() final { + std::shared_ptr<arrow::ArrayData> result; + ARROW_OK(Builder_.FinishInternal(&result)); + return Ctx_.HolderFactory.CreateArrowBlock(result); + } + + private: + arrow::UInt64Builder Builder_; + TComputationContext& Ctx_; + }; + + TCountBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx) + : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx) , ArgColumn_(argColumn) { } @@ -92,6 +162,35 @@ public: return NUdf::TUnboxedValuePod(typedState->Count_); } + void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + new(state) TState(); + UpdateKey(state, columns, row); + } + + void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + auto typedState = static_cast<TState*>(state); + const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); + if (datum.is_scalar()) { + if (datum.scalar()->is_valid) { + typedState->Count_ += 1; + } + } else { + const auto& array = datum.array(); + if (array->GetNullCount() == 0) { + typedState->Count_ += 1; + } else { + auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0); + auto fullIndex = row + array->offset; + auto bit = ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1); + typedState->Count_ += bit; + } + } + } + + std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final { + return std::make_unique<TColumnBuilder>(size, Ctx_); + } + private: const ui32 ArgColumn_; }; @@ -102,11 +201,10 @@ public: TTupleType* tupleType, std::optional<ui32> filterColumn, const std::vector<ui32>& argsColumns, - const THolderFactory& holderFactory) const final { + TComputationContext& ctx) const final { Y_UNUSED(tupleType); Y_UNUSED(argsColumns); - Y_UNUSED(holderFactory); - return std::make_unique<TCountAllBlockAggregator>(filterColumn); + return std::make_unique<TCountAllBlockAggregator>(filterColumn, ctx); } }; @@ -116,10 +214,9 @@ public: TTupleType* tupleType, std::optional<ui32> filterColumn, const std::vector<ui32>& argsColumns, - const THolderFactory& holderFactory) const final { + TComputationContext& ctx) const final { Y_UNUSED(tupleType); - Y_UNUSED(holderFactory); - return std::make_unique<TCountBlockAggregator>(filterColumn, argsColumns[0]); + return std::make_unique<TCountBlockAggregator>(filterColumn, argsColumns[0], ctx); } }; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp index 3fdf201761..a0a66aa9c9 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp @@ -25,14 +25,14 @@ std::unique_ptr<IBlockAggregator> MakeBlockAggregator( TTupleType* tupleType, std::optional<ui32> filterColumn, const std::vector<ui32>& argsColumns, - const THolderFactory& holderFactory) { + TComputationContext& ctx) { const auto& f = Singleton<TAggregatorFactories>()->Factories; auto it = f.find(name); if (it == f.end()) { throw yexception() << "Unsupported block aggregation function: " << name; } - return it->second->Make(tupleType, filterColumn, argsColumns, holderFactory); + return it->second->Make(tupleType, filterColumn, argsColumns, ctx); } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h index 0e13b27dc2..2eae714e02 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h @@ -6,6 +6,15 @@ namespace NKikimr { namespace NMiniKQL { +class IAggColumnBuilder { +public: + virtual ~IAggColumnBuilder() = default; + + virtual void Add(const void* state) = 0; + + virtual NUdf::TUnboxedValue Build() = 0; +}; + class IBlockAggregator { public: virtual ~IBlockAggregator() = default; @@ -16,6 +25,12 @@ public: virtual NUdf::TUnboxedValue FinishOne(const void* state) = 0; + virtual void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0; + + virtual void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0; + + virtual std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) = 0; + const ui32 StateSize; explicit IBlockAggregator(ui32 stateSize) @@ -25,14 +40,16 @@ public: class TBlockAggregatorBase : public IBlockAggregator { public: - TBlockAggregatorBase(ui32 stateSize, std::optional<ui32> filterColumn) + TBlockAggregatorBase(ui32 stateSize, std::optional<ui32> filterColumn, TComputationContext& ctx) : IBlockAggregator(stateSize) , FilterColumn_(filterColumn) + , Ctx_(ctx) { } protected: const std::optional<ui32> FilterColumn_; + TComputationContext& Ctx_; }; class THolderFactory; @@ -42,7 +59,7 @@ std::unique_ptr<IBlockAggregator> MakeBlockAggregator( TTupleType* tupleType, std::optional<ui32> filterColumn, const std::vector<ui32>& argsColumns, - const THolderFactory& holderFactory); + TComputationContext& ctx); class IBlockAggregatorFactory { public: @@ -52,7 +69,7 @@ public: TTupleType* tupleType, std::optional<ui32> filterColumn, const std::vector<ui32>& argsColumns, - const THolderFactory& holderFactory) const = 0; + TComputationContext& ctx) const = 0; }; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp index 9407f90840..6849728fbd 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp @@ -4,8 +4,10 @@ #include <ydb/library/yql/minikql/mkql_node_cast.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/arrow/arrow_defs.h> #include <arrow/scalar.h> +#include <arrow/array/builder_primitive.h> namespace NKikimr { namespace NMiniKQL { @@ -19,12 +21,12 @@ T UpdateMinMax(T x, T y) { } } -template <typename TIn, typename TInScalar, bool IsMin> +template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin> class TMinMaxBlockAggregatorNullableOrScalar : public TBlockAggregatorBase { public: struct TState { TIn Value_; - bool IsValid_ = false; + ui8 IsValid_ = 0; TState() { if constexpr (IsMin) { @@ -35,9 +37,40 @@ public: } }; - TMinMaxBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn) - : TBlockAggregatorBase(sizeof(TState), filterColumn) + class TColumnBuilder : public IAggColumnBuilder { + public: + TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx) + : Builder_(dataType, &ctx.ArrowMemoryPool) + , Ctx_(ctx) + { + ARROW_OK(Builder_.Reserve(size)); + } + + void Add(const void* state) final { + auto typedState = static_cast<const TState*>(state); + if (typedState->IsValid_) { + Builder_.UnsafeAppend(typedState->Value_); + } else { + Builder_.UnsafeAppendNull(); + } + } + + NUdf::TUnboxedValue Build() final { + std::shared_ptr<arrow::ArrayData> result; + ARROW_OK(Builder_.FinishInternal(&result)); + return Ctx_.HolderFactory.CreateArrowBlock(result); + } + + private: + TBuilder Builder_; + TComputationContext& Ctx_; + }; + + TMinMaxBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn, + const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) + : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx) , ArgColumn_(argColumn) + , BuilderDataType_(builderDataType) { } @@ -52,7 +85,7 @@ public: if (datum.is_scalar()) { if (datum.scalar()->is_valid) { typedState->Value_ = datum.scalar_as<TInScalar>().value; - typedState->IsValid_ = true; + typedState->IsValid_ = 1; } } else { const auto& array = datum.array(); @@ -64,7 +97,7 @@ public: } if (!filtered) { - typedState->IsValid_ = true; + typedState->IsValid_ = 1; TIn value = typedState->Value_; if (array->GetNullCount() == 0) { for (int64_t i = 0; i < len; ++i) { @@ -89,7 +122,7 @@ public: TIn value = typedState->Value_; if (array->GetNullCount() == 0) { - typedState->IsValid_ = true; + typedState->IsValid_ = 1; for (int64_t i = 0; i < len; ++i) { TIn filterMask = (((*filterBitmap++) & 1) ^ 1) - TIn(1); value = UpdateMinMax<IsMin>(value, TIn((ptr[i] & filterMask) | (value & ~filterMask))); @@ -107,7 +140,7 @@ public: count += mask & 1; } - typedState->IsValid_ = typedState->IsValid_ || count > 0; + typedState->IsValid_ |= count ? 1 : 0; } typedState->Value_ = value; @@ -124,15 +157,51 @@ public: return NUdf::TUnboxedValuePod(typedState->Value_); } + void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + new(state) TState(); + UpdateKey(state, columns, row); + } + + void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + auto typedState = static_cast<TState*>(state); + const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); + if (datum.is_scalar()) { + if (datum.scalar()->is_valid) { + typedState->Value_ = datum.scalar_as<TInScalar>().value; + typedState->IsValid_ = 1; + } + } else { + const auto& array = datum.array(); + auto ptr = array->GetValues<TIn>(1); + if (array->GetNullCount() == 0) { + typedState->IsValid_ = 1; + typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, ptr[row]); + } else { + auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0); + ui64 fullIndex = row + array->offset; + // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00 + TIn mask = (((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) ^ 1) - TIn(1); + typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, TIn((ptr[row] & mask) | (typedState->Value_ & ~mask))); + typedState->IsValid_ |= mask & 1; + } + } + } + + std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final { + return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_); + } + private: const ui32 ArgColumn_; + const std::shared_ptr<arrow::DataType> BuilderDataType_; }; -template <typename TIn, typename TInScalar, bool IsMin> +template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin> class TMinMaxBlockAggregator: public TBlockAggregatorBase { public: struct TState { TIn Value_; + TState() { if constexpr (IsMin) { Value_ = std::numeric_limits<TIn>::max(); @@ -142,9 +211,36 @@ public: } }; - TMinMaxBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn) - : TBlockAggregatorBase(sizeof(TState), filterColumn) + class TColumnBuilder : public IAggColumnBuilder { + public: + TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx) + : Builder_(dataType, &ctx.ArrowMemoryPool) + , Ctx_(ctx) + { + ARROW_OK(Builder_.Reserve(size)); + } + + void Add(const void* state) final { + auto typedState = static_cast<const TState*>(state); + Builder_.UnsafeAppend(typedState->Value_); + } + + NUdf::TUnboxedValue Build() final { + std::shared_ptr<arrow::ArrayData> result; + ARROW_OK(Builder_.FinishInternal(&result)); + return Ctx_.HolderFactory.CreateArrowBlock(result); + } + + private: + TBuilder Builder_; + TComputationContext& Ctx_; + }; + + TMinMaxBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, + const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) + : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx) , ArgColumn_(argColumn) + , BuilderDataType_(builderDataType) { } @@ -191,8 +287,26 @@ public: return NUdf::TUnboxedValuePod(typedState->Value_); } + void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + new(state) TState(); + UpdateKey(state, columns, row); + } + + void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + auto typedState = static_cast<TState*>(state); + const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); + const auto& array = datum.array(); + auto ptr = array->GetValues<TIn>(1); + typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, ptr[row]); + } + + std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final { + return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_); + } + private: const ui32 ArgColumn_; + const std::shared_ptr<arrow::DataType> BuilderDataType_; }; template <bool IsMin> @@ -202,8 +316,7 @@ public: TTupleType* tupleType, std::optional<ui32> filterColumn, const std::vector<ui32>& argsColumns, - const THolderFactory& holderFactory) const final { - Y_UNUSED(holderFactory); + TComputationContext& ctx) const final { auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0])); auto argType = blockType->GetItemType(); bool isOptional; @@ -211,52 +324,52 @@ public: if (blockType->GetShape() == TBlockType::EShape::Scalar || isOptional) { switch (*dataType->GetDataSlot()) { case NUdf::EDataSlot::Int8: - return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<i8, arrow::Int8Scalar, IsMin>>(filterColumn, argsColumns[0]); + return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<i8, arrow::Int8Scalar, arrow::Int8Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int8(), ctx); case NUdf::EDataSlot::Bool: case NUdf::EDataSlot::Uint8: - return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<ui8, arrow::UInt8Scalar, IsMin>>(filterColumn, argsColumns[0]); + return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint8(), ctx); case NUdf::EDataSlot::Int16: - return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<i16, arrow::Int16Scalar, IsMin>>(filterColumn, argsColumns[0]); + return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<i16, arrow::Int16Scalar, arrow::Int16Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int16(), ctx); case NUdf::EDataSlot::Uint16: case NUdf::EDataSlot::Date: - return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<ui16, arrow::UInt16Scalar, IsMin>>(filterColumn, argsColumns[0]); + return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint16(), ctx); case NUdf::EDataSlot::Int32: - return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<i32, arrow::Int32Scalar, IsMin>>(filterColumn, argsColumns[0]); + return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<i32, arrow::Int32Scalar, arrow::Int32Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int32(), ctx); case NUdf::EDataSlot::Uint32: case NUdf::EDataSlot::Datetime: - return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<ui32, arrow::UInt32Scalar, IsMin>>(filterColumn, argsColumns[0]); + return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint32(), ctx); case NUdf::EDataSlot::Int64: case NUdf::EDataSlot::Interval: - return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<i64, arrow::Int64Scalar, IsMin>>(filterColumn, argsColumns[0]); + return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<i64, arrow::Int64Scalar, arrow::Int64Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int64(), ctx); case NUdf::EDataSlot::Uint64: case NUdf::EDataSlot::Timestamp: - return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<ui64, arrow::UInt64Scalar, IsMin>>(filterColumn, argsColumns[0]); + return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint64(), ctx); default: throw yexception() << "Unsupported MIN/MAX input type"; } } else { switch (*dataType->GetDataSlot()) { case NUdf::EDataSlot::Int8: - return std::make_unique<TMinMaxBlockAggregator<i8, arrow::Int8Scalar, IsMin>>(filterColumn, argsColumns[0]); + return std::make_unique<TMinMaxBlockAggregator<i8, arrow::Int8Scalar, arrow::Int8Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int8(), ctx); case NUdf::EDataSlot::Uint8: case NUdf::EDataSlot::Bool: - return std::make_unique<TMinMaxBlockAggregator<ui8, arrow::UInt8Scalar, IsMin>>(filterColumn, argsColumns[0]); + return std::make_unique<TMinMaxBlockAggregator<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint8(), ctx); case NUdf::EDataSlot::Int16: - return std::make_unique<TMinMaxBlockAggregator<i16, arrow::Int16Scalar, IsMin>>(filterColumn, argsColumns[0]); + return std::make_unique<TMinMaxBlockAggregator<i16, arrow::Int16Scalar, arrow::Int16Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int16(), ctx); case NUdf::EDataSlot::Uint16: case NUdf::EDataSlot::Date: - return std::make_unique<TMinMaxBlockAggregator<ui16, arrow::UInt16Scalar, IsMin>>(filterColumn, argsColumns[0]); + return std::make_unique<TMinMaxBlockAggregator<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint16(), ctx); case NUdf::EDataSlot::Int32: - return std::make_unique<TMinMaxBlockAggregator<i32, arrow::Int32Scalar, IsMin>>(filterColumn, argsColumns[0]); + return std::make_unique<TMinMaxBlockAggregator<i32, arrow::Int32Scalar, arrow::Int32Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int32(), ctx); case NUdf::EDataSlot::Uint32: case NUdf::EDataSlot::Datetime: - return std::make_unique<TMinMaxBlockAggregator<ui32, arrow::UInt32Scalar, IsMin>>(filterColumn, argsColumns[0]); + return std::make_unique<TMinMaxBlockAggregator<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint32(), ctx); case NUdf::EDataSlot::Int64: case NUdf::EDataSlot::Interval: - return std::make_unique<TMinMaxBlockAggregator<i64, arrow::Int64Scalar, IsMin>>(filterColumn, argsColumns[0]); + return std::make_unique<TMinMaxBlockAggregator<i64, arrow::Int64Scalar, arrow::Int64Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int64(), ctx); case NUdf::EDataSlot::Uint64: case NUdf::EDataSlot::Timestamp: - return std::make_unique<TMinMaxBlockAggregator<ui64, arrow::UInt64Scalar, IsMin>>(filterColumn, argsColumns[0]); + return std::make_unique<TMinMaxBlockAggregator<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint64(), ctx); default: throw yexception() << "Unsupported MIN/MAX input type"; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp index 347ad0a26e..74e2d884a3 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp @@ -4,23 +4,56 @@ #include <ydb/library/yql/minikql/mkql_node_cast.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/arrow/arrow_defs.h> #include <arrow/scalar.h> +#include <arrow/array/builder_primitive.h> namespace NKikimr { namespace NMiniKQL { -template <typename TIn, typename TSum, typename TInScalar> +template <typename TIn, typename TSum, typename TBuilder, typename TInScalar> class TSumBlockAggregatorNullableOrScalar : public TBlockAggregatorBase { public: struct TState { TSum Sum_ = 0; - bool IsValid_ = false; + ui8 IsValid_ = 0; }; - TSumBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn) - : TBlockAggregatorBase(sizeof(TState), filterColumn) + class TColumnBuilder : public IAggColumnBuilder { + public: + TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx) + : Builder_(dataType, &ctx.ArrowMemoryPool) + , Ctx_(ctx) + { + ARROW_OK(Builder_.Reserve(size)); + } + + void Add(const void* state) final { + auto typedState = static_cast<const TState*>(state); + if (typedState->IsValid_) { + Builder_.UnsafeAppend(typedState->Sum_); + } else { + Builder_.UnsafeAppendNull(); + } + } + + NUdf::TUnboxedValue Build() final { + std::shared_ptr<arrow::ArrayData> result; + ARROW_OK(Builder_.FinishInternal(&result)); + return Ctx_.HolderFactory.CreateArrowBlock(result); + } + + private: + TBuilder Builder_; + TComputationContext& Ctx_; + }; + + TSumBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn, + const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) + : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx) , ArgColumn_(argColumn) + , BuilderDataType_(builderDataType) { } @@ -34,7 +67,7 @@ public: if (datum.is_scalar()) { if (datum.scalar()->is_valid) { typedState->Sum_ += (filtered ? *filtered : batchLength) * datum.scalar_as<TInScalar>().value; - typedState->IsValid_ = true; + typedState->IsValid_ = 1; } } else { const auto& array = datum.array(); @@ -46,7 +79,7 @@ public: } if (!filtered) { - typedState->IsValid_ = true; + typedState->IsValid_ = 1; TSum sum = typedState->Sum_; if (array->GetNullCount() == 0) { for (int64_t i = 0; i < len; ++i) { @@ -70,7 +103,7 @@ public: auto filterBitmap = filterArray->template GetValues<uint8_t>(1, 0); TSum sum = typedState->Sum_; if (array->GetNullCount() == 0) { - typedState->IsValid_ = true; + typedState->IsValid_ = 1; for (int64_t i = 0; i < len; ++i) { ui64 fullIndex = i + array->offset; // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00 @@ -90,7 +123,7 @@ public: count += mask & 1; } - typedState->IsValid_ = typedState->IsValid_ || count > 0; + typedState->IsValid_ |= count ? 1 : 0; } typedState->Sum_ = sum; @@ -107,20 +140,82 @@ public: return NUdf::TUnboxedValuePod(typedState->Sum_); } + void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + new(state) TState(); + UpdateKey(state, columns, row); + } + + void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + auto typedState = static_cast<TState*>(state); + const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); + if (datum.is_scalar()) { + if (datum.scalar()->is_valid) { + typedState->Sum_ += datum.scalar_as<TInScalar>().value; + typedState->IsValid_ = 1; + } + } else { + const auto& array = datum.array(); + auto ptr = array->GetValues<TIn>(1); + if (array->GetNullCount() == 0) { + typedState->IsValid_ = 1; + typedState->Sum_ += ptr[row]; + } else { + auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0); + ui64 fullIndex = row + array->offset; + // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00 + TIn mask = (((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) ^ 1) - TIn(1); + typedState->Sum_ += (ptr[row] & mask); + typedState->IsValid_ |= mask & 1; + } + } + } + + std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final { + return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_); + } + private: const ui32 ArgColumn_; + const std::shared_ptr<arrow::DataType> BuilderDataType_; }; -template <typename TIn, typename TSum, typename TInScalar> +template <typename TIn, typename TSum, typename TBuilder, typename TInScalar> class TSumBlockAggregator : public TBlockAggregatorBase { public: struct TState { TSum Sum_ = 0; }; - TSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn) - : TBlockAggregatorBase(sizeof(TState), filterColumn) + class TColumnBuilder : public IAggColumnBuilder { + public: + TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx) + : Builder_(dataType, &ctx.ArrowMemoryPool) + , Ctx_(ctx) + { + ARROW_OK(Builder_.Reserve(size)); + } + + void Add(const void* state) final { + auto typedState = static_cast<const TState*>(state); + Builder_.UnsafeAppend(typedState->Sum_); + } + + NUdf::TUnboxedValue Build() final { + std::shared_ptr<arrow::ArrayData> result; + ARROW_OK(Builder_.FinishInternal(&result)); + return Ctx_.HolderFactory.CreateArrowBlock(result); + } + + private: + TBuilder Builder_; + TComputationContext& Ctx_; + }; + + TSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, + const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) + : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx) , ArgColumn_(argColumn) + , BuilderDataType_(builderDataType) { } @@ -165,8 +260,26 @@ public: return NUdf::TUnboxedValuePod(typedState->Sum_); } + void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + new(state) TState(); + UpdateKey(state, columns, row); + } + + void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + auto typedState = static_cast<TState*>(state); + const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); + const auto& array = datum.array(); + auto ptr = array->GetValues<TIn>(1); + typedState->Sum_ += ptr[row]; + } + + std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final { + return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_); + } + private: const ui32 ArgColumn_; + const std::shared_ptr<arrow::DataType> BuilderDataType_; }; template <typename TIn, typename TInScalar> @@ -177,10 +290,9 @@ public: ui64 Count_ = 0; }; - TAvgBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, const THolderFactory& holderFactory) - : TBlockAggregatorBase(sizeof(TState), filterColumn) + TAvgBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx) + : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx) , ArgColumn_(argColumn) - , HolderFactory_(holderFactory) { } @@ -265,15 +377,49 @@ public: } NUdf::TUnboxedValue* items; - auto arr = HolderFactory_.CreateDirectArrayHolder(2, items); + auto arr = Ctx_.HolderFactory.CreateDirectArrayHolder(2, items); items[0] = NUdf::TUnboxedValuePod(typedState->Sum_); items[1] = NUdf::TUnboxedValuePod(typedState->Count_); return arr; } + void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + new(state) TState(); + UpdateKey(state, columns, row); + } + + void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + auto typedState = static_cast<TState*>(state); + const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); + if (datum.is_scalar()) { + if (datum.scalar()->is_valid) { + typedState->Sum_ += double(datum.scalar_as<TInScalar>().value); + typedState->Count_ += 1; + } + } else { + const auto& array = datum.array(); + auto ptr = array->GetValues<TIn>(1); + if (array->GetNullCount() == 0) { + typedState->Sum_ += double(ptr[row]); + typedState->Count_ += 1; + } else { + auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0); + ui64 fullIndex = row + array->offset; + // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00 + TIn mask = (((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) ^ 1) - TIn(1); + typedState->Sum_ += double(ptr[row] & mask); + typedState->Count_ += mask & 1; + } + } + } + + std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final { + Y_UNUSED(size); + MKQL_ENSURE(false, "TODO: support of tuples"); + } + private: const ui32 ArgColumn_; - const THolderFactory& HolderFactory_; }; class TBlockSumFactory : public IBlockAggregatorFactory { @@ -282,8 +428,7 @@ public: TTupleType* tupleType, std::optional<ui32> filterColumn, const std::vector<ui32>& argsColumns, - const THolderFactory& holderFactory) const final { - Y_UNUSED(holderFactory); + TComputationContext& ctx) const final { auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0])); auto argType = blockType->GetItemType(); bool isOptional; @@ -291,50 +436,50 @@ public: if (blockType->GetShape() == TBlockType::EShape::Scalar || isOptional) { switch (*dataType->GetDataSlot()) { case NUdf::EDataSlot::Int8: - return std::make_unique<TSumBlockAggregatorNullableOrScalar<i8, i64, arrow::Int8Scalar>>(filterColumn, argsColumns[0]); + return std::make_unique<TSumBlockAggregatorNullableOrScalar<i8, i64, arrow::Int64Builder, arrow::Int8Scalar>>(filterColumn, argsColumns[0], arrow::int64(), ctx); case NUdf::EDataSlot::Uint8: - return std::make_unique<TSumBlockAggregatorNullableOrScalar<ui8, ui64, arrow::UInt8Scalar>>(filterColumn, argsColumns[0]); + return std::make_unique<TSumBlockAggregatorNullableOrScalar<ui8, ui64, arrow::UInt64Builder, arrow::UInt8Scalar>>(filterColumn, argsColumns[0], arrow::uint64(), ctx); case NUdf::EDataSlot::Int16: - return std::make_unique<TSumBlockAggregatorNullableOrScalar<i16, i64, arrow::Int16Scalar>>(filterColumn, argsColumns[0]); + return std::make_unique<TSumBlockAggregatorNullableOrScalar<i16, i64, arrow::Int64Builder, arrow::Int16Scalar>>(filterColumn, argsColumns[0], arrow::int64(), ctx); case NUdf::EDataSlot::Uint16: case NUdf::EDataSlot::Date: - return std::make_unique<TSumBlockAggregatorNullableOrScalar<ui16, ui64, arrow::UInt16Scalar>>(filterColumn, argsColumns[0]); + return std::make_unique<TSumBlockAggregatorNullableOrScalar<ui16, ui64, arrow::UInt64Builder, arrow::UInt16Scalar>>(filterColumn, argsColumns[0], arrow::uint64(), ctx); case NUdf::EDataSlot::Int32: - return std::make_unique<TSumBlockAggregatorNullableOrScalar<i32, i64, arrow::Int32Scalar>>(filterColumn, argsColumns[0]); + return std::make_unique<TSumBlockAggregatorNullableOrScalar<i32, i64, arrow::Int64Builder, arrow::Int32Scalar>>(filterColumn, argsColumns[0], arrow::int64(), ctx); case NUdf::EDataSlot::Uint32: case NUdf::EDataSlot::Datetime: - return std::make_unique<TSumBlockAggregatorNullableOrScalar<ui32, ui64, arrow::UInt32Scalar>>(filterColumn, argsColumns[0]); + return std::make_unique<TSumBlockAggregatorNullableOrScalar<ui32, ui64, arrow::UInt64Builder, arrow::UInt32Scalar>>(filterColumn, argsColumns[0], arrow::uint64(), ctx); case NUdf::EDataSlot::Int64: case NUdf::EDataSlot::Interval: - return std::make_unique<TSumBlockAggregatorNullableOrScalar<i64, i64, arrow::Int64Scalar>>(filterColumn, argsColumns[0]); + return std::make_unique<TSumBlockAggregatorNullableOrScalar<i64, i64, arrow::Int64Builder, arrow::Int64Scalar>>(filterColumn, argsColumns[0], arrow::int64(), ctx); case NUdf::EDataSlot::Uint64: case NUdf::EDataSlot::Timestamp: - return std::make_unique<TSumBlockAggregatorNullableOrScalar<ui64, ui64, arrow::UInt64Scalar>>(filterColumn, argsColumns[0]); + return std::make_unique<TSumBlockAggregatorNullableOrScalar<ui64, ui64, arrow::UInt64Builder, arrow::UInt64Scalar>>(filterColumn, argsColumns[0], arrow::uint64(), ctx); default: throw yexception() << "Unsupported SUM input type"; } } else { switch (*dataType->GetDataSlot()) { case NUdf::EDataSlot::Int8: - return std::make_unique<TSumBlockAggregator<i8, i64, arrow::Int8Scalar>>(filterColumn, argsColumns[0]); + return std::make_unique<TSumBlockAggregator<i8, i64, arrow::Int64Builder, arrow::Int8Scalar>>(filterColumn, argsColumns[0], arrow::int64(), ctx); case NUdf::EDataSlot::Uint8: - return std::make_unique<TSumBlockAggregator<ui8, ui64, arrow::UInt8Scalar>>(filterColumn, argsColumns[0]); + return std::make_unique<TSumBlockAggregator<ui8, ui64, arrow::UInt64Builder, arrow::UInt8Scalar>>(filterColumn, argsColumns[0], arrow::uint64(), ctx); case NUdf::EDataSlot::Int16: - return std::make_unique<TSumBlockAggregator<i16, i64, arrow::Int16Scalar>>(filterColumn, argsColumns[0]); + return std::make_unique<TSumBlockAggregator<i16, i64, arrow::Int64Builder, arrow::Int16Scalar>>(filterColumn, argsColumns[0], arrow::int64(), ctx); case NUdf::EDataSlot::Uint16: case NUdf::EDataSlot::Date: - return std::make_unique<TSumBlockAggregator<ui16, ui64, arrow::UInt16Scalar>>(filterColumn, argsColumns[0]); + return std::make_unique<TSumBlockAggregator<ui16, ui64, arrow::UInt64Builder, arrow::UInt16Scalar>>(filterColumn, argsColumns[0], arrow::uint64(), ctx); case NUdf::EDataSlot::Int32: - return std::make_unique<TSumBlockAggregator<i32, i64, arrow::Int32Scalar>>(filterColumn, argsColumns[0]); + return std::make_unique<TSumBlockAggregator<i32, i64, arrow::Int64Builder, arrow::Int32Scalar>>(filterColumn, argsColumns[0], arrow::int64(), ctx); case NUdf::EDataSlot::Uint32: case NUdf::EDataSlot::Datetime: - return std::make_unique<TSumBlockAggregator<ui32, ui64, arrow::UInt32Scalar>>(filterColumn, argsColumns[0]); + return std::make_unique<TSumBlockAggregator<ui32, ui64, arrow::UInt64Builder, arrow::UInt32Scalar>>(filterColumn, argsColumns[0], arrow::uint64(), ctx); case NUdf::EDataSlot::Int64: case NUdf::EDataSlot::Interval: - return std::make_unique<TSumBlockAggregator<i64, i64, arrow::Int64Scalar>>(filterColumn, argsColumns[0]); + return std::make_unique<TSumBlockAggregator<i64, i64, arrow::Int64Builder, arrow::Int64Scalar>>(filterColumn, argsColumns[0], arrow::int64(), ctx); case NUdf::EDataSlot::Uint64: case NUdf::EDataSlot::Timestamp: - return std::make_unique<TSumBlockAggregator<ui64, ui64, arrow::UInt64Scalar>>(filterColumn, argsColumns[0]); + return std::make_unique<TSumBlockAggregator<ui64, ui64, arrow::UInt64Builder, arrow::UInt64Scalar>>(filterColumn, argsColumns[0], arrow::uint64(), ctx); default: throw yexception() << "Unsupported SUM input type"; } @@ -348,31 +493,31 @@ public: TTupleType* tupleType, std::optional<ui32> filterColumn, const std::vector<ui32>& argsColumns, - const THolderFactory& holderFactory) const final { + TComputationContext& ctx) const final { auto argType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0]))->GetItemType(); bool isOptional; auto dataType = UnpackOptionalData(argType, isOptional); switch (*dataType->GetDataSlot()) { case NUdf::EDataSlot::Int8: - return std::make_unique<TAvgBlockAggregator<i8, arrow::Int8Scalar>>(filterColumn, argsColumns[0], holderFactory); + return std::make_unique<TAvgBlockAggregator<i8, arrow::Int8Scalar>>(filterColumn, argsColumns[0], ctx); case NUdf::EDataSlot::Uint8: - return std::make_unique<TAvgBlockAggregator<ui8, arrow::UInt8Scalar>>(filterColumn, argsColumns[0], holderFactory); + return std::make_unique<TAvgBlockAggregator<ui8, arrow::UInt8Scalar>>(filterColumn, argsColumns[0], ctx); case NUdf::EDataSlot::Int16: - return std::make_unique<TAvgBlockAggregator<i16, arrow::Int16Scalar>>(filterColumn, argsColumns[0], holderFactory); + return std::make_unique<TAvgBlockAggregator<i16, arrow::Int16Scalar>>(filterColumn, argsColumns[0], ctx); case NUdf::EDataSlot::Uint16: case NUdf::EDataSlot::Date: - return std::make_unique<TAvgBlockAggregator<ui16, arrow::UInt16Scalar>>(filterColumn, argsColumns[0], holderFactory); + return std::make_unique<TAvgBlockAggregator<ui16, arrow::UInt16Scalar>>(filterColumn, argsColumns[0], ctx); case NUdf::EDataSlot::Int32: - return std::make_unique<TAvgBlockAggregator<i32, arrow::Int32Scalar>>(filterColumn, argsColumns[0], holderFactory); + return std::make_unique<TAvgBlockAggregator<i32, arrow::Int32Scalar>>(filterColumn, argsColumns[0], ctx); case NUdf::EDataSlot::Uint32: case NUdf::EDataSlot::Datetime: - return std::make_unique<TAvgBlockAggregator<ui32, arrow::UInt32Scalar>>(filterColumn, argsColumns[0], holderFactory); + return std::make_unique<TAvgBlockAggregator<ui32, arrow::UInt32Scalar>>(filterColumn, argsColumns[0], ctx); case NUdf::EDataSlot::Int64: case NUdf::EDataSlot::Interval: - return std::make_unique<TAvgBlockAggregator<i64, arrow::Int64Scalar>>(filterColumn, argsColumns[0], holderFactory); + return std::make_unique<TAvgBlockAggregator<i64, arrow::Int64Scalar>>(filterColumn, argsColumns[0], ctx); case NUdf::EDataSlot::Uint64: case NUdf::EDataSlot::Timestamp: - return std::make_unique<TAvgBlockAggregator<ui64, arrow::UInt64Scalar>>(filterColumn, argsColumns[0], holderFactory); + return std::make_unique<TAvgBlockAggregator<ui64, arrow::UInt64Scalar>>(filterColumn, argsColumns[0], ctx); default: throw yexception() << "Unsupported AVG input type"; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp index ea59eb766e..ddf51de9bd 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp @@ -282,6 +282,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"BlockNot", &WrapBlockNot}, {"BlockCompress", &WrapBlockCompress}, {"BlockCombineAll", &WrapBlockCombineAll}, + {"BlockCombineHashed", &WrapBlockCombineHashed}, {"MakeHeap", &WrapMakeHeap}, {"PushHeap", &WrapPushHeap}, {"PopHeap", &WrapPopHeap}, diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h b/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h index 631d08c388..863f8cfa82 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h @@ -20,8 +20,8 @@ protected: } public: - // returns payload pointer for Map or nullptr for Set - Y_FORCE_INLINE void* Insert(TKey key, bool& isNew) { + // returns iterator + Y_FORCE_INLINE char* Insert(TKey key, bool& isNew) { auto ret = InsertImpl(key, isNew, Capacity, Data); Size += isNew ? 1 : 0; return ret; @@ -87,30 +87,30 @@ public: } private: - Y_FORCE_INLINE void* InsertImpl(TKey key, bool& isNew, ui64 capacity, std::vector<char>& data) { + Y_FORCE_INLINE char* InsertImpl(TKey key, bool& isNew, ui64 capacity, std::vector<char>& data) { isNew = false; ui64 bucket = THash()(key) & (capacity - 1); char* ptr = data.data() + AsDeriv().GetCellSize() * bucket; TPSLStorage distance = 0; - void* returnPayload; + char* returnPtr; for (;;) { if (GetPSL(ptr) < 0) { isNew = true; GetPSL(ptr) = distance; GetKey(ptr) = key; - return GetPayload(ptr); + return ptr; } if (TEqual()(GetKey(ptr), key)) { - return GetPayload(ptr); + return ptr; } if (distance > GetPSL(ptr)) { // swap keys & state - returnPayload = GetPayload(ptr); + returnPtr = ptr; std::swap(distance, GetPSL(ptr)); std::swap(key, GetKey(ptr)); - AsDeriv().SavePayload(returnPayload); + AsDeriv().SavePayload(GetPayload(ptr)); isNew = true; ++distance; @@ -127,7 +127,7 @@ private: GetPSL(ptr) = distance; GetKey(ptr) = key; AsDeriv().RestorePayload(GetPayload(ptr)); - return returnPayload; // for original key + return returnPtr; // for original key } if (distance > GetPSL(ptr)) { @@ -152,8 +152,9 @@ private: } bool isNew; - auto payload = InsertImpl(GetKey(iter), isNew, newCapacity, newData); - AsDeriv().CopyPayload(payload, GetPayload(iter)); + auto newIter = InsertImpl(GetKey(iter), isNew, newCapacity, newData); + Y_ASSERT(isNew); + AsDeriv().CopyPayload(GetPayload(newIter), GetPayload(iter)); } Data.swap(newData); diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp index be87bd1ed3..d47bcb319a 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp @@ -16,12 +16,15 @@ Y_UNIT_TEST_SUITE(TMiniKQLRobinHoodHashTest) { auto k = i % 1000; auto [it, inserted] = h.emplace(k, 0); bool isNew; - void* p = rh.Insert(k, isNew); + auto iter = rh.Insert(k, isNew); + UNIT_ASSERT_VALUES_EQUAL(rh.GetKey(iter), k); UNIT_ASSERT_VALUES_EQUAL(isNew, inserted); it->second += i; - *(i64*)p += i; if (isNew) { + *(i64*)rh.GetPayload(iter) = i; rh.CheckGrow(); + } else { + *(i64*)rh.GetPayload(iter) += i; } UNIT_ASSERT_VALUES_EQUAL(h.size(), rh.GetSize()); @@ -49,8 +52,8 @@ Y_UNIT_TEST_SUITE(TMiniKQLRobinHoodHashTest) { auto k = i % 1000; auto[it, inserted] = h.emplace(k); bool isNew; - void* p = rh.Insert(k, isNew); - Y_UNUSED(p); + auto iter = rh.Insert(k, isNew); + UNIT_ASSERT_VALUES_EQUAL(rh.GetKey(iter), k); UNIT_ASSERT_VALUES_EQUAL(isNew, inserted); if (isNew) { rh.CheckGrow(); diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index d675b7c6b2..ffe022d2ef 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -5338,6 +5338,42 @@ TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, ui32 countColum return TRuntimeNode(builder.Build(), false); } +TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, ui32 countColumn, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys, + const TArrayRef<const TAggInfo>& aggs, TType* returnType) { + if constexpr (RuntimeVersion < 31U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + TCallableBuilder builder(Env, __func__, returnType); + builder.Add(flow); + builder.Add(NewDataLiteral<ui32>(countColumn)); + if (!filterColumn) { + builder.Add(NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id)); + } else { + builder.Add(NewOptional(NewDataLiteral<ui32>(*filterColumn))); + } + + TVector<TRuntimeNode> keyNodes; + for (const auto& key : keys) { + keyNodes.push_back(NewDataLiteral<ui32>(key)); + } + + builder.Add(NewTuple(keyNodes)); + TVector<TRuntimeNode> aggsNodes; + for (const auto& agg : aggs) { + TVector<TRuntimeNode> params; + params.push_back(NewDataLiteral<NUdf::EDataSlot::String>(agg.Name)); + for (const auto& col : agg.ArgsColumns) { + params.push_back(NewDataLiteral<ui32>(col)); + } + + aggsNodes.push_back(NewTuple(params)); + } + + builder.Add(NewTuple(aggsNodes)); + return TRuntimeNode(builder.Build(), false); +} + bool CanExportType(TType* type, const TTypeEnvironment& env) { if (type->GetKind() == TType::EKind::Type) { return false; // Type of Type diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 265e22af61..868be9f577 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -259,6 +259,8 @@ public: TRuntimeNode BlockBitCast(TRuntimeNode value, TType* targetType); TRuntimeNode BlockCombineAll(TRuntimeNode flow, ui32 countColumn, std::optional<ui32> filterColumn, const TArrayRef<const TAggInfo>& aggs, TType* returnType); + TRuntimeNode BlockCombineHashed(TRuntimeNode flow, ui32 countColumn, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys, + const TArrayRef<const TAggInfo>& aggs, TType* returnType); // udfs TRuntimeNode Udf( diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index 132d16041d..071837afd9 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -2427,6 +2427,34 @@ TMkqlCommonCallableCompiler::TShared::TShared() { return ctx.ProgramBuilder.BlockCombineAll(arg, countColumn, filterColumn, aggs, returnType); }); + AddCallable("BlockCombineHashed", [](const TExprNode& node, TMkqlBuildContext& ctx) { + auto arg = MkqlBuildExpr(*node.Child(0), ctx); + ui32 countColumn = FromString<ui32>(node.Child(1)->Content()); + std::optional<ui32> filterColumn; + if (!node.Child(2)->IsCallable("Void")) { + filterColumn = FromString<ui32>(node.Child(2)->Content()); + } + + TVector<ui32> keys; + for (const auto& key : node.Child(3)->Children()) { + keys.push_back(FromString<ui32>(key->Content())); + } + + TVector<TAggInfo> aggs; + for (const auto& agg : node.Child(4)->Children()) { + TAggInfo info; + info.Name = TString(agg->Head().Head().Content()); + for (ui32 i = 1; i < agg->ChildrenSize(); ++i) { + info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content())); + } + + aggs.push_back(info); + } + + auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); + return ctx.ProgramBuilder.BlockCombineHashed(arg, countColumn, filterColumn, keys, aggs, returnType); + }); + AddCallable("BlockCompress", [](const TExprNode& node, TMkqlBuildContext& ctx) { const auto flow = MkqlBuildExpr(node.Head(), ctx); const auto index = FromString<ui32>(node.Child(1)->Content()); |