diff options
author | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-04-07 00:00:25 +0300 |
---|---|---|
committer | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-04-07 00:00:25 +0300 |
commit | fa5a33da2c010ab24687352fbad18d5128d68802 (patch) | |
tree | 14d2531a81dfce81914cc8d8fb2a1eb2abdb8054 | |
parent | f657a6d0589c229e17caf52f740ab030119f25c2 (diff) | |
download | ydb-fa5a33da2c010ab24687352fbad18d5128d68802.tar.gz |
Wait for correct mediator time on datashard restore, KIKIMR-13910
ref:8b28434653a69ff2f5d194702be762b5d12372cb
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 28 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_order.cpp | 218 |
2 files changed, 238 insertions, 8 deletions
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 01828ab4e12..242cbd733fa 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -1598,29 +1598,41 @@ void TDataShard::CheckMediatorStateRestored() { // When some coordinators are still pending we use CoordinatorPrevReadStepMax // as a worst case read step in the future, hoping to make a tighter // prediction while we wait for that. - const ui64 step = CoordinatorSubscriptionsPending == 0 + // Note we always need to wait for CoordinatorPrevReadStepMax because + // previous generation may have observed it and may have replied to + // immediate writes at that step. + const ui64 waitStep = CoordinatorPrevReadStepMax; + const ui64 readStep = CoordinatorSubscriptionsPending == 0 ? Min(CoordinatorPrevReadStepMax, CoordinatorPrevReadStepMin) : CoordinatorPrevReadStepMax; + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored: waitStep# " << waitStep << " readStep# " << readStep); + // WARNING: we must perform this check BEFORE we update unprotected read edge // We may enter this code path multiple times, and we expect that the above // read step may be refined while we wait based on pessimistic backup step. - if (GetMaxObservedStep() < step) { + if (GetMaxObservedStep() < waitStep) { // We need to wait until we observe mediator step that is at least // as large as the step we found. - if (MediatorTimeCastWaitingSteps.insert(step).second) { - Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step)); - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast"); + if (MediatorTimeCastWaitingSteps.insert(waitStep).second) { + Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), waitStep)); + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << waitStep << " from mediator time cast"); } return; } // Using the inferred last read step we restore the pessimistic unprotected // read edge. Note we only need to do so if there have actually been any - // unprotected reads in this datashard history. - const TRowVersion lastReadEdge(step, Max<ui64>()); + // unprotected reads in this datashard history. We also need to make sure + // this edge is at least one smaller than ImmediateWriteEdge when we know + // we started unconfirmed immediate writes in the last generation. if (SnapshotManager.GetPerformedUnprotectedReads()) { - SnapshotManager.PromoteUnprotectedReadEdge(lastReadEdge); + const TRowVersion lastReadEdge(readStep, Max<ui64>()); + const TRowVersion preImmediateWriteEdge = + SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step + ? SnapshotManager.GetImmediateWriteEdge().Prev() + : TRowVersion::Min(); + SnapshotManager.PromoteUnprotectedReadEdge(Max(lastReadEdge, preImmediateWriteEdge)); } // Promote the replied immediate write edge up to the currently observed step diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index 3a568664ae4..6ad0dc0fbf0 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -4913,6 +4913,224 @@ Y_UNIT_TEST_QUAD(TestSnapshotReadPriority, UnprotectedReads, UseNewEngine) { "} Struct { Bool: false }"); } +Y_UNIT_TEST_TWIN(TestUnprotectedReadsThenWriteVisibility, UseNewEngine) { + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetNodeCount(2) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetControls(controls) + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TIMECAST, NLog::PRI_TRACE); + + InitRoot(server, sender); + + const ui64 hiveTabletId = ChangeStateStorage(Hive, server->GetSettings().Domain); + + struct TNodeState { + // mediator -> bucket -> [observed, passed] step + THashMap<ui64, THashMap<ui32, std::pair<ui64, ui64>>> Steps; + ui64 AllowedStep = 0; + }; + THashMap<ui32, TNodeState> mediatorState; + + bool mustWaitForSteps[2] = { false, false }; + + auto captureTimecast = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + const ui32 nodeId = ev->GetRecipientRewrite().NodeId(); + const ui32 nodeIndex = nodeId - runtime.GetNodeId(0); + switch (ev->GetTypeRewrite()) { + case TEvMediatorTimecast::TEvUpdate::EventType: { + auto* msg = ev->Get<TEvMediatorTimecast::TEvUpdate>(); + const ui64 mediatorId = msg->Record.GetMediator(); + const ui32 bucket = msg->Record.GetBucket(); + ui64 step = msg->Record.GetTimeBarrier(); + auto& state = mediatorState[nodeId]; + if (!mustWaitForSteps[nodeIndex]) { + // Automatically allow all new steps + state.AllowedStep = Max(state.AllowedStep, step); + } + Cerr << "... node " << nodeId << " observed update from " << mediatorId + << " for bucket " << bucket + << " to step " << step + << " (allowed " << state.AllowedStep << ")" + << Endl; + auto& [observedStep, passedStep] = state.Steps[mediatorId][bucket]; + observedStep = Max(observedStep, step); + if (step >= passedStep) { + if (step < state.AllowedStep) { + step = state.AllowedStep; + msg->Record.SetTimeBarrier(step); + Cerr << "... shifted to allowed step " << step << Endl; + } + passedStep = step; + break; + } + return TTestActorRuntime::EEventAction::DROP; + } + case TEvMediatorTimecast::TEvWaitPlanStep::EventType: { + const auto* msg = ev->Get<TEvMediatorTimecast::TEvWaitPlanStep>(); + const ui64 tabletId = msg->TabletId; + const ui64 step = msg->PlanStep; + Cerr << "... node " << nodeId << " observed wait by " << tabletId + << " for step " << step + << Endl; + auto& state = mediatorState[nodeId]; + if (state.AllowedStep < step) { + state.AllowedStep = step; + for (auto& kv1 : state.Steps) { + const ui64 mediatorId = kv1.first; + for (auto& kv2 : kv1.second) { + const ui32 bucket = kv2.first; + auto& [observedStep, passedStep] = kv2.second; + if (passedStep < step && passedStep < observedStep) { + passedStep = Min(step, observedStep); + auto* update = new TEvMediatorTimecast::TEvUpdate(); + update->Record.SetMediator(mediatorId); + update->Record.SetBucket(bucket); + update->Record.SetTimeBarrier(passedStep); + runtime.Send(new IEventHandle(ev->GetRecipientRewrite(), ev->GetRecipientRewrite(), update), nodeIndex, /* viaActorSystem */ true); + } + } + } + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(captureTimecast); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + + auto table1shards = GetTableShards(server, sender, "/Root/table-1"); + + // Make sure tablet is at node 1 + runtime.SendToPipe(hiveTabletId, sender, new TEvHive::TEvFillNode(runtime.GetNodeId(0))); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvHive::TEvFillNodeResult>(sender); + UNIT_ASSERT(ev->Get()->Record.GetStatus() == NKikimrProto::OK); + } + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + auto execSimpleRequest = [&](const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query)); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + }; + + auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + sessionId = CreateSession(runtime, reqSender); + auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query)); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + txId = response.GetResponse().GetTxMeta().id(); + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + }; + + auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query)); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + }; + + auto execSnapshotRequest = [&](const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + TString sessionId, txId; + TString result = beginSnapshotRequest(sessionId, txId, query); + CloseSession(runtime, reqSender, sessionId); + return result; + }; + + // Perform an immediate read, we should observe the initial write + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // Same when using a fresh snapshot read + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + beginSnapshotRequest(sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // Stop updating mediator timecast on the second node + mustWaitForSteps[1] = true; + + // Insert a new row and wait for result + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)")); + + // Make sure tablet is at node 2 + runtime.SendToPipe(hiveTabletId, sender, new TEvHive::TEvFillNode(runtime.GetNodeId(1))); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvHive::TEvFillNodeResult>(sender); + UNIT_ASSERT(ev->Get()->Record.GetStatus() == NKikimrProto::OK); + } + + // Perform an immediate read, we should observe confirmed writes after restart + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " + "} Struct { Bool: false }"); + + // Previous snapshot must see original data + UNIT_ASSERT_VALUES_EQUAL( + continueSnapshotRequest(sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // However new snapshots must see updated data + UNIT_ASSERT_VALUES_EQUAL( + execSnapshotRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " + "} Struct { Bool: false }"); +} + } // Y_UNIT_TEST_SUITE(DataShardOutOfOrder) } // namespace NKikimr |