summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <[email protected]>2023-07-03 15:13:47 +0300
committerilnaz <[email protected]>2023-07-03 15:13:47 +0300
commitefdeceaec166d448f790872dd64d27f74666f3c3 (patch)
tree983640de754dc737272accad6d2e63136c83da79
parent96f32bb93c5d057ff875f56755cdb322ffaa30b7 (diff)
Wait active & concurrent CopyTable() properly
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_export__create.cpp37
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h1
-rw-r--r--ydb/core/tx/schemeshard/ut_export.cpp67
4 files changed, 99 insertions, 8 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp
index 210c80b84b4..5b00d9a828e 100644
--- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp
@@ -427,6 +427,10 @@ private:
return InvalidTxId;
}
+ if (!ItemPathId(Self, exportInfo, 0)) {
+ return InvalidTxId;
+ }
+
return path->LastTxId;
}
@@ -782,7 +786,7 @@ private:
SubscribeTx(path->LastTxId);
Y_VERIFY_DEBUG(itemIdx == Max<ui32>());
- Self->TxIdToExport[path->LastTxId] = {exportInfo->Id, itemIdx};
+ Self->TxIdToDependentExport[path->LastTxId].insert(exportInfo->Id);
}
}
@@ -861,23 +865,42 @@ private:
<< ": txId# " << record.GetTxId());
const auto txId = TTxId(record.GetTxId());
- if (!Self->TxIdToExport.contains(txId)) {
+ if (!Self->TxIdToExport.contains(txId) && !Self->TxIdToDependentExport.contains(txId)) {
LOG_E("TExport::TTxProgress: OnNotifyResult received unknown txId"
<< ": txId# " << txId);
return;
}
- ui64 id;
- ui32 itemIdx;
- std::tie(id, itemIdx) = Self->TxIdToExport.at(txId);
+ if (Self->TxIdToExport.contains(txId)) {
+ ui64 id;
+ ui32 itemIdx;
+ std::tie(id, itemIdx) = Self->TxIdToExport.at(txId);
+
+ OnNotifyResult(txId, id, itemIdx, txc);
+ Self->TxIdToExport.erase(txId);
+ }
+
+ if (Self->TxIdToDependentExport.contains(txId)) {
+ for (const auto id : Self->TxIdToDependentExport.at(txId)) {
+ OnNotifyResult(txId, id, Max<ui32>(), txc);
+ }
+
+ Self->TxIdToDependentExport.erase(txId);
+ }
+ }
+
+ void OnNotifyResult(TTxId txId, ui64 id, ui32 itemIdx, TTransactionContext& txc) {
+ LOG_D("TExport::TTxProgress: OnNotifyResult"
+ << ": txId# " << txId
+ << ", id# " << id
+ << ", itemIdx# " << itemIdx);
+
if (!Self->Exports.contains(id)) {
LOG_E("TExport::TTxProgress: OnNotifyResult received unknown id"
<< ": id# " << id);
return;
}
- Self->TxIdToExport.erase(txId);
-
TExportInfo::TPtr exportInfo = Self->Exports.at(id);
NIceDb::TNiceDb db(txc.DB);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index aaee9be1bf8..36257559ce3 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -6014,7 +6014,7 @@ void TSchemeShard::Handle(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev,
const auto txId = TTxId(ev->Get()->Record.GetTxId());
- if (TxIdToExport.contains(txId)) {
+ if (TxIdToExport.contains(txId) || TxIdToDependentExport.contains(txId)) {
return Execute(CreateTxProgressExport(ev), ctx);
} else if (TxIdToImport.contains(txId)) {
return Execute(CreateTxProgressImport(ev), ctx);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 349e1861b59..8d16565aab5 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -1045,6 +1045,7 @@ public:
THashMap<ui64, TExportInfo::TPtr> Exports;
THashMap<TString, TExportInfo::TPtr> ExportsByUid;
THashMap<TTxId, std::pair<ui64, ui32>> TxIdToExport;
+ THashMap<TTxId, THashSet<ui64>> TxIdToDependentExport;
void FromXxportInfo(NKikimrExport::TExport& exprt, const TExportInfo::TPtr exportInfo);
diff --git a/ydb/core/tx/schemeshard/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export.cpp
index c545f9a4c0a..e182fdd94a5 100644
--- a/ydb/core/tx/schemeshard/ut_export.cpp
+++ b/ydb/core/tx/schemeshard/ut_export.cpp
@@ -1071,6 +1071,73 @@ partitioning_settings {
TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::SUCCESS);
}
+ Y_UNIT_TEST(ShouldSucceedOnConcurrentExport) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Utf8" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TPortManager portManager;
+ const ui16 port = portManager.GetPort();
+
+ TS3Mock s3Mock({}, TS3Mock::TSettings(port));
+ UNIT_ASSERT(s3Mock.Start());
+
+ TVector<THolder<IEventHandle>> copyTables;
+ auto origObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvSchemeShard::EvModifySchemeTransaction) {
+ const auto& record = ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>()->Record;
+ if (record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) {
+ copyTables.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+ auto waitCopyTables = [&runtime, &copyTables](ui32 size) {
+ if (copyTables.size() != size) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&copyTables, size](IEventHandle&) -> bool {
+ return copyTables.size() == size;
+ });
+ runtime.DispatchEvents(opts);
+ }
+ };
+
+ TVector<ui64> exportIds;
+ for (ui32 i = 1; i <= 3; ++i) {
+ exportIds.push_back(++txId);
+ TestExport(runtime, exportIds[i - 1], "/MyRoot", Sprintf(R"(
+ ExportToS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_path: "/MyRoot/Table"
+ destination_prefix: "Table%u"
+ }
+ }
+ )", port, i));
+ waitCopyTables(i);
+ }
+
+ runtime.SetObserverFunc(origObserver);
+ for (auto& ev : copyTables) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ for (ui64 exportId : exportIds) {
+ env.TestWaitNotification(runtime, exportId);
+ TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::SUCCESS);
+ }
+ }
+
void ShouldCheckQuotas(const TSchemeLimits& limits, Ydb::StatusIds::StatusCode expectedFailStatus) {
TPortManager portManager;
const ui16 port = portManager.GetPort();