diff options
author | kungurtsev <kungasc@ydb.tech> | 2025-06-05 17:17:12 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-06-05 17:17:12 +0200 |
commit | aa1663fc442172a2428feb9c36fc0a925fe0ee6d (patch) | |
tree | 872a74a2435af61b47389e9fb4b5fdd21a782ae3 | |
parent | 1180c0895c16bb0cedf5a345f31df743b5d5e4b8 (diff) | |
download | ydb-aa1663fc442172a2428feb9c36fc0a925fe0ee6d.tar.gz |
Handle unhandled exceptions during build index SchemeShard init (#19312)
9 files changed, 254 insertions, 122 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 7817bc65ae9..924c1ca8678 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -4561,6 +4561,36 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { // Read index build { + auto fillBuildInfoSafe = [&](TIndexBuildInfo& buildInfo, const TString& stepName, const auto& fill) { + try { + fill(buildInfo); + } catch (const std::exception& exc) { + LOG_ERROR_S(ctx, NKikimrServices::BUILD_INDEX, + "Init " << stepName << " unhandled exception, id#" << buildInfo.Id + << " " << TypeName(exc) << ": " << exc.what() << Endl + << TBackTrace::FromCurrentException().PrintToString() + << ", TIndexBuildInfo: " << buildInfo); + + // in-memory volatile state: + buildInfo.IsBroken = true; + buildInfo.AddIssue(TStringBuilder() << "Init " << stepName << " unhandled exception " << exc.what()); + } + }; + + auto fillBuildInfoByIdSafe = [&](TIndexBuildId id, const TString& stepName, const auto& fill) { + const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(id); + Y_ASSERT(buildInfoPtr); + if (!buildInfoPtr) { + LOG_ERROR_S(ctx, NKikimrServices::BUILD_INDEX, + "Init " << stepName << " BuildInfo not found: id#" << id); + return; + } + auto& buildInfo = *buildInfoPtr->Get(); + if (!buildInfo.IsBroken) { + fillBuildInfoSafe(buildInfo, stepName, fill); + } + }; + // read main info { auto rowset = db.Table<Schema::IndexBuild>().Range().Select(); @@ -4569,17 +4599,21 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { } while (!rowset.EndOfSet()) { - TIndexBuildInfo::TPtr indexInfo = TIndexBuildInfo::FromRow(rowset); - - auto [it, emplaced] = Self->IndexBuilds.emplace(indexInfo->Id, indexInfo); - Y_ABORT_UNLESS(emplaced); - if (indexInfo->Uid) { - // TODO(mbkkt) It also should be unique, but we're not sure. - Y_ASSERT(!Self->IndexBuildsByUid.contains(indexInfo->Uid)); - Self->IndexBuildsByUid[indexInfo->Uid] = indexInfo; + TIndexBuildInfo::TPtr buildInfo = new TIndexBuildInfo(); + fillBuildInfoSafe(*buildInfo, "IndexBuild", [&](TIndexBuildInfo& buildInfo) { + TIndexBuildInfo::FillFromRow(rowset, &buildInfo); + }); + + // Note: broken build are also added to IndexBuilds + Y_ASSERT(!Self->IndexBuilds.contains(buildInfo->Id)); + Self->IndexBuilds[buildInfo->Id] = buildInfo; + + if (buildInfo->Uid) { + Y_ASSERT(!Self->IndexBuildsByUid.contains(buildInfo->Uid)); + Self->IndexBuildsByUid[buildInfo->Uid] = buildInfo; } - OnComplete.ToProgress(indexInfo->Id); + OnComplete.ToProgress(buildInfo->Id); if (!rowset.Next()) { return false; @@ -4601,19 +4635,18 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { while (!rowset.EndOfSet()) { TIndexBuildId id = rowset.GetValue<Schema::KMeansTreeProgress::Id>(); - const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(id); - Y_VERIFY_S(buildInfoPtr, "BuildIndex not found: id# " << id); - auto& buildInfo = *buildInfoPtr->Get(); - buildInfo.KMeans.Set( - rowset.GetValue<Schema::KMeansTreeProgress::Level>(), - rowset.GetValue<Schema::KMeansTreeProgress::ParentBegin>(), - rowset.GetValue<Schema::KMeansTreeProgress::Parent>(), - rowset.GetValue<Schema::KMeansTreeProgress::ChildBegin>(), - rowset.GetValue<Schema::KMeansTreeProgress::Child>(), - rowset.GetValue<Schema::KMeansTreeProgress::State>(), - rowset.GetValue<Schema::KMeansTreeProgress::TableSize>() - ); - buildInfo.Sample.Rows.reserve(buildInfo.KMeans.K * 2); + fillBuildInfoByIdSafe(id, "KMeansTreeProgress", [&](TIndexBuildInfo& buildInfo) { + buildInfo.KMeans.Set( + rowset.GetValue<Schema::KMeansTreeProgress::Level>(), + rowset.GetValue<Schema::KMeansTreeProgress::ParentBegin>(), + rowset.GetValue<Schema::KMeansTreeProgress::Parent>(), + rowset.GetValue<Schema::KMeansTreeProgress::ChildBegin>(), + rowset.GetValue<Schema::KMeansTreeProgress::Child>(), + rowset.GetValue<Schema::KMeansTreeProgress::State>(), + rowset.GetValue<Schema::KMeansTreeProgress::TableSize>() + ); + buildInfo.Sample.Rows.reserve(buildInfo.KMeans.K * 2); + }); if (!rowset.Next()) { return false; @@ -4632,13 +4665,12 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { size_t sampleCount = 0; while (!rowset.EndOfSet()) { TIndexBuildId id = rowset.GetValue<Schema::KMeansTreeSample::Id>(); - const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(id); - Y_VERIFY_S(buildInfoPtr, "BuildIndex not found: id# " << id); - auto& buildInfo = *buildInfoPtr->Get(); - buildInfo.Sample.Add( - rowset.GetValue<Schema::KMeansTreeSample::Probability>(), - rowset.GetValue<Schema::KMeansTreeSample::Data>() - ); + fillBuildInfoByIdSafe(id, "KMeansTreeSample", [&](TIndexBuildInfo& buildInfo) { + buildInfo.Sample.Add( + rowset.GetValue<Schema::KMeansTreeSample::Probability>(), + rowset.GetValue<Schema::KMeansTreeSample::Data>() + ); + }); sampleCount++; if (!rowset.Next()) { @@ -4660,11 +4692,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { while (!rowset.EndOfSet()) { TIndexBuildId id = rowset.GetValue<Schema::IndexBuildColumns::Id>(); - const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(id); - Y_VERIFY_S(buildInfoPtr, "BuildIndex not found" - << ": id# " << id); - auto& buildInfo = *buildInfoPtr->Get(); - buildInfo.AddIndexColumnInfo(rowset); + fillBuildInfoByIdSafe(id, "IndexBuildColumns", [&](TIndexBuildInfo& buildInfo) { + buildInfo.AddIndexColumnInfo(rowset); + }); if (!rowset.Next()) { return false; @@ -4680,11 +4710,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { while (!rowset.EndOfSet()) { TIndexBuildId id = rowset.GetValue<Schema::BuildColumnOperationSettings::Id>(); - const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(id); - Y_VERIFY_S(buildInfoPtr, "BuildIndex not found" - << ": id# " << id); - auto& buildInfo = *buildInfoPtr->Get(); - buildInfo.AddBuildColumnInfo(rowset); + fillBuildInfoByIdSafe(id, "BuildColumnOperationSettings", [&](TIndexBuildInfo& buildInfo) { + buildInfo.AddBuildColumnInfo(rowset); + }); if (!rowset.Next()) { return false; @@ -4701,11 +4729,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { while (!rowset.EndOfSet()) { TIndexBuildId id = rowset.GetValue<Schema::IndexBuildShardStatus::Id>(); - const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(id); - Y_VERIFY_S(buildInfoPtr, "BuildIndex not found" - << ": id# " << id); - auto& buildInfo = *buildInfoPtr->Get(); - buildInfo.AddShardStatus(rowset); + fillBuildInfoByIdSafe(id, "IndexBuildShardStatus", [&](TIndexBuildInfo& buildInfo) { + buildInfo.AddShardStatus(rowset); + }); if (!rowset.Next()) { return false; diff --git a/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp b/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp index 93d1c82792c..405baa3b81f 100644 --- a/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp @@ -818,7 +818,8 @@ private: << "CancelRequested: " << (info.CancelRequested ? "YES" : "NO") << Endl << "State: " << info.State << Endl - << "Issue: " << info.Issue << Endl + << "IsBroken: " << info.IsBroken << Endl + << "Issue: " << info.GetIssue() << Endl << "Shards.size: " << info.Shards.size() << Endl << "ToUploadShards.size: " << info.ToUploadShards.size() << Endl diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index.cpp index 58850d7cf33..9016995a00f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index.cpp @@ -53,7 +53,7 @@ void TSchemeShard::Handle(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev, const } void TSchemeShard::PersistCreateBuildIndex(NIceDb::TNiceDb& db, const TIndexBuildInfo& info) { - Y_ABORT_UNLESS(info.BuildKind != TIndexBuildInfo::EBuildKind::BuildKindUnspecified); + Y_ENSURE(info.BuildKind != TIndexBuildInfo::EBuildKind::BuildKindUnspecified); auto persistedBuildIndex = db.Table<Schema::IndexBuild>().Key(info.Id); persistedBuildIndex.Update( NIceDb::TUpdate<Schema::IndexBuild::Uid>(info.Uid), @@ -126,7 +126,7 @@ void TSchemeShard::PersistCreateBuildIndex(NIceDb::TNiceDb& db, const TIndexBuil void TSchemeShard::PersistBuildIndexState(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) { db.Table<Schema::IndexBuild>().Key(indexInfo.Id).Update( NIceDb::TUpdate<Schema::IndexBuild::State>(ui32(indexInfo.State)), - NIceDb::TUpdate<Schema::IndexBuild::Issue>(indexInfo.Issue), + NIceDb::TUpdate<Schema::IndexBuild::Issue>(indexInfo.GetIssue()), NIceDb::TUpdate<Schema::IndexBuild::StartTime>(indexInfo.StartTime.Seconds()), NIceDb::TUpdate<Schema::IndexBuild::EndTime>(indexInfo.EndTime.Seconds()) ); @@ -139,7 +139,7 @@ void TSchemeShard::PersistBuildIndexCancelRequest(NIceDb::TNiceDb& db, const TIn void TSchemeShard::PersistBuildIndexIssue(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) { db.Table<Schema::IndexBuild>().Key(indexInfo.Id).Update( - NIceDb::TUpdate<Schema::IndexBuild::Issue>(indexInfo.Issue)); + NIceDb::TUpdate<Schema::IndexBuild::Issue>(indexInfo.GetIssue())); } void TSchemeShard::PersistBuildIndexAlterMainTableTxId(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) { @@ -314,9 +314,12 @@ void TSchemeShard::PersistBuildIndexForget(NIceDb::TNiceDb& db, const TIndexBuil void TSchemeShard::Resume(const TDeque<TIndexBuildId>& indexIds, const TActorContext& ctx) { for (const auto& id : indexIds) { - if (IndexBuilds.contains(id)) { - Execute(CreateTxProgress(id), ctx); + const auto* buildInfoPtr = IndexBuilds.FindPtr(id); + if (!buildInfoPtr || buildInfoPtr->Get()->IsBroken) { + continue; } + + Execute(CreateTxProgress(id), ctx); } } @@ -331,7 +334,7 @@ void TSchemeShard::SetupRouting(const TDeque<TIndexBuildId>& indexIds, const TAc auto handle = [&] (auto txId) { if (txId) { auto [it, emplaced] = TxIdToIndexBuilds.try_emplace(txId, buildInfo.Id); - Y_ABORT_UNLESS(it->second == buildInfo.Id); + Y_ENSURE(it->second == buildInfo.Id); } }; diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp index 7e7ad2af6e5..56a13b97438 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp @@ -82,7 +82,9 @@ public: } } - TIndexBuildInfo::TPtr buildInfo = new TIndexBuildInfo(BuildId, uid); + TIndexBuildInfo::TPtr buildInfo = new TIndexBuildInfo(); + buildInfo->Id = BuildId; + buildInfo->Uid = uid; buildInfo->DomainPathId = domainPath.Base()->PathId; buildInfo->TablePathId = tablePath.Base()->PathId; diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp index 85f453f6faf..64e534407cd 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp @@ -1123,6 +1123,10 @@ public: LOG_N("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo.State); LOG_D("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo.State << " " << buildInfo); + if (buildInfo.IsBroken) { + return true; + } + switch (buildInfo.State) { case TIndexBuildInfo::EState::Invalid: Y_ENSURE(false, "Unreachable"); @@ -1355,10 +1359,9 @@ public: } NIceDb::TNiceDb db(txc.DB); - if (!buildInfo->Issue.Contains(exc.what())) { - buildInfo->Issue += TStringBuilder() << "Unhandled exception " << exc.what(); + if (buildInfo->AddIssue(TStringBuilder() << "Unhandled exception " << exc.what())) { + Self->PersistBuildIndexIssue(db, *buildInfo); } - Self->PersistBuildIndexIssue(db, *buildInfo); if (buildInfo->State != TIndexBuildInfo::EState::Filling) { // no idea how to gracefully stop index build otherwise @@ -1497,10 +1500,9 @@ public: } NIceDb::TNiceDb db(txc.DB); - if (!buildInfo->Issue.Contains(exc.what())) { - buildInfo->Issue += TStringBuilder() << "Unhandled exception " << exc.what(); + if (buildInfo->AddIssue(TStringBuilder() << "Unhandled exception " << exc.what())) { + Self->PersistBuildIndexIssue(db, *buildInfo); } - Self->PersistBuildIndexIssue(db, *buildInfo); if (buildInfo->State != TIndexBuildInfo::EState::Filling) { // most replies are used at Filling stage @@ -1675,11 +1677,11 @@ public: break; case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR: case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST: - buildInfo.Issue += TStringBuilder() + buildInfo.AddIssue(TStringBuilder() << "One of the shards report " << shardStatus.Status << " at Filling stage, process has to be canceled" << ", shardId: " << shardId - << ", shardIdx: " << shardIdx; + << ", shardIdx: " << shardIdx); Self->PersistBuildIndexIssue(db, buildInfo); ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying); Progress(BuildId); @@ -1776,11 +1778,11 @@ public: break; case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR: case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST: - buildInfo.Issue += TStringBuilder() + buildInfo.AddIssue(TStringBuilder() << "One of the shards report " << shardStatus.Status << " at Filling stage, process has to be canceled" << ", shardId: " << shardId - << ", shardIdx: " << shardIdx; + << ", shardIdx: " << shardIdx); Self->PersistBuildIndexIssue(db, buildInfo); ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying); Progress(BuildId); @@ -1877,11 +1879,11 @@ public: break; case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR: case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST: - buildInfo.Issue += TStringBuilder() + buildInfo.AddIssue(TStringBuilder() << "One of the shards report " << shardStatus.Status << " at Filling stage, process has to be canceled" << ", shardId: " << shardId - << ", shardIdx: " << shardIdx; + << ", shardIdx: " << shardIdx); Self->PersistBuildIndexIssue(db, buildInfo); ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying); Progress(BuildId); @@ -1978,11 +1980,11 @@ public: break; case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR: case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST: - buildInfo.Issue += TStringBuilder() + buildInfo.AddIssue(TStringBuilder() << "One of the shards report " << shardStatus.Status << " at Filling stage, process has to be canceled" << ", shardId: " << shardId - << ", shardIdx: " << shardIdx; + << ", shardIdx: " << shardIdx); Self->PersistBuildIndexIssue(db, buildInfo); ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying); Progress(BuildId); @@ -2051,7 +2053,7 @@ public: } else { NYql::TIssues issues; NYql::IssuesFromMessage(record.GetIssues(), issues); - buildInfo.Issue += issues.ToString(); + buildInfo.AddIssue(issues.ToString()); Self->PersistBuildIndexIssue(db, buildInfo); ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying); Progress(BuildId); @@ -2174,11 +2176,11 @@ public: break; case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR: case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST: - buildInfo.Issue += TStringBuilder() + buildInfo.AddIssue(TStringBuilder() << "One of the shards report " << shardStatus.Status << " at Filling stage, process has to be canceled" << ", shardId: " << shardId - << ", shardIdx: " << shardIdx; + << ", shardIdx: " << shardIdx); Self->PersistBuildIndexIssue(db, buildInfo); ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying); Progress(BuildId); @@ -2303,14 +2305,14 @@ public: auto& response = responseEv->Record; response.SetStatus(status); - if (buildInfo.Issue) { - AddIssue(response.MutableIssues(), buildInfo.Issue); + if (buildInfo.GetIssue()) { + AddIssue(response.MutableIssues(), buildInfo.GetIssue()); } LOG_N("TIndexBuilder::TTxReply: ReplyOnCreation" << ", BuildIndexId: " << buildInfo.Id << ", status: " << Ydb::StatusIds::StatusCode_Name(status) - << ", error: " << buildInfo.Issue + << ", error: " << buildInfo.GetIssue() << ", replyTo: " << buildInfo.CreateSender.ToString() << ", message: " << responseEv->Record.ShortDebugString()); @@ -2352,10 +2354,10 @@ public: auto statusCode = TranslateStatusCode(record.GetStatus()); if (statusCode != Ydb::StatusIds::SUCCESS) { - buildInfo.Issue += TStringBuilder() + buildInfo.AddIssue(TStringBuilder() << "At " << state << " state got unsuccess propose result" << ", status: " << NKikimrScheme::EStatus_Name(record.GetStatus()) - << ", reason: " << record.GetReason(); + << ", reason: " << record.GetReason()); Self->PersistBuildIndexIssue(db, buildInfo); Self->PersistBuildIndexForget(db, buildInfo); EraseBuildInfo(buildInfo); @@ -2371,10 +2373,10 @@ public: Y_ENSURE(false, "NEED MORE TESTING"); // no op } else { - buildInfo.Issue += TStringBuilder() + buildInfo.AddIssue(TStringBuilder() << "At " << state << " state got unsuccess propose result" << ", status: " << NKikimrScheme::EStatus_Name(record.GetStatus()) - << ", reason: " << record.GetReason(); + << ", reason: " << record.GetReason()); Self->PersistBuildIndexIssue(db, buildInfo); ChangeState(buildInfo.Id, to); } diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp index 647c7b8e44e..aadff71fa7a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp @@ -197,8 +197,8 @@ void TSchemeShard::TIndexBuilder::TTxBase::Progress(TIndexBuildId id) { void TSchemeShard::TIndexBuilder::TTxBase::Fill(NKikimrIndexBuilder::TIndexBuild& index, const TIndexBuildInfo& indexInfo) { index.SetId(ui64(indexInfo.Id)); - if (indexInfo.Issue) { - AddIssue(index.MutableIssues(), indexInfo.Issue); + if (indexInfo.GetIssue()) { + AddIssue(index.MutableIssues(), indexInfo.GetIssue()); } if (indexInfo.StartTime != TInstant::Zero()) { *index.MutableStartTime() = SecondsToProtoTimeStamp(indexInfo.StartTime.Seconds()); diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp index a505f61165e..aff34a00ff9 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp @@ -847,7 +847,7 @@ private: return Nothing(); } - return indexInfo.Issue; + return indexInfo.GetIssue(); } TString GetIssues(const NKikimrIndexBuilder::TEvCreateResponse& proto) { diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index f1cc1387fb6..d2b26d54355 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -3337,9 +3337,12 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { TKMeans KMeans; EState State = EState::Invalid; +private: TString Issue; +public: TInstant StartTime = TInstant::Zero(); TInstant EndTime = TInstant::Zero(); + bool IsBroken = false; TSet<TActorId> Subscribers; @@ -3515,11 +3518,6 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { void AddParent(const TSerializedTableRange& range, TShardIdx shard); - TIndexBuildInfo(TIndexBuildId id, TString uid) - : Id(id) - , Uid(uid) - {} - template<class TRow> void AddBuildColumnInfo(const TRow& row) { TString columnName = row.template GetValue<Schema::BuildColumnOperationSettings::ColumnName>(); @@ -3559,11 +3557,26 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { } template<class TRow> - static TIndexBuildInfo::TPtr FromRow(const TRow& row) { + static void FillFromRow(const TRow& row, TIndexBuildInfo* indexInfo) { + Y_ENSURE(indexInfo); // TODO: pass by ref + TIndexBuildId id = row.template GetValue<Schema::IndexBuild::Id>(); TString uid = row.template GetValue<Schema::IndexBuild::Uid>(); - TIndexBuildInfo::TPtr indexInfo = new TIndexBuildInfo(id, uid); + // note: essential fields go first to be filled if an error occurs + indexInfo->Id = id; + indexInfo->Uid = uid; + + indexInfo->State = TIndexBuildInfo::EState( + row.template GetValue<Schema::IndexBuild::State>()); + indexInfo->Issue = + row.template GetValueOrDefault<Schema::IndexBuild::Issue>(); + + // note: please note that here we specify BuildSecondaryIndex as operation default, + // because previously this table was dedicated for build secondary index operations only. + indexInfo->BuildKind = TIndexBuildInfo::EBuildKind( + row.template GetValueOrDefault<Schema::IndexBuild::BuildKind>( + ui32(TIndexBuildInfo::EBuildKind::BuildSecondaryIndex))); indexInfo->DomainPathId = TPathId(row.template GetValue<Schema::IndexBuild::DomainOwnerId>(), @@ -3576,40 +3589,6 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { indexInfo->IndexName = row.template GetValue<Schema::IndexBuild::IndexName>(); indexInfo->IndexType = row.template GetValue<Schema::IndexBuild::IndexType>(); - // note: please note that here we specify BuildSecondaryIndex as operation default, - // because previosly this table was dedicated for build secondary index operations only. - indexInfo->BuildKind = TIndexBuildInfo::EBuildKind( - row.template GetValueOrDefault<Schema::IndexBuild::BuildKind>( - ui32(TIndexBuildInfo::EBuildKind::BuildSecondaryIndex))); - - // Restore the operation details: ImplTableDescriptions and SpecializedIndexDescription. - if (row.template HaveValue<Schema::IndexBuild::CreationConfig>()) { - NKikimrSchemeOp::TIndexCreationConfig creationConfig; - Y_ENSURE(creationConfig.ParseFromString(row.template GetValue<Schema::IndexBuild::CreationConfig>())); - - auto& descriptions = *creationConfig.MutableIndexImplTableDescriptions(); - indexInfo->ImplTableDescriptions.reserve(descriptions.size()); - for (auto& description : descriptions) { - indexInfo->ImplTableDescriptions.emplace_back(std::move(description)); - } - - switch (creationConfig.GetSpecializedIndexDescriptionCase()) { - case NKikimrSchemeOp::TIndexCreationConfig::kVectorIndexKmeansTreeDescription: { - auto& desc = *creationConfig.MutableVectorIndexKmeansTreeDescription(); - indexInfo->KMeans.K = std::max<ui32>(2, desc.settings().clusters()); - indexInfo->KMeans.Levels = indexInfo->IsBuildPrefixedVectorIndex() + std::max<ui32>(1, desc.settings().levels()); - indexInfo->SpecializedIndexDescription =std::move(desc); - } break; - case NKikimrSchemeOp::TIndexCreationConfig::SPECIALIZEDINDEXDESCRIPTION_NOT_SET: - /* do nothing */ - break; - } - } - - indexInfo->State = TIndexBuildInfo::EState( - row.template GetValue<Schema::IndexBuild::State>()); - indexInfo->Issue = - row.template GetValueOrDefault<Schema::IndexBuild::Issue>(); indexInfo->CancelRequested = row.template GetValueOrDefault<Schema::IndexBuild::CancelRequest>(false); if (row.template HaveValue<Schema::IndexBuild::UserSID>()) { @@ -3697,7 +3676,29 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { row.template GetValueOrDefault<Schema::IndexBuild::ReadBytesProcessed>(0), }; - return indexInfo; + // Restore the operation details: ImplTableDescriptions and SpecializedIndexDescription. + if (row.template HaveValue<Schema::IndexBuild::CreationConfig>()) { + NKikimrSchemeOp::TIndexCreationConfig creationConfig; + Y_ENSURE(creationConfig.ParseFromString(row.template GetValue<Schema::IndexBuild::CreationConfig>())); + + auto& descriptions = *creationConfig.MutableIndexImplTableDescriptions(); + indexInfo->ImplTableDescriptions.reserve(descriptions.size()); + for (auto& description : descriptions) { + indexInfo->ImplTableDescriptions.emplace_back(std::move(description)); + } + + switch (creationConfig.GetSpecializedIndexDescriptionCase()) { + case NKikimrSchemeOp::TIndexCreationConfig::kVectorIndexKmeansTreeDescription: { + auto& desc = *creationConfig.MutableVectorIndexKmeansTreeDescription(); + indexInfo->KMeans.K = std::max<ui32>(2, desc.settings().clusters()); + indexInfo->KMeans.Levels = indexInfo->IsBuildPrefixedVectorIndex() + std::max<ui32>(1, desc.settings().levels()); + indexInfo->SpecializedIndexDescription =std::move(desc); + } break; + case NKikimrSchemeOp::TIndexCreationConfig::SPECIALIZEDINDEXDESCRIPTION_NOT_SET: + /* do nothing */ + break; + } + } } template<class TRow> @@ -3791,6 +3792,23 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { Subscribers.insert(actorID); } + const TString& GetIssue() const { + return Issue; + } + + bool AddIssue(TString issue) { + if (Issue.Contains(issue)) { // deduplication + return false; + } + + if (Issue) { + // TODO: store as list? + Issue += "; "; + } + Issue += issue; + return true; + } + float CalcProgressPercent() const { const auto total = Shards.size(); const auto done = DoneShards.size(); @@ -3942,9 +3960,10 @@ inline void Out<NKikimr::NSchemeShard::TIndexBuildInfo> } o << ", State: " << info.State; + o << ", IsBroken: " << info.IsBroken; o << ", IsCancellationRequested: " << info.CancelRequested; - o << ", Issue: " << info.Issue; + o << ", Issue: " << info.GetIssue(); o << ", SubscribersCount: " << info.Subscribers.size(); o << ", CreateSender: " << info.CreateSender.ToString(); diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp index 49e2b911029..5b7b54e8ee8 100644 --- a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp +++ b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp @@ -566,4 +566,83 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) { UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Unreachable"); } } + + Y_UNIT_TEST(TTxInit_Throws) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::BUILD_INDEX, NLog::PRI_TRACE); + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "vectors" + Columns { Name: "id" Type: "Uint64" } + Columns { Name: "embedding" Type: "String" } + KeyColumnNames: [ "id" ] + )"); + env.TestWaitNotification(runtime, txId); + + NYdb::NTable::TGlobalIndexSettings globalIndexSettings; + + std::unique_ptr<NYdb::NTable::TKMeansTreeSettings> kmeansTreeSettings; + { + Ydb::Table::KMeansTreeSettings proto; + UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(R"( + settings { + metric: DISTANCE_COSINE + vector_type: VECTOR_TYPE_FLOAT + vector_dimension: 1024 + } + levels: 5 + clusters: 4 + )", &proto)); + using T = NYdb::NTable::TKMeansTreeSettings; + kmeansTreeSettings = std::make_unique<T>(T::FromProto(proto)); + } + + const ui64 buildIndexTx = ++txId; + const TVector<TString> dataColumns; + const TVector<TString> indexColumns{"embedding"}; + TestBuildIndex(runtime, buildIndexTx, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/vectors", TBuildIndexConfig{ + "by_embedding", NKikimrSchemeOp::EIndexTypeGlobalVectorKmeansTree, indexColumns, dataColumns, + { globalIndexSettings, globalIndexSettings, globalIndexSettings }, std::move(kmeansTreeSettings) + }); + + env.TestWaitNotification(runtime, buildIndexTx); + + { + auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx); + UNIT_ASSERT_VALUES_EQUAL_C( + buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE, + buildIndexOperation.DebugString() + ); + } + + { + TString writeQuery = Sprintf(R"( + ( + (let key '( '('Id (Uint64 '%lu)) ) ) + (let value '('('CreationConfig (String 'aaaaaaaa)) ) ) + (return (AsList (UpdateRow 'IndexBuild key value) )) + ) + )", buildIndexTx); + NKikimrMiniKQL::TResult result; + TString err; + NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, TTestTxConfig::SchemeShard, writeQuery, result, err); + UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, err); + } + + RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor()); + + { + auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx); + UNIT_ASSERT_VALUES_EQUAL_C( + buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE, + buildIndexOperation.DebugString() + ); + UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Init IndexBuild unhandled exception"); + UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Condition violated: `creationConfig.ParseFromString"); + } + } } |