aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorziganshinmr <ziganshinmr@yandex-team.com>2025-01-10 17:23:43 +0300
committerziganshinmr <ziganshinmr@yandex-team.com>2025-01-10 17:39:29 +0300
commitf3443af4eb64d5b450497b61625932d66a27dfc8 (patch)
tree3559af21c163f1a2ef04a21fcbb94b2ca0bc0b20
parent9729f7ea47301feb1fa702b0aa3ab98198c5e3a2 (diff)
downloadydb-f3443af4eb64d5b450497b61625932d66a27dfc8.tar.gz
Block trimmer and its usage in BlockMapJoinCore
commit_hash:568373541db82f01bd26ce36651f8dbb92a007e1
-rw-r--r--yql/essentials/minikql/arrow/arrow_util.h2
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp54
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp71
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp35
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h4
-rw-r--r--yql/essentials/minikql/computation/mkql_block_trimmer.cpp251
-rw-r--r--yql/essentials/minikql/computation/mkql_block_trimmer.h21
-rw-r--r--yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp373
-rw-r--r--yql/essentials/minikql/computation/ut/ya.make.inc1
-rw-r--r--yql/essentials/minikql/computation/ya.make2
-rw-r--r--yql/essentials/minikql/computation/ya.make.inc1
-rw-r--r--yql/essentials/public/udf/arrow/bit_util.h25
-rw-r--r--yql/essentials/public/udf/arrow/block_reader.h99
-rw-r--r--yql/essentials/public/udf/arrow/ut/bit_util_ut.cpp41
-rw-r--r--yql/essentials/public/udf/arrow/ut/ya.make2
-rw-r--r--yql/essentials/public/udf/arrow/util.cpp12
-rw-r--r--yql/essentials/public/udf/arrow/util.h3
17 files changed, 917 insertions, 80 deletions
diff --git a/yql/essentials/minikql/arrow/arrow_util.h b/yql/essentials/minikql/arrow/arrow_util.h
index efb071ff71..083ca37141 100644
--- a/yql/essentials/minikql/arrow/arrow_util.h
+++ b/yql/essentials/minikql/arrow/arrow_util.h
@@ -22,6 +22,8 @@ std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, TType* it
using NYql::NUdf::AllocateBitmapWithReserve;
using NYql::NUdf::MakeDenseBitmap;
+using NYql::NUdf::MakeDenseBitmapCopy;
+using NYql::NUdf::MakeDenseFalseBitmap;
inline arrow::internal::Bitmap GetBitmap(const arrow::ArrayData& arr, int index) {
return arrow::internal::Bitmap{ arr.buffers[index], arr.offset, arr.length };
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
index 827e295db6..847e4409b6 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
@@ -3,6 +3,7 @@
#include <yql/essentials/minikql/computation/mkql_block_builder.h>
#include <yql/essentials/minikql/computation/mkql_block_impl.h>
#include <yql/essentials/minikql/computation/mkql_block_reader.h>
+#include <yql/essentials/minikql/computation/mkql_block_trimmer.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h>
#include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h>
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
@@ -329,12 +330,12 @@ public:
public:
TIterator() = default;
- TIterator(TBlockIndex* blockIndex)
+ TIterator(const TBlockIndex* blockIndex)
: Type_(EIteratorType::EMPTY)
, BlockIndex_(blockIndex)
{}
- TIterator(TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
+ TIterator(const TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
: Type_(EIteratorType::INPLACE)
, BlockIndex_(blockIndex)
, Entry_(entry)
@@ -342,7 +343,7 @@ public:
, ItemsToLookup_(std::move(itemsToLookup))
{}
- TIterator(TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
+ TIterator(const TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
: Type_(EIteratorType::LIST)
, BlockIndex_(blockIndex)
, Node_(node)
@@ -432,7 +433,7 @@ public:
private:
EIteratorType Type_;
- TBlockIndex* BlockIndex_ = nullptr;
+ const TBlockIndex* BlockIndex_ = nullptr;
union {
TIndexNode* Node_;
@@ -451,7 +452,8 @@ public:
const TVector<TType*>& itemTypes,
const TVector<ui32>& keyColumns,
NUdf::TUnboxedValue stream,
- bool any
+ bool any,
+ arrow::MemoryPool* pool
)
: TBase(memInfo)
, InputsDescr_(ToValueDescr(itemTypes))
@@ -466,6 +468,7 @@ public:
Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
Hashers_.push_back(helper.MakeHasher(blockItemType));
Comparators_.push_back(helper.MakeComparator(blockItemType));
+ Trimmers_.push_back(MakeBlockTrimmer(TTypeInfoHelper(), blockItemType, pool));
}
}
@@ -484,7 +487,12 @@ public:
for (size_t i = 0; i < Inputs_.size() - 1; i++) {
auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr());
- block.push_back(std::move(datum));
+ if (datum.is_scalar()) {
+ block.push_back(datum);
+ } else {
+ MKQL_ENSURE(datum.is_array(), "Expecting array");
+ block.push_back(Trimmers_[i]->Trim(datum.array()));
+ }
}
Data_.push_back(std::move(block));
}
@@ -565,7 +573,7 @@ public:
return;
}
- auto value = static_cast<TIndexMapValue*>(Index_.GetMutablePayload(iter));
+ auto value = static_cast<const TIndexMapValue*>(Index_.GetPayload(iter));
if (value->IsInplace()) {
iterators[i] = TIterator(this, value->GetEntry(), std::move(itemsBatch[i]));
} else {
@@ -574,23 +582,20 @@ public:
});
}
- TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) {
+ TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) const {
Y_ENSURE(entry.BlockOffset < Data_.size());
Y_ENSURE(columnIdx < Inputs_.size() - 1);
-
- auto& datum = Data_[entry.BlockOffset][columnIdx];
- MKQL_ENSURE(datum.is_array(), "Expecting array");
- return Readers_[columnIdx]->GetItem(*datum.array(), entry.ItemOffset);
+ return GetItemFromBlock(Data_[entry.BlockOffset], columnIdx, entry.ItemOffset);
}
- void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) {
+ void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const {
Y_ENSURE(row.size() == ioMap.size());
for (size_t i = 0; i < row.size(); i++) {
row[i] = GetItem(entry, ioMap[i]);
}
}
- bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) {
+ bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
Y_ENSURE(keyItems.size() == KeyColumns_.size());
for (size_t i = 0; i < KeyColumns_.size(); i++) {
auto indexItem = GetItem(entry, KeyColumns_[i]);
@@ -607,10 +612,7 @@ private:
ui64 keyHash = 0;
keyItems.clear();
for (ui32 keyColumn : KeyColumns_) {
- auto& datum = block[keyColumn];
- MKQL_ENSURE(datum.is_array(), "Expecting array");
-
- auto item = Readers_[keyColumn]->GetItem(*datum.array(), offset);
+ auto item = GetItemFromBlock(block, keyColumn, offset);
if (!item) {
keyItems.clear();
return 0;
@@ -623,11 +625,21 @@ private:
return keyHash;
}
+ TBlockItem GetItemFromBlock(const std::vector<arrow::Datum>& block, ui32 columnIdx, size_t offset) const {
+ const auto& datum = block[columnIdx];
+ if (datum.is_scalar()) {
+ return Readers_[columnIdx]->GetScalarItem(*datum.scalar());
+ } else {
+ MKQL_ENSURE(datum.is_array(), "Expecting array");
+ return Readers_[columnIdx]->GetItem(*datum.array(), offset);
+ }
+ }
+
TIndexNode* InsertIndexNode(TIndexEntry entry, TIndexNode* currentHead = nullptr) {
return &IndexNodes_.emplace_back(entry, currentHead);
}
- bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) {
+ bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
if (chain->IsInplace()) {
return IsKeyEquals(chain->GetEntry(), keyItems);
} else {
@@ -650,6 +662,7 @@ private:
TVector<std::unique_ptr<IBlockReader>> Readers_;
TVector<NUdf::IBlockItemHasher::TPtr> Hashers_;
TVector<NUdf::IBlockItemComparator::TPtr> Comparators_;
+ TVector<IBlockTrimmer::TPtr> Trimmers_;
std::vector<std::vector<arrow::Datum>> Data_;
@@ -705,7 +718,8 @@ public:
RightItemTypes_,
RightKeyColumns_,
std::move(RightStream_->GetValue(ctx)),
- RightAny
+ RightAny,
+ &ctx.ArrowMemoryPool
);
return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp
index 214b7ae8ff..ca710d0328 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp
@@ -96,19 +96,19 @@ TRuntimeNode BuildBlockJoin(TProgramBuilder& pgmBuilder, EJoinKind joinKind,
NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup,
TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops,
TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector<ui32>& rightKeyColumns, const TVector<ui32>& rightKeyDrops, bool rightAny,
- EJoinKind joinKind, size_t blockSize
+ EJoinKind joinKind, size_t blockSize, bool scalar
) {
TProgramBuilder& pb = *setup.PgmBuilder;
Y_ENSURE(leftType->IsList(), "Left node has to be list");
const auto leftItemType = AS_TYPE(TListType, leftType)->GetItemType();
Y_ENSURE(leftItemType->IsTuple(), "List item has to be tuple");
- TType* leftBlockType = MakeBlockTupleType(pb, leftItemType);
+ TType* leftBlockType = MakeBlockTupleType(pb, leftItemType, scalar);
Y_ENSURE(rightType->IsList(), "Right node has to be list");
const auto rightItemType = AS_TYPE(TListType, rightType)->GetItemType();
Y_ENSURE(rightItemType->IsTuple(), "Right item has to be tuple");
- TType* rightBlockType = MakeBlockTupleType(pb, rightItemType);
+ TType* rightBlockType = MakeBlockTupleType(pb, rightItemType, scalar);
TRuntimeNode leftList = pb.Arg(pb.NewListType(leftBlockType));
TRuntimeNode rightList = pb.Arg(pb.NewListType(rightBlockType));
@@ -122,8 +122,18 @@ NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup,
const auto graph = setup.BuildGraph(joinNode, {leftList.GetNode(), rightList.GetNode()});
auto& ctx = graph->GetContext();
- graph->GetEntryPoint(0, true)->SetValue(ctx, ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, leftItemType)->GetElements(), std::move(leftListValue)));
- graph->GetEntryPoint(1, true)->SetValue(ctx, ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, rightItemType)->GetElements(), std::move(rightListValue)));
+
+ NUdf::TUnboxedValuePod leftBlockListValue, rightBlockListValue;
+ if (scalar) {
+ leftBlockListValue = MakeUint64ScalarBlock(ctx, blockSize, AS_TYPE(TTupleType, leftItemType)->GetElements(), std::move(leftListValue));
+ rightBlockListValue = MakeUint64ScalarBlock(ctx, blockSize, AS_TYPE(TTupleType, rightItemType)->GetElements(), std::move(rightListValue));
+ } else {
+ leftBlockListValue = ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, leftItemType)->GetElements(), std::move(leftListValue));
+ rightBlockListValue = ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, rightItemType)->GetElements(), std::move(rightListValue));
+ }
+
+ graph->GetEntryPoint(0, true)->SetValue(ctx, leftBlockListValue);
+ graph->GetEntryPoint(1, true)->SetValue(ctx, rightBlockListValue);
return FromBlocks(ctx, AS_TYPE(TTupleType, joinItemType)->GetElements(), graph->GetValue());
}
@@ -131,14 +141,15 @@ void RunTestBlockJoin(TSetup<false>& setup, EJoinKind joinKind,
TType* expectedType, const NUdf::TUnboxedValue& expected,
TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns,
TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector<ui32>& rightKeyColumns,
- const TVector<ui32>& leftKeyDrops = {}, const TVector<ui32>& rightKeyDrops = {}, bool rightAny = false
+ const TVector<ui32>& leftKeyDrops = {}, const TVector<ui32>& rightKeyDrops = {},
+ bool rightAny = false, bool scalar = false
) {
const size_t testSize = leftListValue.GetListLength();
for (size_t blockSize = 1; blockSize <= testSize; blockSize <<= 1) {
const auto got = DoTestBlockJoin(setup,
leftType, std::move(leftListValue), leftKeyColumns, leftKeyDrops,
rightType, std::move(rightListValue), rightKeyColumns, rightKeyDrops, rightAny,
- joinKind, blockSize
+ joinKind, blockSize, scalar
);
CompareResults(expectedType, expected, got);
}
@@ -685,6 +696,52 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestBasic) {
);
}
+ Y_UNIT_TEST(TestScalar) {
+ TSetup<false> setup(GetNodeFactory());
+ const size_t testSize = 1 << 7;
+
+ // 1. Make input for the "left" stream.
+ TVector<ui64> leftKeyInit(testSize, 1);
+ TVector<ui64> leftSubkeyInit(testSize, 2);
+ TVector<ui64> leftValueInit(testSize, 3);
+
+ // 2. Make input for the "right" stream.
+ TVector<ui64> rightKeyInit(testSize, 1);
+ TVector<ui64> rightValueInit(testSize, 2);
+
+ // 3. Make "expected" data.
+ TMultiMap<ui64, ui64> rightMap;
+ for (size_t i = 0; i < testSize; i++) {
+ rightMap.insert({rightKeyInit[i], rightValueInit[i]});
+ }
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<ui64> expectedValue;
+ TVector<ui64> expectedRightValue;
+ for (size_t i = 0; i < testSize; i++) {
+ const auto& [begin, end] = rightMap.equal_range(leftKeyInit[i]);
+ for (auto it = begin; it != end; it++) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(it->second);
+ }
+ }
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
+
+ RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0},
+ {}, {0}, false, true
+ );
+ }
+
} // Y_UNIT_TEST_SUITE
Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestOptional) {
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp
index 2a65da92ed..fdd65bbba2 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp
@@ -137,7 +137,7 @@ IComputationNode* WrapWideStreamDethrottler(TCallable& callable, const TComputat
}
-TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType) {
+TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType, bool scalar) {
const auto itemTypes = AS_TYPE(TTupleType, tupleType)->GetElements();
const auto ui64Type = pgmBuilder.NewDataType(NUdf::TDataType<ui64>::Id);
const auto blockLenType = pgmBuilder.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
@@ -145,7 +145,7 @@ TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType) {
TVector<TType*> blockItemTypes;
std::transform(itemTypes.cbegin(), itemTypes.cend(), std::back_inserter(blockItemTypes),
[&](const auto& itemType) {
- return pgmBuilder.NewBlockType(itemType, TBlockType::EShape::Many);
+ return pgmBuilder.NewBlockType(itemType, scalar ? TBlockType::EShape::Scalar : TBlockType::EShape::Many);
});
// XXX: Mind the last block length column.
blockItemTypes.push_back(blockLenType);
@@ -204,6 +204,37 @@ NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize,
return holderFactory.CreateDirectListHolder(std::move(listValues));
}
+NUdf::TUnboxedValuePod MakeUint64ScalarBlock(TComputationContext& ctx, size_t blockSize,
+ const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values
+) {
+ // Creates a block of scalar values using the first element of the given list
+
+ for (auto type : types) {
+ // Because IScalarBuilder has no implementations
+ Y_ENSURE(AS_TYPE(TDataType, type)->GetDataSlot() == NYql::NUdf::EDataSlot::Uint64);
+ }
+
+ const auto& holderFactory = ctx.HolderFactory;
+ const size_t width = types.size();
+ const size_t rowsCount = values.GetListLength();
+
+ NUdf::TUnboxedValue row;
+ Y_ENSURE(values.GetListIterator().Next(row));
+ TDefaultListRepresentation listValues;
+ for (size_t rowOffset = 0; rowOffset < rowsCount; rowOffset += blockSize) {
+ NUdf::TUnboxedValue* items = nullptr;
+ const auto tuple = holderFactory.CreateDirectArrayHolder(width + 1, items);
+ for (size_t i = 0; i < width; i++) {
+ const NUdf::TUnboxedValuePod& item = row.GetElement(i);
+ items[i] = holderFactory.CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(item.Get<ui64>())));
+ }
+ items[width] = MakeBlockCount(holderFactory, std::min(blockSize, rowsCount - rowOffset));
+ listValues = listValues.Append(std::move(tuple));
+ }
+
+ return holderFactory.CreateDirectListHolder(std::move(listValues));
+}
+
NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx,
const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values
) {
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h
index 2c2f32b312..ecff426c92 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h
@@ -9,10 +9,12 @@ inline bool IsOptionalOrNull(const TType* type) {
return type->IsOptional() || type->IsNull() || type->IsPg();
}
-TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType);
+TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType, bool scalar);
NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize,
const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values);
+NUdf::TUnboxedValuePod MakeUint64ScalarBlock(TComputationContext& ctx, size_t blockSize,
+ const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values);
NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx,
const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values);
diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer.cpp b/yql/essentials/minikql/computation/mkql_block_trimmer.cpp
new file mode 100644
index 0000000000..c6f0f9109a
--- /dev/null
+++ b/yql/essentials/minikql/computation/mkql_block_trimmer.cpp
@@ -0,0 +1,251 @@
+#include "mkql_block_trimmer.h"
+
+#include <yql/essentials/minikql/arrow/arrow_util.h>
+#include <yql/essentials/public/decimal/yql_decimal.h>
+#include <yql/essentials/public/udf/arrow/block_reader.h>
+#include <yql/essentials/public/udf/arrow/defs.h>
+#include <yql/essentials/public/udf/arrow/util.h>
+#include <yql/essentials/public/udf/udf_type_inspection.h>
+#include <yql/essentials/public/udf/udf_value.h>
+#include <yql/essentials/public/udf/udf_value_builder.h>
+#include <yql/essentials/utils/yql_panic.h>
+
+#include <arrow/array/data.h>
+#include <arrow/datum.h>
+
+namespace NKikimr::NMiniKQL {
+
+class TBlockTrimmerBase : public IBlockTrimmer {
+protected:
+ TBlockTrimmerBase(arrow::MemoryPool* pool)
+ : Pool_(pool)
+ {}
+
+ TBlockTrimmerBase() = delete;
+
+ std::shared_ptr<arrow::Buffer> TrimNullBitmap(const std::shared_ptr<arrow::ArrayData>& array) {
+ auto& nullBitmapBuffer = array->buffers[0];
+
+ std::shared_ptr<arrow::Buffer> result;
+ auto nullCount = array->GetNullCount();
+ if (nullCount == array->length) {
+ result = MakeDenseFalseBitmap(array->length, Pool_);
+ } else if (nullCount > 0) {
+ result = MakeDenseBitmapCopy(nullBitmapBuffer->data(), array->length, array->offset, Pool_);
+ }
+
+ return result;
+ }
+
+protected:
+ arrow::MemoryPool* Pool_;
+};
+
+template<typename TLayout, bool Nullable>
+class TFixedSizeBlockTrimmer : public TBlockTrimmerBase {
+public:
+ TFixedSizeBlockTrimmer(arrow::MemoryPool* pool)
+ : TBlockTrimmerBase(pool)
+ {}
+
+ std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override {
+ Y_ENSURE(array->buffers.size() == 2);
+ Y_ENSURE(array->child_data.empty());
+
+ std::shared_ptr<arrow::Buffer> trimmedNullBitmap;
+ if constexpr (Nullable) {
+ trimmedNullBitmap = TrimNullBitmap(array);
+ }
+
+ auto origData = array->GetValues<TLayout>(1);
+ auto dataSize = sizeof(TLayout) * array->length;
+
+ auto trimmedDataBuffer = NUdf::AllocateResizableBuffer(dataSize, Pool_);
+ memcpy(trimmedDataBuffer->mutable_data(), origData, dataSize);
+
+ return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedDataBuffer)}, array->GetNullCount());
+ }
+};
+
+template<bool Nullable>
+class TResourceBlockTrimmer : public TBlockTrimmerBase {
+public:
+ TResourceBlockTrimmer(arrow::MemoryPool* pool)
+ : TBlockTrimmerBase(pool)
+ {}
+
+ std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override {
+ Y_ENSURE(array->buffers.size() == 2);
+ Y_ENSURE(array->child_data.empty());
+
+ std::shared_ptr<arrow::Buffer> trimmedNullBitmap;
+ if constexpr (Nullable) {
+ trimmedNullBitmap = TrimNullBitmap(array);
+ }
+
+ auto origData = array->GetValues<NUdf::TUnboxedValue>(1);
+ auto dataSize = sizeof(NUdf::TUnboxedValue) * array->length;
+
+ auto trimmedBuffer = NUdf::AllocateResizableBuffer<NUdf::TResizableManagedBuffer<NUdf::TUnboxedValue>>(dataSize, Pool_);
+ ARROW_OK(trimmedBuffer->Resize(dataSize));
+ auto trimmedBufferData = reinterpret_cast<NUdf::TUnboxedValue*>(trimmedBuffer->mutable_data());
+
+ for (int64_t i = 0; i < array->length; i++) {
+ ::new(&trimmedBufferData[i]) NUdf::TUnboxedValue(origData[i]);
+ }
+
+ return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedBuffer)}, array->GetNullCount());
+ }
+};
+
+template<typename TStringType, bool Nullable>
+class TStringBlockTrimmer : public TBlockTrimmerBase {
+ using TOffset = typename TStringType::offset_type;
+
+public:
+ TStringBlockTrimmer(arrow::MemoryPool* pool)
+ : TBlockTrimmerBase(pool)
+ {}
+
+ std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override {
+ Y_ENSURE(array->buffers.size() == 3);
+ Y_ENSURE(array->child_data.empty());
+
+ std::shared_ptr<arrow::Buffer> trimmedNullBitmap;
+ if constexpr (Nullable) {
+ trimmedNullBitmap = TrimNullBitmap(array);
+ }
+
+ auto origOffsetData = array->GetValues<TOffset>(1);
+ auto origStringData = reinterpret_cast<const char*>(array->buffers[2]->data() + origOffsetData[0]);
+ auto stringDataSize = origOffsetData[array->length] - origOffsetData[0];
+
+ auto trimmedOffsetBuffer = NUdf::AllocateResizableBuffer(sizeof(TOffset) * (array->length + 1), Pool_);
+ auto trimmedStringBuffer = NUdf::AllocateResizableBuffer(stringDataSize, Pool_);
+
+ auto trimmedOffsetBufferData = reinterpret_cast<TOffset*>(trimmedOffsetBuffer->mutable_data());
+ auto trimmedStringBufferData = reinterpret_cast<char*>(trimmedStringBuffer->mutable_data());
+
+ for (int64_t i = 0; i < array->length + 1; i++) {
+ trimmedOffsetBufferData[i] = origOffsetData[i] - origOffsetData[0];
+ }
+ memcpy(trimmedStringBufferData, origStringData, stringDataSize);
+
+ return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedOffsetBuffer), std::move(trimmedStringBuffer)}, array->GetNullCount());
+ }
+};
+
+template<bool Nullable>
+class TTupleBlockTrimmer : public TBlockTrimmerBase {
+public:
+ TTupleBlockTrimmer(std::vector<IBlockTrimmer::TPtr> children, arrow::MemoryPool* pool)
+ : TBlockTrimmerBase(pool)
+ , Children_(std::move(children))
+ {}
+
+ std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override {
+ Y_ENSURE(array->buffers.size() == 1);
+
+ std::shared_ptr<arrow::Buffer> trimmedNullBitmap;
+ if constexpr (Nullable) {
+ trimmedNullBitmap = TrimNullBitmap(array);
+ }
+
+ std::vector<std::shared_ptr<arrow::ArrayData>> trimmedChildren;
+ Y_ENSURE(array->child_data.size() == Children_.size());
+ for (size_t i = 0; i < Children_.size(); i++) {
+ trimmedChildren.push_back(Children_[i]->Trim(array->child_data[i]));
+ }
+
+ return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap)}, std::move(trimmedChildren), array->GetNullCount());
+ }
+
+protected:
+ TTupleBlockTrimmer(arrow::MemoryPool* pool)
+ : TBlockTrimmerBase(pool)
+ {}
+
+protected:
+ std::vector<IBlockTrimmer::TPtr> Children_;
+};
+
+template<typename TDate, bool Nullable>
+class TTzDateBlockTrimmer : public TTupleBlockTrimmer<Nullable> {
+ using TBase = TTupleBlockTrimmer<Nullable>;
+ using TDateLayout = typename NUdf::TDataType<TDate>::TLayout;
+
+public:
+ TTzDateBlockTrimmer(arrow::MemoryPool* pool)
+ : TBase(pool)
+ {
+ this->Children_.push_back(std::make_unique<TFixedSizeBlockTrimmer<TDateLayout, false>>(pool));
+ this->Children_.push_back(std::make_unique<TFixedSizeBlockTrimmer<ui16, false>>(pool));
+ }
+};
+
+class TExternalOptionalBlockTrimmer : public TBlockTrimmerBase {
+public:
+ TExternalOptionalBlockTrimmer(IBlockTrimmer::TPtr inner, arrow::MemoryPool* pool)
+ : TBlockTrimmerBase(pool)
+ , Inner_(std::move(inner))
+ {}
+
+ std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override {
+ Y_ENSURE(array->buffers.size() == 1);
+ Y_ENSURE(array->child_data.size() == 1);
+
+ auto trimmedNullBitmap = TrimNullBitmap(array);
+ auto trimmedInner = Inner_->Trim(array->child_data[0]);
+
+ return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap)}, {std::move(trimmedInner)}, array->GetNullCount());
+ }
+
+private:
+ IBlockTrimmer::TPtr Inner_;
+};
+
+struct TTrimmerTraits {
+ using TResult = IBlockTrimmer;
+ template <bool Nullable>
+ using TTuple = TTupleBlockTrimmer<Nullable>;
+ template <typename T, bool Nullable>
+ using TFixedSize = TFixedSizeBlockTrimmer<T, Nullable>;
+ template <typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot>
+ using TStrings = TStringBlockTrimmer<TStringType, Nullable>;
+ using TExtOptional = TExternalOptionalBlockTrimmer;
+ template<bool Nullable>
+ using TResource = TResourceBlockTrimmer<Nullable>;
+ template<typename TTzDate, bool Nullable>
+ using TTzDateReader = TTzDateBlockTrimmer<TTzDate, Nullable>;
+
+ static TResult::TPtr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool* pool) {
+ if (desc.PassByValue) {
+ return std::make_unique<TFixedSize<ui64, true>>(pool);
+ } else {
+ return std::make_unique<TStrings<arrow::BinaryType, true, NKikimr::NUdf::EDataSlot::String>>(pool);
+ }
+ }
+
+ static TResult::TPtr MakeResource(bool isOptional, arrow::MemoryPool* pool) {
+ if (isOptional) {
+ return std::make_unique<TResource<true>>(pool);
+ } else {
+ return std::make_unique<TResource<false>>(pool);
+ }
+ }
+
+ template<typename TTzDate>
+ static TResult::TPtr MakeTzDate(bool isOptional, arrow::MemoryPool* pool) {
+ if (isOptional) {
+ return std::make_unique<TTzDateReader<TTzDate, true>>(pool);
+ } else {
+ return std::make_unique<TTzDateReader<TTzDate, false>>(pool);
+ }
+ }
+};
+
+IBlockTrimmer::TPtr MakeBlockTrimmer(const NUdf::ITypeInfoHelper& typeInfoHelper, const NUdf::TType* type, arrow::MemoryPool* pool) {
+ return MakeBlockReaderImpl<TTrimmerTraits>(typeInfoHelper, type, nullptr, pool);
+}
+
+}
diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer.h b/yql/essentials/minikql/computation/mkql_block_trimmer.h
new file mode 100644
index 0000000000..0ec46ea83c
--- /dev/null
+++ b/yql/essentials/minikql/computation/mkql_block_trimmer.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <util/generic/noncopyable.h>
+#include <yql/essentials/public/udf/udf_types.h>
+
+#include <arrow/type.h>
+
+namespace NKikimr::NMiniKQL {
+
+class IBlockTrimmer : private TNonCopyable {
+public:
+ using TPtr = std::unique_ptr<IBlockTrimmer>;
+
+ virtual ~IBlockTrimmer() = default;
+
+ virtual std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) = 0;
+};
+
+IBlockTrimmer::TPtr MakeBlockTrimmer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type, arrow::MemoryPool* pool);
+
+}
diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp b/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp
new file mode 100644
index 0000000000..4ebb74f3ec
--- /dev/null
+++ b/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp
@@ -0,0 +1,373 @@
+#include "mkql_block_trimmer.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <yql/essentials/public/udf/arrow/block_builder.h>
+#include <yql/essentials/public/udf/arrow/memory_pool.h>
+#include <yql/essentials/minikql/mkql_type_builder.h>
+#include <yql/essentials/minikql/mkql_function_registry.h>
+#include <yql/essentials/minikql/mkql_program_builder.h>
+#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
+
+using namespace NYql::NUdf;
+using namespace NKikimr;
+
+struct TBlockTrimmerTestData {
+ TBlockTrimmerTestData()
+ : FunctionRegistry(NMiniKQL::CreateFunctionRegistry(NMiniKQL::CreateBuiltinRegistry()))
+ , Alloc(__LOCATION__)
+ , Env(Alloc)
+ , PgmBuilder(Env, *FunctionRegistry)
+ , MemInfo("Memory")
+ , ArrowPool(GetYqlMemoryPool())
+ {
+ }
+
+ TIntrusivePtr<NMiniKQL::IFunctionRegistry> FunctionRegistry;
+ NMiniKQL::TScopedAlloc Alloc;
+ NMiniKQL::TTypeEnvironment Env;
+ NMiniKQL::TProgramBuilder PgmBuilder;
+ NMiniKQL::TMemoryUsageInfo MemInfo;
+ arrow::MemoryPool* const ArrowPool;
+};
+
+Y_UNIT_TEST_SUITE(TBlockTrimmerTest) {
+ Y_UNIT_TEST(TestFixedSize) {
+ TBlockTrimmerTestData data;
+
+ const auto int64Type = data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64, false);
+
+ size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(int64Type);
+ size_t blockLen = NMiniKQL::CalcBlockLen(itemSize);
+ Y_ENSURE(blockLen > 8);
+
+ constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / sizeof(i64);
+ constexpr auto sliceSize = 1024;
+ static_assert(testSize % sliceSize == 0);
+
+ auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), int64Type, *data.ArrowPool, blockLen, nullptr);
+ auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), int64Type);
+ auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), int64Type, data.ArrowPool);
+
+ for (size_t i = 0; i < testSize; i++) {
+ builder->Add(TBlockItem(i));
+ }
+ auto datum = builder->Build(true);
+ Y_ENSURE(datum.is_array());
+ auto array = datum.array();
+
+ for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) {
+ auto slice = Chop(array, sliceSize);
+ auto trimmedSlice = trimmer->Trim(slice);
+
+ for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) {
+ TBlockItem lhs = reader->GetItem(*slice, elemIdx);
+ TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx);
+ UNIT_ASSERT_VALUES_EQUAL_C(lhs.Get<i64>(), rhs.Get<i64>(), "Expected the same data after trim");
+ }
+ }
+ }
+
+ Y_UNIT_TEST(TestString) {
+ TBlockTrimmerTestData data;
+
+ const auto stringType = data.PgmBuilder.NewDataType(NUdf::EDataSlot::String, false);
+
+ size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(stringType);
+ size_t blockLen = NMiniKQL::CalcBlockLen(itemSize);
+ Y_ENSURE(blockLen > 8);
+
+ // To fit all strings into single block
+ constexpr auto testSize = 512;
+ constexpr auto sliceSize = 128;
+ static_assert(testSize % sliceSize == 0);
+
+ auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), stringType, *data.ArrowPool, blockLen, nullptr);
+ auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), stringType);
+ auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), stringType, data.ArrowPool);
+
+ std::string testString;
+ testString.resize(testSize);
+ for (size_t i = 0; i < testSize; i++) {
+ testString[i] = static_cast<char>(i);
+ if (i % 2) {
+ builder->Add(TBlockItem(TStringRef(testString.data(), i + 1)));
+ } else {
+ // Empty string
+ builder->Add(TBlockItem(TStringRef()));
+ }
+ }
+ auto datum = builder->Build(true);
+ Y_ENSURE(datum.is_array());
+ auto array = datum.array();
+
+ for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) {
+ auto slice = Chop(array, sliceSize);
+ auto trimmedSlice = trimmer->Trim(slice);
+
+ for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) {
+ TBlockItem lhs = reader->GetItem(*slice, elemIdx);
+ TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx);
+ UNIT_ASSERT_VALUES_EQUAL_C(lhs.AsStringRef(), rhs.AsStringRef(), "Expected the same data after trim");
+ }
+ }
+ }
+
+ Y_UNIT_TEST(TestOptional) {
+ TBlockTrimmerTestData data;
+
+ const auto optionalInt64Type = data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64, true);
+
+ size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(optionalInt64Type);
+ size_t blockLen = NMiniKQL::CalcBlockLen(itemSize);
+ Y_ENSURE(blockLen > 8);
+
+ constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / sizeof(i64);
+ constexpr auto sliceSize = 1024;
+ static_assert(testSize % sliceSize == 0);
+
+ auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), optionalInt64Type, *data.ArrowPool, blockLen, nullptr);
+ auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), optionalInt64Type);
+ auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), optionalInt64Type, data.ArrowPool);
+
+ for (size_t i = 0; i < testSize; i++) {
+ if (i % 2) {
+ builder->Add(TBlockItem());
+ } else {
+ builder->Add(TBlockItem(i));
+ }
+ }
+ auto datum = builder->Build(true);
+ Y_ENSURE(datum.is_array());
+ auto array = datum.array();
+
+ for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) {
+ auto slice = Chop(array, sliceSize);
+ auto trimmedSlice = trimmer->Trim(slice);
+
+ for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) {
+ TBlockItem lhs = reader->GetItem(*slice, elemIdx);
+ TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx);
+ UNIT_ASSERT_VALUES_EQUAL_C(bool(lhs), bool(rhs), "Expected the same optionality after trim");
+
+ if (lhs) {
+ UNIT_ASSERT_VALUES_EQUAL_C(lhs.Get<i64>(), rhs.Get<i64>(), "Expected the same data after trim");
+ }
+ }
+ }
+ }
+
+ Y_UNIT_TEST(TestExternalOptional) {
+ TBlockTrimmerTestData data;
+
+ const auto doubleOptInt64Type = data.PgmBuilder.NewOptionalType(data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64, true));
+
+ size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(doubleOptInt64Type);
+ size_t blockLen = NMiniKQL::CalcBlockLen(itemSize);
+ Y_ENSURE(blockLen > 8);
+
+ constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / sizeof(i64);
+ constexpr auto sliceSize = 1024;
+ static_assert(testSize % sliceSize == 0);
+
+ auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), doubleOptInt64Type, *data.ArrowPool, blockLen, nullptr);
+ auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), doubleOptInt64Type);
+ auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), doubleOptInt64Type, data.ArrowPool);
+
+ for (size_t i = 0; i < testSize; i++) {
+ if (i % 2) {
+ builder->Add(TBlockItem(i).MakeOptional());
+ } else if (i % 4) {
+ builder->Add(TBlockItem());
+ } else {
+ builder->Add(TBlockItem().MakeOptional());
+ }
+ }
+ auto datum = builder->Build(true);
+ Y_ENSURE(datum.is_array());
+ auto array = datum.array();
+
+ for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) {
+ auto slice = Chop(array, sliceSize);
+ auto trimmedSlice = trimmer->Trim(slice);
+
+ for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) {
+ TBlockItem lhs = reader->GetItem(*slice, elemIdx);
+ TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx);
+
+ for (size_t i = 0; i < 2; i++) {
+ UNIT_ASSERT_VALUES_EQUAL_C(bool(lhs), bool(rhs), "Expected the same optionality after trim");
+ if (!lhs) {
+ break;
+ }
+
+ lhs = lhs.GetOptionalValue();
+ rhs = rhs.GetOptionalValue();
+ }
+
+ if (lhs) {
+ UNIT_ASSERT_VALUES_EQUAL_C(lhs.Get<i64>(), rhs.Get<i64>(), "Expected the same data after trim");
+ }
+ }
+ }
+ }
+
+ Y_UNIT_TEST(TestTuple) {
+ TBlockTrimmerTestData data;
+
+ std::vector<NMiniKQL::TType*> types;
+ types.push_back(data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64));
+ types.push_back(data.PgmBuilder.NewDataType(NUdf::EDataSlot::String));
+ types.push_back(data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64, true));
+ const auto tupleType = data.PgmBuilder.NewTupleType(types);
+
+ size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(tupleType);
+ size_t blockLen = NMiniKQL::CalcBlockLen(itemSize);
+ Y_ENSURE(blockLen > 8);
+
+ // To fit all strings into single block
+ constexpr auto testSize = 512;
+ constexpr auto sliceSize = 128;
+ static_assert(testSize % sliceSize == 0);
+
+ auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), tupleType, *data.ArrowPool, blockLen, nullptr);
+ auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), tupleType);
+ auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), tupleType, data.ArrowPool);
+
+ std::string testString;
+ testString.resize(testSize);
+ std::vector<TBlockItem*> testTuples(testSize);
+ for (size_t i = 0; i < testSize; i++) {
+ testString[i] = static_cast<char>(i);
+
+ TBlockItem* tupleItems = new TBlockItem[3];
+ testTuples.push_back(tupleItems);
+ tupleItems[0] = TBlockItem(i);
+ tupleItems[1] = TBlockItem(TStringRef(testString.data(), i + 1));
+ tupleItems[2] = i % 2 ? TBlockItem(i) : TBlockItem();
+
+ builder->Add(TBlockItem(tupleItems));
+ }
+ auto datum = builder->Build(true);
+ Y_ENSURE(datum.is_array());
+ auto array = datum.array();
+
+ for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) {
+ auto slice = Chop(array, sliceSize);
+ auto trimmedSlice = trimmer->Trim(slice);
+
+ for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) {
+ TBlockItem lhs = reader->GetItem(*slice, elemIdx);
+ TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx);
+
+ UNIT_ASSERT_VALUES_EQUAL_C(lhs.GetElement(0).Get<i64>(), rhs.GetElement(0).Get<i64>(), "Expected the same data after trim");
+ UNIT_ASSERT_VALUES_EQUAL_C(lhs.GetElement(1).AsStringRef(), rhs.GetElement(1).AsStringRef(), "Expected the same data after trim");
+ UNIT_ASSERT_VALUES_EQUAL_C(bool(lhs.GetElement(2)), bool(rhs.GetElement(2)), "Expected the same optionality after trim");
+ if (bool(lhs.GetElement(2))) {
+ UNIT_ASSERT_VALUES_EQUAL_C(lhs.GetElement(2).Get<i64>(), rhs.GetElement(2).Get<i64>(), "Expected the same data after trim");
+ }
+ }
+ }
+
+ for (auto tupleItems : testTuples) {
+ delete[] tupleItems;
+ }
+ }
+
+ Y_UNIT_TEST(TestTzDate) {
+ TBlockTrimmerTestData data;
+ using TDtLayout = TDataType<TTzDatetime>::TLayout;
+
+ const auto tzDatetimeType = data.PgmBuilder.NewDataType(NUdf::EDataSlot::TzDatetime, false);
+
+ size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(tzDatetimeType);
+ size_t blockLen = NMiniKQL::CalcBlockLen(itemSize);
+ Y_ENSURE(blockLen > 8);
+
+ constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / (sizeof(TDtLayout) + sizeof(ui16));
+ constexpr auto sliceSize = 1024;
+ static_assert(testSize % sliceSize == 0);
+
+ auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), tzDatetimeType, *data.ArrowPool, blockLen, nullptr);
+ auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), tzDatetimeType);
+ auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), tzDatetimeType, data.ArrowPool);
+
+ for (size_t i = 0; i < testSize; i++) {
+ TBlockItem dt = TBlockItem(i);
+ dt.SetTimezoneId(i * 2);
+ builder->Add(dt);
+ }
+ auto datum = builder->Build(true);
+ Y_ENSURE(datum.is_array());
+ auto array = datum.array();
+
+ for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) {
+ auto slice = Chop(array, sliceSize);
+ auto trimmedSlice = trimmer->Trim(slice);
+
+ for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) {
+ TBlockItem lhs = reader->GetItem(*slice, elemIdx);
+ TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx);
+ UNIT_ASSERT_VALUES_EQUAL_C(lhs.Get<TDtLayout>(), rhs.Get<TDtLayout>(), "Expected the same data after trim");
+ UNIT_ASSERT_VALUES_EQUAL_C(lhs.GetTimezoneId(), rhs.GetTimezoneId(), "Expected the same data after trim");
+ }
+ }
+ }
+
+ extern const char ResourceName[] = "Resource.Name";
+ Y_UNIT_TEST(TestResource) {
+ TBlockTrimmerTestData data;
+
+ const auto resourceType = data.PgmBuilder.NewResourceType(ResourceName);
+
+ size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(resourceType);
+ size_t blockLen = NMiniKQL::CalcBlockLen(itemSize);
+ Y_ENSURE(blockLen > 8);
+
+ constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / sizeof(TUnboxedValue);
+ constexpr auto sliceSize = 1024;
+ static_assert(testSize % sliceSize == 0);
+
+ auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), resourceType, *data.ArrowPool, blockLen, nullptr);
+ auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), resourceType);
+ auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), resourceType, data.ArrowPool);
+
+ struct TWithDtor {
+ int Payload;
+ std::shared_ptr<int> DestructorCallsCnt;
+ TWithDtor(int payload, std::shared_ptr<int> destructorCallsCnt):
+ Payload(payload), DestructorCallsCnt(std::move(destructorCallsCnt)) {
+ }
+ ~TWithDtor() {
+ *DestructorCallsCnt = *DestructorCallsCnt + 1;
+ }
+ };
+ using TTestResource = TBoxedResource<TWithDtor, ResourceName>;
+
+ auto destructorCallsCnt = std::make_shared<int>(0);
+ {
+ for (size_t i = 0; i < testSize; i++) {
+ builder->Add(TUnboxedValuePod(new TTestResource(i, destructorCallsCnt)));
+ }
+ auto datum = builder->Build(true);
+ Y_ENSURE(datum.is_array());
+ auto array = datum.array();
+
+ for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) {
+ auto slice = Chop(array, sliceSize);
+ auto trimmedSlice = trimmer->Trim(slice);
+
+ for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) {
+ TBlockItem lhs = reader->GetItem(*slice, elemIdx);
+ TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx);
+
+ auto lhsResource = reinterpret_cast<TTestResource*>(lhs.GetBoxed().Get());
+ auto rhsResource = reinterpret_cast<TTestResource*>(rhs.GetBoxed().Get());
+ UNIT_ASSERT_VALUES_EQUAL_C(lhsResource->Get()->Payload, rhsResource->Get()->Payload, "Expected the same data after trim");
+ }
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL_C(*destructorCallsCnt, testSize, "Expected 1 call to resource destructor");
+ }
+}
diff --git a/yql/essentials/minikql/computation/ut/ya.make.inc b/yql/essentials/minikql/computation/ut/ya.make.inc
index 4de4b13fc5..1f7ae7d6dc 100644
--- a/yql/essentials/minikql/computation/ut/ya.make.inc
+++ b/yql/essentials/minikql/computation/ut/ya.make.inc
@@ -12,6 +12,7 @@ ENDIF()
SRCDIR(yql/essentials/minikql/computation)
SRCS(
+ mkql_block_trimmer_ut.cpp
mkql_computation_node_holders_ut.cpp
mkql_computation_node_pack_ut.cpp
mkql_computation_node_list_ut.cpp
diff --git a/yql/essentials/minikql/computation/ya.make b/yql/essentials/minikql/computation/ya.make
index c5a4490817..4374567135 100644
--- a/yql/essentials/minikql/computation/ya.make
+++ b/yql/essentials/minikql/computation/ya.make
@@ -5,6 +5,7 @@ SRCS(
mkql_block_impl.cpp
mkql_block_reader.cpp
mkql_block_transport.cpp
+ mkql_block_trimmer.cpp
mkql_computation_node.cpp
mkql_computation_node_holders.cpp
mkql_computation_node_impl.cpp
@@ -22,6 +23,7 @@ PEERDIR(
yql/essentials/public/types
yql/essentials/parser/pg_wrapper/interface
yql/essentials/public/udf
+ yql/essentials/public/udf/arrow
yql/essentials/minikql/arrow
)
diff --git a/yql/essentials/minikql/computation/ya.make.inc b/yql/essentials/minikql/computation/ya.make.inc
index 7a663f1a46..14f51bfcee 100644
--- a/yql/essentials/minikql/computation/ya.make.inc
+++ b/yql/essentials/minikql/computation/ya.make.inc
@@ -23,6 +23,7 @@ PEERDIR(
yql/essentials/minikql/computation
yql/essentials/parser/pg_wrapper/interface
yql/essentials/public/udf
+ yql/essentials/public/udf/arrow
yql/essentials/utils
library/cpp/threading/future
)
diff --git a/yql/essentials/public/udf/arrow/bit_util.h b/yql/essentials/public/udf/arrow/bit_util.h
index c2c891b269..4dbe785aa7 100644
--- a/yql/essentials/public/udf/arrow/bit_util.h
+++ b/yql/essentials/public/udf/arrow/bit_util.h
@@ -94,5 +94,30 @@ inline T* CompressArray(const T* src, const ui8* sparseBitmap, T* dst, size_t co
return dst;
}
+inline void CopyDenseBitmap(ui8* dst, const ui8* src, size_t srcOffset, size_t len) {
+ if ((srcOffset & 7) != 0) {
+ size_t offsetBytes = srcOffset >> 3;
+ src += offsetBytes;
+
+ ui8 offsetTail = srcOffset & 7;
+ ui8 offsetHead = 8 - offsetTail;
+
+ ui8 remainder = *src++ >> offsetTail;
+ size_t dstOffset = offsetHead;
+ for (; dstOffset < len; dstOffset += 8) {
+ *dst++ = remainder | (*src << offsetHead);
+ remainder = *src >> offsetTail;
+ src++;
+ }
+ // dst is guaranteed to have extra length even if it's not needed
+ *dst++ = remainder;
+ } else {
+ src += srcOffset >> 3;
+ // Round up to 8
+ len = (len + 7u) & ~size_t(7u);
+ memcpy(dst, src, len >> 3);
+ }
+}
+
}
}
diff --git a/yql/essentials/public/udf/arrow/block_reader.h b/yql/essentials/public/udf/arrow/block_reader.h
index d7863329ee..77a74b155a 100644
--- a/yql/essentials/public/udf/arrow/block_reader.h
+++ b/yql/essentials/public/udf/arrow/block_reader.h
@@ -32,7 +32,7 @@ struct TBlockItemSerializeProps {
bool IsFixed = true; // true if each block item takes fixed size
};
-template<typename T, bool Nullable, typename TDerived>
+template<typename T, bool Nullable, typename TDerived>
class TFixedSizeBlockReaderBase : public IBlockReader {
public:
TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
@@ -309,7 +309,7 @@ public:
return TBlockItem(Items.data());
}
-
+
size_t GetDataWeightImpl(const TBlockItem& item) const {
const TBlockItem* items = nullptr;
ui64 size = 0;
@@ -352,7 +352,7 @@ public:
Children[i]->SaveItem(*data.child_data[i], index, out);
}
}
-
+
void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const {
for (ui32 i = 0; i < Children.size(); ++i) {
Children[i]->SaveScalarItem(*structScalar.value[i], out);
@@ -396,7 +396,7 @@ public:
Y_UNUSED(item);
return GetChildrenDefaultDataWeight();
}
-
+
size_t GetChildrenDefaultDataWeight() const {
ui64 size = 0;
if constexpr (Nullable) {
@@ -412,7 +412,7 @@ public:
DateReader_.SaveItem(*data.child_data[0], index, out);
TimezoneReader_.SaveItem(*data.child_data[1], index, out);
}
-
+
void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const {
DateReader_.SaveScalarItem(*structScalar.value[0], out);
TimezoneReader_.SaveScalarItem(*structScalar.value[1], out);
@@ -525,31 +525,30 @@ struct TReaderTraits {
}
};
-template <typename TTraits>
-std::unique_ptr<typename TTraits::TResult> MakeTupleBlockReaderImpl(bool isOptional, TVector<std::unique_ptr<typename TTraits::TResult>>&& children) {
+template <typename TTraits, typename... TArgs>
+std::unique_ptr<typename TTraits::TResult> MakeTupleBlockReaderImpl(bool isOptional, TVector<std::unique_ptr<typename TTraits::TResult>>&& children, TArgs... args) {
if (isOptional) {
- return std::make_unique<typename TTraits::template TTuple<true>>(std::move(children));
+ return std::make_unique<typename TTraits::template TTuple<true>>(std::move(children), args...);
} else {
- return std::make_unique<typename TTraits::template TTuple<false>>(std::move(children));
+ return std::make_unique<typename TTraits::template TTuple<false>>(std::move(children), args...);
}
}
-template <typename TTraits, typename T>
-std::unique_ptr<typename TTraits::TResult> MakeFixedSizeBlockReaderImpl(bool isOptional) {
+template <typename TTraits, typename T, typename... TArgs>
+std::unique_ptr<typename TTraits::TResult> MakeFixedSizeBlockReaderImpl(bool isOptional, TArgs... args) {
if (isOptional) {
- return std::make_unique<typename TTraits::template TFixedSize<T, true>>();
+ return std::make_unique<typename TTraits::template TFixedSize<T, true>>(args...);
} else {
- return std::make_unique<typename TTraits::template TFixedSize<T, false>>();
+ return std::make_unique<typename TTraits::template TFixedSize<T, false>>(args...);
}
}
-
-template <typename TTraits, typename T, NKikimr::NUdf::EDataSlot TOriginal>
-std::unique_ptr<typename TTraits::TResult> MakeStringBlockReaderImpl(bool isOptional) {
+template <typename TTraits, typename T, NKikimr::NUdf::EDataSlot TOriginal, typename... TArgs>
+std::unique_ptr<typename TTraits::TResult> MakeStringBlockReaderImpl(bool isOptional, TArgs... args) {
if (isOptional) {
- return std::make_unique<typename TTraits::template TStrings<T, true, TOriginal>>();
+ return std::make_unique<typename TTraits::template TStrings<T, true, TOriginal>>(args...);
} else {
- return std::make_unique<typename TTraits::template TStrings<T, false, TOriginal>>();
+ return std::make_unique<typename TTraits::template TStrings<T, false, TOriginal>>(args...);
}
}
@@ -558,8 +557,8 @@ concept CanInstantiateBlockReaderForDecimal = requires {
typename TTraits::template TFixedSize<NYql::NDecimal::TInt128, true>;
};
-template <typename TTraits>
-std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHelper& typeInfoHelper, const TType* type, const IPgBuilder* pgBuilder) {
+template <typename TTraits, typename... TArgs>
+std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHelper& typeInfoHelper, const TType* type, const IPgBuilder* pgBuilder, TArgs... args) {
const TType* unpacked = type;
TOptionalTypeInspector typeOpt(typeInfoHelper, type);
bool isOptional = false;
@@ -591,9 +590,9 @@ std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHe
++nestLevel;
}
- auto reader = MakeBlockReaderImpl<TTraits>(typeInfoHelper, previousType, pgBuilder);
+ auto reader = MakeBlockReaderImpl<TTraits>(typeInfoHelper, previousType, pgBuilder, args...);
for (ui32 i = 1; i < nestLevel; ++i) {
- reader = std::make_unique<typename TTraits::TExtOptional>(std::move(reader));
+ reader = std::make_unique<typename TTraits::TExtOptional>(std::move(reader), args...);
}
return reader;
@@ -606,20 +605,20 @@ std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHe
if (typeStruct) {
TVector<std::unique_ptr<typename TTraits::TResult>> members;
for (ui32 i = 0; i < typeStruct.GetMembersCount(); i++) {
- members.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeStruct.GetMemberType(i), pgBuilder));
+ members.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeStruct.GetMemberType(i), pgBuilder, args...));
}
// XXX: Use Tuple block reader for Struct.
- return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(members));
+ return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(members), args...);
}
TTupleTypeInspector typeTuple(typeInfoHelper, type);
if (typeTuple) {
TVector<std::unique_ptr<typename TTraits::TResult>> children;
for (ui32 i = 0; i < typeTuple.GetElementsCount(); ++i) {
- children.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeTuple.GetElementType(i), pgBuilder));
+ children.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeTuple.GetElementType(i), pgBuilder, args...));
}
- return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(children));
+ return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(children), args...);
}
TDataTypeInspector typeData(typeInfoHelper, type);
@@ -627,59 +626,59 @@ std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHe
auto typeId = typeData.GetTypeId();
switch (GetDataSlot(typeId)) {
case NUdf::EDataSlot::Int8:
- return MakeFixedSizeBlockReaderImpl<TTraits, i8>(isOptional);
+ return MakeFixedSizeBlockReaderImpl<TTraits, i8>(isOptional, args...);
case NUdf::EDataSlot::Bool:
case NUdf::EDataSlot::Uint8:
- return MakeFixedSizeBlockReaderImpl<TTraits, ui8>(isOptional);
+ return MakeFixedSizeBlockReaderImpl<TTraits, ui8>(isOptional, args...);
case NUdf::EDataSlot::Int16:
- return MakeFixedSizeBlockReaderImpl<TTraits, i16>(isOptional);
+ return MakeFixedSizeBlockReaderImpl<TTraits, i16>(isOptional, args...);
case NUdf::EDataSlot::Uint16:
case NUdf::EDataSlot::Date:
- return MakeFixedSizeBlockReaderImpl<TTraits, ui16>(isOptional);
+ return MakeFixedSizeBlockReaderImpl<TTraits, ui16>(isOptional, args...);
case NUdf::EDataSlot::Int32:
case NUdf::EDataSlot::Date32:
- return MakeFixedSizeBlockReaderImpl<TTraits, i32>(isOptional);
+ return MakeFixedSizeBlockReaderImpl<TTraits, i32>(isOptional, args...);
case NUdf::EDataSlot::Uint32:
case NUdf::EDataSlot::Datetime:
- return MakeFixedSizeBlockReaderImpl<TTraits, ui32>(isOptional);
+ return MakeFixedSizeBlockReaderImpl<TTraits, ui32>(isOptional, args...);
case NUdf::EDataSlot::Int64:
case NUdf::EDataSlot::Interval:
case NUdf::EDataSlot::Interval64:
case NUdf::EDataSlot::Datetime64:
case NUdf::EDataSlot::Timestamp64:
- return MakeFixedSizeBlockReaderImpl<TTraits, i64>(isOptional);
+ return MakeFixedSizeBlockReaderImpl<TTraits, i64>(isOptional, args...);
case NUdf::EDataSlot::Uint64:
case NUdf::EDataSlot::Timestamp:
- return MakeFixedSizeBlockReaderImpl<TTraits, ui64>(isOptional);
+ return MakeFixedSizeBlockReaderImpl<TTraits, ui64>(isOptional, args...);
case NUdf::EDataSlot::Float:
- return MakeFixedSizeBlockReaderImpl<TTraits, float>(isOptional);
+ return MakeFixedSizeBlockReaderImpl<TTraits, float>(isOptional, args...);
case NUdf::EDataSlot::Double:
- return MakeFixedSizeBlockReaderImpl<TTraits, double>(isOptional);
+ return MakeFixedSizeBlockReaderImpl<TTraits, double>(isOptional, args...);
case NUdf::EDataSlot::String:
- return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::String>(isOptional);
+ return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::String>(isOptional, args...);
case NUdf::EDataSlot::Yson:
- return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::Yson>(isOptional);
+ return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::Yson>(isOptional, args...);
case NUdf::EDataSlot::JsonDocument:
- return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::JsonDocument>(isOptional);
+ return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::JsonDocument>(isOptional, args...);
case NUdf::EDataSlot::Utf8:
- return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Utf8>(isOptional);
+ return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Utf8>(isOptional, args...);
case NUdf::EDataSlot::Json:
- return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Json>(isOptional);
+ return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Json>(isOptional, args...);
case NUdf::EDataSlot::TzDate:
- return TTraits::template MakeTzDate<TTzDate>(isOptional);
+ return TTraits::template MakeTzDate<TTzDate>(isOptional, args...);
case NUdf::EDataSlot::TzDatetime:
- return TTraits::template MakeTzDate<TTzDatetime>(isOptional);
+ return TTraits::template MakeTzDate<TTzDatetime>(isOptional, args...);
case NUdf::EDataSlot::TzTimestamp:
- return TTraits::template MakeTzDate<TTzTimestamp>(isOptional);
+ return TTraits::template MakeTzDate<TTzTimestamp>(isOptional, args...);
case NUdf::EDataSlot::TzDate32:
- return TTraits::template MakeTzDate<TTzDate32>(isOptional);
+ return TTraits::template MakeTzDate<TTzDate32>(isOptional, args...);
case NUdf::EDataSlot::TzDatetime64:
- return TTraits::template MakeTzDate<TTzDatetime64>(isOptional);
+ return TTraits::template MakeTzDate<TTzDatetime64>(isOptional, args...);
case NUdf::EDataSlot::TzTimestamp64:
- return TTraits::template MakeTzDate<TTzTimestamp64>(isOptional);
+ return TTraits::template MakeTzDate<TTzTimestamp64>(isOptional, args...);
case NUdf::EDataSlot::Decimal: {
if constexpr (CanInstantiateBlockReaderForDecimal<TTraits>) {
- return MakeFixedSizeBlockReaderImpl<TTraits, NYql::NDecimal::TInt128>(isOptional);
+ return MakeFixedSizeBlockReaderImpl<TTraits, NYql::NDecimal::TInt128>(isOptional, args...);
} else {
Y_ENSURE(false, "Unsupported data slot");
}
@@ -692,13 +691,13 @@ std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHe
TResourceTypeInspector resource(typeInfoHelper, type);
if (resource) {
- return TTraits::MakeResource(isOptional);
+ return TTraits::MakeResource(isOptional, args...);
}
TPgTypeInspector typePg(typeInfoHelper, type);
if (typePg) {
auto desc = typeInfoHelper.FindPgTypeDescription(typePg.GetTypeId());
- return TTraits::MakePg(*desc, pgBuilder);
+ return TTraits::MakePg(*desc, pgBuilder, args...);
}
Y_ENSURE(false, "Unsupported type");
diff --git a/yql/essentials/public/udf/arrow/ut/bit_util_ut.cpp b/yql/essentials/public/udf/arrow/ut/bit_util_ut.cpp
new file mode 100644
index 0000000000..601d3be7c8
--- /dev/null
+++ b/yql/essentials/public/udf/arrow/ut/bit_util_ut.cpp
@@ -0,0 +1,41 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <yql/essentials/public/udf/arrow/bit_util.h>
+#include <util/random/random.h>
+
+#include <arrow/util/bit_util.h>
+
+using namespace NYql::NUdf;
+
+namespace {
+ void PerformTest(const ui8* testData, size_t offset, size_t length) {
+ std::vector<ui8> copied(arrow::BitUtil::BytesForBits(length) + 1, 0);
+ CopyDenseBitmap(copied.data(), testData, offset, length);
+
+ std::vector<ui8> origSparse(length), copiedSparse(length);
+ DecompressToSparseBitmap(origSparse.data(), testData, offset, length);
+ DecompressToSparseBitmap(copiedSparse.data(), copied.data(), 0, length);
+ for (size_t i = 0; i < length; i++) {
+ UNIT_ASSERT_EQUAL_C(origSparse[i], copiedSparse[i], "Expected the same data");
+ }
+ }
+}
+
+Y_UNIT_TEST_SUITE(CopyDenseBitmapTest) {
+ constexpr size_t testSize = 32;
+
+ Y_UNIT_TEST(Test) {
+ SetRandomSeed(0);
+
+ std::vector<ui8> testData(testSize);
+ for (size_t i = 0; i < testSize; i++) {
+ testData[i] = RandomNumber<ui8>();
+ }
+
+ for (size_t offset = 0; offset < testSize * 8; offset++) {
+ for (size_t length = 0; length <= testSize * 8 - offset; length++) {
+ PerformTest(testData.data(), offset, length);
+ }
+ }
+ }
+}
diff --git a/yql/essentials/public/udf/arrow/ut/ya.make b/yql/essentials/public/udf/arrow/ut/ya.make
index da52d50ded..ecd94b9c03 100644
--- a/yql/essentials/public/udf/arrow/ut/ya.make
+++ b/yql/essentials/public/udf/arrow/ut/ya.make
@@ -2,9 +2,11 @@ UNITTEST()
SRCS(
array_builder_ut.cpp
+ bit_util_ut.cpp
)
PEERDIR(
+ contrib/libs/apache/arrow
yql/essentials/public/udf/arrow
yql/essentials/public/udf/service/exception_policy
yql/essentials/sql/pg_dummy
diff --git a/yql/essentials/public/udf/arrow/util.cpp b/yql/essentials/public/udf/arrow/util.cpp
index 55a728207b..b36b4f9ce5 100644
--- a/yql/essentials/public/udf/arrow/util.cpp
+++ b/yql/essentials/public/udf/arrow/util.cpp
@@ -68,6 +68,18 @@ std::shared_ptr<arrow::Buffer> MakeDenseBitmapNegate(const ui8* srcSparse, size_
return bitmap;
}
+std::shared_ptr<arrow::Buffer> MakeDenseBitmapCopy(const ui8* src, size_t len, size_t offset, arrow::MemoryPool* pool) {
+ auto bitmap = AllocateBitmapWithReserve(len, pool);
+ CopyDenseBitmap(bitmap->mutable_data(), src, offset, len);
+ return bitmap;
+}
+
+std::shared_ptr<arrow::Buffer> MakeDenseFalseBitmap(int64_t len, arrow::MemoryPool* pool) {
+ auto bitmap = AllocateBitmapWithReserve(len, pool);
+ std::memset(bitmap->mutable_data(), 0, bitmap->size());
+ return bitmap;
+}
+
std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len) {
Y_ENSURE(data->length >= 0);
Y_ENSURE(offset + len <= (size_t)data->length);
diff --git a/yql/essentials/public/udf/arrow/util.h b/yql/essentials/public/udf/arrow/util.h
index b4068f10c9..f7bdb715f9 100644
--- a/yql/essentials/public/udf/arrow/util.h
+++ b/yql/essentials/public/udf/arrow/util.h
@@ -25,6 +25,9 @@ enum class EPgStringType {
std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool);
std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool);
std::shared_ptr<arrow::Buffer> MakeDenseBitmapNegate(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool);
+std::shared_ptr<arrow::Buffer> MakeDenseBitmapCopy(const ui8* src, size_t len, size_t offset, arrow::MemoryPool* pool);
+
+std::shared_ptr<arrow::Buffer> MakeDenseFalseBitmap(int64_t len, arrow::MemoryPool* pool);
/// \brief Recursive version of ArrayData::Slice() method
std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len);