diff options
author | aneporada <aneporada@ydb.tech> | 2022-12-30 18:52:29 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2022-12-30 18:52:29 +0300 |
commit | e83ba80b8b391bfc196ab24ba633cf69e9bf327e (patch) | |
tree | d9d87eadffd23a8f5c2e4c0daac1015873e1eca0 | |
parent | 20070209a718431338a2eed7e45673d99c9f3738 (diff) | |
download | ydb-e83ba80b8b391bfc196ab24ba633cf69e9bf327e.tar.gz |
Support strings as aggregation keys
-rw-r--r-- | ydb/library/yql/core/yql_aggregate_expander.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/minikql/arrow/arrow_util.cpp | 14 | ||||
-rw-r--r-- | ydb/library/yql/minikql/arrow/arrow_util.h | 248 | ||||
-rw-r--r-- | ydb/library/yql/minikql/arrow/mkql_bit_utils.h (renamed from ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h) | 0 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp | 243 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp | 79 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/minikql/invoke_builtins/mkql_builtins_impl.h | 180 |
10 files changed, 490 insertions, 281 deletions
diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp index 30ae73f3ac2..a4ef67f11c7 100644 --- a/ydb/library/yql/core/yql_aggregate_expander.cpp +++ b/ydb/library/yql/core/yql_aggregate_expander.cpp @@ -614,6 +614,7 @@ TExprNode::TPtr TAggregateExpander::MakeInputBlocks(const TExprNode::TPtr& strea auto extractorLambda = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), std::move(extractorArgs)), std::move(extractorRoots)); auto mappedWideFlow = Ctx.NewCallable(Node->Pos(), "WideMap", { wideFlow, extractorLambda }); auto blocks = Ctx.NewCallable(Node->Pos(), "WideToBlocks", { mappedWideFlow }); + blocks = Ctx.NewCallable(Node->Pos(), "BlockExpandChunked", { blocks }); return blocks; } diff --git a/ydb/library/yql/minikql/arrow/arrow_util.cpp b/ydb/library/yql/minikql/arrow/arrow_util.cpp index 2acd9654dc2..37ded548e4e 100644 --- a/ydb/library/yql/minikql/arrow/arrow_util.cpp +++ b/ydb/library/yql/minikql/arrow/arrow_util.cpp @@ -1,4 +1,5 @@ #include "arrow_util.h" +#include "mkql_bit_utils.h" #include <ydb/library/yql/minikql/mkql_node_builder.h> #include <util/system/yassert.h> @@ -52,5 +53,18 @@ std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, TType* it } } +std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool) { + // align up to 64 bit + bitCount = (bitCount + 63u) & ~size_t(63u); + // this simplifies code compression code - we can write single 64 bit word after array boundaries + bitCount += 64; + return ARROW_RESULT(arrow::AllocateBitmap(bitCount, pool)); +} + +std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool) { + auto bitmap = AllocateBitmapWithReserve(len, pool); + CompressSparseBitmap(bitmap->mutable_data(), srcSparse, len); + return bitmap; +} } diff --git a/ydb/library/yql/minikql/arrow/arrow_util.h b/ydb/library/yql/minikql/arrow/arrow_util.h index 76fb9a9aef6..cc46c1efc47 100644 --- a/ydb/library/yql/minikql/arrow/arrow_util.h +++ b/ydb/library/yql/minikql/arrow/arrow_util.h @@ -1,6 +1,13 @@ #pragma once +#include "arrow_defs.h" + #include <arrow/array/data.h> +#include <arrow/buffer_builder.h> +#include <arrow/datum.h> +#include <arrow/scalar.h> +#include <arrow/util/bitmap.h> + #include <ydb/library/yql/minikql/mkql_node.h> namespace NKikimr::NMiniKQL { @@ -14,4 +21,245 @@ std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data, /// \brief Remove optional from `data` as new ArrayData object std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, TType* itemType); +std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool); +std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool); + +inline arrow::internal::Bitmap GetBitmap(const arrow::ArrayData& arr, int index) { + return arrow::internal::Bitmap{ arr.buffers[index], arr.offset, arr.length }; +} + +template <typename T> +T GetPrimitiveScalarValue(const arrow::Scalar& scalar) { + return *static_cast<const T*>(dynamic_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data()); +} + +inline std::string_view GetStringScalarValue(const arrow::Scalar& scalar) { + const auto& base = dynamic_cast<const arrow::BaseBinaryScalar&>(scalar); + return std::string_view{reinterpret_cast<const char*>(base.value->data()), static_cast<size_t>(base.value->size())}; +} + +template <typename T> +std::shared_ptr<arrow::DataType> GetPrimitiveDataType(); + +template <> +inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<bool>() { + return arrow::uint8(); +} + +template <> +inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<i8>() { + return arrow::int8(); +} + +template <> +inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<ui8>() { + return arrow::uint8(); +} + +template <> +inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<i16>() { + return arrow::int16(); +} + +template <> +inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<ui16>() { + return arrow::uint16(); +} + +template <> +inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<i32>() { + return arrow::int32(); +} + +template <> +inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<ui32>() { + return arrow::uint32(); +} + +template <> +inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<i64>() { + return arrow::int64(); +} + +template <> +inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<ui64>() { + return arrow::uint64(); +} + +template <> +inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<char*>() { + return arrow::binary(); +} + +template <> +inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<NYql::NUdf::TUtf8>() { + return arrow::utf8(); +} + +template<typename T> +struct TPrimitiveDataType; + +template<> +struct TPrimitiveDataType<bool> { + using TResult = arrow::UInt8Type; +}; + +template<> +struct TPrimitiveDataType<i8> { + using TResult = arrow::Int8Type; +}; + +template<> +struct TPrimitiveDataType<ui8> { + using TResult = arrow::UInt8Type; +}; + +template<> +struct TPrimitiveDataType<i16> { + using TResult = arrow::Int16Type; +}; + +template<> +struct TPrimitiveDataType<ui16> { + using TResult = arrow::UInt16Type; +}; + +template<> +struct TPrimitiveDataType<i32> { + using TResult = arrow::Int32Type; +}; + +template<> +struct TPrimitiveDataType<ui32> { + using TResult = arrow::UInt32Type; +}; + +template<> +struct TPrimitiveDataType<i64> { + using TResult = arrow::Int64Type; +}; + +template<> +struct TPrimitiveDataType<ui64> { + using TResult = arrow::UInt64Type; +}; + +template<> +struct TPrimitiveDataType<char*> { + using TResult = arrow::BinaryType; +}; + +template<> +struct TPrimitiveDataType<NYql::NUdf::TUtf8> { + using TResult = arrow::StringType; +}; + +template <typename T> +arrow::Datum MakeScalarDatum(T value); + +template <> +inline arrow::Datum MakeScalarDatum<bool>(bool value) { + return arrow::Datum(std::make_shared<arrow::UInt8Scalar>(value)); +} + +template <> +inline arrow::Datum MakeScalarDatum<i8>(i8 value) { + return arrow::Datum(std::make_shared<arrow::Int8Scalar>(value)); +} + +template <> +inline arrow::Datum MakeScalarDatum<ui8>(ui8 value) { + return arrow::Datum(std::make_shared<arrow::UInt8Scalar>(value)); +} + +template <> +inline arrow::Datum MakeScalarDatum<i16>(i16 value) { + return arrow::Datum(std::make_shared<arrow::Int16Scalar>(value)); +} + +template <> +inline arrow::Datum MakeScalarDatum<ui16>(ui16 value) { + return arrow::Datum(std::make_shared<arrow::UInt16Scalar>(value)); +} + +template <> +inline arrow::Datum MakeScalarDatum<i32>(i32 value) { + return arrow::Datum(std::make_shared<arrow::Int32Scalar>(value)); +} + +template <> +inline arrow::Datum MakeScalarDatum<ui32>(ui32 value) { + return arrow::Datum(std::make_shared<arrow::UInt32Scalar>(value)); +} + +template <> +inline arrow::Datum MakeScalarDatum<i64>(i64 value) { + return arrow::Datum(std::make_shared<arrow::Int64Scalar>(value)); +} + +template <> +inline arrow::Datum MakeScalarDatum<ui64>(ui64 value) { + return arrow::Datum(std::make_shared<arrow::UInt64Scalar>(value)); +} + +// similar to arrow::TypedBufferBuilder, but with UnsafeAdvance() method +// and shrinkToFit = false +template<typename T> +class TTypedBufferBuilder { + static_assert(std::is_pod_v<T>); + static_assert(!std::is_same_v<T, bool>); +public: + explicit TTypedBufferBuilder(arrow::MemoryPool* pool) + : Builder(pool) + { + } + + inline void Reserve(size_t size) { + ARROW_OK(Builder.Reserve(size * sizeof(T))); + } + + inline size_t Length() const { + return Builder.length() / sizeof(T); + } + + inline T* MutableData() { + return reinterpret_cast<T*>(Builder.mutable_data()); + } + + inline T* End() { + return MutableData() + Length(); + } + + inline const T* Data() const { + return reinterpret_cast<const T*>(Builder.data()); + } + + inline void UnsafeAppend(const T* values, size_t count) { + std::memcpy(End(), values, count * sizeof(T)); + UnsafeAdvance(count); + } + + inline void UnsafeAppend(size_t count, const T& value) { + T* target = End(); + std::fill(target, target + count, value); + UnsafeAdvance(count); + } + + inline void UnsafeAppend(T&& value) { + *End() = std::move(value); + UnsafeAdvance(1); + } + + inline void UnsafeAdvance(size_t count) { + Builder.UnsafeAdvance(count * sizeof(T)); + } + + inline std::shared_ptr<arrow::Buffer> Finish() { + bool shrinkToFit = false; + return ARROW_RESULT(Builder.Finish(shrinkToFit)); + } +private: + arrow::BufferBuilder Builder; +}; + } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h b/ydb/library/yql/minikql/arrow/mkql_bit_utils.h index 9bb5d898871..9bb5d898871 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h +++ b/ydb/library/yql/minikql/arrow/mkql_bit_utils.h diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp index b303d3c4b5a..979a67c4733 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp @@ -1,6 +1,6 @@ #include "mkql_block_agg.h" #include "mkql_block_agg_factory.h" -#include "mkql_bit_utils.h" +#include "mkql_block_builder.h" #include "mkql_rh_hash.h" #include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h> @@ -11,10 +11,12 @@ #include <ydb/library/yql/minikql/arrow/arrow_defs.h> #include <ydb/library/yql/minikql/arrow/arrow_util.h> +#include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h> #include <arrow/scalar.h> #include <arrow/array/array_primitive.h> #include <arrow/array/builder_primitive.h> +#include <arrow/chunked_array.h> //#define USE_STD_UNORDERED @@ -333,6 +335,14 @@ public: return t; } + std::string_view PopString() { + ui32 size = PopNumber<ui32>(); + Ensure(size); + std::string_view result(Buf_.Data() + Pos_, size); + Pos_ += size; + return result; + } + private: void Ensure(size_t delta) { MKQL_ENSURE(Pos_ + delta <= Buf_.Size(), "Unexpected end of buffer"); @@ -358,6 +368,14 @@ public: Pos_ += sizeof(T); } + void PushString(std::string_view data) { + Ensure(sizeof(ui32) + data.size()); + *(ui32*)&Vec_[Pos_] = data.size(); + Pos_ += sizeof(ui32); + std::memcpy(Vec_.data() + Pos_, data.data(), data.size()); + Pos_ += data.size(); + } + // fill with zeros void Resize(size_t size) { Pos_ = 0; @@ -441,6 +459,106 @@ private: TComputationContext& Ctx_; }; +template <typename T, bool IsOptional> +class TStringKeyColumnBuilder : public IKeyColumnBuilder { +public: + using TArrowType = typename TPrimitiveDataType<T>::TResult; + using TOffset = typename TArrowType::offset_type; + + TStringKeyColumnBuilder(ui64 size, TComputationContext& ctx) + : Ctx_(ctx) + , MaxLen_(size) + { + Reserve(); + } + + void Add(TInputBuffer& in) final { + if constexpr (IsOptional) { + if (!in.PopChar()) { + NullBuilder_->UnsafeAppend(0); + AppendCurrentOffset(); + return; + } + } + + std::string_view str = in.PopString(); + + size_t currentLen = DataBuilder_->Length(); + // empty string can always be appended + if (!str.empty() && currentLen + str.size() > MaxBlockSizeInBytes) { + if (currentLen) { + FlushChunk(false); + } + if (str.size() > MaxBlockSizeInBytes) { + DataBuilder_->Reserve(str.size()); + } + } + + AppendCurrentOffset(); + DataBuilder_->UnsafeAppend((const ui8*)str.data(), str.size()); + if constexpr (IsOptional) { + NullBuilder_->UnsafeAppend(1); + } + } + + NUdf::TUnboxedValue Build() final { + FlushChunk(true); + arrow::ArrayVector chunks; + for (auto& data : Chunks_) { + chunks.push_back(arrow::Datum(data).make_array()); + } + Y_VERIFY(!chunks.empty()); + + auto chunked = ARROW_RESULT(arrow::ChunkedArray::Make(std::move(chunks), std::make_shared<TArrowType>())); + return Ctx_.HolderFactory.CreateArrowBlock(std::move(chunked)); + } + +private: + void Reserve() { + if constexpr (IsOptional) { + NullBuilder_ = std::make_unique<TTypedBufferBuilder<ui8>>(&Ctx_.ArrowMemoryPool); + NullBuilder_->Reserve(MaxLen_ + 1); + } + OffsetsBuilder_ = std::make_unique<TTypedBufferBuilder<TOffset>>(&Ctx_.ArrowMemoryPool); + OffsetsBuilder_->Reserve(MaxLen_ + 1); + DataBuilder_ = std::make_unique<TTypedBufferBuilder<ui8>>(&Ctx_.ArrowMemoryPool); + DataBuilder_->Reserve(MaxBlockSizeInBytes); + } + + void AppendCurrentOffset() { + OffsetsBuilder_->UnsafeAppend(DataBuilder_->Length()); + } + + void FlushChunk(bool finish) { + const auto length = OffsetsBuilder_->Length(); + Y_VERIFY(length > 0); + + AppendCurrentOffset(); + std::shared_ptr<arrow::Buffer> nullBitmap; + if constexpr (IsOptional) { + nullBitmap = NullBuilder_->Finish(); + nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, &Ctx_.ArrowMemoryPool); + } + std::shared_ptr<arrow::Buffer> offsets = OffsetsBuilder_->Finish(); + std::shared_ptr<arrow::Buffer> data = DataBuilder_->Finish(); + + auto arrowType = std::make_shared<TArrowType>(); + Chunks_.push_back(arrow::ArrayData::Make(arrowType, length, { nullBitmap, offsets, data })); + if (!finish) { + Reserve(); + } + } + + std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder_; + std::unique_ptr<TTypedBufferBuilder<TOffset>> OffsetsBuilder_; + std::unique_ptr<TTypedBufferBuilder<ui8>> DataBuilder_; + + std::vector<std::shared_ptr<arrow::ArrayData>> Chunks_; + + TComputationContext& Ctx_; + const ui64 MaxLen_; +}; + template <typename T, typename TScalar, typename TBuilder, bool IsOptional> class TFixedSizeKeySerializer : public IKeySerializer { public: @@ -491,6 +609,49 @@ private: const std::shared_ptr<arrow::DataType> DataType_; }; +template <typename T, bool IsOptional> +class TStringKeySerializer : public IKeySerializer { +public: + using TOffset = typename TPrimitiveDataType<T>::TResult::offset_type; + + TStringKeySerializer() = default; + + virtual void Serialize(const arrow::Datum& value, ui64 index, TOutputBuffer& out) const final { + std::string_view x; + if (value.is_scalar()) { + const auto& scalar = *value.scalar(); + if constexpr (IsOptional) { + if (!scalar.is_valid) { + out.PushChar(0); + return; + } + out.PushChar(1); + } + x = GetStringScalarValue(scalar); + } else { + const auto& array = *value.array(); + if constexpr (IsOptional) { + bool isValid = array.GetNullCount() == 0 || + arrow::BitUtil::GetBit(array.GetValues<uint8_t>(0, 0), index + array.offset); + if (!isValid) { + out.PushChar(0); + return; + } + out.PushChar(1); + } + const TOffset* offsets = array.GetValues<TOffset>(1); + const char* data = array.GetValues<char>(2, 0); + x = std::string_view(data + offsets[index], offsets[index + 1] - offsets[index]); + } + + out.PushString(x); + } + + std::unique_ptr<IKeyColumnBuilder> MakeBuilder(ui64 size, TComputationContext& ctx) const final { + return std::make_unique<TStringKeyColumnBuilder<T, IsOptional>>(size, ctx); + } +}; + size_t GetBitmapPopCount(const std::shared_ptr<arrow::ArrayData>& arr) { size_t len = (size_t)arr->length; MKQL_ENSURE(arr->GetNullCount() == 0, "Bitmap block should not have nulls"); @@ -1242,7 +1403,7 @@ IComputationNode* MakeBlockCombineHashedWrapper( template <bool UseSet, bool UseFilter> IComputationNode* MakeBlockCombineHashedWrapper( - ui32 totalKeysSize, + TMaybe<ui32> totalKeysSize, ui32 totalStateSize, TComputationMutables& mutables, IComputationWideFlowNode* flow, @@ -1251,11 +1412,11 @@ IComputationNode* MakeBlockCombineHashedWrapper( const std::vector<TKeyParams>& keys, std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers, TVector<TAggParams<IBlockAggregatorCombineKeys>>&& aggsParams) { - if (totalKeysSize <= sizeof(ui32)) { + if (totalKeysSize && *totalKeysSize <= sizeof(ui32)) { return MakeBlockCombineHashedWrapper<UseSet, UseFilter, ui32>(totalStateSize, mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams)); } - if (totalKeysSize <= sizeof(ui64)) { + if (totalKeysSize && *totalKeysSize <= sizeof(ui64)) { return MakeBlockCombineHashedWrapper<UseSet, UseFilter, ui64>(totalStateSize, mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams)); } @@ -1285,7 +1446,7 @@ IComputationNode* MakeBlockMergeFinalizeHashedWrapper( template <bool UseSet> IComputationNode* MakeBlockMergeFinalizeHashedWrapper( - ui32 totalKeysSize, + TMaybe<ui32> totalKeysSize, ui32 totalStateSize, TComputationMutables& mutables, IComputationWideFlowNode* flow, @@ -1293,11 +1454,11 @@ IComputationNode* MakeBlockMergeFinalizeHashedWrapper( const std::vector<TKeyParams>& keys, std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers, TVector<TAggParams<IBlockAggregatorFinalizeKeys>>&& aggsParams) { - if (totalKeysSize <= sizeof(ui32)) { + if (totalKeysSize && *totalKeysSize <= sizeof(ui32)) { return MakeBlockMergeFinalizeHashedWrapper<ui32, UseSet>(totalStateSize, mutables, flow, width, keys, std::move(keySerializers), std::move(aggsParams)); } - if (totalKeysSize <= sizeof(ui64)) { + if (totalKeysSize && *totalKeysSize <= sizeof(ui64)) { return MakeBlockMergeFinalizeHashedWrapper<ui64, UseSet>(totalStateSize, mutables, flow, width, keys, std::move(keySerializers), std::move(aggsParams)); } @@ -1328,7 +1489,7 @@ IComputationNode* MakeBlockMergeManyFinalizeHashedWrapper( } IComputationNode* MakeBlockMergeManyFinalizeHashedWrapper( - ui32 totalKeysSize, + TMaybe<ui32> totalKeysSize, ui32 totalStateSize, TComputationMutables& mutables, IComputationWideFlowNode* flow, @@ -1338,31 +1499,33 @@ IComputationNode* MakeBlockMergeManyFinalizeHashedWrapper( TVector<TAggParams<IBlockAggregatorFinalizeKeys>>&& aggsParams, ui32 streamIndex, TVector<TVector<ui32>>&& streams) { - if (totalKeysSize <= sizeof(ui32)) { + if (totalKeysSize && *totalKeysSize <= sizeof(ui32)) { return MakeBlockMergeManyFinalizeHashedWrapper<ui32>(totalStateSize, mutables, flow, width, keys, std::move(keySerializers), std::move(aggsParams), streamIndex, std::move(streams)); } - if (totalKeysSize <= sizeof(ui64)) { + if (totalKeysSize && *totalKeysSize <= sizeof(ui64)) { return MakeBlockMergeManyFinalizeHashedWrapper<ui64>(totalStateSize, mutables, flow, width, keys, std::move(keySerializers), std::move(aggsParams), streamIndex, std::move(streams)); } return MakeBlockMergeManyFinalizeHashedWrapper<TSSOKey>(totalStateSize, mutables, flow, width, keys, std::move(keySerializers), std::move(aggsParams), streamIndex, std::move(streams)); } -void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std::vector<std::unique_ptr<IKeySerializer>>& keySerializers) { +void PrepareKeys(const std::vector<TKeyParams>& keys, TMaybe<ui32>& totalKeysSize, std::vector<std::unique_ptr<IKeySerializer>>& keySerializers) { totalKeysSize = 0; keySerializers.clear(); for (const auto& k : keys) { auto itemType = AS_TYPE(TBlockType, k.Type)->GetItemType(); bool isOptional; auto dataType = UnpackOptionalData(itemType, isOptional); - if (isOptional) { - totalKeysSize += 1; + if (isOptional && totalKeysSize) { + *totalKeysSize += 1; } switch (*dataType->GetDataSlot()) { case NUdf::EDataSlot::Int8: - totalKeysSize += 1; + if (totalKeysSize) { + *totalKeysSize += 1; + } if (isOptional) { keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i8, arrow::Int8Scalar, arrow::Int8Builder, true>>(arrow::int8())); } else { @@ -1372,7 +1535,9 @@ void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std:: break; case NUdf::EDataSlot::Bool: case NUdf::EDataSlot::Uint8: - totalKeysSize += 1; + if (totalKeysSize) { + *totalKeysSize += 1; + } if (isOptional) { keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, true>>(arrow::uint8())); } else { @@ -1381,7 +1546,9 @@ void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std:: break; case NUdf::EDataSlot::Int16: - totalKeysSize += 2; + if (totalKeysSize) { + *totalKeysSize += 2; + } if (isOptional) { keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i16, arrow::Int16Scalar, arrow::Int16Builder, true>>(arrow::int16())); } else { @@ -1391,7 +1558,9 @@ void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std:: break; case NUdf::EDataSlot::Uint16: case NUdf::EDataSlot::Date: - totalKeysSize += 2; + if (totalKeysSize) { + *totalKeysSize += 2; + } if (isOptional) { keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, true>>(arrow::uint16())); } else { @@ -1400,7 +1569,9 @@ void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std:: break; case NUdf::EDataSlot::Int32: - totalKeysSize += 4; + if (totalKeysSize) { + *totalKeysSize += 4; + } if (isOptional) { keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i32, arrow::Int32Scalar, arrow::Int32Builder, true>>(arrow::int32())); } else { @@ -1410,7 +1581,9 @@ void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std:: break; case NUdf::EDataSlot::Uint32: case NUdf::EDataSlot::Datetime: - totalKeysSize += 4; + if (totalKeysSize) { + *totalKeysSize += 4; + } if (isOptional) { keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, true>>(arrow::uint32())); } else { @@ -1420,7 +1593,9 @@ void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std:: break; case NUdf::EDataSlot::Int64: case NUdf::EDataSlot::Interval: - totalKeysSize += 8; + if (totalKeysSize) { + *totalKeysSize += 8; + } if (isOptional) { keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i64, arrow::Int64Scalar, arrow::Int64Builder, true>>(arrow::int64())); } else { @@ -1430,7 +1605,9 @@ void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std:: break; case NUdf::EDataSlot::Uint64: case NUdf::EDataSlot::Timestamp: - totalKeysSize += 8; + if (totalKeysSize) { + *totalKeysSize += 8; + } if (isOptional) { keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, true>>(arrow::uint64())); } else { @@ -1438,6 +1615,24 @@ void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std:: } break; + case NUdf::EDataSlot::String: + totalKeysSize = {}; + if (isOptional) { + keySerializers.emplace_back(std::make_unique<TStringKeySerializer<char*, true>>()); + } else { + keySerializers.emplace_back(std::make_unique<TStringKeySerializer<char*, false>>()); + } + + break; + case NUdf::EDataSlot::Utf8: + totalKeysSize = {}; + if (isOptional) { + keySerializers.emplace_back(std::make_unique<TStringKeySerializer<NYql::NUdf::TUtf8, true>>()); + } else { + keySerializers.emplace_back(std::make_unique<TStringKeySerializer<NYql::NUdf::TUtf8, false>>()); + } + + break; default: throw yexception() << "Unsupported key type"; } @@ -1504,7 +1699,7 @@ IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputation TVector<TAggParams<IBlockAggregatorCombineKeys>> aggsParams; ui32 totalStateSize = FillAggParams<IBlockAggregatorCombineKeys>(aggsVal, tupleType, filterColumn, aggsParams, ctx.Env, false, false); - ui32 totalKeysSize = 0; + TMaybe<ui32> totalKeysSize; std::vector<std::unique_ptr<IKeySerializer>> keySerializers; PrepareKeys(keys, totalKeysSize, keySerializers); @@ -1542,7 +1737,7 @@ IComputationNode* WrapBlockMergeFinalizeHashed(TCallable& callable, const TCompu TVector<TAggParams<IBlockAggregatorFinalizeKeys>> aggsParams; ui32 totalStateSize = FillAggParams<IBlockAggregatorFinalizeKeys>(aggsVal, tupleType, {}, aggsParams, ctx.Env, true, false); - ui32 totalKeysSize = 0; + TMaybe<ui32> totalKeysSize; std::vector<std::unique_ptr<IKeySerializer>> keySerializers; PrepareKeys(keys, totalKeysSize, keySerializers); @@ -1572,7 +1767,7 @@ IComputationNode* WrapBlockMergeManyFinalizeHashed(TCallable& callable, const TC TVector<TAggParams<IBlockAggregatorFinalizeKeys>> aggsParams; ui32 totalStateSize = FillAggParams<IBlockAggregatorFinalizeKeys>(aggsVal, tupleType, {}, aggsParams, ctx.Env, true, true); - ui32 totalKeysSize = 0; + TMaybe<ui32> totalKeysSize; std::vector<std::unique_ptr<IKeySerializer>> keySerializers; PrepareKeys(keys, totalKeysSize, keySerializers); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp index ef6914c379c..58bd0a54778 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp @@ -1,8 +1,8 @@ #include "mkql_block_builder.h" -#include "mkql_bit_utils.h" #include <ydb/library/yql/minikql/arrow/arrow_defs.h> #include <ydb/library/yql/minikql/arrow/arrow_util.h> +#include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> #include <ydb/library/yql/minikql/mkql_type_builder.h> @@ -46,79 +46,6 @@ std::shared_ptr<arrow::DataType> GetArrowType(TType* type) { return result; } -std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool) { - // align up to 64 bit - bitCount = (bitCount + 63u) & ~size_t(63u); - // this simplifies code compression code - we can write single 64 bit word after array boundaries - bitCount += 64; - return ARROW_RESULT(arrow::AllocateBitmap(bitCount, pool)); -} - -std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool) { - auto bitmap = AllocateBitmapWithReserve(len, pool); - CompressSparseBitmap(bitmap->mutable_data(), srcSparse, len); - return bitmap; -} - -// similar to arrow::TypedBufferBuilder, but with UnsafeAdvance() method -template<typename T> -class TTypedBufferBuilder { - static_assert(std::is_pod_v<T>); - static_assert(!std::is_same_v<T, bool>); -public: - explicit TTypedBufferBuilder(arrow::MemoryPool* pool) - : Builder(pool) - { - } - - inline void Reserve(size_t size) { - ARROW_OK(Builder.Reserve(size * sizeof(T))); - } - - inline size_t Length() const { - return Builder.length() / sizeof(T); - } - - inline T* MutableData() { - return reinterpret_cast<T*>(Builder.mutable_data()); - } - - inline T* End() { - return MutableData() + Length(); - } - - inline const T* Data() const { - return reinterpret_cast<const T*>(Builder.data()); - } - - inline void UnsafeAppend(const T* values, size_t count) { - std::memcpy(End(), values, count * sizeof(T)); - UnsafeAdvance(count); - } - - inline void UnsafeAppend(size_t count, const T& value) { - T* target = End(); - std::fill(target, target + count, value); - UnsafeAdvance(count); - } - - inline void UnsafeAppend(T&& value) { - *End() = std::move(value); - UnsafeAdvance(1); - } - - inline void UnsafeAdvance(size_t count) { - Builder.UnsafeAdvance(count * sizeof(T)); - } - - inline std::shared_ptr<arrow::Buffer> Finish() { - bool shrinkToFit = false; - return ARROW_RESULT(Builder.Finish(shrinkToFit)); - } -private: - arrow::BufferBuilder Builder; -}; - class TBlockBuilderBase : public IBlockBuilder { public: using Ptr = std::unique_ptr<TBlockBuilderBase>; @@ -362,7 +289,6 @@ public: AppendCurrentOffset(); return; } - NullBuilder->UnsafeAppend(1); } const TStringBuf str = value.AsStringRef(); @@ -380,6 +306,9 @@ public: AppendCurrentOffset(); DataBuilder->UnsafeAppend((const ui8*)str.data(), str.size()); + if (Nullable) { + NullBuilder->UnsafeAppend(1); + } } void DoAddDefault() final { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp index d2d4ad6c1b2..47cda0a5999 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp @@ -1,8 +1,8 @@ #include "mkql_block_compress.h" -#include "mkql_bit_utils.h" #include "mkql_block_builder.h" #include <ydb/library/yql/minikql/arrow/arrow_util.h> +#include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h> #include <ydb/library/yql/minikql/mkql_type_builder.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_node_builder.h> diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp index 8c58736815b..c33168c4108 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp @@ -1,7 +1,7 @@ #include "mkql_block_logical.h" -#include "mkql_bit_utils.h" #include <ydb/library/yql/minikql/arrow/arrow_defs.h> +#include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h> #include <ydb/library/yql/minikql/mkql_type_builder.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_node_builder.h> diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp index 919ee317a3b..619de191ea9 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp @@ -1,6 +1,6 @@ #include "mkql_computation_node_ut.h" -#include <ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h> +#include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h> namespace NKikimr { namespace NMiniKQL { diff --git a/ydb/library/yql/minikql/invoke_builtins/mkql_builtins_impl.h b/ydb/library/yql/minikql/invoke_builtins/mkql_builtins_impl.h index 07516e823cd..ed2c43f866f 100644 --- a/ydb/library/yql/minikql/invoke_builtins/mkql_builtins_impl.h +++ b/ydb/library/yql/minikql/invoke_builtins/mkql_builtins_impl.h @@ -5,15 +5,14 @@ #include <ydb/library/yql/public/udf/udf_types.h> #include <ydb/library/yql/minikql/mkql_function_metadata.h> #include <ydb/library/yql/minikql/arrow/arrow_defs.h> +#include <ydb/library/yql/minikql/arrow/arrow_util.h> #include <util/string/cast.h> #include "mkql_builtins.h" #include "mkql_builtins_codegen.h" #include <arrow/compute/function.h> -#include <arrow/scalar.h> #include <arrow/util/bit_util.h> -#include <arrow/util/bitmap.h> #include <arrow/util/bitmap_ops.h> namespace NKikimr { @@ -806,125 +805,6 @@ void RegisterAggrMax(IBuiltinFunctionRegistry& registry); void RegisterAggrMin(IBuiltinFunctionRegistry& registry); void RegisterWith(IBuiltinFunctionRegistry& registry); -inline arrow::internal::Bitmap GetBitmap(const arrow::ArrayData& arr, int index) { - return arrow::internal::Bitmap{ arr.buffers[index], arr.offset, arr.length }; -} - -template <typename T> -std::shared_ptr<arrow::DataType> GetPrimitiveDataType(); - -template <> -inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<bool>() { - return arrow::uint8(); -} - -template <> -inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<i8>() { - return arrow::int8(); -} - -template <> -inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<ui8>() { - return arrow::uint8(); -} - -template <> -inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<i16>() { - return arrow::int16(); -} - -template <> -inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<ui16>() { - return arrow::uint16(); -} - -template <> -inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<i32>() { - return arrow::int32(); -} - -template <> -inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<ui32>() { - return arrow::uint32(); -} - -template <> -inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<i64>() { - return arrow::int64(); -} - -template <> -inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<ui64>() { - return arrow::uint64(); -} - -template <> -inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<char*>() { - return arrow::binary(); -} - -template <> -inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<NYql::NUdf::TUtf8>() { - return arrow::utf8(); -} - -template<typename T> -struct TPrimitiveDataType; - -template<> -struct TPrimitiveDataType<bool> { - using TResult = arrow::UInt8Type; -}; - -template<> -struct TPrimitiveDataType<i8> { - using TResult = arrow::Int8Type; -}; - -template<> -struct TPrimitiveDataType<ui8> { - using TResult = arrow::UInt8Type; -}; - -template<> -struct TPrimitiveDataType<i16> { - using TResult = arrow::Int16Type; -}; - -template<> -struct TPrimitiveDataType<ui16> { - using TResult = arrow::UInt16Type; -}; - -template<> -struct TPrimitiveDataType<i32> { - using TResult = arrow::Int32Type; -}; - -template<> -struct TPrimitiveDataType<ui32> { - using TResult = arrow::UInt32Type; -}; - -template<> -struct TPrimitiveDataType<i64> { - using TResult = arrow::Int64Type; -}; - -template<> -struct TPrimitiveDataType<ui64> { - using TResult = arrow::UInt64Type; -}; - -template<> -struct TPrimitiveDataType<char*> { - using TResult = arrow::BinaryType; -}; - -template<> -struct TPrimitiveDataType<NYql::NUdf::TUtf8> { - using TResult = arrow::StringType; -}; template <typename T> arrow::compute::InputType GetPrimitiveInputArrowType() { @@ -936,64 +816,6 @@ arrow::compute::OutputType GetPrimitiveOutputArrowType() { return arrow::compute::OutputType(GetPrimitiveDataType<T>()); } -template <typename T> -T GetPrimitiveScalarValue(const arrow::Scalar& scalar) { - return *static_cast<const T*>(dynamic_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data()); -} - -inline std::string_view GetStringScalarValue(const arrow::Scalar& scalar) { - const auto& base = dynamic_cast<const arrow::BaseBinaryScalar&>(scalar); - return std::string_view{reinterpret_cast<const char*>(base.value->data()), static_cast<size_t>(base.value->size())}; -} - -template <typename T> -arrow::Datum MakeScalarDatum(T value); - -template <> -inline arrow::Datum MakeScalarDatum<bool>(bool value) { - return arrow::Datum(std::make_shared<arrow::UInt8Scalar>(value)); -} - -template <> -inline arrow::Datum MakeScalarDatum<i8>(i8 value) { - return arrow::Datum(std::make_shared<arrow::Int8Scalar>(value)); -} - -template <> -inline arrow::Datum MakeScalarDatum<ui8>(ui8 value) { - return arrow::Datum(std::make_shared<arrow::UInt8Scalar>(value)); -} - -template <> -inline arrow::Datum MakeScalarDatum<i16>(i16 value) { - return arrow::Datum(std::make_shared<arrow::Int16Scalar>(value)); -} - -template <> -inline arrow::Datum MakeScalarDatum<ui16>(ui16 value) { - return arrow::Datum(std::make_shared<arrow::UInt16Scalar>(value)); -} - -template <> -inline arrow::Datum MakeScalarDatum<i32>(i32 value) { - return arrow::Datum(std::make_shared<arrow::Int32Scalar>(value)); -} - -template <> -inline arrow::Datum MakeScalarDatum<ui32>(ui32 value) { - return arrow::Datum(std::make_shared<arrow::UInt32Scalar>(value)); -} - -template <> -inline arrow::Datum MakeScalarDatum<i64>(i64 value) { - return arrow::Datum(std::make_shared<arrow::Int64Scalar>(value)); -} - -template <> -inline arrow::Datum MakeScalarDatum<ui64>(ui64 value) { - return arrow::Datum(std::make_shared<arrow::UInt64Scalar>(value)); -} - template<typename TDerived> struct TUnaryKernelExecsBase { static arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { |