diff options
author | snaury <snaury@ydb.tech> | 2023-02-06 14:23:40 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-02-06 14:23:40 +0300 |
commit | e21adc22a0c6891199aa38afb236ca7c82167a39 (patch) | |
tree | 6b7f4dd272856d3483a41b20d3fd4c6e41575904 | |
parent | c1a46c578c1553a1541aa1435c3356740f2a58ac (diff) | |
download | ydb-e21adc22a0c6891199aa38afb236ca7c82167a39.tar.gz |
Wait for volatile transactions in scans
-rw-r--r-- | ydb/core/tx/datashard/build_index.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/cdc_stream_scan.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__kqp_scan.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_columns.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.h | 12 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common_kqp.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_volatile.cpp | 120 | ||||
-rw-r--r-- | ydb/core/tx/datashard/read_table_scan_unit.cpp | 10 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.cpp | 21 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.h | 23 |
10 files changed, 212 insertions, 9 deletions
diff --git a/ydb/core/tx/datashard/build_index.cpp b/ydb/core/tx/datashard/build_index.cpp index 7be78b61763..1eb08345a05 100644 --- a/ydb/core/tx/datashard/build_index.cpp +++ b/ydb/core/tx/datashard/build_index.cpp @@ -555,6 +555,14 @@ TAutoPtr<NTable::IScan> CreateBuildIndexScan( void TDataShard::Handle(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx) { const auto& record = ev->Get()->Record; + // Note: it's very unlikely that we have volatile txs before this snapshot + if (VolatileTxManager.HasVolatileTxsAtSnapshot(TRowVersion(record.GetSnapshotStep(), record.GetSnapshotTxId()))) { + VolatileTxManager.AttachWaitingSnapshotEvent( + TRowVersion(record.GetSnapshotStep(), record.GetSnapshotTxId()), + std::unique_ptr<IEventHandle>(ev.Release())); + return; + } + auto response = MakeHolder<TEvDataShard::TEvBuildIndexProgressResponse>(); response->Record.SetBuildIndexId(record.GetBuildIndexId()); response->Record.SetTabletId(TabletID()); diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp index 28f708445d2..6d5641802a2 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.cpp +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -640,6 +640,9 @@ public: const auto snapshotVersion = TRowVersion(snapshotKey.Step, snapshotKey.TxId); Y_VERIFY(info->SnapshotVersion == snapshotVersion); + // Note: cdc stream is added with a schema transaction and those wait for volatile txs + Y_VERIFY(!Self->GetVolatileTxManager().HasVolatileTxsAtSnapshot(snapshotVersion)); + const ui64 localTxId = ++Self->NextTieBreakerIndex; auto scan = MakeHolder<TCdcStreamScan>(Self, Request->Sender, localTxId, tablePathId, streamPathId, snapshotVersion, valueTags, info->LastKey, info->Stats, record.GetLimits()); diff --git a/ydb/core/tx/datashard/datashard__kqp_scan.cpp b/ydb/core/tx/datashard/datashard__kqp_scan.cpp index a691ba1182f..f04adb2952b 100644 --- a/ydb/core/tx/datashard/datashard__kqp_scan.cpp +++ b/ydb/core/tx/datashard/datashard__kqp_scan.cpp @@ -543,6 +543,13 @@ void TDataShard::Handle(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContext& auto scanComputeActor = ev->Sender; auto generation = request.GetGeneration(); + if (VolatileTxManager.HasVolatileTxsAtSnapshot(TRowVersion(request.GetSnapshot().GetStep(), request.GetSnapshot().GetTxId()))) { + VolatileTxManager.AttachWaitingSnapshotEvent( + TRowVersion(request.GetSnapshot().GetStep(), request.GetSnapshot().GetTxId()), + std::unique_ptr<IEventHandle>(ev.Release())); + return; + } + auto infoIt = TableInfos.find(request.GetLocalPathId()); auto reportError = [this, scanComputeActor, generation] (const TString& table, const TString& detailedReason) { diff --git a/ydb/core/tx/datashard/datashard__read_columns.cpp b/ydb/core/tx/datashard/datashard__read_columns.cpp index 983abe32f79..66cbf2c77fc 100644 --- a/ydb/core/tx/datashard/datashard__read_columns.cpp +++ b/ydb/core/tx/datashard/datashard__read_columns.cpp @@ -212,6 +212,15 @@ public: } bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + // FIXME: we need to transform HEAD into some non-repeatable snapshot here + if (!ReadVersion.IsMax() && Self->GetVolatileTxManager().HasVolatileTxsAtSnapshot(ReadVersion)) { + Self->GetVolatileTxManager().AttachWaitingSnapshotEvent( + ReadVersion, + std::unique_ptr<IEventHandle>(Ev.Release())); + Result.Destroy(); + return true; + } + Result = new TEvDataShard::TEvReadColumnsResponse(Self->TabletID()); bool useScan = Self->ReadColumnsScanEnabled; diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index 146032bdc8c..0055bc07dd2 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -366,7 +366,8 @@ void InitRoot(Tests::TServer::TPtr server, class TLambdaActor : public IActorCallback { public: - using TCallback = std::function<void(TAutoPtr<IEventHandle>&)>; + using TCallback = std::function<void(TAutoPtr<IEventHandle>&, const TActorContext&)>; + using TNoCtxCallback = std::function<void(TAutoPtr<IEventHandle>&)>; public: TLambdaActor(TCallback&& callback) @@ -374,10 +375,15 @@ public: , Callback(std::move(callback)) { } + TLambdaActor(TNoCtxCallback&& callback) + : TLambdaActor([callback = std::move(callback)](auto& ev, auto&) { + callback(ev); + }) + { } + private: STFUNC(StateWork) { - Y_UNUSED(ctx); - Callback(ev); + Callback(ev, ctx); } private: diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h index 01827e50c52..9994ee28b37 100644 --- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h +++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h @@ -120,12 +120,16 @@ namespace NKqpHelpers { return request; } + inline TString FormatResult(const Ydb::ResultSet& rs) { + Cerr << JoinSeq(", ", rs.rows()); + return JoinSeq(", ", rs.rows()); + } + inline TString FormatResult(const Ydb::Table::ExecuteQueryResult& result) { if (result.result_sets_size() == 0) { return "<empty>"; } - Cerr << JoinSeq(", ", result.result_sets(0).rows()); - return JoinSeq(", ", result.result_sets(0).rows()); + return FormatResult(result.result_sets(0)); } inline TString FormatResult(const Ydb::Table::ExecuteDataQueryResponse& response) { diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index 3057ac7b5d2..3152351d6dd 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -2,6 +2,8 @@ #include "datashard_ut_common_kqp.h" #include "datashard_active_transaction.h" +#include <ydb/core/kqp/executer_actor/kqp_executer.h> + namespace NKikimr { using namespace NKikimr::NDataShard; @@ -1232,6 +1234,124 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { } } + Y_UNIT_TEST(DistributedWriteThenScanQuery) { + TPortManager pm; + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(false); + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000) + .SetAppConfig(app); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + auto opts = TShardedTableOptions() + .Shards(1) + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}}); + CreateShardedTable(server, sender, "/Root", "table-1", opts); + CreateShardedTable(server, sender, "/Root", "table-2", opts); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); + + ui64 maxReadSetStep = 0; + bool captureReadSets = true; + TVector<THolder<IEventHandle>> capturedReadSets; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvTxProcessing::TEvReadSet::EventType: { + const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>(); + maxReadSetStep = Max(maxReadSetStep, msg->Record.GetStep()); + if (captureReadSets) { + Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl; + capturedReadSets.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(captureEvents); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); + + TString sessionId = CreateSessionRPC(runtime, "/Root"); + + auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20); + )", sessionId, "", true /* commitTx */), "/Root"); + + WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets"); + UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + TVector<TString> observedResults; + TMaybe<Ydb::StatusIds::StatusCode> observedStatus; + auto scanSender = runtime.Register(new TLambdaActor([&](TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { + switch (ev->GetTypeRewrite()) { + case NKqp::TEvKqpExecuter::TEvStreamData::EventType: { + auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>(); + Cerr << "... observed stream data" << Endl; + observedResults.push_back(FormatResult(msg->Record.GetResultSet())); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + resp->Record.SetSeqNo(msg->Record.GetSeqNo()); + resp->Record.SetFreeSpace(1); + ctx.Send(ev->Sender, resp.Release()); + break; + } + case NKqp::TEvKqp::TEvQueryResponse::EventType: { + auto* msg = ev->Get<NKqp::TEvKqp::TEvQueryResponse>(); + Cerr << "... observed query result" << Endl; + observedStatus = msg->Record.GetRef().GetYdbStatus(); + break; + } + default: { + Cerr << "... ignored event " << ev->GetTypeRewrite(); + if (ev->GetBase()) { + Cerr << " " << ev->GetBase()->ToString(); + } + Cerr << Endl; + } + } + })); + + SendRequest(runtime, scanSender, MakeStreamRequest(scanSender, R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key; + )")); + + SimulateSleep(runtime, TDuration::Seconds(2)); + + UNIT_ASSERT_VALUES_EQUAL(observedResults.size(), 0u); + + captureReadSets = false; + for (auto& ev : capturedReadSets) { + runtime.Send(ev.Release(), 0, true); + } + + SimulateSleep(runtime, TDuration::Seconds(2)); + + UNIT_ASSERT_VALUES_EQUAL(observedResults.size(), 1u); + UNIT_ASSERT_VALUES_EQUAL( + observedResults[0], + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } }"); + UNIT_ASSERT_VALUES_EQUAL(observedStatus, Ydb::StatusIds::SUCCESS); + } + } // Y_UNIT_TEST_SUITE(DataShardVolatile) } // namespace NKikimr diff --git a/ydb/core/tx/datashard/read_table_scan_unit.cpp b/ydb/core/tx/datashard/read_table_scan_unit.cpp index 7ad857035ba..9fc30ad9d29 100644 --- a/ydb/core/tx/datashard/read_table_scan_unit.cpp +++ b/ydb/core/tx/datashard/read_table_scan_unit.cpp @@ -99,10 +99,11 @@ EExecutionStatus TReadTableScanUnit::Execute(TOperation::TPtr op, Y_VERIFY(op->HasAcquiredSnapshotKey(), "Missing snapshot reference in ReadTable tx"); bool wait = false; - const auto& byVersion = DataShard.GetVolatileTxManager().GetVolatileTxByVersion(); - auto end = byVersion.upper_bound(TRowVersion(record.GetSnapshotStep(), record.GetSnapshotTxId())); - for (auto it = byVersion.begin(); it != end; ++it) { - auto* info = *it; + TRowVersion snapshot(record.GetSnapshotStep(), record.GetSnapshotTxId()); + for (auto* info : DataShard.GetVolatileTxManager().GetVolatileTxByVersion()) { + if (!(info->Version <= snapshot)) { + break; + } op->AddVolatileDependency(info->TxId); bool ok = DataShard.GetVolatileTxManager().AttachWaitingRemovalOperation(info->TxId, op->GetTxId()); Y_VERIFY_S(ok, "Unexpected failure to attach TxId# " << op->GetTxId() << " to volatile tx " << info->TxId); @@ -132,6 +133,7 @@ EExecutionStatus TReadTableScanUnit::Execute(TOperation::TPtr op, auto readVersion = TRowVersion(record.GetSnapshotStep(), record.GetSnapshotTxId()); options.SetSnapshotRowVersion(readVersion); } else if (DataShard.IsMvccEnabled()) { + // Note: this mode is only used in legacy tests and may not work with volatile transactions // With mvcc we have to mark all preceding transactions as logically complete auto readVersion = DataShard.GetReadWriteVersions(tx).ReadVersion; hadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(readVersion, txc); diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index d24058be1ef..0f079b37df5 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -471,6 +471,20 @@ namespace NKikimr::NDataShard { } VolatileTxByVersion.erase(info); VolatileTxs.erase(txId); + + if (!WaitingSnapshotEvents.empty()) { + TVolatileTxInfo* next = !VolatileTxByVersion.empty() ? *VolatileTxByVersion.begin() : nullptr; + while (!WaitingSnapshotEvents.empty()) { + auto& top = WaitingSnapshotEvents.front(); + if (next && next->Version <= top.Snapshot) { + // Still waiting + break; + } + TActivationContext::Send(std::move(top.Event)); + std::pop_heap(WaitingSnapshotEvents.begin(), WaitingSnapshotEvents.end()); + WaitingSnapshotEvents.pop_back(); + } + } } bool TVolatileTxManager::AttachVolatileTxCallback(ui64 txId, IVolatileTxCallback::TPtr callback) { @@ -533,6 +547,13 @@ namespace NKikimr::NDataShard { return true; } + void TVolatileTxManager::AttachWaitingSnapshotEvent(const TRowVersion& snapshot, std::unique_ptr<IEventHandle>&& event) { + Y_VERIFY(!VolatileTxByVersion.empty() && (*VolatileTxByVersion.begin())->Version <= snapshot); + + WaitingSnapshotEvents.emplace_back(snapshot, std::move(event)); + std::push_heap(WaitingSnapshotEvents.begin(), WaitingSnapshotEvents.end()); + } + void TVolatileTxManager::AbortWaitingTransaction(TVolatileTxInfo* info) { Y_VERIFY(info && info->State == EVolatileTxState::Waiting); diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h index 110701e39df..be6202a8d1f 100644 --- a/ydb/core/tx/datashard/volatile_tx.h +++ b/ydb/core/tx/datashard/volatile_tx.h @@ -132,6 +132,21 @@ namespace NKikimr::NDataShard { } }; + struct TWaitingSnapshotEvent { + TRowVersion Snapshot; + std::unique_ptr<IEventHandle> Event; + + TWaitingSnapshotEvent(const TRowVersion& snapshot, std::unique_ptr<IEventHandle>&& event) + : Snapshot(snapshot) + , Event(std::move(event)) + { } + + bool operator<(const TWaitingSnapshotEvent& rhs) const { + // Note: inverted for max-heap + return rhs.Snapshot < Snapshot; + } + }; + public: using TVolatileTxByVersion = std::set<TVolatileTxInfo*, TCompareInfoByVersion>; @@ -151,6 +166,10 @@ namespace NKikimr::NDataShard { const TVolatileTxByVersion& GetVolatileTxByVersion() const { return VolatileTxByVersion; } + bool HasVolatileTxsAtSnapshot(const TRowVersion& snapshot) const { + return !VolatileTxByVersion.empty() && (*VolatileTxByVersion.begin())->Version <= snapshot; + } + void PersistAddVolatileTx( ui64 txId, const TRowVersion& version, TConstArrayRef<ui64> commitTxIds, @@ -167,6 +186,9 @@ namespace NKikimr::NDataShard { bool AttachWaitingRemovalOperation( ui64 txId, ui64 dependentTxId); + void AttachWaitingSnapshotEvent( + const TRowVersion& snapshot, std::unique_ptr<IEventHandle>&& event); + void AbortWaitingTransaction(TVolatileTxInfo* info); void ProcessReadSet( @@ -203,6 +225,7 @@ namespace NKikimr::NDataShard { absl::flat_hash_map<ui64, std::unique_ptr<TVolatileTxInfo>> VolatileTxs; // TxId -> Info absl::flat_hash_map<ui64, TVolatileTxInfo*> VolatileTxByCommitTxId; // CommitTxId -> Info TVolatileTxByVersion VolatileTxByVersion; + std::vector<TWaitingSnapshotEvent> WaitingSnapshotEvents; TIntrusivePtr<TTxMap> TxMap; std::deque<ui64> PendingCommits; std::deque<ui64> PendingAborts; |