diff options
author | snaury <snaury@ydb.tech> | 2023-01-27 14:03:05 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-01-27 14:03:05 +0300 |
commit | 180cad240c3e53366d3dc63f9be43b5eedbd8476 (patch) | |
tree | 247eb3e45eccdedc5be9979db80c93c5c679b59e | |
parent | 31ca75033f014b6aeb68f0bf407bd07db926450c (diff) | |
download | ydb-180cad240c3e53366d3dc63f9be43b5eedbd8476.tar.gz |
Wait for volatile transactions to commit before scheme operations and split/merges
-rw-r--r-- | ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp | 28 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_split_src.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_volatile.cpp | 161 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.h | 2 |
5 files changed, 197 insertions, 4 deletions
diff --git a/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp b/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp index 7eb786866a9..bd55d64eb20 100644 --- a/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp +++ b/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp @@ -23,6 +23,7 @@ public: private: void BuildDependencies(const TOperation::TPtr &op); + bool BuildVolatileDependencies(const TOperation::TPtr &op); }; TBuildAndWaitDependenciesUnit::TBuildAndWaitDependenciesUnit(TDataShard &dataShard, @@ -42,7 +43,9 @@ bool TBuildAndWaitDependenciesUnit::HasDirectBlockers(const TOperation::TPtr& op { Y_VERIFY_DEBUG(op->IsWaitingDependencies()); - return !op->GetDependencies().empty() || !op->GetSpecialDependencies().empty(); + return !op->GetDependencies().empty() + || !op->GetSpecialDependencies().empty() + || op->HasVolatileDependencies(); } bool TBuildAndWaitDependenciesUnit::IsReadyToExecute(TOperation::TPtr op) const @@ -66,6 +69,7 @@ EExecutionStatus TBuildAndWaitDependenciesUnit::Execute(TOperation::TPtr op, // Build dependencies if not yet. if (!op->IsWaitingDependencies()) { BuildDependencies(op); + BuildVolatileDependencies(op); // After dependencies are built we can add operation to active ops. // For planned operations it means we can load and process the next @@ -107,6 +111,10 @@ EExecutionStatus TBuildAndWaitDependenciesUnit::Execute(TOperation::TPtr op, return EExecutionStatus::Continue; } + } else if (BuildVolatileDependencies(op)) { + // We acquired new volatile dependencies, wait for them too + Y_VERIFY(!IsReadyToExecute(op)); + return EExecutionStatus::Continue; } DataShard.IncCounter(COUNTER_WAIT_DEPENDENCIES_LATENCY_MS, op->GetCurrentElapsedAndReset().MilliSeconds()); @@ -168,6 +176,24 @@ void TBuildAndWaitDependenciesUnit::BuildDependencies(const TOperation::TPtr &op op->ResetCurrentTimer(); } +bool TBuildAndWaitDependenciesUnit::BuildVolatileDependencies(const TOperation::TPtr &op) { + // Scheme operations need to wait for all volatile transactions below them + if (op->IsSchemeTx()) { + TRowVersion current(op->GetStep(), op->GetTxId()); + for (auto* info : DataShard.GetVolatileTxManager().GetVolatileTxByVersion()) { + if (current < info->Version) { + break; + } + op->AddVolatileDependency(info->TxId); + bool added = DataShard.GetVolatileTxManager() + .AttachWaitingRemovalOperation(info->TxId, op->GetTxId()); + Y_VERIFY(added); + } + } + + return op->HasVolatileDependencies(); +} + void TBuildAndWaitDependenciesUnit::Complete(TOperation::TPtr, const TActorContext &) { diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp index f762801c221..8fbcf1af960 100644 --- a/ydb/core/tx/datashard/datashard_split_src.cpp +++ b/ydb/core/tx/datashard/datashard_split_src.cpp @@ -96,7 +96,7 @@ public: void TDataShard::CheckSplitCanStart(const TActorContext& ctx) { if (State == TShardState::SplitSrcWaitForNoTxInFlight) { - ui64 txInFly = TxInFly(); + ui64 txInFly = TxInFly() + VolatileTxManager.GetTxInFlight(); ui64 immediateTxInFly = ImmediateInFly(); SetCounter(COUNTER_SPLIT_SRC_WAIT_TX_IN_FLY, txInFly); SetCounter(COUNTER_SPLIT_SRC_WAIT_IMMEDIATE_TX_IN_FLY, immediateTxInFly); diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index 25e719470d9..40fd19086fa 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -787,6 +787,167 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { "{ items { uint32_value: 20 } items { uint32_value: 20 } items { null_flag_value: NULL_VALUE } }"); } + Y_UNIT_TEST(DistributedWriteThenCopyTable) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); + + size_t observedPropose = 0; + TVector<THolder<IEventHandle>> capturedReadSets; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::TEvProposeTransaction::EventType: { + ++observedPropose; + Cerr << "... observed TEvProposeTransaction" << Endl; + break; + } + case TEvTxProcessing::TEvReadSet::EventType: { + const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>(); + Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl; + capturedReadSets.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(captureEvents); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); + + TString sessionId = CreateSessionRPC(runtime, "/Root"); + + auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20); + )", sessionId, "", true /* commitTx */), "/Root"); + + WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets"); + UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + observedPropose = 0; + ui64 txId = AsyncCreateCopyTable(server, sender, "/Root", "table-1-copy", "/Root/table-1"); + WaitFor(runtime, [&]{ return observedPropose > 0; }, "observed propose"); + + SimulateSleep(runtime, TDuration::Seconds(1)); + + runtime.SetObserverFunc(prevObserverFunc); + for (auto& ev : capturedReadSets) { + runtime.Send(ev.Release(), 0, true); + } + + // Wait for copy table to finish + WaitTxNotification(server, sender, txId); + + // Verify table copy has above changes committed + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/table-1-copy` + ORDER BY key + )"), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } }"); + } + + Y_UNIT_TEST(DistributedWriteThenSplit) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); + + size_t observedSplit = 0; + TVector<THolder<IEventHandle>> capturedReadSets; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::TEvSplit::EventType: { + ++observedSplit; + Cerr << "... observed TEvSplit" << Endl; + break; + } + case TEvTxProcessing::TEvReadSet::EventType: { + const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>(); + Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl; + capturedReadSets.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(captureEvents); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); + + TString sessionId = CreateSessionRPC(runtime, "/Root"); + + auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20); + )", sessionId, "", true /* commitTx */), "/Root"); + + WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets"); + UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + SetSplitMergePartCountLimit(server->GetRuntime(), -1); + auto shards1before = GetTableShards(server, sender, "/Root/table-1"); + ui64 txId = AsyncSplitTable(server, sender, "/Root/table-1", shards1before.at(0), 2); + WaitFor(runtime, [&]{ return observedSplit > 0; }, "observed split"); + + SimulateSleep(runtime, TDuration::Seconds(1)); + + runtime.SetObserverFunc(prevObserverFunc); + for (auto& ev : capturedReadSets) { + runtime.Send(ev.Release(), 0, true); + } + + // Wait for split to finish + WaitTxNotification(server, sender, txId); + + // Verify table has changes committed + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )"), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } }"); + } + } // Y_UNIT_TEST_SUITE(DataShardVolatile) } // namespace NKikimr diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index 30c87fd16b7..9d5a41d2346 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -13,7 +13,7 @@ namespace NKikimr::NDataShard { TTxType GetTxType() const override { return TXTYPE_VOLATILE_TX_COMMIT; } - bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext&) override { + bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override { Y_VERIFY(Self->VolatileTxManager.PendingCommitTxScheduled); Self->VolatileTxManager.PendingCommitTxScheduled = false; @@ -45,6 +45,8 @@ namespace NKikimr::NDataShard { Self->VolatileTxManager.RemoveFromTxMap(info); Self->VolatileTxManager.RemoveVolatileTx(TxId); + + Self->CheckSplitCanStart(ctx); return true; } @@ -95,7 +97,7 @@ namespace NKikimr::NDataShard { return true; } - void Complete(const TActorContext&) override { + void Complete(const TActorContext& ctx) override { auto* info = Self->VolatileTxManager.FindByTxId(TxId); Y_VERIFY(info && info->State == EVolatileTxState::Aborting); @@ -107,6 +109,8 @@ namespace NKikimr::NDataShard { Self->VolatileTxManager.RemoveFromTxMap(info); Self->VolatileTxManager.RemoveVolatileTx(TxId); + + Self->CheckSplitCanStart(ctx); } private: diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h index ef05ee9c896..154ad150308 100644 --- a/ydb/core/tx/datashard/volatile_tx.h +++ b/ydb/core/tx/datashard/volatile_tx.h @@ -147,6 +147,8 @@ namespace NKikimr::NDataShard { TVolatileTxInfo* FindByTxId(ui64 txId) const; TVolatileTxInfo* FindByCommitTxId(ui64 txId) const; + size_t GetTxInFlight() const { return VolatileTxs.size(); } + const TVolatileTxByVersion& GetVolatileTxByVersion() const { return VolatileTxByVersion; } void PersistAddVolatileTx( |