aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-07-04 00:47:01 +0300
committerilnaz <ilnaz@ydb.tech>2023-07-04 00:47:01 +0300
commit9872bd553bcf72e2661bf0598b67f10e4a254744 (patch)
tree4bcbd31f3cc92954dadd690d200b754e831e3f53
parent0521dca8fe30807d125366b9f169f25bae6c51aa (diff)
downloadydb-9872bd553bcf72e2661bf0598b67f10e4a254744.tar.gz
Do not intercept notifications when processing concurrent meta-operations
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp22
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_export__create.cpp20
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp22
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import__create.cpp20
-rw-r--r--ydb/core/tx/schemeshard/ut_export.cpp99
6 files changed, 146 insertions, 43 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
index b58d3a500f..6d51b93e7e 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
@@ -481,7 +481,7 @@ struct TSchemeShard::TIndexBuilder::TTxReply: public TSchemeShard::TIndexBuilder
private:
TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult;
TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult;
- TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr Notification;
+ TTxId CompletedTxId = InvalidTxId;
TEvDataShard::TEvBuildIndexProgressResponse::TPtr ShardProgress;
struct {
TIndexBuildId BuildIndexId;
@@ -502,9 +502,9 @@ public:
{
}
- explicit TTxReply(TSelf* self, TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& notification)
+ explicit TTxReply(TSelf* self, TTxId completedTxId)
: TSchemeShard::TIndexBuilder::TTxBase(self)
- , Notification(notification)
+ , CompletedTxId(completedTxId)
{
}
@@ -530,7 +530,7 @@ public:
return OnAllocation(txc, ctx);
} else if (ModifyResult) {
return OnModifyResult(txc, ctx);
- } else if (Notification) {
+ } else if (CompletedTxId) {
return OnNotification(txc, ctx);
} else if (ShardProgress) {
return OnProgress(txc, ctx);
@@ -773,12 +773,10 @@ public:
}
bool OnNotification(TTransactionContext& txc, const TActorContext&) {
- const auto& record = Notification->Get()->Record;
-
- const auto txId = TTxId(record.GetTxId());
+ const auto txId = CompletedTxId;
if (!Self->TxIdToIndexBuilds.contains(txId)) {
LOG_I("TTxReply : TEvNotifyTxCompletionResult superfluous message"
- << ", txId: " << record.GetTxId()
+ << ", txId: " << txId
<< ", buildInfoId not found");
return true;
}
@@ -788,10 +786,10 @@ public:
TIndexBuildInfo::TPtr buildInfo = Self->IndexBuilds.at(buildId);
LOG_I("TTxReply : TEvNotifyTxCompletionResult"
- << ", txId# " << record.GetTxId()
+ << ", txId# " << txId
<< ", buildInfoId: " << buildInfo->Id);
LOG_D("TTxReply : TEvNotifyTxCompletionResult"
- << ", txId# " << record.GetTxId()
+ << ", txId# " << txId
<< ", buildInfo: " << *buildInfo);
switch (buildInfo->State) {
@@ -1280,8 +1278,8 @@ ITransaction* TSchemeShard::CreateTxReply(TEvSchemeShard::TEvModifySchemeTransac
return new TIndexBuilder::TTxReply(this, modifyResult);
}
-ITransaction* TSchemeShard::CreateTxReply(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& notification) {
- return new TIndexBuilder::TTxReply(this, notification);
+ITransaction* TSchemeShard::CreateTxReply(TTxId completedTxId) {
+ return new TIndexBuilder::TTxReply(this, completedTxId);
}
ITransaction* TSchemeShard::CreateTxReply(TEvDataShard::TEvBuildIndexProgressResponse::TPtr& progress) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp
index 5b00d9a828..30ac978ff0 100644
--- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp
@@ -226,7 +226,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
ui64 Id;
TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult = nullptr;
TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult = nullptr;
- TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr NotifyResult = nullptr;
+ TTxId CompletedTxId = InvalidTxId;
explicit TTxProgress(TSelf* self, ui64 id)
: TXxport::TTxBase(self)
@@ -246,9 +246,9 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
{
}
- explicit TTxProgress(TSelf* self, TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev)
+ explicit TTxProgress(TSelf* self, TTxId completedTxId)
: TXxport::TTxBase(self)
- , NotifyResult(ev)
+ , CompletedTxId(completedTxId)
{
}
@@ -263,7 +263,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
OnAllocateResult(txc, ctx);
} else if (ModifyResult) {
OnModifyResult(txc, ctx);
- } else if (NotifyResult) {
+ } else if (CompletedTxId) {
OnNotifyResult(txc, ctx);
} else {
Resume(txc, ctx);
@@ -858,13 +858,11 @@ private:
}
void OnNotifyResult(TTransactionContext& txc, const TActorContext&) {
- Y_VERIFY(NotifyResult);
- const auto& record = NotifyResult->Get()->Record;
-
+ Y_VERIFY(CompletedTxId);
LOG_D("TExport::TTxProgress: OnNotifyResult"
- << ": txId# " << record.GetTxId());
+ << ": txId# " << CompletedTxId);
- const auto txId = TTxId(record.GetTxId());
+ const auto txId = CompletedTxId;
if (!Self->TxIdToExport.contains(txId) && !Self->TxIdToDependentExport.contains(txId)) {
LOG_E("TExport::TTxProgress: OnNotifyResult received unknown txId"
<< ": txId# " << txId);
@@ -1000,8 +998,8 @@ ITransaction* TSchemeShard::CreateTxProgressExport(TEvSchemeShard::TEvModifySche
return new TExport::TTxProgress(this, ev);
}
-ITransaction* TSchemeShard::CreateTxProgressExport(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) {
- return new TExport::TTxProgress(this, ev);
+ITransaction* TSchemeShard::CreateTxProgressExport(TTxId completedTxId) {
+ return new TExport::TTxProgress(this, completedTxId);
}
} // NSchemeShard
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 36257559ce..57a364418e 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -6013,13 +6013,23 @@ void TSchemeShard::Handle(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev,
"Message:\n" << ev->Get()->Record.ShortDebugString());
const auto txId = TTxId(ev->Get()->Record.GetTxId());
-
+ bool executed = false;
+
if (TxIdToExport.contains(txId) || TxIdToDependentExport.contains(txId)) {
- return Execute(CreateTxProgressExport(ev), ctx);
- } else if (TxIdToImport.contains(txId)) {
- return Execute(CreateTxProgressImport(ev), ctx);
- } else if (TxIdToIndexBuilds.contains(txId)) {
- return Execute(CreateTxReply(ev), ctx);
+ Execute(CreateTxProgressExport(txId), ctx);
+ executed = true;
+ }
+ if (TxIdToImport.contains(txId)) {
+ Execute(CreateTxProgressImport(txId), ctx);
+ executed = true;
+ }
+ if (TxIdToIndexBuilds.contains(txId)) {
+ Execute(CreateTxReply(txId), ctx);
+ executed = true;
+ }
+
+ if (executed) {
+ return;
}
LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 8d16565aab..639da0462f 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -1076,7 +1076,7 @@ public:
NTabletFlatExecutor::ITransaction* CreateTxProgressExport(ui64 id);
NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev);
- NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev);
+ NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TTxId completedTxId);
void Handle(TEvExport::TEvCreateExportRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvExport::TEvGetExportRequest::TPtr& ev, const TActorContext& ctx);
@@ -1125,7 +1125,7 @@ public:
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvIndexBuilder::TEvCreateResponse::TPtr& ev);
- NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev);
+ NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TTxId completedTxId);
void Handle(TEvImport::TEvCreateImportRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvImport::TEvGetImportRequest::TPtr& ev, const TActorContext& ctx);
@@ -1201,7 +1201,7 @@ public:
NTabletFlatExecutor::ITransaction* CreateTxProgress(TIndexBuildId id);
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvTxAllocatorClient::TEvAllocateResult::TPtr& allocateResult);
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& modifyResult);
- NTabletFlatExecutor::ITransaction* CreateTxReply(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& modifyResult);
+ NTabletFlatExecutor::ITransaction* CreateTxReply(TTxId completedTxId);
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvBuildIndexProgressResponse::TPtr& progress);
NTabletFlatExecutor::ITransaction* CreatePipeRetry(TIndexBuildId indexBuildId, TTabletId tabletId);
NTabletFlatExecutor::ITransaction* CreateTxBilling(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev);
diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp
index a3a879f1d3..b47201c311 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp
@@ -224,7 +224,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult = nullptr;
TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult = nullptr;
TEvIndexBuilder::TEvCreateResponse::TPtr CreateIndexResult = nullptr;
- TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr NotifyResult = nullptr;
+ TTxId CompletedTxId = InvalidTxId;
explicit TTxProgress(TSelf* self, ui64 id, const TMaybe<ui32>& itemIdx)
: TXxport::TTxBase(self)
@@ -257,9 +257,9 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
{
}
- explicit TTxProgress(TSelf* self, TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev)
+ explicit TTxProgress(TSelf* self, TTxId completedTxId)
: TXxport::TTxBase(self)
- , NotifyResult(ev)
+ , CompletedTxId(completedTxId)
{
}
@@ -278,7 +278,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
OnModifyResult(txc, ctx);
} else if (CreateIndexResult) {
OnCreateIndexResult(txc, ctx);
- } else if (NotifyResult) {
+ } else if (CompletedTxId) {
OnNotifyResult(txc, ctx);
} else {
Resume(txc, ctx);
@@ -908,13 +908,11 @@ private:
}
void OnNotifyResult(TTransactionContext& txc, const TActorContext&) {
- Y_VERIFY(NotifyResult);
- const auto& record = NotifyResult->Get()->Record;
-
+ Y_VERIFY(CompletedTxId);
LOG_D("TImport::TTxProgress: OnNotifyResult"
- << ": txId# " << record.GetTxId());
+ << ": txId# " << CompletedTxId);
- const auto txId = TTxId(record.GetTxId());
+ const auto txId = CompletedTxId;
if (!Self->TxIdToImport.contains(txId)) {
LOG_E("TImport::TTxProgress: OnNotifyResult received unknown txId"
<< ": txId# " << txId);
@@ -1018,8 +1016,8 @@ ITransaction* TSchemeShard::CreateTxProgressImport(TEvIndexBuilder::TEvCreateRes
return new TImport::TTxProgress(this, ev);
}
-ITransaction* TSchemeShard::CreateTxProgressImport(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) {
- return new TImport::TTxProgress(this, ev);
+ITransaction* TSchemeShard::CreateTxProgressImport(TTxId completedTxId) {
+ return new TImport::TTxProgress(this, completedTxId);
}
} // NSchemeShard
diff --git a/ydb/core/tx/schemeshard/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export.cpp
index e182fdd94a..4c17ba3d2a 100644
--- a/ydb/core/tx/schemeshard/ut_export.cpp
+++ b/ydb/core/tx/schemeshard/ut_export.cpp
@@ -1138,6 +1138,105 @@ partitioning_settings {
}
}
+ Y_UNIT_TEST(ShouldSucceedOnConcurrentImport) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Utf8" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TPortManager portManager;
+ const ui16 port = portManager.GetPort();
+
+ TS3Mock s3Mock({}, TS3Mock::TSettings(port));
+ UNIT_ASSERT(s3Mock.Start());
+
+ // prepare backup data
+ TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ ExportToS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_path: "/MyRoot/Table"
+ destination_prefix: "Backup1"
+ }
+ }
+ )", port));
+ env.TestWaitNotification(runtime, txId);
+ TestGetExport(runtime, txId, "/MyRoot");
+
+ TVector<THolder<IEventHandle>> delayed;
+ auto origObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvSchemeShard::EvModifySchemeTransaction) {
+ const auto& record = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>()->Record;
+ const auto opType = record.GetTransaction(0).GetOperationType();
+ switch (opType) {
+ case NKikimrSchemeOp::ESchemeOpRestore:
+ case NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables:
+ delayed.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ default:
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ auto waitForDelayed = [&runtime, &delayed](ui32 size) {
+ if (delayed.size() != size) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&delayed, size](IEventHandle&) -> bool {
+ return delayed.size() == size;
+ });
+ runtime.DispatchEvents(opts);
+ }
+ };
+
+ const auto importId = ++txId;
+ TestImport(runtime, importId, "/MyRoot", Sprintf(R"(
+ ImportFromS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_prefix: "Backup1"
+ destination_path: "/MyRoot/Restored"
+ }
+ }
+ )", port));
+ // wait for restore op
+ waitForDelayed(1);
+
+ const auto exportId = ++txId;
+ TestExport(runtime, exportId, "/MyRoot", Sprintf(R"(
+ ExportToS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_path: "/MyRoot/Restored"
+ destination_prefix: "Backup2"
+ }
+ }
+ )", port));
+ // wait for copy table op
+ waitForDelayed(2);
+
+ runtime.SetObserverFunc(origObserver);
+ for (auto& ev : delayed) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ env.TestWaitNotification(runtime, importId);
+ TestGetImport(runtime, importId, "/MyRoot");
+ env.TestWaitNotification(runtime, exportId);
+ TestGetExport(runtime, exportId, "/MyRoot");
+ }
+
void ShouldCheckQuotas(const TSchemeLimits& limits, Ydb::StatusIds::StatusCode expectedFailStatus) {
TPortManager portManager;
const ui16 port = portManager.GetPort();