diff options
author | kungurtsev <kungasc@ydb.tech> | 2025-06-04 16:21:39 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-06-04 16:21:39 +0200 |
commit | 1b9117ef252f8bc4fc40c1e9cab503a38a86bc5d (patch) | |
tree | 5c5b56e04ebcf0c36e15b07b2be6cbfe16449e9a | |
parent | ee494fe01cde439ee03e4394ca4aa8499f7aaf78 (diff) | |
download | ydb-1b9117ef252f8bc4fc40c1e9cab503a38a86bc5d.tar.gz |
Handle unhandled exceptions during build index on SchemeShard (#19246)
9 files changed, 365 insertions, 134 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp index d520b6f2f66..760341aeed7 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp @@ -10,7 +10,7 @@ using namespace NTabletFlatExecutor; struct TSchemeShard::TIndexBuilder::TTxCancel: public TSchemeShard::TIndexBuilder::TTxSimple<TEvIndexBuilder::TEvCancelRequest, TEvIndexBuilder::TEvCancelResponse> { public: explicit TTxCancel(TSelf* self, TEvIndexBuilder::TEvCancelRequest::TPtr& ev) - : TTxSimple(self, ev, TXTYPE_CANCEL_INDEX_BUILD) + : TTxSimple(self, TIndexBuildId(ev->Get()->Record.GetIndexBuildId()), ev, TXTYPE_CANCEL_INDEX_BUILD) {} bool DoExecute(TTransactionContext& txc, const TActorContext&) override { @@ -27,40 +27,39 @@ public: } const TPathId domainPathId = database.GetPathIdForDomain(); - TIndexBuildId indexBuildId = TIndexBuildId(record.GetIndexBuildId()); - const auto* indexBuildInfoPtr = Self->IndexBuilds.FindPtr(indexBuildId); + const auto* indexBuildInfoPtr = Self->IndexBuilds.FindPtr(BuildId); if (!indexBuildInfoPtr) { return Reply( Ydb::StatusIds::NOT_FOUND, - TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found" + TStringBuilder() << "Index build process with id <" << BuildId << "> not found" ); } auto& indexBuildInfo = *indexBuildInfoPtr->Get(); if (indexBuildInfo.DomainPathId != domainPathId) { return Reply( Ydb::StatusIds::NOT_FOUND, - TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found in database <" << record.GetDatabaseName() << ">" + TStringBuilder() << "Index build process with id <" << BuildId << "> not found in database <" << record.GetDatabaseName() << ">" ); } if (indexBuildInfo.IsFinished()) { return Reply( Ydb::StatusIds::PRECONDITION_FAILED, - TStringBuilder() << "Index build process with id <" << indexBuildId << "> has been finished already" + TStringBuilder() << "Index build process with id <" << BuildId << "> has been finished already" ); } if (indexBuildInfo.IsCancellationRequested()) { return Reply( Ydb::StatusIds::PRECONDITION_FAILED, - TStringBuilder() << "Index build process with id <" << indexBuildId << "> canceling already" + TStringBuilder() << "Index build process with id <" << BuildId << "> canceling already" ); } if (indexBuildInfo.State > TIndexBuildInfo::EState::Filling) { return Reply( Ydb::StatusIds::PRECONDITION_FAILED, - TStringBuilder() << "Index build process with id <" << indexBuildId << "> are almost done, cancellation has no sense" + TStringBuilder() << "Index build process with id <" << BuildId << "> are almost done, cancellation has no sense" ); } diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp index 85a7d215678..7e7ad2af6e5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp @@ -14,7 +14,7 @@ using namespace NTabletFlatExecutor; class TSchemeShard::TIndexBuilder::TTxCreate: public TSchemeShard::TIndexBuilder::TTxSimple<TEvIndexBuilder::TEvCreateRequest, TEvIndexBuilder::TEvCreateResponse> { public: explicit TTxCreate(TSelf* self, TEvIndexBuilder::TEvCreateRequest::TPtr& ev) - : TTxSimple(self, ev, TXTYPE_CREATE_INDEX_BUILD) + : TTxSimple(self, TIndexBuildId(ev->Get()->Record.GetTxId()), ev, TXTYPE_CREATE_INDEX_BUILD) {} bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { @@ -24,10 +24,9 @@ public: Response = MakeHolder<TEvIndexBuilder::TEvCreateResponse>(request.GetTxId()); - const auto id = TIndexBuildId(request.GetTxId()); - if (Self->IndexBuilds.contains(id)) { + if (Self->IndexBuilds.contains(BuildId)) { return Reply(Ydb::StatusIds::ALREADY_EXISTS, TStringBuilder() - << "Index build with id '" << id << "' already exists"); + << "Index build with id '" << BuildId << "' already exists"); } const TString& uid = GetUid(request.GetOperationParams()); @@ -83,7 +82,7 @@ public: } } - TIndexBuildInfo::TPtr buildInfo = new TIndexBuildInfo(id, uid); + TIndexBuildInfo::TPtr buildInfo = new TIndexBuildInfo(BuildId, uid); buildInfo->DomainPathId = domainPath.Base()->PathId; buildInfo->TablePathId = tablePath.Base()->PathId; @@ -198,14 +197,14 @@ public: Self->PersistBuildIndexState(db, *buildInfo); - auto [it, emplaced] = Self->IndexBuilds.emplace(id, buildInfo); + auto [it, emplaced] = Self->IndexBuilds.emplace(BuildId, buildInfo); Y_ASSERT(emplaced); if (uid) { std::tie(std::ignore, emplaced) = Self->IndexBuildsByUid.emplace(uid, buildInfo); Y_ASSERT(emplaced); } - Progress(id); + Progress(BuildId); return true; } diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp index f681f79dbd8..ff1a8492729 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp @@ -10,7 +10,7 @@ using namespace NTabletFlatExecutor; struct TSchemeShard::TIndexBuilder::TTxForget: public TSchemeShard::TIndexBuilder::TTxSimple<TEvIndexBuilder::TEvForgetRequest, TEvIndexBuilder::TEvForgetResponse> { public: explicit TTxForget(TSelf* self, TEvIndexBuilder::TEvForgetRequest::TPtr& ev) - : TTxSimple(self, ev, TXTYPE_FORGET_INDEX_BUILD) + : TTxSimple(self, TIndexBuildId(ev->Get()->Record.GetIndexBuildId()), ev, TXTYPE_FORGET_INDEX_BUILD) {} bool DoExecute(TTransactionContext& txc, const TActorContext&) override { @@ -27,26 +27,25 @@ public: } const TPathId domainPathId = database.GetPathIdForDomain(); - TIndexBuildId indexBuildId = TIndexBuildId(record.GetIndexBuildId()); - const auto* indexBuildInfoPtr = Self->IndexBuilds.FindPtr(indexBuildId); + const auto* indexBuildInfoPtr = Self->IndexBuilds.FindPtr(BuildId); if (!indexBuildInfoPtr) { return Reply( Ydb::StatusIds::NOT_FOUND, - TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found" + TStringBuilder() << "Index build process with id <" << BuildId << "> not found" ); } const auto& indexBuildInfo = *indexBuildInfoPtr->Get(); if (indexBuildInfo.DomainPathId != domainPathId) { return Reply( Ydb::StatusIds::NOT_FOUND, - TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found in database <" << record.GetDatabaseName() << ">" + TStringBuilder() << "Index build process with id <" << BuildId << "> not found in database <" << record.GetDatabaseName() << ">" ); } if (!indexBuildInfo.IsFinished()) { return Reply( Ydb::StatusIds::PRECONDITION_FAILED, - TStringBuilder() << "Index build process with id <" << indexBuildId << "> hasn't been finished yet" + TStringBuilder() << "Index build process with id <" << BuildId << "> hasn't been finished yet" ); } diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__get.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__get.cpp index 72af67a620b..16f4007b2bb 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__get.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__get.cpp @@ -11,7 +11,7 @@ using namespace NTabletFlatExecutor; struct TSchemeShard::TIndexBuilder::TTxGet: public TSchemeShard::TIndexBuilder::TTxSimple<TEvIndexBuilder::TEvGetRequest, TEvIndexBuilder::TEvGetResponse> { public: explicit TTxGet(TSelf* self, TEvIndexBuilder::TEvGetRequest::TPtr& ev) - : TTxSimple(self, ev, TXTYPE_GET_INDEX_BUILD, false) + : TTxSimple(self, TIndexBuildId(ev->Get()->Record.GetIndexBuildId()), ev, TXTYPE_GET_INDEX_BUILD, false) {} bool DoExecute(TTransactionContext&, const TActorContext&) override { @@ -28,20 +28,18 @@ public: } const TPathId domainPathId = database.GetPathIdForDomain(); - TIndexBuildId indexBuildId = TIndexBuildId(record.GetIndexBuildId()); - - const auto* indexBuildInfoPtr = Self->IndexBuilds.FindPtr(indexBuildId); + const auto* indexBuildInfoPtr = Self->IndexBuilds.FindPtr(BuildId); if (!indexBuildInfoPtr) { return Reply( Ydb::StatusIds::PRECONDITION_FAILED, - TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found" + TStringBuilder() << "Index build process with id <" << BuildId << "> not found" ); } const auto& indexBuildInfo = *indexBuildInfoPtr->Get(); if (indexBuildInfo.DomainPathId != domainPathId) { return Reply( Ydb::StatusIds::BAD_REQUEST, - TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found in database <" << record.GetDatabaseName() << ">" + TStringBuilder() << "Index build process with id <" << BuildId << "> not found in database <" << record.GetDatabaseName() << ">" ); } diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp index 9b1402bcf9a..adef1160159 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp @@ -15,7 +15,7 @@ private: static constexpr ui64 DefaultPage = 1; public: explicit TTxList(TSelf* self, TEvIndexBuilder::TEvListRequest::TPtr& ev) - : TTxSimple(self, ev, TXTYPE_LIST_INDEX_BUILD, false) + : TTxSimple(self, InvalidIndexBuildId, ev, TXTYPE_LIST_INDEX_BUILD, false) {} bool DoExecute(TTransactionContext&, const TActorContext&) override { diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp index dddcbce4aa2..85f453f6faf 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp @@ -51,7 +51,7 @@ protected: const NKikimrIndexBuilder::TIndexBuildScanSettings ScanSettings; const TActorId ResponseActorId; - const ui64 BuildIndexId = 0; + const TIndexBuildId BuildId; TIndexBuildInfo::TSample::TRows Init; std::shared_ptr<NTxProxy::TUploadTypes> Types; @@ -70,7 +70,7 @@ public: bool isPostingLevel, const NKikimrIndexBuilder::TIndexBuildScanSettings& scanSettings, const TActorId& responseActorId, - ui64 buildIndexId, + TIndexBuildId buildId, TIndexBuildInfo::TSample::TRows init, NTableIndex::TClusterId parent, NTableIndex::TClusterId child) @@ -78,13 +78,13 @@ public: , IsPostingLevel(isPostingLevel) , ScanSettings(scanSettings) , ResponseActorId(responseActorId) - , BuildIndexId(buildIndexId) + , BuildId(buildId) , Init(std::move(init)) , Parent(parent) , Child(child) { LogPrefix = TStringBuilder() - << "TUploadSampleK: BuildIndexId: " << BuildIndexId + << "TUploadSampleK: BuildIndexId: " << BuildId << " ResponseActorId: " << ResponseActorId; Y_ENSURE(!Init.empty()); Y_ENSURE(Parent < Child); @@ -186,7 +186,7 @@ private: } TAutoPtr<TEvIndexBuilder::TEvUploadSampleKResponse> response = new TEvIndexBuilder::TEvUploadSampleKResponse; - response->Record.SetId(BuildIndexId); + response->Record.SetId(ui64(BuildId)); response->Record.SetUploadRows(Rows->size()); response->Record.SetUploadBytes(RowsBytes); @@ -259,7 +259,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateIndexPropose( modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnBuild); buildInfo.SerializeToProto(ss, modifyScheme.MutableInitiateColumnBuild()); } else { - Y_ABORT("Unknown operation kind while building CreateIndexPropose"); + Y_ENSURE(false, "Unknown operation kind while building CreateIndexPropose"); } LOG_DEBUG_S((TlsActivationContext->AsActorContext()), NKikimrServices::BUILD_INDEX, @@ -407,7 +407,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> AlterMainTablePropose( TString error; if (!ExtractColumnTypeInfo(typeInfo, typeMod, colInfo.DefaultFromLiteral.type(), status, error)) { // todo gvit fix that - Y_ABORT("failed to extract column type info"); + Y_ENSURE(false, "failed to extract column type info"); } col->SetType(NScheme::TypeName(typeInfo, typeMod)); @@ -510,8 +510,6 @@ using namespace NTabletFlatExecutor; struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuilder::TTxBase { private: - TIndexBuildId BuildId; - TMap<TTabletId, THolder<IEventBase>> ToTabletSend; template <bool WithSnapshot = true, typename Record> @@ -590,7 +588,7 @@ private: ev->Record.SetParent(buildInfo.KMeans.Parent); ev->Record.SetChild(buildInfo.KMeans.Child); - Y_VERIFY_DEBUG(buildInfo.Sample.Rows.size() <= buildInfo.KMeans.K); + Y_ENSURE(buildInfo.Sample.Rows.size() <= buildInfo.KMeans.K); auto& clusters = *ev->Record.MutableClusters(); clusters.Reserve(buildInfo.Sample.Rows.size()); for (const auto& [_, row] : buildInfo.Sample.Rows) { @@ -760,7 +758,7 @@ private: auto path = GetBuildPath(Self, buildInfo, NTableIndex::NTableVectorKmeansTreeIndex::LevelTable); Y_ENSURE(buildInfo.Sample.Rows.size() <= buildInfo.KMeans.K); auto actor = new TUploadSampleK(path.PathString(), !buildInfo.KMeans.NeedsAnotherLevel(), - buildInfo.ScanSettings, Self->SelfId(), ui64(BuildId), + buildInfo.ScanSettings, Self->SelfId(), BuildId, buildInfo.Sample.Rows, buildInfo.KMeans.Parent, buildInfo.KMeans.Child); TActivationContext::AsActorContext().MakeFor(Self->SelfId()).Register(actor); @@ -802,7 +800,7 @@ private: break; case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR: case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST: - Y_ABORT("Unreachable"); + Y_ENSURE(false, "Unreachable"); } } @@ -1113,9 +1111,8 @@ private: } public: - explicit TTxProgress(TSelf* self, TIndexBuildId id) - : TTxBase(self, TXTYPE_PROGRESS_INDEX_BUILD) - , BuildId(id) + explicit TTxProgress(TSelf* self, TIndexBuildId buildId) + : TTxBase(self, buildId, TXTYPE_PROGRESS_INDEX_BUILD) {} bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { @@ -1128,7 +1125,7 @@ public: switch (buildInfo.State) { case TIndexBuildInfo::EState::Invalid: - Y_ABORT("Unreachable"); + Y_ENSURE(false, "Unreachable"); case TIndexBuildInfo::EState::AlterMainTable: if (buildInfo.AlterMainTableTxId == InvalidTxId) { @@ -1350,6 +1347,32 @@ public: return true; } + void OnUnhandledException(TTransactionContext& txc, const TActorContext& ctx, TIndexBuildInfo* buildInfo, const std::exception& exc) override { + if (!buildInfo) { + LOG_N("TTxBuildProgress: OnUnhandledException: BuildIndexId not found " + << (BuildId == InvalidIndexBuildId ? TString("") : TStringBuilder() << ", id# " << BuildId)); + return; + } + + NIceDb::TNiceDb db(txc.DB); + if (!buildInfo->Issue.Contains(exc.what())) { + buildInfo->Issue += TStringBuilder() << "Unhandled exception " << exc.what(); + } + Self->PersistBuildIndexIssue(db, *buildInfo); + + if (buildInfo->State != TIndexBuildInfo::EState::Filling) { + // no idea how to gracefully stop index build otherwise + // leave everything as is + LOG_E("TTxBuildProgress: OnUnhandledException: not a Filling state, id# " << buildInfo->Id + << ", TIndexBuildInfo: " << *buildInfo); + return; + } + + buildInfo->State = TIndexBuildInfo::EState::Rejection_Applying; + Self->PersistBuildIndexState(db, *buildInfo); + Self->Execute(Self->CreateTxProgress(buildInfo->Id), ctx); + } + static TSerializedTableRange InfiniteRange(ui32 columns) { TVector<TCell> vec(columns, TCell()); TArrayRef<TCell> cells(vec); @@ -1424,20 +1447,18 @@ ITransaction* TSchemeShard::CreateTxProgress(TIndexBuildId id) { struct TSchemeShard::TIndexBuilder::TTxBilling: public TSchemeShard::TIndexBuilder::TTxBase { private: - TIndexBuildId BuildIndexId; TInstant ScheduledAt; public: explicit TTxBilling(TSelf* self, TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev) - : TTxBase(self, TXTYPE_PROGRESS_INDEX_BUILD) - , BuildIndexId(ev->Get()->BuildId) + : TTxBase(self, TIndexBuildId(ev->Get()->BuildId), TXTYPE_PROGRESS_INDEX_BUILD) , ScheduledAt(ev->Get()->SendAt) {} bool DoExecute(TTransactionContext& , const TActorContext& ctx) override { - LOG_I("TTxReply : TTxBilling, id# " << BuildIndexId); + LOG_I("TTxReply : TTxBilling, id# " << BuildId); - const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildIndexId); + const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId); if (!buildInfoPtr) { return true; } @@ -1456,27 +1477,56 @@ public: void DoComplete(const TActorContext&) override { } + + void OnUnhandledException(TTransactionContext&, const TActorContext&, TIndexBuildInfo*, const std::exception&) override { + // seems safe to ignore + } }; struct TSchemeShard::TIndexBuilder::TTxReply: public TSchemeShard::TIndexBuilder::TTxBase { public: - explicit TTxReply(TSelf* self) - : TTxBase(self, TXTYPE_PROGRESS_INDEX_BUILD) + explicit TTxReply(TSelf* self, TIndexBuildId buildId) + : TTxBase(self, buildId, TXTYPE_PROGRESS_INDEX_BUILD) {} + void OnUnhandledException(TTransactionContext& txc, const TActorContext& ctx, TIndexBuildInfo* buildInfo, const std::exception& exc) override { + if (!buildInfo) { + LOG_E("TTxReply : OnUnhandledException BuildIndexId not found" + << (BuildId == InvalidIndexBuildId ? TString("") : TStringBuilder() << ", id# " << BuildId)); + return; + } + + NIceDb::TNiceDb db(txc.DB); + if (!buildInfo->Issue.Contains(exc.what())) { + buildInfo->Issue += TStringBuilder() << "Unhandled exception " << exc.what(); + } + Self->PersistBuildIndexIssue(db, *buildInfo); + + if (buildInfo->State != TIndexBuildInfo::EState::Filling) { + // most replies are used at Filling stage + // no idea how to gracefully stop index build otherwise + // leave everything as is + LOG_E("TTxReply : OnUnhandledException not a Filling state, id# " << buildInfo->Id + << ", TIndexBuildInfo: " << *buildInfo); + return; + } + + buildInfo->State = TIndexBuildInfo::EState::Rejection_Applying; + Self->PersistBuildIndexState(db, *buildInfo); + Self->Execute(Self->CreateTxProgress(buildInfo->Id), ctx); + } + void DoComplete(const TActorContext&) override { } }; struct TSchemeShard::TIndexBuilder::TTxReplyRetry: public TSchemeShard::TIndexBuilder::TTxReply { private: - TIndexBuildId BuildId; TTabletId ShardId; public: explicit TTxReplyRetry(TSelf* self, TIndexBuildId buildId, TTabletId shardId) - : TTxReply(self) - , BuildId(buildId) + : TTxReply(self, buildId) , ShardId(shardId) {} @@ -1526,22 +1576,21 @@ private: public: explicit TTxReplySampleK(TSelf* self, TEvDataShard::TEvSampleKResponse::TPtr& sampleK) - : TTxReply(self) + : TTxReply(self, TIndexBuildId(sampleK->Get()->Record.GetId())) , SampleK(sampleK) { } bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { auto& record = SampleK->Get()->Record; - const auto buildId = TIndexBuildId(record.GetId()); TTabletId shardId = TTabletId(record.GetTabletId()); TShardIdx shardIdx = Self->GetShardIdx(shardId); - LOG_N("TTxReply : TEvSampleKResponse, id# " << buildId + LOG_N("TTxReply : TEvSampleKResponse, id# " << BuildId << ", shardId# " << shardId << ", shardIdx# " << shardIdx); - const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId); + const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId); if (!buildInfoPtr) { return true; } @@ -1558,7 +1607,7 @@ public: } if (buildInfo.State != TIndexBuildInfo::EState::Filling) { - LOG_I("TTxReply : TEvSampleKResponse superfluous event, id# " << buildId); + LOG_I("TTxReply : TEvSampleKResponse superfluous event, id# " << BuildId); return true; } @@ -1633,16 +1682,16 @@ public: << ", shardIdx: " << shardIdx; Self->PersistBuildIndexIssue(db, buildInfo); ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying); - Progress(buildId); + Progress(BuildId); return true; case NKikimrIndexBuilder::EBuildStatus::INVALID: case NKikimrIndexBuilder::EBuildStatus::ACCEPTED: case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS: - Y_ABORT("Unreachable"); + Y_ENSURE(false, "Unreachable"); } - Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus); - Self->IndexBuildPipes.Close(buildId, shardId, ctx); - Progress(buildId); + Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus); + Self->IndexBuildPipes.Close(BuildId, shardId, ctx); + Progress(BuildId); return true; } @@ -1654,22 +1703,21 @@ private: public: explicit TTxReplyLocalKMeans(TSelf* self, TEvDataShard::TEvLocalKMeansResponse::TPtr& local) - : TTxReply(self) + : TTxReply(self, TIndexBuildId(local->Get()->Record.GetId())) , Local(local) { } bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { const auto& record = Local->Get()->Record; - const auto buildId = TIndexBuildId(record.GetId()); TTabletId shardId = TTabletId(record.GetTabletId()); TShardIdx shardIdx = Self->GetShardIdx(shardId); - LOG_N("TTxReply : TEvLocalKMeansResponse, id# " << record.GetId() + LOG_N("TTxReply : TEvLocalKMeansResponse, id# " << BuildId << ", shardId# " << shardId << ", shardIdx# " << shardIdx); - const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId); + const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId); if (!buildInfoPtr) { return true; } @@ -1686,7 +1734,7 @@ public: } if (buildInfo.State != TIndexBuildInfo::EState::Filling) { - LOG_I("TTxReply : TEvLocalKMeansResponse superfluous event, id# " << buildId); + LOG_I("TTxReply : TEvLocalKMeansResponse superfluous event, id# " << BuildId); return true; } @@ -1735,16 +1783,16 @@ public: << ", shardIdx: " << shardIdx; Self->PersistBuildIndexIssue(db, buildInfo); ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying); - Progress(buildId); + Progress(BuildId); return true; case NKikimrIndexBuilder::EBuildStatus::INVALID: case NKikimrIndexBuilder::EBuildStatus::ACCEPTED: case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS: - Y_ABORT("Unreachable"); + Y_ENSURE(false, "Unreachable"); } - Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus); - Self->IndexBuildPipes.Close(buildId, shardId, ctx); - Progress(buildId); + Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus); + Self->IndexBuildPipes.Close(BuildId, shardId, ctx); + Progress(BuildId); return true; } @@ -1756,22 +1804,21 @@ private: public: explicit TTxReplyReshuffleKMeans(TSelf* self, TEvDataShard::TEvReshuffleKMeansResponse::TPtr& reshuffle) - : TTxReply(self) + : TTxReply(self, TIndexBuildId(reshuffle->Get()->Record.GetId())) , Reshuffle(reshuffle) { } bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { const auto& record = Reshuffle->Get()->Record; - const auto buildId = TIndexBuildId(record.GetId()); TTabletId shardId = TTabletId(record.GetTabletId()); TShardIdx shardIdx = Self->GetShardIdx(shardId); - LOG_N("TTxReply : TEvReshuffleKMeansResponse, id# " << record.GetId() + LOG_N("TTxReply : TEvReshuffleKMeansResponse, id# " << BuildId << ", shardId# " << shardId << ", shardIdx# " << shardIdx); - const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId); + const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId); if (!buildInfoPtr) { return true; } @@ -1788,7 +1835,7 @@ public: } if (buildInfo.State != TIndexBuildInfo::EState::Filling) { - LOG_I("TTxReply : TEvReshuffleKMeansResponse superfluous event, id# " << buildId); + LOG_I("TTxReply : TEvReshuffleKMeansResponse superfluous event, id# " << BuildId); return true; } @@ -1837,16 +1884,16 @@ public: << ", shardIdx: " << shardIdx; Self->PersistBuildIndexIssue(db, buildInfo); ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying); - Progress(buildId); + Progress(BuildId); return true; case NKikimrIndexBuilder::EBuildStatus::INVALID: case NKikimrIndexBuilder::EBuildStatus::ACCEPTED: case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS: - Y_ABORT("Unreachable"); + Y_ENSURE(false, "Unreachable"); } - Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus); - Self->IndexBuildPipes.Close(buildId, shardId, ctx); - Progress(buildId); + Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus); + Self->IndexBuildPipes.Close(BuildId, shardId, ctx); + Progress(BuildId); return true; } @@ -1858,22 +1905,21 @@ private: public: explicit TTxReplyPrefixKMeans(TSelf* self, TEvDataShard::TEvPrefixKMeansResponse::TPtr& prefix) - : TTxReply(self) + : TTxReply(self, TIndexBuildId(prefix->Get()->Record.GetId())) , Prefix(prefix) { } bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { const auto& record = Prefix->Get()->Record; - const auto buildId = TIndexBuildId(record.GetId()); TTabletId shardId = TTabletId(record.GetTabletId()); TShardIdx shardIdx = Self->GetShardIdx(shardId); - LOG_N("TTxReply : TEvPrefixKMeansResponse, id# " << record.GetId() + LOG_N("TTxReply : TEvPrefixKMeansResponse, id# " << BuildId << ", shardId# " << shardId << ", shardIdx# " << shardIdx); - const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId); + const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId); if (!buildInfoPtr) { return true; } @@ -1890,7 +1936,7 @@ public: } if (buildInfo.State != TIndexBuildInfo::EState::Filling) { - LOG_I("TTxReply : TEvPrefixKMeansResponse superfluous event, id# " << buildId); + LOG_I("TTxReply : TEvPrefixKMeansResponse superfluous event, id# " << BuildId); return true; } @@ -1939,16 +1985,16 @@ public: << ", shardIdx: " << shardIdx; Self->PersistBuildIndexIssue(db, buildInfo); ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying); - Progress(buildId); + Progress(BuildId); return true; case NKikimrIndexBuilder::EBuildStatus::INVALID: case NKikimrIndexBuilder::EBuildStatus::ACCEPTED: case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS: - Y_ABORT("Unreachable"); + Y_ENSURE(false, "Unreachable"); } - Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus); - Self->IndexBuildPipes.Close(buildId, shardId, ctx); - Progress(buildId); + Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus); + Self->IndexBuildPipes.Close(BuildId, shardId, ctx); + Progress(BuildId); return true; } @@ -1959,19 +2005,18 @@ private: TEvIndexBuilder::TEvUploadSampleKResponse::TPtr UploadSample; public: - explicit TTxReplyUploadSample(TSelf* self, TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& upload) - : TTxReply(self) - , UploadSample(upload) + explicit TTxReplyUploadSample(TSelf* self, TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& uploadSample) + : TTxReply(self, TIndexBuildId(uploadSample->Get()->Record.GetId())) + , UploadSample(uploadSample) { } bool DoExecute([[maybe_unused]] TTransactionContext& txc, [[maybe_unused]] const TActorContext& ctx) override { const auto& record = UploadSample->Get()->Record; - const auto buildId = TIndexBuildId(record.GetId()); - LOG_N("TTxReply : TEvUploadSampleKResponse, id# " << record.GetId()); + LOG_N("TTxReply : TEvUploadSampleKResponse, id# " << BuildId); - const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId); + const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId); if (!buildInfoPtr) { return true; } @@ -1983,7 +2028,7 @@ public: Y_ENSURE(buildInfo.IsBuildVectorIndex()); if (buildInfo.State != TIndexBuildInfo::EState::Filling) { - LOG_I("TTxReply : TEvUploadSampleKResponse superfluous event, id# " << buildId); + LOG_I("TTxReply : TEvUploadSampleKResponse superfluous event, id# " << BuildId); return true; } Y_ENSURE(buildInfo.Sample.State == TIndexBuildInfo::TSample::EState::Upload); @@ -2002,14 +2047,14 @@ public: auto status = record.GetUploadStatus(); if (status == Ydb::StatusIds::SUCCESS) { buildInfo.Sample.State = TIndexBuildInfo::TSample::EState::Done; - Progress(buildId); + Progress(BuildId); } else { NYql::TIssues issues; NYql::IssuesFromMessage(record.GetIssues(), issues); buildInfo.Issue += issues.ToString(); Self->PersistBuildIndexIssue(db, buildInfo); ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying); - Progress(buildId); + Progress(BuildId); } return true; @@ -2021,22 +2066,21 @@ private: TEvDataShard::TEvBuildIndexProgressResponse::TPtr ShardProgress; public: explicit TTxReplyProgress(TSelf* self, TEvDataShard::TEvBuildIndexProgressResponse::TPtr& shardProgress) - : TTxReply(self) + : TTxReply(self, TIndexBuildId(shardProgress->Get()->Record.GetId())) , ShardProgress(shardProgress) {} bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { const auto& record = ShardProgress->Get()->Record; - const auto buildId = TIndexBuildId(record.GetId()); TTabletId shardId = TTabletId(record.GetTabletId()); TShardIdx shardIdx = Self->GetShardIdx(shardId); - LOG_I("TTxReply : TEvBuildIndexProgressResponse, id# " << record.GetId() + LOG_I("TTxReply : TEvBuildIndexProgressResponse, id# " << BuildId << ", shardId# " << shardId << ", shardIdx# " << shardIdx); - const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId); + const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId); if (!buildInfoPtr) { return true; } @@ -2052,7 +2096,7 @@ public: } if (buildInfo.State != TIndexBuildInfo::EState::Filling) { - LOG_I("TTxReply : TEvBuildIndexProgressResponse superfluous event, id# " << buildId); + LOG_I("TTxReply : TEvBuildIndexProgressResponse superfluous event, id# " << BuildId); return true; } @@ -2111,11 +2155,11 @@ public: switch (shardStatus.Status) { case NKikimrIndexBuilder::EBuildStatus::INVALID: - Y_ABORT("Unreachable"); + Y_ENSURE(false, "Unreachable"); case NKikimrIndexBuilder::EBuildStatus::ACCEPTED: case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS: // no oop, wait resolution. Progress key are persisted - Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus); + Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus); return true; case NKikimrIndexBuilder::EBuildStatus::DONE: if (buildInfo.InProgressShards.erase(shardIdx)) { @@ -2137,12 +2181,12 @@ public: << ", shardIdx: " << shardIdx; Self->PersistBuildIndexIssue(db, buildInfo); ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying); - Progress(buildId); + Progress(BuildId); return true; } - Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus); - Self->IndexBuildPipes.Close(buildId, shardId, ctx); - Progress(buildId); + Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus); + Self->IndexBuildPipes.Close(BuildId, shardId, ctx); + Progress(BuildId); return true; } @@ -2153,7 +2197,7 @@ private: TTxId CompletedTxId; public: explicit TTxReplyCompleted(TSelf* self, TTxId completedTxId) - : TTxReply(self) + : TTxReply(self, InvalidIndexBuildId) , CompletedTxId(completedTxId) {} @@ -2168,9 +2212,9 @@ public: return true; } - const auto buildId = *buildIdPtr; - auto& buildInfo = *Self->IndexBuilds.at(buildId); - LOG_I("TTxReply : TEvNotifyTxCompletionResult, id# " << buildId + BuildId = *buildIdPtr; + auto& buildInfo = *Self->IndexBuilds.at(BuildId); + LOG_I("TTxReply : TEvNotifyTxCompletionResult, id# " << BuildId << ", txId# " << txId); LOG_D("TTxReply : TEvNotifyTxCompletionResult" << ", TIndexBuildInfo: " << buildInfo @@ -2235,7 +2279,7 @@ public: Y_ENSURE(false, "Unreachable " << state); } - Progress(buildId); + Progress(BuildId); return true; } @@ -2246,7 +2290,7 @@ private: TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult; public: explicit TTxReplyModify(TSelf* self, TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& modifyResult) - : TTxReply(self) + : TTxReply(self, InvalidIndexBuildId) , ModifyResult(modifyResult) {} @@ -2287,11 +2331,11 @@ public: return true; } - const auto buildId = *buildIdPtr; + BuildId = *buildIdPtr; // We need this because we use buildInfo after EraseBuildInfo - auto buildInfoPin = Self->IndexBuilds.at(buildId); + auto buildInfoPin = Self->IndexBuilds.at(BuildId); auto& buildInfo = *buildInfoPin; - LOG_I("TTxReply : TEvModifySchemeTransactionResult, id# " << buildId + LOG_I("TTxReply : TEvModifySchemeTransactionResult, id# " << BuildId << ", cookie: " << ModifyResult->Cookie << ", record: " << record.ShortDebugString() << ", status: " << NKikimrScheme::EStatus_Name(record.GetStatus())); @@ -2324,7 +2368,7 @@ public: if (record.GetStatus() == NKikimrScheme::StatusAccepted) { // no op } else if (record.GetStatus() == NKikimrScheme::StatusAlreadyExists) { - Y_ABORT("NEED MORE TESTING"); + Y_ENSURE(false, "NEED MORE TESTING"); // no op } else { buildInfo.Issue += TStringBuilder() @@ -2430,7 +2474,7 @@ public: Y_ENSURE(false, "Unreachable " << state); } - Progress(buildId); + Progress(BuildId); return true; } @@ -2441,15 +2485,14 @@ private: TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult; public: explicit TTxReplyAllocate(TSelf* self, TEvTxAllocatorClient::TEvAllocateResult::TPtr& allocateResult) - : TTxReply(self) + : TTxReply(self, TIndexBuildId(allocateResult->Cookie)) , AllocateResult(allocateResult) {} bool DoExecute(TTransactionContext& txc,[[maybe_unused]] const TActorContext& ctx) override { - TIndexBuildId buildId = TIndexBuildId(AllocateResult->Cookie); const auto txId = TTxId(AllocateResult->Get()->TxIds.front()); - const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId); + const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId); if (!buildInfoPtr) { LOG_I("TTxReply : TEvAllocateResult superfluous message" << ", cookie: " << AllocateResult->Cookie @@ -2459,7 +2502,7 @@ public: } auto& buildInfo = *buildInfoPtr->Get(); - LOG_I("TTxReply : TEvAllocateResult, id# " << buildId + LOG_I("TTxReply : TEvAllocateResult, id# " << BuildId << ", txId# " << txId); LOG_D("TTxReply : TEvAllocateResult" << ", TIndexBuildInfo: " << buildInfo @@ -2515,7 +2558,7 @@ public: Y_ENSURE(false, "Unreachable " << state); } - Progress(buildId); + Progress(BuildId); return true; } diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp index eb84bb449a2..647c7b8e44e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp @@ -428,7 +428,18 @@ bool TSchemeShard::TIndexBuilder::TTxBase::GotScheduledBilling(TIndexBuildInfo& } bool TSchemeShard::TIndexBuilder::TTxBase::Execute(TTransactionContext& txc, const TActorContext& ctx) { - if (!DoExecute(txc, ctx)) { + bool executeResult; + + try { + executeResult = DoExecute(txc, ctx); + } catch (const std::exception& exc) { + if (OnUnhandledExceptionSafe(txc, ctx, exc)) { + return true; + } + throw; // fail process, a really bad thing has happened + } + + if (!executeResult) { return false; } @@ -436,6 +447,30 @@ bool TSchemeShard::TIndexBuilder::TTxBase::Execute(TTransactionContext& txc, con return true; } +bool TSchemeShard::TIndexBuilder::TTxBase::OnUnhandledExceptionSafe(TTransactionContext& txc, const TActorContext& ctx, const std::exception& originalExc) { + try { + const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId); + TIndexBuildInfo* buildInfo = buildInfoPtr + ? buildInfoPtr->Get() + : nullptr; + + LOG_E("Unhandled exception, id#" + << (BuildId == InvalidIndexBuildId ? TString("<no id>") : TStringBuilder() << BuildId) + << " " << TypeName(originalExc) << ": " << originalExc.what() << Endl + << TBackTrace::FromCurrentException().PrintToString() + << ", TIndexBuildInfo: " << (buildInfo ? TStringBuilder() << (*buildInfo) : TString("<no build info>"))); + + OnUnhandledException(txc, ctx, buildInfo, originalExc); + + return true; + } catch (const std::exception& handleExc) { + LOG_E("OnUnhandledException throws unhandled exception " + << TypeName(handleExc) << ": " << handleExc.what() << Endl + << TBackTrace::FromCurrentException().PrintToString()); + return false; + } +} + void TSchemeShard::TIndexBuilder::TTxBase::Complete(const TActorContext& ctx) { DoComplete(ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h index 02f96b34aef..52d63138167 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h +++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h @@ -11,6 +11,9 @@ namespace NSchemeShard { class TSchemeShard::TIndexBuilder::TTxBase: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> { private: TSideEffects SideEffects; +protected: + TIndexBuildId BuildId; +private: const NKikimr::NSchemeShard::ETxTypes TxType; public: const TString LogPrefix; @@ -29,6 +32,7 @@ private: ui64 RequestUnits(const TBillingStats& stats); void RoundPeriod(TInstant& start, TInstant& end); void ApplyBill(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx); + bool OnUnhandledExceptionSafe(TTransactionContext& txc, const TActorContext& ctx, const std::exception& exc); protected: void Send(TActorId dst, THolder<IEventBase> message, ui32 flags = 0, ui64 cookie = 0); @@ -48,8 +52,9 @@ protected: bool GotScheduledBilling(TIndexBuildInfo& indexBuildInfo); public: - explicit TTxBase(TSelf* self, NKikimr::NSchemeShard::ETxTypes txType) + explicit TTxBase(TSelf* self, TIndexBuildId buildId, NKikimr::NSchemeShard::ETxTypes txType) : TBase(self) + , BuildId(buildId) , TxType(txType) , LogPrefix(TStringBuilder() << "TIndexBuilder::" << NKikimr::NSchemeShard::ETxTypes_Name(txType) << ": ") { } @@ -60,6 +65,7 @@ public: virtual bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) = 0; virtual void DoComplete(const TActorContext& ctx) = 0; + virtual void OnUnhandledException(TTransactionContext& txc, const TActorContext& ctx, TIndexBuildInfo* buildInfo, const std::exception& exc) = 0; bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; void Complete(const TActorContext& ctx) override; @@ -72,12 +78,17 @@ public: THolder<TResponse> Response; const bool IsMutableOperation; - explicit TTxSimple(TSelf* self, typename TRequest::TPtr& ev, NKikimr::NSchemeShard::ETxTypes txType, bool isMutableOperation = true) - : TTxBase(self, txType) + explicit TTxSimple(TSelf* self, TIndexBuildId buildId, typename TRequest::TPtr& ev, NKikimr::NSchemeShard::ETxTypes txType, bool isMutableOperation = true) + : TTxBase(self, buildId, txType) , Request(ev) , IsMutableOperation(isMutableOperation) { } + void OnUnhandledException(TTransactionContext&, const TActorContext&, TIndexBuildInfo*, const std::exception& exc) override { + Reply(Ydb::StatusIds::INTERNAL_ERROR, TStringBuilder() + << "Unhandled exception " << exc.what()); + } + bool Reply(const Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS, const TString& errorMessage = TString()) { Y_ABORT_UNLESS(Response); diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp index 8bae31a12a1..49e2b911029 100644 --- a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp +++ b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp @@ -419,4 +419,151 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) { }); } } + + Y_UNIT_TEST(TTxReply_DoExecute_Throws) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::BUILD_INDEX, NLog::PRI_TRACE); + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "vectors" + Columns { Name: "id" Type: "Uint64" } + Columns { Name: "embedding" Type: "String" } + KeyColumnNames: [ "id" ] + )"); + env.TestWaitNotification(runtime, txId); + + NYdb::NTable::TGlobalIndexSettings globalIndexSettings; + + std::unique_ptr<NYdb::NTable::TKMeansTreeSettings> kmeansTreeSettings; + { + Ydb::Table::KMeansTreeSettings proto; + UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(R"( + settings { + metric: DISTANCE_COSINE + vector_type: VECTOR_TYPE_FLOAT + vector_dimension: 1024 + } + levels: 5 + clusters: 4 + )", &proto)); + using T = NYdb::NTable::TKMeansTreeSettings; + kmeansTreeSettings = std::make_unique<T>(T::FromProto(proto)); + } + + TBlockEvents<TEvDataShard::TEvLocalKMeansResponse> blocked(runtime, [&](auto& ev) { + ev->Get()->Record.SetRequestSeqNoRound(999); + return true; + }); + + const ui64 buildIndexTx = ++txId; + const TVector<TString> dataColumns; + const TVector<TString> indexColumns{"embedding"}; + AsyncBuildVectorIndex(runtime, buildIndexTx, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/vectors", "index1", "embedding"); + + runtime.WaitFor("block", [&]{ return blocked.size(); }); + blocked.Stop().Unblock(); + + env.TestWaitNotification(runtime, buildIndexTx); + + { + auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx); + UNIT_ASSERT_VALUES_EQUAL_C( + buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_REJECTED, + buildIndexOperation.DebugString() + ); + UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Condition violated: `actualSeqNo > recordSeqNo"); + } + + RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor()); + + { + auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx); + UNIT_ASSERT_VALUES_EQUAL_C( + buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_REJECTED, + buildIndexOperation.DebugString() + ); + UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Unhandled exception"); + UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Condition violated: `actualSeqNo > recordSeqNo"); + } + } + + Y_UNIT_TEST(TTxProgress_Throws) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::BUILD_INDEX, NLog::PRI_TRACE); + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "vectors" + Columns { Name: "id" Type: "Uint64" } + Columns { Name: "embedding" Type: "String" } + KeyColumnNames: [ "id" ] + )"); + env.TestWaitNotification(runtime, txId); + + NYdb::NTable::TGlobalIndexSettings globalIndexSettings; + + std::unique_ptr<NYdb::NTable::TKMeansTreeSettings> kmeansTreeSettings; + { + Ydb::Table::KMeansTreeSettings proto; + UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(R"( + settings { + metric: DISTANCE_COSINE + vector_type: VECTOR_TYPE_FLOAT + vector_dimension: 1024 + } + levels: 5 + clusters: 4 + )", &proto)); + using T = NYdb::NTable::TKMeansTreeSettings; + kmeansTreeSettings = std::make_unique<T>(T::FromProto(proto)); + } + + const ui64 buildIndexTx = ++txId; + const TVector<TString> dataColumns; + const TVector<TString> indexColumns{"embedding"}; + AsyncBuildVectorIndex(runtime, buildIndexTx, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/vectors", "index1", "embedding"); + + env.TestWaitNotification(runtime, buildIndexTx); + + { + auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx); + UNIT_ASSERT_VALUES_EQUAL_C( + buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE, + buildIndexOperation.DebugString() + ); + } + + { // set 'Invalid' state + TString writeQuery = Sprintf(R"( + ( + (let key '( '('Id (Uint64 '%lu)) ) ) + (let value '('('State (Uint32 '0)) ) ) + (return (AsList (UpdateRow 'IndexBuild key value) )) + ) + )", buildIndexTx); + NKikimrMiniKQL::TResult result; + TString err; + NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, TTestTxConfig::SchemeShard, writeQuery, result, err); + UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, err); + } + + RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor()); + + { + auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx); + UNIT_ASSERT_VALUES_EQUAL_C( + buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_UNSPECIFIED, + buildIndexOperation.DebugString() + ); + UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Unhandled exception"); + UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Unreachable"); + } + } } |