| author | snaury <snaury@ydb.tech> | 2022-09-06 19:33:10 +0300 |
| --- | --- | --- |
| committer | snaury <snaury@ydb.tech> | 2022-09-06 19:33:10 +0300 |
| commit | 54ead21458150a3f3055a5f4077cd5f5309b3818 (patch) | |
| tree | 308a356dbba7dda083a6cb46905b43bab9fcff4d | |
| parent | 4e10d96b22c65771d1a1319595630c13fe7085ef (diff) | |
| download | ydb-54ead21458150a3f3055a5f4077cd5f5309b3818.tar.gz | |
Fix incorrectly replying with potentially uncommitted data in reads
| -rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.h | 10 |
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_order.cpp | 148 |
2 files changed, 154 insertions, 4 deletions
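
The heart of the fix is in `datashard_pipeline.h`: the check for "are there still immediate writes pending at or below this version?" must treat its upper bound inclusively. With the old strict `operator<`, a write committing at exactly the read's version edge was not counted as pending, so a read at that edge could be answered with data whose commit had not yet been persisted. Below is a minimal, self-contained sketch of that boundary condition; `TVersion`, `TPendingImmediateOps`, and the two check variants are illustrative stand-ins, not the YDB types from the diff.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <tuple>

// Illustrative stand-in for a (Step, TxId) row version; not the YDB TRowVersion type.
struct TVersion {
    uint64_t Step;
    uint64_t TxId;

    friend bool operator<(const TVersion& a, const TVersion& b) {
        return std::tie(a.Step, a.TxId) < std::tie(b.Step, b.TxId);
    }
    friend bool operator<=(const TVersion& a, const TVersion& b) {
        return a.Step < b.Step || (a.Step == b.Step && a.TxId <= b.TxId);
    }
};

// Hypothetical simplification of the pipeline's set of immediate writes that
// have been proposed but whose commits are still in flight.
struct TPendingImmediateOps {
    std::set<TVersion> Items;

    // Pre-fix behaviour: strict '<' misses an op committing exactly at the
    // read's upper bound, so the read is not held back.
    bool HasOpsBelowStrict(TVersion upperBound) const {
        return !Items.empty() && *Items.begin() < upperBound;
    }

    // Fixed behaviour: the bound is inclusive, so a write committing exactly
    // at 'upperBound' still counts as pending and the read must wait for it.
    bool HasOpsBelowInclusive(TVersion upperBound) const {
        return !Items.empty() && *Items.begin() <= upperBound;
    }
};

int main() {
    TPendingImmediateOps pending;
    // An immediate write will commit at version {Step=10, TxId=42},
    // but its commit has not been persisted yet.
    pending.Items.insert(TVersion{10, 42});

    // A read arrives with an upper bound equal to that same version.
    TVersion readBound{10, 42};

    // The strict check lets the read proceed, so it could return data from the
    // still-uncommitted write -- the bug described in the commit message.
    assert(!pending.HasOpsBelowStrict(readBound));

    // The inclusive check reports a pending op at or below the bound, so the
    // read is delayed until the commit is durable.
    assert(pending.HasOpsBelowInclusive(readBound));
    return 0;
}
```

The new `UncommittedReads` test in the diff below exercises this scenario end to end: it blocks tablet commits, starts an upsert and a read at the same version edge, and verifies the read never returns the uncommitted row.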
```diff
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index 36d5995dca4..ffd06be63c5 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -388,8 +388,8 @@ private:
             return a.Step < b.Step || (a.Step == b.Step && a.TxId < b.TxId);
         }
 
-        friend constexpr bool operator<(const TItem& a, const TRowVersion& b) {
-            return a.Step < b.Step || (a.Step == b.Step && a.TxId < b.TxId);
+        friend constexpr bool operator<=(const TItem& a, const TRowVersion& b) {
+            return a.Step < b.Step || (a.Step == b.Step && a.TxId <= b.TxId);
         }
     };
 
@@ -400,7 +400,9 @@ private:
             auto res = ItemsSet.emplace(version);
             if (!res.second)
                 res.first->Counter += 1;
-            TxIdMap.emplace(txId, res.first);
+            auto res2 = TxIdMap.emplace(txId, res.first);
+            Y_VERIFY_S(res2.second, "Unexpected duplicate immediate tx " << txId
+                << " committing at " << version);
         }
 
         inline void Add(TRowVersion version) {
@@ -423,7 +425,7 @@ private:
         }
 
         inline bool HasOpsBelow(TRowVersion upperBound) const {
-            return bool(ItemsSet) && *ItemsSet.begin() < upperBound;
+            return bool(ItemsSet) && *ItemsSet.begin() <= upperBound;
         }
 
     private:
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index 4df58ff07a4..dddd6665065 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -5422,6 +5422,154 @@ Y_UNIT_TEST_TWIN(UncommittedReadSetAck, UseNewEngine) {
     )"));
 }
 
+Y_UNIT_TEST_TWIN(UncommittedReads, UseNewEngine) {
+    TPortManager pm;
+    TServerSettings::TControls controls;
+    controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+    controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+    controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+    TServerSettings serverSettings(pm.GetPort(2134));
+    serverSettings.SetDomainName("Root")
+        .SetEnableMvcc(true)
+        .SetEnableMvccSnapshotReads(true)
+        .SetEnableKqpSessionActor(UseNewEngine)
+        .SetUseRealThreads(false);
+
+    Tests::TServer::TPtr server = new TServer(serverSettings);
+    auto &runtime = *server->GetRuntime();
+    auto sender = runtime.AllocateEdgeActor();
+
+    runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+    runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TIMECAST, NLog::PRI_TRACE);
+
+    InitRoot(server, sender);
+
+    TDisableDataShardLogBatching disableDataShardLogBatching;
+    CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+    auto shards1 = GetTableShards(server, sender, "/Root/table-1");
+
+    auto isTableShard = [&](ui64 tabletId) -> bool {
+        return std::find(shards1.begin(), shards1.end(), tabletId) != shards1.end();
+    };
+
+    ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+    SimulateSleep(server, TDuration::Seconds(1));
+
+    // Read the upserted row and also prime shard for unprotected reads
+    TString sessionId, txId;
+    UNIT_ASSERT_VALUES_EQUAL(
+        KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+            SELECT key, value FROM `/Root/table-1`
+            ORDER BY key
+            )")),
+        "Struct { "
+        "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+        "} Struct { Bool: false }");
+
+    // Make sure we are at the max immediate write edge for current step and it's confirmed
+    ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)"));
+    ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3)"));
+
+    // Block commits and start counting propose responses
+    TVector<THolder<IEventHandle>> blockedCommits;
+    size_t seenProposeResults = 0;
+    auto blockCommits = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+        switch (ev->GetTypeRewrite()) {
+            case TEvTablet::TEvCommit::EventType: {
+                auto* msg = ev->Get<TEvTablet::TEvCommit>();
+                if (isTableShard(msg->TabletID)) {
+                    Cerr << "... blocked commit for tablet " << msg->TabletID << Endl;
+                    blockedCommits.push_back(std::move(ev));
+                    return TTestActorRuntime::EEventAction::DROP;
+                }
+                break;
+            }
+            case TEvDataShard::TEvProposeTransactionResult::EventType: {
+                Cerr << "... observed propose transaction result" << Endl;
+                ++seenProposeResults;
+                break;
+            }
+        }
+        return TTestActorRuntime::EEventAction::PROCESS;
+    };
+    auto prevObserver = runtime.SetObserverFunc(blockCommits);
+
+    auto waitFor = [&](const auto& condition, const TString& description) {
+        while (!condition()) {
+            Cerr << "... waiting for " << description << Endl;
+            TDispatchOptions options;
+            options.CustomFinalCondition = [&]() {
+                return condition();
+            };
+            runtime.DispatchEvents(options);
+        }
+    };
+
+    // Start upserting a row with blocked commits, it will stick to the same version as the last upsert
+    auto upsertSender = runtime.AllocateEdgeActor();
+    SendRequest(runtime, upsertSender, MakeSimpleRequest(Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (4, 4)")));
+
+    waitFor([&]{ return blockedCommits.size() > 0; }, "blocked commit");
+
+    // Start reading data, we know it must read confirmed data, but it will also include the blocked row above
+    auto readSender = runtime.AllocateEdgeActor();
+    SendRequest(runtime, readSender, MakeSimpleRequest(Q_("SELECT key, value FROM `/Root/table-1` ORDER BY key")));
+
+    // Sleep for 1 second
+    SimulateSleep(runtime, TDuration::Seconds(1));
+
+    // We are blocking commits, so read must not see a 4th row until we unblock
+    if (seenProposeResults > 0) {
+        // We might make it possible in the future to run reads like that without blocking
+        // However, it still means we must not return the 4th row that is not committed
+        auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(readSender);
+        auto& response = ev->Get()->Record.GetRef();
+        UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+        UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+        UNIT_ASSERT_VALUES_EQUAL(
+            response.GetResponse().GetResults()[0].GetValue().ShortDebugString(),
+            "Struct { "
+            "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+            "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
+            "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+            "} Struct { Bool: false }");
+        return;
+    }
+
+    // Unblock all commits
+    runtime.SetObserverFunc(prevObserver);
+    for (auto& ev : blockedCommits) {
+        runtime.Send(ev.Release(), 0, true);
+    }
+    blockedCommits.clear();
+
+    // We must successfully upsert the row
+    {
+        auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(upsertSender);
+        auto& response = ev->Get()->Record.GetRef();
+        UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+    }
+
+    // We must successfully read including the 4th row
+    {
+        auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(readSender);
+        auto& response = ev->Get()->Record.GetRef();
+        UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+        UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+        UNIT_ASSERT_VALUES_EQUAL(
+            response.GetResponse().GetResults()[0].GetValue().ShortDebugString(),
+            "Struct { "
+            "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+            "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
+            "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+            "List { Struct { Optional { Uint32: 4 } } Struct { Optional { Uint32: 4 } } } "
+            "} Struct { Bool: false }");
+    }
+}
+
 } // Y_UNIT_TEST_SUITE(DataShardOutOfOrder)
 } // namespace NKikimr
```