diff options
| author | vvvv <[email protected]> | 2022-12-16 20:52:04 +0300 | 
|---|---|---|
| committer | vvvv <[email protected]> | 2022-12-16 20:52:04 +0300 | 
| commit | c196050dfde71d4cd34f0f50366049aaf582d919 (patch) | |
| tree | e657912d9cfe63cc8e0173c6178d57b869a965e8 | |
| parent | f6fd13cea19380072929532112fd9d5c4e9e7857 (diff) | |
WIP support of final aggregation by keys (minikql only)
10 files changed, 1427 insertions, 625 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp index ae8af5df98f..66aab94ef4b 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp @@ -110,8 +110,9 @@ namespace NMiniKQL {  namespace { +template <typename T>  struct TAggParams { -    std::unique_ptr<IPreparedBlockAggregator> Prepared_; +    std::unique_ptr<IPreparedBlockAggregator<T>> Prepared_;  };  struct TKeyParams { @@ -311,7 +312,7 @@ public:          IComputationWideFlowNode* flow,          std::optional<ui32> filterColumn,          size_t width, -        TVector<TAggParams>&& aggsParams) +        TVector<TAggParams<IBlockAggregatorCombineAll>>&& aggsParams)          : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)          , Flow_(flow)          , FilterColumn_(filterColumn) @@ -394,12 +395,12 @@ private:      struct TState : public TComputationValue<TState> {          TVector<NUdf::TUnboxedValue> Values_;          TVector<NUdf::TUnboxedValue*> ValuePointers_; -        TVector<std::unique_ptr<IBlockAggregator>> Aggs_; +        TVector<std::unique_ptr<IBlockAggregatorCombineAll>> Aggs_;          bool IsFinished_ = false;          bool HasValues_ = false;          TVector<char> AggStates_; -        TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const TVector<TAggParams>& params, TComputationContext& ctx) +        TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const TVector<TAggParams<IBlockAggregatorCombineAll>>& params, TComputationContext& ctx)              : TComputationValue(memInfo)              , Values_(width)              , ValuePointers_(width) @@ -443,7 +444,7 @@ private:      IComputationWideFlowNode* Flow_;      std::optional<ui32> FilterColumn_;      const size_t Width_; -    const TVector<TAggParams> AggsParams_; +    const TVector<TAggParams<IBlockAggregatorCombineAll>> AggsParams_;  };  template <typename T> @@ -482,19 +483,19 @@ TStringBuf GetKeyView(const TSSOKey& key) {      return key.AsView();  } -template <typename TKey, bool UseSet, bool UseFilter> -class TBlockCombineHashedWrapper : public TStatefulWideFlowComputationNode<TBlockCombineHashedWrapper<TKey, UseSet, UseFilter>> { +template <typename TKey, typename TAggregator, bool UseSet, bool UseFilter, bool Finalize, typename TDerived> +class THashedWrapperBase : public TStatefulWideFlowComputationNode<TDerived> {  public: -    using TSelf = TBlockCombineHashedWrapper<TKey, UseSet, UseFilter>; -    using TBase = TStatefulWideFlowComputationNode<TSelf>; +    using TSelf = THashedWrapperBase<TKey, TAggregator, UseSet, UseFilter, Finalize, TDerived>; +    using TBase = TStatefulWideFlowComputationNode<TDerived>; -    TBlockCombineHashedWrapper(TComputationMutables& mutables, +    THashedWrapperBase(TComputationMutables& mutables,          IComputationWideFlowNode* flow,          std::optional<ui32> filterColumn,          size_t width,          const std::vector<TKeyParams>& keys,          std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers, -        TVector<TAggParams>&& aggsParams) +        TVector<TAggParams<TAggregator>>&& aggsParams)          : TBase(mutables, flow, EValueRepresentation::Any)          , Flow_(flow)          , FilterColumn_(filterColumn) @@ -507,6 +508,7 @@ public:          MKQL_ENSURE(Width_ > 0, "Missing block length column");          if constexpr (UseFilter) {              MKQL_ENSURE(filterColumn, "Missing filter column"); +            MKQL_ENSURE(!Finalize, "Filter isn't compatible with Finalize");          } else {              MKQL_ENSURE(!filterColumn, "Unexpected filter column");          } @@ -588,7 +590,11 @@ public:                          if (isNew) {                              for (size_t i = 0; i < s.Aggs_.size(); ++i) {                                  if (output[Keys_.size() + i]) { -                                    s.Aggs_[i]->InitKey(ptr, s.Values_.data(), row); +                                    if constexpr (Finalize) { +                                        s.Aggs_[i]->LoadState(ptr, s.Values_.data(), row); +                                    } else { +                                        s.Aggs_[i]->InitKey(ptr, s.Values_.data(), row); +                                    }                                  }                                  ptr += s.Aggs_[i]->StateSize; @@ -602,7 +608,11 @@ public:                          } else {                              for (size_t i = 0; i < s.Aggs_.size(); ++i) {                                  if (output[Keys_.size() + i]) { -                                    s.Aggs_[i]->UpdateKey(ptr, s.Values_.data(), row); +                                    if constexpr (Finalize) { +                                        s.Aggs_[i]->UpdateState(ptr, s.Values_.data(), row); +                                    } else { +                                        s.Aggs_[i]->UpdateKey(ptr, s.Values_.data(), row); +                                    }                                  }                                  ptr += s.Aggs_[i]->StateSize; @@ -644,7 +654,11 @@ public:                  } else {                      TVector<std::unique_ptr<IAggColumnBuilder>> aggBuilders;                      for (const auto& a : s.Aggs_) { -                        aggBuilders.emplace_back(a->MakeBuilder(size)); +                        if constexpr (Finalize) { +                            aggBuilders.emplace_back(a->MakeResultBuilder(size)); +                        } else { +                            aggBuilders.emplace_back(a->MakeStateBuilder(size)); +                        }                      }                      for (auto iter = s.HashMap_->Begin(); iter != s.HashMap_->End(); s.HashMap_->Advance(iter)) { @@ -694,7 +708,7 @@ private:          TVector<NUdf::TUnboxedValue> Values_;          TVector<NUdf::TUnboxedValue*> ValuePointers_; -        TVector<std::unique_ptr<IBlockAggregator>> Aggs_; +        TVector<std::unique_ptr<TAggregator>> Aggs_;          bool IsFinished_ = false;          bool HasValues_ = false;          ui32 TotalStateSize_ = 0; @@ -702,7 +716,7 @@ private:          std::unique_ptr<TRobinHoodHashSet<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>>> HashSet_;          TPagedArena Arena_; -        TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const TVector<TAggParams>& params, TComputationContext& ctx) +        TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const TVector<TAggParams<TAggregator>>& params, TComputationContext& ctx)              : TBase(memInfo)              , Values_(width)              , ValuePointers_(width) @@ -748,11 +762,80 @@ private:      const size_t Width_;      const size_t OutputWidth_;      const std::vector<TKeyParams> Keys_; -    const TVector<TAggParams> AggsParams_; +    const TVector<TAggParams<TAggregator>> AggsParams_;      std::vector<std::unique_ptr<IKeySerializer>> KeySerializers_;  }; -void FillAggParams(TTupleLiteral* aggsVal, TTupleType* tupleType, std::optional<ui32> filterColumn, TVector<TAggParams>& aggsParams, const TTypeEnvironment& env) { +template <typename TKey, bool UseSet, bool UseFilter> +class TBlockCombineHashedWrapper : public THashedWrapperBase<TKey, IBlockAggregatorCombineKeys, UseSet, UseFilter, false, TBlockCombineHashedWrapper<TKey, UseSet, UseFilter>> { +public: +    using TSelf = TBlockCombineHashedWrapper<TKey, UseSet, UseFilter>; +    using TBase = THashedWrapperBase<TKey, IBlockAggregatorCombineKeys, UseSet, UseFilter, false, TSelf>; + +    TBlockCombineHashedWrapper(TComputationMutables& mutables, +        IComputationWideFlowNode* flow, +        std::optional<ui32> filterColumn, +        size_t width, +        const std::vector<TKeyParams>& keys, +        std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers, +        TVector<TAggParams<IBlockAggregatorCombineKeys>>&& aggsParams) +        : TBase(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams)) +    {} +}; + +template <typename TKey, bool UseSet> +class TBlockMergeFinalizeHashedWrapper : public THashedWrapperBase<TKey, IBlockAggregatorFinalizeKeys, UseSet, false, true, TBlockMergeFinalizeHashedWrapper<TKey, UseSet>> { +public: +    using TSelf = TBlockMergeFinalizeHashedWrapper<TKey, UseSet>; +    using TBase = THashedWrapperBase<TKey, IBlockAggregatorFinalizeKeys, UseSet, false, true, TSelf>; + +    TBlockMergeFinalizeHashedWrapper(TComputationMutables& mutables, +        IComputationWideFlowNode* flow, +        size_t width, +        const std::vector<TKeyParams>& keys, +        std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers, +        TVector<TAggParams<IBlockAggregatorFinalizeKeys>>&& aggsParams) +        : TBase(mutables, flow, {}, width, keys, std::move(keySerializers), std::move(aggsParams)) +    {} +}; + +template <typename TAggregator> +std::unique_ptr<IPreparedBlockAggregator<TAggregator>> PrepareBlockAggregator(const IBlockAggregatorFactory& factory, +    TTupleType* tupleType, +    std::optional<ui32> filterColumn, +    const std::vector<ui32>& argsColumns, +    const TTypeEnvironment& env); + +template <> +std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineAll>> PrepareBlockAggregator<IBlockAggregatorCombineAll>(const IBlockAggregatorFactory& factory, +    TTupleType* tupleType, +    std::optional<ui32> filterColumn, +    const std::vector<ui32>& argsColumns, +    const TTypeEnvironment& env) { +    return factory.PrepareCombineAll(tupleType, filterColumn, argsColumns, env); +} + +template <> +std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineKeys>> PrepareBlockAggregator<IBlockAggregatorCombineKeys>(const IBlockAggregatorFactory& factory, +    TTupleType* tupleType, +    std::optional<ui32> filterColumn, +    const std::vector<ui32>& argsColumns, +    const TTypeEnvironment& env) { +    return factory.PrepareCombineKeys(tupleType, filterColumn, argsColumns, env); +} + +template <> +std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorFinalizeKeys>> PrepareBlockAggregator<IBlockAggregatorFinalizeKeys>(const IBlockAggregatorFactory& factory, +    TTupleType* tupleType, +    std::optional<ui32> filterColumn, +    const std::vector<ui32>& argsColumns, +    const TTypeEnvironment& env) { +    MKQL_ENSURE(!filterColumn, "Unexpected filter column"); +    return factory.PrepareFinalizeKeys(tupleType, argsColumns, env); +} + +template <typename TAggregator> +void FillAggParams(TTupleLiteral* aggsVal, TTupleType* tupleType, std::optional<ui32> filterColumn, TVector<TAggParams<TAggregator>>& aggsParams, const TTypeEnvironment& env) {      for (ui32 i = 0; i < aggsVal->GetValuesCount(); ++i) {          auto aggVal = AS_VALUE(TTupleLiteral, aggsVal->GetValue(i));          auto name = AS_VALUE(TDataLiteral, aggVal->GetValue(0))->AsValue().AsStringRef(); @@ -762,8 +845,8 @@ void FillAggParams(TTupleLiteral* aggsVal, TTupleType* tupleType, std::optional<              argColumns.push_back(AS_VALUE(TDataLiteral, aggVal->GetValue(j))->AsValue().Get<ui32>());          } -        TAggParams p; -        p.Prepared_ = PrepareBlockAggregator(name, tupleType, filterColumn, argColumns, env); +        TAggParams<TAggregator> p; +        p.Prepared_ = PrepareBlockAggregator<TAggregator>(GetBlockAggregatorFactory(name), tupleType, filterColumn, argColumns, env);          aggsParams.emplace_back(std::move(p));      }  } @@ -777,7 +860,7 @@ IComputationNode* MakeBlockCombineHashedWrapper(      size_t width,      const std::vector<TKeyParams>& keys,      std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers, -    TVector<TAggParams>&& aggsParams) { +    TVector<TAggParams<IBlockAggregatorCombineKeys>>&& aggsParams) {      if (totalKeysSize <= sizeof(ui32)) {          return new TBlockCombineHashedWrapper<ui32, UseSet, UseFilter>(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams));      } @@ -789,55 +872,29 @@ IComputationNode* MakeBlockCombineHashedWrapper(      return new TBlockCombineHashedWrapper<TSSOKey, UseSet, UseFilter>(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams));  } -} - -IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNodeFactoryContext& ctx) { -    MKQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args"); -    const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); -    const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); - -    auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); -    MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); - -    auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(1)); -    std::optional<ui32> filterColumn; -    if (filterColumnVal->HasItem()) { -        filterColumn = AS_VALUE(TDataLiteral, filterColumnVal->GetItem())->AsValue().Get<ui32>(); -    } - -    auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(2)); -    TVector<TAggParams> aggsParams; -    FillAggParams(aggsVal, tupleType, filterColumn, aggsParams, ctx.Env); -    return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams)); -} - -IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) { -    MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args"); -    const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); -    const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); - -    auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); -    MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); - -    auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(1)); -    std::optional<ui32> filterColumn; -    if (filterColumnVal->HasItem()) { -        filterColumn = AS_VALUE(TDataLiteral, filterColumnVal->GetItem())->AsValue().Get<ui32>(); +template <bool UseSet> +IComputationNode* MakeBlockMergeFinalizeHashedWrapper( +    ui32 totalKeysSize, +    TComputationMutables& mutables, +    IComputationWideFlowNode* flow, +    size_t width, +    const std::vector<TKeyParams>& keys, +    std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers, +    TVector<TAggParams<IBlockAggregatorFinalizeKeys>>&& aggsParams) { +    if (totalKeysSize <= sizeof(ui32)) { +        return new TBlockMergeFinalizeHashedWrapper<ui32, UseSet>(mutables, flow, width, keys, std::move(keySerializers), std::move(aggsParams));      } -    auto keysVal = AS_VALUE(TTupleLiteral, callable.GetInput(2)); -    std::vector<TKeyParams> keys; -    for (ui32 i = 0; i < keysVal->GetValuesCount(); ++i) { -        ui32 index = AS_VALUE(TDataLiteral, keysVal->GetValue(i))->AsValue().Get<ui32>(); -        keys.emplace_back(TKeyParams{ index, tupleType->GetElementType(index) }); +    if (totalKeysSize <= sizeof(ui64)) { +        return new TBlockMergeFinalizeHashedWrapper<ui64, UseSet>(mutables, flow, width, keys, std::move(keySerializers), std::move(aggsParams));      } -    auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(3)); -    TVector<TAggParams> aggsParams; -    FillAggParams(aggsVal, tupleType, filterColumn, aggsParams, ctx.Env); +    return new TBlockMergeFinalizeHashedWrapper<TSSOKey, UseSet>(mutables, flow, width, keys, std::move(keySerializers), std::move(aggsParams)); +} -    ui32 totalKeysSize = 0; -    std::vector<std::unique_ptr<IKeySerializer>> keySerializers; +void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std::vector<std::unique_ptr<IKeySerializer>>& keySerializers) { +    totalKeysSize = 0; +    keySerializers.clear();      for (const auto& k : keys) {          auto itemType = AS_TYPE(TBlockType, k.Type)->GetItemType();          bool isOptional; @@ -928,6 +985,58 @@ IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputation              throw yexception() << "Unsupported key type";          }      } +} + +} + +IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNodeFactoryContext& ctx) { +    MKQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args"); +    const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); +    const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); + +    auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); +    MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); + +    auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(1)); +    std::optional<ui32> filterColumn; +    if (filterColumnVal->HasItem()) { +        filterColumn = AS_VALUE(TDataLiteral, filterColumnVal->GetItem())->AsValue().Get<ui32>(); +    } + +    auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(2)); +    TVector<TAggParams<IBlockAggregatorCombineAll>> aggsParams; +    FillAggParams<IBlockAggregatorCombineAll>(aggsVal, tupleType, filterColumn, aggsParams, ctx.Env); +    return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams)); +} + +IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) { +    MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args"); +    const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); +    const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); + +    auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); +    MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); + +    auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(1)); +    std::optional<ui32> filterColumn; +    if (filterColumnVal->HasItem()) { +        filterColumn = AS_VALUE(TDataLiteral, filterColumnVal->GetItem())->AsValue().Get<ui32>(); +    } + +    auto keysVal = AS_VALUE(TTupleLiteral, callable.GetInput(2)); +    std::vector<TKeyParams> keys; +    for (ui32 i = 0; i < keysVal->GetValuesCount(); ++i) { +        ui32 index = AS_VALUE(TDataLiteral, keysVal->GetValue(i))->AsValue().Get<ui32>(); +        keys.emplace_back(TKeyParams{ index, tupleType->GetElementType(index) }); +    } + +    auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(3)); +    TVector<TAggParams<IBlockAggregatorCombineKeys>> aggsParams; +    FillAggParams<IBlockAggregatorCombineKeys>(aggsVal, tupleType, filterColumn, aggsParams, ctx.Env); + +    ui32 totalKeysSize = 0; +    std::vector<std::unique_ptr<IKeySerializer>> keySerializers; +    PrepareKeys(keys, totalKeysSize, keySerializers);      if (filterColumn) {          if (aggsParams.size() == 0) { @@ -944,5 +1053,35 @@ IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputation      }  } +IComputationNode* WrapBlockMergeFinalizeHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) { +    MKQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args"); +    const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); +    const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); + +    auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); +    MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); + +    auto keysVal = AS_VALUE(TTupleLiteral, callable.GetInput(1)); +    std::vector<TKeyParams> keys; +    for (ui32 i = 0; i < keysVal->GetValuesCount(); ++i) { +        ui32 index = AS_VALUE(TDataLiteral, keysVal->GetValue(i))->AsValue().Get<ui32>(); +        keys.emplace_back(TKeyParams{ index, tupleType->GetElementType(index) }); +    } + +    auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(2)); +    TVector<TAggParams<IBlockAggregatorFinalizeKeys>> aggsParams; +    FillAggParams<IBlockAggregatorFinalizeKeys>(aggsVal, tupleType, {}, aggsParams, ctx.Env); + +    ui32 totalKeysSize = 0; +    std::vector<std::unique_ptr<IKeySerializer>> keySerializers; +    PrepareKeys(keys, totalKeysSize, keySerializers); + +    if (aggsParams.size() == 0) { +        return MakeBlockMergeFinalizeHashedWrapper<true>(totalKeysSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams)); +    } else { +        return MakeBlockMergeFinalizeHashedWrapper<false>(totalKeysSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams)); +    } +} +  }  } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h index 814514271d7..872788babec 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h @@ -7,6 +7,7 @@ namespace NMiniKQL {  IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNodeFactoryContext& ctx);  IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapBlockMergeFinalizeHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx);  }  } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp index 5d1cae3a812..e40ae152f19 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp @@ -7,40 +7,52 @@  namespace NKikimr {  namespace NMiniKQL { -class TCountAllBlockAggregator : public TBlockAggregatorBase { +namespace { + +struct TState { +    ui64 Count_ = 0; +}; + +class TColumnBuilder : public IAggColumnBuilder {  public: -    struct TState { -        ui64 Count_ = 0; -    }; - -    class TColumnBuilder : public IAggColumnBuilder { -    public: -        TColumnBuilder(ui64 size, TComputationContext& ctx) -            : Builder_(arrow::uint64(), &ctx.ArrowMemoryPool) -            , Ctx_(ctx) -        { -            ARROW_OK(Builder_.Reserve(size)); -        } +    TColumnBuilder(ui64 size, TComputationContext& ctx) +        : Builder_(arrow::uint64(), &ctx.ArrowMemoryPool) +        , Ctx_(ctx) +    { +        ARROW_OK(Builder_.Reserve(size)); +    } -        void Add(const void* state) final { -            auto typedState = static_cast<const TState*>(state); -            Builder_.UnsafeAppend(typedState->Count_); -        } +    void Add(const void* state) final { +        auto typedState = static_cast<const TState*>(state); +        Builder_.UnsafeAppend(typedState->Count_); +    } -        NUdf::TUnboxedValue Build() final { -            std::shared_ptr<arrow::ArrayData> result; -            ARROW_OK(Builder_.FinishInternal(&result)); -            return Ctx_.HolderFactory.CreateArrowBlock(result); -        } +    NUdf::TUnboxedValue Build() final { +        std::shared_ptr<arrow::ArrayData> result; +        ARROW_OK(Builder_.FinishInternal(&result)); +        return Ctx_.HolderFactory.CreateArrowBlock(result); +    } -    private: -        arrow::UInt64Builder Builder_; -        TComputationContext& Ctx_; -    }; +private: +    arrow::UInt64Builder Builder_; +    TComputationContext& Ctx_; +}; + +template <typename TTag> +class TCountAllAggregator; + +template <typename TTag> +class TCountAggregator; + +template <> +class TCountAllAggregator<TCombineAllTag> : public TCombineAllTag::TBase { +public: +    using TBase = TCombineAllTag::TBase; -    TCountAllBlockAggregator(std::optional<ui32> filterColumn, TComputationContext& ctx) -        : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx) +    TCountAllAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx) +        : TBase(sizeof(TState), filterColumn, ctx)      { +        Y_UNUSED(argColumn);      }      void InitState(void* state) final { @@ -51,9 +63,10 @@ public:          auto typedState = static_cast<TState*>(state);          Y_UNUSED(columns);          if (filtered) { -           typedState->Count_ += *filtered; -        } else { -           typedState->Count_ += batchLength; +            typedState->Count_ += *filtered; +        } +        else { +            typedState->Count_ += batchLength;          }      } @@ -61,6 +74,18 @@ public:          auto typedState = static_cast<const TState*>(state);          return NUdf::TUnboxedValuePod(typedState->Count_);      } +}; + +template <> +class TCountAllAggregator<TCombineKeysTag> : public TCombineKeysTag::TBase { +public: +    using TBase = TCombineKeysTag::TBase; + +    TCountAllAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx) +        : TBase(sizeof(TState), filterColumn, ctx) +    { +        Y_UNUSED(argColumn); +    }      void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {          new(state) TState(); @@ -74,44 +99,56 @@ public:          typedState->Count_ += 1;      } -    std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final { +    std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {          return std::make_unique<TColumnBuilder>(size, Ctx_);      }  }; -class TCountBlockAggregator : public TBlockAggregatorBase { +template <> +class TCountAllAggregator<TFinalizeKeysTag> : public TFinalizeKeysTag::TBase {  public: -    struct TState { -        ui64 Count_ = 0; -    }; - -    class TColumnBuilder : public IAggColumnBuilder { -    public: -        TColumnBuilder(ui64 size, TComputationContext& ctx) -            : Builder_(arrow::uint64(), &ctx.ArrowMemoryPool) -            , Ctx_(ctx) -        { -            ARROW_OK(Builder_.Reserve(size)); -        } +    using TBase = TFinalizeKeysTag::TBase; -        void Add(const void* state) final { -            auto typedState = static_cast<const TState*>(state); -            Builder_.UnsafeAppend(typedState->Count_); -        } +    TCountAllAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx) +        : TBase(sizeof(TState), filterColumn, ctx) +        , ArgColumn_(argColumn) +    { +    } + +    void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { +        new(state) TState(); +        UpdateState(state, columns, row); +    } -        NUdf::TUnboxedValue Build() final { -            std::shared_ptr<arrow::ArrayData> result; -            ARROW_OK(Builder_.FinishInternal(&result)); -            return Ctx_.HolderFactory.CreateArrowBlock(result); +    void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { +        auto typedState = static_cast<TState*>(state); +        const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); +        if (datum.is_scalar()) { +            MKQL_ENSURE(datum.scalar()->is_valid, "Expected not null"); +            typedState->Count_ += datum.scalar_as<arrow::UInt64Scalar>().value; +        } else { +            const auto& array = datum.array(); +            auto ptr = array->GetValues<ui64>(1); +            MKQL_ENSURE(array->GetNullCount() == 0, "Expected not null"); +            typedState->Count_ += ptr[row];          } +    } + +    std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final { +        return std::make_unique<TColumnBuilder>(size, Ctx_); +    } + +private: +    const ui32 ArgColumn_; +}; -    private: -        arrow::UInt64Builder Builder_; -        TComputationContext& Ctx_; -    }; +template <> +class TCountAggregator<TCombineAllTag> : public TCombineAllTag::TBase { +public: +    using TBase = TCombineAllTag::TBase; -    TCountBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx) -        : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx) +    TCountAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx) +        : TBase(sizeof(TState), filterColumn, ctx)          , ArgColumn_(argColumn)      {      } @@ -162,6 +199,21 @@ public:          return NUdf::TUnboxedValuePod(typedState->Count_);      } +private: +    const ui32 ArgColumn_; +}; + +template <> +class TCountAggregator<TCombineKeysTag> : public TCombineKeysTag::TBase { +public: +    using TBase = TCombineKeysTag::TBase; + +    TCountAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx) +        : TBase(sizeof(TState), filterColumn, ctx) +        , ArgColumn_(argColumn) +    { +    } +      void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {          new(state) TState();          UpdateKey(state, columns, row); @@ -187,7 +239,7 @@ public:          }      } -    std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final { +    std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {          return std::make_unique<TColumnBuilder>(size, Ctx_);      } @@ -195,43 +247,44 @@ private:      const ui32 ArgColumn_;  }; -class TPreparedCountAllBlockAggregator : public IPreparedBlockAggregator { +template <> +class TCountAggregator<TFinalizeKeysTag> : public TCountAllAggregator<TFinalizeKeysTag> +{  public: -    TPreparedCountAllBlockAggregator(std::optional<ui32> filterColumn) +    using TBase = TCountAllAggregator<TFinalizeKeysTag>; + +    TCountAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx) +        : TBase(filterColumn, argColumn, ctx) +    {} +}; + +template <typename TTag> +class TPreparedCountAll : public TTag::TPreparedAggregator { +public: +    TPreparedCountAll(std::optional<ui32> filterColumn, ui32 argColumn)          : FilterColumn_(filterColumn) +        , ArgColumn_(argColumn)      {} -    std::unique_ptr<IBlockAggregator> Make(TComputationContext& ctx) const final { -        return std::make_unique<TCountAllBlockAggregator>(FilterColumn_, ctx); +    std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final { +        return std::make_unique<TCountAllAggregator<TTag>>(FilterColumn_, ArgColumn_, ctx);      }  private:      const std::optional<ui32> FilterColumn_; +    const ui32 ArgColumn_;  }; -class TBlockCountAllFactory : public IBlockAggregatorFactory { -public: -   std::unique_ptr<IPreparedBlockAggregator> Prepare( -       TTupleType* tupleType, -       std::optional<ui32> filterColumn, -       const std::vector<ui32>& argsColumns, -       const TTypeEnvironment& env) const final { -       Y_UNUSED(tupleType); -       Y_UNUSED(argsColumns); -       Y_UNUSED(env); -       return std::make_unique<TPreparedCountAllBlockAggregator>(filterColumn); -   } -}; - -class TPreparedCountBlockAggregator : public IPreparedBlockAggregator { +template <typename TTag> +class TPreparedCount : public TTag::TPreparedAggregator {  public: -    TPreparedCountBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn) +    TPreparedCount(std::optional<ui32> filterColumn, ui32 argColumn)          : FilterColumn_(filterColumn)          , ArgColumn_(argColumn)      {} -    std::unique_ptr<IBlockAggregator> Make(TComputationContext& ctx) const final { -        return std::make_unique<TCountBlockAggregator>(FilterColumn_, ArgColumn_, ctx); +    std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final { +        return std::make_unique<TCountAggregator<TTag>>(FilterColumn_, ArgColumn_, ctx);      }  private: @@ -239,19 +292,87 @@ private:      const ui32 ArgColumn_;  }; +template <typename TTag> +std::unique_ptr<typename TTag::TPreparedAggregator> PrepareCountAll(std::optional<ui32> filterColumn, ui32 argColumn) { +    return std::make_unique<TPreparedCountAll<TTag>>(filterColumn, argColumn); +} + +template <typename TTag> +std::unique_ptr<typename TTag::TPreparedAggregator> PrepareCount(std::optional<ui32> filterColumn, ui32 argColumn) { +    return std::make_unique<TPreparedCount<TTag>>(filterColumn, argColumn); +} + +class TBlockCountAllFactory : public IBlockAggregatorFactory { +public: +    std::unique_ptr<TCombineAllTag::TPreparedAggregator> PrepareCombineAll( +        TTupleType* tupleType, +        std::optional<ui32> filterColumn, +        const std::vector<ui32>& argsColumns, +        const TTypeEnvironment& env) const final { +        Y_UNUSED(tupleType); +        Y_UNUSED(argsColumns); +        Y_UNUSED(env); +        return PrepareCountAll<TCombineAllTag>(filterColumn, 0); +    } + +    std::unique_ptr<TCombineKeysTag::TPreparedAggregator> PrepareCombineKeys( +        TTupleType* tupleType, +        std::optional<ui32> filterColumn, +        const std::vector<ui32>& argsColumns, +        const TTypeEnvironment& env) const final { +        Y_UNUSED(tupleType); +        Y_UNUSED(argsColumns); +        Y_UNUSED(env); +        return PrepareCountAll<TCombineKeysTag>(filterColumn, 0); +    } + +    std::unique_ptr<TFinalizeKeysTag::TPreparedAggregator> PrepareFinalizeKeys( +        TTupleType* tupleType, +        const std::vector<ui32>& argsColumns, +        const TTypeEnvironment& env) const final { +        Y_UNUSED(tupleType); +        Y_UNUSED(argsColumns); +        Y_UNUSED(env); +        return PrepareCountAll<TFinalizeKeysTag>(std::optional<ui32>(), argsColumns[0]); +    } +}; +  class TBlockCountFactory : public IBlockAggregatorFactory {  public: -   std::unique_ptr<IPreparedBlockAggregator> Prepare( -       TTupleType* tupleType, -       std::optional<ui32> filterColumn, -       const std::vector<ui32>& argsColumns, -       const TTypeEnvironment& env) const final { -       Y_UNUSED(tupleType); -       Y_UNUSED(env); -       return std::make_unique<TPreparedCountBlockAggregator>(filterColumn, argsColumns[0]); -   } +    std::unique_ptr<TCombineAllTag::TPreparedAggregator> PrepareCombineAll( +        TTupleType* tupleType, +        std::optional<ui32> filterColumn, +        const std::vector<ui32>& argsColumns, +        const TTypeEnvironment& env) const final { +        Y_UNUSED(tupleType); +        Y_UNUSED(env); +        return PrepareCount<TCombineAllTag>(filterColumn, argsColumns[0]); +    } + +    std::unique_ptr<TCombineKeysTag::TPreparedAggregator> PrepareCombineKeys( +        TTupleType* tupleType, +        std::optional<ui32> filterColumn, +        const std::vector<ui32>& argsColumns, +        const TTypeEnvironment& env) const final { +        Y_UNUSED(tupleType); +        Y_UNUSED(argsColumns); +        Y_UNUSED(env); +        return PrepareCount<TCombineKeysTag>(filterColumn, argsColumns[0]); +    } + +    std::unique_ptr<TFinalizeKeysTag::TPreparedAggregator> PrepareFinalizeKeys( +        TTupleType* tupleType, +        const std::vector<ui32>& argsColumns, +        const TTypeEnvironment& env) const final { +        Y_UNUSED(tupleType); +        Y_UNUSED(argsColumns); +        Y_UNUSED(env); +        return PrepareCount<TFinalizeKeysTag>(std::optional<ui32>(), argsColumns[0]); +    }  }; +} +  std::unique_ptr<IBlockAggregatorFactory> MakeBlockCountAllFactory() {      return std::make_unique<TBlockCountAllFactory>();  } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp index e96e5f3de2b..9785de7085f 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp @@ -20,19 +20,14 @@ struct TAggregatorFactories {      }  }; -std::unique_ptr<IPreparedBlockAggregator> PrepareBlockAggregator( -    TStringBuf name, -    TTupleType* tupleType, -    std::optional<ui32> filterColumn, -    const std::vector<ui32>& argsColumns, -    const TTypeEnvironment& env) { +const IBlockAggregatorFactory& GetBlockAggregatorFactory(TStringBuf name) {      const auto& f = Singleton<TAggregatorFactories>()->Factories;      auto it = f.find(name);      if (it == f.end()) {          throw yexception() << "Unsupported block aggregation function: " << name;      } -    return it->second->Prepare(tupleType, filterColumn, argsColumns, env); +    return *it->second;  }  } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h index 80b9de36723..be65fc192d1 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h @@ -15,33 +15,62 @@ public:      virtual NUdf::TUnboxedValue Build() = 0;  }; -class IBlockAggregator { +class IBlockAggregatorBase {  public: -    virtual ~IBlockAggregator() = default; +    virtual ~IBlockAggregatorBase() = default; +    const ui32 StateSize; + +    explicit IBlockAggregatorBase(ui32 stateSize) +        : StateSize(stateSize) +    {} +}; + + +class IBlockAggregatorCombineAll : public IBlockAggregatorBase { +public:      virtual void InitState(void* state) = 0;      virtual void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) = 0;      virtual NUdf::TUnboxedValue FinishOne(const void* state) = 0; +    explicit IBlockAggregatorCombineAll(ui32 stateSize) +        : IBlockAggregatorBase(stateSize) +    {} +}; + +class IBlockAggregatorCombineKeys : public IBlockAggregatorBase { +public:      virtual void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0;      virtual void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0; -    virtual std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) = 0; +    virtual std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) = 0; -    const ui32 StateSize; +    explicit IBlockAggregatorCombineKeys(ui32 stateSize) +        : IBlockAggregatorBase(stateSize) +    {} +}; -    explicit IBlockAggregator(ui32 stateSize) -        : StateSize(stateSize) +class IBlockAggregatorFinalizeKeys : public IBlockAggregatorBase { +public: +    virtual void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0; + +    virtual void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0; + +    virtual std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) = 0; + +    explicit IBlockAggregatorFinalizeKeys(ui32 stateSize) +        : IBlockAggregatorBase(stateSize)      {}  }; -class TBlockAggregatorBase : public IBlockAggregator { +template <typename TBase> +class TBlockAggregatorBase : public TBase {  public:      TBlockAggregatorBase(ui32 stateSize, std::optional<ui32> filterColumn, TComputationContext& ctx) -        : IBlockAggregator(stateSize) +        : TBase(stateSize)          , FilterColumn_(filterColumn)          , Ctx_(ctx)      { @@ -52,29 +81,54 @@ protected:      TComputationContext& Ctx_;  }; +template <typename T>  class IPreparedBlockAggregator {  public:      virtual ~IPreparedBlockAggregator() = default; -    virtual std::unique_ptr<IBlockAggregator> Make(TComputationContext& ctx) const = 0; +    virtual std::unique_ptr<T> Make(TComputationContext& ctx) const = 0;  }; -std::unique_ptr<IPreparedBlockAggregator> PrepareBlockAggregator( -    TStringBuf name, -    TTupleType* tupleType, -    std::optional<ui32> filterColumn, -    const std::vector<ui32>& argsColumns, -    const TTypeEnvironment& env); -  class IBlockAggregatorFactory {  public:     virtual ~IBlockAggregatorFactory() = default; -   virtual std::unique_ptr<IPreparedBlockAggregator> Prepare( +   virtual std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineAll>> PrepareCombineAll(         TTupleType* tupleType,         std::optional<ui32> filterColumn,         const std::vector<ui32>& argsColumns,         const TTypeEnvironment& env) const = 0; + +   virtual std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineKeys>> PrepareCombineKeys( +       TTupleType* tupleType, +       std::optional<ui32> filterColumn, +       const std::vector<ui32>& argsColumns, +       const TTypeEnvironment& env) const = 0; + +   virtual std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorFinalizeKeys>> PrepareFinalizeKeys( +       TTupleType* tupleType, +       const std::vector<ui32>& argsColumns, +       const TTypeEnvironment& env) const = 0; +}; + +const IBlockAggregatorFactory& GetBlockAggregatorFactory(TStringBuf name); + +struct TCombineAllTag { +    using TAggregator = IBlockAggregatorCombineAll; +    using TPreparedAggregator = IPreparedBlockAggregator<TAggregator>; +    using TBase = TBlockAggregatorBase<TAggregator>; +}; + +struct TCombineKeysTag { +    using TAggregator = IBlockAggregatorCombineKeys; +    using TPreparedAggregator = IPreparedBlockAggregator<TAggregator>; +    using TBase = TBlockAggregatorBase<TAggregator>; +}; + +struct TFinalizeKeysTag { +    using TAggregator = IBlockAggregatorFinalizeKeys; +    using TPreparedAggregator = IPreparedBlockAggregator<TAggregator>; +    using TBase = TBlockAggregatorBase<TAggregator>;  };  } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp index 2992593eafc..190ec70d0e4 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp @@ -12,6 +12,8 @@  namespace NKikimr {  namespace NMiniKQL { +namespace { +  template <bool IsMin, typename T>  T UpdateMinMax(T x, T y) {      if constexpr (IsMin) { @@ -21,65 +23,114 @@ T UpdateMinMax(T x, T y) {      }  } -template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin> -class TMinMaxBlockAggregatorNullableOrScalar : public TBlockAggregatorBase { -public: -    struct TState { -        TIn Value_; -        ui8 IsValid_ = 0; +template <typename TTag, typename TIn, typename TInScalar, typename TBuilder, bool IsMin> +class TMinMaxBlockAggregatorNullableOrScalar; -        TState() { -            if constexpr (IsMin) { -                Value_ = std::numeric_limits<TIn>::max(); -            } else { -                Value_ = std::numeric_limits<TIn>::min(); -            } -        } -    }; - -    class TColumnBuilder : public IAggColumnBuilder { -    public: -        TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx) -            : Builder_(dataType, &ctx.ArrowMemoryPool) -            , Ctx_(ctx) -        { -            ARROW_OK(Builder_.Reserve(size)); +template <typename TTag, typename TIn, typename TInScalar, typename TBuilder, bool IsMin> +class TMinMaxBlockAggregator; + +template <typename TIn, bool IsMin> +struct TState { +    TIn Value_; +    ui8 IsValid_ = 0; + +    TState() { +        if constexpr (IsMin) { +            Value_ = std::numeric_limits<TIn>::max(); +        } else { +            Value_ = std::numeric_limits<TIn>::min();          } +    } +}; -        void Add(const void* state) final { -            auto typedState = static_cast<const TState*>(state); -            if (typedState->IsValid_) { -                Builder_.UnsafeAppend(typedState->Value_); -            } else { -                Builder_.UnsafeAppendNull(); -            } +template <typename TIn, bool IsMin> +struct TSimpleState { +    TIn Value_; + +    TSimpleState() { +        if constexpr (IsMin) { +            Value_ = std::numeric_limits<TIn>::max(); +        } else { +            Value_ = std::numeric_limits<TIn>::min();          } +    } +}; + +template <typename TIn, bool IsMin, typename TBuilder> +class TColumnBuilder : public IAggColumnBuilder { +public: +    TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx) +        : Builder_(dataType, &ctx.ArrowMemoryPool) +        , Ctx_(ctx) +    { +        ARROW_OK(Builder_.Reserve(size)); +    } -        NUdf::TUnboxedValue Build() final { -            std::shared_ptr<arrow::ArrayData> result; -            ARROW_OK(Builder_.FinishInternal(&result)); -            return Ctx_.HolderFactory.CreateArrowBlock(result); +    void Add(const void* state) final { +        auto typedState = static_cast<const TState<TIn, IsMin>*>(state); +        if (typedState->IsValid_) { +            Builder_.UnsafeAppend(typedState->Value_); +        } else { +            Builder_.UnsafeAppendNull();          } +    } + +    NUdf::TUnboxedValue Build() final { +        std::shared_ptr<arrow::ArrayData> result; +        ARROW_OK(Builder_.FinishInternal(&result)); +        return Ctx_.HolderFactory.CreateArrowBlock(result); +    } + +private: +    TBuilder Builder_; +    TComputationContext& Ctx_; +}; + +template <typename TIn, bool IsMin, typename TBuilder> +class TSimpleColumnBuilder : public IAggColumnBuilder { +public: +    TSimpleColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx) +        : Builder_(dataType, &ctx.ArrowMemoryPool) +        , Ctx_(ctx) +    { +        ARROW_OK(Builder_.Reserve(size)); +    } + +    void Add(const void* state) final { +        auto typedState = static_cast<const TSimpleState<TIn, IsMin>*>(state); +        Builder_.UnsafeAppend(typedState->Value_); +    } -    private: -        TBuilder Builder_; -        TComputationContext& Ctx_; -    }; +    NUdf::TUnboxedValue Build() final { +        std::shared_ptr<arrow::ArrayData> result; +        ARROW_OK(Builder_.FinishInternal(&result)); +        return Ctx_.HolderFactory.CreateArrowBlock(result); +    } + +private: +    TBuilder Builder_; +    TComputationContext& Ctx_; +}; + +template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin> +class TMinMaxBlockAggregatorNullableOrScalar<TCombineAllTag, TIn, TInScalar, TBuilder, IsMin> : public TCombineAllTag::TBase { +public: +    using TBase = TCombineAllTag::TBase;      TMinMaxBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn,          const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) -        : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx) +        : TBase(sizeof(TState<TIn, IsMin>), filterColumn, ctx)          , ArgColumn_(argColumn) -        , BuilderDataType_(builderDataType)      { +        Y_UNUSED(builderDataType);      }      void InitState(void* state) final { -        new(state) TState(); +        new(state) TState<TIn, IsMin>();      }      void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final { -        auto typedState = static_cast<TState*>(state); +        auto typedState = static_cast<TState<TIn, IsMin>*>(state);          Y_UNUSED(batchLength);          const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();          if (datum.is_scalar()) { @@ -149,7 +200,7 @@ public:      }      NUdf::TUnboxedValue FinishOne(const void* state) final { -        auto typedState = static_cast<const TState*>(state); +        auto typedState = static_cast<const TState<TIn, IsMin>*>(state);          if (!typedState->IsValid_) {              return NUdf::TUnboxedValuePod();          } @@ -157,38 +208,67 @@ public:          return NUdf::TUnboxedValuePod(typedState->Value_);      } +private: +    const ui32 ArgColumn_; +}; + +template <typename TIn, typename TInScalar, bool IsMin> +void PushValueToState(TState<TIn, IsMin>* typedState, const arrow::Datum& datum, ui64 row) { +    if (datum.is_scalar()) { +        if (datum.scalar()->is_valid) { +            typedState->Value_ = datum.scalar_as<TInScalar>().value; +            typedState->IsValid_ = 1; +        } +    } else { +        const auto& array = datum.array(); +        auto ptr = array->GetValues<TIn>(1); +        if (array->GetNullCount() == 0) { +            typedState->IsValid_ = 1; +            typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, ptr[row]); +        } else { +            auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0); +            ui64 fullIndex = row + array->offset; +            // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00 +            TIn mask = -TIn((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1); +            typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, TIn((ptr[row] & mask) | (typedState->Value_ & ~mask))); +            typedState->IsValid_ |= mask & 1; +        } +    } +} + +template <typename TIn, bool IsMin> +void PushValueToSimpleState(TSimpleState<TIn, IsMin>* typedState, const arrow::Datum& datum, ui64 row) { +    const auto& array = datum.array(); +    auto ptr = array->GetValues<TIn>(1); +    typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, ptr[row]); +} + +template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin> +class TMinMaxBlockAggregatorNullableOrScalar<TCombineKeysTag, TIn, TInScalar, TBuilder, IsMin> : public TCombineKeysTag::TBase { +public: +    using TBase = TCombineKeysTag::TBase; + +    TMinMaxBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn, +        const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) +        : TBase(sizeof(TState<TIn, IsMin>), filterColumn, ctx) +        , ArgColumn_(argColumn) +        , BuilderDataType_(builderDataType) +    { +    } +      void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { -        new(state) TState(); +        new(state) TState<TIn, IsMin>();          UpdateKey(state, columns, row);      }      void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { -        auto typedState = static_cast<TState*>(state); +        auto typedState = static_cast<TState<TIn, IsMin>*>(state);          const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); -        if (datum.is_scalar()) { -            if (datum.scalar()->is_valid) { -                typedState->Value_ = datum.scalar_as<TInScalar>().value; -                typedState->IsValid_ = 1; -            } -        } else { -            const auto& array = datum.array(); -            auto ptr = array->GetValues<TIn>(1); -            if (array->GetNullCount() == 0) { -                typedState->IsValid_ = 1; -                typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, ptr[row]); -            } else { -                auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0); -                ui64 fullIndex = row + array->offset; -                // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00 -                TIn mask = -TIn((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1); -                typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, TIn((ptr[row] & mask) | (typedState->Value_ & ~mask))); -                typedState->IsValid_ |= mask & 1; -            } -        } +        PushValueToState<TIn, TInScalar, IsMin>(typedState, datum, row);      } -    std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final { -        return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_); +    std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final { +        return std::make_unique<TColumnBuilder<TIn, IsMin, TBuilder>>(size, BuilderDataType_, Ctx_);      }  private: @@ -197,59 +277,57 @@ private:  };  template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin> -class TMinMaxBlockAggregator: public TBlockAggregatorBase { +class TMinMaxBlockAggregatorNullableOrScalar<TFinalizeKeysTag, TIn, TInScalar, TBuilder, IsMin> : public TFinalizeKeysTag::TBase {  public: -    struct TState { -        TIn Value_; +    using TBase = TFinalizeKeysTag::TBase; -        TState() { -            if constexpr (IsMin) { -                Value_ = std::numeric_limits<TIn>::max(); -            } else { -                Value_ = std::numeric_limits<TIn>::min(); -            } -        } -    }; - -    class TColumnBuilder : public IAggColumnBuilder { -    public: -        TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx) -            : Builder_(dataType, &ctx.ArrowMemoryPool) -            , Ctx_(ctx) -        { -            ARROW_OK(Builder_.Reserve(size)); -        } +    TMinMaxBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn, +        const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) +        : TBase(sizeof(TState<TIn, IsMin>), filterColumn, ctx) +        , ArgColumn_(argColumn) +        , BuilderDataType_(builderDataType) +    { +    } -        void Add(const void* state) final { -            auto typedState = static_cast<const TState*>(state); -            Builder_.UnsafeAppend(typedState->Value_); -        } +    void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { +        new(state) TState<TIn, IsMin>(); +        UpdateState(state, columns, row); +    } -        NUdf::TUnboxedValue Build() final { -            std::shared_ptr<arrow::ArrayData> result; -            ARROW_OK(Builder_.FinishInternal(&result)); -            return Ctx_.HolderFactory.CreateArrowBlock(result); -        } +    void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { +        auto typedState = static_cast<TState<TIn, IsMin>*>(state); +        const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); +        PushValueToState<TIn, TInScalar, IsMin>(typedState, datum, row); +    } -    private: -        TBuilder Builder_; -        TComputationContext& Ctx_; -    }; +    std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final { +        return std::make_unique<TColumnBuilder<TIn, IsMin, TBuilder>>(size, BuilderDataType_, Ctx_); +    } + +private: +    const ui32 ArgColumn_; +    const std::shared_ptr<arrow::DataType> BuilderDataType_; +}; + +template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin> +class TMinMaxBlockAggregator<TCombineAllTag, TIn, TInScalar, TBuilder, IsMin> : public TCombineAllTag::TBase { +public: +    using TBase = TCombineAllTag::TBase;      TMinMaxBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,          const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) -        : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx) +        : TBase(sizeof(TSimpleState<TIn, IsMin>), filterColumn, ctx)          , ArgColumn_(argColumn) -        , BuilderDataType_(builderDataType)      { +        Y_UNUSED(builderDataType);      }      void InitState(void* state) final { -        new(state) TState; +        new(state) TSimpleState<TIn, IsMin>;      }      void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final { -        auto typedState = static_cast<TState*>(state); +        auto typedState = static_cast<TSimpleState<TIn, IsMin>*>(state);          Y_UNUSED(batchLength);          const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();          MKQL_ENSURE(datum.is_array(), "Expected array"); @@ -282,25 +360,40 @@ public:      }      NUdf::TUnboxedValue FinishOne(const void* state) final { -        auto typedState = static_cast<const TState*>(state); +        auto typedState = static_cast<const TSimpleState<TIn, IsMin>*>(state);          return NUdf::TUnboxedValuePod(typedState->Value_);      } +private: +    const ui32 ArgColumn_; +}; + +template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin> +class TMinMaxBlockAggregator<TCombineKeysTag, TIn, TInScalar, TBuilder, IsMin> : public TCombineKeysTag::TBase { +public: +    using TBase = TCombineKeysTag::TBase; + +    TMinMaxBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, +        const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) +        : TBase(sizeof(TSimpleState<TIn, IsMin>), filterColumn, ctx) +        , ArgColumn_(argColumn) +        , BuilderDataType_(builderDataType) +    { +    } +      void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { -        new(state) TState(); +        new(state) TSimpleState<TIn, IsMin>();          UpdateKey(state, columns, row);      }      void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { -        auto typedState = static_cast<TState*>(state); +        auto typedState = static_cast<TSimpleState<TIn, IsMin>*>(state);          const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); -        const auto& array = datum.array(); -        auto ptr = array->GetValues<TIn>(1); -        typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, ptr[row]); +        PushValueToSimpleState<TIn, IsMin>(typedState, datum, row);      } -    std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final { -        return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_); +    std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final { +        return std::make_unique<TSimpleColumnBuilder<TIn, IsMin, TBuilder>>(size, BuilderDataType_, Ctx_);      }  private: @@ -309,7 +402,40 @@ private:  };  template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin> -class TPreparedMinMaxBlockAggregatorNullableOrScalar : public IPreparedBlockAggregator { +class TMinMaxBlockAggregator<TFinalizeKeysTag, TIn, TInScalar, TBuilder, IsMin> : public TFinalizeKeysTag::TBase { +public: +    using TBase = TFinalizeKeysTag::TBase; + +    TMinMaxBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, +        const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) +        : TBase(sizeof(TSimpleState<TIn, IsMin>), filterColumn, ctx) +        , ArgColumn_(argColumn) +        , BuilderDataType_(builderDataType) +    { +    } + +    void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { +        new(state) TSimpleState<TIn, IsMin>(); +        UpdateState(state, columns, row); +    } + +    void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { +        auto typedState = static_cast<TSimpleState<TIn, IsMin>*>(state); +        const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); +        PushValueToSimpleState<TIn, IsMin>(typedState, datum, row); +    } + +    std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final { +        return std::make_unique<TSimpleColumnBuilder<TIn, IsMin, TBuilder>>(size, BuilderDataType_, Ctx_); +    } + +private: +    const ui32 ArgColumn_; +    const std::shared_ptr<arrow::DataType> BuilderDataType_; +}; + +template <typename TTag, typename TIn, typename TInScalar, typename TBuilder, bool IsMin> +class TPreparedMinMaxBlockAggregatorNullableOrScalar : public TTag::TPreparedAggregator {  public:      TPreparedMinMaxBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn,          const std::shared_ptr<arrow::DataType>& builderDataType) @@ -318,8 +444,8 @@ public:          , BuilderDataType_(builderDataType)      {} -    std::unique_ptr<IBlockAggregator> Make(TComputationContext& ctx) const final { -        return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<TIn, TInScalar, TBuilder, IsMin>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx); +    std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final { +        return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<TTag, TIn, TInScalar, TBuilder, IsMin>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx);      }  private: @@ -328,8 +454,8 @@ private:      const std::shared_ptr<arrow::DataType> BuilderDataType_;  }; -template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin> -class TPreparedMinMaxBlockAggregator : public IPreparedBlockAggregator { +template <typename TTag, typename TIn, typename TInScalar, typename TBuilder, bool IsMin> +class TPreparedMinMaxBlockAggregator : public TTag::TPreparedAggregator {  public:      TPreparedMinMaxBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,          const std::shared_ptr<arrow::DataType>& builderDataType) @@ -338,8 +464,8 @@ public:          , BuilderDataType_(builderDataType)      {} -    std::unique_ptr<IBlockAggregator> Make(TComputationContext& ctx) const final { -        return std::make_unique<TMinMaxBlockAggregator<TIn, TInScalar, TBuilder, IsMin>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx); +    std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final { +        return std::make_unique<TMinMaxBlockAggregator<TTag, TIn, TInScalar, TBuilder, IsMin>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx);      }  private: @@ -348,75 +474,100 @@ private:      const std::shared_ptr<arrow::DataType> BuilderDataType_;  }; +template <typename TTag, bool IsMin> +std::unique_ptr<typename TTag::TPreparedAggregator> PrepareMinMax(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn) { +    auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn)); +    auto argType = blockType->GetItemType(); +    bool isOptional; +    auto dataType = UnpackOptionalData(argType, isOptional); +    if (blockType->GetShape() == TBlockType::EShape::Scalar || isOptional) { +        switch (*dataType->GetDataSlot()) { +        case NUdf::EDataSlot::Int8: +            return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<TTag, i8, arrow::Int8Scalar, arrow::Int8Builder, IsMin>>(filterColumn, argColumn, arrow::int8()); +        case NUdf::EDataSlot::Bool: +        case NUdf::EDataSlot::Uint8: +            return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<TTag, ui8, arrow::UInt8Scalar, arrow::UInt8Builder, IsMin>>(filterColumn, argColumn, arrow::uint8()); +        case NUdf::EDataSlot::Int16: +            return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<TTag, i16, arrow::Int16Scalar, arrow::Int16Builder, IsMin>>(filterColumn, argColumn, arrow::int16()); +        case NUdf::EDataSlot::Uint16: +        case NUdf::EDataSlot::Date: +            return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<TTag, ui16, arrow::UInt16Scalar, arrow::UInt16Builder, IsMin>>(filterColumn, argColumn, arrow::uint16()); +        case NUdf::EDataSlot::Int32: +            return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<TTag, i32, arrow::Int32Scalar, arrow::Int32Builder, IsMin>>(filterColumn, argColumn, arrow::int32()); +        case NUdf::EDataSlot::Uint32: +        case NUdf::EDataSlot::Datetime: +            return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<TTag, ui32, arrow::UInt32Scalar, arrow::UInt32Builder, IsMin>>(filterColumn, argColumn, arrow::uint32()); +        case NUdf::EDataSlot::Int64: +        case NUdf::EDataSlot::Interval: +            return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<TTag, i64, arrow::Int64Scalar, arrow::Int64Builder, IsMin>>(filterColumn, argColumn, arrow::int64()); +        case NUdf::EDataSlot::Uint64: +        case NUdf::EDataSlot::Timestamp: +            return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<TTag, ui64, arrow::UInt64Scalar, arrow::UInt64Builder, IsMin>>(filterColumn, argColumn, arrow::uint64()); +        default: +            throw yexception() << "Unsupported MIN/MAX input type"; +        } +    } +    else { +        switch (*dataType->GetDataSlot()) { +        case NUdf::EDataSlot::Int8: +            return std::make_unique<TPreparedMinMaxBlockAggregator<TTag, i8, arrow::Int8Scalar, arrow::Int8Builder, IsMin>>(filterColumn, argColumn, arrow::int8()); +        case NUdf::EDataSlot::Uint8: +        case NUdf::EDataSlot::Bool: +            return std::make_unique<TPreparedMinMaxBlockAggregator<TTag, ui8, arrow::UInt8Scalar, arrow::UInt8Builder, IsMin>>(filterColumn, argColumn, arrow::uint8()); +        case NUdf::EDataSlot::Int16: +            return std::make_unique<TPreparedMinMaxBlockAggregator<TTag, i16, arrow::Int16Scalar, arrow::Int16Builder, IsMin>>(filterColumn, argColumn, arrow::int16()); +        case NUdf::EDataSlot::Uint16: +        case NUdf::EDataSlot::Date: +            return std::make_unique<TPreparedMinMaxBlockAggregator<TTag, ui16, arrow::UInt16Scalar, arrow::UInt16Builder, IsMin>>(filterColumn, argColumn, arrow::uint16()); +        case NUdf::EDataSlot::Int32: +            return std::make_unique<TPreparedMinMaxBlockAggregator<TTag, i32, arrow::Int32Scalar, arrow::Int32Builder, IsMin>>(filterColumn, argColumn, arrow::int32()); +        case NUdf::EDataSlot::Uint32: +        case NUdf::EDataSlot::Datetime: +            return std::make_unique<TPreparedMinMaxBlockAggregator<TTag, ui32, arrow::UInt32Scalar, arrow::UInt32Builder, IsMin>>(filterColumn, argColumn, arrow::uint32()); +        case NUdf::EDataSlot::Int64: +        case NUdf::EDataSlot::Interval: +            return std::make_unique<TPreparedMinMaxBlockAggregator<TTag, i64, arrow::Int64Scalar, arrow::Int64Builder, IsMin>>(filterColumn, argColumn, arrow::int64()); +        case NUdf::EDataSlot::Uint64: +        case NUdf::EDataSlot::Timestamp: +            return std::make_unique<TPreparedMinMaxBlockAggregator<TTag, ui64, arrow::UInt64Scalar, arrow::UInt64Builder, IsMin>>(filterColumn, argColumn, arrow::uint64()); +        default: +            throw yexception() << "Unsupported MIN/MAX input type"; +        } +    } +} +  template <bool IsMin>  class TBlockMinMaxFactory : public IBlockAggregatorFactory {  public: -   std::unique_ptr<IPreparedBlockAggregator> Prepare( -       TTupleType* tupleType, -       std::optional<ui32> filterColumn, -       const std::vector<ui32>& argsColumns, -       const TTypeEnvironment& env) const final { -       Y_UNUSED(env); -       auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0])); -       auto argType = blockType->GetItemType(); -       bool isOptional; -       auto dataType = UnpackOptionalData(argType, isOptional); -       if (blockType->GetShape() == TBlockType::EShape::Scalar || isOptional) { -           switch (*dataType->GetDataSlot()) { -           case NUdf::EDataSlot::Int8: -               return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<i8, arrow::Int8Scalar, arrow::Int8Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int8()); -           case NUdf::EDataSlot::Bool: -           case NUdf::EDataSlot::Uint8: -               return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint8()); -           case NUdf::EDataSlot::Int16: -               return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<i16, arrow::Int16Scalar, arrow::Int16Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int16()); -           case NUdf::EDataSlot::Uint16: -           case NUdf::EDataSlot::Date: -               return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint16()); -           case NUdf::EDataSlot::Int32: -               return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<i32, arrow::Int32Scalar, arrow::Int32Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int32()); -           case NUdf::EDataSlot::Uint32: -           case NUdf::EDataSlot::Datetime: -               return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint32()); -           case NUdf::EDataSlot::Int64: -           case NUdf::EDataSlot::Interval: -               return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<i64, arrow::Int64Scalar, arrow::Int64Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int64()); -           case NUdf::EDataSlot::Uint64: -           case NUdf::EDataSlot::Timestamp: -               return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint64()); -           default: -               throw yexception() << "Unsupported MIN/MAX input type"; -           } -       } else { -           switch (*dataType->GetDataSlot()) { -           case NUdf::EDataSlot::Int8: -               return std::make_unique<TPreparedMinMaxBlockAggregator<i8, arrow::Int8Scalar, arrow::Int8Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int8()); -           case NUdf::EDataSlot::Uint8: -           case NUdf::EDataSlot::Bool: -               return std::make_unique<TPreparedMinMaxBlockAggregator<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint8()); -           case NUdf::EDataSlot::Int16: -               return std::make_unique<TPreparedMinMaxBlockAggregator<i16, arrow::Int16Scalar, arrow::Int16Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int16()); -           case NUdf::EDataSlot::Uint16: -           case NUdf::EDataSlot::Date: -               return std::make_unique<TPreparedMinMaxBlockAggregator<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint16()); -           case NUdf::EDataSlot::Int32: -               return std::make_unique<TPreparedMinMaxBlockAggregator<i32, arrow::Int32Scalar, arrow::Int32Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int32()); -           case NUdf::EDataSlot::Uint32: -           case NUdf::EDataSlot::Datetime: -               return std::make_unique<TPreparedMinMaxBlockAggregator<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint32()); -           case NUdf::EDataSlot::Int64: -           case NUdf::EDataSlot::Interval: -               return std::make_unique<TPreparedMinMaxBlockAggregator<i64, arrow::Int64Scalar, arrow::Int64Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int64()); -           case NUdf::EDataSlot::Uint64: -           case NUdf::EDataSlot::Timestamp: -               return std::make_unique<TPreparedMinMaxBlockAggregator<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint64()); -           default: -               throw yexception() << "Unsupported MIN/MAX input type"; -           } -       } -   } +    std::unique_ptr<TCombineAllTag::TPreparedAggregator> PrepareCombineAll( +        TTupleType* tupleType, +        std::optional<ui32> filterColumn, +        const std::vector<ui32>& argsColumns, +        const TTypeEnvironment& env) const final { +        Y_UNUSED(env); +        return PrepareMinMax<TCombineAllTag, IsMin>(tupleType, filterColumn, argsColumns[0]); +    } + +    std::unique_ptr<TCombineKeysTag::TPreparedAggregator> PrepareCombineKeys( +        TTupleType* tupleType, +        std::optional<ui32> filterColumn, +        const std::vector<ui32>& argsColumns, +        const TTypeEnvironment& env) const final { +        Y_UNUSED(env); +        return PrepareMinMax<TCombineKeysTag, IsMin>(tupleType, filterColumn, argsColumns[0]); +    } + +    std::unique_ptr<TFinalizeKeysTag::TPreparedAggregator> PrepareFinalizeKeys( +        TTupleType* tupleType, +        const std::vector<ui32>& argsColumns, +        const TTypeEnvironment& env) const final { +        Y_UNUSED(env); +        return PrepareMinMax<TFinalizeKeysTag, IsMin>(tupleType, std::optional<ui32>(), argsColumns[0]); +    }  }; +} +  std::unique_ptr<IBlockAggregatorFactory> MakeBlockMinFactory() {      return std::make_unique<TBlockMinMaxFactory<true>>();  } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp index 9894a3c2317..cf1df6f051a 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp @@ -12,57 +12,189 @@  namespace NKikimr {  namespace NMiniKQL { -template <typename TIn, typename TSum, typename TBuilder, typename TInScalar> -class TSumBlockAggregatorNullableOrScalar : public TBlockAggregatorBase { +namespace { + +template <typename TSum> +struct TSumState { +    TSum Sum_ = 0; +    ui8 IsValid_ = 0; +}; + +template <typename TSum> +struct TSumSimpleState { +    TSum Sum_ = 0; +}; + +struct TAvgState { +    double Sum_ = 0; +    ui64 Count_ = 0; +}; + +template <typename TSum, typename TBuilder> +class TSumColumnBuilder : public IAggColumnBuilder {  public: -    struct TState { -        TSum Sum_ = 0; -        ui8 IsValid_ = 0; -    }; - -    class TColumnBuilder : public IAggColumnBuilder { -    public: -        TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx) -            : Builder_(dataType, &ctx.ArrowMemoryPool) -            , Ctx_(ctx) -        { -            ARROW_OK(Builder_.Reserve(size)); +    TSumColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx) +        : Builder_(dataType, &ctx.ArrowMemoryPool) +        , Ctx_(ctx) +    { +        ARROW_OK(Builder_.Reserve(size)); +    } + +    void Add(const void* state) final { +        auto typedState = static_cast<const TSumState<TSum>*>(state); +        if (typedState->IsValid_) { +            Builder_.UnsafeAppend(typedState->Sum_);          } +        else { +            Builder_.UnsafeAppendNull(); +        } +    } -        void Add(const void* state) final { -            auto typedState = static_cast<const TState*>(state); -            if (typedState->IsValid_) { -                Builder_.UnsafeAppend(typedState->Sum_); -            } else { -                Builder_.UnsafeAppendNull(); -            } +    NUdf::TUnboxedValue Build() final { +        std::shared_ptr<arrow::ArrayData> result; +        ARROW_OK(Builder_.FinishInternal(&result)); +        return Ctx_.HolderFactory.CreateArrowBlock(result); +    } + +private: +    TBuilder Builder_; +    TComputationContext& Ctx_; +}; + +template <typename TSum, typename TBuilder> +class TSimpleSumColumnBuilder : public IAggColumnBuilder { +public: +    TSimpleSumColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx) +        : Builder_(dataType, &ctx.ArrowMemoryPool) +        , Ctx_(ctx) +    { +        ARROW_OK(Builder_.Reserve(size)); +    } + +    void Add(const void* state) final { +        auto typedState = static_cast<const TSumSimpleState<TSum>*>(state); +        Builder_.UnsafeAppend(typedState->Sum_); +    } + +    NUdf::TUnboxedValue Build() final { +        std::shared_ptr<arrow::ArrayData> result; +        ARROW_OK(Builder_.FinishInternal(&result)); +        return Ctx_.HolderFactory.CreateArrowBlock(result); +    } + +private: +    TBuilder Builder_; +    TComputationContext& Ctx_; +}; + +class TAvgStateColumnBuilder : public IAggColumnBuilder { +public: +    TAvgStateColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& arrowType, TComputationContext& ctx) +        : ArrowType_(arrowType) +        , Ctx_(ctx) +        , NullBitmapBuilder_(&ctx.ArrowMemoryPool) +        , SumBuilder_(arrow::float64(), &ctx.ArrowMemoryPool) +        , CountBuilder_(arrow::uint64(), &ctx.ArrowMemoryPool) +    { +        ARROW_OK(NullBitmapBuilder_.Reserve(size)); +        ARROW_OK(SumBuilder_.Reserve(size)); +        ARROW_OK(CountBuilder_.Reserve(size)); +    } + +    void Add(const void* state) final { +        auto typedState = static_cast<const TAvgState*>(state); +        if (typedState->Count_) { +            NullBitmapBuilder_.UnsafeAppend(true); +            SumBuilder_.UnsafeAppend(typedState->Sum_); +            CountBuilder_.UnsafeAppend(typedState->Count_); +        } else { +            NullBitmapBuilder_.UnsafeAppend(false); +            SumBuilder_.UnsafeAppendNull(); +            CountBuilder_.UnsafeAppendNull();          } +    } + +    NUdf::TUnboxedValue Build() final { +        std::shared_ptr<arrow::ArrayData> sumResult; +        std::shared_ptr<arrow::ArrayData> countResult; +        ARROW_OK(SumBuilder_.FinishInternal(&sumResult)); +        ARROW_OK(CountBuilder_.FinishInternal(&countResult)); +        std::shared_ptr<arrow::Buffer> nullBitmap; +        auto length = NullBitmapBuilder_.length(); +        auto nullCount = NullBitmapBuilder_.false_count(); +        ARROW_OK(NullBitmapBuilder_.Finish(&nullBitmap)); + +        auto arrayData = arrow::ArrayData::Make(ArrowType_, length, { nullBitmap }, nullCount, 0); +        arrayData->child_data.push_back(sumResult); +        arrayData->child_data.push_back(countResult); +        return Ctx_.HolderFactory.CreateArrowBlock(arrow::Datum(arrayData)); +    } -        NUdf::TUnboxedValue Build() final { -            std::shared_ptr<arrow::ArrayData> result; -            ARROW_OK(Builder_.FinishInternal(&result)); -            return Ctx_.HolderFactory.CreateArrowBlock(result); +private: +    const std::shared_ptr<arrow::DataType> ArrowType_; +    TComputationContext& Ctx_; +    arrow::TypedBufferBuilder<bool> NullBitmapBuilder_; +    arrow::DoubleBuilder SumBuilder_; +    arrow::UInt64Builder CountBuilder_; +}; + +class TAvgResultColumnBuilder : public IAggColumnBuilder { +public: +    TAvgResultColumnBuilder(ui64 size, TComputationContext& ctx) +        : Ctx_(ctx) +        , Builder_(arrow::float64(), &ctx.ArrowMemoryPool) +    { +        ARROW_OK(Builder_.Reserve(size)); +    } + +    void Add(const void* state) final { +        auto typedState = static_cast<const TAvgState*>(state); +        if (typedState->Count_) { +            Builder_.UnsafeAppend(typedState->Sum_ / typedState->Count_); +        } else { +            Builder_.UnsafeAppendNull();          } +    } + +    NUdf::TUnboxedValue Build() final { +        std::shared_ptr<arrow::ArrayData> result; +        ARROW_OK(Builder_.FinishInternal(&result)); +        return Ctx_.HolderFactory.CreateArrowBlock(arrow::Datum(result)); +    } + +private: +    TComputationContext& Ctx_; +    arrow::DoubleBuilder Builder_; +}; + +template <typename TTag, typename TIn, typename TSum, typename TBuilder, typename TInScalar> +class TSumBlockAggregatorNullableOrScalar; + +template <typename TTag, typename TIn, typename TSum, typename TBuilder, typename TInScalar> +class TSumBlockAggregator; -    private: -        TBuilder Builder_; -        TComputationContext& Ctx_; -    }; +template <typename TTag, typename TIn, typename TInScalar> +class TAvgBlockAggregator; + +template <typename TIn, typename TSum, typename TBuilder, typename TInScalar> +class TSumBlockAggregatorNullableOrScalar<TCombineAllTag, TIn, TSum, TBuilder, TInScalar> : public TCombineAllTag::TBase { +public: +    using TBase = TCombineAllTag::TBase;      TSumBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn,          const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) -        : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx) +        : TBase(sizeof(TSumState<TSum>), filterColumn, ctx)          , ArgColumn_(argColumn) -        , BuilderDataType_(builderDataType)      { +        Y_UNUSED(builderDataType);      }      void InitState(void* state) final { -        new(state) TState(); +        new(state) TSumState<TSum>();      }      void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final { -        auto typedState = static_cast<TState*>(state); +        auto typedState = static_cast<TSumState<TSum>*>(state);          const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();          if (datum.is_scalar()) {              if (datum.scalar()->is_valid) { @@ -131,7 +263,7 @@ public:      }      NUdf::TUnboxedValue FinishOne(const void* state) final { -        auto typedState = static_cast<const TState*>(state); +        auto typedState = static_cast<const TSumState<TSum>*>(state);          if (!typedState->IsValid_) {              return NUdf::TUnboxedValuePod();          } @@ -139,38 +271,67 @@ public:          return NUdf::TUnboxedValuePod(typedState->Sum_);      } +private: +    const ui32 ArgColumn_; +}; + +template <typename TIn, typename TSum, typename TInScalar> +void PushValueToState(TSumState<TSum>* typedState, const arrow::Datum& datum, ui64 row) { +    if (datum.is_scalar()) { +        if (datum.scalar()->is_valid) { +            typedState->Sum_ += datum.scalar_as<TInScalar>().value; +            typedState->IsValid_ = 1; +        } +    } else { +        const auto& array = datum.array(); +        auto ptr = array->GetValues<TIn>(1); +        if (array->GetNullCount() == 0) { +            typedState->IsValid_ = 1; +            typedState->Sum_ += ptr[row]; +        } else { +            auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0); +            ui64 fullIndex = row + array->offset; +            // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00 +            TIn mask = (((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) ^ 1) - TIn(1); +            typedState->Sum_ += (ptr[row] & mask); +            typedState->IsValid_ |= mask & 1; +        } +    } +} + +template <typename TIn, typename TSum> +void PushValueToSimpleState(TSumSimpleState<TSum>* typedState, const arrow::Datum& datum, ui64 row) { +    const auto& array = datum.array(); +    auto ptr = array->GetValues<TIn>(1); +    typedState->Sum_ += ptr[row]; +} + +template <typename TIn, typename TSum, typename TBuilder, typename TInScalar> +class TSumBlockAggregatorNullableOrScalar<TCombineKeysTag, TIn, TSum, TBuilder, TInScalar> : public TCombineKeysTag::TBase { +public: +    using TBase = TCombineKeysTag::TBase; + +    TSumBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn, +        const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) +        : TBase(sizeof(TSumState<TSum>), filterColumn, ctx) +        , ArgColumn_(argColumn) +        , BuilderDataType_(builderDataType) +    { +    } +      void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { -        new(state) TState(); +        new(state) TSumState<TSum>();          UpdateKey(state, columns, row);      }      void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { -        auto typedState = static_cast<TState*>(state); +        auto typedState = static_cast<TSumState<TSum>*>(state);          const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); -        if (datum.is_scalar()) { -            if (datum.scalar()->is_valid) { -                typedState->Sum_ += datum.scalar_as<TInScalar>().value; -                typedState->IsValid_ = 1; -            } -        } else { -            const auto& array = datum.array(); -            auto ptr = array->GetValues<TIn>(1); -            if (array->GetNullCount() == 0) { -                typedState->IsValid_ = 1; -                typedState->Sum_ += ptr[row]; -            } else { -                auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0); -                ui64 fullIndex = row + array->offset; -                // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00 -                TIn mask = (((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) ^ 1) - TIn(1); -                typedState->Sum_ += (ptr[row] & mask); -                typedState->IsValid_ |= mask & 1; -            } -        } +        PushValueToState<TIn, TSum, TInScalar>(typedState, datum, row);      } -    std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final { -        return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_); +    std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final { +        return std::make_unique<TSumColumnBuilder<TSum, TBuilder>>(size, BuilderDataType_, Ctx_);      }  private: @@ -179,51 +340,57 @@ private:  };  template <typename TIn, typename TSum, typename TBuilder, typename TInScalar> -class TSumBlockAggregator : public TBlockAggregatorBase { +class TSumBlockAggregatorNullableOrScalar<TFinalizeKeysTag, TIn, TSum, TBuilder, TInScalar> : public TFinalizeKeysTag::TBase {  public: -    struct TState { -        TSum Sum_ = 0; -    }; - -    class TColumnBuilder : public IAggColumnBuilder { -    public: -        TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx) -            : Builder_(dataType, &ctx.ArrowMemoryPool) -            , Ctx_(ctx) -        { -            ARROW_OK(Builder_.Reserve(size)); -        } +    using TBase = TFinalizeKeysTag::TBase; -        void Add(const void* state) final { -            auto typedState = static_cast<const TState*>(state); -            Builder_.UnsafeAppend(typedState->Sum_); -        } +    TSumBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn, +        const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) +        : TBase(sizeof(TSumState<TSum>), filterColumn, ctx) +        , ArgColumn_(argColumn) +        , BuilderDataType_(builderDataType) +    { +    } -        NUdf::TUnboxedValue Build() final { -            std::shared_ptr<arrow::ArrayData> result; -            ARROW_OK(Builder_.FinishInternal(&result)); -            return Ctx_.HolderFactory.CreateArrowBlock(result); -        } +    void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { +        new(state) TSumState<TSum>(); +        UpdateState(state, columns, row); +    } + +    void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { +        auto typedState = static_cast<TSumState<TSum>*>(state); +        const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); +        PushValueToState<TIn, TSum, TInScalar>(typedState, datum, row); +    } -    private: -        TBuilder Builder_; -        TComputationContext& Ctx_; -    }; +    std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final { +        return std::make_unique<TSumColumnBuilder<TSum, TBuilder>>(size, BuilderDataType_, Ctx_); +    } + +private: +    const ui32 ArgColumn_; +    const std::shared_ptr<arrow::DataType> BuilderDataType_; +}; + +template <typename TIn, typename TSum, typename TBuilder, typename TInScalar> +class TSumBlockAggregator<TCombineAllTag, TIn, TSum, TBuilder, TInScalar> : public TCombineAllTag::TBase { +public: +    using TBase = TCombineAllTag::TBase;      TSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,          const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) -        : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx) +        : TBase(sizeof(TSumSimpleState<TSum>), filterColumn, ctx)          , ArgColumn_(argColumn) -        , BuilderDataType_(builderDataType)      { +        Y_UNUSED(builderDataType);      }      void InitState(void* state) final { -        new(state) TState(); +        new(state) TSumSimpleState<TSum>();      }      void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final { -        auto typedState = static_cast<TState*>(state); +        auto typedState = static_cast<TSumSimpleState<TSum>*>(state);          Y_UNUSED(batchLength);          const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();          MKQL_ENSURE(datum.is_array(), "Expected array"); @@ -254,25 +421,40 @@ public:      }      NUdf::TUnboxedValue FinishOne(const void* state) final { -        auto typedState = static_cast<const TState*>(state); +        auto typedState = static_cast<const TSumSimpleState<TSum>*>(state);          return NUdf::TUnboxedValuePod(typedState->Sum_);      } +private: +    const ui32 ArgColumn_; +}; + +template <typename TIn, typename TSum, typename TBuilder, typename TInScalar> +class TSumBlockAggregator<TCombineKeysTag, TIn, TSum, TBuilder, TInScalar> : public TCombineKeysTag::TBase { +public: +    using TBase = TCombineKeysTag::TBase; + +    TSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, +        const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) +        : TBase(sizeof(TSumSimpleState<TSum>), filterColumn, ctx) +        , ArgColumn_(argColumn) +        , BuilderDataType_(builderDataType) +    { +    } +      void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { -        new(state) TState(); +        new(state) TSumSimpleState<TSum>();          UpdateKey(state, columns, row);      }      void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { -        auto typedState = static_cast<TState*>(state); +        auto typedState = static_cast<TSumSimpleState<TSum>*>(state);          const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); -        const auto& array = datum.array(); -        auto ptr = array->GetValues<TIn>(1); -        typedState->Sum_ += ptr[row]; +        PushValueToSimpleState<TIn, TSum>(typedState, datum, row);      } -    std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final { -        return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_); +    std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final { +        return std::make_unique<TSimpleSumColumnBuilder<TSum, TBuilder>>(size, BuilderDataType_, Ctx_);      }  private: @@ -280,79 +462,58 @@ private:      const std::shared_ptr<arrow::DataType> BuilderDataType_;  }; -template <typename TIn, typename TInScalar> -class TAvgBlockAggregator : public TBlockAggregatorBase { +template <typename TIn, typename TSum, typename TBuilder, typename TInScalar> +class TSumBlockAggregator<TFinalizeKeysTag, TIn, TSum, TBuilder, TInScalar> : public TFinalizeKeysTag::TBase {  public: -    struct TState { -        double Sum_ = 0; -        ui64 Count_ = 0; -    }; - -    class TColumnBuilder : public IAggColumnBuilder { -    public: -        TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& arrowType, TComputationContext& ctx) -            : ArrowType_(arrowType) -            , Ctx_(ctx) -            , NullBitmapBuilder_(&ctx.ArrowMemoryPool) -            , SumBuilder_(arrow::float64(), &ctx.ArrowMemoryPool) -            , CountBuilder_(arrow::uint64(), &ctx.ArrowMemoryPool) -        { -            ARROW_OK(NullBitmapBuilder_.Reserve(size)); -            ARROW_OK(SumBuilder_.Reserve(size)); -            ARROW_OK(CountBuilder_.Reserve(size)); -        } +    using TBase = TFinalizeKeysTag::TBase; -        void Add(const void* state) final { -            auto typedState = static_cast<const TState*>(state); -            if (typedState->Count_) { -                NullBitmapBuilder_.UnsafeAppend(true); -                SumBuilder_.UnsafeAppend(typedState->Sum_); -                CountBuilder_.UnsafeAppend(typedState->Count_); -            } else { -                NullBitmapBuilder_.UnsafeAppend(false); -                SumBuilder_.UnsafeAppendNull(); -                CountBuilder_.UnsafeAppendNull(); -            } -        } +    TSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, +        const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) +        : TBase(sizeof(TSumSimpleState<TSum>), filterColumn, ctx) +        , ArgColumn_(argColumn) +        , BuilderDataType_(builderDataType) +    { +    } -        NUdf::TUnboxedValue Build() final { -            std::shared_ptr<arrow::ArrayData> sumResult; -            std::shared_ptr<arrow::ArrayData> countResult; -            ARROW_OK(SumBuilder_.FinishInternal(&sumResult)); -            ARROW_OK(CountBuilder_.FinishInternal(&countResult)); -            std::shared_ptr<arrow::Buffer> nullBitmap; -            auto length = NullBitmapBuilder_.length(); -            auto nullCount = NullBitmapBuilder_.false_count(); -            ARROW_OK(NullBitmapBuilder_.Finish(&nullBitmap)); - -            auto arrayData = arrow::ArrayData::Make(ArrowType_, length, { nullBitmap }, nullCount, 0); -            arrayData->child_data.push_back(sumResult); -            arrayData->child_data.push_back(countResult); -            return Ctx_.HolderFactory.CreateArrowBlock(arrow::Datum(arrayData)); -        } +    void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { +        new(state) TSumSimpleState<TSum>(); +        UpdateState(state, columns, row); +    } + +    void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { +        auto typedState = static_cast<TSumSimpleState<TSum>*>(state); +        const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); +        PushValueToSimpleState<TIn, TSum>(typedState, datum, row); +    } + +    std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final { +        return std::make_unique<TSimpleSumColumnBuilder<TSum, TBuilder>>(size, BuilderDataType_, Ctx_); +    } + +private: +    const ui32 ArgColumn_; +    const std::shared_ptr<arrow::DataType> BuilderDataType_; +}; -    private: -        const std::shared_ptr<arrow::DataType> ArrowType_; -        TComputationContext& Ctx_; -        arrow::TypedBufferBuilder<bool> NullBitmapBuilder_; -        arrow::DoubleBuilder SumBuilder_; -        arrow::UInt64Builder CountBuilder_; -    }; +template <typename TIn, typename TInScalar> +class TAvgBlockAggregator<TCombineAllTag, TIn, TInScalar> : public TCombineAllTag::TBase { +public: +    using TBase = TCombineAllTag::TBase;      TAvgBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, -        const std::shared_ptr<arrow::DataType> builderDataType, TComputationContext& ctx) -        : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx) +        const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) +        : TBase(sizeof(TAvgState), filterColumn, ctx)          , ArgColumn_(argColumn) -        , BuilderDataType_(builderDataType)      { +        Y_UNUSED(builderDataType);      }      void InitState(void* state) final { -        new(state) TState(); +        new(state) TAvgState();      }      void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final { -        auto typedState = static_cast<TState*>(state); +        auto typedState = static_cast<TAvgState*>(state);          const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();          if (datum.is_scalar()) {              if (datum.scalar()->is_valid) { @@ -421,7 +582,7 @@ public:      }      NUdf::TUnboxedValue FinishOne(const void* state) final { -        auto typedState = static_cast<const TState*>(state); +        auto typedState = static_cast<const TAvgState*>(state);          if (!typedState->Count_) {              return NUdf::TUnboxedValuePod();          } @@ -433,13 +594,30 @@ public:          return arr;      } +private: +    ui32 ArgColumn_; +}; + +template <typename TIn, typename TInScalar> +class TAvgBlockAggregator<TCombineKeysTag, TIn, TInScalar> : public TCombineKeysTag::TBase { +public: +    using TBase = TCombineKeysTag::TBase; + +    TAvgBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, +        const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx) +        : TBase(sizeof(TAvgState), filterColumn, ctx) +        , ArgColumn_(argColumn) +        , BuilderDataType_(builderDataType) +    { +    } +      void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { -        new(state) TState(); +        new(state) TAvgState();          UpdateKey(state, columns, row);      }      void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { -        auto typedState = static_cast<TState*>(state); +        auto typedState = static_cast<TAvgState*>(state);          const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();          if (datum.is_scalar()) {              if (datum.scalar()->is_valid) { @@ -463,8 +641,8 @@ public:          }      } -    std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final { -        return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_); +    std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final { +        return std::make_unique<TAvgStateColumnBuilder>(size, BuilderDataType_, Ctx_);      }  private: @@ -472,8 +650,60 @@ private:      const std::shared_ptr<arrow::DataType> BuilderDataType_;  }; -template <typename TIn, typename TSum, typename TBuilder, typename TInScalar> -class TPreparedSumBlockAggregatorNullableOrScalar : public IPreparedBlockAggregator { +class TAvgBlockAggregatorOverState : public TFinalizeKeysTag::TBase { +public: +    using TBase = TFinalizeKeysTag::TBase; + +    TAvgBlockAggregatorOverState(ui32 argColumn, TComputationContext& ctx) +        : TBase(sizeof(TAvgState), {}, ctx) +        , ArgColumn_(argColumn) +    { +    } + +    void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { +        new(state) TAvgState(); +        UpdateState(state, columns, row); +    } + +    void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { +        auto typedState = static_cast<TAvgState*>(state); +        const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); +        if (datum.is_scalar()) { +            if (datum.scalar()->is_valid) { +                const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(*datum.scalar()); + +                typedState->Sum_ += arrow::internal::checked_cast<const arrow::DoubleScalar&>(*structScalar.value[0]).value; +                typedState->Count_ += arrow::internal::checked_cast<const arrow::UInt64Scalar&>(*structScalar.value[1]).value; +            } +        } else { +            const auto& array = datum.array(); +            auto sumPtr = array->child_data[0]->GetValues<double>(1); +            auto countPtr = array->child_data[1]->GetValues<ui64>(1); +            if (array->GetNullCount() == 0) { +                typedState->Sum_ += sumPtr[row]; +                typedState->Count_ += countPtr[row]; +            } else { +                auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0); +                ui64 fullIndex = row + array->offset; +                // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00 +                auto bit = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1; +                ui64 mask = -ui64(bit); +                typedState->Sum_ += sumPtr[row] * bit; +                typedState->Count_ += mask & countPtr[row]; +            } +        } +    } + +    std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final { +        return std::make_unique<TAvgResultColumnBuilder>(size, Ctx_); +    } + +private: +    const ui32 ArgColumn_; +}; + +template <typename TTag, typename TIn, typename TSum, typename TBuilder, typename TInScalar> +class TPreparedSumBlockAggregatorNullableOrScalar : public TTag::TPreparedAggregator {  public:      TPreparedSumBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn,          const std::shared_ptr<arrow::DataType>& builderDataType) @@ -482,8 +712,8 @@ public:          , BuilderDataType_(builderDataType)      {} -    std::unique_ptr<IBlockAggregator> Make(TComputationContext& ctx) const final { -        return std::make_unique<TSumBlockAggregatorNullableOrScalar<TIn, TSum, TBuilder, TInScalar>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx); +    std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final { +        return std::make_unique<TSumBlockAggregatorNullableOrScalar<TTag, TIn, TSum, TBuilder, TInScalar>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx);      }  private: @@ -492,8 +722,8 @@ private:      const std::shared_ptr<arrow::DataType> BuilderDataType_;  }; -template <typename TIn, typename TSum, typename TBuilder, typename TInScalar> -class TPreparedSumBlockAggregator : public IPreparedBlockAggregator { +template <typename TTag, typename TIn, typename TSum, typename TBuilder, typename TInScalar> +class TPreparedSumBlockAggregator : public TTag::TPreparedAggregator {  public:      TPreparedSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,          const std::shared_ptr<arrow::DataType>& builderDataType) @@ -502,8 +732,8 @@ public:          , BuilderDataType_(builderDataType)      {} -    std::unique_ptr<IBlockAggregator> Make(TComputationContext& ctx) const final { -        return std::make_unique<TSumBlockAggregator<TIn, TSum, TBuilder, TInScalar>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx); +    std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final { +        return std::make_unique<TSumBlockAggregator<TTag, TIn, TSum, TBuilder, TInScalar>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx);      }  private: @@ -512,66 +742,88 @@ private:      const std::shared_ptr<arrow::DataType> BuilderDataType_;  }; +template <typename TTag> +std::unique_ptr<typename TTag::TPreparedAggregator> PrepareSum(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn) { +    auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn)); +    auto argType = blockType->GetItemType(); +    bool isOptional; +    auto dataType = UnpackOptionalData(argType, isOptional); +    if (blockType->GetShape() == TBlockType::EShape::Scalar || isOptional) { +        switch (*dataType->GetDataSlot()) { +        case NUdf::EDataSlot::Int8: +            return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<TTag, i8, i64, arrow::Int64Builder, arrow::Int8Scalar>>(filterColumn, argColumn, arrow::int64()); +        case NUdf::EDataSlot::Uint8: +            return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<TTag, ui8, ui64, arrow::UInt64Builder, arrow::UInt8Scalar>>(filterColumn, argColumn, arrow::uint64()); +        case NUdf::EDataSlot::Int16: +            return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<TTag, i16, i64, arrow::Int64Builder, arrow::Int16Scalar>>(filterColumn, argColumn, arrow::int64()); +        case NUdf::EDataSlot::Uint16: +            return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<TTag, ui16, ui64, arrow::UInt64Builder, arrow::UInt16Scalar>>(filterColumn, argColumn, arrow::uint64()); +        case NUdf::EDataSlot::Int32: +            return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<TTag, i32, i64, arrow::Int64Builder, arrow::Int32Scalar>>(filterColumn, argColumn, arrow::int64()); +        case NUdf::EDataSlot::Uint32: +            return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<TTag, ui32, ui64, arrow::UInt64Builder, arrow::UInt32Scalar>>(filterColumn, argColumn, arrow::uint64()); +        case NUdf::EDataSlot::Int64: +            return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<TTag, i64, i64, arrow::Int64Builder, arrow::Int64Scalar>>(filterColumn, argColumn, arrow::int64()); +        case NUdf::EDataSlot::Uint64: +            return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<TTag, ui64, ui64, arrow::UInt64Builder, arrow::UInt64Scalar>>(filterColumn, argColumn, arrow::uint64()); +        default: +            throw yexception() << "Unsupported SUM input type"; +        } +    } else { +        switch (*dataType->GetDataSlot()) { +        case NUdf::EDataSlot::Int8: +            return std::make_unique<TPreparedSumBlockAggregator<TTag, i8, i64, arrow::Int64Builder, arrow::Int8Scalar>>(filterColumn, argColumn, arrow::int64()); +        case NUdf::EDataSlot::Uint8: +            return std::make_unique<TPreparedSumBlockAggregator<TTag, ui8, ui64, arrow::UInt64Builder, arrow::UInt8Scalar>>(filterColumn, argColumn, arrow::uint64()); +        case NUdf::EDataSlot::Int16: +            return std::make_unique<TPreparedSumBlockAggregator<TTag, i16, i64, arrow::Int64Builder, arrow::Int16Scalar>>(filterColumn, argColumn, arrow::int64()); +        case NUdf::EDataSlot::Uint16: +            return std::make_unique<TPreparedSumBlockAggregator<TTag, ui16, ui64, arrow::UInt64Builder, arrow::UInt16Scalar>>(filterColumn, argColumn, arrow::uint64()); +        case NUdf::EDataSlot::Int32: +            return std::make_unique<TPreparedSumBlockAggregator<TTag, i32, i64, arrow::Int64Builder, arrow::Int32Scalar>>(filterColumn, argColumn, arrow::int64()); +        case NUdf::EDataSlot::Uint32: +            return std::make_unique<TPreparedSumBlockAggregator<TTag, ui32, ui64, arrow::UInt64Builder, arrow::UInt32Scalar>>(filterColumn, argColumn, arrow::uint64()); +        case NUdf::EDataSlot::Int64: +            return std::make_unique<TPreparedSumBlockAggregator<TTag, i64, i64, arrow::Int64Builder, arrow::Int64Scalar>>(filterColumn, argColumn, arrow::int64()); +        case NUdf::EDataSlot::Uint64: +            return std::make_unique<TPreparedSumBlockAggregator<TTag, ui64, ui64, arrow::UInt64Builder, arrow::UInt64Scalar>>(filterColumn, argColumn, arrow::uint64()); +        default: +            throw yexception() << "Unsupported SUM input type"; +        } +    } +} +  class TBlockSumFactory : public IBlockAggregatorFactory {  public: -   std::unique_ptr<IPreparedBlockAggregator> Prepare( -       TTupleType* tupleType, -       std::optional<ui32> filterColumn, -       const std::vector<ui32>& argsColumns, -       const TTypeEnvironment& env) const final { -       Y_UNUSED(env); -       auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0])); -       auto argType = blockType->GetItemType(); -       bool isOptional; -       auto dataType = UnpackOptionalData(argType, isOptional); -       if (blockType->GetShape() == TBlockType::EShape::Scalar || isOptional) { -           switch (*dataType->GetDataSlot()) { -           case NUdf::EDataSlot::Int8: -               return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<i8, i64, arrow::Int64Builder, arrow::Int8Scalar>>(filterColumn, argsColumns[0], arrow::int64()); -           case NUdf::EDataSlot::Uint8: -               return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<ui8, ui64, arrow::UInt64Builder, arrow::UInt8Scalar>>(filterColumn, argsColumns[0], arrow::uint64()); -           case NUdf::EDataSlot::Int16: -               return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<i16, i64, arrow::Int64Builder, arrow::Int16Scalar>>(filterColumn, argsColumns[0], arrow::int64()); -           case NUdf::EDataSlot::Uint16: -               return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<ui16, ui64, arrow::UInt64Builder, arrow::UInt16Scalar>>(filterColumn, argsColumns[0], arrow::uint64()); -           case NUdf::EDataSlot::Int32: -               return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<i32, i64, arrow::Int64Builder, arrow::Int32Scalar>>(filterColumn, argsColumns[0], arrow::int64()); -           case NUdf::EDataSlot::Uint32: -               return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<ui32, ui64, arrow::UInt64Builder, arrow::UInt32Scalar>>(filterColumn, argsColumns[0], arrow::uint64()); -           case NUdf::EDataSlot::Int64: -               return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<i64, i64, arrow::Int64Builder, arrow::Int64Scalar>>(filterColumn, argsColumns[0], arrow::int64()); -           case NUdf::EDataSlot::Uint64: -               return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<ui64, ui64, arrow::UInt64Builder, arrow::UInt64Scalar>>(filterColumn, argsColumns[0], arrow::uint64()); -           default: -               throw yexception() << "Unsupported SUM input type"; -           } -       } else { -           switch (*dataType->GetDataSlot()) { -           case NUdf::EDataSlot::Int8: -               return std::make_unique<TPreparedSumBlockAggregator<i8, i64, arrow::Int64Builder, arrow::Int8Scalar>>(filterColumn, argsColumns[0], arrow::int64()); -           case NUdf::EDataSlot::Uint8: -               return std::make_unique<TPreparedSumBlockAggregator<ui8, ui64, arrow::UInt64Builder, arrow::UInt8Scalar>>(filterColumn, argsColumns[0], arrow::uint64()); -           case NUdf::EDataSlot::Int16: -               return std::make_unique<TPreparedSumBlockAggregator<i16, i64, arrow::Int64Builder, arrow::Int16Scalar>>(filterColumn, argsColumns[0], arrow::int64()); -           case NUdf::EDataSlot::Uint16: -               return std::make_unique<TPreparedSumBlockAggregator<ui16, ui64, arrow::UInt64Builder, arrow::UInt16Scalar>>(filterColumn, argsColumns[0], arrow::uint64()); -           case NUdf::EDataSlot::Int32: -               return std::make_unique<TPreparedSumBlockAggregator<i32, i64, arrow::Int64Builder, arrow::Int32Scalar>>(filterColumn, argsColumns[0], arrow::int64()); -           case NUdf::EDataSlot::Uint32: -               return std::make_unique<TPreparedSumBlockAggregator<ui32, ui64, arrow::UInt64Builder, arrow::UInt32Scalar>>(filterColumn, argsColumns[0], arrow::uint64()); -           case NUdf::EDataSlot::Int64: -               return std::make_unique<TPreparedSumBlockAggregator<i64, i64, arrow::Int64Builder, arrow::Int64Scalar>>(filterColumn, argsColumns[0], arrow::int64()); -           case NUdf::EDataSlot::Uint64: -               return std::make_unique<TPreparedSumBlockAggregator<ui64, ui64, arrow::UInt64Builder, arrow::UInt64Scalar>>(filterColumn, argsColumns[0], arrow::uint64()); -           default: -               throw yexception() << "Unsupported SUM input type"; -           } -       } -   } +    std::unique_ptr<TCombineAllTag::TPreparedAggregator> PrepareCombineAll( +        TTupleType* tupleType, +        std::optional<ui32> filterColumn, +        const std::vector<ui32>& argsColumns, +        const TTypeEnvironment& env) const final { +        Y_UNUSED(env); +        return PrepareSum<TCombineAllTag>(tupleType, filterColumn, argsColumns[0]); +    } + +    std::unique_ptr<TCombineKeysTag::TPreparedAggregator> PrepareCombineKeys( +        TTupleType* tupleType, +        std::optional<ui32> filterColumn, +        const std::vector<ui32>& argsColumns, +        const TTypeEnvironment& env) const final { +        Y_UNUSED(env); +        return PrepareSum<TCombineKeysTag>(tupleType, filterColumn, argsColumns[0]); +    } + +    std::unique_ptr<TFinalizeKeysTag::TPreparedAggregator> PrepareFinalizeKeys( +        TTupleType* tupleType, +        const std::vector<ui32>& argsColumns, +        const TTypeEnvironment& env) const final { +        Y_UNUSED(env); +        return PrepareSum<TFinalizeKeysTag>(tupleType, std::optional<ui32>(), argsColumns[0]); +    }  }; -template <typename TIn, typename TInScalar> -class TPreparedAvgBlockAggregator : public IPreparedBlockAggregator { +template <typename TTag, typename TIn, typename TInScalar> +class TPreparedAvgBlockAggregator : public TTag::TPreparedAggregator {  public:      TPreparedAvgBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,          const std::shared_ptr<arrow::DataType>& builderDataType) @@ -580,8 +832,8 @@ public:          , BuilderDataType_(builderDataType)      {} -    std::unique_ptr<IBlockAggregator> Make(TComputationContext& ctx) const final { -        return std::make_unique<TAvgBlockAggregator<TIn, TInScalar>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx); +    std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final { +        return std::make_unique<TAvgBlockAggregator<TTag, TIn, TInScalar>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx);      }  private: @@ -590,47 +842,103 @@ private:      const std::shared_ptr<arrow::DataType> BuilderDataType_;  }; +class TPreparedAvgBlockAggregatorOverState : public TFinalizeKeysTag::TPreparedAggregator { +public: +    TPreparedAvgBlockAggregatorOverState(ui32 argColumn) +        : ArgColumn_(argColumn) +    {} + +    std::unique_ptr<typename TFinalizeKeysTag::TAggregator> Make(TComputationContext& ctx) const final { +        return std::make_unique<TAvgBlockAggregatorOverState>(ArgColumn_, ctx); +    } + +private: +    const ui32 ArgColumn_; +}; + +template <typename TTag> +std::unique_ptr<typename TTag::TPreparedAggregator> PrepareAvg(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env); + +template <typename TTag> +std::unique_ptr<typename TTag::TPreparedAggregator> PrepareAvgOverInput(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env) { +    auto doubleType = TDataType::Create(NUdf::TDataType<double>::Id, env); +    auto ui64Type = TDataType::Create(NUdf::TDataType<ui64>::Id, env); +    TVector<TType*> tupleElements = { doubleType, ui64Type }; +    auto avgRetType = TTupleType::Create(2, tupleElements.data(), env); +    std::shared_ptr<arrow::DataType> builderDataType; +    bool isOptional; +    MKQL_ENSURE(ConvertArrowType(avgRetType, isOptional, builderDataType), "Unsupported builder type"); + +    auto argType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn))->GetItemType(); +    auto dataType = UnpackOptionalData(argType, isOptional); +    switch (*dataType->GetDataSlot()) { +    case NUdf::EDataSlot::Int8: +        return std::make_unique<TPreparedAvgBlockAggregator<TTag, i8, arrow::Int8Scalar>>(filterColumn, argColumn, builderDataType); +    case NUdf::EDataSlot::Uint8: +        return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui8, arrow::UInt8Scalar>>(filterColumn, argColumn, builderDataType); +    case NUdf::EDataSlot::Int16: +        return std::make_unique<TPreparedAvgBlockAggregator<TTag, i16, arrow::Int16Scalar>>(filterColumn, argColumn, builderDataType); +    case NUdf::EDataSlot::Uint16: +        return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui16, arrow::UInt16Scalar>>(filterColumn, argColumn, builderDataType); +    case NUdf::EDataSlot::Int32: +        return std::make_unique<TPreparedAvgBlockAggregator<TTag, i32, arrow::Int32Scalar>>(filterColumn, argColumn, builderDataType); +    case NUdf::EDataSlot::Uint32: +        return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui32, arrow::UInt32Scalar>>(filterColumn, argColumn, builderDataType); +    case NUdf::EDataSlot::Int64: +        return std::make_unique<TPreparedAvgBlockAggregator<TTag, i64, arrow::Int64Scalar>>(filterColumn, argColumn, builderDataType); +    case NUdf::EDataSlot::Uint64: +        return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui64, arrow::UInt64Scalar>>(filterColumn, argColumn, builderDataType); +    default: +        throw yexception() << "Unsupported AVG input type"; +    } +} + +template <> +std::unique_ptr<typename TCombineAllTag::TPreparedAggregator> PrepareAvg<TCombineAllTag>(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env) { +    return PrepareAvgOverInput<TCombineAllTag>(tupleType, filterColumn, argColumn, env); +} + +template <> +std::unique_ptr<typename TCombineKeysTag::TPreparedAggregator> PrepareAvg<TCombineKeysTag>(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env) { +    return PrepareAvgOverInput<TCombineKeysTag>(tupleType, filterColumn, argColumn, env); +} + +template <> +std::unique_ptr<typename TFinalizeKeysTag::TPreparedAggregator> PrepareAvg<TFinalizeKeysTag>(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env) { +    Y_UNUSED(tupleType); +    Y_UNUSED(filterColumn); +    Y_UNUSED(env); +    return std::make_unique<TPreparedAvgBlockAggregatorOverState>(argColumn); +} +  class TBlockAvgFactory : public IBlockAggregatorFactory {  public: -   std::unique_ptr<IPreparedBlockAggregator> Prepare( -       TTupleType* tupleType, -       std::optional<ui32> filterColumn, -       const std::vector<ui32>& argsColumns, -       const TTypeEnvironment& env) const final { - -       auto doubleType = TDataType::Create(NUdf::TDataType<double>::Id, env); -       auto ui64Type = TDataType::Create(NUdf::TDataType<ui64>::Id, env); -       TVector<TType*> tupleElements = { doubleType, ui64Type }; -       auto avgRetType = TTupleType::Create(2, tupleElements.data(), env); -       std::shared_ptr<arrow::DataType> builderDataType; -       bool isOptional; -       MKQL_ENSURE(ConvertArrowType(avgRetType, isOptional, builderDataType), "Unsupported builder type"); - -       auto argType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0]))->GetItemType(); -       auto dataType = UnpackOptionalData(argType, isOptional); -       switch (*dataType->GetDataSlot()) { -       case NUdf::EDataSlot::Int8: -           return std::make_unique<TPreparedAvgBlockAggregator<i8, arrow::Int8Scalar>>(filterColumn, argsColumns[0], builderDataType); -       case NUdf::EDataSlot::Uint8: -           return std::make_unique<TPreparedAvgBlockAggregator<ui8, arrow::UInt8Scalar>>(filterColumn, argsColumns[0], builderDataType); -       case NUdf::EDataSlot::Int16: -           return std::make_unique<TPreparedAvgBlockAggregator<i16, arrow::Int16Scalar>>(filterColumn, argsColumns[0], builderDataType); -       case NUdf::EDataSlot::Uint16: -           return std::make_unique<TPreparedAvgBlockAggregator<ui16, arrow::UInt16Scalar>>(filterColumn, argsColumns[0], builderDataType); -       case NUdf::EDataSlot::Int32: -           return std::make_unique<TPreparedAvgBlockAggregator<i32, arrow::Int32Scalar>>(filterColumn, argsColumns[0], builderDataType); -       case NUdf::EDataSlot::Uint32: -           return std::make_unique<TPreparedAvgBlockAggregator<ui32, arrow::UInt32Scalar>>(filterColumn, argsColumns[0], builderDataType); -       case NUdf::EDataSlot::Int64: -           return std::make_unique<TPreparedAvgBlockAggregator<i64, arrow::Int64Scalar>>(filterColumn, argsColumns[0], builderDataType); -       case NUdf::EDataSlot::Uint64: -           return std::make_unique<TPreparedAvgBlockAggregator<ui64, arrow::UInt64Scalar>>(filterColumn, argsColumns[0], builderDataType); -       default: -           throw yexception() << "Unsupported AVG input type"; -       } -   } +    std::unique_ptr<TCombineAllTag::TPreparedAggregator> PrepareCombineAll( +        TTupleType* tupleType, +        std::optional<ui32> filterColumn, +        const std::vector<ui32>& argsColumns, +        const TTypeEnvironment& env) const final { +        return PrepareAvg<TCombineAllTag>(tupleType, filterColumn, argsColumns[0], env); +    } + +    std::unique_ptr<TCombineKeysTag::TPreparedAggregator> PrepareCombineKeys( +        TTupleType* tupleType, +        std::optional<ui32> filterColumn, +        const std::vector<ui32>& argsColumns, +        const TTypeEnvironment& env) const final { +        return PrepareAvg<TCombineKeysTag>(tupleType, filterColumn, argsColumns[0], env); +    } + +    std::unique_ptr<TFinalizeKeysTag::TPreparedAggregator> PrepareFinalizeKeys( +        TTupleType* tupleType, +        const std::vector<ui32>& argsColumns, +        const TTypeEnvironment& env) const final { +        return PrepareAvg<TFinalizeKeysTag>(tupleType, std::optional<ui32>(), argsColumns[0], env); +    }  }; +} +  std::unique_ptr<IBlockAggregatorFactory> MakeBlockSumFactory() {      return std::make_unique<TBlockSumFactory>();  } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp index ddf51de9bda..a6d4882ac35 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp @@ -283,6 +283,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller {          {"BlockCompress", &WrapBlockCompress},          {"BlockCombineAll", &WrapBlockCombineAll},          {"BlockCombineHashed", &WrapBlockCombineHashed}, +        {"BlockMergeFinalizeHashed", &WrapBlockMergeFinalizeHashed},          {"MakeHeap", &WrapMakeHeap},          {"PushHeap", &WrapPushHeap},          {"PopHeap", &WrapPopHeap}, diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 53c27b7e471..0f4ec2b5754 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -5372,6 +5372,36 @@ TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, std::optiona      return TRuntimeNode(builder.Build(), false);  } +TRuntimeNode TProgramBuilder::BlockMergeFinalizeHashed(TRuntimeNode flow, const TArrayRef<ui32>& keys, +    const TArrayRef<const TAggInfo>& aggs, TType* returnType) { +    if constexpr (RuntimeVersion < 31U) { +        THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; +    } + +    TCallableBuilder builder(Env, __func__, returnType); +    builder.Add(flow); + +    TVector<TRuntimeNode> keyNodes; +    for (const auto& key : keys) { +        keyNodes.push_back(NewDataLiteral<ui32>(key)); +    } + +    builder.Add(NewTuple(keyNodes)); +    TVector<TRuntimeNode> aggsNodes; +    for (const auto& agg : aggs) { +        TVector<TRuntimeNode> params; +        params.push_back(NewDataLiteral<NUdf::EDataSlot::String>(agg.Name)); +        for (const auto& col : agg.ArgsColumns) { +            params.push_back(NewDataLiteral<ui32>(col)); +        } + +        aggsNodes.push_back(NewTuple(params)); +    } + +    builder.Add(NewTuple(aggsNodes)); +    return TRuntimeNode(builder.Build(), false); +} +  bool CanExportType(TType* type, const TTypeEnvironment& env) {      if (type->GetKind() == TType::EKind::Type) {          return false; // Type of Type diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 1214c5ddc5a..81f4b13e6a9 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -261,6 +261,8 @@ public:          const TArrayRef<const TAggInfo>& aggs, TType* returnType);      TRuntimeNode BlockCombineHashed(TRuntimeNode flow, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys,          const TArrayRef<const TAggInfo>& aggs, TType* returnType); +    TRuntimeNode BlockMergeFinalizeHashed(TRuntimeNode flow, const TArrayRef<ui32>& keys, +        const TArrayRef<const TAggInfo>& aggs, TType* returnType);      // udfs      TRuntimeNode Udf(  | 
