diff options
author | ilnaz <[email protected]> | 2023-07-03 15:13:47 +0300 |
---|---|---|
committer | ilnaz <[email protected]> | 2023-07-03 15:13:47 +0300 |
commit | efdeceaec166d448f790872dd64d27f74666f3c3 (patch) | |
tree | 983640de754dc737272accad6d2e63136c83da79 | |
parent | 96f32bb93c5d057ff875f56755cdb322ffaa30b7 (diff) |
Wait active & concurrent CopyTable() properly
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_export__create.cpp | 37 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_export.cpp | 67 |
4 files changed, 99 insertions, 8 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index 210c80b84b4..5b00d9a828e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -427,6 +427,10 @@ private: return InvalidTxId; } + if (!ItemPathId(Self, exportInfo, 0)) { + return InvalidTxId; + } + return path->LastTxId; } @@ -782,7 +786,7 @@ private: SubscribeTx(path->LastTxId); Y_VERIFY_DEBUG(itemIdx == Max<ui32>()); - Self->TxIdToExport[path->LastTxId] = {exportInfo->Id, itemIdx}; + Self->TxIdToDependentExport[path->LastTxId].insert(exportInfo->Id); } } @@ -861,23 +865,42 @@ private: << ": txId# " << record.GetTxId()); const auto txId = TTxId(record.GetTxId()); - if (!Self->TxIdToExport.contains(txId)) { + if (!Self->TxIdToExport.contains(txId) && !Self->TxIdToDependentExport.contains(txId)) { LOG_E("TExport::TTxProgress: OnNotifyResult received unknown txId" << ": txId# " << txId); return; } - ui64 id; - ui32 itemIdx; - std::tie(id, itemIdx) = Self->TxIdToExport.at(txId); + if (Self->TxIdToExport.contains(txId)) { + ui64 id; + ui32 itemIdx; + std::tie(id, itemIdx) = Self->TxIdToExport.at(txId); + + OnNotifyResult(txId, id, itemIdx, txc); + Self->TxIdToExport.erase(txId); + } + + if (Self->TxIdToDependentExport.contains(txId)) { + for (const auto id : Self->TxIdToDependentExport.at(txId)) { + OnNotifyResult(txId, id, Max<ui32>(), txc); + } + + Self->TxIdToDependentExport.erase(txId); + } + } + + void OnNotifyResult(TTxId txId, ui64 id, ui32 itemIdx, TTransactionContext& txc) { + LOG_D("TExport::TTxProgress: OnNotifyResult" + << ": txId# " << txId + << ", id# " << id + << ", itemIdx# " << itemIdx); + if (!Self->Exports.contains(id)) { LOG_E("TExport::TTxProgress: OnNotifyResult received unknown id" << ": id# " << id); return; } - Self->TxIdToExport.erase(txId); - TExportInfo::TPtr exportInfo = Self->Exports.at(id); NIceDb::TNiceDb db(txc.DB); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index aaee9be1bf8..36257559ce3 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -6014,7 +6014,7 @@ void TSchemeShard::Handle(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const auto txId = TTxId(ev->Get()->Record.GetTxId()); - if (TxIdToExport.contains(txId)) { + if (TxIdToExport.contains(txId) || TxIdToDependentExport.contains(txId)) { return Execute(CreateTxProgressExport(ev), ctx); } else if (TxIdToImport.contains(txId)) { return Execute(CreateTxProgressImport(ev), ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 349e1861b59..8d16565aab5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -1045,6 +1045,7 @@ public: THashMap<ui64, TExportInfo::TPtr> Exports; THashMap<TString, TExportInfo::TPtr> ExportsByUid; THashMap<TTxId, std::pair<ui64, ui32>> TxIdToExport; + THashMap<TTxId, THashSet<ui64>> TxIdToDependentExport; void FromXxportInfo(NKikimrExport::TExport& exprt, const TExportInfo::TPtr exportInfo); diff --git a/ydb/core/tx/schemeshard/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export.cpp index c545f9a4c0a..e182fdd94a5 100644 --- a/ydb/core/tx/schemeshard/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export.cpp @@ -1071,6 +1071,73 @@ partitioning_settings { TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::SUCCESS); } + Y_UNIT_TEST(ShouldSucceedOnConcurrentExport) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + TVector<THolder<IEventHandle>> copyTables; + auto origObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == TEvSchemeShard::EvModifySchemeTransaction) { + const auto& record = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>()->Record; + if (record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) { + copyTables.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }); + auto waitCopyTables = [&runtime, ©Tables](ui32 size) { + if (copyTables.size() != size) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([©Tables, size](IEventHandle&) -> bool { + return copyTables.size() == size; + }); + runtime.DispatchEvents(opts); + } + }; + + TVector<ui64> exportIds; + for (ui32 i = 1; i <= 3; ++i) { + exportIds.push_back(++txId); + TestExport(runtime, exportIds[i - 1], "/MyRoot", Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "Table%u" + } + } + )", port, i)); + waitCopyTables(i); + } + + runtime.SetObserverFunc(origObserver); + for (auto& ev : copyTables) { + runtime.Send(ev.Release(), 0, true); + } + + for (ui64 exportId : exportIds) { + env.TestWaitNotification(runtime, exportId); + TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::SUCCESS); + } + } + void ShouldCheckQuotas(const TSchemeLimits& limits, Ydb::StatusIds::StatusCode expectedFailStatus) { TPortManager portManager; const ui16 port = portManager.GetPort(); |