aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Ozeritskiy <aozeritsky@ydb.tech>2024-08-08 21:56:06 +0200
committerGitHub <noreply@github.com>2024-08-08 22:56:06 +0300
commitf71631da8acb44db9abdcb8ebb921da9f631dcfd (patch)
treeb4cccf6b0528d5368a1d3c3d1d23c54d82725433
parent39c75e8b8b8f6d232ca72135eb9ae44b092806d3 (diff)
downloadydb-f71631da8acb44db9abdcb8ebb921da9f631dcfd.tar.gz
Add decimal support to arrow reader (#6151)
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp2
-rw-r--r--ydb/library/yql/minikql/arrow/arrow_util.h47
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp42
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_state_helper.h75
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp205
-rw-r--r--ydb/library/yql/minikql/computation/mkql_block_impl.cpp5
-rw-r--r--ydb/library/yql/minikql/computation/mkql_block_reader.cpp23
-rw-r--r--ydb/library/yql/minikql/mkql_type_builder.cpp5
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp18
-rw-r--r--ydb/library/yql/public/decimal/yql_decimal.cpp2
-rw-r--r--ydb/library/yql/public/decimal/yql_decimal.h56
-rw-r--r--ydb/library/yql/public/udf/arrow/block_builder.h33
-rw-r--r--ydb/library/yql/public/udf/arrow/block_io_buffer.h5
-rw-r--r--ydb/library/yql/public/udf/arrow/block_item.h13
-rw-r--r--ydb/library/yql/public/udf/arrow/block_item_comparator.h22
-rw-r--r--ydb/library/yql/public/udf/arrow/block_item_hasher.h8
-rw-r--r--ydb/library/yql/public/udf/arrow/block_reader.h27
-rw-r--r--ydb/library/yql/tests/sql/dq_file/part11/canondata/result.json22
-rw-r--r--ydb/library/yql/tests/sql/dq_file/part18/canondata/result.json22
-rw-r--r--ydb/library/yql/tests/sql/dq_file/part9/canondata/result.json22
-rw-r--r--ydb/library/yql/tests/sql/hybrid_file/part1/canondata/result.json14
-rw-r--r--ydb/library/yql/tests/sql/hybrid_file/part10/canondata/result.json14
-rw-r--r--ydb/library/yql/tests/sql/hybrid_file/part6/canondata/result.json14
-rw-r--r--ydb/library/yql/tests/sql/sql2yql/canondata/result.json42
-rw-r--r--ydb/library/yql/tests/sql/suites/blocks/add_decimal.cfg1
-rw-r--r--ydb/library/yql/tests/sql/suites/blocks/add_decimal.sql5
-rw-r--r--ydb/library/yql/tests/sql/suites/blocks/combine_all_decimal.cfg1
-rw-r--r--ydb/library/yql/tests/sql/suites/blocks/combine_all_decimal.sql15
-rw-r--r--ydb/library/yql/tests/sql/suites/blocks/combine_all_decimal_max.sql9
-rw-r--r--ydb/library/yql/tests/sql/suites/blocks/input_decimal.txt10
-rw-r--r--ydb/library/yql/tests/sql/suites/blocks/input_decimal.txt.attr7
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part11/canondata/result.json28
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json28
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part9/canondata/result.json28
34 files changed, 766 insertions, 104 deletions
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index de634ba3580..eb36108b96a 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -7732,7 +7732,7 @@ Y_UNIT_TEST_SUITE(KqpOlapTypes) {
testHelper.ReadData("SELECT dec FROM `/Root/ColumnTableTest` WHERE id=2", "[[\"inf\"]]");
testHelper.ReadData("SELECT dec FROM `/Root/ColumnTableTest` WHERE id=3", "[[\"-inf\"]]");
testHelper.ReadData("SELECT dec FROM `/Root/ColumnTableTest` WHERE id=4", "[[\"nan\"]]");
- testHelper.ReadData("SELECT dec FROM `/Root/ColumnTableTest` WHERE id=5", "[[\"-nan\"]]");
+ testHelper.ReadData("SELECT dec FROM `/Root/ColumnTableTest` WHERE id=5", "[[\"nan\"]]");
testHelper.ReadData("SELECT id FROM `/Root/ColumnTableTest` WHERE dec=CAST(\"10.1\" As Decimal(22,9))", "[[1]]");
testHelper.ReadData("SELECT id FROM `/Root/ColumnTableTest` WHERE dec=CAST(\"inf\" As Decimal(22,9)) ORDER BY id", "[[2];[8]]");
testHelper.ReadData("SELECT id FROM `/Root/ColumnTableTest` WHERE dec=CAST(\"-inf\" As Decimal(22,9)) ORDER BY id", "[[3];[9]]");
diff --git a/ydb/library/yql/minikql/arrow/arrow_util.h b/ydb/library/yql/minikql/arrow/arrow_util.h
index a1847999107..a024a13687a 100644
--- a/ydb/library/yql/minikql/arrow/arrow_util.h
+++ b/ydb/library/yql/minikql/arrow/arrow_util.h
@@ -74,6 +74,7 @@ struct TPrimitiveDataType;
template<>
struct TPrimitiveDataType<bool> {
using TLayout = ui8;
+ using TArithmetic = ui8;
using TResult = arrow::UInt8Type;
using TScalarResult = arrow::UInt8Scalar;
};
@@ -81,6 +82,7 @@ struct TPrimitiveDataType<bool> {
template<>
struct TPrimitiveDataType<i8> {
using TLayout = i8;
+ using TArithmetic = i8;
using TResult = arrow::Int8Type;
using TScalarResult = arrow::Int8Scalar;
};
@@ -88,6 +90,7 @@ struct TPrimitiveDataType<i8> {
template<>
struct TPrimitiveDataType<ui8> {
using TLayout = ui8;
+ using TArithmetic = ui8;
using TResult = arrow::UInt8Type;
using TScalarResult = arrow::UInt8Scalar;
};
@@ -95,6 +98,7 @@ struct TPrimitiveDataType<ui8> {
template<>
struct TPrimitiveDataType<i16> {
using TLayout = i16;
+ using TArithmetic = i16;
using TResult = arrow::Int16Type;
using TScalarResult = arrow::Int16Scalar;
};
@@ -102,6 +106,7 @@ struct TPrimitiveDataType<i16> {
template<>
struct TPrimitiveDataType<ui16> {
using TLayout = ui16;
+ using TArithmetic = ui16;
using TResult = arrow::UInt16Type;
using TScalarResult = arrow::UInt16Scalar;
};
@@ -109,6 +114,7 @@ struct TPrimitiveDataType<ui16> {
template<>
struct TPrimitiveDataType<i32> {
using TLayout = i32;
+ using TArithmetic = i32;
using TResult = arrow::Int32Type;
using TScalarResult = arrow::Int32Scalar;
};
@@ -116,6 +122,7 @@ struct TPrimitiveDataType<i32> {
template<>
struct TPrimitiveDataType<ui32> {
using TLayout = ui32;
+ using TArithmetic = ui32;
using TResult = arrow::UInt32Type;
using TScalarResult = arrow::UInt32Scalar;
};
@@ -123,6 +130,7 @@ struct TPrimitiveDataType<ui32> {
template<>
struct TPrimitiveDataType<i64> {
using TLayout = i64;
+ using TArithmetic = i64;
using TResult = arrow::Int64Type;
using TScalarResult = arrow::Int64Scalar;
};
@@ -130,6 +138,7 @@ struct TPrimitiveDataType<i64> {
template<>
struct TPrimitiveDataType<ui64> {
using TLayout = ui64;
+ using TArithmetic = ui64;
using TResult = arrow::UInt64Type;
using TScalarResult = arrow::UInt64Scalar;
};
@@ -137,6 +146,7 @@ struct TPrimitiveDataType<ui64> {
template<>
struct TPrimitiveDataType<float> {
using TLayout = float;
+ using TArithmetic = float;
using TResult = arrow::FloatType;
using TScalarResult = arrow::FloatScalar;
};
@@ -144,6 +154,7 @@ struct TPrimitiveDataType<float> {
template<>
struct TPrimitiveDataType<double> {
using TLayout = double;
+ using TArithmetic = double;
using TResult = arrow::DoubleType;
using TScalarResult = arrow::DoubleScalar;
};
@@ -160,6 +171,31 @@ struct TPrimitiveDataType<NYql::NUdf::TUtf8> {
using TScalarResult = arrow::StringScalar;
};
+template<>
+struct TPrimitiveDataType<NYql::NDecimal::TInt128> {
+ using TArithmetic = NYql::NDecimal::TDecimal;
+
+ class TResult: public arrow::FixedSizeBinaryType
+ {
+ public:
+ TResult(): arrow::FixedSizeBinaryType(16)
+ { }
+ };
+
+
+ class TScalarResult: public arrow::FixedSizeBinaryScalar
+ {
+ public:
+ TScalarResult(std::shared_ptr<arrow::Buffer> value)
+ : arrow::FixedSizeBinaryScalar(std::move(value), arrow::fixed_size_binary(16))
+ { }
+
+ TScalarResult()
+ : arrow::FixedSizeBinaryScalar(arrow::fixed_size_binary(16))
+ { }
+ };
+};
+
template <typename T, typename = typename std::enable_if<std::is_arithmetic<T>::value>::type>
inline arrow::Datum MakeScalarDatum(T value) {
return arrow::Datum(std::make_shared<typename TPrimitiveDataType<T>::TScalarResult>(value));
@@ -179,3 +215,14 @@ inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType() {
using NYql::NUdf::TTypedBufferBuilder;
}
+
+namespace arrow {
+
+template <>
+struct TypeTraits<typename NKikimr::NMiniKQL::TPrimitiveDataType<NYql::NDecimal::TInt128>::TResult> {
+ static inline std::shared_ptr<DataType> type_singleton() {
+ return arrow::fixed_size_binary(16);
+ }
+};
+
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
index 3b47523256e..0c2dae395ff 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
@@ -1,4 +1,5 @@
#include "mkql_block_agg_minmax.h"
+#include "mkql_block_agg_state_helper.h"
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
@@ -95,6 +96,12 @@ constexpr TIn InitialStateValue() {
} else {
return -std::numeric_limits<TIn>::infinity();
}
+ } else if constexpr (std::is_same_v<TIn, NYql::NDecimal::TInt128>) {
+ if constexpr (IsMin) {
+ return NYql::NDecimal::Nan();
+ } else {
+ return -NYql::NDecimal::Inf();
+ }
} else {
if constexpr (IsMin) {
return std::numeric_limits<TIn>::max();
@@ -129,7 +136,7 @@ public:
}
void Add(const void* state) final {
- auto typedState = static_cast<const TStateType*>(state);
+ auto typedState = MakeStateWrapper<TStateType>(state);
if constexpr (IsNullable) {
if (!typedState->IsValid) {
Builder_.Add(TBlockItem());
@@ -620,8 +627,9 @@ public:
Y_UNUSED(type);
}
- void InitState(void* state) final {
- new(state) TStateType();
+ void InitState(void* ptr) final {
+ TStateType state;
+ WriteUnaligned<TStateType>(ptr, state);
}
void DestroyState(void* state) noexcept final {
@@ -630,18 +638,18 @@ public:
}
void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
- auto typedState = static_cast<TStateType*>(state);
+ auto typedState = MakeStateWrapper<TStateType>(state);
Y_UNUSED(batchLength);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if constexpr (IsScalar) {
Y_ENSURE(datum.is_scalar());
if constexpr (IsNullable) {
if (datum.scalar()->is_valid) {
- typedState->Value = datum.scalar_as<TInScalar>().value;
+ typedState->Value = TIn(Cast(datum.scalar_as<TInScalar>().value));
typedState->IsValid = 1;
}
} else {
- typedState->Value = datum.scalar_as<TInScalar>().value;
+ typedState->Value = TIn(Cast(datum.scalar_as<TInScalar>().value));
}
} else {
const auto& array = datum.array();
@@ -706,7 +714,7 @@ public:
}
NUdf::TUnboxedValue FinishOne(const void* state) final {
- auto typedState = static_cast<const TStateType*>(state);
+ auto typedState = MakeStateWrapper<TStateType>(state);
if constexpr (IsNullable) {
if (!typedState->IsValid) {
return NUdf::TUnboxedValuePod();
@@ -727,11 +735,11 @@ static void PushValueToState(TState<IsNullable, TIn, IsMin>* typedState, const a
Y_ENSURE(datum.is_scalar());
if constexpr (IsNullable) {
if (datum.scalar()->is_valid) {
- typedState->Value = datum.scalar_as<TInScalar>().value;
+ typedState->Value = TIn(Cast(datum.scalar_as<TInScalar>().value));
typedState->IsValid = 1;
}
} else {
- typedState->Value = datum.scalar_as<TInScalar>().value;
+ typedState->Value = TIn(Cast(datum.scalar_as<TInScalar>().value));
}
} else {
const auto &array = datum.array();
@@ -767,7 +775,8 @@ public:
}
void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
- new(state) TStateType();
+ TStateType st;
+ WriteUnaligned<TStateType>(state, st);
UpdateKey(state, batchNum, columns, row);
}
@@ -778,9 +787,9 @@ public:
void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
Y_UNUSED(batchNum);
- auto typedState = static_cast<TStateType*>(state);
+ auto typedState = MakeStateWrapper<TStateType>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
- PushValueToState<IsNullable, IsScalar, TIn, IsMin>(typedState, datum, row);
+ PushValueToState<IsNullable, IsScalar, TIn, IsMin>(typedState.Get(), datum, row);
}
std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
@@ -807,7 +816,8 @@ public:
}
void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
- new(state) TStateType();
+ TStateType st;
+ WriteUnaligned<TStateType>(state, st);
UpdateState(state, batchNum, columns, row);
}
@@ -818,9 +828,9 @@ public:
void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
Y_UNUSED(batchNum);
- auto typedState = static_cast<TStateType*>(state);
+ auto typedState = MakeStateWrapper<TStateType>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
- PushValueToState<IsNullable, IsScalar, TIn, IsMin>(typedState, datum, row);
+ PushValueToState<IsNullable, IsScalar, TIn, IsMin>(typedState.Get(), datum, row);
}
std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
@@ -963,6 +973,8 @@ std::unique_ptr<typename TTag::TPreparedAggregator> PrepareMinMax(TTupleType* tu
return PrepareMinMaxFixed<TTag, float, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
case NUdf::EDataSlot::Double:
return PrepareMinMaxFixed<TTag, double, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
+ case NUdf::EDataSlot::Decimal:
+ return PrepareMinMaxFixed<TTag, NYql::NDecimal::TInt128, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
default:
throw yexception() << "Unsupported MIN/MAX input type";
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_state_helper.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_state_helper.h
new file mode 100644
index 00000000000..9234b70e8b5
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_state_helper.h
@@ -0,0 +1,75 @@
+#pragma once
+
+#include <util/system/unaligned_mem.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+template <typename T, bool IsConst = std::is_const_v<T>>
+class TStateWrapper;
+
+template <typename T>
+class TStateWrapper<T, true> {
+public:
+ TStateWrapper(const void* ptr)
+ : State_(ReadUnaligned<typename std::remove_const<T>::type>(ptr))
+ { }
+
+ T* Get() {
+ return &State_;
+ }
+
+ T* operator->() {
+ return Get();
+ }
+
+private:
+ T State_;
+};
+
+template <typename T>
+class TStateWrapper<T, false> {
+public:
+ TStateWrapper(void* ptr)
+ : State_(ReadUnaligned<T>(ptr))
+ , Ptr_(ptr)
+ { }
+
+ ~TStateWrapper() {
+ WriteUnaligned<T>(Ptr_, State_);
+ }
+
+ T* Get() {
+ return &State_;
+ }
+
+ T* operator->() {
+ return Get();
+ }
+
+private:
+ T State_;
+ void* Ptr_;
+};
+
+template <typename T>
+inline TStateWrapper<T> MakeStateWrapper(void* ptr) {
+ return TStateWrapper<T>(ptr);
+}
+
+template <typename T>
+inline TStateWrapper<const T> MakeStateWrapper(const void* ptr) {
+ return TStateWrapper<const T>(ptr);
+}
+
+template<typename T>
+inline T Cast(T t) {
+ return t;
+}
+
+inline NYql::NDecimal::TDecimal Cast(const std::shared_ptr<arrow::Buffer>& buffer) {
+ return *reinterpret_cast<const NYql::NDecimal::TDecimal*>(buffer->data());
+}
+
+}
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
index 7a663a2f453..6e6eaac0d19 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
@@ -1,7 +1,9 @@
#include "mkql_block_agg_sum.h"
+#include "mkql_block_agg_state_helper.h"
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/minikql/mkql_node_printer.h>
#include <ydb/library/yql/minikql/computation/mkql_block_builder.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
@@ -22,17 +24,18 @@ struct TSumState;
template<typename TSum>
struct TSumState<true, TSum> {
- TSum Sum_ = 0;
+ typename TPrimitiveDataType<TSum>::TArithmetic Sum_ = 0;
ui8 IsValid_ = 0;
};
template<typename TSum>
struct TSumState<false, TSum> {
- TSum Sum_ = 0;
+ typename TPrimitiveDataType<TSum>::TArithmetic Sum_ = 0;
};
+template<typename TOut>
struct TAvgState {
- double Sum_ = 0;
+ typename TPrimitiveDataType<TOut>::TArithmetic Sum_ = 0;
ui64 Count_ = 0;
};
@@ -48,14 +51,14 @@ public:
}
void Add(const void* state) final {
- auto typedState = static_cast<const TStateType*>(state);
+ auto typedState = MakeStateWrapper<TStateType>(state);
if constexpr (IsNullable) {
if (!typedState->IsValid_) {
Builder_.Add(TBlockItem());
return;
}
}
- Builder_.Add(TBlockItem(typedState->Sum_));
+ Builder_.Add(TBlockItem(TSum(typedState->Sum_)));
}
NUdf::TUnboxedValue Build() final {
@@ -67,6 +70,7 @@ private:
TComputationContext& Ctx_;
};
+template<typename TOut>
class TAvgStateColumnBuilder : public IAggColumnBuilder {
public:
TAvgStateColumnBuilder(ui64 size, TType* outputType, TComputationContext& ctx)
@@ -76,10 +80,10 @@ public:
}
void Add(const void* state) final {
- auto typedState = static_cast<const TAvgState*>(state);
+ auto typedState = MakeStateWrapper<TAvgState<TOut>>(state);
auto tupleBuilder = static_cast<NUdf::TTupleArrayBuilder<true>*>(Builder_.get());
if (typedState->Count_) {
- TBlockItem tupleItems[] = { TBlockItem(typedState->Sum_), TBlockItem(typedState->Count_)} ;
+ TBlockItem tupleItems[] = { TBlockItem(TOut(typedState->Sum_)), TBlockItem(typedState->Count_)} ;
tupleBuilder->Add(TBlockItem(tupleItems));
} else {
tupleBuilder->Add(TBlockItem());
@@ -95,18 +99,19 @@ private:
const std::unique_ptr<IArrayBuilder> Builder_;
};
+template<typename TOut>
class TAvgResultColumnBuilder : public IAggColumnBuilder {
public:
TAvgResultColumnBuilder(ui64 size, TComputationContext& ctx)
: Ctx_(ctx)
- , Builder_(TTypeInfoHelper(), arrow::float64(), ctx.ArrowMemoryPool, size)
+ , Builder_(TTypeInfoHelper(), arrow::TypeTraits<typename TPrimitiveDataType<TOut>::TResult>::type_singleton(), ctx.ArrowMemoryPool, size)
{
}
void Add(const void* state) final {
- auto typedState = static_cast<const TAvgState*>(state);
+ auto typedState = MakeStateWrapper<TAvgState<TOut>>(state);
if (typedState->Count_) {
- Builder_.Add(TBlockItem(typedState->Sum_ / typedState->Count_));
+ Builder_.Add(TBlockItem(TOut(typedState->Sum_ / typedState->Count_)));
} else {
Builder_.Add(TBlockItem());
}
@@ -118,13 +123,13 @@ public:
private:
TComputationContext& Ctx_;
- NYql::NUdf::TFixedSizeArrayBuilder<double, /*Nullable=*/true> Builder_;
+ NYql::NUdf::TFixedSizeArrayBuilder<TOut, /*Nullable=*/true> Builder_;
};
template <typename TTag, bool IsNullable, bool IsScalar, typename TIn, typename TSum>
class TSumBlockAggregator;
-template <typename TTag, typename TIn>
+template <typename TTag, typename TIn, typename TOut>
class TAvgBlockAggregator;
template <bool IsNullable, bool IsScalar, typename TIn, typename TSum>
@@ -142,7 +147,8 @@ public:
}
void InitState(void* state) final {
- new(state) TStateType();
+ TStateType st;
+ WriteUnaligned<TStateType>(state, st);
}
void DestroyState(void* state) noexcept final {
@@ -151,17 +157,17 @@ public:
}
void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
- auto typedState = static_cast<TStateType*>(state);
+ auto typedState = MakeStateWrapper<TStateType>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if constexpr (IsScalar) {
Y_ENSURE(datum.is_scalar());
if constexpr (IsNullable) {
if (datum.scalar()->is_valid) {
- typedState->Sum_ += (filtered ? *filtered : batchLength) * datum.scalar_as<TInScalar>().value;
+ typedState->Sum_ += (filtered ? *filtered : batchLength) * Cast(datum.scalar_as<TInScalar>().value);
typedState->IsValid_ = 1;
}
} else {
- typedState->Sum_ += (filtered ? *filtered : batchLength) * datum.scalar_as<TInScalar>().value;
+ typedState->Sum_ += (filtered ? *filtered : batchLength) * Cast(datum.scalar_as<TInScalar>().value);
}
} else {
const auto& array = datum.array();
@@ -177,7 +183,7 @@ public:
if constexpr (IsNullable) {
typedState->IsValid_ = 1;
}
- TSum sum = typedState->Sum_;
+ auto sum = typedState->Sum_;
if (IsNullable && nullCount != 0) {
auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
for (int64_t i = 0; i < len; ++i) {
@@ -197,7 +203,7 @@ public:
const auto& filterArray = filterDatum.array();
MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
const ui8* filterBitmap = filterArray->template GetValues<uint8_t>(1);
- TSum sum = typedState->Sum_;
+ auto sum = typedState->Sum_;
if (IsNullable && nullCount != 0) {
ui64 count = 0;
auto nullBitmapPtr = array->template GetValues<uint8_t>(0, 0);
@@ -226,13 +232,13 @@ public:
}
NUdf::TUnboxedValue FinishOne(const void* state) final {
- auto typedState = static_cast<const TStateType*>(state);
+ auto typedState = MakeStateWrapper<TStateType>(state);
if constexpr (IsNullable) {
if (!typedState->IsValid_) {
return NUdf::TUnboxedValuePod();
}
}
- return NUdf::TUnboxedValuePod(typedState->Sum_);
+ return NUdf::TUnboxedValuePod(TSum(typedState->Sum_));
}
private:
@@ -246,11 +252,11 @@ void PushValueToState(TSumState<IsNullable, TSum>* typedState, const arrow::Datu
Y_ENSURE(datum.is_scalar());
if constexpr (IsNullable) {
if (datum.scalar()->is_valid) {
- typedState->Sum_ += datum.scalar_as<TInScalar>().value;
+ typedState->Sum_ += Cast(datum.scalar_as<TInScalar>().value);
typedState->IsValid_ = 1;
}
} else {
- typedState->Sum_ += datum.scalar_as<TInScalar>().value;
+ typedState->Sum_ += Cast(datum.scalar_as<TInScalar>().value);
}
} else {
const auto& array = datum.array();
@@ -286,7 +292,8 @@ public:
}
void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
- new(state) TStateType();
+ TStateType st;
+ WriteUnaligned<TStateType>(state, st);
UpdateKey(state, batchNum, columns, row);
}
@@ -297,9 +304,9 @@ public:
void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
Y_UNUSED(batchNum);
- auto typedState = static_cast<TStateType*>(state);
+ auto typedState = MakeStateWrapper<TStateType>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
- PushValueToState<IsNullable, IsScalar, TIn, TSum>(typedState, datum, row);
+ PushValueToState<IsNullable, IsScalar, TIn, TSum>(typedState.Get(), datum, row);
}
std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
@@ -325,7 +332,8 @@ public:
}
void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
- new(state) TStateType();
+ TStateType st;
+ WriteUnaligned<TStateType>(state, st);
UpdateState(state, batchNum, columns, row);
}
@@ -336,9 +344,9 @@ public:
void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
Y_UNUSED(batchNum);
- auto typedState = static_cast<TStateType*>(state);
+ auto typedState = MakeStateWrapper<TStateType>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
- PushValueToState<IsNullable, IsScalar, TIn, TSum>(typedState, datum, row);
+ PushValueToState<IsNullable, IsScalar, TIn, TSum>(typedState.Get(), datum, row);
}
std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
@@ -350,34 +358,35 @@ private:
TType* const DataType_;
};
-template<typename TIn>
-class TAvgBlockAggregator<TCombineAllTag, TIn> : public TCombineAllTag::TBase {
+template<typename TIn, typename TOut>
+class TAvgBlockAggregator<TCombineAllTag, TIn, TOut> : public TCombineAllTag::TBase {
public:
using TBase = TCombineAllTag::TBase;
using TInScalar = typename TPrimitiveDataType<TIn>::TScalarResult;
TAvgBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TType* outputType, TComputationContext& ctx)
- : TBase(sizeof(TAvgState), filterColumn, ctx)
+ : TBase(sizeof(TAvgState<TOut>), filterColumn, ctx)
, ArgColumn_(argColumn)
{
Y_UNUSED(outputType);
}
void InitState(void* state) final {
- new(state) TAvgState();
+ TAvgState<TOut> st;
+ WriteUnaligned<TAvgState<TOut>>(state, st);
}
void DestroyState(void* state) noexcept final {
- static_assert(std::is_trivially_destructible<TAvgState>::value);
+ static_assert(std::is_trivially_destructible<TAvgState<TOut>>::value);
Y_UNUSED(state);
}
void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
- auto typedState = static_cast<TAvgState*>(state);
+ auto typedState = MakeStateWrapper<TAvgState<TOut>>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
if (datum.scalar()->is_valid) {
- typedState->Sum_ += double((filtered ? *filtered : batchLength) * datum.scalar_as<TInScalar>().value);
+ typedState->Sum_ += (filtered ? *filtered : batchLength) * Cast(datum.scalar_as<TInScalar>().value);
typedState->Count_ += batchLength;
}
} else {
@@ -391,10 +400,10 @@ public:
if (!filtered) {
typedState->Count_ += count;
- double sum = typedState->Sum_;
+ auto sum = typedState->Sum_;
if (array->GetNullCount() == 0) {
for (int64_t i = 0; i < len; ++i) {
- sum += double(ptr[i]);
+ sum += ptr[i];
}
} else {
auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
@@ -402,7 +411,7 @@ public:
ui64 fullIndex = i + array->offset;
// bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00
ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
- sum += double(SelectArg<TIn>(notNull, ptr[i], 0));
+ sum += SelectArg<TIn>(notNull, ptr[i], 0);
}
}
@@ -413,12 +422,12 @@ public:
MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
const ui8* filterBitmap = filterArray->template GetValues<uint8_t>(1);
- double sum = typedState->Sum_;
+ auto sum = typedState->Sum_;
ui64 count = typedState->Count_;
if (array->GetNullCount() == 0) {
for (int64_t i = 0; i < len; ++i) {
ui8 filtered = filterBitmap[i];
- sum += double(SelectArg<TIn>(filterBitmap[i], ptr[i], 0));
+ sum += SelectArg<TIn>(filterBitmap[i], ptr[i], 0);
count += filtered;
}
} else {
@@ -426,7 +435,7 @@ public:
for (int64_t i = 0; i < len; ++i) {
ui64 fullIndex = i + array->offset;
ui8 notNullAndFiltered = ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) & filterBitmap[i];
- sum += double(SelectArg<TIn>(notNullAndFiltered, ptr[i], 0));
+ sum += SelectArg<TIn>(notNullAndFiltered, ptr[i], 0);
count += notNullAndFiltered;
}
}
@@ -438,14 +447,14 @@ public:
}
NUdf::TUnboxedValue FinishOne(const void* state) final {
- auto typedState = static_cast<const TAvgState*>(state);
+ auto typedState = MakeStateWrapper<TAvgState<TOut>>(state);
if (!typedState->Count_) {
return NUdf::TUnboxedValuePod();
}
NUdf::TUnboxedValue* items;
auto arr = Ctx_.HolderFactory.CreateDirectArrayHolder(2, items);
- items[0] = NUdf::TUnboxedValuePod(typedState->Sum_);
+ items[0] = NUdf::TUnboxedValuePod(TOut(typedState->Sum_));
items[1] = NUdf::TUnboxedValuePod(typedState->Count_);
return arr;
}
@@ -454,56 +463,57 @@ private:
ui32 ArgColumn_;
};
-template <typename TIn>
-class TAvgBlockAggregator<TCombineKeysTag, TIn> : public TCombineKeysTag::TBase {
+template <typename TIn, typename TOut>
+class TAvgBlockAggregator<TCombineKeysTag, TIn, TOut> : public TCombineKeysTag::TBase {
public:
using TBase = TCombineKeysTag::TBase;
using TInScalar = typename TPrimitiveDataType<TIn>::TScalarResult;
TAvgBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TType* outputType, TComputationContext& ctx)
- : TBase(sizeof(TAvgState), filterColumn, ctx)
+ : TBase(sizeof(TAvgState<TOut>), filterColumn, ctx)
, ArgColumn_(argColumn)
, OutputType_(outputType)
{
}
void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
- new(state) TAvgState();
+ TAvgState<TOut> st;
+ WriteUnaligned<TAvgState<TOut>>(state, st);
UpdateKey(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
- static_assert(std::is_trivially_destructible<TAvgState>::value);
+ static_assert(std::is_trivially_destructible<TAvgState<TOut>>::value);
Y_UNUSED(state);
}
void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
Y_UNUSED(batchNum);
- auto typedState = static_cast<TAvgState*>(state);
+ auto typedState = MakeStateWrapper<TAvgState<TOut>>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
if (datum.scalar()->is_valid) {
- typedState->Sum_ += double(datum.scalar_as<TInScalar>().value);
+ typedState->Sum_ += Cast(datum.scalar_as<TInScalar>().value);
typedState->Count_ += 1;
}
} else {
const auto& array = datum.array();
auto ptr = array->GetValues<TIn>(1);
if (array->GetNullCount() == 0) {
- typedState->Sum_ += double(ptr[row]);
+ typedState->Sum_ += ptr[row];
typedState->Count_ += 1;
} else {
auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
ui64 fullIndex = row + array->offset;
ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
- typedState->Sum_ += double(SelectArg<TIn>(notNull, ptr[row], 0));
+ typedState->Sum_ += SelectArg<TIn>(notNull, ptr[row], 0);
typedState->Count_ += notNull;
}
}
}
std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
- return std::make_unique<TAvgStateColumnBuilder>(size, OutputType_, Ctx_);
+ return std::make_unique<TAvgStateColumnBuilder<TOut>>(size, OutputType_, Ctx_);
}
private:
@@ -511,40 +521,43 @@ private:
TType* const OutputType_;
};
+template<typename TOut>
class TAvgBlockAggregatorOverState : public TFinalizeKeysTag::TBase {
public:
using TBase = TFinalizeKeysTag::TBase;
+ using TInScalar = typename TPrimitiveDataType<TOut>::TScalarResult;
TAvgBlockAggregatorOverState(ui32 argColumn, TComputationContext& ctx)
- : TBase(sizeof(TAvgState), {}, ctx)
+ : TBase(sizeof(TAvgState<TOut>), {}, ctx)
, ArgColumn_(argColumn)
{
}
void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
- new(state) TAvgState();
+ TAvgState<TOut> st;
+ WriteUnaligned<TAvgState<TOut>>(state, st);
UpdateState(state, batchNum, columns, row);
}
void DestroyState(void* state) noexcept final {
- static_assert(std::is_trivially_destructible<TAvgState>::value);
+ static_assert(std::is_trivially_destructible<TAvgState<TOut>>::value);
Y_UNUSED(state);
}
void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
Y_UNUSED(batchNum);
- auto typedState = static_cast<TAvgState*>(state);
+ auto typedState = MakeStateWrapper<TAvgState<TOut>>(state);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
if (datum.scalar()->is_valid) {
const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(*datum.scalar());
- typedState->Sum_ += arrow::internal::checked_cast<const arrow::DoubleScalar&>(*structScalar.value[0]).value;
+ typedState->Sum_ += Cast(arrow::internal::checked_cast<const TInScalar&>(*structScalar.value[0]).value);
typedState->Count_ += arrow::internal::checked_cast<const arrow::UInt64Scalar&>(*structScalar.value[1]).value;
}
} else {
const auto& array = datum.array();
- auto sumPtr = array->child_data[0]->GetValues<double>(1);
+ auto sumPtr = array->child_data[0]->GetValues<TOut>(1);
auto countPtr = array->child_data[1]->GetValues<ui64>(1);
if (array->GetNullCount() == 0) {
typedState->Sum_ += sumPtr[row];
@@ -562,7 +575,7 @@ public:
}
std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
- return std::make_unique<TAvgResultColumnBuilder>(size, Ctx_);
+ return std::make_unique<TAvgResultColumnBuilder<TOut>>(size, Ctx_);
}
private:
@@ -621,6 +634,12 @@ std::unique_ptr<typename TTag::TPreparedAggregator> PrepareSum(TTupleType* tuple
sumRetType = TDataType::Create(NUdf::TDataType<i64>::Id, env);
} else if (typeInfo.Features & NYql::NUdf::EDataTypeFeatures::UnsignedIntegralType) {
sumRetType = TDataType::Create(NUdf::TDataType<ui64>::Id, env);
+ } else if (*dataType->GetDataSlot() == NUdf::EDataSlot::Decimal) {
+ auto decimalType = static_cast<TDataDecimalType*>(dataType);
+ auto [_, scale] = decimalType->GetParams();
+ sumRetType = TDataDecimalType::Create(NYql::NDecimal::MaxPrecision, scale, env);
+ } else if (*dataType->GetDataSlot() == NUdf::EDataSlot::Interval) {
+ sumRetType = TDataDecimalType::Create(NYql::NDecimal::MaxPrecision, 0, env);
} else {
Y_ENSURE(typeInfo.Features & NYql::NUdf::EDataTypeFeatures::FloatType);
sumRetType = dataType;
@@ -649,6 +668,10 @@ std::unique_ptr<typename TTag::TPreparedAggregator> PrepareSum(TTupleType* tuple
return PrepareSumFixed<TTag, float, float>(sumRetType, isOptional, isScalar, filterColumn, argColumn);
case NUdf::EDataSlot::Double:
return PrepareSumFixed<TTag, double, double>(sumRetType, isOptional, isScalar, filterColumn, argColumn);
+ case NUdf::EDataSlot::Interval:
+ return PrepareSumFixed<TTag, i64, NYql::NDecimal::TInt128>(sumRetType, isOptional, isScalar, filterColumn, argColumn);
+ case NUdf::EDataSlot::Decimal:
+ return PrepareSumFixed<TTag, NYql::NDecimal::TInt128, NYql::NDecimal::TInt128>(sumRetType, isOptional, isScalar, filterColumn, argColumn);
default:
throw yexception() << "Unsupported SUM input type";
}
@@ -684,20 +707,20 @@ public:
}
};
-template <typename TTag, typename TIn>
+template <typename TTag, typename TIn, typename TOut>
class TPreparedAvgBlockAggregator : public TTag::TPreparedAggregator {
public:
using TBase = typename TTag::TPreparedAggregator;
TPreparedAvgBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TType* outputType)
- : TBase(sizeof(TAvgState))
+ : TBase(sizeof(TAvgState<TOut>))
, FilterColumn_(filterColumn)
, ArgColumn_(argColumn)
, OutputType_(outputType)
{}
std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
- return std::make_unique<TAvgBlockAggregator<TTag, TIn>>(FilterColumn_, ArgColumn_, OutputType_, ctx);
+ return std::make_unique<TAvgBlockAggregator<TTag, TIn, TOut>>(FilterColumn_, ArgColumn_, OutputType_, ctx);
}
private:
@@ -706,17 +729,18 @@ private:
TType* const OutputType_;
};
+template<typename TOut>
class TPreparedAvgBlockAggregatorOverState : public TFinalizeKeysTag::TPreparedAggregator {
public:
using TBase = TFinalizeKeysTag::TPreparedAggregator;
TPreparedAvgBlockAggregatorOverState(ui32 argColumn)
- : TBase(sizeof(TAvgState))
+ : TBase(sizeof(TAvgState<TOut>))
, ArgColumn_(argColumn)
{}
std::unique_ptr<typename TFinalizeKeysTag::TAggregator> Make(TComputationContext& ctx) const final {
- return std::make_unique<TAvgBlockAggregatorOverState>(ArgColumn_, ctx);
+ return std::make_unique<TAvgBlockAggregatorOverState<TOut>>(ArgColumn_, ctx);
}
private:
@@ -738,26 +762,39 @@ std::unique_ptr<typename TTag::TPreparedAggregator> PrepareAvgOverInput(TTupleTy
auto dataType = UnpackOptionalData(argType, isOptional);
switch (*dataType->GetDataSlot()) {
case NUdf::EDataSlot::Int8:
- return std::make_unique<TPreparedAvgBlockAggregator<TTag, i8>>(filterColumn, argColumn, avgRetType);
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, i8, double>>(filterColumn, argColumn, avgRetType);
case NUdf::EDataSlot::Uint8:
case NUdf::EDataSlot::Bool:
- return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui8>>(filterColumn, argColumn, avgRetType);
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui8, double>>(filterColumn, argColumn, avgRetType);
case NUdf::EDataSlot::Int16:
- return std::make_unique<TPreparedAvgBlockAggregator<TTag, i16>>(filterColumn, argColumn, avgRetType);
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, i16, double>>(filterColumn, argColumn, avgRetType);
case NUdf::EDataSlot::Uint16:
- return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui16>>(filterColumn, argColumn, avgRetType);
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui16, double>>(filterColumn, argColumn, avgRetType);
case NUdf::EDataSlot::Int32:
- return std::make_unique<TPreparedAvgBlockAggregator<TTag, i32>>(filterColumn, argColumn, avgRetType);
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, i32, double>>(filterColumn, argColumn, avgRetType);
case NUdf::EDataSlot::Uint32:
- return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui32>>(filterColumn, argColumn, avgRetType);
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui32, double>>(filterColumn, argColumn, avgRetType);
case NUdf::EDataSlot::Int64:
- return std::make_unique<TPreparedAvgBlockAggregator<TTag, i64>>(filterColumn, argColumn, avgRetType);
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, i64, double>>(filterColumn, argColumn, avgRetType);
case NUdf::EDataSlot::Uint64:
- return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui64>>(filterColumn, argColumn, avgRetType);
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui64, double>>(filterColumn, argColumn, avgRetType);
case NUdf::EDataSlot::Float:
- return std::make_unique<TPreparedAvgBlockAggregator<TTag, float>>(filterColumn, argColumn, avgRetType);
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, float, double>>(filterColumn, argColumn, avgRetType);
case NUdf::EDataSlot::Double:
- return std::make_unique<TPreparedAvgBlockAggregator<TTag, double>>(filterColumn, argColumn, avgRetType);
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, double, double>>(filterColumn, argColumn, avgRetType);
+ case NUdf::EDataSlot::Interval: {
+ auto decimalType = TDataDecimalType::Create(NYql::NDecimal::MaxPrecision, 0, env);
+ TVector<TType*> tupleDecimalElements = { decimalType, ui64Type };
+ auto avgRetDecimalType = TOptionalType::Create(TTupleType::Create(2, tupleDecimalElements.data(), env), env);
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, i64, NYql::NDecimal::TInt128>>(filterColumn, argColumn, avgRetDecimalType);
+ }
+ case NUdf::EDataSlot::Decimal: {
+ auto [precision, scale] = static_cast<TDataDecimalType*>(dataType)->GetParams();
+ auto decimalType = TDataDecimalType::Create(precision, scale, env);
+ TVector<TType*> tupleDecimalElements = { decimalType, ui64Type };
+ auto avgRetDecimalType = TOptionalType::Create(TTupleType::Create(2, tupleDecimalElements.data(), env), env);
+ return std::make_unique<TPreparedAvgBlockAggregator<TTag, NYql::NDecimal::TInt128, NYql::NDecimal::TInt128>>(filterColumn, argColumn, avgRetDecimalType);
+ }
default:
throw yexception() << "Unsupported AVG input type";
}
@@ -775,10 +812,24 @@ std::unique_ptr<typename TCombineKeysTag::TPreparedAggregator> PrepareAvg<TCombi
template <>
std::unique_ptr<typename TFinalizeKeysTag::TPreparedAggregator> PrepareAvg<TFinalizeKeysTag>(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env) {
- Y_UNUSED(tupleType);
Y_UNUSED(filterColumn);
Y_UNUSED(env);
- return std::make_unique<TPreparedAvgBlockAggregatorOverState>(argColumn);
+
+ auto argType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn))->GetItemType();
+ bool isOptional;
+ auto aggTupleType = UnpackOptional(argType, isOptional);
+ MKQL_ENSURE(aggTupleType->IsTuple(),
+ "Expected tuple or optional of tuple, actual: " << PrintNode(argType, true));
+ auto dataType = UnpackOptionalData(AS_TYPE(TTupleType, aggTupleType)->GetElementType(0), isOptional);
+
+ switch (*dataType->GetDataSlot()) {
+ case NUdf::EDataSlot::Decimal:
+ return std::make_unique<TPreparedAvgBlockAggregatorOverState<NYql::NDecimal::TInt128>>(argColumn);
+ case NUdf::EDataSlot::Double:
+ return std::make_unique<TPreparedAvgBlockAggregatorOverState<double>>(argColumn);
+ default:
+ throw yexception() << "Unsupported Finalize input type";
+ }
}
class TBlockAvgFactory : public IBlockAggregatorFactory {
diff --git a/ydb/library/yql/minikql/computation/mkql_block_impl.cpp b/ydb/library/yql/minikql/computation/mkql_block_impl.cpp
index 933990c414c..a46bc9db1c8 100644
--- a/ydb/library/yql/minikql/computation/mkql_block_impl.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_block_impl.cpp
@@ -167,6 +167,11 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo
return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzTimestamp64>()));
}
+ case NUdf::EDataSlot::Decimal: {
+ std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(16, &pool)));
+ *reinterpret_cast<NYql::NDecimal::TInt128*>(buffer->mutable_data()) = value.GetInt128();
+ return arrow::Datum(std::make_shared<TPrimitiveDataType<NYql::NDecimal::TInt128>::TScalarResult>(buffer));
+ }
default:
MKQL_ENSURE(false, "Unsupported data slot " << slot);
}
diff --git a/ydb/library/yql/minikql/computation/mkql_block_reader.cpp b/ydb/library/yql/minikql/computation/mkql_block_reader.cpp
index ee2864cc231..56a82ab1949 100644
--- a/ydb/library/yql/minikql/computation/mkql_block_reader.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_block_reader.cpp
@@ -40,6 +40,29 @@ public:
};
template <bool Nullable>
+class TFixedSizeBlockItemConverter<NYql::NDecimal::TInt128, Nullable> : public IBlockItemConverter {
+public:
+ NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final {
+ Y_UNUSED(holderFactory);
+ if constexpr (Nullable) {
+ if (!item) {
+ return {};
+ }
+ }
+ return NUdf::TUnboxedValuePod(item.GetInt128());
+ }
+
+ TBlockItem MakeItem(const NUdf::TUnboxedValuePod& value) const final {
+ if constexpr (Nullable) {
+ if (!value) {
+ return {};
+ }
+ }
+ return TBlockItem(value.GetInt128());
+ }
+};
+
+template <bool Nullable>
class TResourceBlockItemConverter : public IBlockItemConverter {
public:
NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final {
diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp
index ae2b84c2d09..8fc007e9c82 100644
--- a/ydb/library/yql/minikql/mkql_type_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_type_builder.cpp
@@ -1508,7 +1508,8 @@ bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& ty
return false;
}
case NUdf::EDataSlot::Decimal: {
- return false;
+ type = arrow::fixed_size_binary(sizeof(NYql::NUdf::TUnboxedValuePod));
+ return true;
}
case NUdf::EDataSlot::DyNumber: {
return false;
@@ -2487,7 +2488,7 @@ size_t CalcMaxBlockItemSize(const TType* type) {
MKQL_ENSURE(false, "Unsupported data slot: " << slot);
}
case NUdf::EDataSlot::Decimal: {
- MKQL_ENSURE(false, "Unsupported data slot: " << slot);
+ return sizeof(NYql::NDecimal::TInt128);
}
case NUdf::EDataSlot::DyNumber: {
MKQL_ENSURE(false, "Unsupported data slot: " << slot);
diff --git a/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp b/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
index a3fada9666a..d39699e1a8b 100644
--- a/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
@@ -561,6 +561,24 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
return {};
}
}
+ case arrow::Type::DECIMAL128: {
+ switch (slotItem) {
+ case NUdf::EDataSlot::Decimal: {
+ if (targetType->id() == arrow::Type::FIXED_SIZE_BINARY &&
+ (static_cast<arrow::FixedSizeBinaryType&>(*targetType)).byte_width() == 16
+ ) {
+ return [](const std::shared_ptr<arrow::Array>& value) {
+ auto decimals = std::static_pointer_cast<arrow::Decimal128Array>(value);
+ auto output = std::make_shared<arrow::FixedSizeBinaryArray>(arrow::fixed_size_binary(16), decimals->length(), decimals->values());
+ return output;
+ };
+ }
+ return {};
+ }
+ default:
+ return {};
+ }
+ }
default:
return {};
}
diff --git a/ydb/library/yql/public/decimal/yql_decimal.cpp b/ydb/library/yql/public/decimal/yql_decimal.cpp
index 89fc92cd49b..d444557c42e 100644
--- a/ydb/library/yql/public/decimal/yql_decimal.cpp
+++ b/ydb/library/yql/public/decimal/yql_decimal.cpp
@@ -139,7 +139,7 @@ TInt128 FromString(const TStringBuf& str, ui8 precision, ui8 scale) {
if (IsInf(s))
return neg ? -Inf() : Inf();
if (IsNan(s))
- return neg ? -Nan() : Nan();
+ return Nan();
}
TUint128 v = 0U;
diff --git a/ydb/library/yql/public/decimal/yql_decimal.h b/ydb/library/yql/public/decimal/yql_decimal.h
index 20695f0d84b..59471b9e3f1 100644
--- a/ydb/library/yql/public/decimal/yql_decimal.h
+++ b/ydb/library/yql/public/decimal/yql_decimal.h
@@ -155,5 +155,61 @@ TInt128 MulAndDivNormalDivider(TInt128 a, TInt128 b, TInt128 c);
// a*b/c Only for non zero normal positive multiplier.
TInt128 MulAndDivNormalMultiplier(TInt128 a, TInt128 b, TInt128 c);
+struct TDecimal {
+ TInt128 Value = 0;
+
+ TDecimal() = default;
+
+ template<typename T>
+ TDecimal(T t): Value(t) { }
+
+ explicit operator TInt128() const {
+ return Value;
+ }
+
+ TDecimal& operator+=(TDecimal right) {
+ const auto l = Value;
+ const auto r = right.Value;
+ const auto a = l + r;
+ if (IsNormal(l) && IsNormal(r) && IsNormal(a)) {
+ Value = a;
+ } else if (IsNan(l) || IsNan(r) || !a /* inf - inf*/) {
+ Value = Nan();
+ } else {
+ Value = a > 0
+ ? +Inf()
+ : -Inf();
+ }
+ return *this;
+ }
+
+ TDecimal& operator*=(TDecimal right) {
+ Value = Mul(Value, right.Value);
+ return *this;
+ }
+
+ TDecimal& operator/=(TDecimal right) {
+ Value = Div(Value, right.Value);
+ return *this;
+ }
+
+ // TODO: implement '-' and '%'
+
+ friend TDecimal operator+(TDecimal left, TDecimal right) {
+ left += right;
+ return left;
+ }
+
+ friend TDecimal operator*(TDecimal left, TDecimal right) {
+ left *= right;
+ return left;
+ }
+
+ friend TDecimal operator/(TDecimal left, TDecimal right) {
+ left /= right;
+ return left;
+ }
+};
+
}
}
diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h
index 2f46532de71..ffaaea37314 100644
--- a/ydb/library/yql/public/udf/arrow/block_builder.h
+++ b/ydb/library/yql/public/udf/arrow/block_builder.h
@@ -546,6 +546,37 @@ public:
};
template<bool Nullable>
+class TFixedSizeArrayBuilder<NYql::NDecimal::TInt128, Nullable> final: public TFixedSizeArrayBuilderBase<NYql::NDecimal::TInt128, Nullable, TFixedSizeArrayBuilder<NYql::NDecimal::TInt128, Nullable>> {
+ using TSelf = TFixedSizeArrayBuilder<NYql::NDecimal::TInt128, Nullable>;
+ using TBase = TFixedSizeArrayBuilderBase<NYql::NDecimal::TInt128, Nullable, TSelf>;
+
+public:
+ TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
+ : TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, totalAllocated)
+ {}
+
+ TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
+ : TBase(typeInfoHelper, type, pool, maxLen, totalAllocated)
+ {}
+
+ void DoAddNotNull(TUnboxedValuePod value) {
+ this->PlaceItem(value.GetInt128());
+ }
+
+ void DoAddNotNull(TBlockItem value) {
+ this->PlaceItem(value.GetInt128());
+ }
+
+ void DoAddNotNull(TInputBuffer& input) {
+ this->DoAdd(TBlockItem(input.PopNumber<NYql::NDecimal::TInt128>()));
+ }
+
+ void DoAddNotNull(TBlockItem value, size_t count) {
+ std::fill(this->DataPtr + this->GetCurrLen(), this->DataPtr + this->GetCurrLen() + count, value.GetInt128());
+ }
+};
+
+template<bool Nullable>
class TResourceArrayBuilder final: public TFixedSizeArrayBuilderBase<TUnboxedValue, Nullable, TResourceArrayBuilder<Nullable>> {
public:
TResourceArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
@@ -1422,6 +1453,8 @@ inline std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderImpl(
return std::make_unique<TTzDateArrayBuilder<TTzDatetime64, Nullable>>(typeInfoHelper, type, pool, maxLen);
case NUdf::EDataSlot::TzTimestamp64:
return std::make_unique<TTzDateArrayBuilder<TTzTimestamp64, Nullable>>(typeInfoHelper, type, pool, maxLen);
+ case NUdf::EDataSlot::Decimal:
+ return std::make_unique<TFixedSizeArrayBuilder<NYql::NDecimal::TInt128, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
default:
Y_ENSURE(false, "Unsupported data slot");
}
diff --git a/ydb/library/yql/public/udf/arrow/block_io_buffer.h b/ydb/library/yql/public/udf/arrow/block_io_buffer.h
index 8cf543eae49..c1a8175adea 100644
--- a/ydb/library/yql/public/udf/arrow/block_io_buffer.h
+++ b/ydb/library/yql/public/udf/arrow/block_io_buffer.h
@@ -2,6 +2,7 @@
#include <util/generic/strbuf.h>
#include <util/generic/vector.h>
+#include <util/system/unaligned_mem.h>
namespace NYql {
namespace NUdf {
@@ -22,7 +23,7 @@ public:
template <typename T>
T PopNumber() {
Ensure(sizeof(T));
- T t = *(const T*)(Buf_.Data() + Pos_);
+ T t = ReadUnaligned<T>(Buf_.Data() + Pos_);
Pos_ += sizeof(T);
return t;
}
@@ -56,7 +57,7 @@ public:
template <typename T>
void PushNumber(T t) {
Ensure(sizeof(T));
- *(T*)&Vec_[Pos_] = t;
+ WriteUnaligned<T>(Vec_.data() + Pos_, t);
Pos_ += sizeof(T);
}
diff --git a/ydb/library/yql/public/udf/arrow/block_item.h b/ydb/library/yql/public/udf/arrow/block_item.h
index f04a25666b1..4dd42495fe7 100644
--- a/ydb/library/yql/public/udf/arrow/block_item.h
+++ b/ydb/library/yql/public/udf/arrow/block_item.h
@@ -22,6 +22,11 @@ public:
template <typename T, typename = std::enable_if_t<TPrimitiveDataType<T>::Result>>
inline explicit TBlockItem(T value);
+
+ inline explicit TBlockItem(NYql::NDecimal::TInt128 value) {
+ *reinterpret_cast<NYql::NDecimal::TInt128*>(&Raw) = value;
+ Raw.Simple.Meta = static_cast<ui8>(EMarkers::Embedded);
+ }
inline explicit TBlockItem(IBoxedValuePtr&& value) {
Raw.Resource.Meta = static_cast<ui8>(EMarkers::Boxed);
@@ -74,6 +79,14 @@ public:
template <typename T, typename = std::enable_if_t<TPrimitiveDataType<T>::Result>>
inline T Get() const;
+ inline NYql::NDecimal::TInt128 GetInt128() const {
+ Y_DEBUG_ABORT_UNLESS(GetMarkers() == EMarkers::Embedded);
+ auto v = *reinterpret_cast<const NYql::NDecimal::TInt128*>(&Raw);
+ const auto p = reinterpret_cast<ui8*>(&v);
+ p[0xF] = (p[0xE] & 0x80) ? 0xFF : 0x00;
+ return v;
+ }
+
// TODO: deprecate AsTuple() in favor of GetElements()
inline const TBlockItem* AsTuple() const {
Y_DEBUG_ABORT_UNLESS(GetMarkers() == EMarkers::Embedded);
diff --git a/ydb/library/yql/public/udf/arrow/block_item_comparator.h b/ydb/library/yql/public/udf/arrow/block_item_comparator.h
index 3431932d925..ca7655f8813 100644
--- a/ydb/library/yql/public/udf/arrow/block_item_comparator.h
+++ b/ydb/library/yql/public/udf/arrow/block_item_comparator.h
@@ -133,6 +133,28 @@ public:
}
};
+template <bool Nullable>
+class TFixedSizeBlockItemComparator<NYql::NDecimal::TInt128, Nullable> : public TBlockItemComparatorBase<TFixedSizeBlockItemComparator<NYql::NDecimal::TInt128, Nullable>, Nullable> {
+public:
+ i64 DoCompare(TBlockItem lhs, TBlockItem rhs) const {
+ auto l = lhs.GetInt128();
+ auto r = rhs.GetInt128();
+ return (l > r) - (l < r);
+ }
+
+ bool DoEquals(TBlockItem lhs, TBlockItem rhs) const {
+ auto l = lhs.GetInt128();
+ auto r = rhs.GetInt128();
+ return l == r;
+ }
+
+ bool DoLess(TBlockItem lhs, TBlockItem rhs) const {
+ auto l = lhs.GetInt128();
+ auto r = rhs.GetInt128();
+ return l < r;
+ }
+};
+
template <typename TStringType, bool Nullable>
class TStringBlockItemComparator : public TBlockItemComparatorBase<TStringBlockItemComparator<TStringType, Nullable>, Nullable> {
public:
diff --git a/ydb/library/yql/public/udf/arrow/block_item_hasher.h b/ydb/library/yql/public/udf/arrow/block_item_hasher.h
index a173e7940f2..1904cac19d3 100644
--- a/ydb/library/yql/public/udf/arrow/block_item_hasher.h
+++ b/ydb/library/yql/public/udf/arrow/block_item_hasher.h
@@ -49,6 +49,14 @@ public:
}
};
+template <bool Nullable>
+class TFixedSizeBlockItemHasher<NYql::NDecimal::TInt128, Nullable> : public TBlockItemHasherBase<TFixedSizeBlockItemHasher<NYql::NDecimal::TInt128, Nullable>, Nullable> {
+public:
+ ui64 DoHash(TBlockItem value) const {
+ return GetValueHash<TDataType<NUdf::TDecimal>::Slot>(NUdf::TUnboxedValuePod(value.GetInt128()));
+ }
+};
+
template <typename T, bool Nullable>
class TTzDateBlockItemHasher : public TBlockItemHasherBase<TTzDateBlockItemHasher<T, Nullable>, Nullable> {
public:
diff --git a/ydb/library/yql/public/udf/arrow/block_reader.h b/ydb/library/yql/public/udf/arrow/block_reader.h
index bac76df5afe..dd72f51ac79 100644
--- a/ydb/library/yql/public/udf/arrow/block_reader.h
+++ b/ydb/library/yql/public/udf/arrow/block_reader.h
@@ -53,9 +53,15 @@ public:
}
}
- return static_cast<TDerived*>(this)->MakeBlockItem(
- *static_cast<const T*>(checked_cast<const PrimitiveScalarBase&>(scalar).data())
- );
+ if constexpr(std::is_same_v<T, NYql::NDecimal::TInt128>) {
+ auto& fixedScalar = checked_cast<const arrow::FixedSizeBinaryScalar&>(scalar);
+ T value; memcpy((void*)&value, fixedScalar.value->data(), sizeof(T));
+ return static_cast<TDerived*>(this)->MakeBlockItem(value);
+ } else {
+ return static_cast<TDerived*>(this)->MakeBlockItem(
+ *static_cast<const T*>(checked_cast<const PrimitiveScalarBase&>(scalar).data())
+ );
+ }
}
ui64 GetDataWeight(const arrow::ArrayData& data) const final {
@@ -96,7 +102,13 @@ public:
out.PushChar(1);
}
- out.PushNumber(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data()));
+ if constexpr(std::is_same_v<T, NYql::NDecimal::TInt128>) {
+ auto& fixedScalar = arrow::internal::checked_cast<const arrow::FixedSizeBinaryScalar&>(scalar);
+ T value; memcpy((void*)&value, fixedScalar.value->data(), sizeof(T));
+ out.PushNumber(value);
+ } else {
+ out.PushNumber(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data()));
+ }
}
};
@@ -660,8 +672,9 @@ std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHe
return TTraits::template MakeTzDate<TTzDatetime64>(isOptional);
case NUdf::EDataSlot::TzTimestamp64:
return TTraits::template MakeTzDate<TTzTimestamp64>(isOptional);
- case NUdf::EDataSlot::Uuid:
case NUdf::EDataSlot::Decimal:
+ return MakeFixedSizeBlockReaderImpl<TTraits, NYql::NDecimal::TInt128>(isOptional);
+ case NUdf::EDataSlot::Uuid:
case NUdf::EDataSlot::DyNumber:
Y_ENSURE(false, "Unsupported data slot");
}
@@ -721,7 +734,9 @@ inline void UpdateBlockItemSerializeProps(const ITypeInfoHelper& typeInfoHelper,
auto typeId = typeData.GetTypeId();
auto slot = GetDataSlot(typeId);
auto& dataTypeInfo = GetDataTypeInfo(slot);
- if (dataTypeInfo.Features & StringType) {
+ if (dataTypeInfo.Features & DecimalType) {
+ *props.MaxSize += 16;
+ } else if (dataTypeInfo.Features & StringType) {
props.MaxSize = {};
props.IsFixed = false;
} else if (dataTypeInfo.Features & TzDateType) {
diff --git a/ydb/library/yql/tests/sql/dq_file/part11/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part11/canondata/result.json
index 8d0de24473e..3c8f8443336 100644
--- a/ydb/library/yql/tests/sql/dq_file/part11/canondata/result.json
+++ b/ydb/library/yql/tests/sql/dq_file/part11/canondata/result.json
@@ -423,6 +423,28 @@
}
],
"test.test[bigdate-const_interval64-default.txt-Results]": [],
+ "test.test[blocks-combine_all_decimal--Analyze]": [
+ {
+ "checksum": "9865e69a146e242a2e5bed878ef02996",
+ "size": 5512,
+ "uri": "https://{canondata_backend}/1880306/ee64d24fc7c0bd8fa221eca8eb309837e5c0fe9d/resource.tar.gz#test.test_blocks-combine_all_decimal--Analyze_/plan.txt"
+ }
+ ],
+ "test.test[blocks-combine_all_decimal--Debug]": [
+ {
+ "checksum": "43fa7229e0622d1ae40bfb5a06819008",
+ "size": 6511,
+ "uri": "https://{canondata_backend}/1937492/ff27734bcb37c413b13864458b4334e93e0d3308/resource.tar.gz#test.test_blocks-combine_all_decimal--Debug_/opt.yql_patched"
+ }
+ ],
+ "test.test[blocks-combine_all_decimal--Plan]": [
+ {
+ "checksum": "9865e69a146e242a2e5bed878ef02996",
+ "size": 5512,
+ "uri": "https://{canondata_backend}/1880306/ee64d24fc7c0bd8fa221eca8eb309837e5c0fe9d/resource.tar.gz#test.test_blocks-combine_all_decimal--Plan_/plan.txt"
+ }
+ ],
+ "test.test[blocks-combine_all_decimal--Results]": [],
"test.test[blocks-combine_all_some_filter--Analyze]": [
{
"checksum": "4d05f39dec0aca30f992ac6208eb278c",
diff --git a/ydb/library/yql/tests/sql/dq_file/part18/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part18/canondata/result.json
index efefba92223..3ef561fc5c7 100644
--- a/ydb/library/yql/tests/sql/dq_file/part18/canondata/result.json
+++ b/ydb/library/yql/tests/sql/dq_file/part18/canondata/result.json
@@ -432,6 +432,28 @@
}
],
"test.test[blocks-add_uint64_opt2--Results]": [],
+ "test.test[blocks-combine_all_decimal_max-default.txt-Analyze]": [
+ {
+ "checksum": "b4dd508a329723c74293d80f0278c705",
+ "size": 505,
+ "uri": "https://{canondata_backend}/1936947/25efb9f6eb4d1e76047ae7c2aef5ff59896f5b3c/resource.tar.gz#test.test_blocks-combine_all_decimal_max-default.txt-Analyze_/plan.txt"
+ }
+ ],
+ "test.test[blocks-combine_all_decimal_max-default.txt-Debug]": [
+ {
+ "checksum": "48418f38150ee63d607b0e494e40b972",
+ "size": 749,
+ "uri": "https://{canondata_backend}/1936947/25efb9f6eb4d1e76047ae7c2aef5ff59896f5b3c/resource.tar.gz#test.test_blocks-combine_all_decimal_max-default.txt-Debug_/opt.yql_patched"
+ }
+ ],
+ "test.test[blocks-combine_all_decimal_max-default.txt-Plan]": [
+ {
+ "checksum": "b4dd508a329723c74293d80f0278c705",
+ "size": 505,
+ "uri": "https://{canondata_backend}/1936947/25efb9f6eb4d1e76047ae7c2aef5ff59896f5b3c/resource.tar.gz#test.test_blocks-combine_all_decimal_max-default.txt-Plan_/plan.txt"
+ }
+ ],
+ "test.test[blocks-combine_all_decimal_max-default.txt-Results]": [],
"test.test[blocks-combine_all_min_filter--Analyze]": [
{
"checksum": "4d05f39dec0aca30f992ac6208eb278c",
diff --git a/ydb/library/yql/tests/sql/dq_file/part9/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part9/canondata/result.json
index ca40ba808b2..fa80955643b 100644
--- a/ydb/library/yql/tests/sql/dq_file/part9/canondata/result.json
+++ b/ydb/library/yql/tests/sql/dq_file/part9/canondata/result.json
@@ -401,6 +401,28 @@
}
],
"test.test[bigdate-compare_big_small-default.txt-Results]": [],
+ "test.test[blocks-add_decimal--Analyze]": [
+ {
+ "checksum": "4ea41bf11d32f86031a9a238809fc531",
+ "size": 3723,
+ "uri": "https://{canondata_backend}/1936997/4c96904082a08349976603bc8711f3c66e870d86/resource.tar.gz#test.test_blocks-add_decimal--Analyze_/plan.txt"
+ }
+ ],
+ "test.test[blocks-add_decimal--Debug]": [
+ {
+ "checksum": "9f81433542312b9fd72ef78206c3dfc9",
+ "size": 1800,
+ "uri": "https://{canondata_backend}/1936997/4c96904082a08349976603bc8711f3c66e870d86/resource.tar.gz#test.test_blocks-add_decimal--Debug_/opt.yql_patched"
+ }
+ ],
+ "test.test[blocks-add_decimal--Plan]": [
+ {
+ "checksum": "4ea41bf11d32f86031a9a238809fc531",
+ "size": 3723,
+ "uri": "https://{canondata_backend}/1936997/4c96904082a08349976603bc8711f3c66e870d86/resource.tar.gz#test.test_blocks-add_decimal--Plan_/plan.txt"
+ }
+ ],
+ "test.test[blocks-add_decimal--Results]": [],
"test.test[blocks-combine_all_count--Analyze]": [
{
"checksum": "ffbc8a0b29d03ebcaa44db67a2544952",
diff --git a/ydb/library/yql/tests/sql/hybrid_file/part1/canondata/result.json b/ydb/library/yql/tests/sql/hybrid_file/part1/canondata/result.json
index 1e958e5a83d..650998a6422 100644
--- a/ydb/library/yql/tests/sql/hybrid_file/part1/canondata/result.json
+++ b/ydb/library/yql/tests/sql/hybrid_file/part1/canondata/result.json
@@ -531,6 +531,20 @@
"uri": "https://{canondata_backend}/1889210/b0118a685beb1cfba4436799d514f392b983c379/resource.tar.gz#test.test_blocks-coalesce_ints--Plan_/plan.txt"
}
],
+ "test.test[blocks-combine_all_decimal--Debug]": [
+ {
+ "checksum": "6a14942091e192a9cfc41b1f8bd222f0",
+ "size": 9047,
+ "uri": "https://{canondata_backend}/1871182/aba145d12e60259474a4d54a3c0451194ac1617f/resource.tar.gz#test.test_blocks-combine_all_decimal--Debug_/opt.yql_patched"
+ }
+ ],
+ "test.test[blocks-combine_all_decimal--Plan]": [
+ {
+ "checksum": "c6efb578089e649666b0f254bce73693",
+ "size": 5645,
+ "uri": "https://{canondata_backend}/1809005/7142336c5d0d72d24df8de2731d5f39f0b6e741c/resource.tar.gz#test.test_blocks-combine_all_decimal--Plan_/plan.txt"
+ }
+ ],
"test.test[blocks-combine_all_min_filter--Debug]": [
{
"checksum": "29a4530555819599901f599a116aa9f0",
diff --git a/ydb/library/yql/tests/sql/hybrid_file/part10/canondata/result.json b/ydb/library/yql/tests/sql/hybrid_file/part10/canondata/result.json
index 5e7627305ef..43bc42fe3a7 100644
--- a/ydb/library/yql/tests/sql/hybrid_file/part10/canondata/result.json
+++ b/ydb/library/yql/tests/sql/hybrid_file/part10/canondata/result.json
@@ -559,6 +559,20 @@
"uri": "https://{canondata_backend}/1775319/3515b86fb929979a6751f93bd43a0291eaa01262/resource.tar.gz#test.test_binding-tie_scalar_context-default.txt-Plan_/plan.txt"
}
],
+ "test.test[blocks-add_decimal--Debug]": [
+ {
+ "checksum": "3b41a09ad32d2e3ab74db02fd98fd386",
+ "size": 2539,
+ "uri": "https://{canondata_backend}/1925821/b3b1e9eed0dbfb05c9eb91f8523c914ef2b33985/resource.tar.gz#test.test_blocks-add_decimal--Debug_/opt.yql_patched"
+ }
+ ],
+ "test.test[blocks-add_decimal--Plan]": [
+ {
+ "checksum": "f6ee0c504755fc4e53aea49fe66277b6",
+ "size": 4097,
+ "uri": "https://{canondata_backend}/1925821/b3b1e9eed0dbfb05c9eb91f8523c914ef2b33985/resource.tar.gz#test.test_blocks-add_decimal--Plan_/plan.txt"
+ }
+ ],
"test.test[blocks-coalesce_bools--Debug]": [
{
"checksum": "7ca8710ba5c6493da83b6bc790cd1e24",
diff --git a/ydb/library/yql/tests/sql/hybrid_file/part6/canondata/result.json b/ydb/library/yql/tests/sql/hybrid_file/part6/canondata/result.json
index a2797dc5cc7..541e8680cc2 100644
--- a/ydb/library/yql/tests/sql/hybrid_file/part6/canondata/result.json
+++ b/ydb/library/yql/tests/sql/hybrid_file/part6/canondata/result.json
@@ -587,6 +587,20 @@
"uri": "https://{canondata_backend}/1600758/a0037402be63401435fff64f584873e15fc9edd1/resource.tar.gz#test.test_blocks-combine_all_count_filter--Plan_/plan.txt"
}
],
+ "test.test[blocks-combine_all_decimal_max-default.txt-Debug]": [
+ {
+ "checksum": "a3b1b0090dffe12204a5b2e56fb8ee64",
+ "size": 748,
+ "uri": "https://{canondata_backend}/1781765/cf4791e13b24747d9e6fb3bfc11e0fdb45a964c9/resource.tar.gz#test.test_blocks-combine_all_decimal_max-default.txt-Debug_/opt.yql_patched"
+ }
+ ],
+ "test.test[blocks-combine_all_decimal_max-default.txt-Plan]": [
+ {
+ "checksum": "b4dd508a329723c74293d80f0278c705",
+ "size": 505,
+ "uri": "https://{canondata_backend}/1781765/cf4791e13b24747d9e6fb3bfc11e0fdb45a964c9/resource.tar.gz#test.test_blocks-combine_all_decimal_max-default.txt-Plan_/plan.txt"
+ }
+ ],
"test.test[blocks-combine_all_minmax_nested--Debug]": [
{
"checksum": "5d2d9f3790a48d0161d988a8823c100e",
diff --git a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json
index 73ec13c715c..551be78940d 100644
--- a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json
+++ b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json
@@ -3247,6 +3247,13 @@
"uri": "https://{canondata_backend}/1784117/d56ae82ad9d30397a41490647be1bd2124718f98/resource.tar.gz#test_sql2yql.test_bitcast_implicit-sub_bitcast_/sql.yql"
}
],
+ "test_sql2yql.test[blocks-add_decimal]": [
+ {
+ "checksum": "e6d39b84d45f6541ce253f4307e2c6bf",
+ "size": 1668,
+ "uri": "https://{canondata_backend}/1900335/8af685f983ecf5996df52bc7dc7bb3d611a1566f/resource.tar.gz#test_sql2yql.test_blocks-add_decimal_/sql.yql"
+ }
+ ],
"test_sql2yql.test[blocks-add_int16]": [
{
"checksum": "42b47f64d1624134c6f630719ce556e5",
@@ -3401,6 +3408,20 @@
"uri": "https://{canondata_backend}/1889210/525a727248199e4967ca2f854e5235352e3d5482/resource.tar.gz#test_sql2yql.test_blocks-combine_all_count_filter_opt_/sql.yql"
}
],
+ "test_sql2yql.test[blocks-combine_all_decimal]": [
+ {
+ "checksum": "87016e6b1e2a0e93870086ee9bb5f973",
+ "size": 3955,
+ "uri": "https://{canondata_backend}/1689644/bebe9ff4f61a19b42469a2d8a15dc9603a6ce7d5/resource.tar.gz#test_sql2yql.test_blocks-combine_all_decimal_/sql.yql"
+ }
+ ],
+ "test_sql2yql.test[blocks-combine_all_decimal_max]": [
+ {
+ "checksum": "cb2fec771db04767583a924da4d767b9",
+ "size": 1607,
+ "uri": "https://{canondata_backend}/1942100/df0cf5f44e4a847c8c10b5a97f5a8900aa2ae5b4/resource.tar.gz#test_sql2yql.test_blocks-combine_all_decimal_max_/sql.yql"
+ }
+ ],
"test_sql2yql.test[blocks-combine_all_max]": [
{
"checksum": "8f4360e9c24a9329a3d64ca8e2047807",
@@ -22672,6 +22693,13 @@
"uri": "https://{canondata_backend}/1880306/64654158d6bfb1289c66c626a8162239289559d0/resource.tar.gz#test_sql_format.test_bitcast_implicit-sub_bitcast_/formatted.sql"
}
],
+ "test_sql_format.test[blocks-add_decimal]": [
+ {
+ "checksum": "caeaea13a55ab18aaae27f6bf2606e16",
+ "size": 171,
+ "uri": "https://{canondata_backend}/1900335/8af685f983ecf5996df52bc7dc7bb3d611a1566f/resource.tar.gz#test_sql_format.test_blocks-add_decimal_/formatted.sql"
+ }
+ ],
"test_sql_format.test[blocks-add_int16]": [
{
"checksum": "5210bc3844abe9126e017234871b2a94",
@@ -22826,6 +22854,20 @@
"uri": "https://{canondata_backend}/1889210/525a727248199e4967ca2f854e5235352e3d5482/resource.tar.gz#test_sql_format.test_blocks-combine_all_count_filter_opt_/formatted.sql"
}
],
+ "test_sql_format.test[blocks-combine_all_decimal]": [
+ {
+ "checksum": "04545fcefe37c88291831c68b1af9016",
+ "size": 342,
+ "uri": "https://{canondata_backend}/1689644/bebe9ff4f61a19b42469a2d8a15dc9603a6ce7d5/resource.tar.gz#test_sql_format.test_blocks-combine_all_decimal_/formatted.sql"
+ }
+ ],
+ "test_sql_format.test[blocks-combine_all_decimal_max]": [
+ {
+ "checksum": "cc05542eb7dd01b70af82d41df5e21e9",
+ "size": 185,
+ "uri": "https://{canondata_backend}/1942100/df0cf5f44e4a847c8c10b5a97f5a8900aa2ae5b4/resource.tar.gz#test_sql_format.test_blocks-combine_all_decimal_max_/formatted.sql"
+ }
+ ],
"test_sql_format.test[blocks-combine_all_max]": [
{
"checksum": "09feb94b391372d4fb0b275f6ae5efa4",
diff --git a/ydb/library/yql/tests/sql/suites/blocks/add_decimal.cfg b/ydb/library/yql/tests/sql/suites/blocks/add_decimal.cfg
new file mode 100644
index 00000000000..a21c6560ede
--- /dev/null
+++ b/ydb/library/yql/tests/sql/suites/blocks/add_decimal.cfg
@@ -0,0 +1 @@
+in Input input_decimal.txt
diff --git a/ydb/library/yql/tests/sql/suites/blocks/add_decimal.sql b/ydb/library/yql/tests/sql/suites/blocks/add_decimal.sql
new file mode 100644
index 00000000000..648a3709d5e
--- /dev/null
+++ b/ydb/library/yql/tests/sql/suites/blocks/add_decimal.sql
@@ -0,0 +1,5 @@
+USE plato;
+
+SELECT
+ cs_ext_list_price+cs_sales_price,cast(1 as decimal(7,2))+cs_ext_list_price,cs_sales_price+cast(2ul as decimal(7,2))
+FROM Input
diff --git a/ydb/library/yql/tests/sql/suites/blocks/combine_all_decimal.cfg b/ydb/library/yql/tests/sql/suites/blocks/combine_all_decimal.cfg
new file mode 100644
index 00000000000..a21c6560ede
--- /dev/null
+++ b/ydb/library/yql/tests/sql/suites/blocks/combine_all_decimal.cfg
@@ -0,0 +1 @@
+in Input input_decimal.txt
diff --git a/ydb/library/yql/tests/sql/suites/blocks/combine_all_decimal.sql b/ydb/library/yql/tests/sql/suites/blocks/combine_all_decimal.sql
new file mode 100644
index 00000000000..1085d839000
--- /dev/null
+++ b/ydb/library/yql/tests/sql/suites/blocks/combine_all_decimal.sql
@@ -0,0 +1,15 @@
+use plato;
+
+SELECT
+ count(cs_ext_list_price),
+ sum(cs_ext_tax),
+ sum(cs_ext_list_price),
+ min(cs_sales_price),
+ max(cs_ext_list_price),
+ avg(cs_ext_tax),
+
+ sum(cast(cs_sales_price as float)),
+ min(cast(cs_ext_list_price as float)),
+ max(cast(cs_ext_tax as float)),
+ avg(cast(cs_sales_price as float))
+FROM Input
diff --git a/ydb/library/yql/tests/sql/suites/blocks/combine_all_decimal_max.sql b/ydb/library/yql/tests/sql/suites/blocks/combine_all_decimal_max.sql
new file mode 100644
index 00000000000..7119d3dc79e
--- /dev/null
+++ b/ydb/library/yql/tests/sql/suites/blocks/combine_all_decimal_max.sql
@@ -0,0 +1,9 @@
+use plato;
+
+SELECT
+ sum(x),
+ avg(x)
+FROM (values
+ (Decimal("99999999999999999999999999999999999",35,0)),
+ (Decimal("1",35,0))
+) as a(x)
diff --git a/ydb/library/yql/tests/sql/suites/blocks/input_decimal.txt b/ydb/library/yql/tests/sql/suites/blocks/input_decimal.txt
new file mode 100644
index 00000000000..7be0d5ef5f4
--- /dev/null
+++ b/ydb/library/yql/tests/sql/suites/blocks/input_decimal.txt
@@ -0,0 +1,10 @@
+{"cs_ext_list_price"="\x83\7\xE1\xFC";"cs_ext_tax"="\x82\4p";"cs_sales_price"="\x82\7\x86";};
+{"cs_ext_list_price"="\x83\3\x11P";"cs_ext_tax"="\x82\x1BS";"cs_sales_price"="\x82\0143";};
+{"cs_ext_list_price"="\x83\x0B{\x94";"cs_ext_tax"="\x82\0229";"cs_sales_price"="\x82\x1B\x9D";};
+{"cs_ext_list_price"="\x83\x10\x98i";"cs_ext_tax"="\x82Q\x91";"cs_sales_price"="\x82\x0C\xE8";};
+{"cs_ext_list_price"="\x83\x15\x1B\xE8";"cs_ext_tax"="\x82\xA2\x1C";"cs_sales_price"="\x82@\xD8";};
+{"cs_ext_list_price"="\x83\x08\xF3Z";"cs_ext_tax"="\x82pD";"cs_sales_price"="\x82\x1D)";};
+{"cs_ext_list_price"="\x83\6\xF6\xB0";"cs_ext_tax"="\x82\x08\xBC";"cs_sales_price"="\x82\t\xED";};
+{"cs_ext_list_price"="\x83\x08\xBC\xD4";"cs_ext_tax"="\x82\n\xAD";"cs_sales_price"="\x82\x19T";};
+{"cs_ext_list_price"="\x82\x14r";"cs_ext_tax"="\x82\0011";"cs_sales_price"="\x82\7v";};
+{"cs_ext_list_price"="\x83\2\xC6\xB8";"cs_ext_tax"="\x82\n\x83";"cs_sales_price"="\x82\n\xF4";};
diff --git a/ydb/library/yql/tests/sql/suites/blocks/input_decimal.txt.attr b/ydb/library/yql/tests/sql/suites/blocks/input_decimal.txt.attr
new file mode 100644
index 00000000000..ebddf8b641a
--- /dev/null
+++ b/ydb/library/yql/tests/sql/suites/blocks/input_decimal.txt.attr
@@ -0,0 +1,7 @@
+{"_yql_row_spec"={
+ "Type"=["StructType";[
+ ["cs_ext_list_price";["OptionalType";["DataType";"Decimal";"7";"2"]]];
+ ["cs_ext_tax";["OptionalType";["DataType";"Decimal";"7";"2"]]];
+ ["cs_sales_price";["OptionalType";["DataType";"Decimal";"7";"2"]]];
+ ]];
+}}
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part11/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part11/canondata/result.json
index 29180c15002..e16aca10a2a 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part11/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part11/canondata/result.json
@@ -408,6 +408,34 @@
"uri": "https://{canondata_backend}/1900335/b19e3a88f54bb49e1964c45fd96be88e34fd38d0/resource.tar.gz#test.test_bigdate-const_interval64-default.txt-Results_/results.txt"
}
],
+ "test.test[blocks-combine_all_decimal--Debug]": [
+ {
+ "checksum": "114289acc7a4144ba97bc98472aba836",
+ "size": 8215,
+ "uri": "https://{canondata_backend}/1936273/3dc94c747fd0323396d150e2c3d46e4c23a5a7bc/resource.tar.gz#test.test_blocks-combine_all_decimal--Debug_/opt.yql"
+ }
+ ],
+ "test.test[blocks-combine_all_decimal--Peephole]": [
+ {
+ "checksum": "b4aa8df16bc21d01673b7b5b3c12ca33",
+ "size": 6592,
+ "uri": "https://{canondata_backend}/1936273/3dc94c747fd0323396d150e2c3d46e4c23a5a7bc/resource.tar.gz#test.test_blocks-combine_all_decimal--Peephole_/opt.yql"
+ }
+ ],
+ "test.test[blocks-combine_all_decimal--Plan]": [
+ {
+ "checksum": "55814f8ca57ce1b51948df8c9e61334a",
+ "size": 6041,
+ "uri": "https://{canondata_backend}/1937027/fad81fefb8af5f49d771d0dbcf3ce8ad436537aa/resource.tar.gz#test.test_blocks-combine_all_decimal--Plan_/plan.txt"
+ }
+ ],
+ "test.test[blocks-combine_all_decimal--Results]": [
+ {
+ "checksum": "7ce953c23a6b6758c3f5cdb53b5853fb",
+ "size": 5578,
+ "uri": "https://{canondata_backend}/1773845/c59fdfc79083236f6028d8e1b25c44ef85dc0e46/resource.tar.gz#test.test_blocks-combine_all_decimal--Results_/results.txt"
+ }
+ ],
"test.test[blocks-combine_all_some_filter--Debug]": [
{
"checksum": "54dc4cd5f72d55789e0677fb0494143a",
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json
index a51db0fa4b9..683f743a3cd 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json
@@ -419,6 +419,34 @@
"uri": "https://{canondata_backend}/1899731/c92a452de9406ab0662eb9deef9f0799dbd3673d/resource.tar.gz#test.test_blocks-add_uint64_opt2--Results_/results.txt"
}
],
+ "test.test[blocks-combine_all_decimal_max-default.txt-Debug]": [
+ {
+ "checksum": "b9f7bf5cd9f9d2dba6c434dae8f87ce7",
+ "size": 679,
+ "uri": "https://{canondata_backend}/1937150/90029cb00529b334c5c69abcbf5d339ec52ab8cd/resource.tar.gz#test.test_blocks-combine_all_decimal_max-default.txt-Debug_/opt.yql"
+ }
+ ],
+ "test.test[blocks-combine_all_decimal_max-default.txt-Peephole]": [
+ {
+ "checksum": "b8b7433a6bd71df143272eac907bd4dd",
+ "size": 1041,
+ "uri": "https://{canondata_backend}/1937150/90029cb00529b334c5c69abcbf5d339ec52ab8cd/resource.tar.gz#test.test_blocks-combine_all_decimal_max-default.txt-Peephole_/opt.yql"
+ }
+ ],
+ "test.test[blocks-combine_all_decimal_max-default.txt-Plan]": [
+ {
+ "checksum": "b4dd508a329723c74293d80f0278c705",
+ "size": 505,
+ "uri": "https://{canondata_backend}/1937150/90029cb00529b334c5c69abcbf5d339ec52ab8cd/resource.tar.gz#test.test_blocks-combine_all_decimal_max-default.txt-Plan_/plan.txt"
+ }
+ ],
+ "test.test[blocks-combine_all_decimal_max-default.txt-Results]": [
+ {
+ "checksum": "737cae747bb974b0d78f3dab53c6a381",
+ "size": 1562,
+ "uri": "https://{canondata_backend}/1937150/90029cb00529b334c5c69abcbf5d339ec52ab8cd/resource.tar.gz#test.test_blocks-combine_all_decimal_max-default.txt-Results_/results.txt"
+ }
+ ],
"test.test[blocks-combine_all_min_filter--Debug]": [
{
"checksum": "d6cd48031c1326b0c541aa9f9a2e035c",
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part9/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part9/canondata/result.json
index 4237c7b276e..e671578ca77 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part9/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part9/canondata/result.json
@@ -371,6 +371,34 @@
"uri": "https://{canondata_backend}/1775059/59a8dbe71df1bdc11b3f063ab535eee737c36649/resource.tar.gz#test.test_bigdate-compare_big_small-default.txt-Results_/results.txt"
}
],
+ "test.test[blocks-add_decimal--Debug]": [
+ {
+ "checksum": "db08e47a48e39c52b3db56449978e177",
+ "size": 1901,
+ "uri": "https://{canondata_backend}/1597364/32529e94173a652cdfc0171aa011006fdaa6c446/resource.tar.gz#test.test_blocks-add_decimal--Debug_/opt.yql"
+ }
+ ],
+ "test.test[blocks-add_decimal--Peephole]": [
+ {
+ "checksum": "4be1a7c6b0407ec937324149ea3edd73",
+ "size": 1646,
+ "uri": "https://{canondata_backend}/1597364/32529e94173a652cdfc0171aa011006fdaa6c446/resource.tar.gz#test.test_blocks-add_decimal--Peephole_/opt.yql"
+ }
+ ],
+ "test.test[blocks-add_decimal--Plan]": [
+ {
+ "checksum": "984a99248f092852201faa89380e8e8b",
+ "size": 4067,
+ "uri": "https://{canondata_backend}/1597364/32529e94173a652cdfc0171aa011006fdaa6c446/resource.tar.gz#test.test_blocks-add_decimal--Plan_/plan.txt"
+ }
+ ],
+ "test.test[blocks-add_decimal--Results]": [
+ {
+ "checksum": "309c872fe6ad43b9f4059eca8b56c119",
+ "size": 4973,
+ "uri": "https://{canondata_backend}/1597364/32529e94173a652cdfc0171aa011006fdaa6c446/resource.tar.gz#test.test_blocks-add_decimal--Results_/results.txt"
+ }
+ ],
"test.test[blocks-combine_all_count--Debug]": [
{
"checksum": "be2c22b81b2297d9b974001c34dbae15",