aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-01-27 14:03:05 +0300
committersnaury <snaury@ydb.tech>2023-01-27 14:03:05 +0300
commit180cad240c3e53366d3dc63f9be43b5eedbd8476 (patch)
tree247eb3e45eccdedc5be9979db80c93c5c679b59e
parent31ca75033f014b6aeb68f0bf407bd07db926450c (diff)
downloadydb-180cad240c3e53366d3dc63f9be43b5eedbd8476.tar.gz
Wait for volatile transactions to commit before scheme operations and split/merges
-rw-r--r--ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp28
-rw-r--r--ydb/core/tx/datashard/datashard_split_src.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_volatile.cpp161
-rw-r--r--ydb/core/tx/datashard/volatile_tx.cpp8
-rw-r--r--ydb/core/tx/datashard/volatile_tx.h2
5 files changed, 197 insertions, 4 deletions
diff --git a/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp b/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp
index 7eb786866a9..bd55d64eb20 100644
--- a/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp
+++ b/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp
@@ -23,6 +23,7 @@ public:
private:
void BuildDependencies(const TOperation::TPtr &op);
+ bool BuildVolatileDependencies(const TOperation::TPtr &op);
};
TBuildAndWaitDependenciesUnit::TBuildAndWaitDependenciesUnit(TDataShard &dataShard,
@@ -42,7 +43,9 @@ bool TBuildAndWaitDependenciesUnit::HasDirectBlockers(const TOperation::TPtr& op
{
Y_VERIFY_DEBUG(op->IsWaitingDependencies());
- return !op->GetDependencies().empty() || !op->GetSpecialDependencies().empty();
+ return !op->GetDependencies().empty()
+ || !op->GetSpecialDependencies().empty()
+ || op->HasVolatileDependencies();
}
bool TBuildAndWaitDependenciesUnit::IsReadyToExecute(TOperation::TPtr op) const
@@ -66,6 +69,7 @@ EExecutionStatus TBuildAndWaitDependenciesUnit::Execute(TOperation::TPtr op,
// Build dependencies if not yet.
if (!op->IsWaitingDependencies()) {
BuildDependencies(op);
+ BuildVolatileDependencies(op);
// After dependencies are built we can add operation to active ops.
// For planned operations it means we can load and process the next
@@ -107,6 +111,10 @@ EExecutionStatus TBuildAndWaitDependenciesUnit::Execute(TOperation::TPtr op,
return EExecutionStatus::Continue;
}
+ } else if (BuildVolatileDependencies(op)) {
+ // We acquired new volatile dependencies, wait for them too
+ Y_VERIFY(!IsReadyToExecute(op));
+ return EExecutionStatus::Continue;
}
DataShard.IncCounter(COUNTER_WAIT_DEPENDENCIES_LATENCY_MS, op->GetCurrentElapsedAndReset().MilliSeconds());
@@ -168,6 +176,24 @@ void TBuildAndWaitDependenciesUnit::BuildDependencies(const TOperation::TPtr &op
op->ResetCurrentTimer();
}
+bool TBuildAndWaitDependenciesUnit::BuildVolatileDependencies(const TOperation::TPtr &op) {
+ // Scheme operations need to wait for all volatile transactions below them
+ if (op->IsSchemeTx()) {
+ TRowVersion current(op->GetStep(), op->GetTxId());
+ for (auto* info : DataShard.GetVolatileTxManager().GetVolatileTxByVersion()) {
+ if (current < info->Version) {
+ break;
+ }
+ op->AddVolatileDependency(info->TxId);
+ bool added = DataShard.GetVolatileTxManager()
+ .AttachWaitingRemovalOperation(info->TxId, op->GetTxId());
+ Y_VERIFY(added);
+ }
+ }
+
+ return op->HasVolatileDependencies();
+}
+
void TBuildAndWaitDependenciesUnit::Complete(TOperation::TPtr,
const TActorContext &)
{
diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp
index f762801c221..8fbcf1af960 100644
--- a/ydb/core/tx/datashard/datashard_split_src.cpp
+++ b/ydb/core/tx/datashard/datashard_split_src.cpp
@@ -96,7 +96,7 @@ public:
void TDataShard::CheckSplitCanStart(const TActorContext& ctx) {
if (State == TShardState::SplitSrcWaitForNoTxInFlight) {
- ui64 txInFly = TxInFly();
+ ui64 txInFly = TxInFly() + VolatileTxManager.GetTxInFlight();
ui64 immediateTxInFly = ImmediateInFly();
SetCounter(COUNTER_SPLIT_SRC_WAIT_TX_IN_FLY, txInFly);
SetCounter(COUNTER_SPLIT_SRC_WAIT_IMMEDIATE_TX_IN_FLY, immediateTxInFly);
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index 25e719470d9..40fd19086fa 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -787,6 +787,167 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
"{ items { uint32_value: 20 } items { uint32_value: 20 } items { null_flag_value: NULL_VALUE } }");
}
+ Y_UNIT_TEST(DistributedWriteThenCopyTable) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);");
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
+
+ size_t observedPropose = 0;
+ TVector<THolder<IEventHandle>> capturedReadSets;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::TEvProposeTransaction::EventType: {
+ ++observedPropose;
+ Cerr << "... observed TEvProposeTransaction" << Endl;
+ break;
+ }
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl;
+ capturedReadSets.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(captureEvents);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
+
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+
+ auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )", sessionId, "", true /* commitTx */), "/Root");
+
+ WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets");
+ UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
+
+ observedPropose = 0;
+ ui64 txId = AsyncCreateCopyTable(server, sender, "/Root", "table-1-copy", "/Root/table-1");
+ WaitFor(runtime, [&]{ return observedPropose > 0; }, "observed propose");
+
+ SimulateSleep(runtime, TDuration::Seconds(1));
+
+ runtime.SetObserverFunc(prevObserverFunc);
+ for (auto& ev : capturedReadSets) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ // Wait for copy table to finish
+ WaitTxNotification(server, sender, txId);
+
+ // Verify table copy has above changes committed
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ SELECT key, value FROM `/Root/table-1-copy`
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }");
+ }
+
+ Y_UNIT_TEST(DistributedWriteThenSplit) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);");
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
+
+ size_t observedSplit = 0;
+ TVector<THolder<IEventHandle>> capturedReadSets;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::TEvSplit::EventType: {
+ ++observedSplit;
+ Cerr << "... observed TEvSplit" << Endl;
+ break;
+ }
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl;
+ capturedReadSets.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(captureEvents);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
+
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+
+ auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )", sessionId, "", true /* commitTx */), "/Root");
+
+ WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets");
+ UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
+
+ SetSplitMergePartCountLimit(server->GetRuntime(), -1);
+ auto shards1before = GetTableShards(server, sender, "/Root/table-1");
+ ui64 txId = AsyncSplitTable(server, sender, "/Root/table-1", shards1before.at(0), 2);
+ WaitFor(runtime, [&]{ return observedSplit > 0; }, "observed split");
+
+ SimulateSleep(runtime, TDuration::Seconds(1));
+
+ runtime.SetObserverFunc(prevObserverFunc);
+ for (auto& ev : capturedReadSets) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ // Wait for split to finish
+ WaitTxNotification(server, sender, txId);
+
+ // Verify table has changes committed
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }");
+ }
+
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp
index 30c87fd16b7..9d5a41d2346 100644
--- a/ydb/core/tx/datashard/volatile_tx.cpp
+++ b/ydb/core/tx/datashard/volatile_tx.cpp
@@ -13,7 +13,7 @@ namespace NKikimr::NDataShard {
TTxType GetTxType() const override { return TXTYPE_VOLATILE_TX_COMMIT; }
- bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext&) override {
+ bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
Y_VERIFY(Self->VolatileTxManager.PendingCommitTxScheduled);
Self->VolatileTxManager.PendingCommitTxScheduled = false;
@@ -45,6 +45,8 @@ namespace NKikimr::NDataShard {
Self->VolatileTxManager.RemoveFromTxMap(info);
Self->VolatileTxManager.RemoveVolatileTx(TxId);
+
+ Self->CheckSplitCanStart(ctx);
return true;
}
@@ -95,7 +97,7 @@ namespace NKikimr::NDataShard {
return true;
}
- void Complete(const TActorContext&) override {
+ void Complete(const TActorContext& ctx) override {
auto* info = Self->VolatileTxManager.FindByTxId(TxId);
Y_VERIFY(info && info->State == EVolatileTxState::Aborting);
@@ -107,6 +109,8 @@ namespace NKikimr::NDataShard {
Self->VolatileTxManager.RemoveFromTxMap(info);
Self->VolatileTxManager.RemoveVolatileTx(TxId);
+
+ Self->CheckSplitCanStart(ctx);
}
private:
diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h
index ef05ee9c896..154ad150308 100644
--- a/ydb/core/tx/datashard/volatile_tx.h
+++ b/ydb/core/tx/datashard/volatile_tx.h
@@ -147,6 +147,8 @@ namespace NKikimr::NDataShard {
TVolatileTxInfo* FindByTxId(ui64 txId) const;
TVolatileTxInfo* FindByCommitTxId(ui64 txId) const;
+ size_t GetTxInFlight() const { return VolatileTxs.size(); }
+
const TVolatileTxByVersion& GetVolatileTxByVersion() const { return VolatileTxByVersion; }
void PersistAddVolatileTx(