about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author snaury <snaury@ydb.tech> 2023-01-26 18:43:14 +0300
committer snaury <snaury@ydb.tech> 2023-01-26 18:43:14 +0300
commit bcb5d533a29a90c4099f725e73442909cc462635 (patch)
tree c76924b76070298e39157af91f1980e53f068d26
parent 63aa601686f846db5c3fc5056316a31d6aca4a4a (diff)
download ydb-bcb5d533a29a90c4099f725e73442909cc462635.tar.gz
Support some edge cases in volatile transactions
-rw-r--r-- ydb/core/tx/datashard/datashard.cpp 9
-rw-r--r-- ydb/core/tx/datashard/datashard__engine_host.cpp 33
-rw-r--r-- ydb/core/tx/datashard/datashard__engine_host.h 3
-rw-r--r-- ydb/core/tx/datashard/datashard__monitoring.cpp 11
-rw-r--r-- ydb/core/tx/datashard/datashard_pipeline.cpp 20
-rw-r--r-- ydb/core/tx/datashard/datashard_pipeline.h 1
-rw-r--r-- ydb/core/tx/datashard/datashard_trans_queue.cpp 5
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_common.cpp 13
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_common.h 6
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_volatile.cpp 232
-rw-r--r-- ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp 6
-rw-r--r-- ydb/core/tx/datashard/read_table_scan_unit.cpp 20
-rw-r--r-- ydb/core/tx/datashard/volatile_tx.cpp 43
-rw-r--r-- ydb/core/tx/datashard/volatile_tx.h 28
14 files changed, 412 insertions, 18 deletions
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 19a2651bcfb..4424a1481ca 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -3091,6 +3091,15 @@ bool TDataShard::RemoveExpectation(ui64 target, ui64 txId) {
auto ctx = TActivationContext::ActorContextFor(SelfId());
ResendReadSetPipeTracker.DetachTablet(Max<ui64>(), target, 0, ctx);
}
+
+ // progress one more tx to force delayed schema operations
+ if (removed && OutReadSets.Empty() && Pipeline.HasSchemaOperation()) {
+ // TODO: wait for empty OutRS in a separate unit?
+ auto ctx = TActivationContext::ActorContextFor(SelfId());
+ Pipeline.AddCandidateUnit(EExecutionUnitKind::PlanQueue);
+ PlanQueue.Progress(ctx);
+ }
+
return removed;
}
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 57573184875..48043bcd66a 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -774,7 +774,7 @@ public:
class TLockedWriteTxObserver : public NTable::ITransactionObserver {
public:
- TLockedWriteTxObserver(const TDataShardEngineHost* host, ui64 txId, ui64& skipCount, ui32 localTid)
+ TLockedWriteTxObserver(TDataShardEngineHost* host, ui64 txId, ui64& skipCount, ui32 localTid)
: Host(host)
, SelfTxId(txId)
, SkipCount(skipCount)
@@ -814,7 +814,7 @@ public:
}
private:
- const TDataShardEngineHost* const Host;
+ TDataShardEngineHost* const Host;
const ui64 SelfTxId;
ui64& SkipCount;
const ui32 LocalTid;
@@ -823,7 +823,7 @@ public:
class TWriteTxObserver : public NTable::ITransactionObserver {
public:
- TWriteTxObserver(const TDataShardEngineHost* host)
+ TWriteTxObserver(TDataShardEngineHost* host)
: Host(host)
{
}
@@ -851,7 +851,7 @@ public:
}
private:
- const TDataShardEngineHost* const Host;
+ TDataShardEngineHost* const Host;
};
void AddWriteConflict(ui64 txId) const {
@@ -862,19 +862,26 @@ public:
}
}
- void BreakWriteConflict(ui64 txId) const {
+ void BreakWriteConflict(ui64 txId) {
if (VolatileCommitTxIds.contains(txId)) {
// Skip our own commits
} else if (auto* info = Self->GetVolatileTxManager().FindByCommitTxId(txId)) {
// We must not overwrite uncommitted changes that may become committed
- // later, so we need to switch transaction to volatile writes mid
- // flight. This is only needed so we don't block writes and still
- // commit changes in the correct order.
- if (!VolatileTxId && info->State != EVolatileTxState::Aborting) {
- Y_FAIL("TODO: implement switching to volatile writes mid-transaction");
- }
- // Add dependency on uncommitted transactions
- if (VolatileTxId && info->State != EVolatileTxState::Committed) {
+ // later, so we need to add a dependency that will force us to wait
+ // until it is persistently committed. We may ignore aborting changes
+ // even though they may not be persistent yet, since this tx will
+ // also perform writes, and either it fails, or future generation
+ // could not have possibly committed it already.
+ if (info->State != EVolatileTxState::Aborting) {
+ if (!VolatileTxId) {
+ // All further writes will use this VolatileTxId and will
+ // add it to VolatileCommitTxIds, forcing it to be committed
+ // like a volatile transaction. Note that this does not make
+ // it into a real volatile transaction, it works as usual in
+ // every sense, only persistent commit order is affected by
+ // a dependency below.
+ VolatileTxId = EngineBay.GetTxId();
+ }
VolatileDependencies.insert(info->TxId);
}
} else {
diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h
index 6a4b877b7d2..70c285fede7 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.h
+++ b/ydb/core/tx/datashard/datashard__engine_host.h
@@ -94,6 +94,9 @@ public:
EngineHost.Reset();
}
+ ui64 GetStep() const { return StepTxId.first; }
+ ui64 GetTxId() const { return StepTxId.second; }
+
const TValidationInfo& TxInfo() const { return Info; }
TEngineBay::TSizes CalcSizes(bool needsTotalKeysSize) const;
diff --git a/ydb/core/tx/datashard/datashard__monitoring.cpp b/ydb/core/tx/datashard/datashard__monitoring.cpp
index 644f9a74199..957b7c00891 100644
--- a/ydb/core/tx/datashard/datashard__monitoring.cpp
+++ b/ydb/core/tx/datashard/datashard__monitoring.cpp
@@ -255,6 +255,7 @@ public:
auto &deps = *resp.MutableDependencies();
FillDependencies(op->GetDependencies(), *deps.MutableDependencies());
FillDependencies(op->GetDependents(), *deps.MutableDependents());
+ FillDependencies(op->GetVolatileDependencies(), *deps.MutableDependencies());
if (op->IsExecuting() && !op->InReadSets().empty()) {
auto &inData = *resp.MutableInputData();
@@ -299,6 +300,16 @@ public:
}
}
+ void FillDependencies(const absl::flat_hash_set<ui64> &deps,
+ ::google::protobuf::RepeatedPtrField<NKikimrTxDataShard::TEvGetOperationResponse_TDependency> &arr)
+ {
+ for (ui64 txId : deps) {
+ auto &dep = *arr.Add();
+ dep.SetTarget(txId);
+ dep.AddTypes("Data");
+ }
+ }
+
void Complete(const TActorContext &) override {}
TTxType GetTxType() const override { return TXTYPE_MONITORING; }
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 753066d0e0c..075155d1a2e 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -517,6 +517,15 @@ TOperation::TPtr TPipeline::GetActiveOp(ui64 txId)
return nullptr;
}
+TOperation::TPtr TPipeline::GetVolatileOp(ui64 txId)
+{
+ TOperation::TPtr op = FindOp(txId);
+ if (op && op->HasVolatilePrepareFlag()) {
+ return op;
+ }
+ return nullptr;
+}
+
bool TPipeline::LoadTxDetails(TTransactionContext &txc,
const TActorContext &ctx,
TActiveTransaction::TPtr tx)
@@ -644,6 +653,11 @@ bool TPipeline::SaveInReadSet(const TEvTxProcessing::TEvReadSet &rs,
}
TOperation::TPtr op = GetActiveOp(txId);
+ bool active = true;
+ if (!op) {
+ op = GetVolatileOp(txId);
+ active = false;
+ }
if (op) {
// If input read sets are not loaded yet then
// it will be added at load.
@@ -655,8 +669,10 @@ bool TPipeline::SaveInReadSet(const TEvTxProcessing::TEvReadSet &rs,
}
op->AddDelayedInReadSet(rs.Record);
- AddCandidateOp(op);
- Self->PlanQueue.Progress(ctx);
+ if (active) {
+ AddCandidateOp(op);
+ Self->PlanQueue.Progress(ctx);
+ }
return false;
}
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index f4c235d8edc..e8c06dea8a1 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -230,6 +230,7 @@ public:
TOperation::TPtr FindOp(ui64 txId);
TOperation::TPtr GetActiveOp(ui64 txId);
+ TOperation::TPtr GetVolatileOp(ui64 txId);
const TMap<TStepOrder, TOperation::TPtr> &GetActiveOps() const { return ActiveOps; }
void AddActiveOp(TOperation::TPtr op);
diff --git a/ydb/core/tx/datashard/datashard_trans_queue.cpp b/ydb/core/tx/datashard/datashard_trans_queue.cpp
index b6e07f742ca..3c1f4633574 100644
--- a/ydb/core/tx/datashard/datashard_trans_queue.cpp
+++ b/ydb/core/tx/datashard/datashard_trans_queue.cpp
@@ -254,7 +254,10 @@ void TTransQueue::UpdateTxFlags(NIceDb::TNiceDb& db, ui64 txId, ui64 flags) {
auto it = TxsInFly.find(txId);
Y_VERIFY(it != TxsInFly.end());
- Y_VERIFY(!it->second->HasVolatilePrepareFlag(), "Unexpected UpdateTxFlags for a volatile transaction");
+ if (it->second->HasVolatilePrepareFlag()) {
+ // We keep volatile transactions in memory and don't store anything
+ return;
+ }
const ui64 preserveFlagsMask = TTxFlags::PublicFlagsMask | TTxFlags::PreservedPrivateFlagsMask;
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index 79124ae262c..5ade7fe5bf3 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -1462,6 +1462,19 @@ TRowVersion CommitWrites(
return { step, txId };
}
+ui64 AsyncDropTable(
+ Tests::TServer::TPtr server,
+ TActorId sender,
+ const TString& workingDir,
+ const TString& name)
+{
+ auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpDropTable, workingDir);
+ auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableDrop();
+ desc.SetName(name);
+
+ return RunSchemeTx(*server->GetRuntime(), std::move(request), sender, true);
+}
+
ui64 AsyncSplitTable(
Tests::TServer::TPtr server,
TActorId sender,
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index 9bac26dbfe1..c81b07a1252 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -547,6 +547,12 @@ TRowVersion CommitWrites(
const TVector<TString>& tables,
ui64 writeTxId);
+ui64 AsyncDropTable(
+ Tests::TServer::TPtr server,
+ TActorId sender,
+ const TString& workingDir,
+ const TString& name);
+
ui64 AsyncSplitTable(
Tests::TServer::TPtr server,
TActorId sender,
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index 2d1efb37f22..25e719470d9 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -555,6 +555,238 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
"{ items { uint32_value: 10 } items { uint32_value: 10 } }");
}
+ Y_UNIT_TEST(DistributedWriteAsymmetricExecute) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);");
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
+
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+
+ size_t observedPlans = 0;
+ TVector<THolder<IEventHandle>> capturedPlans;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxProcessing::TEvPlanStep::EventType: {
+ const auto* msg = ev->Get<TEvTxProcessing::TEvPlanStep>();
+ ++observedPlans;
+ if (msg->Record.GetTabletID() == shard1) {
+ Cerr << "... captured TEvPlanStep for " << msg->Record.GetTabletID() << Endl;
+ capturedPlans.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(captureEvents);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
+
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+
+ auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )", sessionId, "", true /* commitTx */), "/Root");
+
+ WaitFor(runtime, [&]{ return observedPlans >= 2; }, "observed plans");
+ UNIT_ASSERT_VALUES_EQUAL(capturedPlans.size(), 1u);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
+
+ // Wait until it completes at shard2
+ SimulateSleep(runtime, TDuration::Seconds(1));
+
+ // Unblock plan at shard1
+ runtime.SetObserverFunc(prevObserverFunc);
+ for (auto& ev : capturedPlans) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ FormatResult(AwaitResponse(runtime, std::move(future))),
+ "<empty>");
+ Cerr << "!!! distributed write end" << Endl;
+ }
+
+ Y_UNIT_TEST(DistributedWriteThenDropTable) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);");
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
+
+ size_t observedPropose = 0;
+ TVector<THolder<IEventHandle>> capturedReadSets;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::TEvProposeTransaction::EventType: {
+ ++observedPropose;
+ Cerr << "... observed TEvProposeTransaction" << Endl;
+ break;
+ }
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl;
+ capturedReadSets.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(captureEvents);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
+
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+
+ auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )", sessionId, "", true /* commitTx */), "/Root");
+
+ WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets");
+ UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
+
+ observedPropose = 0;
+ ui64 txId = AsyncDropTable(server, sender, "/Root", "table-1");
+ WaitFor(runtime, [&]{ return observedPropose > 0; }, "observed propose");
+
+ SimulateSleep(runtime, TDuration::Seconds(1));
+
+ runtime.SetObserverFunc(prevObserverFunc);
+ for (auto& ev : capturedReadSets) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ WaitTxNotification(server, sender, txId);
+ }
+
+ Y_UNIT_TEST(DistributedWriteThenImmediateUpsert) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ auto opts = TShardedTableOptions()
+ .Shards(1)
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Uint32", false, false},
+ {"value2", "Uint32", false, false}});
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ CreateShardedTable(server, sender, "/Root", "table-2", opts);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);");
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
+
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
+
+ TVector<THolder<IEventHandle>> capturedReadSets;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl;
+ capturedReadSets.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(captureEvents);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
+
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+
+ auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (2, 2, 42);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )", sessionId, "", true /* commitTx */), "/Root");
+
+ WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets");
+ UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
+
+ // Note: this upsert happens over the upsert into the value column
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value2) VALUES (2, 51);");
+
+ // This compaction verifies there's no commit race with the waiting
+ // distributed transaction. If commits happen in incorrect order we
+ // would observe unexpected results.
+ CompactTable(runtime, shard1, tableId1, false);
+
+ runtime.SetObserverFunc(prevObserverFunc);
+ for (auto& ev : capturedReadSets) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ FormatResult(AwaitResponse(runtime, std::move(future))),
+ "<empty>");
+
+ // Verify the result
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ SELECT key, value, value2 FROM `/Root/table-1`
+ UNION ALL
+ SELECT key, value, value2 FROM `/Root/table-2`
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } items { null_flag_value: NULL_VALUE } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } items { uint32_value: 51 } }, "
+ "{ items { uint32_value: 10 } items { uint32_value: 10 } items { null_flag_value: NULL_VALUE } }, "
+ "{ items { uint32_value: 20 } items { uint32_value: 20 } items { null_flag_value: NULL_VALUE } }");
+ }
+
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index ffea1144e2e..bc1529bf25b 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -288,6 +288,12 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
tx->ReleaseTxData(txc, ctx);
+ // Rollback database changes, if any
+ if (txc.DB.HasChanges()) {
+ txc.Reschedule();
+ return EExecutionStatus::Restart;
+ }
+
return EExecutionStatus::Continue;
}
diff --git a/ydb/core/tx/datashard/read_table_scan_unit.cpp b/ydb/core/tx/datashard/read_table_scan_unit.cpp
index c020c6c6abd..7ad857035ba 100644
--- a/ydb/core/tx/datashard/read_table_scan_unit.cpp
+++ b/ydb/core/tx/datashard/read_table_scan_unit.cpp
@@ -48,7 +48,7 @@ bool TReadTableScanUnit::IsReadyToExecute(TOperation::TPtr op) const
return true;
if (!op->IsWaitingForScan())
- return true;
+ return !op->HasRuntimeConflicts();
if (op->HasScanResult())
return true;
@@ -97,6 +97,24 @@ EExecutionStatus TReadTableScanUnit::Execute(TOperation::TPtr op,
if (record.HasSnapshotStep() && record.HasSnapshotTxId()) {
Y_VERIFY(op->HasAcquiredSnapshotKey(), "Missing snapshot reference in ReadTable tx");
+
+ bool wait = false;
+ const auto& byVersion = DataShard.GetVolatileTxManager().GetVolatileTxByVersion();
+ auto end = byVersion.upper_bound(TRowVersion(record.GetSnapshotStep(), record.GetSnapshotTxId()));
+ for (auto it = byVersion.begin(); it != end; ++it) {
+ auto* info = *it;
+ op->AddVolatileDependency(info->TxId);
+ bool ok = DataShard.GetVolatileTxManager().AttachWaitingRemovalOperation(info->TxId, op->GetTxId());
+ Y_VERIFY_S(ok, "Unexpected failure to attach TxId# " << op->GetTxId() << " to volatile tx " << info->TxId);
+ wait = true;
+ }
+
+ if (wait) {
+ // Wait until all volatile transactions below snapshot are removed
+ // This guarantees they are either committed or aborted and will
+ // be visible without any special tx map.
+ return EExecutionStatus::Continue;
+ }
} else if (!DataShard.IsMvccEnabled()) {
Y_VERIFY(tx->GetScanSnapshotId(), "Missing snapshot in ReadTable tx");
}
diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp
index 8f8f6c6354f..30c87fd16b7 100644
--- a/ydb/core/tx/datashard/volatile_tx.cpp
+++ b/ydb/core/tx/datashard/volatile_tx.cpp
@@ -131,6 +131,7 @@ namespace NKikimr::NDataShard {
void TVolatileTxManager::Clear() {
VolatileTxs.clear();
+ VolatileTxByVersion.clear();
VolatileTxByCommitTxId.clear();
TxMap.Reset();
}
@@ -138,7 +139,11 @@ namespace NKikimr::NDataShard {
bool TVolatileTxManager::Load(NIceDb::TNiceDb& db) {
using Schema = TDataShard::Schema;
- Y_VERIFY(VolatileTxs.empty() && VolatileTxByCommitTxId.empty() && !TxMap,
+ Y_VERIFY(
+ VolatileTxs.empty() &&
+ VolatileTxByVersion.empty() &&
+ VolatileTxByCommitTxId.empty() &&
+ !TxMap,
"Unexpected Load into non-empty volatile tx manager");
// Tables may not exist in some inactive shards, which cannot have transactions
@@ -260,6 +265,7 @@ namespace NKikimr::NDataShard {
for (auto& pr : VolatileTxs) {
postProcessTxInfo(pr.second.get());
+ VolatileTxByVersion.insert(pr.second.get());
}
return true;
@@ -351,6 +357,8 @@ namespace NKikimr::NDataShard {
info->State = EVolatileTxState::Committed;
}
+ VolatileTxByVersion.insert(info);
+
if (!TxMap) {
TxMap = MakeIntrusive<TTxMap>();
}
@@ -435,9 +443,12 @@ namespace NKikimr::NDataShard {
Y_VERIFY_S(info->Dependencies.empty(), "Unexpected remove of volatile tx " << txId << " with dependencies");
Y_VERIFY_S(info->Dependents.empty(), "Unexpected remove of volatile tx " << txId << " with dependents");
+ UnblockWaitingRemovalOperations(info);
+
for (ui64 commitTxId : info->CommitTxIds) {
VolatileTxByCommitTxId.erase(commitTxId);
}
+ VolatileTxByVersion.erase(info);
VolatileTxs.erase(txId);
}
@@ -491,6 +502,16 @@ namespace NKikimr::NDataShard {
return false;
}
+ bool TVolatileTxManager::AttachWaitingRemovalOperation(ui64 txId, ui64 dependentTxId) {
+ auto it = VolatileTxs.find(txId);
+ if (it == VolatileTxs.end()) {
+ return false;
+ }
+
+ it->second->WaitingRemovalOperations.insert(dependentTxId);
+ return true;
+ }
+
void TVolatileTxManager::AbortWaitingTransaction(TVolatileTxInfo* info) {
Y_VERIFY(info && info->State == EVolatileTxState::Waiting);
@@ -583,6 +604,7 @@ namespace NKikimr::NDataShard {
NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::TxVolatileParticipants>().Key(txId, srcTabletId).Delete();
+ Self->RemoveExpectation(srcTabletId, txId);
if (info->Participants.empty()) {
// Move tx to committed.
@@ -669,6 +691,25 @@ namespace NKikimr::NDataShard {
}
}
+ void TVolatileTxManager::UnblockWaitingRemovalOperations(TVolatileTxInfo* info) {
+ bool added = false;
+ for (ui64 dependentTxId : info->WaitingRemovalOperations) {
+ if (auto op = Self->Pipeline.FindOp(dependentTxId)) {
+ op->RemoveVolatileDependency(info->TxId, info->State == EVolatileTxState::Committed);
+ if (!op->HasVolatileDependencies() && !op->HasRuntimeConflicts()) {
+ Self->Pipeline.AddCandidateOp(op);
+ added = true;
+ }
+ }
+ }
+ info->WaitingRemovalOperations.clear();
+
+ if (added && Self->Pipeline.CanRunAnotherOp()) {
+ auto ctx = TActivationContext::ActorContextFor(Self->SelfId());
+ Self->PlanQueue.Progress(ctx);
+ }
+ }
+
void TVolatileTxManager::AddPendingCommit(ui64 txId) {
PendingCommits.push_back(txId);
RunPendingCommitTx();
diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h
index 09979f0977a..ef05ee9c896 100644
--- a/ydb/core/tx/datashard/volatile_tx.h
+++ b/ydb/core/tx/datashard/volatile_tx.h
@@ -50,6 +50,7 @@ namespace NKikimr::NDataShard {
absl::flat_hash_set<ui64> Participants;
bool AddCommitted = false;
absl::flat_hash_set<ui64> BlockedOperations;
+ absl::flat_hash_set<ui64> WaitingRemovalOperations;
TStackVec<IVolatileTxCallback::TPtr, 2> Callbacks;
};
@@ -114,6 +115,26 @@ namespace NKikimr::NDataShard {
const TIntrusivePtr<TTxMap>& TxMap;
};
+ struct TCompareInfoByVersion {
+ using is_transparent = void;
+
+ bool operator()(const TVolatileTxInfo* a, const TVolatileTxInfo* b) const {
+ // Note: we may have multiple infos with the same version
+ return std::tie(a->Version, a) < std::tie(b->Version, b);
+ }
+
+ bool operator()(const TVolatileTxInfo* a, const TRowVersion& b) const {
+ return a->Version < b;
+ }
+
+ bool operator()(const TRowVersion& a, const TVolatileTxInfo* b) const {
+ return a < b->Version;
+ }
+ };
+
+ public:
+ using TVolatileTxByVersion = std::set<TVolatileTxInfo*, TCompareInfoByVersion>;
+
public:
TVolatileTxManager(TDataShard* self)
: Self(self)
@@ -126,6 +147,8 @@ namespace NKikimr::NDataShard {
TVolatileTxInfo* FindByTxId(ui64 txId) const;
TVolatileTxInfo* FindByCommitTxId(ui64 txId) const;
+ const TVolatileTxByVersion& GetVolatileTxByVersion() const { return VolatileTxByVersion; }
+
void PersistAddVolatileTx(
ui64 txId, const TRowVersion& version,
TConstArrayRef<ui64> commitTxIds,
@@ -139,6 +162,9 @@ namespace NKikimr::NDataShard {
bool AttachBlockedOperation(
ui64 txId, ui64 dependentTxId);
+ bool AttachWaitingRemovalOperation(
+ ui64 txId, ui64 dependentTxId);
+
void AbortWaitingTransaction(TVolatileTxInfo* info);
void ProcessReadSet(
@@ -164,6 +190,7 @@ namespace NKikimr::NDataShard {
void RemoveFromTxMap(TVolatileTxInfo* info);
void UnblockDependents(TVolatileTxInfo* info);
void UnblockOperations(TVolatileTxInfo* info, bool success);
+ void UnblockWaitingRemovalOperations(TVolatileTxInfo* info);
void AddPendingCommit(ui64 txId);
void AddPendingAbort(ui64 txId);
void RunPendingCommitTx();
@@ -173,6 +200,7 @@ namespace NKikimr::NDataShard {
TDataShard* const Self;
absl::flat_hash_map<ui64, std::unique_ptr<TVolatileTxInfo>> VolatileTxs; // TxId -> Info
absl::flat_hash_map<ui64, TVolatileTxInfo*> VolatileTxByCommitTxId; // CommitTxId -> Info
+ TVolatileTxByVersion VolatileTxByVersion;
TIntrusivePtr<TTxMap> TxMap;
std::deque<ui64> PendingCommits;
std::deque<ui64> PendingAborts;