aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author kungurtsev <kungasc@ydb.tech> 2025-06-05 17:17:12 +0200
committer GitHub <noreply@github.com> 2025-06-05 17:17:12 +0200
commit aa1663fc442172a2428feb9c36fc0a925fe0ee6d (patch)
tree 872a74a2435af61b47389e9fb4b5fdd21a782ae3
parent 1180c0895c16bb0cedf5a345f31df743b5d5e4b8 (diff)
download ydb-aa1663fc442172a2428feb9c36fc0a925fe0ee6d.tar.gz
Handle unhandled exceptions during build index SchemeShard init (#19312)
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard__init.cpp 114
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard__monitoring.cpp 3
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_build_index.cpp 15
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp 4
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp 50
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp 4
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_import__create.cpp 2
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_info_types.h 105
-rw-r--r-- ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp 79
9 files changed, 254 insertions, 122 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index 7817bc65ae9..924c1ca8678 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -4561,6 +4561,36 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
// Read index build
{
+ auto fillBuildInfoSafe = [&](TIndexBuildInfo& buildInfo, const TString& stepName, const auto& fill) { // Runs fill(buildInfo); a thrown std::exception is logged and recorded on buildInfo instead of propagating. stepName only labels the log/issue text.
+ try {
+ fill(buildInfo);
+ } catch (const std::exception& exc) { // only std::exception-derived errors are intercepted here
+ LOG_ERROR_S(ctx, NKikimrServices::BUILD_INDEX,
+ "Init " << stepName << " unhandled exception, id#" << buildInfo.Id
+ << " " << TypeName(exc) << ": " << exc.what() << Endl
+ << TBackTrace::FromCurrentException().PrintToString()
+ << ", TIndexBuildInfo: " << buildInfo);
+
+ // in-memory volatile state: flag the build as broken and append the failure text to its issue string (no DB write happens in this lambda)
+ buildInfo.IsBroken = true;
+ buildInfo.AddIssue(TStringBuilder() << "Init " << stepName << " unhandled exception " << exc.what());
+ }
+ };
+
+ auto fillBuildInfoByIdSafe = [&](TIndexBuildId id, const TString& stepName, const auto& fill) { // Finds the build with this id in Self->IndexBuilds and applies fill to it through fillBuildInfoSafe
+ const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(id);
+ Y_ASSERT(buildInfoPtr); // NOTE(review): presumably a debug-only check, which is why the explicit null test below is still needed - confirm
+ if (!buildInfoPtr) { // unknown id: log the problem and skip this row
+ LOG_ERROR_S(ctx, NKikimrServices::BUILD_INDEX,
+ "Init " << stepName << " BuildInfo not found: id#" << id);
+ return;
+ }
+ auto& buildInfo = *buildInfoPtr->Get();
+ if (!buildInfo.IsBroken) { // a build already flagged as broken is not filled any further
+ fillBuildInfoSafe(buildInfo, stepName, fill);
+ }
+ };
+
// read main info
{
auto rowset = db.Table<Schema::IndexBuild>().Range().Select();
@@ -4569,17 +4599,21 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
}
while (!rowset.EndOfSet()) {
- TIndexBuildInfo::TPtr indexInfo = TIndexBuildInfo::FromRow(rowset);
-
- auto [it, emplaced] = Self->IndexBuilds.emplace(indexInfo->Id, indexInfo);
- Y_ABORT_UNLESS(emplaced);
- if (indexInfo->Uid) {
- // TODO(mbkkt) It also should be unique, but we're not sure.
- Y_ASSERT(!Self->IndexBuildsByUid.contains(indexInfo->Uid));
- Self->IndexBuildsByUid[indexInfo->Uid] = indexInfo;
+ TIndexBuildInfo::TPtr buildInfo = new TIndexBuildInfo();
+ fillBuildInfoSafe(*buildInfo, "IndexBuild", [&](TIndexBuildInfo& buildInfo) {
+ TIndexBuildInfo::FillFromRow(rowset, &buildInfo);
+ });
+
+ // Note: broken builds are also added to IndexBuilds
+ Y_ASSERT(!Self->IndexBuilds.contains(buildInfo->Id));
+ Self->IndexBuilds[buildInfo->Id] = buildInfo;
+
+ if (buildInfo->Uid) {
+ Y_ASSERT(!Self->IndexBuildsByUid.contains(buildInfo->Uid));
+ Self->IndexBuildsByUid[buildInfo->Uid] = buildInfo;
}
- OnComplete.ToProgress(indexInfo->Id);
+ OnComplete.ToProgress(buildInfo->Id);
if (!rowset.Next()) {
return false;
@@ -4601,19 +4635,18 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
while (!rowset.EndOfSet()) {
TIndexBuildId id = rowset.GetValue<Schema::KMeansTreeProgress::Id>();
- const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(id);
- Y_VERIFY_S(buildInfoPtr, "BuildIndex not found: id# " << id);
- auto& buildInfo = *buildInfoPtr->Get();
- buildInfo.KMeans.Set(
- rowset.GetValue<Schema::KMeansTreeProgress::Level>(),
- rowset.GetValue<Schema::KMeansTreeProgress::ParentBegin>(),
- rowset.GetValue<Schema::KMeansTreeProgress::Parent>(),
- rowset.GetValue<Schema::KMeansTreeProgress::ChildBegin>(),
- rowset.GetValue<Schema::KMeansTreeProgress::Child>(),
- rowset.GetValue<Schema::KMeansTreeProgress::State>(),
- rowset.GetValue<Schema::KMeansTreeProgress::TableSize>()
- );
- buildInfo.Sample.Rows.reserve(buildInfo.KMeans.K * 2);
+ fillBuildInfoByIdSafe(id, "KMeansTreeProgress", [&](TIndexBuildInfo& buildInfo) {
+ buildInfo.KMeans.Set(
+ rowset.GetValue<Schema::KMeansTreeProgress::Level>(),
+ rowset.GetValue<Schema::KMeansTreeProgress::ParentBegin>(),
+ rowset.GetValue<Schema::KMeansTreeProgress::Parent>(),
+ rowset.GetValue<Schema::KMeansTreeProgress::ChildBegin>(),
+ rowset.GetValue<Schema::KMeansTreeProgress::Child>(),
+ rowset.GetValue<Schema::KMeansTreeProgress::State>(),
+ rowset.GetValue<Schema::KMeansTreeProgress::TableSize>()
+ );
+ buildInfo.Sample.Rows.reserve(buildInfo.KMeans.K * 2);
+ });
if (!rowset.Next()) {
return false;
@@ -4632,13 +4665,12 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
size_t sampleCount = 0;
while (!rowset.EndOfSet()) {
TIndexBuildId id = rowset.GetValue<Schema::KMeansTreeSample::Id>();
- const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(id);
- Y_VERIFY_S(buildInfoPtr, "BuildIndex not found: id# " << id);
- auto& buildInfo = *buildInfoPtr->Get();
- buildInfo.Sample.Add(
- rowset.GetValue<Schema::KMeansTreeSample::Probability>(),
- rowset.GetValue<Schema::KMeansTreeSample::Data>()
- );
+ fillBuildInfoByIdSafe(id, "KMeansTreeSample", [&](TIndexBuildInfo& buildInfo) {
+ buildInfo.Sample.Add(
+ rowset.GetValue<Schema::KMeansTreeSample::Probability>(),
+ rowset.GetValue<Schema::KMeansTreeSample::Data>()
+ );
+ });
sampleCount++;
if (!rowset.Next()) {
@@ -4660,11 +4692,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
while (!rowset.EndOfSet()) {
TIndexBuildId id = rowset.GetValue<Schema::IndexBuildColumns::Id>();
- const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(id);
- Y_VERIFY_S(buildInfoPtr, "BuildIndex not found"
- << ": id# " << id);
- auto& buildInfo = *buildInfoPtr->Get();
- buildInfo.AddIndexColumnInfo(rowset);
+ fillBuildInfoByIdSafe(id, "IndexBuildColumns", [&](TIndexBuildInfo& buildInfo) {
+ buildInfo.AddIndexColumnInfo(rowset);
+ });
if (!rowset.Next()) {
return false;
@@ -4680,11 +4710,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
while (!rowset.EndOfSet()) {
TIndexBuildId id = rowset.GetValue<Schema::BuildColumnOperationSettings::Id>();
- const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(id);
- Y_VERIFY_S(buildInfoPtr, "BuildIndex not found"
- << ": id# " << id);
- auto& buildInfo = *buildInfoPtr->Get();
- buildInfo.AddBuildColumnInfo(rowset);
+ fillBuildInfoByIdSafe(id, "BuildColumnOperationSettings", [&](TIndexBuildInfo& buildInfo) {
+ buildInfo.AddBuildColumnInfo(rowset);
+ });
if (!rowset.Next()) {
return false;
@@ -4701,11 +4729,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
while (!rowset.EndOfSet()) {
TIndexBuildId id = rowset.GetValue<Schema::IndexBuildShardStatus::Id>();
- const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(id);
- Y_VERIFY_S(buildInfoPtr, "BuildIndex not found"
- << ": id# " << id);
- auto& buildInfo = *buildInfoPtr->Get();
- buildInfo.AddShardStatus(rowset);
+ fillBuildInfoByIdSafe(id, "IndexBuildShardStatus", [&](TIndexBuildInfo& buildInfo) {
+ buildInfo.AddShardStatus(rowset);
+ });
if (!rowset.Next()) {
return false;
diff --git a/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp b/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp
index 93d1c82792c..405baa3b81f 100644
--- a/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp
@@ -818,7 +818,8 @@ private:
<< "CancelRequested: " << (info.CancelRequested ? "YES" : "NO") << Endl
<< "State: " << info.State << Endl
- << "Issue: " << info.Issue << Endl
+ << "IsBroken: " << info.IsBroken << Endl
+ << "Issue: " << info.GetIssue() << Endl
<< "Shards.size: " << info.Shards.size() << Endl
<< "ToUploadShards.size: " << info.ToUploadShards.size() << Endl
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index.cpp
index 58850d7cf33..9016995a00f 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index.cpp
@@ -53,7 +53,7 @@ void TSchemeShard::Handle(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev, const
}
void TSchemeShard::PersistCreateBuildIndex(NIceDb::TNiceDb& db, const TIndexBuildInfo& info) {
- Y_ABORT_UNLESS(info.BuildKind != TIndexBuildInfo::EBuildKind::BuildKindUnspecified);
+ Y_ENSURE(info.BuildKind != TIndexBuildInfo::EBuildKind::BuildKindUnspecified);
auto persistedBuildIndex = db.Table<Schema::IndexBuild>().Key(info.Id);
persistedBuildIndex.Update(
NIceDb::TUpdate<Schema::IndexBuild::Uid>(info.Uid),
@@ -126,7 +126,7 @@ void TSchemeShard::PersistCreateBuildIndex(NIceDb::TNiceDb& db, const TIndexBuil
void TSchemeShard::PersistBuildIndexState(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) {
db.Table<Schema::IndexBuild>().Key(indexInfo.Id).Update(
NIceDb::TUpdate<Schema::IndexBuild::State>(ui32(indexInfo.State)),
- NIceDb::TUpdate<Schema::IndexBuild::Issue>(indexInfo.Issue),
+ NIceDb::TUpdate<Schema::IndexBuild::Issue>(indexInfo.GetIssue()),
NIceDb::TUpdate<Schema::IndexBuild::StartTime>(indexInfo.StartTime.Seconds()),
NIceDb::TUpdate<Schema::IndexBuild::EndTime>(indexInfo.EndTime.Seconds())
);
@@ -139,7 +139,7 @@ void TSchemeShard::PersistBuildIndexCancelRequest(NIceDb::TNiceDb& db, const TIn
void TSchemeShard::PersistBuildIndexIssue(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) {
db.Table<Schema::IndexBuild>().Key(indexInfo.Id).Update(
- NIceDb::TUpdate<Schema::IndexBuild::Issue>(indexInfo.Issue));
+ NIceDb::TUpdate<Schema::IndexBuild::Issue>(indexInfo.GetIssue()));
}
void TSchemeShard::PersistBuildIndexAlterMainTableTxId(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) {
@@ -314,9 +314,12 @@ void TSchemeShard::PersistBuildIndexForget(NIceDb::TNiceDb& db, const TIndexBuil
void TSchemeShard::Resume(const TDeque<TIndexBuildId>& indexIds, const TActorContext& ctx) {
for (const auto& id : indexIds) {
- if (IndexBuilds.contains(id)) {
- Execute(CreateTxProgress(id), ctx);
+ const auto* buildInfoPtr = IndexBuilds.FindPtr(id);
+ if (!buildInfoPtr || buildInfoPtr->Get()->IsBroken) { // ids missing from IndexBuilds and builds flagged IsBroken get no progress transaction
+ continue;
}
+
+ Execute(CreateTxProgress(id), ctx);
}
}
@@ -331,7 +334,7 @@ void TSchemeShard::SetupRouting(const TDeque<TIndexBuildId>& indexIds, const TAc
auto handle = [&] (auto txId) {
if (txId) {
auto [it, emplaced] = TxIdToIndexBuilds.try_emplace(txId, buildInfo.Id);
- Y_ABORT_UNLESS(it->second == buildInfo.Id);
+ Y_ENSURE(it->second == buildInfo.Id);
}
};
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp
index 7e7ad2af6e5..56a13b97438 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp
@@ -82,7 +82,9 @@ public:
}
}
- TIndexBuildInfo::TPtr buildInfo = new TIndexBuildInfo(BuildId, uid);
+ TIndexBuildInfo::TPtr buildInfo = new TIndexBuildInfo();
+ buildInfo->Id = BuildId;
+ buildInfo->Uid = uid;
buildInfo->DomainPathId = domainPath.Base()->PathId;
buildInfo->TablePathId = tablePath.Base()->PathId;
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
index 85f453f6faf..64e534407cd 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
@@ -1123,6 +1123,10 @@ public:
LOG_N("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo.State);
LOG_D("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo.State << " " << buildInfo);
+ if (buildInfo.IsBroken) {
+ return true;
+ }
+
switch (buildInfo.State) {
case TIndexBuildInfo::EState::Invalid:
Y_ENSURE(false, "Unreachable");
@@ -1355,10 +1359,9 @@ public:
}
NIceDb::TNiceDb db(txc.DB);
- if (!buildInfo->Issue.Contains(exc.what())) {
- buildInfo->Issue += TStringBuilder() << "Unhandled exception " << exc.what();
+ if (buildInfo->AddIssue(TStringBuilder() << "Unhandled exception " << exc.what())) {
+ Self->PersistBuildIndexIssue(db, *buildInfo);
}
- Self->PersistBuildIndexIssue(db, *buildInfo);
if (buildInfo->State != TIndexBuildInfo::EState::Filling) {
// no idea how to gracefully stop index build otherwise
@@ -1497,10 +1500,9 @@ public:
}
NIceDb::TNiceDb db(txc.DB);
- if (!buildInfo->Issue.Contains(exc.what())) {
- buildInfo->Issue += TStringBuilder() << "Unhandled exception " << exc.what();
+ if (buildInfo->AddIssue(TStringBuilder() << "Unhandled exception " << exc.what())) {
+ Self->PersistBuildIndexIssue(db, *buildInfo);
}
- Self->PersistBuildIndexIssue(db, *buildInfo);
if (buildInfo->State != TIndexBuildInfo::EState::Filling) {
// most replies are used at Filling stage
@@ -1675,11 +1677,11 @@ public:
break;
case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
- buildInfo.Issue += TStringBuilder()
+ buildInfo.AddIssue(TStringBuilder()
<< "One of the shards report " << shardStatus.Status
<< " at Filling stage, process has to be canceled"
<< ", shardId: " << shardId
- << ", shardIdx: " << shardIdx;
+ << ", shardIdx: " << shardIdx);
Self->PersistBuildIndexIssue(db, buildInfo);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
Progress(BuildId);
@@ -1776,11 +1778,11 @@ public:
break;
case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
- buildInfo.Issue += TStringBuilder()
+ buildInfo.AddIssue(TStringBuilder()
<< "One of the shards report " << shardStatus.Status
<< " at Filling stage, process has to be canceled"
<< ", shardId: " << shardId
- << ", shardIdx: " << shardIdx;
+ << ", shardIdx: " << shardIdx);
Self->PersistBuildIndexIssue(db, buildInfo);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
Progress(BuildId);
@@ -1877,11 +1879,11 @@ public:
break;
case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
- buildInfo.Issue += TStringBuilder()
+ buildInfo.AddIssue(TStringBuilder()
<< "One of the shards report " << shardStatus.Status
<< " at Filling stage, process has to be canceled"
<< ", shardId: " << shardId
- << ", shardIdx: " << shardIdx;
+ << ", shardIdx: " << shardIdx);
Self->PersistBuildIndexIssue(db, buildInfo);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
Progress(BuildId);
@@ -1978,11 +1980,11 @@ public:
break;
case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
- buildInfo.Issue += TStringBuilder()
+ buildInfo.AddIssue(TStringBuilder()
<< "One of the shards report " << shardStatus.Status
<< " at Filling stage, process has to be canceled"
<< ", shardId: " << shardId
- << ", shardIdx: " << shardIdx;
+ << ", shardIdx: " << shardIdx);
Self->PersistBuildIndexIssue(db, buildInfo);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
Progress(BuildId);
@@ -2051,7 +2053,7 @@ public:
} else {
NYql::TIssues issues;
NYql::IssuesFromMessage(record.GetIssues(), issues);
- buildInfo.Issue += issues.ToString();
+ buildInfo.AddIssue(issues.ToString());
Self->PersistBuildIndexIssue(db, buildInfo);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
Progress(BuildId);
@@ -2174,11 +2176,11 @@ public:
break;
case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
- buildInfo.Issue += TStringBuilder()
+ buildInfo.AddIssue(TStringBuilder()
<< "One of the shards report " << shardStatus.Status
<< " at Filling stage, process has to be canceled"
<< ", shardId: " << shardId
- << ", shardIdx: " << shardIdx;
+ << ", shardIdx: " << shardIdx);
Self->PersistBuildIndexIssue(db, buildInfo);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
Progress(BuildId);
@@ -2303,14 +2305,14 @@ public:
auto& response = responseEv->Record;
response.SetStatus(status);
- if (buildInfo.Issue) {
- AddIssue(response.MutableIssues(), buildInfo.Issue);
+ if (buildInfo.GetIssue()) {
+ AddIssue(response.MutableIssues(), buildInfo.GetIssue());
}
LOG_N("TIndexBuilder::TTxReply: ReplyOnCreation"
<< ", BuildIndexId: " << buildInfo.Id
<< ", status: " << Ydb::StatusIds::StatusCode_Name(status)
- << ", error: " << buildInfo.Issue
+ << ", error: " << buildInfo.GetIssue()
<< ", replyTo: " << buildInfo.CreateSender.ToString()
<< ", message: " << responseEv->Record.ShortDebugString());
@@ -2352,10 +2354,10 @@ public:
auto statusCode = TranslateStatusCode(record.GetStatus());
if (statusCode != Ydb::StatusIds::SUCCESS) {
- buildInfo.Issue += TStringBuilder()
+ buildInfo.AddIssue(TStringBuilder()
<< "At " << state << " state got unsuccess propose result"
<< ", status: " << NKikimrScheme::EStatus_Name(record.GetStatus())
- << ", reason: " << record.GetReason();
+ << ", reason: " << record.GetReason());
Self->PersistBuildIndexIssue(db, buildInfo);
Self->PersistBuildIndexForget(db, buildInfo);
EraseBuildInfo(buildInfo);
@@ -2371,10 +2373,10 @@ public:
Y_ENSURE(false, "NEED MORE TESTING");
// no op
} else {
- buildInfo.Issue += TStringBuilder()
+ buildInfo.AddIssue(TStringBuilder()
<< "At " << state << " state got unsuccess propose result"
<< ", status: " << NKikimrScheme::EStatus_Name(record.GetStatus())
- << ", reason: " << record.GetReason();
+ << ", reason: " << record.GetReason());
Self->PersistBuildIndexIssue(db, buildInfo);
ChangeState(buildInfo.Id, to);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp
index 647c7b8e44e..aadff71fa7a 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp
@@ -197,8 +197,8 @@ void TSchemeShard::TIndexBuilder::TTxBase::Progress(TIndexBuildId id) {
void TSchemeShard::TIndexBuilder::TTxBase::Fill(NKikimrIndexBuilder::TIndexBuild& index, const TIndexBuildInfo& indexInfo) {
index.SetId(ui64(indexInfo.Id));
- if (indexInfo.Issue) {
- AddIssue(index.MutableIssues(), indexInfo.Issue);
+ if (indexInfo.GetIssue()) {
+ AddIssue(index.MutableIssues(), indexInfo.GetIssue());
}
if (indexInfo.StartTime != TInstant::Zero()) {
*index.MutableStartTime() = SecondsToProtoTimeStamp(indexInfo.StartTime.Seconds());
diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp
index a505f61165e..aff34a00ff9 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp
@@ -847,7 +847,7 @@ private:
return Nothing();
}
- return indexInfo.Issue;
+ return indexInfo.GetIssue();
}
TString GetIssues(const NKikimrIndexBuilder::TEvCreateResponse& proto) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index f1cc1387fb6..d2b26d54355 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -3337,9 +3337,12 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
TKMeans KMeans;
EState State = EState::Invalid;
+private:
TString Issue;
+public:
TInstant StartTime = TInstant::Zero();
TInstant EndTime = TInstant::Zero();
+ bool IsBroken = false;
TSet<TActorId> Subscribers;
@@ -3515,11 +3518,6 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
void AddParent(const TSerializedTableRange& range, TShardIdx shard);
- TIndexBuildInfo(TIndexBuildId id, TString uid)
- : Id(id)
- , Uid(uid)
- {}
-
template<class TRow>
void AddBuildColumnInfo(const TRow& row) {
TString columnName = row.template GetValue<Schema::BuildColumnOperationSettings::ColumnName>();
@@ -3559,11 +3557,26 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
}
template<class TRow>
- static TIndexBuildInfo::TPtr FromRow(const TRow& row) {
+ static void FillFromRow(const TRow& row, TIndexBuildInfo* indexInfo) {
+ Y_ENSURE(indexInfo); // TODO: pass by ref
+
TIndexBuildId id = row.template GetValue<Schema::IndexBuild::Id>();
TString uid = row.template GetValue<Schema::IndexBuild::Uid>();
- TIndexBuildInfo::TPtr indexInfo = new TIndexBuildInfo(id, uid);
+ // note: essential fields are read first so that they are already filled if an error occurs later
+ indexInfo->Id = id;
+ indexInfo->Uid = uid;
+
+ indexInfo->State = TIndexBuildInfo::EState(
+ row.template GetValue<Schema::IndexBuild::State>());
+ indexInfo->Issue =
+ row.template GetValueOrDefault<Schema::IndexBuild::Issue>();
+
+ // note: BuildSecondaryIndex is used as the default build kind here,
+ // because previously this table was dedicated to build secondary index operations only.
+ indexInfo->BuildKind = TIndexBuildInfo::EBuildKind(
+ row.template GetValueOrDefault<Schema::IndexBuild::BuildKind>(
+ ui32(TIndexBuildInfo::EBuildKind::BuildSecondaryIndex)));
indexInfo->DomainPathId =
TPathId(row.template GetValue<Schema::IndexBuild::DomainOwnerId>(),
@@ -3576,40 +3589,6 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
indexInfo->IndexName = row.template GetValue<Schema::IndexBuild::IndexName>();
indexInfo->IndexType = row.template GetValue<Schema::IndexBuild::IndexType>();
- // note: please note that here we specify BuildSecondaryIndex as operation default,
- // because previosly this table was dedicated for build secondary index operations only.
- indexInfo->BuildKind = TIndexBuildInfo::EBuildKind(
- row.template GetValueOrDefault<Schema::IndexBuild::BuildKind>(
- ui32(TIndexBuildInfo::EBuildKind::BuildSecondaryIndex)));
-
- // Restore the operation details: ImplTableDescriptions and SpecializedIndexDescription.
- if (row.template HaveValue<Schema::IndexBuild::CreationConfig>()) {
- NKikimrSchemeOp::TIndexCreationConfig creationConfig;
- Y_ENSURE(creationConfig.ParseFromString(row.template GetValue<Schema::IndexBuild::CreationConfig>()));
-
- auto& descriptions = *creationConfig.MutableIndexImplTableDescriptions();
- indexInfo->ImplTableDescriptions.reserve(descriptions.size());
- for (auto& description : descriptions) {
- indexInfo->ImplTableDescriptions.emplace_back(std::move(description));
- }
-
- switch (creationConfig.GetSpecializedIndexDescriptionCase()) {
- case NKikimrSchemeOp::TIndexCreationConfig::kVectorIndexKmeansTreeDescription: {
- auto& desc = *creationConfig.MutableVectorIndexKmeansTreeDescription();
- indexInfo->KMeans.K = std::max<ui32>(2, desc.settings().clusters());
- indexInfo->KMeans.Levels = indexInfo->IsBuildPrefixedVectorIndex() + std::max<ui32>(1, desc.settings().levels());
- indexInfo->SpecializedIndexDescription =std::move(desc);
- } break;
- case NKikimrSchemeOp::TIndexCreationConfig::SPECIALIZEDINDEXDESCRIPTION_NOT_SET:
- /* do nothing */
- break;
- }
- }
-
- indexInfo->State = TIndexBuildInfo::EState(
- row.template GetValue<Schema::IndexBuild::State>());
- indexInfo->Issue =
- row.template GetValueOrDefault<Schema::IndexBuild::Issue>();
indexInfo->CancelRequested =
row.template GetValueOrDefault<Schema::IndexBuild::CancelRequest>(false);
if (row.template HaveValue<Schema::IndexBuild::UserSID>()) {
@@ -3697,7 +3676,29 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
row.template GetValueOrDefault<Schema::IndexBuild::ReadBytesProcessed>(0),
};
- return indexInfo;
+ // Restore the operation details: ImplTableDescriptions and SpecializedIndexDescription.
+ if (row.template HaveValue<Schema::IndexBuild::CreationConfig>()) {
+ NKikimrSchemeOp::TIndexCreationConfig creationConfig;
+ Y_ENSURE(creationConfig.ParseFromString(row.template GetValue<Schema::IndexBuild::CreationConfig>()));
+
+ auto& descriptions = *creationConfig.MutableIndexImplTableDescriptions();
+ indexInfo->ImplTableDescriptions.reserve(descriptions.size());
+ for (auto& description : descriptions) {
+ indexInfo->ImplTableDescriptions.emplace_back(std::move(description));
+ }
+
+ switch (creationConfig.GetSpecializedIndexDescriptionCase()) {
+ case NKikimrSchemeOp::TIndexCreationConfig::kVectorIndexKmeansTreeDescription: {
+ auto& desc = *creationConfig.MutableVectorIndexKmeansTreeDescription();
+ indexInfo->KMeans.K = std::max<ui32>(2, desc.settings().clusters());
+ indexInfo->KMeans.Levels = indexInfo->IsBuildPrefixedVectorIndex() + std::max<ui32>(1, desc.settings().levels());
+ indexInfo->SpecializedIndexDescription =std::move(desc);
+ } break;
+ case NKikimrSchemeOp::TIndexCreationConfig::SPECIALIZEDINDEXDESCRIPTION_NOT_SET:
+ /* do nothing */
+ break;
+ }
+ }
}
template<class TRow>
@@ -3791,6 +3792,23 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
Subscribers.insert(actorID);
}
+ const TString& GetIssue() const { // Read-only access to the accumulated issue text
+ return Issue;
+ }
+
+ bool AddIssue(TString issue) { // Appends issue to the accumulated text; returns true if Issue was changed, false if the text was already present
+ if (Issue.Contains(issue)) { // deduplication: text already present in Issue is not appended again
+ return false;
+ }
+
+ if (Issue) { // separator goes only between entries, never before the first one
+ // TODO: store as list?
+ Issue += "; ";
+ }
+ Issue += issue;
+ return true;
+ }
+
float CalcProgressPercent() const {
const auto total = Shards.size();
const auto done = DoneShards.size();
@@ -3942,9 +3960,10 @@ inline void Out<NKikimr::NSchemeShard::TIndexBuildInfo>
}
o << ", State: " << info.State;
+ o << ", IsBroken: " << info.IsBroken;
o << ", IsCancellationRequested: " << info.CancelRequested;
- o << ", Issue: " << info.Issue;
+ o << ", Issue: " << info.GetIssue();
o << ", SubscribersCount: " << info.Subscribers.size();
o << ", CreateSender: " << info.CreateSender.ToString();
diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp
index 49e2b911029..5b7b54e8ee8 100644
--- a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp
+++ b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp
@@ -566,4 +566,83 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) {
UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Unreachable");
}
}
+
+ Y_UNIT_TEST(TTxInit_Throws) { // Builds a vector index, overwrites its persisted CreationConfig with an unparsable value, reboots SchemeShard and checks the failure is reported as an issue
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::BUILD_INDEX, NLog::PRI_TRACE);
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "vectors"
+ Columns { Name: "id" Type: "Uint64" }
+ Columns { Name: "embedding" Type: "String" }
+ KeyColumnNames: [ "id" ]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ NYdb::NTable::TGlobalIndexSettings globalIndexSettings;
+
+ std::unique_ptr<NYdb::NTable::TKMeansTreeSettings> kmeansTreeSettings;
+ {
+ Ydb::Table::KMeansTreeSettings proto;
+ UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(R"(
+ settings {
+ metric: DISTANCE_COSINE
+ vector_type: VECTOR_TYPE_FLOAT
+ vector_dimension: 1024
+ }
+ levels: 5
+ clusters: 4
+ )", &proto));
+ using T = NYdb::NTable::TKMeansTreeSettings;
+ kmeansTreeSettings = std::make_unique<T>(T::FromProto(proto));
+ }
+
+ const ui64 buildIndexTx = ++txId; // this tx id is also used below as the Id key of the IndexBuild row
+ const TVector<TString> dataColumns;
+ const TVector<TString> indexColumns{"embedding"};
+ TestBuildIndex(runtime, buildIndexTx, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/vectors", TBuildIndexConfig{
+ "by_embedding", NKikimrSchemeOp::EIndexTypeGlobalVectorKmeansTree, indexColumns, dataColumns,
+ { globalIndexSettings, globalIndexSettings, globalIndexSettings }, std::move(kmeansTreeSettings)
+ });
+
+ env.TestWaitNotification(runtime, buildIndexTx);
+
+ { // precondition: the build is reported as done before the row is tampered with
+ auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx);
+ UNIT_ASSERT_VALUES_EQUAL_C(
+ buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE,
+ buildIndexOperation.DebugString()
+ );
+ }
+
+ { // overwrite CreationConfig in the IndexBuild row with a string the ParseFromString check asserted below rejects
+ TString writeQuery = Sprintf(R"(
+ (
+ (let key '( '('Id (Uint64 '%lu)) ) )
+ (let value '('('CreationConfig (String 'aaaaaaaa)) ) )
+ (return (AsList (UpdateRow 'IndexBuild key value) ))
+ )
+ )", buildIndexTx);
+ NKikimrMiniKQL::TResult result;
+ TString err;
+ NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, TTestTxConfig::SchemeShard, writeQuery, result, err);
+ UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, err);
+ }
+
+ RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor()); // NOTE(review): presumably makes the tablet re-read the modified row during init (per the test name, TTxInit) - confirm
+
+ { // after the reboot the build is still reported as done, and the init failure text is visible in the response
+ auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx);
+ UNIT_ASSERT_VALUES_EQUAL_C(
+ buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE,
+ buildIndexOperation.DebugString()
+ );
+ UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Init IndexBuild unhandled exception");
+ UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Condition violated: `creationConfig.ParseFromString");
+ }
+ }
}