diff options
author | nsofya <nsofya@yandex-team.com> | 2023-04-04 19:31:00 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-04-04 19:31:00 +0300 |
commit | 9c844c132d2f12b82fff2fff1bd0f7048fa96b28 (patch) | |
tree | 1cf6d55d949d97acb09406991d13e3fb3370e0d0 | |
parent | 3f75ec8cd3385bb1b3218186ed863714070c3b47 (diff) | |
download | ydb-9c844c132d2f12b82fff2fff1bd0f7048fa96b28.tar.gz |
Table schema modification prepare
Что сделала тут:
1. И основное, преобразовала класс TOlapScheme так, чтобы разделить логику построения схемы (сам TOlapSchema) и логику парсинга запроса (TOlapSchemaUpdate). И в итоге имеем прозрачную и читаемую логику: распарсил дифф (новая таблица - это тоже дифф к пустой схеме) -> наложил на существующую схему -> серилизовал результат и сохранил. По идее должно завестись на AddColumn , как только уберем проверку, которая не позволяет сейчас схему передавать просто во входящем протобуфе. Но полностью это я смогу протестировать, когда поддержку на уровне колумншарда работу с разными версиями схем.
2. Сделала пару полезных функций внутри схемашарда: по поиску транзакции и проверки стейта (а то везде с разной степенью надежности пишутся эти проверки) и по инициализации транзакции внутри схемашарда - InitializeSchemaTx . Пока делала использование только в тех файлах, которые правила.
16 files changed, 866 insertions, 794 deletions
diff --git a/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt index fc366875e06..5235a82ee86 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt @@ -234,6 +234,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_effective_acl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_identificators.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_info_types.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path_element.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path.cpp diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt index 213ff7b5b4e..5946984c262 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt @@ -235,6 +235,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_effective_acl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_identificators.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_info_types.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path_element.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path.cpp diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt index 213ff7b5b4e..5946984c262 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt @@ -235,6 +235,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_effective_acl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_identificators.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_info_types.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path_element.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path.cpp diff --git a/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt index acc7c9238b5..beb995860d5 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt @@ -234,6 +234,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_effective_acl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_identificators.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_info_types.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path_element.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_path.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp index 3ae45674d72..0fdae35b713 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp @@ -44,7 +44,7 @@ TOlapStoreInfo::TPtr ParseParams( const ui32 presetId = alterData->SchemaPresetByName.at(presetName); auto& preset = alterData->SchemaPresets.at(presetId); - auto& presetProto = *alterData->Description.MutableSchemaPresets(preset.ProtoIndex); + auto& presetProto = *alterData->Description.MutableSchemaPresets(preset.GetProtoIndex()); auto& schemaProto = *presetProto.MutableSchema(); const auto& alterSchema = alterSchemaPreset.GetAlterSchema(); @@ -52,7 +52,7 @@ TOlapStoreInfo::TPtr ParseParams( THashSet<ui32> droppedColumns; for (const auto& dropColumn : alterSchema.GetDropColumns()) { const TString& columnName = dropColumn.GetName(); - const auto* column = preset.FindColumnByName(columnName); + const auto* column = preset.GetColumnByName(columnName); if (!column) { status = NKikimrScheme::StatusInvalidParameter; errStr = TStringBuilder() << "Cannot drop non-existant column '" << columnName << "'"; @@ -225,8 +225,7 @@ public: DebugHint() << " ProgressState" << " at tabletId# " << ssId); - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState->TxType == TTxState::TxAlterOlapStore); + TTxState* txState = context.SS->FindTxSafe(OperationId, TTxState::TxAlterOlapStore); TOlapStoreInfo::TPtr storeInfo = context.SS->OlapStores[txState->TargetPathId]; Y_VERIFY(storeInfo); TOlapStoreInfo::TPtr alterData = storeInfo->AlterData; @@ -234,8 +233,6 @@ public: txState->ClearShardsInProgress(); - auto seqNo = context.SS->StartRound(*txState); - TVector<ui32> droppedSchemaPresets; for (const auto& presetProto : storeInfo->Description.GetSchemaPresets()) { const ui32 presetId = presetProto.GetId(); @@ -257,6 +254,7 @@ public: TString columnShardTxBody; { + auto seqNo = context.SS->StartRound(*txState); NKikimrTxColumnShard::TSchemaTxBody tx; context.SS->FillSeqNo(tx, seqNo); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp index dd110ab865d..7e927ea4520 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp @@ -1,5 +1,6 @@ #include "schemeshard__operation_part.h" #include "schemeshard__operation_common.h" +#include "schemeshard_olap_types.h" #include "schemeshard_impl.h" #include <ydb/core/scheme/scheme_types_proto.h> @@ -11,107 +12,99 @@ using namespace NSchemeShard; class TTableInfoConstructor { - const NKikimrSchemeOp::TModifyScheme& ModifyRequest; NKikimrSchemeOp::TAlterColumnTable AlterRequest; public: - TTableInfoConstructor(const NKikimrSchemeOp::TModifyScheme& modify) - : ModifyRequest(modify) {} - - const TString& GetTableName() const { - return AlterRequest.GetName(); - } - - bool CheckAndPrepare(TProposeResponse& errors) { - if (ModifyRequest.HasAlterColumnTable()) { - if (ModifyRequest.GetOperationType() != NKikimrSchemeOp::ESchemeOpAlterColumnTable) { - errors.SetError(NKikimrScheme::StatusSchemeError, "Invalid operation type"); + bool Deserialize(const NKikimrSchemeOp::TModifyScheme& modify, IErrorCollector& errors) { + if (modify.HasAlterColumnTable()) { + if (modify.GetOperationType() != NKikimrSchemeOp::ESchemeOpAlterColumnTable) { + errors.AddError(NKikimrScheme::StatusSchemeError, "Invalid operation type"); return false; } - AlterRequest = ModifyRequest.GetAlterColumnTable(); + AlterRequest = modify.GetAlterColumnTable(); } else { // from DDL (not known table type) - if (ModifyRequest.GetOperationType() != NKikimrSchemeOp::ESchemeOpAlterTable) { - errors.SetError(NKikimrScheme::StatusSchemeError, "Invalid operation type"); + if (modify.GetOperationType() != NKikimrSchemeOp::ESchemeOpAlterTable) { + errors.AddError(NKikimrScheme::StatusSchemeError, "Invalid operation type"); return false; } - if (!ParseFromDSRequest(ModifyRequest.GetAlterTable(), AlterRequest, errors)) { + if (!ParseFromDSRequest(modify.GetAlterTable(), AlterRequest, errors)) { return false; } } if (!AlterRequest.HasName()) { - errors.SetError(NKikimrScheme::StatusInvalidParameter, "No table name in Alter"); + errors.AddError(NKikimrScheme::StatusInvalidParameter, "No table name in Alter"); return false; } if (AlterRequest.HasAlterSchema() || AlterRequest.HasAlterSchemaPresetName()) { - errors.SetError(NKikimrScheme::StatusSchemeError, "Changing table schema is not supported"); + errors.AddError(NKikimrScheme::StatusSchemeError, "Changing table schema is not supported"); return false; } if (AlterRequest.HasRESERVED_AlterTtlSettingsPresetName()) { - errors.SetError(NKikimrScheme::StatusSchemeError, "TTL presets are not supported"); + errors.AddError(NKikimrScheme::StatusSchemeError, "TTL presets are not supported"); return false; } return true; } - TColumnTableInfo::TPtr BuildTableinfo(const TTablesStorage::TTableExtractedGuard& tableInfo, const TOlapStoreInfo::TPtr& storeInfo, TProposeResponse& errors) const { - TColumnTableInfo::TPtr alterData = new TColumnTableInfo(*tableInfo); - alterData->AlterBody.ConstructInPlace(AlterRequest); - ++alterData->AlterVersion; - - ui64 currentTtlVersion = 0; - if (alterData->Description.HasTtlSettings()) { - currentTtlVersion = alterData->Description.GetTtlSettings().GetVersion(); - } - - const NKikimrSchemeOp::TColumnTableSchema* tableSchema = nullptr; + const NKikimrSchemeOp::TColumnTableSchema* GetTableSchema(const TTablesStorage::TTableExtractedGuard& tableInfo, const TOlapStoreInfo::TPtr& storeInfo, IErrorCollector& errors) const { if (storeInfo) { if (!storeInfo->SchemaPresets.count(tableInfo->Description.GetSchemaPresetId())) { - errors.SetError(NKikimrScheme::StatusSchemeError, "No preset for in-store column table"); + errors.AddError(NKikimrScheme::StatusSchemeError, "No preset for in-store column table"); return nullptr; } auto& preset = storeInfo->SchemaPresets.at(tableInfo->Description.GetSchemaPresetId()); - auto& presetProto = storeInfo->Description.GetSchemaPresets(preset.ProtoIndex); + auto& presetProto = storeInfo->Description.GetSchemaPresets(preset.GetProtoIndex()); if (!presetProto.HasSchema()) { - errors.SetError(NKikimrScheme::StatusSchemeError, "No schema in preset for in-store column table"); + errors.AddError(NKikimrScheme::StatusSchemeError, "No schema in preset for in-store column table"); return nullptr; } - - tableSchema = &presetProto.GetSchema(); + return &presetProto.GetSchema(); } else { if (!tableInfo->Description.HasSchema()) { - errors.SetError(NKikimrScheme::StatusSchemeError, "No schema for standalone column table"); + errors.AddError(NKikimrScheme::StatusSchemeError, "No schema for standalone column table"); return nullptr; } - - tableSchema = &tableInfo->Description.GetSchema(); + return &tableInfo->Description.GetSchema(); } + } - THashMap<ui32, TOlapSchema::TColumn> columns; - THashMap<TString, ui32> columnsByName; - for (const auto& col : tableSchema->GetColumns()) { - ui32 id = col.GetId(); - TString name = col.GetName(); - auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(), - col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr); - columns[id] = TOlapSchema::TColumn{id, name, typeInfoMod.TypeInfo, Max<ui32>()}; - columnsByName[name] = id; + TColumnTableInfo::TPtr BuildTableInfo(const TTablesStorage::TTableExtractedGuard& tableInfo, const TOlapStoreInfo::TPtr& storeInfo, IErrorCollector& errors) const { + TColumnTableInfo::TPtr alterData = new TColumnTableInfo(*tableInfo); + alterData->AlterBody.ConstructInPlace(AlterRequest); + ++alterData->AlterVersion; - // TODO: add checks for compatibility with new schema after we allow such changes + const NKikimrSchemeOp::TColumnTableSchema* tableSchema = GetTableSchema(tableInfo, storeInfo, errors); + if (!tableSchema) { + return nullptr; } - if (AlterRequest.HasAlterTtlSettings()) { - TString errStr; - if (!ValidateTtlSettings(AlterRequest.GetAlterTtlSettings(), columns, columnsByName, errStr)) { - errors.SetError(NKikimrScheme::StatusSchemeError, errStr); + TOlapSchema localSchema; + localSchema.Parse(*tableSchema); + if (!storeInfo) { + TOlapSchemaUpdate schemaUpdate; + if (!schemaUpdate.Parse(AlterRequest, errors)) { return nullptr; } + if (!localSchema.Update(schemaUpdate, errors)) { + return nullptr; + } + NKikimrSchemeOp::TColumnTableSchema schemaUdpateProto; + localSchema.Serialize(schemaUdpateProto); + *alterData->Description.MutableSchema() = schemaUdpateProto; + } + + if (AlterRequest.HasAlterTtlSettings()) { + if (!localSchema.ValidateTtlSettings(AlterRequest.GetAlterTtlSettings(), errors)) { + return nullptr; + } + const ui64 currentTtlVersion = alterData->Description.HasTtlSettings() ? alterData->Description.GetTtlSettings().GetVersion() : 0; *alterData->Description.MutableTtlSettings() = AlterRequest.GetAlterTtlSettings(); alterData->Description.MutableTtlSettings()->SetVersion(currentTtlVersion + 1); } @@ -119,7 +112,7 @@ public: } private: - bool ParseFromDSRequest(const NKikimrSchemeOp::TTableDescription& dsDescription, NKikimrSchemeOp::TAlterColumnTable& olapDescription, TProposeResponse& errors) const { + bool ParseFromDSRequest(const NKikimrSchemeOp::TTableDescription& dsDescription, NKikimrSchemeOp::TAlterColumnTable& olapDescription, IErrorCollector& errors) const { olapDescription.SetName(dsDescription.GetName()); if (dsDescription.HasTTLSettings()) { @@ -163,7 +156,7 @@ private: return true; } - bool ParseFromDSRequest(const NKikimrSchemeOp::TColumnDescription& dsColumn, NKikimrSchemeOp::TOlapColumnDescription& olapColumn, TProposeResponse& errors) const { + bool ParseFromDSRequest(const NKikimrSchemeOp::TColumnDescription& dsColumn, NKikimrSchemeOp::TOlapColumnDescription& olapColumn, IErrorCollector& errors) const { olapColumn.SetName(dsColumn.GetName()); olapColumn.SetType(dsColumn.GetType()); if (dsColumn.HasTypeId()) { @@ -179,11 +172,11 @@ private: olapColumn.SetId(dsColumn.GetId()); } if (dsColumn.HasDefaultFromSequence()) { - errors.SetError(NKikimrScheme::StatusInvalidParameter, "DefaultFromSequence not supported"); + errors.AddError(NKikimrScheme::StatusInvalidParameter, "DefaultFromSequence not supported"); return false; } if (dsColumn.HasFamilyName() || dsColumn.HasFamily()) { - errors.SetError(NKikimrScheme::StatusInvalidParameter, "FamilyName and Family not supported"); + errors.AddError(NKikimrScheme::StatusInvalidParameter, "FamilyName and Family not supported"); return false; } return true; @@ -218,9 +211,8 @@ public: DebugHint() << " ProgressState" << " at tabletId# " << ssId); - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState->TxType == TTxState::TxAlterColumnTable); - + TTxState* txState = context.SS->FindTxSafe(OperationId, TTxState::TxAlterColumnTable); + TPathId pathId = txState->TargetPathId; TPath path = TPath::Init(pathId, context.SS); TString pathString = path.PathString(); @@ -235,14 +227,12 @@ public: } txState->ClearShardsInProgress(); - - auto seqNo = context.SS->StartRound(*txState); - TString columnShardTxBody; { + auto seqNo = context.SS->StartRound(*txState); NKikimrTxColumnShard::TSchemaTxBody tx; context.SS->FillSeqNo(tx, seqNo); - + auto* alter = tx.MutableAlterTable(); alter->SetPathId(pathId.LocalPathId); @@ -261,7 +251,7 @@ public: " in an olap store", presetId); auto &preset = storeInfo->SchemaPresets.at(presetId); - size_t presetIndex = preset.ProtoIndex; + size_t presetIndex = preset.GetProtoIndex(); *alter->MutableSchemaPreset() = storeInfo->Description.GetSchemaPresets(presetIndex); } @@ -333,9 +323,8 @@ public: << " at tablet: " << ssId << ", stepId: " << step); - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState->TxType == TTxState::TxAlterColumnTable); - + TTxState* txState = context.SS->FindTxSafe(OperationId, TTxState::TxAlterColumnTable); + TPathId pathId = txState->TargetPathId; TPathElement::TPtr path = context.SS->PathsById.at(pathId); @@ -369,9 +358,7 @@ public: DebugHint() << " HandleReply ProgressState" << " at tablet: " << ssId); - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState); - Y_VERIFY(txState->TxType == TTxState::TxAlterColumnTable); + TTxState* txState = context.SS->FindTxSafe(OperationId, TTxState::TxAlterColumnTable); TSet<TTabletId> shardSet; for (const auto& shard : txState->Shards) { @@ -406,10 +393,7 @@ public: } bool HandleReply(TEvColumnShard::TEvNotifyTxCompletionResult::TPtr& ev, TOperationContext& context) override { - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState); - Y_VERIFY(txState->TxType == TTxState::TxAlterColumnTable); - + TTxState* txState = context.SS->FindTxSafe(OperationId, TTxState::TxAlterColumnTable); auto shardId = TTabletId(ev->Get()->Record.GetOrigin()); auto shardIdx = context.SS->MustGetShardIdx(shardId); Y_VERIFY(context.SS->ShardInfos.contains(shardIdx)); @@ -425,10 +409,7 @@ public: DebugHint() << " ProgressState" << " at tablet: " << ssId); - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState); - Y_VERIFY(txState->TxType == TTxState::TxAlterColumnTable); - + TTxState* txState = context.SS->FindTxSafe(OperationId, TTxState::TxAlterColumnTable); txState->ClearShardsInProgress(); for (auto& shard : txState->Shards) { @@ -496,15 +477,9 @@ public: const TTabletId ssId = context.SS->SelfTabletId(); auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), ui64(ssId)); - - TTableInfoConstructor schemaConstructor(Transaction); - if (!schemaConstructor.CheckAndPrepare(*result)) { - return result; - } const TString& parentPathStr = Transaction.GetWorkingDir(); - const TString& name = schemaConstructor.GetTableName(); - + const TString& name = Transaction.HasAlterColumnTable() ? Transaction.GetAlterColumnTable().GetName() : Transaction.GetAlterTable().GetName(); LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TAlterColumnTable Propose" << ", path: " << parentPathStr << "/" << name @@ -540,6 +515,12 @@ public: return result; } + TProposeErrorCollector errors(*result); + TTableInfoConstructor schemaConstructor; + if (!schemaConstructor.Deserialize(Transaction, errors)) { + return result; + } + TOlapStoreInfo::TPtr storeInfo; if (tableInfo->OlapStorePathId) { auto& storePathId = *tableInfo->OlapStorePathId; @@ -562,7 +543,7 @@ public: storeInfo = context.SS->OlapStores.at(storePathId); } - TColumnTableInfo::TPtr alterData = schemaConstructor.BuildTableinfo(tableInfo, storeInfo, *result); + TColumnTableInfo::TPtr alterData = schemaConstructor.BuildTableInfo(tableInfo, storeInfo, errors); if (!alterData) { return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp index 27d193fc47f..48fae9cfcd1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp @@ -1,6 +1,7 @@ #include "schemeshard__operation_part.h" #include "schemeshard__operation_common.h" #include "schemeshard_impl.h" +#include "schemeshard_olap_types.h" #include <ydb/core/base/subdomain.h> #include <ydb/core/scheme/scheme_types_proto.h> @@ -12,54 +13,41 @@ namespace { using namespace NKikimr; using namespace NSchemeShard; -bool PrepareSchema(NKikimrSchemeOp::TColumnTableSchema& proto, TOlapSchema& schema, TString& errStr) { - if (!TOlapSchema::UpdateProto(proto, errStr)) { - return false; - } - // Backward compatibility. It should be removed in future versions. - // ColumnShards do not allow nullable PK. But it was possible to make tables with such PK before. - bool allowNullableKeys = true; - return schema.Parse(proto, errStr, allowNullableKeys); -} // TODO: make it a part of TOlapStoreInfo bool PrepareSchemaPreset(NKikimrSchemeOp::TColumnTableSchemaPreset& proto, TOlapStoreInfo& store, - size_t protoIndex, TString& errStr) + size_t protoIndex, IErrorCollector& errors) { - if (proto.GetName().empty()) { - errStr = Sprintf("Schema preset name cannot be empty"); + TOlapStoreSchemaPreset preset; + if (!preset.ParseFromRequest(proto, errors)) { return false; } - if (!proto.HasId()) { - proto.SetId(store.Description.GetNextSchemaPresetId()); - store.Description.SetNextSchemaPresetId(proto.GetId() + 1); - } else if (proto.GetId() <= 0 || proto.GetId() >= store.Description.GetNextSchemaPresetId()) { - errStr = Sprintf("Schema preset id is incorrect"); + const auto presetId = store.Description.GetNextSchemaPresetId(); + if (store.SchemaPresets.contains(presetId) || store.SchemaPresetByName.contains(preset.GetName())) { + errors.AddError(Sprintf("Duplicate schema preset %" PRIu32 " with name '%s'", presetId, proto.GetName().c_str())); return false; } - if (store.SchemaPresets.contains(proto.GetId()) || - store.SchemaPresetByName.contains(proto.GetName())) - { - errStr = Sprintf("Duplicate schema preset %" PRIu32 " with name '%s'", proto.GetId(), proto.GetName().c_str()); + preset.SetId(presetId); + preset.SetProtoIndex(protoIndex); + + TOlapSchemaUpdate schemaDiff; + if (!schemaDiff.Parse(proto.GetSchema(), errors, true)) { return false; } - auto& preset = store.SchemaPresets[proto.GetId()]; - preset.Id = proto.GetId(); - preset.Name = proto.GetName(); - preset.ProtoIndex = protoIndex; - store.SchemaPresetByName[preset.Name] = preset.Id; - proto.MutableSchema()->SetNextColumnId(1); - proto.MutableSchema()->SetVersion(1); - - if (!PrepareSchema(*proto.MutableSchema(), preset, errStr)) { + if (!preset.Update(schemaDiff, errors)) { return false; } + proto.Clear(); + preset.Serialize(proto); + + store.Description.SetNextSchemaPresetId(presetId + 1); + store.SchemaPresetByName[preset.GetName()] = preset.GetId(); + store.SchemaPresets[preset.GetId()] = std::move(preset); return true; } -TOlapStoreInfo::TPtr CreateOlapStore(const NKikimrSchemeOp::TColumnStoreDescription& opSrc, - TEvSchemeShard::EStatus& status, TString& errStr) +TOlapStoreInfo::TPtr CreateOlapStore(const NKikimrSchemeOp::TColumnStoreDescription& opSrc, IErrorCollector& errors) { TOlapStoreInfo::TPtr storeInfo = new TOlapStoreInfo; storeInfo->AlterVersion = 1; @@ -67,20 +55,23 @@ TOlapStoreInfo::TPtr CreateOlapStore(const NKikimrSchemeOp::TColumnStoreDescript auto& op = storeInfo->Description; if (op.GetRESERVED_MetaShardCount() != 0) { - status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("trying to create OLAP store with meta shards (not supported yet)"); + errors.AddError("trying to create OLAP store with meta shards (not supported yet)"); return nullptr; } if (!op.HasColumnShardCount()) { - status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("trying to create OLAP store without shards number specified"); + errors.AddError("trying to create OLAP store without shards number specified"); return nullptr; } if (op.GetColumnShardCount() == 0) { - status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("trying to create OLAP store without zero shards"); + errors.AddError("trying to create OLAP store without zero shards"); + return nullptr; + } + + for (auto& presetProto : *op.MutableRESERVED_TtlSettingsPresets()) { + Y_UNUSED(presetProto); + errors.AddError("TTL presets are not supported"); return nullptr; } @@ -90,27 +81,16 @@ TOlapStoreInfo::TPtr CreateOlapStore(const NKikimrSchemeOp::TColumnStoreDescript size_t protoIndex = 0; for (auto& presetProto : *op.MutableSchemaPresets()) { if (presetProto.HasId()) { - status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("Schema preset id cannot be specified explicitly"); + errors.AddError("Schema preset id cannot be specified explicitly"); return nullptr; } - if (!PrepareSchemaPreset(presetProto, *storeInfo, protoIndex++, errStr)) { - status = NKikimrScheme::StatusSchemeError; + if (!PrepareSchemaPreset(presetProto, *storeInfo, protoIndex++, errors)) { return nullptr; } } - protoIndex = 0; - for (auto& presetProto : *op.MutableRESERVED_TtlSettingsPresets()) { - Y_UNUSED(presetProto); - status = NKikimrScheme::StatusSchemeError; - errStr = "TTL presets are not supported"; - return nullptr; - } - if (!storeInfo->SchemaPresetByName.contains("default") || storeInfo->SchemaPresets.size() > 1) { - status = NKikimrScheme::StatusSchemeError; - errStr = "A single schema preset named 'default' is required"; + errors.AddError("A single schema preset named 'default' is required"); return nullptr; } @@ -173,22 +153,19 @@ public: DebugHint() << " ProgressState" << " at tabletId# " << ssId); - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState->TxType == TTxState::TxCreateOlapStore); + TTxState* txState = context.SS->FindTxSafe(OperationId, TTxState::TxCreateOlapStore); TOlapStoreInfo::TPtr pendingInfo = context.SS->OlapStores[txState->TargetPathId]; Y_VERIFY(pendingInfo); Y_VERIFY(pendingInfo->AlterData); TOlapStoreInfo::TPtr storeInfo = pendingInfo->AlterData; txState->ClearShardsInProgress(); - - auto seqNo = context.SS->StartRound(*txState); - TString columnShardTxBody; { + auto seqNo = context.SS->StartRound(*txState); NKikimrTxColumnShard::TSchemaTxBody tx; context.SS->FillSeqNo(tx, seqNo); - + NSchemeShard::TPath path = NSchemeShard::TPath::Init(txState->TargetPathId, context.SS); Y_VERIFY(path.IsResolved()); @@ -257,8 +234,7 @@ public: << " at tablet: " << ssId << ", stepId: " << step); - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState->TxType == TTxState::TxCreateOlapStore); + TTxState* txState = context.SS->FindTxSafe(OperationId, TTxState::TxCreateOlapStore); TPathId pathId = txState->TargetPathId; TPathElement::TPtr path = context.SS->PathsById.at(pathId); @@ -301,9 +277,7 @@ public: DebugHint() << " ProgressState" << " at tablet: " << ssId); - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState); - Y_VERIFY(txState->TxType == TTxState::TxCreateOlapStore); + TTxState* txState = context.SS->FindTxSafe(OperationId, TTxState::TxCreateOlapStore); TSet<TTabletId> shardSet; for (const auto& shard : txState->Shards) { @@ -338,9 +312,7 @@ public: } bool HandleReply(TEvColumnShard::TEvNotifyTxCompletionResult::TPtr& ev, TOperationContext& context) override { - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState); - Y_VERIFY(txState->TxType == TTxState::TxCreateOlapStore); + TTxState* txState = context.SS->FindTxSafe(OperationId, TTxState::TxCreateOlapStore); auto shardId = TTabletId(ev->Get()->Record.GetOrigin()); auto shardIdx = context.SS->MustGetShardIdx(shardId); @@ -357,10 +329,7 @@ public: DebugHint() << " ProgressState" << " at tablet: " << ssId); - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState); - Y_VERIFY(txState->TxType == TTxState::TxCreateOlapStore); - + TTxState* txState = context.SS->FindTxSafe(OperationId, TTxState::TxCreateOlapStore); txState->ClearShardsInProgress(); for (auto& shard : txState->Shards) { @@ -516,9 +485,9 @@ public: return result; } - TOlapStoreInfo::TPtr storeInfo = CreateOlapStore(createDescription, status, errStr); + TProposeErrorCollector errors(*result); + TOlapStoreInfo::TPtr storeInfo = CreateOlapStore(createDescription, errors); if (!storeInfo.Get()) { - result->SetError(status, errStr); return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp index 80566d4c67c..91f53bf1841 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp @@ -1,5 +1,6 @@ #include "schemeshard__operation_part.h" #include "schemeshard__operation_common.h" +#include "schemeshard_olap_types.h" #include "schemeshard_impl.h" #include <ydb/core/base/subdomain.h> @@ -12,182 +13,221 @@ namespace NKikimr::NSchemeShard { namespace { -bool PrepareSchema(NKikimrSchemeOp::TColumnTableSchema& proto, TOlapSchema& schema, TString& errStr) { - proto.SetNextColumnId(1); - proto.SetVersion(1); +class TTableConstructorBase { + YDB_READONLY(ui32, ShardsCount, 1); + YDB_READONLY_DEF(TString, Name); + YDB_OPT(NKikimrSchemeOp::TColumnDataLifeCycle, TtlSettings); + YDB_OPT(NKikimrSchemeOp::TColumnTableSharding, Sharding); - if (!TOlapSchema::UpdateProto(proto, errStr)) { - return false; - } - bool allowNullableKeys = false; - return schema.Parse(proto, errStr, allowNullableKeys); -} +public: + bool Deserialize(const NKikimrSchemeOp::TColumnTableDescription& description, IErrorCollector& errors) { + Name = description.GetName(); + if (description.HasRESERVED_TtlSettingsPresetName() || description.HasRESERVED_TtlSettingsPresetId()) { + errors.AddError("TTL presets are not supported"); + return false; + } -bool SetSharding(const TOlapSchema& schema, NKikimrSchemeOp::TColumnTableDescription& op, - TColumnTableInfo::TPtr tableInfo, - TEvSchemeShard::EStatus& status, TString& errStr) -{ - ui32 shardsCount = Max(ui32(1), op.GetColumnShardCount()); - if (op.HasSharding()) { - tableInfo->Sharding = std::move(*op.MutableSharding()); - } else if (shardsCount < 2) { - tableInfo->Sharding.MutableRandomSharding(); - } else { - status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("Sharding is not set"); - return false; - } + if (description.HasRESERVED_TtlSettingsPresetName() || description.HasRESERVED_TtlSettingsPresetId()) { + errors.AddError("TTL presets are not supported"); + return false; + } - op.ClearSharding(); + ShardsCount = Max(ui32(1), description.GetColumnShardCount()); + if (description.HasSharding()) { + Sharding = description.GetSharding(); + } - switch (tableInfo->Sharding.Method_case()) { - case NKikimrSchemeOp::TColumnTableSharding::kRandomSharding: { - // Random sharding implies non-unique primary key - if (shardsCount > 1) { - tableInfo->Sharding.SetUniquePrimaryKey(false); - } - break; + if (!Sharding && ShardsCount > 1) { + errors.AddError("Sharding is not set"); + return false; + } + + if (!DoDeserialize(description, errors)) { + return false; } - case NKikimrSchemeOp::TColumnTableSharding::kHashSharding: { - auto& sharding = *tableInfo->Sharding.MutableHashSharding(); - if (sharding.ColumnsSize() == 0) { - status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("Hash sharding requires a non-empty list of columns"); + + if (description.HasTtlSettings()) { + TtlSettings = description.GetTtlSettings(); + if (!GetSchema().ValidateTtlSettings(description.GetTtlSettings(), errors)) { return false; } - bool keysOnly = true; - for (const TString& columnName : sharding.GetColumns()) { - auto* pColumn = schema.FindColumnByName(columnName); - if (!pColumn) { - status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("Hash sharding is using an unknown column '%s'", columnName.c_str()); + } + return true; + } + + TColumnTableInfo::TPtr BuildTableInfo(IErrorCollector& errors) const { + TColumnTableInfo::TPtr tableInfo = new TColumnTableInfo; + tableInfo->AlterVersion = 1; + + BuildDescription(tableInfo->Description); + tableInfo->Description.SetColumnShardCount(ShardsCount); + tableInfo->Description.SetName(Name); + if (HasTtlSettings()) { + tableInfo->Description.MutableTtlSettings()->CopyFrom(GetTtlSettingsUnsafe()); + tableInfo->Description.MutableTtlSettings()->SetVersion(1); + } + if (!SetSharding(GetSchema(), tableInfo, errors)) { + return nullptr; + } + return tableInfo; + } + +private: + virtual void BuildDescription(NKikimrSchemeOp::TColumnTableDescription& description) const = 0; + virtual bool DoDeserialize(const NKikimrSchemeOp::TColumnTableDescription& description, IErrorCollector& errors) = 0; + virtual const TOlapSchema& GetSchema() const = 0; + + bool SetSharding(const TOlapSchema& schema, TColumnTableInfo::TPtr tableInfo, IErrorCollector& errors) const { + if (Sharding) { + tableInfo->Sharding = *Sharding; + } else { + Y_VERIFY(ShardsCount == 1); + tableInfo->Sharding.MutableRandomSharding(); + }; + + switch (tableInfo->Sharding.Method_case()) { + case NKikimrSchemeOp::TColumnTableSharding::kRandomSharding: { + // Random sharding implies non-unique primary key + if (ShardsCount > 1) { + tableInfo->Sharding.SetUniquePrimaryKey(false); + } + break; + } + case NKikimrSchemeOp::TColumnTableSharding::kHashSharding: { + auto& sharding = *tableInfo->Sharding.MutableHashSharding(); + if (sharding.ColumnsSize() == 0) { + errors.AddError(Sprintf("Hash sharding requires a non-empty list of columns")); return false; } - if (!pColumn->IsKeyColumn()) { - keysOnly = false; + bool keysOnly = true; + for (const TString& columnName : sharding.GetColumns()) { + auto* pColumn = schema.GetColumnByName(columnName); + if (!pColumn) { + errors.AddError(Sprintf("Hash sharding is using an unknown column '%s'", columnName.c_str())); + return false; + } + if (!pColumn->IsKeyColumn()) { + keysOnly = false; + } } + sharding.SetUniqueShardKey(true); + tableInfo->Sharding.SetUniquePrimaryKey(keysOnly); + break; + } + default: { + errors.AddError("Unsupported sharding method"); + return false; } - sharding.SetUniqueShardKey(true); - tableInfo->Sharding.SetUniquePrimaryKey(keysOnly); - break; - } - default: { - status = NKikimrScheme::StatusSchemeError; - errStr = "Unsupported sharding method"; - return false; } + return true; } - return true; -} +}; -bool CheckSupported(const NKikimrSchemeOp::TColumnTableDescription& op, - TEvSchemeShard::EStatus& status, TString& errStr) -{ - if (op.HasRESERVED_TtlSettingsPresetName() || op.HasRESERVED_TtlSettingsPresetId()) { - status = NKikimrScheme::StatusSchemeError; - errStr = "TTL presets are not supported"; - return false; - } - if (op.HasRESERVED_TtlSettingsPresetName() || op.HasRESERVED_TtlSettingsPresetId()) { - status = NKikimrScheme::StatusSchemeError; - errStr = "TTL presets are not supported"; - return false; - } - return true; -} +class TOlapPresetConstructor : public TTableConstructorBase { + YDB_READONLY(ui32, PresetId, 0); + YDB_READONLY(TString, PresetName, "default"); -TColumnTableInfo::TPtr CreateColumnTableInStore( - TColumnTableInfo::TPtr& tableInfo, - TOlapStoreInfo::TPtr storeInfo, - ui32 columnShardCount, - TEvSchemeShard::EStatus& status, TString& errStr) -{ - auto& op = tableInfo->Description; + const TOlapStoreInfo& StoreInfo; +public: + TOlapPresetConstructor(const TOlapStoreInfo& storeInfo) + : StoreInfo(storeInfo) + {} + + bool DoDeserialize(const NKikimrSchemeOp::TColumnTableDescription& description, IErrorCollector& errors) override { + if (description.GetColumnShardCount() > StoreInfo.ColumnShards.size()) { + errors.AddError(Sprintf("Cannot create table with %" PRIu32 " column shards, only %" PRIu32 " are available", + description.GetColumnShardCount(), ui32(StoreInfo.ColumnShards.size()))); + return false; + } - if (!CheckSupported(op, status, errStr)) { - return nullptr; + if (description.HasSchemaPresetId()) { + PresetId = description.GetSchemaPresetId(); + if (!StoreInfo.SchemaPresets.contains(PresetId)) { + errors.AddError(Sprintf("Specified schema preset %" PRIu32 " does not exist in tablestore", PresetId)); + return false; + } + PresetName = StoreInfo.SchemaPresets.at(PresetId).GetName(); + } else { + if (description.HasSchemaPresetName()) { + PresetName = description.GetSchemaPresetName(); + } + if (!StoreInfo.SchemaPresetByName.contains(PresetName)) { + errors.AddError(Sprintf("Specified schema preset '%s' does not exist in tablestore", PresetName.c_str())); + return false; + } + PresetId = StoreInfo.SchemaPresetByName.at(PresetName); + Y_VERIFY(StoreInfo.SchemaPresets.contains(PresetId)); + } + + if (description.HasSchema()) { + if (!GetSchema().Validate(description.GetSchema(), errors)) { + return false; + } + } + return true; } - if (!op.HasSchemaPresetName() && !op.HasSchemaPresetId()) { - op.SetSchemaPresetName("default"); +private: + void BuildDescription(NKikimrSchemeOp::TColumnTableDescription& description) const override { + description.SetSchemaPresetId(PresetId); + description.SetSchemaPresetName(PresetName); } - const TOlapSchema* pSchema = nullptr; + const TOlapSchema& GetSchema() const override { + return StoreInfo.SchemaPresets.at(PresetId); + }; +}; - if (op.HasSchemaPresetName()) { - const TString presetName = op.GetSchemaPresetName(); - if (!storeInfo->SchemaPresetByName.contains(presetName)) { - status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("Specified schema preset '%s' does not exist in tablestore", presetName.c_str()); - return nullptr; - } - const ui32 presetId = storeInfo->SchemaPresetByName.at(presetName); - if (!op.HasSchemaPresetId()) { - op.SetSchemaPresetId(presetId); - } - if (op.GetSchemaPresetId() != presetId) { - status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("Specified schema preset '%s' and id %" PRIu32 " do not match in tablestore", presetName.c_str(), presetId); - return nullptr; - } - pSchema = &storeInfo->SchemaPresets.at(presetId); - } else if (op.HasSchemaPresetId()) { - const ui32 presetId = op.GetSchemaPresetId(); - if (!storeInfo->SchemaPresets.contains(presetId)) { - status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("Specified schema preset %" PRIu32 " does not exist in tablestore", presetId); - return nullptr; +class TOlapTableConstructor : public TTableConstructorBase { + TOlapSchema TableSchema; + bool HasDataChannels = false; +private: + bool DoDeserialize(const NKikimrSchemeOp::TColumnTableDescription& description, IErrorCollector& errors) override { + if (description.HasSchemaPresetName() || description.HasSchemaPresetId()) { + errors.AddError("Schema presets are not supported for standalone column tables"); + return false; } - const TString& presetName = storeInfo->SchemaPresets.at(presetId).Name; - op.SetSchemaPresetName(presetName); - pSchema = &storeInfo->SchemaPresets.at(presetId); - } - Y_VERIFY(pSchema, "No schema preset id/name for in-store column table"); - - if (op.HasSchema()) { - auto& opSchema = op.GetSchema(); - if (!pSchema->Validate(opSchema, status, errStr)) { - return nullptr; + if (!description.HasSchema()) { + errors.AddError("No schema for column table specified"); + return false; } - op.ClearSchema(); - } + HasDataChannels = description.GetStorageConfig().HasDataChannelCount(); - if (op.HasTtlSettings()) { - op.MutableTtlSettings()->SetVersion(1); - } + TOlapSchemaUpdate schemaDiff; + if (!schemaDiff.Parse(description.GetSchema(), errors)) { + return false; + } - // Validate ttl settings and schema compatibility - if (op.HasTtlSettings()) { - if (!ValidateTtlSettings(op.GetTtlSettings(), pSchema->Columns, pSchema->ColumnsByName, errStr)) { - status = NKikimrScheme::StatusSchemeError; - return nullptr; + if (!TableSchema.Update(schemaDiff, errors)) { + return false; } + return true; } - if (!SetSharding(*pSchema, op, tableInfo, status, errStr)) { - return nullptr; +private: + void BuildDescription(NKikimrSchemeOp::TColumnTableDescription& description) const override { + if (HasDataChannels) { + description.MutableStorageConfig()->SetDataChannelCount(1); + } + TableSchema.Serialize(*description.MutableSchema()); } - if (columnShardCount > storeInfo->ColumnShards.size()) { - status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("Cannot create table with %" PRIu32 " column shards, only %" PRIu32 " are available", - columnShardCount, ui32(storeInfo->ColumnShards.size())); - return nullptr; - } + const TOlapSchema& GetSchema() const override { + return TableSchema; + }; +}; - return tableInfo; -} void SetShardingTablets( TColumnTableInfo::TPtr tableInfo, const TVector<TShardIdx>& columnShards, ui32 columnShardCount, bool shuffle, - TSchemeShard* ss) + const TSchemeShard& ss) { tableInfo->ColumnShards.reserve(columnShards.size()); for (const auto& shardIdx : columnShards) { - auto* shardInfo = ss->ShardInfos.FindPtr(shardIdx); + auto* shardInfo = ss.ShardInfos.FindPtr(shardIdx); Y_VERIFY(shardInfo, "ColumnShard not found"); tableInfo->ColumnShards.push_back(shardInfo->TabletID.GetValue()); } @@ -203,61 +243,9 @@ void SetShardingTablets( for (ui64 columnShard : tableInfo->ColumnShards) { tableInfo->Sharding.AddColumnShards(columnShard); } - tableInfo->Sharding.ClearAdditionalColumnShards(); } -TColumnTableInfo::TPtr CreateColumnTable( - TColumnTableInfo::TPtr& tableInfo, - TEvSchemeShard::EStatus& status, TString& errStr) -{ - auto& op = tableInfo->Description; - - if (!CheckSupported(op, status, errStr)) { - return nullptr; - } - - if (op.HasSchemaPresetName() || op.HasSchemaPresetId()) { - status = NKikimrScheme::StatusSchemeError; - errStr = "Schema presets are not supported for standalone column tables"; - return nullptr; - } - - if (!op.HasSchema()) { - status = NKikimrScheme::StatusSchemeError; - errStr = "No schema for column table specified"; - return nullptr; - } - - NKikimrSchemeOp::TColumnTableSchema* opSchema = op.MutableSchema(); - tableInfo->Schema = TOlapSchema(); - auto& schema = *tableInfo->Schema; - - if (!PrepareSchema(*opSchema, schema, errStr)) { - status = NKikimrScheme::StatusSchemeError; - return nullptr; - } - - if (op.HasTtlSettings()) { - op.MutableTtlSettings()->SetVersion(1); - - if (!ValidateTtlSettings(op.GetTtlSettings(), schema.Columns, schema.ColumnsByName, errStr)) { - status = NKikimrScheme::StatusSchemeError; - return nullptr; - } - } - - if (!SetSharding(schema, op, tableInfo, status, errStr)) { - return nullptr; - } - - if (!op.GetStorageConfig().HasDataChannelCount()) { - op.MutableStorageConfig()->SetDataChannelCount(1); - } - - return tableInfo; -} - class TConfigureParts: public TSubOperationState { private: @@ -287,8 +275,7 @@ public: DebugHint() << " ProgressState" << " at tabletId# " << ssId); - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState->TxType == TTxState::TxCreateColumnTable); + TTxState* txState = context.SS->FindTxSafe(OperationId, TTxState::TxCreateColumnTable); TPathId pathId = txState->TargetPathId; TPath path = TPath::Init(pathId, context.SS); @@ -299,13 +286,13 @@ public: txState->ClearShardsInProgress(); - auto seqNo = context.SS->StartRound(*txState); - Y_VERIFY(tableInfo->ColumnShards.empty() || tableInfo->OwnedColumnShards.empty()); TString columnShardTxBody; + auto seqNo = context.SS->StartRound(*txState); NKikimrTxColumnShard::TSchemaTxBody tx; context.SS->FillSeqNo(tx, seqNo); + { NKikimrTxColumnShard::TCreateTable* create{}; if (tableInfo->IsStandalone()) { @@ -338,7 +325,7 @@ public: Y_VERIFY(storeInfo->SchemaPresets.contains(presetId), "Failed to find schema preset %" PRIu32 " in a tablestore", presetId); auto& preset = storeInfo->SchemaPresets.at(presetId); - size_t presetIndex = preset.ProtoIndex; + size_t presetIndex = preset.GetProtoIndex(); create->MutableSchemaPreset()->CopyFrom(storeInfo->Description.GetSchemaPresets(presetIndex)); } @@ -422,7 +409,7 @@ public: auto table = context.SS->ColumnTables.TakeAlterVerified(pathId); if (table->IsStandalone()) { Y_VERIFY(table->ColumnShards.empty()); - SetShardingTablets(table.GetPtr(), table->OwnedColumnShards, table->OwnedColumnShards.size(), false, context.SS); + SetShardingTablets(table.GetPtr(), table->OwnedColumnShards, table->OwnedColumnShards.size(), false, *context.SS); } context.SS->PersistColumnTableAlterRemove(db, pathId); @@ -695,26 +682,26 @@ public: return result; } - TColumnTableInfo::TPtr tableInfo = new TColumnTableInfo; - { - tableInfo->AlterVersion = 1; - tableInfo->Description.CopyFrom(createDescription); - // Don't allow users to set these fields - tableInfo->Description.ClearSchemaPresetVersionAdj(); - tableInfo->Description.ClearTtlSettingsPresetVersionAdj(); - } - + TProposeErrorCollector errors(*result); + TColumnTableInfo::TPtr tableInfo; if (storeInfo) { - tableInfo = CreateColumnTableInStore(tableInfo, storeInfo, shardsCount, status, errStr); + TOlapPresetConstructor tableConstructor(*storeInfo); + if (!tableConstructor.Deserialize(createDescription, errors)) { + return result; + } + tableInfo = tableConstructor.BuildTableInfo(errors); if (tableInfo) { - SetShardingTablets(tableInfo, storeInfo->ColumnShards, shardsCount, true, context.SS); + SetShardingTablets(tableInfo, storeInfo->ColumnShards, shardsCount, true, *context.SS); } } else { - tableInfo = CreateColumnTable(tableInfo, status, errStr); + TOlapTableConstructor tableConstructor; + if (!tableConstructor.Deserialize(createDescription, errors)) { + return result; + } + tableInfo = tableConstructor.BuildTableInfo(errors); } if (!tableInfo) { - result->SetError(status, errStr); return result; } @@ -733,7 +720,6 @@ public: TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxCreateColumnTable, pathId); - Y_VERIFY(tableInfo); if (storeInfo) { auto olapStorePath = parentPath.FindOlapStore(); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index f2d8653426e..565551d6998 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -3838,7 +3838,7 @@ NKikimrSchemeOp::TPathVersion TSchemeShard::GetPathVersion(const TPath& path) co Y_VERIFY(OlapStores.contains(*tableInfo->OlapStorePathId)); auto& storeInfo = OlapStores.at(*tableInfo->OlapStorePathId); auto& preset = storeInfo->SchemaPresets.at(tableInfo->Description.GetSchemaPresetId()); - result.SetColumnTableSchemaVersion(tableInfo->Description.GetSchemaPresetVersionAdj() + preset.Version); + result.SetColumnTableSchemaVersion(tableInfo->Description.GetSchemaPresetVersionAdj() + preset.GetVersion()); } else { result.SetColumnTableSchemaVersion(tableInfo->Description.GetSchemaPresetVersionAdj()); } @@ -4455,6 +4455,13 @@ TTxState *TSchemeShard::FindTx(TOperationId opId) { return txState; } +TTxState* TSchemeShard::FindTxSafe(TOperationId opId, const TTxState::ETxType& txType) { + TTxState* txState = FindTx(opId); + Y_VERIFY(txState); + Y_VERIFY(txState->TxType == txType); + return txState; +} + void TSchemeShard::RemoveTx(const TActorContext &ctx, NIceDb::TNiceDb &db, TOperationId opId, TTxState *txState) { if (!txState) { txState = TxInFlight.FindPtr(opId); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 75a3701fd80..5c8e30adf03 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -517,6 +517,7 @@ public: TTxState& CreateTx(TOperationId opId, TTxState::ETxType txType, TPathId targetPath, TPathId sourcePath = InvalidPathId); TTxState* FindTx(TOperationId opId); + TTxState* FindTxSafe(TOperationId opId, const TTxState::ETxType& txType); void RemoveTx(const TActorContext &ctx, NIceDb::TNiceDb& db, TOperationId opId, TTxState* txState); static TPathElement::EPathState CalcPathState(TTxState::ETxType txType, TPathElement::EPathState oldState); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index 9eb71f22fc8..92f421ce352 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -1996,314 +1996,6 @@ NKikimr::NSchemeShard::TBillingStats::operator bool() const { return Rows || Bytes; } -bool TOlapSchema::UpdateProto(NKikimrSchemeOp::TColumnTableSchema& proto, TString& errStr) { - ui32 nextColumnId = proto.GetNextColumnId(); - - if (proto.ColumnsSize() && !proto.HasEngine()) { - proto.SetEngine(NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); - } - - const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; - - for (auto& colProto : *proto.MutableColumns()) { - auto& colName = colProto.GetName(); - - if (!colProto.HasId()) { - colProto.SetId(nextColumnId++); - } else if (colProto.GetId() <= 0 || colProto.GetId() >= nextColumnId) { - errStr = Sprintf("Column id is incorrect"); - return false; - } - - if (colProto.HasTypeId()) { - errStr = Sprintf("Cannot set TypeId for column '%s', use Type", colName.c_str()); - return false; - } - if (!colProto.HasType()) { - errStr = Sprintf("Missing Type for column '%s'", colName.c_str()); - return false; - } - - auto typeName = NMiniKQL::AdaptLegacyYqlType(colProto.GetType()); - const NScheme::IType* type = typeRegistry->GetType(typeName); - NScheme::TTypeInfo typeInfo; - if (type) { - if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) { - errStr = Sprintf("Type '%s' specified for column '%s' is not supported", - colProto.GetType().c_str(), colName.c_str()); - return false; - } - typeInfo = NScheme::TTypeInfo(type->GetTypeId()); - } else { -#if 0 // TODO: support PG types in ColumnShard - auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName); - if (!typeDesc) { - errStr = Sprintf("Type '%s' specified for column '%s' is not supported", - colProto.GetType().c_str(), colName.c_str()); - return false; - } - typeInfo = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc); -#else - errStr = Sprintf("Type '%s' specified for column '%s' is not supported", - colProto.GetType().c_str(), colName.c_str()); - return false; -#endif - } - - auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(typeInfo, ""); - colProto.SetTypeId(columnType.TypeId); - if (columnType.TypeInfo) { - *colProto.MutableTypeInfo() = *columnType.TypeInfo; - } - } - - proto.SetNextColumnId(nextColumnId); - return true; -} - -bool TOlapSchema::IsAllowedType(ui32 typeId) { - if (!NScheme::NTypeIds::IsYqlType(typeId)) { - return false; - } - - switch (typeId) { - case NYql::NProto::Bool: - case NYql::NProto::Interval: - case NYql::NProto::Decimal: - case NYql::NProto::DyNumber: - return false; - default: - break; - } - return true; -} - -bool TOlapSchema::IsAllowedFirstPkType(ui32 typeId) { - switch (typeId) { - //case NYql::NProto::Bool - case NYql::NProto::Uint8: // Byte - case NYql::NProto::Int32: - case NYql::NProto::Uint32: - case NYql::NProto::Int64: - case NYql::NProto::Uint64: - //case NYql::NProto::Float: - //case NYql::NProto::Double: - case NYql::NProto::String: - case NYql::NProto::Utf8: - //case NYql::NProto::Yson: - //case NYql::NProto::Json: - //case NYql::NProto::JsonDocument: - case NYql::NProto::Date: - case NYql::NProto::Datetime: - case NYql::NProto::Timestamp: - //case NYql::NProto::Interval: - //case NYql::NProto::Decimal: - //case NYql::NProto::DyNumber: - return true; - default: - break; - } - return false; -} - -bool TOlapSchema::Parse(const NKikimrSchemeOp::TColumnTableSchema& proto, TString& errStr, bool allowNullableKeys) { - NextColumnId = proto.GetNextColumnId(); - - if (proto.GetKeyColumnNames().empty()) { - errStr = Sprintf("No primary key specified"); - return false; - } - - auto firstPkKey = proto.GetKeyColumnNames()[0]; - - Columns.clear(); - for (auto& colProto : proto.GetColumns()) { - if (colProto.GetName().empty()) { - errStr = Sprintf("Columns cannot have an empty name"); - return false; - } - - ui32 colId = colProto.GetId(); - if (Columns.contains(colId)) { - errStr = Sprintf("Duplicate column id %" PRIu32 " for column '%s'", colId, colProto.GetName().c_str()); - return false; - } - auto& col = Columns[colId]; - col.Id = colId; - col.Name = colProto.GetName(); - col.NotNull = colProto.GetNotNull(); - - if (ColumnsByName.contains(col.Name)) { - errStr = Sprintf("Duplicate column '%s'", col.Name.c_str()); - return false; - } - - if (!colProto.HasType()) { - errStr = TStringBuilder() << "Missing Type for column '" << col.Name << "'"; - return false; - } - - if (!colProto.HasTypeId()) { - errStr = TStringBuilder() << "No generated TypeId for column '" << col.Name << "'"; - return false; - } - - if (col.Name == firstPkKey && !IsAllowedFirstPkType(colProto.GetTypeId())) { - errStr = TStringBuilder() - << "Type '" << colProto.GetType() << "' specified for column '" << col.Name - << "' is not supported in first PK position"; - return false; - } else if (!IsAllowedType(colProto.GetTypeId())) { - errStr = TStringBuilder() - << "Type '" << colProto.GetType() << "' specified for column '" << col.Name << "' is not supported"; - return false; - } - - if (colProto.HasTypeInfo()) { - auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(colProto.GetTypeId(), &colProto.GetTypeInfo()); - col.Type = typeInfoMod.TypeInfo; - } else { - col.Type = NScheme::TTypeInfo(colProto.GetTypeId()); - } - - ColumnsByName[col.Name] = col.Id; - } - - if (Columns.empty()) { - errStr = Sprintf("At least one column is required"); - return false; - } - - KeyColumnIds.clear(); - for (const TString& keyName : proto.GetKeyColumnNames()) { - auto* col = FindColumnByName(keyName); - if (!col) { - errStr = Sprintf("Unknown key column '%s'", keyName.c_str()); - return false; - } - if (!col->NotNull && !allowNullableKeys) { - errStr = Sprintf("Nullable key column '%s'", keyName.c_str()); - return false; - } - if (col->IsKeyColumn()) { - errStr = Sprintf("Duplicate key column '%s'", keyName.c_str()); - return false; - } - col->KeyOrder = KeyColumnIds.size(); - KeyColumnIds.push_back(col->Id); - } - - if (KeyColumnIds.empty()) { - errStr = "At least one key column is required"; - return false; - } - - Engine = proto.GetEngine(); - return true; -} - -bool TOlapSchema::Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, - TEvSchemeShard::EStatus& status, TString& errStr) const -{ - const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; - - ui32 lastColumnId = 0; - THashSet<ui32> usedColumns; - for (const auto& colProto : opSchema.GetColumns()) { - if (colProto.GetName().empty()) { - status = NKikimrScheme::StatusSchemeError; - errStr = "Columns cannot have an empty name"; - return false; - } - const TString& colName = colProto.GetName(); - auto* col = FindColumnByName(colName); - if (!col) { - status = NKikimrScheme::StatusSchemeError; - errStr = TStringBuilder() - << "Column '" << colName << "' does not match schema preset"; - return false; - } - if (colProto.HasId() && colProto.GetId() != col->Id) { - status = NKikimrScheme::StatusSchemeError; - errStr = TStringBuilder() - << "Column '" << colName << "' has id " << colProto.GetId() << " that does not match schema preset"; - return false; - } - - if (!usedColumns.insert(col->Id).second) { - status = NKikimrScheme::StatusSchemeError; - errStr = TStringBuilder() << "Column '" << colName << "' is specified multiple times"; - return false; - } - if (col->Id < lastColumnId) { - status = NKikimrScheme::StatusSchemeError; - errStr = "Column order does not match schema preset"; - return false; - } - lastColumnId = col->Id; - - if (colProto.HasTypeId()) { - status = NKikimrScheme::StatusSchemeError; - errStr = TStringBuilder() << "Cannot set TypeId for column '" << colName << "', use Type"; - return false; - } - if (!colProto.HasType()) { - status = NKikimrScheme::StatusSchemeError; - errStr = TStringBuilder() << "Missing Type for column '" << colName << "'"; - return false; - } - - auto typeName = NMiniKQL::AdaptLegacyYqlType(colProto.GetType()); - const NScheme::IType* type = typeRegistry->GetType(typeName); - if (!type || !IsAllowedType(type->GetTypeId())) { - status = NKikimrScheme::StatusSchemeError; - errStr = TStringBuilder() - << "Type '" << colProto.GetType() << "' specified for column '" << colName << "' is not supported"; - return false; - } - NScheme::TTypeInfo typeInfo(type->GetTypeId()); - - if (typeInfo != col->Type) { - status = NKikimrScheme::StatusSchemeError; - errStr = TStringBuilder() - << "Type '" << TypeName(typeInfo) << "' specified for column '" << colName - << "' does not match schema preset type '" << TypeName(col->Type) << "'"; - return false; - } - } - - for (auto& pr : Columns) { - if (!usedColumns.contains(pr.second.Id)) { - status = NKikimrScheme::StatusSchemeError; - errStr = "Specified schema is missing some schema preset columns"; - return false; - } - } - - TVector<ui32> keyColumnIds; - for (const TString& keyName : opSchema.GetKeyColumnNames()) { - auto* col = FindColumnByName(keyName); - if (!col) { - status = NKikimrScheme::StatusSchemeError; - errStr = TStringBuilder() << "Unknown key column '" << keyName << "'"; - return false; - } - keyColumnIds.push_back(col->Id); - } - if (keyColumnIds != KeyColumnIds) { - status = NKikimrScheme::StatusSchemeError; - errStr = "Specified schema key columns not matching schema preset"; - return false; - } - - if (opSchema.GetEngine() != Engine) { - status = NKikimrScheme::StatusSchemeError; - errStr = "Specified schema engine does not match schema preset"; - return false; - } - return true; -} - TOlapStoreInfo::TOlapStoreInfo( ui64 alterVersion, NKikimrSchemeOp::TColumnStoreDescription&& description, @@ -2316,38 +2008,11 @@ TOlapStoreInfo::TOlapStoreInfo( { size_t schemaPresetIndex = 0; for (const auto& presetProto : Description.GetSchemaPresets()) { - Y_VERIFY(presetProto.HasId()); - Y_VERIFY(presetProto.HasName()); - Y_VERIFY(presetProto.HasSchema()); Y_VERIFY(!SchemaPresets.contains(presetProto.GetId())); auto& preset = SchemaPresets[presetProto.GetId()]; - preset.Id = presetProto.GetId(); - preset.Name = presetProto.GetName(); - preset.ProtoIndex = schemaPresetIndex++; - SchemaPresetByName[preset.Name] = preset.Id; - for (const auto& colProto : presetProto.GetSchema().GetColumns()) { - Y_VERIFY(colProto.HasId()); - Y_VERIFY(colProto.HasName()); - Y_VERIFY(!preset.Columns.contains(colProto.GetId())); - Y_VERIFY(!preset.ColumnsByName.contains(colProto.GetName())); - auto& col = preset.Columns[colProto.GetId()]; - col.Id = colProto.GetId(); - col.Name = colProto.GetName(); - auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(colProto.GetTypeId(), - colProto.HasTypeInfo() ? &colProto.GetTypeInfo() : nullptr); - col.Type = typeInfoMod.TypeInfo; - preset.ColumnsByName[col.Name] = col.Id; - } - for (const auto& keyName : presetProto.GetSchema().GetKeyColumnNames()) { - Y_VERIFY(preset.ColumnsByName.contains(keyName)); - auto& col = *preset.FindColumnByName(keyName); - Y_VERIFY(col.KeyOrder == Max<ui32>()); - col.KeyOrder = preset.KeyColumnIds.size(); - preset.KeyColumnIds.push_back(col.Id); - } - preset.Engine = presetProto.GetSchema().GetEngine(); - preset.NextColumnId = presetProto.GetSchema().GetNextColumnId(); - preset.Version = presetProto.GetSchema().GetVersion(); + preset.ParseFromLocalDB(presetProto); + preset.SetProtoIndex(schemaPresetIndex++); + SchemaPresetByName[preset.GetName()] = preset.GetId(); } for (const auto& shardIdx : Sharding.GetColumnShards()) { @@ -2376,9 +2041,8 @@ TColumnTableInfo::TColumnTableInfo( } if (Description.HasSchema()) { - Schema = TOlapSchema(); - TString strError; - Y_VERIFY((*Schema).Parse(Description.GetSchema(), strError, true), "Cannot parse column table schema"); + TOlapSchema schema; + schema.Parse(Description.GetSchema()); } ColumnShards.reserve(Sharding.GetColumnShards().size()); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 2aa3bd2a028..0623ec830bd 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -5,6 +5,7 @@ #include "schemeshard_tx_infly.h" #include "schemeshard_path_element.h" #include "schemeshard_identificators.h" +#include "schemeshard_olap_types.h" #include <ydb/core/tx/message_seqno.h> #include <ydb/core/tx/datashard/datashard.h> @@ -846,64 +847,6 @@ public: } }; -struct TOlapSchema { - struct TColumn { - ui32 Id = Max<ui32>(); - TString Name; - NScheme::TTypeInfo Type; - ui32 KeyOrder = Max<ui32>(); - bool NotNull = false; - // TODO: DefaultValue - - bool IsKeyColumn() const { return KeyOrder != Max<ui32>(); } - }; - - using TColumns = THashMap<ui32, TColumn>; - using TColumnsByName = THashMap<TString, ui32>; - - TColumns Columns; - TColumnsByName ColumnsByName; - TVector<ui32> KeyColumnIds; - NKikimrSchemeOp::EColumnTableEngine Engine = NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES; - ui32 NextColumnId = 1; - ui64 Version = 1; - - const TColumn* FindColumnByName(const TString& name) const noexcept { - auto it = ColumnsByName.find(name); - if (it != ColumnsByName.end()) { - return &Columns.at(it->second); - } - return nullptr; - } - - TColumn* FindColumnByName(const TString& name) noexcept { - auto it = ColumnsByName.find(name); - if (it != ColumnsByName.end()) { - return &Columns.at(it->second); - } - return nullptr; - } - - static bool UpdateProto(NKikimrSchemeOp::TColumnTableSchema& proto, TString& errStr); - static bool IsAllowedType(ui32 typeId); - static bool IsAllowedFirstPkType(ui32 typeId); - - bool Parse(const NKikimrSchemeOp::TColumnTableSchema& proto, TString& errStr, bool allowNullableKeys); - bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, TEvSchemeShard::EStatus& status, TString& errStr) const; -}; - -struct TOlapStoreSchemaPreset : public TOlapSchema { - ui32 Id; - TString Name; - - // Preset index in the olap store description - size_t ProtoIndex = -1; -}; - -struct TOlapTtlSettings { - // TODO: add parsed settings - ui64 Version = 1; -}; struct TOlapStoreInfo : TSimpleRefCount<TOlapStoreInfo> { using TPtr = TIntrusivePtr<TOlapStoreInfo>; @@ -952,7 +895,6 @@ struct TColumnTableInfo : TSimpleRefCount<TColumnTableInfo> { TMaybe<NKikimrSchemeOp::TAlterColumnTable> AlterBody; TMaybe<TPathId> OlapStorePathId; // PathId of the table store - TMaybe<TOlapSchema> Schema; // schema for standalone table TVector<ui64> ColumnShards; // Current list of column shards TVector<TShardIdx> OwnedColumnShards; @@ -2964,12 +2906,8 @@ bool ValidateTtlSettings(const NKikimrSchemeOp::TTTLSettings& ttl, const THashMap<ui32, TTableInfo::TColumn>& alterColumns, const THashMap<TString, ui32>& colName2Id, const TSubDomainInfo& subDomain, TString& errStr); -bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl, - const THashMap<ui32, TOlapSchema::TColumn>& columns, - const THashMap<TString, ui32>& columnsByName, - TString& errStr); - } + } template <> diff --git a/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp b/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp new file mode 100644 index 00000000000..fb45a6fcba1 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_olap_types.cpp @@ -0,0 +1,407 @@ +#include "schemeshard_olap_types.h" +#include <ydb/library/yql/minikql/mkql_type_ops.h> +#include <ydb/core/scheme_types/scheme_type_registry.h> + +namespace NKikimr::NSchemeShard { + + void TOlapColumnSchema::Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const { + columnSchema.SetId(Id); + columnSchema.SetName(Name); + columnSchema.SetType(TypeName); + columnSchema.SetNotNull(NotNullFlag); + + auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(Type, ""); + columnSchema.SetTypeId(columnType.TypeId); + if (columnType.TypeInfo) { + *columnSchema.MutableTypeInfo() = *columnType.TypeInfo; + } + } + + void TOlapColumnSchema::ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema) { + Id = columnSchema.GetId(); + Name = columnSchema.GetName(); + TypeName = columnSchema.GetType(); + + if (columnSchema.HasTypeInfo()) { + Type = NScheme::TypeInfoModFromProtoColumnType( + columnSchema.GetTypeId(), &columnSchema.GetTypeInfo()) + .TypeInfo; + } else { + Type = NScheme::TypeInfoModFromProtoColumnType( + columnSchema.GetTypeId(), nullptr) + .TypeInfo; + } + NotNullFlag = columnSchema.GetNotNull(); + } + + bool TOlapColumnSchema::ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema, IErrorCollector& errors) { + if (!columnSchema.GetName()) { + errors.AddError("Columns cannot have an empty name"); + return false; + } + Name = columnSchema.GetName(); + NotNullFlag = columnSchema.GetNotNull(); + TypeName = columnSchema.GetType(); + + if (columnSchema.HasTypeId()) { + errors.AddError(TStringBuilder() << "Cannot set TypeId for column '" << Name << ", use Type"); + return false; + } + + if (!columnSchema.HasType()) { + errors.AddError(TStringBuilder() << "Missing Type for column '" << Name); + return false; + } + + auto typeName = NMiniKQL::AdaptLegacyYqlType(TypeName); + Y_VERIFY(AppData()->TypeRegistry); + const NScheme::IType* type = + AppData()->TypeRegistry->GetType(typeName); + if (!type) { + errors.AddError(TStringBuilder() << "Type '" << typeName << "' specified for column '" << Name << "' is not supported"); + return false; + } + if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) { + errors.AddError(TStringBuilder() << "Type '" << typeName << "' specified for column '" << Name << "' is not supported"); + return false;; + } + Type = NScheme::TTypeInfo(type->GetTypeId()); + if (!TOlapSchema::IsAllowedType(type->GetTypeId())){ + errors.AddError(TStringBuilder() << "Type '" << typeName << "' specified for column '" << Name << "' is not supported"); + return false; + } + return true; + } + + + bool TOlapSchemaUpdate::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys) { + if (tableSchema.HasEngine()) { + Engine = tableSchema.GetEngine(); + } + + TSet<TString> keyColumnNames; + for (auto&& pkKey : tableSchema.GetKeyColumnNames()) { + if (keyColumnNames.contains(pkKey)) { + errors.AddError(Sprintf("Duplicate key column '%s'", pkKey.c_str())); + return false; + } + keyColumnNames.emplace(pkKey); + KeyColumnNames.emplace_back(pkKey); + } + + TSet<TString> columnNames; + for (auto& columnSchema : tableSchema.GetColumns()) { + TOlapColumnSchema column; + if (!column.ParseFromRequest(columnSchema, errors)) { + return false; + } + if (columnNames.contains(column.GetName())) { + errors.AddError(Sprintf("Duplicate column '%s'", column.GetName().c_str())); + return false; + } + if (!allowNullKeys) { + if (keyColumnNames.contains(column.GetName()) && !column.IsNotNull()) { + errors.AddError(Sprintf("Nullable key column '%s'", column.GetName().c_str())); + return false; + } + } + columnNames.emplace(column.GetName()); + Columns.emplace_back(std::move(column)); + } + return true; + } + + bool TOlapSchemaUpdate::Parse(const NKikimrSchemeOp::TAlterColumnTable& alterRequest, IErrorCollector& errors) { + TSet<TString> columnNames; + for (auto& columnSchema : alterRequest.GetAlterSchema().GetColumns()) { + TOlapColumnSchema column; + if (!column.ParseFromRequest(columnSchema, errors)) { + return false; + } + if (columnNames.contains(column.GetName())) { + errors.AddError(Sprintf("Duplicate column '%s'", column.GetName().c_str())); + return false; + } + if (column.IsNotNull()) { + errors.AddError("Not null updates not supported"); + return false; + } + columnNames.emplace(column.GetName()); + Columns.emplace_back(std::move(column)); + } + return true; + } + + bool TOlapSchema::Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors) { + if (Columns.empty() && schemaUpdate.GetColumns().empty()) { + errors.AddError("No columns specified"); + return false; + } + + if (KeyColumnIds.empty()) { + if (schemaUpdate.GetKeyColumnNames().empty()) { + errors.AddError("No primary key specified"); + return false; + } + } else { + if (!schemaUpdate.GetKeyColumnNames().empty()) { + errors.AddError("No primary key updates supported"); + return false; + } + } + + TMap<TString, ui32> keyIndexes; + for (ui32 i = 0; i < schemaUpdate.GetKeyColumnNames().size(); ++i) { + keyIndexes[schemaUpdate.GetKeyColumnNames()[i]] = i; + } + + if (!HasEngine()) { + Engine = schemaUpdate.GetEngineDef(NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); + } else { + if (schemaUpdate.HasEngine()) { + errors.AddError("No engine updates supported"); + return false; + } + } + + for (auto&& column : schemaUpdate.GetColumns()) { + if (ColumnsByName.contains(column.GetName())) { + errors.AddError("No special column updates supported"); + return false; + } + TOlapColumnSchema newColumn = column; + newColumn.SetId(NextColumnId); + ++NextColumnId; + + if (keyIndexes.contains(newColumn.GetName())) { + auto keyOrder = keyIndexes.at(newColumn.GetName()); + if (keyOrder == 0) { + if (!IsAllowedFirstPkType(newColumn.GetType().GetTypeId())) { + errors.AddError(TStringBuilder() + << "Type '" << newColumn.GetType().GetTypeId() << "' specified for column '" << newColumn.GetName() + << "' is not supported in first PK position"); + return false; + } + } + newColumn.SetKeyOrder(keyOrder); + } + ColumnsByName[newColumn.GetName()] = newColumn.GetId(); + Columns[newColumn.GetId()] = std::move(newColumn); + } + + if (KeyColumnIds.empty()) { + TVector<ui32> keyColumnIds; + keyColumnIds.reserve(schemaUpdate.GetKeyColumnNames().size()); + for (auto&& columnName : schemaUpdate.GetKeyColumnNames()) { + auto it = ColumnsByName.find(columnName); + if (it == ColumnsByName.end()) { + errors.AddError("Invalid key column " + columnName); + return false; + } + keyColumnIds.push_back(it->second); + } + KeyColumnIds.swap(keyColumnIds); + } + return true; + } + + void TOlapSchema::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema) { + NextColumnId = tableSchema.GetNextColumnId(); + Version = tableSchema.GetVersion(); + Y_VERIFY(tableSchema.HasEngine()); + Engine = tableSchema.GetEngine(); + + TMap<TString, ui32> keyIndexes; + ui32 idx = 0; + for (auto&& kName : tableSchema.GetKeyColumnNames()) { + keyIndexes[kName] = idx++; + } + + TVector<ui32> keyIds; + keyIds.resize(tableSchema.GetKeyColumnNames().size(), 0); + for (const auto& columnSchema : tableSchema.GetColumns()) { + TOlapColumnSchema column; + column.ParseFromLocalDB(columnSchema); + + if (keyIndexes.contains(column.GetName())) { + auto keyOrder = keyIndexes.at(column.GetName()); + column.SetKeyOrder(keyOrder); + Y_VERIFY(keyOrder < keyIds.size()); + keyIds[keyOrder] = column.GetId(); + } + ColumnsByName[column.GetName()] = column.GetId(); + Columns[column.GetId()] = std::move(column); + } + KeyColumnIds.swap(keyIds); + } + + void TOlapSchema::Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const { + tableSchema.SetNextColumnId(NextColumnId); + tableSchema.SetVersion(Version); + + Y_VERIFY(HasEngine()); + tableSchema.SetEngine(GetEngineUnsafe()); + + for (const auto& column : Columns) { + column.second.Serialize(*tableSchema.AddColumns()); + } + + for (auto&& cId : KeyColumnIds) { + auto column = GetColumnById(cId); + Y_VERIFY(!!column); + *tableSchema.AddKeyColumnNames() = column->GetName(); + } + } + + bool TOlapSchema::Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const { + const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; + + ui32 lastColumnId = 0; + THashSet<ui32> usedColumns; + for (const auto& colProto : opSchema.GetColumns()) { + if (colProto.GetName().empty()) { + errors.AddError("Columns cannot have an empty name"); + return false; + } + const TString& colName = colProto.GetName(); + auto* col = GetColumnByName(colName); + if (!col) { + errors.AddError("Column '" + colName + "' does not match schema preset"); + return false; + } + if (colProto.HasId() && colProto.GetId() != col->GetId()) { + errors.AddError("Column '" + colName + "' has id " + colProto.GetId() + " that does not match schema preset"); + return false; + } + + if (!usedColumns.insert(col->GetId()).second) { + errors.AddError("Column '" + colName + "' is specified multiple times"); + return false; + } + if (col->GetId() < lastColumnId) { + errors.AddError("Column order does not match schema preset"); + return false; + } + lastColumnId = col->GetId(); + + if (colProto.HasTypeId()) { + errors.AddError("Cannot set TypeId for column '" + colName + "', use Type"); + return false; + } + if (!colProto.HasType()) { + errors.AddError("Missing Type for column '" + colName + "'"); + return false; + } + + auto typeName = NMiniKQL::AdaptLegacyYqlType(colProto.GetType()); + const NScheme::IType* type = typeRegistry->GetType(typeName); + if (!type || !IsAllowedType(type->GetTypeId())) { + errors.AddError("Type '" + colProto.GetType() + "' specified for column '" + colName + "' is not supported"); + return false; + } + NScheme::TTypeInfo typeInfo(type->GetTypeId()); + + if (typeInfo != col->GetType()) { + errors.AddError("Type '" + TypeName(typeInfo) + "' specified for column '" + colName + "' does not match schema preset type '" + TypeName(col->GetType()) + "'"); + return false; + } + } + + for (auto& pr : Columns) { + if (!usedColumns.contains(pr.second.GetId())) { + errors.AddError("Specified schema is missing some schema preset columns"); + return false; + } + } + + TVector<ui32> keyColumnIds; + for (const TString& keyName : opSchema.GetKeyColumnNames()) { + auto* col = GetColumnByName(keyName); + if (!col) { + errors.AddError("Unknown key column '" + keyName + "'"); + return false; + } + keyColumnIds.push_back(col->GetId()); + } + if (keyColumnIds != KeyColumnIds) { + errors.AddError("Specified schema key columns not matching schema preset"); + return false; + } + + if (opSchema.GetEngine() != Engine) { + errors.AddError("Specified schema engine does not match schema preset"); + return false; + } + return true; + } + + bool TOlapSchema::IsAllowedType(ui32 typeId) { + if (!NScheme::NTypeIds::IsYqlType(typeId)) { + return false; + } + + switch (typeId) { + case NYql::NProto::Bool: + case NYql::NProto::Interval: + case NYql::NProto::Decimal: + case NYql::NProto::DyNumber: + return false; + default: + break; + } + return true; + } + + bool TOlapSchema::IsAllowedFirstPkType(ui32 typeId) { + switch (typeId) { + case NYql::NProto::Uint8: // Byte + case NYql::NProto::Int32: + case NYql::NProto::Uint32: + case NYql::NProto::Int64: + case NYql::NProto::Uint64: + case NYql::NProto::String: + case NYql::NProto::Utf8: + case NYql::NProto::Date: + case NYql::NProto::Datetime: + case NYql::NProto::Timestamp: + return true; + case NYql::NProto::Interval: + case NYql::NProto::Decimal: + case NYql::NProto::DyNumber: + case NYql::NProto::Yson: + case NYql::NProto::Json: + case NYql::NProto::JsonDocument: + case NYql::NProto::Float: + case NYql::NProto::Double: + case NYql::NProto::Bool: + return false; + default: + break; + } + return false; + } + + void TOlapStoreSchemaPreset::ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) { + Y_VERIFY(presetProto.HasId()); + Y_VERIFY(presetProto.HasName()); + Y_VERIFY(presetProto.HasSchema()); + Id = presetProto.GetId(); + Name = presetProto.GetName(); + TOlapSchema::Parse(presetProto.GetSchema()); + } + + void TOlapStoreSchemaPreset::Serialize(NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) const { + presetProto.SetId(Id); + presetProto.SetName(Name); + TOlapSchema::Serialize(*presetProto.MutableSchema()); + } + + bool TOlapStoreSchemaPreset::ParseFromRequest(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, IErrorCollector& errors) { + if (!presetProto.GetName()) { + errors.AddError("Schema preset name cannot be empty"); + return false; + } + Name = presetProto.GetName(); + return true; + } +} diff --git a/ydb/core/tx/schemeshard/schemeshard_olap_types.h b/ydb/core/tx/schemeshard/schemeshard_olap_types.h new file mode 100644 index 00000000000..f85cc2c25d4 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_olap_types.h @@ -0,0 +1,115 @@ +#pragma once + +#include "defs.h" +#include "schemeshard.h" +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/scheme/scheme_types_proto.h> + + +namespace NKikimr::NSchemeShard { + + class IErrorCollector { + public: + virtual void AddError(const TEvSchemeShard::EStatus& errorStatus, const TString& errorMsg) = 0; + virtual void AddError(const TString& errorMsg) = 0; + }; + + class TProposeErrorCollector : public IErrorCollector { + NKikimr::NSchemeShard::TEvSchemeShard::TEvModifySchemeTransactionResult& TxResult; + public: + TProposeErrorCollector(NKikimr::NSchemeShard::TEvSchemeShard::TEvModifySchemeTransactionResult& txResult) + : TxResult(txResult) + {} + + void AddError(const TEvSchemeShard::EStatus& errorStatus, const TString& errorMsg) override { + TxResult.SetError(errorStatus, errorMsg); + } + + void AddError(const TString& errorMsg) override { + TxResult.SetError(NKikimrScheme::StatusSchemeError, errorMsg); + } + }; + + class TOlapColumnSchema { + YDB_ACCESSOR(ui32, Id, Max<ui32>()); + YDB_ACCESSOR(ui32, KeyOrder, Max<ui32>()); + + YDB_READONLY_DEF(TString, Name); + YDB_READONLY_DEF(TString, TypeName); + YDB_READONLY_DEF(NScheme::TTypeInfo, Type); + YDB_FLAG_ACCESSOR(NotNull, false); + + public: + bool IsKeyColumn() const { + return KeyOrder != Max<ui32>(); + } + + void Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnSchema) const; + void ParseFromLocalDB(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema); + bool ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDescription& columnSchema, IErrorCollector& errors); + }; + + class TOlapSchemaUpdate { + YDB_READONLY_OPT(NKikimrSchemeOp::EColumnTableEngine, Engine); + YDB_READONLY_DEF(TVector<TOlapColumnSchema>, Columns); + YDB_READONLY_DEF(TVector<TString>, KeyColumnNames); + + public: + bool Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys = false); + bool Parse(const NKikimrSchemeOp::TAlterColumnTable& alterRequest, IErrorCollector& errors); + }; + + class TOlapSchema { + public: + using TColumn = TOlapColumnSchema; + using TColumns = THashMap<ui32, TOlapColumnSchema>; + using TColumnsByName = THashMap<TString, ui32>; + + private: + YDB_READONLY_OPT(NKikimrSchemeOp::EColumnTableEngine, Engine); + YDB_READONLY_DEF(TColumns, Columns); + YDB_READONLY_DEF(TColumnsByName, ColumnsByName); + YDB_READONLY_DEF(TVector<ui32>, KeyColumnIds); + + YDB_READONLY(ui32, NextColumnId, 1); + YDB_READONLY(ui32, Version, 1); + + public: + const TOlapColumnSchema* GetColumnByName(const TString& name) const noexcept { + auto it = ColumnsByName.find(name); + if (it != ColumnsByName.end()) { + return &Columns.at(it->second); + } + return nullptr; + } + + const TOlapColumnSchema* GetColumnById(const ui32 id) const noexcept { + auto it = Columns.find(id); + if (it != Columns.end()) { + return &it->second; + } + return nullptr; + } + + bool Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors); + + void Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema); + void Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const; + bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const; + bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttlSettings, IErrorCollector& errors) const; + + static bool UpdateProto(NKikimrSchemeOp::TColumnTableSchema& proto, TString& errStr); + static bool IsAllowedType(ui32 typeId); + static bool IsAllowedFirstPkType(ui32 typeId); + }; + + class TOlapStoreSchemaPreset : public TOlapSchema { + YDB_ACCESSOR_DEF(TString, Name); + YDB_ACCESSOR_DEF(ui32, Id); + YDB_ACCESSOR(size_t, ProtoIndex, -1); // Preset index in the olap store description + public: + void Serialize(NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) const; + void ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto); + bool ParseFromRequest(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, IErrorCollector& errors); + }; +} diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index 61485013500..31c7c5246e3 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -402,7 +402,7 @@ void TPathDescriber::DescribeColumnTable(TPathId pathId, TPathElement::TPtr path Y_VERIFY(storeInfo, "OlapStore not found"); auto& preset = storeInfo->SchemaPresets.at(description->GetSchemaPresetId()); - auto& presetProto = storeInfo->Description.GetSchemaPresets(preset.ProtoIndex); + auto& presetProto = storeInfo->Description.GetSchemaPresets(preset.GetProtoIndex()); *description->MutableSchema() = presetProto.GetSchema(); if (description->HasSchemaPresetVersionAdj()) { description->MutableSchema()->SetVersion(description->GetSchema().GetVersion() + description->GetSchemaPresetVersionAdj()); diff --git a/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp b/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp index c8a50ea7f2d..2279a221fc4 100644 --- a/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp @@ -1,4 +1,5 @@ #include "schemeshard_info_types.h" +#include "schemeshard_olap_types.h" #include <ydb/core/protos/flat_scheme_op.pb.h> namespace NKikimr { @@ -14,8 +15,8 @@ namespace { inline ui32 GetType(const TOlapSchema::TColumn& col) { - Y_VERIFY(col.Type.GetTypeId() != NScheme::NTypeIds::Pg, "pg types are not supported"); - return col.Type.GetTypeId(); + Y_VERIFY(col.GetType().GetTypeId() != NScheme::NTypeIds::Pg, "pg types are not supported"); + return col.GetType().GetTypeId(); } inline @@ -120,13 +121,13 @@ static bool ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLifeCycle:: const THashMap<ui32, TOlapSchema::TColumn>& sourceColumns, const THashMap<ui32, TOlapSchema::TColumn>& alterColumns, const THashMap<TString, ui32>& colName2Id, - TString& errStr) + IErrorCollector& errors) { const TString colName = ttl.GetColumnName(); auto it = colName2Id.find(colName); if (it == colName2Id.end()) { - errStr = Sprintf("Cannot enable TTL on unknown column: '%s'", colName.data()); + errors.AddError(Sprintf("Cannot enable TTL on unknown column: '%s'", colName.data())); return false; } @@ -141,17 +142,17 @@ static bool ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLifeCycle:: } if (IsDropped(*column)) { - errStr = Sprintf("Cannot enable TTL on dropped column: '%s'", colName.data()); + errors.AddError(Sprintf("Cannot enable TTL on dropped column: '%s'", colName.data())); return false; } if (ttl.HasExpireAfterBytes()) { - errStr = "TTL with eviction by size is not supported yet"; + errors.AddError("TTL with eviction by size is not supported yet"); return false; } if (!ttl.HasExpireAfterSeconds()) { - errStr = "TTL without eviction time"; + errors.AddError("TTL without eviction time"); return false; } @@ -159,25 +160,26 @@ static bool ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLifeCycle:: switch (GetType(*column)) { case NScheme::NTypeIds::DyNumber: - errStr = "Unsupported column type for TTL in column tables"; // TODO + errors.AddError("Unsupported column type for TTL in column tables"); return false; default: break; } - return ValidateUnit(*column, unit, errStr); + TString errStr; + if (!ValidateUnit(*column, unit, errStr)) { + errors.AddError(errStr); + return false; + } + return true; } -bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl, - const THashMap<ui32, TOlapSchema::TColumn>& columns, - const THashMap<TString, ui32>& columnsByName, - TString& errStr) -{ +bool TOlapSchema::ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl, IErrorCollector& errors) const { using TTtlProto = NKikimrSchemeOp::TColumnDataLifeCycle; switch (ttl.GetStatusCase()) { case TTtlProto::kEnabled: - return ValidateColumnTableTtl(ttl.GetEnabled(), {}, columns, columnsByName, errStr); + return ValidateColumnTableTtl(ttl.GetEnabled(), {}, Columns, ColumnsByName, errors); case TTtlProto::kDisabled: default: break; |