diff options
author | aneporada <aneporada@ydb.tech> | 2023-03-03 17:59:22 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-03-03 17:59:22 +0300 |
commit | a61300f2c663f1ade2a187a1bf5f696ce65b74da (patch) | |
tree | b52899752c173f5fef1b1afbd15ce8287872ee92 | |
parent | 0fe89e4d5fa637c208c5ee3918e3549d86715f73 (diff) | |
download | ydb-a61300f2c663f1ade2a187a1bf5f696ce65b74da.tar.gz |
[blocks] Support tuples and nested optionals in MIN/MAX
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp | 273 | ||||
-rw-r--r-- | ydb/library/yql/public/udf/arrow/block_item_comparator.h | 15 |
2 files changed, 276 insertions, 12 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp index f91f486b631..54381052ef8 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp @@ -9,6 +9,10 @@ #include <ydb/library/yql/minikql/arrow/arrow_defs.h> #include <ydb/library/yql/minikql/arrow/arrow_util.h> +#include <ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h> + +#include <ydb/library/yql/public/udf/arrow/block_item_comparator.h> + #include <arrow/scalar.h> #include <arrow/array/builder_primitive.h> @@ -17,7 +21,7 @@ namespace NMiniKQL { namespace { -template <bool IsMin, typename T> +template<bool IsMin, typename T> inline T UpdateMinMax(T x, T y) { if constexpr (IsMin) { return x < y ? x : y; @@ -26,21 +30,30 @@ inline T UpdateMinMax(T x, T y) { } } -template <bool IsMin, typename T> +template<bool IsMin, typename T> inline void UpdateMinMax(TMaybe<T>& state, bool& stateUpdated, T value) { - if (!state) { - state = value; - stateUpdated = true; - return; + if constexpr (IsMin) { + if (!state || value < *state) { + state = value; + stateUpdated = true; + } + } else { + if (!state || *state < value) { + state = value; + stateUpdated = true; + } } +} +template<bool IsMin> +inline void UpdateMinMax(NYql::NUdf::IBlockItemComparator& comparator, TBlockItem& state, bool& stateUpdated, TBlockItem value) { if constexpr (IsMin) { - if (value < *state) { + if (!state || comparator.Less(value, state)) { state = value; stateUpdated = true; } } else { - if (*state < value) { + if (!state || comparator.Less(state, value)) { state = value; stateUpdated = true; } @@ -50,9 +63,12 @@ inline void UpdateMinMax(TMaybe<T>& state, bool& stateUpdated, T value) { template<typename TTag, typename TString, bool IsMin> class TMinMaxBlockStringAggregator; -template <typename TTag, bool IsNullable, bool IsScalar, typename TIn, bool IsMin> +template<typename TTag, bool IsNullable, bool IsScalar, typename TIn, bool IsMin> class TMinMaxBlockFixedAggregator; +template<typename TTag, bool IsMin> +class TMinMaxBlockGenericAggregator; + template <bool IsNullable, typename TIn, bool IsMin> struct TState; @@ -137,6 +153,211 @@ private: TComputationContext& Ctx_; }; +template <bool IsMin> +void PushValueToState(TGenericState* typedState, const arrow::Datum& datum, ui64 row, IBlockReader& reader, + IBlockItemConverter& converter, NYql::NUdf::IBlockItemComparator& comparator, TComputationContext& ctx) +{ + TBlockItem stateItem; + bool stateChanged = false; + if (datum.is_scalar()) { + if (datum.scalar()->is_valid) { + stateItem = reader.GetScalarItem(*datum.scalar()); + stateChanged = true; + } + } else { + stateItem = converter.MakeItem(*typedState); + + const auto& array = datum.array(); + + constexpr int updateCmp = IsMin ? -1 : 1; + + TBlockItem curr = reader.GetItem(*array, row); + if (curr) { + UpdateMinMax<IsMin>(comparator, stateItem, stateChanged, curr); + } + } + + if (stateChanged) { + typedState->DeleteUnreferenced(); + *typedState = converter.MakeValue(stateItem, ctx.HolderFactory); + } +} + +template<bool IsMin> +class TMinMaxBlockGenericAggregator<TCombineAllTag, IsMin> : public TCombineAllTag::TBase { +public: + using TBase = TCombineAllTag::TBase; + + TMinMaxBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx) + : TBase(sizeof(TGenericState), filterColumn, ctx) + , ArgColumn_(argColumn) + , ReaderOne_(MakeBlockReader(TTypeInfoHelper(), type)) + , ReaderTwo_(MakeBlockReader(TTypeInfoHelper(), type)) + , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type)) + , Compare_(NYql::NUdf::MakeBlockItemComparator(TTypeInfoHelper(), type)) + { + } + + void InitState(void* state) final { + new(state) TGenericState(); + } + + void DestroyState(void* state) noexcept final { + auto typedState = static_cast<TGenericState*>(state); + typedState->DeleteUnreferenced(); + *typedState = TGenericState(); + } + + void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final { + TGenericState& typedState = *static_cast<TGenericState*>(state); + Y_UNUSED(batchLength); + const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); + + IBlockReader* currReader = ReaderOne_.get(); + IBlockReader* stateReader = ReaderTwo_.get(); + + TBlockItem stateItem; + bool stateChanged = false; + if (datum.is_scalar()) { + if (datum.scalar()->is_valid) { + stateItem = currReader->GetScalarItem(*datum.scalar()); + stateChanged = true; + } + } else { + stateItem = Converter_->MakeItem(typedState); + + const auto& array = datum.array(); + auto len = array->length; + + const ui8* filterBitmap = nullptr; + if (filtered) { + const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum(); + const auto& filterArray = filterDatum.array(); + MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column"); + filterBitmap = filterArray->template GetValues<uint8_t>(1); + } + auto& comparator = *Compare_; + for (size_t i = 0; i < len; ++i) { + TBlockItem curr = currReader->GetItem(*array, i); + if (curr && (!filterBitmap || filterBitmap[i])) { + bool changed = false; + UpdateMinMax<IsMin>(comparator, stateItem, changed, curr); + if (changed) { + std::swap(currReader, stateReader); + stateChanged = true; + } + } + } + } + + if (stateChanged) { + typedState.DeleteUnreferenced(); + typedState = Converter_->MakeValue(stateItem, Ctx_.HolderFactory); + } + } + + NUdf::TUnboxedValue FinishOne(const void *state) final { + auto typedState = *static_cast<const TGenericState *>(state); + return typedState; + } + +private: + const ui32 ArgColumn_; + const std::unique_ptr<IBlockReader> ReaderOne_; + const std::unique_ptr<IBlockReader> ReaderTwo_; + const std::unique_ptr<IBlockItemConverter> Converter_; + const std::unique_ptr<NYql::NUdf::IBlockItemComparator> Compare_; +}; + +template<bool IsMin> +class TMinMaxBlockGenericAggregator<TCombineKeysTag, IsMin> : public TCombineKeysTag::TBase { +public: + using TBase = TCombineKeysTag::TBase; + + TMinMaxBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx) + : TBase(sizeof(TGenericState), filterColumn, ctx) + , ArgColumn_(argColumn) + , Type_(type) + , Reader_(MakeBlockReader(TTypeInfoHelper(), type)) + , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type)) + , Compare_(NYql::NUdf::MakeBlockItemComparator(TTypeInfoHelper(), type)) + { + } + + void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + new(state) TGenericState(); + UpdateKey(state, columns, row); + } + + void DestroyState(void* state) noexcept final { + auto typedState = static_cast<TGenericState*>(state); + typedState->DeleteUnreferenced(); + *typedState = TGenericState(); + } + + void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + auto typedState = static_cast<TGenericState*>(state); + const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); + PushValueToState<IsMin>(typedState, datum, row, *Reader_, *Converter_, *Compare_, Ctx_); + } + + std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final { + return std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_); + } + +private: + const ui32 ArgColumn_; + TType* const Type_; + const std::unique_ptr<IBlockReader> Reader_; + const std::unique_ptr<IBlockItemConverter> Converter_; + const std::unique_ptr<NYql::NUdf::IBlockItemComparator> Compare_; +}; + +template<bool IsMin> +class TMinMaxBlockGenericAggregator<TFinalizeKeysTag, IsMin> : public TFinalizeKeysTag::TBase { +public: + using TBase = TFinalizeKeysTag::TBase; + + TMinMaxBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx) + : TBase(sizeof(TGenericState), filterColumn, ctx) + , ArgColumn_(argColumn) + , Type_(type) + , Reader_(MakeBlockReader(TTypeInfoHelper(), type)) + , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type)) + , Compare_(NYql::NUdf::MakeBlockItemComparator(TTypeInfoHelper(), type)) + + { + } + + void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + new(state) TGenericState(); + UpdateState(state, columns, row); + } + + void DestroyState(void* state) noexcept final { + auto typedState = static_cast<TGenericState*>(state); + typedState->DeleteUnreferenced(); + *typedState = TGenericState(); + } + + void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final { + auto typedState = static_cast<TGenericState*>(state); + const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); + PushValueToState<IsMin>(typedState, datum, row, *Reader_, *Converter_, *Compare_, Ctx_); + } + + std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final { + return std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_); + } + +private: + const ui32 ArgColumn_; + TType* const Type_; + const std::unique_ptr<IBlockReader> Reader_; + const std::unique_ptr<IBlockItemConverter> Converter_; + const std::unique_ptr<NYql::NUdf::IBlockItemComparator> Compare_; +}; + template <typename TStringType, bool IsMin> void PushValueToState(TGenericState* typedState, const arrow::Datum& datum, ui64 row) { using TOffset = typename TPrimitiveDataType<TStringType>::TResult::offset_type;; @@ -638,6 +859,28 @@ private: const ui32 ArgColumn_; }; +template <typename TTag, bool IsMin> +class TPreparedMinMaxBlockGenericAggregator : public TTag::TPreparedAggregator { +public: + using TBase = typename TTag::TPreparedAggregator; + + TPreparedMinMaxBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn) + : TBase(sizeof(TGenericState)) + , Type_(type) + , FilterColumn_(filterColumn) + , ArgColumn_(argColumn) + {} + + std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final { + return std::make_unique<TMinMaxBlockGenericAggregator<TTag, IsMin>>(Type_, FilterColumn_, ArgColumn_, ctx); + } + +private: + TType* const Type_; + const std::optional<ui32> FilterColumn_; + const ui32 ArgColumn_; +}; + template<typename TTag, typename TIn, bool IsMin> std::unique_ptr<typename TTag::TPreparedAggregator> PrepareMinMaxFixed(TType* type, bool isOptional, bool isScalar, std::optional<ui32> filterColumn, ui32 argColumn) { if (isScalar) { @@ -655,9 +898,16 @@ std::unique_ptr<typename TTag::TPreparedAggregator> PrepareMinMaxFixed(TType* ty template <typename TTag, bool IsMin> std::unique_ptr<typename TTag::TPreparedAggregator> PrepareMinMax(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn) { auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn)); - auto argType = blockType->GetItemType(); + const bool isScalar = blockType->GetShape() == TBlockType::EShape::Scalar; + auto argType = blockType->GetItemType(); + bool isOptional; - auto dataType = UnpackOptionalData(argType, isOptional); + auto unpacked = UnpackOptional(argType, isOptional); + if (!unpacked->IsData()) { + return std::make_unique<TPreparedMinMaxBlockGenericAggregator<TTag, IsMin>>(argType, filterColumn, argColumn); + } + + auto dataType = AS_TYPE(TDataType, unpacked); const auto slot = *dataType->GetDataSlot(); if (slot == NUdf::EDataSlot::String) { using TStringType = char*; @@ -666,7 +916,6 @@ std::unique_ptr<typename TTag::TPreparedAggregator> PrepareMinMax(TTupleType* tu using TStringType = NUdf::TUtf8; return std::make_unique<TPreparedMinMaxBlockStringAggregator<TTag, TStringType, IsMin>>(argType, filterColumn, argColumn); } - bool isScalar = blockType->GetShape() == TBlockType::EShape::Scalar; switch (slot) { case NUdf::EDataSlot::Int8: return PrepareMinMaxFixed<TTag, i8, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn); diff --git a/ydb/library/yql/public/udf/arrow/block_item_comparator.h b/ydb/library/yql/public/udf/arrow/block_item_comparator.h index 93764dc6bad..da868faa707 100644 --- a/ydb/library/yql/public/udf/arrow/block_item_comparator.h +++ b/ydb/library/yql/public/udf/arrow/block_item_comparator.h @@ -126,14 +126,29 @@ public: } bool DoEquals(TBlockItem lhs, TBlockItem rhs) const { + if constexpr (std::is_floating_point<T>::value) { + if (std::isunordered(lhs.As<T>(), rhs.As<T>())) { + return std::isnan(lhs.As<T>()) == std::isnan(rhs.As<T>()); + } + } return lhs.As<T>() == rhs.As<T>(); } bool DoLess(TBlockItem lhs, TBlockItem rhs) const { + if constexpr (std::is_floating_point<T>::value) { + if (std::isunordered(lhs.As<T>(), rhs.As<T>())) { + return std::isnan(lhs.As<T>()) < std::isnan(rhs.As<T>()); + } + } return lhs.As<T>() < rhs.As<T>(); } bool DoGreater(TBlockItem lhs, TBlockItem rhs) const { + if constexpr (std::is_floating_point<T>::value) { + if (std::isunordered(lhs.As<T>(), rhs.As<T>())) { + return std::isnan(lhs.As<T>()) > std::isnan(rhs.As<T>()); + } + } return lhs.As<T>() > rhs.As<T>(); } }; |