diff options
author | aneporada <aneporada@ydb.tech> | 2023-11-13 19:58:31 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-11-13 20:56:23 +0300 |
commit | d16314fe6e0b4555fc0a741dc828de0f54b5c2b3 (patch) | |
tree | 60d8598be274def4ab6b79dd726e78885f9e4f40 | |
parent | 83fcb7ff092bebf3766e9f7b9bbace583eec5ef3 (diff) | |
download | ydb-d16314fe6e0b4555fc0a741dc828de0f54b5c2b3.tar.gz |
Support legacy s3 blocks-inside-struct
3 files changed, 221 insertions, 87 deletions
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp index 75f988fe85..2aad147ed1 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp @@ -867,6 +867,76 @@ bool HasOffset(const arrow::ArrayData& array, i64 expectedOffset) { AllOf(array.child_data, [&](const auto& child) { return HasOffset(*child, expectedOffset); }); } +bool IsUi64Scalar(const TBlockType* blockType) { + if (blockType->GetShape() != TBlockType::EShape::Scalar) { + return false; + } + + if (!blockType->GetItemType()->IsData()) { + return false; + } + + return static_cast<const TDataType*>(blockType->GetItemType())->GetDataSlot() == NUdf::EDataSlot::Uint64; +} + +bool IsLegacyStructBlock(const TType* type, ui32& blockLengthIndex, TVector<const TBlockType*>& items) { + items.clear(); + blockLengthIndex = Max<ui32>(); + if (!type->IsStruct()) { + return false; + } + const TStructType* structType = static_cast<const TStructType*>(type); + static const TStringBuf blockLenColumnName = "_yql_block_length"; + auto index = structType->FindMemberIndex(blockLenColumnName); + if (!index) { + return false; + } + + for (ui32 i = 0; i < structType->GetMembersCount(); i++) { + auto type = structType->GetMemberType(i); + if (!type->IsBlock()) { + return false; + } + const TBlockType* blockType = static_cast<const TBlockType*>(type); + items.push_back(blockType); + if (i == *index && !IsUi64Scalar(blockType)) { + return false; + } + } + blockLengthIndex = *index; + return true; +} + +bool IsMultiBlock(const TType* type, ui32& blockLengthIndex, TVector<const TBlockType*>& items) { + items.clear(); + blockLengthIndex = Max<ui32>(); + + if (!type->IsMulti()) { + return false; + } + + const TMultiType* multiType = static_cast<const TMultiType*>(type); + ui32 width = multiType->GetElementsCount(); + if (!width) { + return false; + } + + for (ui32 i = 0; i < width; i++) { + auto type = multiType->GetElementType(i); + if (!type->IsBlock()) { + return false; + } + const TBlockType* blockType = static_cast<const TBlockType*>(type); + items.push_back(blockType); + if (i == width - 1 && !IsUi64Scalar(blockType)) { + return false; + } + } + + blockLengthIndex = width - 1; + return true; +} + } // namespace template<bool Fast> @@ -972,54 +1042,33 @@ TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, arrow::Mem template<bool Fast> void TValuePackerTransport<Fast>::InitBlocks() { - if (!Type_->IsMulti()) { - return; - } - - const TMultiType* multiType = static_cast<const TMultiType*>(Type_); - ui32 width = multiType->GetElementsCount(); - if (!width) { - return; - } - - const TType* last = multiType->GetElementType(width - 1); - if (!last->IsBlock()) { - return; - } - - const TBlockType* blockLenType = static_cast<const TBlockType*>(multiType->GetElementType(width - 1)); - if (blockLenType->GetShape() != TBlockType::EShape::Scalar) { - return; - } - - if (!blockLenType->GetItemType()->IsData()) { - return; - } - - if (static_cast<const TDataType*>(blockLenType->GetItemType())->GetDataSlot() != NUdf::EDataSlot::Uint64) { - return; - } - - if (AnyOf(multiType->GetElements(), [](const auto& t) { return !t->IsBlock(); })) { + TVector<const TBlockType*> items; + if (IsLegacyStructBlock(Type_, BlockLenIndex_, items)) { + IsLegacyBlock_ = true; + } else if (!IsMultiBlock(Type_, BlockLenIndex_, items)) { return; } IsBlock_ = true; - ConvertedScalars_.resize(width - 1); - for (ui32 i = 0; i < width - 1; ++i) { - const TBlockType* itemType = static_cast<const TBlockType*>(multiType->GetElementType(i)); - BlockSerializers_.emplace_back(MakeBlockSerializer(TTypeInfoHelper(), itemType->GetItemType())); - BlockDeserializers_.emplace_back(MakeBlockDeserializer(TTypeInfoHelper(), itemType->GetItemType())); - if (itemType->GetShape() == TBlockType::EShape::Scalar) { - BlockReaders_.emplace_back(NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), itemType->GetItemType())); - } else { - BlockReaders_.emplace_back(); + ConvertedScalars_.resize(items.size()); + BlockReaders_.resize(items.size()); + BlockSerializers_.resize(items.size()); + BlockDeserializers_.resize(items.size()); + for (ui32 i = 0; i < items.size(); ++i) { + if (i != BlockLenIndex_) { + const TBlockType* itemType = items[i]; + BlockSerializers_[i] = MakeBlockSerializer(TTypeInfoHelper(), itemType->GetItemType()); + BlockDeserializers_[i] = MakeBlockDeserializer(TTypeInfoHelper(), itemType->GetItemType()); + if (itemType->GetShape() == TBlockType::EShape::Scalar) { + BlockReaders_[i] = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), itemType->GetItemType()); + } } } } template<bool Fast> NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TRope&& buf, const THolderFactory& holderFactory) const { + MKQL_ENSURE(!IsBlock_, "Unpack() should not be used for blocks"); const size_t totalSize = buf.GetSize(); TChunkedInputBuffer chunked(std::move(buf)); return DoUnpack<Fast>(Type_, chunked, totalSize, holderFactory, State_); @@ -1038,6 +1087,7 @@ void TValuePackerTransport<Fast>::UnpackBatch(TRope&& buf, const THolderFactory& template<bool Fast> TRope TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& value) const { MKQL_ENSURE(ItemCount_ == 0, "Can not mix Pack() and AddItem() calls"); + MKQL_ENSURE(!IsBlock_, "Pack() should not be used for blocks"); TPagedBuffer::TPtr result = std::make_shared<TPagedBuffer>(); if constexpr (Fast) { PackImpl<Fast, false>(Type_, *result, value, State_); @@ -1065,6 +1115,11 @@ void TValuePackerTransport<Fast>::StartPack() { template<bool Fast> TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddItem(const NUdf::TUnboxedValuePod& value) { Y_DEBUG_ABORT_UNLESS(!Type_->IsMulti()); + if (IsLegacyBlock_) { + static_assert(sizeof(NUdf::TUnboxedValuePod) == sizeof(NUdf::TUnboxedValue)); + const NUdf::TUnboxedValuePod* values = static_cast<const NUdf::TUnboxedValuePod*>(value.GetElements()); + return AddWideItemBlocks(values, BlockSerializers_.size()); + } const TType* itemType = Type_; if (!ItemCount_) { StartPack(); @@ -1097,13 +1152,17 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItem(const NUdf template<bool Fast> TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(const NUdf::TUnboxedValuePod* values, ui32 width) { - const ui64 len = TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + MKQL_ENSURE(width == BlockSerializers_.size(), "Invalid width"); + const ui64 len = TArrowBlock::From(values[BlockLenIndex_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; auto metadataBuffer = std::make_shared<TBuffer>(); ui32 totalMetadataCount = 0; - for (size_t i = 0; i < width - 1; ++i) { - totalMetadataCount += BlockSerializers_[i]->ArrayMetadataCount(); + for (size_t i = 0; i < width; ++i) { + if (i != BlockLenIndex_) { + MKQL_ENSURE(BlockSerializers_[i], "Invalid serializer"); + totalMetadataCount += BlockSerializers_[i]->ArrayMetadataCount(); + } } // calculate approximate metadata size @@ -1130,10 +1189,12 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons const ui64 metadataFlags = 1 << 0; PackData<false>(metadataFlags, *metadataBuffer); - TVector<std::shared_ptr<arrow::ArrayData>> arrays; - arrays.reserve(width - 1); + TVector<std::shared_ptr<arrow::ArrayData>> arrays(width); // save reminder of original offset for each column - it is needed to properly handle offset in bitmaps - for (size_t i = 0; i < width - 1; ++i) { + for (size_t i = 0; i < width; ++i) { + if (i == BlockLenIndex_) { + continue; + } arrow::Datum datum = TArrowBlock::From(values[i]).GetDatum(); ui8 reminder = 0; if (datum.is_array()) { @@ -1142,16 +1203,17 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons // all offsets should be equal MKQL_ENSURE(HasOffset(*datum.array(), offset), "Unexpected offset in child data"); reminder = offset % 8; - arrays.emplace_back(datum.array()); + arrays[i] = datum.array(); } else { MKQL_ENSURE(datum.is_scalar(), "Expecting array or scalar"); if (!ConvertedScalars_[i]) { - const TType* itemType = static_cast<const TMultiType*>(Type_)->GetElementType(i); + const TType* itemType = IsLegacyBlock_ ? static_cast<const TStructType*>(Type_)->GetMemberType(i) : + static_cast<const TMultiType*>(Type_)->GetElementType(i); datum = MakeArrayFromScalar(*datum.scalar(), 1, static_cast<const TBlockType*>(itemType)->GetItemType(), ArrowPool_); MKQL_ENSURE(HasOffset(*datum.array(), 0), "Expected zero array offset after scalar is converted to array"); ConvertedScalars_[i] = datum.array(); } - arrays.emplace_back(ConvertedScalars_[i]); + arrays[i] = ConvertedScalars_[i]; } PackData<false>(reminder, *metadataBuffer); } @@ -1161,11 +1223,13 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons // save metadata itself ui32 savedMetadata = 0; - for (size_t i = 0; i < width - 1; ++i) { - BlockSerializers_[i]->StoreMetadata(*arrays[i], [&](ui64 meta) { - PackData<false>(meta, *metadataBuffer); - ++savedMetadata; - }); + for (size_t i = 0; i < width; ++i) { + if (i != BlockLenIndex_) { + BlockSerializers_[i]->StoreMetadata(*arrays[i], [&](ui64 meta) { + PackData<false>(meta, *metadataBuffer); + ++savedMetadata; + }); + } } MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error"); @@ -1173,8 +1237,10 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons BlockBuffer_.Insert(BlockBuffer_.End(), NYql::MakeReadOnlyRope(metadataBuffer, metadataBuffer->data(), metadataBuffer->size())); // save buffers - for (size_t i = 0; i < width - 1; ++i) { - BlockSerializers_[i]->StoreArray(*arrays[i], BlockBuffer_); + for (size_t i = 0; i < width; ++i) { + if (i != BlockLenIndex_) { + BlockSerializers_[i]->StoreArray(*arrays[i], BlockBuffer_); + } } ++ItemCount_; return *this; @@ -1196,38 +1262,58 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TRope&& buf, const THolderFa MKQL_ENSURE(metadataFlags == 1, "Unsupported metadata flags"); // unpack array offsets - TVector<ui64> offsets; - for (size_t i = 0; i < BlockDeserializers_.size(); ++i) { - offsets.emplace_back(UnpackData<false, ui8>(chunked)); - MKQL_ENSURE(offsets.back() < 8, "Unexpected offset value"); + const ui32 width = BlockDeserializers_.size(); + MKQL_ENSURE(width > 0, "Invalid width"); + TVector<ui64> offsets(width); + for (ui32 i = 0; i < width; ++i) { + if (BlockDeserializers_[i]) { + offsets[i] = UnpackData<false, ui8>(chunked); + MKQL_ENSURE(offsets[i] < 8, "Unexpected offset value"); + } } // unpack metadata ui32 metaCount = UnpackData<false, ui32>(chunked); - for (auto& deserializer : BlockDeserializers_) { - deserializer->LoadMetadata([&]() -> ui64 { - MKQL_ENSURE(metaCount > 0, "No more metadata available"); - --metaCount; - return UnpackData<false, ui64>(chunked); - }); + for (ui32 i = 0; i < width; ++i) { + if (BlockDeserializers_[i]) { + BlockDeserializers_[i]->LoadMetadata([&]() -> ui64 { + MKQL_ENSURE(metaCount > 0, "No more metadata available"); + --metaCount; + return UnpackData<false, ui64>(chunked); + }); + } } MKQL_ENSURE(metaCount == 0, "Partial buffers read"); TRope ropeTail = chunked.ReleaseRope(); // unpack buffers - result.PushRow([&](ui32 i) { - if (i < BlockDeserializers_.size()) { + + auto producer = [&](ui32 i) { + MKQL_ENSURE(i < width, "Unexpected row index"); + if (i != BlockLenIndex_) { + MKQL_ENSURE(BlockDeserializers_[i], "Missing deserializer"); const bool isScalar = BlockReaders_[i] != nullptr; auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, offsets[i]); if (isScalar) { TBlockItem item = BlockReaders_[i]->GetItem(*array, 0); - const TBlockType* itemType = static_cast<const TBlockType*>(static_cast<const TMultiType*>(Type_)->GetElementType(i)); - return holderFactory.CreateArrowBlock(ConvertScalar(itemType->GetItemType(), item, ArrowPool_)); + const TType* itemType = IsLegacyBlock_ ? static_cast<const TStructType*>(Type_)->GetMemberType(i) : + static_cast<const TMultiType*>(Type_)->GetElementType(i); + return holderFactory.CreateArrowBlock(ConvertScalar(static_cast<const TBlockType*>(itemType)->GetItemType(), item, ArrowPool_)); } return holderFactory.CreateArrowBlock(array); } - MKQL_ENSURE(i == BlockDeserializers_.size(), "Unexpected row index"); return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(len))); - }); + }; + + if (IsLegacyBlock_) { + NYql::NUdf::TUnboxedValue* valueItems; + auto structValue = holderFactory.CreateDirectArrayHolder(width, valueItems); + for (ui32 i = 0; i < width; ++i) { + valueItems[i] = producer(i); + } + result.emplace_back(std::move(structValue)); + } else { + result.PushRow(producer); + } buf = std::move(ropeTail); } } diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h index 52c8b10a25..f1948212bd 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h @@ -109,6 +109,8 @@ private: arrow::MemoryPool& ArrowPool_; bool IsBlock_ = false; + bool IsLegacyBlock_ = false; + ui32 BlockLenIndex_ = 0; TVector<std::unique_ptr<IBlockSerializer>> BlockSerializers_; TVector<std::unique_ptr<IBlockReader>> BlockReaders_; diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp index 75669f9048..fd46cf1986 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp @@ -628,7 +628,7 @@ protected: } } - void DoTestBlockPacking(ui64 offset, ui64 len) { + void DoTestBlockPacking(ui64 offset, ui64 len, bool legacyStruct) { if constexpr (Transport) { auto strType = PgmBuilder.NewDataType(NUdf::TDataType<char*>::Id); auto ui32Type = PgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id); @@ -645,8 +645,18 @@ protected: auto blockOptTupleOptUi32StrType = PgmBuilder.NewBlockType(optTupleOptUi32StrType, TBlockType::EShape::Many); auto scalarUi64Type = PgmBuilder.NewBlockType(ui64Type, TBlockType::EShape::Scalar); - auto rowType = PgmBuilder.NewMultiType({ blockUi32Type, blockOptStrType, scalarOptStrType, blockOptTupleOptUi32StrType, scalarUi64Type }); - + auto rowType = + legacyStruct + ? PgmBuilder.NewStructType({ + {"A", blockUi32Type}, + {"B", blockOptStrType}, + {"_yql_block_length", scalarUi64Type}, + {"a", scalarOptStrType}, + {"b", blockOptTupleOptUi32StrType}, + }) + : PgmBuilder.NewMultiType( + {blockUi32Type, blockOptStrType, scalarOptStrType, + blockOptTupleOptUi32StrType, scalarUi64Type}); ui64 blockLen = 1000; UNIT_ASSERT_LE(offset + len, blockLen); @@ -672,11 +682,19 @@ protected: auto strbuf = std::make_shared<arrow::Buffer>((const ui8*)testScalarString.data(), testScalarString.size()); TVector<arrow::Datum> datums; - datums.emplace_back(builder1->Build(true)); - datums.emplace_back(builder2->Build(true)); - datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf))); - datums.emplace_back(builder3->Build(true)); - datums.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen))); + if (legacyStruct) { + datums.emplace_back(builder1->Build(true)); + datums.emplace_back(builder2->Build(true)); + datums.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen))); + datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf))); + datums.emplace_back(builder3->Build(true)); + } else { + datums.emplace_back(builder1->Build(true)); + datums.emplace_back(builder2->Build(true)); + datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf))); + datums.emplace_back(builder3->Build(true)); + datums.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen))); + } if (offset != 0 || len != blockLen) { for (auto& datum : datums) { @@ -691,7 +709,13 @@ protected: } TValuePackerType packer(false, rowType, ArrowPool_); - packer.AddWideItem(columns.data(), columns.size()); + if (legacyStruct) { + TUnboxedValueVector columnsCopy = columns; + NUdf::TUnboxedValue row = HolderFactory.VectorAsArray(columnsCopy); + packer.AddItem(row); + } else { + packer.AddWideItem(columns.data(), columns.size()); + } TRope packed = packer.Finish(); TUnboxedValueBatch unpacked(rowType); @@ -700,13 +724,23 @@ protected: UNIT_ASSERT_VALUES_EQUAL(unpacked.RowCount(), 1); TUnboxedValueVector unpackedColumns; - unpacked.ForEachRowWide([&](const NYql::NUdf::TUnboxedValue* values, ui32 count) { - unpackedColumns.insert(unpackedColumns.end(), values, values + count); - }); + if (legacyStruct) { + auto elements = unpacked.Head()->GetElements(); + unpackedColumns.insert(unpackedColumns.end(), elements, elements + columns.size()); + } else { + unpacked.ForEachRowWide([&](const NYql::NUdf::TUnboxedValue* values, ui32 count) { + unpackedColumns.insert(unpackedColumns.end(), values, values + count); + }); + } UNIT_ASSERT_VALUES_EQUAL(unpackedColumns.size(), columns.size()); - UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns.back()).GetDatum().scalar_as<arrow::UInt64Scalar>().value, blockLen); - UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[2]).GetDatum().scalar_as<arrow::BinaryScalar>().value->ToString(), testScalarString); + if (legacyStruct) { + UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[2]).GetDatum().scalar_as<arrow::UInt64Scalar>().value, blockLen); + UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[3]).GetDatum().scalar_as<arrow::BinaryScalar>().value->ToString(), testScalarString); + } else { + UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns.back()).GetDatum().scalar_as<arrow::UInt64Scalar>().value, blockLen); + UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[2]).GetDatum().scalar_as<arrow::BinaryScalar>().value->ToString(), testScalarString); + } auto reader1 = MakeBlockReader(TTypeInfoHelper(), ui32Type); @@ -725,7 +759,7 @@ protected: UNIT_ASSERT(!b2); } - TBlockItem b3 = reader3->GetItem(*TArrowBlock::From(unpackedColumns[3]).GetDatum().array(), i - offset); + TBlockItem b3 = reader3->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 4 : 3]).GetDatum().array(), i - offset); if (i % 7) { auto elements = b3.GetElements(); if (i % 2) { @@ -742,11 +776,19 @@ protected: } void TestBlockPacking() { - DoTestBlockPacking(0, 1000); + DoTestBlockPacking(0, 1000, false); } void TestBlockPackingSliced() { - DoTestBlockPacking(19, 623); + DoTestBlockPacking(19, 623, false); + } + + void TestLegacyBlockPacking() { + DoTestBlockPacking(0, 1000, true); + } + + void TestLegacyBlockPackingSliced() { + DoTestBlockPacking(19, 623, true); } private: TIntrusivePtr<NMiniKQL::IFunctionRegistry> FunctionRegistry; @@ -826,6 +868,8 @@ class TMiniKQLComputationNodeTransportPackTest: public TMiniKQLComputationNodePa UNIT_TEST(TestIncrementalPacking); UNIT_TEST(TestBlockPacking); UNIT_TEST(TestBlockPackingSliced); + UNIT_TEST(TestLegacyBlockPacking); + UNIT_TEST(TestLegacyBlockPackingSliced); UNIT_TEST_SUITE_END(); }; @@ -852,6 +896,8 @@ class TMiniKQLComputationNodeTransportFastPackTest: public TMiniKQLComputationNo UNIT_TEST(TestIncrementalPacking); UNIT_TEST(TestBlockPacking); UNIT_TEST(TestBlockPackingSliced); + UNIT_TEST(TestLegacyBlockPacking); + UNIT_TEST(TestLegacyBlockPackingSliced); UNIT_TEST_SUITE_END(); }; |