diff options
author | vvvv <vvvv@ydb.tech> | 2023-01-25 23:20:45 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-01-25 23:20:45 +0300 |
commit | 252723c46e7003fc87b6545a53370e25a4be74d0 (patch) | |
tree | 93f54cf5db40a67adefe8565df415425be0a94d1 | |
parent | fbcf4e52d02eef4937008d7d332b87be3fb0ed2e (diff) | |
download | ydb-252723c46e7003fc87b6545a53370e25a4be74d0.tar.gz |
extracted readers&builders for simple block udfs, implemented vectorized Url::GetHost
37 files changed, 1649 insertions, 1331 deletions
diff --git a/ydb/library/yql/minikql/arrow/arrow_util.cpp b/ydb/library/yql/minikql/arrow/arrow_util.cpp index 942ae6ef95..3c43da0b9d 100644 --- a/ydb/library/yql/minikql/arrow/arrow_util.cpp +++ b/ydb/library/yql/minikql/arrow/arrow_util.cpp @@ -25,18 +25,4 @@ std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, TType* it } } -std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool) { - // align up to 64 bit - bitCount = (bitCount + 63u) & ~size_t(63u); - // this simplifies code compression code - we can write single 64 bit word after array boundaries - bitCount += 64; - return ARROW_RESULT(arrow::AllocateBitmap(bitCount, pool)); -} - -std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool) { - auto bitmap = AllocateBitmapWithReserve(len, pool); - CompressSparseBitmap(bitmap->mutable_data(), srcSparse, len); - return bitmap; -} - } diff --git a/ydb/library/yql/minikql/arrow/arrow_util.h b/ydb/library/yql/minikql/arrow/arrow_util.h index 6623fc4384..a0e58ccbc1 100644 --- a/ydb/library/yql/minikql/arrow/arrow_util.h +++ b/ydb/library/yql/minikql/arrow/arrow_util.h @@ -19,8 +19,8 @@ using NYql::NUdf::Chop; /// \brief Remove optional from `data` as new ArrayData object std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, TType* itemType); -std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool); -std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool); +using NYql::NUdf::AllocateBitmapWithReserve; +using NYql::NUdf::MakeDenseBitmap; inline arrow::internal::Bitmap GetBitmap(const arrow::ArrayData& arr, int index) { return arrow::internal::Bitmap{ arr.buffers[index], arr.offset, arr.length }; @@ -233,64 +233,6 @@ inline arrow::Datum MakeScalarDatum<double>(double value) { return arrow::Datum(std::make_shared<arrow::DoubleScalar>(value)); } -// similar to arrow::TypedBufferBuilder, but with UnsafeAdvance() method -// and shrinkToFit = false -template<typename T> -class TTypedBufferBuilder { - static_assert(std::is_pod_v<T>); - static_assert(!std::is_same_v<T, bool>); -public: - explicit TTypedBufferBuilder(arrow::MemoryPool* pool) - : Builder(pool) - { - } - - inline void Reserve(size_t size) { - ARROW_OK(Builder.Reserve(size * sizeof(T))); - } - - inline size_t Length() const { - return Builder.length() / sizeof(T); - } - - inline T* MutableData() { - return reinterpret_cast<T*>(Builder.mutable_data()); - } - - inline T* End() { - return MutableData() + Length(); - } - - inline const T* Data() const { - return reinterpret_cast<const T*>(Builder.data()); - } - - inline void UnsafeAppend(const T* values, size_t count) { - std::memcpy(End(), values, count * sizeof(T)); - UnsafeAdvance(count); - } - - inline void UnsafeAppend(size_t count, const T& value) { - T* target = End(); - std::fill(target, target + count, value); - UnsafeAdvance(count); - } - - inline void UnsafeAppend(T&& value) { - *End() = std::move(value); - UnsafeAdvance(1); - } - - inline void UnsafeAdvance(size_t count) { - Builder.UnsafeAdvance(count * sizeof(T)); - } - - inline std::shared_ptr<arrow::Buffer> Finish() { - bool shrinkToFit = false; - return ARROW_RESULT(Builder.Finish(shrinkToFit)); - } -private: - arrow::BufferBuilder Builder; -}; +using NYql::NUdf::TTypedBufferBuilder; } diff --git a/ydb/library/yql/minikql/arrow/mkql_bit_utils.h b/ydb/library/yql/minikql/arrow/mkql_bit_utils.h index 9bb5d89887..1775dd68bb 100644 --- a/ydb/library/yql/minikql/arrow/mkql_bit_utils.h +++ b/ydb/library/yql/minikql/arrow/mkql_bit_utils.h @@ -1,5 +1,6 @@ #pragma once #include <util/system/types.h> +#include <ydb/library/yql/public/udf/arrow/bit_util.h> namespace NKikimr { namespace NMiniKQL { @@ -70,48 +71,8 @@ inline size_t GetSparseBitmapPopCount(const ui8* src, size_t len) { return result; } -namespace { -template<bool Negate> -inline void CompressSparseImpl(ui8* dst, const ui8* srcSparse, size_t len) { - while (len >= 8) { - ui8 result = 0; - result |= (*srcSparse++ & 1u) << 0; - result |= (*srcSparse++ & 1u) << 1; - result |= (*srcSparse++ & 1u) << 2; - result |= (*srcSparse++ & 1u) << 3; - result |= (*srcSparse++ & 1u) << 4; - result |= (*srcSparse++ & 1u) << 5; - result |= (*srcSparse++ & 1u) << 6; - result |= (*srcSparse++ & 1u) << 7; - if constexpr (Negate) { - *dst++ = ~result; - } else { - *dst++ = result; - } - len -= 8; - } - if (len) { - ui8 result = 0; - for (ui8 i = 0; i < len; ++i) { - result |= (*srcSparse++ & 1u) << i; - } - if constexpr (Negate) { - *dst++ = ~result; - } else { - *dst++ = result; - } - } -} - -} // namespace - -inline void CompressSparseBitmap(ui8* dst, const ui8* srcSparse, size_t len) { - return CompressSparseImpl<false>(dst, srcSparse, len); -} - -inline void CompressSparseBitmapNegate(ui8* dst, const ui8* srcSparse, size_t len) { - return CompressSparseImpl<true>(dst, srcSparse, len); -} +using NYql::NUdf::CompressSparseBitmap; +using NYql::NUdf::CompressSparseBitmapNegate; inline void NegateSparseBitmap(ui8* dst, const ui8* src, size_t len) { while (len--) { @@ -182,24 +143,8 @@ inline size_t CompressBitmap(const ui8* src, size_t srcOffset, return dstOffset; } -template<typename T> -inline T* CompressArray(const T* src, const ui8* sparseBitmap, T* dst, size_t count) { - while (count--) { - *dst = *src++; - dst += *sparseBitmap++; - } - return dst; -} - -inline ui8* CompressAsSparseBitmap(const ui8* src, size_t srcOffset, const ui8* sparseBitmap, ui8* dst, size_t count) { - while (count--) { - ui8 inputBit = (src[srcOffset >> 3] >> (srcOffset & 7)) & 1; - *dst = inputBit; - ++srcOffset; - dst += *sparseBitmap++; - } - return dst; -} +using NYql::NUdf::CompressAsSparseBitmap; +using NYql::NUdf::CompressArray; } // namespace NMiniKQL } // namespace NKikimr
\ No newline at end of file diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp index 2e933e3181..87b207e427 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp @@ -144,7 +144,7 @@ private: class TGenericColumnBuilder : public IAggColumnBuilder { public: TGenericColumnBuilder(ui64 size, TType* columnType, TComputationContext& ctx) - : Builder_(MakeBlockBuilder(columnType, ctx.ArrowMemoryPool, size)) + : Builder_(MakeArrayBuilder(TTypeInfoHelper(), columnType, ctx.ArrowMemoryPool, size)) , Ctx_(ctx) { } @@ -158,7 +158,7 @@ public: } private: - const std::unique_ptr<IBlockBuilder> Builder_; + const std::unique_ptr<IArrayBuilder> Builder_; TComputationContext& Ctx_; }; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp index 08fdd91649..090df34937 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp @@ -15,797 +15,5 @@ namespace NKikimr { namespace NMiniKQL { -namespace { - -std::shared_ptr<arrow::DataType> GetArrowType(TType* type) { - std::shared_ptr<arrow::DataType> result; - Y_VERIFY(ConvertArrowType(type, result)); - return result; -} - -class TBlockBuilderBase : public IBlockBuilder { -public: - using Ptr = std::unique_ptr<TBlockBuilderBase>; - - struct TBlockArrayTree { - using Ptr = std::shared_ptr<TBlockArrayTree>; - std::deque<std::shared_ptr<arrow::ArrayData>> Payload; - std::vector<TBlockArrayTree::Ptr> Children; - }; - - - TBlockBuilderBase(TType* type, arrow::MemoryPool& pool, size_t maxLen) - : Type(type) - , Pool(&pool) - , MaxLen(maxLen) - { - Y_VERIFY(type); - Y_VERIFY(maxLen > 0); - } - - size_t MaxLength() const final { - return MaxLen; - } - - void Add(NUdf::TUnboxedValuePod value) final { - Y_VERIFY(CurrLen < MaxLen); - DoAdd(value); - CurrLen++; - } - - void Add(TBlockItem value) final { - Y_VERIFY(CurrLen < MaxLen); - DoAdd(value); - CurrLen++; - } - - - void AddDefault() { - Y_VERIFY(CurrLen < MaxLen); - DoAddDefault(); - CurrLen++; - } - - void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) final { - Y_VERIFY(array.length == bitmapSize); - Y_VERIFY(popCount <= bitmapSize); - Y_VERIFY(CurrLen + popCount <= MaxLen); - - if (popCount) { - DoAddMany(array, sparseBitmap, popCount); - } - - CurrLen += popCount; - } - - arrow::Datum Build(bool finish) final { - auto tree = BuildTree(finish); - TVector<std::shared_ptr<arrow::ArrayData>> chunks; - while (size_t size = CalcSliceSize(*tree)) { - chunks.push_back(Slice(*tree, size)); - } - - return MakeArray(chunks); - } - - TBlockArrayTree::Ptr BuildTree(bool finish) { - auto result = DoBuildTree(finish); - CurrLen = 0; - return result; - } -protected: - virtual void DoAdd(NUdf::TUnboxedValuePod value) = 0; - virtual void DoAdd(TBlockItem value) = 0; - virtual void DoAddDefault() = 0; - virtual void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) = 0; - virtual TBlockArrayTree::Ptr DoBuildTree(bool finish) = 0; - -private: - static size_t CalcSliceSize(const TBlockArrayTree& tree) { - if (tree.Payload.empty()) { - return 0; - } - - if (!tree.Children.empty()) { - Y_VERIFY(tree.Payload.size() == 1); - size_t result = std::numeric_limits<size_t>::max(); - for (auto& child : tree.Children) { - size_t childSize = CalcSliceSize(*child); - result = std::min(result, childSize); - } - Y_VERIFY(result <= tree.Payload.front()->length); - return result; - } - - int64_t result = std::numeric_limits<int64_t>::max(); - for (auto& data : tree.Payload) { - result = std::min(result, data->length); - } - - Y_VERIFY(result > 0); - return static_cast<size_t>(result); - } - - static std::shared_ptr<arrow::ArrayData> Slice(TBlockArrayTree& tree, size_t size) { - Y_VERIFY(size > 0); - - Y_VERIFY(!tree.Payload.empty()); - auto& main = tree.Payload.front(); - std::shared_ptr<arrow::ArrayData> sliced; - if (size == main->length) { - sliced = main; - tree.Payload.pop_front(); - } else { - Y_VERIFY(size < main->length); - sliced = Chop(main, size); - } - - if (!tree.Children.empty()) { - std::vector<std::shared_ptr<arrow::ArrayData>> children; - for (auto& child : tree.Children) { - children.push_back(Slice(*child, size)); - } - - sliced->child_data = std::move(children); - if (tree.Payload.empty()) { - tree.Children.clear(); - } - } - return sliced; - } - -protected: - size_t GetCurrLen() const { - return CurrLen; - } - - TType* const Type; - arrow::MemoryPool* const Pool; - const size_t MaxLen; -private: - size_t CurrLen = 0; -}; - -template <typename T, bool Nullable> -class TFixedSizeBlockBuilder : public TBlockBuilderBase { -public: - TFixedSizeBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen) - : TBlockBuilderBase(type, pool, maxLen) - { - Reserve(); - } - - void DoAdd(NUdf::TUnboxedValuePod value) final { - if constexpr (Nullable) { - if (!value) { - return DoAdd(TBlockItem{}); - } - } - DoAdd(TBlockItem(value.Get<T>())); - } - - void DoAdd(TBlockItem value) final { - if constexpr (Nullable) { - if (!value) { - NullBuilder->UnsafeAppend(0); - DataBuilder->UnsafeAppend(T{}); - return; - } - NullBuilder->UnsafeAppend(1); - } - - DataBuilder->UnsafeAppend(value.As<T>()); - } - - void DoAddDefault() final { - if constexpr (Nullable) { - NullBuilder->UnsafeAppend(1); - } - DataBuilder->UnsafeAppend(T{}); - } - - void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final { - Y_VERIFY(array.buffers.size() > 1); - if constexpr (Nullable) { - Y_VERIFY(NullBuilder->Length() == DataBuilder->Length()); - if (array.buffers.front()) { - ui8* dstBitmap = NullBuilder->End(); - CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length); - NullBuilder->UnsafeAdvance(popCount); - } else { - NullBuilder->UnsafeAppend(popCount, 1); - } - } - - const T* src = array.GetValues<T>(1); - T* dst = DataBuilder->End(); - CompressArray(src, sparseBitmap, dst, array.length); - DataBuilder->UnsafeAdvance(popCount); - } - - TBlockArrayTree::Ptr DoBuildTree(bool finish) final { - const size_t len = DataBuilder->Length(); - std::shared_ptr<arrow::Buffer> nulls; - if constexpr (Nullable) { - Y_VERIFY(NullBuilder->Length() == len); - nulls = NullBuilder->Finish(); - nulls = MakeDenseBitmap(nulls->data(), len, Pool); - } - std::shared_ptr<arrow::Buffer> data = DataBuilder->Finish(); - - TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); - result->Payload.push_back(arrow::ArrayData::Make(GetArrowType(Type), len, {nulls, data})); - - NullBuilder.reset(); - DataBuilder.reset(); - if (!finish) { - Reserve(); - } - return result; - } - -private: - void Reserve() { - DataBuilder = std::make_unique<TTypedBufferBuilder<T>>(Pool); - DataBuilder->Reserve(MaxLen + 1); - if constexpr (Nullable) { - NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool); - NullBuilder->Reserve(MaxLen + 1); - } - } - - std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder; - std::unique_ptr<TTypedBufferBuilder<T>> DataBuilder; -}; - -template<typename TStringType, bool Nullable> -class TStringBlockBuilder : public TBlockBuilderBase { -public: - using TOffset = typename TStringType::offset_type; - - TStringBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen) - : TBlockBuilderBase(type, pool, maxLen) - { - Reserve(); - } - - void DoAdd(NUdf::TUnboxedValuePod value) final { - if constexpr (Nullable) { - if (!value) { - return DoAdd(TBlockItem{}); - } - } - - DoAdd(TBlockItem(value.AsStringRef())); - } - - void DoAdd(TBlockItem value) final { - if constexpr (Nullable) { - if (!value) { - NullBuilder->UnsafeAppend(0); - AppendCurrentOffset(); - return; - } - } - - const std::string_view str = value.AsStringRef(); - - size_t currentLen = DataBuilder->Length(); - // empty string can always be appended - if (!str.empty() && currentLen + str.size() > MaxBlockSizeInBytes) { - if (currentLen) { - FlushChunk(false); - } - if (str.size() > MaxBlockSizeInBytes) { - DataBuilder->Reserve(str.size()); - } - } - - AppendCurrentOffset(); - DataBuilder->UnsafeAppend((const ui8*)str.data(), str.size()); - if constexpr (Nullable) { - NullBuilder->UnsafeAppend(1); - } - } - - - void DoAddDefault() final { - if constexpr (Nullable) { - NullBuilder->UnsafeAppend(1); - } - AppendCurrentOffset(); - } - - void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final { - Y_VERIFY(array.buffers.size() > 2); - Y_VERIFY(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length()); - - const ui8* srcNulls = array.GetValues<ui8>(0, 0); - const TOffset* srcOffset = array.GetValues<TOffset>(1); - const ui8* srcData = array.GetValues<ui8>(2, 0); - - const ui8* chunkStart = srcData; - const ui8* chunkEnd = chunkStart; - size_t dataLen = DataBuilder->Length(); - - ui8* dstNulls = Nullable ? NullBuilder->End() : nullptr; - TOffset* dstOffset = OffsetsBuilder->End(); - size_t countAdded = 0; - for (size_t i = 0; i < array.length; i++) { - if (!sparseBitmap[i]) { - continue; - } - - const ui8* begin = srcData + srcOffset[i]; - const ui8* end = srcData + srcOffset[i + 1]; - const size_t strSize = end - begin; - - size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen; - - for (;;) { - // try to append ith string - if (strSize <= availBytes) { - if (begin == chunkEnd) { - chunkEnd = end; - } else { - DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart); - chunkStart = begin; - chunkEnd = end; - } - - size_t nullOffset = i + array.offset; - if constexpr (Nullable) { - *dstNulls++ = srcNulls ? ((srcNulls[nullOffset >> 3] >> (nullOffset & 7)) & 1) : 1u; - } - *dstOffset++ = dataLen; - - dataLen += strSize; - ++countAdded; - - break; - } - - if (dataLen) { - if (chunkStart != chunkEnd) { - DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart); - chunkStart = chunkEnd = srcData; - } - Y_VERIFY(dataLen == DataBuilder->Length()); - OffsetsBuilder->UnsafeAdvance(countAdded); - if constexpr (Nullable) { - NullBuilder->UnsafeAdvance(countAdded); - } - FlushChunk(false); - - dataLen = 0; - countAdded = 0; - if constexpr (Nullable) { - dstNulls = NullBuilder->End(); - } - dstOffset = OffsetsBuilder->End(); - } else { - DataBuilder->Reserve(strSize); - availBytes = strSize; - } - } - } - if (chunkStart != chunkEnd) { - DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart); - } - Y_VERIFY(dataLen == DataBuilder->Length()); - OffsetsBuilder->UnsafeAdvance(countAdded); - if constexpr (Nullable) { - NullBuilder->UnsafeAdvance(countAdded); - } - } - - TBlockArrayTree::Ptr DoBuildTree(bool finish) final { - FlushChunk(finish); - TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); - result->Payload = std::move(Chunks); - Chunks.clear(); - return result; - } - -private: - void Reserve() { - if constexpr (Nullable) { - NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool); - NullBuilder->Reserve(MaxLen + 1); - } - OffsetsBuilder = std::make_unique<TTypedBufferBuilder<TOffset>>(Pool); - OffsetsBuilder->Reserve(MaxLen + 1); - DataBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool); - DataBuilder->Reserve(MaxBlockSizeInBytes); - } - - void AppendCurrentOffset() { - OffsetsBuilder->UnsafeAppend(DataBuilder->Length()); - } - - void FlushChunk(bool finish) { - const auto length = OffsetsBuilder->Length(); - Y_VERIFY(length > 0); - - AppendCurrentOffset(); - std::shared_ptr<arrow::Buffer> nullBitmap; - if constexpr (Nullable) { - nullBitmap = NullBuilder->Finish(); - nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool); - } - std::shared_ptr<arrow::Buffer> offsets = OffsetsBuilder->Finish(); - std::shared_ptr<arrow::Buffer> data = DataBuilder->Finish(); - - auto arrowType = std::make_shared<TStringType>(); - Chunks.push_back(arrow::ArrayData::Make(arrowType, length, { nullBitmap, offsets, data })); - if (!finish) { - Reserve(); - } - } - - std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder; - std::unique_ptr<TTypedBufferBuilder<TOffset>> OffsetsBuilder; - std::unique_ptr<TTypedBufferBuilder<ui8>> DataBuilder; - - std::deque<std::shared_ptr<arrow::ArrayData>> Chunks; -}; - -template<bool Nullable> -class TTupleBlockBuilder : public TBlockBuilderBase { -public: - TTupleBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen, - TVector<TBlockBuilderBase::Ptr>&& children) - : TBlockBuilderBase(type, pool, maxLen) - , Children(std::move(children)) - { - Reserve(); - } - - void DoAdd(NUdf::TUnboxedValuePod value) final { - TTupleType* tupleType = AS_TYPE(TTupleType, Type); - if constexpr (Nullable) { - if (!value) { - NullBuilder->UnsafeAppend(0); - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - Children[i]->AddDefault(); - } - return; - } - NullBuilder->UnsafeAppend(1); - } - - auto elements = value.GetElements(); - if (elements) { - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - Children[i]->Add(elements[i]); - } - } else { - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - auto element = value.GetElement(i); - Children[i]->Add(element); - } - } - } - - void DoAdd(TBlockItem value) final { - TTupleType* tupleType = AS_TYPE(TTupleType, Type); - if constexpr (Nullable) { - if (!value) { - NullBuilder->UnsafeAppend(0); - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - Children[i]->AddDefault(); - } - return; - } - NullBuilder->UnsafeAppend(1); - } - - auto elements = value.AsTuple(); - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - Children[i]->Add(elements[i]); - } - } - - void DoAddDefault() final { - TTupleType* tupleType = AS_TYPE(TTupleType, Type); - if constexpr (Nullable) { - NullBuilder->UnsafeAppend(1); - } - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - Children[i]->AddDefault(); - } - } - - void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final { - Y_VERIFY(!array.buffers.empty()); - Y_VERIFY(array.child_data.size() == Children.size()); - - if constexpr (Nullable) { - if (array.buffers.front()) { - ui8* dstBitmap = NullBuilder->End(); - CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length); - NullBuilder->UnsafeAdvance(popCount); - } else { - NullBuilder->UnsafeAppend(popCount, 1); - } - } - - for (size_t i = 0; i < Children.size(); ++i) { - Children[i]->AddMany(*array.child_data[i], popCount, sparseBitmap, array.length); - } - } - - TBlockArrayTree::Ptr DoBuildTree(bool finish) final { - auto tupleType = AS_TYPE(TTupleType, Type); - TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); - - std::shared_ptr<arrow::Buffer> nullBitmap; - const size_t length = GetCurrLen(); - if constexpr (Nullable) { - MKQL_ENSURE(length == NullBuilder->Length(), "Unexpected NullBuilder length"); - nullBitmap = NullBuilder->Finish(); - nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool); - } - - Y_VERIFY(length); - result->Payload.push_back(arrow::ArrayData::Make(GetArrowType(Type), length, { nullBitmap })); - result->Children.reserve(tupleType->GetElementsCount()); - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - result->Children.emplace_back(Children[i]->BuildTree(finish)); - } - - if (!finish) { - Reserve(); - } - - return result; - } - -private: - void Reserve() { - if constexpr (Nullable) { - NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool); - NullBuilder->Reserve(MaxLen + 1); - } - } - -private: - TVector<std::unique_ptr<TBlockBuilderBase>> Children; - std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder; -}; - -class TExternalOptionalBlockBuilder : public TBlockBuilderBase { -public: - TExternalOptionalBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen, std::unique_ptr<TBlockBuilderBase>&& inner) - : TBlockBuilderBase(type, pool, maxLen) - , Inner(std::move(inner)) - { - Reserve(); - } - - void DoAdd(NUdf::TUnboxedValuePod value) final { - if (!value) { - NullBuilder->UnsafeAppend(0); - Inner->AddDefault(); - return; - } - - NullBuilder->UnsafeAppend(1); - Inner->Add(value.GetOptionalValue()); - } - - void DoAdd(TBlockItem value) final { - if (!value) { - NullBuilder->UnsafeAppend(0); - Inner->AddDefault(); - return; - } - - NullBuilder->UnsafeAppend(1); - Inner->Add(value.GetOptionalValue()); - } - - void DoAddDefault() final { - NullBuilder->UnsafeAppend(1); - Inner->AddDefault(); - } - - void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final { - Y_VERIFY(!array.buffers.empty()); - Y_VERIFY(array.child_data.size() == 1); - - if (array.buffers.front()) { - ui8* dstBitmap = NullBuilder->End(); - CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length); - NullBuilder->UnsafeAdvance(popCount); - } else { - NullBuilder->UnsafeAppend(popCount, 1); - } - - Inner->AddMany(*array.child_data[0], popCount, sparseBitmap, array.length); - } - - TBlockArrayTree::Ptr DoBuildTree(bool finish) final { - TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); - - std::shared_ptr<arrow::Buffer> nullBitmap; - const size_t length = GetCurrLen(); - MKQL_ENSURE(length == NullBuilder->Length(), "Unexpected NullBuilder length"); - nullBitmap = NullBuilder->Finish(); - nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool); - - Y_VERIFY(length); - result->Payload.push_back(arrow::ArrayData::Make(GetArrowType(Type), length, { nullBitmap })); - result->Children.emplace_back(Inner->BuildTree(finish)); - - if (!finish) { - Reserve(); - } - - return result; - } - -private: - void Reserve() { - NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool); - NullBuilder->Reserve(MaxLen + 1); - } - -private: - std::unique_ptr<TBlockBuilderBase> Inner; - std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder; -}; - -std::unique_ptr<TBlockBuilderBase> MakeBlockBuilderBase(TType* type, arrow::MemoryPool& pool, size_t maxBlockLength); - -template<bool Nullable> -std::unique_ptr<TBlockBuilderBase> MakeBlockBuilderImpl(TType* type, arrow::MemoryPool& pool, size_t maxLen) { - if constexpr (Nullable) { - type = AS_TYPE(TOptionalType, type)->GetItemType(); - } - - if (type->IsTuple()) { - auto tupleType = AS_TYPE(TTupleType, type); - TVector<std::unique_ptr<TBlockBuilderBase>> children; - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - TType* childType = tupleType->GetElementType(i); - auto childBuilder = MakeBlockBuilderBase(childType, pool, maxLen); - children.push_back(std::move(childBuilder)); - } - - return std::make_unique<TTupleBlockBuilder<Nullable>>(type, pool, maxLen, std::move(children)); - } - - if (type->IsData()) { - auto slot = *AS_TYPE(TDataType, type)->GetDataSlot(); - switch (slot) { - case NUdf::EDataSlot::Int8: - return std::make_unique<TFixedSizeBlockBuilder<i8, Nullable>>(type, pool, maxLen); - case NUdf::EDataSlot::Uint8: - case NUdf::EDataSlot::Bool: - return std::make_unique<TFixedSizeBlockBuilder<ui8, Nullable>>(type, pool, maxLen); - case NUdf::EDataSlot::Int16: - return std::make_unique<TFixedSizeBlockBuilder<i16, Nullable>>(type, pool, maxLen); - case NUdf::EDataSlot::Uint16: - case NUdf::EDataSlot::Date: - return std::make_unique<TFixedSizeBlockBuilder<ui16, Nullable>>(type, pool, maxLen); - case NUdf::EDataSlot::Int32: - return std::make_unique<TFixedSizeBlockBuilder<i32, Nullable>>(type, pool, maxLen); - case NUdf::EDataSlot::Uint32: - case NUdf::EDataSlot::Datetime: - return std::make_unique<TFixedSizeBlockBuilder<ui32, Nullable>>(type, pool, maxLen); - case NUdf::EDataSlot::Int64: - case NUdf::EDataSlot::Interval: - return std::make_unique<TFixedSizeBlockBuilder<i64, Nullable>>(type, pool, maxLen); - case NUdf::EDataSlot::Uint64: - case NUdf::EDataSlot::Timestamp: - return std::make_unique<TFixedSizeBlockBuilder<ui64, Nullable>>(type, pool, maxLen); - case NUdf::EDataSlot::Float: - return std::make_unique<TFixedSizeBlockBuilder<float, Nullable>>(type, pool, maxLen); - case NUdf::EDataSlot::Double: - return std::make_unique<TFixedSizeBlockBuilder<double, Nullable>>(type, pool, maxLen); - case NUdf::EDataSlot::String: - return std::make_unique<TStringBlockBuilder<arrow::BinaryType, Nullable>>(type, pool, maxLen); - case NUdf::EDataSlot::Utf8: - return std::make_unique<TStringBlockBuilder<arrow::StringType, Nullable>>(type, pool, maxLen); - default: - MKQL_ENSURE(false, "Unsupported data slot"); - } - } - - MKQL_ENSURE(false, "Unsupported type"); -} - -std::unique_ptr<TBlockBuilderBase> MakeBlockBuilderBase(TType* type, arrow::MemoryPool& pool, size_t maxBlockLength) { - TType* unpacked = type; - if (type->IsOptional()) { - unpacked = AS_TYPE(TOptionalType, type)->GetItemType(); - } - - if (unpacked->IsOptional()) { - // at least 2 levels of optionals - ui32 nestLevel = 0; - auto currentType = type; - auto previousType = type; - TVector<TType*> types; - do { - ++nestLevel; - types.push_back(currentType); - previousType = currentType; - currentType = AS_TYPE(TOptionalType, currentType)->GetItemType(); - } while (currentType->IsOptional()); - - auto builder = MakeBlockBuilderBase(previousType, pool, maxBlockLength); - for (ui32 i = 1; i < nestLevel; ++i) { - builder = std::make_unique<TExternalOptionalBlockBuilder>(types[nestLevel - 1 - i], pool, maxBlockLength, std::move(builder)); - } - - return builder; - } else { - if (type->IsOptional()) { - return MakeBlockBuilderImpl<true>(type, pool, maxBlockLength); - } else { - return MakeBlockBuilderImpl<false>(type, pool, maxBlockLength); - } - } -} - -} // namespace - -size_t CalcMaxBlockItemSize(const TType* type) { - // we do not count block bitmap size - if (type->IsOptional()) { - return CalcMaxBlockItemSize(AS_TYPE(TOptionalType, type)->GetItemType()); - } - - if (type->IsTuple()) { - auto tupleType = AS_TYPE(TTupleType, type); - size_t result = 0; - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - result = std::max(result, CalcMaxBlockItemSize(tupleType->GetElementType(i))); - } - return result; - } - - if (type->IsData()) { - auto slot = *AS_TYPE(TDataType, type)->GetDataSlot(); - switch (slot) { - case NUdf::EDataSlot::Int8: - case NUdf::EDataSlot::Uint8: - case NUdf::EDataSlot::Bool: - case NUdf::EDataSlot::Int16: - case NUdf::EDataSlot::Uint16: - case NUdf::EDataSlot::Date: - case NUdf::EDataSlot::Int32: - case NUdf::EDataSlot::Uint32: - case NUdf::EDataSlot::Datetime: - case NUdf::EDataSlot::Int64: - case NUdf::EDataSlot::Interval: - case NUdf::EDataSlot::Uint64: - case NUdf::EDataSlot::Timestamp: - case NUdf::EDataSlot::Float: - case NUdf::EDataSlot::Double: { - size_t sz = GetDataTypeInfo(slot).FixedSize; - MKQL_ENSURE(sz > 0, "Unexpected fixed data size"); - return sz; - } - case NUdf::EDataSlot::String: - // size of offset part - return sizeof(arrow::BinaryType::offset_type); - case NUdf::EDataSlot::Utf8: - // size of offset part - return sizeof(arrow::StringType::offset_type); - default: - MKQL_ENSURE(false, "Unsupported data slot"); - } - } - - MKQL_ENSURE(false, "Unsupported type"); -} - -std::unique_ptr<IBlockBuilder> MakeBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxBlockLength) { - return MakeBlockBuilderBase(type, pool, maxBlockLength); -} - } // namespace NMiniKQL } // namespace NKikimr diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h index 3a937d6dd7..355c1be4ce 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h @@ -5,36 +5,17 @@ #include <ydb/library/yql/minikql/mkql_node.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node.h> -#include <arrow/array/data.h> +#include <ydb/library/yql/public/udf/arrow/block_builder.h> -#include <util/generic/size_literals.h> +#include <arrow/array/data.h> #include <limits> namespace NKikimr { namespace NMiniKQL { -constexpr size_t MaxBlockSizeInBytes = 1_MB; -static_assert(MaxBlockSizeInBytes < (size_t)std::numeric_limits<i32>::max()); - -// maximum size of block item in bytes -size_t CalcMaxBlockItemSize(const TType* type); - -inline size_t CalcBlockLen(size_t maxBlockItemSize) { - return MaxBlockSizeInBytes / std::max<size_t>(maxBlockItemSize, 1); -} - -class IBlockBuilder { -public: - virtual ~IBlockBuilder() = default; - virtual size_t MaxLength() const = 0; - virtual void Add(NUdf::TUnboxedValuePod value) = 0; - virtual void Add(TBlockItem value) = 0; - virtual void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) = 0; - virtual arrow::Datum Build(bool finish) = 0; -}; - -std::unique_ptr<IBlockBuilder> MakeBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxBlockLength); +using NYql::NUdf::IArrayBuilder; +using NYql::NUdf::MakeArrayBuilder; } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp index 308ce6b854..726adcd5b1 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp @@ -254,7 +254,7 @@ private: TVector<NUdf::TUnboxedValue*> ValuePointers_; TVector<NUdf::TUnboxedValue> InputValues_; TVector<std::shared_ptr<arrow::ArrayData>> Arrays_; - TVector<std::unique_ptr<IBlockBuilder>> Builders_; + TVector<std::unique_ptr<IArrayBuilder>> Builders_; size_t InputSize_ = 0; size_t OutputPos_ = 0; size_t OutputSize_ = 0; @@ -289,7 +289,7 @@ private: for (ui32 i = 0, outIndex = 0; i < width; ++i) { if (i != bitmapIndex) { if (types[i]->GetShape() != TBlockType::EShape::Scalar && output[outIndex] != nullptr) { - Builders_[i] = MakeBlockBuilder(types[i]->GetItemType(), pool, maxBlockLen); + Builders_[i] = MakeArrayBuilder(TTypeInfoHelper(), types[i]->GetItemType(), pool, maxBlockLen); } outIndex++; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp index 52084e3ae6..10c3e41d8a 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp @@ -73,8 +73,8 @@ class TIfBlockExec { public: explicit TIfBlockExec(TType* type) : Type(type) - , ThenReader(MakeBlockReader(type)) - , ElseReader(MakeBlockReader(type)) + , ThenReader(MakeBlockReader(TTypeInfoHelper(), type)) + , ElseReader(MakeBlockReader(TTypeInfoHelper(), type)) { } @@ -105,7 +105,7 @@ public: const std::shared_ptr<arrow::ArrayData>& pred = predDatum.array(); const size_t len = pred->length; - auto builder = MakeBlockBuilder(Type, *ctx->memory_pool(), len); + auto builder = MakeArrayBuilder(TTypeInfoHelper(), Type, *ctx->memory_pool(), len); const ui8* predValues = pred->GetValues<uint8_t>(1); for (size_t i = 0; i < len; ++i) { if constexpr (!ThenIsScalar) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp index ce7803fe0e..6610a0de3a 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp @@ -27,8 +27,8 @@ std::vector<arrow::ValueDescr> ToValueDescr(const TVector<TType*>& types) { arrow::Datum MakeArrayFromScalar(const arrow::Scalar& scalar, size_t len, TType* type, arrow::MemoryPool& pool) { MKQL_ENSURE(len > 0, "Invalid block size"); - auto reader = MakeBlockReader(type); - auto builder = MakeBlockBuilder(type, pool, len); + auto reader = MakeBlockReader(TTypeInfoHelper(), type); + auto builder = MakeArrayBuilder(TTypeInfoHelper(), type, pool, len); auto scalarItem = reader->GetScalarItem(scalar); for (size_t i = 0; i < len; ++i) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_item.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_item.h index 37c357fcc9..a01d2dc780 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_item.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_item.h @@ -1,149 +1,8 @@ #pragma once - -#include <ydb/library/yql/public/udf/udf_data_type.h> -#include <ydb/library/yql/public/udf/udf_string_ref.h> -#include <ydb/library/yql/public/udf/udf_type_size_check.h> +#include <ydb/library/yql/public/udf/arrow/block_item.h> namespace NKikimr::NMiniKQL { -class TBlockItem { - enum class EMarkers : ui8 { - Empty = 0, - Present = 1, - }; - -public: - TBlockItem() noexcept = default; - ~TBlockItem() noexcept = default; - - TBlockItem(const TBlockItem& value) noexcept = default; - TBlockItem(TBlockItem&& value) noexcept = default; - - TBlockItem& operator=(const TBlockItem& value) noexcept = default; - TBlockItem& operator=(TBlockItem&& value) noexcept = default; - - template <typename T, typename = std::enable_if_t<NYql::NUdf::TPrimitiveDataType<T>::Result>> - inline explicit TBlockItem(T value); - - inline explicit TBlockItem(NYql::NUdf::TStringRef value) { - Raw.String.Value = value.Data(); - Raw.String.Size = value.Size(); - Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present); - } - - inline explicit TBlockItem(const TBlockItem* tupleItems) { - Raw.Tuple.Value = tupleItems; - Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present); - } - - inline TBlockItem(ui64 low, ui64 high) { - Raw.Halfs[0] = low; - Raw.Halfs[1] = high; - } - - inline ui64 Low() const { - return Raw.Halfs[0]; - } - - inline ui64 High() const { - return Raw.Halfs[1]; - } - - template <typename T, typename = std::enable_if_t<NYql::NUdf::TPrimitiveDataType<T>::Result>> - inline T As() const; - - inline const TBlockItem* AsTuple() const { - Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present); - return Raw.Tuple.Value; - } - - inline NYql::NUdf::TStringRef AsStringRef() const { - Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present); - return NYql::NUdf::TStringRef(Raw.String.Value, Raw.String.Size); - } - - inline TBlockItem MakeOptional() const - { - if (Raw.Simple.Meta) - return *this; - - TBlockItem result(*this); - ++result.Raw.Simple.Count; - return result; - } - - inline TBlockItem GetOptionalValue() const - { - if (Raw.Simple.Meta) - return *this; - - Y_VERIFY_DEBUG(Raw.Simple.Count > 0U, "Can't get value from empty."); - - TBlockItem result(*this); - --result.Raw.Simple.Count; - return result; - } - - inline explicit operator bool() const { return bool(Raw); } -private: - union TRaw { - ui64 Halfs[2] = {0, 0}; - struct { - union { - #define FIELD(type) type type##_; - PRIMITIVE_VALUE_TYPES(FIELD); - #undef FIELD - ui64 Count; - }; - union { - ui64 Pad; - struct { - ui8 Reserved[7]; - ui8 Meta; - }; - }; - } Simple; - - struct { - const char* Value; - ui32 Size; - } String; - - struct { - // client should know tuple size - const TBlockItem* Value; - } Tuple; - - EMarkers GetMarkers() const { - return static_cast<EMarkers>(Simple.Meta); - } - - explicit operator bool() const { return Simple.Meta | Simple.Count; } - } Raw; -}; - -UDF_ASSERT_TYPE_SIZE(TBlockItem, 16); - -#define VALUE_GET(xType) \ - template <> \ - inline xType TBlockItem::As<xType>() const \ - { \ - Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present); \ - return Raw.Simple.xType##_; \ - } - -#define VALUE_CONSTR(xType) \ - template <> \ - inline TBlockItem::TBlockItem(xType value) \ - { \ - Raw.Simple.xType##_ = value; \ - Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present); \ - } - -PRIMITIVE_VALUE_TYPES(VALUE_GET) -PRIMITIVE_VALUE_TYPES(VALUE_CONSTR) - -#undef VALUE_GET -#undef VALUE_CONSTR +using NYql::NUdf::TBlockItem; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp index 914ec7e6ca..a73c7cabf2 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp @@ -4,6 +4,8 @@ #include <ydb/library/yql/minikql/mkql_node_builder.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/public/udf/udf_type_inspection.h> + #include <arrow/array/array_binary.h> #include <arrow/chunked_array.h> @@ -12,29 +14,9 @@ namespace NMiniKQL { namespace { -inline bool IsNull(const arrow::ArrayData& data, size_t index) { - return data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset); -} - template <typename T> -class TFixedSizeBlockReader : public IBlockReader { +class TFixedSizeBlockItemConverter : public IBlockItemConverter { public: - TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { - if (IsNull(data, index)) { - return {}; - } - - return TBlockItem(data.GetValues<T>(1)[index]); - } - - TBlockItem GetScalarItem(const arrow::Scalar& scalar) final { - if (!scalar.is_valid) { - return {}; - } - - return TBlockItem(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data())); - } - NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final { Y_UNUSED(holderFactory); return item ? NUdf::TUnboxedValuePod(item.As<T>()) : NUdf::TUnboxedValuePod{}; @@ -42,33 +24,8 @@ public: }; template<typename TStringType> -class TStringBlockReader : public IBlockReader { +class TStringBlockItemConverter : public IBlockItemConverter { public: - using TOffset = typename TStringType::offset_type; - - TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { - Y_VERIFY_DEBUG(data.buffers.size() == 3); - if (IsNull(data, index)) { - return {}; - } - - const TOffset* offsets = data.GetValues<TOffset>(1); - const char* strData = data.GetValues<char>(2, 0); - - std::string_view str(strData + offsets[index], offsets[index + 1] - offsets[index]); - return TBlockItem(str); - } - - TBlockItem GetScalarItem(const arrow::Scalar& scalar) final { - if (!scalar.is_valid) { - return {}; - } - - auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(scalar).value; - std::string_view str(reinterpret_cast<const char*>(buffer->data()), buffer->size()); - return TBlockItem(str); - } - NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final { Y_UNUSED(holderFactory); if (!item) { @@ -78,39 +35,12 @@ public: } }; -class TTupleBlockReader : public IBlockReader { +class TTupleBlockItemConverter : public IBlockItemConverter { public: - TTupleBlockReader(TVector<std::unique_ptr<IBlockReader>>&& children) + TTupleBlockItemConverter(TVector<std::unique_ptr<IBlockItemConverter>>&& children) : Children(std::move(children)) - , Items(Children.size()) {} - TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { - if (IsNull(data, index)) { - return {}; - } - - for (ui32 i = 0; i < Children.size(); ++i) { - Items[i] = Children[i]->GetItem(*data.child_data[i], index); - } - - return TBlockItem(Items.data()); - } - - TBlockItem GetScalarItem(const arrow::Scalar& scalar) final { - if (!scalar.is_valid) { - return {}; - } - - const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar); - - for (ui32 i = 0; i < Children.size(); ++i) { - Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]); - } - - return TBlockItem(Items.data()); - } - NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final { if (!item) { return {}; @@ -127,34 +57,16 @@ public: } private: - const TVector<std::unique_ptr<IBlockReader>> Children; - TVector<TBlockItem> Items; + const TVector<std::unique_ptr<IBlockItemConverter>> Children; mutable TPlainContainerCache Cache; }; -class TExternalOptionalBlockReader : public IBlockReader { +class TExternalOptionalBlockItemConverter : public IBlockItemConverter { public: - TExternalOptionalBlockReader(std::unique_ptr<IBlockReader>&& inner) + TExternalOptionalBlockItemConverter(std::unique_ptr<IBlockItemConverter>&& inner) : Inner(std::move(inner)) {} - TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { - if (IsNull(data, index)) { - return {}; - } - - return Inner->GetItem(*data.child_data[0], index).MakeOptional(); - } - - TBlockItem GetScalarItem(const arrow::Scalar& scalar) final { - if (!scalar.is_valid) { - return {}; - } - - const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar); - return Inner->GetScalarItem(*structScalar.value[0]).MakeOptional(); - } - NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final { if (!item) { return {}; @@ -163,86 +75,23 @@ public: } private: - const std::unique_ptr<IBlockReader> Inner; + const std::unique_ptr<IBlockItemConverter> Inner; }; -} // namespace - -std::unique_ptr<IBlockReader> MakeBlockReader(TType* type) { - TType* unpacked = type; - if (type->IsOptional()) { - unpacked = AS_TYPE(TOptionalType, type)->GetItemType(); - } - - if (unpacked->IsOptional()) { - // at least 2 levels of optionals - ui32 nestLevel = 0; - auto currentType = type; - auto previousType = type; - do { - ++nestLevel; - previousType = currentType; - currentType = AS_TYPE(TOptionalType, currentType)->GetItemType(); - } while (currentType->IsOptional()); - - std::unique_ptr<IBlockReader> reader = MakeBlockReader(previousType); - for (ui32 i = 1; i < nestLevel; ++i) { - reader = std::make_unique<TExternalOptionalBlockReader>(std::move(reader)); - } - - return reader; - } else { - type = unpacked; - } - - if (type->IsTuple()) { - auto tupleType = AS_TYPE(TTupleType, type); - TVector<std::unique_ptr<IBlockReader>> children; - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - children.emplace_back(MakeBlockReader(tupleType->GetElementType(i))); - } - - return std::make_unique<TTupleBlockReader>(std::move(children)); - } +struct TConverterTraits { + using TResult = IBlockItemConverter; + using TTuple = TTupleBlockItemConverter; + template <typename T> + using TFixedSize = TFixedSizeBlockItemConverter<T>; + template <typename TStringType> + using TStrings = TStringBlockItemConverter<TStringType>; + using TExtOptional = TExternalOptionalBlockItemConverter; +}; - if (type->IsData()) { - auto slot = *AS_TYPE(TDataType, type)->GetDataSlot(); - switch (slot) { - case NUdf::EDataSlot::Int8: - return std::make_unique<TFixedSizeBlockReader<i8>>(); - case NUdf::EDataSlot::Bool: - case NUdf::EDataSlot::Uint8: - return std::make_unique<TFixedSizeBlockReader<ui8>>(); - case NUdf::EDataSlot::Int16: - return std::make_unique<TFixedSizeBlockReader<i16>>(); - case NUdf::EDataSlot::Uint16: - case NUdf::EDataSlot::Date: - return std::make_unique<TFixedSizeBlockReader<ui16>>(); - case NUdf::EDataSlot::Int32: - return std::make_unique<TFixedSizeBlockReader<i32>>(); - case NUdf::EDataSlot::Uint32: - case NUdf::EDataSlot::Datetime: - return std::make_unique<TFixedSizeBlockReader<ui32>>(); - case NUdf::EDataSlot::Int64: - case NUdf::EDataSlot::Interval: - return std::make_unique<TFixedSizeBlockReader<i64>>(); - case NUdf::EDataSlot::Uint64: - case NUdf::EDataSlot::Timestamp: - return std::make_unique<TFixedSizeBlockReader<ui64>>(); - case NUdf::EDataSlot::Float: - return std::make_unique<TFixedSizeBlockReader<float>>(); - case NUdf::EDataSlot::Double: - return std::make_unique<TFixedSizeBlockReader<double>>(); - case NUdf::EDataSlot::String: - return std::make_unique<TStringBlockReader<arrow::BinaryType>>(); - case NUdf::EDataSlot::Utf8: - return std::make_unique<TStringBlockReader<arrow::StringType>>(); - default: - MKQL_ENSURE(false, "Unsupported data slot"); - } - } +} // namespace - MKQL_ENSURE(false, "Unsupported type"); +std::unique_ptr<IBlockItemConverter> MakeBlockItemConverter(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type) { + return NYql::NUdf::MakeBlockReaderImpl<TConverterTraits>(typeInfoHelper, type); } } // namespace NMiniKQL diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h index 4e144d78a5..3e51769e7f 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h @@ -5,20 +5,24 @@ #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/public/udf/udf_types.h> +#include <ydb/library/yql/public/udf/arrow/block_reader.h> + #include <arrow/datum.h> namespace NKikimr::NMiniKQL { -class IBlockReader : private TNonCopyable { +using NYql::NUdf::IBlockReader; + +class IBlockItemConverter { public: - virtual ~IBlockReader() = default; - // result will reference to Array/Scalar internals and will be valid until next call to GetItem/GetScalarItem - virtual TBlockItem GetItem(const arrow::ArrayData& data, size_t index) = 0; - virtual TBlockItem GetScalarItem(const arrow::Scalar& scalar) = 0; + virtual ~IBlockItemConverter() = default; virtual NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const = 0; }; -std::unique_ptr<IBlockReader> MakeBlockReader(TType* type); +using NYql::NUdf::MakeBlockReader; + +std::unique_ptr<IBlockItemConverter> MakeBlockItemConverter(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type); } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp index 2e02a6a9c6..4f747fb748 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp @@ -30,7 +30,7 @@ public: NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { const auto maxLen = CalcBlockLen(CalcMaxBlockItemSize(ItemType_)); - auto builder = MakeBlockBuilder(ItemType_, ctx.ArrowMemoryPool, maxLen); + auto builder = MakeArrayBuilder(TTypeInfoHelper(), ItemType_, ctx.ArrowMemoryPool, maxLen); for (size_t i = 0; i < builder->MaxLength(); ++i) { auto result = Flow_->GetValue(ctx); @@ -114,7 +114,7 @@ private: struct TState : public TComputationValue<TState> { std::vector<NUdf::TUnboxedValue> Values_; std::vector<NUdf::TUnboxedValue*> ValuePointers_; - std::vector<std::unique_ptr<IBlockBuilder>> Builders_; + std::vector<std::unique_ptr<IArrayBuilder>> Builders_; size_t MaxLength_; size_t Rows_ = 0; bool IsFinished_ = false; @@ -132,7 +132,7 @@ private: for (size_t i = 0; i < types.size(); ++i) { ValuePointers_[i] = &Values_[i]; - Builders_.push_back(MakeBlockBuilder(types[i], ctx.ArrowMemoryPool, MaxLength_)); + Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), types[i], ctx.ArrowMemoryPool, MaxLength_)); } } }; @@ -192,7 +192,8 @@ private: TState(TMemoryUsageInfo* memInfo, TType* itemType) : TComputationValue(memInfo) - , Reader_(MakeBlockReader(itemType)) + , Reader_(MakeBlockReader(TTypeInfoHelper(), itemType)) + , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), itemType)) { } @@ -207,7 +208,7 @@ private: Index_ = 0; Arrays_.pop_front(); } - return Reader_->MakeValue(Reader_->GetItem(*Arrays_.front(), Index_++), ctx.HolderFactory); + return Converter_->MakeValue(Reader_->GetItem(*Arrays_.front(), Index_++), ctx.HolderFactory); } void Reset(const arrow::Datum& datum) { @@ -225,6 +226,7 @@ private: private: const std::unique_ptr<IBlockReader> Reader_; + const std::unique_ptr<IBlockItemConverter> Converter_; TDeque<std::shared_ptr<arrow::ArrayData>> Arrays_; size_t Index_ = 0; }; @@ -289,7 +291,7 @@ public: item = s.Readers_[i]->GetItem(*datum.array(), s.Index_); } - *(output[i]) = s.Readers_[i]->MakeValue(item, ctx.HolderFactory); + *(output[i]) = s.Converters_[i]->MakeValue(item, ctx.HolderFactory); } ++s.Index_; @@ -301,6 +303,7 @@ private: TVector<NUdf::TUnboxedValue> Values_; TVector<NUdf::TUnboxedValue*> ValuePointers_; TVector<std::unique_ptr<IBlockReader>> Readers_; + TVector<std::unique_ptr<IBlockItemConverter>> Converters_; size_t Count_ = 0; size_t Index_ = 0; @@ -314,7 +317,8 @@ private: } for (size_t i = 0; i < types.size(); ++i) { - Readers_.push_back(MakeBlockReader(types[i])); + Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), types[i])); + Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), types[i])); } } }; diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp index 773e52a93e..3d9e423a88 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.cpp +++ b/ydb/library/yql/minikql/mkql_type_builder.cpp @@ -1860,6 +1860,14 @@ NUdf::IArrowType::TPtr TTypeInfoHelper::ImportArrowType(ArrowSchema* schema) con return new TArrowType(std::move(res).ValueOrDie()); } +ui64 TTypeInfoHelper::GetMaxBlockLength(const NUdf::TType* type) const { + return CalcBlockLen(CalcMaxBlockItemSize(static_cast<const TType*>(type))); +} + +ui64 TTypeInfoHelper::GetMaxBlockBytes() const { + return MaxBlockSizeInBytes; +} + void TTypeInfoHelper::DoData(const NMiniKQL::TDataType* dt, NUdf::ITypeVisitor* v) { const auto typeId = dt->GetSchemeType(); v->OnDataType(typeId); @@ -2097,5 +2105,56 @@ NUdf::IEquate::TPtr MakeEquateImpl(const NMiniKQL::TType* type) { } } +size_t CalcMaxBlockItemSize(const TType* type) { + // we do not count block bitmap size + if (type->IsOptional()) { + return CalcMaxBlockItemSize(AS_TYPE(TOptionalType, type)->GetItemType()); + } + + if (type->IsTuple()) { + auto tupleType = AS_TYPE(TTupleType, type); + size_t result = 0; + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + result = std::max(result, CalcMaxBlockItemSize(tupleType->GetElementType(i))); + } + return result; + } + + if (type->IsData()) { + auto slot = *AS_TYPE(TDataType, type)->GetDataSlot(); + switch (slot) { + case NUdf::EDataSlot::Int8: + case NUdf::EDataSlot::Uint8: + case NUdf::EDataSlot::Bool: + case NUdf::EDataSlot::Int16: + case NUdf::EDataSlot::Uint16: + case NUdf::EDataSlot::Date: + case NUdf::EDataSlot::Int32: + case NUdf::EDataSlot::Uint32: + case NUdf::EDataSlot::Datetime: + case NUdf::EDataSlot::Int64: + case NUdf::EDataSlot::Interval: + case NUdf::EDataSlot::Uint64: + case NUdf::EDataSlot::Timestamp: + case NUdf::EDataSlot::Float: + case NUdf::EDataSlot::Double: { + size_t sz = GetDataTypeInfo(slot).FixedSize; + MKQL_ENSURE(sz > 0, "Unexpected fixed data size"); + return sz; + } + case NUdf::EDataSlot::String: + // size of offset part + return sizeof(arrow::BinaryType::offset_type); + case NUdf::EDataSlot::Utf8: + // size of offset part + return sizeof(arrow::StringType::offset_type); + default: + MKQL_ENSURE(false, "Unsupported data slot"); + } + } + + MKQL_ENSURE(false, "Unsupported type"); +} + } // namespace NMiniKQL } // namespace Nkikimr diff --git a/ydb/library/yql/minikql/mkql_type_builder.h b/ydb/library/yql/minikql/mkql_type_builder.h index 0d1fe86799..4861cecca7 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.h +++ b/ydb/library/yql/minikql/mkql_type_builder.h @@ -5,11 +5,23 @@ #include <ydb/library/yql/public/udf/udf_type_builder.h> #include <ydb/library/yql/parser/pg_wrapper/interface/compare.h> +#include <util/generic/size_literals.h> + #include <arrow/datum.h> namespace NKikimr { namespace NMiniKQL { +constexpr size_t MaxBlockSizeInBytes = 1_MB; +static_assert(MaxBlockSizeInBytes < (size_t)std::numeric_limits<i32>::max()); + +// maximum size of block item in bytes +size_t CalcMaxBlockItemSize(const TType* type); + +inline size_t CalcBlockLen(size_t maxBlockItemSize) { + return MaxBlockSizeInBytes / std::max<size_t>(maxBlockItemSize, 1); +} + bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type); bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& type); @@ -179,6 +191,8 @@ public: const NYql::NUdf::TPgTypeDescription* FindPgTypeDescription(ui32 typeId) const override; NUdf::IArrowType::TPtr MakeArrowType(const NUdf::TType* type) const override; NUdf::IArrowType::TPtr ImportArrowType(ArrowSchema* schema) const override; + ui64 GetMaxBlockLength(const NUdf::TType* type) const override; + ui64 GetMaxBlockBytes() const override; private: static void DoData(const NMiniKQL::TDataType* dt, NUdf::ITypeVisitor* v); diff --git a/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt b/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt index deda564cfd..c928bed6ea 100644 --- a/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt +++ b/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt @@ -20,5 +20,8 @@ target_link_libraries(public-udf-arrow PUBLIC target_sources(public-udf-arrow PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/args_dechunker.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/bit_util.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/util.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/block_reader.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/block_item.cpp ) diff --git a/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt b/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt index a87a49d25a..4c3155d270 100644 --- a/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt @@ -21,5 +21,8 @@ target_link_libraries(public-udf-arrow PUBLIC target_sources(public-udf-arrow PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/args_dechunker.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/bit_util.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/util.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/block_reader.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/block_item.cpp ) diff --git a/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt b/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt index a87a49d25a..4c3155d270 100644 --- a/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt +++ b/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt @@ -21,5 +21,8 @@ target_link_libraries(public-udf-arrow PUBLIC target_sources(public-udf-arrow PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/args_dechunker.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/bit_util.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/util.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/block_reader.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/block_item.cpp ) diff --git a/ydb/library/yql/public/udf/arrow/bit_util.cpp b/ydb/library/yql/public/udf/arrow/bit_util.cpp new file mode 100644 index 0000000000..569e207167 --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/bit_util.cpp @@ -0,0 +1,7 @@ +#include "bit_util.h" + +namespace NYql { +namespace NUdf { + +} +} diff --git a/ydb/library/yql/public/udf/arrow/bit_util.h b/ydb/library/yql/public/udf/arrow/bit_util.h new file mode 100644 index 0000000000..218b2f0925 --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/bit_util.h @@ -0,0 +1,68 @@ +#pragma once + +#include "defs.h" + +namespace NYql { +namespace NUdf { + +inline ui8* CompressAsSparseBitmap(const ui8* src, size_t srcOffset, const ui8* sparseBitmap, ui8* dst, size_t count) { + while (count--) { + ui8 inputBit = (src[srcOffset >> 3] >> (srcOffset & 7)) & 1; + *dst = inputBit; + ++srcOffset; + dst += *sparseBitmap++; + } + return dst; +} + +template<bool Negate> +inline void CompressSparseImpl(ui8* dst, const ui8* srcSparse, size_t len) { + while (len >= 8) { + ui8 result = 0; + result |= (*srcSparse++ & 1u) << 0; + result |= (*srcSparse++ & 1u) << 1; + result |= (*srcSparse++ & 1u) << 2; + result |= (*srcSparse++ & 1u) << 3; + result |= (*srcSparse++ & 1u) << 4; + result |= (*srcSparse++ & 1u) << 5; + result |= (*srcSparse++ & 1u) << 6; + result |= (*srcSparse++ & 1u) << 7; + if constexpr (Negate) { + *dst++ = ~result; + } else { + *dst++ = result; + } + len -= 8; + } + if (len) { + ui8 result = 0; + for (ui8 i = 0; i < len; ++i) { + result |= (*srcSparse++ & 1u) << i; + } + if constexpr (Negate) { + *dst++ = ~result; + } else { + *dst++ = result; + } + } +} + +inline void CompressSparseBitmap(ui8* dst, const ui8* srcSparse, size_t len) { + return CompressSparseImpl<false>(dst, srcSparse, len); +} + +inline void CompressSparseBitmapNegate(ui8* dst, const ui8* srcSparse, size_t len) { + return CompressSparseImpl<true>(dst, srcSparse, len); +} + +template<typename T> +inline T* CompressArray(const T* src, const ui8* sparseBitmap, T* dst, size_t count) { + while (count--) { + *dst = *src++; + dst += *sparseBitmap++; + } + return dst; +} + +} +} diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h new file mode 100644 index 0000000000..f8aa5c8f6a --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/block_builder.h @@ -0,0 +1,784 @@ +#pragma once + +#include "util.h" +#include "bit_util.h" + +#include <ydb/library/yql/public/udf/udf_type_inspection.h> + +#include <arrow/datum.h> +#include <arrow/c/bridge.h> + +namespace NYql { +namespace NUdf { + +class IArrayBuilder { +public: + virtual ~IArrayBuilder() = default; + virtual size_t MaxLength() const = 0; + virtual void Add(NUdf::TUnboxedValuePod value) = 0; + virtual void Add(TBlockItem value) = 0; + virtual void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) = 0; + virtual arrow::Datum Build(bool finish) = 0; +}; + +class IScalarBuilder { +public: + virtual ~IScalarBuilder() = default; + virtual arrow::Datum Build(TBlockItem value) const = 0; +}; + +inline std::shared_ptr<arrow::DataType> GetArrowType(const ITypeInfoHelper& typeInfoHelper, const TType* type) { + auto arrowTypeHandle = typeInfoHelper.MakeArrowType(type); + Y_ENSURE(arrowTypeHandle); + ArrowSchema s; + arrowTypeHandle->Export(&s); + return ARROW_RESULT(arrow::ImportType(&s)); +} + +class TArrayBuilderBase : public IArrayBuilder { +public: + using Ptr = std::unique_ptr<TArrayBuilderBase>; + + struct TBlockArrayTree { + using Ptr = std::shared_ptr<TBlockArrayTree>; + std::deque<std::shared_ptr<arrow::ArrayData>> Payload; + std::vector<TBlockArrayTree::Ptr> Children; + }; + + TArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen) + : ArrowType(GetArrowType(typeInfoHelper, type)) + , Pool(&pool) + , MaxLen(maxLen) + , MaxBlockSizeInBytes(typeInfoHelper.GetMaxBlockBytes()) + { + Y_VERIFY(ArrowType); + Y_VERIFY(maxLen > 0); + } + + size_t MaxLength() const final { + return MaxLen; + } + + void Add(NUdf::TUnboxedValuePod value) final { + Y_VERIFY(CurrLen < MaxLen); + DoAdd(value); + CurrLen++; + } + + void Add(TBlockItem value) final { + Y_VERIFY(CurrLen < MaxLen); + DoAdd(value); + CurrLen++; + } + + + void AddDefault() { + Y_VERIFY(CurrLen < MaxLen); + DoAddDefault(); + CurrLen++; + } + + void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) final { + Y_VERIFY(size_t(array.length) == bitmapSize); + Y_VERIFY(popCount <= bitmapSize); + Y_VERIFY(CurrLen + popCount <= MaxLen); + + if (popCount) { + DoAddMany(array, sparseBitmap, popCount); + } + + CurrLen += popCount; + } + + arrow::Datum Build(bool finish) final { + auto tree = BuildTree(finish); + TVector<std::shared_ptr<arrow::ArrayData>> chunks; + while (size_t size = CalcSliceSize(*tree)) { + chunks.push_back(Slice(*tree, size)); + } + + return MakeArray(chunks); + } + + TBlockArrayTree::Ptr BuildTree(bool finish) { + auto result = DoBuildTree(finish); + CurrLen = 0; + return result; + } +protected: + virtual void DoAdd(NUdf::TUnboxedValuePod value) = 0; + virtual void DoAdd(TBlockItem value) = 0; + virtual void DoAddDefault() = 0; + virtual void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) = 0; + virtual TBlockArrayTree::Ptr DoBuildTree(bool finish) = 0; + +private: + static size_t CalcSliceSize(const TBlockArrayTree& tree) { + if (tree.Payload.empty()) { + return 0; + } + + if (!tree.Children.empty()) { + Y_VERIFY(tree.Payload.size() == 1); + size_t result = std::numeric_limits<size_t>::max(); + for (auto& child : tree.Children) { + size_t childSize = CalcSliceSize(*child); + result = std::min(result, childSize); + } + Y_VERIFY(result <= size_t(tree.Payload.front()->length)); + return result; + } + + int64_t result = std::numeric_limits<int64_t>::max(); + for (auto& data : tree.Payload) { + result = std::min(result, data->length); + } + + Y_VERIFY(result > 0); + return static_cast<size_t>(result); + } + + static std::shared_ptr<arrow::ArrayData> Slice(TBlockArrayTree& tree, size_t size) { + Y_VERIFY(size > 0); + + Y_VERIFY(!tree.Payload.empty()); + auto& main = tree.Payload.front(); + std::shared_ptr<arrow::ArrayData> sliced; + if (size == size_t(main->length)) { + sliced = main; + tree.Payload.pop_front(); + } else { + Y_VERIFY(size < size_t(main->length)); + sliced = Chop(main, size); + } + + if (!tree.Children.empty()) { + std::vector<std::shared_ptr<arrow::ArrayData>> children; + for (auto& child : tree.Children) { + children.push_back(Slice(*child, size)); + } + + sliced->child_data = std::move(children); + if (tree.Payload.empty()) { + tree.Children.clear(); + } + } + return sliced; + } + +protected: + size_t GetCurrLen() const { + return CurrLen; + } + + const std::shared_ptr<arrow::DataType> ArrowType; + arrow::MemoryPool* const Pool; + const size_t MaxLen; + const size_t MaxBlockSizeInBytes; +private: + size_t CurrLen = 0; +}; + +template <typename T, bool Nullable> +class TFixedSizeArrayBuilder : public TArrayBuilderBase { +public: + TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen) + : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen) + { + Reserve(); + } + + void DoAdd(NUdf::TUnboxedValuePod value) final { + if constexpr (Nullable) { + if (!value) { + return DoAdd(TBlockItem{}); + } + } + DoAdd(TBlockItem(value.Get<T>())); + } + + void DoAdd(TBlockItem value) final { + if constexpr (Nullable) { + if (!value) { + NullBuilder->UnsafeAppend(0); + DataBuilder->UnsafeAppend(T{}); + return; + } + NullBuilder->UnsafeAppend(1); + } + + DataBuilder->UnsafeAppend(value.As<T>()); + } + + void DoAddDefault() final { + if constexpr (Nullable) { + NullBuilder->UnsafeAppend(1); + } + DataBuilder->UnsafeAppend(T{}); + } + + void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final { + Y_VERIFY(array.buffers.size() > 1); + if constexpr (Nullable) { + Y_VERIFY(NullBuilder->Length() == DataBuilder->Length()); + if (array.buffers.front()) { + ui8* dstBitmap = NullBuilder->End(); + CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length); + NullBuilder->UnsafeAdvance(popCount); + } else { + NullBuilder->UnsafeAppend(popCount, 1); + } + } + + const T* src = array.GetValues<T>(1); + T* dst = DataBuilder->End(); + CompressArray(src, sparseBitmap, dst, array.length); + DataBuilder->UnsafeAdvance(popCount); + } + + TBlockArrayTree::Ptr DoBuildTree(bool finish) final { + const size_t len = DataBuilder->Length(); + std::shared_ptr<arrow::Buffer> nulls; + if constexpr (Nullable) { + Y_VERIFY(NullBuilder->Length() == len); + nulls = NullBuilder->Finish(); + nulls = MakeDenseBitmap(nulls->data(), len, Pool); + } + std::shared_ptr<arrow::Buffer> data = DataBuilder->Finish(); + + TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); + result->Payload.push_back(arrow::ArrayData::Make(ArrowType, len, {nulls, data})); + + NullBuilder.reset(); + DataBuilder.reset(); + if (!finish) { + Reserve(); + } + return result; + } + +private: + void Reserve() { + DataBuilder = std::make_unique<TTypedBufferBuilder<T>>(Pool); + DataBuilder->Reserve(MaxLen + 1); + if constexpr (Nullable) { + NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool); + NullBuilder->Reserve(MaxLen + 1); + } + } + + std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder; + std::unique_ptr<TTypedBufferBuilder<T>> DataBuilder; +}; + +template<typename TStringType, bool Nullable> +class TStringArrayBuilder : public TArrayBuilderBase { +public: + using TOffset = typename TStringType::offset_type; + + TStringArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen) + : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen) + { + Reserve(); + } + + void DoAdd(NUdf::TUnboxedValuePod value) final { + if constexpr (Nullable) { + if (!value) { + return DoAdd(TBlockItem{}); + } + } + + DoAdd(TBlockItem(value.AsStringRef())); + } + + void DoAdd(TBlockItem value) final { + if constexpr (Nullable) { + if (!value) { + NullBuilder->UnsafeAppend(0); + AppendCurrentOffset(); + return; + } + } + + const std::string_view str = value.AsStringRef(); + + size_t currentLen = DataBuilder->Length(); + // empty string can always be appended + if (!str.empty() && currentLen + str.size() > MaxBlockSizeInBytes) { + if (currentLen) { + FlushChunk(false); + } + if (str.size() > MaxBlockSizeInBytes) { + DataBuilder->Reserve(str.size()); + } + } + + AppendCurrentOffset(); + DataBuilder->UnsafeAppend((const ui8*)str.data(), str.size()); + if constexpr (Nullable) { + NullBuilder->UnsafeAppend(1); + } + } + + + void DoAddDefault() final { + if constexpr (Nullable) { + NullBuilder->UnsafeAppend(1); + } + AppendCurrentOffset(); + } + + void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final { + Y_UNUSED(popCount); + Y_VERIFY(array.buffers.size() > 2); + Y_VERIFY(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length()); + + const ui8* srcNulls = array.GetValues<ui8>(0, 0); + const TOffset* srcOffset = array.GetValues<TOffset>(1); + const ui8* srcData = array.GetValues<ui8>(2, 0); + + const ui8* chunkStart = srcData; + const ui8* chunkEnd = chunkStart; + size_t dataLen = DataBuilder->Length(); + + ui8* dstNulls = Nullable ? NullBuilder->End() : nullptr; + TOffset* dstOffset = OffsetsBuilder->End(); + size_t countAdded = 0; + for (size_t i = 0; i < size_t(array.length); i++) { + if (!sparseBitmap[i]) { + continue; + } + + const ui8* begin = srcData + srcOffset[i]; + const ui8* end = srcData + srcOffset[i + 1]; + const size_t strSize = end - begin; + + size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen; + + for (;;) { + // try to append ith string + if (strSize <= availBytes) { + if (begin == chunkEnd) { + chunkEnd = end; + } else { + DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart); + chunkStart = begin; + chunkEnd = end; + } + + size_t nullOffset = i + array.offset; + if constexpr (Nullable) { + *dstNulls++ = srcNulls ? ((srcNulls[nullOffset >> 3] >> (nullOffset & 7)) & 1) : 1u; + } + *dstOffset++ = dataLen; + + dataLen += strSize; + ++countAdded; + + break; + } + + if (dataLen) { + if (chunkStart != chunkEnd) { + DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart); + chunkStart = chunkEnd = srcData; + } + Y_VERIFY(dataLen == DataBuilder->Length()); + OffsetsBuilder->UnsafeAdvance(countAdded); + if constexpr (Nullable) { + NullBuilder->UnsafeAdvance(countAdded); + } + FlushChunk(false); + + dataLen = 0; + countAdded = 0; + if constexpr (Nullable) { + dstNulls = NullBuilder->End(); + } + dstOffset = OffsetsBuilder->End(); + } else { + DataBuilder->Reserve(strSize); + availBytes = strSize; + } + } + } + if (chunkStart != chunkEnd) { + DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart); + } + Y_VERIFY(dataLen == DataBuilder->Length()); + OffsetsBuilder->UnsafeAdvance(countAdded); + if constexpr (Nullable) { + NullBuilder->UnsafeAdvance(countAdded); + } + } + + TBlockArrayTree::Ptr DoBuildTree(bool finish) final { + FlushChunk(finish); + TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); + result->Payload = std::move(Chunks); + Chunks.clear(); + return result; + } + +private: + void Reserve() { + if constexpr (Nullable) { + NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool); + NullBuilder->Reserve(MaxLen + 1); + } + OffsetsBuilder = std::make_unique<TTypedBufferBuilder<TOffset>>(Pool); + OffsetsBuilder->Reserve(MaxLen + 1); + DataBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool); + DataBuilder->Reserve(MaxBlockSizeInBytes); + } + + void AppendCurrentOffset() { + OffsetsBuilder->UnsafeAppend(DataBuilder->Length()); + } + + void FlushChunk(bool finish) { + const auto length = OffsetsBuilder->Length(); + Y_VERIFY(length > 0); + + AppendCurrentOffset(); + std::shared_ptr<arrow::Buffer> nullBitmap; + if constexpr (Nullable) { + nullBitmap = NullBuilder->Finish(); + nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool); + } + std::shared_ptr<arrow::Buffer> offsets = OffsetsBuilder->Finish(); + std::shared_ptr<arrow::Buffer> data = DataBuilder->Finish(); + + auto arrowType = std::make_shared<TStringType>(); + Chunks.push_back(arrow::ArrayData::Make(arrowType, length, { nullBitmap, offsets, data })); + if (!finish) { + Reserve(); + } + } + + std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder; + std::unique_ptr<TTypedBufferBuilder<TOffset>> OffsetsBuilder; + std::unique_ptr<TTypedBufferBuilder<ui8>> DataBuilder; + + std::deque<std::shared_ptr<arrow::ArrayData>> Chunks; +}; + +template<bool Nullable> +class TTupleArrayBuilder : public TArrayBuilderBase { +public: + TTupleArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, + TVector<TArrayBuilderBase::Ptr>&& children) + : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen) + , Children(std::move(children)) + { + Reserve(); + } + + void DoAdd(NUdf::TUnboxedValuePod value) final { + if constexpr (Nullable) { + if (!value) { + NullBuilder->UnsafeAppend(0); + for (ui32 i = 0; i < Children.size(); ++i) { + Children[i]->AddDefault(); + } + return; + } + NullBuilder->UnsafeAppend(1); + } + + auto elements = value.GetElements(); + if (elements) { + for (ui32 i = 0; i < Children.size(); ++i) { + Children[i]->Add(elements[i]); + } + } else { + for (ui32 i = 0; i < Children.size(); ++i) { + auto element = value.GetElement(i); + Children[i]->Add(element); + } + } + } + + void DoAdd(TBlockItem value) final { + if constexpr (Nullable) { + if (!value) { + NullBuilder->UnsafeAppend(0); + for (ui32 i = 0; i < Children.size(); ++i) { + Children[i]->AddDefault(); + } + return; + } + NullBuilder->UnsafeAppend(1); + } + + auto elements = value.AsTuple(); + for (ui32 i = 0; i < Children.size(); ++i) { + Children[i]->Add(elements[i]); + } + } + + void DoAddDefault() final { + if constexpr (Nullable) { + NullBuilder->UnsafeAppend(1); + } + for (ui32 i = 0; i < Children.size(); ++i) { + Children[i]->AddDefault(); + } + } + + void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final { + Y_VERIFY(!array.buffers.empty()); + Y_VERIFY(array.child_data.size() == Children.size()); + + if constexpr (Nullable) { + if (array.buffers.front()) { + ui8* dstBitmap = NullBuilder->End(); + CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length); + NullBuilder->UnsafeAdvance(popCount); + } else { + NullBuilder->UnsafeAppend(popCount, 1); + } + } + + for (size_t i = 0; i < Children.size(); ++i) { + Children[i]->AddMany(*array.child_data[i], popCount, sparseBitmap, array.length); + } + } + + TBlockArrayTree::Ptr DoBuildTree(bool finish) final { + TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); + + std::shared_ptr<arrow::Buffer> nullBitmap; + const size_t length = GetCurrLen(); + if constexpr (Nullable) { + Y_ENSURE(length == NullBuilder->Length(), "Unexpected NullBuilder length"); + nullBitmap = NullBuilder->Finish(); + nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool); + } + + Y_VERIFY(length); + result->Payload.push_back(arrow::ArrayData::Make(ArrowType, length, { nullBitmap })); + result->Children.reserve(Children.size()); + for (ui32 i = 0; i < Children.size(); ++i) { + result->Children.emplace_back(Children[i]->BuildTree(finish)); + } + + if (!finish) { + Reserve(); + } + + return result; + } + +private: + void Reserve() { + if constexpr (Nullable) { + NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool); + NullBuilder->Reserve(MaxLen + 1); + } + } + +private: + TVector<std::unique_ptr<TArrayBuilderBase>> Children; + std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder; +}; + +class TExternalOptionalArrayBuilder : public TArrayBuilderBase { +public: + TExternalOptionalArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, std::unique_ptr<TArrayBuilderBase>&& inner) + : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen) + , Inner(std::move(inner)) + { + Reserve(); + } + + void DoAdd(NUdf::TUnboxedValuePod value) final { + if (!value) { + NullBuilder->UnsafeAppend(0); + Inner->AddDefault(); + return; + } + + NullBuilder->UnsafeAppend(1); + Inner->Add(value.GetOptionalValue()); + } + + void DoAdd(TBlockItem value) final { + if (!value) { + NullBuilder->UnsafeAppend(0); + Inner->AddDefault(); + return; + } + + NullBuilder->UnsafeAppend(1); + Inner->Add(value.GetOptionalValue()); + } + + void DoAddDefault() final { + NullBuilder->UnsafeAppend(1); + Inner->AddDefault(); + } + + void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final { + Y_VERIFY(!array.buffers.empty()); + Y_VERIFY(array.child_data.size() == 1); + + if (array.buffers.front()) { + ui8* dstBitmap = NullBuilder->End(); + CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length); + NullBuilder->UnsafeAdvance(popCount); + } else { + NullBuilder->UnsafeAppend(popCount, 1); + } + + Inner->AddMany(*array.child_data[0], popCount, sparseBitmap, array.length); + } + + TBlockArrayTree::Ptr DoBuildTree(bool finish) final { + TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); + + std::shared_ptr<arrow::Buffer> nullBitmap; + const size_t length = GetCurrLen(); + Y_ENSURE(length == NullBuilder->Length(), "Unexpected NullBuilder length"); + nullBitmap = NullBuilder->Finish(); + nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool); + + Y_VERIFY(length); + result->Payload.push_back(arrow::ArrayData::Make(ArrowType, length, { nullBitmap })); + result->Children.emplace_back(Inner->BuildTree(finish)); + + if (!finish) { + Reserve(); + } + + return result; + } + +private: + void Reserve() { + NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool); + NullBuilder->Reserve(MaxLen + 1); + } + +private: + std::unique_ptr<TArrayBuilderBase> Inner; + std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder; +}; + +std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxBlockLength); + +template<bool Nullable> +inline std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderImpl(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen) { + if constexpr (Nullable) { + TOptionalTypeInspector typeOpt(typeInfoHelper, type); + type = typeOpt.GetItemType(); + } + + TTupleTypeInspector typeTuple(typeInfoHelper, type); + if (typeTuple) { + TVector<std::unique_ptr<TArrayBuilderBase>> children; + for (ui32 i = 0; i < typeTuple.GetElementsCount(); ++i) { + const TType* childType = typeTuple.GetElementType(i); + auto childBuilder = MakeArrayBuilderBase(typeInfoHelper, childType, pool, maxLen); + children.push_back(std::move(childBuilder)); + } + + return std::make_unique<TTupleArrayBuilder<Nullable>>(typeInfoHelper, type, pool, maxLen, std::move(children)); + } + + TDataTypeInspector typeData(typeInfoHelper, type); + if (typeData) { + auto typeId = typeData.GetTypeId(); + switch (GetDataSlot(typeId)) { + case NUdf::EDataSlot::Int8: + return std::make_unique<TFixedSizeArrayBuilder<i8, Nullable>>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::Uint8: + case NUdf::EDataSlot::Bool: + return std::make_unique<TFixedSizeArrayBuilder<ui8, Nullable>>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::Int16: + return std::make_unique<TFixedSizeArrayBuilder<i16, Nullable>>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::Uint16: + case NUdf::EDataSlot::Date: + return std::make_unique<TFixedSizeArrayBuilder<ui16, Nullable>>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::Int32: + return std::make_unique<TFixedSizeArrayBuilder<i32, Nullable>>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::Uint32: + case NUdf::EDataSlot::Datetime: + return std::make_unique<TFixedSizeArrayBuilder<ui32, Nullable>>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::Int64: + case NUdf::EDataSlot::Interval: + return std::make_unique<TFixedSizeArrayBuilder<i64, Nullable>>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::Uint64: + case NUdf::EDataSlot::Timestamp: + return std::make_unique<TFixedSizeArrayBuilder<ui64, Nullable>>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::Float: + return std::make_unique<TFixedSizeArrayBuilder<float, Nullable>>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::Double: + return std::make_unique<TFixedSizeArrayBuilder<double, Nullable>>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::String: + return std::make_unique<TStringArrayBuilder<arrow::BinaryType, Nullable>>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::Utf8: + return std::make_unique<TStringArrayBuilder<arrow::StringType, Nullable>>(typeInfoHelper, type, pool, maxLen); + default: + Y_ENSURE(false, "Unsupported data slot"); + } + } + + Y_ENSURE(false, "Unsupported type"); +} + +inline std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxBlockLength) { + const TType* unpacked = type; + TOptionalTypeInspector typeOpt(typeInfoHelper, type); + if (typeOpt) { + unpacked = typeOpt.GetItemType(); + } + + TOptionalTypeInspector unpackedOpt(typeInfoHelper, unpacked); + if (unpackedOpt) { + // at least 2 levels of optionals + ui32 nestLevel = 0; + auto currentType = type; + auto previousType = type; + TVector<const TType*> types; + for (;;) { + ++nestLevel; + previousType = currentType; + types.push_back(currentType); + TOptionalTypeInspector currentOpt(typeInfoHelper, currentType); + currentType = currentOpt.GetItemType(); + TOptionalTypeInspector nexOpt(typeInfoHelper, currentType); + if (!nexOpt) { + break; + } + } + + auto builder = MakeArrayBuilderBase(typeInfoHelper, previousType, pool, maxBlockLength); + for (ui32 i = 1; i < nestLevel; ++i) { + builder = std::make_unique<TExternalOptionalArrayBuilder>(typeInfoHelper, types[nestLevel - 1 - i], pool, maxBlockLength, std::move(builder)); + } + + return builder; + } else { + if (typeOpt) { + return MakeArrayBuilderImpl<true>(typeInfoHelper, type, pool, maxBlockLength); + } else { + return MakeArrayBuilderImpl<false>(typeInfoHelper, type, pool, maxBlockLength); + } + } +} + +inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxBlockLength) { + return MakeArrayBuilderBase(typeInfoHelper, type, pool, maxBlockLength); +} + +inline std::unique_ptr<IScalarBuilder> MakeScalarBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type) { + Y_UNUSED(typeInfoHelper); + Y_UNUSED(type); + Y_ENSURE(false); + return nullptr; +} + +} +} diff --git a/ydb/library/yql/public/udf/arrow/block_item.cpp b/ydb/library/yql/public/udf/arrow/block_item.cpp new file mode 100644 index 0000000000..c5668bd73d --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/block_item.cpp @@ -0,0 +1,7 @@ +#include "block_item.h" + +namespace NYql { +namespace NUdf { + +} +} diff --git a/ydb/library/yql/public/udf/arrow/block_item.h b/ydb/library/yql/public/udf/arrow/block_item.h new file mode 100644 index 0000000000..727d273d3a --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/block_item.h @@ -0,0 +1,149 @@ +#pragma once + +#include <ydb/library/yql/public/udf/udf_data_type.h> +#include <ydb/library/yql/public/udf/udf_string_ref.h> +#include <ydb/library/yql/public/udf/udf_type_size_check.h> + +namespace NYql::NUdf { + +class TBlockItem { + enum class EMarkers : ui8 { + Empty = 0, + Present = 1, + }; + +public: + TBlockItem() noexcept = default; + ~TBlockItem() noexcept = default; + + TBlockItem(const TBlockItem& value) noexcept = default; + TBlockItem(TBlockItem&& value) noexcept = default; + + TBlockItem& operator=(const TBlockItem& value) noexcept = default; + TBlockItem& operator=(TBlockItem&& value) noexcept = default; + + template <typename T, typename = std::enable_if_t<TPrimitiveDataType<T>::Result>> + inline explicit TBlockItem(T value); + + inline explicit TBlockItem(TStringRef value) { + Raw.String.Value = value.Data(); + Raw.String.Size = value.Size(); + Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present); + } + + inline explicit TBlockItem(const TBlockItem* tupleItems) { + Raw.Tuple.Value = tupleItems; + Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present); + } + + inline TBlockItem(ui64 low, ui64 high) { + Raw.Halfs[0] = low; + Raw.Halfs[1] = high; + } + + inline ui64 Low() const { + return Raw.Halfs[0]; + } + + inline ui64 High() const { + return Raw.Halfs[1]; + } + + template <typename T, typename = std::enable_if_t<TPrimitiveDataType<T>::Result>> + inline T As() const; + + inline const TBlockItem* AsTuple() const { + Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present); + return Raw.Tuple.Value; + } + + inline TStringRef AsStringRef() const { + Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present); + return TStringRef(Raw.String.Value, Raw.String.Size); + } + + inline TBlockItem MakeOptional() const + { + if (Raw.Simple.Meta) + return *this; + + TBlockItem result(*this); + ++result.Raw.Simple.Count; + return result; + } + + inline TBlockItem GetOptionalValue() const + { + if (Raw.Simple.Meta) + return *this; + + Y_VERIFY_DEBUG(Raw.Simple.Count > 0U, "Can't get value from empty."); + + TBlockItem result(*this); + --result.Raw.Simple.Count; + return result; + } + + inline explicit operator bool() const { return bool(Raw); } +private: + union TRaw { + ui64 Halfs[2] = {0, 0}; + struct { + union { + #define FIELD(type) type type##_; + PRIMITIVE_VALUE_TYPES(FIELD); + #undef FIELD + ui64 Count; + }; + union { + ui64 Pad; + struct { + ui8 Reserved[7]; + ui8 Meta; + }; + }; + } Simple; + + struct { + const char* Value; + ui32 Size; + } String; + + struct { + // client should know tuple size + const TBlockItem* Value; + } Tuple; + + EMarkers GetMarkers() const { + return static_cast<EMarkers>(Simple.Meta); + } + + explicit operator bool() const { return Simple.Meta | Simple.Count; } + } Raw; +}; + +UDF_ASSERT_TYPE_SIZE(TBlockItem, 16); + +#define VALUE_GET(xType) \ + template <> \ + inline xType TBlockItem::As<xType>() const \ + { \ + Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present); \ + return Raw.Simple.xType##_; \ + } + +#define VALUE_CONSTR(xType) \ + template <> \ + inline TBlockItem::TBlockItem(xType value) \ + { \ + Raw.Simple.xType##_ = value; \ + Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present); \ + } + +PRIMITIVE_VALUE_TYPES(VALUE_GET) +PRIMITIVE_VALUE_TYPES(VALUE_CONSTR) + +#undef VALUE_GET +#undef VALUE_CONSTR + +} diff --git a/ydb/library/yql/public/udf/arrow/block_reader.cpp b/ydb/library/yql/public/udf/arrow/block_reader.cpp new file mode 100644 index 0000000000..89d21389af --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/block_reader.cpp @@ -0,0 +1,7 @@ +#include "block_reader.h" + +namespace NYql { +namespace NUdf { + +} +} diff --git a/ydb/library/yql/public/udf/arrow/block_reader.h b/ydb/library/yql/public/udf/arrow/block_reader.h new file mode 100644 index 0000000000..f998b9c357 --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/block_reader.h @@ -0,0 +1,236 @@ +#pragma once + +#include "block_item.h" +#include "util.h" +#include <arrow/datum.h> + +#include <ydb/library/yql/public/udf/udf_type_inspection.h> + +namespace NYql { +namespace NUdf { + +class IBlockReader : private TNonCopyable { +public: + virtual ~IBlockReader() = default; + // result will reference to Array/Scalar internals and will be valid until next call to GetItem/GetScalarItem + virtual TBlockItem GetItem(const arrow::ArrayData& data, size_t index) = 0; + virtual TBlockItem GetScalarItem(const arrow::Scalar& scalar) = 0; +}; + +template <typename T> +class TFixedSizeBlockReader : public IBlockReader { +public: + TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { + if (IsNull(data, index)) { + return {}; + } + + return TBlockItem(data.GetValues<T>(1)[index]); + } + + TBlockItem GetScalarItem(const arrow::Scalar& scalar) final { + if (!scalar.is_valid) { + return {}; + } + + return TBlockItem(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data())); + } +}; + +template<typename TStringType> +class TStringBlockReader : public IBlockReader { +public: + using TOffset = typename TStringType::offset_type; + + TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { + Y_VERIFY_DEBUG(data.buffers.size() == 3); + if (IsNull(data, index)) { + return {}; + } + + const TOffset* offsets = data.GetValues<TOffset>(1); + const char* strData = data.GetValues<char>(2, 0); + + std::string_view str(strData + offsets[index], offsets[index + 1] - offsets[index]); + return TBlockItem(str); + } + + TBlockItem GetScalarItem(const arrow::Scalar& scalar) final { + if (!scalar.is_valid) { + return {}; + } + + auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(scalar).value; + std::string_view str(reinterpret_cast<const char*>(buffer->data()), buffer->size()); + return TBlockItem(str); + } +}; + +class TTupleBlockReader : public IBlockReader { +public: + TTupleBlockReader(TVector<std::unique_ptr<IBlockReader>>&& children) + : Children(std::move(children)) + , Items(Children.size()) + {} + + TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { + if (IsNull(data, index)) { + return {}; + } + + for (ui32 i = 0; i < Children.size(); ++i) { + Items[i] = Children[i]->GetItem(*data.child_data[i], index); + } + + return TBlockItem(Items.data()); + } + + TBlockItem GetScalarItem(const arrow::Scalar& scalar) final { + if (!scalar.is_valid) { + return {}; + } + + const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar); + + for (ui32 i = 0; i < Children.size(); ++i) { + Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]); + } + + return TBlockItem(Items.data()); + } + +private: + const TVector<std::unique_ptr<IBlockReader>> Children; + TVector<TBlockItem> Items; +}; + +class TExternalOptionalBlockReader : public IBlockReader { +public: + TExternalOptionalBlockReader(std::unique_ptr<IBlockReader>&& inner) + : Inner(std::move(inner)) + {} + + TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { + if (IsNull(data, index)) { + return {}; + } + + return Inner->GetItem(*data.child_data[0], index).MakeOptional(); + } + + TBlockItem GetScalarItem(const arrow::Scalar& scalar) final { + if (!scalar.is_valid) { + return {}; + } + + const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar); + return Inner->GetScalarItem(*structScalar.value[0]).MakeOptional(); + } + +private: + const std::unique_ptr<IBlockReader> Inner; +}; + +struct TReaderTraits { + using TResult = IBlockReader; + using TTuple = TTupleBlockReader; + template <typename T> + using TFixedSize = TFixedSizeBlockReader<T>; + template <typename TStringType> + using TStrings = TStringBlockReader<TStringType>; + using TExtOptional = TExternalOptionalBlockReader; +}; + +template <typename TTraits> +std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHelper& typeInfoHelper, const TType* type) { + const TType* unpacked = type; + TOptionalTypeInspector typeOpt(typeInfoHelper, type); + if (typeOpt) { + unpacked = typeOpt.GetItemType(); + } + + TOptionalTypeInspector unpackedOpt(typeInfoHelper, unpacked); + if (unpackedOpt) { + // at least 2 levels of optionals + ui32 nestLevel = 0; + auto currentType = type; + auto previousType = type; + for (;;) { + ++nestLevel; + previousType = currentType; + TOptionalTypeInspector currentOpt(typeInfoHelper, currentType); + currentType = currentOpt.GetItemType(); + TOptionalTypeInspector nexOpt(typeInfoHelper, currentType); + if (!nexOpt) { + break; + } + } + + auto reader = MakeBlockReaderImpl<TTraits>(typeInfoHelper, previousType); + for (ui32 i = 1; i < nestLevel; ++i) { + reader = std::make_unique<typename TTraits::TExtOptional>(std::move(reader)); + } + + return reader; + } + else { + type = unpacked; + } + + TTupleTypeInspector typeTuple(typeInfoHelper, type); + if (typeTuple) { + TVector<std::unique_ptr<typename TTraits::TResult>> children; + for (ui32 i = 0; i < typeTuple.GetElementsCount(); ++i) { + children.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeTuple.GetElementType(i))); + } + + return std::make_unique<typename TTraits::TTuple>(std::move(children)); + } + + TDataTypeInspector typeData(typeInfoHelper, type); + if (typeData) { + auto typeId = typeData.GetTypeId(); + switch (GetDataSlot(typeId)) { + case NUdf::EDataSlot::Int8: + return std::make_unique<typename TTraits::template TFixedSize<i8>>(); + case NUdf::EDataSlot::Bool: + case NUdf::EDataSlot::Uint8: + return std::make_unique<typename TTraits::template TFixedSize<ui8>>(); + case NUdf::EDataSlot::Int16: + return std::make_unique<typename TTraits::template TFixedSize<i16>>(); + case NUdf::EDataSlot::Uint16: + case NUdf::EDataSlot::Date: + return std::make_unique<typename TTraits::template TFixedSize<ui16>>(); + case NUdf::EDataSlot::Int32: + return std::make_unique<typename TTraits::template TFixedSize<i32>>(); + case NUdf::EDataSlot::Uint32: + case NUdf::EDataSlot::Datetime: + return std::make_unique<typename TTraits::template TFixedSize<ui32>>(); + case NUdf::EDataSlot::Int64: + case NUdf::EDataSlot::Interval: + return std::make_unique<typename TTraits::template TFixedSize<i64>>(); + case NUdf::EDataSlot::Uint64: + case NUdf::EDataSlot::Timestamp: + return std::make_unique<typename TTraits::template TFixedSize<ui64>>(); + case NUdf::EDataSlot::Float: + return std::make_unique<typename TTraits::template TFixedSize<float>>(); + case NUdf::EDataSlot::Double: + return std::make_unique<typename TTraits::template TFixedSize<double>>(); + case NUdf::EDataSlot::String: + return std::make_unique<typename TTraits::template TStrings<arrow::BinaryType>>(); + case NUdf::EDataSlot::Utf8: + return std::make_unique<typename TTraits::template TStrings<arrow::StringType>>(); + default: + Y_ENSURE(false, "Unsupported data slot"); + } + } + + Y_ENSURE(false, "Unsupported type"); +} + +inline std::unique_ptr<IBlockReader> MakeBlockReader(const ITypeInfoHelper& typeInfoHelper, const TType* type) { + return MakeBlockReaderImpl<TReaderTraits>(typeInfoHelper, type); +} + +} +} diff --git a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h index cfdc9030cf..5d90cc1380 100644 --- a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h +++ b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h @@ -8,6 +8,8 @@ #include "defs.h" #include "util.h" #include "args_dechunker.h" +#include "block_reader.h" +#include "block_builder.h" #include <arrow/array/array_base.h> #include <arrow/array/util.h> @@ -21,44 +23,103 @@ namespace NUdf { using TExec = arrow::Status(*)(arrow::compute::KernelContext*, const arrow::compute::ExecBatch&, arrow::Datum*); -class TSimpleArrowUdfImpl : public TBoxedValue { +class TUdfKernelState : public arrow::compute::KernelState { public: - TSimpleArrowUdfImpl(const TVector<std::shared_ptr<arrow::DataType>>& argTypes, bool onlyScalars, IArrowType::TPtr&& returnType, - TExec exec, IFunctionTypeInfoBuilder& builder, const TString& name) + TUdfKernelState(const TVector<const TType*>& argTypes, const TType* outputType, bool onlyScalars, const ITypeInfoHelper::TPtr& typeInfoHelper) : ArgTypes_(argTypes) + , OutputType_(outputType) , OnlyScalars_(onlyScalars) - , ReturnType_(std::move(returnType)) + , TypeInfoHelper_(typeInfoHelper) + { + Readers_.resize(ArgTypes_.size()); + Y_UNUSED(OutputType_); + } + + IBlockReader& GetReader(ui32 index) { + if (!Readers_[index]) { + Readers_[index] = MakeBlockReader(*TypeInfoHelper_, ArgTypes_[index]); + } + + return *Readers_[index]; + } + + IArrayBuilder& GetArrayBuilder() { + Y_ENSURE(!OnlyScalars_); + if (!ArrayBuilder_) { + ArrayBuilder_ = MakeArrayBuilder(*TypeInfoHelper_, OutputType_, *arrow::default_memory_pool(), TypeInfoHelper_->GetMaxBlockLength(OutputType_)); + } + + return *ArrayBuilder_; + } + + IScalarBuilder& GetScalarBuilder() { + Y_ENSURE(OnlyScalars_); + if (!ScalarBuilder_) { + ScalarBuilder_ = MakeScalarBuilder(*TypeInfoHelper_, OutputType_); + } + + return *ScalarBuilder_; + } + +private: + const TVector<const TType*> ArgTypes_; + const TType* OutputType_; + const bool OnlyScalars_; + const ITypeInfoHelper::TPtr TypeInfoHelper_; + TVector<std::unique_ptr<IBlockReader>> Readers_; + std::unique_ptr<IArrayBuilder> ArrayBuilder_; + std::unique_ptr<IScalarBuilder> ScalarBuilder_; +}; + +class TSimpleArrowUdfImpl : public TBoxedValue { +public: + TSimpleArrowUdfImpl(const TVector<const TType*> argTypes, const TType* outputType, bool onlyScalars, + TExec exec, IFunctionTypeInfoBuilder& builder, const TString& name) + : OnlyScalars_(onlyScalars) , Exec_(exec) , Pos_(GetSourcePosition(builder)) , Name_(name) , KernelContext_(&ExecContext_) { + auto typeInfoHelper = builder.TypeInfoHelper(); Kernel_.null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE; Kernel_.exec = Exec_; std::vector<arrow::compute::InputType> inTypes; - for (const auto& t : ArgTypes_) { - inTypes.emplace_back(t); - ArgsValuesDescr_.emplace_back(t); + for (const auto& t : argTypes) { + auto arrowTypeHandle = typeInfoHelper->MakeArrowType(t); + Y_ENSURE(arrowTypeHandle); + ArrowSchema s; + arrowTypeHandle->Export(&s); + auto type = ARROW_RESULT(arrow::ImportType(&s)); + ArgArrowTypes_.emplace_back(type); + + inTypes.emplace_back(type); + ArgsValuesDescr_.emplace_back(type); } + ReturnArrowTypeHandle_ = typeInfoHelper->MakeArrowType(outputType); + Y_ENSURE(ReturnArrowTypeHandle_); + ArrowSchema s; - ReturnType_->Export(&s); + ReturnArrowTypeHandle_->Export(&s); arrow::compute::OutputType outType = ARROW_RESULT(arrow::ImportType(&s)); Kernel_.signature = arrow::compute::KernelSignature::Make(std::move(inTypes), std::move(outType)); + KernelState_ = std::make_unique<TUdfKernelState>(argTypes, outputType, onlyScalars, typeInfoHelper); + KernelContext_.SetState(KernelState_.get()); } TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final { try { - TVector<arrow::Datum> argDatums(ArgTypes_.size()); - for (ui32 i = 0; i < ArgTypes_.size(); ++i) { + TVector<arrow::Datum> argDatums(ArgArrowTypes_.size()); + for (ui32 i = 0; i < ArgArrowTypes_.size(); ++i) { bool isScalar; ui64 length; ui32 chunkCount = valueBuilder->GetArrowBlockChunks(args[i], isScalar, length); if (isScalar) { ArrowArray a; valueBuilder->ExportArrowBlock(args[i], 0, &a); - auto arr = ARROW_RESULT(arrow::ImportArray(&a, ArgTypes_[i])); + auto arr = ARROW_RESULT(arrow::ImportArray(&a, ArgArrowTypes_[i])); auto scalar = ARROW_RESULT(arr->GetScalar(0)); argDatums[i] = scalar; } else { @@ -66,14 +127,14 @@ public: for (ui32 i = 0; i < chunkCount; ++i) { ArrowArray a; valueBuilder->ExportArrowBlock(args[i], i, &a); - auto arr = ARROW_RESULT(arrow::ImportArray(&a, ArgTypes_[i])); + auto arr = ARROW_RESULT(arrow::ImportArray(&a, ArgArrowTypes_[i])); imported[i] = arr; } if (chunkCount == 1) { argDatums[i] = imported.front(); } else { - argDatums[i] = ARROW_RESULT(arrow::ChunkedArray::Make(std::move(imported), ArgTypes_[i])); + argDatums[i] = ARROW_RESULT(arrow::ChunkedArray::Make(std::move(imported), ArgArrowTypes_[i])); } } } @@ -106,7 +167,7 @@ public: auto arr = ARROW_RESULT(arrow::MakeArrayFromScalar(*res.scalar(), 1)); ArrowArray a; ARROW_OK(arrow::ExportArray(*arr, &a)); - return valueBuilder->ImportArrowBlock(&a, 1, true, *ReturnType_); + return valueBuilder->ImportArrowBlock(&a, 1, true, *ReturnArrowTypeHandle_); } else { TVector<ArrowArray> a; if (res.is_array()) { @@ -120,7 +181,7 @@ public: } } - return valueBuilder->ImportArrowBlock(a.data(), a.size(), false, *ReturnType_); + return valueBuilder->ImportArrowBlock(a.data(), a.size(), false, *ReturnArrowTypeHandle_); } } catch (const std::exception&) { TStringBuilder sb; @@ -132,17 +193,19 @@ public: } private: - const TVector<std::shared_ptr<arrow::DataType>> ArgTypes_; const bool OnlyScalars_; - IArrowType::TPtr ReturnType_; const TExec Exec_; TSourcePosition Pos_; const TString Name_; + TVector<std::shared_ptr<arrow::DataType>> ArgArrowTypes_; + IArrowType::TPtr ReturnArrowTypeHandle_; + arrow::compute::ExecContext ExecContext_; mutable arrow::compute::KernelContext KernelContext_; arrow::compute::ScalarKernel Kernel_; std::vector<arrow::ValueDescr> ArgsValuesDescr_; + std::unique_ptr<TUdfKernelState> KernelState_; }; inline void PrepareSimpleArrowUdf(IFunctionTypeInfoBuilder& builder, TType* signature, TType* userType, TExec exec, bool typesOnly, @@ -177,12 +240,12 @@ inline void PrepareSimpleArrowUdf(IFunctionTypeInfoBuilder& builder, TType* sign builder.UserType(userType); Y_ENSURE(hasBlocks); - TVector<std::shared_ptr<arrow::DataType>> argTypes; + TVector<const TType*> argTypes; auto argsBuilder = builder.Args(callableInspector.GetArgsCount()); for (ui32 i = 0; i < argsInspector.GetElementsCount(); ++i) { TBlockTypeInspector blockInspector(*typeInfoHelper, argsInspector.GetElementType(i)); - auto initalType = callableInspector.GetArgType(i); - argsBuilder->Add(builder.Block(blockInspector.IsScalar())->Item(initalType).Build()); + auto type = callableInspector.GetArgType(i); + argsBuilder->Add(builder.Block(blockInspector.IsScalar())->Item(type).Build()); if (callableInspector.GetArgumentName(i).Size() > 0) { argsBuilder->Name(callableInspector.GetArgumentName(i)); } @@ -191,11 +254,6 @@ inline void PrepareSimpleArrowUdf(IFunctionTypeInfoBuilder& builder, TType* sign argsBuilder->Flags(callableInspector.GetArgumentFlags(i)); } - auto arrowTypeHandle = typeInfoHelper->MakeArrowType(initalType); - Y_ENSURE(arrowTypeHandle); - ArrowSchema s; - arrowTypeHandle->Export(&s); - auto type = ARROW_RESULT(arrow::ImportType(&s)); argTypes.emplace_back(type); } @@ -209,12 +267,40 @@ inline void PrepareSimpleArrowUdf(IFunctionTypeInfoBuilder& builder, TType* sign } if (!typesOnly) { - auto returnType = typeInfoHelper->MakeArrowType(callableInspector.GetReturnType()); - Y_ENSURE(returnType); - builder.Implementation(new TSimpleArrowUdfImpl(argTypes, onlyScalars, std::move(returnType), exec, builder, name)); + builder.Implementation(new TSimpleArrowUdfImpl(argTypes, callableInspector.GetReturnType(), onlyScalars, exec, builder, name)); } } +template <typename TDerived> +struct TUnaryKernelExec { + static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { + auto& state = dynamic_cast<TUdfKernelState&>(*ctx->state()); + auto& reader = state.GetReader(0); + const auto& arg = batch.values[0]; + if (arg.is_scalar()) { + auto& builder = state.GetScalarBuilder(); + auto item = reader.GetScalarItem(*arg.scalar()); + TDerived::Process(item, [&](TBlockItem out) { + *res = builder.Build(out); + }); + } + else { + auto& array = *arg.array(); + auto& builder = state.GetArrayBuilder(); + for (int64_t i = 0; i < array.length; ++i) { + auto item = reader.GetItem(array, i); + TDerived::Process(item, [&](TBlockItem out) { + builder.Add(out); + }); + } + + *res = builder.Build(false); + } + + return arrow::Status::OK(); + } +}; + } } diff --git a/ydb/library/yql/public/udf/arrow/util.cpp b/ydb/library/yql/public/udf/arrow/util.cpp index d8b8c6b6bb..6544018f5b 100644 --- a/ydb/library/yql/public/udf/arrow/util.cpp +++ b/ydb/library/yql/public/udf/arrow/util.cpp @@ -1,4 +1,5 @@ #include "util.h" +#include "bit_util.h" #include "defs.h" #include <arrow/array/array_base.h> @@ -7,6 +8,20 @@ namespace NYql { namespace NUdf { +std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool) { + // align up to 64 bit + bitCount = (bitCount + 63u) & ~size_t(63u); + // this simplifies code compression code - we can write single 64 bit word after array boundaries + bitCount += 64; + return ARROW_RESULT(arrow::AllocateBitmap(bitCount, pool)); +} + +std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool) { + auto bitmap = AllocateBitmapWithReserve(len, pool); + CompressSparseBitmap(bitmap->mutable_data(), srcSparse, len); + return bitmap; +} + std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len) { Y_ENSURE(data->length >= 0); Y_ENSURE(offset + len <= (size_t)data->length); diff --git a/ydb/library/yql/public/udf/arrow/util.h b/ydb/library/yql/public/udf/arrow/util.h index ec8a0ebb11..07accc005e 100644 --- a/ydb/library/yql/public/udf/arrow/util.h +++ b/ydb/library/yql/public/udf/arrow/util.h @@ -1,14 +1,21 @@ #pragma once +#include "defs.h" + #include <util/generic/vector.h> +#include <arrow/buffer_builder.h> #include <arrow/datum.h> +#include <arrow/util/bit_util.h> #include <functional> namespace NYql { namespace NUdf { +std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool); +std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool); + /// \brief Recursive version of ArrayData::Slice() method std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len); @@ -18,5 +25,69 @@ std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data, void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func); arrow::Datum MakeArray(const TVector<std::shared_ptr<arrow::ArrayData>>& chunks); +inline bool IsNull(const arrow::ArrayData& data, size_t index) { + return data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset); +} + +// similar to arrow::TypedBufferBuilder, but with UnsafeAdvance() method +// and shrinkToFit = false +template<typename T> +class TTypedBufferBuilder { + static_assert(std::is_pod_v<T>); + static_assert(!std::is_same_v<T, bool>); +public: + explicit TTypedBufferBuilder(arrow::MemoryPool* pool) + : Builder(pool) + { + } + + inline void Reserve(size_t size) { + ARROW_OK(Builder.Reserve(size * sizeof(T))); + } + + inline size_t Length() const { + return Builder.length() / sizeof(T); + } + + inline T* MutableData() { + return reinterpret_cast<T*>(Builder.mutable_data()); + } + + inline T* End() { + return MutableData() + Length(); + } + + inline const T* Data() const { + return reinterpret_cast<const T*>(Builder.data()); + } + + inline void UnsafeAppend(const T* values, size_t count) { + std::memcpy(End(), values, count * sizeof(T)); + UnsafeAdvance(count); + } + + inline void UnsafeAppend(size_t count, const T& value) { + T* target = End(); + std::fill(target, target + count, value); + UnsafeAdvance(count); + } + + inline void UnsafeAppend(T&& value) { + *End() = std::move(value); + UnsafeAdvance(1); + } + + inline void UnsafeAdvance(size_t count) { + Builder.UnsafeAdvance(count * sizeof(T)); + } + + inline std::shared_ptr<arrow::Buffer> Finish() { + bool shrinkToFit = false; + return ARROW_RESULT(Builder.Finish(shrinkToFit)); + } +private: + arrow::BufferBuilder Builder; +}; + } } diff --git a/ydb/library/yql/public/udf/udf_types.h b/ydb/library/yql/public/udf/udf_types.h index 5fee361e3d..cc31f536c4 100644 --- a/ydb/library/yql/public/udf/udf_types.h +++ b/ydb/library/yql/public/udf/udf_types.h @@ -270,6 +270,7 @@ private: } }; +#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 25) class ITypeInfoHelper2 : public ITypeInfoHelper1 { public: using TPtr = TRefCountedPtr<ITypeInfoHelper2>; @@ -277,6 +278,7 @@ public: public: virtual const TPgTypeDescription* FindPgTypeDescription(ui32 typeId) const = 0; }; +#endif ////////////////////////////////////////////////////////////////////////////// // IArrowType @@ -293,6 +295,7 @@ public: UDF_ASSERT_TYPE_SIZE(IArrowType, 8); +#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26) class ITypeInfoHelper3 : public ITypeInfoHelper2 { public: using TPtr = TRefCountedPtr<ITypeInfoHelper3>; @@ -303,8 +306,22 @@ public: // The given ArrowSchema struct is released, even if this function fails. virtual IArrowType::TPtr ImportArrowType(ArrowSchema* schema) const = 0; }; +#endif -#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26) +#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 29) +class ITypeInfoHelper4 : public ITypeInfoHelper3 { +public: + using TPtr = TRefCountedPtr<ITypeInfoHelper4>; + +public: + virtual ui64 GetMaxBlockLength(const TType* type) const = 0; + virtual ui64 GetMaxBlockBytes() const = 0; +}; +#endif + +#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 29) +using ITypeInfoHelper = ITypeInfoHelper4; +#elif UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26) using ITypeInfoHelper = ITypeInfoHelper3; #elif UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 25) using ITypeInfoHelper = ITypeInfoHelper2; diff --git a/ydb/library/yql/public/udf/udf_version.h b/ydb/library/yql/public/udf/udf_version.h index 303a707147..52cb0f27b6 100644 --- a/ydb/library/yql/public/udf/udf_version.h +++ b/ydb/library/yql/public/udf/udf_version.h @@ -7,7 +7,7 @@ namespace NYql { namespace NUdf { #define CURRENT_UDF_ABI_VERSION_MAJOR 2 -#define CURRENT_UDF_ABI_VERSION_MINOR 28 +#define CURRENT_UDF_ABI_VERSION_MINOR 29 #define CURRENT_UDF_ABI_VERSION_PATCH 0 #ifdef USE_CURRENT_UDF_ABI_VERSION diff --git a/ydb/library/yql/udfs/common/url_base/CMakeLists.darwin.txt b/ydb/library/yql/udfs/common/url_base/CMakeLists.darwin.txt index a6ac71608b..d6161089ba 100644 --- a/ydb/library/yql/udfs/common/url_base/CMakeLists.darwin.txt +++ b/ydb/library/yql/udfs/common/url_base/CMakeLists.darwin.txt @@ -20,7 +20,7 @@ target_link_libraries(url_udf INTERFACE add_global_library_for(url_udf.global url_udf) target_compile_options(url_udf.global PRIVATE -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=28 + -DUDF_ABI_VERSION_MINOR=29 -DUDF_ABI_VERSION_PATCH=0 ) target_link_libraries(url_udf.global PUBLIC diff --git a/ydb/library/yql/udfs/common/url_base/CMakeLists.linux-aarch64.txt b/ydb/library/yql/udfs/common/url_base/CMakeLists.linux-aarch64.txt index ad561a195c..dc4b9a0dba 100644 --- a/ydb/library/yql/udfs/common/url_base/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/udfs/common/url_base/CMakeLists.linux-aarch64.txt @@ -21,7 +21,7 @@ target_link_libraries(url_udf INTERFACE add_global_library_for(url_udf.global url_udf) target_compile_options(url_udf.global PRIVATE -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=28 + -DUDF_ABI_VERSION_MINOR=29 -DUDF_ABI_VERSION_PATCH=0 ) target_link_libraries(url_udf.global PUBLIC diff --git a/ydb/library/yql/udfs/common/url_base/CMakeLists.linux.txt b/ydb/library/yql/udfs/common/url_base/CMakeLists.linux.txt index ad561a195c..dc4b9a0dba 100644 --- a/ydb/library/yql/udfs/common/url_base/CMakeLists.linux.txt +++ b/ydb/library/yql/udfs/common/url_base/CMakeLists.linux.txt @@ -21,7 +21,7 @@ target_link_libraries(url_udf INTERFACE add_global_library_for(url_udf.global url_udf) target_compile_options(url_udf.global PRIVATE -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=28 + -DUDF_ABI_VERSION_MINOR=29 -DUDF_ABI_VERSION_PATCH=0 ) target_link_libraries(url_udf.global PUBLIC diff --git a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.darwin.txt b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.darwin.txt index c56c96c1fb..cf9f8d6bda 100644 --- a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.darwin.txt +++ b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.darwin.txt @@ -10,7 +10,7 @@ add_library(common-url_base-lib) target_compile_options(common-url_base-lib PRIVATE -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=28 + -DUDF_ABI_VERSION_MINOR=29 -DUDF_ABI_VERSION_PATCH=0 ) target_link_libraries(common-url_base-lib PUBLIC diff --git a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux-aarch64.txt b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux-aarch64.txt index 8346301c6c..cdc5b23fcb 100644 --- a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux-aarch64.txt @@ -10,7 +10,7 @@ add_library(common-url_base-lib) target_compile_options(common-url_base-lib PRIVATE -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=28 + -DUDF_ABI_VERSION_MINOR=29 -DUDF_ABI_VERSION_PATCH=0 ) target_link_libraries(common-url_base-lib PUBLIC diff --git a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux.txt b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux.txt index 8346301c6c..cdc5b23fcb 100644 --- a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux.txt +++ b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux.txt @@ -10,7 +10,7 @@ add_library(common-url_base-lib) target_compile_options(common-url_base-lib PRIVATE -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=28 + -DUDF_ABI_VERSION_MINOR=29 -DUDF_ABI_VERSION_PATCH=0 ) target_link_libraries(common-url_base-lib PUBLIC diff --git a/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h b/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h index 5fcf17c914..b2c74915d9 100644 --- a/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h +++ b/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h @@ -41,12 +41,6 @@ SIMPLE_UDF(TGetScheme, char*(TAutoMap<char*>)) { return valueBuilder->SubString(args[0], std::distance(url.begin(), prefix.begin()), prefix.size()); } -inline arrow::Status GetHostKernelExec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { - Y_UNUSED(ctx); - *res = batch.values[0]; - return arrow::Status::OK(); -} - BEGIN_SIMPLE_ARROW_UDF(TGetHost, TOptional<char*>(TOptional<char*>)) { EMPTY_RESULT_ON_EMPTY_ARG(0); const std::string_view url(args[0].AsStringRef()); @@ -55,7 +49,24 @@ BEGIN_SIMPLE_ARROW_UDF(TGetHost, TOptional<char*>(TOptional<char*>)) { valueBuilder->SubString(args[0], std::distance(url.begin(), host.begin()), host.size()); } -END_SIMPLE_ARROW_UDF(TGetHost, GetHostKernelExec); +struct TGetHostKernelExec : public TUnaryKernelExec<TGetHostKernelExec> { + template <typename TSink> + static void Process(TBlockItem arg, const TSink& sink) { + if (!arg) { + return sink(TBlockItem()); + } + + const std::string_view url(arg.AsStringRef()); + const std::string_view host(GetOnlyHost(url)); + if (host.empty()) { + return sink(TBlockItem()); + } + + sink(TBlockItem(TStringRef(host))); + } +}; + +END_SIMPLE_ARROW_UDF(TGetHost, TGetHostKernelExec::Do); SIMPLE_UDF(TGetHostPort, TOptional<char*>(TOptional<char*>)) { EMPTY_RESULT_ON_EMPTY_ARG(0); |