diff options
author | vvvv <vvvv@ydb.tech> | 2022-12-29 18:34:38 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2022-12-29 18:34:38 +0300 |
commit | 73b247f4a0932b7d9fb693cfcc28965862abb20a (patch) | |
tree | 32a64dd51a010930c7781108a284e3edcd842177 | |
parent | 52fd853619c9f751019975f7ce6a4fe2ce906fae (diff) | |
download | ydb-73b247f4a0932b7d9fb693cfcc28965862abb20a.tar.gz |
nested optionals
11 files changed, 212 insertions, 42 deletions
diff --git a/ydb/library/yql/minikql/arrow/mkql_functions.cpp b/ydb/library/yql/minikql/arrow/mkql_functions.cpp index 2f9250f88a..92a553136d 100644 --- a/ydb/library/yql/minikql/arrow/mkql_functions.cpp +++ b/ydb/library/yql/minikql/arrow/mkql_functions.cpp @@ -12,10 +12,10 @@ namespace NKikimr::NMiniKQL { -bool ConvertInputArrowType(TType* blockType, bool& isOptional, arrow::ValueDescr& descr) { +bool ConvertInputArrowType(TType* blockType, arrow::ValueDescr& descr) { auto asBlockType = AS_TYPE(TBlockType, blockType); descr.shape = asBlockType->GetShape() == TBlockType::EShape::Scalar ? arrow::ValueDescr::SCALAR : arrow::ValueDescr::ARRAY; - return ConvertArrowType(asBlockType->GetItemType(), isOptional, descr.type); + return ConvertArrowType(asBlockType->GetItemType(), descr.type); } class TOutputTypeVisitor : public arrow::TypeVisitor @@ -175,13 +175,12 @@ bool FindArrowFunction(TStringBuf name, const TArrayRef<TType*>& inputTypes, TTy } bool HasArrowCast(TType* from, TType* to) { - bool isOptional; std::shared_ptr<arrow::DataType> fromArrowType, toArrowType; - if (!ConvertArrowType(from, isOptional, fromArrowType)) { + if (!ConvertArrowType(from, fromArrowType)) { return false; } - if (!ConvertArrowType(to, isOptional, toArrowType)) { + if (!ConvertArrowType(to, toArrowType)) { return false; } diff --git a/ydb/library/yql/minikql/arrow/mkql_functions.h b/ydb/library/yql/minikql/arrow/mkql_functions.h index f33443debe..225822be3f 100644 --- a/ydb/library/yql/minikql/arrow/mkql_functions.h +++ b/ydb/library/yql/minikql/arrow/mkql_functions.h @@ -8,6 +8,6 @@ namespace NKikimr::NMiniKQL { class IBuiltinFunctionRegistry; bool FindArrowFunction(TStringBuf name, const TArrayRef<TType*>& inputTypes, TType* outputType, const IBuiltinFunctionRegistry& registry); -bool ConvertInputArrowType(TType* blockType, bool& isOptional, arrow::ValueDescr& descr); +bool ConvertInputArrowType(TType* blockType, arrow::ValueDescr& descr); bool HasArrowCast(TType* from, TType* to); } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp index 9d93077092..8cce82cc5d 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp @@ -878,10 +878,10 @@ std::unique_ptr<typename TTag::TPreparedAggregator> PrepareAvgOverInput(TTupleTy TVector<TType*> tupleElements = { doubleType, ui64Type }; auto avgRetType = TTupleType::Create(2, tupleElements.data(), env); std::shared_ptr<arrow::DataType> builderDataType; - bool isOptional; - MKQL_ENSURE(ConvertArrowType(avgRetType, isOptional, builderDataType), "Unsupported builder type"); + MKQL_ENSURE(ConvertArrowType(avgRetType, builderDataType), "Unsupported builder type"); auto argType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn))->GetItemType(); + bool isOptional; auto dataType = UnpackOptionalData(argType, isOptional); switch (*dataType->GetDataSlot()) { case NUdf::EDataSlot::Int8: diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp index 3fcb0f1523..ef6914c379 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp @@ -19,7 +19,7 @@ namespace { bool AlwaysUseChunks(const TType* type) { if (type->IsOptional()) { - type = AS_TYPE(TOptionalType, type)->GetItemType(); + return AlwaysUseChunks(AS_TYPE(TOptionalType, type)->GetItemType()); } if (type->IsTuple()) { @@ -42,8 +42,7 @@ bool AlwaysUseChunks(const TType* type) { std::shared_ptr<arrow::DataType> GetArrowType(TType* type) { std::shared_ptr<arrow::DataType> result; - bool isOptional; - Y_VERIFY(ConvertArrowType(type, isOptional, result)); + Y_VERIFY(ConvertArrowType(type, result)); return result; } @@ -629,6 +628,79 @@ private: std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder; }; +class TExternalOptionalBlockBuilder : public TBlockBuilderBase { +public: + TExternalOptionalBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen, std::unique_ptr<TBlockBuilderBase>&& inner) + : TBlockBuilderBase(type, pool, maxLen) + , Inner(std::move(inner)) + { + Reserve(); + } + + void DoAdd(NUdf::TUnboxedValuePod value) final { + if (!value) { + NullBuilder->UnsafeAppend(0); + Inner->AddDefault(); + return; + } + + NullBuilder->UnsafeAppend(1); + Inner->Add(value.GetOptionalValue()); + } + + void DoAddDefault() final { + NullBuilder->UnsafeAppend(1); + Inner->AddDefault(); + } + + void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final { + Y_VERIFY(!array.buffers.empty()); + Y_VERIFY(array.child_data.size() == 1); + + if (array.buffers.front()) { + ui8* dstBitmap = NullBuilder->End(); + CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length); + NullBuilder->UnsafeAdvance(popCount); + } else { + NullBuilder->UnsafeAppend(popCount, 1); + } + + Inner->AddMany(*array.child_data[0], popCount, sparseBitmap, array.length); + } + + TBlockArrayTree::Ptr DoBuildTree(bool finish) final { + TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); + + std::shared_ptr<arrow::Buffer> nullBitmap; + const size_t length = GetCurrLen(); + MKQL_ENSURE(length == NullBuilder->Length(), "Unexpected NullBuilder length"); + nullBitmap = NullBuilder->Finish(); + nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool); + + Y_VERIFY(length); + result->Payload.push_back(arrow::ArrayData::Make(GetArrowType(Type), length, { nullBitmap })); + result->Children.emplace_back(Inner->BuildTree(finish)); + + if (!finish) { + Reserve(); + } + + return result; + } + +private: + void Reserve() { + NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool); + NullBuilder->Reserve(MaxLen + 1); + } + +private: + std::unique_ptr<TBlockBuilderBase> Inner; + std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder; +}; + +std::unique_ptr<TBlockBuilderBase> MakeBlockBuilderBase(TType* type, arrow::MemoryPool& pool, size_t maxBlockLength); + template<bool Nullable> std::unique_ptr<TBlockBuilderBase> MakeBlockBuilderImpl(TType* type, arrow::MemoryPool& pool, size_t maxLen) { if constexpr (Nullable) { @@ -640,9 +712,7 @@ std::unique_ptr<TBlockBuilderBase> MakeBlockBuilderImpl(TType* type, arrow::Memo TVector<std::unique_ptr<TBlockBuilderBase>> children; for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { TType* childType = tupleType->GetElementType(i); - auto childBuilder = childType->IsOptional() ? - MakeBlockBuilderImpl<true>(childType, pool, maxLen) : - MakeBlockBuilderImpl<false>(childType, pool, maxLen); + auto childBuilder = MakeBlockBuilderBase(childType, pool, maxLen); children.push_back(std::move(childBuilder)); } @@ -689,12 +759,46 @@ std::unique_ptr<TBlockBuilderBase> MakeBlockBuilderImpl(TType* type, arrow::Memo MKQL_ENSURE(false, "Unsupported type"); } +std::unique_ptr<TBlockBuilderBase> MakeBlockBuilderBase(TType* type, arrow::MemoryPool& pool, size_t maxBlockLength) { + TType* unpacked = type; + if (type->IsOptional()) { + unpacked = AS_TYPE(TOptionalType, type)->GetItemType(); + } + + if (unpacked->IsOptional()) { + // at least 2 levels of optionals + ui32 nestLevel = 0; + auto currentType = type; + auto previousType = type; + TVector<TType*> types; + do { + ++nestLevel; + types.push_back(currentType); + previousType = currentType; + currentType = AS_TYPE(TOptionalType, currentType)->GetItemType(); + } while (currentType->IsOptional()); + + auto builder = MakeBlockBuilderBase(previousType, pool, maxBlockLength); + for (ui32 i = 1; i < nestLevel; ++i) { + builder = std::make_unique<TExternalOptionalBlockBuilder>(types[nestLevel - 1 - i], pool, maxBlockLength, std::move(builder)); + } + + return builder; + } else { + if (type->IsOptional()) { + return MakeBlockBuilderImpl<true>(type, pool, maxBlockLength); + } else { + return MakeBlockBuilderImpl<false>(type, pool, maxBlockLength); + } + } +} + } // namespace size_t CalcMaxBlockItemSize(const TType* type) { // we do not count block bitmap size if (type->IsOptional()) { - type = AS_TYPE(TOptionalType, type)->GetItemType(); + return CalcMaxBlockItemSize(AS_TYPE(TOptionalType, type)->GetItemType()); } if (type->IsTuple()) { @@ -743,10 +847,7 @@ size_t CalcMaxBlockItemSize(const TType* type) { } std::unique_ptr<IBlockBuilder> MakeBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxBlockLength) { - if (type->IsOptional()) { - return MakeBlockBuilderImpl<true>(type, pool, maxBlockLength); - } - return MakeBlockBuilderImpl<false>(type, pool, maxBlockLength); + return MakeBlockBuilderBase(type, pool, maxBlockLength); } } // namespace NMiniKQL diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp index 7d13b9e828..d8b4aa8b43 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp @@ -22,9 +22,8 @@ namespace NMiniKQL { namespace { arrow::ValueDescr ToValueDescr(TType* type) { - bool isOptional; arrow::ValueDescr ret; - MKQL_ENSURE(ConvertInputArrowType(type, isOptional, ret), "can't get arrow type"); + MKQL_ENSURE(ConvertInputArrowType(type, ret), "can't get arrow type"); return ret; } @@ -197,9 +196,8 @@ private: } static const arrow::compute::Function& ResolveFunction(TType* to) { - bool isOptional; std::shared_ptr<arrow::DataType> type; - MKQL_ENSURE(ConvertArrowType(to, isOptional, type), "can't get arrow type"); + MKQL_ENSURE(ConvertArrowType(to, type), "can't get arrow type"); auto function = ARROW_RESULT(arrow::compute::GetCastFunction(type)); MKQL_ENSURE(function != nullptr, "missing function"); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp index a39c38d2cd..5c6e4585f0 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp @@ -150,9 +150,58 @@ private: TPlainContainerCache Cache; }; +class TExternalOptionalBlockReader : public TBlockReaderBase { +public: + TExternalOptionalBlockReader(std::unique_ptr<TBlockReaderBase>&& inner) + : Inner(std::move(inner)) + {} + + NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final { + if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) { + return {}; + } + + return Inner->Get(*data.child_data[0], index).MakeOptional(); + } + + NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final { + if (!scalar.is_valid) { + return {}; + } + + const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar); + return Inner->GetScalar(*structScalar.value[0]).MakeOptional(); + } + +private: + std::unique_ptr<TBlockReaderBase> Inner; +}; + std::unique_ptr<TBlockReaderBase> MakeBlockReaderBase(TType* type, const THolderFactory& holderFactory) { + TType* unpacked = type; if (type->IsOptional()) { - type = AS_TYPE(TOptionalType, type)->GetItemType(); + unpacked = AS_TYPE(TOptionalType, type)->GetItemType(); + } + + if (unpacked->IsOptional()) { + // at least 2 levels of optionals + ui32 nestLevel = 0; + auto currentType = type; + auto previousType = type; + do { + ++nestLevel; + previousType = currentType; + currentType = AS_TYPE(TOptionalType, currentType)->GetItemType(); + } while (currentType->IsOptional()); + + std::unique_ptr<TBlockReaderBase> reader = MakeBlockReaderBase(previousType, holderFactory); + for (ui32 i = 1; i < nestLevel; ++i) { + reader = std::make_unique<TExternalOptionalBlockReader>(std::move(reader)); + } + + return reader; + } else { + type = unpacked; } if (type->IsTuple()) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp index 0ced6726b6..55dc51100f 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp @@ -321,9 +321,8 @@ public: , Arg_(arg) , Type_(type) { - bool isOptional; std::shared_ptr<arrow::DataType> arrowType; - MKQL_ENSURE(ConvertArrowType(Type_, isOptional, arrowType), "Unsupported type of scalar"); + MKQL_ENSURE(ConvertArrowType(Type_, arrowType), "Unsupported type of scalar"); } NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { @@ -339,9 +338,8 @@ private: arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value, TComputationContext& ctx) const { if (!value) { - bool isOptional; std::shared_ptr<arrow::DataType> arrowType; - MKQL_ENSURE(ConvertArrowType(type, isOptional, arrowType), "Unsupported type of scalar"); + MKQL_ENSURE(ConvertArrowType(type, arrowType), "Unsupported type of scalar"); return arrow::MakeNullScalar(arrowType); } @@ -351,9 +349,8 @@ private: if (type->IsTuple()) { auto tupleType = AS_TYPE(TTupleType, type); - bool isOptional; std::shared_ptr<arrow::DataType> arrowType; - MKQL_ENSURE(ConvertArrowType(type, isOptional, arrowType), "Unsupported type of scalar"); + MKQL_ENSURE(ConvertArrowType(type, arrowType), "Unsupported type of scalar"); std::vector<std::shared_ptr<arrow::Scalar>> arrowValue; for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp index dba908a706..badbe461ea 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.cpp +++ b/ydb/library/yql/minikql/mkql_type_builder.cpp @@ -1361,19 +1361,48 @@ bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& ty } } -bool ConvertArrowType(TType* itemType, bool& isOptional, std::shared_ptr<arrow::DataType>& type) { +bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type) { + bool isOptional; auto unpacked = UnpackOptional(itemType, isOptional); + if (unpacked->IsOptional()) { + // at least 2 levels of optionals + ui32 nestLevel = 0; + auto currentType = itemType; + auto previousType = itemType; + do { + ++nestLevel; + previousType = currentType; + currentType = AS_TYPE(TOptionalType, currentType)->GetItemType(); + } while (currentType->IsOptional()); + + // previousType is always Optional + std::shared_ptr<arrow::DataType> innerArrowType; + if (!ConvertArrowType(previousType, innerArrowType)) { + return false; + } + + for (ui32 i = 1; i < nestLevel; ++i) { + // wrap as one nullable field in struct + std::vector<std::shared_ptr<arrow::Field>> fields; + fields.emplace_back(std::make_shared<arrow::Field>("opt", innerArrowType, true)); + innerArrowType = std::make_shared<arrow::StructType>(fields); + } + + type = innerArrowType; + return true; + } + if (unpacked->IsTuple()) { auto tupleType = AS_TYPE(TTupleType, unpacked); std::vector<std::shared_ptr<arrow::Field>> fields; for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { std::shared_ptr<arrow::DataType> childType; - bool isChildOptional; - if (!ConvertArrowType(tupleType->GetElementType(i), isChildOptional, childType)) { + auto elementType = tupleType->GetElementType(i); + if (!ConvertArrowType(elementType, childType)) { return false; } - fields.emplace_back(std::make_shared<arrow::Field>("field" + ToString(i), childType, isChildOptional)); + fields.emplace_back(std::make_shared<arrow::Field>("field" + ToString(i), childType, elementType->IsOptional())); } type = std::make_shared<arrow::StructType>(fields); @@ -1805,9 +1834,8 @@ const NYql::NUdf::TPgTypeDescription* TTypeInfoHelper::FindPgTypeDescription(ui3 } NUdf::IArrowType::TPtr TTypeInfoHelper::MakeArrowType(const NUdf::TType* type) const { - bool isOptional; std::shared_ptr<arrow::DataType> arrowType; - if (!ConvertArrowType(const_cast<TType*>(static_cast<const TType*>(type)), isOptional, arrowType)) { + if (!ConvertArrowType(const_cast<TType*>(static_cast<const TType*>(type)), arrowType)) { return nullptr; } diff --git a/ydb/library/yql/minikql/mkql_type_builder.h b/ydb/library/yql/minikql/mkql_type_builder.h index 7e5e95bcba..af6ba7e315 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.h +++ b/ydb/library/yql/minikql/mkql_type_builder.h @@ -10,7 +10,7 @@ namespace NKikimr { namespace NMiniKQL { -bool ConvertArrowType(TType* itemType, bool& isOptional, std::shared_ptr<arrow::DataType>& type); +bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type); bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& type); class TArrowType : public NUdf::IArrowType { diff --git a/ydb/library/yql/providers/common/arrow_resolve/yql_simple_arrow_resolver.cpp b/ydb/library/yql/providers/common/arrow_resolve/yql_simple_arrow_resolver.cpp index 418acc486d..4341422d2b 100644 --- a/ydb/library/yql/providers/common/arrow_resolve/yql_simple_arrow_resolver.cpp +++ b/ydb/library/yql/providers/common/arrow_resolve/yql_simple_arrow_resolver.cpp @@ -65,9 +65,8 @@ private: for (const auto& type : types) { TNullOutput null; auto mkqlType = NCommon::BuildType(*type, pgmBuilder, null); - bool isOptional; std::shared_ptr<arrow::DataType> arrowType; - if (!ConvertArrowType(mkqlType, isOptional, arrowType)) { + if (!ConvertArrowType(mkqlType, arrowType)) { return EStatus::NOT_FOUND; } } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 9db30c0043..532667ab41 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -1486,11 +1486,10 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( } auto memberType = extraStructType->GetMemberType(i); - bool isOptional; std::shared_ptr<arrow::DataType> dataType; - YQL_ENSURE(ConvertArrowType(memberType, isOptional, dataType), "Unsupported arrow type"); - THROW_ARROW_NOT_OK(builder.AddField(std::make_shared<arrow::Field>(std::string(extraStructType->GetMemberName(i)), dataType, isOptional))); + YQL_ENSURE(ConvertArrowType(memberType, dataType), "Unsupported arrow type"); + THROW_ARROW_NOT_OK(builder.AddField(std::make_shared<arrow::Field>(std::string(extraStructType->GetMemberName(i)), dataType, memberType->IsOptional()))); readSpec->ColumnReorder.push_back(i); } |