diff options
author | vvvv <vvvv@ydb.tech> | 2022-11-07 21:01:23 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2022-11-07 21:01:23 +0300 |
commit | 6049c4f06222778f764111348c11ddbbacb50fa2 (patch) | |
tree | cd3502e4dcfb6965e786827344cac9513a7c2492 | |
parent | 6861509fb1a82c5cfcc57f994592c8dd66b56975 (diff) | |
download | ydb-6049c4f06222778f764111348c11ddbbacb50fa2.tar.gz |
initial version of sum
7 files changed, 156 insertions, 30 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt index 0dea4f84dc8..3e1f4f514be 100644 --- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt +++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt @@ -38,6 +38,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp index 8cd1c843eb4..4b01cc83512 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp @@ -15,10 +15,12 @@ class TBlockCombineAllWrapper : public TStatefulWideFlowComputationNode<TBlockCo public: TBlockCombineAllWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, + ui32 countColumn, size_t width, TVector<std::unique_ptr<IBlockAggregator>>&& aggs) : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any) , Flow_(flow) + , CountColumn_(countColumn) , Width_(width) , Aggs_(std::move(aggs)) { @@ -38,13 +40,23 @@ public: if (result == EFetchResult::Yield) { return result; } else if (result == EFetchResult::One) { + ui64 batchLength = GetBatchLength(s.Values_.data()); + if (!batchLength) { + continue; + } + + s.HasValues_ = true; for (size_t i = 0; i < Aggs_.size(); ++i) { if (output[i]) { - Aggs_[i]->AddMany(s.Values_.data()); + Aggs_[i]->AddMany(s.Values_.data(), batchLength); } } } else { s.IsFinished_ = true; + if (!s.HasValues_) { + return EFetchResult::Finish; + } + for (size_t i = 0; i < Aggs_.size(); ++i) { if (auto* out = output[i]; out != nullptr) { *out = Aggs_[i]->Finish(); @@ -63,6 +75,7 @@ private: TVector<NUdf::TUnboxedValue> Values_; TVector<NUdf::TUnboxedValue*> ValuePointers_; bool IsFinished_ = false; + bool HasValues_ = false; TState(TMemoryUsageInfo* memInfo, size_t width) : TComputationValue(memInfo) @@ -87,9 +100,13 @@ private: return *static_cast<TState*>(state.AsBoxed().Get()); } + ui64 GetBatchLength(const NUdf::TUnboxedValue* columns) const { + return TArrowBlock::From(columns[CountColumn_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + } + private: IComputationWideFlowNode* Flow_; - TTupleType* TupleType_; + const ui32 CountColumn_; const size_t Width_; TVector<std::unique_ptr<IBlockAggregator>> Aggs_; }; @@ -123,10 +140,10 @@ IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNod argColumns.push_back(AS_VALUE(TDataLiteral, aggVal->GetValue(j))->AsValue().Get<ui32>()); } - aggs.emplace_back(MakeBlockAggregator(name, tupleType, countColumn, filterColumn, argColumns)); + aggs.emplace_back(MakeBlockAggregator(name, tupleType, filterColumn, argColumns)); } - return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, tupleType->GetElementsCount(), std::move(aggs)); + return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, countColumn, tupleType->GetElementsCount(), std::move(aggs)); } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp index 66cad7a1eec..765f9928f24 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp @@ -5,13 +5,14 @@ namespace NMiniKQL { class TCountAllBlockAggregator : public TBlockAggregatorBase { public: - TCountAllBlockAggregator(ui32 countColumn, std::optional<ui32> filterColumn) - : TBlockAggregatorBase(countColumn, filterColumn) + TCountAllBlockAggregator(std::optional<ui32> filterColumn) + : TBlockAggregatorBase(filterColumn) { } - void AddMany(const NUdf::TUnboxedValue* columns) final { - State_ += GetBatchLength(columns); + void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final { + Y_UNUSED(columns); + State_ += batchLength; } NUdf::TUnboxedValue Finish() final { @@ -24,17 +25,17 @@ private: class TCountBlockAggregator : public TBlockAggregatorBase { public: - TCountBlockAggregator(ui32 countColumn, std::optional<ui32> filterColumn, ui32 argColumn) - : TBlockAggregatorBase(countColumn, filterColumn) + TCountBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn) + : TBlockAggregatorBase(filterColumn) , ArgColumn_(argColumn) { } - void AddMany(const NUdf::TUnboxedValue* columns) final { + void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final { const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); if (datum.is_scalar()) { if (datum.scalar()->is_valid) { - State_ += GetBatchLength(columns); + State_ += batchLength; } } else { const auto& array = datum.array(); @@ -55,12 +56,11 @@ class TBlockCountAllFactory : public IBlockAggregatorFactory { public: std::unique_ptr<IBlockAggregator> Make( TTupleType* tupleType, - ui32 countColumn, std::optional<ui32> filterColumn, const std::vector<ui32>& argsColumns) const final { Y_UNUSED(tupleType); Y_UNUSED(argsColumns); - return std::make_unique<TCountAllBlockAggregator>(countColumn, filterColumn); + return std::make_unique<TCountAllBlockAggregator>(filterColumn); } }; @@ -68,11 +68,10 @@ class TBlockCountFactory : public IBlockAggregatorFactory { public: std::unique_ptr<IBlockAggregator> Make( TTupleType* tupleType, - ui32 countColumn, std::optional<ui32> filterColumn, const std::vector<ui32>& argsColumns) const final { Y_UNUSED(tupleType); - return std::make_unique<TCountBlockAggregator>(countColumn, filterColumn, argsColumns[0]); + return std::make_unique<TCountBlockAggregator>(filterColumn, argsColumns[0]); } }; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp index c616657153c..732fd8d984c 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp @@ -1,5 +1,6 @@ #include "mkql_block_agg_factory.h" #include "mkql_block_agg_count.h" +#include "mkql_block_agg_sum.h" namespace NKikimr { namespace NMiniKQL { @@ -11,13 +12,13 @@ struct TAggregatorFactories { { Factories["count_all"] = MakeBlockCountAllFactory(); Factories["count"] = MakeBlockCountFactory(); + Factories["sum"] = MakeBlockSumFactory(); } }; std::unique_ptr<IBlockAggregator> MakeBlockAggregator( TStringBuf name, TTupleType* tupleType, - ui32 countColumn, std::optional<ui32> filterColumn, const std::vector<ui32>& argsColumns) { const auto& f = Singleton<TAggregatorFactories>()->Factories; @@ -26,7 +27,7 @@ std::unique_ptr<IBlockAggregator> MakeBlockAggregator( throw yexception() << "Unsupported block aggregation function: " << name; } - return it->second->Make(tupleType, countColumn, filterColumn, argsColumns); + return it->second->Make(tupleType, filterColumn, argsColumns); } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h index 6af18abbcc2..21a4be24cd1 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h @@ -10,33 +10,25 @@ class IBlockAggregator { public: virtual ~IBlockAggregator() = default; - virtual void AddMany(const NUdf::TUnboxedValue* columns) = 0; + virtual void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) = 0; virtual NUdf::TUnboxedValue Finish() = 0; }; class TBlockAggregatorBase : public IBlockAggregator { public: - TBlockAggregatorBase(ui32 countColumn, std::optional<ui32> filterColumn) - : CountColumn_(countColumn) - , FilterColumn_(filterColumn) + TBlockAggregatorBase(std::optional<ui32> filterColumn) + : FilterColumn_(filterColumn) { } protected: - ui64 GetBatchLength(const NUdf::TUnboxedValue* columns) const { - return TArrowBlock::From(columns[CountColumn_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; - } - -protected: - const ui32 CountColumn_; const std::optional<ui32> FilterColumn_; }; std::unique_ptr<IBlockAggregator> MakeBlockAggregator( TStringBuf name, TTupleType* tupleType, - ui32 countColumn, std::optional<ui32> filterColumn, const std::vector<ui32>& argsColumns); @@ -46,7 +38,6 @@ public: virtual std::unique_ptr<IBlockAggregator> Make( TTupleType* tupleType, - ui32 countColumn, std::optional<ui32> filterColumn, const std::vector<ui32>& argsColumns) const = 0; }; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp new file mode 100644 index 00000000000..4ab1fa71e04 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp @@ -0,0 +1,107 @@ +#include "mkql_block_agg_sum.h" + +#include <ydb/library/yql/minikql/mkql_node_builder.h> +#include <ydb/library/yql/minikql/mkql_node_cast.h> + +#include <arrow/scalar.h> + +namespace NKikimr { +namespace NMiniKQL { + +template <typename TIn, typename TState, typename TInScalar> +class TSumBlockAggregator : public TBlockAggregatorBase { +public: + TSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn) + : TBlockAggregatorBase(filterColumn) + , ArgColumn_(argColumn) + { + } + + void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final { + const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); + if (datum.is_scalar()) { + if (datum.scalar()->is_valid) { + State_ += batchLength * datum.scalar_as<TInScalar>().value; + Count_ += batchLength; + } + } else { + const auto& array = datum.array(); + auto ptr = array->GetValues<TIn>(1); + auto len = array->length; + auto count = len - array->GetNullCount(); + if (!count) { + return; + } + + Count_ += count; + TState state = State_; + if (array->GetNullCount() == 0) { + for (int64_t i = 0; i < len; ++i) { + state += ptr[i]; + } + } else { + auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0); + for (int64_t i = 0; i < len; ++i) { + ui64 fullIndex = i + array->offset; + // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00 + TState mask = (((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) ^ 1) - TState(1); + state += ptr[i] & mask; + } + } + + State_ = state; + } + } + + NUdf::TUnboxedValue Finish() final { + if (!Count_) { + return NUdf::TUnboxedValuePod(); + } + + return NUdf::TUnboxedValuePod(State_); + } + +private: + const ui32 ArgColumn_; + TState State_ = 0; + ui64 Count_ = 0; +}; + +class TBlockSumFactory : public IBlockAggregatorFactory { +public: + std::unique_ptr<IBlockAggregator> Make( + TTupleType* tupleType, + std::optional<ui32> filterColumn, + const std::vector<ui32>& argsColumns) const final { + auto argType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0]))->GetItemType(); + bool isOptional; + auto dataType = UnpackOptionalData(argType, isOptional); + switch (*dataType->GetDataSlot()) { + case NUdf::EDataSlot::Int8: + return std::make_unique<TSumBlockAggregator<i8, i64, arrow::Int8Scalar>>(filterColumn, argsColumns[0]); + case NUdf::EDataSlot::Uint8: + return std::make_unique<TSumBlockAggregator<ui8, ui64, arrow::UInt8Scalar>>(filterColumn, argsColumns[0]); + case NUdf::EDataSlot::Int16: + return std::make_unique<TSumBlockAggregator<i16, i64, arrow::Int16Scalar>>(filterColumn, argsColumns[0]); + case NUdf::EDataSlot::Uint16: + return std::make_unique<TSumBlockAggregator<ui16, ui64, arrow::UInt16Scalar>>(filterColumn, argsColumns[0]); + case NUdf::EDataSlot::Int32: + return std::make_unique<TSumBlockAggregator<i32, i64, arrow::Int32Scalar>>(filterColumn, argsColumns[0]); + case NUdf::EDataSlot::Uint32: + return std::make_unique<TSumBlockAggregator<ui32, ui64, arrow::UInt32Scalar>>(filterColumn, argsColumns[0]); + case NUdf::EDataSlot::Int64: + return std::make_unique<TSumBlockAggregator<i64, i64, arrow::Int64Scalar>>(filterColumn, argsColumns[0]); + case NUdf::EDataSlot::Uint64: + return std::make_unique<TSumBlockAggregator<ui64, ui64, arrow::UInt64Scalar>>(filterColumn, argsColumns[0]); + default: + throw yexception() << "Unsupported SUM input type"; + } + } +}; + +std::unique_ptr<IBlockAggregatorFactory> MakeBlockSumFactory() { + return std::make_unique<TBlockSumFactory>(); +} + +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.h new file mode 100644 index 00000000000..14ac5d8c132 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.h @@ -0,0 +1,10 @@ +#pragma once +#include "mkql_block_agg_factory.h" + +namespace NKikimr { +namespace NMiniKQL { + +std::unique_ptr<IBlockAggregatorFactory> MakeBlockSumFactory(); + +} +} |