author     | snaury <snaury@ydb.tech> | 2023-09-20 17:11:41 +0300
committer  | snaury <snaury@ydb.tech> | 2023-09-20 18:10:12 +0300
commit     | 9b0bfe3ad5aa33c2c4e5fcec9677e870c55e5636 (patch)
tree       | c0630af34ca269cfc55aa68bbaafdfb59e07f1bc
parent     | 82a23bf3bafa4db2bbb68ea7a068748f63fafe43 (diff)
download   | ydb-9b0bfe3ad5aa33c2c4e5fcec9677e870c55e5636.tar.gz
Fix inconsistent local snapshot read after datashard restart KIKIMR-19395
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp             |  28
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_snapshot.cpp | 134
2 files changed, 147 insertions, 15 deletions
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 86f2a1ef72d..213bfc00616 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -2295,22 +2295,20 @@ void TDataShard::CheckMediatorStateRestored() {
     }
 
     // CoordinatorPrevReadStepMax shows us what is the next minimum step that
-    // may be acquired as a snapshot. This tells as that no previous read
-    // could have happened after this step, even if it has been acquired.
+    // may be acquired as a snapshot. This tells us that no previous read
+    // could have happened after this step, even if it was acquired, since it
+    // would have been waiting until mediator time advances to that step.
     // CoordinatorPrevReadStepMin shows us the maximum step that could have
-    // been acquired before we subscribed. Even if the next step is very
-    // large it may be used to infer an erlier step, as previous generation
-    // could not have read any step that was not acquired.
-    // When some coordinators are still pending we use CoordinatorPrevReadStepMax
-    // as a worst case read step in the future, hoping to make a tighter
-    // prediction while we wait for that.
-    // Note we always need to wait for CoordinatorPrevReadStepMax because
-    // previous generation may have observed it and may have replied to
-    // immediate writes at that step.
+    // been acquired at coordinators before we subscribed, however this does
+    // not include possible local snapshots that could have been acquired by a
+    // previous generation during iterator reads, so we have to always use
+    // CoordinatorPrevReadStepMax as a worst case possible readStep.
+    // Note we always need to wait for CoordinatorPrevReadStepMax even without
+    // local snapshots, because previous generation may have observed it and
+    // may have replied to immediate writes at that step, and new immediate
+    // HEAD reads must include that in their results.
     const ui64 waitStep = CoordinatorPrevReadStepMax;
-    const ui64 readStep = CoordinatorSubscriptionsPending == 0
-        ? Min(CoordinatorPrevReadStepMax, CoordinatorPrevReadStepMin)
-        : CoordinatorPrevReadStepMax;
+    const ui64 readStep = CoordinatorPrevReadStepMax;
 
     LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
         "CheckMediatorStateRestored: waitStep# " << waitStep
         << " readStep# " << readStep);
@@ -3106,7 +3104,7 @@ void TDataShard::Handle(TEvMediatorTimecast::TEvSubscribeReadStepResult::TPtr& e
         "Got TEvMediatorTimecast::TEvSubscribeReadStepResult at " << TabletID()
         << " coordinator " << msg->CoordinatorId
         << " last step " << msg->LastReadStep
-        << " next step " << msg->ReadStep->Get());
+        << " next step " << msg->NextReadStep);
     auto it = CoordinatorSubscriptionById.find(msg->CoordinatorId);
     Y_VERIFY_S(it != CoordinatorSubscriptionById.end(),
         "Unexpected TEvSubscribeReadStepResult for coordinator " << msg->CoordinatorId);
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index bc309da9f1a..cce17c176a3 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -3639,6 +3639,140 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
             "{ items { uint32_value: 2 } items { uint32_value: 2 } items { uint32_value: 22 } }");
     }
 
+    Y_UNIT_TEST(ReadIteratorLocalSnapshotThenRestart) {
+        NKikimrConfig::TAppConfig app;
+        app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
+
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetAppConfig(app);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+        runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+        InitRoot(server, sender);
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+        auto opts = TShardedTableOptions()
+            .Shards(1)
+            .Columns({
+                {"key", "Uint32", true, false},
+                {"value", "Uint32", false, false}});
+        CreateShardedTable(server, sender, "/Root", "table-1", opts);
+        CreateShardedTable(server, sender, "/Root", "table-2", opts);
+
+        const auto shards1 = GetTableShards(server, sender, "/Root/table-1");
+
+        // Perform a snapshot read, this will persist "reads from snapshots" flag
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, Q_(R"(
+                SELECT key, value
+                FROM `/Root/table-1`
+                UNION ALL
+                SELECT key, value
+                FROM `/Root/table-2`
+            )")),
+            "");
+
+        // Insert rows using a single-shard write
+        ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)"));
+
+        bool haveReadResult = false;
+        bool haveReadResultSnapshot = false;
+        bool blockReads = false;
+        bool blockReadAcks = true;
+        bool blockReadResults = true;
+        std::vector<std::unique_ptr<IEventHandle>> reads;
+        std::vector<std::unique_ptr<IEventHandle>> readAcks;
+        std::vector<std::unique_ptr<IEventHandle>> readResults;
+        auto observer = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+            switch (ev->GetTypeRewrite()) {
+                case TEvDataShard::TEvRead::EventType: {
+                    auto* msg = ev->Get<TEvDataShard::TEvRead>();
+                    if (blockReads) {
+                        reads.emplace_back(ev.Release());
+                        return TTestActorRuntime::EEventAction::DROP;
+                    }
+                    msg->Record.SetMaxRowsInResult(1);
+                    break;
+                }
+                case TEvDataShard::TEvReadResult::EventType: {
+                    auto* msg = ev->Get<TEvDataShard::TEvReadResult>();
+                    if (!haveReadResult) {
+                        haveReadResult = true;
+                        haveReadResultSnapshot = msg->Record.HasSnapshot();
+                        break;
+                    }
+                    if (blockReadResults) {
+                        readResults.emplace_back(ev.Release());
+                        return TTestActorRuntime::EEventAction::DROP;
+                    }
+                    break;
+                }
+                case TEvDataShard::TEvReadAck::EventType: {
+                    if (blockReadAcks) {
+                        readAcks.emplace_back(ev.Release());
+                        return TTestActorRuntime::EEventAction::DROP;
+                    }
+                    break;
+                }
+            }
+            return TTestActorRuntime::EEventAction::PROCESS;
+        };
+        auto prevObserver = runtime.SetObserverFunc(observer);
+
+        TString sessionId = CreateSessionRPC(runtime, "/Root");
+        auto readFuture = SendRequest(runtime,
+            MakeSimpleRequestRPC("SELECT key, value FROM `/Root/table-1` ORDER BY key", sessionId, "", true /* commitTx */),
+            "/Root");
+
+        auto waitFor = [&](const auto& condition, const TString& description) {
+            if (!condition()) {
+                Cerr << "... waiting for " << description << Endl;
+                TDispatchOptions options;
+                options.CustomFinalCondition = [&]() {
+                    return condition();
+                };
+                runtime.DispatchEvents(options);
+                UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
+            }
+        };
+
+        waitFor([&]{ return haveReadResult; }, "read result");
+        UNIT_ASSERT(haveReadResultSnapshot);
+
+        blockReads = true;
+        RebootTablet(runtime, shards1.at(0), sender);
+        waitFor([&]{ return reads.size() > 0; }, "read retry");
+        UNIT_ASSERT_VALUES_EQUAL(reads.size(), 1u);
+
+        // Update all keys in a single operation
+        ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 22), (3, 33)"));
+
+        blockReads = false;
+        blockReadAcks = false;
+        blockReadResults = false;
+        readAcks.clear();
+        readResults.clear();
+        for (auto& ev : reads) {
+            runtime.Send(ev.release(), 0, true);
+        }
+        reads.clear();
+
+        auto readResponse = AwaitResponse(runtime, std::move(readFuture));
+        UNIT_ASSERT_VALUES_EQUAL(
+            FormatResult(readResponse),
+            "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+            "{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
+            "{ items { uint32_value: 3 } items { uint32_value: 3 } }");
+    }
+
 }
 
 } // namespace NKikimr
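
For context on the core change above, here is a minimal standalone sketch of how the restored read step is chosen before and after this commit. This is plain C++, not YDB code: the struct, its layout, and the method names are simplified assumptions for illustration; only the field names mirror the diff.

#include <algorithm>
#include <cstddef>
#include <cstdint>

using ui64 = std::uint64_t;

// Simplified stand-in for the relevant TDataShard state (an assumption,
// not the real class).
struct TMediatorStateSketch {
    ui64 CoordinatorPrevReadStepMin = 0;   // max step provably acquired at coordinators
    ui64 CoordinatorPrevReadStepMax = 0;   // next minimum step that may be acquired
    std::size_t CoordinatorSubscriptionsPending = 0;

    // Before the fix: once all coordinator subscriptions completed, the
    // tighter Min(...) bound was used, assuming nothing could have read
    // past CoordinatorPrevReadStepMin.
    ui64 ReadStepBefore() const {
        return CoordinatorSubscriptionsPending == 0
            ? std::min(CoordinatorPrevReadStepMax, CoordinatorPrevReadStepMin)
            : CoordinatorPrevReadStepMax;
    }

    // After the fix: local snapshots taken by the previous tablet generation
    // during iterator reads are invisible to coordinators, so the worst-case
    // CoordinatorPrevReadStepMax is always used.
    ui64 ReadStepAfter() const {
        return CoordinatorPrevReadStepMax;
    }
};

The design trade-off is that the new generation may wait for mediator time to reach a later step than strictly necessary, in exchange for never placing the restored read step below a snapshot the previous generation may already have served results from.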
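And a toy walkthrough of the inconsistency the new test reproduces. The step numbers are hypothetical, chosen only to illustrate how the old Min(...) bound could fall below a local snapshot that the previous generation had already streamed partial results from.

#include <algorithm>
#include <cassert>
#include <cstdint>

int main() {
    const std::uint64_t localSnapshotStep = 120; // acquired mid-read by the old generation
    const std::uint64_t prevReadStepMin = 100;   // what coordinators can prove was acquired
    const std::uint64_t prevReadStepMax = 150;   // next minimum acquirable step

    const std::uint64_t oldReadStep = std::min(prevReadStepMax, prevReadStepMin); // 100
    const std::uint64_t newReadStep = prevReadStepMax;                            // 150

    // With the old bound, a post-restart immediate write could land at a step
    // in (100, 120]; the retried part of the iterator read at snapshot 120
    // would observe it, while rows already returned before the restart did
    // not, producing an inconsistent result.
    assert(oldReadStep < localSnapshotStep);  // the unsafe gap existed
    assert(newReadStep >= localSnapshotStep); // the fix closes it
    return 0;
}

This mirrors the test above: it streams one result row from a local snapshot, reboots the shard, upserts new values for all keys, then releases the blocked read retry and asserts that every returned row still carries the pre-restart values.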