aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author kungurtsev <kungasc@ydb.tech> 2025-06-04 16:21:39 +0200
committer GitHub <noreply@github.com> 2025-06-04 16:21:39 +0200
commit 1b9117ef252f8bc4fc40c1e9cab503a38a86bc5d (patch)
tree 5c5b56e04ebcf0c36e15b07b2be6cbfe16449e9a
parent ee494fe01cde439ee03e4394ca4aa8499f7aaf78 (diff)
download ydb-1b9117ef252f8bc4fc40c1e9cab503a38a86bc5d.tar.gz
Handle unhandled exceptions during build index on SchemeShard (#19246)
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp 15
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp 13
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp 11
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_build_index__get.cpp 10
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp 2
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp 247
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp 37
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h 17
-rw-r--r-- ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp 147
9 files changed, 365 insertions, 134 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp
index d520b6f2f66..760341aeed7 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp
@@ -10,7 +10,7 @@ using namespace NTabletFlatExecutor;
struct TSchemeShard::TIndexBuilder::TTxCancel: public TSchemeShard::TIndexBuilder::TTxSimple<TEvIndexBuilder::TEvCancelRequest, TEvIndexBuilder::TEvCancelResponse> {
public:
explicit TTxCancel(TSelf* self, TEvIndexBuilder::TEvCancelRequest::TPtr& ev)
- : TTxSimple(self, ev, TXTYPE_CANCEL_INDEX_BUILD)
+ : TTxSimple(self, TIndexBuildId(ev->Get()->Record.GetIndexBuildId()), ev, TXTYPE_CANCEL_INDEX_BUILD)
{}
bool DoExecute(TTransactionContext& txc, const TActorContext&) override {
@@ -27,40 +27,39 @@ public:
}
const TPathId domainPathId = database.GetPathIdForDomain();
- TIndexBuildId indexBuildId = TIndexBuildId(record.GetIndexBuildId());
- const auto* indexBuildInfoPtr = Self->IndexBuilds.FindPtr(indexBuildId);
+ const auto* indexBuildInfoPtr = Self->IndexBuilds.FindPtr(BuildId);
if (!indexBuildInfoPtr) {
return Reply(
Ydb::StatusIds::NOT_FOUND,
- TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found"
+ TStringBuilder() << "Index build process with id <" << BuildId << "> not found"
);
}
auto& indexBuildInfo = *indexBuildInfoPtr->Get();
if (indexBuildInfo.DomainPathId != domainPathId) {
return Reply(
Ydb::StatusIds::NOT_FOUND,
- TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found in database <" << record.GetDatabaseName() << ">"
+ TStringBuilder() << "Index build process with id <" << BuildId << "> not found in database <" << record.GetDatabaseName() << ">"
);
}
if (indexBuildInfo.IsFinished()) {
return Reply(
Ydb::StatusIds::PRECONDITION_FAILED,
- TStringBuilder() << "Index build process with id <" << indexBuildId << "> has been finished already"
+ TStringBuilder() << "Index build process with id <" << BuildId << "> has been finished already"
);
}
if (indexBuildInfo.IsCancellationRequested()) {
return Reply(
Ydb::StatusIds::PRECONDITION_FAILED,
- TStringBuilder() << "Index build process with id <" << indexBuildId << "> canceling already"
+ TStringBuilder() << "Index build process with id <" << BuildId << "> canceling already"
);
}
if (indexBuildInfo.State > TIndexBuildInfo::EState::Filling) {
return Reply(
Ydb::StatusIds::PRECONDITION_FAILED,
- TStringBuilder() << "Index build process with id <" << indexBuildId << "> are almost done, cancellation has no sense"
+ TStringBuilder() << "Index build process with id <" << BuildId << "> are almost done, cancellation has no sense"
);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp
index 85a7d215678..7e7ad2af6e5 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp
@@ -14,7 +14,7 @@ using namespace NTabletFlatExecutor;
class TSchemeShard::TIndexBuilder::TTxCreate: public TSchemeShard::TIndexBuilder::TTxSimple<TEvIndexBuilder::TEvCreateRequest, TEvIndexBuilder::TEvCreateResponse> {
public:
explicit TTxCreate(TSelf* self, TEvIndexBuilder::TEvCreateRequest::TPtr& ev)
- : TTxSimple(self, ev, TXTYPE_CREATE_INDEX_BUILD)
+ : TTxSimple(self, TIndexBuildId(ev->Get()->Record.GetTxId()), ev, TXTYPE_CREATE_INDEX_BUILD)
{}
bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override {
@@ -24,10 +24,9 @@ public:
Response = MakeHolder<TEvIndexBuilder::TEvCreateResponse>(request.GetTxId());
- const auto id = TIndexBuildId(request.GetTxId());
- if (Self->IndexBuilds.contains(id)) {
+ if (Self->IndexBuilds.contains(BuildId)) {
return Reply(Ydb::StatusIds::ALREADY_EXISTS, TStringBuilder()
- << "Index build with id '" << id << "' already exists");
+ << "Index build with id '" << BuildId << "' already exists");
}
const TString& uid = GetUid(request.GetOperationParams());
@@ -83,7 +82,7 @@ public:
}
}
- TIndexBuildInfo::TPtr buildInfo = new TIndexBuildInfo(id, uid);
+ TIndexBuildInfo::TPtr buildInfo = new TIndexBuildInfo(BuildId, uid);
buildInfo->DomainPathId = domainPath.Base()->PathId;
buildInfo->TablePathId = tablePath.Base()->PathId;
@@ -198,14 +197,14 @@ public:
Self->PersistBuildIndexState(db, *buildInfo);
- auto [it, emplaced] = Self->IndexBuilds.emplace(id, buildInfo);
+ auto [it, emplaced] = Self->IndexBuilds.emplace(BuildId, buildInfo);
Y_ASSERT(emplaced);
if (uid) {
std::tie(std::ignore, emplaced) = Self->IndexBuildsByUid.emplace(uid, buildInfo);
Y_ASSERT(emplaced);
}
- Progress(id);
+ Progress(BuildId);
return true;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp
index f681f79dbd8..ff1a8492729 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp
@@ -10,7 +10,7 @@ using namespace NTabletFlatExecutor;
struct TSchemeShard::TIndexBuilder::TTxForget: public TSchemeShard::TIndexBuilder::TTxSimple<TEvIndexBuilder::TEvForgetRequest, TEvIndexBuilder::TEvForgetResponse> {
public:
explicit TTxForget(TSelf* self, TEvIndexBuilder::TEvForgetRequest::TPtr& ev)
- : TTxSimple(self, ev, TXTYPE_FORGET_INDEX_BUILD)
+ : TTxSimple(self, TIndexBuildId(ev->Get()->Record.GetIndexBuildId()), ev, TXTYPE_FORGET_INDEX_BUILD)
{}
bool DoExecute(TTransactionContext& txc, const TActorContext&) override {
@@ -27,26 +27,25 @@ public:
}
const TPathId domainPathId = database.GetPathIdForDomain();
- TIndexBuildId indexBuildId = TIndexBuildId(record.GetIndexBuildId());
- const auto* indexBuildInfoPtr = Self->IndexBuilds.FindPtr(indexBuildId);
+ const auto* indexBuildInfoPtr = Self->IndexBuilds.FindPtr(BuildId);
if (!indexBuildInfoPtr) {
return Reply(
Ydb::StatusIds::NOT_FOUND,
- TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found"
+ TStringBuilder() << "Index build process with id <" << BuildId << "> not found"
);
}
const auto& indexBuildInfo = *indexBuildInfoPtr->Get();
if (indexBuildInfo.DomainPathId != domainPathId) {
return Reply(
Ydb::StatusIds::NOT_FOUND,
- TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found in database <" << record.GetDatabaseName() << ">"
+ TStringBuilder() << "Index build process with id <" << BuildId << "> not found in database <" << record.GetDatabaseName() << ">"
);
}
if (!indexBuildInfo.IsFinished()) {
return Reply(
Ydb::StatusIds::PRECONDITION_FAILED,
- TStringBuilder() << "Index build process with id <" << indexBuildId << "> hasn't been finished yet"
+ TStringBuilder() << "Index build process with id <" << BuildId << "> hasn't been finished yet"
);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__get.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__get.cpp
index 72af67a620b..16f4007b2bb 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index__get.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index__get.cpp
@@ -11,7 +11,7 @@ using namespace NTabletFlatExecutor;
struct TSchemeShard::TIndexBuilder::TTxGet: public TSchemeShard::TIndexBuilder::TTxSimple<TEvIndexBuilder::TEvGetRequest, TEvIndexBuilder::TEvGetResponse> {
public:
explicit TTxGet(TSelf* self, TEvIndexBuilder::TEvGetRequest::TPtr& ev)
- : TTxSimple(self, ev, TXTYPE_GET_INDEX_BUILD, false)
+ : TTxSimple(self, TIndexBuildId(ev->Get()->Record.GetIndexBuildId()), ev, TXTYPE_GET_INDEX_BUILD, false)
{}
bool DoExecute(TTransactionContext&, const TActorContext&) override {
@@ -28,20 +28,18 @@ public:
}
const TPathId domainPathId = database.GetPathIdForDomain();
- TIndexBuildId indexBuildId = TIndexBuildId(record.GetIndexBuildId());
-
- const auto* indexBuildInfoPtr = Self->IndexBuilds.FindPtr(indexBuildId);
+ const auto* indexBuildInfoPtr = Self->IndexBuilds.FindPtr(BuildId);
if (!indexBuildInfoPtr) {
return Reply(
Ydb::StatusIds::PRECONDITION_FAILED,
- TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found"
+ TStringBuilder() << "Index build process with id <" << BuildId << "> not found"
);
}
const auto& indexBuildInfo = *indexBuildInfoPtr->Get();
if (indexBuildInfo.DomainPathId != domainPathId) {
return Reply(
Ydb::StatusIds::BAD_REQUEST,
- TStringBuilder() << "Index build process with id <" << indexBuildId << "> not found in database <" << record.GetDatabaseName() << ">"
+ TStringBuilder() << "Index build process with id <" << BuildId << "> not found in database <" << record.GetDatabaseName() << ">"
);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp
index 9b1402bcf9a..adef1160159 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp
@@ -15,7 +15,7 @@ private:
static constexpr ui64 DefaultPage = 1;
public:
explicit TTxList(TSelf* self, TEvIndexBuilder::TEvListRequest::TPtr& ev)
- : TTxSimple(self, ev, TXTYPE_LIST_INDEX_BUILD, false)
+ : TTxSimple(self, InvalidIndexBuildId, ev, TXTYPE_LIST_INDEX_BUILD, false)
{}
bool DoExecute(TTransactionContext&, const TActorContext&) override {
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
index dddcbce4aa2..85f453f6faf 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
@@ -51,7 +51,7 @@ protected:
const NKikimrIndexBuilder::TIndexBuildScanSettings ScanSettings;
const TActorId ResponseActorId;
- const ui64 BuildIndexId = 0;
+ const TIndexBuildId BuildId;
TIndexBuildInfo::TSample::TRows Init;
std::shared_ptr<NTxProxy::TUploadTypes> Types;
@@ -70,7 +70,7 @@ public:
bool isPostingLevel,
const NKikimrIndexBuilder::TIndexBuildScanSettings& scanSettings,
const TActorId& responseActorId,
- ui64 buildIndexId,
+ TIndexBuildId buildId,
TIndexBuildInfo::TSample::TRows init,
NTableIndex::TClusterId parent,
NTableIndex::TClusterId child)
@@ -78,13 +78,13 @@ public:
, IsPostingLevel(isPostingLevel)
, ScanSettings(scanSettings)
, ResponseActorId(responseActorId)
- , BuildIndexId(buildIndexId)
+ , BuildId(buildId)
, Init(std::move(init))
, Parent(parent)
, Child(child)
{
LogPrefix = TStringBuilder()
- << "TUploadSampleK: BuildIndexId: " << BuildIndexId
+ << "TUploadSampleK: BuildIndexId: " << BuildId
<< " ResponseActorId: " << ResponseActorId;
Y_ENSURE(!Init.empty());
Y_ENSURE(Parent < Child);
@@ -186,7 +186,7 @@ private:
}
TAutoPtr<TEvIndexBuilder::TEvUploadSampleKResponse> response = new TEvIndexBuilder::TEvUploadSampleKResponse;
- response->Record.SetId(BuildIndexId);
+ response->Record.SetId(ui64(BuildId));
response->Record.SetUploadRows(Rows->size());
response->Record.SetUploadBytes(RowsBytes);
@@ -259,7 +259,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateIndexPropose(
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnBuild);
buildInfo.SerializeToProto(ss, modifyScheme.MutableInitiateColumnBuild());
} else {
- Y_ABORT("Unknown operation kind while building CreateIndexPropose");
+ Y_ENSURE(false, "Unknown operation kind while building CreateIndexPropose");
}
LOG_DEBUG_S((TlsActivationContext->AsActorContext()), NKikimrServices::BUILD_INDEX,
@@ -407,7 +407,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> AlterMainTablePropose(
TString error;
if (!ExtractColumnTypeInfo(typeInfo, typeMod, colInfo.DefaultFromLiteral.type(), status, error)) {
// todo gvit fix that
- Y_ABORT("failed to extract column type info");
+ Y_ENSURE(false, "failed to extract column type info");
}
col->SetType(NScheme::TypeName(typeInfo, typeMod));
@@ -510,8 +510,6 @@ using namespace NTabletFlatExecutor;
struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuilder::TTxBase {
private:
- TIndexBuildId BuildId;
-
TMap<TTabletId, THolder<IEventBase>> ToTabletSend;
template <bool WithSnapshot = true, typename Record>
@@ -590,7 +588,7 @@ private:
ev->Record.SetParent(buildInfo.KMeans.Parent);
ev->Record.SetChild(buildInfo.KMeans.Child);
- Y_VERIFY_DEBUG(buildInfo.Sample.Rows.size() <= buildInfo.KMeans.K);
+ Y_ENSURE(buildInfo.Sample.Rows.size() <= buildInfo.KMeans.K);
auto& clusters = *ev->Record.MutableClusters();
clusters.Reserve(buildInfo.Sample.Rows.size());
for (const auto& [_, row] : buildInfo.Sample.Rows) {
@@ -760,7 +758,7 @@ private:
auto path = GetBuildPath(Self, buildInfo, NTableIndex::NTableVectorKmeansTreeIndex::LevelTable);
Y_ENSURE(buildInfo.Sample.Rows.size() <= buildInfo.KMeans.K);
auto actor = new TUploadSampleK(path.PathString(), !buildInfo.KMeans.NeedsAnotherLevel(),
- buildInfo.ScanSettings, Self->SelfId(), ui64(BuildId),
+ buildInfo.ScanSettings, Self->SelfId(), BuildId,
buildInfo.Sample.Rows, buildInfo.KMeans.Parent, buildInfo.KMeans.Child);
TActivationContext::AsActorContext().MakeFor(Self->SelfId()).Register(actor);
@@ -802,7 +800,7 @@ private:
break;
case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
- Y_ABORT("Unreachable");
+ Y_ENSURE(false, "Unreachable");
}
}
@@ -1113,9 +1111,8 @@ private:
}
public:
- explicit TTxProgress(TSelf* self, TIndexBuildId id)
- : TTxBase(self, TXTYPE_PROGRESS_INDEX_BUILD)
- , BuildId(id)
+ explicit TTxProgress(TSelf* self, TIndexBuildId buildId)
+ : TTxBase(self, buildId, TXTYPE_PROGRESS_INDEX_BUILD)
{}
bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override {
@@ -1128,7 +1125,7 @@ public:
switch (buildInfo.State) {
case TIndexBuildInfo::EState::Invalid:
- Y_ABORT("Unreachable");
+ Y_ENSURE(false, "Unreachable");
case TIndexBuildInfo::EState::AlterMainTable:
if (buildInfo.AlterMainTableTxId == InvalidTxId) {
@@ -1350,6 +1347,32 @@ public:
return true;
}
+ void OnUnhandledException(TTransactionContext& txc, const TActorContext& ctx, TIndexBuildInfo* buildInfo, const std::exception& exc) override { // Records exc in the build's Issue and, only for a build in Filling state, switches it to Rejection_Applying. buildInfo may be null. Presumably invoked by the base tx when DoExecute throws - caller is not visible in this hunk, confirm in schemeshard_build_index_tx_base.cpp.
+ if (!buildInfo) { // no build info to update: only log and return. NOTE(review): this case uses LOG_N here while TTxReply's override uses LOG_E
+ LOG_N("TTxBuildProgress: OnUnhandledException: BuildIndexId not found "
+ << (BuildId == InvalidIndexBuildId ? TString("") : TStringBuilder() << ", id# " << BuildId)); // id suffix is appended only when BuildId != InvalidIndexBuildId. NOTE(review): literal above ends with a space, so the text reads "not found , id# ..."
+ return;
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+ if (!buildInfo->Issue.Contains(exc.what())) { // append the exception text only if Issue does not already contain it
+ buildInfo->Issue += TStringBuilder() << "Unhandled exception " << exc.what();
+ }
+ Self->PersistBuildIndexIssue(db, *buildInfo); // called unconditionally, also when nothing was appended above
+
+ if (buildInfo->State != TIndexBuildInfo::EState::Filling) { // any state other than Filling: State is left untouched, only the Issue written above is kept
+ // no idea how to gracefully stop index build otherwise
+ // leave everything as is
+ LOG_E("TTxBuildProgress: OnUnhandledException: not a Filling state, id# " << buildInfo->Id
+ << ", TIndexBuildInfo: " << *buildInfo);
+ return;
+ }
+
+ buildInfo->State = TIndexBuildInfo::EState::Rejection_Applying; // Filling -> Rejection_Applying
+ Self->PersistBuildIndexState(db, *buildInfo);
+ Self->Execute(Self->CreateTxProgress(buildInfo->Id), ctx); // hands a newly created progress transaction for this build to Self->Execute
+ }
+
static TSerializedTableRange InfiniteRange(ui32 columns) {
TVector<TCell> vec(columns, TCell());
TArrayRef<TCell> cells(vec);
@@ -1424,20 +1447,18 @@ ITransaction* TSchemeShard::CreateTxProgress(TIndexBuildId id) {
struct TSchemeShard::TIndexBuilder::TTxBilling: public TSchemeShard::TIndexBuilder::TTxBase {
private:
- TIndexBuildId BuildIndexId;
TInstant ScheduledAt;
public:
explicit TTxBilling(TSelf* self, TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev)
- : TTxBase(self, TXTYPE_PROGRESS_INDEX_BUILD)
- , BuildIndexId(ev->Get()->BuildId)
+ : TTxBase(self, TIndexBuildId(ev->Get()->BuildId), TXTYPE_PROGRESS_INDEX_BUILD)
, ScheduledAt(ev->Get()->SendAt)
{}
bool DoExecute(TTransactionContext& , const TActorContext& ctx) override {
- LOG_I("TTxReply : TTxBilling, id# " << BuildIndexId);
+ LOG_I("TTxReply : TTxBilling, id# " << BuildId);
- const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildIndexId);
+ const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId);
if (!buildInfoPtr) {
return true;
}
@@ -1456,27 +1477,56 @@ public:
void DoComplete(const TActorContext&) override {
}
+
+ void OnUnhandledException(TTransactionContext&, const TActorContext&, TIndexBuildInfo*, const std::exception&) override { // Deliberate no-op override: all arguments are unnamed and unused, so nothing is logged or persisted for the billing tx
+ // seems safe to ignore
+ }
};
struct TSchemeShard::TIndexBuilder::TTxReply: public TSchemeShard::TIndexBuilder::TTxBase {
public:
- explicit TTxReply(TSelf* self)
- : TTxBase(self, TXTYPE_PROGRESS_INDEX_BUILD)
+ explicit TTxReply(TSelf* self, TIndexBuildId buildId)
+ : TTxBase(self, buildId, TXTYPE_PROGRESS_INDEX_BUILD)
{}
+ void OnUnhandledException(TTransactionContext& txc, const TActorContext& ctx, TIndexBuildInfo* buildInfo, const std::exception& exc) override { // Records exc in the build's Issue and, only for a build in Filling state, switches it to Rejection_Applying. buildInfo may be null. NOTE(review): body mirrors TTxProgress::OnUnhandledException except for log prefix/level - candidate for a shared helper.
+ if (!buildInfo) { // no build info to update: only log and return
+ LOG_E("TTxReply : OnUnhandledException BuildIndexId not found"
+ << (BuildId == InvalidIndexBuildId ? TString("") : TStringBuilder() << ", id# " << BuildId)); // id suffix is appended only when BuildId != InvalidIndexBuildId
+ return;
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+ if (!buildInfo->Issue.Contains(exc.what())) { // append the exception text only if Issue does not already contain it
+ buildInfo->Issue += TStringBuilder() << "Unhandled exception " << exc.what();
+ }
+ Self->PersistBuildIndexIssue(db, *buildInfo); // called unconditionally, also when nothing was appended above
+
+ if (buildInfo->State != TIndexBuildInfo::EState::Filling) { // any state other than Filling: State is left untouched, only the Issue written above is kept
+ // most replies are used at Filling stage
+ // no idea how to gracefully stop index build otherwise
+ // leave everything as is
+ LOG_E("TTxReply : OnUnhandledException not a Filling state, id# " << buildInfo->Id
+ << ", TIndexBuildInfo: " << *buildInfo);
+ return;
+ }
+
+ buildInfo->State = TIndexBuildInfo::EState::Rejection_Applying; // Filling -> Rejection_Applying
+ Self->PersistBuildIndexState(db, *buildInfo);
+ Self->Execute(Self->CreateTxProgress(buildInfo->Id), ctx); // hands a newly created progress transaction for this build to Self->Execute
+ }
+
void DoComplete(const TActorContext&) override {
}
};
struct TSchemeShard::TIndexBuilder::TTxReplyRetry: public TSchemeShard::TIndexBuilder::TTxReply {
private:
- TIndexBuildId BuildId;
TTabletId ShardId;
public:
explicit TTxReplyRetry(TSelf* self, TIndexBuildId buildId, TTabletId shardId)
- : TTxReply(self)
- , BuildId(buildId)
+ : TTxReply(self, buildId)
, ShardId(shardId)
{}
@@ -1526,22 +1576,21 @@ private:
public:
explicit TTxReplySampleK(TSelf* self, TEvDataShard::TEvSampleKResponse::TPtr& sampleK)
- : TTxReply(self)
+ : TTxReply(self, TIndexBuildId(sampleK->Get()->Record.GetId()))
, SampleK(sampleK)
{
}
bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override {
auto& record = SampleK->Get()->Record;
- const auto buildId = TIndexBuildId(record.GetId());
TTabletId shardId = TTabletId(record.GetTabletId());
TShardIdx shardIdx = Self->GetShardIdx(shardId);
- LOG_N("TTxReply : TEvSampleKResponse, id# " << buildId
+ LOG_N("TTxReply : TEvSampleKResponse, id# " << BuildId
<< ", shardId# " << shardId
<< ", shardIdx# " << shardIdx);
- const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId);
+ const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId);
if (!buildInfoPtr) {
return true;
}
@@ -1558,7 +1607,7 @@ public:
}
if (buildInfo.State != TIndexBuildInfo::EState::Filling) {
- LOG_I("TTxReply : TEvSampleKResponse superfluous event, id# " << buildId);
+ LOG_I("TTxReply : TEvSampleKResponse superfluous event, id# " << BuildId);
return true;
}
@@ -1633,16 +1682,16 @@ public:
<< ", shardIdx: " << shardIdx;
Self->PersistBuildIndexIssue(db, buildInfo);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
- Progress(buildId);
+ Progress(BuildId);
return true;
case NKikimrIndexBuilder::EBuildStatus::INVALID:
case NKikimrIndexBuilder::EBuildStatus::ACCEPTED:
case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS:
- Y_ABORT("Unreachable");
+ Y_ENSURE(false, "Unreachable");
}
- Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus);
- Self->IndexBuildPipes.Close(buildId, shardId, ctx);
- Progress(buildId);
+ Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus);
+ Self->IndexBuildPipes.Close(BuildId, shardId, ctx);
+ Progress(BuildId);
return true;
}
@@ -1654,22 +1703,21 @@ private:
public:
explicit TTxReplyLocalKMeans(TSelf* self, TEvDataShard::TEvLocalKMeansResponse::TPtr& local)
- : TTxReply(self)
+ : TTxReply(self, TIndexBuildId(local->Get()->Record.GetId()))
, Local(local)
{
}
bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override {
const auto& record = Local->Get()->Record;
- const auto buildId = TIndexBuildId(record.GetId());
TTabletId shardId = TTabletId(record.GetTabletId());
TShardIdx shardIdx = Self->GetShardIdx(shardId);
- LOG_N("TTxReply : TEvLocalKMeansResponse, id# " << record.GetId()
+ LOG_N("TTxReply : TEvLocalKMeansResponse, id# " << BuildId
<< ", shardId# " << shardId
<< ", shardIdx# " << shardIdx);
- const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId);
+ const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId);
if (!buildInfoPtr) {
return true;
}
@@ -1686,7 +1734,7 @@ public:
}
if (buildInfo.State != TIndexBuildInfo::EState::Filling) {
- LOG_I("TTxReply : TEvLocalKMeansResponse superfluous event, id# " << buildId);
+ LOG_I("TTxReply : TEvLocalKMeansResponse superfluous event, id# " << BuildId);
return true;
}
@@ -1735,16 +1783,16 @@ public:
<< ", shardIdx: " << shardIdx;
Self->PersistBuildIndexIssue(db, buildInfo);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
- Progress(buildId);
+ Progress(BuildId);
return true;
case NKikimrIndexBuilder::EBuildStatus::INVALID:
case NKikimrIndexBuilder::EBuildStatus::ACCEPTED:
case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS:
- Y_ABORT("Unreachable");
+ Y_ENSURE(false, "Unreachable");
}
- Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus);
- Self->IndexBuildPipes.Close(buildId, shardId, ctx);
- Progress(buildId);
+ Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus);
+ Self->IndexBuildPipes.Close(BuildId, shardId, ctx);
+ Progress(BuildId);
return true;
}
@@ -1756,22 +1804,21 @@ private:
public:
explicit TTxReplyReshuffleKMeans(TSelf* self, TEvDataShard::TEvReshuffleKMeansResponse::TPtr& reshuffle)
- : TTxReply(self)
+ : TTxReply(self, TIndexBuildId(reshuffle->Get()->Record.GetId()))
, Reshuffle(reshuffle)
{
}
bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override {
const auto& record = Reshuffle->Get()->Record;
- const auto buildId = TIndexBuildId(record.GetId());
TTabletId shardId = TTabletId(record.GetTabletId());
TShardIdx shardIdx = Self->GetShardIdx(shardId);
- LOG_N("TTxReply : TEvReshuffleKMeansResponse, id# " << record.GetId()
+ LOG_N("TTxReply : TEvReshuffleKMeansResponse, id# " << BuildId
<< ", shardId# " << shardId
<< ", shardIdx# " << shardIdx);
- const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId);
+ const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId);
if (!buildInfoPtr) {
return true;
}
@@ -1788,7 +1835,7 @@ public:
}
if (buildInfo.State != TIndexBuildInfo::EState::Filling) {
- LOG_I("TTxReply : TEvReshuffleKMeansResponse superfluous event, id# " << buildId);
+ LOG_I("TTxReply : TEvReshuffleKMeansResponse superfluous event, id# " << BuildId);
return true;
}
@@ -1837,16 +1884,16 @@ public:
<< ", shardIdx: " << shardIdx;
Self->PersistBuildIndexIssue(db, buildInfo);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
- Progress(buildId);
+ Progress(BuildId);
return true;
case NKikimrIndexBuilder::EBuildStatus::INVALID:
case NKikimrIndexBuilder::EBuildStatus::ACCEPTED:
case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS:
- Y_ABORT("Unreachable");
+ Y_ENSURE(false, "Unreachable");
}
- Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus);
- Self->IndexBuildPipes.Close(buildId, shardId, ctx);
- Progress(buildId);
+ Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus);
+ Self->IndexBuildPipes.Close(BuildId, shardId, ctx);
+ Progress(BuildId);
return true;
}
@@ -1858,22 +1905,21 @@ private:
public:
explicit TTxReplyPrefixKMeans(TSelf* self, TEvDataShard::TEvPrefixKMeansResponse::TPtr& prefix)
- : TTxReply(self)
+ : TTxReply(self, TIndexBuildId(prefix->Get()->Record.GetId()))
, Prefix(prefix)
{
}
bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override {
const auto& record = Prefix->Get()->Record;
- const auto buildId = TIndexBuildId(record.GetId());
TTabletId shardId = TTabletId(record.GetTabletId());
TShardIdx shardIdx = Self->GetShardIdx(shardId);
- LOG_N("TTxReply : TEvPrefixKMeansResponse, id# " << record.GetId()
+ LOG_N("TTxReply : TEvPrefixKMeansResponse, id# " << BuildId
<< ", shardId# " << shardId
<< ", shardIdx# " << shardIdx);
- const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId);
+ const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId);
if (!buildInfoPtr) {
return true;
}
@@ -1890,7 +1936,7 @@ public:
}
if (buildInfo.State != TIndexBuildInfo::EState::Filling) {
- LOG_I("TTxReply : TEvPrefixKMeansResponse superfluous event, id# " << buildId);
+ LOG_I("TTxReply : TEvPrefixKMeansResponse superfluous event, id# " << BuildId);
return true;
}
@@ -1939,16 +1985,16 @@ public:
<< ", shardIdx: " << shardIdx;
Self->PersistBuildIndexIssue(db, buildInfo);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
- Progress(buildId);
+ Progress(BuildId);
return true;
case NKikimrIndexBuilder::EBuildStatus::INVALID:
case NKikimrIndexBuilder::EBuildStatus::ACCEPTED:
case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS:
- Y_ABORT("Unreachable");
+ Y_ENSURE(false, "Unreachable");
}
- Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus);
- Self->IndexBuildPipes.Close(buildId, shardId, ctx);
- Progress(buildId);
+ Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus);
+ Self->IndexBuildPipes.Close(BuildId, shardId, ctx);
+ Progress(BuildId);
return true;
}
@@ -1959,19 +2005,18 @@ private:
TEvIndexBuilder::TEvUploadSampleKResponse::TPtr UploadSample;
public:
- explicit TTxReplyUploadSample(TSelf* self, TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& upload)
- : TTxReply(self)
- , UploadSample(upload)
+ explicit TTxReplyUploadSample(TSelf* self, TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& uploadSample)
+ : TTxReply(self, TIndexBuildId(uploadSample->Get()->Record.GetId()))
+ , UploadSample(uploadSample)
{
}
bool DoExecute([[maybe_unused]] TTransactionContext& txc, [[maybe_unused]] const TActorContext& ctx) override {
const auto& record = UploadSample->Get()->Record;
- const auto buildId = TIndexBuildId(record.GetId());
- LOG_N("TTxReply : TEvUploadSampleKResponse, id# " << record.GetId());
+ LOG_N("TTxReply : TEvUploadSampleKResponse, id# " << BuildId);
- const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId);
+ const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId);
if (!buildInfoPtr) {
return true;
}
@@ -1983,7 +2028,7 @@ public:
Y_ENSURE(buildInfo.IsBuildVectorIndex());
if (buildInfo.State != TIndexBuildInfo::EState::Filling) {
- LOG_I("TTxReply : TEvUploadSampleKResponse superfluous event, id# " << buildId);
+ LOG_I("TTxReply : TEvUploadSampleKResponse superfluous event, id# " << BuildId);
return true;
}
Y_ENSURE(buildInfo.Sample.State == TIndexBuildInfo::TSample::EState::Upload);
@@ -2002,14 +2047,14 @@ public:
auto status = record.GetUploadStatus();
if (status == Ydb::StatusIds::SUCCESS) {
buildInfo.Sample.State = TIndexBuildInfo::TSample::EState::Done;
- Progress(buildId);
+ Progress(BuildId);
} else {
NYql::TIssues issues;
NYql::IssuesFromMessage(record.GetIssues(), issues);
buildInfo.Issue += issues.ToString();
Self->PersistBuildIndexIssue(db, buildInfo);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
- Progress(buildId);
+ Progress(BuildId);
}
return true;
@@ -2021,22 +2066,21 @@ private:
TEvDataShard::TEvBuildIndexProgressResponse::TPtr ShardProgress;
public:
explicit TTxReplyProgress(TSelf* self, TEvDataShard::TEvBuildIndexProgressResponse::TPtr& shardProgress)
- : TTxReply(self)
+ : TTxReply(self, TIndexBuildId(shardProgress->Get()->Record.GetId()))
, ShardProgress(shardProgress)
{}
bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override {
const auto& record = ShardProgress->Get()->Record;
- const auto buildId = TIndexBuildId(record.GetId());
TTabletId shardId = TTabletId(record.GetTabletId());
TShardIdx shardIdx = Self->GetShardIdx(shardId);
- LOG_I("TTxReply : TEvBuildIndexProgressResponse, id# " << record.GetId()
+ LOG_I("TTxReply : TEvBuildIndexProgressResponse, id# " << BuildId
<< ", shardId# " << shardId
<< ", shardIdx# " << shardIdx);
- const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId);
+ const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId);
if (!buildInfoPtr) {
return true;
}
@@ -2052,7 +2096,7 @@ public:
}
if (buildInfo.State != TIndexBuildInfo::EState::Filling) {
- LOG_I("TTxReply : TEvBuildIndexProgressResponse superfluous event, id# " << buildId);
+ LOG_I("TTxReply : TEvBuildIndexProgressResponse superfluous event, id# " << BuildId);
return true;
}
@@ -2111,11 +2155,11 @@ public:
switch (shardStatus.Status) {
case NKikimrIndexBuilder::EBuildStatus::INVALID:
- Y_ABORT("Unreachable");
+ Y_ENSURE(false, "Unreachable");
case NKikimrIndexBuilder::EBuildStatus::ACCEPTED:
case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS:
// no oop, wait resolution. Progress key are persisted
- Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus);
+ Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus);
return true;
case NKikimrIndexBuilder::EBuildStatus::DONE:
if (buildInfo.InProgressShards.erase(shardIdx)) {
@@ -2137,12 +2181,12 @@ public:
<< ", shardIdx: " << shardIdx;
Self->PersistBuildIndexIssue(db, buildInfo);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
- Progress(buildId);
+ Progress(BuildId);
return true;
}
- Self->PersistBuildIndexUploadProgress(db, buildId, shardIdx, shardStatus);
- Self->IndexBuildPipes.Close(buildId, shardId, ctx);
- Progress(buildId);
+ Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus);
+ Self->IndexBuildPipes.Close(BuildId, shardId, ctx);
+ Progress(BuildId);
return true;
}
@@ -2153,7 +2197,7 @@ private:
TTxId CompletedTxId;
public:
explicit TTxReplyCompleted(TSelf* self, TTxId completedTxId)
- : TTxReply(self)
+ : TTxReply(self, InvalidIndexBuildId)
, CompletedTxId(completedTxId)
{}
@@ -2168,9 +2212,9 @@ public:
return true;
}
- const auto buildId = *buildIdPtr;
- auto& buildInfo = *Self->IndexBuilds.at(buildId);
- LOG_I("TTxReply : TEvNotifyTxCompletionResult, id# " << buildId
+ BuildId = *buildIdPtr;
+ auto& buildInfo = *Self->IndexBuilds.at(BuildId);
+ LOG_I("TTxReply : TEvNotifyTxCompletionResult, id# " << BuildId
<< ", txId# " << txId);
LOG_D("TTxReply : TEvNotifyTxCompletionResult"
<< ", TIndexBuildInfo: " << buildInfo
@@ -2235,7 +2279,7 @@ public:
Y_ENSURE(false, "Unreachable " << state);
}
- Progress(buildId);
+ Progress(BuildId);
return true;
}
@@ -2246,7 +2290,7 @@ private:
TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult;
public:
explicit TTxReplyModify(TSelf* self, TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& modifyResult)
- : TTxReply(self)
+ : TTxReply(self, InvalidIndexBuildId)
, ModifyResult(modifyResult)
{}
@@ -2287,11 +2331,11 @@ public:
return true;
}
- const auto buildId = *buildIdPtr;
+ BuildId = *buildIdPtr;
// We need this because we use buildInfo after EraseBuildInfo
- auto buildInfoPin = Self->IndexBuilds.at(buildId);
+ auto buildInfoPin = Self->IndexBuilds.at(BuildId);
auto& buildInfo = *buildInfoPin;
- LOG_I("TTxReply : TEvModifySchemeTransactionResult, id# " << buildId
+ LOG_I("TTxReply : TEvModifySchemeTransactionResult, id# " << BuildId
<< ", cookie: " << ModifyResult->Cookie
<< ", record: " << record.ShortDebugString()
<< ", status: " << NKikimrScheme::EStatus_Name(record.GetStatus()));
@@ -2324,7 +2368,7 @@ public:
if (record.GetStatus() == NKikimrScheme::StatusAccepted) {
// no op
} else if (record.GetStatus() == NKikimrScheme::StatusAlreadyExists) {
- Y_ABORT("NEED MORE TESTING");
+ Y_ENSURE(false, "NEED MORE TESTING");
// no op
} else {
buildInfo.Issue += TStringBuilder()
@@ -2430,7 +2474,7 @@ public:
Y_ENSURE(false, "Unreachable " << state);
}
- Progress(buildId);
+ Progress(BuildId);
return true;
}
@@ -2441,15 +2485,14 @@ private:
TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult;
public:
explicit TTxReplyAllocate(TSelf* self, TEvTxAllocatorClient::TEvAllocateResult::TPtr& allocateResult)
- : TTxReply(self)
+ : TTxReply(self, TIndexBuildId(allocateResult->Cookie))
, AllocateResult(allocateResult)
{}
bool DoExecute(TTransactionContext& txc,[[maybe_unused]] const TActorContext& ctx) override {
- TIndexBuildId buildId = TIndexBuildId(AllocateResult->Cookie);
const auto txId = TTxId(AllocateResult->Get()->TxIds.front());
- const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(buildId);
+ const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId);
if (!buildInfoPtr) {
LOG_I("TTxReply : TEvAllocateResult superfluous message"
<< ", cookie: " << AllocateResult->Cookie
@@ -2459,7 +2502,7 @@ public:
}
auto& buildInfo = *buildInfoPtr->Get();
- LOG_I("TTxReply : TEvAllocateResult, id# " << buildId
+ LOG_I("TTxReply : TEvAllocateResult, id# " << BuildId
<< ", txId# " << txId);
LOG_D("TTxReply : TEvAllocateResult"
<< ", TIndexBuildInfo: " << buildInfo
@@ -2515,7 +2558,7 @@ public:
Y_ENSURE(false, "Unreachable " << state);
}
- Progress(buildId);
+ Progress(BuildId);
return true;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp
index eb84bb449a2..647c7b8e44e 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp
@@ -428,7 +428,18 @@ bool TSchemeShard::TIndexBuilder::TTxBase::GotScheduledBilling(TIndexBuildInfo&
}
bool TSchemeShard::TIndexBuilder::TTxBase::Execute(TTransactionContext& txc, const TActorContext& ctx) {
- if (!DoExecute(txc, ctx)) {
+ bool executeResult;
+
+ try {
+ executeResult = DoExecute(txc, ctx);
+ } catch (const std::exception& exc) {
+ if (OnUnhandledExceptionSafe(txc, ctx, exc)) {
+ return true;
+ }
+ throw; // fail process, a really bad thing has happened
+ }
+
+ if (!executeResult) {
return false;
}
@@ -436,6 +447,30 @@ bool TSchemeShard::TIndexBuilder::TTxBase::Execute(TTransactionContext& txc, con
return true;
}
+// Handles a std::exception that escaped DoExecute().
+// Looks up the build info for BuildId (it may be absent), logs the exception with a
+// backtrace and the build info, then delegates to the transaction-specific
+// OnUnhandledException().
+// Returns true when logging and the delegate completed, false when they themselves
+// threw a std::exception (that second exception is logged here and swallowed).
+bool TSchemeShard::TIndexBuilder::TTxBase::OnUnhandledExceptionSafe(TTransactionContext& txc, const TActorContext& ctx, const std::exception& originalExc) {
+ try {
+ // FindPtr may return nullptr; in that case the handler receives a null buildInfo
+ const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(BuildId);
+ TIndexBuildInfo* buildInfo = buildInfoPtr
+ ? buildInfoPtr->Get()
+ : nullptr;
+
+ // "id# " keeps the same marker format as the other index build log lines
+ LOG_E("Unhandled exception, id# "
+ << (BuildId == InvalidIndexBuildId ? TString("<no id>") : TStringBuilder() << BuildId)
+ << " " << TypeName(originalExc) << ": " << originalExc.what() << Endl
+ << TBackTrace::FromCurrentException().PrintToString()
+ << ", TIndexBuildInfo: " << (buildInfo ? TStringBuilder() << (*buildInfo) : TString("<no build info>")));
+
+ OnUnhandledException(txc, ctx, buildInfo, originalExc);
+
+ return true;
+ } catch (const std::exception& handleExc) {
+ // The handler itself failed: report it and let the caller decide what to do
+ LOG_E("OnUnhandledException throws unhandled exception "
+ << TypeName(handleExc) << ": " << handleExc.what() << Endl
+ << TBackTrace::FromCurrentException().PrintToString());
+ return false;
+ }
+}
+
void TSchemeShard::TIndexBuilder::TTxBase::Complete(const TActorContext& ctx) {
DoComplete(ctx);
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h
index 02f96b34aef..52d63138167 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h
@@ -11,6 +11,9 @@ namespace NSchemeShard {
class TSchemeShard::TIndexBuilder::TTxBase: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
private:
TSideEffects SideEffects;
+protected:
+ TIndexBuildId BuildId;
+private:
const NKikimr::NSchemeShard::ETxTypes TxType;
public:
const TString LogPrefix;
@@ -29,6 +32,7 @@ private:
ui64 RequestUnits(const TBillingStats& stats);
void RoundPeriod(TInstant& start, TInstant& end);
void ApplyBill(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx);
+ bool OnUnhandledExceptionSafe(TTransactionContext& txc, const TActorContext& ctx, const std::exception& exc);
protected:
void Send(TActorId dst, THolder<IEventBase> message, ui32 flags = 0, ui64 cookie = 0);
@@ -48,8 +52,9 @@ protected:
bool GotScheduledBilling(TIndexBuildInfo& indexBuildInfo);
public:
- explicit TTxBase(TSelf* self, NKikimr::NSchemeShard::ETxTypes txType)
+ explicit TTxBase(TSelf* self, TIndexBuildId buildId, NKikimr::NSchemeShard::ETxTypes txType)
: TBase(self)
+ , BuildId(buildId)
, TxType(txType)
, LogPrefix(TStringBuilder() << "TIndexBuilder::" << NKikimr::NSchemeShard::ETxTypes_Name(txType) << ": ")
{ }
@@ -60,6 +65,7 @@ public:
virtual bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) = 0;
virtual void DoComplete(const TActorContext& ctx) = 0;
+ virtual void OnUnhandledException(TTransactionContext& txc, const TActorContext& ctx, TIndexBuildInfo* buildInfo, const std::exception& exc) = 0;
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
void Complete(const TActorContext& ctx) override;
@@ -72,12 +78,17 @@ public:
THolder<TResponse> Response;
const bool IsMutableOperation;
- explicit TTxSimple(TSelf* self, typename TRequest::TPtr& ev, NKikimr::NSchemeShard::ETxTypes txType, bool isMutableOperation = true)
- : TTxBase(self, txType)
+ explicit TTxSimple(TSelf* self, TIndexBuildId buildId, typename TRequest::TPtr& ev, NKikimr::NSchemeShard::ETxTypes txType, bool isMutableOperation = true)
+ : TTxBase(self, buildId, txType)
, Request(ev)
, IsMutableOperation(isMutableOperation)
{ }
+ // Replies with INTERNAL_ERROR and the text of the exception that escaped DoExecute.
+ // NOTE(review): Reply() aborts when Response is unset, so this presumably relies on
+ // DoExecute having created Response before it threw - confirm for every subclass.
+ void OnUnhandledException(TTransactionContext&, const TActorContext&, TIndexBuildInfo*, const std::exception& exc) override {
+ const TString errorMessage = TStringBuilder() << "Unhandled exception " << exc.what();
+ Reply(Ydb::StatusIds::INTERNAL_ERROR, errorMessage);
+ }
+
bool Reply(const Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS, const TString& errorMessage = TString())
{
Y_ABORT_UNLESS(Response);
diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp
index 8bae31a12a1..49e2b911029 100644
--- a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp
+++ b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp
@@ -419,4 +419,151 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) {
});
}
}
+
+ // Scenario: start a vector index build, overwrite RequestSeqNoRound with 999 in the
+ // intercepted TEvLocalKMeansResponse events, then release them.
+ // Expected: the build finishes as STATE_REJECTED and its description mentions the
+ // violated seqNo condition; after a SchemeShard reboot the state is still REJECTED
+ // and the description contains both "Unhandled exception" and the condition text.
+ Y_UNIT_TEST(TTxReply_DoExecute_Throws) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::BUILD_INDEX, NLog::PRI_TRACE);
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "vectors"
+ Columns { Name: "id" Type: "Uint64" }
+ Columns { Name: "embedding" Type: "String" }
+ KeyColumnNames: [ "id" ]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ // Tamper with every intercepted response before it is delivered
+ TBlockEvents<TEvDataShard::TEvLocalKMeansResponse> blocked(runtime, [&](auto& ev) {
+ ev->Get()->Record.SetRequestSeqNoRound(999);
+ return true;
+ });
+
+ const ui64 buildIndexTx = ++txId;
+ AsyncBuildVectorIndex(runtime, buildIndexTx, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/vectors", "index1", "embedding");
+
+ runtime.WaitFor("block", [&]{ return blocked.size(); });
+ blocked.Stop().Unblock();
+
+ env.TestWaitNotification(runtime, buildIndexTx);
+
+ {
+ auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx);
+ UNIT_ASSERT_VALUES_EQUAL_C(
+ buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_REJECTED,
+ buildIndexOperation.DebugString()
+ );
+ UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Condition violated: `actualSeqNo > recordSeqNo");
+ }
+
+ // The rejection and its description must still be there after a tablet restart
+ RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());
+
+ {
+ auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx);
+ UNIT_ASSERT_VALUES_EQUAL_C(
+ buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_REJECTED,
+ buildIndexOperation.DebugString()
+ );
+ UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Unhandled exception");
+ UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Condition violated: `actualSeqNo > recordSeqNo");
+ }
+ }
+
+ // Scenario: build a vector index to completion, overwrite the State column of its
+ // IndexBuild row with 0 through a local MiniKQL query, then reboot SchemeShard.
+ // Expected: the build is reported as STATE_UNSPECIFIED and its description contains
+ // both "Unhandled exception" and "Unreachable".
+ Y_UNIT_TEST(TTxProgress_Throws) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::BUILD_INDEX, NLog::PRI_TRACE);
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "vectors"
+ Columns { Name: "id" Type: "Uint64" }
+ Columns { Name: "embedding" Type: "String" }
+ KeyColumnNames: [ "id" ]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ const ui64 buildIndexTx = ++txId;
+ AsyncBuildVectorIndex(runtime, buildIndexTx, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/vectors", "index1", "embedding");
+
+ env.TestWaitNotification(runtime, buildIndexTx);
+
+ {
+ auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx);
+ UNIT_ASSERT_VALUES_EQUAL_C(
+ buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE,
+ buildIndexOperation.DebugString()
+ );
+ }
+
+ { // set 'Invalid' state
+ TString writeQuery = Sprintf(R"(
+ (
+ (let key '( '('Id (Uint64 '%lu)) ) )
+ (let value '('('State (Uint32 '0)) ) )
+ (return (AsList (UpdateRow 'IndexBuild key value) ))
+ )
+ )", buildIndexTx);
+ NKikimrMiniKQL::TResult result;
+ TString err;
+ NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, TTestTxConfig::SchemeShard, writeQuery, result, err);
+ UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, err);
+ }
+
+ // Restart the tablet so the modified row is what SchemeShard works with afterwards
+ RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());
+
+ {
+ auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx);
+ UNIT_ASSERT_VALUES_EQUAL_C(
+ buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_UNSPECIFIED,
+ buildIndexOperation.DebugString()
+ );
+ UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Unhandled exception");
+ UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Unreachable");
+ }
+ }
}