diff options
author | stanislav_shchetinin <st-shchetinin@ydb.tech> | 2025-03-14 15:23:03 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-14 12:23:03 +0000 |
commit | 0a97f4cb25aaf11d7d4c67e36ffd89d53a020095 (patch) | |
tree | 6609602e5ccfc4848e60ac46d435c2b733c99c09 | |
parent | 2bcfa98fde6424e45d5f162360b439e34cc5c45a (diff) | |
download | ydb-0a97f4cb25aaf11d7d4c67e36ffd89d53a020095.tar.gz |
Сancellation of the operation if CreateChangefeedPropose returns nullptr (#15713)
4 files changed, 33 insertions, 16 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp index c9dd2323e3..8dd8523e83 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp @@ -526,7 +526,7 @@ private: return true; } - void CreateChangefeed(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) { + bool CreateChangefeed(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId, TString& error) { Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); auto& item = importInfo->Items.at(itemIdx); item.SubState = ESubState::Proposed; @@ -537,11 +537,14 @@ private: << ", txId# " << txId); Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId); - - auto propose = CreateChangefeedPropose(Self, txId, item); - Y_ABORT_UNLESS(propose); + + auto propose = CreateChangefeedPropose(Self, txId, item, error); + if (!propose) { + return false; + } Send(Self->SelfId(), std::move(propose)); + return true; } void CreateConsumers(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) { @@ -1080,7 +1083,11 @@ private: case EState::CreateChangefeed: if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) { - CreateChangefeed(importInfo, i, txId); + TString error; + if (!CreateChangefeed(importInfo, i, txId, error)) { + NIceDb::TNiceDb db(txc.DB); + CancelAndPersist(db, importInfo, i, error, "creation changefeed failed"); + } } else { CreateConsumers(importInfo, i, txId); } diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp index da36478b7a..602ddf438e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp @@ -237,7 +237,8 @@ THolder<TEvIndexBuilder::TEvCancelRequest> CancelIndexBuildPropose( THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose( TSchemeShard* ss, TTxId txId, - const TImportInfo::TItem& item + const TImportInfo::TItem& item, + TString& error ) { Y_ABORT_UNLESS(item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()); @@ -255,10 +256,8 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose( modifyScheme.SetWorkingDir(dstPath.Parent().PathString()); cdcStream.SetTableName(dstPath.LeafName()); - TString error; - Ydb::StatusIds::StatusCode status; - auto& cdcStreamDescription = *cdcStream.MutableStreamDescription(); + Ydb::StatusIds::StatusCode status; if (!FillChangefeedDescription(cdcStreamDescription, changefeed, status, error)) { return nullptr; } @@ -271,6 +270,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose( i64 minActivePartitions = topic.partitioning_settings().min_active_partitions(); if (minActivePartitions < 0) { + error = "minActivePartitions must be >= 0"; return nullptr; } else if (minActivePartitions == 0) { minActivePartitions = 1; @@ -284,6 +284,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose( i64 maxActivePartitions = topic.partitioning_settings().max_active_partitions(); if (maxActivePartitions < 0) { + error = "maxActivePartitions must be >= 0"; return nullptr; } else if (maxActivePartitions == 0) { maxActivePartitions = 50; diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h index 107db1b34c..6d1615f0cf 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h +++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h @@ -49,7 +49,8 @@ THolder<TEvIndexBuilder::TEvCancelRequest> CancelIndexBuildPropose( THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose( TSchemeShard* ss, TTxId txId, - const TImportInfo::TItem& item + const TImportInfo::TItem& item, + TString& error ); THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose( diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp index b015ab6f4c..b6c32293ae 100644 --- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp @@ -5203,21 +5203,21 @@ Y_UNIT_TEST_SUITE(TImportTests) { TestImportChangefeeds(3, AddedSchemeWithPermissions); } - void TestCreateCdcStreams(TTestEnv& env, TTestActorRuntime& runtime, ui64& txId, const TString& dbName, ui64 count) { + void TestCreateCdcStreams(TTestEnv& env, TTestActorRuntime& runtime, ui64& txId, const TString& dbName, ui64 count, bool isShouldSuccess) { for (ui64 i = 1; i <= count; ++i) { TestCreateCdcStream(runtime, ++txId, dbName, Sprintf(R"( TableName: "Original" StreamDescription { Name: "Stream_%d" Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatJson + Format: %s } - )", i)); + )", i, isShouldSuccess ? "ECdcStreamFormatJson" : "ECdcStreamFormatProto")); env.TestWaitNotification(runtime, txId); } } - Y_UNIT_TEST(ChangefeedsExportRestore) { + void ChangefeedsExportRestore(bool isShouldSuccess) { TPortManager portManager; const ui16 port = portManager.GetPort(); @@ -5244,7 +5244,7 @@ Y_UNIT_TEST_SUITE(TImportTests) { )"); env.TestWaitNotification(runtime, txId); - TestCreateCdcStreams(env, runtime, txId, "/MyRoot", 3); + TestCreateCdcStreams(env, runtime, txId, "/MyRoot", 3, isShouldSuccess); TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { @@ -5270,7 +5270,15 @@ Y_UNIT_TEST_SUITE(TImportTests) { } )", port)); env.TestWaitNotification(runtime, txId); - TestGetImport(runtime, txId, "/MyRoot"); + TestGetImport(runtime, txId, "/MyRoot", isShouldSuccess ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::CANCELLED); + } + + Y_UNIT_TEST(ChangefeedsExportRestore) { + ChangefeedsExportRestore(true); + } + + Y_UNIT_TEST(ChangefeedsExportRestoreUnhappyPropose) { + ChangefeedsExportRestore(false); } Y_UNIT_TEST(IgnoreBasicSchemeLimits) { |