summaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
diff options
context:
space:
mode:
authorgrigoriypisar <[email protected]>2025-03-19 16:51:27 +0300
committergrigoriypisar <[email protected]>2025-03-19 17:17:00 +0300
commit8abfb3cd4a6f7251be9cf73563349a97dc8f8ea0 (patch)
treeb4e3e6ccd8526a5822b0e1cb76d014657af26e25 /yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
parent5ccc45c97200ac3e0cd3464f37fb931163ac0214 (diff)
YQL mkql blocks transport, added triming by MinFillPercentage
Added block trimming in transport commit_hash:9794613300322045a81a9b40d4ebe519d30937a4
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_computation_node_pack.cpp')
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_node_pack.cpp23
1 files changed, 17 insertions, 6 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
index 00bd3fdad25..ace74a62df9 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
@@ -1067,28 +1067,28 @@ TStringBuf TValuePackerGeneric<Fast>::Pack(const NUdf::TUnboxedValuePod& value)
// Transport packer
template<bool Fast>
-TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* pool)
+TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
: Type_(type)
, State_(ScanTypeProperties(Type_, false))
, IncrementalState_(ScanTypeProperties(Type_, true))
, ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
{
MKQL_ENSURE(!stable, "Stable packing is not supported");
- InitBlocks();
+ InitBlocks(minFillPercentage);
}
template<bool Fast>
-TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, arrow::MemoryPool* pool)
+TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
: Type_(type)
, State_(ScanTypeProperties(Type_, false))
, IncrementalState_(ScanTypeProperties(Type_, true))
, ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
{
- InitBlocks();
+ InitBlocks(minFillPercentage);
}
template<bool Fast>
-void TValuePackerTransport<Fast>::InitBlocks() {
+void TValuePackerTransport<Fast>::InitBlocks(TMaybe<ui8> minFillPercentage) {
TVector<const TBlockType*> items;
if (IsLegacyStructBlock(Type_, BlockLenIndex_, items)) {
IsLegacyBlock_ = true;
@@ -1096,6 +1096,11 @@ void TValuePackerTransport<Fast>::InitBlocks() {
return;
}
+ const TBlockSerializerParams serializerParams = {
+ .Pool = &ArrowPool_,
+ .MinFillPercentage = minFillPercentage
+ };
+
IsBlock_ = true;
ConvertedScalars_.resize(items.size());
BlockReaders_.resize(items.size());
@@ -1104,7 +1109,7 @@ void TValuePackerTransport<Fast>::InitBlocks() {
for (ui32 i = 0; i < items.size(); ++i) {
if (i != BlockLenIndex_) {
const TBlockType* itemType = items[i];
- BlockSerializers_[i] = MakeBlockSerializer(TTypeInfoHelper(), itemType->GetItemType());
+ BlockSerializers_[i] = MakeBlockSerializer(TTypeInfoHelper(), itemType->GetItemType(), serializerParams);
BlockDeserializers_[i] = MakeBlockDeserializer(TTypeInfoHelper(), itemType->GetItemType());
if (itemType->GetShape() == TBlockType::EShape::Scalar) {
BlockReaders_[i] = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), itemType->GetItemType());
@@ -1114,6 +1119,12 @@ void TValuePackerTransport<Fast>::InitBlocks() {
}
template<bool Fast>
+void TValuePackerTransport<Fast>::SetMinFillPercentage(TMaybe<ui8> minFillPercentage) {
+ MKQL_ENSURE(IsBlock_, "SetMinFillPercentage() can be used only for blocks");
+ InitBlocks(minFillPercentage);
+}
+
+template<bool Fast>
NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TChunkedBuffer&& buf, const THolderFactory& holderFactory) const {
MKQL_ENSURE(!IsBlock_, "Unpack() should not be used for blocks");
const size_t totalSize = buf.Size();