diff options
author | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-06-06 16:47:07 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-06-06 16:47:07 +0300 |
commit | fa8ff7b07c7ef2cac601269486b51a1b720dcd02 (patch) | |
tree | f1b20e085896812f8d195e77a8c770b909990cc2 | |
parent | 0087b374fbbdf87d5f8f7429afa866dfecba977a (diff) | |
download | ydb-fa8ff7b07c7ef2cac601269486b51a1b720dcd02.tar.gz |
22-2: Fix uncommitted resent readset ack on unrelated tx completion, KIKIMR-15011
Merge from trunk: r9530205
REVIEW: 2599071
x-ydb-stable-ref: 97aab06fb7651b9f640a63f6fd243c3f2a4c7c39
-rw-r--r-- | ydb/core/testlib/tablet_helpers.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_order.cpp | 237 |
3 files changed, 241 insertions, 3 deletions
diff --git a/ydb/core/testlib/tablet_helpers.cpp b/ydb/core/testlib/tablet_helpers.cpp index ce5588e08d..fa585cffab 100644 --- a/ydb/core/testlib/tablet_helpers.cpp +++ b/ydb/core/testlib/tablet_helpers.cpp @@ -575,7 +575,8 @@ namespace NKikimr { TActorId ResolveTablet(TTestActorRuntime &runtime, ui64 tabletId, ui32 nodeIndex, bool sysTablet) { auto sender = runtime.AllocateEdgeActor(nodeIndex); runtime.Send(new IEventHandle(MakeTabletResolverID(), sender, - new TEvTabletResolver::TEvForward(tabletId, nullptr))); + new TEvTabletResolver::TEvForward(tabletId, nullptr)), + nodeIndex, true); auto ev = runtime.GrabEdgeEventRethrow<TEvTabletResolver::TEvForwardResult>(sender); Y_VERIFY(ev->Get()->Status == NKikimrProto::OK, "Failed to resolve tablet %" PRIu64, tabletId); if (sysTablet) { diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index c3dc581b24..1f5796d7b4 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -907,10 +907,10 @@ void TPipeline::CompleteTx(const TOperation::TPtr op, TTransactionContext& txc, auto &pr = *DelayedAcks.begin(); LOG_NOTICE(ctx, NKikimrServices::TX_DATASHARD, - "Send outdated delayed readset ack for %" PRIu64 ":%" PRIu64 " at %" PRIu64, + "Will send outdated delayed readset ack for %" PRIu64 ":%" PRIu64 " at %" PRIu64, pr.first.Step, pr.first.TxId, Self->TabletID()); - ctx.Send(pr.second.Release()); + op->AddDelayedAck(std::move(pr.second)); DelayedAcks.erase(DelayedAcks.begin()); } diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index 7afe94b3d4..e57b2d35ba 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -5142,6 +5142,243 @@ Y_UNIT_TEST_TWIN(TestUnprotectedReadsThenWriteVisibility, UseNewEngine) { "} Struct { Bool: false }"); } +Y_UNIT_TEST_TWIN(UncommittedReadSetAck, UseNewEngine) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetNodeCount(2) + .SetUseRealThreads(false); + + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TIMECAST, NLog::PRI_TRACE); + + InitRoot(server, sender); + + const ui64 hiveTabletId = ChangeStateStorage(Hive, server->GetSettings().Domain); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + auto table1shards = GetTableShards(server, sender, "/Root/table-1"); + auto table2shards = GetTableShards(server, sender, "/Root/table-2"); + + // Make sure these tablets are at node 1 + runtime.SendToPipe(hiveTabletId, sender, new TEvHive::TEvFillNode(runtime.GetNodeId(0))); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvHive::TEvFillNodeResult>(sender); + UNIT_ASSERT(ev->Get()->Record.GetStatus() == NKikimrProto::OK); + } + + // Create one more table, we expect it to run at node 2 + CreateShardedTable(server, sender, "/Root", "table-3", 1); + auto table3shards = GetTableShards(server, sender, "/Root/table-3"); + auto table3actor = ResolveTablet(runtime, table3shards.at(0), /* nodeIndex */ 1); + UNIT_ASSERT_VALUES_EQUAL(table3actor.NodeId(), runtime.GetNodeId(1)); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2)")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-3` (key, value) VALUES (3, 3)")); + + auto beginTx = [&](TString& sessionId, TString& txId, const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + sessionId = CreateSession(runtime, reqSender); + auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query)); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + txId = response.GetResponse().GetTxMeta().id(); + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + }; + + TString sessionId1, txId1; + UNIT_ASSERT_VALUES_EQUAL( + beginTx(sessionId1, txId1, Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + TString sessionId2, txId2; + UNIT_ASSERT_VALUES_EQUAL( + beginTx(sessionId2, txId2, Q_(R"( + SELECT key, value FROM `/Root/table-2` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " + "} Struct { Bool: false }"); + + bool capturePlanSteps = true; + TVector<THolder<IEventHandle>> capturedPlanSteps; + THashSet<ui64> passReadSetTxIds; + ui64 observedReadSets = 0; + TVector<THolder<IEventHandle>> capturedReadSets; + ui64 observedReadSetAcks = 0; + bool captureCommits = false; + TVector<THolder<IEventHandle>> capturedCommits; + + auto captureCommitAfterReadSet = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + const ui32 nodeId = ev->GetRecipientRewrite().NodeId(); + const ui32 nodeIndex = nodeId - runtime.GetNodeId(0); + if (nodeIndex == 1) { + switch (ev->GetTypeRewrite()) { + case TEvTxProcessing::TEvPlanStep::EventType: { + if (nodeIndex == 1 && ev->GetRecipientRewrite() == table3actor && capturePlanSteps) { + Cerr << "... captured plan step for table-3" << Endl; + capturedPlanSteps.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + case TEvTxProcessing::TEvReadSet::EventType: { + if (nodeIndex == 1 && ev->GetRecipientRewrite() == table3actor) { + auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>(); + ui64 txId = msg->Record.GetTxId(); + ++observedReadSets; + if (!passReadSetTxIds.contains(txId)) { + Cerr << "... readset for txid# " << txId << " was blocked" << Endl; + capturedReadSets.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + Cerr << "... passing readset for txid# " << txId << Endl; + } + break; + } + case TEvTxProcessing::TEvReadSetAck::EventType: { + Cerr << "... read set ack" << Endl; + ++observedReadSetAcks; + break; + } + case TEvBlobStorage::TEvPut::EventType: { + auto* msg = ev->Get<TEvBlobStorage::TEvPut>(); + if (nodeIndex == 1 && msg->Id.TabletID() == table3shards.at(0) && captureCommits) { + Cerr << "... capturing put " << msg->Id << " for table-3" << Endl; + capturedCommits.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + } + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(captureCommitAfterReadSet); + + // Make two commits in parallel, one of them will receive a readset and become complete + auto reqCommit1Sender = runtime.AllocateEdgeActor(); + SendRequest(runtime, reqCommit1Sender, MakeCommitRequest(sessionId1, txId1, + Q_(R"( + UPSERT INTO `/Root/table-3` (key, value) VALUES (4, 4) + )"))); + auto reqCommit2Sender = runtime.AllocateEdgeActor(); + SendRequest(runtime, reqCommit2Sender, MakeCommitRequest(sessionId2, txId2, + Q_(R"( + UPSERT INTO `/Root/table-3` (key, value) VALUES (5, 5) + )"))); + + auto waitFor = [&](const auto& condition, const TString& description) { + while (!condition()) { + Cerr << "... waiting for " << description << Endl; + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return condition(); + }; + runtime.DispatchEvents(options); + } + }; + + waitFor([&]{ return capturedPlanSteps.size() > 0; }, "plan step"); + UNIT_ASSERT_VALUES_EQUAL(capturedPlanSteps.size(), 1u); + ui64 realTxId1, realTxId2; + { + auto* msg = capturedPlanSteps[0]->Get<TEvTxProcessing::TEvPlanStep>(); + TVector<ui64> realTxIds; + for (const auto& tx : msg->Record.GetTransactions()) { + realTxIds.emplace_back(tx.GetTxId()); + } + UNIT_ASSERT_VALUES_EQUAL(realTxIds.size(), 2u); + std::sort(realTxIds.begin(), realTxIds.end()); + realTxId1 = realTxIds.at(0); + realTxId2 = realTxIds.at(1); + } + + // Unblock and resend the plan step message + capturePlanSteps = false; + for (auto& ev : capturedPlanSteps) { + runtime.Send(ev.Release(), 1, true); + } + capturedPlanSteps.clear(); + + // Wait until there are 2 readset messages + waitFor([&]{ return capturedReadSets.size() >= 2; }, "initial readsets"); + SimulateSleep(runtime, TDuration::MilliSeconds(5)); + + // Unblock readset messages for txId1, but block commits + captureCommits = true; + observedReadSetAcks = 0; + passReadSetTxIds.insert(realTxId1); + for (auto& ev : capturedReadSets) { + runtime.Send(ev.Release(), 1, true); + } + capturedReadSets.clear(); + + // Wait until transaction is complete and tries to commit + waitFor([&]{ return capturedCommits.size() > 0 && capturedReadSets.size() > 0; }, "tx complete"); + SimulateSleep(runtime, TDuration::MilliSeconds(5)); + + // Reboot tablets and wait for resent readsets + // Since tx is already complete, it will be added to DelayedAcks + observedReadSets = 0; + capturedReadSets.clear(); + RebootTablet(runtime, table1shards.at(0), sender, 0, true); + RebootTablet(runtime, table2shards.at(0), sender, 0, true); + + waitFor([&]{ return observedReadSets >= 2; }, "resent readsets"); + SimulateSleep(runtime, TDuration::MilliSeconds(5)); + + // Now we unblock the second readset and resend it + passReadSetTxIds.insert(realTxId2); + for (auto& ev : capturedReadSets) { + runtime.Send(ev.Release(), 1, true); + } + capturedReadSets.clear(); + observedReadSets = 0; + + // Wait until the second transaction commits + ui64 prevCommitsBlocked = capturedCommits.size(); + waitFor([&]{ return capturedCommits.size() > prevCommitsBlocked && observedReadSets >= 1; }, "second tx complete"); + SimulateSleep(runtime, TDuration::MilliSeconds(5)); + + // There must be no readset acks, since we're blocking all commits + UNIT_ASSERT_VALUES_EQUAL(observedReadSetAcks, 0); + + // Now we stop blocking anything and "reply" to all blocked commits with an error + runtime.SetObserverFunc(prevObserverFunc); + for (auto& ev : capturedCommits) { + auto proxy = ev->Recipient; + ui32 groupId = GroupIDFromBlobStorageProxyID(proxy); + auto res = ev->Get<TEvBlobStorage::TEvPut>()->MakeErrorResponse(NKikimrProto::ERROR, "Something went wrong", groupId); + runtime.Send(new IEventHandle(ev->Sender, proxy, res.release()), 1, true); + } + capturedCommits.clear(); + + SimulateSleep(runtime, TDuration::MilliSeconds(5)); + + // This should succeed, unless the bug was triggered and readset acknowledged before commit + ExecSQL(server, sender, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 42); + UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 42); + UPSERT INTO `/Root/table-3` (key, value) VALUES (4, 42), (5, 42); + )")); +} + } // Y_UNIT_TEST_SUITE(DataShardOutOfOrder) } // namespace NKikimr |