author      Alexey Borzenkov <snaury@yandex-team.ru>  2022-04-07 00:00:25 +0300
committer   Alexey Borzenkov <snaury@yandex-team.ru>  2022-04-07 00:00:25 +0300
commit      fa5a33da2c010ab24687352fbad18d5128d68802 (patch)
tree        14d2531a81dfce81914cc8d8fb2a1eb2abdb8054
parent      f657a6d0589c229e17caf52f740ab030119f25c2 (diff)
download    ydb-fa5a33da2c010ab24687352fbad18d5128d68802.tar.gz
Wait for correct mediator time on datashard restore, KIKIMR-13910
ref:8b28434653a69ff2f5d194702be762b5d12372cb
-rw-r--r--    ydb/core/tx/datashard/datashard.cpp             28
-rw-r--r--    ydb/core/tx/datashard/datashard_ut_order.cpp   218
2 files changed, 238 insertions, 8 deletions
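
In short: on restart CheckMediatorStateRestored now always waits for mediator time to reach CoordinatorPrevReadStepMax (the previous generation may already have replied to immediate writes at that step), and the restored unprotected read edge is additionally kept at least one version below the persisted immediate write edge. Below is a minimal standalone sketch of that decision logic with simplified stand-ins for TRowVersion and the snapshot manager state; the authoritative code is in the datashard.cpp hunk that follows, and the worked example after that hunk plugs toy numbers into the same formulas.

    #include <algorithm>
    #include <cstdint>
    #include <limits>
    #include <optional>

    // Simplified stand-in for NKikimr::TRowVersion (an ordered Step/TxId pair).
    struct TVersion {
        uint64_t Step = 0;
        uint64_t TxId = 0;
        friend bool operator<(const TVersion& a, const TVersion& b) {
            return a.Step != b.Step ? a.Step < b.Step : a.TxId < b.TxId;
        }
        // Largest version strictly below this one (mirrors TRowVersion::Prev()).
        TVersion Prev() const {
            return TxId > 0 ? TVersion{Step, TxId - 1}
                            : TVersion{Step - 1, std::numeric_limits<uint64_t>::max()};
        }
        static TVersion Min() { return {0, 0}; }
        static TVersion Max(uint64_t step) { return {step, std::numeric_limits<uint64_t>::max()}; }
    };

    // Returns std::nullopt while the shard still has to wait for mediator time,
    // otherwise the unprotected read edge to restore (only relevant when the
    // shard performed unprotected reads in its history).
    std::optional<TVersion> RestoreReadEdge(
            uint64_t prevReadStepMin, uint64_t prevReadStepMax,
            bool coordinatorsPending, uint64_t maxObservedStep,
            TVersion immediateWriteEdge, TVersion completeEdge) {
        // Always wait for prevReadStepMax: the previous generation may already
        // have replied to immediate writes at that step.
        const uint64_t waitStep = prevReadStepMax;
        const uint64_t readStep = coordinatorsPending
            ? prevReadStepMax
            : std::min(prevReadStepMax, prevReadStepMin);
        if (maxObservedStep < waitStep) {
            return std::nullopt; // keep waiting (TEvWaitPlanStep in the real code)
        }
        // Keep the restored edge at least one version below the persisted
        // immediate write edge left behind by the previous generation.
        const TVersion lastReadEdge = TVersion::Max(readStep);
        const TVersion preImmediateWriteEdge = completeEdge.Step < immediateWriteEdge.Step
            ? immediateWriteEdge.Prev()
            : TVersion::Min();
        return std::max(lastReadEdge, preImmediateWriteEdge);
    }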
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 01828ab4e12..242cbd733fa 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -1598,29 +1598,41 @@ void TDataShard::CheckMediatorStateRestored() {
     // When some coordinators are still pending we use CoordinatorPrevReadStepMax
     // as a worst case read step in the future, hoping to make a tighter
     // prediction while we wait for that.
-    const ui64 step = CoordinatorSubscriptionsPending == 0
+    // Note we always need to wait for CoordinatorPrevReadStepMax because
+    // previous generation may have observed it and may have replied to
+    // immediate writes at that step.
+    const ui64 waitStep = CoordinatorPrevReadStepMax;
+    const ui64 readStep = CoordinatorSubscriptionsPending == 0
         ? Min(CoordinatorPrevReadStepMax, CoordinatorPrevReadStepMin)
         : CoordinatorPrevReadStepMax;
+    LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored: waitStep# " << waitStep << " readStep# " << readStep);
+
     // WARNING: we must perform this check BEFORE we update unprotected read edge
     // We may enter this code path multiple times, and we expect that the above
     // read step may be refined while we wait based on pessimistic backup step.
-    if (GetMaxObservedStep() < step) {
+    if (GetMaxObservedStep() < waitStep) {
         // We need to wait until we observe mediator step that is at least
         // as large as the step we found.
-        if (MediatorTimeCastWaitingSteps.insert(step).second) {
-            Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step));
-            LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast");
+        if (MediatorTimeCastWaitingSteps.insert(waitStep).second) {
+            Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), waitStep));
+            LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << waitStep << " from mediator time cast");
         }
         return;
     }
     // Using the inferred last read step we restore the pessimistic unprotected
     // read edge. Note we only need to do so if there have actually been any
-    // unprotected reads in this datashard history.
-    const TRowVersion lastReadEdge(step, Max<ui64>());
+    // unprotected reads in this datashard history. We also need to make sure
+    // this edge is at least one smaller than ImmediateWriteEdge when we know
+    // we started unconfirmed immediate writes in the last generation.
     if (SnapshotManager.GetPerformedUnprotectedReads()) {
-        SnapshotManager.PromoteUnprotectedReadEdge(lastReadEdge);
+        const TRowVersion lastReadEdge(readStep, Max<ui64>());
+        const TRowVersion preImmediateWriteEdge =
+            SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step
+            ? SnapshotManager.GetImmediateWriteEdge().Prev()
+            : TRowVersion::Min();
+        SnapshotManager.PromoteUnprotectedReadEdge(Max(lastReadEdge, preImmediateWriteEdge));
     }
     // Promote the replied immediate write edge up to the currently observed step
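
To make the hunk above concrete, here is a toy walk-through with made-up step numbers (purely illustrative, not taken from any real run): the previous generation persisted CoordinatorPrevReadStepMin = 1000 and CoordinatorPrevReadStepMax = 1010, and may already have replied to an immediate write at step 1010. The old code could stop waiting once the mediator reached step 1000; the patched code waits for 1010 and then restores the unprotected read edge just below the persisted immediate write edge.

    #include <algorithm>
    #include <cassert>
    #include <cstdint>
    #include <utility>

    int main() {
        // Made-up numbers for illustration only.
        const uint64_t prevReadStepMin = 1000;
        const uint64_t prevReadStepMax = 1010; // an immediate write may already be confirmed here
        const bool coordinatorsPending = false;

        // Old behaviour: the shard could stop waiting at the combined read step.
        const uint64_t oldWaitStep = coordinatorsPending
            ? prevReadStepMax
            : std::min(prevReadStepMax, prevReadStepMin);
        // New behaviour: always wait until mediator time reaches prevReadStepMax.
        const uint64_t newWaitStep = prevReadStepMax;
        assert(oldWaitStep == 1000 && newWaitStep == 1010);

        // Versions modelled as (step, txId) pairs; the concrete values below are
        // invented stand-ins for the persisted immediate write and complete edges.
        const std::pair<uint64_t, uint64_t> immediateWriteEdge{1010, 5};
        const std::pair<uint64_t, uint64_t> completeEdge{1005, 0};
        const std::pair<uint64_t, uint64_t> lastReadEdge{1000, UINT64_MAX}; // readStep = 1000
        // Mirrors the hunk above: use ImmediateWriteEdge.Prev() only when the last
        // generation started immediate writes past the complete edge.
        const std::pair<uint64_t, uint64_t> preImmediateWriteEdge =
            completeEdge.first < immediateWriteEdge.first
                ? std::pair<uint64_t, uint64_t>{immediateWriteEdge.first, immediateWriteEdge.second - 1}
                : std::pair<uint64_t, uint64_t>{0, 0};
        const auto restored = std::max(lastReadEdge, preImmediateWriteEdge);
        // The restored edge lands just below the already-replied write.
        assert(restored == preImmediateWriteEdge);
        return 0;
    }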
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index 3a568664ae4..6ad0dc0fbf0 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -4913,6 +4913,224 @@ Y_UNIT_TEST_QUAD(TestSnapshotReadPriority, UnprotectedReads, UseNewEngine) {
"} Struct { Bool: false }");
}
+Y_UNIT_TEST_TWIN(TestUnprotectedReadsThenWriteVisibility, UseNewEngine) {
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetNodeCount(2)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetControls(controls)
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TIMECAST, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+
+ const ui64 hiveTabletId = ChangeStateStorage(Hive, server->GetSettings().Domain);
+
+ struct TNodeState {
+ // mediator -> bucket -> [observed, passed] step
+ THashMap<ui64, THashMap<ui32, std::pair<ui64, ui64>>> Steps;
+ ui64 AllowedStep = 0;
+ };
+ THashMap<ui32, TNodeState> mediatorState;
+
+ bool mustWaitForSteps[2] = { false, false };
+
+ auto captureTimecast = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ const ui32 nodeId = ev->GetRecipientRewrite().NodeId();
+ const ui32 nodeIndex = nodeId - runtime.GetNodeId(0);
+ switch (ev->GetTypeRewrite()) {
+ case TEvMediatorTimecast::TEvUpdate::EventType: {
+ auto* msg = ev->Get<TEvMediatorTimecast::TEvUpdate>();
+ const ui64 mediatorId = msg->Record.GetMediator();
+ const ui32 bucket = msg->Record.GetBucket();
+ ui64 step = msg->Record.GetTimeBarrier();
+ auto& state = mediatorState[nodeId];
+ if (!mustWaitForSteps[nodeIndex]) {
+ // Automatically allow all new steps
+ state.AllowedStep = Max(state.AllowedStep, step);
+ }
+ Cerr << "... node " << nodeId << " observed update from " << mediatorId
+ << " for bucket " << bucket
+ << " to step " << step
+ << " (allowed " << state.AllowedStep << ")"
+ << Endl;
+ auto& [observedStep, passedStep] = state.Steps[mediatorId][bucket];
+ observedStep = Max(observedStep, step);
+ if (step >= passedStep) {
+                    if (step > state.AllowedStep) {
+                        step = state.AllowedStep;
+                        msg->Record.SetTimeBarrier(step);
+                        Cerr << "... shifted to allowed step " << step << Endl;
+                    }
+                    passedStep = step;
+                    break;
+                }
+                return TTestActorRuntime::EEventAction::DROP;
+            }
+            case TEvMediatorTimecast::TEvWaitPlanStep::EventType: {
+                const auto* msg = ev->Get<TEvMediatorTimecast::TEvWaitPlanStep>();
+                const ui64 tabletId = msg->TabletId;
+                const ui64 step = msg->PlanStep;
+                Cerr << "... node " << nodeId << " observed wait by " << tabletId
+                    << " for step " << step
+                    << Endl;
+                auto& state = mediatorState[nodeId];
+                if (state.AllowedStep < step) {
+                    state.AllowedStep = step;
+                    for (auto& kv1 : state.Steps) {
+                        const ui64 mediatorId = kv1.first;
+                        for (auto& kv2 : kv1.second) {
+                            const ui32 bucket = kv2.first;
+                            auto& [observedStep, passedStep] = kv2.second;
+                            if (passedStep < step && passedStep < observedStep) {
+                                passedStep = Min(step, observedStep);
+                                auto* update = new TEvMediatorTimecast::TEvUpdate();
+                                update->Record.SetMediator(mediatorId);
+                                update->Record.SetBucket(bucket);
+                                update->Record.SetTimeBarrier(passedStep);
+                                runtime.Send(new IEventHandle(ev->GetRecipientRewrite(), ev->GetRecipientRewrite(), update), nodeIndex, /* viaActorSystem */ true);
+                            }
+                        }
+                    }
+                }
+                break;
+            }
+        }
+        return TTestActorRuntime::EEventAction::PROCESS;
+    };
+    auto prevObserverFunc = runtime.SetObserverFunc(captureTimecast);
+
+    TDisableDataShardLogBatching disableDataShardLogBatching;
+    CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+    auto table1shards = GetTableShards(server, sender, "/Root/table-1");
+
+    // Make sure tablet is at node 1
+    runtime.SendToPipe(hiveTabletId, sender, new TEvHive::TEvFillNode(runtime.GetNodeId(0)));
+    {
+        auto ev = runtime.GrabEdgeEventRethrow<TEvHive::TEvFillNodeResult>(sender);
+        UNIT_ASSERT(ev->Get()->Record.GetStatus() == NKikimrProto::OK);
+    }
+
+    ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+    SimulateSleep(server, TDuration::Seconds(1));
+
+    auto execSimpleRequest = [&](const TString& query) -> TString {
+        auto reqSender = runtime.AllocateEdgeActor();
+        auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query));
+        auto& response = ev->Get()->Record.GetRef();
+        UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+        UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+        return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+    };
+
+    auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString {
+        auto reqSender = runtime.AllocateEdgeActor();
+        sessionId = CreateSession(runtime, reqSender);
+        auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query));
+        auto& response = ev->Get()->Record.GetRef();
+        UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+        txId = response.GetResponse().GetTxMeta().id();
+        UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+        return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+    };
+
+    auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString {
+        auto reqSender = runtime.AllocateEdgeActor();
+        auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query));
+        auto& response = ev->Get()->Record.GetRef();
+        UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+        UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+        return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+    };
+
+    auto execSnapshotRequest = [&](const TString& query) -> TString {
+        auto reqSender = runtime.AllocateEdgeActor();
+        TString sessionId, txId;
+        TString result = beginSnapshotRequest(sessionId, txId, query);
+        CloseSession(runtime, reqSender, sessionId);
+        return result;
+    };
+
+    // Perform an immediate read, we should observe the initial write
+    UNIT_ASSERT_VALUES_EQUAL(
+        execSimpleRequest(Q_(R"(
+            SELECT key, value FROM `/Root/table-1`
+            ORDER BY key
+            )")),
+        "Struct { "
+        "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+        "} Struct { Bool: false }");
+
+    // Same when using a fresh snapshot read
+    TString sessionId, txId;
+    UNIT_ASSERT_VALUES_EQUAL(
+        beginSnapshotRequest(sessionId, txId, Q_(R"(
+            SELECT key, value FROM `/Root/table-1`
+            ORDER BY key
+            )")),
+        "Struct { "
+        "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+        "} Struct { Bool: false }");
+
+    // Stop updating mediator timecast on the second node
+    mustWaitForSteps[1] = true;
+
+    // Insert a new row and wait for result
+    ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)"));
+
+    // Make sure tablet is at node 2
+    runtime.SendToPipe(hiveTabletId, sender, new TEvHive::TEvFillNode(runtime.GetNodeId(1)));
+    {
+        auto ev = runtime.GrabEdgeEventRethrow<TEvHive::TEvFillNodeResult>(sender);
+        UNIT_ASSERT(ev->Get()->Record.GetStatus() == NKikimrProto::OK);
+    }
+
+    // Perform an immediate read, we should observe confirmed writes after restart
+    UNIT_ASSERT_VALUES_EQUAL(
+        execSimpleRequest(Q_(R"(
+            SELECT key, value FROM `/Root/table-1`
+            ORDER BY key
+            )")),
+        "Struct { "
+        "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+        "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
+        "} Struct { Bool: false }");
+
+    // Previous snapshot must see original data
+    UNIT_ASSERT_VALUES_EQUAL(
+        continueSnapshotRequest(sessionId, txId, Q_(R"(
+            SELECT key, value FROM `/Root/table-1`
+            ORDER BY key
+            )")),
+        "Struct { "
+        "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+        "} Struct { Bool: false }");
+
+    // However new snapshots must see updated data
+    UNIT_ASSERT_VALUES_EQUAL(
+        execSnapshotRequest(Q_(R"(
+            SELECT key, value FROM `/Root/table-1`
+            ORDER BY key
+            )")),
+        "Struct { "
+        "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+        "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
+        "} Struct { Bool: false }");
+}
+
 } // Y_UNIT_TEST_SUITE(DataShardOutOfOrder)
 } // namespace NKikimr
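
For reference, the gating that captureTimecast performs in the test above, modelled outside the actor framework: per (mediator, bucket) it tracks the barrier last observed from the mediator and the barrier last passed to local tablets, holds barriers back at AllowedStep while mustWaitForSteps is set, and releases them up to the requested step when a tablet sends TEvWaitPlanStep. A compact sketch with plain standard-library types (names are illustrative, not the test's API):

    #include <algorithm>
    #include <cstdint>
    #include <map>
    #include <optional>
    #include <tuple>
    #include <utility>
    #include <vector>

    // Per-node gate mirroring TNodeState from the test above.
    struct TTimecastGate {
        // (mediator, bucket) -> {observed step, passed step}
        std::map<std::pair<uint64_t, uint32_t>, std::pair<uint64_t, uint64_t>> Steps;
        uint64_t AllowedStep = 0;
        bool HoldSteps = false; // mustWaitForSteps[nodeIndex] in the test

        // A mediator announced a new time barrier. Returns the barrier to deliver
        // locally, or std::nullopt to drop the update (the TEvUpdate case).
        std::optional<uint64_t> OnUpdate(uint64_t mediator, uint32_t bucket, uint64_t step) {
            if (!HoldSteps) {
                AllowedStep = std::max(AllowedStep, step); // auto-allow new steps
            }
            auto& [observed, passed] = Steps[{mediator, bucket}];
            observed = std::max(observed, step);
            if (step < passed) {
                return std::nullopt; // stale barrier, drop
            }
            if (step > AllowedStep) {
                step = AllowedStep; // hold the barrier back at the allowed step
            }
            passed = step;
            return step;
        }

        // A tablet asked to wait for a plan step. Returns (mediator, bucket, step)
        // updates that release buffered barriers (the TEvWaitPlanStep case).
        std::vector<std::tuple<uint64_t, uint32_t, uint64_t>> OnWaitPlanStep(uint64_t step) {
            std::vector<std::tuple<uint64_t, uint32_t, uint64_t>> released;
            if (AllowedStep < step) {
                AllowedStep = step;
                for (auto& [key, range] : Steps) {
                    auto& [observed, passed] = range;
                    if (passed < step && passed < observed) {
                        passed = std::min(step, observed);
                        released.emplace_back(key.first, key.second, passed);
                    }
                }
            }
            return released;
        }
    };

Freezing AllowedStep on the second node is what forces the restarted datashard onto the TEvWaitPlanStep path that the datashard.cpp change now takes for CoordinatorPrevReadStepMax.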