diff options
author | Vladislav Gogov <vlad-gogov@ydb.tech> | 2024-11-21 13:38:02 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-21 13:38:02 +0300 |
commit | 45513a5b190a63e4fd2e6482f39c8f6ba324a7e1 (patch) | |
tree | 6c2ec95857caf3e6c915822c0cbc82d86d6513a3 | |
parent | 4f2597be8b25e798ef980566eae6e589b5a01303 (diff) | |
download | ydb-45513a5b190a63e4fd2e6482f39c8f6ba324a7e1.tar.gz |
Column Family for ColumnTable (#9657)
34 files changed, 2531 insertions, 299 deletions
diff --git a/ydb/core/formats/arrow/serializer/native.cpp b/ydb/core/formats/arrow/serializer/native.cpp index 30ec056d33..a580ed3341 100644 --- a/ydb/core/formats/arrow/serializer/native.cpp +++ b/ydb/core/formats/arrow/serializer/native.cpp @@ -113,9 +113,8 @@ NKikimr::TConclusion<std::shared_ptr<arrow::util::Codec>> TNativeSerializer::Bui const int levelMin = codec->minimum_compression_level(); const int levelMax = codec->maximum_compression_level(); if (levelDef < levelMin || levelMax < levelDef) { - return TConclusionStatus::Fail( - TStringBuilder() << "incorrect level for codec. have to be: [" << levelMin << ":" << levelMax << "]" - ); + return TConclusionStatus::Fail(TStringBuilder() << "incorrect level for codec `" << arrow::util::Codec::GetCodecAsString(cType) + << "`. have to be: [" << levelMin << ":" << levelMax << "]"); } std::shared_ptr<arrow::util::Codec> codecPtr = std::move(NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(cType, levelDef))); return codecPtr; @@ -182,7 +181,9 @@ NKikimr::TConclusionStatus TNativeSerializer::DoDeserializeFromProto(const NKiki void TNativeSerializer::DoSerializeToProto(NKikimrSchemeOp::TOlapColumn::TSerializer& proto) const { if (Options.codec) { proto.MutableArrowCompression()->SetCodec(NArrow::CompressionToProto(Options.codec->compression_type())); - proto.MutableArrowCompression()->SetLevel(Options.codec->compression_level()); + if (arrow::util::Codec::SupportsCompressionLevel(Options.codec->compression_type())) { + proto.MutableArrowCompression()->SetLevel(Options.codec->compression_level()); + } } else { proto.MutableArrowCompression()->SetCodec(NArrow::CompressionToProto(arrow::Compression::UNCOMPRESSED)); } diff --git a/ydb/core/formats/arrow/serializer/native.h b/ydb/core/formats/arrow/serializer/native.h index 38615600fc..d09241a779 100644 --- a/ydb/core/formats/arrow/serializer/native.h +++ b/ydb/core/formats/arrow/serializer/native.h @@ -103,6 +103,20 @@ public: Options.use_threads = false; Options.memory_pool = pool; } + + arrow::Compression::type GetCodecType() const { + if (Options.codec) { + return Options.codec->compression_type(); + } + return arrow::Compression::type::UNCOMPRESSED; + } + + std::optional<i32> GetCodecLevel() const { + if (Options.codec && arrow::util::Codec::SupportsCompressionLevel(Options.codec->compression_type())) { + return Options.codec->compression_level(); + } + return {}; + } }; } diff --git a/ydb/core/formats/arrow/serializer/parsing.cpp b/ydb/core/formats/arrow/serializer/parsing.cpp index 8a394073bd..4b57d86da8 100644 --- a/ydb/core/formats/arrow/serializer/parsing.cpp +++ b/ydb/core/formats/arrow/serializer/parsing.cpp @@ -7,6 +7,18 @@ std::string CompressionToString(const arrow::Compression::type compression) { return arrow::util::Codec::GetCodecAsString(compression); } +std::string CompressionToString(const NKikimrSchemeOp::EColumnCodec compression) { + switch (compression) { + case NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain: + return "off"; + case NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD: + return "zstd"; + case NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4: + return "lz4"; + } + return ""; +} + std::optional<arrow::Compression::type> CompressionFromString(const std::string& compressionStr) { auto result = arrow::util::Codec::GetCompressionType(compressionStr); if (!result.ok()) { diff --git a/ydb/core/formats/arrow/serializer/parsing.h b/ydb/core/formats/arrow/serializer/parsing.h index e1dbbf9bad..4ea4371fa5 100644 --- a/ydb/core/formats/arrow/serializer/parsing.h +++ b/ydb/core/formats/arrow/serializer/parsing.h @@ -9,9 +9,9 @@ namespace NKikimr::NArrow { std::string CompressionToString(const arrow::Compression::type compression); +std::string CompressionToString(const NKikimrSchemeOp::EColumnCodec compression); std::optional<arrow::Compression::type> CompressionFromString(const std::string& compressionStr); NKikimrSchemeOp::EColumnCodec CompressionToProto(const arrow::Compression::type compression); std::optional<arrow::Compression::type> CompressionFromProto(const NKikimrSchemeOp::EColumnCodec compression); - } diff --git a/ydb/core/formats/arrow/serializer/utils.cpp b/ydb/core/formats/arrow/serializer/utils.cpp new file mode 100644 index 0000000000..b33f7bc58a --- /dev/null +++ b/ydb/core/formats/arrow/serializer/utils.cpp @@ -0,0 +1,29 @@ +#include "parsing.h" +#include "utils.h" + +#include <ydb/library/formats/arrow/validation/validation.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h> + +namespace NKikimr::NArrow { +bool SupportsCompressionLevel(const arrow::Compression::type compression) { + return arrow::util::Codec::SupportsCompressionLevel(compression); +} + +bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression) { + return SupportsCompressionLevel(CompressionFromProto(compression).value()); +} + +std::optional<int> MinimumCompressionLevel(const arrow::Compression::type compression) { + if (!SupportsCompressionLevel(compression)) { + return {}; + } + return NArrow::TStatusValidator::GetValid(arrow::util::Codec::MinimumCompressionLevel(compression)); +} +std::optional<int> MaximumCompressionLevel(const arrow::Compression::type compression) { + if (!SupportsCompressionLevel(compression)) { + return {}; + } + return NArrow::TStatusValidator::GetValid(arrow::util::Codec::MaximumCompressionLevel(compression)); +} +} diff --git a/ydb/core/formats/arrow/serializer/utils.h b/ydb/core/formats/arrow/serializer/utils.h new file mode 100644 index 0000000000..954bc6dee9 --- /dev/null +++ b/ydb/core/formats/arrow/serializer/utils.h @@ -0,0 +1,16 @@ +#pragma once + +#include <ydb/core/protos/flat_scheme_op.pb.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/util/type_fwd.h> +#include <util/system/yassert.h> + +#include <optional> + +namespace NKikimr::NArrow { +bool SupportsCompressionLevel(const arrow::Compression::type compression); +bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression); + +std::optional<int> MinimumCompressionLevel(const arrow::Compression::type compression); +std::optional<int> MaximumCompressionLevel(const arrow::Compression::type compression); +} diff --git a/ydb/core/formats/arrow/serializer/ya.make b/ydb/core/formats/arrow/serializer/ya.make index 8c9fb49fe0..5100ce980a 100644 --- a/ydb/core/formats/arrow/serializer/ya.make +++ b/ydb/core/formats/arrow/serializer/ya.make @@ -13,6 +13,7 @@ SRCS( GLOBAL native.cpp stream.cpp parsing.cpp + utils.cpp ) END() diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index c11b439ee6..65d284c5b8 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -132,6 +132,8 @@ bool ConvertCreateTableSettingsToProto(NYql::TKikimrTableMetadataPtr metadata, Y familyProto->set_compression(Ydb::Table::ColumnFamily::COMPRESSION_NONE); } else if (to_lower(family.Compression.GetRef()) == "lz4") { familyProto->set_compression(Ydb::Table::ColumnFamily::COMPRESSION_LZ4); + } else if (to_lower(family.Compression.GetRef()) == "zstd") { + familyProto->set_compression(Ydb::Table::ColumnFamily::COMPRESSION_ZSTD); } else { code = Ydb::StatusIds::BAD_REQUEST; error = TStringBuilder() << "Unknown compression '" << family.Compression.GetRef() << "' for a column family"; @@ -383,9 +385,59 @@ bool FillCreateTableDesc(NYql::TKikimrTableMetadataPtr metadata, NKikimrSchemeOp } template <typename T> -void FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T& metadata) -{ +bool FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T& metadata, Ydb::StatusIds::StatusCode& code, TString& error) { Y_ENSURE(metadata.ColumnOrder.size() == metadata.Columns.size()); + + THashMap<TString, ui32> columnFamiliesByName; + ui32 columnFamilyId = 1; + for (const auto& family : metadata.ColumnFamilies) { + if (family.Data.Defined()) { + code = Ydb::StatusIds::BAD_REQUEST; + error = TStringBuilder() << "Field `DATA` is not supported for OLAP tables in column family '" << family.Name << "'"; + return false; + } + auto columnFamilyIt = columnFamiliesByName.find(family.Name); + if (!columnFamilyIt.IsEnd()) { + code = Ydb::StatusIds::BAD_REQUEST; + error = TStringBuilder() << "Duplicate column family `" << family.Name << '`'; + return false; + } + auto familyDescription = schema.AddColumnFamilies(); + familyDescription->SetName(family.Name); + if (familyDescription->GetName() == "default") { + familyDescription->SetId(0); + } else { + familyDescription->SetId(columnFamilyId++); + } + Y_ENSURE(columnFamiliesByName.emplace(familyDescription->GetName(), familyDescription->GetId()).second); + if (family.Compression.Defined()) { + NKikimrSchemeOp::EColumnCodec codec; + auto codecName = to_lower(family.Compression.GetRef()); + if (codecName == "off") { + codec = NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain; + } else if (codecName == "zstd") { + codec = NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD; + } else if (codecName == "lz4") { + codec = NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4; + } else { + code = Ydb::StatusIds::BAD_REQUEST; + error = TStringBuilder() << "Unknown compression '" << family.Compression.GetRef() << "' for a column family"; + return false; + } + familyDescription->SetColumnCodec(codec); + } else { + code = Ydb::StatusIds::BAD_REQUEST; + error = TStringBuilder() << "Compression is not set for column family'" << family.Name << "'"; + return false; + } + + if (family.CompressionLevel.Defined()) { + familyDescription->SetColumnCodecLevel(family.CompressionLevel.GetRef()); + } + } + + schema.SetNextColumnFamilyId(columnFamilyId); + for (const auto& name : metadata.ColumnOrder) { auto columnIt = metadata.Columns.find(name); Y_ENSURE(columnIt != metadata.Columns.end()); @@ -399,11 +451,29 @@ void FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T& if (columnType.TypeInfo) { *columnDesc.MutableTypeInfo() = *columnType.TypeInfo; } + + if (!columnFamiliesByName.empty()) { + TString columnFamilyName = "default"; + ui32 columnFamilyId = 0; + if (columnIt->second.Families.size()) { + columnFamilyName = *columnIt->second.Families.begin(); + auto columnFamilyIdIt = columnFamiliesByName.find(columnFamilyName); + if (columnFamilyIdIt.IsEnd()) { + code = Ydb::StatusIds::BAD_REQUEST; + error = TStringBuilder() << "Unknown column family `" << columnFamilyName << "` for column `" << columnDesc.GetName() << "`"; + return false; + } + columnFamilyId = columnFamilyIdIt->second; + } + columnDesc.SetColumnFamilyName(columnFamilyName); + columnDesc.SetColumnFamilyId(columnFamilyId); + } } for (const auto& keyColumn : metadata.KeyColumnNames) { schema.AddKeyColumnNames(keyColumn); } + return true; } bool FillCreateColumnTableDesc(NYql::TKikimrTableMetadataPtr metadata, @@ -1705,7 +1775,12 @@ public: NKikimrSchemeOp::TColumnTableDescription* tableDesc = schemeTx.MutableCreateColumnTable(); tableDesc->SetName(pathPair.second); - FillColumnTableSchema(*tableDesc->MutableSchema(), *metadata); + if (!FillColumnTableSchema(*tableDesc->MutableSchema(), *metadata, code, error)) { + IKqpGateway::TGenericResult errResult; + errResult.AddIssue(NYql::TIssue(error)); + errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code)); + return MakeFuture(std::move(errResult)); + } if (!FillCreateColumnTableDesc(metadata, *tableDesc, code, error)) { IKqpGateway::TGenericResult errResult; @@ -2016,7 +2091,22 @@ public: NKikimrSchemeOp::TColumnTableSchemaPreset* schemaPreset = storeDesc->AddSchemaPresets(); schemaPreset->SetName("default"); - FillColumnTableSchema(*schemaPreset->MutableSchema(), settings); + + if (!settings.ColumnFamilies.empty()) { + IKqpGateway::TGenericResult errResult; + errResult.AddIssue(NYql::TIssue("TableStore does not support column families")); + errResult.SetStatus(NYql::YqlStatusFromYdbStatus(Ydb::StatusIds::BAD_REQUEST)); + return MakeFuture(std::move(errResult)); + } + + Ydb::StatusIds::StatusCode code; + TString error; + if (!FillColumnTableSchema(*schemaPreset->MutableSchema(), settings, code, error)) { + IKqpGateway::TGenericResult errResult; + errResult.AddIssue(NYql::TIssue(error)); + errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code)); + return MakeFuture(std::move(errResult)); + } if (IsPrepare()) { auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index cf1358a00a..5384541314 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -182,7 +182,8 @@ namespace { return dropGroupSettings; } - TCreateTableStoreSettings ParseCreateTableStoreSettings(TKiCreateTable create, const TTableSettings& settings) { + TCreateTableStoreSettings ParseCreateTableStoreSettings( + TKiCreateTable create, const TTableSettings& settings, const TVector<TColumnFamily>& columnFamilies) { TCreateTableStoreSettings out; out.TableStore = TString(create.Table()); out.ShardsCount = settings.MinPartitions ? *settings.MinPartitions : 0; @@ -215,6 +216,13 @@ namespace { columnMeta.NotNull = notNull; } + if (columnTuple.Size() > 3) { + auto families = columnTuple.Item(3).Cast<TCoAtomList>(); + for (auto family : families) { + columnMeta.Families.push_back(TString(family.Value())); + } + } + out.ColumnOrder.push_back(columnName); out.Columns.insert(std::make_pair(columnName, columnMeta)); } @@ -224,6 +232,7 @@ namespace { out.Indexes.push_back(indexDesc); } #endif + out.ColumnFamilies = columnFamilies; return out; } @@ -1250,8 +1259,8 @@ public: TStringBuilder() << "TABLESTORE with not COLUMN store")); return SyncError(); } - future = Gateway->CreateTableStore(cluster, - ParseCreateTableStoreSettings(maybeCreate.Cast(), table.Metadata->TableSettings), existingOk); + future = Gateway->CreateTableStore(cluster, ParseCreateTableStoreSettings(maybeCreate.Cast(), table.Metadata->TableSettings, + table.Metadata->ColumnFamilies), existingOk); break; } case ETableType::Table: @@ -1569,6 +1578,8 @@ public: f->set_compression(Ydb::Table::ColumnFamily::COMPRESSION_NONE); } else if (to_lower(comp) == "lz4") { f->set_compression(Ydb::Table::ColumnFamily::COMPRESSION_LZ4); + } else if (to_lower(comp) == "zstd") { + f->set_compression(Ydb::Table::ColumnFamily::COMPRESSION_ZSTD); } else { auto errText = TStringBuilder() << "Unknown compression '" << comp << "' for a column family"; diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index c487f12839..aa61f04f94 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -699,6 +699,7 @@ struct TCreateTableStoreSettings { TVector<TString> KeyColumnNames; TVector<TString> ColumnOrder; TVector<TIndexDescription> Indexes; + TVector<TColumnFamily> ColumnFamilies; }; struct TAlterTableStoreSettings { diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp index 75b0b3c061..e5e442d7cf 100644 --- a/ydb/core/kqp/ut/common/columnshard.cpp +++ b/ydb/core/kqp/ut/common/columnshard.cpp @@ -1,6 +1,7 @@ #include "columnshard.h" #include <ydb/core/base/tablet_pipecache.h> +#include <ydb/core/formats/arrow/serializer/native.h> #include <ydb/core/formats/arrow/serializer/parsing.h> #include <ydb/core/testlib/cs_helper.h> @@ -152,6 +153,107 @@ namespace NKqp { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), expectedStatus, result.GetIssues().ToString()); } + bool TTestHelper::TCompression::DeserializeFromProto(const NKikimrSchemeOp::TOlapColumn::TSerializer& serializer) { + if (!serializer.GetClassName()) { + return false; + } + if (serializer.GetClassName() == NArrow::NSerialization::TNativeSerializer::GetClassNameStatic()) { + SerializerClassName = serializer.GetClassName(); + if (!serializer.HasArrowCompression() || !serializer.GetArrowCompression().HasCodec()) { + return false; + } + CompressionType = serializer.GetArrowCompression().GetCodec(); + if (serializer.GetArrowCompression().HasLevel()) { + CompressionLevel = serializer.GetArrowCompression().GetLevel(); + } + } else { + return false; + } + + return true; + } + + TString TTestHelper::TCompression::BuildQuery() const { + TStringBuilder str; + str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType) << "\""; + if (CompressionLevel.has_value()) { + str << ", COMPRESSION_LEVEL=" << CompressionLevel.value(); + } + return str; + } + + bool TTestHelper::TCompression::IsEqual(const TCompression& rhs, TString& errorMessage) const { + if (SerializerClassName != rhs.GetSerializerClassName()) { + errorMessage = TStringBuilder() << "different serializer class name: in left value `" << SerializerClassName + << "` and in right value `" << rhs.GetSerializerClassName() << "`"; + return false; + } + if (CompressionType != rhs.GetCompressionType()) { + errorMessage = TStringBuilder() << "different compression type: in left value `" << NArrow::CompressionToString(CompressionType) + << "` and in right value `" << NArrow::CompressionToString(rhs.GetCompressionType()) << "`"; + return false; + } + if (CompressionLevel.has_value() && rhs.GetCompressionLevel().has_value() && + CompressionLevel.value() != rhs.GetCompressionLevel().value()) { + errorMessage = TStringBuilder() << "different compression level: in left value `" << CompressionLevel.value() + << "` and in right value `" << rhs.GetCompressionLevel().value() << "`"; + return false; + } else if (CompressionLevel.has_value() && !rhs.GetCompressionLevel().has_value()) { + errorMessage = TStringBuilder() << "compression level is set in left value, but not set in right value"; + return false; + } else if (!CompressionLevel.has_value() && rhs.GetCompressionLevel().has_value()) { + errorMessage = TStringBuilder() << "compression level not set in left value, but set in right value"; + return false; + } + + return true; + } + + TString TTestHelper::TCompression::ToString() const { + return BuildQuery(); + } + + bool TTestHelper::TColumnFamily::DeserializeFromProto(const NKikimrSchemeOp::TFamilyDescription& family) { + if (!family.HasId() || !family.HasName() || !family.HasColumnCodec()) { + return false; + } + Id = family.GetId(); + FamilyName = family.GetName(); + Compression = TTestHelper::TCompression().SetCompressionType(family.GetColumnCodec()); + if (family.HasColumnCodecLevel()) { + Compression.SetCompressionLevel(family.GetColumnCodecLevel()); + } + return true; + } + + TString TTestHelper::TColumnFamily::BuildQuery() const { + TStringBuilder str; + str << "FAMILY " << FamilyName << " ("; + if (!Data.empty()) { + str << "DATA=\"" << Data << "\", "; + } + str << Compression.BuildQuery() << ")"; + return str; + } + + bool TTestHelper::TColumnFamily::IsEqual(const TColumnFamily& rhs, TString& errorMessage) const { + if (Id != rhs.GetId()) { + errorMessage = TStringBuilder() << "different family id: in left value `" << Id << "` and in right value `" << rhs.GetId() << "`"; + return false; + } + if (FamilyName != rhs.GetFamilyName()) { + errorMessage = TStringBuilder() << "different family name: in left value `" << FamilyName << "` and in right value `" + << rhs.GetFamilyName() << "`"; + return false; + } + + return Compression.IsEqual(rhs.GetCompression(), errorMessage); + } + + TString TTestHelper::TColumnFamily::ToString() const { + return BuildQuery(); + } + TString TTestHelper::TColumnSchema::BuildQuery() const { TStringBuilder str; str << Name << ' '; @@ -168,6 +270,9 @@ namespace NKqp { default: str << NScheme::GetTypeName(TypeInfo.GetTypeId()); } + if (!ColumnFamilyName.empty()) { + str << " FAMILY " << ColumnFamilyName; + } if (!NullableFlag) { str << " NOT NULL"; } @@ -181,12 +286,21 @@ namespace NKqp { TString TTestHelper::TColumnTableBase::BuildQuery() const { auto str = TStringBuilder() << "CREATE " << GetObjectType() << " `" << Name << "`"; - str << " (" << BuildColumnsStr(Schema) << ", PRIMARY KEY (" << JoinStrings(PrimaryKey, ", ") << "))"; + str << " (" << BuildColumnsStr(Schema) << ", PRIMARY KEY (" << JoinStrings(PrimaryKey, ", ") << ")"; + if (!ColumnFamilies.empty()) { + TVector<TString> families; + families.reserve(ColumnFamilies.size()); + for (const auto& family : ColumnFamilies) { + families.push_back(family.BuildQuery()); + } + str << ", " << JoinStrings(families, ", "); + } + str << ")"; if (!Sharding.empty()) { str << " PARTITION BY HASH(" << JoinStrings(Sharding, ", ") << ")"; } str << " WITH (STORE = COLUMN"; - str << ", AUTO_PARTITIONING_MIN_PARTITIONS_COUNT =" << MinPartitionsCount; + str << ", AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << MinPartitionsCount; if (TTLConf) { str << ", TTL = " << TTLConf->second << " ON " << TTLConf->first; } @@ -196,10 +310,12 @@ namespace NKqp { TString TTestHelper::TColumnTableBase::BuildAlterCompressionQuery(const TString& columnName, const TCompression& compression) const { auto str = TStringBuilder() << "ALTER OBJECT `" << Name << "` (TYPE " << GetObjectType() << ") SET"; - str << " (ACTION=ALTER_COLUMN, NAME=" << columnName << ", `SERIALIZER.CLASS_NAME`=`" << compression.GetSerializerName() << "`,"; - str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(compression.GetType()) << "`"; - if (compression.GetCompressionLevel() != Max<i32>()) { - str << "`COMPRESSION.LEVEL`=" << compression.GetCompressionLevel(); + str << " (ACTION=ALTER_COLUMN, NAME=" << columnName << ", `SERIALIZER.CLASS_NAME`=`" << compression.GetSerializerClassName() << "`,"; + auto codec = NArrow::CompressionFromProto(compression.GetCompressionType()); + Y_VERIFY(codec.has_value()); + str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`"; + if (compression.GetCompressionLevel().has_value()) { + str << "`COMPRESSION.LEVEL`=" << compression.GetCompressionLevel().value(); } str << ");"; return str; diff --git a/ydb/core/kqp/ut/common/columnshard.h b/ydb/core/kqp/ut/common/columnshard.h index 6888b757d7..e7d00ee6a1 100644 --- a/ydb/core/kqp/ut/common/columnshard.h +++ b/ydb/core/kqp/ut/common/columnshard.h @@ -19,15 +19,39 @@ namespace NKqp { class TTestHelper { public: class TCompression { - YDB_ACCESSOR(TString, SerializerName, "ARROW_SERIALIZER"); - YDB_ACCESSOR(arrow::Compression::type, Type, arrow::Compression::type::UNCOMPRESSED); - YDB_ACCESSOR(i32, CompressionLevel, Max<i32>()); + YDB_ACCESSOR(TString, SerializerClassName, "ARROW_SERIALIZER"); + YDB_ACCESSOR_DEF(NKikimrSchemeOp::EColumnCodec, CompressionType); + YDB_ACCESSOR_DEF(std::optional<i32>, CompressionLevel); + + public: + bool DeserializeFromProto(const NKikimrSchemeOp::TOlapColumn::TSerializer& serializer); + TString BuildQuery() const; + + bool IsEqual(const TCompression& rhs, TString& errorMessage) const; + + TString ToString() const; + }; + + class TColumnFamily { + YDB_ACCESSOR(ui32, Id, 0); + YDB_ACCESSOR_DEF(TString, FamilyName); + YDB_ACCESSOR_DEF(TString, Data); + YDB_ACCESSOR_DEF(TCompression, Compression); + + public: + bool DeserializeFromProto(const NKikimrSchemeOp::TFamilyDescription& family); + TString BuildQuery() const; + + bool IsEqual(const TColumnFamily& rhs, TString& errorMessage) const; + + TString ToString() const; }; class TColumnSchema { YDB_ACCESSOR_DEF(TString, Name); YDB_ACCESSOR_DEF(NScheme::TTypeInfo, TypeInfo); YDB_FLAG_ACCESSOR(Nullable, true); + YDB_ACCESSOR_DEF(TString, ColumnFamilyName); public: TString BuildQuery() const; @@ -43,6 +67,7 @@ public: YDB_ACCESSOR_DEF(TVector<TString>, PrimaryKey); YDB_ACCESSOR_DEF(TVector<TString>, Sharding); YDB_ACCESSOR(ui32, MinPartitionsCount, 1); + YDB_ACCESSOR_DEF(TVector<TColumnFamily>, ColumnFamilies); std::optional<std::pair<TString, TString>> TTLConf; diff --git a/ydb/core/kqp/ut/olap/compression_ut.cpp b/ydb/core/kqp/ut/olap/compression_ut.cpp index b7dd5bedf9..b325324a1f 100644 --- a/ydb/core/kqp/ut/olap/compression_ut.cpp +++ b/ydb/core/kqp/ut/olap/compression_ut.cpp @@ -9,7 +9,7 @@ Y_UNIT_TEST_SUITE(KqpOlapCompression) { TVector<TTestHelper::TColumnSchema> schema = { TTestHelper::TColumnSchema().SetName("pk_int").SetType(NScheme::NTypeIds::Uint64).SetNullable(false) }; - TTestHelper::TCompression compression = TTestHelper::TCompression().SetType(arrow::Compression::type::ZSTD); + TTestHelper::TCompression compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD); TTestHelper::TColumnTable standaloneTable; standaloneTable.SetName("/Root/StandaloneTable").SetPrimaryKey({ "pk_int" }).SetSharding({ "pk_int" }).SetSchema(schema); @@ -33,7 +33,7 @@ Y_UNIT_TEST_SUITE(KqpOlapCompression) { TVector<TTestHelper::TColumnSchema> schema = { TTestHelper::TColumnSchema().SetName("pk_int").SetType(NScheme::NTypeIds::Uint64).SetNullable(false) }; - TTestHelper::TCompression compression = TTestHelper::TCompression().SetType(arrow::Compression::type::UNCOMPRESSED); + TTestHelper::TCompression compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); TTestHelper::TColumnTable standaloneTable; standaloneTable.SetName("/Root/StandaloneTable").SetPrimaryKey({ "pk_int" }).SetSharding({ "pk_int" }).SetSchema(schema); @@ -52,7 +52,7 @@ Y_UNIT_TEST_SUITE(KqpOlapCompression) { TVector<TTestHelper::TColumnSchema> schema = { TTestHelper::TColumnSchema().SetName("pk_int").SetType(NScheme::NTypeIds::Uint64).SetNullable(false) }; - TTestHelper::TCompression compression = TTestHelper::TCompression().SetType(arrow::Compression::type::ZSTD); + TTestHelper::TCompression compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD); TTestHelper::TColumnTableStore testTableStore; testTableStore.SetName("/Root/TableStoreTest").SetPrimaryKey({ "pk_int" }).SetSchema(schema); diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 064ad12052..1bdd65aa71 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -9031,6 +9031,1200 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation); csController->WaitIndexation(TDuration::Seconds(5)); } + + Y_UNIT_TEST(CreateWithoutColumnFamily) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + + TString tableName = "/Root/TableWithoutColumnFamily"; + { + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("Value1").SetType(NScheme::NTypeIds::String).SetNullable(true), + TTestHelper::TColumnSchema().SetName("Value2").SetType(NScheme::NTypeIds::Uint32).SetNullable(true) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema); + testHelper.CreateTable(testTable); + } + + auto& runner = testHelper.GetKikimr(); + auto tableClient = runner.GetTableClient(); + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + + auto runtime = runner.GetTestServer().GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TColumnFamily defaultFamily = + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression); + + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), 1); + TTestHelper::TColumnFamily defaultFromScheme; + UNIT_ASSERT(defaultFromScheme.DeserializeFromProto(schema.GetColumnFamilies(0))); + { + TString errorMessage; + UNIT_ASSERT_C(defaultFromScheme.IsEqual(defaultFamily, errorMessage), errorMessage); + } + + auto columns = schema.GetColumns(); + for (ui32 i = 0; i < schema.ColumnsSize(); i++) { + auto column = columns[i]; + UNIT_ASSERT(column.HasSerializer()); + UNIT_ASSERT_EQUAL_C( + column.GetColumnFamilyId(), 0, TStringBuilder() << "family for column " << column.GetName() << " is not default"); + TTestHelper::TCompression compression; + UNIT_ASSERT(compression.DeserializeFromProto(schema.GetColumns(i).GetSerializer())); + TString errorMessage; + UNIT_ASSERT_C(compression.IsEqual(defaultFamily.GetCompression(), errorMessage), errorMessage); + } + } + + // Field `Data` is not used in ColumnFamily for ColumnTable + Y_UNIT_TEST(ColumnFamilyWithFieldData) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + TString tableName = "/Root/TableWithoutColumnFamily"; + + { + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TColumnFamily defaultFamily = + TTestHelper::TColumnFamily().SetId(0).SetData("test").SetFamilyName("default").SetCompression(plainCompression); + + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("Value1").SetType(NScheme::NTypeIds::String).SetNullable(true), + TTestHelper::TColumnSchema().SetName("Value2").SetType(NScheme::NTypeIds::Uint32).SetNullable(true) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies({ defaultFamily }); + testHelper.CreateTable(testTable, EStatus::GENERIC_ERROR); + } + + { + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TColumnFamily defaultFamily = + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression); + + TTestHelper::TCompression lz4Compression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + TTestHelper::TColumnFamily family1 = + TTestHelper::TColumnFamily().SetId(1).SetData("test").SetFamilyName("family1").SetCompression(lz4Compression); + + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema() + .SetName("Value1") + .SetType(NScheme::NTypeIds::String) + .SetNullable(true) + .SetColumnFamilyName(family1.GetFamilyName()), + TTestHelper::TColumnSchema().SetName("Value2").SetType(NScheme::NTypeIds::Uint32).SetNullable(true) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies({ defaultFamily, family1 }); + testHelper.CreateTable(testTable, EStatus::GENERIC_ERROR); + } + + { + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TColumnFamily defaultFamily = + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression); + + TTestHelper::TCompression lz4Compression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + TTestHelper::TColumnFamily family1 = TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(lz4Compression); + + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema() + .SetName("Value1") + .SetType(NScheme::NTypeIds::String) + .SetNullable(true) + .SetColumnFamilyName(family1.GetFamilyName()), + TTestHelper::TColumnSchema().SetName("Value2").SetType(NScheme::NTypeIds::Uint32).SetNullable(true) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies({ defaultFamily, family1 }); + testHelper.CreateTable(testTable); + + auto query = TStringBuilder() << R"(ALTER TABLE `)" << tableName << R"(` + ALTER FAMILY ")" << family1.GetFamilyName() + << R"( SET COMPRESSION "lz4";)"; + auto session = testHelper.GetSession(); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(CreateWithDefaultColumnFamily) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + + TString tableName = "/Root/TableWithDefaultColumnFamily"; + TTestHelper::TCompression zstdCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(5); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(zstdCompression), + }; + + { + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("Value1").SetType(NScheme::NTypeIds::String).SetNullable(true), + TTestHelper::TColumnSchema().SetName("Value2").SetType(NScheme::NTypeIds::Uint32).SetNullable(true) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable); + } + + auto& runner = testHelper.GetKikimr(); + auto runtime = runner.GetTestServer().GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), families.size()); + TTestHelper::TColumnFamily defaultFromScheme; + UNIT_ASSERT(defaultFromScheme.DeserializeFromProto(schema.GetColumnFamilies(0))); + { + TString errorMessage; + UNIT_ASSERT_C(defaultFromScheme.IsEqual(families[0], errorMessage), errorMessage); + } + + for (const auto& column : schema.GetColumns()) { + UNIT_ASSERT(column.HasSerializer()); + UNIT_ASSERT_EQUAL_C( + column.GetColumnFamilyId(), 0, TStringBuilder() << "family for column " << column.GetName() << " is not default"); + TTestHelper::TCompression compression; + UNIT_ASSERT(compression.DeserializeFromProto(column.GetSerializer())); + TString errorMessage; + UNIT_ASSERT_C(compression.IsEqual(families[0].GetCompression(), errorMessage), errorMessage); + } + } + + Y_UNIT_TEST(CrateWithWrongCodec) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + + TString tableName = "/Root/TableWithWrongCodec"; + TTestHelper::TCompression zstdCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(100); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(zstdCompression), + }; + + { + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("Value1").SetType(NScheme::NTypeIds::String).SetNullable(true), + TTestHelper::TColumnSchema().SetName("Value2").SetType(NScheme::NTypeIds::Uint32).SetNullable(true) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable, EStatus::SCHEME_ERROR); + } + + TTestHelper::TCompression lz4Compression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4).SetCompressionLevel(100); + families[0].SetCompression(lz4Compression); + { + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("Value1").SetType(NScheme::NTypeIds::String).SetNullable(true), + TTestHelper::TColumnSchema().SetName("Value2").SetType(NScheme::NTypeIds::Uint32).SetNullable(true) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable, EStatus::SCHEME_ERROR); + } + + { + auto session = testHelper.GetSession(); + auto createQuery = TStringBuilder() << R"(CREATE TABLE `)" << tableName << R"(` ( + Key Uint64 NOT NULL, + Value1 String, + Value2 Uint32, + PRIMARY KEY (Key), + FAMILY default ( + COMPRESSION="snappy" + )) WITH (STORE = COLUMN);)"; + auto result = session.ExecuteSchemeQuery(createQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(AlterCompressionType) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + + TString tableName = "/Root/TableWithColumnFamily"; + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TCompression zstdCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(3); + TTestHelper::TCompression lz4Compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression), + TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(zstdCompression), + TTestHelper::TColumnFamily().SetId(2).SetFamilyName("family2").SetCompression(lz4Compression), + }; + + { + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema() + .SetName("Value1") + .SetType(NScheme::NTypeIds::String) + .SetNullable(true) + .SetColumnFamilyName(families[1].GetFamilyName()), + TTestHelper::TColumnSchema() + .SetName("Value2") + .SetType(NScheme::NTypeIds::Uint32) + .SetNullable(true) + .SetColumnFamilyName(families[2].GetFamilyName()) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable); + } + + auto session = testHelper.GetSession(); + + families[1].MutableCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4).SetCompressionLevel({}); + auto query = TStringBuilder() << R"(ALTER TABLE `)" << tableName << R"(` + ALTER FAMILY family1 SET COMPRESSION "lz4";)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& runner = testHelper.GetKikimr(); + auto runtime = runner.GetTestServer().GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), families.size()); + for (ui32 i = 0; i < families.size(); i++) { + TTestHelper::TColumnFamily familyFromScheme; + UNIT_ASSERT(familyFromScheme.DeserializeFromProto(schema.GetColumnFamilies(i))); + TString errorMessage; + UNIT_ASSERT_C(familyFromScheme.IsEqual(families[i], errorMessage), errorMessage); + } + + auto columns = schema.GetColumns(); + for (ui32 i = 0; i < schema.ColumnsSize(); i++) { + UNIT_ASSERT(columns[i].HasSerializer()); + UNIT_ASSERT_EQUAL_C(columns[i].GetColumnFamilyId(), i, + TStringBuilder() << "family for column `" << columns[i].GetName() << "` is not `" << families[i].GetFamilyName() << "`"); + TTestHelper::TCompression compression; + UNIT_ASSERT(compression.DeserializeFromProto(columns[i].GetSerializer())); + TString errorMessage; + UNIT_ASSERT_C(compression.IsEqual(families[i].GetCompression(), errorMessage), errorMessage); + } + } + + Y_UNIT_TEST(AlterCompressionLevel) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + + TString tableName = "/Root/TableWithColumnFamily"; + TTestHelper::TCompression zstdCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(5); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(zstdCompression), + }; + + { + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("Value1").SetType(NScheme::NTypeIds::String).SetNullable(true), + TTestHelper::TColumnSchema().SetName("Value2").SetType(NScheme::NTypeIds::Uint32).SetNullable(true) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable); + } + + families[0].MutableCompression().SetCompressionLevel(6); + auto alterFamilyCompressionLevel = TStringBuilder() << R"(ALTER TABLE `)" << tableName << R"(` + ALTER FAMILY default SET COMPRESSION_LEVEL 6;)"; + auto session = testHelper.GetSession(); + auto result = session.ExecuteSchemeQuery(alterFamilyCompressionLevel).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& runner = testHelper.GetKikimr(); + auto runtime = runner.GetTestServer().GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), families.size()); + TTestHelper::TColumnFamily defaultFromScheme; + UNIT_ASSERT(defaultFromScheme.DeserializeFromProto(schema.GetColumnFamilies(0))); + { + TString errorMessage; + UNIT_ASSERT_C(defaultFromScheme.IsEqual(families[0], errorMessage), errorMessage); + } + + for (const auto& column : schema.GetColumns()) { + UNIT_ASSERT(column.HasSerializer()); + UNIT_ASSERT_EQUAL_C( + column.GetColumnFamilyId(), 0, TStringBuilder() << "family for column " << column.GetName() << " is not default"); + TTestHelper::TCompression compression; + UNIT_ASSERT(compression.DeserializeFromProto(column.GetSerializer())); + TString errorMessage; + UNIT_ASSERT_C(compression.IsEqual(families[0].GetCompression(), errorMessage), errorMessage); + } + } + + Y_UNIT_TEST(AlterCompressionLevelError) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + + TString tableName = "/Root/TableWithColumnFamily"; + TTestHelper::TCompression lz4Compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(lz4Compression), + }; + + { + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("Value1").SetType(NScheme::NTypeIds::String).SetNullable(true), + TTestHelper::TColumnSchema().SetName("Value2").SetType(NScheme::NTypeIds::Uint32).SetNullable(true) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable); + } + + families[0].MutableCompression().SetCompressionLevel(6); + auto alterFamilyCompressionLevel = TStringBuilder() << R"(ALTER TABLE `)" << tableName << R"(` + ALTER FAMILY default SET COMPRESSION_LEVEL 6;)"; + auto session = testHelper.GetSession(); + auto result = session.ExecuteSchemeQuery(alterFamilyCompressionLevel).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); + } + + Y_UNIT_TEST(CreateWithColumnFamily) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + + TString tableName = "/Root/TableWithColumnFamily"; + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TCompression zstdCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(3); + TTestHelper::TCompression lz4Compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression), + TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(zstdCompression), + TTestHelper::TColumnFamily().SetId(2).SetFamilyName("family2").SetCompression(lz4Compression), + }; + + { + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema() + .SetName("Value1") + .SetType(NScheme::NTypeIds::String) + .SetNullable(true) + .SetColumnFamilyName(families[1].GetFamilyName()), + TTestHelper::TColumnSchema() + .SetName("Value2") + .SetType(NScheme::NTypeIds::Uint32) + .SetNullable(true) + .SetColumnFamilyName(families[2].GetFamilyName()) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable); + } + + auto& runner = testHelper.GetKikimr(); + auto runtime = runner.GetTestServer().GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), families.size()); + for (ui32 i = 0; i < families.size(); i++) { + TTestHelper::TColumnFamily familyFromScheme; + UNIT_ASSERT(familyFromScheme.DeserializeFromProto(schema.GetColumnFamilies(i))); + TString errorMessage; + UNIT_ASSERT_C(familyFromScheme.IsEqual(families[i], errorMessage), errorMessage); + } + + auto columns = schema.GetColumns(); + for (ui32 i = 0; i < schema.ColumnsSize(); i++) { + UNIT_ASSERT(columns[i].HasSerializer()); + UNIT_ASSERT_EQUAL_C(columns[i].GetColumnFamilyId(), i, + TStringBuilder() << "family for column `" << columns[i].GetName() << "` is not `" << families[i].GetFamilyName() << "`"); + TTestHelper::TCompression compression; + UNIT_ASSERT(compression.DeserializeFromProto(columns[i].GetSerializer())); + TString errorMessage; + UNIT_ASSERT_C(compression.IsEqual(families[i].GetCompression(), errorMessage), errorMessage); + } + } + + Y_UNIT_TEST(AddColumnFamily) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + + TString tableName = "/Root/TableWithColumnFamily"; + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TCompression zstdCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(1); + TTestHelper::TCompression lz4Compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression), + }; + + { + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("Value1").SetType(NScheme::NTypeIds::String).SetNullable(true), + TTestHelper::TColumnSchema().SetName("Value2").SetType(NScheme::NTypeIds::Uint32).SetNullable(true) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable); + } + + auto session = testHelper.GetSession(); + auto& runner = testHelper.GetKikimr(); + auto runtime = runner.GetTestServer().GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + { + families.push_back(TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(zstdCompression)); + auto query = TStringBuilder() << R"(ALTER TABLE `)" << tableName << R"(` + ADD FAMILY family1 ( + COMPRESSION = "zstd");)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), families.size()); + for (ui32 i = 0; i < families.size(); i++) { + TTestHelper::TColumnFamily familyFromScheme; + UNIT_ASSERT(familyFromScheme.DeserializeFromProto(schema.GetColumnFamilies(i))); + TString errorMessage; + UNIT_ASSERT_C(familyFromScheme.IsEqual(families[i], errorMessage), errorMessage); + } + } + + { + families.push_back(TTestHelper::TColumnFamily().SetId(2).SetFamilyName("family2").SetCompression(lz4Compression)); + families.push_back(TTestHelper::TColumnFamily().SetId(3).SetFamilyName("family3").SetCompression(zstdCompression)); + auto query = TStringBuilder() << R"(ALTER TABLE `)" << tableName << R"(` + ADD FAMILY family2 ( + COMPRESSION = "lz4"), + ADD FAMILY family3 ( + COMPRESSION = "zstd");)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), families.size()); + for (ui32 i = 0; i < families.size(); i++) { + TTestHelper::TColumnFamily familyFromScheme; + UNIT_ASSERT(familyFromScheme.DeserializeFromProto(schema.GetColumnFamilies(i))); + TString errorMessage; + UNIT_ASSERT_C(familyFromScheme.IsEqual(families[i], errorMessage), errorMessage); + } + } + } + + Y_UNIT_TEST(AddColumnWithoutColumnFamily) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + + TString tableName = "/Root/TableWithColumnFamily"; + TTestHelper::TCompression zstdCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(1); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(zstdCompression), + }; + + { + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("Value1").SetType(NScheme::NTypeIds::String).SetNullable(true), + TTestHelper::TColumnSchema().SetName("Value2").SetType(NScheme::NTypeIds::Uint32).SetNullable(true) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable); + } + + auto session = testHelper.GetSession(); + auto& runner = testHelper.GetKikimr(); + auto runtime = runner.GetTestServer().GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + + { + auto query = TStringBuilder() << R"(ALTER TABLE `)" << tableName << R"(` + ADD COLUMN Value3 Uint32;)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), families.size()); + for (ui32 i = 0; i < families.size(); i++) { + TTestHelper::TColumnFamily familyFromScheme; + UNIT_ASSERT(familyFromScheme.DeserializeFromProto(schema.GetColumnFamilies(i))); + TString errorMessage; + UNIT_ASSERT_C(familyFromScheme.IsEqual(families[i], errorMessage), errorMessage); + } + + auto columns = schema.GetColumns(); + for (ui32 i = 0; i < schema.ColumnsSize(); i++) { + UNIT_ASSERT(columns[i].HasSerializer()); + UNIT_ASSERT_EQUAL_C(columns[i].GetColumnFamilyId(), 0, + TStringBuilder() << "family for column `" << columns[i].GetName() << "` is not `" << families[0].GetFamilyName() + << "`"); + TTestHelper::TCompression compression; + UNIT_ASSERT(compression.DeserializeFromProto(columns[i].GetSerializer())); + TString errorMessage; + UNIT_ASSERT_C(compression.IsEqual(families[0].GetCompression(), errorMessage), errorMessage); + } + } + + } + + Y_UNIT_TEST(AddColumnWithColumnFamily) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + + TString tableName = "/Root/TableWithColumnFamily"; + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TCompression zstdCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(1); + TTestHelper::TCompression lz4Compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression), + }; + + { + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("Value1").SetType(NScheme::NTypeIds::String).SetNullable(true), + TTestHelper::TColumnSchema().SetName("Value2").SetType(NScheme::NTypeIds::Uint32).SetNullable(true) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable); + } + + auto session = testHelper.GetSession(); + auto& runner = testHelper.GetKikimr(); + auto runtime = runner.GetTestServer().GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + { + families.push_back(TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(zstdCompression)); + auto query = TStringBuilder() << R"(ALTER TABLE `)" << tableName << R"(` + ADD FAMILY family1 ( + COMPRESSION = "zstd");)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto query = TStringBuilder() << R"(ALTER TABLE `)" << tableName << R"(` + ADD COLUMN Value3 Uint32 FAMILY family1;)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), families.size()); + for (ui32 i = 0; i < families.size(); i++) { + TTestHelper::TColumnFamily familyFromScheme; + UNIT_ASSERT(familyFromScheme.DeserializeFromProto(schema.GetColumnFamilies(i))); + TString errorMessage; + UNIT_ASSERT_C(familyFromScheme.IsEqual(families[i], errorMessage), errorMessage); + } + + auto columns = schema.GetColumns(); + for (ui32 i = 0; i < schema.ColumnsSize(); i++) { + UNIT_ASSERT(columns[i].HasSerializer()); + ui32 indexFamily = 0; + if (columns[i].GetName() == "Value3") { + indexFamily = 1; + } + UNIT_ASSERT_EQUAL_C(columns[i].GetColumnFamilyId(), indexFamily, + TStringBuilder() << "family for column `" << columns[i].GetName() << "` is not `" << families[indexFamily].GetFamilyName() + << "`"); + TTestHelper::TCompression compression; + UNIT_ASSERT(compression.DeserializeFromProto(columns[i].GetSerializer())); + TString errorMessage; + UNIT_ASSERT_C(compression.IsEqual(families[indexFamily].GetCompression(), errorMessage), errorMessage); + } + } + + { + families.push_back(TTestHelper::TColumnFamily().SetId(2).SetFamilyName("family2").SetCompression(lz4Compression)); + auto query = TStringBuilder() << R"(ALTER TABLE `)" << tableName << R"(` + ADD FAMILY family2 ( + COMPRESSION = "lz4"), + ADD COLUMN Value4 Uint32 FAMILY family2, + ADD COLUMN Value5 Uint32 FAMILY family1;)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), families.size()); + for (ui32 i = 0; i < families.size(); i++) { + TTestHelper::TColumnFamily familyFromScheme; + UNIT_ASSERT(familyFromScheme.DeserializeFromProto(schema.GetColumnFamilies(i))); + TString errorMessage; + UNIT_ASSERT_C(familyFromScheme.IsEqual(families[i], errorMessage), errorMessage); + } + + auto columns = schema.GetColumns(); + for (ui32 i = 0; i < schema.ColumnsSize(); i++) { + UNIT_ASSERT(columns[i].HasSerializer()); + ui32 indexFamily = 0; + if (columns[i].GetName() == "Value3" || columns[i].GetName() == "Value5") { + indexFamily = 1; + } else if (columns[i].GetName() == "Value4") { + indexFamily = 2; + } + UNIT_ASSERT_EQUAL_C(columns[i].GetColumnFamilyId(), indexFamily, + TStringBuilder() << "family for column `" << columns[i].GetName() << "` is not `" << families[indexFamily].GetFamilyName() + << "`"); + TTestHelper::TCompression compression; + UNIT_ASSERT(compression.DeserializeFromProto(columns[i].GetSerializer())); + TString errorMessage; + UNIT_ASSERT_C(compression.IsEqual(families[indexFamily].GetCompression(), errorMessage), errorMessage); + } + } + } + + Y_UNIT_TEST(SetColumnFamily) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + + TString tableName = "/Root/TableWithColumnFamily"; + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TCompression zstdCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(1); + TTestHelper::TCompression lz4Compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression), + TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(zstdCompression), + }; + + { + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("Value1").SetType(NScheme::NTypeIds::String).SetNullable(true), + TTestHelper::TColumnSchema().SetName("Value2").SetType(NScheme::NTypeIds::Uint32).SetNullable(true) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable); + } + + auto session = testHelper.GetSession(); + auto& runner = testHelper.GetKikimr(); + auto runtime = runner.GetTestServer().GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + + { + auto query = TStringBuilder() << R"(ALTER TABLE `)" << tableName << R"(` + ALTER COLUMN Value1 SET FAMILY family1;)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + + auto columns = schema.GetColumns(); + for (ui32 i = 0; i < schema.ColumnsSize(); i++) { + UNIT_ASSERT(columns[i].HasSerializer()); + ui32 indexFamily = 0; + if (columns[i].GetName() == "Value1") { + indexFamily = 1; + } + + UNIT_ASSERT_EQUAL_C(columns[i].GetColumnFamilyId(), indexFamily, + TStringBuilder() << "family for column `" << columns[i].GetName() << "` is not `" << families[indexFamily].GetFamilyName() + << "`"); + TTestHelper::TCompression compression; + UNIT_ASSERT(compression.DeserializeFromProto(columns[i].GetSerializer())); + TString errorMessage; + UNIT_ASSERT_C(compression.IsEqual(families[indexFamily].GetCompression(), errorMessage), errorMessage); + } + } + + { + families.push_back(TTestHelper::TColumnFamily().SetId(2).SetFamilyName("family2").SetCompression(lz4Compression)); + auto query = TStringBuilder() << R"(ALTER TABLE `)" << tableName << R"(` + ADD FAMILY family2 ( + COMPRESSION = "lz4"), + ALTER COLUMN Value2 SET FAMILY family2;)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), families.size()); + for (ui32 i = 0; i < families.size(); i++) { + TTestHelper::TColumnFamily familyFromScheme; + UNIT_ASSERT(familyFromScheme.DeserializeFromProto(schema.GetColumnFamilies(i))); + TString errorMessage; + UNIT_ASSERT_C(familyFromScheme.IsEqual(families[i], errorMessage), errorMessage); + } + + auto columns = schema.GetColumns(); + for (ui32 i = 0; i < schema.ColumnsSize(); i++) { + UNIT_ASSERT(columns[i].HasSerializer()); + ui32 indexFamily = 0; + if (columns[i].GetName() == "Value1") { + indexFamily = 1; + } else if (columns[i].GetName() == "Value2") { + indexFamily = 2; + } + + UNIT_ASSERT_EQUAL_C(columns[i].GetColumnFamilyId(), indexFamily, + TStringBuilder() << "family for column `" << columns[i].GetName() << "` is not `" << families[indexFamily].GetFamilyName() + << "`"); + TTestHelper::TCompression compression; + UNIT_ASSERT(compression.DeserializeFromProto(columns[i].GetSerializer())); + TString errorMessage; + UNIT_ASSERT_C(compression.IsEqual(families[indexFamily].GetCompression(), errorMessage), errorMessage); + } + } + } + + Y_UNIT_TEST(WithoutDefaultColumnFamily) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + TString tableName = "/Root/TableWithFamily"; + + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(plainCompression), + }; + + { + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema() + .SetName("Value1") + .SetType(NScheme::NTypeIds::String) + .SetNullable(true) + .SetColumnFamilyName(families[0].GetFamilyName()), + TTestHelper::TColumnSchema() + .SetName("Value2") + .SetType(NScheme::NTypeIds::Uint32) + .SetNullable(true) + .SetColumnFamilyName(families[0].GetFamilyName()) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable); + } + + families.push_back(TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression)); + auto& runner = testHelper.GetKikimr(); + auto runtime = runner.GetTestServer().GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), families.size()); + for (ui32 i = 0; i < families.size(); i++) { + TTestHelper::TColumnFamily familyFromScheme; + UNIT_ASSERT(familyFromScheme.DeserializeFromProto(schema.GetColumnFamilies(i))); + ui32 familyIndex = 0; + if (familyFromScheme.GetFamilyName() == "default") { + familyIndex = 1; + } + TString errorMessage; + UNIT_ASSERT_C(familyFromScheme.IsEqual(families[familyIndex], errorMessage), errorMessage); + } + } + + Y_UNIT_TEST(UnknownColumnFamily) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + TString tableName = "/Root/TableWithFamily"; + + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression), + }; + + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("Value1").SetType(NScheme::NTypeIds::String).SetNullable(true).SetColumnFamilyName("family1"), + TTestHelper::TColumnSchema().SetName("Value2").SetType(NScheme::NTypeIds::Uint32).SetNullable(true).SetColumnFamilyName("family1") + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable, EStatus::GENERIC_ERROR); + } + + Y_UNIT_TEST(PrimaryKeyNotDefaultColumnFamily) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + TString tableName = "/Root/TableWithFamily"; + + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TCompression zstdCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(1); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression), + TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(zstdCompression), + }; + + { + TVector<TTestHelper::TColumnSchema> schema = { TTestHelper::TColumnSchema() + .SetName("Key") + .SetType(NScheme::NTypeIds::Uint64) + .SetColumnFamilyName(families[1].GetFamilyName()) + .SetNullable(false), + TTestHelper::TColumnSchema() + .SetName("Value1") + .SetType(NScheme::NTypeIds::String) + .SetNullable(true) + .SetColumnFamilyName(families[1].GetFamilyName()), + TTestHelper::TColumnSchema() + .SetName("Value2") + .SetType(NScheme::NTypeIds::Uint32) + .SetNullable(true) + .SetColumnFamilyName(families[1].GetFamilyName()) }; + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable); + } + + auto& runner = testHelper.GetKikimr(); + auto runtime = runner.GetTestServer().GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), families.size()); + for (ui32 i = 0; i < families.size(); i++) { + TTestHelper::TColumnFamily familyFromScheme; + UNIT_ASSERT(familyFromScheme.DeserializeFromProto(schema.GetColumnFamilies(i))); + TString errorMessage; + UNIT_ASSERT_C(familyFromScheme.IsEqual(families[i], errorMessage), errorMessage); + } + + auto columns = schema.GetColumns(); + for (ui32 i = 0; i < schema.ColumnsSize(); i++) { + UNIT_ASSERT(columns[i].HasSerializer()); + UNIT_ASSERT_EQUAL_C(columns[i].GetColumnFamilyId(), 1, + TStringBuilder() << "family for column `" << columns[i].GetName() << "` is not `" << families[1].GetFamilyName() << "`"); + TTestHelper::TCompression compression; + UNIT_ASSERT(compression.DeserializeFromProto(columns[i].GetSerializer())); + TString errorMessage; + UNIT_ASSERT_C(compression.IsEqual(families[1].GetCompression(), errorMessage), errorMessage); + } + } + + Y_UNIT_TEST(SetNotDefaultColumnFamilyForPrimaryKey) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + TString tableName = "/Root/TableWithFamily"; + + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TCompression zstdCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(1); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression), + TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(zstdCompression), + }; + + { + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema() + .SetName("Value1") + .SetType(NScheme::NTypeIds::String) + .SetNullable(true) + .SetColumnFamilyName(families[1].GetFamilyName()), + TTestHelper::TColumnSchema() + .SetName("Value2") + .SetType(NScheme::NTypeIds::Uint32) + .SetNullable(true) + .SetColumnFamilyName(families[1].GetFamilyName()) + }; + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable); + } + + auto session = testHelper.GetSession(); + auto query = TStringBuilder() << R"(ALTER TABLE `)" << tableName << R"(` + ALTER COLUMN Key SET FAMILY family1;)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& runner = testHelper.GetKikimr(); + auto runtime = runner.GetTestServer().GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), families.size()); + for (ui32 i = 0; i < families.size(); i++) { + TTestHelper::TColumnFamily familyFromScheme; + UNIT_ASSERT(familyFromScheme.DeserializeFromProto(schema.GetColumnFamilies(i))); + TString errorMessage; + UNIT_ASSERT_C(familyFromScheme.IsEqual(families[i], errorMessage), errorMessage); + } + + auto columns = schema.GetColumns(); + for (ui32 i = 0; i < schema.ColumnsSize(); i++) { + UNIT_ASSERT(columns[i].HasSerializer()); + UNIT_ASSERT_EQUAL_C(columns[i].GetColumnFamilyId(), 1, + TStringBuilder() << "family for column `" << columns[i].GetName() << "` is not `" << families[1].GetFamilyName() << "`"); + TTestHelper::TCompression compression; + UNIT_ASSERT(compression.DeserializeFromProto(columns[i].GetSerializer())); + TString errorMessage; + UNIT_ASSERT_C(compression.IsEqual(families[1].GetCompression(), errorMessage), errorMessage); + } + } + + Y_UNIT_TEST(AddExsitsColumnFamily) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + + TString tableName = "/Root/TableWithFamily"; + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TCompression lz4Compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression), + TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(lz4Compression), + TTestHelper::TColumnFamily().SetId(2).SetFamilyName("family2").SetCompression(lz4Compression), + }; + + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema() + .SetName("Value1") + .SetType(NScheme::NTypeIds::String) + .SetNullable(true) + .SetColumnFamilyName(families[1].GetFamilyName()), + TTestHelper::TColumnSchema() + .SetName("Value2") + .SetType(NScheme::NTypeIds::Uint32) + .SetNullable(true) + .SetColumnFamilyName(families[1].GetFamilyName()) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable); + + auto session = testHelper.GetSession(); + { + auto query = TStringBuilder() << R"(ALTER TABLE `)" << tableName << R"(` + ADD FAMILY family1 (COMPRESSION = "lz4")"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + + { + auto query = TStringBuilder() << R"(ALTER TABLE `)" << tableName + << R"(` ADD FAMILY family3 (COMPRESSION = "lz4"), ADD FAMILY family3 (COMPRESSION = "zstd")"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(AddColumnFamilyWithNotSupportedCodec) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + + TString tableName = "/Root/TableWithFamily"; + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TCompression lz4Compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression), + TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(lz4Compression), + TTestHelper::TColumnFamily().SetId(2).SetFamilyName("family2").SetCompression(lz4Compression), + }; + + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema() + .SetName("Value1") + .SetType(NScheme::NTypeIds::String) + .SetNullable(true) + .SetColumnFamilyName(families[1].GetFamilyName()), + TTestHelper::TColumnSchema() + .SetName("Value2") + .SetType(NScheme::NTypeIds::Uint32) + .SetNullable(true) + .SetColumnFamilyName(families[1].GetFamilyName()) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable); + + auto session = testHelper.GetSession(); + { + auto query = TStringBuilder() << R"(ALTER TABLE `)" << tableName << R"(` + ADD FAMILY family1 (COMPRESSION = "snappy")"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + + { + auto query = TStringBuilder() << R"(ALTER TABLE `)" << tableName << R"(` + ADD FAMILY family1 (COMPRESSION = "lz4", COMPRESSION_LEVEL = 5)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(TwoSimilarColumnFamilies) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + + TString tableName = "/Root/TableWithFamily"; + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TCompression lz4Compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression), + TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(lz4Compression), + TTestHelper::TColumnFamily().SetId(2).SetFamilyName("family1").SetCompression(lz4Compression), + }; + + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema() + .SetName("Value1") + .SetType(NScheme::NTypeIds::String) + .SetNullable(true) + .SetColumnFamilyName(families[1].GetFamilyName()), + TTestHelper::TColumnSchema() + .SetName("Value2") + .SetType(NScheme::NTypeIds::Uint32) + .SetNullable(true) + .SetColumnFamilyName(families[1].GetFamilyName()) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable, EStatus::GENERIC_ERROR); + } + + Y_UNIT_TEST(CreateTableStoreWithFamily) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + + TString tableName = "/Root/TableStoreWithColumnFamily"; + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TCompression zstdCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(1); + TTestHelper::TCompression lz4Compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + + TVector<TTestHelper::TColumnFamily> families = { + TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression), + TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(zstdCompression), + TTestHelper::TColumnFamily().SetId(2).SetFamilyName("family2").SetCompression(lz4Compression), + }; + + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema() + .SetName("Value1") + .SetType(NScheme::NTypeIds::String) + .SetNullable(true) + .SetColumnFamilyName(families[1].GetFamilyName()), + TTestHelper::TColumnSchema() + .SetName("Value2") + .SetType(NScheme::NTypeIds::Uint32) + .SetNullable(true) + .SetColumnFamilyName(families[2].GetFamilyName()) + }; + + TTestHelper::TColumnTableStore testTable; + testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema).SetColumnFamilies(families); + testHelper.CreateTable(testTable, EStatus::GENERIC_ERROR); + } } Y_UNIT_TEST_SUITE(KqpOlapTypes) { diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 1ddfa0ccdb..88f08508c0 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -347,6 +347,7 @@ message TOlapColumnDiff { optional string StorageId = 6; optional string DefaultValue = 7; optional NKikimrArrowAccessorProto.TRequestedConstructor DataAccessorConstructor = 8; + optional string ColumnFamilyName = 9; } message TOlapColumnDescription { @@ -367,6 +368,8 @@ message TOlapColumnDescription { optional string StorageId = 11; optional NKikimrColumnShardColumnDefaults.TColumnDefault DefaultValue = 12; optional NKikimrArrowAccessorProto.TConstructor DataAccessorConstructor = 13; + optional uint32 ColumnFamilyId = 14; + optional string ColumnFamilyName = 15; } message TRequestedBloomFilter { @@ -507,6 +510,10 @@ message TColumnTableSchema { repeated TOlapIndexDescription Indexes = 10; optional TColumnTableSchemeOptions Options = 12; + + repeated TFamilyDescription ColumnFamilies = 13; + // Internal fields + optional uint32 NextColumnFamilyId = 14; } message TColumnTableSchemaDiff { @@ -538,6 +545,8 @@ message TAlterColumnTableSchema { repeated TOlapIndexRequested UpsertIndexes = 8; repeated string DropIndexes = 9; optional TColumnTableRequestedOptions Options = 12; + repeated TFamilyDescription AddColumnFamily = 13; + repeated TFamilyDescription AlterColumnFamily = 14; } // Schema presets are used to manage multiple tables with the same schema diff --git a/ydb/core/tx/schemeshard/olap/column_families/schema.cpp b/ydb/core/tx/schemeshard/olap/column_families/schema.cpp new file mode 100644 index 0000000000..7667e984d5 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/column_families/schema.cpp @@ -0,0 +1,152 @@ +#include "schema.h" + +#include <ydb/library/accessor/validator.h> + +namespace NKikimr::NSchemeShard { + +void TOlapColumnFamily::Serialize(NKikimrSchemeOp::TFamilyDescription& columnFamily) const { + columnFamily.SetId(Id); + TBase::Serialize(columnFamily); +} + +void TOlapColumnFamily::ParseFromLocalDB(const NKikimrSchemeOp::TFamilyDescription& columnFamily) { + Id = columnFamily.GetId(); + TBase::ParseFromLocalDB(columnFamily); +} + +const TOlapColumnFamily* TOlapColumnFamiliesDescription::GetById(const ui32 id) const noexcept { + auto it = ColumnFamilies.find(id); + if (it == ColumnFamilies.end()) { + return nullptr; + } + return &it->second; +} + +const TOlapColumnFamily* TOlapColumnFamiliesDescription::GetByIdVerified(const ui32 id) const noexcept { + return TValidator::CheckNotNull(GetById(id)); +} + +const TOlapColumnFamily* TOlapColumnFamiliesDescription::GetByName(const TString& name) const noexcept { + auto it = ColumnFamiliesByName.find(name); + if (it.IsEnd()) { + return nullptr; + } + return GetByIdVerified(it->second); +} + +bool TOlapColumnFamiliesDescription::ApplyUpdate( + const TOlapColumnFamiliesUpdate& schemaUpdate, IErrorCollector& errors, ui32& NextColumnFamilyId) { + for (auto&& family : schemaUpdate.GetAddColumnFamilies()) { + auto familyName = family.GetName(); + if (ColumnFamiliesByName.contains(familyName)) { + errors.AddError(NKikimrScheme::StatusAlreadyExists, TStringBuilder() << "column family '" << familyName << "' already exists"); + return false; + } + ui32 index = 0; + if (familyName != "default") { + index = NextColumnFamilyId++; + } + TOlapColumnFamily columFamilyAdd(family, index); + Y_ABORT_UNLESS(ColumnFamilies.emplace(columFamilyAdd.GetId(), columFamilyAdd).second); + Y_ABORT_UNLESS(ColumnFamiliesByName.emplace(columFamilyAdd.GetName(), columFamilyAdd.GetId()).second); + } + + for (auto&& family : schemaUpdate.GetAlterColumnFamily()) { + auto familyName = family.GetName(); + auto it = ColumnFamiliesByName.find(familyName); + if (it.IsEnd()) { + errors.AddError( + NKikimrScheme::StatusSchemeError, TStringBuilder() << "column family '" << familyName << "' not exists for altering"); + return false; + } else { + auto itColumnFamily = ColumnFamilies.find(it->second); + Y_ABORT_UNLESS(itColumnFamily != ColumnFamilies.end()); + Y_ABORT_UNLESS(AlterColumnFamiliesId.insert(it->second).second); + TOlapColumnFamily& alterColumnFamily = itColumnFamily->second; + if (!alterColumnFamily.ApplyDiff(family, errors)) { + return false; + } + } + } + return true; +} + +bool TOlapColumnFamiliesDescription::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema) { + for (const auto& family : tableSchema.GetColumnFamilies()) { + TOlapColumnFamily columFamily; + columFamily.ParseFromLocalDB(family); + Y_ABORT_UNLESS(ColumnFamilies.emplace(columFamily.GetId(), columFamily).second); + Y_ABORT_UNLESS(ColumnFamiliesByName.emplace(columFamily.GetName(), columFamily.GetId()).second); + } + return true; +} + +void TOlapColumnFamiliesDescription::Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const { + for (const auto& [_, family] : ColumnFamilies) { + family.Serialize(*tableSchema.AddColumnFamilies()); + } +} + +bool TOlapColumnFamiliesDescription::Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const { + ui32 lastColumnFamilyId = 0; + THashSet<ui32> usedColumnFamilies; + for (const auto& familyProto : opSchema.GetColumnFamilies()) { + if (familyProto.GetName().empty()) { + errors.AddError("column family can't have an empty name"); + return false; + } + + const TString& columnFamilyName = familyProto.GetName(); + auto* family = GetByName(columnFamilyName); + if (!family) { + errors.AddError("column family '" + columnFamilyName + "' does not match schema preset"); + return false; + } + + if (familyProto.HasId() && familyProto.GetId() != family->GetId()) { + errors.AddError("column family '" + columnFamilyName + "' has id " + familyProto.GetId() + " that does not match schema preset"); + return false; + } + + if (!usedColumnFamilies.insert(family->GetId()).second) { + errors.AddError("column family '" + columnFamilyName + "' is specified multiple times"); + return false; + } + + if (familyProto.GetId() < lastColumnFamilyId) { + errors.AddError("column family order does not match schema preset"); + return false; + } + lastColumnFamilyId = familyProto.GetId(); + + if (!familyProto.HasColumnCodec()) { + errors.AddError("missing column codec for column family '" + columnFamilyName + "'"); + return false; + } + + auto serializerProto = ConvertFamilyDescriptionToProtoSerializer(familyProto); + if (serializerProto.IsFail()) { + errors.AddError(serializerProto.GetErrorMessage()); + return false; + } + NArrow::NSerialization::TSerializerContainer serializer; + if (!serializer.DeserializeFromProto(serializerProto.GetResult())) { + errors.AddError(TStringBuilder() << "can't deserialize column family `" << columnFamilyName << "` from proto "); + return false; + } + if (!family->GetSerializerContainer().IsEqualTo(serializer)) { + errors.AddError(TStringBuilder() << "compression from column family '" << columnFamilyName << "` is not matching schema preset"); + return false; + } + } + + for (const auto& [_, family] : ColumnFamilies) { + if (!usedColumnFamilies.contains(family.GetId())) { + errors.AddError("specified schema is missing some schema preset column families"); + return false; + } + } + + return true; +} +} diff --git a/ydb/core/tx/schemeshard/olap/column_families/schema.h b/ydb/core/tx/schemeshard/olap/column_families/schema.h new file mode 100644 index 0000000000..046321f8a2 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/column_families/schema.h @@ -0,0 +1,44 @@ +#pragma once +#include "update.h" + +namespace NKikimr::NSchemeShard { + +class TOlapColumnFamily: public TOlapColumnFamlilyAdd { +private: + using TBase = TOlapColumnFamlilyAdd; + YDB_READONLY(ui32, Id, Max<ui32>()); + +public: + TOlapColumnFamily() = default; + TOlapColumnFamily(const TOlapColumnFamlilyAdd& base, ui32 Id) + : TBase(base) + , Id(Id) + { + } + + void Serialize(NKikimrSchemeOp::TFamilyDescription& columnFamily) const; + void ParseFromLocalDB(const NKikimrSchemeOp::TFamilyDescription& columnFamily); +}; + +class TOlapColumnFamiliesDescription { +public: +private: + using TOlapColumnFamilies = TMap<ui32, TOlapColumnFamily>; + using TOlapColumnFamiliesByName = THashMap<TString, ui32>; + + YDB_READONLY_DEF(TOlapColumnFamilies, ColumnFamilies); + YDB_READONLY_DEF(TOlapColumnFamiliesByName, ColumnFamiliesByName); + YDB_READONLY_DEF(THashSet<ui32>, AlterColumnFamiliesId); + +public: + const TOlapColumnFamily* GetById(const ui32 id) const noexcept; + const TOlapColumnFamily* GetByIdVerified(const ui32 id) const noexcept; + const TOlapColumnFamily* GetByName(const TString& name) const noexcept; + + bool ApplyUpdate(const TOlapColumnFamiliesUpdate& schemaUpdate, IErrorCollector& errors, ui32& NextColumnFamilyId); + + bool Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema); + void Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const; + bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const; +}; +} diff --git a/ydb/core/tx/schemeshard/olap/column_families/update.cpp b/ydb/core/tx/schemeshard/olap/column_families/update.cpp new file mode 100644 index 0000000000..9460f8f1be --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/column_families/update.cpp @@ -0,0 +1,196 @@ +#include "update.h" + +#include <ydb/core/formats/arrow/serializer/native.h> +#include <ydb/core/formats/arrow/serializer/parsing.h> +#include <ydb/core/formats/arrow/serializer/utils.h> + +namespace NKikimr::NSchemeShard { + +NKikimr::TConclusion<NKikimrSchemeOp::TOlapColumn::TSerializer> ConvertFamilyDescriptionToProtoSerializer( + const NKikimrSchemeOp::TFamilyDescription& familyDescription) { + NKikimrSchemeOp::TOlapColumn::TSerializer result; + if (!familyDescription.HasColumnCodec()) { + return NKikimr::TConclusionStatus::Fail(TStringBuilder() + << "family `" << familyDescription.GetName() + << "`: can't convert TFamilyDescription to Serializer: field `ColumnCodec` is empty"); + } + auto codec = NArrow::CompressionFromProto(familyDescription.GetColumnCodec()); + if (!codec.has_value()) { + return NKikimr::TConclusionStatus::Fail(TStringBuilder() << "family `" << familyDescription.GetName() << "`: unknown codec"); + } + if (familyDescription.HasColumnCodecLevel() && !NArrow::SupportsCompressionLevel(codec.value())) { + return NKikimr::TConclusionStatus::Fail(TStringBuilder() << "family `" << familyDescription.GetName() << "`: codec `" + << NArrow::CompressionToString(familyDescription.GetColumnCodec()) + << "` is not support compression level"); + } + if (familyDescription.HasColumnCodecLevel()) { + int level = familyDescription.GetColumnCodecLevel(); + int minLevel = NArrow::MinimumCompressionLevel(codec.value()).value(); + int maxLevel = NArrow::MaximumCompressionLevel(codec.value()).value(); + if (level < minLevel || level > maxLevel) { + return NKikimr::TConclusionStatus::Fail(TStringBuilder() + << "family `" << familyDescription.GetName() << "`: incorrect level for codec `" + << NArrow::CompressionToString(familyDescription.GetColumnCodec()) << "`. expected: [" + << minLevel << ":" << maxLevel << "]"); + } + } + + result.SetClassName("ARROW_SERIALIZER"); + auto arrowCompression = result.MutableArrowCompression(); + arrowCompression->SetCodec(familyDescription.GetColumnCodec()); + if (familyDescription.HasColumnCodecLevel()) { + arrowCompression->SetLevel(familyDescription.GetColumnCodecLevel()); + } + return result; +} + +NKikimr::TConclusion<NKikimrSchemeOp::TFamilyDescription> ConvertSerializerContainerToFamilyDescription( + const NArrow::NSerialization::TSerializerContainer& serializer) { + NKikimrSchemeOp::TFamilyDescription result; + if (serializer->GetClassName().empty()) { + return NKikimr::TConclusionStatus::Fail("convert TSerializerContainer to TFamilyDescription: field `ClassName` is empty"); + } + if (serializer.GetClassName() == NArrow::NSerialization::TNativeSerializer::GetClassNameStatic()) { + std::shared_ptr<NArrow::NSerialization::TNativeSerializer> nativeSerializer = + serializer.GetObjectPtrVerifiedAs<NArrow::NSerialization::TNativeSerializer>(); + result.SetColumnCodec(NKikimr::NArrow::CompressionToProto(nativeSerializer->GetCodecType())); + auto level = nativeSerializer->GetCodecLevel(); + if (level.has_value()) { + result.SetColumnCodecLevel(level.value()); + } + } else { + return NKikimr::TConclusionStatus::Fail("convert TSerializerContainer to TFamilyDescription: Unknown value in field `ClassName`"); + } + return result; +} + +bool TOlapColumnFamlilyDiff::ParseFromRequest(const NKikimrSchemeOp::TFamilyDescription& diffColumnFamily, IErrorCollector& errors) { + if (!diffColumnFamily.HasName()) { + errors.AddError("column family: empty field name"); + return false; + } + + Name = diffColumnFamily.GetName(); + if (diffColumnFamily.HasColumnCodec()) { + Codec = diffColumnFamily.GetColumnCodec(); + } + if (diffColumnFamily.HasColumnCodecLevel()) { + CodecLevel = diffColumnFamily.GetColumnCodecLevel(); + } + return true; +} + +bool TOlapColumnFamlilyAdd::ParseFromRequest(const NKikimrSchemeOp::TFamilyDescription& columnFamily, IErrorCollector& errors) { + if (!columnFamily.HasName()) { + errors.AddError("column family: empty field Name"); + return false; + } + + Name = columnFamily.GetName(); + auto serializer = ConvertFamilyDescriptionToProtoSerializer(columnFamily); + if (serializer.IsFail()) { + errors.AddError(serializer.GetErrorMessage()); + return false; + } + auto resultBuild = NArrow::NSerialization::TSerializerContainer::BuildFromProto(serializer.GetResult()); + if (resultBuild.IsFail()) { + errors.AddError(resultBuild.GetErrorMessage()); + return false; + } + SerializerContainer = resultBuild.GetResult(); + return true; +} + +void TOlapColumnFamlilyAdd::ParseFromLocalDB(const NKikimrSchemeOp::TFamilyDescription& columnFamily) { + Name = columnFamily.GetName(); + auto serializer = ConvertFamilyDescriptionToProtoSerializer(columnFamily); + Y_VERIFY_S(serializer.IsSuccess(), serializer.GetErrorMessage()); + Y_VERIFY(SerializerContainer.DeserializeFromProto(serializer.GetResult())); +} + +void TOlapColumnFamlilyAdd::Serialize(NKikimrSchemeOp::TFamilyDescription& columnFamily) const { + auto result = ConvertSerializerContainerToFamilyDescription(SerializerContainer); + Y_VERIFY_S(result.IsSuccess(), result.GetErrorMessage()); + columnFamily.SetName(Name); + columnFamily.SetColumnCodec(result->GetColumnCodec()); + if (result->HasColumnCodecLevel()) { + columnFamily.SetColumnCodecLevel(result->GetColumnCodecLevel()); + } +} + +bool TOlapColumnFamlilyAdd::ApplyDiff(const TOlapColumnFamlilyDiff& diffColumnFamily, IErrorCollector& errors) { + Y_ABORT_UNLESS(GetName() == diffColumnFamily.GetName()); + auto newColumnFamily = ConvertSerializerContainerToFamilyDescription(SerializerContainer); + if (newColumnFamily.IsFail()) { + errors.AddError(newColumnFamily.GetErrorMessage()); + return false; + } + newColumnFamily->SetName(GetName()); + auto codec = diffColumnFamily.GetCodec(); + if (codec.has_value()) { + newColumnFamily->SetColumnCodec(codec.value()); + newColumnFamily->ClearColumnCodecLevel(); + } + auto codecLevel = diffColumnFamily.GetCodecLevel(); + if (codecLevel.has_value()) { + newColumnFamily->SetColumnCodecLevel(codecLevel.value()); + } + auto serializer = ConvertFamilyDescriptionToProtoSerializer(newColumnFamily.GetResult()); + if (serializer.IsFail()) { + errors.AddError(serializer.GetErrorMessage()); + return false; + } + auto resultBuild = NArrow::NSerialization::TSerializerContainer::BuildFromProto(serializer.GetResult()); + if (resultBuild.IsFail()) { + errors.AddError(resultBuild.GetErrorMessage()); + return false; + } + SerializerContainer = resultBuild.GetResult(); + return true; +} + +bool TOlapColumnFamiliesUpdate::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors) { + TSet<TString> familyNames; + for (auto&& family : tableSchema.GetColumnFamilies()) { + auto familyName = family.GetName(); + if (!familyNames.emplace(familyName).second) { + errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "duplicate column family '" << familyName << "'"); + return false; + } + TOlapColumnFamlilyAdd columnFamily; + if (!columnFamily.ParseFromRequest(family, errors)) { + return false; + } + familyNames.insert(familyName); + AddColumnFamilies.emplace_back(columnFamily); + } + + return true; +} + +bool TOlapColumnFamiliesUpdate::Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors) { + TSet<TString> addColumnFamilies; + for (auto&& family : alterRequest.GetAddColumnFamily()) { + auto familyName = family.GetName(); + if (!addColumnFamilies.emplace(familyName).second) { + errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "duplicate column family '" << familyName << "'"); + return false; + } + TOlapColumnFamlilyAdd columnFamily({}); + if (!columnFamily.ParseFromRequest(family, errors)) { + return false; + } + addColumnFamilies.insert(familyName); + AddColumnFamilies.emplace_back(columnFamily); + } + + for (auto&& family : alterRequest.GetAlterColumnFamily()) { + TOlapColumnFamlilyDiff columnFamily({}); + if (!columnFamily.ParseFromRequest(family, errors)) { + return false; + } + AlterColumnFamily.emplace_back(columnFamily); + } + return true; +} +} diff --git a/ydb/core/tx/schemeshard/olap/column_families/update.h b/ydb/core/tx/schemeshard/olap/column_families/update.h new file mode 100644 index 0000000000..63c0e53af1 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/column_families/update.h @@ -0,0 +1,45 @@ +#pragma once +#include <ydb/core/formats/arrow/serializer/abstract.h> +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/tx/schemeshard/olap/common/common.h> + +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NSchemeShard { + +[[nodiscard]] NKikimr::TConclusion<NKikimrSchemeOp::TOlapColumn::TSerializer> ConvertFamilyDescriptionToProtoSerializer( + const NKikimrSchemeOp::TFamilyDescription& familyDescription); + +class TOlapColumnFamlilyDiff { +private: + YDB_ACCESSOR_DEF(TString, Name); + YDB_ACCESSOR_DEF(std::optional<NKikimrSchemeOp::EColumnCodec>, Codec); + YDB_ACCESSOR_DEF(std::optional<i32>, CodecLevel); + +public: + bool ParseFromRequest(const NKikimrSchemeOp::TFamilyDescription& diffColumnFamily, IErrorCollector& errors); +}; + +class TOlapColumnFamlilyAdd { +private: + YDB_READONLY_DEF(TString, Name); + YDB_READONLY_DEF(NArrow::NSerialization::TSerializerContainer, SerializerContainer); + +public: + bool ParseFromRequest(const NKikimrSchemeOp::TFamilyDescription& columnFamily, IErrorCollector& errors); + void ParseFromLocalDB(const NKikimrSchemeOp::TFamilyDescription& columnFamily); + void Serialize(NKikimrSchemeOp::TFamilyDescription& columnSchema) const; + bool ApplyDiff(const TOlapColumnFamlilyDiff& diffColumn, IErrorCollector& errors); +}; + +class TOlapColumnFamiliesUpdate { +private: + YDB_READONLY_DEF(TVector<TOlapColumnFamlilyAdd>, AddColumnFamilies); + YDB_READONLY_DEF(TVector<TOlapColumnFamlilyDiff>, AlterColumnFamily); + +public: + bool Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors); + bool Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors); +}; + +} diff --git a/ydb/core/tx/schemeshard/olap/column_families/ya.make b/ydb/core/tx/schemeshard/olap/column_families/ya.make new file mode 100644 index 0000000000..75f48026bd --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/column_families/ya.make @@ -0,0 +1,17 @@ +LIBRARY() + +SRCS( + update.cpp + schema.cpp +) + +PEERDIR( + ydb/core/protos + ydb/core/formats/arrow/dictionary + ydb/core/formats/arrow/serializer + ydb/core/tx/schemeshard/olap/common +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/tx/schemeshard/olap/columns/schema.cpp b/ydb/core/tx/schemeshard/olap/columns/schema.cpp index 02eb5b719e..950ba879b1 100644 --- a/ydb/core/tx/schemeshard/olap/columns/schema.cpp +++ b/ydb/core/tx/schemeshard/olap/columns/schema.cpp @@ -15,7 +15,8 @@ void TOlapColumnSchema::ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescr Id = columnSchema.GetId(); } -bool TOlapColumnsDescription::ApplyUpdate(const TOlapColumnsUpdate& schemaUpdate, IErrorCollector& errors, ui32& nextEntityId) { +bool TOlapColumnsDescription::ApplyUpdate( + const TOlapColumnsUpdate& schemaUpdate, const TOlapColumnFamiliesDescription& columnFamilies, IErrorCollector& errors, ui32& nextEntityId) { if (Columns.empty() && schemaUpdate.GetAddColumns().empty()) { errors.AddError(NKikimrScheme::StatusSchemeError, "No add columns specified"); return false; @@ -38,10 +39,27 @@ bool TOlapColumnsDescription::ApplyUpdate(const TOlapColumnsUpdate& schemaUpdate return false; } } - TOlapColumnSchema newColumn(column, nextEntityId++); + std::optional<ui32> columnFamilyId; + if (column.GetColumnFamilyName().has_value()) { + TString familyName = column.GetColumnFamilyName().value(); + const TOlapColumnFamily* columnFamily = columnFamilies.GetByName(familyName); + + if (!columnFamily) { + errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() + << "Cannot set column family `" << familyName << "` for column `" + << column.GetName() << "`. Family not found"); + return false; + } + columnFamilyId = columnFamily->GetId(); + } + TOlapColumnSchema newColumn(column, nextEntityId++, columnFamilyId); if (newColumn.GetKeyOrder()) { Y_ABORT_UNLESS(orderedKeyColumnIds.emplace(*newColumn.GetKeyOrder(), newColumn.GetId()).second); } + if (!newColumn.GetSerializer().has_value() && !columnFamilies.GetColumnFamilies().empty() && + !newColumn.ApplySerializerFromColumnFamily(columnFamilies, errors)) { + return false; + } Y_ABORT_UNLESS(ColumnsByName.emplace(newColumn.GetName(), newColumn.GetId()).second); Y_ABORT_UNLESS(Columns.emplace(newColumn.GetId(), std::move(newColumn)).second); @@ -56,7 +74,7 @@ bool TOlapColumnsDescription::ApplyUpdate(const TOlapColumnsUpdate& schemaUpdate auto itColumn = Columns.find(it->second); Y_ABORT_UNLESS(itColumn != Columns.end()); TOlapColumnSchema& newColumn = itColumn->second; - if (!newColumn.ApplyDiff(columnDiff, errors)) { + if (!newColumn.ApplyDiff(columnDiff, columnFamilies, errors)) { return false; } } @@ -89,6 +107,21 @@ bool TOlapColumnsDescription::ApplyUpdate(const TOlapColumnsUpdate& schemaUpdate Columns.erase(columnInfo->GetId()); } + auto alterColumnFamiliesId = columnFamilies.GetAlterColumnFamiliesId(); + if (!alterColumnFamiliesId.empty()) { + for (auto& [_, column] : Columns) { + if (!column.GetColumnFamilyId().has_value()) { + errors.AddError(NKikimrScheme::StatusSchemeError, + TStringBuilder() << "Cannot alter family for column `" << column.GetName() << "`. Column family is not set"); + return false; + } + ui32 id = column.GetColumnFamilyId().value(); + if (alterColumnFamiliesId.contains(id)) { + column.SetSerializer(columnFamilies.GetByIdVerified(id)->GetSerializerContainer()); + } + } + } + return true; } @@ -153,6 +186,12 @@ bool TOlapColumnsDescription::Validate(const NKikimrSchemeOp::TColumnTableSchema return false; } + if (colProto.HasColumnFamilyId() && colProto.GetColumnFamilyId() != col->GetColumnFamilyId()) { + errors.AddError(TStringBuilder() << "Column '" << colName << "' has column family id " << colProto.GetColumnFamilyId() + << " that does not match schema preset"); + return false; + } + if (!usedColumns.insert(col->GetId()).second) { errors.AddError("Column '" + colName + "' is specified multiple times"); return false; diff --git a/ydb/core/tx/schemeshard/olap/columns/schema.h b/ydb/core/tx/schemeshard/olap/columns/schema.h index 0abec27769..c8604d55e9 100644 --- a/ydb/core/tx/schemeshard/olap/columns/schema.h +++ b/ydb/core/tx/schemeshard/olap/columns/schema.h @@ -3,16 +3,18 @@ namespace NKikimr::NSchemeShard { -class TOlapColumnSchema: public TOlapColumnAdd { +class TOlapColumnSchema: public TOlapColumnBase { private: - using TBase = TOlapColumnAdd; + using TBase = TOlapColumnBase; YDB_READONLY(ui32, Id, Max<ui32>()); public: - TOlapColumnSchema(const TOlapColumnAdd& base, const ui32 id) + TOlapColumnSchema(const TOlapColumnBase& base, const ui32 id, const std::optional<ui32> columnFamilyId = {}) : TBase(base) - , Id(id) { - + , Id(id) + { + ColumnFamilyId = columnFamilyId; } + TOlapColumnSchema(const std::optional<ui32>& keyOrder) : TBase(keyOrder) { @@ -51,7 +53,8 @@ public: const TOlapColumnSchema* GetByIdVerified(const ui32 id) const noexcept; - bool ApplyUpdate(const TOlapColumnsUpdate& schemaUpdate, IErrorCollector& errors, ui32& nextEntityId); + bool ApplyUpdate(const TOlapColumnsUpdate& schemaUpdate, const TOlapColumnFamiliesDescription& columnFamilies, IErrorCollector& errors, + ui32& nextEntityId); void Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema); void Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const; diff --git a/ydb/core/tx/schemeshard/olap/columns/update.cpp b/ydb/core/tx/schemeshard/olap/columns/update.cpp index 03ae01ae03..30ffe4e3fa 100644 --- a/ydb/core/tx/schemeshard/olap/columns/update.cpp +++ b/ydb/core/tx/schemeshard/olap/columns/update.cpp @@ -12,238 +12,327 @@ extern "C" { namespace NKikimr::NSchemeShard { - bool TOlapColumnAdd::ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema, IErrorCollector& errors) { - if (!columnSchema.GetName()) { - errors.AddError("Columns cannot have an empty name"); - return false; - } - Name = columnSchema.GetName(); - NotNullFlag = columnSchema.GetNotNull(); - TypeName = columnSchema.GetType(); +bool TOlapColumnDiff::ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDiff& columnSchema, IErrorCollector& errors) { + Name = columnSchema.GetName(); + if (!!columnSchema.GetStorageId()) { StorageId = columnSchema.GetStorageId(); - if (columnSchema.HasSerializer()) { - NArrow::NSerialization::TSerializerContainer serializer; - if (!serializer.DeserializeFromProto(columnSchema.GetSerializer())) { - errors.AddError("Cannot parse serializer info"); - return false; - } - Serializer = serializer; - } - if (columnSchema.HasDictionaryEncoding()) { - auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding()); - if (!settings) { - errors.AddError("Cannot parse dictionary compression info: " + settings.GetErrorMessage()); - return false; - } - DictionaryEncoding = *settings; + } + if (!Name) { + errors.AddError("empty field name"); + return false; + } + if (columnSchema.HasDefaultValue()) { + DefaultValue = columnSchema.GetDefaultValue(); + } + if (columnSchema.HasDataAccessorConstructor()) { + if (!AccessorConstructor.DeserializeFromProto(columnSchema.GetDataAccessorConstructor())) { + errors.AddError("cannot parse accessor constructor from proto"); + return false; } + } - if (columnSchema.HasTypeId()) { - errors.AddError(TStringBuilder() << "Cannot set TypeId for column '" << Name << ", use Type"); + if (columnSchema.HasColumnFamilyName()) { + ColumnFamilyName = columnSchema.GetColumnFamilyName(); + } + if (columnSchema.HasSerializer()) { + if (!Serializer.DeserializeFromProto(columnSchema.GetSerializer())) { + errors.AddError("cannot parse serializer diff from proto"); return false; } + } + if (!DictionaryEncoding.DeserializeFromProto(columnSchema.GetDictionaryEncoding())) { + errors.AddError("cannot parse dictionary encoding diff from proto"); + return false; + } + return true; +} - if (!columnSchema.HasType()) { - errors.AddError(TStringBuilder() << "Missing Type for column '" << Name); +bool TOlapColumnBase::ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema, IErrorCollector& errors) { + if (!columnSchema.GetName()) { + errors.AddError("Columns cannot have an empty name"); + return false; + } + Name = columnSchema.GetName(); + NotNullFlag = columnSchema.GetNotNull(); + TypeName = columnSchema.GetType(); + StorageId = columnSchema.GetStorageId(); + if (columnSchema.HasColumnFamilyId()) { + ColumnFamilyId = columnSchema.GetColumnFamilyId(); + } + if (columnSchema.HasSerializer()) { + NArrow::NSerialization::TSerializerContainer serializer; + if (!serializer.DeserializeFromProto(columnSchema.GetSerializer())) { + errors.AddError("Cannot parse serializer info"); return false; } - - TString errStr; - Y_ABORT_UNLESS(AppData()->TypeRegistry); - if (!GetTypeInfo(AppData()->TypeRegistry->GetType(TypeName), columnSchema.GetTypeInfo(), TypeName, Name, Type, errStr)) { - errors.AddError(errStr); + Serializer = serializer; + } + if (columnSchema.HasDictionaryEncoding()) { + auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding()); + if (!settings) { + errors.AddError("Cannot parse dictionary compression info: " + settings.GetErrorMessage()); return false; } + DictionaryEncoding = *settings; + } - if (Type.GetTypeId() == NScheme::NTypeIds::Pg) { - if (!TOlapColumnAdd::IsAllowedPgType(NPg::PgTypeIdFromTypeDesc(Type.GetPgTypeDesc()))) { - errors.AddError(TStringBuilder() << "Type '" << TypeName << "' specified for column '" << Name << "' is not supported"); - return false; - } - } else { - if (!IsAllowedType(Type.GetTypeId())){ - errors.AddError(TStringBuilder() << "Type '" << TypeName << "' specified for column '" << Name << "' is not supported"); - return false; - } - } + if (columnSchema.HasTypeId()) { + errors.AddError(TStringBuilder() << "Cannot set TypeId for column '" << Name << ", use Type"); + return false; + } + + if (!columnSchema.HasType()) { + errors.AddError(TStringBuilder() << "Missing Type for column '" << Name); + return false; + } + TString errStr; + Y_ABORT_UNLESS(AppData()->TypeRegistry); + if (!GetTypeInfo(AppData()->TypeRegistry->GetType(TypeName), columnSchema.GetTypeInfo(), TypeName, Name, Type, errStr)) { + errors.AddError(errStr); + return false; + } - auto arrowTypeResult = NArrow::GetArrowType(Type); - const auto arrowTypeStatus = arrowTypeResult.status(); - if (!arrowTypeStatus.ok()) { - errors.AddError(TStringBuilder() << "Column '" << Name << "': " << arrowTypeStatus.ToString()); + if (Type.GetTypeId() == NScheme::NTypeIds::Pg) { + if (!IsAllowedPgType(NPg::PgTypeIdFromTypeDesc(Type.GetPgTypeDesc()))) { + errors.AddError(TStringBuilder() << "Type '" << TypeName << "' specified for column '" << Name << "' is not supported"); return false; } - if (columnSchema.HasDefaultValue()) { - auto conclusion = DefaultValue.DeserializeFromProto(columnSchema.GetDefaultValue()); - if (conclusion.IsFail()) { - errors.AddError(conclusion.GetErrorMessage()); - return false; - } - if (!DefaultValue.IsCompatibleType(*arrowTypeResult)) { - errors.AddError("incompatible types for default write: def" + DefaultValue.DebugString() + ", col:" + (*arrowTypeResult)->ToString()); - return false; - } + } else { + if (!IsAllowedType(Type.GetTypeId())) { + errors.AddError(TStringBuilder() << "Type '" << TypeName << "' specified for column '" << Name << "' is not supported"); + return false; } - return true; } - void TOlapColumnAdd::ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema) { - Name = columnSchema.GetName(); - TypeName = columnSchema.GetType(); - StorageId = columnSchema.GetStorageId(); - - if (columnSchema.HasTypeInfo()) { - Type = NScheme::TypeInfoModFromProtoColumnType( - columnSchema.GetTypeId(), &columnSchema.GetTypeInfo()) - .TypeInfo; - } else { - Type = NScheme::TypeInfoModFromProtoColumnType( - columnSchema.GetTypeId(), nullptr) - .TypeInfo; - } - auto arrowType = NArrow::TStatusValidator::GetValid(NArrow::GetArrowType(Type)); - if (columnSchema.HasDefaultValue()) { - DefaultValue.DeserializeFromProto(columnSchema.GetDefaultValue()).Validate(); - AFL_VERIFY(DefaultValue.IsCompatibleType(arrowType)); - } - if (columnSchema.HasSerializer()) { - NArrow::NSerialization::TSerializerContainer serializer; - AFL_VERIFY(serializer.DeserializeFromProto(columnSchema.GetSerializer())); - Serializer = serializer; - } else if (columnSchema.HasCompression()) { - NArrow::NSerialization::TSerializerContainer serializer; - serializer.DeserializeFromProto(columnSchema.GetCompression()).Validate(); - Serializer = serializer; - } - if (columnSchema.HasDataAccessorConstructor()) { - NArrow::NAccessor::TConstructorContainer container; - AFL_VERIFY(container.DeserializeFromProto(columnSchema.GetDataAccessorConstructor())); - AccessorConstructor = container; - } - if (columnSchema.HasDictionaryEncoding()) { - auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding()); - Y_ABORT_UNLESS(settings.IsSuccess()); - DictionaryEncoding = *settings; + auto arrowTypeResult = NArrow::GetArrowType(Type); + const auto arrowTypeStatus = arrowTypeResult.status(); + if (!arrowTypeStatus.ok()) { + errors.AddError(TStringBuilder() << "Column '" << Name << "': " << arrowTypeStatus.ToString()); + return false; + } + if (columnSchema.HasDefaultValue()) { + auto conclusion = DefaultValue.DeserializeFromProto(columnSchema.GetDefaultValue()); + if (conclusion.IsFail()) { + errors.AddError(conclusion.GetErrorMessage()); + return false; } - if (columnSchema.HasNotNull()) { - NotNullFlag = columnSchema.GetNotNull(); - } else { - NotNullFlag = false; + if (!DefaultValue.IsCompatibleType(*arrowTypeResult)) { + errors.AddError( + "incompatible types for default write: def" + DefaultValue.DebugString() + ", col:" + (*arrowTypeResult)->ToString()); + return false; } } + return true; +} - void TOlapColumnAdd::Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const { - columnSchema.SetName(Name); - columnSchema.SetType(TypeName); - columnSchema.SetNotNull(NotNullFlag); - columnSchema.SetStorageId(StorageId); - *columnSchema.MutableDefaultValue() = DefaultValue.SerializeToProto(); - if (Serializer) { - Serializer->SerializeToProto(*columnSchema.MutableSerializer()); - } - if (AccessorConstructor) { - *columnSchema.MutableDataAccessorConstructor() = AccessorConstructor.SerializeToProto(); - } - if (DictionaryEncoding) { - *columnSchema.MutableDictionaryEncoding() = DictionaryEncoding->SerializeToProto(); - } +void TOlapColumnBase::ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema) { + Name = columnSchema.GetName(); + TypeName = columnSchema.GetType(); + StorageId = columnSchema.GetStorageId(); - auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(Type, ""); - columnSchema.SetTypeId(columnType.TypeId); - if (columnType.TypeInfo) { - *columnSchema.MutableTypeInfo() = *columnType.TypeInfo; - } + if (columnSchema.HasTypeInfo()) { + Type = NScheme::TypeInfoModFromProtoColumnType(columnSchema.GetTypeId(), &columnSchema.GetTypeInfo()).TypeInfo; + } else { + Type = NScheme::TypeInfoModFromProtoColumnType(columnSchema.GetTypeId(), nullptr).TypeInfo; + } + auto arrowType = NArrow::TStatusValidator::GetValid(NArrow::GetArrowType(Type)); + if (columnSchema.HasDefaultValue()) { + DefaultValue.DeserializeFromProto(columnSchema.GetDefaultValue()).Validate(); + AFL_VERIFY(DefaultValue.IsCompatibleType(arrowType)); } + if (columnSchema.HasColumnFamilyId()) { + ColumnFamilyId = columnSchema.GetColumnFamilyId(); + } + if (columnSchema.HasSerializer()) { + NArrow::NSerialization::TSerializerContainer serializer; + AFL_VERIFY(serializer.DeserializeFromProto(columnSchema.GetSerializer())); + Serializer = serializer; + } else if (columnSchema.HasCompression()) { + NArrow::NSerialization::TSerializerContainer serializer; + serializer.DeserializeFromProto(columnSchema.GetCompression()).Validate(); + Serializer = serializer; + } + if (columnSchema.HasDataAccessorConstructor()) { + NArrow::NAccessor::TConstructorContainer container; + AFL_VERIFY(container.DeserializeFromProto(columnSchema.GetDataAccessorConstructor())); + AccessorConstructor = container; + } + if (columnSchema.HasDictionaryEncoding()) { + auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding()); + Y_ABORT_UNLESS(settings.IsSuccess()); + DictionaryEncoding = *settings; + } + if (columnSchema.HasNotNull()) { + NotNullFlag = columnSchema.GetNotNull(); + } else { + NotNullFlag = false; + } +} - bool TOlapColumnAdd::ApplyDiff(const TOlapColumnDiff& diffColumn, IErrorCollector& errors) { - Y_ABORT_UNLESS(GetName() == diffColumn.GetName()); - if (diffColumn.GetDefaultValue()) { - auto conclusion = DefaultValue.ParseFromString(*diffColumn.GetDefaultValue(), Type); - if (conclusion.IsFail()) { - errors.AddError(conclusion.GetErrorMessage()); - return false; - } - } - if (!!diffColumn.GetAccessorConstructor()) { - auto conclusion = diffColumn.GetAccessorConstructor()->BuildConstructor(); - if (conclusion.IsFail()) { - errors.AddError(conclusion.GetErrorMessage()); - return false; - } - AccessorConstructor = conclusion.DetachResult(); +void TOlapColumnBase::Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const { + columnSchema.SetName(Name); + columnSchema.SetType(TypeName); + columnSchema.SetNotNull(NotNullFlag); + columnSchema.SetStorageId(StorageId); + *columnSchema.MutableDefaultValue() = DefaultValue.SerializeToProto(); + if (ColumnFamilyId.has_value()) { + columnSchema.SetColumnFamilyId(ColumnFamilyId.value()); + } + if (Serializer) { + Serializer->SerializeToProto(*columnSchema.MutableSerializer()); + } + if (AccessorConstructor) { + *columnSchema.MutableDataAccessorConstructor() = AccessorConstructor.SerializeToProto(); + } + if (DictionaryEncoding) { + *columnSchema.MutableDictionaryEncoding() = DictionaryEncoding->SerializeToProto(); + } + + auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(Type, ""); + columnSchema.SetTypeId(columnType.TypeId); + if (columnType.TypeInfo) { + *columnSchema.MutableTypeInfo() = *columnType.TypeInfo; + } +} + +bool TOlapColumnBase::ApplySerializerFromColumnFamily(const TOlapColumnFamiliesDescription& columnFamilies, IErrorCollector& errors) { + if (GetColumnFamilyId().has_value()) { + SetSerializer(columnFamilies.GetByIdVerified(GetColumnFamilyId().value())->GetSerializerContainer()); + } else { + TString familyName = "default"; + const TOlapColumnFamily* columnFamily = columnFamilies.GetByName(familyName); + + if (!columnFamily) { + errors.AddError(NKikimrScheme::StatusSchemeError, + TStringBuilder() << "Cannot set column family `" << familyName << "` for column `" << GetName() << "`. Family not found"); + return false; } - if (diffColumn.GetStorageId()) { - StorageId = *diffColumn.GetStorageId(); + + ColumnFamilyId = columnFamily->GetId(); + SetSerializer(columnFamilies.GetByIdVerified(columnFamily->GetId())->GetSerializerContainer()); + } + return true; +} + +bool TOlapColumnBase::ApplyDiff( + const TOlapColumnDiff& diffColumn, const TOlapColumnFamiliesDescription& columnFamilies, IErrorCollector& errors) { + Y_ABORT_UNLESS(GetName() == diffColumn.GetName()); + if (diffColumn.GetDefaultValue()) { + auto conclusion = DefaultValue.ParseFromString(*diffColumn.GetDefaultValue(), Type); + if (conclusion.IsFail()) { + errors.AddError(conclusion.GetErrorMessage()); + return false; } - if (diffColumn.GetSerializer()) { - Serializer = diffColumn.GetSerializer(); + } + if (!!diffColumn.GetAccessorConstructor()) { + auto conclusion = diffColumn.GetAccessorConstructor()->BuildConstructor(); + if (conclusion.IsFail()) { + errors.AddError(conclusion.GetErrorMessage()); + return false; } - { - auto result = diffColumn.GetDictionaryEncoding().Apply(DictionaryEncoding); - if (!result) { - errors.AddError("Cannot merge dictionary encoding info: " + result.GetErrorMessage()); - return false; - } + AccessorConstructor = conclusion.DetachResult(); + } + if (diffColumn.GetStorageId()) { + StorageId = *diffColumn.GetStorageId(); + } + if (diffColumn.GetColumnFamilyName().has_value()) { + TString columnFamilyName = diffColumn.GetColumnFamilyName().value(); + const TOlapColumnFamily* columnFamily = columnFamilies.GetByName(columnFamilyName); + if (!columnFamily) { + errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Cannot alter column family `" << columnFamilyName + << "` for column `" << GetName() << "`. Family not found"); + return false; } - return true; + ColumnFamilyId = columnFamily->GetId(); } - bool TOlapColumnAdd::IsAllowedType(ui32 typeId) { - if (!NScheme::NTypeIds::IsYqlType(typeId)) { + if (diffColumn.GetSerializer()) { + Serializer = diffColumn.GetSerializer(); + } else { + if (!columnFamilies.GetColumnFamilies().empty() && !ApplySerializerFromColumnFamily(columnFamilies, errors)) { return false; } - - switch (typeId) { - case NYql::NProto::Bool: - case NYql::NProto::Interval: - case NYql::NProto::DyNumber: - return false; - default: - break; + } + { + auto result = diffColumn.GetDictionaryEncoding().Apply(DictionaryEncoding); + if (!result) { + errors.AddError("Cannot merge dictionary encoding info: " + result.GetErrorMessage()); + return false; } - return true; } + return true; +} - bool TOlapColumnAdd::IsAllowedPgType(ui32 pgTypeId) { - switch (pgTypeId) { - case INT2OID: - case INT4OID: - case INT8OID: - case FLOAT4OID: - case FLOAT8OID: - return true; - default: - break; - } +bool TOlapColumnBase::IsAllowedType(ui32 typeId) { + if (!NScheme::NTypeIds::IsYqlType(typeId)) { return false; } - bool TOlapColumnAdd::IsAllowedPkType(ui32 typeId) { - switch (typeId) { - case NYql::NProto::Int8: - case NYql::NProto::Uint8: // Byte - case NYql::NProto::Int16: - case NYql::NProto::Uint16: - case NYql::NProto::Int32: - case NYql::NProto::Uint32: - case NYql::NProto::Int64: - case NYql::NProto::Uint64: - case NYql::NProto::String: - case NYql::NProto::Utf8: - case NYql::NProto::Date: - case NYql::NProto::Datetime: - case NYql::NProto::Timestamp: - case NYql::NProto::Date32: - case NYql::NProto::Datetime64: - case NYql::NProto::Timestamp64: - case NYql::NProto::Interval64: - case NYql::NProto::Decimal: - return true; - default: - return false; - } + switch (typeId) { + case NYql::NProto::Bool: + case NYql::NProto::Interval: + case NYql::NProto::DyNumber: + return false; + default: + break; } + return true; +} + +bool TOlapColumnBase::IsAllowedPgType(ui32 pgTypeId) { + switch (pgTypeId) { + case INT2OID: + case INT4OID: + case INT8OID: + case FLOAT4OID: + case FLOAT8OID: + return true; + default: + break; + } + return false; +} + +bool TOlapColumnBase::IsAllowedPkType(ui32 typeId) { + switch (typeId) { + case NYql::NProto::Int8: + case NYql::NProto::Uint8: // Byte + case NYql::NProto::Int16: + case NYql::NProto::Uint16: + case NYql::NProto::Int32: + case NYql::NProto::Uint32: + case NYql::NProto::Int64: + case NYql::NProto::Uint64: + case NYql::NProto::String: + case NYql::NProto::Utf8: + case NYql::NProto::Date: + case NYql::NProto::Datetime: + case NYql::NProto::Timestamp: + case NYql::NProto::Date32: + case NYql::NProto::Datetime64: + case NYql::NProto::Timestamp64: + case NYql::NProto::Interval64: + case NYql::NProto::Decimal: + return true; + default: + return false; + } +} + +bool TOlapColumnAdd::ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema, IErrorCollector& errors) { + if (columnSchema.HasColumnFamilyName()) { + ColumnFamilyName = columnSchema.GetColumnFamilyName(); + } + return TBase::ParseFromRequest(columnSchema, errors); +} + +void TOlapColumnAdd::ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema) { + if (columnSchema.HasColumnFamilyName()) { + ColumnFamilyName = columnSchema.GetColumnFamilyName(); + } + TBase::ParseFromLocalDB(columnSchema); +} bool TOlapColumnsUpdate::Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors) { for (const auto& column : alterRequest.GetDropColumns()) { diff --git a/ydb/core/tx/schemeshard/olap/columns/update.h b/ydb/core/tx/schemeshard/olap/columns/update.h index 84a728829d..4f87cf014d 100644 --- a/ydb/core/tx/schemeshard/olap/columns/update.h +++ b/ydb/core/tx/schemeshard/olap/columns/update.h @@ -1,13 +1,15 @@ #pragma once -#include <ydb/core/formats/arrow/dictionary/diff.h> -#include <ydb/core/protos/flat_scheme_op.pb.h> -#include <ydb/core/tx/schemeshard/olap/common/common.h> -#include <ydb/library/accessor/accessor.h> -#include <ydb/core/scheme_types/scheme_type_info.h> #include <ydb/core/formats/arrow/accessor/abstract/request.h> +#include <ydb/core/formats/arrow/dictionary/diff.h> #include <ydb/core/formats/arrow/dictionary/object.h> #include <ydb/core/formats/arrow/serializer/abstract.h> +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/scheme_types/scheme_type_info.h> #include <ydb/core/tx/columnshard/engines/scheme/defaults/common/scalar.h> +#include <ydb/core/tx/schemeshard/olap/column_families/schema.h> +#include <ydb/core/tx/schemeshard/olap/common/common.h> + +#include <ydb/library/accessor/accessor.h> namespace NKikimr::NSchemeShard { @@ -19,40 +21,13 @@ private: YDB_READONLY_DEF(std::optional<TString>, StorageId); YDB_READONLY_DEF(std::optional<TString>, DefaultValue); YDB_READONLY_DEF(NArrow::NAccessor::TRequestedConstructorContainer, AccessorConstructor); + YDB_READONLY_DEF(std::optional<TString>, ColumnFamilyName); + public: - bool ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDiff& columnSchema, IErrorCollector& errors) { - Name = columnSchema.GetName(); - if (!!columnSchema.GetStorageId()) { - StorageId = columnSchema.GetStorageId(); - } - if (!Name) { - errors.AddError("empty field name"); - return false; - } - if (columnSchema.HasDefaultValue()) { - DefaultValue = columnSchema.GetDefaultValue(); - } - if (columnSchema.HasDataAccessorConstructor()) { - if (!AccessorConstructor.DeserializeFromProto(columnSchema.GetDataAccessorConstructor())) { - errors.AddError("cannot parse accessor constructor from proto"); - return false; - } - } - if (columnSchema.HasSerializer()) { - if (!Serializer.DeserializeFromProto(columnSchema.GetSerializer())) { - errors.AddError("cannot parse serializer diff from proto"); - return false; - } - } - if (!DictionaryEncoding.DeserializeFromProto(columnSchema.GetDictionaryEncoding())) { - errors.AddError("cannot parse dictionary encoding diff from proto"); - return false; - } - return true; - } + bool ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDiff& columnSchema, IErrorCollector& errors); }; -class TOlapColumnAdd { +class TOlapColumnBase { private: YDB_READONLY_DEF(std::optional<ui32>, KeyOrder); YDB_READONLY_DEF(TString, Name); @@ -60,19 +35,24 @@ private: YDB_READONLY_DEF(NScheme::TTypeInfo, Type); YDB_READONLY_DEF(TString, StorageId); YDB_FLAG_ACCESSOR(NotNull, false); - YDB_READONLY_DEF(std::optional<NArrow::NSerialization::TSerializerContainer>, Serializer); + YDB_ACCESSOR_DEF(std::optional<NArrow::NSerialization::TSerializerContainer>, Serializer); YDB_READONLY_DEF(std::optional<NArrow::NDictionary::TEncodingSettings>, DictionaryEncoding); YDB_READONLY_DEF(NOlap::TColumnDefaultScalarValue, DefaultValue); YDB_READONLY_DEF(NArrow::NAccessor::TConstructorContainer, AccessorConstructor); -public: - TOlapColumnAdd(const std::optional<ui32>& keyOrder) - : KeyOrder(keyOrder) { + YDB_READONLY_PROTECT(std::optional<ui32>, ColumnFamilyId, std::nullopt); +public: + TOlapColumnBase(const std::optional<ui32>& keyOrder, const std::optional<ui32> columnFamilyId = {}) + : KeyOrder(keyOrder) + , ColumnFamilyId(columnFamilyId) + { } bool ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema, IErrorCollector& errors); void ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema); void Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const; bool ApplyDiff(const TOlapColumnDiff& diffColumn, IErrorCollector& errors); + bool ApplySerializerFromColumnFamily(const TOlapColumnFamiliesDescription& columnFamilies, IErrorCollector& errors); + bool ApplyDiff(const TOlapColumnDiff& diffColumn, const TOlapColumnFamiliesDescription& columnFamilies, IErrorCollector& errors); bool IsKeyColumn() const { return !!KeyOrder; } @@ -81,6 +61,20 @@ public: static bool IsAllowedPgType(ui32 pgTypeId); }; +class TOlapColumnAdd: public TOlapColumnBase { +private: + using TBase = TOlapColumnBase; + YDB_READONLY_DEF(std::optional<TString>, ColumnFamilyName); + +public: + TOlapColumnAdd(const std::optional<ui32>& keyOrder) + : TBase(keyOrder) + { + } + bool ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema, IErrorCollector& errors); + void ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema); +}; + class TOlapColumnsUpdate { private: YDB_READONLY_DEF(TVector<TOlapColumnAdd>, AddColumns); diff --git a/ydb/core/tx/schemeshard/olap/operations/alter/abstract/converter.h b/ydb/core/tx/schemeshard/olap/operations/alter/abstract/converter.h index 7a45468961..e1974eb03c 100644 --- a/ydb/core/tx/schemeshard/olap/operations/alter/abstract/converter.h +++ b/ydb/core/tx/schemeshard/olap/operations/alter/abstract/converter.h @@ -50,6 +50,12 @@ private: return parse; } } + + for (auto&& family : dsDescription.GetPartitionConfig().GetColumnFamilies()) { + NKikimrSchemeOp::TAlterColumnTableSchema* alterSchema = olapDescription.MutableAlterSchema(); + alterSchema->AddAddColumnFamily()->CopyFrom(family); + } + return TConclusionStatus::Success(); } @@ -71,11 +77,27 @@ private: if (dsColumn.HasDefaultFromSequence()) { return TConclusionStatus::Fail("DefaultFromSequence not supported"); } - if (dsColumn.HasFamilyName() || dsColumn.HasFamily()) { - return TConclusionStatus::Fail("FamilyName and Family not supported"); + if (dsColumn.HasFamilyName()) { + olapColumn.SetColumnFamilyName(dsColumn.GetFamilyName()); + } + if (dsColumn.HasFamily()) { + olapColumn.SetColumnFamilyId(dsColumn.GetFamily()); } return TConclusionStatus::Success(); } + + TConclusionStatus ParseFromDSRequest( + const NKikimrSchemeOp::TColumnDescription& dsColumn, NKikimrSchemeOp::TOlapColumnDiff& olapColumn) const { + olapColumn.SetName(dsColumn.GetName()); + if (dsColumn.HasDefaultFromSequence()) { + return TConclusionStatus::Fail("DefaultFromSequence not supported"); + } + if (dsColumn.HasFamilyName()) { + olapColumn.SetColumnFamilyName(dsColumn.GetFamilyName()); + } + return TConclusionStatus::Success(); + } + public: TConclusion<NKikimrSchemeOp::TAlterColumnTable> Convert(const NKikimrSchemeOp::TModifyScheme& modify) { NKikimrSchemeOp::TAlterColumnTable result; diff --git a/ydb/core/tx/schemeshard/olap/operations/create_table.cpp b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp index 0c155d68d7..48b515f4f5 100644 --- a/ydb/core/tx/schemeshard/olap/operations/create_table.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp @@ -558,13 +558,35 @@ class TCreateColumnTable: public TSubOperation { public: using TSubOperation::TSubOperation; + void AddDefaultFamilyIfNotExists(NKikimrSchemeOp::TColumnTableDescription& createDescription) { + auto schema = createDescription.GetSchema(); + for (const auto& family : schema.GetColumnFamilies()) { + if (family.GetName() == "default") { + return; + } + } + + auto mutableSchema = createDescription.MutableSchema(); + auto defaultFamily = mutableSchema->AddColumnFamilies(); + defaultFamily->SetName("default"); + defaultFamily->SetId(0); + defaultFamily->SetColumnCodec(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + + for (ui32 i = 0; i < schema.ColumnsSize(); i++) { + if (!schema.GetColumns(i).HasColumnFamilyName() || !schema.GetColumns(i).HasColumnFamilyId()) { + mutableSchema->MutableColumns(i)->SetColumnFamilyName("default"); + mutableSchema->MutableColumns(i)->SetColumnFamilyId(0); + } + } + } + THolder<TProposeResponse> Propose(const TString& owner, TOperationContext& context) override { const TTabletId ssId = context.SS->SelfTabletId(); const auto acceptExisted = !Transaction.GetFailOnExist(); const TString& parentPathStr = Transaction.GetWorkingDir(); - // Copy CreateColumnTable for changes. Update default sharding if not set. + // Copy CreateColumnTable for changes. Update default sharding if not set and add default family if not set. auto createDescription = Transaction.GetCreateColumnTable(); if (!createDescription.HasColumnShardCount()) { createDescription.SetColumnShardCount(TTableConstructorBase::DEFAULT_SHARDS_COUNT); @@ -698,6 +720,7 @@ public: result->SetError(NKikimrScheme::StatusSchemeError, errStr); return result; } + AddDefaultFamilyIfNotExists(createDescription); TOlapTableConstructor tableConstructor; tableInfo = tableConstructor.BuildTableInfo(createDescription, context, errors); } diff --git a/ydb/core/tx/schemeshard/olap/schema/schema.cpp b/ydb/core/tx/schemeshard/olap/schema/schema.cpp index c9c71152b4..97cc54eeaa 100644 --- a/ydb/core/tx/schemeshard/olap/schema/schema.cpp +++ b/ydb/core/tx/schemeshard/olap/schema/schema.cpp @@ -1,4 +1,5 @@ #include "schema.h" + #include <ydb/core/tx/schemeshard/common/validation.h> #include <ydb/core/tx/schemeshard/olap/ttl/validator.h> @@ -7,7 +8,7 @@ namespace NKikimr::NSchemeShard { bool TOlapSchema::ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl, IErrorCollector& errors) const { using TTtlProto = NKikimrSchemeOp::TColumnDataLifeCycle; switch (ttl.GetStatusCase()) { - case TTtlProto::kEnabled: + case TTtlProto::kEnabled: { const auto* column = Columns.GetByName(ttl.GetEnabled().GetColumnName()); if (!column) { @@ -25,7 +26,11 @@ bool TOlapSchema::ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycl } bool TOlapSchema::Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors) { - if (!Columns.ApplyUpdate(schemaUpdate.GetColumns(), errors, NextColumnId)) { + if (!ColumnFamilies.ApplyUpdate(schemaUpdate.GetColumnFamilies(), errors, NextColumnFamilyId)) { + return false; + } + + if (!Columns.ApplyUpdate(schemaUpdate.GetColumns(), ColumnFamilies, errors, NextColumnId)) { return false; } @@ -43,8 +48,10 @@ bool TOlapSchema::Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& void TOlapSchema::ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchema& tableSchema) { NextColumnId = tableSchema.GetNextColumnId(); + NextColumnFamilyId = tableSchema.GetNextColumnFamilyId(); Version = tableSchema.GetVersion(); + ColumnFamilies.Parse(tableSchema); Columns.Parse(tableSchema); Indexes.Parse(tableSchema); Options.Parse(tableSchema); @@ -53,8 +60,10 @@ void TOlapSchema::ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchema& ta void TOlapSchema::Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchemaExt) const { NKikimrSchemeOp::TColumnTableSchema resultLocal; resultLocal.SetNextColumnId(NextColumnId); + resultLocal.SetNextColumnFamilyId(NextColumnFamilyId); resultLocal.SetVersion(Version); + ColumnFamilies.Serialize(resultLocal); Columns.Serialize(resultLocal); Indexes.Serialize(resultLocal); Options.Serialize(resultLocal); diff --git a/ydb/core/tx/schemeshard/olap/schema/schema.h b/ydb/core/tx/schemeshard/olap/schema/schema.h index 31baf692e9..309ce3ab69 100644 --- a/ydb/core/tx/schemeshard/olap/schema/schema.h +++ b/ydb/core/tx/schemeshard/olap/schema/schema.h @@ -1,10 +1,12 @@ #pragma once -#include <ydb/core/tx/schemeshard/olap/columns/update.h> -#include <ydb/core/tx/schemeshard/olap/indexes/update.h> +#include "update.h" + +#include <ydb/core/tx/schemeshard/olap/column_families/schema.h> #include <ydb/core/tx/schemeshard/olap/columns/schema.h> +#include <ydb/core/tx/schemeshard/olap/columns/update.h> #include <ydb/core/tx/schemeshard/olap/indexes/schema.h> +#include <ydb/core/tx/schemeshard/olap/indexes/update.h> #include <ydb/core/tx/schemeshard/olap/options/schema.h> -#include "update.h" namespace NKikimr::NSchemeShard { @@ -13,9 +15,11 @@ namespace NKikimr::NSchemeShard { YDB_READONLY_DEF(TOlapColumnsDescription, Columns); YDB_READONLY_DEF(TOlapIndexesDescription, Indexes); YDB_READONLY_DEF(TOlapOptionsDescription, Options); + YDB_READONLY_DEF(TOlapColumnFamiliesDescription, ColumnFamilies); YDB_READONLY(ui32, NextColumnId, 1); YDB_READONLY(ui32, Version, 0); + YDB_READONLY(ui32, NextColumnFamilyId, 1); public: bool Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors); diff --git a/ydb/core/tx/schemeshard/olap/schema/update.cpp b/ydb/core/tx/schemeshard/olap/schema/update.cpp index 30555f63be..0414d14bec 100644 --- a/ydb/core/tx/schemeshard/olap/schema/update.cpp +++ b/ydb/core/tx/schemeshard/olap/schema/update.cpp @@ -2,27 +2,35 @@ namespace NKikimr::NSchemeShard { - bool TOlapSchemaUpdate::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys) { - if (!Columns.Parse(tableSchema, errors, allowNullKeys)) { - return false; - } +bool TOlapSchemaUpdate::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys) { + if (!ColumnFamilies.Parse(tableSchema, errors)) { + return false; + } - return true; + if (!Columns.Parse(tableSchema, errors, allowNullKeys)) { + return false; } - bool TOlapSchemaUpdate::Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors) { - if (!Columns.Parse(alterRequest, errors)) { - return false; - } + return true; +} + +bool TOlapSchemaUpdate::Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors) { + if (!ColumnFamilies.Parse(alterRequest, errors)) { + return false; + } - if (!Indexes.Parse(alterRequest, errors)) { - return false; - } + if (!Columns.Parse(alterRequest, errors)) { + return false; + } - if (!Options.Parse(alterRequest, errors)) { - return false; - } + if (!Indexes.Parse(alterRequest, errors)) { + return false; + } - return true; + if (!Options.Parse(alterRequest, errors)) { + return false; } + + return true; +} } diff --git a/ydb/core/tx/schemeshard/olap/schema/update.h b/ydb/core/tx/schemeshard/olap/schema/update.h index 70fed2fcf0..fe625bbe9a 100644 --- a/ydb/core/tx/schemeshard/olap/schema/update.h +++ b/ydb/core/tx/schemeshard/olap/schema/update.h @@ -1,17 +1,21 @@ #pragma once -#include <ydb/library/accessor/accessor.h> -#include <ydb/core/tx/schemeshard/olap/options/update.h> +#include <ydb/core/tx/schemeshard/olap/column_families/update.h> #include <ydb/core/tx/schemeshard/olap/columns/update.h> #include <ydb/core/tx/schemeshard/olap/indexes/update.h> +#include <ydb/core/tx/schemeshard/olap/options/update.h> + +#include <ydb/library/accessor/accessor.h> namespace NKikimr::NSchemeShard { - class TOlapSchemaUpdate { - YDB_READONLY_DEF(TOlapColumnsUpdate, Columns); - YDB_READONLY_DEF(TOlapIndexesUpdate, Indexes); - YDB_READONLY_DEF(TOlapOptionsUpdate, Options); - public: - bool Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys = false); - bool Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors); - }; +class TOlapSchemaUpdate { + YDB_READONLY_DEF(TOlapColumnsUpdate, Columns); + YDB_READONLY_DEF(TOlapIndexesUpdate, Indexes); + YDB_READONLY_DEF(TOlapOptionsUpdate, Options); + YDB_READONLY_DEF(TOlapColumnFamiliesUpdate, ColumnFamilies); + +public: + bool Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys = false); + bool Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors); +}; } diff --git a/ydb/core/tx/schemeshard/olap/schema/ya.make b/ydb/core/tx/schemeshard/olap/schema/ya.make index 76b2d2d1c8..77e32c36b5 100644 --- a/ydb/core/tx/schemeshard/olap/schema/ya.make +++ b/ydb/core/tx/schemeshard/olap/schema/ya.make @@ -6,6 +6,7 @@ SRCS( ) PEERDIR( + ydb/core/tx/schemeshard/olap/column_families ydb/core/tx/schemeshard/olap/columns ydb/core/tx/schemeshard/olap/indexes ydb/core/tx/schemeshard/olap/options diff --git a/ydb/core/tx/schemeshard/olap/ya.make b/ydb/core/tx/schemeshard/olap/ya.make index 4fde54f9fb..24a993e328 100644 --- a/ydb/core/tx/schemeshard/olap/ya.make +++ b/ydb/core/tx/schemeshard/olap/ya.make @@ -13,6 +13,7 @@ PEERDIR( ydb/core/tx/schemeshard/olap/store ydb/core/tx/schemeshard/olap/table ydb/core/tx/schemeshard/olap/ttl + ydb/core/tx/schemeshard/olap/column_families ) END() diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index 3370615dd1..4738846fbd 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -765,6 +765,10 @@ bool FillColumnDescriptionImpl(TColumnTable& out, const google::protobuf::Repeat if (NScheme::NTypeIds::IsParametrizedType(typeInfo.GetTypeId())) { NScheme::ProtoFromTypeInfo(typeInfo, typeMod, *columnDesc->MutableTypeInfo()); } + + if (!column.Getfamily().empty()) { + columnDesc->SetColumnFamilyName(column.Getfamily()); + } } return true; @@ -780,6 +784,38 @@ bool FillColumnDescription(NKikimrSchemeOp::TAlterColumnTable& out, const google return FillColumnDescriptionImpl(out, in, status, error); } +bool FillColumnFamily( + const Ydb::Table::ColumnFamily& from, NKikimrSchemeOp::TFamilyDescription* to, Ydb::StatusIds::StatusCode& status, TString& error) { + to->SetName(from.name()); + if (from.has_data()) { + status = Ydb::StatusIds::BAD_REQUEST; + error = TStringBuilder() << "Field `DATA` is not supported for OLAP tables in column family '" << from.name() << "'"; + return false; + } + switch (from.compression()) { + case Ydb::Table::ColumnFamily::COMPRESSION_UNSPECIFIED: + break; + case Ydb::Table::ColumnFamily::COMPRESSION_NONE: + to->SetColumnCodec(NKikimrSchemeOp::ColumnCodecPlain); + break; + case Ydb::Table::ColumnFamily::COMPRESSION_LZ4: + to->SetColumnCodec(NKikimrSchemeOp::ColumnCodecLZ4); + break; + case Ydb::Table::ColumnFamily::COMPRESSION_ZSTD: + to->SetColumnCodec(NKikimrSchemeOp::ColumnCodecZSTD); + break; + default: + status = Ydb::StatusIds::BAD_REQUEST; + error = TStringBuilder() << "Unsupported compression value " << (ui32)from.compression() << " in column family '" << from.name() + << "'"; + return false; + } + if (from.has_compression_level()) { + to->SetColumnCodecLevel(from.compression_level()); + } + return true; +} + bool BuildAlterColumnTableModifyScheme(const TString& path, const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme, Ydb::StatusIds::StatusCode& status, TString& error) { const auto ops = GetAlterOperationKinds(req); @@ -822,6 +858,31 @@ bool BuildAlterColumnTableModifyScheme(const TString& path, const Ydb::Table::Al return false; } + for (const auto& alter : req->alter_columns()) { + auto alterColumn = alterColumnTable->MutableAlterSchema()->AddAlterColumns(); + alterColumn->SetName(alter.Getname()); + + if (!alter.family().empty()) { + alterColumn->SetColumnFamilyName(alter.family()); + } + } + + for (const auto& add : req->add_column_families()) { + if (add.compression() == Ydb::Table::ColumnFamily::COMPRESSION_UNSPECIFIED) { + status = Ydb::StatusIds::BAD_REQUEST; + error = TStringBuilder() << "Compression value is not set for column family '" << add.name() << "'"; + } + if (!FillColumnFamily(add, alterColumnTable->MutableAlterSchema()->AddAddColumnFamily(), status, error)) { + return false; + } + } + + for (const auto& alter : req->alter_column_families()) { + if (!FillColumnFamily(alter, alterColumnTable->MutableAlterSchema()->AddAlterColumnFamily(), status, error)) { + return false; + } + } + if (req->has_set_ttl_settings()) { if (!FillTtlSettings(*alterColumnTable->MutableAlterTtlSettings()->MutableEnabled(), req->Getset_ttl_settings(), status, error)) { return false; diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index 0cd658c453..6c307a27c3 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -518,6 +518,7 @@ message ColumnFamily { COMPRESSION_UNSPECIFIED = 0; COMPRESSION_NONE = 1; COMPRESSION_LZ4 = 2; + COMPRESSION_ZSTD = 3; } // Name of the column family, the name "default" must be used for the |