aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksei Borzenkov <snaury@ydb.tech>2024-02-15 13:28:02 +0300
committerGitHub <noreply@github.com>2024-02-15 13:28:02 +0300
commit16f7980968f8ef94ba96fecb48c3611c5d5db182 (patch)
treed7414a82faebad530fd877795899fcc0666d4a59
parentf6070cc929fbca0fe0d8b6a4512be84f7f308d6e (diff)
downloadydb-16f7980968f8ef94ba96fecb48c3611c5d5db182.tar.gz
Fix readset acks sent too early in volatile transactions (#1945)
-rw-r--r--ydb/core/tx/datashard/datashard_ut_volatile.cpp108
-rw-r--r--ydb/core/tx/datashard/volatile_tx.cpp4
2 files changed, 110 insertions, 2 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index efc134ed160..9d12f0ee2de 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -3,6 +3,7 @@
#include "datashard_ut_common_pq.h"
#include "datashard_active_transaction.h"
+#include <ydb/core/base/blobstorage.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
namespace NKikimr {
@@ -2106,6 +2107,113 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
"{ items { uint32_value: 6 } items { uint32_value: 6 } }");
}
+ // Regression test for KIKIMR-21060
+ Y_UNIT_TEST(DistributedWriteRSNotAckedBeforeCommit) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000)
+ .SetEnableDataShardVolatileTransactions(true);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);");
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20);");
+
+ // Block readset exchange
+ std::vector<std::unique_ptr<IEventHandle>> readSets;
+ auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
+ Cerr << "... blocking readset" << Endl;
+ readSets.emplace_back(ev.Release());
+ });
+
+ // Start a distributed write to both tables
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+ auto upsertResult = SendRequest(
+ runtime,
+ MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 30);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 40);
+ )", sessionId, /* txId */ "", /* commitTx */ true),
+ "/Root");
+ WaitFor(runtime, [&]{ return readSets.size() >= 4; }, "readsets");
+
+ // Stop blocking further readsets
+ blockReadSets.Remove();
+
+ // Sleep a little to make sure everything so far is fully committed
+ runtime.SimulateSleep(TDuration::Seconds(1));
+
+ // Start blocking commits for table-1
+ const auto shards1 = GetTableShards(server, sender, "/Root/table-1");
+ UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u);
+ std::vector<std::unique_ptr<IEventHandle>> putResponses;
+ auto blockCommits = runtime.AddObserver<TEvBlobStorage::TEvPut>([&](TEvBlobStorage::TEvPut::TPtr& ev) {
+ auto* msg = ev->Get();
+ // Drop all put requests for table-1
+ if (msg->Id.TabletID() == shards1.at(0)) {
+ // We can't just drop requests, we must reply to it later
+ putResponses.emplace_back(new IEventHandle(
+ ev->Sender,
+ ev->GetRecipientRewrite(),
+ msg->MakeErrorResponse(NKikimrProto::BLOCKED, "Fake blocked response", 0).release(),
+ 0,
+ ev->Cookie));
+ Cerr << "... dropping put " << msg->Id << Endl;
+ ev.Reset();
+ }
+ });
+
+ // Unblock readsets
+ for (auto& ev : readSets) {
+ runtime.Send(ev.release(), 0, true);
+ }
+ readSets.clear();
+
+ // Sleep to make sure those readsets are fully processed
+ // Bug was acknowledging readsets before tx state is fully persisted
+ runtime.SimulateSleep(TDuration::Seconds(1));
+
+ // Transaction will return success even when commits are blocked at this point
+ Cerr << "... awaiting upsert result" << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(
+ FormatResult(AwaitResponse(runtime, std::move(upsertResult))),
+ "<empty>");
+
+ // Now we stop blocking commits and gracefully restart the tablet, all pending commits will be lost
+ blockCommits.Remove();
+ for (auto& ev : putResponses) {
+ runtime.Send(ev.release(), 0, true);
+ }
+ Cerr << "... restarting tablet " << shards1.at(0) << Endl;
+ GracefulRestartTablet(runtime, shards1.at(0), sender);
+
+ // We must see all rows as committed, i.e. nothing should be lost
+ Cerr << "... reading final result" << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ SELECT key, value FROM `/Root/table-1`
+ UNION ALL
+ SELECT key, value FROM `/Root/table-2`
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 20 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 30 } }, "
+ "{ items { uint32_value: 4 } items { uint32_value: 40 } }");
+ }
+
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp
index edd5ca149d6..73a233ae0d5 100644
--- a/ydb/core/tx/datashard/volatile_tx.cpp
+++ b/ydb/core/tx/datashard/volatile_tx.cpp
@@ -824,7 +824,7 @@ namespace NKikimr::NDataShard {
}
info->DelayedConfirmations.clear();
- // Send delayed acks on commit
+ // Send delayed acks when changes are persisted
// TODO: maybe move it into a parameter?
struct TDelayedAcksState : public TThrRefBase {
TVector<THolder<IEventHandle>> DelayedAcks;
@@ -833,7 +833,7 @@ namespace NKikimr::NDataShard {
: DelayedAcks(std::move(info->DelayedAcks))
{}
};
- txc.DB.OnCommit([state = MakeIntrusive<TDelayedAcksState>(info)]() {
+ txc.DB.OnPersistent([state = MakeIntrusive<TDelayedAcksState>(info)]() {
for (auto& ev : state->DelayedAcks) {
TActivationContext::Send(ev.Release());
}