about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author aneporada <aneporada@ydb.tech> 2023-03-03 17:59:22 +0300
committer aneporada <aneporada@ydb.tech> 2023-03-03 17:59:22 +0300
commit a61300f2c663f1ade2a187a1bf5f696ce65b74da (patch)
tree b52899752c173f5fef1b1afbd15ce8287872ee92
parent 0fe89e4d5fa637c208c5ee3918e3549d86715f73 (diff)
download ydb-a61300f2c663f1ade2a187a1bf5f696ce65b74da.tar.gz
[blocks] Support tuples and nested optionals in MIN/MAX
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp 273
-rw-r--r-- ydb/library/yql/public/udf/arrow/block_item_comparator.h 15
2 files changed, 276 insertions, 12 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
index f91f486b631..54381052ef8 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
@@ -9,6 +9,10 @@
#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
#include <ydb/library/yql/minikql/arrow/arrow_util.h>
+#include <ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h>
+
+#include <ydb/library/yql/public/udf/arrow/block_item_comparator.h>
+
#include <arrow/scalar.h>
#include <arrow/array/builder_primitive.h>
@@ -17,7 +21,7 @@ namespace NMiniKQL {
namespace {
-template <bool IsMin, typename T>
+template<bool IsMin, typename T>
inline T UpdateMinMax(T x, T y) {
if constexpr (IsMin) {
return x < y ? x : y;
@@ -26,21 +30,30 @@ inline T UpdateMinMax(T x, T y) {
}
}
-template <bool IsMin, typename T>
+template<bool IsMin, typename T>
inline void UpdateMinMax(TMaybe<T>& state, bool& stateUpdated, T value) {
- if (!state) {
- state = value;
- stateUpdated = true;
- return;
+ if constexpr (IsMin) {
+ if (!state || value < *state) {
+ state = value;
+ stateUpdated = true;
+ }
+ } else {
+ if (!state || *state < value) {
+ state = value;
+ stateUpdated = true;
+ }
}
+}
+template<bool IsMin>
+inline void UpdateMinMax(NYql::NUdf::IBlockItemComparator& comparator, TBlockItem& state, bool& stateUpdated, TBlockItem value) {
if constexpr (IsMin) {
- if (value < *state) {
+ if (!state || comparator.Less(value, state)) {
state = value;
stateUpdated = true;
}
} else {
- if (*state < value) {
+ if (!state || comparator.Less(state, value)) {
state = value;
stateUpdated = true;
}
@@ -50,9 +63,12 @@ inline void UpdateMinMax(TMaybe<T>& state, bool& stateUpdated, T value) {
template<typename TTag, typename TString, bool IsMin>
class TMinMaxBlockStringAggregator;
-template <typename TTag, bool IsNullable, bool IsScalar, typename TIn, bool IsMin>
+template<typename TTag, bool IsNullable, bool IsScalar, typename TIn, bool IsMin>
class TMinMaxBlockFixedAggregator;
+template<typename TTag, bool IsMin>
+class TMinMaxBlockGenericAggregator;
+
template <bool IsNullable, typename TIn, bool IsMin>
struct TState;
@@ -137,6 +153,211 @@ private:
TComputationContext& Ctx_;
};
+template <bool IsMin>
+void PushValueToState(TGenericState* typedState, const arrow::Datum& datum, ui64 row, IBlockReader& reader,
+ IBlockItemConverter& converter, NYql::NUdf::IBlockItemComparator& comparator, TComputationContext& ctx)
+{
+ TBlockItem stateItem;
+ bool stateChanged = false;
+ if (datum.is_scalar()) {
+ if (datum.scalar()->is_valid) {
+ stateItem = reader.GetScalarItem(*datum.scalar());
+ stateChanged = true;
+ }
+ } else {
+ stateItem = converter.MakeItem(*typedState);
+
+ const auto& array = datum.array();
+
+ constexpr int updateCmp = IsMin ? -1 : 1;
+
+ TBlockItem curr = reader.GetItem(*array, row);
+ if (curr) {
+ UpdateMinMax<IsMin>(comparator, stateItem, stateChanged, curr);
+ }
+ }
+
+ if (stateChanged) {
+ typedState->DeleteUnreferenced();
+ *typedState = converter.MakeValue(stateItem, ctx.HolderFactory);
+ }
+}
+
+template<bool IsMin>
+class TMinMaxBlockGenericAggregator<TCombineAllTag, IsMin> : public TCombineAllTag::TBase {
+public:
+ using TBase = TCombineAllTag::TBase;
+
+ TMinMaxBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+ : TBase(sizeof(TGenericState), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ , ReaderOne_(MakeBlockReader(TTypeInfoHelper(), type))
+ , ReaderTwo_(MakeBlockReader(TTypeInfoHelper(), type))
+ , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type))
+ , Compare_(NYql::NUdf::MakeBlockItemComparator(TTypeInfoHelper(), type))
+ {
+ }
+
+ void InitState(void* state) final {
+ new(state) TGenericState();
+ }
+
+ void DestroyState(void* state) noexcept final {
+ auto typedState = static_cast<TGenericState*>(state);
+ typedState->DeleteUnreferenced();
+ *typedState = TGenericState();
+ }
+
+ void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
+ TGenericState& typedState = *static_cast<TGenericState*>(state);
+ Y_UNUSED(batchLength);
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+
+ IBlockReader* currReader = ReaderOne_.get();
+ IBlockReader* stateReader = ReaderTwo_.get();
+
+ TBlockItem stateItem;
+ bool stateChanged = false;
+ if (datum.is_scalar()) {
+ if (datum.scalar()->is_valid) {
+ stateItem = currReader->GetScalarItem(*datum.scalar());
+ stateChanged = true;
+ }
+ } else {
+ stateItem = Converter_->MakeItem(typedState);
+
+ const auto& array = datum.array();
+ auto len = array->length;
+
+ const ui8* filterBitmap = nullptr;
+ if (filtered) {
+ const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
+ const auto& filterArray = filterDatum.array();
+ MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
+ filterBitmap = filterArray->template GetValues<uint8_t>(1);
+ }
+ auto& comparator = *Compare_;
+ for (size_t i = 0; i < len; ++i) {
+ TBlockItem curr = currReader->GetItem(*array, i);
+ if (curr && (!filterBitmap || filterBitmap[i])) {
+ bool changed = false;
+ UpdateMinMax<IsMin>(comparator, stateItem, changed, curr);
+ if (changed) {
+ std::swap(currReader, stateReader);
+ stateChanged = true;
+ }
+ }
+ }
+ }
+
+ if (stateChanged) {
+ typedState.DeleteUnreferenced();
+ typedState = Converter_->MakeValue(stateItem, Ctx_.HolderFactory);
+ }
+ }
+
+ NUdf::TUnboxedValue FinishOne(const void *state) final {
+ auto typedState = *static_cast<const TGenericState *>(state);
+ return typedState;
+ }
+
+private:
+ const ui32 ArgColumn_;
+ const std::unique_ptr<IBlockReader> ReaderOne_;
+ const std::unique_ptr<IBlockReader> ReaderTwo_;
+ const std::unique_ptr<IBlockItemConverter> Converter_;
+ const std::unique_ptr<NYql::NUdf::IBlockItemComparator> Compare_;
+};
+
+template<bool IsMin>
+class TMinMaxBlockGenericAggregator<TCombineKeysTag, IsMin> : public TCombineKeysTag::TBase {
+public:
+ using TBase = TCombineKeysTag::TBase;
+
+ TMinMaxBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+ : TBase(sizeof(TGenericState), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ , Type_(type)
+ , Reader_(MakeBlockReader(TTypeInfoHelper(), type))
+ , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type))
+ , Compare_(NYql::NUdf::MakeBlockItemComparator(TTypeInfoHelper(), type))
+ {
+ }
+
+ void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TGenericState();
+ UpdateKey(state, columns, row);
+ }
+
+ void DestroyState(void* state) noexcept final {
+ auto typedState = static_cast<TGenericState*>(state);
+ typedState->DeleteUnreferenced();
+ *typedState = TGenericState();
+ }
+
+ void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = static_cast<TGenericState*>(state);
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ PushValueToState<IsMin>(typedState, datum, row, *Reader_, *Converter_, *Compare_, Ctx_);
+ }
+
+ std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
+ return std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
+ }
+
+private:
+ const ui32 ArgColumn_;
+ TType* const Type_;
+ const std::unique_ptr<IBlockReader> Reader_;
+ const std::unique_ptr<IBlockItemConverter> Converter_;
+ const std::unique_ptr<NYql::NUdf::IBlockItemComparator> Compare_;
+};
+
+template<bool IsMin>
+class TMinMaxBlockGenericAggregator<TFinalizeKeysTag, IsMin> : public TFinalizeKeysTag::TBase {
+public:
+ using TBase = TFinalizeKeysTag::TBase;
+
+ TMinMaxBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+ : TBase(sizeof(TGenericState), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ , Type_(type)
+ , Reader_(MakeBlockReader(TTypeInfoHelper(), type))
+ , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type))
+ , Compare_(NYql::NUdf::MakeBlockItemComparator(TTypeInfoHelper(), type))
+
+ {
+ }
+
+ void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TGenericState();
+ UpdateState(state, columns, row);
+ }
+
+ void DestroyState(void* state) noexcept final {
+ auto typedState = static_cast<TGenericState*>(state);
+ typedState->DeleteUnreferenced();
+ *typedState = TGenericState();
+ }
+
+ void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = static_cast<TGenericState*>(state);
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ PushValueToState<IsMin>(typedState, datum, row, *Reader_, *Converter_, *Compare_, Ctx_);
+ }
+
+ std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
+ return std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
+ }
+
+private:
+ const ui32 ArgColumn_;
+ TType* const Type_;
+ const std::unique_ptr<IBlockReader> Reader_;
+ const std::unique_ptr<IBlockItemConverter> Converter_;
+ const std::unique_ptr<NYql::NUdf::IBlockItemComparator> Compare_;
+};
+
template <typename TStringType, bool IsMin>
void PushValueToState(TGenericState* typedState, const arrow::Datum& datum, ui64 row) {
using TOffset = typename TPrimitiveDataType<TStringType>::TResult::offset_type;;
@@ -638,6 +859,28 @@ private:
const ui32 ArgColumn_;
};
+template <typename TTag, bool IsMin>
+class TPreparedMinMaxBlockGenericAggregator : public TTag::TPreparedAggregator {
+public:
+ using TBase = typename TTag::TPreparedAggregator;
+
+ TPreparedMinMaxBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn)
+ : TBase(sizeof(TGenericState))
+ , Type_(type)
+ , FilterColumn_(filterColumn)
+ , ArgColumn_(argColumn)
+ {}
+
+ std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
+ return std::make_unique<TMinMaxBlockGenericAggregator<TTag, IsMin>>(Type_, FilterColumn_, ArgColumn_, ctx);
+ }
+
+private:
+ TType* const Type_;
+ const std::optional<ui32> FilterColumn_;
+ const ui32 ArgColumn_;
+};
+
template<typename TTag, typename TIn, bool IsMin>
std::unique_ptr<typename TTag::TPreparedAggregator> PrepareMinMaxFixed(TType* type, bool isOptional, bool isScalar, std::optional<ui32> filterColumn, ui32 argColumn) {
if (isScalar) {
@@ -655,9 +898,16 @@ std::unique_ptr<typename TTag::TPreparedAggregator> PrepareMinMaxFixed(TType* ty
template <typename TTag, bool IsMin>
std::unique_ptr<typename TTag::TPreparedAggregator> PrepareMinMax(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn) {
auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn));
- auto argType = blockType->GetItemType();
+ const bool isScalar = blockType->GetShape() == TBlockType::EShape::Scalar;
+ auto argType = blockType->GetItemType();
+
bool isOptional;
- auto dataType = UnpackOptionalData(argType, isOptional);
+ auto unpacked = UnpackOptional(argType, isOptional);
+ if (!unpacked->IsData()) {
+ return std::make_unique<TPreparedMinMaxBlockGenericAggregator<TTag, IsMin>>(argType, filterColumn, argColumn);
+ }
+
+ auto dataType = AS_TYPE(TDataType, unpacked);
const auto slot = *dataType->GetDataSlot();
if (slot == NUdf::EDataSlot::String) {
using TStringType = char*;
@@ -666,7 +916,6 @@ std::unique_ptr<typename TTag::TPreparedAggregator> PrepareMinMax(TTupleType* tu
using TStringType = NUdf::TUtf8;
return std::make_unique<TPreparedMinMaxBlockStringAggregator<TTag, TStringType, IsMin>>(argType, filterColumn, argColumn);
}
- bool isScalar = blockType->GetShape() == TBlockType::EShape::Scalar;
switch (slot) {
case NUdf::EDataSlot::Int8:
return PrepareMinMaxFixed<TTag, i8, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
diff --git a/ydb/library/yql/public/udf/arrow/block_item_comparator.h b/ydb/library/yql/public/udf/arrow/block_item_comparator.h
index 93764dc6bad..da868faa707 100644
--- a/ydb/library/yql/public/udf/arrow/block_item_comparator.h
+++ b/ydb/library/yql/public/udf/arrow/block_item_comparator.h
@@ -126,14 +126,29 @@ public:
}
bool DoEquals(TBlockItem lhs, TBlockItem rhs) const {
+ if constexpr (std::is_floating_point<T>::value) {
+ if (std::isunordered(lhs.As<T>(), rhs.As<T>())) {
+ return std::isnan(lhs.As<T>()) == std::isnan(rhs.As<T>());
+ }
+ }
return lhs.As<T>() == rhs.As<T>();
}
bool DoLess(TBlockItem lhs, TBlockItem rhs) const {
+ if constexpr (std::is_floating_point<T>::value) {
+ if (std::isunordered(lhs.As<T>(), rhs.As<T>())) {
+ return std::isnan(lhs.As<T>()) < std::isnan(rhs.As<T>());
+ }
+ }
return lhs.As<T>() < rhs.As<T>();
}
bool DoGreater(TBlockItem lhs, TBlockItem rhs) const {
+ if constexpr (std::is_floating_point<T>::value) {
+ if (std::isunordered(lhs.As<T>(), rhs.As<T>())) {
+ return std::isnan(lhs.As<T>()) > std::isnan(rhs.As<T>());
+ }
+ }
return lhs.As<T>() > rhs.As<T>();
}
};