diff options
author | vvvv <vvvv@ydb.tech> | 2022-12-09 20:12:40 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2022-12-09 20:12:40 +0300 |
commit | e88d560cbbd3a1551449244afb37345443487919 (patch) | |
tree | 7d9185776a8656a9d7bd1abc11066d6e438ca46b | |
parent | 66415f6b8919ea1c545c2ba15284cde3efa93d99 (diff) | |
download | ydb-e88d560cbbd3a1551449244afb37345443487919.tar.gz |
get rid of count column, always use last one. SSO keys with arena
9 files changed, 332 insertions, 246 deletions
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json index 696777c89e5..0c31c725461 100644 --- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json +++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json @@ -2247,9 +2247,8 @@ "Base": "TCoInputBase", "Match": {"Type": "Callable", "Name": "BlockCombineAll"}, "Children": [ - {"Index": 1, "Name": "CountColumn", "Type": "TCoAtom"}, - {"Index": 2, "Name": "FilterColumn", "Type": "TExprBase"}, - {"Index": 3, "Name": "Aggregations", "Type": "TExprList"} + {"Index": 1, "Name": "FilterColumn", "Type": "TExprBase"}, + {"Index": 2, "Name": "Aggregations", "Type": "TExprList"} ] }, { @@ -2257,10 +2256,9 @@ "Base": "TCoInputBase", "Match": {"Type": "Callable", "Name": "BlockCombineHashed"}, "Children": [ - {"Index": 1, "Name": "CountColumn", "Type": "TCoAtom"}, - {"Index": 2, "Name": "FilterColumn", "Type": "TExprBase"}, - {"Index": 3, "Name": "Keys", "Type": "TCoAtomList"}, - {"Index": 4, "Name": "Aggregations", "Type": "TExprList"} + {"Index": 1, "Name": "FilterColumn", "Type": "TExprBase"}, + {"Index": 2, "Name": "Keys", "Type": "TCoAtomList"}, + {"Index": 3, "Name": "Aggregations", "Type": "TExprList"} ] } ] diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp index 9ba65645e0f..7e879b086ea 100644 --- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -4884,17 +4884,16 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte TExprNode::TPtr UpdateBlockCombineAllColumns(const TExprNode::TPtr& node, std::optional<ui32> filterColumn, const TVector<ui32>& argIndices, TExprContext& ctx) { auto combineChildren = node->ChildrenList(); combineChildren[0] = node->Head().HeadPtr(); - combineChildren[1] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[1]->Content())])); if (filterColumn) { - YQL_ENSURE(combineChildren[2]->IsCallable("Void"), "Filter column is already used"); - combineChildren[2] = ctx.NewAtom(node->Pos(), ToString(*filterColumn)); + YQL_ENSURE(combineChildren[1]->IsCallable("Void"), "Filter column is already used"); + combineChildren[1] = ctx.NewAtom(node->Pos(), ToString(*filterColumn)); } else { - if (!combineChildren[2]->IsCallable("Void")) { - combineChildren[2] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[2]->Content())])); + if (!combineChildren[1]->IsCallable("Void")) { + combineChildren[1] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[2]->Content())])); } } - auto payloadNodes = combineChildren[3]->ChildrenList(); + auto payloadNodes = combineChildren[2]->ChildrenList(); for (auto& p : payloadNodes) { YQL_ENSURE(p->IsList() && p->ChildrenSize() >= 1 && p->Head().IsCallable("AggBlockApply"), "Expected AggBlockApply"); auto payloadArgs = p->ChildrenList(); @@ -4905,7 +4904,7 @@ TExprNode::TPtr UpdateBlockCombineAllColumns(const TExprNode::TPtr& node, std::o p = ctx.ChangeChildren(*p, std::move(payloadArgs)); } - combineChildren[3] = ctx.ChangeChildren(*combineChildren[3], std::move(payloadNodes)); + combineChildren[2] = ctx.ChangeChildren(*combineChildren[2], std::move(payloadNodes)); return ctx.ChangeChildren(*node, std::move(combineChildren)); } @@ -4921,7 +4920,7 @@ TExprNode::TPtr OptimizeBlockCombineAll(const TExprNode::TPtr& node, TExprContex } } - if (node->Head().IsCallable("BlockCompress") && node->Child(2)->IsCallable("Void")) { + if (node->Head().IsCallable("BlockCompress") && node->Child(1)->IsCallable("Void")) { auto filterIndex = FromString<ui32>(node->Head().Child(1)->Content()); TVector<ui32> argIndices; argIndices.resize(node->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetSize()); diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp index c6c09dcd8ea..bcf423eac71 100644 --- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp @@ -295,57 +295,33 @@ bool ValidateBlockAggs(TPositionHandle pos, const TTypeAnnotationNode::TListType IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { Y_UNUSED(output); - if (!EnsureArgsCount(*input, 4U, ctx.Expr)) { + if (!EnsureArgsCount(*input, 3U, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - if (!EnsureWideFlowType(input->Head(), ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - const auto multiType = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>(); - TTypeAnnotationNode::TListType inputItems; - for (const auto& type : multiType->GetItems()) { - if (!EnsureBlockOrScalarType(input->Pos(), *type, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - bool isScalar; - inputItems.push_back(GetBlockItemType(*type, isScalar)); - } - - if (!EnsureAtom(*input->Child(1), ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - ui32 countColumnIndex; - if (!TryFromString(input->Child(1)->Content(), countColumnIndex) || countColumnIndex >= inputItems.size()) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad count column index")); - return IGraphTransformer::TStatus::Error; - } - - if (!EnsureSpecificDataType(input->Child(1)->Pos(), *inputItems[countColumnIndex], EDataSlot::Uint64, ctx.Expr)) { + TTypeAnnotationNode::TListType blockItemTypes; + if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - if (!input->Child(2)->IsCallable("Void")) { - if (!EnsureAtom(*input->Child(2), ctx.Expr)) { + if (!input->Child(1)->IsCallable("Void")) { + if (!EnsureAtom(*input->Child(1), ctx.Expr)) { return IGraphTransformer::TStatus::Error; } ui32 filterColumnIndex; - if (!TryFromString(input->Child(2)->Content(), filterColumnIndex) || filterColumnIndex >= inputItems.size()) { + if (!TryFromString(input->Child(1)->Content(), filterColumnIndex) || filterColumnIndex >= blockItemTypes.size()) { ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad filter column index")); return IGraphTransformer::TStatus::Error; } - if (!EnsureSpecificDataType(input->Child(2)->Pos(), *inputItems[filterColumnIndex], EDataSlot::Bool, ctx.Expr)) { + if (!EnsureSpecificDataType(input->Child(1)->Pos(), *blockItemTypes[filterColumnIndex], EDataSlot::Bool, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } } TTypeAnnotationNode::TListType retMultiType; - if (!ValidateBlockAggs(input->Pos(), inputItems, *input->Child(3), retMultiType, ctx.Expr)) { + if (!ValidateBlockAggs(input->Pos(), blockItemTypes, *input->Child(2), retMultiType, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } @@ -356,75 +332,51 @@ IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { Y_UNUSED(output); - if (!EnsureArgsCount(*input, 5U, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - if (!EnsureWideFlowType(input->Head(), ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - const auto multiType = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>(); - TTypeAnnotationNode::TListType inputItems; - for (const auto& type : multiType->GetItems()) { - if (!EnsureBlockOrScalarType(input->Pos(), *type, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - bool isScalar; - inputItems.push_back(GetBlockItemType(*type, isScalar)); - } - - if (!EnsureAtom(*input->Child(1), ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - ui32 countColumnIndex; - if (!TryFromString(input->Child(1)->Content(), countColumnIndex) || countColumnIndex >= inputItems.size()) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad count column index")); + if (!EnsureArgsCount(*input, 4U, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - if (!EnsureSpecificDataType(input->Child(1)->Pos(), *inputItems[countColumnIndex], EDataSlot::Uint64, ctx.Expr)) { + TTypeAnnotationNode::TListType blockItemTypes; + if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - if (!input->Child(2)->IsCallable("Void")) { - if (!EnsureAtom(*input->Child(2), ctx.Expr)) { + if (!input->Child(1)->IsCallable("Void")) { + if (!EnsureAtom(*input->Child(1), ctx.Expr)) { return IGraphTransformer::TStatus::Error; } ui32 filterColumnIndex; - if (!TryFromString(input->Child(2)->Content(), filterColumnIndex) || filterColumnIndex >= inputItems.size()) { + if (!TryFromString(input->Child(1)->Content(), filterColumnIndex) || filterColumnIndex >= blockItemTypes.size()) { ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad filter column index")); return IGraphTransformer::TStatus::Error; } - if (!EnsureSpecificDataType(input->Child(2)->Pos(), *inputItems[filterColumnIndex], EDataSlot::Bool, ctx.Expr)) { + if (!EnsureSpecificDataType(input->Child(1)->Pos(), *blockItemTypes[filterColumnIndex], EDataSlot::Bool, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } } - if (!EnsureTupleMinSize(*input->Child(3), 1, ctx.Expr)) { + if (!EnsureTupleMinSize(*input->Child(2), 1, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } TTypeAnnotationNode::TListType retMultiType; - for (auto child : input->Child(3)->Children()) { + for (auto child : input->Child(2)->Children()) { if (!EnsureAtom(*child, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } ui32 keyColumnIndex; - if (!TryFromString(child->Content(), keyColumnIndex) || keyColumnIndex >= inputItems.size()) { + if (!TryFromString(child->Content(), keyColumnIndex) || keyColumnIndex >= blockItemTypes.size()) { ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Bad key column index")); return IGraphTransformer::TStatus::Error; } - retMultiType.push_back(inputItems[keyColumnIndex]); + retMultiType.push_back(blockItemTypes[keyColumnIndex]); } - if (!ValidateBlockAggs(input->Pos(), inputItems, *input->Child(4), retMultiType, ctx.Expr)) { + if (!ValidateBlockAggs(input->Pos(), blockItemTypes, *input->Child(3), retMultiType, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp index 52d6ab24329..6ffce1fd3d0 100644 --- a/ydb/library/yql/core/yql_aggregate_expander.cpp +++ b/ydb/library/yql/core/yql_aggregate_expander.cpp @@ -589,7 +589,6 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() { outputColumns.push_back(TString(FinalColumnNames[index]->Content())); } - auto mappedWidth = extractorRoots.size(); auto extractorLambda = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), std::move(extractorArgs)), std::move(extractorRoots)); auto mappedWideFlow = Ctx.NewCallable(Node->Pos(), "WideMap", { wideFlow, extractorLambda }); auto blocks = Ctx.NewCallable(Node->Pos(), "WideToBlocks", { mappedWideFlow }); @@ -599,11 +598,10 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() { .Callable("WideFromBlocks") .Callable(0, "BlockCombineHashed") .Add(0, blocks) - .Atom(1, ToString(mappedWidth)) - .Callable(2, "Void") + .Callable(1, "Void") .Seal() - .Add(3, Ctx.NewList(Node->Pos(), std::move(keyIdxs))) - .Add(4, Ctx.NewList(Node->Pos(), std::move(aggs))) + .Add(2, Ctx.NewList(Node->Pos(), std::move(keyIdxs))) + .Add(3, Ctx.NewList(Node->Pos(), std::move(aggs))) .Seal() .Seal() .Build(); @@ -611,10 +609,9 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() { aggWideFlow = Ctx.Builder(Node->Pos()) .Callable("BlockCombineAll") .Add(0, blocks) - .Atom(1, ToString(mappedWidth)) - .Callable(2, "Void") + .Callable(1, "Void") .Seal() - .Add(3, Ctx.NewList(Node->Pos(), std::move(aggs))) + .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs))) .Seal() .Build(); } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp index 7524df98221..46b5fd1b6da 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp @@ -19,6 +19,96 @@ namespace NMiniKQL { namespace { +class TSSOKey { +public: + static constexpr size_t SSO_Length = 16; + static_assert(SSO_Length < 128); // should fit into 7 bits + + static bool CanBeInplace(TStringBuf data) { + return data.Size() + 1 <= sizeof(TSSOKey); + } + + static TSSOKey Inplace(TStringBuf data) { + Y_ASSERT(CanBeInplace(data)); + TSSOKey ret(1 | (data.Size() << 1), 0); + memcpy(ret.U.I.Buffer_, data.Data(), data.Size()); + return ret; + } + + static TSSOKey External(TStringBuf data) { + return TSSOKey(data.Size() << 1, data.Data()); + } + + bool IsInplace() const { + return U.I.SmallLength_ & 1; + } + + TStringBuf AsView() const { + if (IsInplace()) { + // inplace + return TStringBuf(U.I.Buffer_, U.I.SmallLength_ >> 1); + } else { + // external + return TStringBuf(U.E.Ptr_, U.E.Length_ >> 1); + } + } + + void UpdateExternalPointer(const char *ptr) { + Y_ASSERT(!IsInplace()); + U.E.Ptr_ = ptr; + } + +private: + TSSOKey(ui64 length, const char* ptr) { + U.E.Length_ = length; + U.E.Ptr_ = ptr; + } + +private: + union { + struct TExternal { + ui64 Length_; + const char* Ptr_; + } E; + struct TInplace { + ui8 SmallLength_; + char Buffer_[SSO_Length]; + } I; + } U; +}; + +} +} +} + +namespace std { + template <> + struct hash<NKikimr::NMiniKQL::TSSOKey> { + using argument_type = NKikimr::NMiniKQL::TSSOKey; + using result_type = size_t; + inline result_type operator()(argument_type const& s) const noexcept { + return std::hash<std::string_view>()(s.AsView()); + } + }; + + template <> + struct equal_to<NKikimr::NMiniKQL::TSSOKey> { + using argument_type = NKikimr::NMiniKQL::TSSOKey; + bool operator()(argument_type x, argument_type y) const { + return x.AsView() == y.AsView(); + } + bool operator()(argument_type x, TStringBuf y) const { + return x.AsView() == y; + } + using is_transparent = void; + }; +} + +namespace NKikimr { +namespace NMiniKQL { + +namespace { + struct TAggParams { TStringBuf Name; TTupleType* TupleType; @@ -213,17 +303,16 @@ class TBlockCombineAllWrapper : public TStatefulWideFlowComputationNode<TBlockCo public: TBlockCombineAllWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, - ui32 countColumn, std::optional<ui32> filterColumn, size_t width, TVector<TAggParams>&& aggsParams) : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any) , Flow_(flow) - , CountColumn_(countColumn) , FilterColumn_(filterColumn) , Width_(width) , AggsParams_(std::move(aggsParams)) { + MKQL_ENSURE(Width_ > 0, "Missing block length column"); } EFetchResult DoCalculate(NUdf::TUnboxedValue& state, @@ -343,127 +432,75 @@ private: } ui64 GetBatchLength(const NUdf::TUnboxedValue* columns) const { - return TArrowBlock::From(columns[CountColumn_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + return TArrowBlock::From(columns[Width_ - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; } private: IComputationWideFlowNode* Flow_; - const ui32 CountColumn_; std::optional<ui32> FilterColumn_; const size_t Width_; const TVector<TAggParams> AggsParams_; }; -class TBlockCombineHashedWrapper : public TStatefulWideFlowComputationNode<TBlockCombineHashedWrapper> { +template <typename T> +T MakeKey(TStringBuf s) { + Y_ASSERT(s.Size() <= sizeof(T)); + return *(const T*)s.Data(); +} + +template <> +TSSOKey MakeKey(TStringBuf s) { + if (TSSOKey::CanBeInplace(s)) { + return TSSOKey::Inplace(s); + } else { + return TSSOKey::External(s); + } +} + +void MoveKeyToArena(TSSOKey& key, TPagedArena& arena) { + if (key.IsInplace()) { + return; + } + + auto view = key.AsView(); + auto ptr = (char*)arena.Alloc(view.Size()); + memcpy(ptr, view.Data(), view.Size()); + key.UpdateExternalPointer(ptr); +} + +template <typename T> +TStringBuf GetKeyView(const T& key) { + return TStringBuf((const char*)&key, sizeof(T)); +} + +template <> +TStringBuf GetKeyView(const TSSOKey& key) { + return key.AsView(); +} + +template <typename TKey> +class TBlockCombineHashedWrapper : public TStatefulWideFlowComputationNode<TBlockCombineHashedWrapper<TKey>> { public: + using TSelf = TBlockCombineHashedWrapper<TKey>; + using TBase = TStatefulWideFlowComputationNode<TSelf>; + TBlockCombineHashedWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, - ui32 countColumn, std::optional<ui32> filterColumn, size_t width, const std::vector<TKeyParams>& keys, + std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers, TVector<TAggParams>&& aggsParams) - : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any) + : TBase(mutables, flow, EValueRepresentation::Any) , Flow_(flow) - , CountColumn_(countColumn) , FilterColumn_(filterColumn) , Width_(width) , OutputWidth_(keys.size() + aggsParams.size() + 1) , Keys_(keys) + , KeySerializers_(std::move(keySerializers)) , AggsParams_(std::move(aggsParams)) { - for (const auto& k : Keys_) { - auto itemType = AS_TYPE(TBlockType, k.Type)->GetItemType(); - bool isOptional; - auto dataType = UnpackOptionalData(itemType, isOptional); - if (isOptional) { - TotalKeysSize_ += 1; - } - - switch (*dataType->GetDataSlot()) { - case NUdf::EDataSlot::Int8: - TotalKeysSize_ += 1; - if (isOptional) { - KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i8, arrow::Int8Scalar, arrow::Int8Builder, true>>(arrow::int8())); - } else { - KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i8, arrow::Int8Scalar, arrow::Int8Builder, false>>(arrow::int8())); - } - - break; - case NUdf::EDataSlot::Bool: - case NUdf::EDataSlot::Uint8: - TotalKeysSize_ += 1; - if (isOptional) { - KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, true>>(arrow::uint8())); - } else { - KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, false>>(arrow::uint8())); - } - - break; - case NUdf::EDataSlot::Int16: - TotalKeysSize_ += 2; - if (isOptional) { - KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i16, arrow::Int16Scalar, arrow::Int16Builder, true>>(arrow::int16())); - } else { - KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i16, arrow::Int16Scalar, arrow::Int16Builder, false>>(arrow::int16())); - } - - break; - case NUdf::EDataSlot::Uint16: - case NUdf::EDataSlot::Date: - TotalKeysSize_ += 2; - if (isOptional) { - KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, true>>(arrow::uint16())); - } else { - KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, false>>(arrow::uint16())); - } - - break; - case NUdf::EDataSlot::Int32: - TotalKeysSize_ += 4; - if (isOptional) { - KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i32, arrow::Int32Scalar, arrow::Int32Builder, true>>(arrow::int32())); - } else { - KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i32, arrow::Int32Scalar, arrow::Int32Builder, false>>(arrow::int32())); - } - - break; - case NUdf::EDataSlot::Uint32: - case NUdf::EDataSlot::Datetime: - TotalKeysSize_ += 4; - if (isOptional) { - KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, true>>(arrow::uint32())); - } else { - KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, false>>(arrow::uint32())); - } - - break; - case NUdf::EDataSlot::Int64: - case NUdf::EDataSlot::Interval: - TotalKeysSize_ += 8; - if (isOptional) { - KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i64, arrow::Int64Scalar, arrow::Int64Builder, true>>(arrow::int64())); - } else { - KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<i64, arrow::Int64Scalar, arrow::Int64Builder, false>>(arrow::int64())); - } - - break; - case NUdf::EDataSlot::Uint64: - case NUdf::EDataSlot::Timestamp: - TotalKeysSize_ += 8; - if (isOptional) { - KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, true>>(arrow::uint64())); - } else { - KeySerializers_.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, false>>(arrow::uint64())); - } - - break; - default: - throw yexception() << "Unsupported key type"; - } - } - - MKQL_ENSURE(TotalKeysSize_ <= 4, "TODO Support all lengths of keys"); + MKQL_ENSURE(Width_ > 0, "Missing block length column"); } EFetchResult DoCalculate(NUdf::TUnboxedValue& state, @@ -493,7 +530,7 @@ public: } TOutputBuffer out; - out.Resize(sizeof(ui32)); + out.Resize(sizeof(TKey)); for (ui64 row = 0; row < batchLength; ++row) { out.Rewind(); // encode key @@ -502,8 +539,7 @@ public: } auto str = out.Finish(); - Y_ASSERT(str.Size() <= sizeof(ui32)); - ui32 key = *(const ui32*)str.Data(); + TKey key = MakeKey<TKey>(str); bool isNew; auto iter = s.HashMap_->Insert(key, isNew); char* ptr = (char*)s.HashMap_->GetPayload(iter); @@ -516,6 +552,10 @@ public: ptr += s.Aggs_[i]->StateSize; } + if constexpr (std::is_same<TKey, TSSOKey>::value) { + MoveKeyToArena(s.HashMap_->GetKey(iter), s.Arena_); + } + s.HashMap_->CheckGrow(); } else { for (size_t i = 0; i < s.Aggs_.size(); ++i) { @@ -550,9 +590,9 @@ public: continue; } - ui32 key = s.HashMap_->GetKey(iter); + const TKey& key = s.HashMap_->GetKey(iter); auto ptr = (const char*)s.HashMap_->GetPayload(iter); - TInputBuffer in(TStringBuf((const char*)&key, sizeof(key))); + TInputBuffer in(GetKeyView<TKey>(key)); for (auto& kb : keyBuilders) { kb->Add(in); } @@ -587,18 +627,22 @@ public: private: struct TState : public TComputationValue<TState> { + using TBase = TComputationValue<TState>; + TVector<NUdf::TUnboxedValue> Values_; TVector<NUdf::TUnboxedValue*> ValuePointers_; TVector<std::unique_ptr<IBlockAggregator>> Aggs_; bool IsFinished_ = false; bool HasValues_ = false; ui32 TotalStateSize_ = 0; - std::unique_ptr<TRobinHoodHashMap<ui32>> HashMap_; + std::unique_ptr<TRobinHoodHashMap<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>>> HashMap_; + TPagedArena Arena_; TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const TVector<TAggParams>& params, TComputationContext& ctx) - : TComputationValue(memInfo) + : TBase(memInfo) , Values_(width) , ValuePointers_(width) + , Arena_(TlsAllocState) { for (size_t i = 0; i < width; ++i) { ValuePointers_[i] = &Values_[i]; @@ -610,13 +654,13 @@ private: TotalStateSize_ += Aggs_.back()->StateSize; } - HashMap_ = std::make_unique<TRobinHoodHashMap<ui32>>(TotalStateSize_); + HashMap_ = std::make_unique<TRobinHoodHashMap<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>>>(TotalStateSize_); } }; private: void RegisterDependencies() const final { - FlowDependsOn(Flow_); + this->FlowDependsOn(Flow_); } TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { @@ -627,18 +671,16 @@ private: } ui64 GetBatchLength(const NUdf::TUnboxedValue* columns) const { - return TArrowBlock::From(columns[CountColumn_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + return TArrowBlock::From(columns[Width_ - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; } private: IComputationWideFlowNode* Flow_; - const ui32 CountColumn_; std::optional<ui32> FilterColumn_; const size_t Width_; const size_t OutputWidth_; const std::vector<TKeyParams> Keys_; const TVector<TAggParams> AggsParams_; - ui32 TotalKeysSize_ = 0; std::vector<std::unique_ptr<IKeySerializer>> KeySerializers_; }; @@ -659,52 +701,152 @@ void FillAggParams(TTupleLiteral* aggsVal, TTupleType* tupleType, TVector<TAggPa } IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args"); + MKQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args"); const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); - ui32 countColumn = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>(); - auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(2)); + auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(1)); std::optional<ui32> filterColumn; if (filterColumnVal->HasItem()) { filterColumn = AS_VALUE(TDataLiteral, filterColumnVal->GetItem())->AsValue().Get<ui32>(); } - auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(3)); + auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(2)); TVector<TAggParams> aggsParams; FillAggParams(aggsVal, tupleType, aggsParams); - return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, countColumn, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams)); + return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams)); } IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args"); + MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args"); const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); - ui32 countColumn = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>(); - auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(2)); + auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(1)); std::optional<ui32> filterColumn; if (filterColumnVal->HasItem()) { filterColumn = AS_VALUE(TDataLiteral, filterColumnVal->GetItem())->AsValue().Get<ui32>(); } - auto keysVal = AS_VALUE(TTupleLiteral, callable.GetInput(3)); + auto keysVal = AS_VALUE(TTupleLiteral, callable.GetInput(2)); std::vector<TKeyParams> keys; for (ui32 i = 0; i < keysVal->GetValuesCount(); ++i) { ui32 index = AS_VALUE(TDataLiteral, keysVal->GetValue(i))->AsValue().Get<ui32>(); keys.emplace_back(TKeyParams{ index, tupleType->GetElementType(index) }); } - auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(4)); + auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(3)); TVector<TAggParams> aggsParams; FillAggParams(aggsVal, tupleType, aggsParams); - return new TBlockCombineHashedWrapper(ctx.Mutables, wideFlow, countColumn, filterColumn, tupleType->GetElementsCount(), keys, std::move(aggsParams)); + + ui32 totalKeysSize = 0; + std::vector<std::unique_ptr<IKeySerializer>> keySerializers; + for (const auto& k : keys) { + auto itemType = AS_TYPE(TBlockType, k.Type)->GetItemType(); + bool isOptional; + auto dataType = UnpackOptionalData(itemType, isOptional); + if (isOptional) { + totalKeysSize += 1; + } + + switch (*dataType->GetDataSlot()) { + case NUdf::EDataSlot::Int8: + totalKeysSize += 1; + if (isOptional) { + keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i8, arrow::Int8Scalar, arrow::Int8Builder, true>>(arrow::int8())); + } else { + keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i8, arrow::Int8Scalar, arrow::Int8Builder, false>>(arrow::int8())); + } + + break; + case NUdf::EDataSlot::Bool: + case NUdf::EDataSlot::Uint8: + totalKeysSize += 1; + if (isOptional) { + keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, true>>(arrow::uint8())); + } else { + keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, false>>(arrow::uint8())); + } + + break; + case NUdf::EDataSlot::Int16: + totalKeysSize += 2; + if (isOptional) { + keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i16, arrow::Int16Scalar, arrow::Int16Builder, true>>(arrow::int16())); + } else { + keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i16, arrow::Int16Scalar, arrow::Int16Builder, false>>(arrow::int16())); + } + + break; + case NUdf::EDataSlot::Uint16: + case NUdf::EDataSlot::Date: + totalKeysSize += 2; + if (isOptional) { + keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, true>>(arrow::uint16())); + } else { + keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, false>>(arrow::uint16())); + } + + break; + case NUdf::EDataSlot::Int32: + totalKeysSize += 4; + if (isOptional) { + keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i32, arrow::Int32Scalar, arrow::Int32Builder, true>>(arrow::int32())); + } else { + keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i32, arrow::Int32Scalar, arrow::Int32Builder, false>>(arrow::int32())); + } + + break; + case NUdf::EDataSlot::Uint32: + case NUdf::EDataSlot::Datetime: + totalKeysSize += 4; + if (isOptional) { + keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, true>>(arrow::uint32())); + } else { + keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, false>>(arrow::uint32())); + } + + break; + case NUdf::EDataSlot::Int64: + case NUdf::EDataSlot::Interval: + totalKeysSize += 8; + if (isOptional) { + keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i64, arrow::Int64Scalar, arrow::Int64Builder, true>>(arrow::int64())); + } else { + keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i64, arrow::Int64Scalar, arrow::Int64Builder, false>>(arrow::int64())); + } + + break; + case NUdf::EDataSlot::Uint64: + case NUdf::EDataSlot::Timestamp: + totalKeysSize += 8; + if (isOptional) { + keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, true>>(arrow::uint64())); + } else { + keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, false>>(arrow::uint64())); + } + + break; + default: + throw yexception() << "Unsupported key type"; + } + } + + if (totalKeysSize <= sizeof(ui32)) { + return new TBlockCombineHashedWrapper<ui32>(ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams)); + } + + if (totalKeysSize <= sizeof(ui64)) { + return new TBlockCombineHashedWrapper<ui64>(ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams)); + } + + return new TBlockCombineHashedWrapper<TSSOKey>(ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams)); } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h b/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h index 863f8cfa827..8f3744bdc30 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h @@ -8,11 +8,13 @@ namespace NKikimr { namespace NMiniKQL { //TODO: only POD key & payloads are now supported -template <typename TKey, typename TEqual, typename THash, typename TDeriv> +template <typename TKey, typename TEqual, typename THash, typename TAllocator, typename TDeriv> class TRobinHoodHashBase { protected: using TPSLStorage = i32; + using TVec = std::vector<char, TAllocator>; + explicit TRobinHoodHashBase(ui64 initialCapacity = 1u << 8) : Capacity(initialCapacity) { @@ -87,7 +89,7 @@ public: } private: - Y_FORCE_INLINE char* InsertImpl(TKey key, bool& isNew, ui64 capacity, std::vector<char>& data) { + Y_FORCE_INLINE char* InsertImpl(TKey key, bool& isNew, ui64 capacity, TVec& data) { isNew = false; ui64 bucket = THash()(key) & (capacity - 1); char* ptr = data.data() + AsDeriv().GetCellSize() * bucket; @@ -143,7 +145,7 @@ private: } void Grow() { - std::vector<char> newData; + TVec newData; auto newCapacity = Capacity * 2; Allocate(newCapacity, newData); for (auto iter = Begin(); iter != End(); Advance(iter)) { @@ -161,7 +163,7 @@ private: Capacity = newCapacity; } - void AdvancePointer(char*& ptr, std::vector<char>& data) const { + void AdvancePointer(char*& ptr, TVec& data) const { ptr += AsDeriv().GetCellSize(); ptr = (ptr == data.data() + data.size()) ? data.data() : ptr; } @@ -172,7 +174,7 @@ protected: } private: - void Allocate(ui64 capacity, std::vector<char>& data) const { + void Allocate(ui64 capacity, TVec& data) const { data.resize(AsDeriv().GetCellSize() * capacity); char* ptr = data.data(); for (ui64 i = 0; i < capacity; ++i) { @@ -192,14 +194,14 @@ private: private: ui64 Size = 0; ui64 Capacity; - std::vector<char> Data; + TVec Data; }; -template <typename TKey, typename TEqual = std::equal_to<TKey>, typename THash = std::hash<TKey>> -class TRobinHoodHashMap : public TRobinHoodHashBase<TKey, TEqual, THash, TRobinHoodHashMap<TKey, TEqual, THash>> { +template <typename TKey, typename TEqual = std::equal_to<TKey>, typename THash = std::hash<TKey>, typename TAllocator = std::allocator<char>> +class TRobinHoodHashMap : public TRobinHoodHashBase<TKey, TEqual, THash, TAllocator, TRobinHoodHashMap<TKey, TEqual, THash, TAllocator>> { public: - using TSelf = TRobinHoodHashMap<TKey, TEqual, THash>; - using TBase = TRobinHoodHashBase<TKey, TEqual, THash, TSelf>; + using TSelf = TRobinHoodHashMap<TKey, TEqual, THash, TAllocator>; + using TBase = TRobinHoodHashBase<TKey, TEqual, THash, TAllocator, TSelf>; explicit TRobinHoodHashMap(ui32 payloadSize, ui64 initialCapacity = 1u << 8) : TBase(initialCapacity) @@ -244,14 +246,14 @@ public: private: const ui32 CellSize; const ui32 PayloadSize; - std::vector<char> TmpPayload, TmpPayload2; + typename TBase::TVec TmpPayload, TmpPayload2; }; -template <typename TKey, typename TEqual = std::equal_to<TKey>, typename THash = std::hash<TKey>> -class TRobinHoodHashSet : public TRobinHoodHashBase<TKey, TEqual, THash, TRobinHoodHashSet<TKey, TEqual, THash>> { +template <typename TKey, typename TEqual = std::equal_to<TKey>, typename THash = std::hash<TKey>, typename TAllocator = std::allocator<char>> +class TRobinHoodHashSet : public TRobinHoodHashBase<TKey, TEqual, THash, TAllocator, TRobinHoodHashSet<TKey, TEqual, THash, TAllocator>> { public: - using TSelf = TRobinHoodHashSet<TKey, TEqual, THash>; - using TBase = TRobinHoodHashBase<TKey, TEqual, THash, TSelf>; + using TSelf = TRobinHoodHashSet<TKey, TEqual, THash, TAllocator>; + using TBase = TRobinHoodHashBase<TKey, TEqual, THash, TAllocator, TSelf>; explicit TRobinHoodHashSet(ui64 initialCapacity = 1u << 8) : TBase(initialCapacity) diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index ffe022d2efe..53c27b7e471 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -5308,7 +5308,7 @@ TRuntimeNode TProgramBuilder::BlockBitCast(TRuntimeNode value, TType* targetType return TRuntimeNode(builder.Build(), false); } -TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, ui32 countColumn, std::optional<ui32> filterColumn, +TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, std::optional<ui32> filterColumn, const TArrayRef<const TAggInfo>& aggs, TType* returnType) { if constexpr (RuntimeVersion < 31U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; @@ -5316,7 +5316,6 @@ TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, ui32 countColum TCallableBuilder builder(Env, __func__, returnType); builder.Add(flow); - builder.Add(NewDataLiteral<ui32>(countColumn)); if (!filterColumn) { builder.Add(NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id)); } else { @@ -5338,7 +5337,7 @@ TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, ui32 countColum return TRuntimeNode(builder.Build(), false); } -TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, ui32 countColumn, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys, +TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys, const TArrayRef<const TAggInfo>& aggs, TType* returnType) { if constexpr (RuntimeVersion < 31U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; @@ -5346,7 +5345,6 @@ TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, ui32 countCo TCallableBuilder builder(Env, __func__, returnType); builder.Add(flow); - builder.Add(NewDataLiteral<ui32>(countColumn)); if (!filterColumn) { builder.Add(NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id)); } else { diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 868be9f5775..1214c5ddc5a 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -257,9 +257,9 @@ public: TRuntimeNode BlockFunc(const std::string_view& funcName, TType* returnType, const TArrayRef<const TRuntimeNode>& args); TRuntimeNode BlockBitCast(TRuntimeNode value, TType* targetType); - TRuntimeNode BlockCombineAll(TRuntimeNode flow, ui32 countColumn, std::optional<ui32> filterColumn, + TRuntimeNode BlockCombineAll(TRuntimeNode flow, std::optional<ui32> filterColumn, const TArrayRef<const TAggInfo>& aggs, TType* returnType); - TRuntimeNode BlockCombineHashed(TRuntimeNode flow, ui32 countColumn, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys, + TRuntimeNode BlockCombineHashed(TRuntimeNode flow, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys, const TArrayRef<const TAggInfo>& aggs, TType* returnType); // udfs diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index 071837afd90..321e3452877 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -2406,14 +2406,13 @@ TMkqlCommonCallableCompiler::TShared::TShared() { AddCallable("BlockCombineAll", [](const TExprNode& node, TMkqlBuildContext& ctx) { auto arg = MkqlBuildExpr(*node.Child(0), ctx); - ui32 countColumn = FromString<ui32>(node.Child(1)->Content()); std::optional<ui32> filterColumn; - if (!node.Child(2)->IsCallable("Void")) { - filterColumn = FromString<ui32>(node.Child(2)->Content()); + if (!node.Child(1)->IsCallable("Void")) { + filterColumn = FromString<ui32>(node.Child(1)->Content()); } TVector<TAggInfo> aggs; - for (const auto& agg : node.Child(3)->Children()) { + for (const auto& agg : node.Child(2)->Children()) { TAggInfo info; info.Name = TString(agg->Head().Head().Content()); for (ui32 i = 1; i < agg->ChildrenSize(); ++i) { @@ -2424,24 +2423,23 @@ TMkqlCommonCallableCompiler::TShared::TShared() { } auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); - return ctx.ProgramBuilder.BlockCombineAll(arg, countColumn, filterColumn, aggs, returnType); + return ctx.ProgramBuilder.BlockCombineAll(arg, filterColumn, aggs, returnType); }); AddCallable("BlockCombineHashed", [](const TExprNode& node, TMkqlBuildContext& ctx) { auto arg = MkqlBuildExpr(*node.Child(0), ctx); - ui32 countColumn = FromString<ui32>(node.Child(1)->Content()); std::optional<ui32> filterColumn; - if (!node.Child(2)->IsCallable("Void")) { - filterColumn = FromString<ui32>(node.Child(2)->Content()); + if (!node.Child(1)->IsCallable("Void")) { + filterColumn = FromString<ui32>(node.Child(1)->Content()); } TVector<ui32> keys; - for (const auto& key : node.Child(3)->Children()) { + for (const auto& key : node.Child(2)->Children()) { keys.push_back(FromString<ui32>(key->Content())); } TVector<TAggInfo> aggs; - for (const auto& agg : node.Child(4)->Children()) { + for (const auto& agg : node.Child(3)->Children()) { TAggInfo info; info.Name = TString(agg->Head().Head().Content()); for (ui32 i = 1; i < agg->ChildrenSize(); ++i) { @@ -2452,7 +2450,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() { } auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); - return ctx.ProgramBuilder.BlockCombineHashed(arg, countColumn, filterColumn, keys, aggs, returnType); + return ctx.ProgramBuilder.BlockCombineHashed(arg, filterColumn, keys, aggs, returnType); }); AddCallable("BlockCompress", [](const TExprNode& node, TMkqlBuildContext& ctx) { |