about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author vvvv <vvvv@ydb.tech> 2022-11-07 21:01:23 +0300
committer vvvv <vvvv@ydb.tech> 2022-11-07 21:01:23 +0300
commit 6049c4f06222778f764111348c11ddbbacb50fa2 (patch)
tree cd3502e4dcfb6965e786827344cac9513a7c2492
parent 6861509fb1a82c5cfcc57f994592c8dd66b56975 (diff)
download ydb-6049c4f06222778f764111348c11ddbbacb50fa2.tar.gz
initial version of sum
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/CMakeLists.txt 1
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp 25
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp 23
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp 5
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h 15
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp 107
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.h 10
7 files changed, 156 insertions, 30 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt
index 0dea4f84dc8..3e1f4f514be 100644
--- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt
+++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt
@@ -38,6 +38,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
index 8cd1c843eb4..4b01cc83512 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
@@ -15,10 +15,12 @@ class TBlockCombineAllWrapper : public TStatefulWideFlowComputationNode<TBlockCo
public:
TBlockCombineAllWrapper(TComputationMutables& mutables,
IComputationWideFlowNode* flow,
+ ui32 countColumn,
size_t width,
TVector<std::unique_ptr<IBlockAggregator>>&& aggs)
: TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
, Flow_(flow)
+ , CountColumn_(countColumn)
, Width_(width)
, Aggs_(std::move(aggs))
{
@@ -38,13 +40,23 @@ public:
if (result == EFetchResult::Yield) {
return result;
} else if (result == EFetchResult::One) {
+ ui64 batchLength = GetBatchLength(s.Values_.data());
+ if (!batchLength) {
+ continue;
+ }
+
+ s.HasValues_ = true;
for (size_t i = 0; i < Aggs_.size(); ++i) {
if (output[i]) {
- Aggs_[i]->AddMany(s.Values_.data());
+ Aggs_[i]->AddMany(s.Values_.data(), batchLength);
}
}
} else {
s.IsFinished_ = true;
+ if (!s.HasValues_) {
+ return EFetchResult::Finish;
+ }
+
for (size_t i = 0; i < Aggs_.size(); ++i) {
if (auto* out = output[i]; out != nullptr) {
*out = Aggs_[i]->Finish();
@@ -63,6 +75,7 @@ private:
TVector<NUdf::TUnboxedValue> Values_;
TVector<NUdf::TUnboxedValue*> ValuePointers_;
bool IsFinished_ = false;
+ bool HasValues_ = false;
TState(TMemoryUsageInfo* memInfo, size_t width)
: TComputationValue(memInfo)
@@ -87,9 +100,13 @@ private:
return *static_cast<TState*>(state.AsBoxed().Get());
}
+ ui64 GetBatchLength(const NUdf::TUnboxedValue* columns) const {
+ return TArrowBlock::From(columns[CountColumn_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+ }
+
private:
IComputationWideFlowNode* Flow_;
- TTupleType* TupleType_;
+ const ui32 CountColumn_;
const size_t Width_;
TVector<std::unique_ptr<IBlockAggregator>> Aggs_;
};
@@ -123,10 +140,10 @@ IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNod
argColumns.push_back(AS_VALUE(TDataLiteral, aggVal->GetValue(j))->AsValue().Get<ui32>());
}
- aggs.emplace_back(MakeBlockAggregator(name, tupleType, countColumn, filterColumn, argColumns));
+ aggs.emplace_back(MakeBlockAggregator(name, tupleType, filterColumn, argColumns));
}
- return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, tupleType->GetElementsCount(), std::move(aggs));
+ return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, countColumn, tupleType->GetElementsCount(), std::move(aggs));
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
index 66cad7a1eec..765f9928f24 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
@@ -5,13 +5,14 @@ namespace NMiniKQL {
class TCountAllBlockAggregator : public TBlockAggregatorBase {
public:
- TCountAllBlockAggregator(ui32 countColumn, std::optional<ui32> filterColumn)
- : TBlockAggregatorBase(countColumn, filterColumn)
+ TCountAllBlockAggregator(std::optional<ui32> filterColumn)
+ : TBlockAggregatorBase(filterColumn)
{
}
- void AddMany(const NUdf::TUnboxedValue* columns) final {
- State_ += GetBatchLength(columns);
+ void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final {
+ Y_UNUSED(columns);
+ State_ += batchLength;
}
NUdf::TUnboxedValue Finish() final {
@@ -24,17 +25,17 @@ private:
class TCountBlockAggregator : public TBlockAggregatorBase {
public:
- TCountBlockAggregator(ui32 countColumn, std::optional<ui32> filterColumn, ui32 argColumn)
- : TBlockAggregatorBase(countColumn, filterColumn)
+ TCountBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn)
+ : TBlockAggregatorBase(filterColumn)
, ArgColumn_(argColumn)
{
}
- void AddMany(const NUdf::TUnboxedValue* columns) final {
+ void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final {
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
if (datum.scalar()->is_valid) {
- State_ += GetBatchLength(columns);
+ State_ += batchLength;
}
} else {
const auto& array = datum.array();
@@ -55,12 +56,11 @@ class TBlockCountAllFactory : public IBlockAggregatorFactory {
public:
std::unique_ptr<IBlockAggregator> Make(
TTupleType* tupleType,
- ui32 countColumn,
std::optional<ui32> filterColumn,
const std::vector<ui32>& argsColumns) const final {
Y_UNUSED(tupleType);
Y_UNUSED(argsColumns);
- return std::make_unique<TCountAllBlockAggregator>(countColumn, filterColumn);
+ return std::make_unique<TCountAllBlockAggregator>(filterColumn);
}
};
@@ -68,11 +68,10 @@ class TBlockCountFactory : public IBlockAggregatorFactory {
public:
std::unique_ptr<IBlockAggregator> Make(
TTupleType* tupleType,
- ui32 countColumn,
std::optional<ui32> filterColumn,
const std::vector<ui32>& argsColumns) const final {
Y_UNUSED(tupleType);
- return std::make_unique<TCountBlockAggregator>(countColumn, filterColumn, argsColumns[0]);
+ return std::make_unique<TCountBlockAggregator>(filterColumn, argsColumns[0]);
}
};
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
index c616657153c..732fd8d984c 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
@@ -1,5 +1,6 @@
#include "mkql_block_agg_factory.h"
#include "mkql_block_agg_count.h"
+#include "mkql_block_agg_sum.h"
namespace NKikimr {
namespace NMiniKQL {
@@ -11,13 +12,13 @@ struct TAggregatorFactories {
{
Factories["count_all"] = MakeBlockCountAllFactory();
Factories["count"] = MakeBlockCountFactory();
+ Factories["sum"] = MakeBlockSumFactory();
}
};
std::unique_ptr<IBlockAggregator> MakeBlockAggregator(
TStringBuf name,
TTupleType* tupleType,
- ui32 countColumn,
std::optional<ui32> filterColumn,
const std::vector<ui32>& argsColumns) {
const auto& f = Singleton<TAggregatorFactories>()->Factories;
@@ -26,7 +27,7 @@ std::unique_ptr<IBlockAggregator> MakeBlockAggregator(
throw yexception() << "Unsupported block aggregation function: " << name;
}
- return it->second->Make(tupleType, countColumn, filterColumn, argsColumns);
+ return it->second->Make(tupleType, filterColumn, argsColumns);
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h
index 6af18abbcc2..21a4be24cd1 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h
@@ -10,33 +10,25 @@ class IBlockAggregator {
public:
virtual ~IBlockAggregator() = default;
- virtual void AddMany(const NUdf::TUnboxedValue* columns) = 0;
+ virtual void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) = 0;
virtual NUdf::TUnboxedValue Finish() = 0;
};
class TBlockAggregatorBase : public IBlockAggregator {
public:
- TBlockAggregatorBase(ui32 countColumn, std::optional<ui32> filterColumn)
- : CountColumn_(countColumn)
- , FilterColumn_(filterColumn)
+ TBlockAggregatorBase(std::optional<ui32> filterColumn)
+ : FilterColumn_(filterColumn)
{
}
protected:
- ui64 GetBatchLength(const NUdf::TUnboxedValue* columns) const {
- return TArrowBlock::From(columns[CountColumn_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
- }
-
-protected:
- const ui32 CountColumn_;
const std::optional<ui32> FilterColumn_;
};
std::unique_ptr<IBlockAggregator> MakeBlockAggregator(
TStringBuf name,
TTupleType* tupleType,
- ui32 countColumn,
std::optional<ui32> filterColumn,
const std::vector<ui32>& argsColumns);
@@ -46,7 +38,6 @@ public:
virtual std::unique_ptr<IBlockAggregator> Make(
TTupleType* tupleType,
- ui32 countColumn,
std::optional<ui32> filterColumn,
const std::vector<ui32>& argsColumns) const = 0;
};
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
new file mode 100644
index 00000000000..4ab1fa71e04
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
@@ -0,0 +1,107 @@
+#include "mkql_block_agg_sum.h"
+
+#include <ydb/library/yql/minikql/mkql_node_builder.h>
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
+
+#include <arrow/scalar.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+template <typename TIn, typename TState, typename TInScalar>
+class TSumBlockAggregator : public TBlockAggregatorBase {
+public:
+ TSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn)
+ : TBlockAggregatorBase(filterColumn)
+ , ArgColumn_(argColumn)
+ {
+ }
+
+ void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final {
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ if (datum.is_scalar()) {
+ if (datum.scalar()->is_valid) {
+ State_ += batchLength * datum.scalar_as<TInScalar>().value;
+ Count_ += batchLength;
+ }
+ } else {
+ const auto& array = datum.array();
+ auto ptr = array->GetValues<TIn>(1);
+ auto len = array->length;
+ auto count = len - array->GetNullCount();
+ if (!count) {
+ return;
+ }
+
+ Count_ += count;
+ TState state = State_;
+ if (array->GetNullCount() == 0) {
+ for (int64_t i = 0; i < len; ++i) {
+ state += ptr[i];
+ }
+ } else {
+ auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
+ for (int64_t i = 0; i < len; ++i) {
+ ui64 fullIndex = i + array->offset;
+ // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00
+ TState mask = (((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) ^ 1) - TState(1);
+ state += ptr[i] & mask;
+ }
+ }
+
+ State_ = state;
+ }
+ }
+
+ NUdf::TUnboxedValue Finish() final {
+ if (!Count_) {
+ return NUdf::TUnboxedValuePod();
+ }
+
+ return NUdf::TUnboxedValuePod(State_);
+ }
+
+private:
+ const ui32 ArgColumn_;
+ TState State_ = 0;
+ ui64 Count_ = 0;
+};
+
+class TBlockSumFactory : public IBlockAggregatorFactory {
+public:
+ std::unique_ptr<IBlockAggregator> Make(
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns) const final {
+ auto argType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0]))->GetItemType();
+ bool isOptional;
+ auto dataType = UnpackOptionalData(argType, isOptional);
+ switch (*dataType->GetDataSlot()) {
+ case NUdf::EDataSlot::Int8:
+ return std::make_unique<TSumBlockAggregator<i8, i64, arrow::Int8Scalar>>(filterColumn, argsColumns[0]);
+ case NUdf::EDataSlot::Uint8:
+ return std::make_unique<TSumBlockAggregator<ui8, ui64, arrow::UInt8Scalar>>(filterColumn, argsColumns[0]);
+ case NUdf::EDataSlot::Int16:
+ return std::make_unique<TSumBlockAggregator<i16, i64, arrow::Int16Scalar>>(filterColumn, argsColumns[0]);
+ case NUdf::EDataSlot::Uint16:
+ return std::make_unique<TSumBlockAggregator<ui16, ui64, arrow::UInt16Scalar>>(filterColumn, argsColumns[0]);
+ case NUdf::EDataSlot::Int32:
+ return std::make_unique<TSumBlockAggregator<i32, i64, arrow::Int32Scalar>>(filterColumn, argsColumns[0]);
+ case NUdf::EDataSlot::Uint32:
+ return std::make_unique<TSumBlockAggregator<ui32, ui64, arrow::UInt32Scalar>>(filterColumn, argsColumns[0]);
+ case NUdf::EDataSlot::Int64:
+ return std::make_unique<TSumBlockAggregator<i64, i64, arrow::Int64Scalar>>(filterColumn, argsColumns[0]);
+ case NUdf::EDataSlot::Uint64:
+ return std::make_unique<TSumBlockAggregator<ui64, ui64, arrow::UInt64Scalar>>(filterColumn, argsColumns[0]);
+ default:
+ throw yexception() << "Unsupported SUM input type";
+ }
+ }
+};
+
+std::unique_ptr<IBlockAggregatorFactory> MakeBlockSumFactory() {
+ return std::make_unique<TBlockSumFactory>();
+}
+
+}
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.h
new file mode 100644
index 00000000000..14ac5d8c132
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.h
@@ -0,0 +1,10 @@
+#pragma once
+#include "mkql_block_agg_factory.h"
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+std::unique_ptr<IBlockAggregatorFactory> MakeBlockSumFactory();
+
+}
+}