diff options
author Aleksei Borzenkov <snaury@ydb.tech> 2024-01-26 18:03:02 +0300
committer GitHub <noreply@github.com> 2024-01-26 18:03:02 +0300
commit 169272ca676e67b63b1e66619093863e1e55e9db (patch)
parent 7ebcfd058d924bcc8c23da70e034f7415687885c (diff)
Make sure out-of-order volatile commits are not visible on followers KIKIMR-20853 (#1344)
5 files changed, 149 insertions, 2 deletions
diff --git a/ydb/core/tablet/tablet_resolver.cpp b/ydb/core/tablet/tablet_resolver.cpp
index 5c5e5bdc0d..e6c1508d90 100644
--- a/ydb/core/tablet/tablet_resolver.cpp
+++ b/ydb/core/tablet/tablet_resolver.cpp
@@ -649,8 +649,9 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
if (!(entry.KnownLeaderTablet == msg->CurrentLeaderTablet || !entry.KnownLeaderTablet)) {
DropEntry(tabletId, entry, ctx); // got info but not full, occurs on transitional cluster states
} else {
- entry.KnownLeaderTablet = msg->CurrentLeaderTablet;
entry.State = TEntry::StProblemPing;
+ entry.KnownLeaderTablet = msg->CurrentLeaderTablet;
+ entry.KnownFollowers = std::move(msg->Followers);
SendPing(tabletId, entry, ctx);
} else {
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index 31b6456c8a..8ae953f629 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -1992,6 +1992,127 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
UNIT_ASSERT(splitLatency < TDuration::Seconds(5));
+ Y_UNIT_TEST(DistributedOutOfOrderFollowerConsistency) { // Scenario: a distributed write is stalled by withholding its readsets, a second distributed write completes, and reads via KqpSimpleStaleRoExec must show neither write until the readsets are delivered.
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetNodeCount(1)
+ .SetUseRealThreads(false) // NOTE(review): presumably selects the simulated test runtime, which SimulateSleep below relies on - confirm
+ .SetEnableDataShardVolatileTransactions(true); // volatile transactions are the feature under test
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); // verbose logging for the components involved in this scenario
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::TABLET_RESOLVER, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::STATESTORAGE, NLog::PRI_TRACE);
+ InitRoot(server, sender);
+ auto opts = TShardedTableOptions() // both tables below share these options: one shard, one follower
+ .Shards(1)
+ .Followers(1);
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ CreateShardedTable(server, sender, "/Root", "table-2", opts);
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ for (ui64 shard : GetTableShards(server, sender, "/Root/table-1")) { // NOTE(review): presumably forces the tablet resolver to re-resolve each shard so that followers are known before reading - confirm
+ InvalidateTabletResolverCache(runtime, shard);
+ }
+ for (ui64 shard : GetTableShards(server, sender, "/Root/table-2")) {
+ InvalidateTabletResolverCache(runtime, shard);
+ }
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); // baseline row in table-1
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2);"); // baseline row in table-2
+ // Let followers catch up
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ // Block readset exchange: every TEvReadSet seen by the observer is captured into readSets instead of being left in flight
+ std::vector<std::unique_ptr<IEventHandle>> readSets; // captured events, re-sent in the unblock step further down
+ auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
+ readSets.emplace_back(ev.Release()); // the vector now holds the event handle
+ });
+ // Start a distributed write to both tables (keys 3 and 4); its readsets are captured by the observer above
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+ auto upsertResult = SendRequest( // NOTE(review): upsertResult is not read again in the visible part of this test
+ runtime,
+ MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 4);
+ )", sessionId, /* txId */ "", /* commitTx */ true),
+ "/Root");
+ WaitFor(runtime, [&]{ return readSets.size() >= 4; }, "readsets"); // proceed once at least 4 readsets have been captured
+ // Stop blocking further readsets
+ blockReadSets.Remove();
+ // Start another distributed write to both tables (keys 5 and 6), it should succeed
+ ExecSQL(server, sender, R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 5);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (6, 6);
+ )");
+ // Let followers catch up
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ for (ui64 shard : GetTableShards(server, sender, "/Root/table-1")) {
+ InvalidateTabletResolverCache(runtime, shard);
+ }
+ for (ui64 shard : GetTableShards(server, sender, "/Root/table-2")) {
+ InvalidateTabletResolverCache(runtime, shard);
+ }
+ // Check tables, they should not see inconsistent results with the latest write: only the baseline row is expected in each table. NOTE(review): the line that opens each comparison call seems to be missing from this rendering (the expected-value string closes a paren that is never opened) - confirm against the original file
+ KqpSimpleStaleRoExec(runtime, Q_(R"(
+ SELECT key, value
+ FROM `/Root/table-1`
+ ORDER BY key
+ )"), "/Root"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
+ KqpSimpleStaleRoExec(runtime, Q_(R"(
+ SELECT key, value
+ FROM `/Root/table-2`
+ ORDER BY key
+ )"), "/Root"),
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }");
+ // Unblock readsets: re-send every captured event to the node it was addressed to
+ for (auto& ev : readSets) {
+ ui32 nodeIndex = ev->GetRecipientRewrite().NodeId() - runtime.GetNodeId(0); // recipient node id expressed as an index relative to node 0
+ runtime.Send(ev.release(), nodeIndex, true); // NOTE(review): the meaning of the trailing true is not visible here - confirm against the runtime API
+ }
+ readSets.clear();
+ // Let followers catch up
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ // Check tables again, they should have all rows visible now (baseline row plus the rows from both distributed writes)
+ KqpSimpleStaleRoExec(runtime, Q_(R"(
+ SELECT key, value
+ FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 3 } }, "
+ "{ items { uint32_value: 5 } items { uint32_value: 5 } }");
+ KqpSimpleStaleRoExec(runtime, Q_(R"(
+ SELECT key, value
+ FROM `/Root/table-2`
+ ORDER BY key
+ )")),
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
+ "{ items { uint32_value: 4 } items { uint32_value: 4 } }, "
+ "{ items { uint32_value: 6 } items { uint32_value: 6 } }");
+ }
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/follower_edge.cpp b/ydb/core/tx/datashard/follower_edge.cpp
index 0663f004fd..69de5fe15b 100644
--- a/ydb/core/tx/datashard/follower_edge.cpp
+++ b/ydb/core/tx/datashard/follower_edge.cpp
@@ -6,13 +6,24 @@ std::tuple<TRowVersion, bool, ui64> TDataShard::CalculateFollowerReadEdge() cons
+ TRowVersion volatileUncertain = VolatileTxManager.GetMinUncertainVersion();
for (auto order : TransQueue.GetPlan()) {
// When we have planned operations we assume the first one may be used
// for new writes, so we mark it as non-repeatable. We could skip
// readonly operations, but there's little benefit in that, and it's
// complicated to determine which is the first readable given we may
// have executed some out of order.
- return { TRowVersion(order.Step, order.TxId), false, 0 };
+ return { Min(volatileUncertain, TRowVersion(order.Step, order.TxId)), false, 0 };
+ }
+ if (!volatileUncertain.IsMax()) {
+ // We have some uncertainty in an unresolved volatile commit
+ // Allow followers to read from it in non-repeatable snapshot modes
+ // FIXME: when at least one write is committed at this version, it
+ // should stop being non-repeatable, and followers need to resolve
+ // other possibly out-of-order commits.
+ return { volatileUncertain, false, 0 };
// This is the max version where we had any writes
diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp
index 470fa7ef44..35af1fd255 100644
--- a/ydb/core/tx/datashard/volatile_tx.cpp
+++ b/ydb/core/tx/datashard/volatile_tx.cpp
@@ -598,12 +598,18 @@ namespace NKikimr::NDataShard {
+ TRowVersion prevUncertain = GetMinUncertainVersion();
for (ui64 commitTxId : info->CommitTxIds) {
+ if (prevUncertain < GetMinUncertainVersion()) {
+ Self->PromoteFollowerReadEdge();
+ }
if (!WaitingSnapshotEvents.empty()) {
TVolatileTxInfo* next = !VolatileTxByVersion.empty() ? *VolatileTxByVersion.begin() : nullptr;
while (!WaitingSnapshotEvents.empty()) {
diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h
index ca7c05e205..ab7533038e 100644
--- a/ydb/core/tx/datashard/volatile_tx.h
+++ b/ydb/core/tx/datashard/volatile_tx.h
@@ -197,6 +197,14 @@ namespace NKikimr::NDataShard {
return !VolatileTxByVersion.empty() && (*VolatileTxByVersion.begin())->Version <= snapshot;
+ TRowVersion GetMinUncertainVersion() const { // Returns the Version of the first entry in VolatileTxByVersion, or TRowVersion::Max() when the container is empty.
+ if (!VolatileTxByVersion.empty()) {
+ return (*VolatileTxByVersion.begin())->Version; // NOTE(review): presumably the container is ordered by version, making begin() the minimum - confirm
+ } else {
+ return TRowVersion::Max(); // nothing tracked; callers in this change test IsMax() and compare against this value
+ }
+ }
void PersistAddVolatileTx(
ui64 txId, const TRowVersion& version,
TConstArrayRef<ui64> commitTxIds,