diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-07-04 00:47:01 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-07-04 00:47:01 +0300 |
commit | 9872bd553bcf72e2661bf0598b67f10e4a254744 (patch) | |
tree | 4bcbd31f3cc92954dadd690d200b754e831e3f53 | |
parent | 0521dca8fe30807d125366b9f169f25bae6c51aa (diff) | |
download | ydb-9872bd553bcf72e2661bf0598b67f10e4a254744.tar.gz |
Do not intercept notifications when processing concurrent meta-operations
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp | 22 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_export__create.cpp | 20 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.cpp | 22 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_import__create.cpp | 20 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_export.cpp | 99 |
6 files changed, 146 insertions, 43 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp index b58d3a500f..6d51b93e7e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp @@ -481,7 +481,7 @@ struct TSchemeShard::TIndexBuilder::TTxReply: public TSchemeShard::TIndexBuilder private: TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult; TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult; - TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr Notification; + TTxId CompletedTxId = InvalidTxId; TEvDataShard::TEvBuildIndexProgressResponse::TPtr ShardProgress; struct { TIndexBuildId BuildIndexId; @@ -502,9 +502,9 @@ public: { } - explicit TTxReply(TSelf* self, TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& notification) + explicit TTxReply(TSelf* self, TTxId completedTxId) : TSchemeShard::TIndexBuilder::TTxBase(self) - , Notification(notification) + , CompletedTxId(completedTxId) { } @@ -530,7 +530,7 @@ public: return OnAllocation(txc, ctx); } else if (ModifyResult) { return OnModifyResult(txc, ctx); - } else if (Notification) { + } else if (CompletedTxId) { return OnNotification(txc, ctx); } else if (ShardProgress) { return OnProgress(txc, ctx); @@ -773,12 +773,10 @@ public: } bool OnNotification(TTransactionContext& txc, const TActorContext&) { - const auto& record = Notification->Get()->Record; - - const auto txId = TTxId(record.GetTxId()); + const auto txId = CompletedTxId; if (!Self->TxIdToIndexBuilds.contains(txId)) { LOG_I("TTxReply : TEvNotifyTxCompletionResult superfluous message" - << ", txId: " << record.GetTxId() + << ", txId: " << txId << ", buildInfoId not found"); return true; } @@ -788,10 +786,10 @@ public: TIndexBuildInfo::TPtr buildInfo = Self->IndexBuilds.at(buildId); LOG_I("TTxReply : TEvNotifyTxCompletionResult" - << ", txId# " << record.GetTxId() + << ", txId# " << txId << ", buildInfoId: " << buildInfo->Id); LOG_D("TTxReply : TEvNotifyTxCompletionResult" - << ", txId# " << record.GetTxId() + << ", txId# " << txId << ", buildInfo: " << *buildInfo); switch (buildInfo->State) { @@ -1280,8 +1278,8 @@ ITransaction* TSchemeShard::CreateTxReply(TEvSchemeShard::TEvModifySchemeTransac return new TIndexBuilder::TTxReply(this, modifyResult); } -ITransaction* TSchemeShard::CreateTxReply(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& notification) { - return new TIndexBuilder::TTxReply(this, notification); +ITransaction* TSchemeShard::CreateTxReply(TTxId completedTxId) { + return new TIndexBuilder::TTxReply(this, completedTxId); } ITransaction* TSchemeShard::CreateTxReply(TEvDataShard::TEvBuildIndexProgressResponse::TPtr& progress) { diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index 5b00d9a828..30ac978ff0 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -226,7 +226,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase ui64 Id; TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult = nullptr; TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult = nullptr; - TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr NotifyResult = nullptr; + TTxId CompletedTxId = InvalidTxId; explicit TTxProgress(TSelf* self, ui64 id) : TXxport::TTxBase(self) @@ -246,9 +246,9 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase { } - explicit TTxProgress(TSelf* self, TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) + explicit TTxProgress(TSelf* self, TTxId completedTxId) : TXxport::TTxBase(self) - , NotifyResult(ev) + , CompletedTxId(completedTxId) { } @@ -263,7 +263,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase OnAllocateResult(txc, ctx); } else if (ModifyResult) { OnModifyResult(txc, ctx); - } else if (NotifyResult) { + } else if (CompletedTxId) { OnNotifyResult(txc, ctx); } else { Resume(txc, ctx); @@ -858,13 +858,11 @@ private: } void OnNotifyResult(TTransactionContext& txc, const TActorContext&) { - Y_VERIFY(NotifyResult); - const auto& record = NotifyResult->Get()->Record; - + Y_VERIFY(CompletedTxId); LOG_D("TExport::TTxProgress: OnNotifyResult" - << ": txId# " << record.GetTxId()); + << ": txId# " << CompletedTxId); - const auto txId = TTxId(record.GetTxId()); + const auto txId = CompletedTxId; if (!Self->TxIdToExport.contains(txId) && !Self->TxIdToDependentExport.contains(txId)) { LOG_E("TExport::TTxProgress: OnNotifyResult received unknown txId" << ": txId# " << txId); @@ -1000,8 +998,8 @@ ITransaction* TSchemeShard::CreateTxProgressExport(TEvSchemeShard::TEvModifySche return new TExport::TTxProgress(this, ev); } -ITransaction* TSchemeShard::CreateTxProgressExport(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) { - return new TExport::TTxProgress(this, ev); +ITransaction* TSchemeShard::CreateTxProgressExport(TTxId completedTxId) { + return new TExport::TTxProgress(this, completedTxId); } } // NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 36257559ce..57a364418e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -6013,13 +6013,23 @@ void TSchemeShard::Handle(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, "Message:\n" << ev->Get()->Record.ShortDebugString()); const auto txId = TTxId(ev->Get()->Record.GetTxId()); - + bool executed = false; + if (TxIdToExport.contains(txId) || TxIdToDependentExport.contains(txId)) { - return Execute(CreateTxProgressExport(ev), ctx); - } else if (TxIdToImport.contains(txId)) { - return Execute(CreateTxProgressImport(ev), ctx); - } else if (TxIdToIndexBuilds.contains(txId)) { - return Execute(CreateTxReply(ev), ctx); + Execute(CreateTxProgressExport(txId), ctx); + executed = true; + } + if (TxIdToImport.contains(txId)) { + Execute(CreateTxProgressImport(txId), ctx); + executed = true; + } + if (TxIdToIndexBuilds.contains(txId)) { + Execute(CreateTxReply(txId), ctx); + executed = true; + } + + if (executed) { + return; } LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 8d16565aab..639da0462f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -1076,7 +1076,7 @@ public: NTabletFlatExecutor::ITransaction* CreateTxProgressExport(ui64 id); NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev); - NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev); + NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TTxId completedTxId); void Handle(TEvExport::TEvCreateExportRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvExport::TEvGetExportRequest::TPtr& ev, const TActorContext& ctx); @@ -1125,7 +1125,7 @@ public: NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvIndexBuilder::TEvCreateResponse::TPtr& ev); - NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev); + NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TTxId completedTxId); void Handle(TEvImport::TEvCreateImportRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvImport::TEvGetImportRequest::TPtr& ev, const TActorContext& ctx); @@ -1201,7 +1201,7 @@ public: NTabletFlatExecutor::ITransaction* CreateTxProgress(TIndexBuildId id); NTabletFlatExecutor::ITransaction* CreateTxReply(TEvTxAllocatorClient::TEvAllocateResult::TPtr& allocateResult); NTabletFlatExecutor::ITransaction* CreateTxReply(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& modifyResult); - NTabletFlatExecutor::ITransaction* CreateTxReply(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& modifyResult); + NTabletFlatExecutor::ITransaction* CreateTxReply(TTxId completedTxId); NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvBuildIndexProgressResponse::TPtr& progress); NTabletFlatExecutor::ITransaction* CreatePipeRetry(TIndexBuildId indexBuildId, TTabletId tabletId); NTabletFlatExecutor::ITransaction* CreateTxBilling(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev); diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp index a3a879f1d3..b47201c311 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp @@ -224,7 +224,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult = nullptr; TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult = nullptr; TEvIndexBuilder::TEvCreateResponse::TPtr CreateIndexResult = nullptr; - TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr NotifyResult = nullptr; + TTxId CompletedTxId = InvalidTxId; explicit TTxProgress(TSelf* self, ui64 id, const TMaybe<ui32>& itemIdx) : TXxport::TTxBase(self) @@ -257,9 +257,9 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase { } - explicit TTxProgress(TSelf* self, TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) + explicit TTxProgress(TSelf* self, TTxId completedTxId) : TXxport::TTxBase(self) - , NotifyResult(ev) + , CompletedTxId(completedTxId) { } @@ -278,7 +278,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase OnModifyResult(txc, ctx); } else if (CreateIndexResult) { OnCreateIndexResult(txc, ctx); - } else if (NotifyResult) { + } else if (CompletedTxId) { OnNotifyResult(txc, ctx); } else { Resume(txc, ctx); @@ -908,13 +908,11 @@ private: } void OnNotifyResult(TTransactionContext& txc, const TActorContext&) { - Y_VERIFY(NotifyResult); - const auto& record = NotifyResult->Get()->Record; - + Y_VERIFY(CompletedTxId); LOG_D("TImport::TTxProgress: OnNotifyResult" - << ": txId# " << record.GetTxId()); + << ": txId# " << CompletedTxId); - const auto txId = TTxId(record.GetTxId()); + const auto txId = CompletedTxId; if (!Self->TxIdToImport.contains(txId)) { LOG_E("TImport::TTxProgress: OnNotifyResult received unknown txId" << ": txId# " << txId); @@ -1018,8 +1016,8 @@ ITransaction* TSchemeShard::CreateTxProgressImport(TEvIndexBuilder::TEvCreateRes return new TImport::TTxProgress(this, ev); } -ITransaction* TSchemeShard::CreateTxProgressImport(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) { - return new TImport::TTxProgress(this, ev); +ITransaction* TSchemeShard::CreateTxProgressImport(TTxId completedTxId) { + return new TImport::TTxProgress(this, completedTxId); } } // NSchemeShard diff --git a/ydb/core/tx/schemeshard/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export.cpp index e182fdd94a..4c17ba3d2a 100644 --- a/ydb/core/tx/schemeshard/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export.cpp @@ -1138,6 +1138,105 @@ partitioning_settings { } } + Y_UNIT_TEST(ShouldSucceedOnConcurrentImport) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + // prepare backup data + TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "Backup1" + } + } + )", port)); + env.TestWaitNotification(runtime, txId); + TestGetExport(runtime, txId, "/MyRoot"); + + TVector<THolder<IEventHandle>> delayed; + auto origObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == TEvSchemeShard::EvModifySchemeTransaction) { + const auto& record = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>()->Record; + const auto opType = record.GetTransaction(0).GetOperationType(); + switch (opType) { + case NKikimrSchemeOp::ESchemeOpRestore: + case NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables: + delayed.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + default: + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }); + + auto waitForDelayed = [&runtime, &delayed](ui32 size) { + if (delayed.size() != size) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&delayed, size](IEventHandle&) -> bool { + return delayed.size() == size; + }); + runtime.DispatchEvents(opts); + } + }; + + const auto importId = ++txId; + TestImport(runtime, importId, "/MyRoot", Sprintf(R"( + ImportFromS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_prefix: "Backup1" + destination_path: "/MyRoot/Restored" + } + } + )", port)); + // wait for restore op + waitForDelayed(1); + + const auto exportId = ++txId; + TestExport(runtime, exportId, "/MyRoot", Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Restored" + destination_prefix: "Backup2" + } + } + )", port)); + // wait for copy table op + waitForDelayed(2); + + runtime.SetObserverFunc(origObserver); + for (auto& ev : delayed) { + runtime.Send(ev.Release(), 0, true); + } + + env.TestWaitNotification(runtime, importId); + TestGetImport(runtime, importId, "/MyRoot"); + env.TestWaitNotification(runtime, exportId); + TestGetExport(runtime, exportId, "/MyRoot"); + } + void ShouldCheckQuotas(const TSchemeLimits& limits, Ydb::StatusIds::StatusCode expectedFailStatus) { TPortManager portManager; const ui16 port = portManager.GetPort(); |