diff options
author | gvit <gvit@ydb.tech> | 2023-09-20 09:13:15 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-09-20 09:33:14 +0300 |
commit | 0ba56e8b5a443974649ae9f2ee9af833b168cd1b (patch) | |
tree | 4e81a8871cc9a2791871dcd23e5e56944e0c4d8c | |
parent | 9e080cab0238736f5f0bd0fa69ed33b8abc8a106 (diff) | |
download | ydb-0ba56e8b5a443974649ae9f2ee9af833b168cd1b.tar.gz |
initial build column implementation
42 files changed, 1424 insertions, 513 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 1be4e5efd4..0f68d97e4e 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -14,6 +14,7 @@ import "ydb/core/protos/blob_depot_config.proto"; import "ydb/public/api/protos/ydb_coordination.proto"; import "ydb/public/api/protos/ydb_export.proto"; import "ydb/library/mkql_proto/protos/minikql.proto"; +import "ydb/core/protos/index_builder.proto"; import "google/protobuf/empty.proto"; @@ -1371,6 +1372,8 @@ enum EOperationType { ESchemeOpCreateExternalDataSource = 89; ESchemeOpDropExternalDataSource = 90; ESchemeOpAlterExternalDataSource = 91; + + ESchemeOpCreateColumnBuild = 92; } message TApplyIf { @@ -1485,6 +1488,8 @@ message TModifyScheme { optional TExternalDataSourceDescription CreateExternalDataSource = 59; optional TDropBlockStoreVolume DropBlockStoreVolume = 60; + + optional NKikimrIndexBuilder.TColumnBuildSettings InitiateColumnBuild = 61; } // "Script", used by client to parse text files with multiple DDL commands diff --git a/ydb/core/protos/index_builder.proto b/ydb/core/protos/index_builder.proto index f6ca5f9da6..7c56553d63 100644 --- a/ydb/core/protos/index_builder.proto +++ b/ydb/core/protos/index_builder.proto @@ -2,13 +2,25 @@ import "ydb/public/api/protos/ydb_issue_message.proto"; import "ydb/public/api/protos/ydb_operation.proto"; import "ydb/public/api/protos/ydb_status_codes.proto"; import "ydb/public/api/protos/ydb_table.proto"; +import "ydb/public/api/protos/ydb_value.proto"; package NKikimrIndexBuilder; option java_package = "ru.yandex.kikimr.proto"; +message TColumnBuildSetting { + optional string ColumnName = 1; + optional Ydb.TypedValue default_from_literal = 2; +} + +message TColumnBuildSettings { + repeated TColumnBuildSetting column = 1; + optional string Table = 2; +} + message TIndexBuildSettings { optional string source_path = 1; optional Ydb.Table.TableIndex index = 2; + optional TColumnBuildSettings column_build_operation = 7; optional uint32 max_batch_rows = 3 [ default = 500 ]; optional uint64 max_batch_bytes = 4 [ default = 8388608 ]; diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 948ea0d2e0..da9a3f1537 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -19,6 +19,7 @@ import "ydb/library/yql/dq/actors/protos/dq_events.proto"; import "ydb/library/yql/dq/actors/protos/dq_stats.proto"; import "ydb/library/yql/dq/proto/dq_tasks.proto"; import "google/protobuf/empty.proto"; +import "ydb/core/protos/index_builder.proto"; package NKikimrTxDataShard; option java_package = "ru.yandex.kikimr.proto"; @@ -1410,6 +1411,8 @@ message TEvBuildIndexCreateRequest { optional uint64 MaxRetries = 14; repeated string DataColumns = 15; + + optional NKikimrIndexBuilder.TColumnBuildSettings ColumnBuildSettings = 16; } message TEvBuildIndexProgressResponse { diff --git a/ydb/core/tx/datashard/build_index.cpp b/ydb/core/tx/datashard/build_index.cpp index 79554928eb..50feb7ee77 100644 --- a/ydb/core/tx/datashard/build_index.cpp +++ b/ydb/core/tx/datashard/build_index.cpp @@ -14,8 +14,10 @@ #include <ydb/library/yql/public/issue/yql_issue_message.h> +#include <ydb/core/ydb_convert/ydb_convert.h> #include <util/generic/algorithm.h> #include <util/string/builder.h> +#include <ydb/core/ydb_convert/table_description.h> namespace NKikimr::NDataShard { @@ -73,25 +75,69 @@ static void ProtoYdbTypeFromTypeInfo(Ydb::Type* type, const NScheme::TTypeInfo t } } -static std::shared_ptr<TTypes> BuildTypes(const TColumnsTypes& types, const TVector<TString>& indexColumns, const TVector<TString>& dataColumns) { +static std::shared_ptr<TTypes> BuildTypes(const TColumnsTypes& types, const TUserTable::TCPtr& tableInfo, const NKikimrIndexBuilder::TColumnBuildSettings& buildSettings, const TVector<TString>& indexColumns, const TVector<TString>& dataColumns) { auto result = std::make_shared<TTypes>(); result->reserve(indexColumns.size()); - for (const auto& colName: indexColumns) { - Ydb::Type type; - ProtoYdbTypeFromTypeInfo(&type, types.at(colName)); - result->emplace_back(colName, type); - } + if (buildSettings.columnSize() > 0) { + for(const auto& keyColId : tableInfo->KeyColumnIds) { + auto it = tableInfo->Columns.at(keyColId); + Ydb::Type type; + ProtoYdbTypeFromTypeInfo(&type, it.Type); + result->emplace_back(it.Name, type); + } + + for(size_t i = 0; i < buildSettings.columnSize(); i++) { + const auto& column = buildSettings.column(i); + result->emplace_back(column.GetColumnName(), column.default_from_literal().type()); + } + + } else { + for (const auto& colName: indexColumns) { + Ydb::Type type; + ProtoYdbTypeFromTypeInfo(&type, types.at(colName)); + result->emplace_back(colName, type); + } + + for (const auto& colName: dataColumns) { + Ydb::Type type; + ProtoYdbTypeFromTypeInfo(&type, types.at(colName)); + result->emplace_back(colName, type); + } - for (const auto& colName: dataColumns) { - Ydb::Type type; - ProtoYdbTypeFromTypeInfo(&type, types.at(colName)); - result->emplace_back(colName, type); } return result; } +bool BuildExtraColumns(TVector<TCell>& cells, const NKikimrIndexBuilder::TColumnBuildSettings& buildSettings, TString& err, TMemoryPool& valueDataPool) { + cells.clear(); + cells.reserve(buildSettings.columnSize()); + for(size_t i = 0; i < buildSettings.columnSize(); i++) { + const auto& column = buildSettings.column(i); + + NScheme::TTypeInfo typeInfo; + i32 typeMod = -1; + Ydb::StatusIds::StatusCode status; + + if (column.default_from_literal().type().has_pg_type()) { + typeMod = column.default_from_literal().type().pg_type().typmod(); + } + + TString unusedtm; + if (!ExtractColumnTypeInfo(typeInfo, unusedtm, column.default_from_literal().type(), status, err)) { + return false; + } + + cells.push_back({}); + if (!CellFromProtoVal(typeInfo, typeMod, &column.default_from_literal().value(), cells.back(), err, valueDataPool)) { + return false; + } + } + + return true; +} + struct TStatus { Ydb::StatusIds::StatusCode StatusCode = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; NYql::TIssues Issues; @@ -197,6 +243,7 @@ private: class TBuildIndexScan : public TActor<TBuildIndexScan>, public NTable::IScan { const TUploadLimits Limits; + const NKikimrIndexBuilder::TColumnBuildSettings ColumnBuildSettings; const ui64 BuildIndexId; const TString TargetTable; @@ -242,10 +289,12 @@ public: const TSerializedTableRange& range, const TVector<TString> targetIndexColumns, const TVector<TString> targetDataColumns, + NKikimrIndexBuilder::TColumnBuildSettings&& columnsToBuild, TUserTable::TCPtr tableInfo, TUploadLimits limits) : TActor(&TThis::StateWork) , Limits(limits) + , ColumnBuildSettings(std::move(columnsToBuild)) , BuildIndexId(buildIndexId) , TargetTable(target) , SeqNo(seqNo) @@ -253,13 +302,14 @@ public: , DatashardActorId(datashardActorId) , SchemeShardActorID(schemeshardActorId) , ScanTags(BuildTags(GetAllTags(tableInfo), targetIndexColumns, targetDataColumns)) - , UploadColumnsTypes(BuildTypes(GetAllTypes(tableInfo), targetIndexColumns, targetDataColumns)) + , UploadColumnsTypes(BuildTypes(GetAllTypes(tableInfo), tableInfo, ColumnBuildSettings, targetIndexColumns, targetDataColumns)) , TargetDataColumnPos(targetIndexColumns.size()) , KeyColumnIds(tableInfo->KeyColumnIds) , KeyTypes(tableInfo->KeyColumnTypes) , TableRange(tableInfo->Range) , RequestedRange(range) { + } ~TBuildIndexScan() override = default; @@ -323,9 +373,25 @@ public: const TConstArrayRef<TCell> rowCells = *row; - ReadBuf.AddRow(TSerializedCellVec(key), - TSerializedCellVec(rowCells.Slice(0, TargetDataColumnPos)), - TSerializedCellVec::Serialize(rowCells.Slice(TargetDataColumnPos))); + + if (ColumnBuildSettings.columnSize() > 0) { + TMemoryPool valueDataPool(256); + TVector<TCell> cells; + TString err; + BuildExtraColumns(cells, ColumnBuildSettings, err, valueDataPool); + TSerializedCellVec valueCells(cells); + TString serializedValue = TSerializedCellVec::Serialize(cells); + TSerializedCellVec keyCopy(key); + ReadBuf.AddRow( + TSerializedCellVec(key), + std::move(keyCopy), + std::move(serializedValue)); + } else { + ReadBuf.AddRow( + TSerializedCellVec(key), + TSerializedCellVec(rowCells.Slice(0, TargetDataColumnPos)), + TSerializedCellVec::Serialize(rowCells.Slice(TargetDataColumnPos))); + } if (!ReadBuf.IsReachLimits(Limits)) { return EScan::Feed; @@ -547,11 +613,12 @@ TAutoPtr<NTable::IScan> CreateBuildIndexScan( const TSerializedTableRange& range, const TVector<TString>& targetIndexColumns, const TVector<TString>& targetDataColumns, + NKikimrIndexBuilder::TColumnBuildSettings&& columnsToBuild, TUserTable::TCPtr tableInfo, TUploadLimits limits) { return new TBuildIndexScan( - buildIndexId, target, seqNo, dataShardId, datashardActorId, schemeshardActorId, range, targetIndexColumns, targetDataColumns, tableInfo, limits); + buildIndexId, target, seqNo, dataShardId, datashardActorId, schemeshardActorId, range, targetIndexColumns, targetDataColumns, std::move(columnsToBuild), tableInfo, limits); } class TDataShard::TTxHandleSafeBuildIndexScan : public NTabletFlatExecutor::TTransactionBase<TDataShard> { @@ -692,6 +759,9 @@ void TDataShard::HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, limits.MaxUploadRowsRetryCount = record.GetMaxRetries(); } + NKikimrIndexBuilder::TColumnBuildSettings columnsToBuild; + columnsToBuild.Swap(ev->Get()->Record.MutableColumnBuildSettings()); + const auto scanId = QueueScan(userTable->LocalTid, CreateBuildIndexScan(buildIndexId, record.GetTargetName(), @@ -702,6 +772,7 @@ void TDataShard::HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, requestedRange, targetIndexColumns, targetDataColumns, + std::move(columnsToBuild), userTable, limits), ev->Cookie, diff --git a/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt index 4fef5b01e6..b116ba3b81 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt @@ -13,6 +13,7 @@ add_subdirectory(ut_bsvolume) add_subdirectory(ut_bsvolume_reboots) add_subdirectory(ut_cdc_stream) add_subdirectory(ut_cdc_stream_reboots) +add_subdirectory(ut_column_build) add_subdirectory(ut_compaction) add_subdirectory(ut_export) add_subdirectory(ut_export_reboots_s3) diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt index fe4acc584c..cc7239bab7 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt @@ -13,6 +13,7 @@ add_subdirectory(ut_bsvolume) add_subdirectory(ut_bsvolume_reboots) add_subdirectory(ut_cdc_stream) add_subdirectory(ut_cdc_stream_reboots) +add_subdirectory(ut_column_build) add_subdirectory(ut_compaction) add_subdirectory(ut_export) add_subdirectory(ut_export_reboots_s3) diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt index fe4acc584c..cc7239bab7 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt @@ -13,6 +13,7 @@ add_subdirectory(ut_bsvolume) add_subdirectory(ut_bsvolume_reboots) add_subdirectory(ut_cdc_stream) add_subdirectory(ut_cdc_stream_reboots) +add_subdirectory(ut_column_build) add_subdirectory(ut_compaction) add_subdirectory(ut_export) add_subdirectory(ut_export_reboots_s3) diff --git a/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt index 31f95d1a00..458da9a9a1 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt @@ -13,6 +13,7 @@ add_subdirectory(ut_bsvolume) add_subdirectory(ut_bsvolume_reboots) add_subdirectory(ut_cdc_stream) add_subdirectory(ut_cdc_stream_reboots) +add_subdirectory(ut_column_build) add_subdirectory(ut_compaction) add_subdirectory(ut_export) add_subdirectory(ut_export_reboots_s3) diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index e696105160..c435b2d79b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -4277,6 +4277,15 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { indexInfo->UnlockTxStatus = rowset.GetValueOrDefault<Schema::IndexBuild::UnlockTxStatus>(indexInfo->UnlockTxStatus); indexInfo->UnlockTxDone = rowset.GetValueOrDefault<Schema::IndexBuild::UnlockTxDone>(indexInfo->UnlockTxDone); + // note: please note that here we specify BuildIndex as operation default, + // because previosly this table was dedicated for build index operations only. + indexInfo->BuildKind = TIndexBuildInfo::EBuildKind( + rowset.GetValueOrDefault<Schema::IndexBuild::BuildKind>(ui32(TIndexBuildInfo::EBuildKind::BuildIndex))); + + indexInfo->AlterMainTableTxId = rowset.GetValueOrDefault<Schema::IndexBuild::AlterMainTableTxId>(indexInfo->AlterMainTableTxId); + indexInfo->AlterMainTableTxStatus = rowset.GetValueOrDefault<Schema::IndexBuild::AlterMainTableTxStatus>(indexInfo->AlterMainTableTxStatus); + indexInfo->AlterMainTableTxDone = rowset.GetValueOrDefault<Schema::IndexBuild::AlterMainTableTxDone>(indexInfo->AlterMainTableTxDone); + indexInfo->Billed = TBillingStats( rowset.GetValueOrDefault<Schema::IndexBuild::RowsBilled>(0), rowset.GetValueOrDefault<Schema::IndexBuild::BytesBilled>(0)); @@ -4341,6 +4350,30 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { } } + { + auto rowset = db.Table<Schema::BuildColumnOperationSettings>().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + + while (!rowset.EndOfSet()) { + TIndexBuildId id = rowset.GetValue<Schema::BuildColumnOperationSettings::Id>(); + Y_VERIFY_S(Self->IndexBuilds.contains(id), "BuildIndex not found" + << ": id# " << id); + + TIndexBuildInfo::TPtr buildInfo = Self->IndexBuilds.at(id); + + TString columnName = rowset.GetValue<Schema::BuildColumnOperationSettings::ColumnName>(); + TString defaultFromLiteral = rowset.GetValue<Schema::BuildColumnOperationSettings::DefaultFromLiteral>(); + + buildInfo->BuildColumns.push_back(TIndexBuildInfo::TColumnBuildInfo(columnName, defaultFromLiteral)); + + if (!rowset.Next()) { + return false; + } + } + } + // read index build upload progress { auto rowset = db.Table<Schema::IndexBuildShardStatus>().Range().Select(); diff --git a/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp b/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp index 65b359a753..5566c21f93 100644 --- a/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp @@ -736,6 +736,10 @@ private: str << "Subscribers.size: " << info->Subscribers.size() << Endl + << "AlterMainTableTxId: " << info->AlterMainTableTxId << Endl + << "AlterMainTableTxStatus: " << NKikimrScheme::EStatus_Name(info->AlterMainTableTxStatus) << Endl + << "AlterMainTableTxDone: " << (info->AlterMainTableTxDone ? "DONE": "not done") << Endl + << "LockTxId: " << info->LockTxId << Endl << "LockTxStatus: " << NKikimrScheme::EStatus_Name(info->LockTxStatus) << Endl << "LockTxDone " << (info->LockTxDone ? "DONE" : "not done") << Endl diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index 9b6c224ef6..7be89c7188 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -1162,6 +1162,7 @@ ISubOperation::TPtr TOperation::ConstructPart(NKikimrSchemeOp::EOperationType op return CreateUpgradeSubDomain(NextPartId(), tx); case NKikimrSchemeOp::EOperationType::ESchemeOpUpgradeSubDomainDecision: return CreateUpgradeSubDomainDecision(NextPartId(), tx); + case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnBuild: case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexBuild: Y_FAIL("multipart operations are handled before, also they require transaction details"); case NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock: @@ -1301,6 +1302,8 @@ TVector<ISubOperation::TPtr> TOperation::ConstructParts(const TTxTransaction& tx return CreateDropIndexedTable(NextPartId(), tx, context); case NKikimrSchemeOp::EOperationType::ESchemeOpForceDropSubDomain: return {CreateCompatibleSubdomainDrop(context.SS, NextPartId(), tx)}; + case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnBuild: + return CreateBuildColumn(NextPartId(), tx, context); case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexBuild: return CreateBuildIndex(NextPartId(), tx, context); case NKikimrSchemeOp::EOperationType::ESchemeOpApplyIndexBuild: diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp index 06f5bc5b13..21e03524ea 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp @@ -22,18 +22,7 @@ TVector<ISubOperation::TPtr> ApplyBuildIndex(TOperationId nextId, const TTxTrans TString indexName = config.GetIndexName(); TPath table = TPath::Resolve(tablePath, context.SS); - TPath index = table.Child(indexName); - TPath implIndexTable = index.Child("indexImplTable"); - - - TTableInfo::TPtr implIndexTableInfo = context.SS->Tables.at(implIndexTable.Base()->PathId); - - //check idempotence - - //check limits - TVector<ISubOperation::TPtr> result; - { auto finalize = TransactionTemplate(table.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpFinalizeBuildIndexMainTable); *finalize.MutableLockGuard() = tx.GetLockGuard(); @@ -46,7 +35,9 @@ TVector<ISubOperation::TPtr> ApplyBuildIndex(TOperationId nextId, const TTxTrans result.push_back(CreateFinalizeBuildIndexMainTable(NextPartId(nextId, result), finalize)); } + if (!indexName.empty()) { + TPath index = table.Child(indexName); auto tableIndexAltering = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); *tableIndexAltering.MutableLockGuard() = tx.GetLockGuard(); auto alterIndex = tableIndexAltering.MutableAlterTableIndex(); @@ -56,7 +47,11 @@ TVector<ISubOperation::TPtr> ApplyBuildIndex(TOperationId nextId, const TTxTrans result.push_back(CreateAlterTableIndex(NextPartId(nextId, result), tableIndexAltering)); } + if (!indexName.empty()) { + TPath index = table.Child(indexName); + TPath implIndexTable = index.Child("indexImplTable"); + TTableInfo::TPtr implIndexTableInfo = context.SS->Tables.at(implIndexTable.Base()->PathId); auto indexImplTableAltering = TransactionTemplate(index.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpFinalizeBuildIndexImplTable); auto alterTable = indexImplTableAltering.MutableAlterTable(); alterTable->SetName(implIndexTable.LeafName()); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_build_index.cpp index 3de0ea098f..124d35229e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_build_index.cpp @@ -6,9 +6,33 @@ #include <ydb/core/protos/flat_tx_scheme.pb.h> #include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/ydb_convert/table_description.h> namespace NKikimr::NSchemeShard { +TVector<ISubOperation::TPtr> CreateBuildColumn(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) { + Y_VERIFY(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnBuild); + + const auto& op = tx.GetInitiateColumnBuild(); + + const auto table = TPath::Resolve(op.GetTable(), context.SS); + TVector<ISubOperation::TPtr> result; + + // altering version of the table. + { + auto outTx = TransactionTemplate(table.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpInitiateBuildIndexMainTable); + *outTx.MutableLockGuard() = tx.GetLockGuard(); + + auto& snapshot = *outTx.MutableInitiateBuildIndexMainTable(); + snapshot.SetTableName(table.LeafName()); + + result.push_back(CreateInitializeBuildIndexMainTable(NextPartId(opId, result), outTx)); + } + + return result; + +} + TVector<ISubOperation::TPtr> CreateBuildIndex(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) { Y_VERIFY(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexBuild); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index 441d19b6b6..419332651a 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -352,6 +352,8 @@ ISubOperation::TPtr CreateSplitMerge(TOperationId id, TTxState::ETxState state); ISubOperation::TPtr CreateDropTable(TOperationId id, const TTxTransaction& tx); ISubOperation::TPtr CreateDropTable(TOperationId id, TTxState::ETxState state); +TVector<ISubOperation::TPtr> CreateBuildColumn(TOperationId id, const TTxTransaction& tx, TOperationContext& context); + TVector<ISubOperation::TPtr> CreateBuildIndex(TOperationId id, const TTxTransaction& tx, TOperationContext& context); TVector<ISubOperation::TPtr> ApplyBuildIndex(TOperationId id, const TTxTransaction& tx, TOperationContext& context); TVector<ISubOperation::TPtr> CancelBuildIndex(TOperationId id, const TTxTransaction& tx, TOperationContext& context); diff --git a/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp b/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp index 7ff867f8fa..5faf6d996e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp @@ -218,6 +218,8 @@ TString DefineUserOperationName(const NKikimrSchemeOp::TModifyScheme& tx) { return "DROP EXTERNAL DATA SOURCE"; case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExternalDataSource: return "ALTER EXTERNAL DATA SOURCE"; + case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnBuild: + return "ALTER TABLE ADD COLUMN DEFAULT"; } Y_FAIL("switch should cover all operation types"); } @@ -495,6 +497,8 @@ TVector<TString> ExtractChangingPaths(const NKikimrSchemeOp::TModifyScheme& tx) case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExternalDataSource: // TODO: unimplemented break; + case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnBuild: + result.emplace_back(tx.GetInitiateColumnBuild().GetTable()); } return result; diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index.cpp index 6d8cdc38dc..e31136eb09 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index.cpp @@ -32,35 +32,45 @@ void TSchemeShard::Handle(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev, const Execute(CreateTxBilling(ev), ctx); } -void TSchemeShard::PersistCreateBuildIndex(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr indexInfo) { - db.Table<Schema::IndexBuild>().Key(indexInfo->Id).Update( - NIceDb::TUpdate<Schema::IndexBuild::Uid>(indexInfo->Uid), - NIceDb::TUpdate<Schema::IndexBuild::DomainOwnerId>(indexInfo->DomainPathId.OwnerId), - NIceDb::TUpdate<Schema::IndexBuild::DomainLocalId>(indexInfo->DomainPathId.LocalPathId), - NIceDb::TUpdate<Schema::IndexBuild::TableOwnerId>(indexInfo->TablePathId.OwnerId), - NIceDb::TUpdate<Schema::IndexBuild::TableLocalId>(indexInfo->TablePathId.LocalPathId), - NIceDb::TUpdate<Schema::IndexBuild::IndexName>(indexInfo->IndexName), - NIceDb::TUpdate<Schema::IndexBuild::IndexType>(indexInfo->IndexType), - NIceDb::TUpdate<Schema::IndexBuild::MaxBatchRows>(indexInfo->Limits.MaxBatchRows), - NIceDb::TUpdate<Schema::IndexBuild::MaxBatchBytes>(indexInfo->Limits.MaxBatchBytes), - NIceDb::TUpdate<Schema::IndexBuild::MaxShards>(indexInfo->Limits.MaxShards), - NIceDb::TUpdate<Schema::IndexBuild::MaxRetries>(indexInfo->Limits.MaxRetries) +void TSchemeShard::PersistCreateBuildIndex(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr info) { + Y_VERIFY(info->BuildKind != TIndexBuildInfo::EBuildKind::BuildKindUnspecified); + db.Table<Schema::IndexBuild>().Key(info->Id).Update( + NIceDb::TUpdate<Schema::IndexBuild::Uid>(info->Uid), + NIceDb::TUpdate<Schema::IndexBuild::DomainOwnerId>(info->DomainPathId.OwnerId), + NIceDb::TUpdate<Schema::IndexBuild::DomainLocalId>(info->DomainPathId.LocalPathId), + NIceDb::TUpdate<Schema::IndexBuild::TableOwnerId>(info->TablePathId.OwnerId), + NIceDb::TUpdate<Schema::IndexBuild::TableLocalId>(info->TablePathId.LocalPathId), + NIceDb::TUpdate<Schema::IndexBuild::IndexName>(info->IndexName), + NIceDb::TUpdate<Schema::IndexBuild::IndexType>(info->IndexType), + NIceDb::TUpdate<Schema::IndexBuild::MaxBatchRows>(info->Limits.MaxBatchRows), + NIceDb::TUpdate<Schema::IndexBuild::MaxBatchBytes>(info->Limits.MaxBatchBytes), + NIceDb::TUpdate<Schema::IndexBuild::MaxShards>(info->Limits.MaxShards), + NIceDb::TUpdate<Schema::IndexBuild::MaxRetries>(info->Limits.MaxRetries), + NIceDb::TUpdate<Schema::IndexBuild::BuildKind>(ui32(info->BuildKind)) ); ui32 columnNo = 0; - for (ui32 i = 0; i < indexInfo->IndexColumns.size(); ++i, ++columnNo) { - db.Table<Schema::IndexBuildColumns>().Key(indexInfo->Id, columnNo).Update( - NIceDb::TUpdate<Schema::IndexBuildColumns::ColumnName>(indexInfo->IndexColumns[i]), + for (ui32 i = 0; i < info->IndexColumns.size(); ++i, ++columnNo) { + db.Table<Schema::IndexBuildColumns>().Key(info->Id, columnNo).Update( + NIceDb::TUpdate<Schema::IndexBuildColumns::ColumnName>(info->IndexColumns[i]), NIceDb::TUpdate<Schema::IndexBuildColumns::ColumnKind>(EIndexColumnKind::KeyColumn) ); } - for (ui32 i = 0; i < indexInfo->DataColumns.size(); ++i, ++columnNo) { - db.Table<Schema::IndexBuildColumns>().Key(indexInfo->Id, columnNo).Update( - NIceDb::TUpdate<Schema::IndexBuildColumns::ColumnName>(indexInfo->DataColumns[i]), + for (ui32 i = 0; i < info->DataColumns.size(); ++i, ++columnNo) { + db.Table<Schema::IndexBuildColumns>().Key(info->Id, columnNo).Update( + NIceDb::TUpdate<Schema::IndexBuildColumns::ColumnName>(info->DataColumns[i]), NIceDb::TUpdate<Schema::IndexBuildColumns::ColumnKind>(EIndexColumnKind::DataColumn) ); } + + for(ui32 i = 0; i < info->BuildColumns.size(); i++) { + db.Table<Schema::BuildColumnOperationSettings>().Key(info->Id, i).Update( + NIceDb::TUpdate<Schema::BuildColumnOperationSettings::ColumnName>(info->BuildColumns[i].ColumnName), + NIceDb::TUpdate<Schema::BuildColumnOperationSettings::DefaultFromLiteral>( + TString(info->BuildColumns[i].DefaultFromLiteral.SerializeAsString())) + ); + } } void TSchemeShard::PersistBuildIndexState(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr indexInfo) { @@ -78,6 +88,21 @@ void TSchemeShard::PersistBuildIndexIssue(NIceDb::TNiceDb& db, const TIndexBuild NIceDb::TUpdate<Schema::IndexBuild::Issue>(indexInfo->Issue)); } +void TSchemeShard::PersistBuildIndexAlterMainTableTxId(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr indexInfo) { + db.Table<Schema::IndexBuild>().Key(indexInfo->Id).Update( + NIceDb::TUpdate<Schema::IndexBuild::AlterMainTableTxId>(indexInfo->AlterMainTableTxId)); +} + +void TSchemeShard::PersistBuildIndexAlterMainTableTxStatus(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr indexInfo) { + db.Table<Schema::IndexBuild>().Key(indexInfo->Id).Update( + NIceDb::TUpdate<Schema::IndexBuild::AlterMainTableTxStatus>(indexInfo->AlterMainTableTxStatus)); +} + +void TSchemeShard::PersistBuildIndexAlterMainTableTxDone(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr indexInfo) { + db.Table<Schema::IndexBuild>().Key(indexInfo->Id).Update( + NIceDb::TUpdate<Schema::IndexBuild::AlterMainTableTxDone>(indexInfo->AlterMainTableTxDone)); +} + void TSchemeShard::PersistBuildIndexInitiateTxId(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr indexInfo) { db.Table<Schema::IndexBuild>().Key(indexInfo->Id).Update( NIceDb::TUpdate<Schema::IndexBuild::InitiateTxId>(indexInfo->InitiateTxId)); @@ -168,21 +193,25 @@ void TSchemeShard::PersistBuildIndexUploadInitiate(NIceDb::TNiceDb& db, const TI NIceDb::TUpdate<Schema::IndexBuildShardStatus::UploadStatus>(shardStatus.UploadStatus)); } -void TSchemeShard::PersistBuildIndexForget(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr indexInfo) { - db.Table<Schema::IndexBuild>().Key(indexInfo->Id).Delete(); +void TSchemeShard::PersistBuildIndexForget(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr info) { + db.Table<Schema::IndexBuild>().Key(info->Id).Delete(); ui32 columnNo = 0; - for (ui32 i = 0; i < indexInfo->IndexColumns.size(); ++i, ++columnNo) { - db.Table<Schema::IndexBuildColumns>().Key(indexInfo->Id, columnNo).Delete(); + for (ui32 i = 0; i < info->IndexColumns.size(); ++i, ++columnNo) { + db.Table<Schema::IndexBuildColumns>().Key(info->Id, columnNo).Delete(); } - for (ui32 i = 0; i < indexInfo->DataColumns.size(); ++i, ++columnNo) { - db.Table<Schema::IndexBuildColumns>().Key(indexInfo->Id, columnNo).Delete(); + for (ui32 i = 0; i < info->DataColumns.size(); ++i, ++columnNo) { + db.Table<Schema::IndexBuildColumns>().Key(info->Id, columnNo).Delete(); } - for (const auto& item: indexInfo->Shards) { + for (const auto& item: info->Shards) { auto shardIdx = item.first; - db.Table<Schema::IndexBuildShardStatus>().Key(indexInfo->Id, shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Delete(); + db.Table<Schema::IndexBuildShardStatus>().Key(info->Id, shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Delete(); + } + + for(ui32 idx = 0; idx < info->BuildColumns.size(); ++idx) { + db.Table<Schema::BuildColumnOperationSettings>().Key(info->Id, idx).Delete(); } } @@ -208,6 +237,12 @@ void TSchemeShard::SetupRouting(const TDeque<TIndexBuildId>& indexIds, const TAc TxIdToIndexBuilds[buildInfo->LockTxId] = buildInfo->Id; } + if (buildInfo->AlterMainTableTxId) { + Y_VERIFY(!TxIdToIndexBuilds.contains(buildInfo->AlterMainTableTxId) + || TxIdToIndexBuilds.at(buildInfo->AlterMainTableTxId) == buildInfo->Id); + TxIdToIndexBuilds[buildInfo->AlterMainTableTxId] = buildInfo->Id; + } + if (buildInfo->InitiateTxId) { Y_VERIFY(!TxIdToIndexBuilds.contains(buildInfo->InitiateTxId) || TxIdToIndexBuilds.at(buildInfo->InitiateTxId) == buildInfo->Id); diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index.h b/ydb/core/tx/schemeshard/schemeshard_build_index.h index 5ca64eb459..7846c0230d 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index.h +++ b/ydb/core/tx/schemeshard/schemeshard_build_index.h @@ -34,8 +34,8 @@ struct TEvIndexBuilder { explicit TEvCreateRequest( const ui64 txId, const TString& dbName, - NKikimrIndexBuilder::TIndexBuildSettings settings - ) { + NKikimrIndexBuilder::TIndexBuildSettings settings) + { Record.SetTxId(txId); Record.SetDatabaseName(dbName); *Record.MutableSettings() = std::move(settings); @@ -69,8 +69,7 @@ struct TEvIndexBuilder { explicit TEvCancelRequest( const ui64 txId, const TString& dbName, - ui64 buildIndexId - ) + ui64 buildIndexId) { Record.SetTxId(txId); Record.SetDatabaseName(dbName); diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp index 3db9c44483..5ce746aa83 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp @@ -3,47 +3,27 @@ #include "schemeshard_build_index_helpers.h" #include "schemeshard_build_index_tx_base.h" -#include <ydb/public/api/protos/ydb_issue_message.pb.h> -#include <ydb/public/api/protos/ydb_status_codes.pb.h> - - -namespace NKikimr { -namespace NSchemeShard { +namespace NKikimr::NSchemeShard { using namespace NTabletFlatExecutor; -struct TSchemeShard::TIndexBuilder::TTxCancel: public TSchemeShard::TIndexBuilder::TTxBase { -private: - TEvIndexBuilder::TEvCancelRequest::TPtr Request; - +struct TSchemeShard::TIndexBuilder::TTxCancel: public TSchemeShard::TIndexBuilder::TTxSimple<TEvIndexBuilder::TEvCancelRequest, TEvIndexBuilder::TEvCancelResponse> { public: explicit TTxCancel(TSelf* self, TEvIndexBuilder::TEvCancelRequest::TPtr& ev) - : TTxBase(self) - , Request(ev) - { - } - - TTxType GetTxType() const override { - return TXTYPE_CANCEL_INDEX_BUILD; - } + : TTxSimple(self, ev, TXTYPE_CANCEL_INDEX_BUILD) + {} bool DoExecute(TTransactionContext& txc, const TActorContext&) override { const auto& record = Request->Get()->Record; + LOG_N("DoExecute " << record.ShortDebugString()); - LOG_N("TIndexBuilder::TTxCancel: DoExecute" - << ", Database: " << record.GetDatabaseName() - << ", BuildIndexId: " << record.GetIndexBuildId()); - LOG_D("Message: " << record.ShortDebugString()); - - - auto response = MakeHolder<TEvIndexBuilder::TEvCancelResponse>(record.GetTxId()); + Response = MakeHolder<TEvIndexBuilder::TEvCancelResponse>(record.GetTxId()); TPath database = TPath::Resolve(record.GetDatabaseName(), Self); if (!database.IsResolved()) { return Reply( - std::move(response), Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Database <" << record.GetDatabaseName() << "> not found" - ); + ); } const TPathId domainPathId = database.GetPathIdForDomain(); @@ -51,43 +31,38 @@ public: if (!Self->IndexBuilds.contains(indexBuildId)) { return Reply( - std::move(response), Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found" - ); + ); } TIndexBuildInfo::TPtr indexBuildInfo = Self->IndexBuilds.at(indexBuildId); if (indexBuildInfo->DomainPathId != domainPathId) { return Reply( - std::move(response), Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found in database <" << record.GetDatabaseName() << ">" - ); + ); } if (indexBuildInfo->IsFinished()) { return Reply( - std::move(response), Ydb::StatusIds::PRECONDITION_FAILED, TStringBuilder() << "Index build process with id <" << indexBuildId << "> has been finished already" - ); + ); } if (indexBuildInfo->IsCancellationRequested()) { return Reply( - std::move(response), Ydb::StatusIds::PRECONDITION_FAILED, TStringBuilder() << "Index build process with id <" << indexBuildId << "> canceling already" - ); + ); } if (indexBuildInfo->State > TIndexBuildInfo::EState::Filling) { return Reply( - std::move(response), Ydb::StatusIds::PRECONDITION_FAILED, TStringBuilder() << "Index build process with id <" << indexBuildId << "> are almost done, cancellation has no sense" - ); + ); } NIceDb::TNiceDb db(txc.DB); @@ -96,42 +71,14 @@ public: Progress(indexBuildInfo->Id); - return Reply(std::move(response)); + return Reply(); } - bool Reply(THolder<TEvIndexBuilder::TEvCancelResponse> response, - const Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS, - const TString& errorMessage = TString()) - { - - LOG_N("TIndexBuilder::TTxCancel: Reply" - << ", BuildIndexId: " << response->Record.GetTxId() - << ", status: " << status - << ", error: " << errorMessage); - LOG_D("Message: " << response->Record.ShortDebugString()); - - auto& record = response->Record; - record.SetStatus(status); - if (errorMessage) { - AddIssue(record.MutableIssues(), errorMessage); - } - - Send(Request->Sender, std::move(response), 0, Request->Cookie); - - return true; - } - - void DoComplete(const TActorContext&) override { - } - -private: - - + void DoComplete(const TActorContext&) override {} }; ITransaction* TSchemeShard::CreateTxCancel(TEvIndexBuilder::TEvCancelRequest::TPtr& ev) { return new TIndexBuilder::TTxCancel(this, ev); } -} // NSchemeShard -} // NKikimr +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp index e6a33ac9ee..6e9f57cecf 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp @@ -4,49 +4,32 @@ #include "schemeshard_impl.h" #include "schemeshard_utils.h" -#include <ydb/public/api/protos/ydb_issue_message.pb.h> -#include <ydb/public/api/protos/ydb_status_codes.pb.h> - namespace NKikimr::NSchemeShard { using namespace NTabletFlatExecutor; -class TSchemeShard::TIndexBuilder::TTxCreate: public TSchemeShard::TIndexBuilder::TTxBase { - TEvIndexBuilder::TEvCreateRequest::TPtr Request; - +class TSchemeShard::TIndexBuilder::TTxCreate: public TSchemeShard::TIndexBuilder::TTxSimple<TEvIndexBuilder::TEvCreateRequest, TEvIndexBuilder::TEvCreateResponse> { public: explicit TTxCreate(TSelf* self, TEvIndexBuilder::TEvCreateRequest::TPtr& ev) - : TSchemeShard::TIndexBuilder::TTxBase(self) - , Request(ev) - { - } - - TTxType GetTxType() const override { - return TXTYPE_CREATE_INDEX_BUILD; - } + : TTxSimple(self, ev, TXTYPE_CREATE_INDEX_BUILD) + {} bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { const auto& request = Request->Get()->Record; const auto& settings = request.GetSettings(); + LOG_N("DoExecute " << request.ShortDebugString()); - LOG_N("TIndexBuilder::TTxCreate: DoExecute" - << ", Database: " << request.GetDatabaseName() - << ", BuildIndexId: " << request.GetTxId() - << ", Table: " << request.GetSettings().source_path() - << ", IndexName: " << request.GetSettings().index().name()); - LOG_D("Message: " << request.ShortDebugString()); - - auto response = MakeHolder<TEvIndexBuilder::TEvCreateResponse>(request.GetTxId()); + Response = MakeHolder<TEvIndexBuilder::TEvCreateResponse>(request.GetTxId()); const auto id = TIndexBuildId(request.GetTxId()); if (Self->IndexBuilds.contains(id)) { - return Reply(std::move(response), Ydb::StatusIds::ALREADY_EXISTS, TStringBuilder() + return Reply(Ydb::StatusIds::ALREADY_EXISTS, TStringBuilder() << "Index build with id '" << id << "' already exists"); } const TString& uid = GetUid(request.GetOperationParams().labels()); if (uid && Self->IndexBuildsByUid.contains(uid)) { - return Reply(std::move(response), Ydb::StatusIds::ALREADY_EXISTS, TStringBuilder() + return Reply(Ydb::StatusIds::ALREADY_EXISTS, TStringBuilder() << "Index build with uid '" << uid << "' already exists"); } @@ -62,7 +45,7 @@ public: .NotUnderDomainUpgrade(); if (!checks) { - return Reply(std::move(response), TranslateStatusCode(checks.GetStatus()), checks.GetError()); + return Reply(TranslateStatusCode(checks.GetStatus()), checks.GetError()); } } @@ -75,7 +58,7 @@ public: Self->PersistSubDomainSchemeQuotas(db, subDomainPathId, *subDomainInfo); if (!quotaAcquired) { - return Reply(std::move(response), Ydb::StatusIds::OVERLOADED, + return Reply(Ydb::StatusIds::OVERLOADED, "Request exceeded a limit on the number of schema operations, try again later."); } @@ -93,61 +76,96 @@ public: .IsTheSameDomain(domainPath); if (!checks) { - return Reply(std::move(response), TranslateStatusCode(checks.GetStatus()), checks.GetError()); + return Reply(TranslateStatusCode(checks.GetStatus()), checks.GetError()); } } - const auto indexPath = tablePath.Child(settings.index().name()); - { - const auto checks = indexPath.Check(); - checks - .IsAtLocalSchemeShard(); - - if (indexPath.IsResolved()) { + TIndexBuildInfo::TPtr buildInfo = new TIndexBuildInfo(id, uid); + buildInfo->DomainPathId = domainPath.Base()->PathId; + buildInfo->TablePathId = tablePath.Base()->PathId; + + if (settings.has_index()) { + buildInfo->BuildKind = TIndexBuildInfo::EBuildKind::BuildIndex; + const auto indexPath = tablePath.Child(settings.index().name()); + { + const auto checks = indexPath.Check(); checks - .IsResolved() - .NotUnderDeleting() - .FailOnExist(TPathElement::EPathType::EPathTypeTableIndex, false); - } else { + .IsAtLocalSchemeShard(); + + if (indexPath.IsResolved()) { + checks + .IsResolved() + .NotUnderDeleting() + .FailOnExist(TPathElement::EPathType::EPathTypeTableIndex, false); + } else { + checks + .NotEmpty() + .NotResolved(); + } + checks - .NotEmpty() - .NotResolved(); + .IsValidLeafName() + .PathsLimit(2) // index and impl-table + .DirChildrenLimit() + .ShardsLimit(1); // impl-table + + if (!checks) { + return Reply(TranslateStatusCode(checks.GetStatus()), checks.GetError()); + } } - checks - .IsValidLeafName() - .PathsLimit(2) // index and impl-table - .DirChildrenLimit() - .ShardsLimit(1); // impl-table - - if (!checks) { - return Reply(std::move(response), TranslateStatusCode(checks.GetStatus()), checks.GetError()); + auto tableInfo = Self->Tables.at(tablePath.Base()->PathId); + auto domainInfo = tablePath.DomainInfo(); + + const ui64 aliveIndices = Self->GetAliveChildren( + tablePath.Base(), NKikimrSchemeOp::EPathTypeTableIndex); + + if (aliveIndices + 1 > + domainInfo->GetSchemeLimits().MaxTableIndices) { + return Reply( + Ydb::StatusIds::PRECONDITION_FAILED, + TStringBuilder() + << "indexes count has reached maximum value in the " + "table" + << ", children limit for dir in domain: " + << domainInfo->GetSchemeLimits().MaxTableIndices + << ", intention to create new children: " + << aliveIndices + 1); } - } - auto tableInfo = Self->Tables.at(tablePath.Base()->PathId); - auto domainInfo = tablePath.DomainInfo(); + TString explain; + if (!Prepare(buildInfo, settings, explain)) { + return Reply(Ydb::StatusIds::BAD_REQUEST, + TStringBuilder() + << "Failed item check: " << explain); + } - const ui64 aliveIndices = Self->GetAliveChildren(tablePath.Base(), NKikimrSchemeOp::EPathTypeTableIndex); - if (aliveIndices + 1 > domainInfo->GetSchemeLimits().MaxTableIndices) { - return Reply(std::move(response), Ydb::StatusIds::PRECONDITION_FAILED, TStringBuilder() - << "indexes count has reached maximum value in the table" - << ", children limit for dir in domain: " << domainInfo->GetSchemeLimits().MaxTableIndices - << ", intention to create new children: " << aliveIndices + 1); + NKikimrSchemeOp::TIndexBuildConfig tmpConfig; + buildInfo->SerializeToProto(Self, &tmpConfig); + const auto indexDesc = tmpConfig.GetIndex(); + if (!NTableIndex::CommonCheck(tableInfo, indexDesc, + domainInfo->GetSchemeLimits(), + explain)) { + return Reply(Ydb::StatusIds::BAD_REQUEST, explain); + } } - TIndexBuildInfo::TPtr buildInfo = new TIndexBuildInfo(id, uid); - - TString explain; - if (!Prepare(buildInfo, domainPath, tablePath, settings, explain)) { - return Reply(std::move(response), Ydb::StatusIds::BAD_REQUEST, TStringBuilder() - << "Failed item check: " << explain); + if (settings.has_column_build_operation()) { + // put some validation here for the build + // operation + buildInfo->BuildKind = TIndexBuildInfo::EBuildKind::BuildColumn; + buildInfo->BuildColumns.reserve(settings.column_build_operation().column_size()); + for(int i = 0; i < settings.column_build_operation().column_size(); i++) { + const auto& colInfo = settings.column_build_operation().column(i); + buildInfo->BuildColumns.push_back( + TIndexBuildInfo::TColumnBuildInfo(colInfo.GetColumnName(), colInfo.default_from_literal())); + } } - const auto indexDesc = buildInfo->SerializeToProto(Self).GetIndex(); - if (!NTableIndex::CommonCheck(tableInfo, indexDesc, domainInfo->GetSchemeLimits(), explain)) { - return Reply(std::move(response), Ydb::StatusIds::BAD_REQUEST, explain); - } + buildInfo->Limits.MaxBatchRows = settings.max_batch_rows(); + buildInfo->Limits.MaxBatchBytes = settings.max_batch_bytes(); + buildInfo->Limits.MaxShards = settings.max_shards_in_flight(); + buildInfo->Limits.MaxRetries = settings.max_retries_upload_batch(); Y_VERIFY(buildInfo != nullptr); @@ -156,7 +174,12 @@ public: Self->PersistCreateBuildIndex(db, buildInfo); - buildInfo->State = TIndexBuildInfo::EState::Locking; + if (buildInfo->IsBuildIndex()) { + buildInfo->State = TIndexBuildInfo::EState::Locking; + } else { + buildInfo->State = TIndexBuildInfo::EState::AlterMainTable; + } + Self->PersistBuildIndexState(db, buildInfo); Self->IndexBuilds[id] = buildInfo; @@ -169,36 +192,37 @@ public: return true; } - void DoComplete(const TActorContext&) override { - LOG_D("TIndexBuilder::TTxCreate: DoComplete"); - } + void DoComplete(const TActorContext&) override {} private: - bool Prepare(TIndexBuildInfo::TPtr buildInfo, const TPath& database, const TPath& path, const NKikimrIndexBuilder::TIndexBuildSettings& settings, TString& explain) { - buildInfo->DomainPathId = database.Base()->PathId; - buildInfo->TablePathId = path.Base()->PathId; - - switch (settings.index().type_case()) { - case Ydb::Table::TableIndex::TypeCase::kGlobalIndex: - buildInfo->IndexType = NKikimrSchemeOp::EIndexType::EIndexTypeGlobal; - break; - case Ydb::Table::TableIndex::TypeCase::kGlobalAsyncIndex: - buildInfo->IndexType = NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync; - break; - case Ydb::Table::TableIndex::TypeCase::TYPE_NOT_SET: - explain = "invalid or unset index type"; + bool Prepare(TIndexBuildInfo::TPtr buildInfo, const NKikimrIndexBuilder::TIndexBuildSettings& settings, TString& explain) { + if (!settings.has_index() && !settings.has_column_build_operation()) { + explain = "missing index or column to build"; return false; - }; - - buildInfo->IndexName = settings.index().name(); - buildInfo->IndexColumns.assign(settings.index().index_columns().begin(), settings.index().index_columns().end()); - buildInfo->DataColumns.assign(settings.index().data_columns().begin(), settings.index().data_columns().end()); + } - buildInfo->Limits.MaxBatchRows = settings.max_batch_rows(); - buildInfo->Limits.MaxBatchBytes = settings.max_batch_bytes(); - buildInfo->Limits.MaxShards = settings.max_shards_in_flight(); - buildInfo->Limits.MaxRetries = settings.max_retries_upload_batch(); + if (settings.has_index() && settings.has_column_build_operation()) { + explain = "unable to build index and column in the single operation"; + return false; + } + if (settings.has_index()) { + switch (settings.index().type_case()) { + case Ydb::Table::TableIndex::TypeCase::kGlobalIndex: + buildInfo->IndexType = NKikimrSchemeOp::EIndexType::EIndexTypeGlobal; + break; + case Ydb::Table::TableIndex::TypeCase::kGlobalAsyncIndex: + buildInfo->IndexType = NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync; + break; + case Ydb::Table::TableIndex::TypeCase::TYPE_NOT_SET: + explain = "invalid or unset index type"; + return false; + }; + + buildInfo->IndexName = settings.index().name(); + buildInfo->IndexColumns.assign(settings.index().index_columns().begin(), settings.index().index_columns().end()); + buildInfo->DataColumns.assign(settings.index().data_columns().begin(), settings.index().data_columns().end()); + } return true; } @@ -210,27 +234,6 @@ private: return it->second; } - - bool Reply(THolder<TEvIndexBuilder::TEvCreateResponse> responseEv, - const Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS, - const TString& errorMessage = TString()) - { - LOG_N("TIndexBuilder::TTxCreate: Reply" - << ", BuildIndexId: " << responseEv->Record.GetTxId() - << ", status: " << status - << ", error: " << errorMessage); - LOG_D("Message: " << responseEv->Record.ShortDebugString()); - - auto& record = responseEv->Record; - record.SetStatus(status); - if (errorMessage) { - AddIssue(record.MutableIssues(), errorMessage); - } - - Send(Request->Sender, std::move(responseEv), 0, Request->Cookie); - - return true; - } }; ITransaction* TSchemeShard::CreateTxCreate(TEvIndexBuilder::TEvCreateRequest::TPtr& ev) { diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp index 1d77fb728a..8ff1f8607a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp @@ -3,46 +3,27 @@ #include "schemeshard_build_index_helpers.h" #include "schemeshard_build_index_tx_base.h" -#include <ydb/public/api/protos/ydb_issue_message.pb.h> -#include <ydb/public/api/protos/ydb_status_codes.pb.h> - - -namespace NKikimr { -namespace NSchemeShard { +namespace NKikimr::NSchemeShard { using namespace NTabletFlatExecutor; -struct TSchemeShard::TIndexBuilder::TTxForget: public TSchemeShard::TIndexBuilder::TTxBase { -private: - TEvIndexBuilder::TEvForgetRequest::TPtr Request; - +struct TSchemeShard::TIndexBuilder::TTxForget: public TSchemeShard::TIndexBuilder::TTxSimple<TEvIndexBuilder::TEvForgetRequest, TEvIndexBuilder::TEvForgetResponse> { public: explicit TTxForget(TSelf* self, TEvIndexBuilder::TEvForgetRequest::TPtr& ev) - : TTxBase(self) - , Request(ev) - { - } - - TTxType GetTxType() const override { - return TXTYPE_FORGET_INDEX_BUILD; - } + : TTxSimple(self, ev, TXTYPE_FORGET_INDEX_BUILD) + {} bool DoExecute(TTransactionContext& txc, const TActorContext&) override { const auto& record = Request->Get()->Record; + LOG_N("DoExecute " << record.ShortDebugString()); - LOG_N("TIndexBuilder::TTxForget: DoExecute" - << ", Database: " << record.GetDatabaseName() - << ", BuildIndexId: " << record.GetIndexBuildId()); - LOG_D("Message: " << record.ShortDebugString()); - - auto response = MakeHolder<TEvIndexBuilder::TEvForgetResponse>(record.GetTxId()); + Response = MakeHolder<TEvIndexBuilder::TEvForgetResponse>(record.GetTxId()); TPath database = TPath::Resolve(record.GetDatabaseName(), Self); if (!database.IsResolved()) { return Reply( - std::move(response), Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Database <" << record.GetDatabaseName() << "> not found" - ); + ); } const TPathId domainPathId = database.GetPathIdForDomain(); @@ -50,27 +31,24 @@ public: if (!Self->IndexBuilds.contains(indexBuildId)) { return Reply( - std::move(response), Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found" - ); + ); } TIndexBuildInfo::TPtr indexBuildInfo = Self->IndexBuilds.at(indexBuildId); if (indexBuildInfo->DomainPathId != domainPathId) { return Reply( - std::move(response), Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found in database <" << record.GetDatabaseName() << ">" - ); + ); } if (!indexBuildInfo->IsFinished()) { return Reply( - std::move(response), Ydb::StatusIds::PRECONDITION_FAILED, TStringBuilder() << "Index build process with id <" << indexBuildId << "> hasn't been finished yet" - ); + ); } NIceDb::TNiceDb db(txc.DB); @@ -78,41 +56,14 @@ public: EraseBuildInfo(indexBuildInfo); - return Reply(std::move(response)); + return Reply(); } - bool Reply(THolder<TEvIndexBuilder::TEvForgetResponse> response, - const Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS, - const TString& errorMessage = TString()) - { - LOG_N("TIndexBuilder::TTxForget: Reply" - << ", BuildIndexId: " << response->Record.GetTxId() - << ", status: " << status - << ", error: " << errorMessage); - LOG_D("Message: " << response->Record.ShortDebugString()); - - auto& respRecord = response->Record; - respRecord.SetStatus(status); - if (errorMessage) { - AddIssue(respRecord.MutableIssues(), errorMessage); - } - - Send(Request->Sender, std::move(response), 0, Request->Cookie); - - return true; - } - - void DoComplete(const TActorContext&) override { - } - -private: - - + void DoComplete(const TActorContext&) override {} }; ITransaction* TSchemeShard::CreateTxForget(TEvIndexBuilder::TEvForgetRequest::TPtr& ev) { return new TIndexBuilder::TTxForget(this, ev); } -} // NSchemeShard -} // NKikimr +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__get.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__get.cpp index f6292fc85d..2b1cc46a0f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__get.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__get.cpp @@ -3,46 +3,28 @@ #include "schemeshard_build_index_helpers.h" #include "schemeshard_build_index_tx_base.h" -#include <ydb/public/api/protos/ydb_issue_message.pb.h> -#include <ydb/public/api/protos/ydb_status_codes.pb.h> - -namespace NKikimr { -namespace NSchemeShard { +namespace NKikimr::NSchemeShard { using namespace NTabletFlatExecutor; -struct TSchemeShard::TIndexBuilder::TTxGet: public TSchemeShard::TIndexBuilder::TTxBase { -private: - TEvIndexBuilder::TEvGetRequest::TPtr Request; - +struct TSchemeShard::TIndexBuilder::TTxGet: public TSchemeShard::TIndexBuilder::TTxSimple<TEvIndexBuilder::TEvGetRequest, TEvIndexBuilder::TEvGetResponse> { public: explicit TTxGet(TSelf* self, TEvIndexBuilder::TEvGetRequest::TPtr& ev) - : TSchemeShard::TIndexBuilder::TTxBase(self) - , Request(ev) - { - } - - TTxType GetTxType() const override { - return TXTYPE_GET_INDEX_BUILD; - } + : TTxSimple(self, ev, TXTYPE_GET_INDEX_BUILD, false) + {} bool DoExecute(TTransactionContext&, const TActorContext&) override { const auto& record = Request->Get()->Record; + LOG_D("DoExecute " << record.ShortDebugString()); - LOG_D("TIndexBuilder::TTxGet: DoExecute" - << ", Database: " << record.GetDatabaseName() - << ", BuildIndexId: " << record.GetIndexBuildId()); - LOG_T("Message: " << record.ShortDebugString()); - - auto response = MakeHolder<TEvIndexBuilder::TEvGetResponse>(); + Response = MakeHolder<TEvIndexBuilder::TEvGetResponse>(); TPath database = TPath::Resolve(record.GetDatabaseName(), Self); if (!database.IsResolved()) { return Reply( - std::move(response), Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Database <" << record.GetDatabaseName() << "> not found" - ); + ); } const TPathId domainPathId = database.GetPathIdForDomain(); @@ -50,60 +32,31 @@ public: if (!Self->IndexBuilds.contains(indexBuildId)) { return Reply( - std::move(response), Ydb::StatusIds::PRECONDITION_FAILED, TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found" - ); + ); } TIndexBuildInfo::TPtr indexBuildInfo = Self->IndexBuilds.at(indexBuildId); if (indexBuildInfo->DomainPathId != domainPathId) { return Reply( - std::move(response), Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found in database <" << record.GetDatabaseName() << ">" - ); + ); } - auto& respRecord = response->Record; + auto& respRecord = Response->Record; respRecord.SetStatus(Ydb::StatusIds::SUCCESS); Fill(*respRecord.MutableIndexBuild(), indexBuildInfo); - return Reply(std::move(response)); - } - - bool Reply(THolder<TEvIndexBuilder::TEvGetResponse> response, - const Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS, - const TString& errorMessage = TString()) - { - LOG_D("TIndexBuilder::TTxForget: Reply" - << ", BuildIndexId: " << response->Record.GetIndexBuild().GetId() - << ", status: " << status - << ", error: " << errorMessage); - LOG_T("Message: " << response->Record.ShortDebugString()); - - auto& respRecord = response->Record; - respRecord.SetStatus(status); - if (errorMessage) { - AddIssue(respRecord.MutableIssues(), errorMessage); - } - - Send(Request->Sender, std::move(response), 0, Request->Cookie); - - return true; + return Reply(); } - void DoComplete(const TActorContext&) override { - } - -private: - - + void DoComplete(const TActorContext&) override {} }; ITransaction* TSchemeShard::CreateTxGet(TEvIndexBuilder::TEvGetRequest::TPtr& ev) { return new TIndexBuilder::TTxGet(this, ev); } -} // NSchemeShard -} // NKikimr +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp index b6ceeeeee6..d14f756831 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp @@ -3,61 +3,41 @@ #include "schemeshard_build_index_helpers.h" #include "schemeshard_build_index_tx_base.h" -#include <ydb/public/api/protos/ydb_issue_message.pb.h> -#include <ydb/public/api/protos/ydb_status_codes.pb.h> - - -namespace NKikimr { -namespace NSchemeShard { +namespace NKikimr::NSchemeShard { using namespace NTabletFlatExecutor; -struct TSchemeShard::TIndexBuilder::TTxList: public TSchemeShard::TIndexBuilder::TTxBase { +struct TSchemeShard::TIndexBuilder::TTxList: public TSchemeShard::TIndexBuilder::TTxSimple<TEvIndexBuilder::TEvListRequest, TEvIndexBuilder::TEvListResponse> { private: static constexpr ui64 DefaultPageSize = 10; static constexpr ui64 MinPageSize = 1; static constexpr ui64 MaxPageSize = 100; static constexpr ui64 DefaultPage = 1; - - - TEvIndexBuilder::TEvListRequest::TPtr Request; - public: explicit TTxList(TSelf* self, TEvIndexBuilder::TEvListRequest::TPtr& ev) - : TSchemeShard::TIndexBuilder::TTxBase(self) - , Request(ev) - { - } - - TTxType GetTxType() const override { - return TXTYPE_LIST_INDEX_BUILD; - } + : TTxSimple(self, ev, TXTYPE_LIST_INDEX_BUILD, false) + {} bool DoExecute(TTransactionContext&, const TActorContext&) override { const auto& record = Request->Get()->Record; + LOG_D("DoExecute " << record.ShortDebugString()); - LOG_D("TIndexBuilder::TTxList: DoExecute" - << ", Database: " << record.GetDatabaseName()); - LOG_T("Message: " << record.ShortDebugString()); - - auto response = MakeHolder<TEvIndexBuilder::TEvListResponse>(); + Response = MakeHolder<TEvIndexBuilder::TEvListResponse>(); TPath database = TPath::Resolve(record.GetDatabaseName(), Self); if (!database.IsResolved()) { return Reply( - std::move(response), Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Database <" << record.GetDatabaseName() << "> not found" - ); + ); } const TPathId domainPathId = database.GetPathIdForDomain(); ui64 page = DefaultPage; if (record.GetPageToken() && !TryFromString(record.GetPageToken(), page)) { return Reply( - std::move(response), Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Unable to parse page token" - ); + ); } page = Max(page, DefaultPage); const ui64 pageSize = Min(record.GetPageSize() ? Max(record.GetPageSize(), MinPageSize) : DefaultPageSize, MaxPageSize); @@ -72,7 +52,7 @@ public: ++it; } - auto& respRecord = response->Record; + auto& respRecord = Response->Record; respRecord.SetStatus(Ydb::StatusIds::SUCCESS); ui64 size = 0; @@ -90,41 +70,14 @@ public: respRecord.SetNextPageToken(ToString(page + 1)); } - return Reply(std::move(response)); - } - - bool Reply(THolder<TEvIndexBuilder::TEvListResponse> response, - const Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS, - const TString& errorMessage = TString()) - { - LOG_D("TIndexBuilder::TTxForget: Reply" - << ", BuildIndexCount: " << response->Record.EntriesSize() - << ", status: " << status - << ", error: " << errorMessage); - LOG_T("Message: " << response->Record.ShortDebugString()); - - auto& resp = response->Record; - resp.SetStatus(status); - if (errorMessage) { - AddIssue(resp.MutableIssues(), errorMessage); - } - - Send(Request->Sender, std::move(response), 0, Request->Cookie); - - return true; + return Reply(); } - void DoComplete(const TActorContext&) override { - } - -private: - + void DoComplete(const TActorContext&) override {} }; ITransaction* TSchemeShard::CreateTxList(TEvIndexBuilder::TEvListRequest::TPtr& ev) { return new TIndexBuilder::TTxList(this, ev); } -} // NSchemeShard -} // NKikimr - +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp index 6d51b93e7e..bbd1d1cf56 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp @@ -9,6 +9,7 @@ #include <ydb/public/api/protos/ydb_status_codes.pb.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> +#include <ydb/core/ydb_convert/table_description.h> namespace NKikimr { @@ -30,7 +31,13 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> LockPropose( auto& lockConfig = *modifyScheme.MutableLockConfig(); lockConfig.SetName(path.LeafName()); - *modifyScheme.MutableInitiateIndexBuild() = buildInfo->SerializeToProto(ss); + if (buildInfo->IsBuildIndex()) { + buildInfo->SerializeToProto(ss, modifyScheme.MutableInitiateIndexBuild()); + } else if (buildInfo->IsBuildColumn()) { + buildInfo->SerializeToProto(ss, modifyScheme.MutableInitiateColumnBuild()); + } else { + Y_FAIL("Unknown operation kind while building LockPropose"); + } return propose; } @@ -42,14 +49,59 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> InitiatePropose( propose->Record.SetFailOnExist(true); NKikimrSchemeOp::TModifyScheme& modifyScheme = *propose->Record.AddTransaction(); - modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateIndexBuild); - modifyScheme.SetInternal(true); + if (buildInfo->IsBuildIndex()) { + modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateIndexBuild); + modifyScheme.SetInternal(true); - modifyScheme.SetWorkingDir(TPath::Init(buildInfo->DomainPathId, ss).PathString()); + modifyScheme.SetWorkingDir(TPath::Init(buildInfo->DomainPathId, ss).PathString()); - modifyScheme.MutableLockGuard()->SetOwnerTxId(ui64(buildInfo->LockTxId)); + modifyScheme.MutableLockGuard()->SetOwnerTxId(ui64(buildInfo->LockTxId)); + + buildInfo->SerializeToProto(ss, modifyScheme.MutableInitiateIndexBuild()); + } else if (buildInfo->IsBuildColumn()) { + modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnBuild); + modifyScheme.SetInternal(true); + modifyScheme.SetWorkingDir(TPath::Init(buildInfo->DomainPathId, ss).PathString()); + modifyScheme.MutableLockGuard()->SetOwnerTxId(ui64(buildInfo->LockTxId)); + + buildInfo->SerializeToProto(ss, modifyScheme.MutableInitiateColumnBuild()); + } else { + Y_FAIL("Unknown operation kind while building InitiatePropose"); + } + + return propose; +} + +THolder<TEvSchemeShard::TEvModifySchemeTransaction> AlterMainTablePropose( + TSchemeShard* ss, const TIndexBuildInfo::TPtr buildInfo) +{ + auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(buildInfo->AlterMainTableTxId), ss->TabletID()); + propose->Record.SetFailOnExist(true); + + NKikimrSchemeOp::TModifyScheme& modifyScheme = *propose->Record.AddTransaction(); + if (buildInfo->IsBuildColumn()) { + modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable); + modifyScheme.SetInternal(true); + modifyScheme.SetWorkingDir(TPath::Init(buildInfo->TablePathId, ss).Parent().PathString()); + modifyScheme.MutableAlterTable()->SetName(TPath::Init(buildInfo->TablePathId, ss).LeafName()); + for(auto& colInfo : buildInfo->BuildColumns) { + auto col = modifyScheme.MutableAlterTable()->AddColumns(); + NScheme::TTypeInfo typeInfo; + TString typeMod; + Ydb::StatusIds::StatusCode status; + TString error; + if (!ExtractColumnTypeInfo(typeInfo, typeMod, colInfo.DefaultFromLiteral.type(), status, error)) { + // todo gvit fix that + Y_FAIL("failed to extract column type info"); + } + + col->SetType(NScheme::TypeName(typeInfo, typeMod)); + col->SetName(colInfo.ColumnName); + } - *modifyScheme.MutableInitiateIndexBuild() = buildInfo->SerializeToProto(ss); + } else { + Y_FAIL("Unknown operation kind while building AlterMainTablePropose"); + } return propose; } @@ -70,7 +122,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> ApplyPropose( auto& indexBuild = *modifyScheme.MutableApplyIndexBuild(); indexBuild.SetTablePath(TPath::Init(buildInfo->TablePathId, ss).PathString()); - indexBuild.SetIndexName(buildInfo->IndexName); + + if (buildInfo->IsBuildIndex()) { + indexBuild.SetIndexName(buildInfo->IndexName); + } + indexBuild.SetSnaphotTxId(ui64(buildInfo->InitiateTxId)); indexBuild.SetBuildIndexId(ui64(buildInfo->Id)); @@ -131,14 +187,9 @@ private: public: explicit TTxProgress(TSelf* self, TIndexBuildId id) - : TSchemeShard::TIndexBuilder::TTxBase(self) + : TTxBase(self, TXTYPE_PROGRESS_INDEX_BUILD) , BuildId(id) - { - } - - TTxType GetTxType() const override { - return TXTYPE_PROGRESS_INDEX_BUILD; - } + {} bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { Y_VERIFY(Self->IndexBuilds.contains(BuildId)); @@ -153,6 +204,19 @@ public: case TIndexBuildInfo::EState::Invalid: Y_FAIL("Unreachable"); + case TIndexBuildInfo::EState::AlterMainTable: + if (buildInfo->AlterMainTableTxId == InvalidTxId) { + Send(Self->TxAllocatorClient, MakeHolder<TEvTxAllocatorClient::TEvAllocate>(), 0, ui64(BuildId)); + } else if (buildInfo->AlterMainTableTxStatus == NKikimrScheme::StatusSuccess) { + Send(Self->SelfId(), AlterMainTablePropose(Self, buildInfo), 0, ui64(BuildId)); + } else if (!buildInfo->AlterMainTableTxDone) { + Send(Self->SelfId(), MakeHolder<TEvSchemeShard::TEvNotifyTxCompletion>(ui64(buildInfo->AlterMainTableTxId))); + } else { + ChangeState(BuildId, TIndexBuildInfo::EState::Locking); + Progress(BuildId); + } + break; + case TIndexBuildInfo::EState::Locking: if (buildInfo->LockTxId == InvalidTxId) { Send(Self->TxAllocatorClient, MakeHolder<TEvTxAllocatorClient::TEvAllocate>(), 0, ui64(BuildId)); @@ -238,7 +302,7 @@ public: Y_VERIFY(buildInfo->SnapshotStep); } - if (buildInfo->ImplTablePath.Empty()) { + if (buildInfo->ImplTablePath.Empty() && buildInfo->IsBuildIndex()) { TPath implTable = TPath::Init(buildInfo->TablePathId, Self).Dive(buildInfo->IndexName).Dive("indexImplTable"); buildInfo->ImplTablePath = implTable.PathString(); @@ -262,15 +326,23 @@ public: ev->Record.SetOwnerId(buildInfo->TablePathId.OwnerId); ev->Record.SetPathId(buildInfo->TablePathId.LocalPathId); - ev->Record.SetTargetName(buildInfo->ImplTablePath); - - THashSet<TString> columns = buildInfo->ImplTableColumns.Columns; - for (const auto& x: buildInfo->ImplTableColumns.Keys) { - *ev->Record.AddIndexColumns() = x; - columns.erase(x); + if (buildInfo->IsBuildColumn()) { + ev->Record.SetTargetName(TPath::Init(buildInfo->TablePathId, Self).PathString()); + } else if (buildInfo->IsBuildIndex()) { + ev->Record.SetTargetName(buildInfo->ImplTablePath); } - for (const auto& x: columns) { - *ev->Record.AddDataColumns() = x; + + if (buildInfo->IsBuildIndex()) { + THashSet<TString> columns = buildInfo->ImplTableColumns.Columns; + for (const auto& x: buildInfo->ImplTableColumns.Keys) { + *ev->Record.AddIndexColumns() = x; + columns.erase(x); + } + for (const auto& x: columns) { + *ev->Record.AddDataColumns() = x; + } + } else if (buildInfo->IsBuildColumn()) { + buildInfo->SerializeToProto(Self, ev->Record.MutableColumnBuildSettings()); } TIndexBuildInfo::TShardStatus& shardStatus = buildInfo->Shards.at(shardIdx); @@ -442,16 +514,12 @@ private: public: explicit TTxBilling(TSelf* self, TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev) - : TSchemeShard::TIndexBuilder::TTxBase(self) + : TTxBase(self, TXTYPE_PROGRESS_INDEX_BUILD) , BuildIndexId(ev->Get()->BuildId) , ScheduledAt(ev->Get()->SendAt) { } - TTxType GetTxType() const override { - return TXTYPE_MAKEBILL_INDEX_BUILD; - } - bool DoExecute(TTransactionContext& , const TActorContext& ctx) override { LOG_I("TTxReply : TTxBilling" << ", buildIndexId# " << BuildIndexId); @@ -491,40 +559,35 @@ private: public: explicit TTxReply(TSelf* self, TEvTxAllocatorClient::TEvAllocateResult::TPtr& allocateResult) - : TSchemeShard::TIndexBuilder::TTxBase(self) + : TTxBase(self, TXTYPE_PROGRESS_INDEX_BUILD) , AllocateResult(allocateResult) { } explicit TTxReply(TSelf* self, TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& modifyResult) - : TSchemeShard::TIndexBuilder::TTxBase(self) + : TTxBase(self, TXTYPE_PROGRESS_INDEX_BUILD) , ModifyResult(modifyResult) { } explicit TTxReply(TSelf* self, TTxId completedTxId) - : TSchemeShard::TIndexBuilder::TTxBase(self) + : TTxBase(self, TXTYPE_PROGRESS_INDEX_BUILD) , CompletedTxId(completedTxId) { } explicit TTxReply(TSelf* self, TEvDataShard::TEvBuildIndexProgressResponse::TPtr& shardProgress) - : TSchemeShard::TIndexBuilder::TTxBase(self) + : TTxBase(self, TXTYPE_PROGRESS_INDEX_BUILD) , ShardProgress(shardProgress) { } explicit TTxReply(TSelf* self, TIndexBuildId buildId, TTabletId tabletId) - : TSchemeShard::TIndexBuilder::TTxBase(self) + : TSchemeShard::TIndexBuilder::TTxBase(self, TXTYPE_PROGRESS_INDEX_BUILD) , PipeRetry({buildId, tabletId}) { } - - TTxType GetTxType() const override { - return TXTYPE_PROGRESS_INDEX_BUILD; - } - bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { if (AllocateResult) { return OnAllocation(txc, ctx); @@ -565,6 +628,7 @@ public: << ", TIndexBuildInfo: " << *buildInfo); switch (buildInfo->State) { + case TIndexBuildInfo::EState::AlterMainTable: case TIndexBuildInfo::EState::Invalid: case TIndexBuildInfo::EState::Locking: case TIndexBuildInfo::EState::GatheringStatistics: @@ -632,6 +696,7 @@ public: } switch (buildInfo->State) { + case TIndexBuildInfo::EState::AlterMainTable: case TIndexBuildInfo::EState::Invalid: case TIndexBuildInfo::EState::Locking: case TIndexBuildInfo::EState::GatheringStatistics: @@ -796,6 +861,16 @@ public: case TIndexBuildInfo::EState::Invalid: Y_FAIL("Unreachable"); + case TIndexBuildInfo::EState::AlterMainTable: + { + Y_VERIFY(txId == buildInfo->AlterMainTableTxId); + + buildInfo->AlterMainTableTxDone = true; + NIceDb::TNiceDb db(txc.DB); + Self->PersistBuildIndexAlterMainTableTxDone(db, buildInfo); + break; + } + case TIndexBuildInfo::EState::Locking: { Y_VERIFY(txId == buildInfo->LockTxId); @@ -950,6 +1025,31 @@ public: case TIndexBuildInfo::EState::Invalid: Y_FAIL("Unreachable"); + case TIndexBuildInfo::EState::AlterMainTable: + { + Y_VERIFY(txId == buildInfo->AlterMainTableTxId); + + buildInfo->AlterMainTableTxStatus = record.GetStatus(); + NIceDb::TNiceDb db(txc.DB); + Self->PersistBuildIndexAlterMainTableTxStatus(db, buildInfo); + + auto statusCode = TranslateStatusCode(record.GetStatus()); + + if (statusCode != Ydb::StatusIds::SUCCESS) { + buildInfo->Issue += TStringBuilder() + << "At alter main table state got unsuccess propose result" + << ", status: " << NKikimrScheme::EStatus_Name(buildInfo->AlterMainTableTxStatus) + << ", reason: " << record.GetReason(); + Self->PersistBuildIndexIssue(db, buildInfo); + NIceDb::TNiceDb db(txc.DB); + Self->PersistBuildIndexForget(db, buildInfo); + EraseBuildInfo(buildInfo); + } + + ReplyOnCreation(buildInfo, statusCode); + break; + } + case TIndexBuildInfo::EState::Locking: { Y_VERIFY(txId == buildInfo->LockTxId); @@ -1181,6 +1281,15 @@ public: case TIndexBuildInfo::EState::Invalid: Y_FAIL("Unreachable"); + case TIndexBuildInfo::EState::AlterMainTable: + if (!buildInfo->AlterMainTableTxId) { + buildInfo->AlterMainTableTxId = txId; + NIceDb::TNiceDb db(txc.DB); + Self->PersistBuildIndexAlterMainTableTxId(db, buildInfo); + + } + break; + case TIndexBuildInfo::EState::Locking: if (!buildInfo->LockTxId) { buildInfo->LockTxId = txId; diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_helpers.h b/ydb/core/tx/schemeshard/schemeshard_build_index_helpers.h index 4d8fea7309..a0b0f53fbe 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index_helpers.h +++ b/ydb/core/tx/schemeshard/schemeshard_build_index_helpers.h @@ -11,10 +11,9 @@ # error log macro redefinition #endif -#define LOG_T(stream) LOG_TRACE_S((TlsActivationContext->AsActorContext()), NKikimrServices::BUILD_INDEX, stream) -#define LOG_D(stream) LOG_DEBUG_S((TlsActivationContext->AsActorContext()), NKikimrServices::BUILD_INDEX, stream) -#define LOG_I(stream) LOG_INFO_S((TlsActivationContext->AsActorContext()), NKikimrServices::BUILD_INDEX, stream) -#define LOG_N(stream) LOG_NOTICE_S((TlsActivationContext->AsActorContext()), NKikimrServices::BUILD_INDEX, stream) -#define LOG_W(stream) LOG_WARN_S((TlsActivationContext->AsActorContext()), NKikimrServices::BUILD_INDEX, stream) -#define LOG_E(stream) LOG_ERROR_S((TlsActivationContext->AsActorContext()), NKikimrServices::BUILD_INDEX, stream) - +#define LOG_T(stream) LOG_TRACE_S((TlsActivationContext->AsActorContext()), NKikimrServices::BUILD_INDEX, LogPrefix << stream) +#define LOG_D(stream) LOG_DEBUG_S((TlsActivationContext->AsActorContext()), NKikimrServices::BUILD_INDEX, LogPrefix << stream) +#define LOG_I(stream) LOG_INFO_S((TlsActivationContext->AsActorContext()), NKikimrServices::BUILD_INDEX, LogPrefix << stream) +#define LOG_N(stream) LOG_NOTICE_S((TlsActivationContext->AsActorContext()), NKikimrServices::BUILD_INDEX, LogPrefix << stream) +#define LOG_W(stream) LOG_WARN_S((TlsActivationContext->AsActorContext()), NKikimrServices::BUILD_INDEX, LogPrefix << stream) +#define LOG_E(stream) LOG_ERROR_S((TlsActivationContext->AsActorContext()), NKikimrServices::BUILD_INDEX, LogPrefix << stream) diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp index 8b2f74aa19..64312685e9 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp @@ -199,6 +199,7 @@ void TSchemeShard::TIndexBuilder::TTxBase::Fill(NKikimrIndexBuilder::TIndexBuild } switch (indexInfo->State) { + case TIndexBuildInfo::EState::AlterMainTable: case TIndexBuildInfo::EState::Locking: case TIndexBuildInfo::EState::GatheringStatistics: case TIndexBuildInfo::EState::Initiating: @@ -247,39 +248,49 @@ void TSchemeShard::TIndexBuilder::TTxBase::Fill(NKikimrIndexBuilder::TIndexBuild Fill(*index.MutableSettings(), indexInfo); } -void TSchemeShard::TIndexBuilder::TTxBase::Fill(NKikimrIndexBuilder::TIndexBuildSettings& settings, const TIndexBuildInfo::TPtr indexInfo) { - TPath table = TPath::Init(indexInfo->TablePathId, Self); +void TSchemeShard::TIndexBuilder::TTxBase::Fill(NKikimrIndexBuilder::TIndexBuildSettings& settings, const TIndexBuildInfo::TPtr info) { + TPath table = TPath::Init(info->TablePathId, Self); settings.set_source_path(table.PathString()); - Ydb::Table::TableIndex& index = *settings.mutable_index(); - index.set_name(indexInfo->IndexName); - - *index.mutable_index_columns() = { - indexInfo->IndexColumns.begin(), - indexInfo->IndexColumns.end() - }; + if (info->IsBuildIndex()) { + Ydb::Table::TableIndex& index = *settings.mutable_index(); + index.set_name(info->IndexName); + + *index.mutable_index_columns() = { + info->IndexColumns.begin(), + info->IndexColumns.end() + }; + + *index.mutable_data_columns() = { + info->DataColumns.begin(), + info->DataColumns.end() + }; + + switch (info->IndexType) { + case NKikimrSchemeOp::EIndexType::EIndexTypeGlobal: + case NKikimrSchemeOp::EIndexType::EIndexTypeGlobalUnique: + *index.mutable_global_index() = Ydb::Table::GlobalIndex(); + break; + case NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync: + *index.mutable_global_async_index() = Ydb::Table::GlobalAsyncIndex(); + break; + case NKikimrSchemeOp::EIndexType::EIndexTypeInvalid: + Y_FAIL("Unreachable"); + }; + } - *index.mutable_data_columns() = { - indexInfo->DataColumns.begin(), - indexInfo->DataColumns.end() - }; + if (info->IsBuildColumn()) { + for(const auto& column : info->BuildColumns) { + auto* columnProto = settings.mutable_column_build_operation()->add_column(); + columnProto->SetColumnName(column.ColumnName); + columnProto->mutable_default_from_literal()->CopyFrom(column.DefaultFromLiteral); + } + } - switch (indexInfo->IndexType) { - case NKikimrSchemeOp::EIndexType::EIndexTypeGlobal: - case NKikimrSchemeOp::EIndexType::EIndexTypeGlobalUnique: - *index.mutable_global_index() = Ydb::Table::GlobalIndex(); - break; - case NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync: - *index.mutable_global_async_index() = Ydb::Table::GlobalAsyncIndex(); - break; - case NKikimrSchemeOp::EIndexType::EIndexTypeInvalid: - Y_FAIL("Unreachable"); - }; - - settings.set_max_batch_bytes(indexInfo->Limits.MaxBatchBytes); - settings.set_max_batch_rows(indexInfo->Limits.MaxBatchRows); - settings.set_max_shards_in_flight(indexInfo->Limits.MaxShards); - settings.set_max_retries_upload_batch(indexInfo->Limits.MaxRetries); + settings.set_max_batch_bytes(info->Limits.MaxBatchBytes); + settings.set_max_batch_rows(info->Limits.MaxBatchRows); + settings.set_max_shards_in_flight(info->Limits.MaxShards); + settings.set_max_retries_upload_batch(info->Limits.MaxRetries); } void TSchemeShard::TIndexBuilder::TTxBase::AddIssue(::google::protobuf::RepeatedPtrField<::Ydb::Issue::IssueMessage>* issues, @@ -315,6 +326,7 @@ void TSchemeShard::TIndexBuilder::TTxBase::EraseBuildInfo(const TIndexBuildInfo: Self->TxIdToIndexBuilds.erase(indexBuildInfo->InitiateTxId); Self->TxIdToIndexBuilds.erase(indexBuildInfo->ApplyTxId); Self->TxIdToIndexBuilds.erase(indexBuildInfo->UnlockTxId); + Self->TxIdToIndexBuilds.erase(indexBuildInfo->AlterMainTableTxId); } Ydb::StatusIds::StatusCode TSchemeShard::TIndexBuilder::TTxBase::TranslateStatusCode(NKikimrScheme::EStatus status) { diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h index 4542a8c7d4..bf89dddac9 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h +++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h @@ -2,13 +2,19 @@ #include "schemeshard_impl.h" +#include <ydb/public/api/protos/ydb_issue_message.pb.h> +#include <ydb/public/api/protos/ydb_status_codes.pb.h> + namespace NKikimr { namespace NSchemeShard { class TSchemeShard::TIndexBuilder::TTxBase: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> { private: TSideEffects SideEffects; - + const NKikimr::NSchemeShard::ETxTypes TxType; +public: + const TString LogPrefix; +private: using TChangeStateRec = std::tuple<TIndexBuildId, TIndexBuildInfo::EState>; TDeque<TChangeStateRec> StateChanges; using TBillingEventSchedule = std::tuple<TIndexBuildId, TDuration>; @@ -41,10 +47,14 @@ protected: bool GotScheduledBilling(const TIndexBuildInfo::TPtr& indexBuildInfo); public: - explicit TTxBase(TSelf* self) + explicit TTxBase(TSelf* self, NKikimr::NSchemeShard::ETxTypes txType) : TBase(self) + , TxType(txType) + , LogPrefix(TStringBuilder() << "TIndexBuilder::" << NKikimr::NSchemeShard::ETxTypes_Name(txType) << ": ") { } + TTxType GetTxType() const override { return TxType; } + virtual ~TTxBase() = default; virtual bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) = 0; @@ -54,5 +64,38 @@ public: void Complete(const TActorContext& ctx) override; }; +template<typename TRequest, typename TResponse> +class TSchemeShard::TIndexBuilder::TTxSimple : public TSchemeShard::TIndexBuilder::TTxBase { +public: + typename TRequest::TPtr Request; + THolder<TResponse> Response; + const bool IsMutableOperation; + + explicit TTxSimple(TSelf* self, typename TRequest::TPtr& ev, NKikimr::NSchemeShard::ETxTypes txType, bool isMutableOperation = true) + : TTxBase(self, txType) + , Request(ev) + , IsMutableOperation(isMutableOperation) + { } + + bool Reply(const Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS, const TString& errorMessage = TString()) + { + Y_VERIFY(Response); + auto& record = Response->Record; + record.SetStatus(status); + if (errorMessage) { + AddIssue(record.MutableIssues(), errorMessage); + } + + if (IsMutableOperation) { + LOG_N("Reply " << Response->Record.ShortDebugString()); + } else { + LOG_D("Reply " << Response->Record.ShortDebugString()); + } + + Send(Request->Sender, std::move(Response), 0, Request->Cookie); + return true; + } +}; + } // NSchemeShard } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 6bcc2aa3d7..794b836be0 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -1155,6 +1155,10 @@ public: void PersistBuildIndexIssue(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr indexInfo); void PersistBuildIndexCancelRequest(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr indexInfo); + void PersistBuildIndexAlterMainTableTxId(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr indexInfo); + void PersistBuildIndexAlterMainTableTxStatus(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr indexInfo); + void PersistBuildIndexAlterMainTableTxDone(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr indexInfo); + void PersistBuildIndexInitiateTxId(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr indexInfo); void PersistBuildIndexInitiateTxStatus(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr indexInfo); void PersistBuildIndexInitiateTxDone(NIceDb::TNiceDb& db, const TIndexBuildInfo::TPtr indexInfo); @@ -1180,6 +1184,9 @@ public: struct TIndexBuilder { class TTxBase; + template<typename TRequest, typename TResponse> + class TTxSimple; + class TTxCreate; struct TTxGet; struct TTxCancel; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index f7785c87ba..e338383eef 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -1795,11 +1795,11 @@ TIndexBuildInfo::TShardStatus::TShardStatus(TSerializedTableRange range, TString , LastKeyAck(std::move(lastKeyAck)) {} -NKikimrSchemeOp::TIndexBuildConfig TIndexBuildInfo::SerializeToProto(TSchemeShard* ss) const { - NKikimrSchemeOp::TIndexBuildConfig result; - result.SetTable(TPath::Init(TablePathId, ss).PathString()); +void TIndexBuildInfo::SerializeToProto(TSchemeShard* ss, NKikimrSchemeOp::TIndexBuildConfig* result) const { + Y_VERIFY(IsBuildIndex()); + result->SetTable(TPath::Init(TablePathId, ss).PathString()); - auto& index = *result.MutableIndex(); + auto& index = *result->MutableIndex(); index.SetName(IndexName); index.SetType(IndexType); @@ -1810,8 +1810,14 @@ NKikimrSchemeOp::TIndexBuildConfig TIndexBuildInfo::SerializeToProto(TSchemeShar for (const auto& x : DataColumns) { *index.AddDataColumnNames() = x; } +} - return result; +void TIndexBuildInfo::SerializeToProto(TSchemeShard* ss, NKikimrIndexBuilder::TColumnBuildSettings* result) const { + Y_VERIFY(IsBuildColumn()); + result->SetTable(TPath::Init(TablePathId, ss).PathString()); + for(const auto& column : BuildColumns) { + column.SerializeToProto(result->add_column()); + } } TColumnFamiliesMerger::TColumnFamiliesMerger(NKikimrSchemeOp::TPartitionConfig &container) diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 2df40b4ea1..2447715ae6 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2922,6 +2922,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { enum class EState: ui32 { Invalid = 0, + AlterMainTable = 5, Locking = 10, GatheringStatistics = 20, Initiating = 30, @@ -2939,6 +2940,34 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { Rejected = 550 }; + struct TColumnBuildInfo { + TString ColumnName; + Ydb::TypedValue DefaultFromLiteral; + + TColumnBuildInfo(const TString& name, const TString& serializedLiteral) + : ColumnName(name) + { + Y_VERIFY(DefaultFromLiteral.ParseFromString(serializedLiteral)); + } + + TColumnBuildInfo(const TString& name, const Ydb::TypedValue& defaultFromLiteral) + : ColumnName(name) + , DefaultFromLiteral(defaultFromLiteral) + { + } + + void SerializeToProto(NKikimrIndexBuilder::TColumnBuildSetting* setting) const { + setting->SetColumnName(ColumnName); + setting->mutable_default_from_literal()->CopyFrom(DefaultFromLiteral); + } + }; + + enum class EBuildKind : ui32 { + BuildKindUnspecified = 0, + BuildIndex = 10, + BuildColumn = 20 + }; + TActorId CreateSender; ui64 SenderCookie = 0; @@ -2949,9 +2978,13 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { TPathId TablePathId; NKikimrSchemeOp::EIndexType IndexType = NKikimrSchemeOp::EIndexTypeInvalid; + EBuildKind BuildKind = EBuildKind::BuildKindUnspecified; + TString IndexName; TVector<TString> IndexColumns; TVector<TString> DataColumns; + + TVector<TColumnBuildInfo> BuildColumns; TString ImplTablePath; NTableIndex::TTableColumns ImplTableColumns; @@ -2963,6 +2996,10 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { bool CancelRequested = false; + TTxId AlterMainTableTxId = TTxId(); + NKikimrScheme::EStatus AlterMainTableTxStatus = NKikimrScheme::StatusSuccess; + bool AlterMainTableTxDone = false; + TTxId LockTxId = TTxId(); NKikimrScheme::EStatus LockTxStatus = NKikimrScheme::StatusSuccess; bool LockTxDone = false; @@ -3030,7 +3067,6 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { TBillingStats Processed; TBillingStats Billed; - TIndexBuildInfo(TIndexBuildId id, TString uid) : Id(id) , Uid(uid) @@ -3040,6 +3076,14 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { return CancelRequested; } + bool IsBuildIndex() const { + return BuildKind == EBuildKind::BuildIndex; + } + + bool IsBuildColumn() const { + return BuildKind == EBuildKind::BuildColumn; + } + bool IsDone() const { return State == EState::Done; } @@ -3066,7 +3110,9 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { return 0.0; } - NKikimrSchemeOp::TIndexBuildConfig SerializeToProto(TSchemeShard* ss) const; + void SerializeToProto(TSchemeShard* ss, NKikimrIndexBuilder::TColumnBuildSettings* to) const; + void SerializeToProto(TSchemeShard* ss, NKikimrSchemeOp::TIndexBuildConfig* to) const; + }; struct TExternalTableInfo: TSimpleRefCount<TExternalTableInfo> { @@ -3142,6 +3188,10 @@ inline void Out<NKikimr::NSchemeShard::TIndexBuildInfo> o << ", CreateSender: " << info.CreateSender.ToString(); + o << ", AlterMainTableTxId: " << info.AlterMainTableTxId; + o << ", AlterMainTableTxStatus: " << NKikimrScheme::EStatus_Name(info.AlterMainTableTxStatus); + o << ", AlterMainTableTxDone: " << info.AlterMainTableTxDone; + o << ", LockTxId: " << info.LockTxId; o << ", LockTxStatus: " << NKikimrScheme::EStatus_Name(info.LockTxStatus); o << ", LockTxDone: " << info.LockTxDone; diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 1a2f6e1e5e..cbf39666e1 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1266,6 +1266,12 @@ struct Schema : NIceDb::Schema { struct RowsBilled : Column<28, NScheme::NTypeIds::Uint64> {}; struct BytesBilled : Column<29, NScheme::NTypeIds::Uint64> {}; + struct BuildKind : Column<30, NScheme::NTypeIds::Uint32> {}; + + struct AlterMainTableTxId : Column<31, NScheme::NTypeIds::Uint64> { using Type = TTxId; }; + struct AlterMainTableTxStatus : Column<32, NScheme::NTypeIds::Uint32> { using Type = NKikimrScheme::EStatus; }; + struct AlterMainTableTxDone : Column<33, NScheme::NTypeIds::Bool> {}; + using TKey = TableKey<Id>; using TColumns = TableColumns< Id, @@ -1296,7 +1302,11 @@ struct Schema : NIceDb::Schema { CancelRequest, MaxRetries, RowsBilled, - BytesBilled + BytesBilled, + BuildKind, + AlterMainTableTxId, + AlterMainTableTxStatus, + AlterMainTableTxDone >; }; @@ -1699,6 +1709,21 @@ struct Schema : NIceDb::Schema { using TColumns = TableColumns<PathId, SeqNoGeneration, SeqNoRound, DataSize, UsedReserveSize>; }; + struct BuildColumnOperationSettings : Table<107> { + struct Id : Column<1, NScheme::NTypeIds::Uint64> { using Type = TIndexBuildId; }; + struct ColumnNo : Column<2, NScheme::NTypeIds::Uint64> {}; + struct ColumnName : Column<3, NScheme::NTypeIds::Utf8> {}; + struct DefaultFromLiteral : Column<4, NScheme::NTypeIds::String> {}; + + using TKey = TableKey<Id, ColumnNo>; + using TColumns = TableColumns< + Id, + ColumnNo, + ColumnName, + DefaultFromLiteral + >; + }; + using TTables = SchemaTables< Paths, TxInFlight, @@ -1804,7 +1829,8 @@ struct Schema : NIceDb::Schema { CdcStreamScanShardStatus, ExternalTable, ExternalDataSource, - PersQueueGroupStats + PersQueueGroupStats, + BuildColumnOperationSettings >; static constexpr ui64 SysParam_NextPathId = 1; diff --git a/ydb/core/tx/schemeshard/ut_column_build/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/schemeshard/ut_column_build/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..cbb1cb24fe --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_column_build/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,84 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-schemeshard-ut_column_build) +target_compile_options(ydb-core-tx-schemeshard-ut_column_build PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-schemeshard-ut_column_build PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard +) +target_link_libraries(ydb-core-tx-schemeshard-ut_column_build PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-schemeshard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + ydb-core-metering + core-testlib-default + ydb-core-tx + tx-schemeshard-ut_helpers +) +target_link_options(ydb-core-tx-schemeshard-ut_column_build PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-core-tx-schemeshard-ut_column_build PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp +) +set_property( + TARGET + ydb-core-tx-schemeshard-ut_column_build + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-core-tx-schemeshard-ut_column_build + TEST_TARGET + ydb-core-tx-schemeshard-ut_column_build + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_column_build + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_column_build + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_column_build + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-core-tx-schemeshard-ut_column_build + system_allocator +) +vcs_info(ydb-core-tx-schemeshard-ut_column_build) diff --git a/ydb/core/tx/schemeshard/ut_column_build/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/ut_column_build/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..557f2dcbbb --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_column_build/CMakeLists.linux-aarch64.txt @@ -0,0 +1,87 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-schemeshard-ut_column_build) +target_compile_options(ydb-core-tx-schemeshard-ut_column_build PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-schemeshard-ut_column_build PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard +) +target_link_libraries(ydb-core-tx-schemeshard-ut_column_build PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-testing-unittest_main + core-tx-schemeshard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + ydb-core-metering + core-testlib-default + ydb-core-tx + tx-schemeshard-ut_helpers +) +target_link_options(ydb-core-tx-schemeshard-ut_column_build PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-tx-schemeshard-ut_column_build PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp +) +set_property( + TARGET + ydb-core-tx-schemeshard-ut_column_build + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-core-tx-schemeshard-ut_column_build + TEST_TARGET + ydb-core-tx-schemeshard-ut_column_build + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_column_build + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_column_build + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_column_build + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-core-tx-schemeshard-ut_column_build + cpp-malloc-jemalloc +) +vcs_info(ydb-core-tx-schemeshard-ut_column_build) diff --git a/ydb/core/tx/schemeshard/ut_column_build/CMakeLists.linux-x86_64.txt b/ydb/core/tx/schemeshard/ut_column_build/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..33c7cbbf2a --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_column_build/CMakeLists.linux-x86_64.txt @@ -0,0 +1,89 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-schemeshard-ut_column_build) +target_compile_options(ydb-core-tx-schemeshard-ut_column_build PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-schemeshard-ut_column_build PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard +) +target_link_libraries(ydb-core-tx-schemeshard-ut_column_build PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-schemeshard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + ydb-core-metering + core-testlib-default + ydb-core-tx + tx-schemeshard-ut_helpers +) +target_link_options(ydb-core-tx-schemeshard-ut_column_build PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-tx-schemeshard-ut_column_build PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp +) +set_property( + TARGET + ydb-core-tx-schemeshard-ut_column_build + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-core-tx-schemeshard-ut_column_build + TEST_TARGET + ydb-core-tx-schemeshard-ut_column_build + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_column_build + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_column_build + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_column_build + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-core-tx-schemeshard-ut_column_build + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache +) +vcs_info(ydb-core-tx-schemeshard-ut_column_build) diff --git a/ydb/core/tx/schemeshard/ut_column_build/CMakeLists.txt b/ydb/core/tx/schemeshard/ut_column_build/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_column_build/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/schemeshard/ut_column_build/CMakeLists.windows-x86_64.txt b/ydb/core/tx/schemeshard/ut_column_build/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..8c045b2320 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_column_build/CMakeLists.windows-x86_64.txt @@ -0,0 +1,77 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-schemeshard-ut_column_build) +target_compile_options(ydb-core-tx-schemeshard-ut_column_build PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-schemeshard-ut_column_build PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard +) +target_link_libraries(ydb-core-tx-schemeshard-ut_column_build PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-schemeshard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + ydb-core-metering + core-testlib-default + ydb-core-tx + tx-schemeshard-ut_helpers +) +target_sources(ydb-core-tx-schemeshard-ut_column_build PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp +) +set_property( + TARGET + ydb-core-tx-schemeshard-ut_column_build + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-core-tx-schemeshard-ut_column_build + TEST_TARGET + ydb-core-tx-schemeshard-ut_column_build + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_column_build + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_column_build + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_column_build + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-core-tx-schemeshard-ut_column_build + system_allocator +) +vcs_info(ydb-core-tx-schemeshard-ut_column_build) diff --git a/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp b/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp new file mode 100644 index 0000000000..ecbe2aba1e --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp @@ -0,0 +1,157 @@ +#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> +#include <ydb/core/tx/schemeshard/schemeshard_billing_helpers.h> +#include <ydb/core/testlib/tablet_helpers.h> + +#include <ydb/core/tx/datashard/datashard.h> +#include <ydb/core/metering/metering.h> + +using namespace NKikimr; +using namespace NSchemeShard; +using namespace NSchemeShardUT_Private; + +Y_UNIT_TEST_SUITE(IndexBuildTest) { + Y_UNIT_TEST(BaseCase) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", + "Name: \"ResourceDB\""); + env.TestWaitNotification(runtime, txId); + + TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", + "StoragePools { " + " Name: \"pool-1\" " + " Kind: \"pool-kind-1\" " + "} " + "StoragePools { " + " Name: \"pool-2\" " + " Kind: \"pool-kind-2\" " + "} " + "PlanResolution: 50 " + "Coordinators: 1 " + "Mediators: 1 " + "TimeCastBucketsPerMediator: 2 " + "ExternalSchemeShard: true " + "Name: \"ResourceDB\""); + env.TestWaitNotification(runtime, txId); + + const auto attrs = AlterUserAttrs({ + {"cloud_id", "CLOUD_ID_VAL"}, + {"folder_id", "FOLDER_ID_VAL"}, + {"database_id", "DATABASE_ID_VAL"} + }); + + TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", Sprintf(R"( + Name: "ServerLessDB" + ResourcesDomainKey { + SchemeShard: %lu + PathId: 2 + } + )", TTestTxConfig::SchemeShard), attrs); + env.TestWaitNotification(runtime, txId); + + TString alterData = TStringBuilder() + << "PlanResolution: 50 " + << "Coordinators: 1 " + << "Mediators: 1 " + << "TimeCastBucketsPerMediator: 2 " + << "ExternalSchemeShard: true " + << "ExternalHive: false " + << "Name: \"ServerLessDB\" " + << "StoragePools { " + << " Name: \"pool-1\" " + << " Kind: \"pool-kind-1\" " + << "} "; + TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", alterData); + env.TestWaitNotification(runtime, txId); + + ui64 tenantSchemeShard = 0; + TestDescribeResult(DescribePath(runtime, "/MyRoot/ServerLessDB"), + {NLs::PathExist, + NLs::IsExternalSubDomain("ServerLessDB"), + NLs::ExtractTenantSchemeshard(&tenantSchemeShard)}); + + // Just create main table + TestCreateTable(runtime, tenantSchemeShard, ++txId, "/MyRoot/ServerLessDB", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "index" Type: "Uint32" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId, tenantSchemeShard); + + auto fnWriteRow = [&] (ui64 tabletId, ui32 key, ui32 index, TString value, const char* table) { + TString writeQuery = Sprintf(R"( + ( + (let key '( '('key (Uint32 '%u ) ) ) ) + (let row '( '('index (Uint32 '%u ) ) '('value (Utf8 '%s) ) ) ) + (return (AsList (UpdateRow '__user__%s key row) )) + ) + )", key, index, value.c_str(), table); + NKikimrMiniKQL::TResult result; + TString err; + NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, writeQuery, result, err); + UNIT_ASSERT_VALUES_EQUAL(err, ""); + UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);; + }; + for (ui32 delta = 0; delta < 101; ++delta) { + fnWriteRow(TTestTxConfig::FakeHiveTablets + 6, 1 + delta, 1000 + delta, "aaaa", "Table"); + } + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::BUILD_INDEX, NLog::PRI_TRACE); + + TestDescribeResult(DescribePath(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB/Table"), + {NLs::PathExist, + NLs::IndexesCount(0), + NLs::PathVersionEqual(3)}); + + TStringBuilder meteringMessages; + auto grabMeteringMessage = [&meteringMessages](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto { + if (ev->Type == NMetering::TEvMetering::TEvWriteMeteringJson::EventType) { + auto *msg = ev->Get<NMetering::TEvMetering::TEvWriteMeteringJson>(); + Cerr << "grabMeteringMessage has happened" << Endl; + meteringMessages << msg->MeteringJson; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }; + + runtime.SetObserverFunc(grabMeteringMessage); + + Ydb::TypedValue defaultValue; + defaultValue.mutable_type()->set_type_id(Ydb::Type::UINT64); + defaultValue.mutable_value()->set_uint64_value(10); + + TestBuildColumn(runtime, ++txId, tenantSchemeShard, "/MyRoot/ServerLessDB", "/MyRoot/ServerLessDB/Table", "DefaultValue", defaultValue, Ydb::StatusIds::SUCCESS); + // ui64 buildIndexId = txId; + + auto listing = TestListBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB"); + Y_ASSERT(listing.EntriesSize() == 1); + + env.TestWaitNotification(runtime, txId, tenantSchemeShard); + + auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", txId); + Y_ASSERT(descr.GetIndexBuild().GetState() == Ydb::Table::IndexBuildState::STATE_DONE); +/* + const TString meteringData = R"({"usage":{"start":0,"quantity":179,"finish":0,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-9437197-2-101-1818-101-1818","cloud_id":"CLOUD_ID_VAL","source_wt":0,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})"; + + UNIT_ASSERT_NO_DIFF(meteringMessages, meteringData + "\n"); + + TestDescribeResult(DescribePath(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB/Table"), + {NLs::PathExist, + NLs::IndexesCount(1), + NLs::PathVersionEqual(6)}); + + TestDescribeResult(DescribePath(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB/Table/index1", true, true, true), + {NLs::PathExist, + NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)}); + + TestForgetBuildIndex(runtime, ++txId, tenantSchemeShard, "/MyRoot/ServerLessDB", buildIndexId); + listing = TestListBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB"); + Y_ASSERT(listing.EntriesSize() == 0); +*/ + } +} diff --git a/ydb/core/tx/schemeshard/ut_column_build/ya.make b/ydb/core/tx/schemeshard/ut_column_build/ya.make new file mode 100644 index 0000000000..8f4fb540bb --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_column_build/ya.make @@ -0,0 +1,25 @@ +UNITTEST_FOR(ydb/core/tx/schemeshard) + +FORK_SUBTESTS() + +TIMEOUT(600) + +SIZE(MEDIUM) + +PEERDIR( + library/cpp/getopt + library/cpp/regex/pcre + library/cpp/svnversion + ydb/core/metering + ydb/core/testlib/default + ydb/core/tx + ydb/core/tx/schemeshard/ut_helpers +) + +YQL_LAST_ABI_VERSION() + +SRCS( + ut_column_build.cpp +) + +END() diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index c75b27ba0a..dfa3e72ab1 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -1663,6 +1663,19 @@ namespace NSchemeShardUT_Private { return new TEvIndexBuilder::TEvCreateRequest(id, dbName, std::move(settings)); } + std::unique_ptr<TEvIndexBuilder::TEvCreateRequest> CreateBuildColumnRequest(ui64 id, const TString& dbName, const TString& src, const TString& columnName, const Ydb::TypedValue& literal) { + NKikimrIndexBuilder::TIndexBuildSettings settings; + settings.set_source_path(src); + settings.set_max_batch_rows(2); + settings.set_max_shards_in_flight(2); + + auto* col = settings.mutable_column_build_operation()->add_column(); + col->SetColumnName(columnName); + col->mutable_default_from_literal()->CopyFrom(literal); + + return std::make_unique<TEvIndexBuilder::TEvCreateRequest>(id, dbName, std::move(settings)); + } + TStringBuilder PrintIssues(const ::google::protobuf::RepeatedPtrField< ::Ydb::Issue::IssueMessage >& issues) { TStringBuilder result; for (const auto& x: issues) { @@ -1678,6 +1691,13 @@ namespace NSchemeShardUT_Private { ForwardToTablet(runtime, schemeShard, sender, request); } + void AsyncBuildColumn(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, const TString &src, const TString& columnName, const Ydb::TypedValue& literal) { + auto sender = runtime.AllocateEdgeActor(); + auto request = CreateBuildColumnRequest(id, dbName, src, columnName, literal); + + ForwardToTablet(runtime, schemeShard, sender, request.release()); + } + void AsyncBuildIndex(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, const TString &src, const TString &name, TVector<TString> columns, TVector<TString> dataColumns) { @@ -1686,6 +1706,23 @@ namespace NSchemeShardUT_Private { }); } + void TestBuildColumn(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, + const TString &src, const TString& columnName, const Ydb::TypedValue& literal, Ydb::StatusIds::StatusCode expectedStatus) + { + AsyncBuildColumn(runtime, id, schemeShard, dbName, src, columnName, literal); + + TAutoPtr<IEventHandle> handle; + TEvIndexBuilder::TEvCreateResponse* event = runtime.GrabEdgeEvent<TEvIndexBuilder::TEvCreateResponse>(handle); + UNIT_ASSERT(event); + + Cerr << "BUILDINDEX RESPONSE CREATE: " << event->ToString() << Endl; + UNIT_ASSERT_EQUAL_C(event->Record.GetStatus(), expectedStatus, + "status mismatch" + << " got " << Ydb::StatusIds::StatusCode_Name(event->Record.GetStatus()) + << " expected " << Ydb::StatusIds::StatusCode_Name(expectedStatus) + << " issues was " << PrintIssues(event->Record.GetIssues())); + } + void TestBuildIndex(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, const TString &src, const TBuildIndexConfig& cfg, Ydb::StatusIds::StatusCode expectedStatus) { diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h index d9dd10febd..7400e89c19 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h @@ -330,9 +330,13 @@ namespace NSchemeShardUT_Private { TVector<TString> DataColumns; }; + std::unique_ptr<TEvIndexBuilder::TEvCreateRequest> CreateBuildColumnRequest(ui64 id, const TString& dbName, const TString& src, const TString& columnName, const Ydb::TypedValue& literal); TEvIndexBuilder::TEvCreateRequest* CreateBuildIndexRequest(ui64 id, const TString& dbName, const TString& src, const TBuildIndexConfig& cfg); + void AsyncBuildColumn(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, const TString &src, const TString& columnName, const Ydb::TypedValue& literal); void AsyncBuildIndex(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, const TString &src, const TBuildIndexConfig &cfg); void AsyncBuildIndex(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, const TString &src, const TString &name, TVector<TString> columns, TVector<TString> dataColumns = {}); + void TestBuildColumn(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, + const TString &src, const TString& columnName, const Ydb::TypedValue& literal, Ydb::StatusIds::StatusCode expectedStatus); void TestBuildIndex(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, const TString &src, const TBuildIndexConfig &cfg, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS); void TestBuildIndex(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, const TString &src, const TString &name, TVector<TString> columns, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS); TEvIndexBuilder::TEvCancelRequest* CreateCancelBuildIndexRequest(const ui64 id, const TString& dbName, const ui64 buildIndexId); diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index f4af193a0f..c662e6b6ff 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -6,6 +6,7 @@ RECURSE_FOR_TESTS( ut_bsvolume_reboots ut_cdc_stream ut_cdc_stream_reboots + ut_column_build ut_compaction ut_export ut_export_reboots_s3 diff --git a/ydb/core/tx/tx_proxy/schemereq.cpp b/ydb/core/tx/tx_proxy/schemereq.cpp index 57ee8797db..c7345c50d7 100644 --- a/ydb/core/tx/tx_proxy/schemereq.cpp +++ b/ydb/core/tx/tx_proxy/schemereq.cpp @@ -215,8 +215,9 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> { case NKikimrSchemeOp::ESchemeOpUpgradeSubDomainDecision: return *modifyScheme.MutableUpgradeSubDomain()->MutableName(); + case NKikimrSchemeOp::ESchemeOpCreateColumnBuild: case NKikimrSchemeOp::ESchemeOpCreateIndexBuild: - Y_FAIL("no implementation for ESchemeOpCreateIndexBuild"); + Y_FAIL("no implementation for ESchemeOpCreateIndexBuild/ESchemeOpCreateColumnBuild"); case NKikimrSchemeOp::ESchemeOpInitiateBuildIndexMainTable: Y_FAIL("no implementation for ESchemeOpInitiateBuildIndexMainTable"); @@ -746,6 +747,7 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> { case NKikimrSchemeOp::ESchemeOpCreateTableIndex: case NKikimrSchemeOp::ESchemeOpDropTableIndex: case NKikimrSchemeOp::ESchemeOp_DEPRECATED_35: + case NKikimrSchemeOp::ESchemeOpCreateColumnBuild: case NKikimrSchemeOp::ESchemeOpCreateIndexBuild: case NKikimrSchemeOp::ESchemeOpInitiateBuildIndexMainTable: case NKikimrSchemeOp::ESchemeOpCreateLock: diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 17db38d82b..2252cfe087 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -4505,11 +4505,6 @@ ], "ColumnsAdded": [ { - "ColumnId": 29, - "ColumnName": "BytesBilled", - "ColumnType": "Uint64" - }, - { "ColumnId": 1, "ColumnName": "Id", "ColumnType": "Uint64" @@ -4648,13 +4643,37 @@ "ColumnId": 28, "ColumnName": "RowsBilled", "ColumnType": "Uint64" + }, + { + "ColumnId": 29, + "ColumnName": "BytesBilled", + "ColumnType": "Uint64" + }, + { + "ColumnId": 30, + "ColumnName": "BuildKind", + "ColumnType": "Uint32" + }, + { + "ColumnId": 31, + "ColumnName": "AlterMainTableTxId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 32, + "ColumnName": "AlterMainTableTxStatus", + "ColumnType": "Uint32" + }, + { + "ColumnId": 33, + "ColumnName": "AlterMainTableTxDone", + "ColumnType": "Bool" } ], "ColumnsDropped": [], "ColumnFamilies": { "0": { "Columns": [ - 29, 1, 2, 3, @@ -4682,7 +4701,12 @@ 25, 26, 27, - 28 + 28, + 29, + 30, + 31, + 32, + 33 ], "RoomID": 0, "Codec": 0, @@ -7057,5 +7081,59 @@ "Blobs": 1 } } + }, + { + "TableId": 107, + "TableName": "BuildColumnOperationSettings", + "TableKey": [ + 1, + 2 + ], + "ColumnsAdded": [ + { + "ColumnId": 1, + "ColumnName": "Id", + "ColumnType": "Uint64" + }, + { + "ColumnId": 2, + "ColumnName": "ColumnNo", + "ColumnType": "Uint64" + }, + { + "ColumnId": 3, + "ColumnName": "ColumnName", + "ColumnType": "Utf8" + }, + { + "ColumnId": 4, + "ColumnName": "DefaultFromLiteral", + "ColumnType": "String" + } + ], + "ColumnsDropped": [], + "ColumnFamilies": { + "0": { + "Columns": [ + 1, + 2, + 3, + 4 + ], + "RoomID": 0, + "Codec": 0, + "InMemory": false, + "Cache": 0, + "Small": 4294967295, + "Large": 4294967295 + } + }, + "Rooms": { + "0": { + "Main": 1, + "Outer": 1, + "Blobs": 1 + } + } } ]
\ No newline at end of file |