diff options
author | grigoriypisar <[email protected]> | 2025-03-19 16:51:27 +0300 |
---|---|---|
committer | grigoriypisar <[email protected]> | 2025-03-19 17:17:00 +0300 |
commit | 8abfb3cd4a6f7251be9cf73563349a97dc8f8ea0 (patch) | |
tree | b4e3e6ccd8526a5822b0e1cb76d014657af26e25 /yql/essentials/minikql/computation/mkql_computation_node_pack.cpp | |
parent | 5ccc45c97200ac3e0cd3464f37fb931163ac0214 (diff) |
YQL mkql blocks transport, added triming by MinFillPercentage
Added block trimming in transport
commit_hash:9794613300322045a81a9b40d4ebe519d30937a4
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_computation_node_pack.cpp')
-rw-r--r-- | yql/essentials/minikql/computation/mkql_computation_node_pack.cpp | 23 |
1 files changed, 17 insertions, 6 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp index 00bd3fdad25..ace74a62df9 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp @@ -1067,28 +1067,28 @@ TStringBuf TValuePackerGeneric<Fast>::Pack(const NUdf::TUnboxedValuePod& value) // Transport packer template<bool Fast> -TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* pool) +TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage) : Type_(type) , State_(ScanTypeProperties(Type_, false)) , IncrementalState_(ScanTypeProperties(Type_, true)) , ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool()) { MKQL_ENSURE(!stable, "Stable packing is not supported"); - InitBlocks(); + InitBlocks(minFillPercentage); } template<bool Fast> -TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, arrow::MemoryPool* pool) +TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage) : Type_(type) , State_(ScanTypeProperties(Type_, false)) , IncrementalState_(ScanTypeProperties(Type_, true)) , ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool()) { - InitBlocks(); + InitBlocks(minFillPercentage); } template<bool Fast> -void TValuePackerTransport<Fast>::InitBlocks() { +void TValuePackerTransport<Fast>::InitBlocks(TMaybe<ui8> minFillPercentage) { TVector<const TBlockType*> items; if (IsLegacyStructBlock(Type_, BlockLenIndex_, items)) { IsLegacyBlock_ = true; @@ -1096,6 +1096,11 @@ void TValuePackerTransport<Fast>::InitBlocks() { return; } + const TBlockSerializerParams serializerParams = { + .Pool = &ArrowPool_, + .MinFillPercentage = minFillPercentage + }; + IsBlock_ = true; ConvertedScalars_.resize(items.size()); BlockReaders_.resize(items.size()); @@ -1104,7 +1109,7 @@ void TValuePackerTransport<Fast>::InitBlocks() { for (ui32 i = 0; i < items.size(); ++i) { if (i != BlockLenIndex_) { const TBlockType* itemType = items[i]; - BlockSerializers_[i] = MakeBlockSerializer(TTypeInfoHelper(), itemType->GetItemType()); + BlockSerializers_[i] = MakeBlockSerializer(TTypeInfoHelper(), itemType->GetItemType(), serializerParams); BlockDeserializers_[i] = MakeBlockDeserializer(TTypeInfoHelper(), itemType->GetItemType()); if (itemType->GetShape() == TBlockType::EShape::Scalar) { BlockReaders_[i] = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), itemType->GetItemType()); @@ -1114,6 +1119,12 @@ void TValuePackerTransport<Fast>::InitBlocks() { } template<bool Fast> +void TValuePackerTransport<Fast>::SetMinFillPercentage(TMaybe<ui8> minFillPercentage) { + MKQL_ENSURE(IsBlock_, "SetMinFillPercentage() can be used only for blocks"); + InitBlocks(minFillPercentage); +} + +template<bool Fast> NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TChunkedBuffer&& buf, const THolderFactory& holderFactory) const { MKQL_ENSURE(!IsBlock_, "Unpack() should not be used for blocks"); const size_t totalSize = buf.Size(); |