summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <[email protected]>2022-12-16 20:52:04 +0300
committervvvv <[email protected]>2022-12-16 20:52:04 +0300
commitc196050dfde71d4cd34f0f50366049aaf582d919 (patch)
treee657912d9cfe63cc8e0173c6178d57b869a965e8
parentf6fd13cea19380072929532112fd9d5c4e9e7857 (diff)
WIP support of final aggregation by keys (minikql only)
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp269
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp303
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp9
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h88
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp515
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp834
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp1
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp30
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.h2
10 files changed, 1427 insertions, 625 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
index ae8af5df98f..66aab94ef4b 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
@@ -110,8 +110,9 @@ namespace NMiniKQL {
namespace {
+template <typename T>
struct TAggParams {
- std::unique_ptr<IPreparedBlockAggregator> Prepared_;
+ std::unique_ptr<IPreparedBlockAggregator<T>> Prepared_;
};
struct TKeyParams {
@@ -311,7 +312,7 @@ public:
IComputationWideFlowNode* flow,
std::optional<ui32> filterColumn,
size_t width,
- TVector<TAggParams>&& aggsParams)
+ TVector<TAggParams<IBlockAggregatorCombineAll>>&& aggsParams)
: TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
, Flow_(flow)
, FilterColumn_(filterColumn)
@@ -394,12 +395,12 @@ private:
struct TState : public TComputationValue<TState> {
TVector<NUdf::TUnboxedValue> Values_;
TVector<NUdf::TUnboxedValue*> ValuePointers_;
- TVector<std::unique_ptr<IBlockAggregator>> Aggs_;
+ TVector<std::unique_ptr<IBlockAggregatorCombineAll>> Aggs_;
bool IsFinished_ = false;
bool HasValues_ = false;
TVector<char> AggStates_;
- TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const TVector<TAggParams>& params, TComputationContext& ctx)
+ TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const TVector<TAggParams<IBlockAggregatorCombineAll>>& params, TComputationContext& ctx)
: TComputationValue(memInfo)
, Values_(width)
, ValuePointers_(width)
@@ -443,7 +444,7 @@ private:
IComputationWideFlowNode* Flow_;
std::optional<ui32> FilterColumn_;
const size_t Width_;
- const TVector<TAggParams> AggsParams_;
+ const TVector<TAggParams<IBlockAggregatorCombineAll>> AggsParams_;
};
template <typename T>
@@ -482,19 +483,19 @@ TStringBuf GetKeyView(const TSSOKey& key) {
return key.AsView();
}
-template <typename TKey, bool UseSet, bool UseFilter>
-class TBlockCombineHashedWrapper : public TStatefulWideFlowComputationNode<TBlockCombineHashedWrapper<TKey, UseSet, UseFilter>> {
+template <typename TKey, typename TAggregator, bool UseSet, bool UseFilter, bool Finalize, typename TDerived>
+class THashedWrapperBase : public TStatefulWideFlowComputationNode<TDerived> {
public:
- using TSelf = TBlockCombineHashedWrapper<TKey, UseSet, UseFilter>;
- using TBase = TStatefulWideFlowComputationNode<TSelf>;
+ using TSelf = THashedWrapperBase<TKey, TAggregator, UseSet, UseFilter, Finalize, TDerived>;
+ using TBase = TStatefulWideFlowComputationNode<TDerived>;
- TBlockCombineHashedWrapper(TComputationMutables& mutables,
+ THashedWrapperBase(TComputationMutables& mutables,
IComputationWideFlowNode* flow,
std::optional<ui32> filterColumn,
size_t width,
const std::vector<TKeyParams>& keys,
std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers,
- TVector<TAggParams>&& aggsParams)
+ TVector<TAggParams<TAggregator>>&& aggsParams)
: TBase(mutables, flow, EValueRepresentation::Any)
, Flow_(flow)
, FilterColumn_(filterColumn)
@@ -507,6 +508,7 @@ public:
MKQL_ENSURE(Width_ > 0, "Missing block length column");
if constexpr (UseFilter) {
MKQL_ENSURE(filterColumn, "Missing filter column");
+ MKQL_ENSURE(!Finalize, "Filter isn't compatible with Finalize");
} else {
MKQL_ENSURE(!filterColumn, "Unexpected filter column");
}
@@ -588,7 +590,11 @@ public:
if (isNew) {
for (size_t i = 0; i < s.Aggs_.size(); ++i) {
if (output[Keys_.size() + i]) {
- s.Aggs_[i]->InitKey(ptr, s.Values_.data(), row);
+ if constexpr (Finalize) {
+ s.Aggs_[i]->LoadState(ptr, s.Values_.data(), row);
+ } else {
+ s.Aggs_[i]->InitKey(ptr, s.Values_.data(), row);
+ }
}
ptr += s.Aggs_[i]->StateSize;
@@ -602,7 +608,11 @@ public:
} else {
for (size_t i = 0; i < s.Aggs_.size(); ++i) {
if (output[Keys_.size() + i]) {
- s.Aggs_[i]->UpdateKey(ptr, s.Values_.data(), row);
+ if constexpr (Finalize) {
+ s.Aggs_[i]->UpdateState(ptr, s.Values_.data(), row);
+ } else {
+ s.Aggs_[i]->UpdateKey(ptr, s.Values_.data(), row);
+ }
}
ptr += s.Aggs_[i]->StateSize;
@@ -644,7 +654,11 @@ public:
} else {
TVector<std::unique_ptr<IAggColumnBuilder>> aggBuilders;
for (const auto& a : s.Aggs_) {
- aggBuilders.emplace_back(a->MakeBuilder(size));
+ if constexpr (Finalize) {
+ aggBuilders.emplace_back(a->MakeResultBuilder(size));
+ } else {
+ aggBuilders.emplace_back(a->MakeStateBuilder(size));
+ }
}
for (auto iter = s.HashMap_->Begin(); iter != s.HashMap_->End(); s.HashMap_->Advance(iter)) {
@@ -694,7 +708,7 @@ private:
TVector<NUdf::TUnboxedValue> Values_;
TVector<NUdf::TUnboxedValue*> ValuePointers_;
- TVector<std::unique_ptr<IBlockAggregator>> Aggs_;
+ TVector<std::unique_ptr<TAggregator>> Aggs_;
bool IsFinished_ = false;
bool HasValues_ = false;
ui32 TotalStateSize_ = 0;
@@ -702,7 +716,7 @@ private:
std::unique_ptr<TRobinHoodHashSet<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>>> HashSet_;
TPagedArena Arena_;
- TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const TVector<TAggParams>& params, TComputationContext& ctx)
+ TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const TVector<TAggParams<TAggregator>>& params, TComputationContext& ctx)
: TBase(memInfo)
, Values_(width)
, ValuePointers_(width)
@@ -748,11 +762,80 @@ private:
const size_t Width_;
const size_t OutputWidth_;
const std::vector<TKeyParams> Keys_;
- const TVector<TAggParams> AggsParams_;
+ const TVector<TAggParams<TAggregator>> AggsParams_;
std::vector<std::unique_ptr<IKeySerializer>> KeySerializers_;
};
-void FillAggParams(TTupleLiteral* aggsVal, TTupleType* tupleType, std::optional<ui32> filterColumn, TVector<TAggParams>& aggsParams, const TTypeEnvironment& env) {
+template <typename TKey, bool UseSet, bool UseFilter>
+class TBlockCombineHashedWrapper : public THashedWrapperBase<TKey, IBlockAggregatorCombineKeys, UseSet, UseFilter, false, TBlockCombineHashedWrapper<TKey, UseSet, UseFilter>> {
+public:
+ using TSelf = TBlockCombineHashedWrapper<TKey, UseSet, UseFilter>;
+ using TBase = THashedWrapperBase<TKey, IBlockAggregatorCombineKeys, UseSet, UseFilter, false, TSelf>;
+
+ TBlockCombineHashedWrapper(TComputationMutables& mutables,
+ IComputationWideFlowNode* flow,
+ std::optional<ui32> filterColumn,
+ size_t width,
+ const std::vector<TKeyParams>& keys,
+ std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers,
+ TVector<TAggParams<IBlockAggregatorCombineKeys>>&& aggsParams)
+ : TBase(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams))
+ {}
+};
+
+template <typename TKey, bool UseSet>
+class TBlockMergeFinalizeHashedWrapper : public THashedWrapperBase<TKey, IBlockAggregatorFinalizeKeys, UseSet, false, true, TBlockMergeFinalizeHashedWrapper<TKey, UseSet>> {
+public:
+ using TSelf = TBlockMergeFinalizeHashedWrapper<TKey, UseSet>;
+ using TBase = THashedWrapperBase<TKey, IBlockAggregatorFinalizeKeys, UseSet, false, true, TSelf>;
+
+ TBlockMergeFinalizeHashedWrapper(TComputationMutables& mutables,
+ IComputationWideFlowNode* flow,
+ size_t width,
+ const std::vector<TKeyParams>& keys,
+ std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers,
+ TVector<TAggParams<IBlockAggregatorFinalizeKeys>>&& aggsParams)
+ : TBase(mutables, flow, {}, width, keys, std::move(keySerializers), std::move(aggsParams))
+ {}
+};
+
+template <typename TAggregator>
+std::unique_ptr<IPreparedBlockAggregator<TAggregator>> PrepareBlockAggregator(const IBlockAggregatorFactory& factory,
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env);
+
+template <>
+std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineAll>> PrepareBlockAggregator<IBlockAggregatorCombineAll>(const IBlockAggregatorFactory& factory,
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) {
+ return factory.PrepareCombineAll(tupleType, filterColumn, argsColumns, env);
+}
+
+template <>
+std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineKeys>> PrepareBlockAggregator<IBlockAggregatorCombineKeys>(const IBlockAggregatorFactory& factory,
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) {
+ return factory.PrepareCombineKeys(tupleType, filterColumn, argsColumns, env);
+}
+
+template <>
+std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorFinalizeKeys>> PrepareBlockAggregator<IBlockAggregatorFinalizeKeys>(const IBlockAggregatorFactory& factory,
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) {
+ MKQL_ENSURE(!filterColumn, "Unexpected filter column");
+ return factory.PrepareFinalizeKeys(tupleType, argsColumns, env);
+}
+
+template <typename TAggregator>
+void FillAggParams(TTupleLiteral* aggsVal, TTupleType* tupleType, std::optional<ui32> filterColumn, TVector<TAggParams<TAggregator>>& aggsParams, const TTypeEnvironment& env) {
for (ui32 i = 0; i < aggsVal->GetValuesCount(); ++i) {
auto aggVal = AS_VALUE(TTupleLiteral, aggsVal->GetValue(i));
auto name = AS_VALUE(TDataLiteral, aggVal->GetValue(0))->AsValue().AsStringRef();
@@ -762,8 +845,8 @@ void FillAggParams(TTupleLiteral* aggsVal, TTupleType* tupleType, std::optional<
argColumns.push_back(AS_VALUE(TDataLiteral, aggVal->GetValue(j))->AsValue().Get<ui32>());
}
- TAggParams p;
- p.Prepared_ = PrepareBlockAggregator(name, tupleType, filterColumn, argColumns, env);
+ TAggParams<TAggregator> p;
+ p.Prepared_ = PrepareBlockAggregator<TAggregator>(GetBlockAggregatorFactory(name), tupleType, filterColumn, argColumns, env);
aggsParams.emplace_back(std::move(p));
}
}
@@ -777,7 +860,7 @@ IComputationNode* MakeBlockCombineHashedWrapper(
size_t width,
const std::vector<TKeyParams>& keys,
std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers,
- TVector<TAggParams>&& aggsParams) {
+ TVector<TAggParams<IBlockAggregatorCombineKeys>>&& aggsParams) {
if (totalKeysSize <= sizeof(ui32)) {
return new TBlockCombineHashedWrapper<ui32, UseSet, UseFilter>(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams));
}
@@ -789,55 +872,29 @@ IComputationNode* MakeBlockCombineHashedWrapper(
return new TBlockCombineHashedWrapper<TSSOKey, UseSet, UseFilter>(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams));
}
-}
-
-IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args");
- const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
-
- auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
- MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
-
- auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(1));
- std::optional<ui32> filterColumn;
- if (filterColumnVal->HasItem()) {
- filterColumn = AS_VALUE(TDataLiteral, filterColumnVal->GetItem())->AsValue().Get<ui32>();
- }
-
- auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(2));
- TVector<TAggParams> aggsParams;
- FillAggParams(aggsVal, tupleType, filterColumn, aggsParams, ctx.Env);
- return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams));
-}
-
-IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
- const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
-
- auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
- MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
-
- auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(1));
- std::optional<ui32> filterColumn;
- if (filterColumnVal->HasItem()) {
- filterColumn = AS_VALUE(TDataLiteral, filterColumnVal->GetItem())->AsValue().Get<ui32>();
+template <bool UseSet>
+IComputationNode* MakeBlockMergeFinalizeHashedWrapper(
+ ui32 totalKeysSize,
+ TComputationMutables& mutables,
+ IComputationWideFlowNode* flow,
+ size_t width,
+ const std::vector<TKeyParams>& keys,
+ std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers,
+ TVector<TAggParams<IBlockAggregatorFinalizeKeys>>&& aggsParams) {
+ if (totalKeysSize <= sizeof(ui32)) {
+ return new TBlockMergeFinalizeHashedWrapper<ui32, UseSet>(mutables, flow, width, keys, std::move(keySerializers), std::move(aggsParams));
}
- auto keysVal = AS_VALUE(TTupleLiteral, callable.GetInput(2));
- std::vector<TKeyParams> keys;
- for (ui32 i = 0; i < keysVal->GetValuesCount(); ++i) {
- ui32 index = AS_VALUE(TDataLiteral, keysVal->GetValue(i))->AsValue().Get<ui32>();
- keys.emplace_back(TKeyParams{ index, tupleType->GetElementType(index) });
+ if (totalKeysSize <= sizeof(ui64)) {
+ return new TBlockMergeFinalizeHashedWrapper<ui64, UseSet>(mutables, flow, width, keys, std::move(keySerializers), std::move(aggsParams));
}
- auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(3));
- TVector<TAggParams> aggsParams;
- FillAggParams(aggsVal, tupleType, filterColumn, aggsParams, ctx.Env);
+ return new TBlockMergeFinalizeHashedWrapper<TSSOKey, UseSet>(mutables, flow, width, keys, std::move(keySerializers), std::move(aggsParams));
+}
- ui32 totalKeysSize = 0;
- std::vector<std::unique_ptr<IKeySerializer>> keySerializers;
+void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std::vector<std::unique_ptr<IKeySerializer>>& keySerializers) {
+ totalKeysSize = 0;
+ keySerializers.clear();
for (const auto& k : keys) {
auto itemType = AS_TYPE(TBlockType, k.Type)->GetItemType();
bool isOptional;
@@ -928,6 +985,58 @@ IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputation
throw yexception() << "Unsupported key type";
}
}
+}
+
+}
+
+IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args");
+ const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
+ const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
+
+ auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
+ MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
+
+ auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(1));
+ std::optional<ui32> filterColumn;
+ if (filterColumnVal->HasItem()) {
+ filterColumn = AS_VALUE(TDataLiteral, filterColumnVal->GetItem())->AsValue().Get<ui32>();
+ }
+
+ auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(2));
+ TVector<TAggParams<IBlockAggregatorCombineAll>> aggsParams;
+ FillAggParams<IBlockAggregatorCombineAll>(aggsVal, tupleType, filterColumn, aggsParams, ctx.Env);
+ return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams));
+}
+
+IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
+ const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
+ const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
+
+ auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
+ MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
+
+ auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(1));
+ std::optional<ui32> filterColumn;
+ if (filterColumnVal->HasItem()) {
+ filterColumn = AS_VALUE(TDataLiteral, filterColumnVal->GetItem())->AsValue().Get<ui32>();
+ }
+
+ auto keysVal = AS_VALUE(TTupleLiteral, callable.GetInput(2));
+ std::vector<TKeyParams> keys;
+ for (ui32 i = 0; i < keysVal->GetValuesCount(); ++i) {
+ ui32 index = AS_VALUE(TDataLiteral, keysVal->GetValue(i))->AsValue().Get<ui32>();
+ keys.emplace_back(TKeyParams{ index, tupleType->GetElementType(index) });
+ }
+
+ auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(3));
+ TVector<TAggParams<IBlockAggregatorCombineKeys>> aggsParams;
+ FillAggParams<IBlockAggregatorCombineKeys>(aggsVal, tupleType, filterColumn, aggsParams, ctx.Env);
+
+ ui32 totalKeysSize = 0;
+ std::vector<std::unique_ptr<IKeySerializer>> keySerializers;
+ PrepareKeys(keys, totalKeysSize, keySerializers);
if (filterColumn) {
if (aggsParams.size() == 0) {
@@ -944,5 +1053,35 @@ IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputation
}
}
+IComputationNode* WrapBlockMergeFinalizeHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args");
+ const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
+ const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
+
+ auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
+ MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
+
+ auto keysVal = AS_VALUE(TTupleLiteral, callable.GetInput(1));
+ std::vector<TKeyParams> keys;
+ for (ui32 i = 0; i < keysVal->GetValuesCount(); ++i) {
+ ui32 index = AS_VALUE(TDataLiteral, keysVal->GetValue(i))->AsValue().Get<ui32>();
+ keys.emplace_back(TKeyParams{ index, tupleType->GetElementType(index) });
+ }
+
+ auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(2));
+ TVector<TAggParams<IBlockAggregatorFinalizeKeys>> aggsParams;
+ FillAggParams<IBlockAggregatorFinalizeKeys>(aggsVal, tupleType, {}, aggsParams, ctx.Env);
+
+ ui32 totalKeysSize = 0;
+ std::vector<std::unique_ptr<IKeySerializer>> keySerializers;
+ PrepareKeys(keys, totalKeysSize, keySerializers);
+
+ if (aggsParams.size() == 0) {
+ return MakeBlockMergeFinalizeHashedWrapper<true>(totalKeysSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams));
+ } else {
+ return MakeBlockMergeFinalizeHashedWrapper<false>(totalKeysSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams));
+ }
+}
+
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h
index 814514271d7..872788babec 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.h
@@ -7,6 +7,7 @@ namespace NMiniKQL {
IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNodeFactoryContext& ctx);
IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+IComputationNode* WrapBlockMergeFinalizeHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx);
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
index 5d1cae3a812..e40ae152f19 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
@@ -7,40 +7,52 @@
namespace NKikimr {
namespace NMiniKQL {
-class TCountAllBlockAggregator : public TBlockAggregatorBase {
+namespace {
+
+struct TState {
+ ui64 Count_ = 0;
+};
+
+class TColumnBuilder : public IAggColumnBuilder {
public:
- struct TState {
- ui64 Count_ = 0;
- };
-
- class TColumnBuilder : public IAggColumnBuilder {
- public:
- TColumnBuilder(ui64 size, TComputationContext& ctx)
- : Builder_(arrow::uint64(), &ctx.ArrowMemoryPool)
- , Ctx_(ctx)
- {
- ARROW_OK(Builder_.Reserve(size));
- }
+ TColumnBuilder(ui64 size, TComputationContext& ctx)
+ : Builder_(arrow::uint64(), &ctx.ArrowMemoryPool)
+ , Ctx_(ctx)
+ {
+ ARROW_OK(Builder_.Reserve(size));
+ }
- void Add(const void* state) final {
- auto typedState = static_cast<const TState*>(state);
- Builder_.UnsafeAppend(typedState->Count_);
- }
+ void Add(const void* state) final {
+ auto typedState = static_cast<const TState*>(state);
+ Builder_.UnsafeAppend(typedState->Count_);
+ }
- NUdf::TUnboxedValue Build() final {
- std::shared_ptr<arrow::ArrayData> result;
- ARROW_OK(Builder_.FinishInternal(&result));
- return Ctx_.HolderFactory.CreateArrowBlock(result);
- }
+ NUdf::TUnboxedValue Build() final {
+ std::shared_ptr<arrow::ArrayData> result;
+ ARROW_OK(Builder_.FinishInternal(&result));
+ return Ctx_.HolderFactory.CreateArrowBlock(result);
+ }
- private:
- arrow::UInt64Builder Builder_;
- TComputationContext& Ctx_;
- };
+private:
+ arrow::UInt64Builder Builder_;
+ TComputationContext& Ctx_;
+};
+
+template <typename TTag>
+class TCountAllAggregator;
+
+template <typename TTag>
+class TCountAggregator;
+
+template <>
+class TCountAllAggregator<TCombineAllTag> : public TCombineAllTag::TBase {
+public:
+ using TBase = TCombineAllTag::TBase;
- TCountAllBlockAggregator(std::optional<ui32> filterColumn, TComputationContext& ctx)
- : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx)
+ TCountAllAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+ : TBase(sizeof(TState), filterColumn, ctx)
{
+ Y_UNUSED(argColumn);
}
void InitState(void* state) final {
@@ -51,9 +63,10 @@ public:
auto typedState = static_cast<TState*>(state);
Y_UNUSED(columns);
if (filtered) {
- typedState->Count_ += *filtered;
- } else {
- typedState->Count_ += batchLength;
+ typedState->Count_ += *filtered;
+ }
+ else {
+ typedState->Count_ += batchLength;
}
}
@@ -61,6 +74,18 @@ public:
auto typedState = static_cast<const TState*>(state);
return NUdf::TUnboxedValuePod(typedState->Count_);
}
+};
+
+template <>
+class TCountAllAggregator<TCombineKeysTag> : public TCombineKeysTag::TBase {
+public:
+ using TBase = TCombineKeysTag::TBase;
+
+ TCountAllAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+ : TBase(sizeof(TState), filterColumn, ctx)
+ {
+ Y_UNUSED(argColumn);
+ }
void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TState();
@@ -74,44 +99,56 @@ public:
typedState->Count_ += 1;
}
- std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final {
+ std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
return std::make_unique<TColumnBuilder>(size, Ctx_);
}
};
-class TCountBlockAggregator : public TBlockAggregatorBase {
+template <>
+class TCountAllAggregator<TFinalizeKeysTag> : public TFinalizeKeysTag::TBase {
public:
- struct TState {
- ui64 Count_ = 0;
- };
-
- class TColumnBuilder : public IAggColumnBuilder {
- public:
- TColumnBuilder(ui64 size, TComputationContext& ctx)
- : Builder_(arrow::uint64(), &ctx.ArrowMemoryPool)
- , Ctx_(ctx)
- {
- ARROW_OK(Builder_.Reserve(size));
- }
+ using TBase = TFinalizeKeysTag::TBase;
- void Add(const void* state) final {
- auto typedState = static_cast<const TState*>(state);
- Builder_.UnsafeAppend(typedState->Count_);
- }
+ TCountAllAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+ : TBase(sizeof(TState), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ {
+ }
+
+ void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TState();
+ UpdateState(state, columns, row);
+ }
- NUdf::TUnboxedValue Build() final {
- std::shared_ptr<arrow::ArrayData> result;
- ARROW_OK(Builder_.FinishInternal(&result));
- return Ctx_.HolderFactory.CreateArrowBlock(result);
+ void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = static_cast<TState*>(state);
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ if (datum.is_scalar()) {
+ MKQL_ENSURE(datum.scalar()->is_valid, "Expected not null");
+ typedState->Count_ += datum.scalar_as<arrow::UInt64Scalar>().value;
+ } else {
+ const auto& array = datum.array();
+ auto ptr = array->GetValues<ui64>(1);
+ MKQL_ENSURE(array->GetNullCount() == 0, "Expected not null");
+ typedState->Count_ += ptr[row];
}
+ }
+
+ std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
+ return std::make_unique<TColumnBuilder>(size, Ctx_);
+ }
+
+private:
+ const ui32 ArgColumn_;
+};
- private:
- arrow::UInt64Builder Builder_;
- TComputationContext& Ctx_;
- };
+template <>
+class TCountAggregator<TCombineAllTag> : public TCombineAllTag::TBase {
+public:
+ using TBase = TCombineAllTag::TBase;
- TCountBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
- : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx)
+ TCountAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+ : TBase(sizeof(TState), filterColumn, ctx)
, ArgColumn_(argColumn)
{
}
@@ -162,6 +199,21 @@ public:
return NUdf::TUnboxedValuePod(typedState->Count_);
}
+private:
+ const ui32 ArgColumn_;
+};
+
+template <>
+class TCountAggregator<TCombineKeysTag> : public TCombineKeysTag::TBase {
+public:
+ using TBase = TCombineKeysTag::TBase;
+
+ TCountAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+ : TBase(sizeof(TState), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ {
+ }
+
void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
new(state) TState();
UpdateKey(state, columns, row);
@@ -187,7 +239,7 @@ public:
}
}
- std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final {
+ std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
return std::make_unique<TColumnBuilder>(size, Ctx_);
}
@@ -195,43 +247,44 @@ private:
const ui32 ArgColumn_;
};
-class TPreparedCountAllBlockAggregator : public IPreparedBlockAggregator {
+template <>
+class TCountAggregator<TFinalizeKeysTag> : public TCountAllAggregator<TFinalizeKeysTag>
+{
public:
- TPreparedCountAllBlockAggregator(std::optional<ui32> filterColumn)
+ using TBase = TCountAllAggregator<TFinalizeKeysTag>;
+
+ TCountAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+ : TBase(filterColumn, argColumn, ctx)
+ {}
+};
+
+template <typename TTag>
+class TPreparedCountAll : public TTag::TPreparedAggregator {
+public:
+ TPreparedCountAll(std::optional<ui32> filterColumn, ui32 argColumn)
: FilterColumn_(filterColumn)
+ , ArgColumn_(argColumn)
{}
- std::unique_ptr<IBlockAggregator> Make(TComputationContext& ctx) const final {
- return std::make_unique<TCountAllBlockAggregator>(FilterColumn_, ctx);
+ std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
+ return std::make_unique<TCountAllAggregator<TTag>>(FilterColumn_, ArgColumn_, ctx);
}
private:
const std::optional<ui32> FilterColumn_;
+ const ui32 ArgColumn_;
};
-class TBlockCountAllFactory : public IBlockAggregatorFactory {
-public:
- std::unique_ptr<IPreparedBlockAggregator> Prepare(
- TTupleType* tupleType,
- std::optional<ui32> filterColumn,
- const std::vector<ui32>& argsColumns,
- const TTypeEnvironment& env) const final {
- Y_UNUSED(tupleType);
- Y_UNUSED(argsColumns);
- Y_UNUSED(env);
- return std::make_unique<TPreparedCountAllBlockAggregator>(filterColumn);
- }
-};
-
-class TPreparedCountBlockAggregator : public IPreparedBlockAggregator {
+template <typename TTag>
+class TPreparedCount : public TTag::TPreparedAggregator {
public:
- TPreparedCountBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn)
+ TPreparedCount(std::optional<ui32> filterColumn, ui32 argColumn)
: FilterColumn_(filterColumn)
, ArgColumn_(argColumn)
{}
- std::unique_ptr<IBlockAggregator> Make(TComputationContext& ctx) const final {
- return std::make_unique<TCountBlockAggregator>(FilterColumn_, ArgColumn_, ctx);
+ std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
+ return std::make_unique<TCountAggregator<TTag>>(FilterColumn_, ArgColumn_, ctx);
}
private:
@@ -239,19 +292,87 @@ private:
const ui32 ArgColumn_;
};
+template <typename TTag>
+std::unique_ptr<typename TTag::TPreparedAggregator> PrepareCountAll(std::optional<ui32> filterColumn, ui32 argColumn) {
+ return std::make_unique<TPreparedCountAll<TTag>>(filterColumn, argColumn);
+}
+
+template <typename TTag>
+std::unique_ptr<typename TTag::TPreparedAggregator> PrepareCount(std::optional<ui32> filterColumn, ui32 argColumn) {
+ return std::make_unique<TPreparedCount<TTag>>(filterColumn, argColumn);
+}
+
+class TBlockCountAllFactory : public IBlockAggregatorFactory {
+public:
+ std::unique_ptr<TCombineAllTag::TPreparedAggregator> PrepareCombineAll(
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const final {
+ Y_UNUSED(tupleType);
+ Y_UNUSED(argsColumns);
+ Y_UNUSED(env);
+ return PrepareCountAll<TCombineAllTag>(filterColumn, 0);
+ }
+
+ std::unique_ptr<TCombineKeysTag::TPreparedAggregator> PrepareCombineKeys(
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const final {
+ Y_UNUSED(tupleType);
+ Y_UNUSED(argsColumns);
+ Y_UNUSED(env);
+ return PrepareCountAll<TCombineKeysTag>(filterColumn, 0);
+ }
+
+ std::unique_ptr<TFinalizeKeysTag::TPreparedAggregator> PrepareFinalizeKeys(
+ TTupleType* tupleType,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const final {
+ Y_UNUSED(tupleType);
+ Y_UNUSED(argsColumns);
+ Y_UNUSED(env);
+ return PrepareCountAll<TFinalizeKeysTag>(std::optional<ui32>(), argsColumns[0]);
+ }
+};
+
class TBlockCountFactory : public IBlockAggregatorFactory {
public:
- std::unique_ptr<IPreparedBlockAggregator> Prepare(
- TTupleType* tupleType,
- std::optional<ui32> filterColumn,
- const std::vector<ui32>& argsColumns,
- const TTypeEnvironment& env) const final {
- Y_UNUSED(tupleType);
- Y_UNUSED(env);
- return std::make_unique<TPreparedCountBlockAggregator>(filterColumn, argsColumns[0]);
- }
+ std::unique_ptr<TCombineAllTag::TPreparedAggregator> PrepareCombineAll(
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const final {
+ Y_UNUSED(tupleType);
+ Y_UNUSED(env);
+ return PrepareCount<TCombineAllTag>(filterColumn, argsColumns[0]);
+ }
+
+ std::unique_ptr<TCombineKeysTag::TPreparedAggregator> PrepareCombineKeys(
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const final {
+ Y_UNUSED(tupleType);
+ Y_UNUSED(argsColumns);
+ Y_UNUSED(env);
+ return PrepareCount<TCombineKeysTag>(filterColumn, argsColumns[0]);
+ }
+
+ std::unique_ptr<TFinalizeKeysTag::TPreparedAggregator> PrepareFinalizeKeys(
+ TTupleType* tupleType,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const final {
+ Y_UNUSED(tupleType);
+ Y_UNUSED(argsColumns);
+ Y_UNUSED(env);
+ return PrepareCount<TFinalizeKeysTag>(std::optional<ui32>(), argsColumns[0]);
+ }
};
+}
+
std::unique_ptr<IBlockAggregatorFactory> MakeBlockCountAllFactory() {
return std::make_unique<TBlockCountAllFactory>();
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
index e96e5f3de2b..9785de7085f 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
@@ -20,19 +20,14 @@ struct TAggregatorFactories {
}
};
-std::unique_ptr<IPreparedBlockAggregator> PrepareBlockAggregator(
- TStringBuf name,
- TTupleType* tupleType,
- std::optional<ui32> filterColumn,
- const std::vector<ui32>& argsColumns,
- const TTypeEnvironment& env) {
+const IBlockAggregatorFactory& GetBlockAggregatorFactory(TStringBuf name) {
const auto& f = Singleton<TAggregatorFactories>()->Factories;
auto it = f.find(name);
if (it == f.end()) {
throw yexception() << "Unsupported block aggregation function: " << name;
}
- return it->second->Prepare(tupleType, filterColumn, argsColumns, env);
+ return *it->second;
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h
index 80b9de36723..be65fc192d1 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h
@@ -15,33 +15,62 @@ public:
virtual NUdf::TUnboxedValue Build() = 0;
};
-class IBlockAggregator {
+class IBlockAggregatorBase {
public:
- virtual ~IBlockAggregator() = default;
+ virtual ~IBlockAggregatorBase() = default;
+ const ui32 StateSize;
+
+ explicit IBlockAggregatorBase(ui32 stateSize)
+ : StateSize(stateSize)
+ {}
+};
+
+
+class IBlockAggregatorCombineAll : public IBlockAggregatorBase {
+public:
virtual void InitState(void* state) = 0;
virtual void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) = 0;
virtual NUdf::TUnboxedValue FinishOne(const void* state) = 0;
+ explicit IBlockAggregatorCombineAll(ui32 stateSize)
+ : IBlockAggregatorBase(stateSize)
+ {}
+};
+
+class IBlockAggregatorCombineKeys : public IBlockAggregatorBase {
+public:
virtual void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
virtual void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
- virtual std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) = 0;
+ virtual std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) = 0;
- const ui32 StateSize;
+ explicit IBlockAggregatorCombineKeys(ui32 stateSize)
+ : IBlockAggregatorBase(stateSize)
+ {}
+};
- explicit IBlockAggregator(ui32 stateSize)
- : StateSize(stateSize)
+class IBlockAggregatorFinalizeKeys : public IBlockAggregatorBase {
+public:
+ virtual void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
+
+ virtual void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
+
+ virtual std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) = 0;
+
+ explicit IBlockAggregatorFinalizeKeys(ui32 stateSize)
+ : IBlockAggregatorBase(stateSize)
{}
};
-class TBlockAggregatorBase : public IBlockAggregator {
+template <typename TBase>
+class TBlockAggregatorBase : public TBase {
public:
TBlockAggregatorBase(ui32 stateSize, std::optional<ui32> filterColumn, TComputationContext& ctx)
- : IBlockAggregator(stateSize)
+ : TBase(stateSize)
, FilterColumn_(filterColumn)
, Ctx_(ctx)
{
@@ -52,29 +81,54 @@ protected:
TComputationContext& Ctx_;
};
+template <typename T>
class IPreparedBlockAggregator {
public:
virtual ~IPreparedBlockAggregator() = default;
- virtual std::unique_ptr<IBlockAggregator> Make(TComputationContext& ctx) const = 0;
+ virtual std::unique_ptr<T> Make(TComputationContext& ctx) const = 0;
};
-std::unique_ptr<IPreparedBlockAggregator> PrepareBlockAggregator(
- TStringBuf name,
- TTupleType* tupleType,
- std::optional<ui32> filterColumn,
- const std::vector<ui32>& argsColumns,
- const TTypeEnvironment& env);
-
class IBlockAggregatorFactory {
public:
virtual ~IBlockAggregatorFactory() = default;
- virtual std::unique_ptr<IPreparedBlockAggregator> Prepare(
+ virtual std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineAll>> PrepareCombineAll(
TTupleType* tupleType,
std::optional<ui32> filterColumn,
const std::vector<ui32>& argsColumns,
const TTypeEnvironment& env) const = 0;
+
+ virtual std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineKeys>> PrepareCombineKeys(
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const = 0;
+
+ virtual std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorFinalizeKeys>> PrepareFinalizeKeys(
+ TTupleType* tupleType,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const = 0;
+};
+
+const IBlockAggregatorFactory& GetBlockAggregatorFactory(TStringBuf name);
+
+struct TCombineAllTag {
+ using TAggregator = IBlockAggregatorCombineAll;
+ using TPreparedAggregator = IPreparedBlockAggregator<TAggregator>;
+ using TBase = TBlockAggregatorBase<TAggregator>;
+};
+
+struct TCombineKeysTag {
+ using TAggregator = IBlockAggregatorCombineKeys;
+ using TPreparedAggregator = IPreparedBlockAggregator<TAggregator>;
+ using TBase = TBlockAggregatorBase<TAggregator>;
+};
+
+struct TFinalizeKeysTag {
+ using TAggregator = IBlockAggregatorFinalizeKeys;
+ using TPreparedAggregator = IPreparedBlockAggregator<TAggregator>;
+ using TBase = TBlockAggregatorBase<TAggregator>;
};
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
index 2992593eafc..190ec70d0e4 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
@@ -12,6 +12,8 @@
namespace NKikimr {
namespace NMiniKQL {
+namespace {
+
template <bool IsMin, typename T>
T UpdateMinMax(T x, T y) {
if constexpr (IsMin) {
@@ -21,65 +23,114 @@ T UpdateMinMax(T x, T y) {
}
}
-template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin>
-class TMinMaxBlockAggregatorNullableOrScalar : public TBlockAggregatorBase {
-public:
- struct TState {
- TIn Value_;
- ui8 IsValid_ = 0;
+template <typename TTag, typename TIn, typename TInScalar, typename TBuilder, bool IsMin>
+class TMinMaxBlockAggregatorNullableOrScalar;
- TState() {
- if constexpr (IsMin) {
- Value_ = std::numeric_limits<TIn>::max();
- } else {
- Value_ = std::numeric_limits<TIn>::min();
- }
- }
- };
-
- class TColumnBuilder : public IAggColumnBuilder {
- public:
- TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx)
- : Builder_(dataType, &ctx.ArrowMemoryPool)
- , Ctx_(ctx)
- {
- ARROW_OK(Builder_.Reserve(size));
+template <typename TTag, typename TIn, typename TInScalar, typename TBuilder, bool IsMin>
+class TMinMaxBlockAggregator;
+
+template <typename TIn, bool IsMin>
+struct TState {
+ TIn Value_;
+ ui8 IsValid_ = 0;
+
+ TState() {
+ if constexpr (IsMin) {
+ Value_ = std::numeric_limits<TIn>::max();
+ } else {
+ Value_ = std::numeric_limits<TIn>::min();
}
+ }
+};
- void Add(const void* state) final {
- auto typedState = static_cast<const TState*>(state);
- if (typedState->IsValid_) {
- Builder_.UnsafeAppend(typedState->Value_);
- } else {
- Builder_.UnsafeAppendNull();
- }
+template <typename TIn, bool IsMin>
+struct TSimpleState {
+ TIn Value_;
+
+ TSimpleState() {
+ if constexpr (IsMin) {
+ Value_ = std::numeric_limits<TIn>::max();
+ } else {
+ Value_ = std::numeric_limits<TIn>::min();
}
+ }
+};
+
+template <typename TIn, bool IsMin, typename TBuilder>
+class TColumnBuilder : public IAggColumnBuilder {
+public:
+ TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx)
+ : Builder_(dataType, &ctx.ArrowMemoryPool)
+ , Ctx_(ctx)
+ {
+ ARROW_OK(Builder_.Reserve(size));
+ }
- NUdf::TUnboxedValue Build() final {
- std::shared_ptr<arrow::ArrayData> result;
- ARROW_OK(Builder_.FinishInternal(&result));
- return Ctx_.HolderFactory.CreateArrowBlock(result);
+ void Add(const void* state) final {
+ auto typedState = static_cast<const TState<TIn, IsMin>*>(state);
+ if (typedState->IsValid_) {
+ Builder_.UnsafeAppend(typedState->Value_);
+ } else {
+ Builder_.UnsafeAppendNull();
}
+ }
+
+ NUdf::TUnboxedValue Build() final {
+ std::shared_ptr<arrow::ArrayData> result;
+ ARROW_OK(Builder_.FinishInternal(&result));
+ return Ctx_.HolderFactory.CreateArrowBlock(result);
+ }
+
+private:
+ TBuilder Builder_;
+ TComputationContext& Ctx_;
+};
+
+template <typename TIn, bool IsMin, typename TBuilder>
+class TSimpleColumnBuilder : public IAggColumnBuilder {
+public:
+ TSimpleColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx)
+ : Builder_(dataType, &ctx.ArrowMemoryPool)
+ , Ctx_(ctx)
+ {
+ ARROW_OK(Builder_.Reserve(size));
+ }
+
+ void Add(const void* state) final {
+ auto typedState = static_cast<const TSimpleState<TIn, IsMin>*>(state);
+ Builder_.UnsafeAppend(typedState->Value_);
+ }
- private:
- TBuilder Builder_;
- TComputationContext& Ctx_;
- };
+ NUdf::TUnboxedValue Build() final {
+ std::shared_ptr<arrow::ArrayData> result;
+ ARROW_OK(Builder_.FinishInternal(&result));
+ return Ctx_.HolderFactory.CreateArrowBlock(result);
+ }
+
+private:
+ TBuilder Builder_;
+ TComputationContext& Ctx_;
+};
+
+template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin>
+class TMinMaxBlockAggregatorNullableOrScalar<TCombineAllTag, TIn, TInScalar, TBuilder, IsMin> : public TCombineAllTag::TBase {
+public:
+ using TBase = TCombineAllTag::TBase;
TMinMaxBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn,
const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
- : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx)
+ : TBase(sizeof(TState<TIn, IsMin>), filterColumn, ctx)
, ArgColumn_(argColumn)
- , BuilderDataType_(builderDataType)
{
+ Y_UNUSED(builderDataType);
}
void InitState(void* state) final {
- new(state) TState();
+ new(state) TState<TIn, IsMin>();
}
void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
- auto typedState = static_cast<TState*>(state);
+ auto typedState = static_cast<TState<TIn, IsMin>*>(state);
Y_UNUSED(batchLength);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
@@ -149,7 +200,7 @@ public:
}
NUdf::TUnboxedValue FinishOne(const void* state) final {
- auto typedState = static_cast<const TState*>(state);
+ auto typedState = static_cast<const TState<TIn, IsMin>*>(state);
if (!typedState->IsValid_) {
return NUdf::TUnboxedValuePod();
}
@@ -157,38 +208,67 @@ public:
return NUdf::TUnboxedValuePod(typedState->Value_);
}
+private:
+ const ui32 ArgColumn_;
+};
+
+template <typename TIn, typename TInScalar, bool IsMin>
+void PushValueToState(TState<TIn, IsMin>* typedState, const arrow::Datum& datum, ui64 row) {
+ if (datum.is_scalar()) {
+ if (datum.scalar()->is_valid) {
+ typedState->Value_ = datum.scalar_as<TInScalar>().value;
+ typedState->IsValid_ = 1;
+ }
+ } else {
+ const auto& array = datum.array();
+ auto ptr = array->GetValues<TIn>(1);
+ if (array->GetNullCount() == 0) {
+ typedState->IsValid_ = 1;
+ typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, ptr[row]);
+ } else {
+ auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
+ ui64 fullIndex = row + array->offset;
+ // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00
+ TIn mask = -TIn((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1);
+ typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, TIn((ptr[row] & mask) | (typedState->Value_ & ~mask)));
+ typedState->IsValid_ |= mask & 1;
+ }
+ }
+}
+
+template <typename TIn, bool IsMin>
+void PushValueToSimpleState(TSimpleState<TIn, IsMin>* typedState, const arrow::Datum& datum, ui64 row) {
+ const auto& array = datum.array();
+ auto ptr = array->GetValues<TIn>(1);
+ typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, ptr[row]);
+}
+
+template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin>
+class TMinMaxBlockAggregatorNullableOrScalar<TCombineKeysTag, TIn, TInScalar, TBuilder, IsMin> : public TCombineKeysTag::TBase {
+public:
+ using TBase = TCombineKeysTag::TBase;
+
+ TMinMaxBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn,
+ const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
+ : TBase(sizeof(TState<TIn, IsMin>), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ , BuilderDataType_(builderDataType)
+ {
+ }
+
void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
- new(state) TState();
+ new(state) TState<TIn, IsMin>();
UpdateKey(state, columns, row);
}
void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
- auto typedState = static_cast<TState*>(state);
+ auto typedState = static_cast<TState<TIn, IsMin>*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
- if (datum.is_scalar()) {
- if (datum.scalar()->is_valid) {
- typedState->Value_ = datum.scalar_as<TInScalar>().value;
- typedState->IsValid_ = 1;
- }
- } else {
- const auto& array = datum.array();
- auto ptr = array->GetValues<TIn>(1);
- if (array->GetNullCount() == 0) {
- typedState->IsValid_ = 1;
- typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, ptr[row]);
- } else {
- auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
- ui64 fullIndex = row + array->offset;
- // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00
- TIn mask = -TIn((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1);
- typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, TIn((ptr[row] & mask) | (typedState->Value_ & ~mask)));
- typedState->IsValid_ |= mask & 1;
- }
- }
+ PushValueToState<TIn, TInScalar, IsMin>(typedState, datum, row);
}
- std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final {
- return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_);
+ std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
+ return std::make_unique<TColumnBuilder<TIn, IsMin, TBuilder>>(size, BuilderDataType_, Ctx_);
}
private:
@@ -197,59 +277,57 @@ private:
};
template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin>
-class TMinMaxBlockAggregator: public TBlockAggregatorBase {
+class TMinMaxBlockAggregatorNullableOrScalar<TFinalizeKeysTag, TIn, TInScalar, TBuilder, IsMin> : public TFinalizeKeysTag::TBase {
public:
- struct TState {
- TIn Value_;
+ using TBase = TFinalizeKeysTag::TBase;
- TState() {
- if constexpr (IsMin) {
- Value_ = std::numeric_limits<TIn>::max();
- } else {
- Value_ = std::numeric_limits<TIn>::min();
- }
- }
- };
-
- class TColumnBuilder : public IAggColumnBuilder {
- public:
- TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx)
- : Builder_(dataType, &ctx.ArrowMemoryPool)
- , Ctx_(ctx)
- {
- ARROW_OK(Builder_.Reserve(size));
- }
+ TMinMaxBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn,
+ const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
+ : TBase(sizeof(TState<TIn, IsMin>), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ , BuilderDataType_(builderDataType)
+ {
+ }
- void Add(const void* state) final {
- auto typedState = static_cast<const TState*>(state);
- Builder_.UnsafeAppend(typedState->Value_);
- }
+ void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TState<TIn, IsMin>();
+ UpdateState(state, columns, row);
+ }
- NUdf::TUnboxedValue Build() final {
- std::shared_ptr<arrow::ArrayData> result;
- ARROW_OK(Builder_.FinishInternal(&result));
- return Ctx_.HolderFactory.CreateArrowBlock(result);
- }
+ void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = static_cast<TState<TIn, IsMin>*>(state);
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ PushValueToState<TIn, TInScalar, IsMin>(typedState, datum, row);
+ }
- private:
- TBuilder Builder_;
- TComputationContext& Ctx_;
- };
+ std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
+ return std::make_unique<TColumnBuilder<TIn, IsMin, TBuilder>>(size, BuilderDataType_, Ctx_);
+ }
+
+private:
+ const ui32 ArgColumn_;
+ const std::shared_ptr<arrow::DataType> BuilderDataType_;
+};
+
+template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin>
+class TMinMaxBlockAggregator<TCombineAllTag, TIn, TInScalar, TBuilder, IsMin> : public TCombineAllTag::TBase {
+public:
+ using TBase = TCombineAllTag::TBase;
TMinMaxBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,
const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
- : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx)
+ : TBase(sizeof(TSimpleState<TIn, IsMin>), filterColumn, ctx)
, ArgColumn_(argColumn)
- , BuilderDataType_(builderDataType)
{
+ Y_UNUSED(builderDataType);
}
void InitState(void* state) final {
- new(state) TState;
+ new(state) TSimpleState<TIn, IsMin>;
}
void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
- auto typedState = static_cast<TState*>(state);
+ auto typedState = static_cast<TSimpleState<TIn, IsMin>*>(state);
Y_UNUSED(batchLength);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
MKQL_ENSURE(datum.is_array(), "Expected array");
@@ -282,25 +360,40 @@ public:
}
NUdf::TUnboxedValue FinishOne(const void* state) final {
- auto typedState = static_cast<const TState*>(state);
+ auto typedState = static_cast<const TSimpleState<TIn, IsMin>*>(state);
return NUdf::TUnboxedValuePod(typedState->Value_);
}
+private:
+ const ui32 ArgColumn_;
+};
+
+template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin>
+class TMinMaxBlockAggregator<TCombineKeysTag, TIn, TInScalar, TBuilder, IsMin> : public TCombineKeysTag::TBase {
+public:
+ using TBase = TCombineKeysTag::TBase;
+
+ TMinMaxBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,
+ const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
+ : TBase(sizeof(TSimpleState<TIn, IsMin>), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ , BuilderDataType_(builderDataType)
+ {
+ }
+
void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
- new(state) TState();
+ new(state) TSimpleState<TIn, IsMin>();
UpdateKey(state, columns, row);
}
void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
- auto typedState = static_cast<TState*>(state);
+ auto typedState = static_cast<TSimpleState<TIn, IsMin>*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
- const auto& array = datum.array();
- auto ptr = array->GetValues<TIn>(1);
- typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, ptr[row]);
+ PushValueToSimpleState<TIn, IsMin>(typedState, datum, row);
}
- std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final {
- return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_);
+ std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
+ return std::make_unique<TSimpleColumnBuilder<TIn, IsMin, TBuilder>>(size, BuilderDataType_, Ctx_);
}
private:
@@ -309,7 +402,40 @@ private:
};
template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin>
-class TPreparedMinMaxBlockAggregatorNullableOrScalar : public IPreparedBlockAggregator {
+class TMinMaxBlockAggregator<TFinalizeKeysTag, TIn, TInScalar, TBuilder, IsMin> : public TFinalizeKeysTag::TBase {
+public:
+ using TBase = TFinalizeKeysTag::TBase;
+
+ TMinMaxBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,
+ const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
+ : TBase(sizeof(TSimpleState<TIn, IsMin>), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ , BuilderDataType_(builderDataType)
+ {
+ }
+
+ void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TSimpleState<TIn, IsMin>();
+ UpdateState(state, columns, row);
+ }
+
+ void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = static_cast<TSimpleState<TIn, IsMin>*>(state);
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ PushValueToSimpleState<TIn, IsMin>(typedState, datum, row);
+ }
+
+ std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
+ return std::make_unique<TSimpleColumnBuilder<TIn, IsMin, TBuilder>>(size, BuilderDataType_, Ctx_);
+ }
+
+private:
+ const ui32 ArgColumn_;
+ const std::shared_ptr<arrow::DataType> BuilderDataType_;
+};
+
+template <typename TTag, typename TIn, typename TInScalar, typename TBuilder, bool IsMin>
+class TPreparedMinMaxBlockAggregatorNullableOrScalar : public TTag::TPreparedAggregator {
public:
TPreparedMinMaxBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn,
const std::shared_ptr<arrow::DataType>& builderDataType)
@@ -318,8 +444,8 @@ public:
, BuilderDataType_(builderDataType)
{}
- std::unique_ptr<IBlockAggregator> Make(TComputationContext& ctx) const final {
- return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<TIn, TInScalar, TBuilder, IsMin>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx);
+ std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
+ return std::make_unique<TMinMaxBlockAggregatorNullableOrScalar<TTag, TIn, TInScalar, TBuilder, IsMin>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx);
}
private:
@@ -328,8 +454,8 @@ private:
const std::shared_ptr<arrow::DataType> BuilderDataType_;
};
-template <typename TIn, typename TInScalar, typename TBuilder, bool IsMin>
-class TPreparedMinMaxBlockAggregator : public IPreparedBlockAggregator {
+template <typename TTag, typename TIn, typename TInScalar, typename TBuilder, bool IsMin>
+class TPreparedMinMaxBlockAggregator : public TTag::TPreparedAggregator {
public:
TPreparedMinMaxBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,
const std::shared_ptr<arrow::DataType>& builderDataType)
@@ -338,8 +464,8 @@ public:
, BuilderDataType_(builderDataType)
{}
- std::unique_ptr<IBlockAggregator> Make(TComputationContext& ctx) const final {
- return std::make_unique<TMinMaxBlockAggregator<TIn, TInScalar, TBuilder, IsMin>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx);
+ std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
+ return std::make_unique<TMinMaxBlockAggregator<TTag, TIn, TInScalar, TBuilder, IsMin>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx);
}
private:
@@ -348,75 +474,100 @@ private:
const std::shared_ptr<arrow::DataType> BuilderDataType_;
};
+template <typename TTag, bool IsMin>
+std::unique_ptr<typename TTag::TPreparedAggregator> PrepareMinMax(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn) {
+ auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn));
+ auto argType = blockType->GetItemType();
+ bool isOptional;
+ auto dataType = UnpackOptionalData(argType, isOptional);
+ if (blockType->GetShape() == TBlockType::EShape::Scalar || isOptional) {
+ switch (*dataType->GetDataSlot()) {
+ case NUdf::EDataSlot::Int8:
+ return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<TTag, i8, arrow::Int8Scalar, arrow::Int8Builder, IsMin>>(filterColumn, argColumn, arrow::int8());
+ case NUdf::EDataSlot::Bool:
+ case NUdf::EDataSlot::Uint8:
+ return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<TTag, ui8, arrow::UInt8Scalar, arrow::UInt8Builder, IsMin>>(filterColumn, argColumn, arrow::uint8());
+ case NUdf::EDataSlot::Int16:
+ return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<TTag, i16, arrow::Int16Scalar, arrow::Int16Builder, IsMin>>(filterColumn, argColumn, arrow::int16());
+ case NUdf::EDataSlot::Uint16:
+ case NUdf::EDataSlot::Date:
+ return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<TTag, ui16, arrow::UInt16Scalar, arrow::UInt16Builder, IsMin>>(filterColumn, argColumn, arrow::uint16());
+ case NUdf::EDataSlot::Int32:
+ return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<TTag, i32, arrow::Int32Scalar, arrow::Int32Builder, IsMin>>(filterColumn, argColumn, arrow::int32());
+ case NUdf::EDataSlot::Uint32:
+ case NUdf::EDataSlot::Datetime:
+ return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<TTag, ui32, arrow::UInt32Scalar, arrow::UInt32Builder, IsMin>>(filterColumn, argColumn, arrow::uint32());
+ case NUdf::EDataSlot::Int64:
+ case NUdf::EDataSlot::Interval:
+ return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<TTag, i64, arrow::Int64Scalar, arrow::Int64Builder, IsMin>>(filterColumn, argColumn, arrow::int64());
+ case NUdf::EDataSlot::Uint64:
+ case NUdf::EDataSlot::Timestamp:
+ return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<TTag, ui64, arrow::UInt64Scalar, arrow::UInt64Builder, IsMin>>(filterColumn, argColumn, arrow::uint64());
+ default:
+ throw yexception() << "Unsupported MIN/MAX input type";
+ }
+ }
+ else {
+ switch (*dataType->GetDataSlot()) {
+ case NUdf::EDataSlot::Int8:
+ return std::make_unique<TPreparedMinMaxBlockAggregator<TTag, i8, arrow::Int8Scalar, arrow::Int8Builder, IsMin>>(filterColumn, argColumn, arrow::int8());
+ case NUdf::EDataSlot::Uint8:
+ case NUdf::EDataSlot::Bool:
+ return std::make_unique<TPreparedMinMaxBlockAggregator<TTag, ui8, arrow::UInt8Scalar, arrow::UInt8Builder, IsMin>>(filterColumn, argColumn, arrow::uint8());
+ case NUdf::EDataSlot::Int16:
+ return std::make_unique<TPreparedMinMaxBlockAggregator<TTag, i16, arrow::Int16Scalar, arrow::Int16Builder, IsMin>>(filterColumn, argColumn, arrow::int16());
+ case NUdf::EDataSlot::Uint16:
+ case NUdf::EDataSlot::Date:
+ return std::make_unique<TPreparedMinMaxBlockAggregator<TTag, ui16, arrow::UInt16Scalar, arrow::UInt16Builder, IsMin>>(filterColumn, argColumn, arrow::uint16());
+ case NUdf::EDataSlot::Int32:
+ return std::make_unique<TPreparedMinMaxBlockAggregator<TTag, i32, arrow::Int32Scalar, arrow::Int32Builder, IsMin>>(filterColumn, argColumn, arrow::int32());
+ case NUdf::EDataSlot::Uint32:
+ case NUdf::EDataSlot::Datetime:
+ return std::make_unique<TPreparedMinMaxBlockAggregator<TTag, ui32, arrow::UInt32Scalar, arrow::UInt32Builder, IsMin>>(filterColumn, argColumn, arrow::uint32());
+ case NUdf::EDataSlot::Int64:
+ case NUdf::EDataSlot::Interval:
+ return std::make_unique<TPreparedMinMaxBlockAggregator<TTag, i64, arrow::Int64Scalar, arrow::Int64Builder, IsMin>>(filterColumn, argColumn, arrow::int64());
+ case NUdf::EDataSlot::Uint64:
+ case NUdf::EDataSlot::Timestamp:
+ return std::make_unique<TPreparedMinMaxBlockAggregator<TTag, ui64, arrow::UInt64Scalar, arrow::UInt64Builder, IsMin>>(filterColumn, argColumn, arrow::uint64());
+ default:
+ throw yexception() << "Unsupported MIN/MAX input type";
+ }
+ }
+}
+
template <bool IsMin>
class TBlockMinMaxFactory : public IBlockAggregatorFactory {
public:
- std::unique_ptr<IPreparedBlockAggregator> Prepare(
- TTupleType* tupleType,
- std::optional<ui32> filterColumn,
- const std::vector<ui32>& argsColumns,
- const TTypeEnvironment& env) const final {
- Y_UNUSED(env);
- auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0]));
- auto argType = blockType->GetItemType();
- bool isOptional;
- auto dataType = UnpackOptionalData(argType, isOptional);
- if (blockType->GetShape() == TBlockType::EShape::Scalar || isOptional) {
- switch (*dataType->GetDataSlot()) {
- case NUdf::EDataSlot::Int8:
- return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<i8, arrow::Int8Scalar, arrow::Int8Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int8());
- case NUdf::EDataSlot::Bool:
- case NUdf::EDataSlot::Uint8:
- return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint8());
- case NUdf::EDataSlot::Int16:
- return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<i16, arrow::Int16Scalar, arrow::Int16Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int16());
- case NUdf::EDataSlot::Uint16:
- case NUdf::EDataSlot::Date:
- return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint16());
- case NUdf::EDataSlot::Int32:
- return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<i32, arrow::Int32Scalar, arrow::Int32Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int32());
- case NUdf::EDataSlot::Uint32:
- case NUdf::EDataSlot::Datetime:
- return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint32());
- case NUdf::EDataSlot::Int64:
- case NUdf::EDataSlot::Interval:
- return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<i64, arrow::Int64Scalar, arrow::Int64Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int64());
- case NUdf::EDataSlot::Uint64:
- case NUdf::EDataSlot::Timestamp:
- return std::make_unique<TPreparedMinMaxBlockAggregatorNullableOrScalar<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint64());
- default:
- throw yexception() << "Unsupported MIN/MAX input type";
- }
- } else {
- switch (*dataType->GetDataSlot()) {
- case NUdf::EDataSlot::Int8:
- return std::make_unique<TPreparedMinMaxBlockAggregator<i8, arrow::Int8Scalar, arrow::Int8Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int8());
- case NUdf::EDataSlot::Uint8:
- case NUdf::EDataSlot::Bool:
- return std::make_unique<TPreparedMinMaxBlockAggregator<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint8());
- case NUdf::EDataSlot::Int16:
- return std::make_unique<TPreparedMinMaxBlockAggregator<i16, arrow::Int16Scalar, arrow::Int16Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int16());
- case NUdf::EDataSlot::Uint16:
- case NUdf::EDataSlot::Date:
- return std::make_unique<TPreparedMinMaxBlockAggregator<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint16());
- case NUdf::EDataSlot::Int32:
- return std::make_unique<TPreparedMinMaxBlockAggregator<i32, arrow::Int32Scalar, arrow::Int32Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int32());
- case NUdf::EDataSlot::Uint32:
- case NUdf::EDataSlot::Datetime:
- return std::make_unique<TPreparedMinMaxBlockAggregator<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint32());
- case NUdf::EDataSlot::Int64:
- case NUdf::EDataSlot::Interval:
- return std::make_unique<TPreparedMinMaxBlockAggregator<i64, arrow::Int64Scalar, arrow::Int64Builder, IsMin>>(filterColumn, argsColumns[0], arrow::int64());
- case NUdf::EDataSlot::Uint64:
- case NUdf::EDataSlot::Timestamp:
- return std::make_unique<TPreparedMinMaxBlockAggregator<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, IsMin>>(filterColumn, argsColumns[0], arrow::uint64());
- default:
- throw yexception() << "Unsupported MIN/MAX input type";
- }
- }
- }
+ std::unique_ptr<TCombineAllTag::TPreparedAggregator> PrepareCombineAll(
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const final {
+ Y_UNUSED(env);
+ return PrepareMinMax<TCombineAllTag, IsMin>(tupleType, filterColumn, argsColumns[0]);
+ }
+
+ std::unique_ptr<TCombineKeysTag::TPreparedAggregator> PrepareCombineKeys(
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const final {
+ Y_UNUSED(env);
+ return PrepareMinMax<TCombineKeysTag, IsMin>(tupleType, filterColumn, argsColumns[0]);
+ }
+
+ std::unique_ptr<TFinalizeKeysTag::TPreparedAggregator> PrepareFinalizeKeys(
+ TTupleType* tupleType,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const final {
+ Y_UNUSED(env);
+ return PrepareMinMax<TFinalizeKeysTag, IsMin>(tupleType, std::optional<ui32>(), argsColumns[0]);
+ }
};
+}
+
std::unique_ptr<IBlockAggregatorFactory> MakeBlockMinFactory() {
return std::make_unique<TBlockMinMaxFactory<true>>();
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
index 9894a3c2317..cf1df6f051a 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
@@ -12,57 +12,189 @@
namespace NKikimr {
namespace NMiniKQL {
-template <typename TIn, typename TSum, typename TBuilder, typename TInScalar>
-class TSumBlockAggregatorNullableOrScalar : public TBlockAggregatorBase {
+namespace {
+
+template <typename TSum>
+struct TSumState {
+ TSum Sum_ = 0;
+ ui8 IsValid_ = 0;
+};
+
+template <typename TSum>
+struct TSumSimpleState {
+ TSum Sum_ = 0;
+};
+
+struct TAvgState {
+ double Sum_ = 0;
+ ui64 Count_ = 0;
+};
+
+template <typename TSum, typename TBuilder>
+class TSumColumnBuilder : public IAggColumnBuilder {
public:
- struct TState {
- TSum Sum_ = 0;
- ui8 IsValid_ = 0;
- };
-
- class TColumnBuilder : public IAggColumnBuilder {
- public:
- TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx)
- : Builder_(dataType, &ctx.ArrowMemoryPool)
- , Ctx_(ctx)
- {
- ARROW_OK(Builder_.Reserve(size));
+ TSumColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx)
+ : Builder_(dataType, &ctx.ArrowMemoryPool)
+ , Ctx_(ctx)
+ {
+ ARROW_OK(Builder_.Reserve(size));
+ }
+
+ void Add(const void* state) final {
+ auto typedState = static_cast<const TSumState<TSum>*>(state);
+ if (typedState->IsValid_) {
+ Builder_.UnsafeAppend(typedState->Sum_);
}
+ else {
+ Builder_.UnsafeAppendNull();
+ }
+ }
- void Add(const void* state) final {
- auto typedState = static_cast<const TState*>(state);
- if (typedState->IsValid_) {
- Builder_.UnsafeAppend(typedState->Sum_);
- } else {
- Builder_.UnsafeAppendNull();
- }
+ NUdf::TUnboxedValue Build() final {
+ std::shared_ptr<arrow::ArrayData> result;
+ ARROW_OK(Builder_.FinishInternal(&result));
+ return Ctx_.HolderFactory.CreateArrowBlock(result);
+ }
+
+private:
+ TBuilder Builder_;
+ TComputationContext& Ctx_;
+};
+
+template <typename TSum, typename TBuilder>
+class TSimpleSumColumnBuilder : public IAggColumnBuilder {
+public:
+ TSimpleSumColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx)
+ : Builder_(dataType, &ctx.ArrowMemoryPool)
+ , Ctx_(ctx)
+ {
+ ARROW_OK(Builder_.Reserve(size));
+ }
+
+ void Add(const void* state) final {
+ auto typedState = static_cast<const TSumSimpleState<TSum>*>(state);
+ Builder_.UnsafeAppend(typedState->Sum_);
+ }
+
+ NUdf::TUnboxedValue Build() final {
+ std::shared_ptr<arrow::ArrayData> result;
+ ARROW_OK(Builder_.FinishInternal(&result));
+ return Ctx_.HolderFactory.CreateArrowBlock(result);
+ }
+
+private:
+ TBuilder Builder_;
+ TComputationContext& Ctx_;
+};
+
+class TAvgStateColumnBuilder : public IAggColumnBuilder {
+public:
+ TAvgStateColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& arrowType, TComputationContext& ctx)
+ : ArrowType_(arrowType)
+ , Ctx_(ctx)
+ , NullBitmapBuilder_(&ctx.ArrowMemoryPool)
+ , SumBuilder_(arrow::float64(), &ctx.ArrowMemoryPool)
+ , CountBuilder_(arrow::uint64(), &ctx.ArrowMemoryPool)
+ {
+ ARROW_OK(NullBitmapBuilder_.Reserve(size));
+ ARROW_OK(SumBuilder_.Reserve(size));
+ ARROW_OK(CountBuilder_.Reserve(size));
+ }
+
+ void Add(const void* state) final {
+ auto typedState = static_cast<const TAvgState*>(state);
+ if (typedState->Count_) {
+ NullBitmapBuilder_.UnsafeAppend(true);
+ SumBuilder_.UnsafeAppend(typedState->Sum_);
+ CountBuilder_.UnsafeAppend(typedState->Count_);
+ } else {
+ NullBitmapBuilder_.UnsafeAppend(false);
+ SumBuilder_.UnsafeAppendNull();
+ CountBuilder_.UnsafeAppendNull();
}
+ }
+
+ NUdf::TUnboxedValue Build() final {
+ std::shared_ptr<arrow::ArrayData> sumResult;
+ std::shared_ptr<arrow::ArrayData> countResult;
+ ARROW_OK(SumBuilder_.FinishInternal(&sumResult));
+ ARROW_OK(CountBuilder_.FinishInternal(&countResult));
+ std::shared_ptr<arrow::Buffer> nullBitmap;
+ auto length = NullBitmapBuilder_.length();
+ auto nullCount = NullBitmapBuilder_.false_count();
+ ARROW_OK(NullBitmapBuilder_.Finish(&nullBitmap));
+
+ auto arrayData = arrow::ArrayData::Make(ArrowType_, length, { nullBitmap }, nullCount, 0);
+ arrayData->child_data.push_back(sumResult);
+ arrayData->child_data.push_back(countResult);
+ return Ctx_.HolderFactory.CreateArrowBlock(arrow::Datum(arrayData));
+ }
- NUdf::TUnboxedValue Build() final {
- std::shared_ptr<arrow::ArrayData> result;
- ARROW_OK(Builder_.FinishInternal(&result));
- return Ctx_.HolderFactory.CreateArrowBlock(result);
+private:
+ const std::shared_ptr<arrow::DataType> ArrowType_;
+ TComputationContext& Ctx_;
+ arrow::TypedBufferBuilder<bool> NullBitmapBuilder_;
+ arrow::DoubleBuilder SumBuilder_;
+ arrow::UInt64Builder CountBuilder_;
+};
+
+class TAvgResultColumnBuilder : public IAggColumnBuilder {
+public:
+ TAvgResultColumnBuilder(ui64 size, TComputationContext& ctx)
+ : Ctx_(ctx)
+ , Builder_(arrow::float64(), &ctx.ArrowMemoryPool)
+ {
+ ARROW_OK(Builder_.Reserve(size));
+ }
+
+ void Add(const void* state) final {
+ auto typedState = static_cast<const TAvgState*>(state);
+ if (typedState->Count_) {
+ Builder_.UnsafeAppend(typedState->Sum_ / typedState->Count_);
+ } else {
+ Builder_.UnsafeAppendNull();
}
+ }
+
+ NUdf::TUnboxedValue Build() final {
+ std::shared_ptr<arrow::ArrayData> result;
+ ARROW_OK(Builder_.FinishInternal(&result));
+ return Ctx_.HolderFactory.CreateArrowBlock(arrow::Datum(result));
+ }
+
+private:
+ TComputationContext& Ctx_;
+ arrow::DoubleBuilder Builder_;
+};
+
+template <typename TTag, typename TIn, typename TSum, typename TBuilder, typename TInScalar>
+class TSumBlockAggregatorNullableOrScalar;
+
+template <typename TTag, typename TIn, typename TSum, typename TBuilder, typename TInScalar>
+class TSumBlockAggregator;
- private:
- TBuilder Builder_;
- TComputationContext& Ctx_;
- };
+template <typename TTag, typename TIn, typename TInScalar>
+class TAvgBlockAggregator;
+
+template <typename TIn, typename TSum, typename TBuilder, typename TInScalar>
+class TSumBlockAggregatorNullableOrScalar<TCombineAllTag, TIn, TSum, TBuilder, TInScalar> : public TCombineAllTag::TBase {
+public:
+ using TBase = TCombineAllTag::TBase;
TSumBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn,
const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
- : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx)
+ : TBase(sizeof(TSumState<TSum>), filterColumn, ctx)
, ArgColumn_(argColumn)
- , BuilderDataType_(builderDataType)
{
+ Y_UNUSED(builderDataType);
}
void InitState(void* state) final {
- new(state) TState();
+ new(state) TSumState<TSum>();
}
void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
- auto typedState = static_cast<TState*>(state);
+ auto typedState = static_cast<TSumState<TSum>*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
if (datum.scalar()->is_valid) {
@@ -131,7 +263,7 @@ public:
}
NUdf::TUnboxedValue FinishOne(const void* state) final {
- auto typedState = static_cast<const TState*>(state);
+ auto typedState = static_cast<const TSumState<TSum>*>(state);
if (!typedState->IsValid_) {
return NUdf::TUnboxedValuePod();
}
@@ -139,38 +271,67 @@ public:
return NUdf::TUnboxedValuePod(typedState->Sum_);
}
+private:
+ const ui32 ArgColumn_;
+};
+
+template <typename TIn, typename TSum, typename TInScalar>
+void PushValueToState(TSumState<TSum>* typedState, const arrow::Datum& datum, ui64 row) {
+ if (datum.is_scalar()) {
+ if (datum.scalar()->is_valid) {
+ typedState->Sum_ += datum.scalar_as<TInScalar>().value;
+ typedState->IsValid_ = 1;
+ }
+ } else {
+ const auto& array = datum.array();
+ auto ptr = array->GetValues<TIn>(1);
+ if (array->GetNullCount() == 0) {
+ typedState->IsValid_ = 1;
+ typedState->Sum_ += ptr[row];
+ } else {
+ auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
+ ui64 fullIndex = row + array->offset;
+ // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00
+ TIn mask = (((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) ^ 1) - TIn(1);
+ typedState->Sum_ += (ptr[row] & mask);
+ typedState->IsValid_ |= mask & 1;
+ }
+ }
+}
+
+template <typename TIn, typename TSum>
+void PushValueToSimpleState(TSumSimpleState<TSum>* typedState, const arrow::Datum& datum, ui64 row) {
+ const auto& array = datum.array();
+ auto ptr = array->GetValues<TIn>(1);
+ typedState->Sum_ += ptr[row];
+}
+
+template <typename TIn, typename TSum, typename TBuilder, typename TInScalar>
+class TSumBlockAggregatorNullableOrScalar<TCombineKeysTag, TIn, TSum, TBuilder, TInScalar> : public TCombineKeysTag::TBase {
+public:
+ using TBase = TCombineKeysTag::TBase;
+
+ TSumBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn,
+ const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
+ : TBase(sizeof(TSumState<TSum>), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ , BuilderDataType_(builderDataType)
+ {
+ }
+
void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
- new(state) TState();
+ new(state) TSumState<TSum>();
UpdateKey(state, columns, row);
}
void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
- auto typedState = static_cast<TState*>(state);
+ auto typedState = static_cast<TSumState<TSum>*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
- if (datum.is_scalar()) {
- if (datum.scalar()->is_valid) {
- typedState->Sum_ += datum.scalar_as<TInScalar>().value;
- typedState->IsValid_ = 1;
- }
- } else {
- const auto& array = datum.array();
- auto ptr = array->GetValues<TIn>(1);
- if (array->GetNullCount() == 0) {
- typedState->IsValid_ = 1;
- typedState->Sum_ += ptr[row];
- } else {
- auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
- ui64 fullIndex = row + array->offset;
- // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00
- TIn mask = (((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) ^ 1) - TIn(1);
- typedState->Sum_ += (ptr[row] & mask);
- typedState->IsValid_ |= mask & 1;
- }
- }
+ PushValueToState<TIn, TSum, TInScalar>(typedState, datum, row);
}
- std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final {
- return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_);
+ std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
+ return std::make_unique<TSumColumnBuilder<TSum, TBuilder>>(size, BuilderDataType_, Ctx_);
}
private:
@@ -179,51 +340,57 @@ private:
};
template <typename TIn, typename TSum, typename TBuilder, typename TInScalar>
-class TSumBlockAggregator : public TBlockAggregatorBase {
+class TSumBlockAggregatorNullableOrScalar<TFinalizeKeysTag, TIn, TSum, TBuilder, TInScalar> : public TFinalizeKeysTag::TBase {
public:
- struct TState {
- TSum Sum_ = 0;
- };
-
- class TColumnBuilder : public IAggColumnBuilder {
- public:
- TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& dataType, TComputationContext& ctx)
- : Builder_(dataType, &ctx.ArrowMemoryPool)
- , Ctx_(ctx)
- {
- ARROW_OK(Builder_.Reserve(size));
- }
+ using TBase = TFinalizeKeysTag::TBase;
- void Add(const void* state) final {
- auto typedState = static_cast<const TState*>(state);
- Builder_.UnsafeAppend(typedState->Sum_);
- }
+ TSumBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn,
+ const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
+ : TBase(sizeof(TSumState<TSum>), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ , BuilderDataType_(builderDataType)
+ {
+ }
- NUdf::TUnboxedValue Build() final {
- std::shared_ptr<arrow::ArrayData> result;
- ARROW_OK(Builder_.FinishInternal(&result));
- return Ctx_.HolderFactory.CreateArrowBlock(result);
- }
+ void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TSumState<TSum>();
+ UpdateState(state, columns, row);
+ }
+
+ void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = static_cast<TSumState<TSum>*>(state);
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ PushValueToState<TIn, TSum, TInScalar>(typedState, datum, row);
+ }
- private:
- TBuilder Builder_;
- TComputationContext& Ctx_;
- };
+ std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
+ return std::make_unique<TSumColumnBuilder<TSum, TBuilder>>(size, BuilderDataType_, Ctx_);
+ }
+
+private:
+ const ui32 ArgColumn_;
+ const std::shared_ptr<arrow::DataType> BuilderDataType_;
+};
+
+template <typename TIn, typename TSum, typename TBuilder, typename TInScalar>
+class TSumBlockAggregator<TCombineAllTag, TIn, TSum, TBuilder, TInScalar> : public TCombineAllTag::TBase {
+public:
+ using TBase = TCombineAllTag::TBase;
TSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,
const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
- : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx)
+ : TBase(sizeof(TSumSimpleState<TSum>), filterColumn, ctx)
, ArgColumn_(argColumn)
- , BuilderDataType_(builderDataType)
{
+ Y_UNUSED(builderDataType);
}
void InitState(void* state) final {
- new(state) TState();
+ new(state) TSumSimpleState<TSum>();
}
void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
- auto typedState = static_cast<TState*>(state);
+ auto typedState = static_cast<TSumSimpleState<TSum>*>(state);
Y_UNUSED(batchLength);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
MKQL_ENSURE(datum.is_array(), "Expected array");
@@ -254,25 +421,40 @@ public:
}
NUdf::TUnboxedValue FinishOne(const void* state) final {
- auto typedState = static_cast<const TState*>(state);
+ auto typedState = static_cast<const TSumSimpleState<TSum>*>(state);
return NUdf::TUnboxedValuePod(typedState->Sum_);
}
+private:
+ const ui32 ArgColumn_;
+};
+
+template <typename TIn, typename TSum, typename TBuilder, typename TInScalar>
+class TSumBlockAggregator<TCombineKeysTag, TIn, TSum, TBuilder, TInScalar> : public TCombineKeysTag::TBase {
+public:
+ using TBase = TCombineKeysTag::TBase;
+
+ TSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,
+ const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
+ : TBase(sizeof(TSumSimpleState<TSum>), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ , BuilderDataType_(builderDataType)
+ {
+ }
+
void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
- new(state) TState();
+ new(state) TSumSimpleState<TSum>();
UpdateKey(state, columns, row);
}
void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
- auto typedState = static_cast<TState*>(state);
+ auto typedState = static_cast<TSumSimpleState<TSum>*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
- const auto& array = datum.array();
- auto ptr = array->GetValues<TIn>(1);
- typedState->Sum_ += ptr[row];
+ PushValueToSimpleState<TIn, TSum>(typedState, datum, row);
}
- std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final {
- return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_);
+ std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
+ return std::make_unique<TSimpleSumColumnBuilder<TSum, TBuilder>>(size, BuilderDataType_, Ctx_);
}
private:
@@ -280,79 +462,58 @@ private:
const std::shared_ptr<arrow::DataType> BuilderDataType_;
};
-template <typename TIn, typename TInScalar>
-class TAvgBlockAggregator : public TBlockAggregatorBase {
+template <typename TIn, typename TSum, typename TBuilder, typename TInScalar>
+class TSumBlockAggregator<TFinalizeKeysTag, TIn, TSum, TBuilder, TInScalar> : public TFinalizeKeysTag::TBase {
public:
- struct TState {
- double Sum_ = 0;
- ui64 Count_ = 0;
- };
-
- class TColumnBuilder : public IAggColumnBuilder {
- public:
- TColumnBuilder(ui64 size, const std::shared_ptr<arrow::DataType>& arrowType, TComputationContext& ctx)
- : ArrowType_(arrowType)
- , Ctx_(ctx)
- , NullBitmapBuilder_(&ctx.ArrowMemoryPool)
- , SumBuilder_(arrow::float64(), &ctx.ArrowMemoryPool)
- , CountBuilder_(arrow::uint64(), &ctx.ArrowMemoryPool)
- {
- ARROW_OK(NullBitmapBuilder_.Reserve(size));
- ARROW_OK(SumBuilder_.Reserve(size));
- ARROW_OK(CountBuilder_.Reserve(size));
- }
+ using TBase = TFinalizeKeysTag::TBase;
- void Add(const void* state) final {
- auto typedState = static_cast<const TState*>(state);
- if (typedState->Count_) {
- NullBitmapBuilder_.UnsafeAppend(true);
- SumBuilder_.UnsafeAppend(typedState->Sum_);
- CountBuilder_.UnsafeAppend(typedState->Count_);
- } else {
- NullBitmapBuilder_.UnsafeAppend(false);
- SumBuilder_.UnsafeAppendNull();
- CountBuilder_.UnsafeAppendNull();
- }
- }
+ TSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,
+ const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
+ : TBase(sizeof(TSumSimpleState<TSum>), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ , BuilderDataType_(builderDataType)
+ {
+ }
- NUdf::TUnboxedValue Build() final {
- std::shared_ptr<arrow::ArrayData> sumResult;
- std::shared_ptr<arrow::ArrayData> countResult;
- ARROW_OK(SumBuilder_.FinishInternal(&sumResult));
- ARROW_OK(CountBuilder_.FinishInternal(&countResult));
- std::shared_ptr<arrow::Buffer> nullBitmap;
- auto length = NullBitmapBuilder_.length();
- auto nullCount = NullBitmapBuilder_.false_count();
- ARROW_OK(NullBitmapBuilder_.Finish(&nullBitmap));
-
- auto arrayData = arrow::ArrayData::Make(ArrowType_, length, { nullBitmap }, nullCount, 0);
- arrayData->child_data.push_back(sumResult);
- arrayData->child_data.push_back(countResult);
- return Ctx_.HolderFactory.CreateArrowBlock(arrow::Datum(arrayData));
- }
+ void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TSumSimpleState<TSum>();
+ UpdateState(state, columns, row);
+ }
+
+ void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = static_cast<TSumSimpleState<TSum>*>(state);
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ PushValueToSimpleState<TIn, TSum>(typedState, datum, row);
+ }
+
+ std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
+ return std::make_unique<TSimpleSumColumnBuilder<TSum, TBuilder>>(size, BuilderDataType_, Ctx_);
+ }
+
+private:
+ const ui32 ArgColumn_;
+ const std::shared_ptr<arrow::DataType> BuilderDataType_;
+};
- private:
- const std::shared_ptr<arrow::DataType> ArrowType_;
- TComputationContext& Ctx_;
- arrow::TypedBufferBuilder<bool> NullBitmapBuilder_;
- arrow::DoubleBuilder SumBuilder_;
- arrow::UInt64Builder CountBuilder_;
- };
+template <typename TIn, typename TInScalar>
+class TAvgBlockAggregator<TCombineAllTag, TIn, TInScalar> : public TCombineAllTag::TBase {
+public:
+ using TBase = TCombineAllTag::TBase;
TAvgBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,
- const std::shared_ptr<arrow::DataType> builderDataType, TComputationContext& ctx)
- : TBlockAggregatorBase(sizeof(TState), filterColumn, ctx)
+ const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
+ : TBase(sizeof(TAvgState), filterColumn, ctx)
, ArgColumn_(argColumn)
- , BuilderDataType_(builderDataType)
{
+ Y_UNUSED(builderDataType);
}
void InitState(void* state) final {
- new(state) TState();
+ new(state) TAvgState();
}
void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
- auto typedState = static_cast<TState*>(state);
+ auto typedState = static_cast<TAvgState*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
if (datum.scalar()->is_valid) {
@@ -421,7 +582,7 @@ public:
}
NUdf::TUnboxedValue FinishOne(const void* state) final {
- auto typedState = static_cast<const TState*>(state);
+ auto typedState = static_cast<const TAvgState*>(state);
if (!typedState->Count_) {
return NUdf::TUnboxedValuePod();
}
@@ -433,13 +594,30 @@ public:
return arr;
}
+private:
+ ui32 ArgColumn_;
+};
+
+template <typename TIn, typename TInScalar>
+class TAvgBlockAggregator<TCombineKeysTag, TIn, TInScalar> : public TCombineKeysTag::TBase {
+public:
+ using TBase = TCombineKeysTag::TBase;
+
+ TAvgBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,
+ const std::shared_ptr<arrow::DataType>& builderDataType, TComputationContext& ctx)
+ : TBase(sizeof(TAvgState), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ , BuilderDataType_(builderDataType)
+ {
+ }
+
void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
- new(state) TState();
+ new(state) TAvgState();
UpdateKey(state, columns, row);
}
void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
- auto typedState = static_cast<TState*>(state);
+ auto typedState = static_cast<TAvgState*>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
if (datum.scalar()->is_valid) {
@@ -463,8 +641,8 @@ public:
}
}
- std::unique_ptr<IAggColumnBuilder> MakeBuilder(ui64 size) final {
- return std::make_unique<TColumnBuilder>(size, BuilderDataType_, Ctx_);
+ std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
+ return std::make_unique<TAvgStateColumnBuilder>(size, BuilderDataType_, Ctx_);
}
private:
@@ -472,8 +650,60 @@ private:
const std::shared_ptr<arrow::DataType> BuilderDataType_;
};
-template <typename TIn, typename TSum, typename TBuilder, typename TInScalar>
-class TPreparedSumBlockAggregatorNullableOrScalar : public IPreparedBlockAggregator {
+class TAvgBlockAggregatorOverState : public TFinalizeKeysTag::TBase {
+public:
+ using TBase = TFinalizeKeysTag::TBase;
+
+ TAvgBlockAggregatorOverState(ui32 argColumn, TComputationContext& ctx)
+ : TBase(sizeof(TAvgState), {}, ctx)
+ , ArgColumn_(argColumn)
+ {
+ }
+
+ void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TAvgState();
+ UpdateState(state, columns, row);
+ }
+
+ void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = static_cast<TAvgState*>(state);
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ if (datum.is_scalar()) {
+ if (datum.scalar()->is_valid) {
+ const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(*datum.scalar());
+
+ typedState->Sum_ += arrow::internal::checked_cast<const arrow::DoubleScalar&>(*structScalar.value[0]).value;
+ typedState->Count_ += arrow::internal::checked_cast<const arrow::UInt64Scalar&>(*structScalar.value[1]).value;
+ }
+ } else {
+ const auto& array = datum.array();
+ auto sumPtr = array->child_data[0]->GetValues<double>(1);
+ auto countPtr = array->child_data[1]->GetValues<ui64>(1);
+ if (array->GetNullCount() == 0) {
+ typedState->Sum_ += sumPtr[row];
+ typedState->Count_ += countPtr[row];
+ } else {
+ auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
+ ui64 fullIndex = row + array->offset;
+ // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00
+ auto bit = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
+ ui64 mask = -ui64(bit);
+ typedState->Sum_ += sumPtr[row] * bit;
+ typedState->Count_ += mask & countPtr[row];
+ }
+ }
+ }
+
+ std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
+ return std::make_unique<TAvgResultColumnBuilder>(size, Ctx_);
+ }
+
+private:
+ const ui32 ArgColumn_;
+};
+
+template <typename TTag, typename TIn, typename TSum, typename TBuilder, typename TInScalar>
+class TPreparedSumBlockAggregatorNullableOrScalar : public TTag::TPreparedAggregator {
public:
TPreparedSumBlockAggregatorNullableOrScalar(std::optional<ui32> filterColumn, ui32 argColumn,
const std::shared_ptr<arrow::DataType>& builderDataType)
@@ -482,8 +712,8 @@ public:
, BuilderDataType_(builderDataType)
{}
- std::unique_ptr<IBlockAggregator> Make(TComputationContext& ctx) const final {
- return std::make_unique<TSumBlockAggregatorNullableOrScalar<TIn, TSum, TBuilder, TInScalar>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx);
+ std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
+ return std::make_unique<TSumBlockAggregatorNullableOrScalar<TTag, TIn, TSum, TBuilder, TInScalar>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx);
}
private:
@@ -492,8 +722,8 @@ private:
const std::shared_ptr<arrow::DataType> BuilderDataType_;
};
-template <typename TIn, typename TSum, typename TBuilder, typename TInScalar>
-class TPreparedSumBlockAggregator : public IPreparedBlockAggregator {
+template <typename TTag, typename TIn, typename TSum, typename TBuilder, typename TInScalar>
+class TPreparedSumBlockAggregator : public TTag::TPreparedAggregator {
public:
TPreparedSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,
const std::shared_ptr<arrow::DataType>& builderDataType)
@@ -502,8 +732,8 @@ public:
, BuilderDataType_(builderDataType)
{}
- std::unique_ptr<IBlockAggregator> Make(TComputationContext& ctx) const final {
- return std::make_unique<TSumBlockAggregator<TIn, TSum, TBuilder, TInScalar>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx);
+ std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
+ return std::make_unique<TSumBlockAggregator<TTag, TIn, TSum, TBuilder, TInScalar>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx);
}
private:
@@ -512,66 +742,88 @@ private:
const std::shared_ptr<arrow::DataType> BuilderDataType_;
};
+template <typename TTag>
+std::unique_ptr<typename TTag::TPreparedAggregator> PrepareSum(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn) {
+ auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn));
+ auto argType = blockType->GetItemType();
+ bool isOptional;
+ auto dataType = UnpackOptionalData(argType, isOptional);
+ if (blockType->GetShape() == TBlockType::EShape::Scalar || isOptional) {
+ switch (*dataType->GetDataSlot()) {
+ case NUdf::EDataSlot::Int8:
+ return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<TTag, i8, i64, arrow::Int64Builder, arrow::Int8Scalar>>(filterColumn, argColumn, arrow::int64());
+ case NUdf::EDataSlot::Uint8:
+ return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<TTag, ui8, ui64, arrow::UInt64Builder, arrow::UInt8Scalar>>(filterColumn, argColumn, arrow::uint64());
+ case NUdf::EDataSlot::Int16:
+ return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<TTag, i16, i64, arrow::Int64Builder, arrow::Int16Scalar>>(filterColumn, argColumn, arrow::int64());
+ case NUdf::EDataSlot::Uint16:
+ return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<TTag, ui16, ui64, arrow::UInt64Builder, arrow::UInt16Scalar>>(filterColumn, argColumn, arrow::uint64());
+ case NUdf::EDataSlot::Int32:
+ return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<TTag, i32, i64, arrow::Int64Builder, arrow::Int32Scalar>>(filterColumn, argColumn, arrow::int64());
+ case NUdf::EDataSlot::Uint32:
+ return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<TTag, ui32, ui64, arrow::UInt64Builder, arrow::UInt32Scalar>>(filterColumn, argColumn, arrow::uint64());
+ case NUdf::EDataSlot::Int64:
+ return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<TTag, i64, i64, arrow::Int64Builder, arrow::Int64Scalar>>(filterColumn, argColumn, arrow::int64());
+ case NUdf::EDataSlot::Uint64:
+ return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<TTag, ui64, ui64, arrow::UInt64Builder, arrow::UInt64Scalar>>(filterColumn, argColumn, arrow::uint64());
+ default:
+ throw yexception() << "Unsupported SUM input type";
+ }
+ } else {
+ switch (*dataType->GetDataSlot()) {
+ case NUdf::EDataSlot::Int8:
+ return std::make_unique<TPreparedSumBlockAggregator<TTag, i8, i64, arrow::Int64Builder, arrow::Int8Scalar>>(filterColumn, argColumn, arrow::int64());
+ case NUdf::EDataSlot::Uint8:
+ return std::make_unique<TPreparedSumBlockAggregator<TTag, ui8, ui64, arrow::UInt64Builder, arrow::UInt8Scalar>>(filterColumn, argColumn, arrow::uint64());
+ case NUdf::EDataSlot::Int16:
+ return std::make_unique<TPreparedSumBlockAggregator<TTag, i16, i64, arrow::Int64Builder, arrow::Int16Scalar>>(filterColumn, argColumn, arrow::int64());
+ case NUdf::EDataSlot::Uint16:
+ return std::make_unique<TPreparedSumBlockAggregator<TTag, ui16, ui64, arrow::UInt64Builder, arrow::UInt16Scalar>>(filterColumn, argColumn, arrow::uint64());
+ case NUdf::EDataSlot::Int32:
+ return std::make_unique<TPreparedSumBlockAggregator<TTag, i32, i64, arrow::Int64Builder, arrow::Int32Scalar>>(filterColumn, argColumn, arrow::int64());
+ case NUdf::EDataSlot::Uint32:
+ return std::make_unique<TPreparedSumBlockAggregator<TTag, ui32, ui64, arrow::UInt64Builder, arrow::UInt32Scalar>>(filterColumn, argColumn, arrow::uint64());
+ case NUdf::EDataSlot::Int64:
+ return std::make_unique<TPreparedSumBlockAggregator<TTag, i64, i64, arrow::Int64Builder, arrow::Int64Scalar>>(filterColumn, argColumn, arrow::int64());
+ case NUdf::EDataSlot::Uint64:
+ return std::make_unique<TPreparedSumBlockAggregator<TTag, ui64, ui64, arrow::UInt64Builder, arrow::UInt64Scalar>>(filterColumn, argColumn, arrow::uint64());
+ default:
+ throw yexception() << "Unsupported SUM input type";
+ }
+ }
+}
+
class TBlockSumFactory : public IBlockAggregatorFactory {
public:
- std::unique_ptr<IPreparedBlockAggregator> Prepare(
- TTupleType* tupleType,
- std::optional<ui32> filterColumn,
- const std::vector<ui32>& argsColumns,
- const TTypeEnvironment& env) const final {
- Y_UNUSED(env);
- auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0]));
- auto argType = blockType->GetItemType();
- bool isOptional;
- auto dataType = UnpackOptionalData(argType, isOptional);
- if (blockType->GetShape() == TBlockType::EShape::Scalar || isOptional) {
- switch (*dataType->GetDataSlot()) {
- case NUdf::EDataSlot::Int8:
- return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<i8, i64, arrow::Int64Builder, arrow::Int8Scalar>>(filterColumn, argsColumns[0], arrow::int64());
- case NUdf::EDataSlot::Uint8:
- return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<ui8, ui64, arrow::UInt64Builder, arrow::UInt8Scalar>>(filterColumn, argsColumns[0], arrow::uint64());
- case NUdf::EDataSlot::Int16:
- return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<i16, i64, arrow::Int64Builder, arrow::Int16Scalar>>(filterColumn, argsColumns[0], arrow::int64());
- case NUdf::EDataSlot::Uint16:
- return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<ui16, ui64, arrow::UInt64Builder, arrow::UInt16Scalar>>(filterColumn, argsColumns[0], arrow::uint64());
- case NUdf::EDataSlot::Int32:
- return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<i32, i64, arrow::Int64Builder, arrow::Int32Scalar>>(filterColumn, argsColumns[0], arrow::int64());
- case NUdf::EDataSlot::Uint32:
- return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<ui32, ui64, arrow::UInt64Builder, arrow::UInt32Scalar>>(filterColumn, argsColumns[0], arrow::uint64());
- case NUdf::EDataSlot::Int64:
- return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<i64, i64, arrow::Int64Builder, arrow::Int64Scalar>>(filterColumn, argsColumns[0], arrow::int64());
- case NUdf::EDataSlot::Uint64:
- return std::make_unique<TPreparedSumBlockAggregatorNullableOrScalar<ui64, ui64, arrow::UInt64Builder, arrow::UInt64Scalar>>(filterColumn, argsColumns[0], arrow::uint64());
- default:
- throw yexception() << "Unsupported SUM input type";
- }
- } else {
- switch (*dataType->GetDataSlot()) {
- case NUdf::EDataSlot::Int8:
- return std::make_unique<TPreparedSumBlockAggregator<i8, i64, arrow::Int64Builder, arrow::Int8Scalar>>(filterColumn, argsColumns[0], arrow::int64());
- case NUdf::EDataSlot::Uint8:
- return std::make_unique<TPreparedSumBlockAggregator<ui8, ui64, arrow::UInt64Builder, arrow::UInt8Scalar>>(filterColumn, argsColumns[0], arrow::uint64());
- case NUdf::EDataSlot::Int16:
- return std::make_unique<TPreparedSumBlockAggregator<i16, i64, arrow::Int64Builder, arrow::Int16Scalar>>(filterColumn, argsColumns[0], arrow::int64());
- case NUdf::EDataSlot::Uint16:
- return std::make_unique<TPreparedSumBlockAggregator<ui16, ui64, arrow::UInt64Builder, arrow::UInt16Scalar>>(filterColumn, argsColumns[0], arrow::uint64());
- case NUdf::EDataSlot::Int32:
- return std::make_unique<TPreparedSumBlockAggregator<i32, i64, arrow::Int64Builder, arrow::Int32Scalar>>(filterColumn, argsColumns[0], arrow::int64());
- case NUdf::EDataSlot::Uint32:
- return std::make_unique<TPreparedSumBlockAggregator<ui32, ui64, arrow::UInt64Builder, arrow::UInt32Scalar>>(filterColumn, argsColumns[0], arrow::uint64());
- case NUdf::EDataSlot::Int64:
- return std::make_unique<TPreparedSumBlockAggregator<i64, i64, arrow::Int64Builder, arrow::Int64Scalar>>(filterColumn, argsColumns[0], arrow::int64());
- case NUdf::EDataSlot::Uint64:
- return std::make_unique<TPreparedSumBlockAggregator<ui64, ui64, arrow::UInt64Builder, arrow::UInt64Scalar>>(filterColumn, argsColumns[0], arrow::uint64());
- default:
- throw yexception() << "Unsupported SUM input type";
- }
- }
- }
+ std::unique_ptr<TCombineAllTag::TPreparedAggregator> PrepareCombineAll(
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const final {
+ Y_UNUSED(env);
+ return PrepareSum<TCombineAllTag>(tupleType, filterColumn, argsColumns[0]);
+ }
+
+ std::unique_ptr<TCombineKeysTag::TPreparedAggregator> PrepareCombineKeys(
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const final {
+ Y_UNUSED(env);
+ return PrepareSum<TCombineKeysTag>(tupleType, filterColumn, argsColumns[0]);
+ }
+
+ std::unique_ptr<TFinalizeKeysTag::TPreparedAggregator> PrepareFinalizeKeys(
+ TTupleType* tupleType,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const final {
+ Y_UNUSED(env);
+ return PrepareSum<TFinalizeKeysTag>(tupleType, std::optional<ui32>(), argsColumns[0]);
+ }
};
-template <typename TIn, typename TInScalar>
-class TPreparedAvgBlockAggregator : public IPreparedBlockAggregator {
+template <typename TTag, typename TIn, typename TInScalar>
+class TPreparedAvgBlockAggregator : public TTag::TPreparedAggregator {
public:
TPreparedAvgBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn,
const std::shared_ptr<arrow::DataType>& builderDataType)
@@ -580,8 +832,8 @@ public:
, BuilderDataType_(builderDataType)
{}
- std::unique_ptr<IBlockAggregator> Make(TComputationContext& ctx) const final {
- return std::make_unique<TAvgBlockAggregator<TIn, TInScalar>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx);
+ std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
+ return std::make_unique<TAvgBlockAggregator<TTag, TIn, TInScalar>>(FilterColumn_, ArgColumn_, BuilderDataType_, ctx);
}
private:
@@ -590,47 +842,103 @@ private:
const std::shared_ptr<arrow::DataType> BuilderDataType_;
};
+class TPreparedAvgBlockAggregatorOverState : public TFinalizeKeysTag::TPreparedAggregator {
+public:
+ TPreparedAvgBlockAggregatorOverState(ui32 argColumn)
+ : ArgColumn_(argColumn)
+ {}
+
+ std::unique_ptr<typename TFinalizeKeysTag::TAggregator> Make(TComputationContext& ctx) const final {
+ return std::make_unique<TAvgBlockAggregatorOverState>(ArgColumn_, ctx);
+ }
+
+private:
+ const ui32 ArgColumn_;
+};
+
+template <typename TTag>
+std::unique_ptr<typename TTag::TPreparedAggregator> PrepareAvg(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env);
+
+template <typename TTag>
+std::unique_ptr<typename TTag::TPreparedAggregator> PrepareAvgOverInput(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env) {
+ auto doubleType = TDataType::Create(NUdf::TDataType<double>::Id, env);
+ auto ui64Type = TDataType::Create(NUdf::TDataType<ui64>::Id, env);
+ TVector<TType*> tupleElements = { doubleType, ui64Type };
+ auto avgRetType = TTupleType::Create(2, tupleElements.data(), env);
+ std::shared_ptr<arrow::DataType> builderDataType;
+ bool isOptional;
+ MKQL_ENSURE(ConvertArrowType(avgRetType, isOptional, builderDataType), "Unsupported builder type");
+
+ auto argType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn))->GetItemType();
+ auto dataType = UnpackOptionalData(argType, isOptional);
+ switch (*dataType->GetDataSlot()) {
+ case NUdf::EDataSlot::Int8:
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, i8, arrow::Int8Scalar>>(filterColumn, argColumn, builderDataType);
+ case NUdf::EDataSlot::Uint8:
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui8, arrow::UInt8Scalar>>(filterColumn, argColumn, builderDataType);
+ case NUdf::EDataSlot::Int16:
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, i16, arrow::Int16Scalar>>(filterColumn, argColumn, builderDataType);
+ case NUdf::EDataSlot::Uint16:
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui16, arrow::UInt16Scalar>>(filterColumn, argColumn, builderDataType);
+ case NUdf::EDataSlot::Int32:
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, i32, arrow::Int32Scalar>>(filterColumn, argColumn, builderDataType);
+ case NUdf::EDataSlot::Uint32:
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui32, arrow::UInt32Scalar>>(filterColumn, argColumn, builderDataType);
+ case NUdf::EDataSlot::Int64:
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, i64, arrow::Int64Scalar>>(filterColumn, argColumn, builderDataType);
+ case NUdf::EDataSlot::Uint64:
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui64, arrow::UInt64Scalar>>(filterColumn, argColumn, builderDataType);
+ default:
+ throw yexception() << "Unsupported AVG input type";
+ }
+}
+
+template <>
+std::unique_ptr<typename TCombineAllTag::TPreparedAggregator> PrepareAvg<TCombineAllTag>(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env) {
+ return PrepareAvgOverInput<TCombineAllTag>(tupleType, filterColumn, argColumn, env);
+}
+
+template <>
+std::unique_ptr<typename TCombineKeysTag::TPreparedAggregator> PrepareAvg<TCombineKeysTag>(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env) {
+ return PrepareAvgOverInput<TCombineKeysTag>(tupleType, filterColumn, argColumn, env);
+}
+
+template <>
+std::unique_ptr<typename TFinalizeKeysTag::TPreparedAggregator> PrepareAvg<TFinalizeKeysTag>(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env) {
+ Y_UNUSED(tupleType);
+ Y_UNUSED(filterColumn);
+ Y_UNUSED(env);
+ return std::make_unique<TPreparedAvgBlockAggregatorOverState>(argColumn);
+}
+
class TBlockAvgFactory : public IBlockAggregatorFactory {
public:
- std::unique_ptr<IPreparedBlockAggregator> Prepare(
- TTupleType* tupleType,
- std::optional<ui32> filterColumn,
- const std::vector<ui32>& argsColumns,
- const TTypeEnvironment& env) const final {
-
- auto doubleType = TDataType::Create(NUdf::TDataType<double>::Id, env);
- auto ui64Type = TDataType::Create(NUdf::TDataType<ui64>::Id, env);
- TVector<TType*> tupleElements = { doubleType, ui64Type };
- auto avgRetType = TTupleType::Create(2, tupleElements.data(), env);
- std::shared_ptr<arrow::DataType> builderDataType;
- bool isOptional;
- MKQL_ENSURE(ConvertArrowType(avgRetType, isOptional, builderDataType), "Unsupported builder type");
-
- auto argType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0]))->GetItemType();
- auto dataType = UnpackOptionalData(argType, isOptional);
- switch (*dataType->GetDataSlot()) {
- case NUdf::EDataSlot::Int8:
- return std::make_unique<TPreparedAvgBlockAggregator<i8, arrow::Int8Scalar>>(filterColumn, argsColumns[0], builderDataType);
- case NUdf::EDataSlot::Uint8:
- return std::make_unique<TPreparedAvgBlockAggregator<ui8, arrow::UInt8Scalar>>(filterColumn, argsColumns[0], builderDataType);
- case NUdf::EDataSlot::Int16:
- return std::make_unique<TPreparedAvgBlockAggregator<i16, arrow::Int16Scalar>>(filterColumn, argsColumns[0], builderDataType);
- case NUdf::EDataSlot::Uint16:
- return std::make_unique<TPreparedAvgBlockAggregator<ui16, arrow::UInt16Scalar>>(filterColumn, argsColumns[0], builderDataType);
- case NUdf::EDataSlot::Int32:
- return std::make_unique<TPreparedAvgBlockAggregator<i32, arrow::Int32Scalar>>(filterColumn, argsColumns[0], builderDataType);
- case NUdf::EDataSlot::Uint32:
- return std::make_unique<TPreparedAvgBlockAggregator<ui32, arrow::UInt32Scalar>>(filterColumn, argsColumns[0], builderDataType);
- case NUdf::EDataSlot::Int64:
- return std::make_unique<TPreparedAvgBlockAggregator<i64, arrow::Int64Scalar>>(filterColumn, argsColumns[0], builderDataType);
- case NUdf::EDataSlot::Uint64:
- return std::make_unique<TPreparedAvgBlockAggregator<ui64, arrow::UInt64Scalar>>(filterColumn, argsColumns[0], builderDataType);
- default:
- throw yexception() << "Unsupported AVG input type";
- }
- }
+ std::unique_ptr<TCombineAllTag::TPreparedAggregator> PrepareCombineAll(
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const final {
+ return PrepareAvg<TCombineAllTag>(tupleType, filterColumn, argsColumns[0], env);
+ }
+
+ std::unique_ptr<TCombineKeysTag::TPreparedAggregator> PrepareCombineKeys(
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const final {
+ return PrepareAvg<TCombineKeysTag>(tupleType, filterColumn, argsColumns[0], env);
+ }
+
+ std::unique_ptr<TFinalizeKeysTag::TPreparedAggregator> PrepareFinalizeKeys(
+ TTupleType* tupleType,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const final {
+ return PrepareAvg<TFinalizeKeysTag>(tupleType, std::optional<ui32>(), argsColumns[0], env);
+ }
};
+}
+
std::unique_ptr<IBlockAggregatorFactory> MakeBlockSumFactory() {
return std::make_unique<TBlockSumFactory>();
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
index ddf51de9bda..a6d4882ac35 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
@@ -283,6 +283,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"BlockCompress", &WrapBlockCompress},
{"BlockCombineAll", &WrapBlockCombineAll},
{"BlockCombineHashed", &WrapBlockCombineHashed},
+ {"BlockMergeFinalizeHashed", &WrapBlockMergeFinalizeHashed},
{"MakeHeap", &WrapMakeHeap},
{"PushHeap", &WrapPushHeap},
{"PopHeap", &WrapPopHeap},
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 53c27b7e471..0f4ec2b5754 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -5372,6 +5372,36 @@ TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, std::optiona
return TRuntimeNode(builder.Build(), false);
}
+TRuntimeNode TProgramBuilder::BlockMergeFinalizeHashed(TRuntimeNode flow, const TArrayRef<ui32>& keys,
+ const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
+ if constexpr (RuntimeVersion < 31U) {
+ THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+ }
+
+ TCallableBuilder builder(Env, __func__, returnType);
+ builder.Add(flow);
+
+ TVector<TRuntimeNode> keyNodes;
+ for (const auto& key : keys) {
+ keyNodes.push_back(NewDataLiteral<ui32>(key));
+ }
+
+ builder.Add(NewTuple(keyNodes));
+ TVector<TRuntimeNode> aggsNodes;
+ for (const auto& agg : aggs) {
+ TVector<TRuntimeNode> params;
+ params.push_back(NewDataLiteral<NUdf::EDataSlot::String>(agg.Name));
+ for (const auto& col : agg.ArgsColumns) {
+ params.push_back(NewDataLiteral<ui32>(col));
+ }
+
+ aggsNodes.push_back(NewTuple(params));
+ }
+
+ builder.Add(NewTuple(aggsNodes));
+ return TRuntimeNode(builder.Build(), false);
+}
+
bool CanExportType(TType* type, const TTypeEnvironment& env) {
if (type->GetKind() == TType::EKind::Type) {
return false; // Type of Type
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index 1214c5ddc5a..81f4b13e6a9 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -261,6 +261,8 @@ public:
const TArrayRef<const TAggInfo>& aggs, TType* returnType);
TRuntimeNode BlockCombineHashed(TRuntimeNode flow, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys,
const TArrayRef<const TAggInfo>& aggs, TType* returnType);
+ TRuntimeNode BlockMergeFinalizeHashed(TRuntimeNode flow, const TArrayRef<ui32>& keys,
+ const TArrayRef<const TAggInfo>& aggs, TType* returnType);
// udfs
TRuntimeNode Udf(