aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya02 <nadya02@yandex-team.com>2024-07-23 18:23:59 +0300
committernadya02 <nadya02@yandex-team.com>2024-07-23 18:36:35 +0300
commit607f5b88730b5af340c4445bacfd7cf3c7bd89e2 (patch)
tree71dc4d637dabe822b96cf3f5ec4ed168f1157d55
parent9b4a47f1203d5ba3eaa5029818a76e4b9c6dc81a (diff)
downloadydb-607f5b88730b5af340c4445bacfd7cf3c7bd89e2.tar.gz
YT-13309: Add merge schemas
5471ff5bcd9f7c7a9325085249492d293a8df0bd
-rw-r--r--yt/yt/client/complex_types/check_type_compatibility.cpp5
-rw-r--r--yt/yt/client/complex_types/check_type_compatibility.h10
-rw-r--r--yt/yt/client/complex_types/merge_complex_types.cpp333
-rw-r--r--yt/yt/client/complex_types/merge_complex_types.h15
-rw-r--r--yt/yt/client/table_client/merge_table_schemas.cpp132
-rw-r--r--yt/yt/client/table_client/merge_table_schemas.h16
-rw-r--r--yt/yt/client/ya.make6
7 files changed, 509 insertions, 8 deletions
diff --git a/yt/yt/client/complex_types/check_type_compatibility.cpp b/yt/yt/client/complex_types/check_type_compatibility.cpp
index bcfe503860..5d5ecf70b2 100644
--- a/yt/yt/client/complex_types/check_type_compatibility.cpp
+++ b/yt/yt/client/complex_types/check_type_compatibility.cpp
@@ -352,10 +352,7 @@ TCompatibilityPair CheckDecimalTypeCompatibility(
}
}
-// Returns pair:
-// 1. Inner element that is neither optional nor tagged.
-// 2. How many times this element is wrapped into Optional type.
-static std::pair<TComplexTypeFieldDescriptor, int> UnwrapOptionalAndTagged(const TComplexTypeFieldDescriptor& descriptor)
+std::pair<TComplexTypeFieldDescriptor, int> UnwrapOptionalAndTagged(const TComplexTypeFieldDescriptor& descriptor)
{
int nesting = 0;
auto current = descriptor;
diff --git a/yt/yt/client/complex_types/check_type_compatibility.h b/yt/yt/client/complex_types/check_type_compatibility.h
index 086f1d9134..2895cbc779 100644
--- a/yt/yt/client/complex_types/check_type_compatibility.h
+++ b/yt/yt/client/complex_types/check_type_compatibility.h
@@ -8,12 +8,18 @@ namespace NYT::NComplexTypes {
////////////////////////////////////////////////////////////////////////////////
+// Returns pair:
+// 1. Inner element that is neither optional nor tagged.
+// 2. How many times this element is wrapped into Optional type.
+std::pair<NTableClient::TComplexTypeFieldDescriptor, int> UnwrapOptionalAndTagged(
+ const NTableClient::TComplexTypeFieldDescriptor& descriptor);
+
// Returned value is pair with elements
// 1. Compatibility of types.
// 2. If types are NOT FullyCompatible, error contains description of incompatibility.
std::pair<NTableClient::ESchemaCompatibility, TError> CheckTypeCompatibility(
- const NYT::NTableClient::TLogicalTypePtr& oldType,
- const NYT::NTableClient::TLogicalTypePtr& newType);
+ const NTableClient::TLogicalTypePtr& oldType,
+ const NTableClient::TLogicalTypePtr& newType);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/complex_types/merge_complex_types.cpp b/yt/yt/client/complex_types/merge_complex_types.cpp
new file mode 100644
index 0000000000..032a0ff569
--- /dev/null
+++ b/yt/yt/client/complex_types/merge_complex_types.cpp
@@ -0,0 +1,333 @@
+#include "merge_complex_types.h"
+
+#include "check_type_compatibility.h"
+
+#include <yt/yt/client/table_client/logical_type.h>
+
+namespace NYT::NComplexTypes {
+
+using namespace NYT::NTableClient;
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace {
+
+std::vector<TLogicalTypePtr> MergeTupleTypes(
+ const TComplexTypeFieldDescriptor& firstDescriptor,
+ const TComplexTypeFieldDescriptor& secondDescriptor)
+{
+ YT_VERIFY(firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::Tuple
+ || firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::VariantTuple);
+
+ YT_VERIFY(firstDescriptor.GetType()->GetMetatype() == secondDescriptor.GetType()->GetMetatype());
+
+ auto allowNewElements = firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::VariantTuple;
+
+ auto firstSize = std::ssize(firstDescriptor.GetType()->GetElements());
+ auto secondSize = std::ssize(secondDescriptor.GetType()->GetElements());
+
+ if (firstSize > secondSize) {
+ return MergeTupleTypes(
+ secondDescriptor,
+ firstDescriptor);
+ }
+
+ if (!allowNewElements && firstSize != secondSize) {
+ THROW_ERROR_EXCEPTION(
+ "Tuple type of fields %Qv and %Qv of different size cannot be merged",
+ firstDescriptor.GetDescription(),
+ secondDescriptor.GetDescription());
+ }
+
+ std::vector<TLogicalTypePtr> resultElements;
+ resultElements.reserve(secondSize);
+
+ int elementIndex = 0;
+ for (; elementIndex < firstSize; ++elementIndex) {
+ auto mergedType = MergeTypes(
+ firstDescriptor.Element(elementIndex).GetType(),
+ secondDescriptor.Element(elementIndex).GetType());
+
+ resultElements.push_back(std::move(mergedType));
+ }
+ for (; elementIndex < secondSize; ++elementIndex) {
+ resultElements.push_back(secondDescriptor.Element(elementIndex).GetType());
+ }
+ return resultElements;
+}
+
+std::vector<TStructField> MergeStructTypes(
+ const TComplexTypeFieldDescriptor& firstDescriptor,
+ const TComplexTypeFieldDescriptor& secondDescriptor)
+{
+ YT_VERIFY(firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::Struct
+ || firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::VariantStruct);
+
+ YT_VERIFY(firstDescriptor.GetType()->GetMetatype() == secondDescriptor.GetType()->GetMetatype());
+
+ auto firstFields = firstDescriptor.GetType()->GetFields();
+ auto secondFields = secondDescriptor.GetType()->GetFields();
+
+ auto firstSize = std::ssize(firstFields);
+ auto secondSize = std::ssize(secondFields);
+
+ if (firstSize > secondSize) {
+ return MergeStructTypes(secondDescriptor, firstDescriptor);
+ }
+
+ auto makeNullability = firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::Struct;
+
+ std::vector<TStructField> resultFields;
+ resultFields.reserve(secondSize);
+
+ ssize_t fieldIndex = 0;
+ for (; fieldIndex < firstSize; ++fieldIndex) {
+ const auto& firstName = firstFields[fieldIndex].Name;
+ const auto& secondName = secondFields[fieldIndex].Name;
+ if (firstName != secondName) {
+ THROW_ERROR_EXCEPTION(
+ "Struct member name mismatch in %Qv",
+ firstDescriptor.GetDescription())
+ << TErrorAttribute("first_name", firstName)
+ << TErrorAttribute("second_name", secondName);
+ }
+ const auto& firstFieldDescriptor = firstDescriptor.Field(fieldIndex);
+ const auto& secondFieldDescriptor = secondDescriptor.Field(fieldIndex);
+ try {
+ auto mergedField = MergeTypes(
+ firstFieldDescriptor.GetType(),
+ secondFieldDescriptor.GetType());
+ resultFields.push_back(TStructField{
+ .Name = firstName,
+ .Type = std::move(mergedField),
+ });
+ } catch (const std::exception& ex) {
+ THROW_ERROR_EXCEPTION(
+ "Struct member type mismatch in %Qv",
+ firstDescriptor.GetDescription())
+ << ex;
+ }
+ }
+
+ for (; fieldIndex < secondSize; ++fieldIndex) {
+ const auto& secondFieldDescriptor = secondDescriptor.Field(fieldIndex);
+ if (!secondFieldDescriptor.GetType()->IsNullable() && makeNullability) {
+ resultFields.push_back(TStructField{
+ .Name = secondFields[fieldIndex].Name,
+ .Type = New<TOptionalLogicalType>(secondFieldDescriptor.GetType()),
+ });
+ } else {
+ resultFields.push_back(secondFields[fieldIndex]);
+ }
+ }
+
+ return resultFields;
+}
+
+TLogicalTypePtr MergeDictTypes(
+ const TComplexTypeFieldDescriptor& firstDescriptor,
+ const TComplexTypeFieldDescriptor& secondDescriptor)
+{
+ auto mergedKey = MergeTypes(
+ firstDescriptor.DictKey().GetType(),
+ secondDescriptor.DictKey().GetType());
+
+ auto mergedValue = MergeTypes(
+ firstDescriptor.DictValue().GetType(),
+ secondDescriptor.DictValue().GetType());
+
+ return New<TDictLogicalType>(std::move(mergedKey), std::move(mergedValue));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TLogicalTypePtr UnwrapOptionalType(const TLogicalTypePtr& type)
+{
+ if (type->GetMetatype() == ELogicalMetatype::Optional) {
+ auto descriptor = TComplexTypeFieldDescriptor(type);
+ return descriptor.OptionalElement().GetType();
+ }
+ return type;
+}
+
+TLogicalTypePtr UnwrapTaggedType(const TLogicalTypePtr& type)
+{
+ if (type->GetMetatype() == ELogicalMetatype::Tagged) {
+ auto descriptor = TComplexTypeFieldDescriptor(type);
+ return descriptor.TaggedElement().GetType();
+ }
+ return type;
+}
+
+TString GetTag(const TLogicalTypePtr& type)
+{
+ return type->AsTaggedTypeRef().GetTag();
+}
+
+TString ExtractTagFromOneOfTypes(
+ const TLogicalTypePtr& firstType,
+ const TLogicalTypePtr& secondType)
+{
+ if (firstType->GetMetatype() == ELogicalMetatype::Tagged) {
+ return GetTag(firstType);
+ } else if (secondType->GetMetatype() == ELogicalMetatype::Tagged) {
+ return GetTag(secondType);
+ }
+ YT_ABORT();
+}
+
+} // namespace
+
+////////////////////////////////////////////////////////////////////////////////
+
+TLogicalTypePtr MergeTypes(
+ const TLogicalTypePtr& firstType,
+ const TLogicalTypePtr& secondType)
+{
+ auto firstDescriptor = TComplexTypeFieldDescriptor(firstType);
+ auto secondDescriptor = TComplexTypeFieldDescriptor(secondType);
+
+ const auto firstMetatype = firstDescriptor.GetType()->GetMetatype();
+ const auto secondMetatype = secondDescriptor.GetType()->GetMetatype();
+
+ // It needs to handle tag before optional.
+ if (firstMetatype == ELogicalMetatype::Tagged
+ || secondMetatype == ELogicalMetatype::Tagged)
+ {
+ if (firstMetatype == ELogicalMetatype::Tagged
+ && secondMetatype == ELogicalMetatype::Tagged
+ && GetTag(firstType) != GetTag(secondType))
+ {
+ THROW_ERROR_EXCEPTION(
+ "The type tags do not match: first tag %Qv and second tag %Qv in %Qv",
+ GetTag(firstType),
+ GetTag(secondType),
+ firstDescriptor.GetDescription());
+ }
+ auto mergedType = MergeTypes(
+ UnwrapTaggedType(firstType),
+ UnwrapTaggedType(secondType));
+
+ return New<TTaggedLogicalType>(
+ ExtractTagFromOneOfTypes(firstType, secondType),
+ std::move(mergedType));
+ }
+
+ if (firstMetatype == ELogicalMetatype::Optional
+ || secondMetatype == ELogicalMetatype::Optional)
+ {
+ int firstLayerCount = UnwrapOptionalAndTagged(firstDescriptor).second;
+ int secondLayerCount = UnwrapOptionalAndTagged(secondDescriptor).second;
+
+ if (firstLayerCount != secondLayerCount && (firstLayerCount > 1 || secondLayerCount > 1)) {
+ THROW_ERROR_EXCEPTION(
+ "Type of fields %Qv and %Qv cannot be merged",
+ firstDescriptor.GetDescription(),
+ secondDescriptor.GetDescription())
+ << TErrorAttribute("first_type", ToString(*firstDescriptor.GetType()))
+ << TErrorAttribute("second_type", ToString(*secondDescriptor.GetType()));
+ }
+
+ auto mergedType = MergeTypes(
+ UnwrapOptionalType(firstType),
+ UnwrapOptionalType(secondType));
+
+ return New<TOptionalLogicalType>(std::move(mergedType));
+ }
+
+ if (firstMetatype != secondMetatype) {
+ THROW_ERROR_EXCEPTION(
+ "Type of %Qv field cannot be merged: metatypes are incompatible",
+ firstDescriptor.GetDescription())
+ << TErrorAttribute("first_type", ToString(*firstDescriptor.GetType()))
+ << TErrorAttribute("second_type", ToString(*secondDescriptor.GetType()));
+ }
+
+ switch (firstMetatype) {
+ case ELogicalMetatype::Simple:
+ {
+ if (CheckTypeCompatibility(firstType, secondType).first == ESchemaCompatibility::FullyCompatible) {
+ return secondType;
+ }
+ if (CheckTypeCompatibility(secondType, firstType).first == ESchemaCompatibility::FullyCompatible) {
+ return firstType;
+ }
+ THROW_ERROR_EXCEPTION(
+ "Type of fields %Qv and %Qv cannot be merged",
+ firstDescriptor.GetDescription(),
+ secondDescriptor.GetDescription())
+ << TErrorAttribute("first_type", ToString(*firstDescriptor.GetType()))
+ << TErrorAttribute("second_type", ToString(*secondDescriptor.GetType()));
+
+ }
+ case ELogicalMetatype::List:
+ {
+ auto mergedType = MergeTypes(
+ firstType->AsListTypeRef().GetElement(),
+ secondType->AsListTypeRef().GetElement());
+
+ return New<TListLogicalType>(mergedType);
+ }
+ case ELogicalMetatype::VariantStruct:
+ {
+ auto mergedFields = MergeStructTypes(
+ firstDescriptor,
+ secondDescriptor);
+
+ return New<TVariantStructLogicalType>(mergedFields);
+ }
+
+ case ELogicalMetatype::Struct:
+ {
+ auto mergedFields = MergeStructTypes(
+ firstDescriptor,
+ secondDescriptor);
+
+ return New<TStructLogicalType>(mergedFields);
+ }
+
+ case ELogicalMetatype::Tuple:
+ {
+ auto mergedElements = MergeTupleTypes(
+ firstDescriptor,
+ secondDescriptor);
+
+ return New<TTupleLogicalType>(mergedElements);
+ }
+
+ case ELogicalMetatype::VariantTuple:
+ {
+ auto mergedElements = MergeTupleTypes(
+ firstDescriptor,
+ secondDescriptor);
+
+ return New<TVariantTupleLogicalType>(mergedElements);
+ }
+
+ case ELogicalMetatype::Dict:
+ return MergeDictTypes(firstDescriptor, secondDescriptor);
+
+ case ELogicalMetatype::Decimal:
+ {
+ if (*firstDescriptor.GetType() == *secondDescriptor.GetType()) {
+ return firstType;
+ } else {
+ THROW_ERROR_EXCEPTION(
+ "Type of fields %Qv and %Qv cannot be merged",
+ firstDescriptor.GetDescription(),
+ secondDescriptor.GetDescription())
+ << TErrorAttribute("first_type", ToString(*firstDescriptor.GetType()))
+ << TErrorAttribute("second_type", ToString(*secondDescriptor.GetType()));
+ }
+ }
+
+ case ELogicalMetatype::Optional:
+ case ELogicalMetatype::Tagged:
+ // Optional and Tagged cases were checked earlier in this function.
+ YT_ABORT();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NComplexTypes
diff --git a/yt/yt/client/complex_types/merge_complex_types.h b/yt/yt/client/complex_types/merge_complex_types.h
new file mode 100644
index 0000000000..7f891b5e94
--- /dev/null
+++ b/yt/yt/client/complex_types/merge_complex_types.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include <yt/yt/client/table_client/public.h>
+
+namespace NYT::NComplexTypes {
+
+////////////////////////////////////////////////////////////////////////////////
+
+NTableClient::TLogicalTypePtr MergeTypes(
+ const NTableClient::TLogicalTypePtr& firstType,
+ const NTableClient::TLogicalTypePtr& secondType);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NComplexTypes
diff --git a/yt/yt/client/table_client/merge_table_schemas.cpp b/yt/yt/client/table_client/merge_table_schemas.cpp
new file mode 100644
index 0000000000..40b55bb519
--- /dev/null
+++ b/yt/yt/client/table_client/merge_table_schemas.cpp
@@ -0,0 +1,132 @@
+#include "merge_table_schemas.h"
+
+#include "check_schema_compatibility.h"
+#include "comparator.h"
+#include "logical_type.h"
+#include "schema.h"
+
+#include <yt/yt/client/complex_types/check_type_compatibility.h>
+#include <yt/yt/client/complex_types/merge_complex_types.h>
+
+#include <yt/yt/core/ytree/convert.h>
+
+namespace NYT::NTableClient {
+
+using namespace NComplexTypes;
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace {
+
+TColumnSchema MakeOptionalSchema(const TColumnSchema& columnSchema)
+{
+ if (columnSchema.LogicalType()->GetMetatype() == ELogicalMetatype::Optional) {
+ return columnSchema;
+ }
+ auto optionalType = New<TOptionalLogicalType>(columnSchema.LogicalType());
+ auto resultSchema = TColumnSchema(
+ columnSchema.Name(),
+ optionalType,
+ columnSchema.SortOrder());
+ resultSchema.SetStableName(columnSchema.StableName());
+ return resultSchema;
+}
+
+} // namespace
+
+////////////////////////////////////////////////////////////////////////////////
+
+TTableSchemaPtr MergeTableSchemas(
+ const TTableSchemaPtr& firstSchema,
+ const TTableSchemaPtr& secondSchema)
+{
+ std::vector<TColumnSchema> resultColumns;
+ resultColumns.reserve(std::max(secondSchema->Columns().size(), firstSchema->Columns().size()));
+
+ for (const auto& secondSchemaColumn : secondSchema->Columns()) {
+ const auto* firstSchemaColumn = firstSchema->FindColumn(secondSchemaColumn.Name());
+
+ if (firstSchemaColumn) {
+ if (firstSchemaColumn->StableName() != secondSchemaColumn.StableName()) {
+ THROW_ERROR_EXCEPTION("Mismatching stable names in column %Qv: %Qv and %Qv",
+ firstSchemaColumn->Name(),
+ firstSchemaColumn->StableName(),
+ secondSchemaColumn.StableName());
+ }
+ if (firstSchemaColumn->SortOrder() != secondSchemaColumn.SortOrder()) {
+ THROW_ERROR_EXCEPTION("Mismatching sort orders in column %Qv: %Qv and %Qv",
+ firstSchemaColumn->Name());
+ }
+
+ try {
+ auto mergedType = MergeTypes(
+ firstSchemaColumn->LogicalType(),
+ secondSchemaColumn.LogicalType());
+
+ auto resultSchema = TColumnSchema(
+ firstSchemaColumn->Name(),
+ mergedType,
+ firstSchemaColumn->SortOrder());
+
+ resultSchema.SetStableName(firstSchemaColumn->StableName());
+ resultColumns.push_back(std::move(resultSchema));
+
+ } catch(const std::exception& ex) {
+ THROW_ERROR_EXCEPTION(
+ "Column %v first schema type is incompatible with second schema type",
+ firstSchemaColumn->GetDiagnosticNameString())
+ << ex;
+ }
+
+ } else if (!firstSchema->GetStrict()) {
+ THROW_ERROR_EXCEPTION("Column %v is present in second schema and is missing in non-strict first schema",
+ secondSchemaColumn.GetDiagnosticNameString());
+ } else {
+ resultColumns.push_back(MakeOptionalSchema(secondSchemaColumn));
+ }
+ }
+
+ for (const auto& firstSchemaColumn : firstSchema->Columns()) {
+ if (!secondSchema->FindColumn(firstSchemaColumn.Name())) {
+ if (!secondSchema->GetStrict()) {
+ THROW_ERROR_EXCEPTION("Column %v is present in first schema and is missing in non-strict second schema",
+ firstSchemaColumn.GetDiagnosticNameString());
+ }
+ resultColumns.push_back(MakeOptionalSchema(firstSchemaColumn));
+ }
+ }
+
+ auto getDeletedColumnsStableNames = [] (const std::vector<TDeletedColumn>& deletedColumns) {
+ THashSet<TColumnStableName> stableNames;
+ for (const auto& column: deletedColumns) {
+ stableNames.insert(column.StableName());
+ }
+ return stableNames;
+ };
+
+ auto firstDeletedStableNames = getDeletedColumnsStableNames(firstSchema->DeletedColumns());
+ auto secondDeletedStableNames = getDeletedColumnsStableNames(secondSchema->DeletedColumns());
+
+ if (firstDeletedStableNames == secondDeletedStableNames) {
+ // If the deleted columns completely match, then the table can be teleported.
+ return {
+ New<TTableSchema>(
+ resultColumns,
+ /*strict*/ firstSchema->GetStrict() && secondSchema->GetStrict(),
+ firstSchema->GetUniqueKeys() && secondSchema->GetUniqueKeys(),
+ ETableSchemaModification::None,
+ firstSchema->DeletedColumns())
+ };
+ } else {
+ return {
+ New<TTableSchema>(
+ resultColumns,
+ /*strict*/ firstSchema->GetStrict() && secondSchema->GetStrict(),
+ firstSchema->GetUniqueKeys() && secondSchema->GetUniqueKeys())
+ };
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NTableClient
diff --git a/yt/yt/client/table_client/merge_table_schemas.h b/yt/yt/client/table_client/merge_table_schemas.h
new file mode 100644
index 0000000000..af2f37553f
--- /dev/null
+++ b/yt/yt/client/table_client/merge_table_schemas.h
@@ -0,0 +1,16 @@
+#pragma once
+
+#include "public.h"
+
+namespace NYT::NTableClient {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Create schema that match both two schemas.
+TTableSchemaPtr MergeTableSchemas(
+ const TTableSchemaPtr& firstSchema,
+ const TTableSchemaPtr& secondSchema);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NTableClient
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index ebb3eefbaf..1886973025 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -124,6 +124,7 @@ SRCS(
table_client/schemaless_dynamic_table_writer.cpp
table_client/serialize.cpp
table_client/logical_type.cpp
+ table_client/merge_table_schemas.cpp
table_client/name_table.cpp
table_client/wire_protocol.cpp
table_client/columnar_statistics.cpp
@@ -177,9 +178,10 @@ SRCS(
complex_types/check_yson_token.cpp
complex_types/check_type_compatibility.cpp
complex_types/infinite_entity.cpp
- complex_types/yson_format_conversion.cpp
- complex_types/uuid_text.cpp
+ complex_types/merge_complex_types.cpp
complex_types/time_text.cpp
+ complex_types/uuid_text.cpp
+ complex_types/yson_format_conversion.cpp
zookeeper/packet.cpp
zookeeper/protocol.cpp