aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: Alexey Borzenkov <snaury@yandex-team.ru> 2022-06-06 16:47:07 +0300
committer: Daniil Cherednik <dcherednik@yandex-team.ru> 2022-06-06 16:47:07 +0300
commit: fa8ff7b07c7ef2cac601269486b51a1b720dcd02 (patch)
tree: f1b20e085896812f8d195e77a8c770b909990cc2
parent: 0087b374fbbdf87d5f8f7429afa866dfecba977a (diff)
download: ydb-fa8ff7b07c7ef2cac601269486b51a1b720dcd02.tar.gz
22-2: Fix uncommitted resent readset ack on unrelated tx completion, KIKIMR-15011
Merge from trunk: r9530205 REVIEW: 2599071 x-ydb-stable-ref: 97aab06fb7651b9f640a63f6fd243c3f2a4c7c39
-rw-r--r-- ydb/core/testlib/tablet_helpers.cpp 3
-rw-r--r-- ydb/core/tx/datashard/datashard_pipeline.cpp 4
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_order.cpp 237
3 files changed, 241 insertions, 3 deletions
diff --git a/ydb/core/testlib/tablet_helpers.cpp b/ydb/core/testlib/tablet_helpers.cpp
index ce5588e08d..fa585cffab 100644
--- a/ydb/core/testlib/tablet_helpers.cpp
+++ b/ydb/core/testlib/tablet_helpers.cpp
@@ -575,7 +575,8 @@ namespace NKikimr {
TActorId ResolveTablet(TTestActorRuntime &runtime, ui64 tabletId, ui32 nodeIndex, bool sysTablet) {
auto sender = runtime.AllocateEdgeActor(nodeIndex);
runtime.Send(new IEventHandle(MakeTabletResolverID(), sender,
- new TEvTabletResolver::TEvForward(tabletId, nullptr)));
+ new TEvTabletResolver::TEvForward(tabletId, nullptr)),
+ nodeIndex, true);
auto ev = runtime.GrabEdgeEventRethrow<TEvTabletResolver::TEvForwardResult>(sender);
Y_VERIFY(ev->Get()->Status == NKikimrProto::OK, "Failed to resolve tablet %" PRIu64, tabletId);
if (sysTablet) {
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index c3dc581b24..1f5796d7b4 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -907,10 +907,10 @@ void TPipeline::CompleteTx(const TOperation::TPtr op, TTransactionContext& txc,
auto &pr = *DelayedAcks.begin();
LOG_NOTICE(ctx, NKikimrServices::TX_DATASHARD,
- "Send outdated delayed readset ack for %" PRIu64 ":%" PRIu64 " at %" PRIu64,
+ "Will send outdated delayed readset ack for %" PRIu64 ":%" PRIu64 " at %" PRIu64,
pr.first.Step, pr.first.TxId, Self->TabletID());
- ctx.Send(pr.second.Release());
+ op->AddDelayedAck(std::move(pr.second));
DelayedAcks.erase(DelayedAcks.begin());
}
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index 7afe94b3d4..e57b2d35ba 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -5142,6 +5142,243 @@ Y_UNIT_TEST_TWIN(TestUnprotectedReadsThenWriteVisibility, UseNewEngine) {
"} Struct { Bool: false }");
}
+Y_UNIT_TEST_TWIN(UncommittedReadSetAck, UseNewEngine) { // Checks that no readset ack is observed on the second node while blob puts for the table-3 tablet are being held back
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetNodeCount(2)
+ .SetUseRealThreads(false);
+
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TIMECAST, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+
+ const ui64 hiveTabletId = ChangeStateStorage(Hive, server->GetSettings().Domain);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ auto table1shards = GetTableShards(server, sender, "/Root/table-1");
+ auto table2shards = GetTableShards(server, sender, "/Root/table-2");
+
+ // Make sure these tablets are at node 1: send TEvFillNode for the first node to Hive and require an OK result
+ runtime.SendToPipe(hiveTabletId, sender, new TEvHive::TEvFillNode(runtime.GetNodeId(0)));
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvHive::TEvFillNodeResult>(sender);
+ UNIT_ASSERT(ev->Get()->Record.GetStatus() == NKikimrProto::OK);
+ }
+
+ // Create one more table, we expect it to run at node 2 (verified by the NodeId assertion below)
+ CreateShardedTable(server, sender, "/Root", "table-3", 1);
+ auto table3shards = GetTableShards(server, sender, "/Root/table-3");
+ auto table3actor = ResolveTablet(runtime, table3shards.at(0), /* nodeIndex */ 1);
+ UNIT_ASSERT_VALUES_EQUAL(table3actor.NodeId(), runtime.GetNodeId(1));
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2)"));
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-3` (key, value) VALUES (3, 3)"));
+
+ auto beginTx = [&](TString& sessionId, TString& txId, const TString& query) -> TString { // Creates a session, runs MakeBeginRequest(sessionId, query), fills sessionId/txId and returns the single result as a debug string
+ auto reqSender = runtime.AllocateEdgeActor();
+ sessionId = CreateSession(runtime, reqSender);
+ auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ txId = response.GetResponse().GetTxMeta().id();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ TString sessionId1, txId1;
+ UNIT_ASSERT_VALUES_EQUAL(
+ beginTx(sessionId1, txId1, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ TString sessionId2, txId2;
+ UNIT_ASSERT_VALUES_EQUAL(
+ beginTx(sessionId2, txId2, Q_(R"(
+ SELECT key, value FROM `/Root/table-2`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
+ "} Struct { Bool: false }");
+
+ bool capturePlanSteps = true; // while true, plan steps addressed to table3actor are held back by the observer
+ TVector<THolder<IEventHandle>> capturedPlanSteps;
+ THashSet<ui64> passReadSetTxIds; // readsets for these tx ids are let through to table3actor; all others are captured
+ ui64 observedReadSets = 0; // counts every readset seen for table3actor, whether passed or captured
+ TVector<THolder<IEventHandle>> capturedReadSets;
+ ui64 observedReadSetAcks = 0; // counts TEvReadSetAck events whose recipient is on the node with index 1
+ bool captureCommits = false; // while true, TEvPut events for the table-3 tablet are held back by the observer
+ TVector<THolder<IEventHandle>> capturedCommits;
+
+ auto captureCommitAfterReadSet = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { // Observer: only inspects events whose recipient node index is 1; everything else is processed normally
+ const ui32 nodeId = ev->GetRecipientRewrite().NodeId();
+ const ui32 nodeIndex = nodeId - runtime.GetNodeId(0); // node index of the recipient relative to the first node
+ if (nodeIndex == 1) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxProcessing::TEvPlanStep::EventType: {
+ if (nodeIndex == 1 && ev->GetRecipientRewrite() == table3actor && capturePlanSteps) {
+ Cerr << "... captured plan step for table-3" << Endl;
+ capturedPlanSteps.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ if (nodeIndex == 1 && ev->GetRecipientRewrite() == table3actor) {
+ auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ ui64 txId = msg->Record.GetTxId();
+ ++observedReadSets;
+ if (!passReadSetTxIds.contains(txId)) {
+ Cerr << "... readset for txid# " << txId << " was blocked" << Endl;
+ capturedReadSets.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ Cerr << "... passing readset for txid# " << txId << Endl;
+ }
+ break;
+ }
+ case TEvTxProcessing::TEvReadSetAck::EventType: {
+ Cerr << "... read set ack" << Endl;
+ ++observedReadSetAcks;
+ break;
+ }
+ case TEvBlobStorage::TEvPut::EventType: {
+ auto* msg = ev->Get<TEvBlobStorage::TEvPut>();
+ if (nodeIndex == 1 && msg->Id.TabletID() == table3shards.at(0) && captureCommits) {
+ Cerr << "... capturing put " << msg->Id << " for table-3" << Endl;
+ capturedCommits.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ } // last case in the switch: without a break control simply leaves the switch and reaches PROCESS below
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(captureCommitAfterReadSet);
+
+ // Make two commits in parallel, one of them will receive a readset and become complete
+ auto reqCommit1Sender = runtime.AllocateEdgeActor();
+ SendRequest(runtime, reqCommit1Sender, MakeCommitRequest(sessionId1, txId1,
+ Q_(R"(
+ UPSERT INTO `/Root/table-3` (key, value) VALUES (4, 4)
+ )")));
+ auto reqCommit2Sender = runtime.AllocateEdgeActor();
+ SendRequest(runtime, reqCommit2Sender, MakeCommitRequest(sessionId2, txId2,
+ Q_(R"(
+ UPSERT INTO `/Root/table-3` (key, value) VALUES (5, 5)
+ )")));
+
+ auto waitFor = [&](const auto& condition, const TString& description) { // Dispatches runtime events until condition() holds, logging the description on every iteration
+ while (!condition()) {
+ Cerr << "... waiting for " << description << Endl;
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return condition();
+ };
+ runtime.DispatchEvents(options);
+ }
+ };
+
+ waitFor([&]{ return capturedPlanSteps.size() > 0; }, "plan step");
+ UNIT_ASSERT_VALUES_EQUAL(capturedPlanSteps.size(), 1u);
+ ui64 realTxId1, realTxId2; // the smaller and the larger tx id found in the captured plan step
+ {
+ auto* msg = capturedPlanSteps[0]->Get<TEvTxProcessing::TEvPlanStep>();
+ TVector<ui64> realTxIds;
+ for (const auto& tx : msg->Record.GetTransactions()) {
+ realTxIds.emplace_back(tx.GetTxId());
+ }
+ UNIT_ASSERT_VALUES_EQUAL(realTxIds.size(), 2u);
+ std::sort(realTxIds.begin(), realTxIds.end());
+ realTxId1 = realTxIds.at(0);
+ realTxId2 = realTxIds.at(1);
+ }
+
+ // Unblock and resend the plan step message
+ capturePlanSteps = false;
+ for (auto& ev : capturedPlanSteps) {
+ runtime.Send(ev.Release(), 1, true);
+ }
+ capturedPlanSteps.clear();
+
+ // Wait until there are 2 readset messages
+ waitFor([&]{ return capturedReadSets.size() >= 2; }, "initial readsets");
+ SimulateSleep(runtime, TDuration::MilliSeconds(5));
+
+ // Unblock readset messages for txId1, but block commits
+ captureCommits = true;
+ observedReadSetAcks = 0;
+ passReadSetTxIds.insert(realTxId1);
+ for (auto& ev : capturedReadSets) {
+ runtime.Send(ev.Release(), 1, true);
+ }
+ capturedReadSets.clear();
+
+ // Wait until transaction is complete and tries to commit (a put was captured and a readset for a tx id not in passReadSetTxIds was captured again)
+ waitFor([&]{ return capturedCommits.size() > 0 && capturedReadSets.size() > 0; }, "tx complete");
+ SimulateSleep(runtime, TDuration::MilliSeconds(5));
+
+ // Reboot tablets and wait for resent readsets
+ // Since tx is already complete, it will be added to DelayedAcks
+ observedReadSets = 0;
+ capturedReadSets.clear();
+ RebootTablet(runtime, table1shards.at(0), sender, 0, true);
+ RebootTablet(runtime, table2shards.at(0), sender, 0, true);
+
+ waitFor([&]{ return observedReadSets >= 2; }, "resent readsets");
+ SimulateSleep(runtime, TDuration::MilliSeconds(5));
+
+ // Now we unblock the second readset and resend it
+ passReadSetTxIds.insert(realTxId2);
+ for (auto& ev : capturedReadSets) {
+ runtime.Send(ev.Release(), 1, true);
+ }
+ capturedReadSets.clear();
+ observedReadSets = 0;
+
+ // Wait until the second transaction commits (one more put captured and at least one readset observed after the resend)
+ ui64 prevCommitsBlocked = capturedCommits.size();
+ waitFor([&]{ return capturedCommits.size() > prevCommitsBlocked && observedReadSets >= 1; }, "second tx complete");
+ SimulateSleep(runtime, TDuration::MilliSeconds(5));
+
+ // There must be no readset acks, since we're blocking all commits
+ UNIT_ASSERT_VALUES_EQUAL(observedReadSetAcks, 0);
+
+ // Now we stop blocking anything and "reply" to all blocked commits with an error
+ runtime.SetObserverFunc(prevObserverFunc);
+ for (auto& ev : capturedCommits) {
+ auto proxy = ev->Recipient;
+ ui32 groupId = GroupIDFromBlobStorageProxyID(proxy);
+ auto res = ev->Get<TEvBlobStorage::TEvPut>()->MakeErrorResponse(NKikimrProto::ERROR, "Something went wrong", groupId);
+ runtime.Send(new IEventHandle(ev->Sender, proxy, res.release()), 1, true); // the error reply goes to the put's original sender, with the proxy as its sender
+ }
+ capturedCommits.clear();
+
+ SimulateSleep(runtime, TDuration::MilliSeconds(5));
+
+ // This should succeed, unless the bug was triggered and readset acknowledged before commit
+ ExecSQL(server, sender, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 42);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 42);
+ UPSERT INTO `/Root/table-3` (key, value) VALUES (4, 42), (5, 42);
+ )"));
+}
+
} // Y_UNIT_TEST_SUITE(DataShardOutOfOrder)
} // namespace NKikimr