aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author vvvv <vvvv@ydb.tech> 2022-12-09 20:12:40 +0300
committer vvvv <vvvv@ydb.tech> 2022-12-09 20:12:40 +0300
commit e88d560cbbd3a1551449244afb37345443487919 (patch)
tree 7d9185776a8656a9d7bd1abc11066d6e438ca46b
parent 66415f6b8919ea1c545c2ba15284cde3efa93d99 (diff)
download ydb-e88d560cbbd3a1551449244afb37345443487919.tar.gz
get rid of count column, always use last one. SSO keys with arena
-rw-r--r-- ydb/library/yql/core/expr_nodes/yql_expr_nodes.json 12
-rw-r--r-- ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp 15
-rw-r--r-- ydb/library/yql/core/type_ann/type_ann_blocks.cpp 88
-rw-r--r-- ydb/library/yql/core/yql_aggregate_expander.cpp 13
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp 388
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h 32
-rw-r--r-- ydb/library/yql/minikql/mkql_program_builder.cpp 6
-rw-r--r-- ydb/library/yql/minikql/mkql_program_builder.h 4
-rw-r--r-- ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp 20
9 files changed, 332 insertions, 246 deletions
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
index 696777c89e5..0c31c725461 100644
--- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
+++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
@@ -2247,9 +2247,8 @@
"Base": "TCoInputBase",
"Match": {"Type": "Callable", "Name": "BlockCombineAll"},
"Children": [
- {"Index": 1, "Name": "CountColumn", "Type": "TCoAtom"},
- {"Index": 2, "Name": "FilterColumn", "Type": "TExprBase"},
- {"Index": 3, "Name": "Aggregations", "Type": "TExprList"}
+ {"Index": 1, "Name": "FilterColumn", "Type": "TExprBase"},
+ {"Index": 2, "Name": "Aggregations", "Type": "TExprList"}
]
},
{
@@ -2257,10 +2256,9 @@
"Base": "TCoInputBase",
"Match": {"Type": "Callable", "Name": "BlockCombineHashed"},
"Children": [
- {"Index": 1, "Name": "CountColumn", "Type": "TCoAtom"},
- {"Index": 2, "Name": "FilterColumn", "Type": "TExprBase"},
- {"Index": 3, "Name": "Keys", "Type": "TCoAtomList"},
- {"Index": 4, "Name": "Aggregations", "Type": "TExprList"}
+ {"Index": 1, "Name": "FilterColumn", "Type": "TExprBase"},
+ {"Index": 2, "Name": "Keys", "Type": "TCoAtomList"},
+ {"Index": 3, "Name": "Aggregations", "Type": "TExprList"}
]
}
]
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
index 9ba65645e0f..7e879b086ea 100644
--- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -4884,17 +4884,16 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte
TExprNode::TPtr UpdateBlockCombineAllColumns(const TExprNode::TPtr& node, std::optional<ui32> filterColumn, const TVector<ui32>& argIndices, TExprContext& ctx) {
auto combineChildren = node->ChildrenList();
combineChildren[0] = node->Head().HeadPtr();
- combineChildren[1] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[1]->Content())]));
if (filterColumn) {
- YQL_ENSURE(combineChildren[2]->IsCallable("Void"), "Filter column is already used");
- combineChildren[2] = ctx.NewAtom(node->Pos(), ToString(*filterColumn));
+ // A new filter column is being attached: the filter slot (child 1) must still be empty.
+ YQL_ENSURE(combineChildren[1]->IsCallable("Void"), "Filter column is already used");
+ combineChildren[1] = ctx.NewAtom(node->Pos(), ToString(*filterColumn));
} else {
- if (!combineChildren[2]->IsCallable("Void")) {
- combineChildren[2] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[2]->Content())]));
+ if (!combineChildren[1]->IsCallable("Void")) {
+ // Existing filter column: translate its index through argIndices. The index has to be
+ // read from child 1 - child 2 is the aggregations list now, not a column atom.
+ combineChildren[1] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[1]->Content())]));
}
}
- auto payloadNodes = combineChildren[3]->ChildrenList();
+ auto payloadNodes = combineChildren[2]->ChildrenList();
for (auto& p : payloadNodes) {
YQL_ENSURE(p->IsList() && p->ChildrenSize() >= 1 && p->Head().IsCallable("AggBlockApply"), "Expected AggBlockApply");
auto payloadArgs = p->ChildrenList();
@@ -4905,7 +4904,7 @@ TExprNode::TPtr UpdateBlockCombineAllColumns(const TExprNode::TPtr& node, std::o
p = ctx.ChangeChildren(*p, std::move(payloadArgs));
}
- combineChildren[3] = ctx.ChangeChildren(*combineChildren[3], std::move(payloadNodes));
+ combineChildren[2] = ctx.ChangeChildren(*combineChildren[2], std::move(payloadNodes));
return ctx.ChangeChildren(*node, std::move(combineChildren));
}
@@ -4921,7 +4920,7 @@ TExprNode::TPtr OptimizeBlockCombineAll(const TExprNode::TPtr& node, TExprContex
}
}
- if (node->Head().IsCallable("BlockCompress") && node->Child(2)->IsCallable("Void")) {
+ if (node->Head().IsCallable("BlockCompress") && node->Child(1)->IsCallable("Void")) {
auto filterIndex = FromString<ui32>(node->Head().Child(1)->Content());
TVector<ui32> argIndices;
argIndices.resize(node->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetSize());
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
index c6c09dcd8ea..bcf423eac71 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
@@ -295,57 +295,33 @@ bool ValidateBlockAggs(TPositionHandle pos, const TTypeAnnotationNode::TListType
IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 4U, ctx.Expr)) {
+ if (!EnsureArgsCount(*input, 3U, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
- if (!EnsureWideFlowType(input->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- const auto multiType = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>();
- TTypeAnnotationNode::TListType inputItems;
- for (const auto& type : multiType->GetItems()) {
- if (!EnsureBlockOrScalarType(input->Pos(), *type, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- bool isScalar;
- inputItems.push_back(GetBlockItemType(*type, isScalar));
- }
-
- if (!EnsureAtom(*input->Child(1), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- ui32 countColumnIndex;
- if (!TryFromString(input->Child(1)->Content(), countColumnIndex) || countColumnIndex >= inputItems.size()) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad count column index"));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureSpecificDataType(input->Child(1)->Pos(), *inputItems[countColumnIndex], EDataSlot::Uint64, ctx.Expr)) {
+ TTypeAnnotationNode::TListType blockItemTypes;
+ if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
- if (!input->Child(2)->IsCallable("Void")) {
- if (!EnsureAtom(*input->Child(2), ctx.Expr)) {
+ if (!input->Child(1)->IsCallable("Void")) {
+ if (!EnsureAtom(*input->Child(1), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
ui32 filterColumnIndex;
- if (!TryFromString(input->Child(2)->Content(), filterColumnIndex) || filterColumnIndex >= inputItems.size()) {
+ if (!TryFromString(input->Child(1)->Content(), filterColumnIndex) || filterColumnIndex >= blockItemTypes.size()) {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad filter column index"));
return IGraphTransformer::TStatus::Error;
}
- if (!EnsureSpecificDataType(input->Child(2)->Pos(), *inputItems[filterColumnIndex], EDataSlot::Bool, ctx.Expr)) {
+ if (!EnsureSpecificDataType(input->Child(1)->Pos(), *blockItemTypes[filterColumnIndex], EDataSlot::Bool, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
}
TTypeAnnotationNode::TListType retMultiType;
- if (!ValidateBlockAggs(input->Pos(), inputItems, *input->Child(3), retMultiType, ctx.Expr)) {
+ if (!ValidateBlockAggs(input->Pos(), blockItemTypes, *input->Child(2), retMultiType, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
@@ -356,75 +332,51 @@ IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input,
IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 5U, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureWideFlowType(input->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- const auto multiType = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>();
- TTypeAnnotationNode::TListType inputItems;
- for (const auto& type : multiType->GetItems()) {
- if (!EnsureBlockOrScalarType(input->Pos(), *type, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- bool isScalar;
- inputItems.push_back(GetBlockItemType(*type, isScalar));
- }
-
- if (!EnsureAtom(*input->Child(1), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- ui32 countColumnIndex;
- if (!TryFromString(input->Child(1)->Content(), countColumnIndex) || countColumnIndex >= inputItems.size()) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad count column index"));
+ if (!EnsureArgsCount(*input, 4U, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
- if (!EnsureSpecificDataType(input->Child(1)->Pos(), *inputItems[countColumnIndex], EDataSlot::Uint64, ctx.Expr)) {
+ TTypeAnnotationNode::TListType blockItemTypes;
+ if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
- if (!input->Child(2)->IsCallable("Void")) {
- if (!EnsureAtom(*input->Child(2), ctx.Expr)) {
+ if (!input->Child(1)->IsCallable("Void")) {
+ if (!EnsureAtom(*input->Child(1), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
ui32 filterColumnIndex;
- if (!TryFromString(input->Child(2)->Content(), filterColumnIndex) || filterColumnIndex >= inputItems.size()) {
+ if (!TryFromString(input->Child(1)->Content(), filterColumnIndex) || filterColumnIndex >= blockItemTypes.size()) {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad filter column index"));
return IGraphTransformer::TStatus::Error;
}
- if (!EnsureSpecificDataType(input->Child(2)->Pos(), *inputItems[filterColumnIndex], EDataSlot::Bool, ctx.Expr)) {
+ if (!EnsureSpecificDataType(input->Child(1)->Pos(), *blockItemTypes[filterColumnIndex], EDataSlot::Bool, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
}
- if (!EnsureTupleMinSize(*input->Child(3), 1, ctx.Expr)) {
+ if (!EnsureTupleMinSize(*input->Child(2), 1, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
TTypeAnnotationNode::TListType retMultiType;
- for (auto child : input->Child(3)->Children()) {
+ for (auto child : input->Child(2)->Children()) {
if (!EnsureAtom(*child, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
ui32 keyColumnIndex;
- if (!TryFromString(child->Content(), keyColumnIndex) || keyColumnIndex >= inputItems.size()) {
+ if (!TryFromString(child->Content(), keyColumnIndex) || keyColumnIndex >= blockItemTypes.size()) {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad key column index"));
return IGraphTransformer::TStatus::Error;
}
- retMultiType.push_back(inputItems[keyColumnIndex]);
+ retMultiType.push_back(blockItemTypes[keyColumnIndex]);
}
- if (!ValidateBlockAggs(input->Pos(), inputItems, *input->Child(4), retMultiType, ctx.Expr)) {
+ if (!ValidateBlockAggs(input->Pos(), blockItemTypes, *input->Child(3), retMultiType, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp
index 52d6ab24329..6ffce1fd3d0 100644
--- a/ydb/library/yql/core/yql_aggregate_expander.cpp
+++ b/ydb/library/yql/core/yql_aggregate_expander.cpp
@@ -589,7 +589,6 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
outputColumns.push_back(TString(FinalColumnNames[index]->Content()));
}
- auto mappedWidth = extractorRoots.size();
auto extractorLambda = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), std::move(extractorArgs)), std::move(extractorRoots));
auto mappedWideFlow = Ctx.NewCallable(Node->Pos(), "WideMap", { wideFlow, extractorLambda });
auto blocks = Ctx.NewCallable(Node->Pos(), "WideToBlocks", { mappedWideFlow });
@@ -599,11 +598,10 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
.Callable("WideFromBlocks")
.Callable(0, "BlockCombineHashed")
.Add(0, blocks)
- .Atom(1, ToString(mappedWidth))
- .Callable(2, "Void")
+ .Callable(1, "Void")
.Seal()
- .Add(3, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
- .Add(4, Ctx.NewList(Node->Pos(), std::move(aggs)))
+ .Add(2, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
+ .Add(3, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Seal()
.Seal()
.Build();
@@ -611,10 +609,9 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
aggWideFlow = Ctx.Builder(Node->Pos())
.Callable("BlockCombineAll")
.Add(0, blocks)
- .Atom(1, ToString(mappedWidth))
- .Callable(2, "Void")
+ .Callable(1, "Void")
.Seal()
- .Add(3, Ctx.NewList(Node->Pos(), std::move(aggs)))
+ .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Seal()
.Build();
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
index 7524df98221..46b5fd1b6da 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
@@ -19,6 +19,96 @@ namespace NMiniKQL {
namespace {
+// Fixed-size hash key holding serialized key bytes.
+// Short values are stored inside the object ("inplace"); longer values are referenced
+// through an external pointer whose storage is owned by the caller and can be
+// re-pointed with UpdateExternalPointer().
+// The lowest bit of the length field tags the representation (1 - inplace,
+// 0 - external); the remaining bits hold the size shifted left by one.
+// NOTE(review): IsInplace() reads the tag through U.I.SmallLength_ even when
+// U.E.Length_ was the member written, which looks like it assumes a little-endian
+// layout - confirm for the supported targets.
+class TSSOKey {
+public:
+    static constexpr size_t SSO_Length = 16;
+    static_assert(SSO_Length < 128); // should fit into 7 bits
+
+    // Returns true when data fits into the inplace buffer.
+    static bool CanBeInplace(TStringBuf data) {
+        // Bound by the declared capacity of Buffer_. sizeof(TSSOKey) may be larger than
+        // 1 + SSO_Length because of union padding, and using it as the limit would let
+        // Inplace()/AsView() touch bytes past the end of Buffer_.
+        return data.Size() <= SSO_Length;
+    }
+
+    // Builds an inplace key; data must satisfy CanBeInplace().
+    static TSSOKey Inplace(TStringBuf data) {
+        Y_ASSERT(CanBeInplace(data));
+        // Store the tagged length first, then overlay the payload right after the tag byte.
+        TSSOKey ret(1 | (data.Size() << 1), nullptr);
+        memcpy(ret.U.I.Buffer_, data.Data(), data.Size());
+        return ret;
+    }
+
+    // Builds a key that only references data; the bytes are not copied.
+    static TSSOKey External(TStringBuf data) {
+        return TSSOKey(data.Size() << 1, data.Data());
+    }
+
+    bool IsInplace() const {
+        return U.I.SmallLength_ & 1;
+    }
+
+    // Returns the key bytes for either representation.
+    TStringBuf AsView() const {
+        if (IsInplace()) {
+            // inplace
+            return TStringBuf(U.I.Buffer_, U.I.SmallLength_ >> 1);
+        } else {
+            // external
+            return TStringBuf(U.E.Ptr_, U.E.Length_ >> 1);
+        }
+    }
+
+    // Re-points an external key at another copy of the same bytes; the length is kept.
+    void UpdateExternalPointer(const char *ptr) {
+        Y_ASSERT(!IsInplace());
+        U.E.Ptr_ = ptr;
+    }
+
+private:
+    // length is the already tagged and shifted length value.
+    TSSOKey(ui64 length, const char* ptr) {
+        U.E.Length_ = length;
+        U.E.Ptr_ = ptr;
+    }
+
+private:
+    union {
+        struct TExternal {
+            ui64 Length_;
+            const char* Ptr_;
+        } E;
+        struct TInplace {
+            ui8 SmallLength_;
+            char Buffer_[SSO_Length];
+        } I;
+    } U;
+};
+
+}
+}
+}
+
+namespace std {
+    // Hashes a TSSOKey by the bytes it exposes through AsView(), so inplace and
+    // external keys with the same content hash identically.
+    template <>
+    struct hash<NKikimr::NMiniKQL::TSSOKey> {
+        using argument_type = NKikimr::NMiniKQL::TSSOKey;
+        using result_type = size_t;
+        result_type operator()(argument_type const& key) const noexcept {
+            const std::hash<std::string_view> bytesHash;
+            return bytesHash(key.AsView());
+        }
+    };
+
+    // Compares TSSOKey values by content; also accepts a raw TStringBuf on the right side.
+    template <>
+    struct equal_to<NKikimr::NMiniKQL::TSSOKey> {
+        using argument_type = NKikimr::NMiniKQL::TSSOKey;
+        using is_transparent = void;
+        bool operator()(argument_type lhs, argument_type rhs) const {
+            return lhs.AsView() == rhs.AsView();
+        }
+        bool operator()(argument_type lhs, TStringBuf rhs) const {
+            return lhs.AsView() == rhs;
+        }
+    };
+}
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace {
+
struct TAggParams {
TStringBuf Name;
TTupleType* TupleType;
@@ -213,17 +303,16 @@ class TBlockCombineAllWrapper : public TStatefulWideFlowComputationNode<TBlockCo
public:
TBlockCombineAllWrapper(TComputationMutables& mutables,
IComputationWideFlowNode* flow,
- ui32 countColumn,
std::optional<ui32> filterColumn,
size_t width,
TVector<TAggParams>&& aggsParams)
: TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
, Flow_(flow)
- , CountColumn_(countColumn)
, FilterColumn_(filterColumn)
, Width_(width)
, AggsParams_(std::move(aggsParams))
{
+ MKQL_ENSURE(Width_ > 0, "Missing block length column");
}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state,
@@ -343,127 +432,75 @@ private:
}
ui64 GetBatchLength(const NUdf::TUnboxedValue* columns) const {
- return TArrowBlock::From(columns[CountColumn_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+ return TArrowBlock::From(columns[Width_ - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
}
private:
IComputationWideFlowNode* Flow_;
- const ui32 CountColumn_;
std::optional<ui32> FilterColumn_;
const size_t Width_;
const TVector<TAggParams> AggsParams_;
};
-class TBlockCombineHashedWrapper : public TStatefulWideFlowComputationNode<TBlockCombineHashedWrapper> {
+// Builds a fixed-size hash key of type T from the serialized key bytes.
+// The bytes are copied with memcpy instead of being read through a casted pointer, so
+// the result does not depend on the alignment of the source buffer, and a view shorter
+// than sizeof(T) is zero-padded rather than extended with whatever follows it in memory.
+template <typename T>
+T MakeKey(TStringBuf s) {
+    Y_ASSERT(s.Size() <= sizeof(T));
+    T key{};
+    // Clamp the copy so an oversized view cannot overrun key if the assertion is inactive.
+    const size_t size = s.Size() < sizeof(T) ? s.Size() : sizeof(T);
+    memcpy(&key, s.Data(), size);
+    return key;
+}
+
+// TSSOKey flavour: keep the bytes inside the key when they fit, otherwise only
+// reference them.
+template <>
+TSSOKey MakeKey(TStringBuf s) {
+    return TSSOKey::CanBeInplace(s) ? TSSOKey::Inplace(s) : TSSOKey::External(s);
+}
+
+// Copies the bytes of an external key into arena and re-points the key at that copy.
+// Inplace keys carry their bytes themselves and are left untouched.
+void MoveKeyToArena(TSSOKey& key, TPagedArena& arena) {
+    if (!key.IsInplace()) {
+        const auto bytes = key.AsView();
+        char* const copy = (char*)arena.Alloc(bytes.Size());
+        memcpy(copy, bytes.Data(), bytes.Size());
+        key.UpdateExternalPointer(copy);
+    }
+}
+
+// Exposes the raw object bytes of a fixed-size key as a string view.
+template <typename T>
+TStringBuf GetKeyView(const T& key) {
+    const char* const raw = (const char*)&key;
+    return TStringBuf(raw, sizeof(T));
+}
+
+// TSSOKey flavour: the key already knows where its bytes live.
+template <>
+TStringBuf GetKeyView(const TSSOKey& key) {
+    return key.AsView();
+}
+
+template <typename TKey>
+class TBlockCombineHashedWrapper : public TStatefulWideFlowComputationNode<TBlockCombineHashedWrapper<TKey>> {
public:
+ using TSelf = TBlockCombineHashedWrapper<TKey>;
+ using TBase = TStatefulWideFlowComputationNode<TSelf>;
+
TBlockCombineHashedWrapper(TComputationMutables& mutables,
IComputationWideFlowNode* flow,
- ui32 countColumn,
std::optional<ui32> filterColumn,
size_t width,
const std::vector<TKeyParams>& keys,
+ std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers,
TVector<TAggParams>&& aggsParams)
- : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+ : TBase(mutables, flow, EValueRepresentation::Any)
, Flow_(flow)
- , CountColumn_(countColumn)
, FilterColumn_(filterColumn)
, Width_(width)
, OutputWidth_(keys.size() + aggsParams.size() + 1)
, Keys_(keys)
+ , KeySerializers_(std::move(keySerializers))
, AggsParams_(std::move(aggsParams))
{
- for (const auto& k : Keys_) {
- auto itemType = AS_TYPE(TBlockType, k.Type)->GetItemType();
- bool isOptional;
- auto dataType = UnpackOptionalData(itemType, isOptional);
- if (isOptional) {
- TotalKeysSize_ += 1;
- }
-
- switch (*dataType->GetDataSlot()) {
- case NUdf::EDataSlot::Int8:
- TotalKeysSize_ += 1;
- if (isOptional) {
- KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i8, arrow::Int8Scalar, arrow::Int8Builder, true>>(arrow::int8()));
- } else {
- KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i8, arrow::Int8Scalar, arrow::Int8Builder, false>>(arrow::int8()));
- }
-
- break;
- case NUdf::EDataSlot::Bool:
- case NUdf::EDataSlot::Uint8:
- TotalKeysSize_ += 1;
- if (isOptional) {
- KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, true>>(arrow::uint8()));
- } else {
- KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, false>>(arrow::uint8()));
- }
-
- break;
- case NUdf::EDataSlot::Int16:
- TotalKeysSize_ += 2;
- if (isOptional) {
- KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i16, arrow::Int16Scalar, arrow::Int16Builder, true>>(arrow::int16()));
- } else {
- KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i16, arrow::Int16Scalar, arrow::Int16Builder, false>>(arrow::int16()));
- }
-
- break;
- case NUdf::EDataSlot::Uint16:
- case NUdf::EDataSlot::Date:
- TotalKeysSize_ += 2;
- if (isOptional) {
- KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, true>>(arrow::uint16()));
- } else {
- KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, false>>(arrow::uint16()));
- }
-
- break;
- case NUdf::EDataSlot::Int32:
- TotalKeysSize_ += 4;
- if (isOptional) {
- KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i32, arrow::Int32Scalar, arrow::Int32Builder, true>>(arrow::int32()));
- } else {
- KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i32, arrow::Int32Scalar, arrow::Int32Builder, false>>(arrow::int32()));
- }
-
- break;
- case NUdf::EDataSlot::Uint32:
- case NUdf::EDataSlot::Datetime:
- TotalKeysSize_ += 4;
- if (isOptional) {
- KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, true>>(arrow::uint32()));
- } else {
- KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, false>>(arrow::uint32()));
- }
-
- break;
- case NUdf::EDataSlot::Int64:
- case NUdf::EDataSlot::Interval:
- TotalKeysSize_ += 8;
- if (isOptional) {
- KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i64, arrow::Int64Scalar, arrow::Int64Builder, true>>(arrow::int64()));
- } else {
- KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i64, arrow::Int64Scalar, arrow::Int64Builder, false>>(arrow::int64()));
- }
-
- break;
- case NUdf::EDataSlot::Uint64:
- case NUdf::EDataSlot::Timestamp:
- TotalKeysSize_ += 8;
- if (isOptional) {
- KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, true>>(arrow::uint64()));
- } else {
- KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, false>>(arrow::uint64()));
- }
-
- break;
- default:
- throw yexception() << "Unsupported key type";
- }
- }
-
- MKQL_ENSURE(TotalKeysSize_ <= 4, "TODO Support all lengths of keys");
+ MKQL_ENSURE(Width_ > 0, "Missing block length column");
}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state,
@@ -493,7 +530,7 @@ public:
}
TOutputBuffer out;
- out.Resize(sizeof(ui32));
+ out.Resize(sizeof(TKey));
for (ui64 row = 0; row < batchLength; ++row) {
out.Rewind();
// encode key
@@ -502,8 +539,7 @@ public:
}
auto str = out.Finish();
- Y_ASSERT(str.Size() <= sizeof(ui32));
- ui32 key = *(const ui32*)str.Data();
+ TKey key = MakeKey<TKey>(str);
bool isNew;
auto iter = s.HashMap_->Insert(key, isNew);
char* ptr = (char*)s.HashMap_->GetPayload(iter);
@@ -516,6 +552,10 @@ public:
ptr += s.Aggs_[i]->StateSize;
}
+ if constexpr (std::is_same<TKey, TSSOKey>::value) {
+ MoveKeyToArena(s.HashMap_->GetKey(iter), s.Arena_);
+ }
+
s.HashMap_->CheckGrow();
} else {
for (size_t i = 0; i < s.Aggs_.size(); ++i) {
@@ -550,9 +590,9 @@ public:
continue;
}
- ui32 key = s.HashMap_->GetKey(iter);
+ const TKey& key = s.HashMap_->GetKey(iter);
auto ptr = (const char*)s.HashMap_->GetPayload(iter);
- TInputBuffer in(TStringBuf((const char*)&key, sizeof(key)));
+ TInputBuffer in(GetKeyView<TKey>(key));
for (auto& kb : keyBuilders) {
kb->Add(in);
}
@@ -587,18 +627,22 @@ public:
private:
struct TState : public TComputationValue<TState> {
+ using TBase = TComputationValue<TState>;
+
TVector<NUdf::TUnboxedValue> Values_;
TVector<NUdf::TUnboxedValue*> ValuePointers_;
TVector<std::unique_ptr<IBlockAggregator>> Aggs_;
bool IsFinished_ = false;
bool HasValues_ = false;
ui32 TotalStateSize_ = 0;
- std::unique_ptr<TRobinHoodHashMap<ui32>> HashMap_;
+ std::unique_ptr<TRobinHoodHashMap<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>>> HashMap_;
+ TPagedArena Arena_;
TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const TVector<TAggParams>& params, TComputationContext& ctx)
- : TComputationValue(memInfo)
+ : TBase(memInfo)
, Values_(width)
, ValuePointers_(width)
+ , Arena_(TlsAllocState)
{
for (size_t i = 0; i < width; ++i) {
ValuePointers_[i] = &Values_[i];
@@ -610,13 +654,13 @@ private:
TotalStateSize_ += Aggs_.back()->StateSize;
}
- HashMap_ = std::make_unique<TRobinHoodHashMap<ui32>>(TotalStateSize_);
+ HashMap_ = std::make_unique<TRobinHoodHashMap<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>>>(TotalStateSize_);
}
};
private:
void RegisterDependencies() const final {
- FlowDependsOn(Flow_);
+ this->FlowDependsOn(Flow_);
}
TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
@@ -627,18 +671,16 @@ private:
}
ui64 GetBatchLength(const NUdf::TUnboxedValue* columns) const {
- return TArrowBlock::From(columns[CountColumn_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+ return TArrowBlock::From(columns[Width_ - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
}
private:
IComputationWideFlowNode* Flow_;
- const ui32 CountColumn_;
std::optional<ui32> FilterColumn_;
const size_t Width_;
const size_t OutputWidth_;
const std::vector<TKeyParams> Keys_;
const TVector<TAggParams> AggsParams_;
- ui32 TotalKeysSize_ = 0;
std::vector<std::unique_ptr<IKeySerializer>> KeySerializers_;
};
@@ -659,52 +701,152 @@ void FillAggParams(TTupleLiteral* aggsVal, TTupleType* tupleType, TVector<TAggPa
}
IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
+ MKQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args");
const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
- ui32 countColumn = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>();
- auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(2));
+ auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(1));
std::optional<ui32> filterColumn;
if (filterColumnVal->HasItem()) {
filterColumn = AS_VALUE(TDataLiteral, filterColumnVal->GetItem())->AsValue().Get<ui32>();
}
- auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(3));
+ auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(2));
TVector<TAggParams> aggsParams;
FillAggParams(aggsVal, tupleType, aggsParams);
- return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, countColumn, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams));
+ return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams));
}
IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args");
+ MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
- ui32 countColumn = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>();
- auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(2));
+ auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(1));
std::optional<ui32> filterColumn;
if (filterColumnVal->HasItem()) {
filterColumn = AS_VALUE(TDataLiteral, filterColumnVal->GetItem())->AsValue().Get<ui32>();
}
- auto keysVal = AS_VALUE(TTupleLiteral, callable.GetInput(3));
+ auto keysVal = AS_VALUE(TTupleLiteral, callable.GetInput(2));
std::vector<TKeyParams> keys;
for (ui32 i = 0; i < keysVal->GetValuesCount(); ++i) {
ui32 index = AS_VALUE(TDataLiteral, keysVal->GetValue(i))->AsValue().Get<ui32>();
keys.emplace_back(TKeyParams{ index, tupleType->GetElementType(index) });
}
- auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(4));
+ auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(3));
TVector<TAggParams> aggsParams;
FillAggParams(aggsVal, tupleType, aggsParams);
- return new TBlockCombineHashedWrapper(ctx.Mutables, wideFlow, countColumn, filterColumn, tupleType->GetElementsCount(), keys, std::move(aggsParams));
+
+ ui32 totalKeysSize = 0;
+ std::vector<std::unique_ptr<IKeySerializer>> keySerializers;
+ for (const auto& k : keys) {
+ auto itemType = AS_TYPE(TBlockType, k.Type)->GetItemType();
+ bool isOptional;
+ auto dataType = UnpackOptionalData(itemType, isOptional);
+ if (isOptional) {
+ totalKeysSize += 1;
+ }
+
+ switch (*dataType->GetDataSlot()) {
+ case NUdf::EDataSlot::Int8:
+ totalKeysSize += 1;
+ if (isOptional) {
+ keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i8, arrow::Int8Scalar, arrow::Int8Builder, true>>(arrow::int8()));
+ } else {
+ keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i8, arrow::Int8Scalar, arrow::Int8Builder, false>>(arrow::int8()));
+ }
+
+ break;
+ case NUdf::EDataSlot::Bool:
+ case NUdf::EDataSlot::Uint8:
+ totalKeysSize += 1;
+ if (isOptional) {
+ keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, true>>(arrow::uint8()));
+ } else {
+ keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, false>>(arrow::uint8()));
+ }
+
+ break;
+ case NUdf::EDataSlot::Int16:
+ totalKeysSize += 2;
+ if (isOptional) {
+ keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i16, arrow::Int16Scalar, arrow::Int16Builder, true>>(arrow::int16()));
+ } else {
+ keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i16, arrow::Int16Scalar, arrow::Int16Builder, false>>(arrow::int16()));
+ }
+
+ break;
+ case NUdf::EDataSlot::Uint16:
+ case NUdf::EDataSlot::Date:
+ totalKeysSize += 2;
+ if (isOptional) {
+ keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, true>>(arrow::uint16()));
+ } else {
+ keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, false>>(arrow::uint16()));
+ }
+
+ break;
+ case NUdf::EDataSlot::Int32:
+ totalKeysSize += 4;
+ if (isOptional) {
+ keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i32, arrow::Int32Scalar, arrow::Int32Builder, true>>(arrow::int32()));
+ } else {
+ keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i32, arrow::Int32Scalar, arrow::Int32Builder, false>>(arrow::int32()));
+ }
+
+ break;
+ case NUdf::EDataSlot::Uint32:
+ case NUdf::EDataSlot::Datetime:
+ totalKeysSize += 4;
+ if (isOptional) {
+ keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, true>>(arrow::uint32()));
+ } else {
+ keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, false>>(arrow::uint32()));
+ }
+
+ break;
+ case NUdf::EDataSlot::Int64:
+ case NUdf::EDataSlot::Interval:
+ totalKeysSize += 8;
+ if (isOptional) {
+ keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i64, arrow::Int64Scalar, arrow::Int64Builder, true>>(arrow::int64()));
+ } else {
+ keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i64, arrow::Int64Scalar, arrow::Int64Builder, false>>(arrow::int64()));
+ }
+
+ break;
+ case NUdf::EDataSlot::Uint64:
+ case NUdf::EDataSlot::Timestamp:
+ totalKeysSize += 8;
+ if (isOptional) {
+ keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, true>>(arrow::uint64()));
+ } else {
+ keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, false>>(arrow::uint64()));
+ }
+
+ break;
+ default:
+ throw yexception() << "Unsupported key type";
+ }
+ }
+
+ if (totalKeysSize <= sizeof(ui32)) {
+ return new TBlockCombineHashedWrapper<ui32>(ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams));
+ }
+
+ if (totalKeysSize <= sizeof(ui64)) {
+ return new TBlockCombineHashedWrapper<ui64>(ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams));
+ }
+
+ return new TBlockCombineHashedWrapper<TSSOKey>(ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams));
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h b/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h
index 863f8cfa827..8f3744bdc30 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h
@@ -8,11 +8,13 @@ namespace NKikimr {
namespace NMiniKQL {
//TODO: only POD key & payloads are now supported
-template <typename TKey, typename TEqual, typename THash, typename TDeriv>
+template <typename TKey, typename TEqual, typename THash, typename TAllocator, typename TDeriv>
class TRobinHoodHashBase {
protected:
using TPSLStorage = i32;
+ using TVec = std::vector<char, TAllocator>;
+
explicit TRobinHoodHashBase(ui64 initialCapacity = 1u << 8)
: Capacity(initialCapacity)
{
@@ -87,7 +89,7 @@ public:
}
private:
- Y_FORCE_INLINE char* InsertImpl(TKey key, bool& isNew, ui64 capacity, std::vector<char>& data) {
+ Y_FORCE_INLINE char* InsertImpl(TKey key, bool& isNew, ui64 capacity, TVec& data) {
isNew = false;
ui64 bucket = THash()(key) & (capacity - 1);
char* ptr = data.data() + AsDeriv().GetCellSize() * bucket;
@@ -143,7 +145,7 @@ private:
}
void Grow() {
- std::vector<char> newData;
+ TVec newData;
auto newCapacity = Capacity * 2;
Allocate(newCapacity, newData);
for (auto iter = Begin(); iter != End(); Advance(iter)) {
@@ -161,7 +163,7 @@ private:
Capacity = newCapacity;
}
- void AdvancePointer(char*& ptr, std::vector<char>& data) const {
+ void AdvancePointer(char*& ptr, TVec& data) const {
ptr += AsDeriv().GetCellSize();
ptr = (ptr == data.data() + data.size()) ? data.data() : ptr;
}
@@ -172,7 +174,7 @@ protected:
}
private:
- void Allocate(ui64 capacity, std::vector<char>& data) const {
+ void Allocate(ui64 capacity, TVec& data) const {
data.resize(AsDeriv().GetCellSize() * capacity);
char* ptr = data.data();
for (ui64 i = 0; i < capacity; ++i) {
@@ -192,14 +194,14 @@ private:
private:
ui64 Size = 0;
ui64 Capacity;
- std::vector<char> Data;
+ TVec Data;
};
-template <typename TKey, typename TEqual = std::equal_to<TKey>, typename THash = std::hash<TKey>>
-class TRobinHoodHashMap : public TRobinHoodHashBase<TKey, TEqual, THash, TRobinHoodHashMap<TKey, TEqual, THash>> {
+template <typename TKey, typename TEqual = std::equal_to<TKey>, typename THash = std::hash<TKey>, typename TAllocator = std::allocator<char>>
+class TRobinHoodHashMap : public TRobinHoodHashBase<TKey, TEqual, THash, TAllocator, TRobinHoodHashMap<TKey, TEqual, THash, TAllocator>> {
public:
- using TSelf = TRobinHoodHashMap<TKey, TEqual, THash>;
- using TBase = TRobinHoodHashBase<TKey, TEqual, THash, TSelf>;
+ using TSelf = TRobinHoodHashMap<TKey, TEqual, THash, TAllocator>;
+ using TBase = TRobinHoodHashBase<TKey, TEqual, THash, TAllocator, TSelf>;
explicit TRobinHoodHashMap(ui32 payloadSize, ui64 initialCapacity = 1u << 8)
: TBase(initialCapacity)
@@ -244,14 +246,14 @@ public:
private:
const ui32 CellSize;
const ui32 PayloadSize;
- std::vector<char> TmpPayload, TmpPayload2;
+ typename TBase::TVec TmpPayload, TmpPayload2;
};
-template <typename TKey, typename TEqual = std::equal_to<TKey>, typename THash = std::hash<TKey>>
-class TRobinHoodHashSet : public TRobinHoodHashBase<TKey, TEqual, THash, TRobinHoodHashSet<TKey, TEqual, THash>> {
+template <typename TKey, typename TEqual = std::equal_to<TKey>, typename THash = std::hash<TKey>, typename TAllocator = std::allocator<char>>
+class TRobinHoodHashSet : public TRobinHoodHashBase<TKey, TEqual, THash, TAllocator, TRobinHoodHashSet<TKey, TEqual, THash, TAllocator>> {
public:
- using TSelf = TRobinHoodHashSet<TKey, TEqual, THash>;
- using TBase = TRobinHoodHashBase<TKey, TEqual, THash, TSelf>;
+ using TSelf = TRobinHoodHashSet<TKey, TEqual, THash, TAllocator>;
+ using TBase = TRobinHoodHashBase<TKey, TEqual, THash, TAllocator, TSelf>;
explicit TRobinHoodHashSet(ui64 initialCapacity = 1u << 8)
: TBase(initialCapacity)
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index ffe022d2efe..53c27b7e471 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -5308,7 +5308,7 @@ TRuntimeNode TProgramBuilder::BlockBitCast(TRuntimeNode value, TType* targetType
return TRuntimeNode(builder.Build(), false);
}
-TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, ui32 countColumn, std::optional<ui32> filterColumn,
+TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, std::optional<ui32> filterColumn,
const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
if constexpr (RuntimeVersion < 31U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
@@ -5316,7 +5316,6 @@ TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, ui32 countColum
TCallableBuilder builder(Env, __func__, returnType);
builder.Add(flow);
- builder.Add(NewDataLiteral<ui32>(countColumn));
if (!filterColumn) {
builder.Add(NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id));
} else {
@@ -5338,7 +5337,7 @@ TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, ui32 countColum
return TRuntimeNode(builder.Build(), false);
}
-TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, ui32 countColumn, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys,
+TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys,
const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
if constexpr (RuntimeVersion < 31U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
@@ -5346,7 +5345,6 @@ TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, ui32 countCo
TCallableBuilder builder(Env, __func__, returnType);
builder.Add(flow);
- builder.Add(NewDataLiteral<ui32>(countColumn));
if (!filterColumn) {
builder.Add(NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id));
} else {
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index 868be9f5775..1214c5ddc5a 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -257,9 +257,9 @@ public:
TRuntimeNode BlockFunc(const std::string_view& funcName, TType* returnType, const TArrayRef<const TRuntimeNode>& args);
TRuntimeNode BlockBitCast(TRuntimeNode value, TType* targetType);
- TRuntimeNode BlockCombineAll(TRuntimeNode flow, ui32 countColumn, std::optional<ui32> filterColumn,
+ TRuntimeNode BlockCombineAll(TRuntimeNode flow, std::optional<ui32> filterColumn,
const TArrayRef<const TAggInfo>& aggs, TType* returnType);
- TRuntimeNode BlockCombineHashed(TRuntimeNode flow, ui32 countColumn, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys,
+ TRuntimeNode BlockCombineHashed(TRuntimeNode flow, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys,
const TArrayRef<const TAggInfo>& aggs, TType* returnType);
// udfs
diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
index 071837afd90..321e3452877 100644
--- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
+++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
@@ -2406,14 +2406,13 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
AddCallable("BlockCombineAll", [](const TExprNode& node, TMkqlBuildContext& ctx) {
auto arg = MkqlBuildExpr(*node.Child(0), ctx);
- ui32 countColumn = FromString<ui32>(node.Child(1)->Content());
std::optional<ui32> filterColumn;
- if (!node.Child(2)->IsCallable("Void")) {
- filterColumn = FromString<ui32>(node.Child(2)->Content());
+ if (!node.Child(1)->IsCallable("Void")) {
+ filterColumn = FromString<ui32>(node.Child(1)->Content());
}
TVector<TAggInfo> aggs;
- for (const auto& agg : node.Child(3)->Children()) {
+ for (const auto& agg : node.Child(2)->Children()) {
TAggInfo info;
info.Name = TString(agg->Head().Head().Content());
for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
@@ -2424,24 +2423,23 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
}
auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
- return ctx.ProgramBuilder.BlockCombineAll(arg, countColumn, filterColumn, aggs, returnType);
+ return ctx.ProgramBuilder.BlockCombineAll(arg, filterColumn, aggs, returnType);
});
AddCallable("BlockCombineHashed", [](const TExprNode& node, TMkqlBuildContext& ctx) {
auto arg = MkqlBuildExpr(*node.Child(0), ctx);
- ui32 countColumn = FromString<ui32>(node.Child(1)->Content());
std::optional<ui32> filterColumn;
- if (!node.Child(2)->IsCallable("Void")) {
- filterColumn = FromString<ui32>(node.Child(2)->Content());
+ if (!node.Child(1)->IsCallable("Void")) {
+ filterColumn = FromString<ui32>(node.Child(1)->Content());
}
TVector<ui32> keys;
- for (const auto& key : node.Child(3)->Children()) {
+ for (const auto& key : node.Child(2)->Children()) {
keys.push_back(FromString<ui32>(key->Content()));
}
TVector<TAggInfo> aggs;
- for (const auto& agg : node.Child(4)->Children()) {
+ for (const auto& agg : node.Child(3)->Children()) {
TAggInfo info;
info.Name = TString(agg->Head().Head().Content());
for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
@@ -2452,7 +2450,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
}
auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
- return ctx.ProgramBuilder.BlockCombineHashed(arg, countColumn, filterColumn, keys, aggs, returnType);
+ return ctx.ProgramBuilder.BlockCombineHashed(arg, filterColumn, keys, aggs, returnType);
});
AddCallable("BlockCompress", [](const TExprNode& node, TMkqlBuildContext& ctx) {