aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2022-12-30 18:52:29 +0300
committeraneporada <aneporada@ydb.tech>2022-12-30 18:52:29 +0300
commite83ba80b8b391bfc196ab24ba633cf69e9bf327e (patch)
treed9d87eadffd23a8f5c2e4c0daac1015873e1eca0
parent20070209a718431338a2eed7e45673d99c9f3738 (diff)
downloadydb-e83ba80b8b391bfc196ab24ba633cf69e9bf327e.tar.gz
Support strings as aggregation keys
-rw-r--r--ydb/library/yql/core/yql_aggregate_expander.cpp1
-rw-r--r--ydb/library/yql/minikql/arrow/arrow_util.cpp14
-rw-r--r--ydb/library/yql/minikql/arrow/arrow_util.h248
-rw-r--r--ydb/library/yql/minikql/arrow/mkql_bit_utils.h (renamed from ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h)0
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp243
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp79
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp2
-rw-r--r--ydb/library/yql/minikql/invoke_builtins/mkql_builtins_impl.h180
10 files changed, 490 insertions, 281 deletions
diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp
index 30ae73f3ac2..a4ef67f11c7 100644
--- a/ydb/library/yql/core/yql_aggregate_expander.cpp
+++ b/ydb/library/yql/core/yql_aggregate_expander.cpp
@@ -614,6 +614,7 @@ TExprNode::TPtr TAggregateExpander::MakeInputBlocks(const TExprNode::TPtr& strea
auto extractorLambda = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), std::move(extractorArgs)), std::move(extractorRoots));
auto mappedWideFlow = Ctx.NewCallable(Node->Pos(), "WideMap", { wideFlow, extractorLambda });
auto blocks = Ctx.NewCallable(Node->Pos(), "WideToBlocks", { mappedWideFlow });
+ blocks = Ctx.NewCallable(Node->Pos(), "BlockExpandChunked", { blocks });
return blocks;
}
diff --git a/ydb/library/yql/minikql/arrow/arrow_util.cpp b/ydb/library/yql/minikql/arrow/arrow_util.cpp
index 2acd9654dc2..37ded548e4e 100644
--- a/ydb/library/yql/minikql/arrow/arrow_util.cpp
+++ b/ydb/library/yql/minikql/arrow/arrow_util.cpp
@@ -1,4 +1,5 @@
#include "arrow_util.h"
+#include "mkql_bit_utils.h"
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <util/system/yassert.h>
@@ -52,5 +53,18 @@ std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, TType* it
}
}
+std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool) {
+ // align up to 64 bit
+ bitCount = (bitCount + 63u) & ~size_t(63u);
+ // this simplifies code compression code - we can write single 64 bit word after array boundaries
+ bitCount += 64;
+ return ARROW_RESULT(arrow::AllocateBitmap(bitCount, pool));
+}
+
+std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool) {
+ auto bitmap = AllocateBitmapWithReserve(len, pool);
+ CompressSparseBitmap(bitmap->mutable_data(), srcSparse, len);
+ return bitmap;
+}
}
diff --git a/ydb/library/yql/minikql/arrow/arrow_util.h b/ydb/library/yql/minikql/arrow/arrow_util.h
index 76fb9a9aef6..cc46c1efc47 100644
--- a/ydb/library/yql/minikql/arrow/arrow_util.h
+++ b/ydb/library/yql/minikql/arrow/arrow_util.h
@@ -1,6 +1,13 @@
#pragma once
+#include "arrow_defs.h"
+
#include <arrow/array/data.h>
+#include <arrow/buffer_builder.h>
+#include <arrow/datum.h>
+#include <arrow/scalar.h>
+#include <arrow/util/bitmap.h>
+
#include <ydb/library/yql/minikql/mkql_node.h>
namespace NKikimr::NMiniKQL {
@@ -14,4 +21,245 @@ std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data,
/// \brief Remove optional from `data` as new ArrayData object
std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, TType* itemType);
+std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool);
+std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool);
+
+inline arrow::internal::Bitmap GetBitmap(const arrow::ArrayData& arr, int index) {
+ return arrow::internal::Bitmap{ arr.buffers[index], arr.offset, arr.length };
+}
+
+template <typename T>
+T GetPrimitiveScalarValue(const arrow::Scalar& scalar) {
+ return *static_cast<const T*>(dynamic_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data());
+}
+
+inline std::string_view GetStringScalarValue(const arrow::Scalar& scalar) {
+ const auto& base = dynamic_cast<const arrow::BaseBinaryScalar&>(scalar);
+ return std::string_view{reinterpret_cast<const char*>(base.value->data()), static_cast<size_t>(base.value->size())};
+}
+
+template <typename T>
+std::shared_ptr<arrow::DataType> GetPrimitiveDataType();
+
+template <>
+inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<bool>() {
+ return arrow::uint8();
+}
+
+template <>
+inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<i8>() {
+ return arrow::int8();
+}
+
+template <>
+inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<ui8>() {
+ return arrow::uint8();
+}
+
+template <>
+inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<i16>() {
+ return arrow::int16();
+}
+
+template <>
+inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<ui16>() {
+ return arrow::uint16();
+}
+
+template <>
+inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<i32>() {
+ return arrow::int32();
+}
+
+template <>
+inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<ui32>() {
+ return arrow::uint32();
+}
+
+template <>
+inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<i64>() {
+ return arrow::int64();
+}
+
+template <>
+inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<ui64>() {
+ return arrow::uint64();
+}
+
+template <>
+inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<char*>() {
+ return arrow::binary();
+}
+
+template <>
+inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<NYql::NUdf::TUtf8>() {
+ return arrow::utf8();
+}
+
+template<typename T>
+struct TPrimitiveDataType;
+
+template<>
+struct TPrimitiveDataType<bool> {
+ using TResult = arrow::UInt8Type;
+};
+
+template<>
+struct TPrimitiveDataType<i8> {
+ using TResult = arrow::Int8Type;
+};
+
+template<>
+struct TPrimitiveDataType<ui8> {
+ using TResult = arrow::UInt8Type;
+};
+
+template<>
+struct TPrimitiveDataType<i16> {
+ using TResult = arrow::Int16Type;
+};
+
+template<>
+struct TPrimitiveDataType<ui16> {
+ using TResult = arrow::UInt16Type;
+};
+
+template<>
+struct TPrimitiveDataType<i32> {
+ using TResult = arrow::Int32Type;
+};
+
+template<>
+struct TPrimitiveDataType<ui32> {
+ using TResult = arrow::UInt32Type;
+};
+
+template<>
+struct TPrimitiveDataType<i64> {
+ using TResult = arrow::Int64Type;
+};
+
+template<>
+struct TPrimitiveDataType<ui64> {
+ using TResult = arrow::UInt64Type;
+};
+
+template<>
+struct TPrimitiveDataType<char*> {
+ using TResult = arrow::BinaryType;
+};
+
+template<>
+struct TPrimitiveDataType<NYql::NUdf::TUtf8> {
+ using TResult = arrow::StringType;
+};
+
+template <typename T>
+arrow::Datum MakeScalarDatum(T value);
+
+template <>
+inline arrow::Datum MakeScalarDatum<bool>(bool value) {
+ return arrow::Datum(std::make_shared<arrow::UInt8Scalar>(value));
+}
+
+template <>
+inline arrow::Datum MakeScalarDatum<i8>(i8 value) {
+ return arrow::Datum(std::make_shared<arrow::Int8Scalar>(value));
+}
+
+template <>
+inline arrow::Datum MakeScalarDatum<ui8>(ui8 value) {
+ return arrow::Datum(std::make_shared<arrow::UInt8Scalar>(value));
+}
+
+template <>
+inline arrow::Datum MakeScalarDatum<i16>(i16 value) {
+ return arrow::Datum(std::make_shared<arrow::Int16Scalar>(value));
+}
+
+template <>
+inline arrow::Datum MakeScalarDatum<ui16>(ui16 value) {
+ return arrow::Datum(std::make_shared<arrow::UInt16Scalar>(value));
+}
+
+template <>
+inline arrow::Datum MakeScalarDatum<i32>(i32 value) {
+ return arrow::Datum(std::make_shared<arrow::Int32Scalar>(value));
+}
+
+template <>
+inline arrow::Datum MakeScalarDatum<ui32>(ui32 value) {
+ return arrow::Datum(std::make_shared<arrow::UInt32Scalar>(value));
+}
+
+template <>
+inline arrow::Datum MakeScalarDatum<i64>(i64 value) {
+ return arrow::Datum(std::make_shared<arrow::Int64Scalar>(value));
+}
+
+template <>
+inline arrow::Datum MakeScalarDatum<ui64>(ui64 value) {
+ return arrow::Datum(std::make_shared<arrow::UInt64Scalar>(value));
+}
+
+// similar to arrow::TypedBufferBuilder, but with UnsafeAdvance() method
+// and shrinkToFit = false
+template<typename T>
+class TTypedBufferBuilder {
+ static_assert(std::is_pod_v<T>);
+ static_assert(!std::is_same_v<T, bool>);
+public:
+ explicit TTypedBufferBuilder(arrow::MemoryPool* pool)
+ : Builder(pool)
+ {
+ }
+
+ inline void Reserve(size_t size) {
+ ARROW_OK(Builder.Reserve(size * sizeof(T)));
+ }
+
+ inline size_t Length() const {
+ return Builder.length() / sizeof(T);
+ }
+
+ inline T* MutableData() {
+ return reinterpret_cast<T*>(Builder.mutable_data());
+ }
+
+ inline T* End() {
+ return MutableData() + Length();
+ }
+
+ inline const T* Data() const {
+ return reinterpret_cast<const T*>(Builder.data());
+ }
+
+ inline void UnsafeAppend(const T* values, size_t count) {
+ std::memcpy(End(), values, count * sizeof(T));
+ UnsafeAdvance(count);
+ }
+
+ inline void UnsafeAppend(size_t count, const T& value) {
+ T* target = End();
+ std::fill(target, target + count, value);
+ UnsafeAdvance(count);
+ }
+
+ inline void UnsafeAppend(T&& value) {
+ *End() = std::move(value);
+ UnsafeAdvance(1);
+ }
+
+ inline void UnsafeAdvance(size_t count) {
+ Builder.UnsafeAdvance(count * sizeof(T));
+ }
+
+ inline std::shared_ptr<arrow::Buffer> Finish() {
+ bool shrinkToFit = false;
+ return ARROW_RESULT(Builder.Finish(shrinkToFit));
+ }
+private:
+ arrow::BufferBuilder Builder;
+};
+
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h b/ydb/library/yql/minikql/arrow/mkql_bit_utils.h
index 9bb5d898871..9bb5d898871 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h
+++ b/ydb/library/yql/minikql/arrow/mkql_bit_utils.h
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
index b303d3c4b5a..979a67c4733 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
@@ -1,6 +1,6 @@
#include "mkql_block_agg.h"
#include "mkql_block_agg_factory.h"
-#include "mkql_bit_utils.h"
+#include "mkql_block_builder.h"
#include "mkql_rh_hash.h"
#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h>
@@ -11,10 +11,12 @@
#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
#include <ydb/library/yql/minikql/arrow/arrow_util.h>
+#include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h>
#include <arrow/scalar.h>
#include <arrow/array/array_primitive.h>
#include <arrow/array/builder_primitive.h>
+#include <arrow/chunked_array.h>
//#define USE_STD_UNORDERED
@@ -333,6 +335,14 @@ public:
return t;
}
+ std::string_view PopString() {
+ ui32 size = PopNumber<ui32>();
+ Ensure(size);
+ std::string_view result(Buf_.Data() + Pos_, size);
+ Pos_ += size;
+ return result;
+ }
+
private:
void Ensure(size_t delta) {
MKQL_ENSURE(Pos_ + delta <= Buf_.Size(), "Unexpected end of buffer");
@@ -358,6 +368,14 @@ public:
Pos_ += sizeof(T);
}
+ void PushString(std::string_view data) {
+ Ensure(sizeof(ui32) + data.size());
+ *(ui32*)&Vec_[Pos_] = data.size();
+ Pos_ += sizeof(ui32);
+ std::memcpy(Vec_.data() + Pos_, data.data(), data.size());
+ Pos_ += data.size();
+ }
+
// fill with zeros
void Resize(size_t size) {
Pos_ = 0;
@@ -441,6 +459,106 @@ private:
TComputationContext& Ctx_;
};
+template <typename T, bool IsOptional>
+class TStringKeyColumnBuilder : public IKeyColumnBuilder {
+public:
+ using TArrowType = typename TPrimitiveDataType<T>::TResult;
+ using TOffset = typename TArrowType::offset_type;
+
+ TStringKeyColumnBuilder(ui64 size, TComputationContext& ctx)
+ : Ctx_(ctx)
+ , MaxLen_(size)
+ {
+ Reserve();
+ }
+
+ void Add(TInputBuffer& in) final {
+ if constexpr (IsOptional) {
+ if (!in.PopChar()) {
+ NullBuilder_->UnsafeAppend(0);
+ AppendCurrentOffset();
+ return;
+ }
+ }
+
+ std::string_view str = in.PopString();
+
+ size_t currentLen = DataBuilder_->Length();
+ // empty string can always be appended
+ if (!str.empty() && currentLen + str.size() > MaxBlockSizeInBytes) {
+ if (currentLen) {
+ FlushChunk(false);
+ }
+ if (str.size() > MaxBlockSizeInBytes) {
+ DataBuilder_->Reserve(str.size());
+ }
+ }
+
+ AppendCurrentOffset();
+ DataBuilder_->UnsafeAppend((const ui8*)str.data(), str.size());
+ if constexpr (IsOptional) {
+ NullBuilder_->UnsafeAppend(1);
+ }
+ }
+
+ NUdf::TUnboxedValue Build() final {
+ FlushChunk(true);
+ arrow::ArrayVector chunks;
+ for (auto& data : Chunks_) {
+ chunks.push_back(arrow::Datum(data).make_array());
+ }
+ Y_VERIFY(!chunks.empty());
+
+ auto chunked = ARROW_RESULT(arrow::ChunkedArray::Make(std::move(chunks), std::make_shared<TArrowType>()));
+ return Ctx_.HolderFactory.CreateArrowBlock(std::move(chunked));
+ }
+
+private:
+ void Reserve() {
+ if constexpr (IsOptional) {
+ NullBuilder_ = std::make_unique<TTypedBufferBuilder<ui8>>(&Ctx_.ArrowMemoryPool);
+ NullBuilder_->Reserve(MaxLen_ + 1);
+ }
+ OffsetsBuilder_ = std::make_unique<TTypedBufferBuilder<TOffset>>(&Ctx_.ArrowMemoryPool);
+ OffsetsBuilder_->Reserve(MaxLen_ + 1);
+ DataBuilder_ = std::make_unique<TTypedBufferBuilder<ui8>>(&Ctx_.ArrowMemoryPool);
+ DataBuilder_->Reserve(MaxBlockSizeInBytes);
+ }
+
+ void AppendCurrentOffset() {
+ OffsetsBuilder_->UnsafeAppend(DataBuilder_->Length());
+ }
+
+ void FlushChunk(bool finish) {
+ const auto length = OffsetsBuilder_->Length();
+ Y_VERIFY(length > 0);
+
+ AppendCurrentOffset();
+ std::shared_ptr<arrow::Buffer> nullBitmap;
+ if constexpr (IsOptional) {
+ nullBitmap = NullBuilder_->Finish();
+ nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, &Ctx_.ArrowMemoryPool);
+ }
+ std::shared_ptr<arrow::Buffer> offsets = OffsetsBuilder_->Finish();
+ std::shared_ptr<arrow::Buffer> data = DataBuilder_->Finish();
+
+ auto arrowType = std::make_shared<TArrowType>();
+ Chunks_.push_back(arrow::ArrayData::Make(arrowType, length, { nullBitmap, offsets, data }));
+ if (!finish) {
+ Reserve();
+ }
+ }
+
+ std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder_;
+ std::unique_ptr<TTypedBufferBuilder<TOffset>> OffsetsBuilder_;
+ std::unique_ptr<TTypedBufferBuilder<ui8>> DataBuilder_;
+
+ std::vector<std::shared_ptr<arrow::ArrayData>> Chunks_;
+
+ TComputationContext& Ctx_;
+ const ui64 MaxLen_;
+};
+
template <typename T, typename TScalar, typename TBuilder, bool IsOptional>
class TFixedSizeKeySerializer : public IKeySerializer {
public:
@@ -491,6 +609,49 @@ private:
const std::shared_ptr<arrow::DataType> DataType_;
};
+template <typename T, bool IsOptional>
+class TStringKeySerializer : public IKeySerializer {
+public:
+ using TOffset = typename TPrimitiveDataType<T>::TResult::offset_type;
+
+ TStringKeySerializer() = default;
+
+ virtual void Serialize(const arrow::Datum& value, ui64 index, TOutputBuffer& out) const final {
+ std::string_view x;
+ if (value.is_scalar()) {
+ const auto& scalar = *value.scalar();
+ if constexpr (IsOptional) {
+ if (!scalar.is_valid) {
+ out.PushChar(0);
+ return;
+ }
+ out.PushChar(1);
+ }
+ x = GetStringScalarValue(scalar);
+ } else {
+ const auto& array = *value.array();
+ if constexpr (IsOptional) {
+ bool isValid = array.GetNullCount() == 0 ||
+ arrow::BitUtil::GetBit(array.GetValues<uint8_t>(0, 0), index + array.offset);
+ if (!isValid) {
+ out.PushChar(0);
+ return;
+ }
+ out.PushChar(1);
+ }
+ const TOffset* offsets = array.GetValues<TOffset>(1);
+ const char* data = array.GetValues<char>(2, 0);
+ x = std::string_view(data + offsets[index], offsets[index + 1] - offsets[index]);
+ }
+
+ out.PushString(x);
+ }
+
+ std::unique_ptr<IKeyColumnBuilder> MakeBuilder(ui64 size, TComputationContext& ctx) const final {
+ return std::make_unique<TStringKeyColumnBuilder<T, IsOptional>>(size, ctx);
+ }
+};
+
size_t GetBitmapPopCount(const std::shared_ptr<arrow::ArrayData>& arr) {
size_t len = (size_t)arr->length;
MKQL_ENSURE(arr->GetNullCount() == 0, "Bitmap block should not have nulls");
@@ -1242,7 +1403,7 @@ IComputationNode* MakeBlockCombineHashedWrapper(
template <bool UseSet, bool UseFilter>
IComputationNode* MakeBlockCombineHashedWrapper(
- ui32 totalKeysSize,
+ TMaybe<ui32> totalKeysSize,
ui32 totalStateSize,
TComputationMutables& mutables,
IComputationWideFlowNode* flow,
@@ -1251,11 +1412,11 @@ IComputationNode* MakeBlockCombineHashedWrapper(
const std::vector<TKeyParams>& keys,
std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers,
TVector<TAggParams<IBlockAggregatorCombineKeys>>&& aggsParams) {
- if (totalKeysSize <= sizeof(ui32)) {
+ if (totalKeysSize && *totalKeysSize <= sizeof(ui32)) {
return MakeBlockCombineHashedWrapper<UseSet, UseFilter, ui32>(totalStateSize, mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams));
}
- if (totalKeysSize <= sizeof(ui64)) {
+ if (totalKeysSize && *totalKeysSize <= sizeof(ui64)) {
return MakeBlockCombineHashedWrapper<UseSet, UseFilter, ui64>(totalStateSize, mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams));
}
@@ -1285,7 +1446,7 @@ IComputationNode* MakeBlockMergeFinalizeHashedWrapper(
template <bool UseSet>
IComputationNode* MakeBlockMergeFinalizeHashedWrapper(
- ui32 totalKeysSize,
+ TMaybe<ui32> totalKeysSize,
ui32 totalStateSize,
TComputationMutables& mutables,
IComputationWideFlowNode* flow,
@@ -1293,11 +1454,11 @@ IComputationNode* MakeBlockMergeFinalizeHashedWrapper(
const std::vector<TKeyParams>& keys,
std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers,
TVector<TAggParams<IBlockAggregatorFinalizeKeys>>&& aggsParams) {
- if (totalKeysSize <= sizeof(ui32)) {
+ if (totalKeysSize && *totalKeysSize <= sizeof(ui32)) {
return MakeBlockMergeFinalizeHashedWrapper<ui32, UseSet>(totalStateSize, mutables, flow, width, keys, std::move(keySerializers), std::move(aggsParams));
}
- if (totalKeysSize <= sizeof(ui64)) {
+ if (totalKeysSize && *totalKeysSize <= sizeof(ui64)) {
return MakeBlockMergeFinalizeHashedWrapper<ui64, UseSet>(totalStateSize, mutables, flow, width, keys, std::move(keySerializers), std::move(aggsParams));
}
@@ -1328,7 +1489,7 @@ IComputationNode* MakeBlockMergeManyFinalizeHashedWrapper(
}
IComputationNode* MakeBlockMergeManyFinalizeHashedWrapper(
- ui32 totalKeysSize,
+ TMaybe<ui32> totalKeysSize,
ui32 totalStateSize,
TComputationMutables& mutables,
IComputationWideFlowNode* flow,
@@ -1338,31 +1499,33 @@ IComputationNode* MakeBlockMergeManyFinalizeHashedWrapper(
TVector<TAggParams<IBlockAggregatorFinalizeKeys>>&& aggsParams,
ui32 streamIndex,
TVector<TVector<ui32>>&& streams) {
- if (totalKeysSize <= sizeof(ui32)) {
+ if (totalKeysSize && *totalKeysSize <= sizeof(ui32)) {
return MakeBlockMergeManyFinalizeHashedWrapper<ui32>(totalStateSize, mutables, flow, width, keys, std::move(keySerializers), std::move(aggsParams), streamIndex, std::move(streams));
}
- if (totalKeysSize <= sizeof(ui64)) {
+ if (totalKeysSize && *totalKeysSize <= sizeof(ui64)) {
return MakeBlockMergeManyFinalizeHashedWrapper<ui64>(totalStateSize, mutables, flow, width, keys, std::move(keySerializers), std::move(aggsParams), streamIndex, std::move(streams));
}
return MakeBlockMergeManyFinalizeHashedWrapper<TSSOKey>(totalStateSize, mutables, flow, width, keys, std::move(keySerializers), std::move(aggsParams), streamIndex, std::move(streams));
}
-void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std::vector<std::unique_ptr<IKeySerializer>>& keySerializers) {
+void PrepareKeys(const std::vector<TKeyParams>& keys, TMaybe<ui32>& totalKeysSize, std::vector<std::unique_ptr<IKeySerializer>>& keySerializers) {
totalKeysSize = 0;
keySerializers.clear();
for (const auto& k : keys) {
auto itemType = AS_TYPE(TBlockType, k.Type)->GetItemType();
bool isOptional;
auto dataType = UnpackOptionalData(itemType, isOptional);
- if (isOptional) {
- totalKeysSize += 1;
+ if (isOptional && totalKeysSize) {
+ *totalKeysSize += 1;
}
switch (*dataType->GetDataSlot()) {
case NUdf::EDataSlot::Int8:
- totalKeysSize += 1;
+ if (totalKeysSize) {
+ *totalKeysSize += 1;
+ }
if (isOptional) {
keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i8, arrow::Int8Scalar, arrow::Int8Builder, true>>(arrow::int8()));
} else {
@@ -1372,7 +1535,9 @@ void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std::
break;
case NUdf::EDataSlot::Bool:
case NUdf::EDataSlot::Uint8:
- totalKeysSize += 1;
+ if (totalKeysSize) {
+ *totalKeysSize += 1;
+ }
if (isOptional) {
keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui8, arrow::UInt8Scalar, arrow::UInt8Builder, true>>(arrow::uint8()));
} else {
@@ -1381,7 +1546,9 @@ void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std::
break;
case NUdf::EDataSlot::Int16:
- totalKeysSize += 2;
+ if (totalKeysSize) {
+ *totalKeysSize += 2;
+ }
if (isOptional) {
keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i16, arrow::Int16Scalar, arrow::Int16Builder, true>>(arrow::int16()));
} else {
@@ -1391,7 +1558,9 @@ void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std::
break;
case NUdf::EDataSlot::Uint16:
case NUdf::EDataSlot::Date:
- totalKeysSize += 2;
+ if (totalKeysSize) {
+ *totalKeysSize += 2;
+ }
if (isOptional) {
keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui16, arrow::UInt16Scalar, arrow::UInt16Builder, true>>(arrow::uint16()));
} else {
@@ -1400,7 +1569,9 @@ void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std::
break;
case NUdf::EDataSlot::Int32:
- totalKeysSize += 4;
+ if (totalKeysSize) {
+ *totalKeysSize += 4;
+ }
if (isOptional) {
keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i32, arrow::Int32Scalar, arrow::Int32Builder, true>>(arrow::int32()));
} else {
@@ -1410,7 +1581,9 @@ void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std::
break;
case NUdf::EDataSlot::Uint32:
case NUdf::EDataSlot::Datetime:
- totalKeysSize += 4;
+ if (totalKeysSize) {
+ *totalKeysSize += 4;
+ }
if (isOptional) {
keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui32, arrow::UInt32Scalar, arrow::UInt32Builder, true>>(arrow::uint32()));
} else {
@@ -1420,7 +1593,9 @@ void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std::
break;
case NUdf::EDataSlot::Int64:
case NUdf::EDataSlot::Interval:
- totalKeysSize += 8;
+ if (totalKeysSize) {
+ *totalKeysSize += 8;
+ }
if (isOptional) {
keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<i64, arrow::Int64Scalar, arrow::Int64Builder, true>>(arrow::int64()));
} else {
@@ -1430,7 +1605,9 @@ void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std::
break;
case NUdf::EDataSlot::Uint64:
case NUdf::EDataSlot::Timestamp:
- totalKeysSize += 8;
+ if (totalKeysSize) {
+ *totalKeysSize += 8;
+ }
if (isOptional) {
keySerializers.emplace_back(std::make_unique<TFixedSizeKeySerializer<ui64, arrow::UInt64Scalar, arrow::UInt64Builder, true>>(arrow::uint64()));
} else {
@@ -1438,6 +1615,24 @@ void PrepareKeys(const std::vector<TKeyParams>& keys, ui32& totalKeysSize, std::
}
break;
+ case NUdf::EDataSlot::String:
+ totalKeysSize = {};
+ if (isOptional) {
+ keySerializers.emplace_back(std::make_unique<TStringKeySerializer<char*, true>>());
+ } else {
+ keySerializers.emplace_back(std::make_unique<TStringKeySerializer<char*, false>>());
+ }
+
+ break;
+ case NUdf::EDataSlot::Utf8:
+ totalKeysSize = {};
+ if (isOptional) {
+ keySerializers.emplace_back(std::make_unique<TStringKeySerializer<NYql::NUdf::TUtf8, true>>());
+ } else {
+ keySerializers.emplace_back(std::make_unique<TStringKeySerializer<NYql::NUdf::TUtf8, false>>());
+ }
+
+ break;
default:
throw yexception() << "Unsupported key type";
}
@@ -1504,7 +1699,7 @@ IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputation
TVector<TAggParams<IBlockAggregatorCombineKeys>> aggsParams;
ui32 totalStateSize = FillAggParams<IBlockAggregatorCombineKeys>(aggsVal, tupleType, filterColumn, aggsParams, ctx.Env, false, false);
- ui32 totalKeysSize = 0;
+ TMaybe<ui32> totalKeysSize;
std::vector<std::unique_ptr<IKeySerializer>> keySerializers;
PrepareKeys(keys, totalKeysSize, keySerializers);
@@ -1542,7 +1737,7 @@ IComputationNode* WrapBlockMergeFinalizeHashed(TCallable& callable, const TCompu
TVector<TAggParams<IBlockAggregatorFinalizeKeys>> aggsParams;
ui32 totalStateSize = FillAggParams<IBlockAggregatorFinalizeKeys>(aggsVal, tupleType, {}, aggsParams, ctx.Env, true, false);
- ui32 totalKeysSize = 0;
+ TMaybe<ui32> totalKeysSize;
std::vector<std::unique_ptr<IKeySerializer>> keySerializers;
PrepareKeys(keys, totalKeysSize, keySerializers);
@@ -1572,7 +1767,7 @@ IComputationNode* WrapBlockMergeManyFinalizeHashed(TCallable& callable, const TC
TVector<TAggParams<IBlockAggregatorFinalizeKeys>> aggsParams;
ui32 totalStateSize = FillAggParams<IBlockAggregatorFinalizeKeys>(aggsVal, tupleType, {}, aggsParams, ctx.Env, true, true);
- ui32 totalKeysSize = 0;
+ TMaybe<ui32> totalKeysSize;
std::vector<std::unique_ptr<IKeySerializer>> keySerializers;
PrepareKeys(keys, totalKeysSize, keySerializers);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
index ef6914c379c..58bd0a54778 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
@@ -1,8 +1,8 @@
#include "mkql_block_builder.h"
-#include "mkql_bit_utils.h"
#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
#include <ydb/library/yql/minikql/arrow/arrow_util.h>
+#include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/mkql_type_builder.h>
@@ -46,79 +46,6 @@ std::shared_ptr<arrow::DataType> GetArrowType(TType* type) {
return result;
}
-std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool) {
- // align up to 64 bit
- bitCount = (bitCount + 63u) & ~size_t(63u);
- // this simplifies code compression code - we can write single 64 bit word after array boundaries
- bitCount += 64;
- return ARROW_RESULT(arrow::AllocateBitmap(bitCount, pool));
-}
-
-std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool) {
- auto bitmap = AllocateBitmapWithReserve(len, pool);
- CompressSparseBitmap(bitmap->mutable_data(), srcSparse, len);
- return bitmap;
-}
-
-// similar to arrow::TypedBufferBuilder, but with UnsafeAdvance() method
-template<typename T>
-class TTypedBufferBuilder {
- static_assert(std::is_pod_v<T>);
- static_assert(!std::is_same_v<T, bool>);
-public:
- explicit TTypedBufferBuilder(arrow::MemoryPool* pool)
- : Builder(pool)
- {
- }
-
- inline void Reserve(size_t size) {
- ARROW_OK(Builder.Reserve(size * sizeof(T)));
- }
-
- inline size_t Length() const {
- return Builder.length() / sizeof(T);
- }
-
- inline T* MutableData() {
- return reinterpret_cast<T*>(Builder.mutable_data());
- }
-
- inline T* End() {
- return MutableData() + Length();
- }
-
- inline const T* Data() const {
- return reinterpret_cast<const T*>(Builder.data());
- }
-
- inline void UnsafeAppend(const T* values, size_t count) {
- std::memcpy(End(), values, count * sizeof(T));
- UnsafeAdvance(count);
- }
-
- inline void UnsafeAppend(size_t count, const T& value) {
- T* target = End();
- std::fill(target, target + count, value);
- UnsafeAdvance(count);
- }
-
- inline void UnsafeAppend(T&& value) {
- *End() = std::move(value);
- UnsafeAdvance(1);
- }
-
- inline void UnsafeAdvance(size_t count) {
- Builder.UnsafeAdvance(count * sizeof(T));
- }
-
- inline std::shared_ptr<arrow::Buffer> Finish() {
- bool shrinkToFit = false;
- return ARROW_RESULT(Builder.Finish(shrinkToFit));
- }
-private:
- arrow::BufferBuilder Builder;
-};
-
class TBlockBuilderBase : public IBlockBuilder {
public:
using Ptr = std::unique_ptr<TBlockBuilderBase>;
@@ -362,7 +289,6 @@ public:
AppendCurrentOffset();
return;
}
- NullBuilder->UnsafeAppend(1);
}
const TStringBuf str = value.AsStringRef();
@@ -380,6 +306,9 @@ public:
AppendCurrentOffset();
DataBuilder->UnsafeAppend((const ui8*)str.data(), str.size());
+ if (Nullable) {
+ NullBuilder->UnsafeAppend(1);
+ }
}
void DoAddDefault() final {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
index d2d4ad6c1b2..47cda0a5999 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
@@ -1,8 +1,8 @@
#include "mkql_block_compress.h"
-#include "mkql_bit_utils.h"
#include "mkql_block_builder.h"
#include <ydb/library/yql/minikql/arrow/arrow_util.h>
+#include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h>
#include <ydb/library/yql/minikql/mkql_type_builder.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp
index 8c58736815b..c33168c4108 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp
@@ -1,7 +1,7 @@
#include "mkql_block_logical.h"
-#include "mkql_bit_utils.h"
#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
+#include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h>
#include <ydb/library/yql/minikql/mkql_type_builder.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp
index 919ee317a3b..619de191ea9 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_bit_utils_ut.cpp
@@ -1,6 +1,6 @@
#include "mkql_computation_node_ut.h"
-#include <ydb/library/yql/minikql/comp_nodes/mkql_bit_utils.h>
+#include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h>
namespace NKikimr {
namespace NMiniKQL {
diff --git a/ydb/library/yql/minikql/invoke_builtins/mkql_builtins_impl.h b/ydb/library/yql/minikql/invoke_builtins/mkql_builtins_impl.h
index 07516e823cd..ed2c43f866f 100644
--- a/ydb/library/yql/minikql/invoke_builtins/mkql_builtins_impl.h
+++ b/ydb/library/yql/minikql/invoke_builtins/mkql_builtins_impl.h
@@ -5,15 +5,14 @@
#include <ydb/library/yql/public/udf/udf_types.h>
#include <ydb/library/yql/minikql/mkql_function_metadata.h>
#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
+#include <ydb/library/yql/minikql/arrow/arrow_util.h>
#include <util/string/cast.h>
#include "mkql_builtins.h"
#include "mkql_builtins_codegen.h"
#include <arrow/compute/function.h>
-#include <arrow/scalar.h>
#include <arrow/util/bit_util.h>
-#include <arrow/util/bitmap.h>
#include <arrow/util/bitmap_ops.h>
namespace NKikimr {
@@ -806,125 +805,6 @@ void RegisterAggrMax(IBuiltinFunctionRegistry& registry);
void RegisterAggrMin(IBuiltinFunctionRegistry& registry);
void RegisterWith(IBuiltinFunctionRegistry& registry);
-inline arrow::internal::Bitmap GetBitmap(const arrow::ArrayData& arr, int index) {
- return arrow::internal::Bitmap{ arr.buffers[index], arr.offset, arr.length };
-}
-
-template <typename T>
-std::shared_ptr<arrow::DataType> GetPrimitiveDataType();
-
-template <>
-inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<bool>() {
- return arrow::uint8();
-}
-
-template <>
-inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<i8>() {
- return arrow::int8();
-}
-
-template <>
-inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<ui8>() {
- return arrow::uint8();
-}
-
-template <>
-inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<i16>() {
- return arrow::int16();
-}
-
-template <>
-inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<ui16>() {
- return arrow::uint16();
-}
-
-template <>
-inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<i32>() {
- return arrow::int32();
-}
-
-template <>
-inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<ui32>() {
- return arrow::uint32();
-}
-
-template <>
-inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<i64>() {
- return arrow::int64();
-}
-
-template <>
-inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<ui64>() {
- return arrow::uint64();
-}
-
-template <>
-inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<char*>() {
- return arrow::binary();
-}
-
-template <>
-inline std::shared_ptr<arrow::DataType> GetPrimitiveDataType<NYql::NUdf::TUtf8>() {
- return arrow::utf8();
-}
-
-template<typename T>
-struct TPrimitiveDataType;
-
-template<>
-struct TPrimitiveDataType<bool> {
- using TResult = arrow::UInt8Type;
-};
-
-template<>
-struct TPrimitiveDataType<i8> {
- using TResult = arrow::Int8Type;
-};
-
-template<>
-struct TPrimitiveDataType<ui8> {
- using TResult = arrow::UInt8Type;
-};
-
-template<>
-struct TPrimitiveDataType<i16> {
- using TResult = arrow::Int16Type;
-};
-
-template<>
-struct TPrimitiveDataType<ui16> {
- using TResult = arrow::UInt16Type;
-};
-
-template<>
-struct TPrimitiveDataType<i32> {
- using TResult = arrow::Int32Type;
-};
-
-template<>
-struct TPrimitiveDataType<ui32> {
- using TResult = arrow::UInt32Type;
-};
-
-template<>
-struct TPrimitiveDataType<i64> {
- using TResult = arrow::Int64Type;
-};
-
-template<>
-struct TPrimitiveDataType<ui64> {
- using TResult = arrow::UInt64Type;
-};
-
-template<>
-struct TPrimitiveDataType<char*> {
- using TResult = arrow::BinaryType;
-};
-
-template<>
-struct TPrimitiveDataType<NYql::NUdf::TUtf8> {
- using TResult = arrow::StringType;
-};
template <typename T>
arrow::compute::InputType GetPrimitiveInputArrowType() {
@@ -936,64 +816,6 @@ arrow::compute::OutputType GetPrimitiveOutputArrowType() {
return arrow::compute::OutputType(GetPrimitiveDataType<T>());
}
-template <typename T>
-T GetPrimitiveScalarValue(const arrow::Scalar& scalar) {
- return *static_cast<const T*>(dynamic_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data());
-}
-
-inline std::string_view GetStringScalarValue(const arrow::Scalar& scalar) {
- const auto& base = dynamic_cast<const arrow::BaseBinaryScalar&>(scalar);
- return std::string_view{reinterpret_cast<const char*>(base.value->data()), static_cast<size_t>(base.value->size())};
-}
-
-template <typename T>
-arrow::Datum MakeScalarDatum(T value);
-
-template <>
-inline arrow::Datum MakeScalarDatum<bool>(bool value) {
- return arrow::Datum(std::make_shared<arrow::UInt8Scalar>(value));
-}
-
-template <>
-inline arrow::Datum MakeScalarDatum<i8>(i8 value) {
- return arrow::Datum(std::make_shared<arrow::Int8Scalar>(value));
-}
-
-template <>
-inline arrow::Datum MakeScalarDatum<ui8>(ui8 value) {
- return arrow::Datum(std::make_shared<arrow::UInt8Scalar>(value));
-}
-
-template <>
-inline arrow::Datum MakeScalarDatum<i16>(i16 value) {
- return arrow::Datum(std::make_shared<arrow::Int16Scalar>(value));
-}
-
-template <>
-inline arrow::Datum MakeScalarDatum<ui16>(ui16 value) {
- return arrow::Datum(std::make_shared<arrow::UInt16Scalar>(value));
-}
-
-template <>
-inline arrow::Datum MakeScalarDatum<i32>(i32 value) {
- return arrow::Datum(std::make_shared<arrow::Int32Scalar>(value));
-}
-
-template <>
-inline arrow::Datum MakeScalarDatum<ui32>(ui32 value) {
- return arrow::Datum(std::make_shared<arrow::UInt32Scalar>(value));
-}
-
-template <>
-inline arrow::Datum MakeScalarDatum<i64>(i64 value) {
- return arrow::Datum(std::make_shared<arrow::Int64Scalar>(value));
-}
-
-template <>
-inline arrow::Datum MakeScalarDatum<ui64>(ui64 value) {
- return arrow::Datum(std::make_shared<arrow::UInt64Scalar>(value));
-}
-
template<typename TDerived>
struct TUnaryKernelExecsBase {
static arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {