diff options
author | snaury <snaury@ydb.tech> | 2023-01-26 18:43:14 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-01-26 18:43:14 +0300 |
commit | bcb5d533a29a90c4099f725e73442909cc462635 (patch) | |
tree | c76924b76070298e39157af91f1980e53f068d26 | |
parent | 63aa601686f846db5c3fc5056316a31d6aca4a4a (diff) | |
download | ydb-bcb5d533a29a90c4099f725e73442909cc462635.tar.gz |
Support some edge cases in volatile transactions
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 33 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__monitoring.cpp | 11 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.cpp | 20 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_trans_queue.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.cpp | 13 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_volatile.cpp | 232 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/read_table_scan_unit.cpp | 20 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.cpp | 43 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.h | 28 |
14 files changed, 412 insertions, 18 deletions
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 19a2651bcfb..4424a1481ca 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -3091,6 +3091,15 @@ bool TDataShard::RemoveExpectation(ui64 target, ui64 txId) { auto ctx = TActivationContext::ActorContextFor(SelfId()); ResendReadSetPipeTracker.DetachTablet(Max<ui64>(), target, 0, ctx); } + + // progress one more tx to force delayed schema operations + if (removed && OutReadSets.Empty() && Pipeline.HasSchemaOperation()) { + // TODO: wait for empty OutRS in a separate unit? + auto ctx = TActivationContext::ActorContextFor(SelfId()); + Pipeline.AddCandidateUnit(EExecutionUnitKind::PlanQueue); + PlanQueue.Progress(ctx); + } + return removed; } diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 57573184875..48043bcd66a 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -774,7 +774,7 @@ public: class TLockedWriteTxObserver : public NTable::ITransactionObserver { public: - TLockedWriteTxObserver(const TDataShardEngineHost* host, ui64 txId, ui64& skipCount, ui32 localTid) + TLockedWriteTxObserver(TDataShardEngineHost* host, ui64 txId, ui64& skipCount, ui32 localTid) : Host(host) , SelfTxId(txId) , SkipCount(skipCount) @@ -814,7 +814,7 @@ public: } private: - const TDataShardEngineHost* const Host; + TDataShardEngineHost* const Host; const ui64 SelfTxId; ui64& SkipCount; const ui32 LocalTid; @@ -823,7 +823,7 @@ public: class TWriteTxObserver : public NTable::ITransactionObserver { public: - TWriteTxObserver(const TDataShardEngineHost* host) + TWriteTxObserver(TDataShardEngineHost* host) : Host(host) { } @@ -851,7 +851,7 @@ public: } private: - const TDataShardEngineHost* const Host; + TDataShardEngineHost* const Host; }; void AddWriteConflict(ui64 txId) const { @@ -862,19 +862,26 @@ public: } } - void BreakWriteConflict(ui64 txId) const { + void BreakWriteConflict(ui64 txId) { if (VolatileCommitTxIds.contains(txId)) { // Skip our own commits } else if (auto* info = Self->GetVolatileTxManager().FindByCommitTxId(txId)) { // We must not overwrite uncommitted changes that may become committed - // later, so we need to switch transaction to volatile writes mid - // flight. This is only needed so we don't block writes and still - // commit changes in the correct order. - if (!VolatileTxId && info->State != EVolatileTxState::Aborting) { - Y_FAIL("TODO: implement switching to volatile writes mid-transaction"); - } - // Add dependency on uncommitted transactions - if (VolatileTxId && info->State != EVolatileTxState::Committed) { + // later, so we need to add a dependency that will force us to wait + // until it is persistently committed. We may ignore aborting changes + // even though they may not be persistent yet, since this tx will + // also perform writes, and either it fails, or future generation + // could not have possibly committed it already. + if (info->State != EVolatileTxState::Aborting) { + if (!VolatileTxId) { + // All further writes will use this VolatileTxId and will + // add it to VolatileCommitTxIds, forcing it to be committed + // like a volatile transaction. Note that this does not make + // it into a real volatile transaction, it works as usual in + // every sense, only persistent commit order is affected by + // a dependency below. + VolatileTxId = EngineBay.GetTxId(); + } VolatileDependencies.insert(info->TxId); } } else { diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index 6a4b877b7d2..70c285fede7 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -94,6 +94,9 @@ public: EngineHost.Reset(); } + ui64 GetStep() const { return StepTxId.first; } + ui64 GetTxId() const { return StepTxId.second; } + const TValidationInfo& TxInfo() const { return Info; } TEngineBay::TSizes CalcSizes(bool needsTotalKeysSize) const; diff --git a/ydb/core/tx/datashard/datashard__monitoring.cpp b/ydb/core/tx/datashard/datashard__monitoring.cpp index 644f9a74199..957b7c00891 100644 --- a/ydb/core/tx/datashard/datashard__monitoring.cpp +++ b/ydb/core/tx/datashard/datashard__monitoring.cpp @@ -255,6 +255,7 @@ public: auto &deps = *resp.MutableDependencies(); FillDependencies(op->GetDependencies(), *deps.MutableDependencies()); FillDependencies(op->GetDependents(), *deps.MutableDependents()); + FillDependencies(op->GetVolatileDependencies(), *deps.MutableDependencies()); if (op->IsExecuting() && !op->InReadSets().empty()) { auto &inData = *resp.MutableInputData(); @@ -299,6 +300,16 @@ public: } } + void FillDependencies(const absl::flat_hash_set<ui64> &deps, + ::google::protobuf::RepeatedPtrField<NKikimrTxDataShard::TEvGetOperationResponse_TDependency> &arr) + { + for (ui64 txId : deps) { + auto &dep = *arr.Add(); + dep.SetTarget(txId); + dep.AddTypes("Data"); + } + } + void Complete(const TActorContext &) override {} TTxType GetTxType() const override { return TXTYPE_MONITORING; } diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 753066d0e0c..075155d1a2e 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -517,6 +517,15 @@ TOperation::TPtr TPipeline::GetActiveOp(ui64 txId) return nullptr; } +TOperation::TPtr TPipeline::GetVolatileOp(ui64 txId) +{ + TOperation::TPtr op = FindOp(txId); + if (op && op->HasVolatilePrepareFlag()) { + return op; + } + return nullptr; +} + bool TPipeline::LoadTxDetails(TTransactionContext &txc, const TActorContext &ctx, TActiveTransaction::TPtr tx) @@ -644,6 +653,11 @@ bool TPipeline::SaveInReadSet(const TEvTxProcessing::TEvReadSet &rs, } TOperation::TPtr op = GetActiveOp(txId); + bool active = true; + if (!op) { + op = GetVolatileOp(txId); + active = false; + } if (op) { // If input read sets are not loaded yet then // it will be added at load. @@ -655,8 +669,10 @@ bool TPipeline::SaveInReadSet(const TEvTxProcessing::TEvReadSet &rs, } op->AddDelayedInReadSet(rs.Record); - AddCandidateOp(op); - Self->PlanQueue.Progress(ctx); + if (active) { + AddCandidateOp(op); + Self->PlanQueue.Progress(ctx); + } return false; } diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h index f4c235d8edc..e8c06dea8a1 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.h +++ b/ydb/core/tx/datashard/datashard_pipeline.h @@ -230,6 +230,7 @@ public: TOperation::TPtr FindOp(ui64 txId); TOperation::TPtr GetActiveOp(ui64 txId); + TOperation::TPtr GetVolatileOp(ui64 txId); const TMap<TStepOrder, TOperation::TPtr> &GetActiveOps() const { return ActiveOps; } void AddActiveOp(TOperation::TPtr op); diff --git a/ydb/core/tx/datashard/datashard_trans_queue.cpp b/ydb/core/tx/datashard/datashard_trans_queue.cpp index b6e07f742ca..3c1f4633574 100644 --- a/ydb/core/tx/datashard/datashard_trans_queue.cpp +++ b/ydb/core/tx/datashard/datashard_trans_queue.cpp @@ -254,7 +254,10 @@ void TTransQueue::UpdateTxFlags(NIceDb::TNiceDb& db, ui64 txId, ui64 flags) { auto it = TxsInFly.find(txId); Y_VERIFY(it != TxsInFly.end()); - Y_VERIFY(!it->second->HasVolatilePrepareFlag(), "Unexpected UpdateTxFlags for a volatile transaction"); + if (it->second->HasVolatilePrepareFlag()) { + // We keep volatile transactions in memory and don't store anything + return; + } const ui64 preserveFlagsMask = TTxFlags::PublicFlagsMask | TTxFlags::PreservedPrivateFlagsMask; diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp index 79124ae262c..5ade7fe5bf3 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/datashard_ut_common.cpp @@ -1462,6 +1462,19 @@ TRowVersion CommitWrites( return { step, txId }; } +ui64 AsyncDropTable( + Tests::TServer::TPtr server, + TActorId sender, + const TString& workingDir, + const TString& name) +{ + auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpDropTable, workingDir); + auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableDrop(); + desc.SetName(name); + + return RunSchemeTx(*server->GetRuntime(), std::move(request), sender, true); +} + ui64 AsyncSplitTable( Tests::TServer::TPtr server, TActorId sender, diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index 9bac26dbfe1..c81b07a1252 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -547,6 +547,12 @@ TRowVersion CommitWrites( const TVector<TString>& tables, ui64 writeTxId); +ui64 AsyncDropTable( + Tests::TServer::TPtr server, + TActorId sender, + const TString& workingDir, + const TString& name); + ui64 AsyncSplitTable( Tests::TServer::TPtr server, TActorId sender, diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index 2d1efb37f22..25e719470d9 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -555,6 +555,238 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { "{ items { uint32_value: 10 } items { uint32_value: 10 } }"); } + Y_UNIT_TEST(DistributedWriteAsymmetricExecute) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); + + const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); + + size_t observedPlans = 0; + TVector<THolder<IEventHandle>> capturedPlans; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvTxProcessing::TEvPlanStep::EventType: { + const auto* msg = ev->Get<TEvTxProcessing::TEvPlanStep>(); + ++observedPlans; + if (msg->Record.GetTabletID() == shard1) { + Cerr << "... captured TEvPlanStep for " << msg->Record.GetTabletID() << Endl; + capturedPlans.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(captureEvents); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); + + TString sessionId = CreateSessionRPC(runtime, "/Root"); + + auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20); + )", sessionId, "", true /* commitTx */), "/Root"); + + WaitFor(runtime, [&]{ return observedPlans >= 2; }, "observed plans"); + UNIT_ASSERT_VALUES_EQUAL(capturedPlans.size(), 1u); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + // Wait until it completes at shard2 + SimulateSleep(runtime, TDuration::Seconds(1)); + + // Unblock plan at shard1 + runtime.SetObserverFunc(prevObserverFunc); + for (auto& ev : capturedPlans) { + runtime.Send(ev.Release(), 0, true); + } + + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(future))), + "<empty>"); + Cerr << "!!! distributed write end" << Endl; + } + + Y_UNIT_TEST(DistributedWriteThenDropTable) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); + + size_t observedPropose = 0; + TVector<THolder<IEventHandle>> capturedReadSets; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::TEvProposeTransaction::EventType: { + ++observedPropose; + Cerr << "... observed TEvProposeTransaction" << Endl; + break; + } + case TEvTxProcessing::TEvReadSet::EventType: { + const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>(); + Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl; + capturedReadSets.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(captureEvents); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); + + TString sessionId = CreateSessionRPC(runtime, "/Root"); + + auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20); + )", sessionId, "", true /* commitTx */), "/Root"); + + WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets"); + UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + observedPropose = 0; + ui64 txId = AsyncDropTable(server, sender, "/Root", "table-1"); + WaitFor(runtime, [&]{ return observedPropose > 0; }, "observed propose"); + + SimulateSleep(runtime, TDuration::Seconds(1)); + + runtime.SetObserverFunc(prevObserverFunc); + for (auto& ev : capturedReadSets) { + runtime.Send(ev.Release(), 0, true); + } + + WaitTxNotification(server, sender, txId); + } + + Y_UNIT_TEST(DistributedWriteThenImmediateUpsert) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + auto opts = TShardedTableOptions() + .Shards(1) + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}, + {"value2", "Uint32", false, false}}); + CreateShardedTable(server, sender, "/Root", "table-1", opts); + CreateShardedTable(server, sender, "/Root", "table-2", opts); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); + + const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); + const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1"); + + TVector<THolder<IEventHandle>> capturedReadSets; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvTxProcessing::TEvReadSet::EventType: { + const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>(); + Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl; + capturedReadSets.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(captureEvents); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); + + TString sessionId = CreateSessionRPC(runtime, "/Root"); + + auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (2, 2, 42); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20); + )", sessionId, "", true /* commitTx */), "/Root"); + + WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets"); + UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + // Note: this upsert happens over the upsert into the value column + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value2) VALUES (2, 51);"); + + // This compaction verifies there's no commit race with the waiting + // distributed transaction. If commits happen in incorrect order we + // would observe unexpected results. + CompactTable(runtime, shard1, tableId1, false); + + runtime.SetObserverFunc(prevObserverFunc); + for (auto& ev : capturedReadSets) { + runtime.Send(ev.Release(), 0, true); + } + + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(future))), + "<empty>"); + + // Verify the result + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + SELECT key, value, value2 FROM `/Root/table-1` + UNION ALL + SELECT key, value, value2 FROM `/Root/table-2` + ORDER BY key + )"), + "{ items { uint32_value: 1 } items { uint32_value: 1 } items { null_flag_value: NULL_VALUE } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } items { uint32_value: 51 } }, " + "{ items { uint32_value: 10 } items { uint32_value: 10 } items { null_flag_value: NULL_VALUE } }, " + "{ items { uint32_value: 20 } items { uint32_value: 20 } items { null_flag_value: NULL_VALUE } }"); + } + } // Y_UNIT_TEST_SUITE(DataShardVolatile) } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index ffea1144e2e..bc1529bf25b 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -288,6 +288,12 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio tx->ReleaseTxData(txc, ctx); + // Rollback database changes, if any + if (txc.DB.HasChanges()) { + txc.Reschedule(); + return EExecutionStatus::Restart; + } + return EExecutionStatus::Continue; } diff --git a/ydb/core/tx/datashard/read_table_scan_unit.cpp b/ydb/core/tx/datashard/read_table_scan_unit.cpp index c020c6c6abd..7ad857035ba 100644 --- a/ydb/core/tx/datashard/read_table_scan_unit.cpp +++ b/ydb/core/tx/datashard/read_table_scan_unit.cpp @@ -48,7 +48,7 @@ bool TReadTableScanUnit::IsReadyToExecute(TOperation::TPtr op) const return true; if (!op->IsWaitingForScan()) - return true; + return !op->HasRuntimeConflicts(); if (op->HasScanResult()) return true; @@ -97,6 +97,24 @@ EExecutionStatus TReadTableScanUnit::Execute(TOperation::TPtr op, if (record.HasSnapshotStep() && record.HasSnapshotTxId()) { Y_VERIFY(op->HasAcquiredSnapshotKey(), "Missing snapshot reference in ReadTable tx"); + + bool wait = false; + const auto& byVersion = DataShard.GetVolatileTxManager().GetVolatileTxByVersion(); + auto end = byVersion.upper_bound(TRowVersion(record.GetSnapshotStep(), record.GetSnapshotTxId())); + for (auto it = byVersion.begin(); it != end; ++it) { + auto* info = *it; + op->AddVolatileDependency(info->TxId); + bool ok = DataShard.GetVolatileTxManager().AttachWaitingRemovalOperation(info->TxId, op->GetTxId()); + Y_VERIFY_S(ok, "Unexpected failure to attach TxId# " << op->GetTxId() << " to volatile tx " << info->TxId); + wait = true; + } + + if (wait) { + // Wait until all volatile transactions below snapshot are removed + // This guarantees they are either committed or aborted and will + // be visible without any special tx map. + return EExecutionStatus::Continue; + } } else if (!DataShard.IsMvccEnabled()) { Y_VERIFY(tx->GetScanSnapshotId(), "Missing snapshot in ReadTable tx"); } diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index 8f8f6c6354f..30c87fd16b7 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -131,6 +131,7 @@ namespace NKikimr::NDataShard { void TVolatileTxManager::Clear() { VolatileTxs.clear(); + VolatileTxByVersion.clear(); VolatileTxByCommitTxId.clear(); TxMap.Reset(); } @@ -138,7 +139,11 @@ namespace NKikimr::NDataShard { bool TVolatileTxManager::Load(NIceDb::TNiceDb& db) { using Schema = TDataShard::Schema; - Y_VERIFY(VolatileTxs.empty() && VolatileTxByCommitTxId.empty() && !TxMap, + Y_VERIFY( + VolatileTxs.empty() && + VolatileTxByVersion.empty() && + VolatileTxByCommitTxId.empty() && + !TxMap, "Unexpected Load into non-empty volatile tx manager"); // Tables may not exist in some inactive shards, which cannot have transactions @@ -260,6 +265,7 @@ namespace NKikimr::NDataShard { for (auto& pr : VolatileTxs) { postProcessTxInfo(pr.second.get()); + VolatileTxByVersion.insert(pr.second.get()); } return true; @@ -351,6 +357,8 @@ namespace NKikimr::NDataShard { info->State = EVolatileTxState::Committed; } + VolatileTxByVersion.insert(info); + if (!TxMap) { TxMap = MakeIntrusive<TTxMap>(); } @@ -435,9 +443,12 @@ namespace NKikimr::NDataShard { Y_VERIFY_S(info->Dependencies.empty(), "Unexpected remove of volatile tx " << txId << " with dependencies"); Y_VERIFY_S(info->Dependents.empty(), "Unexpected remove of volatile tx " << txId << " with dependents"); + UnblockWaitingRemovalOperations(info); + for (ui64 commitTxId : info->CommitTxIds) { VolatileTxByCommitTxId.erase(commitTxId); } + VolatileTxByVersion.erase(info); VolatileTxs.erase(txId); } @@ -491,6 +502,16 @@ namespace NKikimr::NDataShard { return false; } + bool TVolatileTxManager::AttachWaitingRemovalOperation(ui64 txId, ui64 dependentTxId) { + auto it = VolatileTxs.find(txId); + if (it == VolatileTxs.end()) { + return false; + } + + it->second->WaitingRemovalOperations.insert(dependentTxId); + return true; + } + void TVolatileTxManager::AbortWaitingTransaction(TVolatileTxInfo* info) { Y_VERIFY(info && info->State == EVolatileTxState::Waiting); @@ -583,6 +604,7 @@ namespace NKikimr::NDataShard { NIceDb::TNiceDb db(txc.DB); db.Table<Schema::TxVolatileParticipants>().Key(txId, srcTabletId).Delete(); + Self->RemoveExpectation(srcTabletId, txId); if (info->Participants.empty()) { // Move tx to committed. @@ -669,6 +691,25 @@ namespace NKikimr::NDataShard { } } + void TVolatileTxManager::UnblockWaitingRemovalOperations(TVolatileTxInfo* info) { + bool added = false; + for (ui64 dependentTxId : info->WaitingRemovalOperations) { + if (auto op = Self->Pipeline.FindOp(dependentTxId)) { + op->RemoveVolatileDependency(info->TxId, info->State == EVolatileTxState::Committed); + if (!op->HasVolatileDependencies() && !op->HasRuntimeConflicts()) { + Self->Pipeline.AddCandidateOp(op); + added = true; + } + } + } + info->WaitingRemovalOperations.clear(); + + if (added && Self->Pipeline.CanRunAnotherOp()) { + auto ctx = TActivationContext::ActorContextFor(Self->SelfId()); + Self->PlanQueue.Progress(ctx); + } + } + void TVolatileTxManager::AddPendingCommit(ui64 txId) { PendingCommits.push_back(txId); RunPendingCommitTx(); diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h index 09979f0977a..ef05ee9c896 100644 --- a/ydb/core/tx/datashard/volatile_tx.h +++ b/ydb/core/tx/datashard/volatile_tx.h @@ -50,6 +50,7 @@ namespace NKikimr::NDataShard { absl::flat_hash_set<ui64> Participants; bool AddCommitted = false; absl::flat_hash_set<ui64> BlockedOperations; + absl::flat_hash_set<ui64> WaitingRemovalOperations; TStackVec<IVolatileTxCallback::TPtr, 2> Callbacks; }; @@ -114,6 +115,26 @@ namespace NKikimr::NDataShard { const TIntrusivePtr<TTxMap>& TxMap; }; + struct TCompareInfoByVersion { + using is_transparent = void; + + bool operator()(const TVolatileTxInfo* a, const TVolatileTxInfo* b) const { + // Note: we may have multiple infos with the same version + return std::tie(a->Version, a) < std::tie(b->Version, b); + } + + bool operator()(const TVolatileTxInfo* a, const TRowVersion& b) const { + return a->Version < b; + } + + bool operator()(const TRowVersion& a, const TVolatileTxInfo* b) const { + return a < b->Version; + } + }; + + public: + using TVolatileTxByVersion = std::set<TVolatileTxInfo*, TCompareInfoByVersion>; + public: TVolatileTxManager(TDataShard* self) : Self(self) @@ -126,6 +147,8 @@ namespace NKikimr::NDataShard { TVolatileTxInfo* FindByTxId(ui64 txId) const; TVolatileTxInfo* FindByCommitTxId(ui64 txId) const; + const TVolatileTxByVersion& GetVolatileTxByVersion() const { return VolatileTxByVersion; } + void PersistAddVolatileTx( ui64 txId, const TRowVersion& version, TConstArrayRef<ui64> commitTxIds, @@ -139,6 +162,9 @@ namespace NKikimr::NDataShard { bool AttachBlockedOperation( ui64 txId, ui64 dependentTxId); + bool AttachWaitingRemovalOperation( + ui64 txId, ui64 dependentTxId); + void AbortWaitingTransaction(TVolatileTxInfo* info); void ProcessReadSet( @@ -164,6 +190,7 @@ namespace NKikimr::NDataShard { void RemoveFromTxMap(TVolatileTxInfo* info); void UnblockDependents(TVolatileTxInfo* info); void UnblockOperations(TVolatileTxInfo* info, bool success); + void UnblockWaitingRemovalOperations(TVolatileTxInfo* info); void AddPendingCommit(ui64 txId); void AddPendingAbort(ui64 txId); void RunPendingCommitTx(); @@ -173,6 +200,7 @@ namespace NKikimr::NDataShard { TDataShard* const Self; absl::flat_hash_map<ui64, std::unique_ptr<TVolatileTxInfo>> VolatileTxs; // TxId -> Info absl::flat_hash_map<ui64, TVolatileTxInfo*> VolatileTxByCommitTxId; // CommitTxId -> Info + TVolatileTxByVersion VolatileTxByVersion; TIntrusivePtr<TTxMap> TxMap; std::deque<ui64> PendingCommits; std::deque<ui64> PendingAborts; |