aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorstanislav_shchetinin <st-shchetinin@ydb.tech>2025-03-14 15:23:03 +0300
committerGitHub <noreply@github.com>2025-03-14 12:23:03 +0000
commit0a97f4cb25aaf11d7d4c67e36ffd89d53a020095 (patch)
tree6609602e5ccfc4848e60ac46d435c2b733c99c09
parent2bcfa98fde6424e45d5f162360b439e34cc5c45a (diff)
downloadydb-0a97f4cb25aaf11d7d4c67e36ffd89d53a020095.tar.gz
Сancellation of the operation if CreateChangefeedPropose returns nullptr (#15713)
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import__create.cpp17
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp9
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h3
-rw-r--r--ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp20
4 files changed, 33 insertions, 16 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp
index c9dd2323e3..8dd8523e83 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp
@@ -526,7 +526,7 @@ private:
return true;
}
- void CreateChangefeed(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
+ bool CreateChangefeed(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId, TString& error) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
auto& item = importInfo->Items.at(itemIdx);
item.SubState = ESubState::Proposed;
@@ -537,11 +537,14 @@ private:
<< ", txId# " << txId);
Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId);
-
- auto propose = CreateChangefeedPropose(Self, txId, item);
- Y_ABORT_UNLESS(propose);
+
+ auto propose = CreateChangefeedPropose(Self, txId, item, error);
+ if (!propose) {
+ return false;
+ }
Send(Self->SelfId(), std::move(propose));
+ return true;
}
void CreateConsumers(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
@@ -1080,7 +1083,11 @@ private:
case EState::CreateChangefeed:
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
- CreateChangefeed(importInfo, i, txId);
+ TString error;
+ if (!CreateChangefeed(importInfo, i, txId, error)) {
+ NIceDb::TNiceDb db(txc.DB);
+ CancelAndPersist(db, importInfo, i, error, "creation changefeed failed");
+ }
} else {
CreateConsumers(importInfo, i, txId);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
index da36478b7a..602ddf438e 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
@@ -237,7 +237,8 @@ THolder<TEvIndexBuilder::TEvCancelRequest> CancelIndexBuildPropose(
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
TSchemeShard* ss,
TTxId txId,
- const TImportInfo::TItem& item
+ const TImportInfo::TItem& item,
+ TString& error
) {
Y_ABORT_UNLESS(item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size());
@@ -255,10 +256,8 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
modifyScheme.SetWorkingDir(dstPath.Parent().PathString());
cdcStream.SetTableName(dstPath.LeafName());
- TString error;
- Ydb::StatusIds::StatusCode status;
-
auto& cdcStreamDescription = *cdcStream.MutableStreamDescription();
+ Ydb::StatusIds::StatusCode status;
if (!FillChangefeedDescription(cdcStreamDescription, changefeed, status, error)) {
return nullptr;
}
@@ -271,6 +270,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
i64 minActivePartitions =
topic.partitioning_settings().min_active_partitions();
if (minActivePartitions < 0) {
+ error = "minActivePartitions must be >= 0";
return nullptr;
} else if (minActivePartitions == 0) {
minActivePartitions = 1;
@@ -284,6 +284,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
i64 maxActivePartitions =
topic.partitioning_settings().max_active_partitions();
if (maxActivePartitions < 0) {
+ error = "maxActivePartitions must be >= 0";
return nullptr;
} else if (maxActivePartitions == 0) {
maxActivePartitions = 50;
diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h
index 107db1b34c..6d1615f0cf 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h
+++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h
@@ -49,7 +49,8 @@ THolder<TEvIndexBuilder::TEvCancelRequest> CancelIndexBuildPropose(
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
TSchemeShard* ss,
TTxId txId,
- const TImportInfo::TItem& item
+ const TImportInfo::TItem& item,
+ TString& error
);
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(
diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
index b015ab6f4c..b6c32293ae 100644
--- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
+++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
@@ -5203,21 +5203,21 @@ Y_UNIT_TEST_SUITE(TImportTests) {
TestImportChangefeeds(3, AddedSchemeWithPermissions);
}
- void TestCreateCdcStreams(TTestEnv& env, TTestActorRuntime& runtime, ui64& txId, const TString& dbName, ui64 count) {
+ void TestCreateCdcStreams(TTestEnv& env, TTestActorRuntime& runtime, ui64& txId, const TString& dbName, ui64 count, bool isShouldSuccess) {
for (ui64 i = 1; i <= count; ++i) {
TestCreateCdcStream(runtime, ++txId, dbName, Sprintf(R"(
TableName: "Original"
StreamDescription {
Name: "Stream_%d"
Mode: ECdcStreamModeKeysOnly
- Format: ECdcStreamFormatJson
+ Format: %s
}
- )", i));
+ )", i, isShouldSuccess ? "ECdcStreamFormatJson" : "ECdcStreamFormatProto"));
env.TestWaitNotification(runtime, txId);
}
}
- Y_UNIT_TEST(ChangefeedsExportRestore) {
+ void ChangefeedsExportRestore(bool isShouldSuccess) {
TPortManager portManager;
const ui16 port = portManager.GetPort();
@@ -5244,7 +5244,7 @@ Y_UNIT_TEST_SUITE(TImportTests) {
)");
env.TestWaitNotification(runtime, txId);
- TestCreateCdcStreams(env, runtime, txId, "/MyRoot", 3);
+ TestCreateCdcStreams(env, runtime, txId, "/MyRoot", 3, isShouldSuccess);
TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
@@ -5270,7 +5270,15 @@ Y_UNIT_TEST_SUITE(TImportTests) {
}
)", port));
env.TestWaitNotification(runtime, txId);
- TestGetImport(runtime, txId, "/MyRoot");
+ TestGetImport(runtime, txId, "/MyRoot", isShouldSuccess ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::CANCELLED);
+ }
+
+ Y_UNIT_TEST(ChangefeedsExportRestore) {
+ ChangefeedsExportRestore(true);
+ }
+
+ Y_UNIT_TEST(ChangefeedsExportRestoreUnhappyPropose) {
+ ChangefeedsExportRestore(false);
}
Y_UNIT_TEST(IgnoreBasicSchemeLimits) {