about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Aleksei Borzenkov <snaury@ydb.tech> 2024-03-11 16:32:01 +0300
committer: GitHub <noreply@github.com> 2024-03-11 16:32:01 +0300
commit: 4494ea79dab1e861359d09a76175e48e5f9a900d (patch)
tree: 3c72fa908020fc37c31a53fd2c0de183d9303cb3
parent: 3f03b4a0c88ddb640d65ab6c546c3c4861ca1ebc (diff)
download: ydb-4494ea79dab1e861359d09a76175e48e5f9a900d.tar.gz
Fix volatile result sent before it is fully committed (#2598)
-rw-r--r-- ydb/core/tx/datashard/datashard_active_transaction.cpp 4
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_common_kqp.h 22
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_volatile.cpp 114
-rw-r--r-- ydb/core/tx/datashard/finish_propose_unit.cpp 7
-rw-r--r-- ydb/core/tx/datashard/finish_propose_write_unit.cpp 7
-rw-r--r-- ydb/core/tx/datashard/operation.h 13
6 files changed, 146 insertions, 21 deletions
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp
index ed7430fce86..feafaf70886 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp
@@ -410,7 +410,7 @@ TValidatedDataTx::TPtr TActiveTransaction::BuildDataTx(TDataShard *self,
if (!DataTx) {
Y_ABORT_UNLESS(TxBody);
DataTx = std::make_shared<TValidatedDataTx>(self, txc, ctx, GetStepOrder(),
- GetReceivedAt(), TxBody, MvccSnapshotRepeatable);
+ GetReceivedAt(), TxBody, IsMvccSnapshotRepeatable());
if (DataTx->HasStreamResponse())
SetStreamSink(DataTx->GetSink());
}
@@ -639,7 +639,7 @@ ERestoreDataStatus TActiveTransaction::RestoreTxData(
bool extractKeys = DataTx->IsTxInfoLoaded();
DataTx = std::make_shared<TValidatedDataTx>(self, txc, ctx, GetStepOrder(),
- GetReceivedAt(), TxBody, MvccSnapshotRepeatable);
+ GetReceivedAt(), TxBody, IsMvccSnapshotRepeatable());
if (DataTx->Ready() && extractKeys) {
DataTx->ExtractKeys(true);
}
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
index d7182e08e3d..de0e2ef7a24 100644
--- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h
+++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -42,7 +42,7 @@ namespace NKqpHelpers {
inline TString CreateSessionRPC(TTestActorRuntime& runtime, const TString& database = {}) {
Ydb::Table::CreateSessionRequest request;
auto future = NRpcService::DoLocalRpc<TEvCreateSessionRequest>(
- std::move(request), database, "", /* token */ runtime.GetActorSystem(0));
+ std::move(request), database, /* token */ "", runtime.GetActorSystem(0));
TString sessionId;
auto response = AwaitResponse(runtime, future);
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
@@ -71,7 +71,7 @@ namespace NKqpHelpers {
TTestActorRuntime& runtime, Ydb::Table::ExecuteDataQueryRequest&& request, const TString& database = {})
{
return NRpcService::DoLocalRpc<TEvExecuteDataQueryRequest>(
- std::move(request), database, "" /* token */, runtime.GetActorSystem(0));
+ std::move(request), database, /* token */ "", runtime.GetActorSystem(0));
}
inline Ydb::Table::ExecuteDataQueryRequest MakeSimpleRequestRPC(
@@ -119,7 +119,7 @@ namespace NKqpHelpers {
Ydb::Table::DeleteSessionRequest request;
request.set_session_id(sessionId);
auto future = NRpcService::DoLocalRpc<TEvDeleteSessionRequest>(
- std::move(request), "", "", /* token */ runtime.GetActorSystem(0));
+ std::move(request), "", /* token */ "", runtime.GetActorSystem(0));
}
inline THolder<NKqp::TEvKqp::TEvQueryRequest> MakeStreamRequest(
@@ -168,17 +168,15 @@ namespace NKqpHelpers {
return FormatResult(result);
}
- inline TString KqpSimpleExec(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) {
+ inline auto KqpSimpleSend(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) {
TString sessionId = CreateSessionRPC(runtime, database);
TString txId;
- auto response = AwaitResponse(
- runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */, staleRo), database));
- if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
- return TStringBuilder() << "ERROR: " << response.operation().status();
- }
- Ydb::Table::ExecuteQueryResult result;
- response.operation().result().UnpackTo(&result);
- return FormatResult(result);
+ return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, /* commitTx */ true, staleRo), database);
+ }
+
+ inline TString KqpSimpleExec(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) {
+ auto response = AwaitResponse(runtime, KqpSimpleSend(runtime, query, staleRo, database));
+ return FormatResult(response);
}
inline TString KqpSimpleStaleRoExec(TTestActorRuntime& runtime, const TString& query, const TString& database = {}) {
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index 5d982825973..2b14e090501 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -2263,6 +2263,120 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
UNIT_ASSERT_VALUES_EQUAL(volatileTxs, 2u);
}
+ // Regression test for KIKIMR-21156
+ Y_UNIT_TEST(VolatileCommitOnBlobStorageFailure) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000)
+ .SetEnableDataShardVolatileTransactions(true);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ // Make sure read flags are persisted by performing a snapshot read
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ SELECT key, value FROM `/Root/table-1`
+ UNION ALL
+ SELECT key, value FROM `/Root/table-2`
+ ORDER BY key
+ )"),
+ "");
+
+ // Insert initial values
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);"));
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20);"));
+
+ // Start blocking commits for table-1
+ const auto shards1 = GetTableShards(server, sender, "/Root/table-1");
+ UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u);
+ std::deque<TEvBlobStorage::TEvPut::TPtr> blockedPuts;
+ auto blockCommits = runtime.AddObserver<TEvBlobStorage::TEvPut>([&](TEvBlobStorage::TEvPut::TPtr& ev) {
+ auto* msg = ev->Get();
+ // Drop all put requests for table-1
+ if (msg->Id.TabletID() == shards1.at(0)) {
+ Cerr << "... blocking put " << msg->Id << Endl;
+ blockedPuts.push_back(std::move(ev));
+ }
+ });
+
+ // Start an upsert to table-1, this will block further readonly localdb tx completions
+ Cerr << "... starting an upsert to table-1" << Endl;
+ auto firstUpsertFuture = KqpSimpleSend(runtime, R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 30);
+ )");
+
+ // Wait until puts are blocked
+ WaitFor(runtime, [&]{ return blockedPuts.size() > 0; }, "blocked puts");
+ auto firstUpsertPuts = std::move(blockedPuts);
+ UNIT_ASSERT(blockedPuts.empty());
+
+ // Read from table-2 and write to table-1 based on the result
+ // This will result in a two-shard volatile tx writing to table-1
+ Cerr << "... starting distributed tx between table-1 and table-2" << Endl;
+ auto volatileFuture = KqpSimpleSend(runtime, R"(
+ UPSERT INTO `/Root/table-1`
+ SELECT key + 2u AS key, value + 2u AS value
+ FROM `/Root/table-2`;
+ )");
+
+ // Wait until it also tries to commit
+ WaitFor(runtime, [&]{ return blockedPuts.size() > 0; }, "blocked puts");
+
+ // Now unblock the first upsert puts
+ blockCommits.Remove();
+ for (auto& ev : firstUpsertPuts) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+ firstUpsertPuts.clear();
+
+ // And wait for it to finish successfully
+ Cerr << "... waiting for first upsert result" << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(
+ FormatResult(AwaitResponse(runtime, std::move(firstUpsertFuture))),
+ "<empty>");
+
+ // Reply to everything previously blocked with an error, the shard will restart
+ for (auto& ev : blockedPuts) {
+ auto proxy = ev->Recipient;
+ ui32 groupId = GroupIDFromBlobStorageProxyID(proxy);
+ auto res = ev->Get()->MakeErrorResponse(NKikimrProto::ERROR, "Something went wrong", groupId);
+ runtime.Send(new IEventHandle(ev->Sender, proxy, res.release()), 0, true);
+ }
+
+ // Wait for the volatile tx result
+ Cerr << "... waiting for volatile tx result" << Endl;
+ auto result = FormatResult(AwaitResponse(runtime, std::move(volatileFuture)));
+ if (result == "<empty>") {
+ // A success result is not ok now, but in the future we might migrate state
+ // Check that the supposedly committed row actually exists
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ SELECT key, value FROM `/Root/table-1` ORDER BY key;
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 30 } }, "
+ "{ items { uint32_value: 4 } items { uint32_value: 22 } }");
+ } else {
+ // Otherwise the result must be undetermined
+ UNIT_ASSERT_VALUES_EQUAL(result, "ERROR: UNDETERMINED");
+ }
+ }
+
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp
index 545a1f88b2c..3f76432c700 100644
--- a/ydb/core/tx/datashard/finish_propose_unit.cpp
+++ b/ydb/core/tx/datashard/finish_propose_unit.cpp
@@ -99,8 +99,11 @@ EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op,
op->SetFinishProposeTs(DataShard.ConfirmReadOnlyLease());
}
- if (!op->HasResultSentFlag() && (op->IsDirty() || op->HasVolatilePrepareFlag() || !Pipeline.WaitCompletion(op)))
+ if (!op->HasResultSentFlag() && (op->IsDirty() || op->HasVolatilePrepareFlag() || !Pipeline.WaitCompletion(op))) {
+ DataShard.IncCounter(COUNTER_PREPARE_COMPLETE);
+ op->SetProposeResultSentEarly();
CompleteRequest(op, ctx);
+ }
if (!DataShard.IsFollower())
DataShard.PlanCleanup(ctx);
@@ -128,7 +131,7 @@ EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op,
void TFinishProposeUnit::Complete(TOperation::TPtr op,
const TActorContext &ctx)
{
- if (!op->HasResultSentFlag()) {
+ if (!op->HasResultSentFlag() && !op->IsProposeResultSentEarly()) {
DataShard.IncCounter(COUNTER_PREPARE_COMPLETE);
if (op->Result())
diff --git a/ydb/core/tx/datashard/finish_propose_write_unit.cpp b/ydb/core/tx/datashard/finish_propose_write_unit.cpp
index 5a1e4af8f1f..13aceb011c5 100644
--- a/ydb/core/tx/datashard/finish_propose_write_unit.cpp
+++ b/ydb/core/tx/datashard/finish_propose_write_unit.cpp
@@ -97,8 +97,11 @@ EExecutionStatus TFinishProposeWriteUnit::Execute(TOperation::TPtr op,
op->SetFinishProposeTs(DataShard.ConfirmReadOnlyLease());
}
- if (!op->HasResultSentFlag() && (op->IsDirty() || op->HasVolatilePrepareFlag() || !Pipeline.WaitCompletion(op)))
+ if (!op->HasResultSentFlag() && (op->IsDirty() || op->HasVolatilePrepareFlag() || !Pipeline.WaitCompletion(op))) {
+ DataShard.IncCounter(COUNTER_PREPARE_COMPLETE);
+ op->SetProposeResultSentEarly();
CompleteRequest(op, ctx);
+ }
if (!DataShard.IsFollower())
DataShard.PlanCleanup(ctx);
@@ -127,7 +130,7 @@ void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext
{
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);
- if (!op->HasResultSentFlag()) {
+ if (!op->HasResultSentFlag() && !op->IsProposeResultSentEarly()) {
DataShard.IncCounter(COUNTER_WRITE_COMPLETE);
if (writeOp->GetWriteResult())
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h
index 0b8e13a271d..192661fb1fe 100644
--- a/ydb/core/tx/datashard/operation.h
+++ b/ydb/core/tx/datashard/operation.h
@@ -408,12 +408,15 @@ public:
bool IsMvccSnapshotRead() const { return !MvccSnapshot.IsMax(); }
const TRowVersion& GetMvccSnapshot() const { return MvccSnapshot; }
- bool IsMvccSnapshotRepeatable() const { return MvccSnapshotRepeatable; }
+ bool IsMvccSnapshotRepeatable() const { return MvccSnapshotRepeatable_; }
void SetMvccSnapshot(const TRowVersion& snapshot, bool isRepeatable = true) {
MvccSnapshot = snapshot;
- MvccSnapshotRepeatable = isRepeatable;
+ MvccSnapshotRepeatable_ = isRepeatable;
}
+ bool IsProposeResultSentEarly() const { return ProposeResultSentEarly_; }
+ void SetProposeResultSentEarly(bool value = true) { ProposeResultSentEarly_ = value; }
+
///////////////////////////////////
// DEBUG AND MONITORING //
///////////////////////////////////
@@ -435,7 +438,11 @@ protected:
TSnapshotKey AcquiredSnapshotKey;
TRowVersion MvccSnapshot = TRowVersion::Max();
- bool MvccSnapshotRepeatable = false;
+
+private:
+ // Runtime flags
+ ui8 MvccSnapshotRepeatable_ : 1 = 0;
+ ui8 ProposeResultSentEarly_ : 1 = 0;
};
struct TRSData {