author     snaury <snaury@ydb.tech>  2023-09-20 17:11:41 +0300
committer  snaury <snaury@ydb.tech>  2023-09-20 18:10:12 +0300
commit     9b0bfe3ad5aa33c2c4e5fcec9677e870c55e5636 (patch)
tree       c0630af34ca269cfc55aa68bbaafdfb59e07f1bc
parent     82a23bf3bafa4db2bbb68ea7a068748f63fafe43 (diff)
Fix inconsistent local snapshot read after datashard restart KIKIMR-19395
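
Why the old computation was unsafe: once every coordinator subscription was restored, the shard used Min(CoordinatorPrevReadStepMax, CoordinatorPrevReadStepMin) as its read step. Coordinators only know about steps acquired through them, so a local snapshot taken by the previous tablet generation during an iterator read could sit above that minimum; after a restart the shard could then serve reads below a snapshot it had already answered from, which is the inconsistency this commit fixes. Below is a minimal standalone sketch of the before/after decision, with hypothetical simplified types; only the member names are taken from the diff that follows.

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>

    // Hypothetical standalone model of the read step choice made in
    // TDataShard::CheckMediatorStateRestored; only the member names are
    // taken from the diff below.
    struct TMediatorStateModel {
        uint64_t CoordinatorPrevReadStepMin = 0;
        uint64_t CoordinatorPrevReadStepMax = 0;
        size_t CoordinatorSubscriptionsPending = 0;

        // Old behavior: trust the coordinator-side minimum once every
        // subscription is restored. Unsafe, because local snapshots taken
        // by the previous generation during iterator reads never reach the
        // coordinators and may sit above this minimum.
        uint64_t OldReadStep() const {
            return CoordinatorSubscriptionsPending == 0
                ? std::min(CoordinatorPrevReadStepMax, CoordinatorPrevReadStepMin)
                : CoordinatorPrevReadStepMax;
        }

        // New behavior: always take the worst-case maximum, so every
        // snapshot the previous generation could have served stays
        // readable at a consistent step.
        uint64_t NewReadStep() const {
            return CoordinatorPrevReadStepMax;
        }
    };
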
-rw-r--r--  ydb/core/tx/datashard/datashard.cpp              |  28
-rw-r--r--  ydb/core/tx/datashard/datashard_ut_snapshot.cpp  | 134
2 files changed, 147 insertions(+), 15 deletions(-)
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 86f2a1ef72d..213bfc00616 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -2295,22 +2295,20 @@ void TDataShard::CheckMediatorStateRestored() {
}
// CoordinatorPrevReadStepMax shows us what is the next minimum step that
- // may be acquired as a snapshot. This tells as that no previous read
- // could have happened after this step, even if it has been acquired.
+ // may be acquired as a snapshot. This tells us that no previous read
+ // could have happened after this step, even if it was acquired, since it
+ // would have been waiting until mediator time advances to that step.
// CoordinatorPrevReadStepMin shows us the maximum step that could have
- // been acquired before we subscribed. Even if the next step is very
- // large it may be used to infer an erlier step, as previous generation
- // could not have read any step that was not acquired.
- // When some coordinators are still pending we use CoordinatorPrevReadStepMax
- // as a worst case read step in the future, hoping to make a tighter
- // prediction while we wait for that.
- // Note we always need to wait for CoordinatorPrevReadStepMax because
- // previous generation may have observed it and may have replied to
- // immediate writes at that step.
+ // been acquired at coordinators before we subscribed, however this does
+ // not include possible local snapshots that could have been acquired by a
+ // previous generation during iterator reads, so we have to always use
+ // CoordinatorPrevReadStepMax as a worst case possible readStep.
+ // Note we always need to wait for CoordinatorPrevReadStepMax even without
+ // local snapshots, because previous generation may have observed it and
+ // may have replied to immediate writes at that step, and new immediate
+ // HEAD reads must include that in their results.
const ui64 waitStep = CoordinatorPrevReadStepMax;
- const ui64 readStep = CoordinatorSubscriptionsPending == 0
- ? Min(CoordinatorPrevReadStepMax, CoordinatorPrevReadStepMin)
- : CoordinatorPrevReadStepMax;
+ const ui64 readStep = CoordinatorPrevReadStepMax;
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored: waitStep# " << waitStep << " readStep# " << readStep);
@@ -3106,7 +3104,7 @@ void TDataShard::Handle(TEvMediatorTimecast::TEvSubscribeReadStepResult::TPtr& e
"Got TEvMediatorTimecast::TEvSubscribeReadStepResult at " << TabletID()
<< " coordinator " << msg->CoordinatorId
<< " last step " << msg->LastReadStep
- << " next step " << msg->ReadStep->Get());
+ << " next step " << msg->NextReadStep);
auto it = CoordinatorSubscriptionById.find(msg->CoordinatorId);
Y_VERIFY_S(it != CoordinatorSubscriptionById.end(),
"Unexpected TEvSubscribeReadStepResult for coordinator " << msg->CoordinatorId);
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index bc309da9f1a..cce17c176a3 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -3639,6 +3639,140 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"{ items { uint32_value: 2 } items { uint32_value: 2 } items { uint32_value: 22 } }");
}
+ Y_UNIT_TEST(ReadIteratorLocalSnapshotThenRestart) {
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
+
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetAppConfig(app);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ auto opts = TShardedTableOptions()
+ .Shards(1)
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Uint32", false, false}});
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ CreateShardedTable(server, sender, "/Root", "table-2", opts);
+
+ const auto shards1 = GetTableShards(server, sender, "/Root/table-1");
+
+ // Perform a snapshot read, this will persist "reads from snapshots" flag
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value
+ FROM `/Root/table-1`
+ UNION ALL
+ SELECT key, value
+ FROM `/Root/table-2`
+ )")),
+ "");
+
+ // Insert rows using a single-shard write
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)"));
+
+ bool haveReadResult = false;
+ bool haveReadResultSnapshot = false;
+ bool blockReads = false;
+ bool blockReadAcks = true;
+ bool blockReadResults = true;
+ std::vector<std::unique_ptr<IEventHandle>> reads;
+ std::vector<std::unique_ptr<IEventHandle>> readAcks;
+ std::vector<std::unique_ptr<IEventHandle>> readResults;
+ auto observer = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::TEvRead::EventType: {
+ auto* msg = ev->Get<TEvDataShard::TEvRead>();
+ if (blockReads) {
+ reads.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ msg->Record.SetMaxRowsInResult(1);
+ break;
+ }
+ case TEvDataShard::TEvReadResult::EventType: {
+ auto* msg = ev->Get<TEvDataShard::TEvReadResult>();
+ if (!haveReadResult) {
+ haveReadResult = true;
+ haveReadResultSnapshot = msg->Record.HasSnapshot();
+ break;
+ }
+ if (blockReadResults) {
+ readResults.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ case TEvDataShard::TEvReadAck::EventType: {
+ if (blockReadAcks) {
+ readAcks.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserver = runtime.SetObserverFunc(observer);
+
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+ auto readFuture = SendRequest(runtime,
+ MakeSimpleRequestRPC("SELECT key, value FROM `/Root/table-1` ORDER BY key", sessionId, "", true /* commitTx */),
+ "/Root");
+
+ auto waitFor = [&](const auto& condition, const TString& description) {
+ if (!condition()) {
+ Cerr << "... waiting for " << description << Endl;
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return condition();
+ };
+ runtime.DispatchEvents(options);
+ UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
+ }
+ };
+
+ waitFor([&]{ return haveReadResult; }, "read result");
+ UNIT_ASSERT(haveReadResultSnapshot);
+
+ blockReads = true;
+ RebootTablet(runtime, shards1.at(0), sender);
+ waitFor([&]{ return reads.size() > 0; }, "read retry");
+ UNIT_ASSERT_VALUES_EQUAL(reads.size(), 1u);
+
+ // Update all keys in a single operation
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 22), (3, 33)"));
+
+ blockReads = false;
+ blockReadAcks = false;
+ blockReadResults = false;
+ readAcks.clear();
+ readResults.clear();
+ for (auto& ev : reads) {
+ runtime.Send(ev.release(), 0, true);
+ }
+ reads.clear();
+
+ auto readResponse = AwaitResponse(runtime, std::move(readFuture));
+ UNIT_ASSERT_VALUES_EQUAL(
+ FormatResult(readResponse),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 3 } }");
+ }
+
}
} // namespace NKikimr
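
The test above relies on a block-and-replay idiom that recurs throughout these datashard unit tests: an observer intercepts selected events, stashes them while a flag is set, and re-sends them once the restart has happened. A stripped-down sketch of that idiom, with hypothetical stand-ins (TEventModel, DeliverFn) for IEventHandle and the runtime's Send():

    #include <functional>
    #include <memory>
    #include <vector>

    // Stripped-down model of the block/replay idiom used in the test
    // above; TEventModel and DeliverFn are hypothetical stand-ins for
    // IEventHandle and the runtime's Send().
    struct TEventModel { int Type = 0; };
    using DeliverFn = std::function<void(std::unique_ptr<TEventModel>)>;

    struct TEventBlocker {
        bool Blocking = false;
        std::vector<std::unique_ptr<TEventModel>> Stashed;

        // Returns true when the event was stashed (the caller should
        // DROP it instead of delivering it).
        bool MaybeBlock(std::unique_ptr<TEventModel>& ev) {
            if (Blocking) {
                Stashed.emplace_back(std::move(ev));
                return true;
            }
            return false;
        }

        // Re-deliver everything captured while blocking was enabled.
        void Replay(const DeliverFn& deliver) {
            Blocking = false;
            for (auto& ev : Stashed) {
                deliver(std::move(ev));
            }
            Stashed.clear();
        }
    };

In the test, blockReads plays the role of Blocking for TEvRead events, and the loop that re-sends the stashed reads after RebootTablet is the Replay step.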