aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-09-25 18:33:10 +0300
committergvit <gvit@ydb.tech>2023-09-25 19:09:04 +0300
commit124bebf10c9197dac2d547a00e311c1ce423cd6a (patch)
treea68cfa5b4c2c20116a78a58c937a9f235cd17c6d
parent561f679a31cf9a6e2e26ea3e479b482b65eed046 (diff)
downloadydb-124bebf10c9197dac2d547a00e311c1ce423cd6a.tar.gz
refactor TIndexBuildInfo & add initial cancel support KIKIMR-18963
-rw-r--r--ydb/core/tx/datashard/build_index.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp113
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp69
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h173
-rw-r--r--ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp283
5 files changed, 502 insertions, 138 deletions
diff --git a/ydb/core/tx/datashard/build_index.cpp b/ydb/core/tx/datashard/build_index.cpp
index 50feb7ee778..f97c5301fb1 100644
--- a/ydb/core/tx/datashard/build_index.cpp
+++ b/ydb/core/tx/datashard/build_index.cpp
@@ -378,7 +378,7 @@ public:
TMemoryPool valueDataPool(256);
TVector<TCell> cells;
TString err;
- BuildExtraColumns(cells, ColumnBuildSettings, err, valueDataPool);
+ Y_VERIFY(BuildExtraColumns(cells, ColumnBuildSettings, err, valueDataPool));
TSerializedCellVec valueCells(cells);
TString serializedValue = TSerializedCellVec::Serialize(cells);
TSerializedCellVec keyCopy(key);
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index c1f0b5b3790..36b3c78eee7 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -4237,67 +4237,15 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
}
while (!rowset.EndOfSet()) {
- TIndexBuildId id = rowset.GetValue<Schema::IndexBuild::Id>();
- TString uid = rowset.GetValue<Schema::IndexBuild::Uid>();
+ TIndexBuildInfo::TPtr indexInfo = TIndexBuildInfo::FromRow(rowset);
- TIndexBuildInfo::TPtr indexInfo = new TIndexBuildInfo(id, uid);
-
- indexInfo->DomainPathId = TPathId(
- rowset.GetValue<Schema::IndexBuild::DomainOwnerId>(),
- rowset.GetValue<Schema::IndexBuild::DomainLocalId>());
-
- indexInfo->TablePathId = TPathId(
- rowset.GetValue<Schema::IndexBuild::TableOwnerId>(),
- rowset.GetValue<Schema::IndexBuild::TableLocalId>());
-
- indexInfo->IndexName = rowset.GetValue<Schema::IndexBuild::IndexName>();
- indexInfo->IndexType = rowset.GetValue<Schema::IndexBuild::IndexType>();
-
- indexInfo->State = TIndexBuildInfo::EState(rowset.GetValue<Schema::IndexBuild::State>());
- indexInfo->Issue = rowset.GetValueOrDefault<Schema::IndexBuild::Issue>();
- indexInfo->CancelRequested = rowset.GetValueOrDefault<Schema::IndexBuild::CancelRequest>(false);
-
- indexInfo->LockTxId = rowset.GetValueOrDefault<Schema::IndexBuild::LockTxId>(indexInfo->LockTxId);
- indexInfo->LockTxStatus = rowset.GetValueOrDefault<Schema::IndexBuild::LockTxStatus>(indexInfo->LockTxStatus);
- indexInfo->LockTxDone = rowset.GetValueOrDefault<Schema::IndexBuild::LockTxDone>(indexInfo->LockTxDone);
-
- indexInfo->InitiateTxId = rowset.GetValueOrDefault<Schema::IndexBuild::InitiateTxId>(indexInfo->InitiateTxId);
- indexInfo->InitiateTxStatus = rowset.GetValueOrDefault<Schema::IndexBuild::InitiateTxStatus>(indexInfo->InitiateTxStatus);
- indexInfo->InitiateTxDone = rowset.GetValueOrDefault<Schema::IndexBuild::InitiateTxDone>(indexInfo->InitiateTxDone);
-
- indexInfo->Limits.MaxBatchRows = rowset.GetValue<Schema::IndexBuild::MaxBatchRows>();
- indexInfo->Limits.MaxBatchBytes = rowset.GetValue<Schema::IndexBuild::MaxBatchBytes>();
- indexInfo->Limits.MaxShards = rowset.GetValue<Schema::IndexBuild::MaxShards>();
- indexInfo->Limits.MaxRetries = rowset.GetValueOrDefault<Schema::IndexBuild::MaxRetries>(indexInfo->Limits.MaxRetries);
-
- indexInfo->ApplyTxId = rowset.GetValueOrDefault<Schema::IndexBuild::ApplyTxId>(indexInfo->ApplyTxId);
- indexInfo->ApplyTxStatus = rowset.GetValueOrDefault<Schema::IndexBuild::ApplyTxStatus>(indexInfo->ApplyTxStatus);
- indexInfo->ApplyTxDone = rowset.GetValueOrDefault<Schema::IndexBuild::ApplyTxDone>(indexInfo->ApplyTxDone);
-
- indexInfo->UnlockTxId = rowset.GetValueOrDefault<Schema::IndexBuild::UnlockTxId>(indexInfo->UnlockTxId);
- indexInfo->UnlockTxStatus = rowset.GetValueOrDefault<Schema::IndexBuild::UnlockTxStatus>(indexInfo->UnlockTxStatus);
- indexInfo->UnlockTxDone = rowset.GetValueOrDefault<Schema::IndexBuild::UnlockTxDone>(indexInfo->UnlockTxDone);
-
- // note: please note that here we specify BuildIndex as operation default,
- // because previosly this table was dedicated for build index operations only.
- indexInfo->BuildKind = TIndexBuildInfo::EBuildKind(
- rowset.GetValueOrDefault<Schema::IndexBuild::BuildKind>(ui32(TIndexBuildInfo::EBuildKind::BuildIndex)));
-
- indexInfo->AlterMainTableTxId = rowset.GetValueOrDefault<Schema::IndexBuild::AlterMainTableTxId>(indexInfo->AlterMainTableTxId);
- indexInfo->AlterMainTableTxStatus = rowset.GetValueOrDefault<Schema::IndexBuild::AlterMainTableTxStatus>(indexInfo->AlterMainTableTxStatus);
- indexInfo->AlterMainTableTxDone = rowset.GetValueOrDefault<Schema::IndexBuild::AlterMainTableTxDone>(indexInfo->AlterMainTableTxDone);
-
- indexInfo->Billed = TBillingStats(
- rowset.GetValueOrDefault<Schema::IndexBuild::RowsBilled>(0),
- rowset.GetValueOrDefault<Schema::IndexBuild::BytesBilled>(0));
-
- Y_VERIFY(!Self->IndexBuilds.contains(id));
- Self->IndexBuilds[id] = indexInfo;
- if (uid) {
- Self->IndexBuildsByUid[uid] = indexInfo;
+ Y_VERIFY(!Self->IndexBuilds.contains(indexInfo->Id));
+ Self->IndexBuilds[indexInfo->Id] = indexInfo;
+ if (indexInfo->Uid) {
+ Self->IndexBuildsByUid[indexInfo->Uid] = indexInfo;
}
- OnComplete.ToProgress(id);
+ OnComplete.ToProgress(indexInfo->Id);
if (!rowset.Next()) {
return false;
@@ -4323,27 +4271,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
<< ": id# " << id);
TIndexBuildInfo::TPtr buildInfo = Self->IndexBuilds.at(id);
-
- TString columnName = rowset.GetValue<Schema::IndexBuildColumns::ColumnName>();
- EIndexColumnKind columnKind = rowset.GetValueOrDefault<Schema::IndexBuildColumns::ColumnKind>(EIndexColumnKind::KeyColumn);
- ui32 columnNo = rowset.GetValue<Schema::IndexBuildColumns::ColumnNo>();
-
- Y_VERIFY_S(columnNo == (buildInfo->IndexColumns.size() + buildInfo->DataColumns.size()),
- "Unexpected non contiguous column number# " << columnNo <<
- " indexColumns# " << buildInfo->IndexColumns.size() <<
- " dataColumns# " << buildInfo->DataColumns.size());
-
- switch (columnKind) {
- case EIndexColumnKind::KeyColumn:
- buildInfo->IndexColumns.push_back(columnName);
- break;
- case EIndexColumnKind::DataColumn:
- buildInfo->DataColumns.push_back(columnName);
- break;
- default:
- Y_FAIL_S("Unknown column kind# " << (int)columnKind);
- break;
- }
+ buildInfo->AddIndexColumnInfo(rowset);
if (!rowset.Next()) {
return false;
@@ -4363,11 +4291,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
<< ": id# " << id);
TIndexBuildInfo::TPtr buildInfo = Self->IndexBuilds.at(id);
-
- TString columnName = rowset.GetValue<Schema::BuildColumnOperationSettings::ColumnName>();
- TString defaultFromLiteral = rowset.GetValue<Schema::BuildColumnOperationSettings::DefaultFromLiteral>();
-
- buildInfo->BuildColumns.push_back(TIndexBuildInfo::TColumnBuildInfo(columnName, defaultFromLiteral));
+ buildInfo->AddBuildColumnInfo(rowset);
if (!rowset.Next()) {
return false;
@@ -4388,26 +4312,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
<< ": id# " << id);
TIndexBuildInfo::TPtr buildInfo = Self->IndexBuilds.at(id);
-
- TShardIdx shardIdx = TShardIdx(rowset.GetValue<Schema::IndexBuildShardStatus::OwnerShardIdx>(),
- rowset.GetValue<Schema::IndexBuildShardStatus::LocalShardIdx>());
-
- NKikimrTx::TKeyRange range = rowset.GetValue<Schema::IndexBuildShardStatus::Range>();
- TString lastKeyAck = rowset.GetValue<Schema::IndexBuildShardStatus::LastKeyAck>();
-
- buildInfo->Shards.emplace(shardIdx, TIndexBuildInfo::TShardStatus(TSerializedTableRange(range), std::move(lastKeyAck)));
- TIndexBuildInfo::TShardStatus& shardStatus = buildInfo->Shards.at(shardIdx);
-
- shardStatus.Status = rowset.GetValue<Schema::IndexBuildShardStatus::Status>();
-
- shardStatus.DebugMessage = rowset.GetValueOrDefault<Schema::IndexBuildShardStatus::Message>();
- shardStatus.UploadStatus = rowset.GetValueOrDefault<Schema::IndexBuildShardStatus::UploadStatus>(Ydb::StatusIds::STATUS_CODE_UNSPECIFIED);
-
- shardStatus.Processed = TBillingStats(
- rowset.GetValueOrDefault<Schema::IndexBuildShardStatus::RowsProcessed>(0),
- rowset.GetValueOrDefault<Schema::IndexBuildShardStatus::BytesProcessed>(0));
-
- buildInfo->Processed += shardStatus.Processed;
+ buildInfo->AddShardStatus(rowset);
if (!rowset.Next()) {
return false;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp
index 21e03524eaf..70454921730 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp
@@ -73,14 +73,8 @@ TVector<ISubOperation::TPtr> CancelBuildIndex(TOperationId nextId, const TTxTran
TString indexName = config.GetIndexName();
TPath table = TPath::Resolve(tablePath, context.SS);
- TPath index = table.Child(indexName);
-
- //check idempotence
-
- //check limits
-
+
TVector<ISubOperation::TPtr> result;
-
{
auto finalize = TransactionTemplate(table.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpFinalizeBuildIndexMainTable);
*finalize.MutableLockGuard() = tx.GetLockGuard();
@@ -88,12 +82,18 @@ TVector<ISubOperation::TPtr> CancelBuildIndex(TOperationId nextId, const TTxTran
op->SetTableName(table.LeafName());
op->SetSnapshotTxId(config.GetSnaphotTxId());
op->SetBuildIndexId(config.GetBuildIndexId());
- PathIdFromPathId(index.Base()->PathId, op->MutableOutcome()->MutableCancel()->MutableIndexPathId());
+
+ if (!indexName.empty()) {
+ TPath index = table.Child(indexName);
+ PathIdFromPathId(index.Base()->PathId, op->MutableOutcome()->MutableCancel()->MutableIndexPathId());
+ }
result.push_back(CreateFinalizeBuildIndexMainTable(NextPartId(nextId, result), finalize));
}
+ if (!indexName.empty())
{
+ TPath index = table.Child(indexName);
auto tableIndexDropping = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTableIndex);
auto operation = tableIndexDropping.MutableDrop();
operation->SetName(ToString(index.Base()->Name));
@@ -101,34 +101,37 @@ TVector<ISubOperation::TPtr> CancelBuildIndex(TOperationId nextId, const TTxTran
result.push_back(CreateDropTableIndex(NextPartId(nextId, result), tableIndexDropping));
}
- Y_VERIFY(index.Base()->GetChildren().size() == 1);
- for (auto& indexChildItems: index.Base()->GetChildren()) {
- const TString& implTableName = indexChildItems.first;
- Y_VERIFY(implTableName == "indexImplTable", "unexpected name %s", implTableName.c_str());
-
- TPath implTable = index.Child(implTableName);
- {
- TPath::TChecker checks = implTable.Check();
- checks.NotEmpty()
- .IsResolved()
- .NotDeleted()
- .IsTable()
- .IsInsideTableIndexPath()
- .NotUnderDeleting()
- .NotUnderOperation();
-
- if (!checks) {
- return {CreateReject(nextId, checks.GetStatus(), checks.GetError())};
+ if (!indexName.empty()) {
+ TPath index = table.Child(indexName);
+ Y_VERIFY(index.Base()->GetChildren().size() == 1);
+ for (auto& indexChildItems: index.Base()->GetChildren()) {
+ const TString& implTableName = indexChildItems.first;
+ Y_VERIFY(implTableName == "indexImplTable", "unexpected name %s", implTableName.c_str());
+
+ TPath implTable = index.Child(implTableName);
+ {
+ TPath::TChecker checks = implTable.Check();
+ checks.NotEmpty()
+ .IsResolved()
+ .NotDeleted()
+ .IsTable()
+ .IsInsideTableIndexPath()
+ .NotUnderDeleting()
+ .NotUnderOperation();
+
+ if (!checks) {
+ return {CreateReject(nextId, checks.GetStatus(), checks.GetError())};
+ }
}
- }
- Y_VERIFY(implTable.Base()->PathId == indexChildItems.second);
+ Y_VERIFY(implTable.Base()->PathId == indexChildItems.second);
- {
- auto implTableDropping = TransactionTemplate(index.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTable);
- auto operation = implTableDropping.MutableDrop();
- operation->SetName(ToString(implTable.Base()->Name));
+ {
+ auto implTableDropping = TransactionTemplate(index.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTable);
+ auto operation = implTableDropping.MutableDrop();
+ operation->SetName(ToString(implTable.Base()->Name));
- result.push_back(CreateDropTable(NextPartId(nextId,result), implTableDropping));
+ result.push_back(CreateDropTable(NextPartId(nextId,result), implTableDropping));
+ }
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 23bdfff5b16..7648ab37c72 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -6,6 +6,7 @@
#include "schemeshard_path_element.h"
#include "schemeshard_identificators.h"
#include "schemeshard_olap_types.h"
+#include "schemeshard_schema.h"
#include <ydb/core/tx/message_seqno.h>
#include <ydb/core/tx/datashard/datashard.h>
@@ -3073,6 +3074,178 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
, Uid(uid)
{}
+ template<class TRow>
+ void AddBuildColumnInfo(const TRow& row){
+ TString columnName = row.template GetValue<Schema::BuildColumnOperationSettings::ColumnName>();
+ TString defaultFromLiteral = row.template GetValue<Schema::BuildColumnOperationSettings::DefaultFromLiteral>();
+ BuildColumns.push_back(TColumnBuildInfo(columnName, defaultFromLiteral));
+ }
+
+ template<class TRowSetType>
+ void AddIndexColumnInfo(const TRowSetType& row) {
+
+ TString columnName =
+ row.template GetValue<Schema::IndexBuildColumns::ColumnName>();
+ EIndexColumnKind columnKind =
+ row.template GetValueOrDefault<Schema::IndexBuildColumns::ColumnKind>(
+ EIndexColumnKind::KeyColumn);
+ ui32 columnNo = row.template GetValue<Schema::IndexBuildColumns::ColumnNo>();
+
+ Y_VERIFY_S(columnNo == (IndexColumns.size() + DataColumns.size()),
+ "Unexpected non contiguous column number# "
+ << columnNo << " indexColumns# "
+ << IndexColumns.size() << " dataColumns# "
+ << DataColumns.size());
+
+ switch (columnKind) {
+ case EIndexColumnKind::KeyColumn:
+ IndexColumns.push_back(columnName);
+ break;
+ case EIndexColumnKind::DataColumn:
+ DataColumns.push_back(columnName);
+ break;
+ default:
+ Y_FAIL_S("Unknown column kind# " << (int)columnKind);
+ break;
+ }
+ }
+
+ template<class TRow>
+ static TIndexBuildInfo::TPtr FromRow(const TRow& row) {
+ TIndexBuildId id = row.template GetValue<Schema::IndexBuild::Id>();
+ TString uid = row.template GetValue<Schema::IndexBuild::Uid>();
+
+ TIndexBuildInfo::TPtr indexInfo = new TIndexBuildInfo(id, uid);
+
+ indexInfo->DomainPathId =
+ TPathId(row.template GetValue<Schema::IndexBuild::DomainOwnerId>(),
+ row.template GetValue<Schema::IndexBuild::DomainLocalId>());
+
+ indexInfo->TablePathId =
+ TPathId(row.template GetValue<Schema::IndexBuild::TableOwnerId>(),
+ row.template GetValue<Schema::IndexBuild::TableLocalId>());
+
+ indexInfo->IndexName = row.template GetValue<Schema::IndexBuild::IndexName>();
+ indexInfo->IndexType = row.template GetValue<Schema::IndexBuild::IndexType>();
+
+ indexInfo->State = TIndexBuildInfo::EState(
+ row.template GetValue<Schema::IndexBuild::State>());
+ indexInfo->Issue =
+ row.template GetValueOrDefault<Schema::IndexBuild::Issue>();
+ indexInfo->CancelRequested =
+ row.template GetValueOrDefault<Schema::IndexBuild::CancelRequest>(false);
+
+ indexInfo->LockTxId =
+ row.template GetValueOrDefault<Schema::IndexBuild::LockTxId>(
+ indexInfo->LockTxId);
+ indexInfo->LockTxStatus =
+ row.template GetValueOrDefault<Schema::IndexBuild::LockTxStatus>(
+ indexInfo->LockTxStatus);
+ indexInfo->LockTxDone =
+ row.template GetValueOrDefault<Schema::IndexBuild::LockTxDone>(
+ indexInfo->LockTxDone);
+
+ indexInfo->InitiateTxId =
+ row.template GetValueOrDefault<Schema::IndexBuild::InitiateTxId>(
+ indexInfo->InitiateTxId);
+ indexInfo->InitiateTxStatus =
+ row.template GetValueOrDefault<Schema::IndexBuild::InitiateTxStatus>(
+ indexInfo->InitiateTxStatus);
+ indexInfo->InitiateTxDone =
+ row.template GetValueOrDefault<Schema::IndexBuild::InitiateTxDone>(
+ indexInfo->InitiateTxDone);
+
+ indexInfo->Limits.MaxBatchRows =
+ row.template GetValue<Schema::IndexBuild::MaxBatchRows>();
+ indexInfo->Limits.MaxBatchBytes =
+ row.template GetValue<Schema::IndexBuild::MaxBatchBytes>();
+ indexInfo->Limits.MaxShards =
+ row.template GetValue<Schema::IndexBuild::MaxShards>();
+ indexInfo->Limits.MaxRetries =
+ row.template GetValueOrDefault<Schema::IndexBuild::MaxRetries>(
+ indexInfo->Limits.MaxRetries);
+
+ indexInfo->ApplyTxId =
+ row.template GetValueOrDefault<Schema::IndexBuild::ApplyTxId>(
+ indexInfo->ApplyTxId);
+ indexInfo->ApplyTxStatus =
+ row.template GetValueOrDefault<Schema::IndexBuild::ApplyTxStatus>(
+ indexInfo->ApplyTxStatus);
+ indexInfo->ApplyTxDone =
+ row.template GetValueOrDefault<Schema::IndexBuild::ApplyTxDone>(
+ indexInfo->ApplyTxDone);
+
+ indexInfo->UnlockTxId =
+ row.template GetValueOrDefault<Schema::IndexBuild::UnlockTxId>(
+ indexInfo->UnlockTxId);
+ indexInfo->UnlockTxStatus =
+ row.template GetValueOrDefault<Schema::IndexBuild::UnlockTxStatus>(
+ indexInfo->UnlockTxStatus);
+ indexInfo->UnlockTxDone =
+ row.template GetValueOrDefault<Schema::IndexBuild::UnlockTxDone>(
+ indexInfo->UnlockTxDone);
+
+ // note: please note that here we specify BuildIndex as operation
+ // default, because previosly this table was dedicated for build index
+ // operations only.
+ indexInfo->BuildKind = TIndexBuildInfo::EBuildKind(
+ row.template GetValueOrDefault<Schema::IndexBuild::BuildKind>(
+ ui32(TIndexBuildInfo::EBuildKind::BuildIndex)));
+
+ indexInfo->AlterMainTableTxId =
+ row.template GetValueOrDefault<Schema::IndexBuild::AlterMainTableTxId>(
+ indexInfo->AlterMainTableTxId);
+ indexInfo->AlterMainTableTxStatus =
+ row
+ .template GetValueOrDefault<Schema::IndexBuild::AlterMainTableTxStatus>(
+ indexInfo->AlterMainTableTxStatus);
+ indexInfo->AlterMainTableTxDone =
+ row.template GetValueOrDefault<Schema::IndexBuild::AlterMainTableTxDone>(
+ indexInfo->AlterMainTableTxDone);
+
+ indexInfo->Billed = TBillingStats(
+ row.template GetValueOrDefault<Schema::IndexBuild::RowsBilled>(0),
+ row.template GetValueOrDefault<Schema::IndexBuild::BytesBilled>(0));
+
+ return indexInfo;
+ }
+
+ template<class TRow>
+ void AddShardStatus(const TRow& row) {
+ TShardIdx shardIdx =
+ TShardIdx(row.template GetValue<
+ Schema::IndexBuildShardStatus::OwnerShardIdx>(),
+ row.template GetValue<
+ Schema::IndexBuildShardStatus::LocalShardIdx>());
+
+ NKikimrTx::TKeyRange range =
+ row.template GetValue<Schema::IndexBuildShardStatus::Range>();
+ TString lastKeyAck =
+ row.template GetValue<Schema::IndexBuildShardStatus::LastKeyAck>();
+
+ Shards.emplace(
+ shardIdx, TIndexBuildInfo::TShardStatus(
+ TSerializedTableRange(range), std::move(lastKeyAck)));
+ TIndexBuildInfo::TShardStatus &shardStatus = Shards.at(shardIdx);
+
+ shardStatus.Status =
+ row.template GetValue<Schema::IndexBuildShardStatus::Status>();
+
+ shardStatus.DebugMessage = row.template GetValueOrDefault<
+ Schema::IndexBuildShardStatus::Message>();
+ shardStatus.UploadStatus = row.template GetValueOrDefault<
+ Schema::IndexBuildShardStatus::UploadStatus>(
+ Ydb::StatusIds::STATUS_CODE_UNSPECIFIED);
+
+ shardStatus.Processed = TBillingStats(
+ row.template GetValueOrDefault<
+ Schema::IndexBuildShardStatus::RowsProcessed>(0),
+ row.template GetValueOrDefault<
+ Schema::IndexBuildShardStatus::BytesProcessed>(0));
+
+ Processed += shardStatus.Processed;
+ }
+
bool IsCancellationRequested() const {
return CancelRequested;
}
diff --git a/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp b/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp
index ecbe2aba1ef..714fc1187c3 100644
--- a/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp
+++ b/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp
@@ -10,6 +10,224 @@ using namespace NSchemeShard;
using namespace NSchemeShardUT_Private;
Y_UNIT_TEST_SUITE(IndexBuildTest) {
+ Y_UNIT_TEST(AlreadyExists) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ TestCreateExtSubDomain(runtime, ++txId, "/MyRoot",
+ "Name: \"ResourceDB\"");
+ env.TestWaitNotification(runtime, txId);
+
+ TestAlterExtSubDomain(runtime, ++txId, "/MyRoot",
+ "StoragePools { "
+ " Name: \"pool-1\" "
+ " Kind: \"pool-kind-1\" "
+ "} "
+ "StoragePools { "
+ " Name: \"pool-2\" "
+ " Kind: \"pool-kind-2\" "
+ "} "
+ "PlanResolution: 50 "
+ "Coordinators: 1 "
+ "Mediators: 1 "
+ "TimeCastBucketsPerMediator: 2 "
+ "ExternalSchemeShard: true "
+ "Name: \"ResourceDB\"");
+ env.TestWaitNotification(runtime, txId);
+
+ const auto attrs = AlterUserAttrs({
+ {"cloud_id", "CLOUD_ID_VAL"},
+ {"folder_id", "FOLDER_ID_VAL"},
+ {"database_id", "DATABASE_ID_VAL"}
+ });
+
+ TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ Name: "ServerLessDB"
+ ResourcesDomainKey {
+ SchemeShard: %lu
+ PathId: 2
+ }
+ )", TTestTxConfig::SchemeShard), attrs);
+ env.TestWaitNotification(runtime, txId);
+
+ TString alterData = TStringBuilder()
+ << "PlanResolution: 50 "
+ << "Coordinators: 1 "
+ << "Mediators: 1 "
+ << "TimeCastBucketsPerMediator: 2 "
+ << "ExternalSchemeShard: true "
+ << "ExternalHive: false "
+ << "Name: \"ServerLessDB\" "
+ << "StoragePools { "
+ << " Name: \"pool-1\" "
+ << " Kind: \"pool-kind-1\" "
+ << "} ";
+ TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", alterData);
+ env.TestWaitNotification(runtime, txId);
+
+ ui64 tenantSchemeShard = 0;
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/ServerLessDB"),
+ {NLs::PathExist,
+ NLs::IsExternalSubDomain("ServerLessDB"),
+ NLs::ExtractTenantSchemeshard(&tenantSchemeShard)});
+
+ // Just create main table
+ TestCreateTable(runtime, tenantSchemeShard, ++txId, "/MyRoot/ServerLessDB", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint32" }
+ Columns { Name: "index" Type: "Uint32" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId, tenantSchemeShard);
+
+ auto fnWriteRow = [&] (ui64 tabletId, ui32 key, ui32 index, TString value, const char* table) {
+ TString writeQuery = Sprintf(R"(
+ (
+ (let key '( '('key (Uint32 '%u ) ) ) )
+ (let row '( '('index (Uint32 '%u ) ) '('value (Utf8 '%s) ) ) )
+ (return (AsList (UpdateRow '__user__%s key row) ))
+ )
+ )", key, index, value.c_str(), table);
+ NKikimrMiniKQL::TResult result;
+ TString err;
+ NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, writeQuery, result, err);
+ UNIT_ASSERT_VALUES_EQUAL(err, "");
+ UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);;
+ };
+ for (ui32 delta = 0; delta < 101; ++delta) {
+ fnWriteRow(TTestTxConfig::FakeHiveTablets + 6, 1 + delta, 1000 + delta, "aaaa", "Table");
+ }
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::BUILD_INDEX, NLog::PRI_TRACE);
+
+ TestDescribeResult(DescribePath(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB/Table"),
+ {NLs::PathExist,
+ NLs::IndexesCount(0),
+ NLs::PathVersionEqual(3)});
+
+ Ydb::TypedValue defaultValue;
+ defaultValue.mutable_type()->set_type_id(Ydb::Type::UINT64);
+ defaultValue.mutable_value()->set_uint64_value(10);
+
+ TestBuildColumn(runtime, ++txId, tenantSchemeShard, "/MyRoot/ServerLessDB", "/MyRoot/ServerLessDB/Table", "value", defaultValue, Ydb::StatusIds::BAD_REQUEST);
+ }
+
+ Y_UNIT_TEST(InvalidValue) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ TestCreateExtSubDomain(runtime, ++txId, "/MyRoot",
+ "Name: \"ResourceDB\"");
+ env.TestWaitNotification(runtime, txId);
+
+ TestAlterExtSubDomain(runtime, ++txId, "/MyRoot",
+ "StoragePools { "
+ " Name: \"pool-1\" "
+ " Kind: \"pool-kind-1\" "
+ "} "
+ "StoragePools { "
+ " Name: \"pool-2\" "
+ " Kind: \"pool-kind-2\" "
+ "} "
+ "PlanResolution: 50 "
+ "Coordinators: 1 "
+ "Mediators: 1 "
+ "TimeCastBucketsPerMediator: 2 "
+ "ExternalSchemeShard: true "
+ "Name: \"ResourceDB\"");
+ env.TestWaitNotification(runtime, txId);
+
+ const auto attrs = AlterUserAttrs({
+ {"cloud_id", "CLOUD_ID_VAL"},
+ {"folder_id", "FOLDER_ID_VAL"},
+ {"database_id", "DATABASE_ID_VAL"}
+ });
+
+ TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ Name: "ServerLessDB"
+ ResourcesDomainKey {
+ SchemeShard: %lu
+ PathId: 2
+ }
+ )", TTestTxConfig::SchemeShard), attrs);
+ env.TestWaitNotification(runtime, txId);
+
+ TString alterData = TStringBuilder()
+ << "PlanResolution: 50 "
+ << "Coordinators: 1 "
+ << "Mediators: 1 "
+ << "TimeCastBucketsPerMediator: 2 "
+ << "ExternalSchemeShard: true "
+ << "ExternalHive: false "
+ << "Name: \"ServerLessDB\" "
+ << "StoragePools { "
+ << " Name: \"pool-1\" "
+ << " Kind: \"pool-kind-1\" "
+ << "} ";
+ TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", alterData);
+ env.TestWaitNotification(runtime, txId);
+
+ ui64 tenantSchemeShard = 0;
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/ServerLessDB"),
+ {NLs::PathExist,
+ NLs::IsExternalSubDomain("ServerLessDB"),
+ NLs::ExtractTenantSchemeshard(&tenantSchemeShard)});
+
+ // Just create main table
+ TestCreateTable(runtime, tenantSchemeShard, ++txId, "/MyRoot/ServerLessDB", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint32" }
+ Columns { Name: "index" Type: "Uint32" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId, tenantSchemeShard);
+
+ auto fnWriteRow = [&] (ui64 tabletId, ui32 key, ui32 index, TString value, const char* table) {
+ TString writeQuery = Sprintf(R"(
+ (
+ (let key '( '('key (Uint32 '%u ) ) ) )
+ (let row '( '('index (Uint32 '%u ) ) '('value (Utf8 '%s) ) ) )
+ (return (AsList (UpdateRow '__user__%s key row) ))
+ )
+ )", key, index, value.c_str(), table);
+ NKikimrMiniKQL::TResult result;
+ TString err;
+ NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, writeQuery, result, err);
+ UNIT_ASSERT_VALUES_EQUAL(err, "");
+ UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);;
+ };
+ for (ui32 delta = 0; delta < 101; ++delta) {
+ fnWriteRow(TTestTxConfig::FakeHiveTablets + 6, 1 + delta, 1000 + delta, "aaaa", "Table");
+ }
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::BUILD_INDEX, NLog::PRI_TRACE);
+
+ TestDescribeResult(DescribePath(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB/Table"),
+ {NLs::PathExist,
+ NLs::IndexesCount(0),
+ NLs::PathVersionEqual(3)});
+
+ Ydb::TypedValue defaultValue;
+ defaultValue.mutable_type()->set_type_id(Ydb::Type::UINT64);
+ defaultValue.mutable_value()->set_text_value("1111");
+
+ TestBuildColumn(runtime, ++txId, tenantSchemeShard, "/MyRoot/ServerLessDB", "/MyRoot/ServerLessDB/Table", "ColumnValue", defaultValue, Ydb::StatusIds::SUCCESS);
+
+ auto listing = TestListBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB");
+ Y_ASSERT(listing.EntriesSize() == 1);
+
+ env.TestWaitNotification(runtime, txId, tenantSchemeShard);
+
+ auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", txId);
+ Y_ASSERT(descr.GetIndexBuild().GetState() == Ydb::Table::IndexBuildState::STATE_DONE);
+ }
+
Y_UNIT_TEST(BaseCase) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
@@ -154,4 +372,69 @@ Y_UNIT_TEST_SUITE(IndexBuildTest) {
Y_ASSERT(listing.EntriesSize() == 0);
*/
}
+
+ Y_UNIT_TEST(CancelBuild) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ // Just create main table
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint32" }
+ Columns { Name: "index" Type: "Uint32" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ UniformPartitionsCount: 10
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ auto fnWriteRow = [&] (ui64 tabletId, ui32 key, ui32 index, TString value, const char* table) {
+ TString writeQuery = Sprintf(R"(
+ (
+ (let key '( '('key (Uint32 '%u ) ) ) )
+ (let row '( '('index (Uint32 '%u ) ) '('value (Utf8 '%s) ) ) )
+ (return (AsList (UpdateRow '__user__%s key row) ))
+ )
+ )", key, index, value.c_str(), table);
+ NKikimrMiniKQL::TResult result;
+ TString err;
+ NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, writeQuery, result, err);
+ UNIT_ASSERT_VALUES_EQUAL(err, "");
+ UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);;
+ };
+ for (ui32 delta = 0; delta < 101; ++delta) {
+ fnWriteRow(TTestTxConfig::FakeHiveTablets, 1 + delta, 1000 + delta, "aaaa", "Table");
+ }
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"),
+ {NLs::PathExist,
+ NLs::IndexesCount(0),
+ NLs::PathVersionEqual(3)});
+
+ Ydb::TypedValue defaultValue;
+ defaultValue.mutable_type()->set_type_id(Ydb::Type::UINT64);
+ defaultValue.mutable_value()->set_uint64_value(10);
+
+ TestBuildColumn(runtime, ++txId, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/Table", "DefaultValue", defaultValue, Ydb::StatusIds::SUCCESS);
+
+ ui64 buildIndexId = txId;
+
+ auto listing = TestListBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot");
+ Y_ASSERT(listing.EntriesSize() == 1);
+
+ TestCancelBuildIndex(runtime, ++txId, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexId);
+
+ env.TestWaitNotification(runtime, buildIndexId);
+
+ auto descr = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexId);
+ Y_ASSERT(descr.GetIndexBuild().GetState() == Ydb::Table::IndexBuildState::STATE_CANCELLED);
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"),
+ {NLs::PathExist,
+ NLs::IndexesCount(0),
+ NLs::PathVersionEqual(6)});
+ }
}