diff options
author | Aleksei Borzenkov <snaury@ydb.tech> | 2024-02-15 13:28:02 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-15 13:28:02 +0300 |
commit | 16f7980968f8ef94ba96fecb48c3611c5d5db182 (patch) | |
tree | d7414a82faebad530fd877795899fcc0666d4a59 | |
parent | f6070cc929fbca0fe0d8b6a4512be84f7f308d6e (diff) | |
download | ydb-16f7980968f8ef94ba96fecb48c3611c5d5db182.tar.gz |
Fix readset acks sent too early in volatile transactions (#1945)
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_volatile.cpp | 108 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.cpp | 4 |
2 files changed, 110 insertions, 2 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index efc134ed160..9d12f0ee2de 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -3,6 +3,7 @@ #include "datashard_ut_common_pq.h" #include "datashard_active_transaction.h" +#include <ydb/core/base/blobstorage.h> #include <ydb/core/kqp/executer_actor/kqp_executer.h> namespace NKikimr { @@ -2106,6 +2107,113 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { "{ items { uint32_value: 6 } items { uint32_value: 6 } }"); } + // Regression test for KIKIMR-21060 + Y_UNIT_TEST(DistributedWriteRSNotAckedBeforeCommit) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000) + .SetEnableDataShardVolatileTransactions(true); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20);"); + + // Block readset exchange + std::vector<std::unique_ptr<IEventHandle>> readSets; + auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) { + Cerr << "... blocking readset" << Endl; + readSets.emplace_back(ev.Release()); + }); + + // Start a distributed write to both tables + TString sessionId = CreateSessionRPC(runtime, "/Root"); + auto upsertResult = SendRequest( + runtime, + MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 30); + UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 40); + )", sessionId, /* txId */ "", /* commitTx */ true), + "/Root"); + WaitFor(runtime, [&]{ return readSets.size() >= 4; }, "readsets"); + + // Stop blocking further readsets + blockReadSets.Remove(); + + // Sleep a little to make sure everything so far is fully committed + runtime.SimulateSleep(TDuration::Seconds(1)); + + // Start blocking commits for table-1 + const auto shards1 = GetTableShards(server, sender, "/Root/table-1"); + UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u); + std::vector<std::unique_ptr<IEventHandle>> putResponses; + auto blockCommits = runtime.AddObserver<TEvBlobStorage::TEvPut>([&](TEvBlobStorage::TEvPut::TPtr& ev) { + auto* msg = ev->Get(); + // Drop all put requests for table-1 + if (msg->Id.TabletID() == shards1.at(0)) { + // We can't just drop requests, we must reply to it later + putResponses.emplace_back(new IEventHandle( + ev->Sender, + ev->GetRecipientRewrite(), + msg->MakeErrorResponse(NKikimrProto::BLOCKED, "Fake blocked response", 0).release(), + 0, + ev->Cookie)); + Cerr << "... dropping put " << msg->Id << Endl; + ev.Reset(); + } + }); + + // Unblock readsets + for (auto& ev : readSets) { + runtime.Send(ev.release(), 0, true); + } + readSets.clear(); + + // Sleep to make sure those readsets are fully processed + // Bug was acknowledging readsets before tx state is fully persisted + runtime.SimulateSleep(TDuration::Seconds(1)); + + // Transaction will return success even when commits are blocked at this point + Cerr << "... awaiting upsert result" << Endl; + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(upsertResult))), + "<empty>"); + + // Now we stop blocking commits and gracefully restart the tablet, all pending commits will be lost + blockCommits.Remove(); + for (auto& ev : putResponses) { + runtime.Send(ev.release(), 0, true); + } + Cerr << "... restarting tablet " << shards1.at(0) << Endl; + GracefulRestartTablet(runtime, shards1.at(0), sender); + + // We must see all rows as committed, i.e. nothing should be lost + Cerr << "... reading final result" << Endl; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/table-1` + UNION ALL + SELECT key, value FROM `/Root/table-2` + ORDER BY key + )"), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 20 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 30 } }, " + "{ items { uint32_value: 4 } items { uint32_value: 40 } }"); + } + } // Y_UNIT_TEST_SUITE(DataShardVolatile) } // namespace NKikimr diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index edd5ca149d6..73a233ae0d5 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -824,7 +824,7 @@ namespace NKikimr::NDataShard { } info->DelayedConfirmations.clear(); - // Send delayed acks on commit + // Send delayed acks when changes are persisted // TODO: maybe move it into a parameter? struct TDelayedAcksState : public TThrRefBase { TVector<THolder<IEventHandle>> DelayedAcks; @@ -833,7 +833,7 @@ namespace NKikimr::NDataShard { : DelayedAcks(std::move(info->DelayedAcks)) {} }; - txc.DB.OnCommit([state = MakeIntrusive<TDelayedAcksState>(info)]() { + txc.DB.OnPersistent([state = MakeIntrusive<TDelayedAcksState>(info)]() { for (auto& ev : state->DelayedAcks) { TActivationContext::Send(ev.Release()); } |