about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author snaury <snaury@ydb.tech> 2022-09-06 19:33:10 +0300
committer snaury <snaury@ydb.tech> 2022-09-06 19:33:10 +0300
commit 54ead21458150a3f3055a5f4077cd5f5309b3818 (patch)
tree 308a356dbba7dda083a6cb46905b43bab9fcff4d
parent 4e10d96b22c65771d1a1319595630c13fe7085ef (diff)
download ydb-54ead21458150a3f3055a5f4077cd5f5309b3818.tar.gz
Fix incorrectly replying with potentially uncommitted data in reads
-rw-r--r-- ydb/core/tx/datashard/datashard_pipeline.h 10
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_order.cpp 148
2 files changed, 154 insertions, 4 deletions
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index 36d5995dca4..ffd06be63c5 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -388,8 +388,8 @@ private:
return a.Step < b.Step || (a.Step == b.Step && a.TxId < b.TxId);
}
- friend constexpr bool operator<(const TItem& a, const TRowVersion& b) {
- return a.Step < b.Step || (a.Step == b.Step && a.TxId < b.TxId);
+ friend constexpr bool operator<=(const TItem& a, const TRowVersion& b) {
+ return a.Step < b.Step || (a.Step == b.Step && a.TxId <= b.TxId);
}
};
@@ -400,7 +400,9 @@ private:
auto res = ItemsSet.emplace(version);
if (!res.second)
res.first->Counter += 1;
- TxIdMap.emplace(txId, res.first);
+ auto res2 = TxIdMap.emplace(txId, res.first);
+ Y_VERIFY_S(res2.second, "Unexpected duplicate immediate tx " << txId
+ << " committing at " << version);
}
inline void Add(TRowVersion version) {
@@ -423,7 +425,7 @@ private:
}
inline bool HasOpsBelow(TRowVersion upperBound) const {
- return bool(ItemsSet) && *ItemsSet.begin() < upperBound;
+ return bool(ItemsSet) && *ItemsSet.begin() <= upperBound;
}
private:
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index 4df58ff07a4..dddd6665065 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -5422,6 +5422,154 @@ Y_UNIT_TEST_TWIN(UncommittedReadSetAck, UseNewEngine) {
)"));
}
+Y_UNIT_TEST_TWIN(UncommittedReads, UseNewEngine) { // Regression test: a read racing with an upsert whose tablet commit is blocked must not return the uncommitted row
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TIMECAST, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+ auto shards1 = GetTableShards(server, sender, "/Root/table-1");
+
+ auto isTableShard = [&](ui64 tabletId) -> bool { // true when tabletId is one of the shards returned for /Root/table-1
+ return std::find(shards1.begin(), shards1.end(), tabletId) != shards1.end();
+ };
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ // Begin a transaction (KqpSimpleBegin) that reads the upserted row; this also primes the shard for unprotected reads
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // Two more upserts: make sure we are at the max immediate write edge for the current step and that it is confirmed
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)"));
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3)"));
+
+ // Observer: hold back TEvCommit events addressed to the table's shards and count every TEvProposeTransactionResult
+ TVector<THolder<IEventHandle>> blockedCommits;
+ size_t seenProposeResults = 0;
+ auto blockCommits = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTablet::TEvCommit::EventType: {
+ auto* msg = ev->Get<TEvTablet::TEvCommit>();
+ if (isTableShard(msg->TabletID)) {
+ Cerr << "... blocked commit for tablet " << msg->TabletID << Endl;
+ blockedCommits.push_back(std::move(ev)); // keep the event so it can be re-sent once commits are unblocked
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ case TEvDataShard::TEvProposeTransactionResult::EventType: {
+ Cerr << "... observed propose transaction result" << Endl;
+ ++seenProposeResults;
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserver = runtime.SetObserverFunc(blockCommits);
+
+ auto waitFor = [&](const auto& condition, const TString& description) { // keeps dispatching runtime events until condition() holds
+ while (!condition()) {
+ Cerr << "... waiting for " << description << Endl;
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return condition();
+ };
+ runtime.DispatchEvents(options);
+ }
+ };
+
+ // Start upserting a 4th row while commits are blocked; it will stick to the same version as the last upsert
+ auto upsertSender = runtime.AllocateEdgeActor();
+ SendRequest(runtime, upsertSender, MakeSimpleRequest(Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (4, 4)")));
+
+ waitFor([&]{ return blockedCommits.size() > 0; }, "blocked commit");
+
+ // Start reading data while the commit is still blocked; we know it must read confirmed data, but it will also include the blocked row above
+ auto readSender = runtime.AllocateEdgeActor();
+ SendRequest(runtime, readSender, MakeSimpleRequest(Q_("SELECT key, value FROM `/Root/table-1` ORDER BY key")));
+
+ // Let 1 second pass via SimulateSleep so the read gets a chance to reply (or stall) while commits stay blocked
+ SimulateSleep(runtime, TDuration::Seconds(1));
+
+ // We are blocking commits, so the read must not see a 4th row until we unblock
+ if (seenProposeResults > 0) {
+ // We might make it possible in the future to run reads like that without blocking.
+ // However, it still means we must not return the 4th row, which is not committed: expect exactly rows 1..3
+ auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(readSender);
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ UNIT_ASSERT_VALUES_EQUAL(
+ response.GetResponse().GetResults()[0].GetValue().ShortDebugString(),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "} Struct { Bool: false }");
+ return; // early exit: the read was answered while commits were still blocked and has been checked above
+ }
+
+ // Unblock all commits: restore the previous observer, then re-send every held-back event
+ runtime.SetObserverFunc(prevObserver);
+ for (auto& ev : blockedCommits) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+ blockedCommits.clear();
+
+ // With commits unblocked, the pending upsert of row 4 must complete successfully
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(upsertSender);
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ }
+
+ // The read that was waiting must now succeed and return all four rows, including the 4th
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(readSender);
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ UNIT_ASSERT_VALUES_EQUAL(
+ response.GetResponse().GetResults()[0].GetValue().ShortDebugString(),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 4 } } Struct { Optional { Uint32: 4 } } } "
+ "} Struct { Bool: false }");
+ }
+}
+
} // Y_UNIT_TEST_SUITE(DataShardOutOfOrder)
} // namespace NKikimr