aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2022-12-08 22:45:32 +0300
committervvvv <vvvv@ydb.tech>2022-12-08 22:45:32 +0300
commit904b24c0f6381f6212ca11248c92b05503e89ee5 (patch)
tree622a727ca346eb83e32d6d06104da9addd214959
parent35e53e3bf1d4087b8c6fccfb6286001d94b5a68d (diff)
downloadydb-904b24c0f6381f6212ca11248c92b05503e89ee5.tar.gz
initial implementation of hashed combiner over blocks
https://st.yandex-team.ru/#63913c808060fd3734409ed5
-rw-r--r--ydb/library/yql/core/expr_nodes/yql_expr_nodes.json11
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_blocks.cpp139
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_blocks.h1
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_core.cpp1
-rw-r--r--ydb/library/yql/core/yql_aggregate_expander.cpp70
-rw-r--r--ydb/library/yql/core/yql_aggregate_expander.h2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp533
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp117
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp4
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h23
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp171
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp231
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h23
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp11
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp36
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.h2
-rw-r--r--ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp28
19 files changed, 1251 insertions, 154 deletions
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
index 8e35c09274..0e83c9c9c9 100644
--- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
+++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
@@ -2251,6 +2251,17 @@
{"Index": 2, "Name": "FilterColumn", "Type": "TExprBase"},
{"Index": 3, "Name": "Aggregations", "Type": "TExprList"}
]
+ },
+ {
+ "Name": "TCoBlockCombineHashed",
+ "Base": "TCoInputBase",
+ "Match": {"Type": "Callable", "Name": "BlockCombineHashed"},
+ "Children": [
+ {"Index": 1, "Name": "CountColumn", "Type": "TCoAtom"},
+ {"Index": 2, "Name": "FilterColumn", "Type": "TExprBase"},
+ {"Index": 3, "Name": "Keys", "Type": "TCoAtomList"},
+ {"Index": 4, "Name": "Aggregations", "Type": "TExprList"}
+ ]
}
]
}
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
index 1fc7cefbb7..c6c09dcd8e 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
@@ -251,6 +251,48 @@ IGraphTransformer::TStatus BlockBitCastWrapper(const TExprNode::TPtr& input, TEx
return IGraphTransformer::TStatus::Ok;
}
+bool ValidateBlockAggs(TPositionHandle pos, const TTypeAnnotationNode::TListType inputItems, const TExprNode& aggs,
+ TTypeAnnotationNode::TListType& retMultiType, TExprContext& ctx) {
+ if (!EnsureTuple(aggs, ctx)) {
+ return false;
+ }
+
+ for (const auto& agg : aggs.Children()) {
+ if (!EnsureTupleMinSize(*agg, 1, ctx)) {
+ return false;
+ }
+
+ if (!agg->Head().IsCallable("AggBlockApply")) {
+ ctx.AddError(TIssue(ctx.GetPosition(pos), "Expected AggBlockApply"));
+ return false;
+ }
+
+ if (agg->ChildrenSize() != agg->Head().ChildrenSize()) {
+ ctx.AddError(TIssue(ctx.GetPosition(pos), "Different amount of input arguments"));
+ return false;
+ }
+
+ for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
+ ui32 argColumnIndex;
+ if (!TryFromString(agg->Child(i)->Content(), argColumnIndex) || argColumnIndex >= inputItems.size()) {
+ ctx.AddError(TIssue(ctx.GetPosition(pos), "Bad arg column index"));
+ return false;
+ }
+
+ auto applyArgType = agg->Head().Child(i)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ if (!IsSameAnnotation(*inputItems[argColumnIndex], *applyArgType)) {
+ ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() <<
+ "Mismatch argument type, expected: " << *applyArgType << ", got: " << *inputItems[argColumnIndex]));
+ return false;
+ }
+ }
+
+ retMultiType.push_back(AggApplySerializedStateType(agg->HeadPtr(), ctx));
+ }
+
+ return true;
+}
+
IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
Y_UNUSED(output);
if (!EnsureArgsCount(*input, 4U, ctx.Expr)) {
@@ -287,6 +329,10 @@ IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input,
}
if (!input->Child(2)->IsCallable("Void")) {
+ if (!EnsureAtom(*input->Child(2), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
ui32 filterColumnIndex;
if (!TryFromString(input->Child(2)->Content(), filterColumnIndex) || filterColumnIndex >= inputItems.size()) {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad filter column index"));
@@ -298,44 +344,95 @@ IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input,
}
}
- if (!EnsureTuple(*input->Child(3), ctx.Expr)) {
+ TTypeAnnotationNode::TListType retMultiType;
+ if (!ValidateBlockAggs(input->Pos(), inputItems, *input->Child(3), retMultiType, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
- TTypeAnnotationNode::TListType retMultiType;
- for (const auto& agg : input->Child(3)->Children()) {
- if (!EnsureTupleMinSize(*agg, 1, ctx.Expr)) {
+ auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
+ input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 5U, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureWideFlowType(input->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const auto multiType = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>();
+ TTypeAnnotationNode::TListType inputItems;
+ for (const auto& type : multiType->GetItems()) {
+ if (!EnsureBlockOrScalarType(input->Pos(), *type, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
- if (!agg->Head().IsCallable("AggBlockApply")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Expected AggBlockApply"));
+ bool isScalar;
+ inputItems.push_back(GetBlockItemType(*type, isScalar));
+ }
+
+ if (!EnsureAtom(*input->Child(1), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ ui32 countColumnIndex;
+ if (!TryFromString(input->Child(1)->Content(), countColumnIndex) || countColumnIndex >= inputItems.size()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad count column index"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureSpecificDataType(input->Child(1)->Pos(), *inputItems[countColumnIndex], EDataSlot::Uint64, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!input->Child(2)->IsCallable("Void")) {
+ if (!EnsureAtom(*input->Child(2), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
- if (agg->ChildrenSize() != agg->Head().ChildrenSize()) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Different amount of input arguments"));
+ ui32 filterColumnIndex;
+ if (!TryFromString(input->Child(2)->Content(), filterColumnIndex) || filterColumnIndex >= inputItems.size()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad filter column index"));
return IGraphTransformer::TStatus::Error;
}
- for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
- ui32 argColumnIndex;
- if (!TryFromString(agg->Child(i)->Content(), argColumnIndex) || argColumnIndex >= inputItems.size()) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad arg column index"));
- return IGraphTransformer::TStatus::Error;
- }
+ if (!EnsureSpecificDataType(input->Child(2)->Pos(), *inputItems[filterColumnIndex], EDataSlot::Bool, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
- auto applyArgType = agg->Head().Child(i)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
- if (!IsSameAnnotation(*inputItems[argColumnIndex], *applyArgType)) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), TStringBuilder() <<
- "Mismatch argument type, expected: " << *applyArgType << ", got: " << *inputItems[argColumnIndex]));
- return IGraphTransformer::TStatus::Error;
- }
+ if (!EnsureTupleMinSize(*input->Child(3), 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TTypeAnnotationNode::TListType retMultiType;
+ for (auto child : input->Child(3)->Children()) {
+ if (!EnsureAtom(*child, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ ui32 keyColumnIndex;
+ if (!TryFromString(child->Content(), keyColumnIndex) || keyColumnIndex >= inputItems.size()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad key column index"));
+ return IGraphTransformer::TStatus::Error;
}
- retMultiType.push_back(AggApplySerializedStateType(agg->HeadPtr(), ctx.Expr));
+ retMultiType.push_back(inputItems[keyColumnIndex]);
+ }
+
+ if (!ValidateBlockAggs(input->Pos(), inputItems, *input->Child(4), retMultiType, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ for (auto& t : retMultiType) {
+ t = ctx.Expr.MakeType<TBlockExprType>(t);
}
+ retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
return IGraphTransformer::TStatus::Ok;
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.h b/ydb/library/yql/core/type_ann/type_ann_blocks.h
index cc81b6f895..b8fd6fb9b8 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.h
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.h
@@ -15,6 +15,7 @@ namespace NTypeAnnImpl {
IGraphTransformer::TStatus BlockFuncWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
IGraphTransformer::TStatus BlockBitCastWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
+ IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
} // namespace NTypeAnnImpl
} // namespace NYql
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index 972c322fe9..edbf7734c0 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -11784,6 +11784,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
ExtFunctions["BlockFunc"] = &BlockFuncWrapper;
ExtFunctions["BlockBitCast"] = &BlockBitCastWrapper;
ExtFunctions["BlockCombineAll"] = &BlockCombineAllWrapper;
+ ExtFunctions["BlockCombineHashed"] = &BlockCombineHashedWrapper;
Functions["AsRange"] = &AsRangeWrapper;
Functions["RangeCreate"] = &RangeCreateWrapper;
diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp
index 159ce50af7..52d6ab2432 100644
--- a/ydb/library/yql/core/yql_aggregate_expander.cpp
+++ b/ydb/library/yql/core/yql_aggregate_expander.cpp
@@ -492,11 +492,13 @@ TExprNode::TPtr TAggregateExpander::GetFinalAggStateExtractor(ui32 i) {
.Build();
}
-TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAll() {
+TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
if (!TypesCtx.ArrowResolver) {
return nullptr;
}
+ const bool hashed = (KeyColumns->ChildrenSize() > 0);
+
auto streamArg = Ctx.NewArgument(Node->Pos(), "stream");
auto flow = Ctx.NewCallable(Node->Pos(), "ToFlow", { streamArg });
TVector<TString> inputColumns;
@@ -514,6 +516,26 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAll() {
TExprNode::TListType extractorRoots;
TExprNode::TListType aggs;
TVector<TString> outputColumns;
+ TExprNode::TListType keyIdxs;
+ TVector<const TTypeAnnotationNode*> allKeyTypes;
+ for (ui32 index = 0; index < KeyColumns->ChildrenSize(); ++index) {
+ auto keyName = KeyColumns->Child(index)->Content();
+ auto rowIndex = RowType->FindItem(keyName);
+ YQL_ENSURE(rowIndex, "Unknown column: " << keyName);
+ auto type = RowType->GetItems()[*rowIndex]->GetItemType();
+ extractorRoots.push_back(extractorArgs[*rowIndex]);
+
+ allKeyTypes.push_back(type);
+ keyIdxs.push_back(Ctx.NewAtom(Node->Pos(), ToString(index)));
+ outputColumns.push_back(TString(keyName));
+ }
+
+ bool supported = false;
+ YQL_ENSURE(TypesCtx.ArrowResolver->AreTypesSupported(Ctx.GetPosition(Node->Pos()), allKeyTypes, supported, Ctx));
+ if (!supported) {
+ return nullptr;
+ }
+
for (ui32 index = 0; index < AggregatedColumns->ChildrenSize(); ++index) {
auto trait = AggregatedColumns->Child(index)->ChildPtr(1);
if (trait->Child(0)->Content() == "count_all") {
@@ -571,15 +593,31 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAll() {
auto extractorLambda = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), std::move(extractorArgs)), std::move(extractorRoots));
auto mappedWideFlow = Ctx.NewCallable(Node->Pos(), "WideMap", { wideFlow, extractorLambda });
auto blocks = Ctx.NewCallable(Node->Pos(), "WideToBlocks", { mappedWideFlow });
- auto aggWideFlow = Ctx.Builder(Node->Pos())
- .Callable("BlockCombineAll")
- .Add(0, blocks)
- .Atom(1, ToString(mappedWidth))
- .Callable(2, "Void")
+ TExprNode::TPtr aggWideFlow;
+ if (hashed) {
+ aggWideFlow = Ctx.Builder(Node->Pos())
+ .Callable("WideFromBlocks")
+ .Callable(0, "BlockCombineHashed")
+ .Add(0, blocks)
+ .Atom(1, ToString(mappedWidth))
+ .Callable(2, "Void")
+ .Seal()
+ .Add(3, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
+ .Add(4, Ctx.NewList(Node->Pos(), std::move(aggs)))
+ .Seal()
.Seal()
- .Add(3, Ctx.NewList(Node->Pos(), std::move(aggs)))
- .Seal()
- .Build();
+ .Build();
+ } else {
+ aggWideFlow = Ctx.Builder(Node->Pos())
+ .Callable("BlockCombineAll")
+ .Add(0, blocks)
+ .Atom(1, ToString(mappedWidth))
+ .Callable(2, "Void")
+ .Seal()
+ .Add(3, Ctx.NewList(Node->Pos(), std::move(aggs)))
+ .Seal()
+ .Build();
+ }
auto finalFlow = MakeNarrowMap(Node->Pos(), outputColumns, aggWideFlow, Ctx);
auto root = Ctx.NewCallable(Node->Pos(), "FromFlow", { finalFlow });
@@ -2189,18 +2227,14 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombine() {
return nullptr;
}
- if (KeyColumns->ChildrenSize() == 0) {
- for (const auto& x : AggregatedColumns->Children()) {
- auto trait = x->ChildPtr(1);
- if (!trait->IsCallable("AggApply")) {
- return nullptr;
- }
+ for (const auto& x : AggregatedColumns->Children()) {
+ auto trait = x->ChildPtr(1);
+ if (!trait->IsCallable("AggApply")) {
+ return nullptr;
}
-
- return TryGenerateBlockCombineAll();
}
- return nullptr;
+ return TryGenerateBlockCombineAllOrHashed();
}
} // namespace NYql
diff --git a/ydb/library/yql/core/yql_aggregate_expander.h b/ydb/library/yql/core/yql_aggregate_expander.h
index e34b80328f..63f1cc9dfc 100644
--- a/ydb/library/yql/core/yql_aggregate_expander.h
+++ b/ydb/library/yql/core/yql_aggregate_expander.h
@@ -75,7 +75,7 @@ private:
TExprNode::TPtr GeneratePhases();
void GenerateInitForDistinct(TExprNodeBuilder& parent, ui32& ndx, const TIdxSet& indicies, const TExprNode::TPtr& distinctField);
TExprNode::TPtr GenerateJustOverStates(const TExprNode::TPtr& input, const TIdxSet& indicies);
- TExprNode::TPtr TryGenerateBlockCombineAll();
+ TExprNode::TPtr TryGenerateBlockCombineAllOrHashed();
TExprNode::TPtr TryGenerateBlockCombine();
private:
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
index a3418056e8..7524df9822 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
@@ -1,12 +1,18 @@
#include "mkql_block_agg.h"
#include "mkql_block_agg_factory.h"
+#include "mkql_rh_hash.h"
#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/minikql/mkql_node_builder.h>
+#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
+
+#include <arrow/scalar.h>
#include <arrow/array/array_primitive.h>
+#include <arrow/array/builder_primitive.h>
namespace NKikimr {
namespace NMiniKQL {
@@ -19,6 +25,190 @@ struct TAggParams {
std::vector<ui32> ArgColumns;
};
+struct TKeyParams {
+ ui32 Index;
+ TType* Type;
+};
+
+class TInputBuffer {
+public:
+ TInputBuffer(TStringBuf buf)
+ : Buf_(buf)
+ {}
+
+ char PopChar() {
+ Ensure(1);
+ char c = Buf_.Data()[Pos_];
+ ++Pos_;
+ return c;
+ }
+
+ template <typename T>
+ T PopNumber() {
+ Ensure(sizeof(T));
+ T t = *(const T*)(Buf_.Data() + Pos_);
+ Pos_ += sizeof(T);
+ return t;
+ }
+
+private:
+ void Ensure(size_t delta) {
+ MKQL_ENSURE(Pos_ + delta <= Buf_.Size(), "Unexpected end of buffer");
+ }
+
+private:
+ size_t Pos_ = 0;
+ TStringBuf Buf_;
+};
+
+class TOutputBuffer {
+public:
+ void PushChar(char c) {
+ Ensure(1);
+ Vec_[Pos_] = c;
+ ++Pos_;
+ }
+
+ template <typename T>
+ void PushNumber(T t) {
+ Ensure(sizeof(T));
+ *(T*)&Vec_[Pos_] = t;
+ Pos_ += sizeof(T);
+ }
+
+ // fill with zeros
+ void Resize(size_t size) {
+ Pos_ = 0;
+ Vec_.clear();
+ Vec_.resize(size);
+ }
+
+ void Rewind() {
+ Pos_ = 0;
+ }
+
+ TStringBuf Finish() const {
+ return TStringBuf(Vec_.data(), Vec_.data() + Pos_);
+ }
+
+private:
+ void Ensure(size_t delta) {
+ if (Pos_ + delta > Vec_.size()) {
+ Vec_.reserve(Max(2 * Vec_.capacity(), Pos_ + delta));
+ Vec_.resize(Pos_ + delta);
+ }
+ }
+
+private:
+ size_t Pos_ = 0;
+ TVector<char> Vec_;
+};
+
+class IKeyColumnBuilder {
+public:
+ virtual ~IKeyColumnBuilder() = default;
+
+ // decode part of buffer and advances position
+ virtual void Add(TInputBuffer& in) = 0;
+
+ virtual NUdf::TUnboxedValue Build() = 0;
+};
+
+class IKeySerializer {
+public:
+ virtual ~IKeySerializer() = default;
+
+ // handle scalar or array item
+ virtual void Serialize(const arrow::Datum& value, ui64 index, TOutputBuffer& out) const = 0;
+
+ virtual std::unique_ptr<IKeyColumnBuilder> MakeBuilder(ui64 size, TComputationContext& ctx) const = 0;
+};
+
+template <typename T, typename TBuilder, bool IsOptional>
+class TFixedSizeKeyColumnBuilder : public IKeyColumnBuilder {
+public:
+ TFixedSizeKeyColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx)
+ : Builder_(dataType, &ctx.ArrowMemoryPool)
+ , Ctx_(ctx)
+ {
+ ARROW_OK(this->Builder_.Reserve(size));
+ }
+
+ void Add(TInputBuffer& in) final {
+ if constexpr (IsOptional) {
+ if (in.PopChar()) {
+ auto x = in.PopNumber<T>();
+ this->Builder_.UnsafeAppend(x);
+ } else {
+ this->Builder_.UnsafeAppendNull();
+ }
+ } else {
+ auto x = in.PopNumber<T>();
+ this->Builder_.UnsafeAppend(x);
+ }
+ }
+
+ NUdf::TUnboxedValue Build() final {
+ std::shared_ptr<arrow::ArrayData> result;
+ ARROW_OK(this->Builder_.FinishInternal(&result));
+ return Ctx_.HolderFactory.CreateArrowBlock(result);
+ }
+
+private:
+ TBuilder Builder_;
+ TComputationContext& Ctx_;
+};
+
+template <typename T, typename TScalar, typename TBuilder, bool IsOptional>
+class TFixedSizeKeySerializer : public IKeySerializer {
+public:
+ TFixedSizeKeySerializer(const std::shared_ptr<arrow::DataType>& dataType)
+ : DataType_(dataType)
+ {}
+
+ virtual void Serialize(const arrow::Datum& value, ui64 index, TOutputBuffer& out) const final {
+ T x;
+ if (value.is_scalar()) {
+ const auto& scalar = value.scalar_as<TScalar>();
+ if constexpr (IsOptional) {
+ if (scalar.is_valid) {
+ out.PushChar(1);
+ x = scalar.value;
+ } else {
+ out.PushChar(0);
+ return;
+ }
+
+ } else {
+ Y_ASSERT(scalar.is_valid);
+ x = scalar.value;
+ }
+ } else {
+ const auto& array = *value.array();
+ if constexpr (IsOptional) {
+ if (array.GetNullCount() == 0 || arrow::BitUtil::GetBit(array.GetValues<uint8_t>(0, 0), index + array.offset)) {
+ out.PushChar(1);
+ x = array.GetValues<T>(1)[index];
+ } else {
+ out.PushChar(0);
+ return;
+ }
+ } else {
+ x = array.GetValues<T>(1)[index];
+ }
+ }
+
+ out.PushNumber<T>(x);
+ }
+
+ std::unique_ptr<IKeyColumnBuilder> MakeBuilder(ui64 size, TComputationContext& ctx) const final {
+ return std::make_unique<TFixedSizeKeyColumnBuilder<T, TBuilder, IsOptional>>(size, DataType_, ctx);
+ }
+
+private:
+ const std::shared_ptr<arrow::DataType> DataType_;
+};
+
class TBlockCombineAllWrapper : public TStatefulWideFlowComputationNode<TBlockCombineAllWrapper> {
public:
TBlockCombineAllWrapper(TComputationMutables& mutables,
@@ -115,7 +305,7 @@ private:
bool HasValues_ = false;
TVector<char> AggStates_;
- TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const TVector<TAggParams>& params, const THolderFactory& holderFactory)
+ TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const TVector<TAggParams>& params, TComputationContext& ctx)
: TComputationValue(memInfo)
, Values_(width)
, ValuePointers_(width)
@@ -126,7 +316,7 @@ private:
ui32 totalStateSize = 0;
for (const auto& p : params) {
- Aggs_.emplace_back(MakeBlockAggregator(p.Name, p.TupleType, filterColumn, p.ArgColumns, holderFactory));
+ Aggs_.emplace_back(MakeBlockAggregator(p.Name, p.TupleType, filterColumn, p.ArgColumns, ctx));
totalStateSize += Aggs_.back()->StateSize;
}
@@ -147,7 +337,291 @@ private:
TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
if (!state.HasValue()) {
- state = ctx.HolderFactory.Create<TState>(Width_, FilterColumn_, AggsParams_, ctx.HolderFactory);
+ state = ctx.HolderFactory.Create<TState>(Width_, FilterColumn_, AggsParams_, ctx);
+ }
+ return *static_cast<TState*>(state.AsBoxed().Get());
+ }
+
+ ui64 GetBatchLength(const NUdf::TUnboxedValue* columns) const {
+ return TArrowBlock::From(columns[CountColumn_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+ }
+
+private:
+ IComputationWideFlowNode* Flow_;
+ const ui32 CountColumn_;
+ std::optional<ui32> FilterColumn_;
+ const size_t Width_;
+ const TVector<TAggParams> AggsParams_;
+};
+
+class TBlockCombineHashedWrapper : public TStatefulWideFlowComputationNode<TBlockCombineHashedWrapper> {
+public:
+ TBlockCombineHashedWrapper(TComputationMutables& mutables,
+ IComputationWideFlowNode* flow,
+ ui32 countColumn,
+ std::optional<ui32> filterColumn,
+ size_t width,
+ const std::vector<TKeyParams>& keys,
+ TVector<TAggParams>&& aggsParams)
+ : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+ , Flow_(flow)
+ , CountColumn_(countColumn)
+ , FilterColumn_(filterColumn)
+ , Width_(width)
+ , OutputWidth_(keys.size() + aggsParams.size() + 1)
+ , Keys_(keys)
+ , AggsParams_(std::move(aggsParams))
+ {
+ for (const auto& k : Keys_) {
+ auto itemType = AS_TYPE(TBlockType, k.Type)->GetItemType();
+ bool isOptional;
+ auto dataType = UnpackOptionalData(itemType, isOptional);
+ if (isOptional) {
+ TotalKeysSize_ += 1;
+ }
+
+ switch (*dataType->GetDataSlot()) {
+ case NUdf::EDataSlot::Int8:
+ TotalKeysSize_ += 1;
+ if (isOptional) {
+ KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i8, arrow::Int8Scalar, arrow::Int8Builder, true>>(arrow::int8()));
+ } else {
+ KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i8, arrow::Int8Scalar, arrow::Int8Builder, false>>(arrow::int8()));
+ }
+
+ break;
+ case NUdf::EDataSlot::Bool:
+ case NUdf::EDataSlot::Uint8:
+ TotalKeysSize_ += 1;
+ if (isOptional) {
+ KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, true>>(arrow::uint8()));
+ } else {
+ KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, false>>(arrow::uint8()));
+ }
+
+ break;
+ case NUdf::EDataSlot::Int16:
+ TotalKeysSize_ += 2;
+ if (isOptional) {
+ KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i16, arrow::Int16Scalar, arrow::Int16Builder, true>>(arrow::int16()));
+ } else {
+ KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i16, arrow::Int16Scalar, arrow::Int16Builder, false>>(arrow::int16()));
+ }
+
+ break;
+ case NUdf::EDataSlot::Uint16:
+ case NUdf::EDataSlot::Date:
+ TotalKeysSize_ += 2;
+ if (isOptional) {
+ KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, true>>(arrow::uint16()));
+ } else {
+ KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, false>>(arrow::uint16()));
+ }
+
+ break;
+ case NUdf::EDataSlot::Int32:
+ TotalKeysSize_ += 4;
+ if (isOptional) {
+ KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i32, arrow::Int32Scalar, arrow::Int32Builder, true>>(arrow::int32()));
+ } else {
+ KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i32, arrow::Int32Scalar, arrow::Int32Builder, false>>(arrow::int32()));
+ }
+
+ break;
+ case NUdf::EDataSlot::Uint32:
+ case NUdf::EDataSlot::Datetime:
+ TotalKeysSize_ += 4;
+ if (isOptional) {
+ KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, true>>(arrow::uint32()));
+ } else {
+ KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, false>>(arrow::uint32()));
+ }
+
+ break;
+ case NUdf::EDataSlot::Int64:
+ case NUdf::EDataSlot::Interval:
+ TotalKeysSize_ += 8;
+ if (isOptional) {
+ KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i64, arrow::Int64Scalar, arrow::Int64Builder, true>>(arrow::int64()));
+ } else {
+ KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i64, arrow::Int64Scalar, arrow::Int64Builder, false>>(arrow::int64()));
+ }
+
+ break;
+ case NUdf::EDataSlot::Uint64:
+ case NUdf::EDataSlot::Timestamp:
+ TotalKeysSize_ += 8;
+ if (isOptional) {
+ KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, true>>(arrow::uint64()));
+ } else {
+ KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, false>>(arrow::uint64()));
+ }
+
+ break;
+ default:
+ throw yexception() << "Unsupported key type";
+ }
+ }
+
+ MKQL_ENSURE(TotalKeysSize_ <= 4, "TODO Support all lengths of keys");
+ }
+
+ EFetchResult DoCalculate(NUdf::TUnboxedValue& state,
+ TComputationContext& ctx,
+ NUdf::TUnboxedValue*const* output) const
+ {
+ auto& s = GetState(state, ctx);
+ if (s.IsFinished_) {
+ return EFetchResult::Finish;
+ }
+
+ for (;;) {
+ auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data());
+ if (result == EFetchResult::Yield) {
+ return result;
+ } else if (result == EFetchResult::One) {
+ ui64 batchLength = GetBatchLength(s.Values_.data());
+ if (!batchLength) {
+ continue;
+ }
+
+ s.HasValues_ = true;
+ TVector<arrow::Datum> keysDatum;
+ keysDatum.reserve(Keys_.size());
+ for (ui32 i = 0; i < Keys_.size(); ++i) {
+ keysDatum.emplace_back(TArrowBlock::From(s.Values_[Keys_[i].Index]).GetDatum());
+ }
+
+ TOutputBuffer out;
+ out.Resize(sizeof(ui32));
+ for (ui64 row = 0; row < batchLength; ++row) {
+ out.Rewind();
+ // encode key
+ for (ui32 i = 0; i < keysDatum.size(); ++i) {
+ KeySerializers_[i]->Serialize(keysDatum[i], row, out);
+ }
+
+ auto str = out.Finish();
+ Y_ASSERT(str.Size() <= sizeof(ui32));
+ ui32 key = *(const ui32*)str.Data();
+ bool isNew;
+ auto iter = s.HashMap_->Insert(key, isNew);
+ char* ptr = (char*)s.HashMap_->GetPayload(iter);
+ if (isNew) {
+ for (size_t i = 0; i < s.Aggs_.size(); ++i) {
+ if (output[Keys_.size() + i]) {
+ s.Aggs_[i]->InitKey(ptr, s.Values_.data(), row);
+ }
+
+ ptr += s.Aggs_[i]->StateSize;
+ }
+
+ s.HashMap_->CheckGrow();
+ } else {
+ for (size_t i = 0; i < s.Aggs_.size(); ++i) {
+ if (output[Keys_.size() + i]) {
+ s.Aggs_[i]->UpdateKey(ptr, s.Values_.data(), row);
+ }
+
+ ptr += s.Aggs_[i]->StateSize;
+ }
+ }
+ }
+ } else {
+ s.IsFinished_ = true;
+ if (!s.HasValues_) {
+ return EFetchResult::Finish;
+ }
+
+ // export results, TODO: split by batches
+ auto size = s.HashMap_->GetSize();
+ TVector<std::unique_ptr<IKeyColumnBuilder>> keyBuilders;
+ for (const auto& ks : KeySerializers_) {
+ keyBuilders.emplace_back(ks->MakeBuilder(size, ctx));
+ }
+
+ TVector<std::unique_ptr<IAggColumnBuilder>> aggBuilders;
+ for (const auto& a : s.Aggs_) {
+ aggBuilders.emplace_back(a->MakeBuilder(size));
+ }
+
+ for (auto iter = s.HashMap_->Begin(); iter != s.HashMap_->End(); s.HashMap_->Advance(iter)) {
+ if (s.HashMap_->GetPSL(iter) < 0) {
+ continue;
+ }
+
+ ui32 key = s.HashMap_->GetKey(iter);
+ auto ptr = (const char*)s.HashMap_->GetPayload(iter);
+ TInputBuffer in(TStringBuf((const char*)&key, sizeof(key)));
+ for (auto& kb : keyBuilders) {
+ kb->Add(in);
+ }
+
+ for (size_t i = 0; i < s.Aggs_.size(); ++i) {
+ if (output[Keys_.size() + i]) {
+ aggBuilders[i]->Add(ptr);
+ }
+
+ ptr += s.Aggs_[i]->StateSize;
+ }
+ }
+
+ for (ui32 i = 0; i < Keys_.size(); ++i) {
+ if (output[i]) {
+ *output[i] = keyBuilders[i]->Build();
+ }
+ }
+
+ for (size_t i = 0; i < s.Aggs_.size(); ++i) {
+ if (output[Keys_.size() + i]) {
+ *output[Keys_.size() + i] = aggBuilders[i]->Build();
+ }
+ }
+
+ MKQL_ENSURE(output[OutputWidth_ - 1], "Block size should not be marked as unused");
+ *output[OutputWidth_ - 1] = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(size)));
+ return EFetchResult::One;
+ }
+ }
+ }
+
+private:
+ struct TState : public TComputationValue<TState> {
+ TVector<NUdf::TUnboxedValue> Values_;
+ TVector<NUdf::TUnboxedValue*> ValuePointers_;
+ TVector<std::unique_ptr<IBlockAggregator>> Aggs_;
+ bool IsFinished_ = false;
+ bool HasValues_ = false;
+ ui32 TotalStateSize_ = 0;
+ std::unique_ptr<TRobinHoodHashMap<ui32>> HashMap_;
+
+ TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const TVector<TAggParams>& params, TComputationContext& ctx)
+ : TComputationValue(memInfo)
+ , Values_(width)
+ , ValuePointers_(width)
+ {
+ for (size_t i = 0; i < width; ++i) {
+ ValuePointers_[i] = &Values_[i];
+ }
+
+ for (const auto& p : params) {
+ Aggs_.emplace_back(MakeBlockAggregator(p.Name, p.TupleType, filterColumn, p.ArgColumns, ctx));
+
+ TotalStateSize_ += Aggs_.back()->StateSize;
+ }
+
+ HashMap_ = std::make_unique<TRobinHoodHashMap<ui32>>(TotalStateSize_);
+ }
+ };
+
+private:
+ void RegisterDependencies() const final {
+ FlowDependsOn(Flow_);
+ }
+
+ TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
+ if (!state.HasValue()) {
+ state = ctx.HolderFactory.Create<TState>(Width_, FilterColumn_, AggsParams_, ctx);
}
return *static_cast<TState*>(state.AsBoxed().Get());
}
@@ -161,9 +635,27 @@ private:
const ui32 CountColumn_;
std::optional<ui32> FilterColumn_;
const size_t Width_;
+ const size_t OutputWidth_;
+ const std::vector<TKeyParams> Keys_;
const TVector<TAggParams> AggsParams_;
+ ui32 TotalKeysSize_ = 0;
+ std::vector<std::unique_ptr<IKeySerializer>> KeySerializers_;
};
+void FillAggParams(TTupleLiteral* aggsVal, TTupleType* tupleType, TVector<TAggParams>& aggsParams) {
+ for (ui32 i = 0; i < aggsVal->GetValuesCount(); ++i) {
+ auto aggVal = AS_VALUE(TTupleLiteral, aggsVal->GetValue(i));
+ auto name = AS_VALUE(TDataLiteral, aggVal->GetValue(0))->AsValue().AsStringRef();
+
+ std::vector<ui32> argColumns;
+ for (ui32 j = 1; j < aggVal->GetValuesCount(); ++j) {
+ argColumns.push_back(AS_VALUE(TDataLiteral, aggVal->GetValue(j))->AsValue().Get<ui32>());
+ }
+
+ aggsParams.emplace_back(TAggParams{ TStringBuf(name), tupleType, argColumns });
+ }
+}
+
}
IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
@@ -183,19 +675,36 @@ IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNod
auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(3));
TVector<TAggParams> aggsParams;
- for (ui32 i = 0; i < aggsVal->GetValuesCount(); ++i) {
- auto aggVal = AS_VALUE(TTupleLiteral, aggsVal->GetValue(i));
- auto name = AS_VALUE(TDataLiteral, aggVal->GetValue(0))->AsValue().AsStringRef();
+ FillAggParams(aggsVal, tupleType, aggsParams);
+ return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, countColumn, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams));
+}
- std::vector<ui32> argColumns;
- for (ui32 j = 1; j < aggVal->GetValuesCount(); ++j) {
- argColumns.push_back(AS_VALUE(TDataLiteral, aggVal->GetValue(j))->AsValue().Get<ui32>());
- }
+IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args");
+ const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
+ const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
- aggsParams.emplace_back(TAggParams{ TStringBuf(name), tupleType, argColumns });
+ auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
+ MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
+
+ ui32 countColumn = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>();
+ auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(2));
+ std::optional<ui32> filterColumn;
+ if (filterColumnVal->HasItem()) {
+ filterColumn = AS_VALUE(TDataLiteral, filterColumnVal->GetItem())->AsValue().Get<ui32>();
}
- return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, countColumn, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams));
+ auto keysVal = AS_VALUE(TTupleLiteral, callable.GetInput(3));
+ std::vector<TKeyParams> keys;
+ for (ui32 i = 0; i < keysVal->GetValuesCount(); ++i) {
+ ui32 index = AS_VALUE(TDataLiteral, keysVal->GetValue(i))->AsValue().Get<ui32>();
+ keys.emplace_back(TKeyParams{ index, tupleType->GetElementType(index) });
+ }
+
+ auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(4));
+ TVector<TAggParams> aggsParams;
+ FillAggParams(aggsVal, tupleType, aggsParams);
+ return new TBlockCombineHashedWrapper(ctx.Mutables, wideFlow, countColumn, filterColumn, tupleType->GetElementsCount(), keys, std::move(aggsParams));
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h
index 8fbfee8f04..814514271d 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h
@@ -6,6 +6,7 @@ namespace NKikimr {
namespace NMiniKQL {
IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx);
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
index e99e140ed2..5bf945f068 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
@@ -1,5 +1,9 @@
#include "mkql_block_agg_count.h"
+#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
+
+#include <arrow/array/builder_primitive.h>
+
namespace NKikimr {
namespace NMiniKQL {
@@ -9,8 +13,33 @@ public:
ui64 Count_ = 0;
};
- TCountAllBlockAggregator(std::optional<ui32> filterColumn)
- : TBlockAggregatorBase(sizeof(TState), filterColumn)
+ class TColumnBuilder : public IAggColumnBuilder {
+ public:
+ TColumnBuilder(ui64 size, TComputationContext& ctx)
+ : Builder_(arrow::uint64(), &ctx.ArrowMemoryPool)
+ , Ctx_(ctx)
+ {
+ ARROW_OK(Builder_.Reserve(size));
+ }
+
+ void Add(const void* state) final {
+ auto typedState = static_cast<const TState*>(state);
+ Builder_.UnsafeAppend(typedState->Count_);
+ }
+
+ NUdf::TUnboxedValue Build() final {
+ std::shared_ptr<arrow::ArrayData> result;
+ ARROW_OK(Builder_.FinishInternal(&result));
+ return Ctx_.HolderFactory.CreateArrowBlock(result);
+ }
+
+ private:
+ arrow::UInt64Builder Builder_;
+ TComputationContext& Ctx_;
+ };
+
+ TCountAllBlockAggregator(std::optional<ui32> filterColumn, TComputationContext& ctx)
+ : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx)
{
}
@@ -32,6 +61,22 @@ public:
auto typedState = static_cast<const TState*>(state);
return NUdf::TUnboxedValuePod(typedState->Count_);
}
+
+ void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TState();
+ UpdateKey(state, columns, row);
+ }
+
+ void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ Y_UNUSED(columns);
+ Y_UNUSED(row);
+ auto typedState = static_cast<TState*>(state);
+ typedState->Count_ += 1;
+ }
+
+ std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final {
+ return std::make_unique<TColumnBuilder>(size, Ctx_);
+ }
};
class TCountBlockAggregator : public TBlockAggregatorBase {
@@ -40,8 +85,33 @@ public:
ui64 Count_ = 0;
};
- TCountBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn)
- : TBlockAggregatorBase(sizeof(TState), filterColumn)
+ class TColumnBuilder : public IAggColumnBuilder {
+ public:
+ TColumnBuilder(ui64 size, TComputationContext& ctx)
+ : Builder_(arrow::uint64(), &ctx.ArrowMemoryPool)
+ , Ctx_(ctx)
+ {
+ ARROW_OK(Builder_.Reserve(size));
+ }
+
+ void Add(const void* state) final {
+ auto typedState = static_cast<const TState*>(state);
+ Builder_.UnsafeAppend(typedState->Count_);
+ }
+
+ NUdf::TUnboxedValue Build() final {
+ std::shared_ptr<arrow::ArrayData> result;
+ ARROW_OK(Builder_.FinishInternal(&result));
+ return Ctx_.HolderFactory.CreateArrowBlock(result);
+ }
+
+ private:
+ arrow::UInt64Builder Builder_;
+ TComputationContext& Ctx_;
+ };
+
+ TCountBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+ : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx)
, ArgColumn_(argColumn)
{
}
@@ -92,6 +162,35 @@ public:
return NUdf::TUnboxedValuePod(typedState->Count_);
}
+ void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TState();
+ UpdateKey(state, columns, row);
+ }
+
+ void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = static_cast<TState*>(state);
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ if (datum.is_scalar()) {
+ if (datum.scalar()->is_valid) {
+ typedState->Count_ += 1;
+ }
+ } else {
+ const auto& array = datum.array();
+ if (array->GetNullCount() == 0) {
+ typedState->Count_ += 1;
+ } else {
+ auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
+ auto fullIndex = row + array->offset;
+ auto bit = ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1);
+ typedState->Count_ += bit;
+ }
+ }
+ }
+
+ std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final {
+ return std::make_unique<TColumnBuilder>(size, Ctx_);
+ }
+
private:
const ui32 ArgColumn_;
};
@@ -102,11 +201,10 @@ public:
TTupleType* tupleType,
std::optional<ui32> filterColumn,
const std::vector<ui32>& argsColumns,
- const THolderFactory& holderFactory) const final {
+ TComputationContext& ctx) const final {
Y_UNUSED(tupleType);
Y_UNUSED(argsColumns);
- Y_UNUSED(holderFactory);
- return std::make_unique<TCountAllBlockAggregator>(filterColumn);
+ return std::make_unique<TCountAllBlockAggregator>(filterColumn, ctx);
}
};
@@ -116,10 +214,9 @@ public:
TTupleType* tupleType,
std::optional<ui32> filterColumn,
const std::vector<ui32>& argsColumns,
- const THolderFactory& holderFactory) const final {
+ TComputationContext& ctx) const final {
Y_UNUSED(tupleType);
- Y_UNUSED(holderFactory);
- return std::make_unique<TCountBlockAggregator>(filterColumn, argsColumns[0]);
+ return std::make_unique<TCountBlockAggregator>(filterColumn, argsColumns[0], ctx);
}
};
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
index 3fdf201761..a0a66aa9c9 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
@@ -25,14 +25,14 @@ std::unique_ptr<IBlockAggregator> MakeBlockAggregator(
TTupleType* tupleType,
std::optional<ui32> filterColumn,
const std::vector<ui32>& argsColumns,
- const THolderFactory& holderFactory) {
+ TComputationContext& ctx) {
const auto& f = Singleton<TAggregatorFactories>()->Factories;
auto it = f.find(name);
if (it == f.end()) {
throw yexception() << "Unsupported block aggregation function: " << name;
}
- return it->second->Make(tupleType, filterColumn, argsColumns, holderFactory);
+ return it->second->Make(tupleType, filterColumn, argsColumns, ctx);
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h
index 0e13b27dc2..2eae714e02 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h
@@ -6,6 +6,15 @@
namespace NKikimr {
namespace NMiniKQL {
+class IAggColumnBuilder {
+public:
+ virtual ~IAggColumnBuilder() = default;
+
+ virtual void Add(const void* state) = 0;
+
+ virtual NUdf::TUnboxedValue Build() = 0;
+};
+
class IBlockAggregator {
public:
virtual ~IBlockAggregator() = default;
@@ -16,6 +25,12 @@ public:
virtual NUdf::TUnboxedValue FinishOne(const void* state) = 0;
+ virtual void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
+
+ virtual void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
+
+ virtual std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) = 0;
+
const ui32 StateSize;
explicit IBlockAggregator(ui32 stateSize)
@@ -25,14 +40,16 @@ public:
class TBlockAggregatorBase : public IBlockAggregator {
public:
- TBlockAggregatorBase(ui32 stateSize, std::optional<ui32> filterColumn)
+ TBlockAggregatorBase(ui32 stateSize, std::optional<ui32> filterColumn, TComputationContext& ctx)
: IBlockAggregator(stateSize)
, FilterColumn_(filterColumn)
+ , Ctx_(ctx)
{
}
protected:
const std::optional<ui32> FilterColumn_;
+ TComputationContext& Ctx_;
};
class THolderFactory;
@@ -42,7 +59,7 @@ std::unique_ptr<IBlockAggregator> MakeBlockAggregator(
TTupleType* tupleType,
std::optional<ui32> filterColumn,
const std::vector<ui32>& argsColumns,
- const THolderFactory& holderFactory);
+ TComputationContext& ctx);
class IBlockAggregatorFactory {
public:
@@ -52,7 +69,7 @@ public:
TTupleType* tupleType,
std::optional<ui32> filterColumn,
const std::vector<ui32>& argsColumns,
- const THolderFactory& holderFactory) const = 0;
+ TComputationContext& ctx) const = 0;
};
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
index 9407f90840..6849728fbd 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
@@ -4,8 +4,10 @@
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
#include <arrow/scalar.h>
+#include <arrow/array/builder_primitive.h>
namespace NKikimr {
namespace NMiniKQL {
@@ -19,12 +21,12 @@ T UpdateMinMax(T x, T y) {
}
}
-template <typename TIn, typename TInScalar, bool IsMin>
+template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin>
class TMinMaxBlockAggregatorNullableOrScalar : public TBlockAggregatorBase {
public:
struct TState {
TIn Value_;
- bool IsValid_ = false;
+ ui8 IsValid_ = 0;
TState() {
if constexpr (IsMin) {
@@ -35,9 +37,40 @@ public:
}
};
- TMinMaxBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn)
- : TBlockAggregatorBase(sizeof(TState), filterColumn)
+ class TColumnBuilder : public IAggColumnBuilder {
+ public:
+ TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx)
+ : Builder_(dataType, &ctx.ArrowMemoryPool)
+ , Ctx_(ctx)
+ {
+ ARROW_OK(Builder_.Reserve(size));
+ }
+
+ void Add(const void* state) final {
+ auto typedState = static_cast<const TState*>(state);
+ if (typedState->IsValid_) {
+ Builder_.UnsafeAppend(typedState->Value_);
+ } else {
+ Builder_.UnsafeAppendNull();
+ }
+ }
+
+ NUdf::TUnboxedValue Build() final {
+ std::shared_ptr<arrow::ArrayData> result;
+ ARROW_OK(Builder_.FinishInternal(&result));
+ return Ctx_.HolderFactory.CreateArrowBlock(result);
+ }
+
+ private:
+ TBuilder Builder_;
+ TComputationContext& Ctx_;
+ };
+
+ TMinMaxBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn,
+ const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
+ : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx)
, ArgColumn_(argColumn)
+ , BuilderDataType_(builderDataType)
{
}
@@ -52,7 +85,7 @@ public:
if (datum.is_scalar()) {
if (datum.scalar()->is_valid) {
typedState->Value_ = datum.scalar_as<TInScalar>().value;
- typedState->IsValid_ = true;
+ typedState->IsValid_ = 1;
}
} else {
const auto& array = datum.array();
@@ -64,7 +97,7 @@ public:
}
if (!filtered) {
- typedState->IsValid_ = true;
+ typedState->IsValid_ = 1;
TIn value = typedState->Value_;
if (array->GetNullCount() == 0) {
for (int64_t i = 0; i < len; ++i) {
@@ -89,7 +122,7 @@ public:
TIn value = typedState->Value_;
if (array->GetNullCount() == 0) {
- typedState->IsValid_ = true;
+ typedState->IsValid_ = 1;
for (int64_t i = 0; i < len; ++i) {
TIn filterMask = (((*filterBitmap++) & 1) ^ 1) - TIn(1);
value = UpdateMinMax<IsMin>(value, TIn((ptr[i] & filterMask) | (value & ~filterMask)));
@@ -107,7 +140,7 @@ public:
count += mask & 1;
}
- typedState->IsValid_ = typedState->IsValid_ || count > 0;
+ typedState->IsValid_ |= count ? 1 : 0;
}
typedState->Value_ = value;
@@ -124,15 +157,51 @@ public:
return NUdf::TUnboxedValuePod(typedState->Value_);
}
+ void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TState();
+ UpdateKey(state, columns, row);
+ }
+
+ void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = static_cast<TState*>(state);
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ if (datum.is_scalar()) {
+ if (datum.scalar()->is_valid) {
+ typedState->Value_ = datum.scalar_as<TInScalar>().value;
+ typedState->IsValid_ = 1;
+ }
+ } else {
+ const auto& array = datum.array();
+ auto ptr = array->GetValues<TIn>(1);
+ if (array->GetNullCount() == 0) {
+ typedState->IsValid_ = 1;
+ typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, ptr[row]);
+ } else {
+ auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
+ ui64 fullIndex = row + array->offset;
+ // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00
+ TIn mask = (((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) ^ 1) - TIn(1);
+ typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, TIn((ptr[row] & mask) | (typedState->Value_ & ~mask)));
+ typedState->IsValid_ |= mask & 1;
+ }
+ }
+ }
+
+ std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final {
+ return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_);
+ }
+
private:
const ui32 ArgColumn_;
+ const std::shared_ptr<arrow::DataType> BuilderDataType_;
};
-template <typename TIn, typename TInScalar, bool IsMin>
+template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin>
class TMinMaxBlockAggregator: public TBlockAggregatorBase {
public:
struct TState {
TIn Value_;
+
TState() {
if constexpr (IsMin) {
Value_ = std::numeric_limits<TIn>::max();
@@ -142,9 +211,36 @@ public:
}
};
- TMinMaxBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn)
- : TBlockAggregatorBase(sizeof(TState), filterColumn)
+ class TColumnBuilder : public IAggColumnBuilder {
+ public:
+ TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx)
+ : Builder_(dataType, &ctx.ArrowMemoryPool)
+ , Ctx_(ctx)
+ {
+ ARROW_OK(Builder_.Reserve(size));
+ }
+
+ void Add(const void* state) final {
+ auto typedState = static_cast<const TState*>(state);
+ Builder_.UnsafeAppend(typedState->Value_);
+ }
+
+ NUdf::TUnboxedValue Build() final {
+ std::shared_ptr<arrow::ArrayData> result;
+ ARROW_OK(Builder_.FinishInternal(&result));
+ return Ctx_.HolderFactory.CreateArrowBlock(result);
+ }
+
+ private:
+ TBuilder Builder_;
+ TComputationContext& Ctx_;
+ };
+
+ TMinMaxBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,
+ const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
+ : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx)
, ArgColumn_(argColumn)
+ , BuilderDataType_(builderDataType)
{
}
@@ -191,8 +287,26 @@ public:
return NUdf::TUnboxedValuePod(typedState->Value_);
}
+ void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TState();
+ UpdateKey(state, columns, row);
+ }
+
+ void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = static_cast<TState*>(state);
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ const auto& array = datum.array();
+ auto ptr = array->GetValues<TIn>(1);
+ typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, ptr[row]);
+ }
+
+ std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final {
+ return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_);
+ }
+
private:
const ui32 ArgColumn_;
+ const std::shared_ptr<arrow::DataType> BuilderDataType_;
};
template <bool IsMin>
@@ -202,8 +316,7 @@ public:
TTupleType* tupleType,
std::optional<ui32> filterColumn,
const std::vector<ui32>& argsColumns,
- const THolderFactory& holderFactory) const final {
- Y_UNUSED(holderFactory);
+ TComputationContext& ctx) const final {
auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0]));
auto argType = blockType->GetItemType();
bool isOptional;
@@ -211,52 +324,52 @@ public:
if (blockType->GetShape() == TBlockType::EShape::Scalar || isOptional) {
switch (*dataType->GetDataSlot()) {
case NUdf::EDataSlot::Int8:
- return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<i8, arrow::Int8Scalar, IsMin>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<i8, arrow::Int8Scalar, arrow::Int8Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int8(), ctx);
case NUdf::EDataSlot::Bool:
case NUdf::EDataSlot::Uint8:
- return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<ui8, arrow::UInt8Scalar, IsMin>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint8(), ctx);
case NUdf::EDataSlot::Int16:
- return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<i16, arrow::Int16Scalar, IsMin>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<i16, arrow::Int16Scalar, arrow::Int16Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int16(), ctx);
case NUdf::EDataSlot::Uint16:
case NUdf::EDataSlot::Date:
- return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<ui16, arrow::UInt16Scalar, IsMin>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint16(), ctx);
case NUdf::EDataSlot::Int32:
- return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<i32, arrow::Int32Scalar, IsMin>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<i32, arrow::Int32Scalar, arrow::Int32Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int32(), ctx);
case NUdf::EDataSlot::Uint32:
case NUdf::EDataSlot::Datetime:
- return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<ui32, arrow::UInt32Scalar, IsMin>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint32(), ctx);
case NUdf::EDataSlot::Int64:
case NUdf::EDataSlot::Interval:
- return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<i64, arrow::Int64Scalar, IsMin>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<i64, arrow::Int64Scalar, arrow::Int64Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int64(), ctx);
case NUdf::EDataSlot::Uint64:
case NUdf::EDataSlot::Timestamp:
- return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<ui64, arrow::UInt64Scalar, IsMin>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint64(), ctx);
default:
throw yexception() << "Unsupported MIN/MAX input type";
}
} else {
switch (*dataType->GetDataSlot()) {
case NUdf::EDataSlot::Int8:
- return std::make_unique<TMinMaxBlockAggregator<i8, arrow::Int8Scalar, IsMin>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TMinMaxBlockAggregator<i8, arrow::Int8Scalar, arrow::Int8Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int8(), ctx);
case NUdf::EDataSlot::Uint8:
case NUdf::EDataSlot::Bool:
- return std::make_unique<TMinMaxBlockAggregator<ui8, arrow::UInt8Scalar, IsMin>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TMinMaxBlockAggregator<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint8(), ctx);
case NUdf::EDataSlot::Int16:
- return std::make_unique<TMinMaxBlockAggregator<i16, arrow::Int16Scalar, IsMin>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TMinMaxBlockAggregator<i16, arrow::Int16Scalar, arrow::Int16Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int16(), ctx);
case NUdf::EDataSlot::Uint16:
case NUdf::EDataSlot::Date:
- return std::make_unique<TMinMaxBlockAggregator<ui16, arrow::UInt16Scalar, IsMin>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TMinMaxBlockAggregator<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint16(), ctx);
case NUdf::EDataSlot::Int32:
- return std::make_unique<TMinMaxBlockAggregator<i32, arrow::Int32Scalar, IsMin>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TMinMaxBlockAggregator<i32, arrow::Int32Scalar, arrow::Int32Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int32(), ctx);
case NUdf::EDataSlot::Uint32:
case NUdf::EDataSlot::Datetime:
- return std::make_unique<TMinMaxBlockAggregator<ui32, arrow::UInt32Scalar, IsMin>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TMinMaxBlockAggregator<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint32(), ctx);
case NUdf::EDataSlot::Int64:
case NUdf::EDataSlot::Interval:
- return std::make_unique<TMinMaxBlockAggregator<i64, arrow::Int64Scalar, IsMin>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TMinMaxBlockAggregator<i64, arrow::Int64Scalar, arrow::Int64Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int64(), ctx);
case NUdf::EDataSlot::Uint64:
case NUdf::EDataSlot::Timestamp:
- return std::make_unique<TMinMaxBlockAggregator<ui64, arrow::UInt64Scalar, IsMin>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TMinMaxBlockAggregator<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint64(), ctx);
default:
throw yexception() << "Unsupported MIN/MAX input type";
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
index 347ad0a26e..74e2d884a3 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
@@ -4,23 +4,56 @@
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
#include <arrow/scalar.h>
+#include <arrow/array/builder_primitive.h>
namespace NKikimr {
namespace NMiniKQL {
-template <typename TIn, typename TSum, typename TInScalar>
+template <typename TIn, typename TSum, typename TBuilder, typename TInScalar>
class TSumBlockAggregatorNullableOrScalar : public TBlockAggregatorBase {
public:
struct TState {
TSum Sum_ = 0;
- bool IsValid_ = false;
+ ui8 IsValid_ = 0;
};
- TSumBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn)
- : TBlockAggregatorBase(sizeof(TState), filterColumn)
+ class TColumnBuilder : public IAggColumnBuilder {
+ public:
+ TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx)
+ : Builder_(dataType, &ctx.ArrowMemoryPool)
+ , Ctx_(ctx)
+ {
+ ARROW_OK(Builder_.Reserve(size));
+ }
+
+ void Add(const void* state) final {
+ auto typedState = static_cast<const TState*>(state);
+ if (typedState->IsValid_) {
+ Builder_.UnsafeAppend(typedState->Sum_);
+ } else {
+ Builder_.UnsafeAppendNull();
+ }
+ }
+
+ NUdf::TUnboxedValue Build() final {
+ std::shared_ptr<arrow::ArrayData> result;
+ ARROW_OK(Builder_.FinishInternal(&result));
+ return Ctx_.HolderFactory.CreateArrowBlock(result);
+ }
+
+ private:
+ TBuilder Builder_;
+ TComputationContext& Ctx_;
+ };
+
+ TSumBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn,
+ const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
+ : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx)
, ArgColumn_(argColumn)
+ , BuilderDataType_(builderDataType)
{
}
@@ -34,7 +67,7 @@ public:
if (datum.is_scalar()) {
if (datum.scalar()->is_valid) {
typedState->Sum_ += (filtered ? *filtered : batchLength) * datum.scalar_as<TInScalar>().value;
- typedState->IsValid_ = true;
+ typedState->IsValid_ = 1;
}
} else {
const auto& array = datum.array();
@@ -46,7 +79,7 @@ public:
}
if (!filtered) {
- typedState->IsValid_ = true;
+ typedState->IsValid_ = 1;
TSum sum = typedState->Sum_;
if (array->GetNullCount() == 0) {
for (int64_t i = 0; i < len; ++i) {
@@ -70,7 +103,7 @@ public:
auto filterBitmap = filterArray->template GetValues<uint8_t>(1, 0);
TSum sum = typedState->Sum_;
if (array->GetNullCount() == 0) {
- typedState->IsValid_ = true;
+ typedState->IsValid_ = 1;
for (int64_t i = 0; i < len; ++i) {
ui64 fullIndex = i + array->offset;
// bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00
@@ -90,7 +123,7 @@ public:
count += mask & 1;
}
- typedState->IsValid_ = typedState->IsValid_ || count > 0;
+ typedState->IsValid_ |= count ? 1 : 0;
}
typedState->Sum_ = sum;
@@ -107,20 +140,82 @@ public:
return NUdf::TUnboxedValuePod(typedState->Sum_);
}
+ void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TState();
+ UpdateKey(state, columns, row);
+ }
+
+ void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = static_cast<TState*>(state);
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ if (datum.is_scalar()) {
+ if (datum.scalar()->is_valid) {
+ typedState->Sum_ += datum.scalar_as<TInScalar>().value;
+ typedState->IsValid_ = 1;
+ }
+ } else {
+ const auto& array = datum.array();
+ auto ptr = array->GetValues<TIn>(1);
+ if (array->GetNullCount() == 0) {
+ typedState->IsValid_ = 1;
+ typedState->Sum_ += ptr[row];
+ } else {
+ auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
+ ui64 fullIndex = row + array->offset;
+ // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00
+ TIn mask = (((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) ^ 1) - TIn(1);
+ typedState->Sum_ += (ptr[row] & mask);
+ typedState->IsValid_ |= mask & 1;
+ }
+ }
+ }
+
+ std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final {
+ return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_);
+ }
+
private:
const ui32 ArgColumn_;
+ const std::shared_ptr<arrow::DataType> BuilderDataType_;
};
-template <typename TIn, typename TSum, typename TInScalar>
+template <typename TIn, typename TSum, typename TBuilder, typename TInScalar>
class TSumBlockAggregator : public TBlockAggregatorBase {
public:
struct TState {
TSum Sum_ = 0;
};
- TSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn)
- : TBlockAggregatorBase(sizeof(TState), filterColumn)
+ class TColumnBuilder : public IAggColumnBuilder {
+ public:
+ TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx)
+ : Builder_(dataType, &ctx.ArrowMemoryPool)
+ , Ctx_(ctx)
+ {
+ ARROW_OK(Builder_.Reserve(size));
+ }
+
+ void Add(const void* state) final {
+ auto typedState = static_cast<const TState*>(state);
+ Builder_.UnsafeAppend(typedState->Sum_);
+ }
+
+ NUdf::TUnboxedValue Build() final {
+ std::shared_ptr<arrow::ArrayData> result;
+ ARROW_OK(Builder_.FinishInternal(&result));
+ return Ctx_.HolderFactory.CreateArrowBlock(result);
+ }
+
+ private:
+ TBuilder Builder_;
+ TComputationContext& Ctx_;
+ };
+
+ TSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,
+ const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
+ : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx)
, ArgColumn_(argColumn)
+ , BuilderDataType_(builderDataType)
{
}
@@ -165,8 +260,26 @@ public:
return NUdf::TUnboxedValuePod(typedState->Sum_);
}
+ void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TState();
+ UpdateKey(state, columns, row);
+ }
+
+ void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = static_cast<TState*>(state);
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ const auto& array = datum.array();
+ auto ptr = array->GetValues<TIn>(1);
+ typedState->Sum_ += ptr[row];
+ }
+
+ std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final {
+ return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_);
+ }
+
private:
const ui32 ArgColumn_;
+ const std::shared_ptr<arrow::DataType> BuilderDataType_;
};
template <typename TIn, typename TInScalar>
@@ -177,10 +290,9 @@ public:
ui64 Count_ = 0;
};
- TAvgBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, const THolderFactory& holderFactory)
- : TBlockAggregatorBase(sizeof(TState), filterColumn)
+ TAvgBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+ : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx)
, ArgColumn_(argColumn)
- , HolderFactory_(holderFactory)
{
}
@@ -265,15 +377,49 @@ public:
}
NUdf::TUnboxedValue* items;
- auto arr = HolderFactory_.CreateDirectArrayHolder(2, items);
+ auto arr = Ctx_.HolderFactory.CreateDirectArrayHolder(2, items);
items[0] = NUdf::TUnboxedValuePod(typedState->Sum_);
items[1] = NUdf::TUnboxedValuePod(typedState->Count_);
return arr;
}
+ void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TState();
+ UpdateKey(state, columns, row);
+ }
+
+ void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = static_cast<TState*>(state);
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ if (datum.is_scalar()) {
+ if (datum.scalar()->is_valid) {
+ typedState->Sum_ += double(datum.scalar_as<TInScalar>().value);
+ typedState->Count_ += 1;
+ }
+ } else {
+ const auto& array = datum.array();
+ auto ptr = array->GetValues<TIn>(1);
+ if (array->GetNullCount() == 0) {
+ typedState->Sum_ += double(ptr[row]);
+ typedState->Count_ += 1;
+ } else {
+ auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
+ ui64 fullIndex = row + array->offset;
+ // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00
+ TIn mask = (((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) ^ 1) - TIn(1);
+ typedState->Sum_ += double(ptr[row] & mask);
+ typedState->Count_ += mask & 1;
+ }
+ }
+ }
+
+ std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final {
+ Y_UNUSED(size);
+ MKQL_ENSURE(false, "TODO: support of tuples");
+ }
+
private:
const ui32 ArgColumn_;
- const THolderFactory& HolderFactory_;
};
class TBlockSumFactory : public IBlockAggregatorFactory {
@@ -282,8 +428,7 @@ public:
TTupleType* tupleType,
std::optional<ui32> filterColumn,
const std::vector<ui32>& argsColumns,
- const THolderFactory& holderFactory) const final {
- Y_UNUSED(holderFactory);
+ TComputationContext& ctx) const final {
auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0]));
auto argType = blockType->GetItemType();
bool isOptional;
@@ -291,50 +436,50 @@ public:
if (blockType->GetShape() == TBlockType::EShape::Scalar || isOptional) {
switch (*dataType->GetDataSlot()) {
case NUdf::EDataSlot::Int8:
- return std::make_unique<TSumBlockAggregatorNullableOrScalar<i8, i64, arrow::Int8Scalar>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TSumBlockAggregatorNullableOrScalar<i8, i64, arrow::Int64Builder, arrow::Int8Scalar>>(filterColumn, argsColumns[0], arrow::int64(), ctx);
case NUdf::EDataSlot::Uint8:
- return std::make_unique<TSumBlockAggregatorNullableOrScalar<ui8, ui64, arrow::UInt8Scalar>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TSumBlockAggregatorNullableOrScalar<ui8, ui64, arrow::UInt64Builder, arrow::UInt8Scalar>>(filterColumn, argsColumns[0], arrow::uint64(), ctx);
case NUdf::EDataSlot::Int16:
- return std::make_unique<TSumBlockAggregatorNullableOrScalar<i16, i64, arrow::Int16Scalar>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TSumBlockAggregatorNullableOrScalar<i16, i64, arrow::Int64Builder, arrow::Int16Scalar>>(filterColumn, argsColumns[0], arrow::int64(), ctx);
case NUdf::EDataSlot::Uint16:
case NUdf::EDataSlot::Date:
- return std::make_unique<TSumBlockAggregatorNullableOrScalar<ui16, ui64, arrow::UInt16Scalar>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TSumBlockAggregatorNullableOrScalar<ui16, ui64, arrow::UInt64Builder, arrow::UInt16Scalar>>(filterColumn, argsColumns[0], arrow::uint64(), ctx);
case NUdf::EDataSlot::Int32:
- return std::make_unique<TSumBlockAggregatorNullableOrScalar<i32, i64, arrow::Int32Scalar>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TSumBlockAggregatorNullableOrScalar<i32, i64, arrow::Int64Builder, arrow::Int32Scalar>>(filterColumn, argsColumns[0], arrow::int64(), ctx);
case NUdf::EDataSlot::Uint32:
case NUdf::EDataSlot::Datetime:
- return std::make_unique<TSumBlockAggregatorNullableOrScalar<ui32, ui64, arrow::UInt32Scalar>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TSumBlockAggregatorNullableOrScalar<ui32, ui64, arrow::UInt64Builder, arrow::UInt32Scalar>>(filterColumn, argsColumns[0], arrow::uint64(), ctx);
case NUdf::EDataSlot::Int64:
case NUdf::EDataSlot::Interval:
- return std::make_unique<TSumBlockAggregatorNullableOrScalar<i64, i64, arrow::Int64Scalar>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TSumBlockAggregatorNullableOrScalar<i64, i64, arrow::Int64Builder, arrow::Int64Scalar>>(filterColumn, argsColumns[0], arrow::int64(), ctx);
case NUdf::EDataSlot::Uint64:
case NUdf::EDataSlot::Timestamp:
- return std::make_unique<TSumBlockAggregatorNullableOrScalar<ui64, ui64, arrow::UInt64Scalar>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TSumBlockAggregatorNullableOrScalar<ui64, ui64, arrow::UInt64Builder, arrow::UInt64Scalar>>(filterColumn, argsColumns[0], arrow::uint64(), ctx);
default:
throw yexception() << "Unsupported SUM input type";
}
} else {
switch (*dataType->GetDataSlot()) {
case NUdf::EDataSlot::Int8:
- return std::make_unique<TSumBlockAggregator<i8, i64, arrow::Int8Scalar>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TSumBlockAggregator<i8, i64, arrow::Int64Builder, arrow::Int8Scalar>>(filterColumn, argsColumns[0], arrow::int64(), ctx);
case NUdf::EDataSlot::Uint8:
- return std::make_unique<TSumBlockAggregator<ui8, ui64, arrow::UInt8Scalar>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TSumBlockAggregator<ui8, ui64, arrow::UInt64Builder, arrow::UInt8Scalar>>(filterColumn, argsColumns[0], arrow::uint64(), ctx);
case NUdf::EDataSlot::Int16:
- return std::make_unique<TSumBlockAggregator<i16, i64, arrow::Int16Scalar>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TSumBlockAggregator<i16, i64, arrow::Int64Builder, arrow::Int16Scalar>>(filterColumn, argsColumns[0], arrow::int64(), ctx);
case NUdf::EDataSlot::Uint16:
case NUdf::EDataSlot::Date:
- return std::make_unique<TSumBlockAggregator<ui16, ui64, arrow::UInt16Scalar>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TSumBlockAggregator<ui16, ui64, arrow::UInt64Builder, arrow::UInt16Scalar>>(filterColumn, argsColumns[0], arrow::uint64(), ctx);
case NUdf::EDataSlot::Int32:
- return std::make_unique<TSumBlockAggregator<i32, i64, arrow::Int32Scalar>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TSumBlockAggregator<i32, i64, arrow::Int64Builder, arrow::Int32Scalar>>(filterColumn, argsColumns[0], arrow::int64(), ctx);
case NUdf::EDataSlot::Uint32:
case NUdf::EDataSlot::Datetime:
- return std::make_unique<TSumBlockAggregator<ui32, ui64, arrow::UInt32Scalar>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TSumBlockAggregator<ui32, ui64, arrow::UInt64Builder, arrow::UInt32Scalar>>(filterColumn, argsColumns[0], arrow::uint64(), ctx);
case NUdf::EDataSlot::Int64:
case NUdf::EDataSlot::Interval:
- return std::make_unique<TSumBlockAggregator<i64, i64, arrow::Int64Scalar>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TSumBlockAggregator<i64, i64, arrow::Int64Builder, arrow::Int64Scalar>>(filterColumn, argsColumns[0], arrow::int64(), ctx);
case NUdf::EDataSlot::Uint64:
case NUdf::EDataSlot::Timestamp:
- return std::make_unique<TSumBlockAggregator<ui64, ui64, arrow::UInt64Scalar>>(filterColumn, argsColumns[0]);
+ return std::make_unique<TSumBlockAggregator<ui64, ui64, arrow::UInt64Builder, arrow::UInt64Scalar>>(filterColumn, argsColumns[0], arrow::uint64(), ctx);
default:
throw yexception() << "Unsupported SUM input type";
}
@@ -348,31 +493,31 @@ public:
TTupleType* tupleType,
std::optional<ui32> filterColumn,
const std::vector<ui32>& argsColumns,
- const THolderFactory& holderFactory) const final {
+ TComputationContext& ctx) const final {
auto argType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0]))->GetItemType();
bool isOptional;
auto dataType = UnpackOptionalData(argType, isOptional);
switch (*dataType->GetDataSlot()) {
case NUdf::EDataSlot::Int8:
- return std::make_unique<TAvgBlockAggregator<i8, arrow::Int8Scalar>>(filterColumn, argsColumns[0], holderFactory);
+ return std::make_unique<TAvgBlockAggregator<i8, arrow::Int8Scalar>>(filterColumn, argsColumns[0], ctx);
case NUdf::EDataSlot::Uint8:
- return std::make_unique<TAvgBlockAggregator<ui8, arrow::UInt8Scalar>>(filterColumn, argsColumns[0], holderFactory);
+ return std::make_unique<TAvgBlockAggregator<ui8, arrow::UInt8Scalar>>(filterColumn, argsColumns[0], ctx);
case NUdf::EDataSlot::Int16:
- return std::make_unique<TAvgBlockAggregator<i16, arrow::Int16Scalar>>(filterColumn, argsColumns[0], holderFactory);
+ return std::make_unique<TAvgBlockAggregator<i16, arrow::Int16Scalar>>(filterColumn, argsColumns[0], ctx);
case NUdf::EDataSlot::Uint16:
case NUdf::EDataSlot::Date:
- return std::make_unique<TAvgBlockAggregator<ui16, arrow::UInt16Scalar>>(filterColumn, argsColumns[0], holderFactory);
+ return std::make_unique<TAvgBlockAggregator<ui16, arrow::UInt16Scalar>>(filterColumn, argsColumns[0], ctx);
case NUdf::EDataSlot::Int32:
- return std::make_unique<TAvgBlockAggregator<i32, arrow::Int32Scalar>>(filterColumn, argsColumns[0], holderFactory);
+ return std::make_unique<TAvgBlockAggregator<i32, arrow::Int32Scalar>>(filterColumn, argsColumns[0], ctx);
case NUdf::EDataSlot::Uint32:
case NUdf::EDataSlot::Datetime:
- return std::make_unique<TAvgBlockAggregator<ui32, arrow::UInt32Scalar>>(filterColumn, argsColumns[0], holderFactory);
+ return std::make_unique<TAvgBlockAggregator<ui32, arrow::UInt32Scalar>>(filterColumn, argsColumns[0], ctx);
case NUdf::EDataSlot::Int64:
case NUdf::EDataSlot::Interval:
- return std::make_unique<TAvgBlockAggregator<i64, arrow::Int64Scalar>>(filterColumn, argsColumns[0], holderFactory);
+ return std::make_unique<TAvgBlockAggregator<i64, arrow::Int64Scalar>>(filterColumn, argsColumns[0], ctx);
case NUdf::EDataSlot::Uint64:
case NUdf::EDataSlot::Timestamp:
- return std::make_unique<TAvgBlockAggregator<ui64, arrow::UInt64Scalar>>(filterColumn, argsColumns[0], holderFactory);
+ return std::make_unique<TAvgBlockAggregator<ui64, arrow::UInt64Scalar>>(filterColumn, argsColumns[0], ctx);
default:
throw yexception() << "Unsupported AVG input type";
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
index ea59eb766e..ddf51de9bd 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
@@ -282,6 +282,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"BlockNot", &WrapBlockNot},
{"BlockCompress", &WrapBlockCompress},
{"BlockCombineAll", &WrapBlockCombineAll},
+ {"BlockCombineHashed", &WrapBlockCombineHashed},
{"MakeHeap", &WrapMakeHeap},
{"PushHeap", &WrapPushHeap},
{"PopHeap", &WrapPopHeap},
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h b/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h
index 631d08c388..863f8cfa82 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h
@@ -20,8 +20,8 @@ protected:
}
public:
- // returns payload pointer for Map or nullptr for Set
- Y_FORCE_INLINE void* Insert(TKey key, bool& isNew) {
+ // returns iterator
+ Y_FORCE_INLINE char* Insert(TKey key, bool& isNew) {
auto ret = InsertImpl(key, isNew, Capacity, Data);
Size += isNew ? 1 : 0;
return ret;
@@ -87,30 +87,30 @@ public:
}
private:
- Y_FORCE_INLINE void* InsertImpl(TKey key, bool& isNew, ui64 capacity, std::vector<char>& data) {
+ Y_FORCE_INLINE char* InsertImpl(TKey key, bool& isNew, ui64 capacity, std::vector<char>& data) {
isNew = false;
ui64 bucket = THash()(key) & (capacity - 1);
char* ptr = data.data() + AsDeriv().GetCellSize() * bucket;
TPSLStorage distance = 0;
- void* returnPayload;
+ char* returnPtr;
for (;;) {
if (GetPSL(ptr) < 0) {
isNew = true;
GetPSL(ptr) = distance;
GetKey(ptr) = key;
- return GetPayload(ptr);
+ return ptr;
}
if (TEqual()(GetKey(ptr), key)) {
- return GetPayload(ptr);
+ return ptr;
}
if (distance > GetPSL(ptr)) {
// swap keys & state
- returnPayload = GetPayload(ptr);
+ returnPtr = ptr;
std::swap(distance, GetPSL(ptr));
std::swap(key, GetKey(ptr));
- AsDeriv().SavePayload(returnPayload);
+ AsDeriv().SavePayload(GetPayload(ptr));
isNew = true;
++distance;
@@ -127,7 +127,7 @@ private:
GetPSL(ptr) = distance;
GetKey(ptr) = key;
AsDeriv().RestorePayload(GetPayload(ptr));
- return returnPayload; // for original key
+ return returnPtr; // for original key
}
if (distance > GetPSL(ptr)) {
@@ -152,8 +152,9 @@ private:
}
bool isNew;
- auto payload = InsertImpl(GetKey(iter), isNew, newCapacity, newData);
- AsDeriv().CopyPayload(payload, GetPayload(iter));
+ auto newIter = InsertImpl(GetKey(iter), isNew, newCapacity, newData);
+ Y_ASSERT(isNew);
+ AsDeriv().CopyPayload(GetPayload(newIter), GetPayload(iter));
}
Data.swap(newData);
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp
index be87bd1ed3..d47bcb319a 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp
@@ -16,12 +16,15 @@ Y_UNIT_TEST_SUITE(TMiniKQLRobinHoodHashTest) {
auto k = i % 1000;
auto [it, inserted] = h.emplace(k, 0);
bool isNew;
- void* p = rh.Insert(k, isNew);
+ auto iter = rh.Insert(k, isNew);
+ UNIT_ASSERT_VALUES_EQUAL(rh.GetKey(iter), k);
UNIT_ASSERT_VALUES_EQUAL(isNew, inserted);
it->second += i;
- *(i64*)p += i;
if (isNew) {
+ *(i64*)rh.GetPayload(iter) = i;
rh.CheckGrow();
+ } else {
+ *(i64*)rh.GetPayload(iter) += i;
}
UNIT_ASSERT_VALUES_EQUAL(h.size(), rh.GetSize());
@@ -49,8 +52,8 @@ Y_UNIT_TEST_SUITE(TMiniKQLRobinHoodHashTest) {
auto k = i % 1000;
auto[it, inserted] = h.emplace(k);
bool isNew;
- void* p = rh.Insert(k, isNew);
- Y_UNUSED(p);
+ auto iter = rh.Insert(k, isNew);
+ UNIT_ASSERT_VALUES_EQUAL(rh.GetKey(iter), k);
UNIT_ASSERT_VALUES_EQUAL(isNew, inserted);
if (isNew) {
rh.CheckGrow();
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index d675b7c6b2..ffe022d2ef 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -5338,6 +5338,42 @@ TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, ui32 countColum
return TRuntimeNode(builder.Build(), false);
}
+TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, ui32 countColumn, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys,
+ const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
+ if constexpr (RuntimeVersion < 31U) {
+ THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+ }
+
+ TCallableBuilder builder(Env, __func__, returnType);
+ builder.Add(flow);
+ builder.Add(NewDataLiteral<ui32>(countColumn));
+ if (!filterColumn) {
+ builder.Add(NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id));
+ } else {
+ builder.Add(NewOptional(NewDataLiteral<ui32>(*filterColumn)));
+ }
+
+ TVector<TRuntimeNode> keyNodes;
+ for (const auto& key : keys) {
+ keyNodes.push_back(NewDataLiteral<ui32>(key));
+ }
+
+ builder.Add(NewTuple(keyNodes));
+ TVector<TRuntimeNode> aggsNodes;
+ for (const auto& agg : aggs) {
+ TVector<TRuntimeNode> params;
+ params.push_back(NewDataLiteral<NUdf::EDataSlot::String>(agg.Name));
+ for (const auto& col : agg.ArgsColumns) {
+ params.push_back(NewDataLiteral<ui32>(col));
+ }
+
+ aggsNodes.push_back(NewTuple(params));
+ }
+
+ builder.Add(NewTuple(aggsNodes));
+ return TRuntimeNode(builder.Build(), false);
+}
+
bool CanExportType(TType* type, const TTypeEnvironment& env) {
if (type->GetKind() == TType::EKind::Type) {
return false; // Type of Type
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index 265e22af61..868be9f577 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -259,6 +259,8 @@ public:
TRuntimeNode BlockBitCast(TRuntimeNode value, TType* targetType);
TRuntimeNode BlockCombineAll(TRuntimeNode flow, ui32 countColumn, std::optional<ui32> filterColumn,
const TArrayRef<const TAggInfo>& aggs, TType* returnType);
+ TRuntimeNode BlockCombineHashed(TRuntimeNode flow, ui32 countColumn, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys,
+ const TArrayRef<const TAggInfo>& aggs, TType* returnType);
// udfs
TRuntimeNode Udf(
diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
index 132d16041d..071837afd9 100644
--- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
+++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
@@ -2427,6 +2427,34 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
return ctx.ProgramBuilder.BlockCombineAll(arg, countColumn, filterColumn, aggs, returnType);
});
+ AddCallable("BlockCombineHashed", [](const TExprNode& node, TMkqlBuildContext& ctx) {
+ auto arg = MkqlBuildExpr(*node.Child(0), ctx);
+ ui32 countColumn = FromString<ui32>(node.Child(1)->Content());
+ std::optional<ui32> filterColumn;
+ if (!node.Child(2)->IsCallable("Void")) {
+ filterColumn = FromString<ui32>(node.Child(2)->Content());
+ }
+
+ TVector<ui32> keys;
+ for (const auto& key : node.Child(3)->Children()) {
+ keys.push_back(FromString<ui32>(key->Content()));
+ }
+
+ TVector<TAggInfo> aggs;
+ for (const auto& agg : node.Child(4)->Children()) {
+ TAggInfo info;
+ info.Name = TString(agg->Head().Head().Content());
+ for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
+ info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
+ }
+
+ aggs.push_back(info);
+ }
+
+ auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
+ return ctx.ProgramBuilder.BlockCombineHashed(arg, countColumn, filterColumn, keys, aggs, returnType);
+ });
+
AddCallable("BlockCompress", [](const TExprNode& node, TMkqlBuildContext& ctx) {
const auto flow = MkqlBuildExpr(node.Head(), ctx);
const auto index = FromString<ui32>(node.Child(1)->Content());