diff options
author | nadya02 <nadya02@yandex-team.com> | 2024-07-23 18:23:59 +0300 |
---|---|---|
committer | nadya02 <nadya02@yandex-team.com> | 2024-07-23 18:36:35 +0300 |
commit | 607f5b88730b5af340c4445bacfd7cf3c7bd89e2 (patch) | |
tree | 71dc4d637dabe822b96cf3f5ec4ed168f1157d55 | |
parent | 9b4a47f1203d5ba3eaa5029818a76e4b9c6dc81a (diff) | |
download | ydb-607f5b88730b5af340c4445bacfd7cf3c7bd89e2.tar.gz |
YT-13309: Add merge schemas
5471ff5bcd9f7c7a9325085249492d293a8df0bd
-rw-r--r-- | yt/yt/client/complex_types/check_type_compatibility.cpp | 5 | ||||
-rw-r--r-- | yt/yt/client/complex_types/check_type_compatibility.h | 10 | ||||
-rw-r--r-- | yt/yt/client/complex_types/merge_complex_types.cpp | 333 | ||||
-rw-r--r-- | yt/yt/client/complex_types/merge_complex_types.h | 15 | ||||
-rw-r--r-- | yt/yt/client/table_client/merge_table_schemas.cpp | 132 | ||||
-rw-r--r-- | yt/yt/client/table_client/merge_table_schemas.h | 16 | ||||
-rw-r--r-- | yt/yt/client/ya.make | 6 |
7 files changed, 509 insertions, 8 deletions
diff --git a/yt/yt/client/complex_types/check_type_compatibility.cpp b/yt/yt/client/complex_types/check_type_compatibility.cpp index bcfe503860..5d5ecf70b2 100644 --- a/yt/yt/client/complex_types/check_type_compatibility.cpp +++ b/yt/yt/client/complex_types/check_type_compatibility.cpp @@ -352,10 +352,7 @@ TCompatibilityPair CheckDecimalTypeCompatibility( } } -// Returns pair: -// 1. Inner element that is neither optional nor tagged. -// 2. How many times this element is wrapped into Optional type. -static std::pair<TComplexTypeFieldDescriptor, int> UnwrapOptionalAndTagged(const TComplexTypeFieldDescriptor& descriptor) +std::pair<TComplexTypeFieldDescriptor, int> UnwrapOptionalAndTagged(const TComplexTypeFieldDescriptor& descriptor) { int nesting = 0; auto current = descriptor; diff --git a/yt/yt/client/complex_types/check_type_compatibility.h b/yt/yt/client/complex_types/check_type_compatibility.h index 086f1d9134..2895cbc779 100644 --- a/yt/yt/client/complex_types/check_type_compatibility.h +++ b/yt/yt/client/complex_types/check_type_compatibility.h @@ -8,12 +8,18 @@ namespace NYT::NComplexTypes { //////////////////////////////////////////////////////////////////////////////// +// Returns pair: +// 1. Inner element that is neither optional nor tagged. +// 2. How many times this element is wrapped into Optional type. +std::pair<NTableClient::TComplexTypeFieldDescriptor, int> UnwrapOptionalAndTagged( + const NTableClient::TComplexTypeFieldDescriptor& descriptor); + // Returned value is pair with elements // 1. Compatibility of types. // 2. If types are NOT FullyCompatible, error contains description of incompatibility. 
std::pair<NTableClient::ESchemaCompatibility, TError> CheckTypeCompatibility( - const NYT::NTableClient::TLogicalTypePtr& oldType, - const NYT::NTableClient::TLogicalTypePtr& newType); + const NTableClient::TLogicalTypePtr& oldType, + const NTableClient::TLogicalTypePtr& newType); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/complex_types/merge_complex_types.cpp b/yt/yt/client/complex_types/merge_complex_types.cpp new file mode 100644 index 0000000000..032a0ff569 --- /dev/null +++ b/yt/yt/client/complex_types/merge_complex_types.cpp @@ -0,0 +1,333 @@ +#include "merge_complex_types.h" + +#include "check_type_compatibility.h" + +#include <yt/yt/client/table_client/logical_type.h> + +namespace NYT::NComplexTypes { + +using namespace NYT::NTableClient; + +//////////////////////////////////////////////////////////////////////////////// + +namespace { + +std::vector<TLogicalTypePtr> MergeTupleTypes( + const TComplexTypeFieldDescriptor& firstDescriptor, + const TComplexTypeFieldDescriptor& secondDescriptor) +{ + YT_VERIFY(firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::Tuple + || firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::VariantTuple); + + YT_VERIFY(firstDescriptor.GetType()->GetMetatype() == secondDescriptor.GetType()->GetMetatype()); + + auto allowNewElements = firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::VariantTuple; + + auto firstSize = std::ssize(firstDescriptor.GetType()->GetElements()); + auto secondSize = std::ssize(secondDescriptor.GetType()->GetElements()); + + if (firstSize > secondSize) { + return MergeTupleTypes( + secondDescriptor, + firstDescriptor); + } + + if (!allowNewElements && firstSize != secondSize) { + THROW_ERROR_EXCEPTION( + "Tuple type of fields %Qv and %Qv of different size cannot be merged", + firstDescriptor.GetDescription(), + secondDescriptor.GetDescription()); + } + + std::vector<TLogicalTypePtr> resultElements; + 
resultElements.reserve(secondSize); + + int elementIndex = 0; + for (; elementIndex < firstSize; ++elementIndex) { + auto mergedType = MergeTypes( + firstDescriptor.Element(elementIndex).GetType(), + secondDescriptor.Element(elementIndex).GetType()); + + resultElements.push_back(std::move(mergedType)); + } + for (; elementIndex < secondSize; ++elementIndex) { + resultElements.push_back(secondDescriptor.Element(elementIndex).GetType()); + } + return resultElements; +} + +std::vector<TStructField> MergeStructTypes( + const TComplexTypeFieldDescriptor& firstDescriptor, + const TComplexTypeFieldDescriptor& secondDescriptor) +{ + YT_VERIFY(firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::Struct + || firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::VariantStruct); + + YT_VERIFY(firstDescriptor.GetType()->GetMetatype() == secondDescriptor.GetType()->GetMetatype()); + + auto firstFields = firstDescriptor.GetType()->GetFields(); + auto secondFields = secondDescriptor.GetType()->GetFields(); + + auto firstSize = std::ssize(firstFields); + auto secondSize = std::ssize(secondFields); + + if (firstSize > secondSize) { + return MergeStructTypes(secondDescriptor, firstDescriptor); + } + + auto makeNullability = firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::Struct; + + std::vector<TStructField> resultFields; + resultFields.reserve(secondSize); + + ssize_t fieldIndex = 0; + for (; fieldIndex < firstSize; ++fieldIndex) { + const auto& firstName = firstFields[fieldIndex].Name; + const auto& secondName = secondFields[fieldIndex].Name; + if (firstName != secondName) { + THROW_ERROR_EXCEPTION( + "Struct member name mismatch in %Qv", + firstDescriptor.GetDescription()) + << TErrorAttribute("first_name", firstName) + << TErrorAttribute("second_name", secondName); + } + const auto& firstFieldDescriptor = firstDescriptor.Field(fieldIndex); + const auto& secondFieldDescriptor = secondDescriptor.Field(fieldIndex); + try { + auto mergedField = 
MergeTypes( + firstFieldDescriptor.GetType(), + secondFieldDescriptor.GetType()); + resultFields.push_back(TStructField{ + .Name = firstName, + .Type = std::move(mergedField), + }); + } catch (const std::exception& ex) { + THROW_ERROR_EXCEPTION( + "Struct member type mismatch in %Qv", + firstDescriptor.GetDescription()) + << ex; + } + } + + for (; fieldIndex < secondSize; ++fieldIndex) { + const auto& secondFieldDescriptor = secondDescriptor.Field(fieldIndex); + if (!secondFieldDescriptor.GetType()->IsNullable() && makeNullability) { + resultFields.push_back(TStructField{ + .Name = secondFields[fieldIndex].Name, + .Type = New<TOptionalLogicalType>(secondFieldDescriptor.GetType()), + }); + } else { + resultFields.push_back(secondFields[fieldIndex]); + } + } + + return resultFields; +} + +TLogicalTypePtr MergeDictTypes( + const TComplexTypeFieldDescriptor& firstDescriptor, + const TComplexTypeFieldDescriptor& secondDescriptor) +{ + auto mergedKey = MergeTypes( + firstDescriptor.DictKey().GetType(), + secondDescriptor.DictKey().GetType()); + + auto mergedValue = MergeTypes( + firstDescriptor.DictValue().GetType(), + secondDescriptor.DictValue().GetType()); + + return New<TDictLogicalType>(std::move(mergedKey), std::move(mergedValue)); +} + +//////////////////////////////////////////////////////////////////////////////// + +TLogicalTypePtr UnwrapOptionalType(const TLogicalTypePtr& type) +{ + if (type->GetMetatype() == ELogicalMetatype::Optional) { + auto descriptor = TComplexTypeFieldDescriptor(type); + return descriptor.OptionalElement().GetType(); + } + return type; +} + +TLogicalTypePtr UnwrapTaggedType(const TLogicalTypePtr& type) +{ + if (type->GetMetatype() == ELogicalMetatype::Tagged) { + auto descriptor = TComplexTypeFieldDescriptor(type); + return descriptor.TaggedElement().GetType(); + } + return type; +} + +TString GetTag(const TLogicalTypePtr& type) +{ + return type->AsTaggedTypeRef().GetTag(); +} + +TString ExtractTagFromOneOfTypes( + const 
TLogicalTypePtr& firstType, + const TLogicalTypePtr& secondType) +{ + if (firstType->GetMetatype() == ELogicalMetatype::Tagged) { + return GetTag(firstType); + } else if (secondType->GetMetatype() == ELogicalMetatype::Tagged) { + return GetTag(secondType); + } + YT_ABORT(); +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +TLogicalTypePtr MergeTypes( + const TLogicalTypePtr& firstType, + const TLogicalTypePtr& secondType) +{ + auto firstDescriptor = TComplexTypeFieldDescriptor(firstType); + auto secondDescriptor = TComplexTypeFieldDescriptor(secondType); + + const auto firstMetatype = firstDescriptor.GetType()->GetMetatype(); + const auto secondMetatype = secondDescriptor.GetType()->GetMetatype(); + + // It needs to handle tag before optional. + if (firstMetatype == ELogicalMetatype::Tagged + || secondMetatype == ELogicalMetatype::Tagged) + { + if (firstMetatype == ELogicalMetatype::Tagged + && secondMetatype == ELogicalMetatype::Tagged + && GetTag(firstType) != GetTag(secondType)) + { + THROW_ERROR_EXCEPTION( + "The type tags do not match: first tag %Qv and second tag %Qv in %Qv", + GetTag(firstType), + GetTag(secondType), + firstDescriptor.GetDescription()); + } + auto mergedType = MergeTypes( + UnwrapTaggedType(firstType), + UnwrapTaggedType(secondType)); + + return New<TTaggedLogicalType>( + ExtractTagFromOneOfTypes(firstType, secondType), + std::move(mergedType)); + } + + if (firstMetatype == ELogicalMetatype::Optional + || secondMetatype == ELogicalMetatype::Optional) + { + int firstLayerCount = UnwrapOptionalAndTagged(firstDescriptor).second; + int secondLayerCount = UnwrapOptionalAndTagged(secondDescriptor).second; + + if (firstLayerCount != secondLayerCount && (firstLayerCount > 1 || secondLayerCount > 1)) { + THROW_ERROR_EXCEPTION( + "Type of fields %Qv and %Qv cannot be merged", + firstDescriptor.GetDescription(), + secondDescriptor.GetDescription()) + << TErrorAttribute("first_type", 
ToString(*firstDescriptor.GetType())) + << TErrorAttribute("second_type", ToString(*secondDescriptor.GetType())); + } + + auto mergedType = MergeTypes( + UnwrapOptionalType(firstType), + UnwrapOptionalType(secondType)); + + return New<TOptionalLogicalType>(std::move(mergedType)); + } + + if (firstMetatype != secondMetatype) { + THROW_ERROR_EXCEPTION( + "Type of %Qv field cannot be merged: metatypes are incompatible", + firstDescriptor.GetDescription()) + << TErrorAttribute("first_type", ToString(*firstDescriptor.GetType())) + << TErrorAttribute("second_type", ToString(*secondDescriptor.GetType())); + } + + switch (firstMetatype) { + case ELogicalMetatype::Simple: + { + if (CheckTypeCompatibility(firstType, secondType).first == ESchemaCompatibility::FullyCompatible) { + return secondType; + } + if (CheckTypeCompatibility(secondType, firstType).first == ESchemaCompatibility::FullyCompatible) { + return firstType; + } + THROW_ERROR_EXCEPTION( + "Type of fields %Qv and %Qv cannot be merged", + firstDescriptor.GetDescription(), + secondDescriptor.GetDescription()) + << TErrorAttribute("first_type", ToString(*firstDescriptor.GetType())) + << TErrorAttribute("second_type", ToString(*secondDescriptor.GetType())); + + } + case ELogicalMetatype::List: + { + auto mergedType = MergeTypes( + firstType->AsListTypeRef().GetElement(), + secondType->AsListTypeRef().GetElement()); + + return New<TListLogicalType>(mergedType); + } + case ELogicalMetatype::VariantStruct: + { + auto mergedFields = MergeStructTypes( + firstDescriptor, + secondDescriptor); + + return New<TVariantStructLogicalType>(mergedFields); + } + + case ELogicalMetatype::Struct: + { + auto mergedFields = MergeStructTypes( + firstDescriptor, + secondDescriptor); + + return New<TStructLogicalType>(mergedFields); + } + + case ELogicalMetatype::Tuple: + { + auto mergedElements = MergeTupleTypes( + firstDescriptor, + secondDescriptor); + + return New<TTupleLogicalType>(mergedElements); + } + + case 
ELogicalMetatype::VariantTuple: + { + auto mergedElements = MergeTupleTypes( + firstDescriptor, + secondDescriptor); + + return New<TVariantTupleLogicalType>(mergedElements); + } + + case ELogicalMetatype::Dict: + return MergeDictTypes(firstDescriptor, secondDescriptor); + + case ELogicalMetatype::Decimal: + { + if (*firstDescriptor.GetType() == *secondDescriptor.GetType()) { + return firstType; + } else { + THROW_ERROR_EXCEPTION( + "Type of fields %Qv and %Qv cannot be merged", + firstDescriptor.GetDescription(), + secondDescriptor.GetDescription()) + << TErrorAttribute("first_type", ToString(*firstDescriptor.GetType())) + << TErrorAttribute("second_type", ToString(*secondDescriptor.GetType())); + } + } + + case ELogicalMetatype::Optional: + case ELogicalMetatype::Tagged: + // Optional and Tagged cases were checked earlier in this function. + YT_ABORT(); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NComplexTypes diff --git a/yt/yt/client/complex_types/merge_complex_types.h b/yt/yt/client/complex_types/merge_complex_types.h new file mode 100644 index 0000000000..7f891b5e94 --- /dev/null +++ b/yt/yt/client/complex_types/merge_complex_types.h @@ -0,0 +1,15 @@ +#pragma once + +#include <yt/yt/client/table_client/public.h> + +namespace NYT::NComplexTypes { + +//////////////////////////////////////////////////////////////////////////////// + +NTableClient::TLogicalTypePtr MergeTypes( + const NTableClient::TLogicalTypePtr& firstType, + const NTableClient::TLogicalTypePtr& secondType); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NComplexTypes diff --git a/yt/yt/client/table_client/merge_table_schemas.cpp b/yt/yt/client/table_client/merge_table_schemas.cpp new file mode 100644 index 0000000000..40b55bb519 --- /dev/null +++ b/yt/yt/client/table_client/merge_table_schemas.cpp @@ -0,0 +1,132 @@ +#include "merge_table_schemas.h" + +#include 
"check_schema_compatibility.h" +#include "comparator.h" +#include "logical_type.h" +#include "schema.h" + +#include <yt/yt/client/complex_types/check_type_compatibility.h> +#include <yt/yt/client/complex_types/merge_complex_types.h> + +#include <yt/yt/core/ytree/convert.h> + +namespace NYT::NTableClient { + +using namespace NComplexTypes; + +//////////////////////////////////////////////////////////////////////////////// + +namespace { + +TColumnSchema MakeOptionalSchema(const TColumnSchema& columnSchema) +{ + if (columnSchema.LogicalType()->GetMetatype() == ELogicalMetatype::Optional) { + return columnSchema; + } + auto optionalType = New<TOptionalLogicalType>(columnSchema.LogicalType()); + auto resultSchema = TColumnSchema( + columnSchema.Name(), + optionalType, + columnSchema.SortOrder()); + resultSchema.SetStableName(columnSchema.StableName()); + return resultSchema; +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +TTableSchemaPtr MergeTableSchemas( + const TTableSchemaPtr& firstSchema, + const TTableSchemaPtr& secondSchema) +{ + std::vector<TColumnSchema> resultColumns; + resultColumns.reserve(std::max(secondSchema->Columns().size(), firstSchema->Columns().size())); + + for (const auto& secondSchemaColumn : secondSchema->Columns()) { + const auto* firstSchemaColumn = firstSchema->FindColumn(secondSchemaColumn.Name()); + + if (firstSchemaColumn) { + if (firstSchemaColumn->StableName() != secondSchemaColumn.StableName()) { + THROW_ERROR_EXCEPTION("Mismatching stable names in column %Qv: %Qv and %Qv", + firstSchemaColumn->Name(), + firstSchemaColumn->StableName(), + secondSchemaColumn.StableName()); + } + if (firstSchemaColumn->SortOrder() != secondSchemaColumn.SortOrder()) { + THROW_ERROR_EXCEPTION("Mismatching sort orders in column %Qv: %Qv and %Qv", + firstSchemaColumn->Name(), + firstSchemaColumn->SortOrder(), + secondSchemaColumn.SortOrder()); + } + + try { + auto mergedType = MergeTypes( + firstSchemaColumn->LogicalType(), + secondSchemaColumn.LogicalType()); + + 
auto resultSchema = TColumnSchema( + firstSchemaColumn->Name(), + mergedType, + firstSchemaColumn->SortOrder()); + + resultSchema.SetStableName(firstSchemaColumn->StableName()); + resultColumns.push_back(std::move(resultSchema)); + + } catch(const std::exception& ex) { + THROW_ERROR_EXCEPTION( + "Column %v first schema type is incompatible with second schema type", + firstSchemaColumn->GetDiagnosticNameString()) + << ex; + } + + } else if (!firstSchema->GetStrict()) { + THROW_ERROR_EXCEPTION("Column %v is present in second schema and is missing in non-strict first schema", + secondSchemaColumn.GetDiagnosticNameString()); + } else { + resultColumns.push_back(MakeOptionalSchema(secondSchemaColumn)); + } + } + + for (const auto& firstSchemaColumn : firstSchema->Columns()) { + if (!secondSchema->FindColumn(firstSchemaColumn.Name())) { + if (!secondSchema->GetStrict()) { + THROW_ERROR_EXCEPTION("Column %v is present in first schema and is missing in non-strict second schema", + firstSchemaColumn.GetDiagnosticNameString()); + } + resultColumns.push_back(MakeOptionalSchema(firstSchemaColumn)); + } + } + + auto getDeletedColumnsStableNames = [] (const std::vector<TDeletedColumn>& deletedColumns) { + THashSet<TColumnStableName> stableNames; + for (const auto& column: deletedColumns) { + stableNames.insert(column.StableName()); + } + return stableNames; + }; + + auto firstDeletedStableNames = getDeletedColumnsStableNames(firstSchema->DeletedColumns()); + auto secondDeletedStableNames = getDeletedColumnsStableNames(secondSchema->DeletedColumns()); + + if (firstDeletedStableNames == secondDeletedStableNames) { + // If the deleted columns completely match, then the table can be teleported. 
+ return { + New<TTableSchema>( + resultColumns, + /*strict*/ firstSchema->GetStrict() && secondSchema->GetStrict(), + firstSchema->GetUniqueKeys() && secondSchema->GetUniqueKeys(), + ETableSchemaModification::None, + firstSchema->DeletedColumns()) + }; + } else { + return { + New<TTableSchema>( + resultColumns, + /*strict*/ firstSchema->GetStrict() && secondSchema->GetStrict(), + firstSchema->GetUniqueKeys() && secondSchema->GetUniqueKeys()) + }; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NTableClient diff --git a/yt/yt/client/table_client/merge_table_schemas.h b/yt/yt/client/table_client/merge_table_schemas.h new file mode 100644 index 0000000000..af2f37553f --- /dev/null +++ b/yt/yt/client/table_client/merge_table_schemas.h @@ -0,0 +1,16 @@ +#pragma once + +#include "public.h" + +namespace NYT::NTableClient { + +//////////////////////////////////////////////////////////////////////////////// + +// Creates a schema that matches both given schemas. 
+TTableSchemaPtr MergeTableSchemas( + const TTableSchemaPtr& firstSchema, + const TTableSchemaPtr& secondSchema); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NTableClient diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index ebb3eefbaf..1886973025 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -124,6 +124,7 @@ SRCS( table_client/schemaless_dynamic_table_writer.cpp table_client/serialize.cpp table_client/logical_type.cpp + table_client/merge_table_schemas.cpp table_client/name_table.cpp table_client/wire_protocol.cpp table_client/columnar_statistics.cpp @@ -177,9 +178,10 @@ SRCS( complex_types/check_yson_token.cpp complex_types/check_type_compatibility.cpp complex_types/infinite_entity.cpp - complex_types/yson_format_conversion.cpp - complex_types/uuid_text.cpp + complex_types/merge_complex_types.cpp complex_types/time_text.cpp + complex_types/uuid_text.cpp + complex_types/yson_format_conversion.cpp zookeeper/packet.cpp zookeeper/protocol.cpp |