aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-01-25 23:20:45 +0300
committervvvv <vvvv@ydb.tech>2023-01-25 23:20:45 +0300
commit252723c46e7003fc87b6545a53370e25a4be74d0 (patch)
tree93f54cf5db40a67adefe8565df415425be0a94d1
parentfbcf4e52d02eef4937008d7d332b87be3fb0ed2e (diff)
downloadydb-252723c46e7003fc87b6545a53370e25a4be74d0.tar.gz
extracted readers&builders for simple block udfs, implemented vectorized Url::GetHost
-rw-r--r--ydb/library/yql/minikql/arrow/arrow_util.cpp14
-rw-r--r--ydb/library/yql/minikql/arrow/arrow_util.h64
-rw-r--r--ydb/library/yql/minikql/arrow/mkql_bit_utils.h65
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp4
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp792
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h27
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp4
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp6
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp4
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_item.h145
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp195
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h16
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp18
-rw-r--r--ydb/library/yql/minikql/mkql_type_builder.cpp59
-rw-r--r--ydb/library/yql/minikql/mkql_type_builder.h14
-rw-r--r--ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt3
-rw-r--r--ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt3
-rw-r--r--ydb/library/yql/public/udf/arrow/bit_util.cpp7
-rw-r--r--ydb/library/yql/public/udf/arrow/bit_util.h68
-rw-r--r--ydb/library/yql/public/udf/arrow/block_builder.h784
-rw-r--r--ydb/library/yql/public/udf/arrow/block_item.cpp7
-rw-r--r--ydb/library/yql/public/udf/arrow/block_item.h149
-rw-r--r--ydb/library/yql/public/udf/arrow/block_reader.cpp7
-rw-r--r--ydb/library/yql/public/udf/arrow/block_reader.h236
-rw-r--r--ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h142
-rw-r--r--ydb/library/yql/public/udf/arrow/util.cpp15
-rw-r--r--ydb/library/yql/public/udf/arrow/util.h71
-rw-r--r--ydb/library/yql/public/udf/udf_types.h19
-rw-r--r--ydb/library/yql/public/udf/udf_version.h2
-rw-r--r--ydb/library/yql/udfs/common/url_base/CMakeLists.darwin.txt2
-rw-r--r--ydb/library/yql/udfs/common/url_base/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/library/yql/udfs/common/url_base/CMakeLists.linux.txt2
-rw-r--r--ydb/library/yql/udfs/common/url_base/lib/CMakeLists.darwin.txt2
-rw-r--r--ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux.txt2
-rw-r--r--ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h25
37 files changed, 1649 insertions, 1331 deletions
diff --git a/ydb/library/yql/minikql/arrow/arrow_util.cpp b/ydb/library/yql/minikql/arrow/arrow_util.cpp
index 942ae6ef95..3c43da0b9d 100644
--- a/ydb/library/yql/minikql/arrow/arrow_util.cpp
+++ b/ydb/library/yql/minikql/arrow/arrow_util.cpp
@@ -25,18 +25,4 @@ std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, TType* it
}
}
-std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool) {
- // align up to 64 bit
- bitCount = (bitCount + 63u) & ~size_t(63u);
- // this simplifies code compression code - we can write single 64 bit word after array boundaries
- bitCount += 64;
- return ARROW_RESULT(arrow::AllocateBitmap(bitCount, pool));
-}
-
-std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool) {
- auto bitmap = AllocateBitmapWithReserve(len, pool);
- CompressSparseBitmap(bitmap->mutable_data(), srcSparse, len);
- return bitmap;
-}
-
}
diff --git a/ydb/library/yql/minikql/arrow/arrow_util.h b/ydb/library/yql/minikql/arrow/arrow_util.h
index 6623fc4384..a0e58ccbc1 100644
--- a/ydb/library/yql/minikql/arrow/arrow_util.h
+++ b/ydb/library/yql/minikql/arrow/arrow_util.h
@@ -19,8 +19,8 @@ using NYql::NUdf::Chop;
/// \brief Remove optional from `data` as new ArrayData object
std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, TType* itemType);
-std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool);
-std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool);
+using NYql::NUdf::AllocateBitmapWithReserve;
+using NYql::NUdf::MakeDenseBitmap;
inline arrow::internal::Bitmap GetBitmap(const arrow::ArrayData& arr, int index) {
return arrow::internal::Bitmap{ arr.buffers[index], arr.offset, arr.length };
@@ -233,64 +233,6 @@ inline arrow::Datum MakeScalarDatum<double>(double value) {
return arrow::Datum(std::make_shared<arrow::DoubleScalar>(value));
}
-// similar to arrow::TypedBufferBuilder, but with UnsafeAdvance() method
-// and shrinkToFit = false
-template<typename T>
-class TTypedBufferBuilder {
- static_assert(std::is_pod_v<T>);
- static_assert(!std::is_same_v<T, bool>);
-public:
- explicit TTypedBufferBuilder(arrow::MemoryPool* pool)
- : Builder(pool)
- {
- }
-
- inline void Reserve(size_t size) {
- ARROW_OK(Builder.Reserve(size * sizeof(T)));
- }
-
- inline size_t Length() const {
- return Builder.length() / sizeof(T);
- }
-
- inline T* MutableData() {
- return reinterpret_cast<T*>(Builder.mutable_data());
- }
-
- inline T* End() {
- return MutableData() + Length();
- }
-
- inline const T* Data() const {
- return reinterpret_cast<const T*>(Builder.data());
- }
-
- inline void UnsafeAppend(const T* values, size_t count) {
- std::memcpy(End(), values, count * sizeof(T));
- UnsafeAdvance(count);
- }
-
- inline void UnsafeAppend(size_t count, const T& value) {
- T* target = End();
- std::fill(target, target + count, value);
- UnsafeAdvance(count);
- }
-
- inline void UnsafeAppend(T&& value) {
- *End() = std::move(value);
- UnsafeAdvance(1);
- }
-
- inline void UnsafeAdvance(size_t count) {
- Builder.UnsafeAdvance(count * sizeof(T));
- }
-
- inline std::shared_ptr<arrow::Buffer> Finish() {
- bool shrinkToFit = false;
- return ARROW_RESULT(Builder.Finish(shrinkToFit));
- }
-private:
- arrow::BufferBuilder Builder;
-};
+using NYql::NUdf::TTypedBufferBuilder;
}
diff --git a/ydb/library/yql/minikql/arrow/mkql_bit_utils.h b/ydb/library/yql/minikql/arrow/mkql_bit_utils.h
index 9bb5d89887..1775dd68bb 100644
--- a/ydb/library/yql/minikql/arrow/mkql_bit_utils.h
+++ b/ydb/library/yql/minikql/arrow/mkql_bit_utils.h
@@ -1,5 +1,6 @@
#pragma once
#include <util/system/types.h>
+#include <ydb/library/yql/public/udf/arrow/bit_util.h>
namespace NKikimr {
namespace NMiniKQL {
@@ -70,48 +71,8 @@ inline size_t GetSparseBitmapPopCount(const ui8* src, size_t len) {
return result;
}
-namespace {
-template<bool Negate>
-inline void CompressSparseImpl(ui8* dst, const ui8* srcSparse, size_t len) {
- while (len >= 8) {
- ui8 result = 0;
- result |= (*srcSparse++ & 1u) << 0;
- result |= (*srcSparse++ & 1u) << 1;
- result |= (*srcSparse++ & 1u) << 2;
- result |= (*srcSparse++ & 1u) << 3;
- result |= (*srcSparse++ & 1u) << 4;
- result |= (*srcSparse++ & 1u) << 5;
- result |= (*srcSparse++ & 1u) << 6;
- result |= (*srcSparse++ & 1u) << 7;
- if constexpr (Negate) {
- *dst++ = ~result;
- } else {
- *dst++ = result;
- }
- len -= 8;
- }
- if (len) {
- ui8 result = 0;
- for (ui8 i = 0; i < len; ++i) {
- result |= (*srcSparse++ & 1u) << i;
- }
- if constexpr (Negate) {
- *dst++ = ~result;
- } else {
- *dst++ = result;
- }
- }
-}
-
-} // namespace
-
-inline void CompressSparseBitmap(ui8* dst, const ui8* srcSparse, size_t len) {
- return CompressSparseImpl<false>(dst, srcSparse, len);
-}
-
-inline void CompressSparseBitmapNegate(ui8* dst, const ui8* srcSparse, size_t len) {
- return CompressSparseImpl<true>(dst, srcSparse, len);
-}
+using NYql::NUdf::CompressSparseBitmap;
+using NYql::NUdf::CompressSparseBitmapNegate;
inline void NegateSparseBitmap(ui8* dst, const ui8* src, size_t len) {
while (len--) {
@@ -182,24 +143,8 @@ inline size_t CompressBitmap(const ui8* src, size_t srcOffset,
return dstOffset;
}
-template<typename T>
-inline T* CompressArray(const T* src, const ui8* sparseBitmap, T* dst, size_t count) {
- while (count--) {
- *dst = *src++;
- dst += *sparseBitmap++;
- }
- return dst;
-}
-
-inline ui8* CompressAsSparseBitmap(const ui8* src, size_t srcOffset, const ui8* sparseBitmap, ui8* dst, size_t count) {
- while (count--) {
- ui8 inputBit = (src[srcOffset >> 3] >> (srcOffset & 7)) & 1;
- *dst = inputBit;
- ++srcOffset;
- dst += *sparseBitmap++;
- }
- return dst;
-}
+using NYql::NUdf::CompressAsSparseBitmap;
+using NYql::NUdf::CompressArray;
} // namespace NMiniKQL
} // namespace NKikimr \ No newline at end of file
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
index 2e933e3181..87b207e427 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
@@ -144,7 +144,7 @@ private:
class TGenericColumnBuilder : public IAggColumnBuilder {
public:
TGenericColumnBuilder(ui64 size, TType* columnType, TComputationContext& ctx)
- : Builder_(MakeBlockBuilder(columnType, ctx.ArrowMemoryPool, size))
+ : Builder_(MakeArrayBuilder(TTypeInfoHelper(), columnType, ctx.ArrowMemoryPool, size))
, Ctx_(ctx)
{
}
@@ -158,7 +158,7 @@ public:
}
private:
- const std::unique_ptr<IBlockBuilder> Builder_;
+ const std::unique_ptr<IArrayBuilder> Builder_;
TComputationContext& Ctx_;
};
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
index 08fdd91649..090df34937 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
@@ -15,797 +15,5 @@
namespace NKikimr {
namespace NMiniKQL {
-namespace {
-
-std::shared_ptr<arrow::DataType> GetArrowType(TType* type) {
- std::shared_ptr<arrow::DataType> result;
- Y_VERIFY(ConvertArrowType(type, result));
- return result;
-}
-
-class TBlockBuilderBase : public IBlockBuilder {
-public:
- using Ptr = std::unique_ptr<TBlockBuilderBase>;
-
- struct TBlockArrayTree {
- using Ptr = std::shared_ptr<TBlockArrayTree>;
- std::deque<std::shared_ptr<arrow::ArrayData>> Payload;
- std::vector<TBlockArrayTree::Ptr> Children;
- };
-
-
- TBlockBuilderBase(TType* type, arrow::MemoryPool& pool, size_t maxLen)
- : Type(type)
- , Pool(&pool)
- , MaxLen(maxLen)
- {
- Y_VERIFY(type);
- Y_VERIFY(maxLen > 0);
- }
-
- size_t MaxLength() const final {
- return MaxLen;
- }
-
- void Add(NUdf::TUnboxedValuePod value) final {
- Y_VERIFY(CurrLen < MaxLen);
- DoAdd(value);
- CurrLen++;
- }
-
- void Add(TBlockItem value) final {
- Y_VERIFY(CurrLen < MaxLen);
- DoAdd(value);
- CurrLen++;
- }
-
-
- void AddDefault() {
- Y_VERIFY(CurrLen < MaxLen);
- DoAddDefault();
- CurrLen++;
- }
-
- void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) final {
- Y_VERIFY(array.length == bitmapSize);
- Y_VERIFY(popCount <= bitmapSize);
- Y_VERIFY(CurrLen + popCount <= MaxLen);
-
- if (popCount) {
- DoAddMany(array, sparseBitmap, popCount);
- }
-
- CurrLen += popCount;
- }
-
- arrow::Datum Build(bool finish) final {
- auto tree = BuildTree(finish);
- TVector<std::shared_ptr<arrow::ArrayData>> chunks;
- while (size_t size = CalcSliceSize(*tree)) {
- chunks.push_back(Slice(*tree, size));
- }
-
- return MakeArray(chunks);
- }
-
- TBlockArrayTree::Ptr BuildTree(bool finish) {
- auto result = DoBuildTree(finish);
- CurrLen = 0;
- return result;
- }
-protected:
- virtual void DoAdd(NUdf::TUnboxedValuePod value) = 0;
- virtual void DoAdd(TBlockItem value) = 0;
- virtual void DoAddDefault() = 0;
- virtual void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) = 0;
- virtual TBlockArrayTree::Ptr DoBuildTree(bool finish) = 0;
-
-private:
- static size_t CalcSliceSize(const TBlockArrayTree& tree) {
- if (tree.Payload.empty()) {
- return 0;
- }
-
- if (!tree.Children.empty()) {
- Y_VERIFY(tree.Payload.size() == 1);
- size_t result = std::numeric_limits<size_t>::max();
- for (auto& child : tree.Children) {
- size_t childSize = CalcSliceSize(*child);
- result = std::min(result, childSize);
- }
- Y_VERIFY(result <= tree.Payload.front()->length);
- return result;
- }
-
- int64_t result = std::numeric_limits<int64_t>::max();
- for (auto& data : tree.Payload) {
- result = std::min(result, data->length);
- }
-
- Y_VERIFY(result > 0);
- return static_cast<size_t>(result);
- }
-
- static std::shared_ptr<arrow::ArrayData> Slice(TBlockArrayTree& tree, size_t size) {
- Y_VERIFY(size > 0);
-
- Y_VERIFY(!tree.Payload.empty());
- auto& main = tree.Payload.front();
- std::shared_ptr<arrow::ArrayData> sliced;
- if (size == main->length) {
- sliced = main;
- tree.Payload.pop_front();
- } else {
- Y_VERIFY(size < main->length);
- sliced = Chop(main, size);
- }
-
- if (!tree.Children.empty()) {
- std::vector<std::shared_ptr<arrow::ArrayData>> children;
- for (auto& child : tree.Children) {
- children.push_back(Slice(*child, size));
- }
-
- sliced->child_data = std::move(children);
- if (tree.Payload.empty()) {
- tree.Children.clear();
- }
- }
- return sliced;
- }
-
-protected:
- size_t GetCurrLen() const {
- return CurrLen;
- }
-
- TType* const Type;
- arrow::MemoryPool* const Pool;
- const size_t MaxLen;
-private:
- size_t CurrLen = 0;
-};
-
-template <typename T, bool Nullable>
-class TFixedSizeBlockBuilder : public TBlockBuilderBase {
-public:
- TFixedSizeBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen)
- : TBlockBuilderBase(type, pool, maxLen)
- {
- Reserve();
- }
-
- void DoAdd(NUdf::TUnboxedValuePod value) final {
- if constexpr (Nullable) {
- if (!value) {
- return DoAdd(TBlockItem{});
- }
- }
- DoAdd(TBlockItem(value.Get<T>()));
- }
-
- void DoAdd(TBlockItem value) final {
- if constexpr (Nullable) {
- if (!value) {
- NullBuilder->UnsafeAppend(0);
- DataBuilder->UnsafeAppend(T{});
- return;
- }
- NullBuilder->UnsafeAppend(1);
- }
-
- DataBuilder->UnsafeAppend(value.As<T>());
- }
-
- void DoAddDefault() final {
- if constexpr (Nullable) {
- NullBuilder->UnsafeAppend(1);
- }
- DataBuilder->UnsafeAppend(T{});
- }
-
- void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
- Y_VERIFY(array.buffers.size() > 1);
- if constexpr (Nullable) {
- Y_VERIFY(NullBuilder->Length() == DataBuilder->Length());
- if (array.buffers.front()) {
- ui8* dstBitmap = NullBuilder->End();
- CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length);
- NullBuilder->UnsafeAdvance(popCount);
- } else {
- NullBuilder->UnsafeAppend(popCount, 1);
- }
- }
-
- const T* src = array.GetValues<T>(1);
- T* dst = DataBuilder->End();
- CompressArray(src, sparseBitmap, dst, array.length);
- DataBuilder->UnsafeAdvance(popCount);
- }
-
- TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
- const size_t len = DataBuilder->Length();
- std::shared_ptr<arrow::Buffer> nulls;
- if constexpr (Nullable) {
- Y_VERIFY(NullBuilder->Length() == len);
- nulls = NullBuilder->Finish();
- nulls = MakeDenseBitmap(nulls->data(), len, Pool);
- }
- std::shared_ptr<arrow::Buffer> data = DataBuilder->Finish();
-
- TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
- result->Payload.push_back(arrow::ArrayData::Make(GetArrowType(Type), len, {nulls, data}));
-
- NullBuilder.reset();
- DataBuilder.reset();
- if (!finish) {
- Reserve();
- }
- return result;
- }
-
-private:
- void Reserve() {
- DataBuilder = std::make_unique<TTypedBufferBuilder<T>>(Pool);
- DataBuilder->Reserve(MaxLen + 1);
- if constexpr (Nullable) {
- NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
- NullBuilder->Reserve(MaxLen + 1);
- }
- }
-
- std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
- std::unique_ptr<TTypedBufferBuilder<T>> DataBuilder;
-};
-
-template<typename TStringType, bool Nullable>
-class TStringBlockBuilder : public TBlockBuilderBase {
-public:
- using TOffset = typename TStringType::offset_type;
-
- TStringBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen)
- : TBlockBuilderBase(type, pool, maxLen)
- {
- Reserve();
- }
-
- void DoAdd(NUdf::TUnboxedValuePod value) final {
- if constexpr (Nullable) {
- if (!value) {
- return DoAdd(TBlockItem{});
- }
- }
-
- DoAdd(TBlockItem(value.AsStringRef()));
- }
-
- void DoAdd(TBlockItem value) final {
- if constexpr (Nullable) {
- if (!value) {
- NullBuilder->UnsafeAppend(0);
- AppendCurrentOffset();
- return;
- }
- }
-
- const std::string_view str = value.AsStringRef();
-
- size_t currentLen = DataBuilder->Length();
- // empty string can always be appended
- if (!str.empty() && currentLen + str.size() > MaxBlockSizeInBytes) {
- if (currentLen) {
- FlushChunk(false);
- }
- if (str.size() > MaxBlockSizeInBytes) {
- DataBuilder->Reserve(str.size());
- }
- }
-
- AppendCurrentOffset();
- DataBuilder->UnsafeAppend((const ui8*)str.data(), str.size());
- if constexpr (Nullable) {
- NullBuilder->UnsafeAppend(1);
- }
- }
-
-
- void DoAddDefault() final {
- if constexpr (Nullable) {
- NullBuilder->UnsafeAppend(1);
- }
- AppendCurrentOffset();
- }
-
- void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
- Y_VERIFY(array.buffers.size() > 2);
- Y_VERIFY(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length());
-
- const ui8* srcNulls = array.GetValues<ui8>(0, 0);
- const TOffset* srcOffset = array.GetValues<TOffset>(1);
- const ui8* srcData = array.GetValues<ui8>(2, 0);
-
- const ui8* chunkStart = srcData;
- const ui8* chunkEnd = chunkStart;
- size_t dataLen = DataBuilder->Length();
-
- ui8* dstNulls = Nullable ? NullBuilder->End() : nullptr;
- TOffset* dstOffset = OffsetsBuilder->End();
- size_t countAdded = 0;
- for (size_t i = 0; i < array.length; i++) {
- if (!sparseBitmap[i]) {
- continue;
- }
-
- const ui8* begin = srcData + srcOffset[i];
- const ui8* end = srcData + srcOffset[i + 1];
- const size_t strSize = end - begin;
-
- size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen;
-
- for (;;) {
- // try to append ith string
- if (strSize <= availBytes) {
- if (begin == chunkEnd) {
- chunkEnd = end;
- } else {
- DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
- chunkStart = begin;
- chunkEnd = end;
- }
-
- size_t nullOffset = i + array.offset;
- if constexpr (Nullable) {
- *dstNulls++ = srcNulls ? ((srcNulls[nullOffset >> 3] >> (nullOffset & 7)) & 1) : 1u;
- }
- *dstOffset++ = dataLen;
-
- dataLen += strSize;
- ++countAdded;
-
- break;
- }
-
- if (dataLen) {
- if (chunkStart != chunkEnd) {
- DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
- chunkStart = chunkEnd = srcData;
- }
- Y_VERIFY(dataLen == DataBuilder->Length());
- OffsetsBuilder->UnsafeAdvance(countAdded);
- if constexpr (Nullable) {
- NullBuilder->UnsafeAdvance(countAdded);
- }
- FlushChunk(false);
-
- dataLen = 0;
- countAdded = 0;
- if constexpr (Nullable) {
- dstNulls = NullBuilder->End();
- }
- dstOffset = OffsetsBuilder->End();
- } else {
- DataBuilder->Reserve(strSize);
- availBytes = strSize;
- }
- }
- }
- if (chunkStart != chunkEnd) {
- DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
- }
- Y_VERIFY(dataLen == DataBuilder->Length());
- OffsetsBuilder->UnsafeAdvance(countAdded);
- if constexpr (Nullable) {
- NullBuilder->UnsafeAdvance(countAdded);
- }
- }
-
- TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
- FlushChunk(finish);
- TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
- result->Payload = std::move(Chunks);
- Chunks.clear();
- return result;
- }
-
-private:
- void Reserve() {
- if constexpr (Nullable) {
- NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
- NullBuilder->Reserve(MaxLen + 1);
- }
- OffsetsBuilder = std::make_unique<TTypedBufferBuilder<TOffset>>(Pool);
- OffsetsBuilder->Reserve(MaxLen + 1);
- DataBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
- DataBuilder->Reserve(MaxBlockSizeInBytes);
- }
-
- void AppendCurrentOffset() {
- OffsetsBuilder->UnsafeAppend(DataBuilder->Length());
- }
-
- void FlushChunk(bool finish) {
- const auto length = OffsetsBuilder->Length();
- Y_VERIFY(length > 0);
-
- AppendCurrentOffset();
- std::shared_ptr<arrow::Buffer> nullBitmap;
- if constexpr (Nullable) {
- nullBitmap = NullBuilder->Finish();
- nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool);
- }
- std::shared_ptr<arrow::Buffer> offsets = OffsetsBuilder->Finish();
- std::shared_ptr<arrow::Buffer> data = DataBuilder->Finish();
-
- auto arrowType = std::make_shared<TStringType>();
- Chunks.push_back(arrow::ArrayData::Make(arrowType, length, { nullBitmap, offsets, data }));
- if (!finish) {
- Reserve();
- }
- }
-
- std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
- std::unique_ptr<TTypedBufferBuilder<TOffset>> OffsetsBuilder;
- std::unique_ptr<TTypedBufferBuilder<ui8>> DataBuilder;
-
- std::deque<std::shared_ptr<arrow::ArrayData>> Chunks;
-};
-
-template<bool Nullable>
-class TTupleBlockBuilder : public TBlockBuilderBase {
-public:
- TTupleBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen,
- TVector<TBlockBuilderBase::Ptr>&& children)
- : TBlockBuilderBase(type, pool, maxLen)
- , Children(std::move(children))
- {
- Reserve();
- }
-
- void DoAdd(NUdf::TUnboxedValuePod value) final {
- TTupleType* tupleType = AS_TYPE(TTupleType, Type);
- if constexpr (Nullable) {
- if (!value) {
- NullBuilder->UnsafeAppend(0);
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- Children[i]->AddDefault();
- }
- return;
- }
- NullBuilder->UnsafeAppend(1);
- }
-
- auto elements = value.GetElements();
- if (elements) {
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- Children[i]->Add(elements[i]);
- }
- } else {
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- auto element = value.GetElement(i);
- Children[i]->Add(element);
- }
- }
- }
-
- void DoAdd(TBlockItem value) final {
- TTupleType* tupleType = AS_TYPE(TTupleType, Type);
- if constexpr (Nullable) {
- if (!value) {
- NullBuilder->UnsafeAppend(0);
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- Children[i]->AddDefault();
- }
- return;
- }
- NullBuilder->UnsafeAppend(1);
- }
-
- auto elements = value.AsTuple();
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- Children[i]->Add(elements[i]);
- }
- }
-
- void DoAddDefault() final {
- TTupleType* tupleType = AS_TYPE(TTupleType, Type);
- if constexpr (Nullable) {
- NullBuilder->UnsafeAppend(1);
- }
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- Children[i]->AddDefault();
- }
- }
-
- void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
- Y_VERIFY(!array.buffers.empty());
- Y_VERIFY(array.child_data.size() == Children.size());
-
- if constexpr (Nullable) {
- if (array.buffers.front()) {
- ui8* dstBitmap = NullBuilder->End();
- CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length);
- NullBuilder->UnsafeAdvance(popCount);
- } else {
- NullBuilder->UnsafeAppend(popCount, 1);
- }
- }
-
- for (size_t i = 0; i < Children.size(); ++i) {
- Children[i]->AddMany(*array.child_data[i], popCount, sparseBitmap, array.length);
- }
- }
-
- TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
- auto tupleType = AS_TYPE(TTupleType, Type);
- TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
-
- std::shared_ptr<arrow::Buffer> nullBitmap;
- const size_t length = GetCurrLen();
- if constexpr (Nullable) {
- MKQL_ENSURE(length == NullBuilder->Length(), "Unexpected NullBuilder length");
- nullBitmap = NullBuilder->Finish();
- nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool);
- }
-
- Y_VERIFY(length);
- result->Payload.push_back(arrow::ArrayData::Make(GetArrowType(Type), length, { nullBitmap }));
- result->Children.reserve(tupleType->GetElementsCount());
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- result->Children.emplace_back(Children[i]->BuildTree(finish));
- }
-
- if (!finish) {
- Reserve();
- }
-
- return result;
- }
-
-private:
- void Reserve() {
- if constexpr (Nullable) {
- NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
- NullBuilder->Reserve(MaxLen + 1);
- }
- }
-
-private:
- TVector<std::unique_ptr<TBlockBuilderBase>> Children;
- std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
-};
-
-class TExternalOptionalBlockBuilder : public TBlockBuilderBase {
-public:
- TExternalOptionalBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen, std::unique_ptr<TBlockBuilderBase>&& inner)
- : TBlockBuilderBase(type, pool, maxLen)
- , Inner(std::move(inner))
- {
- Reserve();
- }
-
- void DoAdd(NUdf::TUnboxedValuePod value) final {
- if (!value) {
- NullBuilder->UnsafeAppend(0);
- Inner->AddDefault();
- return;
- }
-
- NullBuilder->UnsafeAppend(1);
- Inner->Add(value.GetOptionalValue());
- }
-
- void DoAdd(TBlockItem value) final {
- if (!value) {
- NullBuilder->UnsafeAppend(0);
- Inner->AddDefault();
- return;
- }
-
- NullBuilder->UnsafeAppend(1);
- Inner->Add(value.GetOptionalValue());
- }
-
- void DoAddDefault() final {
- NullBuilder->UnsafeAppend(1);
- Inner->AddDefault();
- }
-
- void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
- Y_VERIFY(!array.buffers.empty());
- Y_VERIFY(array.child_data.size() == 1);
-
- if (array.buffers.front()) {
- ui8* dstBitmap = NullBuilder->End();
- CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length);
- NullBuilder->UnsafeAdvance(popCount);
- } else {
- NullBuilder->UnsafeAppend(popCount, 1);
- }
-
- Inner->AddMany(*array.child_data[0], popCount, sparseBitmap, array.length);
- }
-
- TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
- TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
-
- std::shared_ptr<arrow::Buffer> nullBitmap;
- const size_t length = GetCurrLen();
- MKQL_ENSURE(length == NullBuilder->Length(), "Unexpected NullBuilder length");
- nullBitmap = NullBuilder->Finish();
- nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool);
-
- Y_VERIFY(length);
- result->Payload.push_back(arrow::ArrayData::Make(GetArrowType(Type), length, { nullBitmap }));
- result->Children.emplace_back(Inner->BuildTree(finish));
-
- if (!finish) {
- Reserve();
- }
-
- return result;
- }
-
-private:
- void Reserve() {
- NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
- NullBuilder->Reserve(MaxLen + 1);
- }
-
-private:
- std::unique_ptr<TBlockBuilderBase> Inner;
- std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
-};
-
-std::unique_ptr<TBlockBuilderBase> MakeBlockBuilderBase(TType* type, arrow::MemoryPool& pool, size_t maxBlockLength);
-
-template<bool Nullable>
-std::unique_ptr<TBlockBuilderBase> MakeBlockBuilderImpl(TType* type, arrow::MemoryPool& pool, size_t maxLen) {
- if constexpr (Nullable) {
- type = AS_TYPE(TOptionalType, type)->GetItemType();
- }
-
- if (type->IsTuple()) {
- auto tupleType = AS_TYPE(TTupleType, type);
- TVector<std::unique_ptr<TBlockBuilderBase>> children;
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- TType* childType = tupleType->GetElementType(i);
- auto childBuilder = MakeBlockBuilderBase(childType, pool, maxLen);
- children.push_back(std::move(childBuilder));
- }
-
- return std::make_unique<TTupleBlockBuilder<Nullable>>(type, pool, maxLen, std::move(children));
- }
-
- if (type->IsData()) {
- auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
- switch (slot) {
- case NUdf::EDataSlot::Int8:
- return std::make_unique<TFixedSizeBlockBuilder<i8, Nullable>>(type, pool, maxLen);
- case NUdf::EDataSlot::Uint8:
- case NUdf::EDataSlot::Bool:
- return std::make_unique<TFixedSizeBlockBuilder<ui8, Nullable>>(type, pool, maxLen);
- case NUdf::EDataSlot::Int16:
- return std::make_unique<TFixedSizeBlockBuilder<i16, Nullable>>(type, pool, maxLen);
- case NUdf::EDataSlot::Uint16:
- case NUdf::EDataSlot::Date:
- return std::make_unique<TFixedSizeBlockBuilder<ui16, Nullable>>(type, pool, maxLen);
- case NUdf::EDataSlot::Int32:
- return std::make_unique<TFixedSizeBlockBuilder<i32, Nullable>>(type, pool, maxLen);
- case NUdf::EDataSlot::Uint32:
- case NUdf::EDataSlot::Datetime:
- return std::make_unique<TFixedSizeBlockBuilder<ui32, Nullable>>(type, pool, maxLen);
- case NUdf::EDataSlot::Int64:
- case NUdf::EDataSlot::Interval:
- return std::make_unique<TFixedSizeBlockBuilder<i64, Nullable>>(type, pool, maxLen);
- case NUdf::EDataSlot::Uint64:
- case NUdf::EDataSlot::Timestamp:
- return std::make_unique<TFixedSizeBlockBuilder<ui64, Nullable>>(type, pool, maxLen);
- case NUdf::EDataSlot::Float:
- return std::make_unique<TFixedSizeBlockBuilder<float, Nullable>>(type, pool, maxLen);
- case NUdf::EDataSlot::Double:
- return std::make_unique<TFixedSizeBlockBuilder<double, Nullable>>(type, pool, maxLen);
- case NUdf::EDataSlot::String:
- return std::make_unique<TStringBlockBuilder<arrow::BinaryType, Nullable>>(type, pool, maxLen);
- case NUdf::EDataSlot::Utf8:
- return std::make_unique<TStringBlockBuilder<arrow::StringType, Nullable>>(type, pool, maxLen);
- default:
- MKQL_ENSURE(false, "Unsupported data slot");
- }
- }
-
- MKQL_ENSURE(false, "Unsupported type");
-}
-
-std::unique_ptr<TBlockBuilderBase> MakeBlockBuilderBase(TType* type, arrow::MemoryPool& pool, size_t maxBlockLength) {
- TType* unpacked = type;
- if (type->IsOptional()) {
- unpacked = AS_TYPE(TOptionalType, type)->GetItemType();
- }
-
- if (unpacked->IsOptional()) {
- // at least 2 levels of optionals
- ui32 nestLevel = 0;
- auto currentType = type;
- auto previousType = type;
- TVector<TType*> types;
- do {
- ++nestLevel;
- types.push_back(currentType);
- previousType = currentType;
- currentType = AS_TYPE(TOptionalType, currentType)->GetItemType();
- } while (currentType->IsOptional());
-
- auto builder = MakeBlockBuilderBase(previousType, pool, maxBlockLength);
- for (ui32 i = 1; i < nestLevel; ++i) {
- builder = std::make_unique<TExternalOptionalBlockBuilder>(types[nestLevel - 1 - i], pool, maxBlockLength, std::move(builder));
- }
-
- return builder;
- } else {
- if (type->IsOptional()) {
- return MakeBlockBuilderImpl<true>(type, pool, maxBlockLength);
- } else {
- return MakeBlockBuilderImpl<false>(type, pool, maxBlockLength);
- }
- }
-}
-
-} // namespace
-
-size_t CalcMaxBlockItemSize(const TType* type) {
- // we do not count block bitmap size
- if (type->IsOptional()) {
- return CalcMaxBlockItemSize(AS_TYPE(TOptionalType, type)->GetItemType());
- }
-
- if (type->IsTuple()) {
- auto tupleType = AS_TYPE(TTupleType, type);
- size_t result = 0;
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- result = std::max(result, CalcMaxBlockItemSize(tupleType->GetElementType(i)));
- }
- return result;
- }
-
- if (type->IsData()) {
- auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
- switch (slot) {
- case NUdf::EDataSlot::Int8:
- case NUdf::EDataSlot::Uint8:
- case NUdf::EDataSlot::Bool:
- case NUdf::EDataSlot::Int16:
- case NUdf::EDataSlot::Uint16:
- case NUdf::EDataSlot::Date:
- case NUdf::EDataSlot::Int32:
- case NUdf::EDataSlot::Uint32:
- case NUdf::EDataSlot::Datetime:
- case NUdf::EDataSlot::Int64:
- case NUdf::EDataSlot::Interval:
- case NUdf::EDataSlot::Uint64:
- case NUdf::EDataSlot::Timestamp:
- case NUdf::EDataSlot::Float:
- case NUdf::EDataSlot::Double: {
- size_t sz = GetDataTypeInfo(slot).FixedSize;
- MKQL_ENSURE(sz > 0, "Unexpected fixed data size");
- return sz;
- }
- case NUdf::EDataSlot::String:
- // size of offset part
- return sizeof(arrow::BinaryType::offset_type);
- case NUdf::EDataSlot::Utf8:
- // size of offset part
- return sizeof(arrow::StringType::offset_type);
- default:
- MKQL_ENSURE(false, "Unsupported data slot");
- }
- }
-
- MKQL_ENSURE(false, "Unsupported type");
-}
-
-std::unique_ptr<IBlockBuilder> MakeBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxBlockLength) {
- return MakeBlockBuilderBase(type, pool, maxBlockLength);
-}
-
} // namespace NMiniKQL
} // namespace NKikimr
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h
index 3a937d6dd7..355c1be4ce 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h
@@ -5,36 +5,17 @@
#include <ydb/library/yql/minikql/mkql_node.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
-#include <arrow/array/data.h>
+#include <ydb/library/yql/public/udf/arrow/block_builder.h>
-#include <util/generic/size_literals.h>
+#include <arrow/array/data.h>
#include <limits>
namespace NKikimr {
namespace NMiniKQL {
-constexpr size_t MaxBlockSizeInBytes = 1_MB;
-static_assert(MaxBlockSizeInBytes < (size_t)std::numeric_limits<i32>::max());
-
-// maximum size of block item in bytes
-size_t CalcMaxBlockItemSize(const TType* type);
-
-inline size_t CalcBlockLen(size_t maxBlockItemSize) {
- return MaxBlockSizeInBytes / std::max<size_t>(maxBlockItemSize, 1);
-}
-
-class IBlockBuilder {
-public:
- virtual ~IBlockBuilder() = default;
- virtual size_t MaxLength() const = 0;
- virtual void Add(NUdf::TUnboxedValuePod value) = 0;
- virtual void Add(TBlockItem value) = 0;
- virtual void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) = 0;
- virtual arrow::Datum Build(bool finish) = 0;
-};
-
-std::unique_ptr<IBlockBuilder> MakeBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxBlockLength);
+using NYql::NUdf::IArrayBuilder;
+using NYql::NUdf::MakeArrayBuilder;
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
index 308ce6b854..726adcd5b1 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
@@ -254,7 +254,7 @@ private:
TVector<NUdf::TUnboxedValue*> ValuePointers_;
TVector<NUdf::TUnboxedValue> InputValues_;
TVector<std::shared_ptr<arrow::ArrayData>> Arrays_;
- TVector<std::unique_ptr<IBlockBuilder>> Builders_;
+ TVector<std::unique_ptr<IArrayBuilder>> Builders_;
size_t InputSize_ = 0;
size_t OutputPos_ = 0;
size_t OutputSize_ = 0;
@@ -289,7 +289,7 @@ private:
for (ui32 i = 0, outIndex = 0; i < width; ++i) {
if (i != bitmapIndex) {
if (types[i]->GetShape() != TBlockType::EShape::Scalar && output[outIndex] != nullptr) {
- Builders_[i] = MakeBlockBuilder(types[i]->GetItemType(), pool, maxBlockLen);
+ Builders_[i] = MakeArrayBuilder(TTypeInfoHelper(), types[i]->GetItemType(), pool, maxBlockLen);
}
outIndex++;
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp
index 52084e3ae6..10c3e41d8a 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_if.cpp
@@ -73,8 +73,8 @@ class TIfBlockExec {
public:
explicit TIfBlockExec(TType* type)
: Type(type)
- , ThenReader(MakeBlockReader(type))
- , ElseReader(MakeBlockReader(type))
+ , ThenReader(MakeBlockReader(TTypeInfoHelper(), type))
+ , ElseReader(MakeBlockReader(TTypeInfoHelper(), type))
{
}
@@ -105,7 +105,7 @@ public:
const std::shared_ptr<arrow::ArrayData>& pred = predDatum.array();
const size_t len = pred->length;
- auto builder = MakeBlockBuilder(Type, *ctx->memory_pool(), len);
+ auto builder = MakeArrayBuilder(TTypeInfoHelper(), Type, *ctx->memory_pool(), len);
const ui8* predValues = pred->GetValues<uint8_t>(1);
for (size_t i = 0; i < len; ++i) {
if constexpr (!ThenIsScalar) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp
index ce7803fe0e..6610a0de3a 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp
@@ -27,8 +27,8 @@ std::vector<arrow::ValueDescr> ToValueDescr(const TVector<TType*>& types) {
arrow::Datum MakeArrayFromScalar(const arrow::Scalar& scalar, size_t len, TType* type, arrow::MemoryPool& pool) {
MKQL_ENSURE(len > 0, "Invalid block size");
- auto reader = MakeBlockReader(type);
- auto builder = MakeBlockBuilder(type, pool, len);
+ auto reader = MakeBlockReader(TTypeInfoHelper(), type);
+ auto builder = MakeArrayBuilder(TTypeInfoHelper(), type, pool, len);
auto scalarItem = reader->GetScalarItem(scalar);
for (size_t i = 0; i < len; ++i) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_item.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_item.h
index 37c357fcc9..a01d2dc780 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_item.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_item.h
@@ -1,149 +1,8 @@
#pragma once
-
-#include <ydb/library/yql/public/udf/udf_data_type.h>
-#include <ydb/library/yql/public/udf/udf_string_ref.h>
-#include <ydb/library/yql/public/udf/udf_type_size_check.h>
+#include <ydb/library/yql/public/udf/arrow/block_item.h>
namespace NKikimr::NMiniKQL {
-class TBlockItem {
- enum class EMarkers : ui8 {
- Empty = 0,
- Present = 1,
- };
-
-public:
- TBlockItem() noexcept = default;
- ~TBlockItem() noexcept = default;
-
- TBlockItem(const TBlockItem& value) noexcept = default;
- TBlockItem(TBlockItem&& value) noexcept = default;
-
- TBlockItem& operator=(const TBlockItem& value) noexcept = default;
- TBlockItem& operator=(TBlockItem&& value) noexcept = default;
-
- template <typename T, typename = std::enable_if_t<NYql::NUdf::TPrimitiveDataType<T>::Result>>
- inline explicit TBlockItem(T value);
-
- inline explicit TBlockItem(NYql::NUdf::TStringRef value) {
- Raw.String.Value = value.Data();
- Raw.String.Size = value.Size();
- Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present);
- }
-
- inline explicit TBlockItem(const TBlockItem* tupleItems) {
- Raw.Tuple.Value = tupleItems;
- Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present);
- }
-
- inline TBlockItem(ui64 low, ui64 high) {
- Raw.Halfs[0] = low;
- Raw.Halfs[1] = high;
- }
-
- inline ui64 Low() const {
- return Raw.Halfs[0];
- }
-
- inline ui64 High() const {
- return Raw.Halfs[1];
- }
-
- template <typename T, typename = std::enable_if_t<NYql::NUdf::TPrimitiveDataType<T>::Result>>
- inline T As() const;
-
- inline const TBlockItem* AsTuple() const {
- Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present);
- return Raw.Tuple.Value;
- }
-
- inline NYql::NUdf::TStringRef AsStringRef() const {
- Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present);
- return NYql::NUdf::TStringRef(Raw.String.Value, Raw.String.Size);
- }
-
- inline TBlockItem MakeOptional() const
- {
- if (Raw.Simple.Meta)
- return *this;
-
- TBlockItem result(*this);
- ++result.Raw.Simple.Count;
- return result;
- }
-
- inline TBlockItem GetOptionalValue() const
- {
- if (Raw.Simple.Meta)
- return *this;
-
- Y_VERIFY_DEBUG(Raw.Simple.Count > 0U, "Can't get value from empty.");
-
- TBlockItem result(*this);
- --result.Raw.Simple.Count;
- return result;
- }
-
- inline explicit operator bool() const { return bool(Raw); }
-private:
- union TRaw {
- ui64 Halfs[2] = {0, 0};
- struct {
- union {
- #define FIELD(type) type type##_;
- PRIMITIVE_VALUE_TYPES(FIELD);
- #undef FIELD
- ui64 Count;
- };
- union {
- ui64 Pad;
- struct {
- ui8 Reserved[7];
- ui8 Meta;
- };
- };
- } Simple;
-
- struct {
- const char* Value;
- ui32 Size;
- } String;
-
- struct {
- // client should know tuple size
- const TBlockItem* Value;
- } Tuple;
-
- EMarkers GetMarkers() const {
- return static_cast<EMarkers>(Simple.Meta);
- }
-
- explicit operator bool() const { return Simple.Meta | Simple.Count; }
- } Raw;
-};
-
-UDF_ASSERT_TYPE_SIZE(TBlockItem, 16);
-
-#define VALUE_GET(xType) \
- template <> \
- inline xType TBlockItem::As<xType>() const \
- { \
- Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present); \
- return Raw.Simple.xType##_; \
- }
-
-#define VALUE_CONSTR(xType) \
- template <> \
- inline TBlockItem::TBlockItem(xType value) \
- { \
- Raw.Simple.xType##_ = value; \
- Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present); \
- }
-
-PRIMITIVE_VALUE_TYPES(VALUE_GET)
-PRIMITIVE_VALUE_TYPES(VALUE_CONSTR)
-
-#undef VALUE_GET
-#undef VALUE_CONSTR
+using NYql::NUdf::TBlockItem;
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
index 914ec7e6ca..a73c7cabf2 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
@@ -4,6 +4,8 @@
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/public/udf/udf_type_inspection.h>
+
#include <arrow/array/array_binary.h>
#include <arrow/chunked_array.h>
@@ -12,29 +14,9 @@ namespace NMiniKQL {
namespace {
-inline bool IsNull(const arrow::ArrayData& data, size_t index) {
- return data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset);
-}
-
template <typename T>
-class TFixedSizeBlockReader : public IBlockReader {
+class TFixedSizeBlockItemConverter : public IBlockItemConverter {
public:
- TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
- if (IsNull(data, index)) {
- return {};
- }
-
- return TBlockItem(data.GetValues<T>(1)[index]);
- }
-
- TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
- if (!scalar.is_valid) {
- return {};
- }
-
- return TBlockItem(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data()));
- }
-
NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final {
Y_UNUSED(holderFactory);
return item ? NUdf::TUnboxedValuePod(item.As<T>()) : NUdf::TUnboxedValuePod{};
@@ -42,33 +24,8 @@ public:
};
template<typename TStringType>
-class TStringBlockReader : public IBlockReader {
+class TStringBlockItemConverter : public IBlockItemConverter {
public:
- using TOffset = typename TStringType::offset_type;
-
- TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
- Y_VERIFY_DEBUG(data.buffers.size() == 3);
- if (IsNull(data, index)) {
- return {};
- }
-
- const TOffset* offsets = data.GetValues<TOffset>(1);
- const char* strData = data.GetValues<char>(2, 0);
-
- std::string_view str(strData + offsets[index], offsets[index + 1] - offsets[index]);
- return TBlockItem(str);
- }
-
- TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
- if (!scalar.is_valid) {
- return {};
- }
-
- auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(scalar).value;
- std::string_view str(reinterpret_cast<const char*>(buffer->data()), buffer->size());
- return TBlockItem(str);
- }
-
NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final {
Y_UNUSED(holderFactory);
if (!item) {
@@ -78,39 +35,12 @@ public:
}
};
-class TTupleBlockReader : public IBlockReader {
+class TTupleBlockItemConverter : public IBlockItemConverter {
public:
- TTupleBlockReader(TVector<std::unique_ptr<IBlockReader>>&& children)
+ TTupleBlockItemConverter(TVector<std::unique_ptr<IBlockItemConverter>>&& children)
: Children(std::move(children))
- , Items(Children.size())
{}
- TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
- if (IsNull(data, index)) {
- return {};
- }
-
- for (ui32 i = 0; i < Children.size(); ++i) {
- Items[i] = Children[i]->GetItem(*data.child_data[i], index);
- }
-
- return TBlockItem(Items.data());
- }
-
- TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
- if (!scalar.is_valid) {
- return {};
- }
-
- const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
-
- for (ui32 i = 0; i < Children.size(); ++i) {
- Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]);
- }
-
- return TBlockItem(Items.data());
- }
-
NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final {
if (!item) {
return {};
@@ -127,34 +57,16 @@ public:
}
private:
- const TVector<std::unique_ptr<IBlockReader>> Children;
- TVector<TBlockItem> Items;
+ const TVector<std::unique_ptr<IBlockItemConverter>> Children;
mutable TPlainContainerCache Cache;
};
-class TExternalOptionalBlockReader : public IBlockReader {
+class TExternalOptionalBlockItemConverter : public IBlockItemConverter {
public:
- TExternalOptionalBlockReader(std::unique_ptr<IBlockReader>&& inner)
+ TExternalOptionalBlockItemConverter(std::unique_ptr<IBlockItemConverter>&& inner)
: Inner(std::move(inner))
{}
- TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
- if (IsNull(data, index)) {
- return {};
- }
-
- return Inner->GetItem(*data.child_data[0], index).MakeOptional();
- }
-
- TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
- if (!scalar.is_valid) {
- return {};
- }
-
- const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
- return Inner->GetScalarItem(*structScalar.value[0]).MakeOptional();
- }
-
NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final {
if (!item) {
return {};
@@ -163,86 +75,23 @@ public:
}
private:
- const std::unique_ptr<IBlockReader> Inner;
+ const std::unique_ptr<IBlockItemConverter> Inner;
};
-} // namespace
-
-std::unique_ptr<IBlockReader> MakeBlockReader(TType* type) {
- TType* unpacked = type;
- if (type->IsOptional()) {
- unpacked = AS_TYPE(TOptionalType, type)->GetItemType();
- }
-
- if (unpacked->IsOptional()) {
- // at least 2 levels of optionals
- ui32 nestLevel = 0;
- auto currentType = type;
- auto previousType = type;
- do {
- ++nestLevel;
- previousType = currentType;
- currentType = AS_TYPE(TOptionalType, currentType)->GetItemType();
- } while (currentType->IsOptional());
-
- std::unique_ptr<IBlockReader> reader = MakeBlockReader(previousType);
- for (ui32 i = 1; i < nestLevel; ++i) {
- reader = std::make_unique<TExternalOptionalBlockReader>(std::move(reader));
- }
-
- return reader;
- } else {
- type = unpacked;
- }
-
- if (type->IsTuple()) {
- auto tupleType = AS_TYPE(TTupleType, type);
- TVector<std::unique_ptr<IBlockReader>> children;
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- children.emplace_back(MakeBlockReader(tupleType->GetElementType(i)));
- }
-
- return std::make_unique<TTupleBlockReader>(std::move(children));
- }
+struct TConverterTraits { // Bundle of type aliases passed as the template argument of NYql::NUdf::MakeBlockReaderImpl (see MakeBlockItemConverter).
+ using TResult = IBlockItemConverter; // interface type that MakeBlockItemConverter returns
+ using TTuple = TTupleBlockItemConverter; // presumably chosen for tuple types - TODO confirm against MakeBlockReaderImpl
+ template <typename T>
+ using TFixedSize = TFixedSizeBlockItemConverter<T>; // presumably chosen for fixed-size data slots, T being the C++ value type - TODO confirm
+ template <typename TStringType>
+ using TStrings = TStringBlockItemConverter<TStringType>; // presumably chosen for String/Utf8 slots - TODO confirm
+ using TExtOptional = TExternalOptionalBlockItemConverter; // converter that wraps an inner converter (its constructor takes one)
+};
- if (type->IsData()) {
- auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
- switch (slot) {
- case NUdf::EDataSlot::Int8:
- return std::make_unique<TFixedSizeBlockReader<i8>>();
- case NUdf::EDataSlot::Bool:
- case NUdf::EDataSlot::Uint8:
- return std::make_unique<TFixedSizeBlockReader<ui8>>();
- case NUdf::EDataSlot::Int16:
- return std::make_unique<TFixedSizeBlockReader<i16>>();
- case NUdf::EDataSlot::Uint16:
- case NUdf::EDataSlot::Date:
- return std::make_unique<TFixedSizeBlockReader<ui16>>();
- case NUdf::EDataSlot::Int32:
- return std::make_unique<TFixedSizeBlockReader<i32>>();
- case NUdf::EDataSlot::Uint32:
- case NUdf::EDataSlot::Datetime:
- return std::make_unique<TFixedSizeBlockReader<ui32>>();
- case NUdf::EDataSlot::Int64:
- case NUdf::EDataSlot::Interval:
- return std::make_unique<TFixedSizeBlockReader<i64>>();
- case NUdf::EDataSlot::Uint64:
- case NUdf::EDataSlot::Timestamp:
- return std::make_unique<TFixedSizeBlockReader<ui64>>();
- case NUdf::EDataSlot::Float:
- return std::make_unique<TFixedSizeBlockReader<float>>();
- case NUdf::EDataSlot::Double:
- return std::make_unique<TFixedSizeBlockReader<double>>();
- case NUdf::EDataSlot::String:
- return std::make_unique<TStringBlockReader<arrow::BinaryType>>();
- case NUdf::EDataSlot::Utf8:
- return std::make_unique<TStringBlockReader<arrow::StringType>>();
- default:
- MKQL_ENSURE(false, "Unsupported data slot");
- }
- }
+} // namespace
- MKQL_ENSURE(false, "Unsupported type");
+std::unique_ptr<IBlockItemConverter> MakeBlockItemConverter(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type) { // Creates the IBlockItemConverter for `type`; `typeInfoHelper` is forwarded unchanged.
+ return NYql::NUdf::MakeBlockReaderImpl<TConverterTraits>(typeInfoHelper, type); // delegates to MakeBlockReaderImpl, with TConverterTraits naming the converter classes to instantiate
+}
} // namespace NMiniKQL
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h
index 4e144d78a5..3e51769e7f 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h
@@ -5,20 +5,24 @@
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/public/udf/udf_types.h>
+#include <ydb/library/yql/public/udf/arrow/block_reader.h>
+
#include <arrow/datum.h>
namespace NKikimr::NMiniKQL {
-class IBlockReader : private TNonCopyable {
+using NYql::NUdf::IBlockReader;
+
+class IBlockItemConverter { // Interface whose single operation, MakeValue, turns a TBlockItem into an NUdf::TUnboxedValuePod.
public:
- virtual ~IBlockReader() = default;
- // result will reference to Array/Scalar internals and will be valid until next call to GetItem/GetScalarItem
- virtual TBlockItem GetItem(const arrow::ArrayData& data, size_t index) = 0;
- virtual TBlockItem GetScalarItem(const arrow::Scalar& scalar) = 0;
+ virtual ~IBlockItemConverter() = default; // virtual: implementations are held and destroyed through std::unique_ptr<IBlockItemConverter>
virtual NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const = 0;
};
-std::unique_ptr<IBlockReader> MakeBlockReader(TType* type);
+using NYql::NUdf::MakeBlockReader;
+
+std::unique_ptr<IBlockItemConverter> MakeBlockItemConverter(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type);
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
index 2e02a6a9c6..4f747fb748 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
@@ -30,7 +30,7 @@ public:
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
const auto maxLen = CalcBlockLen(CalcMaxBlockItemSize(ItemType_));
- auto builder = MakeBlockBuilder(ItemType_, ctx.ArrowMemoryPool, maxLen);
+ auto builder = MakeArrayBuilder(TTypeInfoHelper(), ItemType_, ctx.ArrowMemoryPool, maxLen);
for (size_t i = 0; i < builder->MaxLength(); ++i) {
auto result = Flow_->GetValue(ctx);
@@ -114,7 +114,7 @@ private:
struct TState : public TComputationValue<TState> {
std::vector<NUdf::TUnboxedValue> Values_;
std::vector<NUdf::TUnboxedValue*> ValuePointers_;
- std::vector<std::unique_ptr<IBlockBuilder>> Builders_;
+ std::vector<std::unique_ptr<IArrayBuilder>> Builders_;
size_t MaxLength_;
size_t Rows_ = 0;
bool IsFinished_ = false;
@@ -132,7 +132,7 @@ private:
for (size_t i = 0; i < types.size(); ++i) {
ValuePointers_[i] = &Values_[i];
- Builders_.push_back(MakeBlockBuilder(types[i], ctx.ArrowMemoryPool, MaxLength_));
+ Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), types[i], ctx.ArrowMemoryPool, MaxLength_));
}
}
};
@@ -192,7 +192,8 @@ private:
TState(TMemoryUsageInfo* memInfo, TType* itemType)
: TComputationValue(memInfo)
- , Reader_(MakeBlockReader(itemType))
+ , Reader_(MakeBlockReader(TTypeInfoHelper(), itemType))
+ , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), itemType))
{
}
@@ -207,7 +208,7 @@ private:
Index_ = 0;
Arrays_.pop_front();
}
- return Reader_->MakeValue(Reader_->GetItem(*Arrays_.front(), Index_++), ctx.HolderFactory);
+ return Converter_->MakeValue(Reader_->GetItem(*Arrays_.front(), Index_++), ctx.HolderFactory);
}
void Reset(const arrow::Datum& datum) {
@@ -225,6 +226,7 @@ private:
private:
const std::unique_ptr<IBlockReader> Reader_;
+ const std::unique_ptr<IBlockItemConverter> Converter_;
TDeque<std::shared_ptr<arrow::ArrayData>> Arrays_;
size_t Index_ = 0;
};
@@ -289,7 +291,7 @@ public:
item = s.Readers_[i]->GetItem(*datum.array(), s.Index_);
}
- *(output[i]) = s.Readers_[i]->MakeValue(item, ctx.HolderFactory);
+ *(output[i]) = s.Converters_[i]->MakeValue(item, ctx.HolderFactory);
}
++s.Index_;
@@ -301,6 +303,7 @@ private:
TVector<NUdf::TUnboxedValue> Values_;
TVector<NUdf::TUnboxedValue*> ValuePointers_;
TVector<std::unique_ptr<IBlockReader>> Readers_;
+ TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
size_t Count_ = 0;
size_t Index_ = 0;
@@ -314,7 +317,8 @@ private:
}
for (size_t i = 0; i < types.size(); ++i) {
- Readers_.push_back(MakeBlockReader(types[i]));
+ Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), types[i]));
+ Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), types[i]));
}
}
};
diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp
index 773e52a93e..3d9e423a88 100644
--- a/ydb/library/yql/minikql/mkql_type_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_type_builder.cpp
@@ -1860,6 +1860,14 @@ NUdf::IArrowType::TPtr TTypeInfoHelper::ImportArrowType(ArrowSchema* schema) con
return new TArrowType(std::move(res).ValueOrDie());
}
+ui64 TTypeInfoHelper::GetMaxBlockLength(const NUdf::TType* type) const { // Returns CalcBlockLen applied to the maximum block item size of `type`.
+ return CalcBlockLen(CalcMaxBlockItemSize(static_cast<const TType*>(type))); // NOTE(review): the static_cast assumes `type` is actually a NMiniKQL::TType - confirm
+}
+
+ui64 TTypeInfoHelper::GetMaxBlockBytes() const { // Returns the MaxBlockSizeInBytes constant.
+ return MaxBlockSizeInBytes;
+}
+
void TTypeInfoHelper::DoData(const NMiniKQL::TDataType* dt, NUdf::ITypeVisitor* v) {
const auto typeId = dt->GetSchemeType();
v->OnDataType(typeId);
@@ -2097,5 +2105,56 @@ NUdf::IEquate::TPtr MakeEquateImpl(const NMiniKQL::TType* type) {
}
}
+size_t CalcMaxBlockItemSize(const TType* type) { // Returns the largest per-item size in bytes found among the data leaves of `type`; any other kind of type ends in MKQL_ENSURE(false, ...).
+ // the block bitmap size is not counted
+ if (type->IsOptional()) {
+ return CalcMaxBlockItemSize(AS_TYPE(TOptionalType, type)->GetItemType()); // an optional is sized as its item type
+ }
+
+ if (type->IsTuple()) {
+ auto tupleType = AS_TYPE(TTupleType, type);
+ size_t result = 0;
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ result = std::max(result, CalcMaxBlockItemSize(tupleType->GetElementType(i))); // maximum over the elements, not their sum
+ }
+ return result; // remains 0 for a tuple without elements
+ }
+
+ if (type->IsData()) {
+ auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
+ switch (slot) {
+ case NUdf::EDataSlot::Int8:
+ case NUdf::EDataSlot::Uint8:
+ case NUdf::EDataSlot::Bool:
+ case NUdf::EDataSlot::Int16:
+ case NUdf::EDataSlot::Uint16:
+ case NUdf::EDataSlot::Date:
+ case NUdf::EDataSlot::Int32:
+ case NUdf::EDataSlot::Uint32:
+ case NUdf::EDataSlot::Datetime:
+ case NUdf::EDataSlot::Int64:
+ case NUdf::EDataSlot::Interval:
+ case NUdf::EDataSlot::Uint64:
+ case NUdf::EDataSlot::Timestamp:
+ case NUdf::EDataSlot::Float:
+ case NUdf::EDataSlot::Double: {
+ size_t sz = GetDataTypeInfo(slot).FixedSize; // all slots listed above share this lookup of their fixed size
+ MKQL_ENSURE(sz > 0, "Unexpected fixed data size");
+ return sz;
+ }
+ case NUdf::EDataSlot::String:
+ // only the offset part is counted, not the string bytes
+ return sizeof(arrow::BinaryType::offset_type);
+ case NUdf::EDataSlot::Utf8:
+ // only the offset part is counted, not the string bytes
+ return sizeof(arrow::StringType::offset_type);
+ default:
+ MKQL_ENSURE(false, "Unsupported data slot");
+ }
+ }
+
+ MKQL_ENSURE(false, "Unsupported type"); // reached for anything that is not optional, tuple or data
+}
+
} // namespace NMiniKQL
} // namespace Nkikimr
diff --git a/ydb/library/yql/minikql/mkql_type_builder.h b/ydb/library/yql/minikql/mkql_type_builder.h
index 0d1fe86799..4861cecca7 100644
--- a/ydb/library/yql/minikql/mkql_type_builder.h
+++ b/ydb/library/yql/minikql/mkql_type_builder.h
@@ -5,11 +5,23 @@
#include <ydb/library/yql/public/udf/udf_type_builder.h>
#include <ydb/library/yql/parser/pg_wrapper/interface/compare.h>
+#include <util/generic/size_literals.h>
+
#include <arrow/datum.h>
namespace NKikimr {
namespace NMiniKQL {
+constexpr size_t MaxBlockSizeInBytes = 1_MB; // byte budget that CalcBlockLen divides by the per-item size
+static_assert(MaxBlockSizeInBytes < (size_t)std::numeric_limits<i32>::max()); // NOTE(review): std::numeric_limits requires <limits>; confirm this header includes it directly or transitively
+
+// maximum size of a block item in bytes
+size_t CalcMaxBlockItemSize(const TType* type);
+
+inline size_t CalcBlockLen(size_t maxBlockItemSize) { // Returns how many items of `maxBlockItemSize` bytes fit into MaxBlockSizeInBytes (integer division).
+ return MaxBlockSizeInBytes / std::max<size_t>(maxBlockItemSize, 1); // an item size of 0 is treated as 1, so the division never divides by zero
+}
+
bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type);
bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& type);
@@ -179,6 +191,8 @@ public:
const NYql::NUdf::TPgTypeDescription* FindPgTypeDescription(ui32 typeId) const override;
NUdf::IArrowType::TPtr MakeArrowType(const NUdf::TType* type) const override;
NUdf::IArrowType::TPtr ImportArrowType(ArrowSchema* schema) const override;
+ ui64 GetMaxBlockLength(const NUdf::TType* type) const override;
+ ui64 GetMaxBlockBytes() const override;
private:
static void DoData(const NMiniKQL::TDataType* dt, NUdf::ITypeVisitor* v);
diff --git a/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt b/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt
index deda564cfd..c928bed6ea 100644
--- a/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt
+++ b/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt
@@ -20,5 +20,8 @@ target_link_libraries(public-udf-arrow PUBLIC
target_sources(public-udf-arrow PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/args_dechunker.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/bit_util.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/util.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/block_reader.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/block_item.cpp
)
diff --git a/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt b/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt
index a87a49d25a..4c3155d270 100644
--- a/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt
@@ -21,5 +21,8 @@ target_link_libraries(public-udf-arrow PUBLIC
target_sources(public-udf-arrow PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/args_dechunker.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/bit_util.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/util.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/block_reader.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/block_item.cpp
)
diff --git a/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt b/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt
index a87a49d25a..4c3155d270 100644
--- a/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt
+++ b/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt
@@ -21,5 +21,8 @@ target_link_libraries(public-udf-arrow PUBLIC
target_sources(public-udf-arrow PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/args_dechunker.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/bit_util.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/util.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/block_reader.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/block_item.cpp
)
diff --git a/ydb/library/yql/public/udf/arrow/bit_util.cpp b/ydb/library/yql/public/udf/arrow/bit_util.cpp
new file mode 100644
index 0000000000..569e207167
--- /dev/null
+++ b/ydb/library/yql/public/udf/arrow/bit_util.cpp
@@ -0,0 +1,7 @@
+#include "bit_util.h"
+
+namespace NYql {
+namespace NUdf {
+
+}
+}
diff --git a/ydb/library/yql/public/udf/arrow/bit_util.h b/ydb/library/yql/public/udf/arrow/bit_util.h
new file mode 100644
index 0000000000..218b2f0925
--- /dev/null
+++ b/ydb/library/yql/public/udf/arrow/bit_util.h
@@ -0,0 +1,68 @@
+#pragma once
+
+#include "defs.h"
+
+namespace NYql {
+namespace NUdf {
+
+inline ui8* CompressAsSparseBitmap(const ui8* src, size_t srcOffset, const ui8* sparseBitmap, ui8* dst, size_t count) { // Reads `count` bits of `src` starting at bit index `srcOffset` and writes each as one byte (0 or 1) to `dst`, advancing `dst` by the matching `sparseBitmap` byte; returns the final `dst`.
+ while (count--) {
+ ui8 inputBit = (src[srcOffset >> 3] >> (srcOffset & 7)) & 1; // byte srcOffset / 8, bit srcOffset % 8 counted from the least significant bit
+ *dst = inputBit; // stored unconditionally, so `dst` must be writable at the returned position too; the slot is overwritten next iteration when dst does not advance
+ ++srcOffset;
+ dst += *sparseBitmap++; // advances by the byte value itself - assumes every sparseBitmap byte is 0 or 1, TODO confirm
+ }
+ return dst;
+}
+
+template<bool Negate> // Negate == true stores the bitwise complement of every packed byte
+inline void CompressSparseImpl(ui8* dst, const ui8* srcSparse, size_t len) { // Packs the lowest bit of each of `len` bytes from `srcSparse` into `dst`, eight inputs per output byte, first input in bit 0.
+ while (len >= 8) { // full groups of eight input bytes, written out bit by bit without an inner loop
+ ui8 result = 0;
+ result |= (*srcSparse++ & 1u) << 0;
+ result |= (*srcSparse++ & 1u) << 1;
+ result |= (*srcSparse++ & 1u) << 2;
+ result |= (*srcSparse++ & 1u) << 3;
+ result |= (*srcSparse++ & 1u) << 4;
+ result |= (*srcSparse++ & 1u) << 5;
+ result |= (*srcSparse++ & 1u) << 6;
+ result |= (*srcSparse++ & 1u) << 7;
+ if constexpr (Negate) {
+ *dst++ = ~result;
+ } else {
+ *dst++ = result;
+ }
+ len -= 8;
+ }
+ if (len) { // 1..7 leftover inputs go into one final byte
+ ui8 result = 0;
+ for (ui8 i = 0; i < len; ++i) {
+ result |= (*srcSparse++ & 1u) << i;
+ }
+ if constexpr (Negate) {
+ *dst++ = ~result; // NOTE(review): the bits at positions >= len become 1 here - confirm consumers ignore bits past `len`
+ } else {
+ *dst++ = result; // bits at positions >= len stay 0
+ }
+ }
+}
+
+inline void CompressSparseBitmap(ui8* dst, const ui8* srcSparse, size_t len) { // Packs `len` one-byte flags from `srcSparse` into bits of `dst` without negation.
+ return CompressSparseImpl<false>(dst, srcSparse, len); // the callee returns void, so this `return` only forwards the call
+}
+
+inline void CompressSparseBitmapNegate(ui8* dst, const ui8* srcSparse, size_t len) { // Packs `len` one-byte flags from `srcSparse` into bits of `dst`, storing each output byte complemented.
+ return CompressSparseImpl<true>(dst, srcSparse, len); // the callee returns void, so this `return` only forwards the call
+}
+
+template<typename T> // T is the element type; elements are copied with plain assignment
+inline T* CompressArray(const T* src, const ui8* sparseBitmap, T* dst, size_t count) { // Copies `count` elements from `src` to `dst`, advancing `dst` by the matching `sparseBitmap` byte after each copy; returns the final `dst`.
+ while (count--) {
+ *dst = *src++; // stored unconditionally, so `dst` must be writable at the returned position too; the slot is overwritten next iteration when dst does not advance
+ dst += *sparseBitmap++; // advances by the byte value itself - assumes every sparseBitmap byte is 0 or 1, TODO confirm
+ }
+ return dst;
+}
+
+}
+}
diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h
new file mode 100644
index 0000000000..f8aa5c8f6a
--- /dev/null
+++ b/ydb/library/yql/public/udf/arrow/block_builder.h
@@ -0,0 +1,784 @@
+#pragma once
+
+#include "util.h"
+#include "bit_util.h"
+
+#include <ydb/library/yql/public/udf/udf_type_inspection.h>
+
+#include <arrow/datum.h>
+#include <arrow/c/bridge.h>
+
+namespace NYql {
+namespace NUdf {
+
+class IArrayBuilder { // Abstract sink that accumulates items and emits them as an arrow::Datum.
+public:
+ virtual ~IArrayBuilder() = default;
+ virtual size_t MaxLength() const = 0; // Row capacity of the builder (TArrayBuilderBase returns the maxLen it was constructed with).
+ virtual void Add(NUdf::TUnboxedValuePod value) = 0; // Appends one item given as an unboxed value.
+ virtual void Add(TBlockItem value) = 0; // Appends one item given as a block item.
+ virtual void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) = 0; // Appends the rows of `array` whose byte in `sparseBitmap` is set; `popCount` is the number of such rows and `bitmapSize` the bitmap length (TArrayBuilderBase requires it to equal array.length).
+ virtual arrow::Datum Build(bool finish) = 0; // Emits the accumulated items; with finish = false the implementations in this file re-reserve their buffers for further use.
+};
+
+class IScalarBuilder { // Abstract converter from a single block item to an arrow::Datum.
+public:
+ virtual ~IScalarBuilder() = default;
+ virtual arrow::Datum Build(TBlockItem value) const = 0; // Produces the datum for `value`; no implementation is visible in this header.
+};
+
+// Resolves the Arrow data type for a UDF type: obtains an arrow type handle from the type helper,
+// exports it into an ArrowSchema and imports that schema back as an arrow::DataType.
+// Fails via Y_ENSURE when the helper returns no handle for `type`.
+inline std::shared_ptr<arrow::DataType> GetArrowType(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
+    auto handle = typeInfoHelper.MakeArrowType(type);
+    Y_ENSURE(handle);
+
+    ArrowSchema schema;
+    handle->Export(&schema);
+    return ARROW_RESULT(arrow::ImportType(&schema));
+}
+
+class TArrayBuilderBase : public IArrayBuilder { // Shared base of the array builders: enforces the MaxLen row limit, counts appended rows and cuts the built tree into chunks.
+public:
+ using Ptr = std::unique_ptr<TArrayBuilderBase>;
+
+ struct TBlockArrayTree { // Built data: a queue of array chunks for this node plus one subtree per child builder.
+ using Ptr = std::shared_ptr<TBlockArrayTree>;
+ std::deque<std::shared_ptr<arrow::ArrayData>> Payload;
+ std::vector<TBlockArrayTree::Ptr> Children;
+ };
+
+ TArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen)
+ : ArrowType(GetArrowType(typeInfoHelper, type))
+ , Pool(&pool)
+ , MaxLen(maxLen)
+ , MaxBlockSizeInBytes(typeInfoHelper.GetMaxBlockBytes())
+ {
+ Y_VERIFY(ArrowType);
+ Y_VERIFY(maxLen > 0);
+ }
+
+ size_t MaxLength() const final {
+ return MaxLen;
+ }
+
+ void Add(NUdf::TUnboxedValuePod value) final { // Checks capacity, delegates to DoAdd and counts the row.
+ Y_VERIFY(CurrLen < MaxLen);
+ DoAdd(value);
+ CurrLen++;
+ }
+
+ void Add(TBlockItem value) final { // Same as above for a block item.
+ Y_VERIFY(CurrLen < MaxLen);
+ DoAdd(value);
+ CurrLen++;
+ }
+
+
+ void AddDefault() { // Appends one placeholder row via DoAddDefault; parent builders call it for the children of a null row.
+ Y_VERIFY(CurrLen < MaxLen);
+ DoAddDefault();
+ CurrLen++;
+ }
+
+ void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) final { // Validates the arguments and appends the popCount selected rows.
+ Y_VERIFY(size_t(array.length) == bitmapSize);
+ Y_VERIFY(popCount <= bitmapSize);
+ Y_VERIFY(CurrLen + popCount <= MaxLen);
+
+ if (popCount) { // nothing selected: DoAddMany is skipped entirely
+ DoAddMany(array, sparseBitmap, popCount);
+ }
+
+ CurrLen += popCount;
+ }
+
+ arrow::Datum Build(bool finish) final { // Builds the tree, slices chunks off it until CalcSliceSize reports 0 and passes them to MakeArray.
+ auto tree = BuildTree(finish);
+ TVector<std::shared_ptr<arrow::ArrayData>> chunks;
+ while (size_t size = CalcSliceSize(*tree)) {
+ chunks.push_back(Slice(*tree, size));
+ }
+
+ return MakeArray(chunks);
+ }
+
+ TBlockArrayTree::Ptr BuildTree(bool finish) { // Returns the data built by the subclass and resets the row counter.
+ auto result = DoBuildTree(finish);
+ CurrLen = 0;
+ return result;
+ }
+protected:
+ virtual void DoAdd(NUdf::TUnboxedValuePod value) = 0;
+ virtual void DoAdd(TBlockItem value) = 0;
+ virtual void DoAddDefault() = 0;
+ virtual void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) = 0;
+ virtual TBlockArrayTree::Ptr DoBuildTree(bool finish) = 0;
+
+private:
+ static size_t CalcSliceSize(const TBlockArrayTree& tree) { // Number of rows the next slice may take; 0 when this node has no payload left.
+ if (tree.Payload.empty()) {
+ return 0;
+ }
+
+ if (!tree.Children.empty()) { // inner node: exactly one payload chunk, slice size is the smallest one any child allows
+ Y_VERIFY(tree.Payload.size() == 1);
+ size_t result = std::numeric_limits<size_t>::max();
+ for (auto& child : tree.Children) {
+ size_t childSize = CalcSliceSize(*child);
+ result = std::min(result, childSize);
+ }
+ Y_VERIFY(result <= size_t(tree.Payload.front()->length));
+ return result;
+ }
+
+ int64_t result = std::numeric_limits<int64_t>::max(); // leaf: length of the shortest payload chunk
+ for (auto& data : tree.Payload) {
+ result = std::min(result, data->length);
+ }
+
+ Y_VERIFY(result > 0);
+ return static_cast<size_t>(result);
+ }
+
+ static std::shared_ptr<arrow::ArrayData> Slice(TBlockArrayTree& tree, size_t size) { // Detaches the first `size` rows of the tree as one ArrayData, recursing into children.
+ Y_VERIFY(size > 0);
+
+ Y_VERIFY(!tree.Payload.empty());
+ auto& main = tree.Payload.front();
+ std::shared_ptr<arrow::ArrayData> sliced;
+ if (size == size_t(main->length)) { // the whole front chunk is consumed
+ sliced = main;
+ tree.Payload.pop_front();
+ } else {
+ Y_VERIFY(size < size_t(main->length));
+ sliced = Chop(main, size); // Chop is declared elsewhere; presumably it leaves the remainder in `main` - TODO confirm
+ }
+
+ if (!tree.Children.empty()) {
+ std::vector<std::shared_ptr<arrow::ArrayData>> children;
+ for (auto& child : tree.Children) {
+ children.push_back(Slice(*child, size));
+ }
+
+ sliced->child_data = std::move(children);
+ if (tree.Payload.empty()) { // node fully consumed: drop the child subtrees as well
+ tree.Children.clear();
+ }
+ }
+ return sliced;
+ }
+
+protected:
+ size_t GetCurrLen() const { // Rows added since construction or the last BuildTree.
+ return CurrLen;
+ }
+
+ const std::shared_ptr<arrow::DataType> ArrowType;
+ arrow::MemoryPool* const Pool;
+ const size_t MaxLen;
+ const size_t MaxBlockSizeInBytes; // taken from ITypeInfoHelper::GetMaxBlockBytes(); the string builder uses it as the per-chunk data limit
+private:
+ size_t CurrLen = 0;
+};
+
+template <typename T, bool Nullable> // Builder for fixed-width values of type T; when Nullable, validity is kept as one byte per row and densified in DoBuildTree.
+class TFixedSizeArrayBuilder : public TArrayBuilderBase {
+public:
+ TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen)
+ : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen)
+ {
+ Reserve();
+ }
+
+ void DoAdd(NUdf::TUnboxedValuePod value) final { // Converts the unboxed value to a block item and forwards to the overload below.
+ if constexpr (Nullable) {
+ if (!value) {
+ return DoAdd(TBlockItem{});
+ }
+ }
+ DoAdd(TBlockItem(value.Get<T>()));
+ }
+
+ void DoAdd(TBlockItem value) final {
+ if constexpr (Nullable) {
+ if (!value) { // null row: validity 0 plus a value-initialized placeholder keeps both buffers the same length
+ NullBuilder->UnsafeAppend(0);
+ DataBuilder->UnsafeAppend(T{});
+ return;
+ }
+ NullBuilder->UnsafeAppend(1);
+ }
+
+ DataBuilder->UnsafeAppend(value.As<T>());
+ }
+
+ void DoAddDefault() final { // placeholder row: marked valid, value-initialized data
+ if constexpr (Nullable) {
+ NullBuilder->UnsafeAppend(1);
+ }
+ DataBuilder->UnsafeAppend(T{});
+ }
+
+ void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final { // Appends the selected rows of `array` with the Compress* helpers.
+ Y_VERIFY(array.buffers.size() > 1);
+ if constexpr (Nullable) {
+ Y_VERIFY(NullBuilder->Length() == DataBuilder->Length());
+ if (array.buffers.front()) { // source has a validity bitmap: expand the selected bits to bytes at End(), then commit popCount of them
+ ui8* dstBitmap = NullBuilder->End();
+ CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length);
+ NullBuilder->UnsafeAdvance(popCount);
+ } else { // no validity bitmap: every selected row is valid
+ NullBuilder->UnsafeAppend(popCount, 1);
+ }
+ }
+
+ const T* src = array.GetValues<T>(1);
+ T* dst = DataBuilder->End();
+ CompressArray(src, sparseBitmap, dst, array.length); // writes at End() without moving it
+ DataBuilder->UnsafeAdvance(popCount); // commits the popCount kept values
+ }
+
+ TBlockArrayTree::Ptr DoBuildTree(bool finish) final { // Wraps the accumulated buffers into a single-chunk tree.
+ const size_t len = DataBuilder->Length();
+ std::shared_ptr<arrow::Buffer> nulls;
+ if constexpr (Nullable) {
+ Y_VERIFY(NullBuilder->Length() == len);
+ nulls = NullBuilder->Finish();
+ nulls = MakeDenseBitmap(nulls->data(), len, Pool); // byte-per-row validity -> bitmap
+ }
+ std::shared_ptr<arrow::Buffer> data = DataBuilder->Finish();
+
+ TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
+ result->Payload.push_back(arrow::ArrayData::Make(ArrowType, len, {nulls, data}));
+
+ NullBuilder.reset();
+ DataBuilder.reset();
+ if (!finish) { // the builder will be reused: allocate fresh buffers
+ Reserve();
+ }
+ return result;
+ }
+
+private:
+ void Reserve() { // Allocates fresh buffer builders with room for MaxLen + 1 items.
+ DataBuilder = std::make_unique<TTypedBufferBuilder<T>>(Pool);
+ DataBuilder->Reserve(MaxLen + 1); // presumably the extra slot absorbs the trailing write the Compress* helpers make for an unselected last row - TODO confirm
+ if constexpr (Nullable) {
+ NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
+ NullBuilder->Reserve(MaxLen + 1);
+ }
+ }
+
+ std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
+ std::unique_ptr<TTypedBufferBuilder<T>> DataBuilder;
+};
+
+template<typename TStringType, bool Nullable> // Builder for variable-length strings; starts a new chunk whenever the data of the current one would exceed MaxBlockSizeInBytes.
+class TStringArrayBuilder : public TArrayBuilderBase {
+public:
+ using TOffset = typename TStringType::offset_type;
+
+ TStringArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen)
+ : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen)
+ {
+ Reserve();
+ }
+
+ void DoAdd(NUdf::TUnboxedValuePod value) final { // Converts the unboxed value to a block item and forwards to the overload below.
+ if constexpr (Nullable) {
+ if (!value) {
+ return DoAdd(TBlockItem{});
+ }
+ }
+
+ DoAdd(TBlockItem(value.AsStringRef()));
+ }
+
+ void DoAdd(TBlockItem value) final {
+ if constexpr (Nullable) {
+ if (!value) { // null row: validity 0 and an offset equal to the current data length (zero-length string)
+ NullBuilder->UnsafeAppend(0);
+ AppendCurrentOffset();
+ return;
+ }
+ }
+
+ const std::string_view str = value.AsStringRef();
+
+ size_t currentLen = DataBuilder->Length();
+ // empty string can always be appended
+ if (!str.empty() && currentLen + str.size() > MaxBlockSizeInBytes) {
+ if (currentLen) { // close the current chunk before starting with this string
+ FlushChunk(false);
+ }
+ if (str.size() > MaxBlockSizeInBytes) { // a single string above the limit still goes into one chunk, so grow the buffer for it
+ DataBuilder->Reserve(str.size());
+ }
+ }
+
+ AppendCurrentOffset();
+ DataBuilder->UnsafeAppend((const ui8*)str.data(), str.size());
+ if constexpr (Nullable) {
+ NullBuilder->UnsafeAppend(1);
+ }
+ }
+
+
+ void DoAddDefault() final { // placeholder row: marked valid, zero-length string
+ if constexpr (Nullable) {
+ NullBuilder->UnsafeAppend(1);
+ }
+ AppendCurrentOffset();
+ }
+
+ void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final { // Appends the selected strings of `array`, batching adjacent source bytes into single copies.
+ Y_UNUSED(popCount);
+ Y_VERIFY(array.buffers.size() > 2);
+ Y_VERIFY(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length());
+
+ const ui8* srcNulls = array.GetValues<ui8>(0, 0);
+ const TOffset* srcOffset = array.GetValues<TOffset>(1);
+ const ui8* srcData = array.GetValues<ui8>(2, 0);
+
+ const ui8* chunkStart = srcData; // [chunkStart, chunkEnd) is the pending run of source bytes not yet copied into DataBuilder
+ const ui8* chunkEnd = chunkStart;
+ size_t dataLen = DataBuilder->Length(); // logical data length of the current chunk, pending run included
+
+ ui8* dstNulls = Nullable ? NullBuilder->End() : nullptr; // validity and offsets are written past End() and committed later with UnsafeAdvance
+ TOffset* dstOffset = OffsetsBuilder->End();
+ size_t countAdded = 0; // rows written but not yet committed
+ for (size_t i = 0; i < size_t(array.length); i++) {
+ if (!sparseBitmap[i]) {
+ continue;
+ }
+
+ const ui8* begin = srcData + srcOffset[i];
+ const ui8* end = srcData + srcOffset[i + 1];
+ const size_t strSize = end - begin;
+
+ size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen; // bytes left before the limit; 0 when already at or above it
+
+ for (;;) {
+ // try to append ith string
+ if (strSize <= availBytes) {
+ if (begin == chunkEnd) { // adjacent to the pending run: just extend it
+ chunkEnd = end;
+ } else { // gap in the source: copy the pending run and start a new one
+ DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
+ chunkStart = begin;
+ chunkEnd = end;
+ }
+
+ size_t nullOffset = i + array.offset;
+ if constexpr (Nullable) {
+ *dstNulls++ = srcNulls ? ((srcNulls[nullOffset >> 3] >> (nullOffset & 7)) & 1) : 1u;
+ }
+ *dstOffset++ = dataLen;
+
+ dataLen += strSize;
+ ++countAdded;
+
+ break;
+ }
+
+ if (dataLen) { // does not fit and the chunk holds data: commit what was gathered, flush the chunk and retry
+ if (chunkStart != chunkEnd) {
+ DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
+ chunkStart = chunkEnd = srcData;
+ }
+ Y_VERIFY(dataLen == DataBuilder->Length());
+ OffsetsBuilder->UnsafeAdvance(countAdded);
+ if constexpr (Nullable) {
+ NullBuilder->UnsafeAdvance(countAdded);
+ }
+ FlushChunk(false);
+
+ dataLen = 0;
+ countAdded = 0;
+ if constexpr (Nullable) {
+ dstNulls = NullBuilder->End();
+ }
+ dstOffset = OffsetsBuilder->End();
+ } else { // chunk has no data yet: reserve room for the string and accept it. NOTE(review): availBytes is not recomputed after a flush, so the retry always passes through this branch - presumably harmless, confirm
+ DataBuilder->Reserve(strSize);
+ availBytes = strSize;
+ }
+ }
+ }
+ if (chunkStart != chunkEnd) { // copy the last pending run
+ DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
+ }
+ Y_VERIFY(dataLen == DataBuilder->Length());
+ OffsetsBuilder->UnsafeAdvance(countAdded);
+ if constexpr (Nullable) {
+ NullBuilder->UnsafeAdvance(countAdded);
+ }
+ }
+
+ TBlockArrayTree::Ptr DoBuildTree(bool finish) final { // Closes the current chunk and hands over all accumulated chunks.
+ FlushChunk(finish);
+ TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
+ result->Payload = std::move(Chunks);
+ Chunks.clear();
+ return result;
+ }
+
+private:
+ void Reserve() { // Allocates fresh buffer builders: MaxLen + 1 validity bytes and offsets, MaxBlockSizeInBytes data bytes.
+ if constexpr (Nullable) {
+ NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
+ NullBuilder->Reserve(MaxLen + 1);
+ }
+ OffsetsBuilder = std::make_unique<TTypedBufferBuilder<TOffset>>(Pool);
+ OffsetsBuilder->Reserve(MaxLen + 1);
+ DataBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
+ DataBuilder->Reserve(MaxBlockSizeInBytes);
+ }
+
+ void AppendCurrentOffset() { // Records the current data length as the next offset.
+ OffsetsBuilder->UnsafeAppend(DataBuilder->Length());
+ }
+
+ void FlushChunk(bool finish) { // Turns the current buffers into an ArrayData chunk; requires at least one row.
+ const auto length = OffsetsBuilder->Length();
+ Y_VERIFY(length > 0);
+
+ AppendCurrentOffset(); // closing offset, so the chunk carries length + 1 offsets
+ std::shared_ptr<arrow::Buffer> nullBitmap;
+ if constexpr (Nullable) {
+ nullBitmap = NullBuilder->Finish();
+ nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool); // byte-per-row validity -> bitmap
+ }
+ std::shared_ptr<arrow::Buffer> offsets = OffsetsBuilder->Finish();
+ std::shared_ptr<arrow::Buffer> data = DataBuilder->Finish();
+
+ auto arrowType = std::make_shared<TStringType>();
+ Chunks.push_back(arrow::ArrayData::Make(arrowType, length, { nullBitmap, offsets, data }));
+ if (!finish) { // more rows will follow: allocate fresh buffers
+ Reserve();
+ }
+ }
+
+ std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
+ std::unique_ptr<TTypedBufferBuilder<TOffset>> OffsetsBuilder;
+ std::unique_ptr<TTypedBufferBuilder<ui8>> DataBuilder;
+
+ std::deque<std::shared_ptr<arrow::ArrayData>> Chunks; // chunks completed so far, oldest first
+};
+
+// Array builder for tuple columns. Every tuple element is delegated to its own child builder; when
+// Nullable, tuple-level validity is collected as one byte per row and packed into a bitmap by
+// DoBuildTree.
+template<bool Nullable>
+class TTupleArrayBuilder : public TArrayBuilderBase {
+public:
+    TTupleArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen,
+        TVector<TArrayBuilderBase::Ptr>&& children)
+        : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen)
+        , Children(std::move(children))
+    {
+        Reserve();
+    }
+
+    // Appends one tuple taken from an unboxed value; an empty value becomes a null row (Nullable only).
+    void DoAdd(NUdf::TUnboxedValuePod value) final {
+        if constexpr (Nullable) {
+            if (!value) {
+                AddNullRow();
+                return;
+            }
+            NullBuilder->UnsafeAppend(1);
+        }
+
+        // Use direct element access when the value exposes it, per-index access otherwise.
+        if (auto directElements = value.GetElements()) {
+            for (ui32 pos = 0; pos < Children.size(); ++pos) {
+                Children[pos]->Add(directElements[pos]);
+            }
+            return;
+        }
+
+        for (ui32 pos = 0; pos < Children.size(); ++pos) {
+            auto element = value.GetElement(pos);
+            Children[pos]->Add(element);
+        }
+    }
+
+    // Appends one tuple taken from a block item; an empty item becomes a null row (Nullable only).
+    void DoAdd(TBlockItem value) final {
+        if constexpr (Nullable) {
+            if (!value) {
+                AddNullRow();
+                return;
+            }
+            NullBuilder->UnsafeAppend(1);
+        }
+
+        const TBlockItem* tupleItems = value.AsTuple();
+        for (ui32 pos = 0; pos < Children.size(); ++pos) {
+            Children[pos]->Add(tupleItems[pos]);
+        }
+    }
+
+    // Appends a placeholder row: marked valid, with a default row in every child.
+    void DoAddDefault() final {
+        if constexpr (Nullable) {
+            NullBuilder->UnsafeAppend(1);
+        }
+        AddDefaultToChildren();
+    }
+
+    // Appends the rows of `array` selected by `sparseBitmap`: validity first, then each child column.
+    void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
+        Y_VERIFY(!array.buffers.empty());
+        Y_VERIFY(array.child_data.size() == Children.size());
+
+        if constexpr (Nullable) {
+            if (!array.buffers.front()) {
+                // No validity bitmap in the source: all selected rows are valid.
+                NullBuilder->UnsafeAppend(popCount, 1);
+            } else {
+                // Expand the selected validity bits to bytes at End(), then commit popCount of them.
+                ui8* validityOut = NullBuilder->End();
+                CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, validityOut, array.length);
+                NullBuilder->UnsafeAdvance(popCount);
+            }
+        }
+
+        for (size_t pos = 0; pos < Children.size(); ++pos) {
+            Children[pos]->AddMany(*array.child_data[pos], popCount, sparseBitmap, array.length);
+        }
+    }
+
+    // Produces a tree whose single payload chunk carries the tuple validity and whose children are
+    // the trees built by the child builders.
+    TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
+        TBlockArrayTree::Ptr tree = std::make_shared<TBlockArrayTree>();
+
+        const size_t rowCount = GetCurrLen();
+        std::shared_ptr<arrow::Buffer> validity;
+        if constexpr (Nullable) {
+            Y_ENSURE(rowCount == NullBuilder->Length(), "Unexpected NullBuilder length");
+            const std::shared_ptr<arrow::Buffer> sparseValidity = NullBuilder->Finish();
+            validity = MakeDenseBitmap(sparseValidity->data(), rowCount, Pool);
+        }
+
+        Y_VERIFY(rowCount);
+        tree->Payload.push_back(arrow::ArrayData::Make(ArrowType, rowCount, { validity }));
+        tree->Children.reserve(Children.size());
+        for (auto& child : Children) {
+            tree->Children.emplace_back(child->BuildTree(finish));
+        }
+
+        if (!finish) {
+            // The builder will be reused: start a fresh validity buffer.
+            Reserve();
+        }
+
+        return tree;
+    }
+
+private:
+    // Allocates a fresh validity buffer with room for MaxLen + 1 bytes (Nullable only).
+    void Reserve() {
+        if constexpr (Nullable) {
+            NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
+            NullBuilder->Reserve(MaxLen + 1);
+        }
+    }
+
+    // Appends a default row to every child builder.
+    void AddDefaultToChildren() {
+        for (auto& child : Children) {
+            child->AddDefault();
+        }
+    }
+
+    // Records a null tuple: validity 0 plus a default row in every child to keep the lengths equal.
+    void AddNullRow() {
+        NullBuilder->UnsafeAppend(0);
+        AddDefaultToChildren();
+    }
+
+private:
+    TVector<std::unique_ptr<TArrayBuilderBase>> Children;
+    std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
+};
+
+// Array builder for an optional level stacked on top of an inner builder (nested optionals, see
+// MakeArrayBuilderBase). It keeps its own one-byte-per-row validity and forwards the unwrapped
+// values to the inner builder.
+class TExternalOptionalArrayBuilder : public TArrayBuilderBase {
+public:
+    TExternalOptionalArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, std::unique_ptr<TArrayBuilderBase>&& inner)
+        : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen)
+        , Inner(std::move(inner))
+    {
+        Reserve();
+    }
+
+    void DoAdd(NUdf::TUnboxedValuePod value) final {
+        AppendOptional(value);
+    }
+
+    void DoAdd(TBlockItem value) final {
+        AppendOptional(value);
+    }
+
+    // Appends a placeholder row: marked valid at this level, default row in the inner builder.
+    void DoAddDefault() final {
+        NullBuilder->UnsafeAppend(1);
+        Inner->AddDefault();
+    }
+
+    // Appends the rows of `array` selected by `sparseBitmap`: this level's validity first, then the
+    // single child column via the inner builder.
+    void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
+        Y_VERIFY(!array.buffers.empty());
+        Y_VERIFY(array.child_data.size() == 1);
+
+        if (!array.buffers.front()) {
+            // No validity bitmap in the source: all selected rows are valid.
+            NullBuilder->UnsafeAppend(popCount, 1);
+        } else {
+            // Expand the selected validity bits to bytes at End(), then commit popCount of them.
+            ui8* validityOut = NullBuilder->End();
+            CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, validityOut, array.length);
+            NullBuilder->UnsafeAdvance(popCount);
+        }
+
+        Inner->AddMany(*array.child_data[0], popCount, sparseBitmap, array.length);
+    }
+
+    // Produces a tree whose single payload chunk carries this level's validity and whose only child
+    // is the tree built by the inner builder.
+    TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
+        TBlockArrayTree::Ptr tree = std::make_shared<TBlockArrayTree>();
+
+        const size_t rowCount = GetCurrLen();
+        Y_ENSURE(rowCount == NullBuilder->Length(), "Unexpected NullBuilder length");
+        const std::shared_ptr<arrow::Buffer> sparseValidity = NullBuilder->Finish();
+        std::shared_ptr<arrow::Buffer> validity = MakeDenseBitmap(sparseValidity->data(), rowCount, Pool);
+
+        Y_VERIFY(rowCount);
+        tree->Payload.push_back(arrow::ArrayData::Make(ArrowType, rowCount, { validity }));
+        tree->Children.emplace_back(Inner->BuildTree(finish));
+
+        if (!finish) {
+            // The builder will be reused: start a fresh validity buffer.
+            Reserve();
+        }
+
+        return tree;
+    }
+
+private:
+    // Allocates a fresh validity buffer with room for MaxLen + 1 bytes.
+    void Reserve() {
+        NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
+        NullBuilder->Reserve(MaxLen + 1);
+    }
+
+    // Shared body of both DoAdd overloads: an empty value is recorded as null with a default inner
+    // row, anything else as valid with its unwrapped optional value.
+    template <typename TValue>
+    void AppendOptional(TValue value) {
+        if (!value) {
+            NullBuilder->UnsafeAppend(0);
+            Inner->AddDefault();
+            return;
+        }
+
+        NullBuilder->UnsafeAppend(1);
+        Inner->Add(value.GetOptionalValue());
+    }
+
+private:
+    std::unique_ptr<TArrayBuilderBase> Inner;
+    std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
+};
+
+std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxBlockLength);
+
+// Creates the builder for a type without nested optionals. With Nullable = true, `type` is unwrapped
+// through TOptionalTypeInspector first and the builder tracks validity. Tuples get a
+// TTupleArrayBuilder over recursively created element builders; data types get a fixed-size or string
+// builder chosen by data slot. Anything else fails via Y_ENSURE.
+template<bool Nullable>
+inline std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderImpl(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen) {
+    if constexpr (Nullable) {
+        TOptionalTypeInspector optInspector(typeInfoHelper, type);
+        type = optInspector.GetItemType();
+    }
+
+    if (TTupleTypeInspector tupleInspector(typeInfoHelper, type); tupleInspector) {
+        TVector<std::unique_ptr<TArrayBuilderBase>> elementBuilders;
+        for (ui32 pos = 0; pos < tupleInspector.GetElementsCount(); ++pos) {
+            elementBuilders.push_back(MakeArrayBuilderBase(typeInfoHelper, tupleInspector.GetElementType(pos), pool, maxLen));
+        }
+
+        return std::make_unique<TTupleArrayBuilder<Nullable>>(typeInfoHelper, type, pool, maxLen, std::move(elementBuilders));
+    }
+
+    // Creates a fixed-width builder whose element type is the type of `tag`.
+    const auto makeFixed = [&](auto tag) -> std::unique_ptr<TArrayBuilderBase> {
+        return std::make_unique<TFixedSizeArrayBuilder<decltype(tag), Nullable>>(typeInfoHelper, type, pool, maxLen);
+    };
+
+    if (TDataTypeInspector dataInspector(typeInfoHelper, type); dataInspector) {
+        // Slots sharing a physical representation share a builder instantiation.
+        switch (GetDataSlot(dataInspector.GetTypeId())) {
+        case NUdf::EDataSlot::Int8:
+            return makeFixed(i8{});
+        case NUdf::EDataSlot::Uint8:
+        case NUdf::EDataSlot::Bool:
+            return makeFixed(ui8{});
+        case NUdf::EDataSlot::Int16:
+            return makeFixed(i16{});
+        case NUdf::EDataSlot::Uint16:
+        case NUdf::EDataSlot::Date:
+            return makeFixed(ui16{});
+        case NUdf::EDataSlot::Int32:
+            return makeFixed(i32{});
+        case NUdf::EDataSlot::Uint32:
+        case NUdf::EDataSlot::Datetime:
+            return makeFixed(ui32{});
+        case NUdf::EDataSlot::Int64:
+        case NUdf::EDataSlot::Interval:
+            return makeFixed(i64{});
+        case NUdf::EDataSlot::Uint64:
+        case NUdf::EDataSlot::Timestamp:
+            return makeFixed(ui64{});
+        case NUdf::EDataSlot::Float:
+            return makeFixed(float{});
+        case NUdf::EDataSlot::Double:
+            return makeFixed(double{});
+        case NUdf::EDataSlot::String:
+            return std::make_unique<TStringArrayBuilder<arrow::BinaryType, Nullable>>(typeInfoHelper, type, pool, maxLen);
+        case NUdf::EDataSlot::Utf8:
+            return std::make_unique<TStringArrayBuilder<arrow::StringType, Nullable>>(typeInfoHelper, type, pool, maxLen);
+        default:
+            Y_ENSURE(false, "Unsupported data slot");
+        }
+    }
+
+    Y_ENSURE(false, "Unsupported type");
+}
+
+// Creates the builder for an arbitrary supported type.
+//  - not optional:        MakeArrayBuilderImpl<false>
+//  - one optional level:  MakeArrayBuilderImpl<true>
+//  - nested optionals:    a builder for the innermost Optional<T> (T not optional), wrapped in one
+//                         TExternalOptionalArrayBuilder per outer optional level, innermost first.
+inline std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxBlockLength) {
+    TOptionalTypeInspector outerOpt(typeInfoHelper, type);
+    if (!outerOpt) {
+        return MakeArrayBuilderImpl<false>(typeInfoHelper, type, pool, maxBlockLength);
+    }
+
+    TOptionalTypeInspector innerOpt(typeInfoHelper, outerOpt.GetItemType());
+    if (!innerOpt) {
+        return MakeArrayBuilderImpl<true>(typeInfoHelper, type, pool, maxBlockLength);
+    }
+
+    // At least 2 levels of optionals: collect every optional level, outermost first, stopping at
+    // the one whose item is no longer optional.
+    TVector<const TType*> optionalLevels;
+    const TType* level = type;
+    for (;;) {
+        optionalLevels.push_back(level);
+        TOptionalTypeInspector levelOpt(typeInfoHelper, level);
+        level = levelOpt.GetItemType();
+        TOptionalTypeInspector nextOpt(typeInfoHelper, level);
+        if (!nextOpt) {
+            break;
+        }
+    }
+
+    // The innermost level is built by the regular path; the remaining ones wrap it from inside out.
+    auto builder = MakeArrayBuilderBase(typeInfoHelper, optionalLevels.back(), pool, maxBlockLength);
+    for (size_t idx = optionalLevels.size() - 1; idx-- > 0;) {
+        builder = std::make_unique<TExternalOptionalArrayBuilder>(typeInfoHelper, optionalLevels[idx], pool, maxBlockLength, std::move(builder));
+    }
+
+    return builder;
+}
+
+// Public factory: same as MakeArrayBuilderBase, with the result exposed through the IArrayBuilder
+// interface.
+inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxBlockLength) {
+    std::unique_ptr<IArrayBuilder> builder = MakeArrayBuilderBase(typeInfoHelper, type, pool, maxBlockLength);
+    return builder;
+}
+
+// Placeholder factory for scalar builders: no implementation exists here, so every call fails via
+// Y_ENSURE(false) and the trailing return is never reached.
+inline std::unique_ptr<IScalarBuilder> MakeScalarBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
+    Y_UNUSED(type);
+    Y_UNUSED(typeInfoHelper);
+    Y_ENSURE(false);
+    return nullptr;
+}
+
+}
+}
diff --git a/ydb/library/yql/public/udf/arrow/block_item.cpp b/ydb/library/yql/public/udf/arrow/block_item.cpp
new file mode 100644
index 0000000000..c5668bd73d
--- /dev/null
+++ b/ydb/library/yql/public/udf/arrow/block_item.cpp
@@ -0,0 +1,7 @@
+#include "block_item.h"
+
+namespace NYql {
+namespace NUdf {
+
+}
+}
diff --git a/ydb/library/yql/public/udf/arrow/block_item.h b/ydb/library/yql/public/udf/arrow/block_item.h
new file mode 100644
index 0000000000..727d273d3a
--- /dev/null
+++ b/ydb/library/yql/public/udf/arrow/block_item.h
@@ -0,0 +1,149 @@
+#pragma once
+
+#include <ydb/library/yql/public/udf/udf_data_type.h>
+#include <ydb/library/yql/public/udf/udf_string_ref.h>
+#include <ydb/library/yql/public/udf/udf_type_size_check.h>
+
+namespace NYql::NUdf {
+
+class TBlockItem { // 16-byte holder (size asserted after the class) for one block item: a primitive, a string reference, a tuple pointer, or empty.
+ enum class EMarkers : ui8 { // values stored in Raw.Simple.Meta
+ Empty = 0,
+ Present = 1,
+ };
+
+public:
+ TBlockItem() noexcept = default; // all-zero Raw: operator bool yields false
+ ~TBlockItem() noexcept = default;
+
+ TBlockItem(const TBlockItem& value) noexcept = default;
+ TBlockItem(TBlockItem&& value) noexcept = default;
+
+ TBlockItem& operator=(const TBlockItem& value) noexcept = default;
+ TBlockItem& operator=(TBlockItem&& value) noexcept = default;
+
+ template <typename T, typename = std::enable_if_t<TPrimitiveDataType<T>::Result>>
+ inline explicit TBlockItem(T value); // specialized per primitive type by VALUE_CONSTR after the class
+
+ inline explicit TBlockItem(TStringRef value) { // stores pointer and size only; the string bytes are not copied
+ Raw.String.Value = value.Data();
+ Raw.String.Size = value.Size();
+ Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present);
+ }
+
+ inline explicit TBlockItem(const TBlockItem* tupleItems) { // stores the pointer only; the element count is not recorded
+ Raw.Tuple.Value = tupleItems;
+ Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present);
+ }
+
+ inline TBlockItem(ui64 low, ui64 high) { // raw construction from the two 64-bit halves
+ Raw.Halfs[0] = low;
+ Raw.Halfs[1] = high;
+ }
+
+ inline ui64 Low() const {
+ return Raw.Halfs[0];
+ }
+
+ inline ui64 High() const {
+ return Raw.Halfs[1];
+ }
+
+ template <typename T, typename = std::enable_if_t<TPrimitiveDataType<T>::Result>>
+ inline T As() const; // specialized per primitive type by VALUE_GET after the class
+
+ inline const TBlockItem* AsTuple() const {
+ Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present);
+ return Raw.Tuple.Value;
+ }
+
+ inline TStringRef AsStringRef() const {
+ Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present);
+ return TStringRef(Raw.String.Value, Raw.String.Size);
+ }
+
+ inline TBlockItem MakeOptional() const // Adds one optional level: an item with non-zero Meta is returned unchanged, otherwise Count is incremented.
+ {
+ if (Raw.Simple.Meta)
+ return *this;
+
+ TBlockItem result(*this);
+ ++result.Raw.Simple.Count;
+ return result;
+ }
+
+ inline TBlockItem GetOptionalValue() const // Removes one optional level: inverse of MakeOptional.
+ {
+ if (Raw.Simple.Meta)
+ return *this;
+
+ Y_VERIFY_DEBUG(Raw.Simple.Count > 0U, "Can't get value from empty.");
+
+ TBlockItem result(*this);
+ --result.Raw.Simple.Count;
+ return result;
+ }
+
+ inline explicit operator bool() const { return bool(Raw); } // false only when both Meta and Count are zero
+private:
+ union TRaw { // overlapping views of the same 16 bytes
+ ui64 Halfs[2] = {0, 0};
+ struct {
+ union {
+ #define FIELD(type) type type##_;
+ PRIMITIVE_VALUE_TYPES(FIELD);
+ #undef FIELD
+ ui64 Count; // number of optional levels wrapped around an empty value (see MakeOptional)
+ };
+ union {
+ ui64 Pad;
+ struct {
+ ui8 Reserved[7];
+ ui8 Meta; // last byte of the second 64-bit word; holds an EMarkers value
+ };
+ };
+ } Simple;
+
+ struct {
+ const char* Value;
+ ui32 Size;
+ } String;
+
+ struct {
+ // client should know tuple size
+ const TBlockItem* Value;
+ } Tuple;
+
+ EMarkers GetMarkers() const {
+ return static_cast<EMarkers>(Simple.Meta);
+ }
+
+ explicit operator bool() const { return Simple.Meta | Simple.Count; }
+ } Raw;
+};
+
+UDF_ASSERT_TYPE_SIZE(TBlockItem, 16);
+
+#define VALUE_GET(xType) /* Defines TBlockItem::As<xType>(): returns the xType member of Raw.Simple; debug builds check the item is Present. */ \
+ template <> \
+ inline xType TBlockItem::As<xType>() const \
+ { \
+ Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present); \
+ return Raw.Simple.xType##_; \
+ }
+
+#define VALUE_CONSTR(xType) /* Defines TBlockItem::TBlockItem(xType): stores the value in Raw.Simple and marks the item Present. */ \
+ template <> \
+ inline TBlockItem::TBlockItem(xType value) \
+ { \
+ Raw.Simple.xType##_ = value; \
+ Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present); \
+ }
+
+PRIMITIVE_VALUE_TYPES(VALUE_GET) // instantiate the accessor for every type listed by PRIMITIVE_VALUE_TYPES
+PRIMITIVE_VALUE_TYPES(VALUE_CONSTR) // instantiate the constructor for every type listed by PRIMITIVE_VALUE_TYPES
+
+#undef VALUE_GET
+#undef VALUE_CONSTR
+
+}
diff --git a/ydb/library/yql/public/udf/arrow/block_reader.cpp b/ydb/library/yql/public/udf/arrow/block_reader.cpp
new file mode 100644
index 0000000000..89d21389af
--- /dev/null
+++ b/ydb/library/yql/public/udf/arrow/block_reader.cpp
@@ -0,0 +1,7 @@
+#include "block_reader.h"
+
+namespace NYql {
+namespace NUdf {
+
+}
+}
diff --git a/ydb/library/yql/public/udf/arrow/block_reader.h b/ydb/library/yql/public/udf/arrow/block_reader.h
new file mode 100644
index 0000000000..f998b9c357
--- /dev/null
+++ b/ydb/library/yql/public/udf/arrow/block_reader.h
@@ -0,0 +1,236 @@
+#pragma once
+
+#include "block_item.h"
+#include "util.h"
+#include <arrow/datum.h>
+
+#include <ydb/library/yql/public/udf/udf_type_inspection.h>
+
+namespace NYql {
+namespace NUdf {
+
+class IBlockReader : private TNonCopyable { // Abstract reader that turns one array element or one scalar into a TBlockItem.
+public:
+ virtual ~IBlockReader() = default;
+ // The result references Array/Scalar internals and stays valid only until the next call to GetItem/GetScalarItem.
+ virtual TBlockItem GetItem(const arrow::ArrayData& data, size_t index) = 0; // Item at position `index` of `data`; the readers in this file return an empty item for a null.
+ virtual TBlockItem GetScalarItem(const arrow::Scalar& scalar) = 0; // Item for a scalar; the readers in this file return an empty item when scalar.is_valid is false.
+};
+
+// Reader for fixed-width values of type T stored in buffer 1 of the array data.
+template <typename T>
+class TFixedSizeBlockReader : public IBlockReader {
+public:
+    // Returns the value at `index`, or an empty item when that position is null.
+    TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
+        return IsNull(data, index) ? TBlockItem() : TBlockItem(data.GetValues<T>(1)[index]);
+    }
+
+    // Returns the scalar's value, or an empty item when the scalar is not valid.
+    TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
+        if (scalar.is_valid) {
+            const auto& primitive = arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar);
+            const T* valuePtr = static_cast<const T*>(primitive.data());
+            return TBlockItem(*valuePtr);
+        }
+
+        return {};
+    }
+};
+
+// Reader for variable-length strings: offsets are read from buffer 1, bytes from buffer 2.
+// The returned items point into the source buffers; nothing is copied.
+template<typename TStringType>
+class TStringBlockReader : public IBlockReader {
+public:
+    using TOffset = typename TStringType::offset_type;
+
+    // Returns a view of the string at `index`, or an empty item when that position is null.
+    TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
+        Y_VERIFY_DEBUG(data.buffers.size() == 3);
+        if (IsNull(data, index)) {
+            return {};
+        }
+
+        const TOffset* offsets = data.GetValues<TOffset>(1);
+        const char* bytes = data.GetValues<char>(2, 0);
+
+        const TOffset begin = offsets[index];
+        const TOffset end = offsets[index + 1];
+        const std::string_view view(bytes + begin, end - begin);
+        return TBlockItem(view);
+    }
+
+    // Returns a view of the scalar's buffer, or an empty item when the scalar is not valid.
+    TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
+        if (!scalar.is_valid) {
+            return {};
+        }
+
+        const auto& payload = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(scalar).value;
+        const std::string_view view(reinterpret_cast<const char*>(payload->data()), payload->size());
+        return TBlockItem(view);
+    }
+};
+
+// Reader for tuple columns: reads every element with its own child reader into an internal scratch
+// vector and returns an item pointing at that vector. The returned item is therefore overwritten by
+// the next GetItem/GetScalarItem call on the same reader.
+class TTupleBlockReader : public IBlockReader {
+public:
+    // `children` holds one reader per tuple element, in element order.
+    // explicit: a vector of readers must not convert to a tuple reader implicitly.
+    explicit TTupleBlockReader(TVector<std::unique_ptr<IBlockReader>>&& children)
+        : Children(std::move(children))
+        , Items(Children.size())
+    {}
+
+    // Returns the tuple at `index`, or an empty item when that position is null.
+    TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
+        if (IsNull(data, index)) {
+            return {};
+        }
+
+        for (ui32 i = 0; i < Children.size(); ++i) {
+            Items[i] = Children[i]->GetItem(*data.child_data[i], index);
+        }
+
+        return TBlockItem(Items.data());
+    }
+
+    // Returns the tuple held by a struct scalar, or an empty item when the scalar is not valid.
+    TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
+        if (!scalar.is_valid) {
+            return {};
+        }
+
+        const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
+
+        for (ui32 i = 0; i < Children.size(); ++i) {
+            Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]);
+        }
+
+        return TBlockItem(Items.data());
+    }
+
+private:
+    const TVector<std::unique_ptr<IBlockReader>> Children;
+    TVector<TBlockItem> Items; // scratch storage the returned tuple items point into
+};
+
+// Reader for an optional level stacked on top of an inner reader (nested optionals): a null at this
+// level yields an empty item, anything else is read by the inner reader from child 0 and wrapped
+// with MakeOptional.
+class TExternalOptionalBlockReader : public IBlockReader {
+public:
+    // explicit: a reader pointer must not convert to a wrapping reader implicitly.
+    explicit TExternalOptionalBlockReader(std::unique_ptr<IBlockReader>&& inner)
+        : Inner(std::move(inner))
+    {}
+
+    // Returns the wrapped inner item at `index`, or an empty item when that position is null.
+    TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
+        if (IsNull(data, index)) {
+            return {};
+        }
+
+        return Inner->GetItem(*data.child_data[0], index).MakeOptional();
+    }
+
+    // Returns the wrapped inner item of a struct scalar, or an empty item when the scalar is not valid.
+    TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
+        if (!scalar.is_valid) {
+            return {};
+        }
+
+        const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
+        return Inner->GetScalarItem(*structScalar.value[0]).MakeOptional();
+    }
+
+private:
+    const std::unique_ptr<IBlockReader> Inner;
+};
+
+struct TReaderTraits { // Traits passed to MakeBlockReaderImpl to produce the IBlockReader family defined in this header.
+ using TResult = IBlockReader; // type the factory returns a unique_ptr to
+ using TTuple = TTupleBlockReader; // constructed from a vector of child readers
+ template <typename T>
+ using TFixedSize = TFixedSizeBlockReader<T>; // default-constructed per primitive type
+ template <typename TStringType>
+ using TStrings = TStringBlockReader<TStringType>; // default-constructed for arrow::BinaryType / arrow::StringType
+ using TExtOptional = TExternalOptionalBlockReader; // constructed from the inner reader it wraps
+};
+
+// Creates the reader for `type` out of the classes named by TTraits.
+//  - nested optionals: a reader for the innermost Optional<T> (T not optional), wrapped in one
+//    TTraits::TExtOptional per outer optional level;
+//  - otherwise a single optional level is stripped and the reader is chosen from the remaining type:
+//    TTraits::TTuple for tuples, TTraits::TFixedSize / TTraits::TStrings for data types by data slot.
+// Anything else fails via Y_ENSURE.
+template <typename TTraits>
+std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
+    TOptionalTypeInspector outerOpt(typeInfoHelper, type);
+    if (outerOpt) {
+        const TType* itemType = outerOpt.GetItemType();
+        TOptionalTypeInspector innerOpt(typeInfoHelper, itemType);
+        if (innerOpt) {
+            // At least 2 levels of optionals: walk down to the last optional level, counting how
+            // many levels sit above it.
+            ui32 wrapperCount = 0;
+            const TType* innermostOptional = type;
+            for (const TType* cursor = itemType;;) {
+                TOptionalTypeInspector cursorOpt(typeInfoHelper, cursor);
+                if (!cursorOpt) {
+                    break;
+                }
+                ++wrapperCount;
+                innermostOptional = cursor;
+                cursor = cursorOpt.GetItemType();
+            }
+
+            auto reader = MakeBlockReaderImpl<TTraits>(typeInfoHelper, innermostOptional);
+            for (ui32 k = 0; k < wrapperCount; ++k) {
+                reader = std::make_unique<typename TTraits::TExtOptional>(std::move(reader));
+            }
+
+            return reader;
+        }
+
+        // Single optional level: the readers handle nulls themselves, continue with the item type.
+        type = itemType;
+    }
+
+    if (TTupleTypeInspector tupleInspector(typeInfoHelper, type); tupleInspector) {
+        TVector<std::unique_ptr<typename TTraits::TResult>> elementReaders;
+        for (ui32 pos = 0; pos < tupleInspector.GetElementsCount(); ++pos) {
+            elementReaders.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, tupleInspector.GetElementType(pos)));
+        }
+
+        return std::make_unique<typename TTraits::TTuple>(std::move(elementReaders));
+    }
+
+    // Creates a fixed-width reader whose element type is the type of `tag`.
+    const auto makeFixed = [](auto tag) -> std::unique_ptr<typename TTraits::TResult> {
+        return std::make_unique<typename TTraits::template TFixedSize<decltype(tag)>>();
+    };
+
+    if (TDataTypeInspector dataInspector(typeInfoHelper, type); dataInspector) {
+        // Slots sharing a physical representation share a reader instantiation.
+        switch (GetDataSlot(dataInspector.GetTypeId())) {
+        case NUdf::EDataSlot::Int8:
+            return makeFixed(i8{});
+        case NUdf::EDataSlot::Bool:
+        case NUdf::EDataSlot::Uint8:
+            return makeFixed(ui8{});
+        case NUdf::EDataSlot::Int16:
+            return makeFixed(i16{});
+        case NUdf::EDataSlot::Uint16:
+        case NUdf::EDataSlot::Date:
+            return makeFixed(ui16{});
+        case NUdf::EDataSlot::Int32:
+            return makeFixed(i32{});
+        case NUdf::EDataSlot::Uint32:
+        case NUdf::EDataSlot::Datetime:
+            return makeFixed(ui32{});
+        case NUdf::EDataSlot::Int64:
+        case NUdf::EDataSlot::Interval:
+            return makeFixed(i64{});
+        case NUdf::EDataSlot::Uint64:
+        case NUdf::EDataSlot::Timestamp:
+            return makeFixed(ui64{});
+        case NUdf::EDataSlot::Float:
+            return makeFixed(float{});
+        case NUdf::EDataSlot::Double:
+            return makeFixed(double{});
+        case NUdf::EDataSlot::String:
+            return std::make_unique<typename TTraits::template TStrings<arrow::BinaryType>>();
+        case NUdf::EDataSlot::Utf8:
+            return std::make_unique<typename TTraits::template TStrings<arrow::StringType>>();
+        default:
+            Y_ENSURE(false, "Unsupported data slot");
+        }
+    }
+
+    Y_ENSURE(false, "Unsupported type");
+}
+
+// Public factory: creates a reader for `type` from the IBlockReader family selected by TReaderTraits.
+inline std::unique_ptr<IBlockReader> MakeBlockReader(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
+    std::unique_ptr<IBlockReader> reader = MakeBlockReaderImpl<TReaderTraits>(typeInfoHelper, type);
+    return reader;
+}
+
+}
+}
diff --git a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h
index cfdc9030cf..5d90cc1380 100644
--- a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h
+++ b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h
@@ -8,6 +8,8 @@
#include "defs.h"
#include "util.h"
#include "args_dechunker.h"
+#include "block_reader.h"
+#include "block_builder.h"
#include <arrow/array/array_base.h>
#include <arrow/array/util.h>
@@ -21,44 +23,103 @@ namespace NUdf {
using TExec = arrow::Status(*)(arrow::compute::KernelContext*, const arrow::compute::ExecBatch&, arrow::Datum*);
-class TSimpleArrowUdfImpl : public TBoxedValue {
+class TUdfKernelState : public arrow::compute::KernelState {
public:
- TSimpleArrowUdfImpl(const TVector<std::shared_ptr<arrow::DataType>>& argTypes, bool onlyScalars, IArrowType::TPtr&& returnType,
- TExec exec, IFunctionTypeInfoBuilder& builder, const TString& name)
+ TUdfKernelState(const TVector<const TType*>& argTypes, const TType* outputType, bool onlyScalars, const ITypeInfoHelper::TPtr& typeInfoHelper)
: ArgTypes_(argTypes)
+ , OutputType_(outputType)
, OnlyScalars_(onlyScalars)
- , ReturnType_(std::move(returnType))
+ , TypeInfoHelper_(typeInfoHelper)
+ {
+ Readers_.resize(ArgTypes_.size());
+ Y_UNUSED(OutputType_);
+ }
+
+ // Returns the block reader for argument number `index`, creating it on first use.
+ // Readers_ has one slot per entry of ArgTypes_, so `index` must be a valid argument
+ // position; an out-of-range value fails via Y_ENSURE instead of reading past the vectors.
+ IBlockReader& GetReader(ui32 index) {
+     Y_ENSURE(index < Readers_.size(), "Block reader index is out of range");
+     auto& reader = Readers_[index];
+     if (!reader) {
+         reader = MakeBlockReader(*TypeInfoHelper_, ArgTypes_[index]);
+     }
+
+     return *reader;
+ }
+
+ // Returns the array builder for the output type, creating it on first use.
+ // Only valid when the kernel state was created with onlyScalars == false.
+ IArrayBuilder& GetArrayBuilder() {
+     Y_ENSURE(!OnlyScalars_);
+     if (ArrayBuilder_) {
+         return *ArrayBuilder_;
+     }
+
+     // The builder is sized by the helper-reported maximum block length for the output type.
+     const auto maxBlockLength = TypeInfoHelper_->GetMaxBlockLength(OutputType_);
+     ArrayBuilder_ = MakeArrayBuilder(*TypeInfoHelper_, OutputType_, *arrow::default_memory_pool(), maxBlockLength);
+     return *ArrayBuilder_;
+ }
+
+ // Returns the scalar builder for the output type, creating it on first use.
+ // Only valid when the kernel state was created with onlyScalars == true.
+ IScalarBuilder& GetScalarBuilder() {
+     Y_ENSURE(OnlyScalars_);
+     if (ScalarBuilder_) {
+         return *ScalarBuilder_;
+     }
+
+     ScalarBuilder_ = MakeScalarBuilder(*TypeInfoHelper_, OutputType_);
+     return *ScalarBuilder_;
+ }
+
+private:
+ const TVector<const TType*> ArgTypes_;
+ const TType* OutputType_;
+ const bool OnlyScalars_;
+ const ITypeInfoHelper::TPtr TypeInfoHelper_;
+ TVector<std::unique_ptr<IBlockReader>> Readers_;
+ std::unique_ptr<IArrayBuilder> ArrayBuilder_;
+ std::unique_ptr<IScalarBuilder> ScalarBuilder_;
+};
+
+class TSimpleArrowUdfImpl : public TBoxedValue {
+public:
+ TSimpleArrowUdfImpl(const TVector<const TType*> argTypes, const TType* outputType, bool onlyScalars,
+ TExec exec, IFunctionTypeInfoBuilder& builder, const TString& name)
+ : OnlyScalars_(onlyScalars)
, Exec_(exec)
, Pos_(GetSourcePosition(builder))
, Name_(name)
, KernelContext_(&ExecContext_)
{
+ auto typeInfoHelper = builder.TypeInfoHelper();
Kernel_.null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE;
Kernel_.exec = Exec_;
std::vector<arrow::compute::InputType> inTypes;
- for (const auto& t : ArgTypes_) {
- inTypes.emplace_back(t);
- ArgsValuesDescr_.emplace_back(t);
+ for (const auto& t : argTypes) {
+ auto arrowTypeHandle = typeInfoHelper->MakeArrowType(t);
+ Y_ENSURE(arrowTypeHandle);
+ ArrowSchema s;
+ arrowTypeHandle->Export(&s);
+ auto type = ARROW_RESULT(arrow::ImportType(&s));
+ ArgArrowTypes_.emplace_back(type);
+
+ inTypes.emplace_back(type);
+ ArgsValuesDescr_.emplace_back(type);
}
+ ReturnArrowTypeHandle_ = typeInfoHelper->MakeArrowType(outputType);
+ Y_ENSURE(ReturnArrowTypeHandle_);
+
ArrowSchema s;
- ReturnType_->Export(&s);
+ ReturnArrowTypeHandle_->Export(&s);
arrow::compute::OutputType outType = ARROW_RESULT(arrow::ImportType(&s));
Kernel_.signature = arrow::compute::KernelSignature::Make(std::move(inTypes), std::move(outType));
+ KernelState_ = std::make_unique<TUdfKernelState>(argTypes, outputType, onlyScalars, typeInfoHelper);
+ KernelContext_.SetState(KernelState_.get());
}
TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final {
try {
- TVector<arrow::Datum> argDatums(ArgTypes_.size());
- for (ui32 i = 0; i < ArgTypes_.size(); ++i) {
+ TVector<arrow::Datum> argDatums(ArgArrowTypes_.size());
+ for (ui32 i = 0; i < ArgArrowTypes_.size(); ++i) {
bool isScalar;
ui64 length;
ui32 chunkCount = valueBuilder->GetArrowBlockChunks(args[i], isScalar, length);
if (isScalar) {
ArrowArray a;
valueBuilder->ExportArrowBlock(args[i], 0, &a);
- auto arr = ARROW_RESULT(arrow::ImportArray(&a, ArgTypes_[i]));
+ auto arr = ARROW_RESULT(arrow::ImportArray(&a, ArgArrowTypes_[i]));
auto scalar = ARROW_RESULT(arr->GetScalar(0));
argDatums[i] = scalar;
} else {
@@ -66,14 +127,14 @@ public:
for (ui32 i = 0; i < chunkCount; ++i) {
ArrowArray a;
valueBuilder->ExportArrowBlock(args[i], i, &a);
- auto arr = ARROW_RESULT(arrow::ImportArray(&a, ArgTypes_[i]));
+ auto arr = ARROW_RESULT(arrow::ImportArray(&a, ArgArrowTypes_[i]));
imported[i] = arr;
}
if (chunkCount == 1) {
argDatums[i] = imported.front();
} else {
- argDatums[i] = ARROW_RESULT(arrow::ChunkedArray::Make(std::move(imported), ArgTypes_[i]));
+ argDatums[i] = ARROW_RESULT(arrow::ChunkedArray::Make(std::move(imported), ArgArrowTypes_[i]));
}
}
}
@@ -106,7 +167,7 @@ public:
auto arr = ARROW_RESULT(arrow::MakeArrayFromScalar(*res.scalar(), 1));
ArrowArray a;
ARROW_OK(arrow::ExportArray(*arr, &a));
- return valueBuilder->ImportArrowBlock(&a, 1, true, *ReturnType_);
+ return valueBuilder->ImportArrowBlock(&a, 1, true, *ReturnArrowTypeHandle_);
} else {
TVector<ArrowArray> a;
if (res.is_array()) {
@@ -120,7 +181,7 @@ public:
}
}
- return valueBuilder->ImportArrowBlock(a.data(), a.size(), false, *ReturnType_);
+ return valueBuilder->ImportArrowBlock(a.data(), a.size(), false, *ReturnArrowTypeHandle_);
}
} catch (const std::exception&) {
TStringBuilder sb;
@@ -132,17 +193,19 @@ public:
}
private:
- const TVector<std::shared_ptr<arrow::DataType>> ArgTypes_;
const bool OnlyScalars_;
- IArrowType::TPtr ReturnType_;
const TExec Exec_;
TSourcePosition Pos_;
const TString Name_;
+ TVector<std::shared_ptr<arrow::DataType>> ArgArrowTypes_;
+ IArrowType::TPtr ReturnArrowTypeHandle_;
+
arrow::compute::ExecContext ExecContext_;
mutable arrow::compute::KernelContext KernelContext_;
arrow::compute::ScalarKernel Kernel_;
std::vector<arrow::ValueDescr> ArgsValuesDescr_;
+ std::unique_ptr<TUdfKernelState> KernelState_;
};
inline void PrepareSimpleArrowUdf(IFunctionTypeInfoBuilder& builder, TType* signature, TType* userType, TExec exec, bool typesOnly,
@@ -177,12 +240,12 @@ inline void PrepareSimpleArrowUdf(IFunctionTypeInfoBuilder& builder, TType* sign
builder.UserType(userType);
Y_ENSURE(hasBlocks);
- TVector<std::shared_ptr<arrow::DataType>> argTypes;
+ TVector<const TType*> argTypes;
auto argsBuilder = builder.Args(callableInspector.GetArgsCount());
for (ui32 i = 0; i < argsInspector.GetElementsCount(); ++i) {
TBlockTypeInspector blockInspector(*typeInfoHelper, argsInspector.GetElementType(i));
- auto initalType = callableInspector.GetArgType(i);
- argsBuilder->Add(builder.Block(blockInspector.IsScalar())->Item(initalType).Build());
+ auto type = callableInspector.GetArgType(i);
+ argsBuilder->Add(builder.Block(blockInspector.IsScalar())->Item(type).Build());
if (callableInspector.GetArgumentName(i).Size() > 0) {
argsBuilder->Name(callableInspector.GetArgumentName(i));
}
@@ -191,11 +254,6 @@ inline void PrepareSimpleArrowUdf(IFunctionTypeInfoBuilder& builder, TType* sign
argsBuilder->Flags(callableInspector.GetArgumentFlags(i));
}
- auto arrowTypeHandle = typeInfoHelper->MakeArrowType(initalType);
- Y_ENSURE(arrowTypeHandle);
- ArrowSchema s;
- arrowTypeHandle->Export(&s);
- auto type = ARROW_RESULT(arrow::ImportType(&s));
argTypes.emplace_back(type);
}
@@ -209,12 +267,40 @@ inline void PrepareSimpleArrowUdf(IFunctionTypeInfoBuilder& builder, TType* sign
}
if (!typesOnly) {
- auto returnType = typeInfoHelper->MakeArrowType(callableInspector.GetReturnType());
- Y_ENSURE(returnType);
- builder.Implementation(new TSimpleArrowUdfImpl(argTypes, onlyScalars, std::move(returnType), exec, builder, name));
+ builder.Implementation(new TSimpleArrowUdfImpl(argTypes, callableInspector.GetReturnType(), onlyScalars, exec, builder, name));
}
}
+// Adapts a per-item callback into an arrow kernel exec function for UDFs with one argument.
+// TDerived must provide a static Process(TBlockItem item, sink) that reports its result
+// by invoking sink(TBlockItem). Do() expects the kernel state to be a TUdfKernelState.
+template <typename TDerived>
+struct TUnaryKernelExec {
+    static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
+        auto& kernelState = dynamic_cast<TUdfKernelState&>(*ctx->state());
+        auto& argReader = kernelState.GetReader(0);
+        const auto& input = batch.values[0];
+
+        if (!input.is_scalar()) {
+            // Array input: run Process for every row and collect the outputs with the array builder.
+            auto& rows = *input.array();
+            auto& arrayBuilder = kernelState.GetArrayBuilder();
+            for (int64_t row = 0; row < rows.length; ++row) {
+                auto item = argReader.GetItem(rows, row);
+                TDerived::Process(item, [&](TBlockItem out) {
+                    arrayBuilder.Add(out);
+                });
+            }
+
+            *res = arrayBuilder.Build(false);
+            return arrow::Status::OK();
+        }
+
+        // Scalar input: one item in, the sink stores the built scalar into *res.
+        auto& scalarBuilder = kernelState.GetScalarBuilder();
+        auto item = argReader.GetScalarItem(*input.scalar());
+        TDerived::Process(item, [&](TBlockItem out) {
+            *res = scalarBuilder.Build(out);
+        });
+
+        return arrow::Status::OK();
+    }
+};
+
}
}
diff --git a/ydb/library/yql/public/udf/arrow/util.cpp b/ydb/library/yql/public/udf/arrow/util.cpp
index d8b8c6b6bb..6544018f5b 100644
--- a/ydb/library/yql/public/udf/arrow/util.cpp
+++ b/ydb/library/yql/public/udf/arrow/util.cpp
@@ -1,4 +1,5 @@
#include "util.h"
+#include "bit_util.h"
#include "defs.h"
#include <arrow/array/array_base.h>
@@ -7,6 +8,20 @@
namespace NYql {
namespace NUdf {
+// Allocates a bitmap from `pool` sized for `bitCount` bits rounded up to a multiple of 64,
+// plus one additional 64-bit word of slack.
+std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool) {
+    constexpr size_t wordBits = 64;
+    // round up to a whole number of 64-bit words
+    const size_t alignedBits = (bitCount + (wordBits - 1)) & ~(wordBits - 1);
+    // the spare word simplifies the bitmap compression code:
+    // it may write a single 64-bit word past the array boundary
+    return ARROW_RESULT(arrow::AllocateBitmap(alignedBits + wordBits, pool));
+}
+
+// Allocates a bitmap (with reserve) for `len` entries and fills it by passing `srcSparse`
+// through CompressSparseBitmap.
+// NOTE(review): presumably `srcSparse` holds one byte per entry - confirm against bit_util.h.
+std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool) {
+    std::shared_ptr<arrow::Buffer> dense = AllocateBitmapWithReserve(len, pool);
+    auto* const dst = dense->mutable_data();
+    CompressSparseBitmap(dst, srcSparse, len);
+    return dense;
+}
+
std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len) {
Y_ENSURE(data->length >= 0);
Y_ENSURE(offset + len <= (size_t)data->length);
diff --git a/ydb/library/yql/public/udf/arrow/util.h b/ydb/library/yql/public/udf/arrow/util.h
index ec8a0ebb11..07accc005e 100644
--- a/ydb/library/yql/public/udf/arrow/util.h
+++ b/ydb/library/yql/public/udf/arrow/util.h
@@ -1,14 +1,21 @@
#pragma once
+#include "defs.h"
+
#include <util/generic/vector.h>
+#include <arrow/buffer_builder.h>
#include <arrow/datum.h>
+#include <arrow/util/bit_util.h>
#include <functional>
namespace NYql {
namespace NUdf {
+std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool);
+std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool);
+
/// \brief Recursive version of ArrayData::Slice() method
std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len);
@@ -18,5 +25,69 @@ std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data,
void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func);
arrow::Datum MakeArray(const TVector<std::shared_ptr<arrow::ArrayData>>& chunks);
+// Returns true when the element at logical position `index` of `data` is null.
+// Buffer 0 is read as a bitmap addressed from the start of the buffer, so the
+// array's own offset is added to `index`; a cleared bit means null.
+inline bool IsNull(const arrow::ArrayData& data, size_t index) {
+    if (data.GetNullCount() <= 0) {
+        // no nulls at all - the bitmap is not consulted
+        return false;
+    }
+
+    const uint8_t* validity = data.GetValues<uint8_t>(0, 0);
+    return !arrow::BitUtil::GetBit(validity, index + data.offset);
+}
+
+// A typed wrapper over arrow::BufferBuilder, similar to arrow::TypedBufferBuilder,
+// that additionally exposes UnsafeAdvance() and calls Finish() with shrinkToFit = false.
+// All sizes, counts and lengths are expressed in elements of T, not in bytes.
+// The Unsafe* methods do no capacity checks in this class - Reserve() first.
+template<typename T>
+class TTypedBufferBuilder {
+    // same requirement as std::is_pod_v<T>
+    static_assert(std::is_trivial_v<T> && std::is_standard_layout_v<T>);
+    static_assert(!std::is_same_v<T, bool>);
+public:
+    explicit TTypedBufferBuilder(arrow::MemoryPool* pool)
+        : Builder(pool)
+    {
+    }
+
+    // Forwards a reservation of `size` elements (size * sizeof(T) bytes) to the underlying builder.
+    inline void Reserve(size_t size) {
+        const size_t bytes = size * sizeof(T);
+        ARROW_OK(Builder.Reserve(bytes));
+    }
+
+    // Number of elements currently stored.
+    inline size_t Length() const {
+        return Builder.length() / sizeof(T);
+    }
+
+    inline T* MutableData() {
+        auto* raw = Builder.mutable_data();
+        return reinterpret_cast<T*>(raw);
+    }
+
+    // Pointer one past the last stored element, i.e. where the next append lands.
+    inline T* End() {
+        return MutableData() + Length();
+    }
+
+    inline const T* Data() const {
+        const auto* raw = Builder.data();
+        return reinterpret_cast<const T*>(raw);
+    }
+
+    // Copies `count` elements from `values` to the end of the buffer.
+    inline void UnsafeAppend(const T* values, size_t count) {
+        T* dst = End();
+        std::memcpy(dst, values, count * sizeof(T));
+        UnsafeAdvance(count);
+    }
+
+    // Appends `count` copies of `value`.
+    inline void UnsafeAppend(size_t count, const T& value) {
+        std::fill_n(End(), count, value);
+        UnsafeAdvance(count);
+    }
+
+    // Appends a single element.
+    inline void UnsafeAppend(T&& value) {
+        T* slot = End();
+        *slot = std::move(value);
+        UnsafeAdvance(1);
+    }
+
+    // Moves the logical end forward by `count` elements without writing anything.
+    inline void UnsafeAdvance(size_t count) {
+        const size_t bytes = count * sizeof(T);
+        Builder.UnsafeAdvance(bytes);
+    }
+
+    // Finishes the underlying builder, passing shrinkToFit = false.
+    inline std::shared_ptr<arrow::Buffer> Finish() {
+        const bool shrinkToFit = false;
+        return ARROW_RESULT(Builder.Finish(shrinkToFit));
+    }
+private:
+    arrow::BufferBuilder Builder;
+};
+
}
}
diff --git a/ydb/library/yql/public/udf/udf_types.h b/ydb/library/yql/public/udf/udf_types.h
index 5fee361e3d..cc31f536c4 100644
--- a/ydb/library/yql/public/udf/udf_types.h
+++ b/ydb/library/yql/public/udf/udf_types.h
@@ -270,6 +270,7 @@ private:
}
};
+#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 25)
class ITypeInfoHelper2 : public ITypeInfoHelper1 {
public:
using TPtr = TRefCountedPtr<ITypeInfoHelper2>;
@@ -277,6 +278,7 @@ public:
public:
virtual const TPgTypeDescription* FindPgTypeDescription(ui32 typeId) const = 0;
};
+#endif
//////////////////////////////////////////////////////////////////////////////
// IArrowType
@@ -293,6 +295,7 @@ public:
UDF_ASSERT_TYPE_SIZE(IArrowType, 8);
+#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26)
class ITypeInfoHelper3 : public ITypeInfoHelper2 {
public:
using TPtr = TRefCountedPtr<ITypeInfoHelper3>;
@@ -303,8 +306,22 @@ public:
// The given ArrowSchema struct is released, even if this function fails.
virtual IArrowType::TPtr ImportArrowType(ArrowSchema* schema) const = 0;
};
+#endif
-#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26)
+#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 29)
+class ITypeInfoHelper4 : public ITypeInfoHelper3 { // extends ITypeInfoHelper3 with block size queries; compiled in only for UDF ABI >= 2.29
+public:
+ using TPtr = TRefCountedPtr<ITypeInfoHelper4>;
+
+public:
+ virtual ui64 GetMaxBlockLength(const TType* type) const = 0; // NOTE(review): presumably the maximum number of items in one block of `type` - confirm against the implementation
+ virtual ui64 GetMaxBlockBytes() const = 0; // NOTE(review): presumably a byte-size limit for a single block - confirm against the implementation
+};
+#endif
+
+#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 29)
+using ITypeInfoHelper = ITypeInfoHelper4;
+#elif UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26)
using ITypeInfoHelper = ITypeInfoHelper3;
#elif UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 25)
using ITypeInfoHelper = ITypeInfoHelper2;
diff --git a/ydb/library/yql/public/udf/udf_version.h b/ydb/library/yql/public/udf/udf_version.h
index 303a707147..52cb0f27b6 100644
--- a/ydb/library/yql/public/udf/udf_version.h
+++ b/ydb/library/yql/public/udf/udf_version.h
@@ -7,7 +7,7 @@ namespace NYql {
namespace NUdf {
#define CURRENT_UDF_ABI_VERSION_MAJOR 2
-#define CURRENT_UDF_ABI_VERSION_MINOR 28
+#define CURRENT_UDF_ABI_VERSION_MINOR 29
#define CURRENT_UDF_ABI_VERSION_PATCH 0
#ifdef USE_CURRENT_UDF_ABI_VERSION
diff --git a/ydb/library/yql/udfs/common/url_base/CMakeLists.darwin.txt b/ydb/library/yql/udfs/common/url_base/CMakeLists.darwin.txt
index a6ac71608b..d6161089ba 100644
--- a/ydb/library/yql/udfs/common/url_base/CMakeLists.darwin.txt
+++ b/ydb/library/yql/udfs/common/url_base/CMakeLists.darwin.txt
@@ -20,7 +20,7 @@ target_link_libraries(url_udf INTERFACE
add_global_library_for(url_udf.global url_udf)
target_compile_options(url_udf.global PRIVATE
-DUDF_ABI_VERSION_MAJOR=2
- -DUDF_ABI_VERSION_MINOR=28
+ -DUDF_ABI_VERSION_MINOR=29
-DUDF_ABI_VERSION_PATCH=0
)
target_link_libraries(url_udf.global PUBLIC
diff --git a/ydb/library/yql/udfs/common/url_base/CMakeLists.linux-aarch64.txt b/ydb/library/yql/udfs/common/url_base/CMakeLists.linux-aarch64.txt
index ad561a195c..dc4b9a0dba 100644
--- a/ydb/library/yql/udfs/common/url_base/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/udfs/common/url_base/CMakeLists.linux-aarch64.txt
@@ -21,7 +21,7 @@ target_link_libraries(url_udf INTERFACE
add_global_library_for(url_udf.global url_udf)
target_compile_options(url_udf.global PRIVATE
-DUDF_ABI_VERSION_MAJOR=2
- -DUDF_ABI_VERSION_MINOR=28
+ -DUDF_ABI_VERSION_MINOR=29
-DUDF_ABI_VERSION_PATCH=0
)
target_link_libraries(url_udf.global PUBLIC
diff --git a/ydb/library/yql/udfs/common/url_base/CMakeLists.linux.txt b/ydb/library/yql/udfs/common/url_base/CMakeLists.linux.txt
index ad561a195c..dc4b9a0dba 100644
--- a/ydb/library/yql/udfs/common/url_base/CMakeLists.linux.txt
+++ b/ydb/library/yql/udfs/common/url_base/CMakeLists.linux.txt
@@ -21,7 +21,7 @@ target_link_libraries(url_udf INTERFACE
add_global_library_for(url_udf.global url_udf)
target_compile_options(url_udf.global PRIVATE
-DUDF_ABI_VERSION_MAJOR=2
- -DUDF_ABI_VERSION_MINOR=28
+ -DUDF_ABI_VERSION_MINOR=29
-DUDF_ABI_VERSION_PATCH=0
)
target_link_libraries(url_udf.global PUBLIC
diff --git a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.darwin.txt b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.darwin.txt
index c56c96c1fb..cf9f8d6bda 100644
--- a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.darwin.txt
+++ b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.darwin.txt
@@ -10,7 +10,7 @@
add_library(common-url_base-lib)
target_compile_options(common-url_base-lib PRIVATE
-DUDF_ABI_VERSION_MAJOR=2
- -DUDF_ABI_VERSION_MINOR=28
+ -DUDF_ABI_VERSION_MINOR=29
-DUDF_ABI_VERSION_PATCH=0
)
target_link_libraries(common-url_base-lib PUBLIC
diff --git a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux-aarch64.txt b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux-aarch64.txt
index 8346301c6c..cdc5b23fcb 100644
--- a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux-aarch64.txt
@@ -10,7 +10,7 @@
add_library(common-url_base-lib)
target_compile_options(common-url_base-lib PRIVATE
-DUDF_ABI_VERSION_MAJOR=2
- -DUDF_ABI_VERSION_MINOR=28
+ -DUDF_ABI_VERSION_MINOR=29
-DUDF_ABI_VERSION_PATCH=0
)
target_link_libraries(common-url_base-lib PUBLIC
diff --git a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux.txt b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux.txt
index 8346301c6c..cdc5b23fcb 100644
--- a/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux.txt
+++ b/ydb/library/yql/udfs/common/url_base/lib/CMakeLists.linux.txt
@@ -10,7 +10,7 @@
add_library(common-url_base-lib)
target_compile_options(common-url_base-lib PRIVATE
-DUDF_ABI_VERSION_MAJOR=2
- -DUDF_ABI_VERSION_MINOR=28
+ -DUDF_ABI_VERSION_MINOR=29
-DUDF_ABI_VERSION_PATCH=0
)
target_link_libraries(common-url_base-lib PUBLIC
diff --git a/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h b/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h
index 5fcf17c914..b2c74915d9 100644
--- a/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h
+++ b/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h
@@ -41,12 +41,6 @@ SIMPLE_UDF(TGetScheme, char*(TAutoMap<char*>)) {
return valueBuilder->SubString(args[0], std::distance(url.begin(), prefix.begin()), prefix.size());
}
-inline arrow::Status GetHostKernelExec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
- Y_UNUSED(ctx);
- *res = batch.values[0];
- return arrow::Status::OK();
-}
-
BEGIN_SIMPLE_ARROW_UDF(TGetHost, TOptional<char*>(TOptional<char*>)) {
EMPTY_RESULT_ON_EMPTY_ARG(0);
const std::string_view url(args[0].AsStringRef());
@@ -55,7 +49,24 @@ BEGIN_SIMPLE_ARROW_UDF(TGetHost, TOptional<char*>(TOptional<char*>)) {
valueBuilder->SubString(args[0], std::distance(url.begin(), host.begin()), host.size());
}
-END_SIMPLE_ARROW_UDF(TGetHost, GetHostKernelExec);
+// Per-item logic of the block (vectorized) TGetHost kernel, plugged into TUnaryKernelExec.
+// Passes the host part of the URL to the sink, or a default-constructed (empty) TBlockItem
+// when the argument itself is empty or GetOnlyHost yields an empty host.
+struct TGetHostKernelExec : public TUnaryKernelExec<TGetHostKernelExec> {
+    template <typename TSink>
+    static void Process(TBlockItem arg, const TSink& sink) {
+        if (arg) {
+            const std::string_view url(arg.AsStringRef());
+            const std::string_view host(GetOnlyHost(url));
+            if (!host.empty()) {
+                sink(TBlockItem(TStringRef(host)));
+                return;
+            }
+        }
+
+        // empty argument or no host found
+        sink(TBlockItem());
+    }
+};
+
+END_SIMPLE_ARROW_UDF(TGetHost, TGetHostKernelExec::Do);
SIMPLE_UDF(TGetHostPort, TOptional<char*>(TOptional<char*>)) {
EMPTY_RESULT_ON_EMPTY_ARG(0);