diff options
author | Aleksei Borzenkov <snaury@ydb.tech> | 2024-03-11 16:32:01 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-11 16:32:01 +0300 |
commit | 4494ea79dab1e861359d09a76175e48e5f9a900d (patch) | |
tree | 3c72fa908020fc37c31a53fd2c0de183d9303cb3 | |
parent | 3f03b4a0c88ddb640d65ab6c546c3c4861ca1ebc (diff) | |
download | ydb-4494ea79dab1e861359d09a76175e48e5f9a900d.tar.gz |
Fix volatile result sent before it is fully committed (#2598)
-rw-r--r-- | ydb/core/tx/datashard/datashard_active_transaction.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common_kqp.h | 22 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_volatile.cpp | 114 | ||||
-rw-r--r-- | ydb/core/tx/datashard/finish_propose_unit.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/finish_propose_write_unit.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/operation.h | 13 |
6 files changed, 146 insertions, 21 deletions
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index ed7430fce86..feafaf70886 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -410,7 +410,7 @@ TValidatedDataTx::TPtr TActiveTransaction::BuildDataTx(TDataShard *self, if (!DataTx) { Y_ABORT_UNLESS(TxBody); DataTx = std::make_shared<TValidatedDataTx>(self, txc, ctx, GetStepOrder(), - GetReceivedAt(), TxBody, MvccSnapshotRepeatable); + GetReceivedAt(), TxBody, IsMvccSnapshotRepeatable()); if (DataTx->HasStreamResponse()) SetStreamSink(DataTx->GetSink()); } @@ -639,7 +639,7 @@ ERestoreDataStatus TActiveTransaction::RestoreTxData( bool extractKeys = DataTx->IsTxInfoLoaded(); DataTx = std::make_shared<TValidatedDataTx>(self, txc, ctx, GetStepOrder(), - GetReceivedAt(), TxBody, MvccSnapshotRepeatable); + GetReceivedAt(), TxBody, IsMvccSnapshotRepeatable()); if (DataTx->Ready() && extractKeys) { DataTx->ExtractKeys(true); } diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h index d7182e08e3d..de0e2ef7a24 100644 --- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h +++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h @@ -42,7 +42,7 @@ namespace NKqpHelpers { inline TString CreateSessionRPC(TTestActorRuntime& runtime, const TString& database = {}) { Ydb::Table::CreateSessionRequest request; auto future = NRpcService::DoLocalRpc<TEvCreateSessionRequest>( - std::move(request), database, "", /* token */ runtime.GetActorSystem(0)); + std::move(request), database, /* token */ "", runtime.GetActorSystem(0)); TString sessionId; auto response = AwaitResponse(runtime, future); UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); @@ -71,7 +71,7 @@ namespace NKqpHelpers { TTestActorRuntime& runtime, Ydb::Table::ExecuteDataQueryRequest&& request, const TString& database = {}) { return NRpcService::DoLocalRpc<TEvExecuteDataQueryRequest>( - std::move(request), database, "" /* token */, runtime.GetActorSystem(0)); + std::move(request), database, /* token */ "", runtime.GetActorSystem(0)); } inline Ydb::Table::ExecuteDataQueryRequest MakeSimpleRequestRPC( @@ -119,7 +119,7 @@ namespace NKqpHelpers { Ydb::Table::DeleteSessionRequest request; request.set_session_id(sessionId); auto future = NRpcService::DoLocalRpc<TEvDeleteSessionRequest>( - std::move(request), "", "", /* token */ runtime.GetActorSystem(0)); + std::move(request), "", /* token */ "", runtime.GetActorSystem(0)); } inline THolder<NKqp::TEvKqp::TEvQueryRequest> MakeStreamRequest( @@ -168,17 +168,15 @@ namespace NKqpHelpers { return FormatResult(result); } - inline TString KqpSimpleExec(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) { + inline auto KqpSimpleSend(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) { TString sessionId = CreateSessionRPC(runtime, database); TString txId; - auto response = AwaitResponse( - runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */, staleRo), database)); - if (response.operation().status() != Ydb::StatusIds::SUCCESS) { - return TStringBuilder() << "ERROR: " << response.operation().status(); - } - Ydb::Table::ExecuteQueryResult result; - response.operation().result().UnpackTo(&result); - return FormatResult(result); + return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, /* commitTx */ true, staleRo), database); + } + + inline TString KqpSimpleExec(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) { + auto response = AwaitResponse(runtime, KqpSimpleSend(runtime, query, staleRo, database)); + return FormatResult(response); } inline TString KqpSimpleStaleRoExec(TTestActorRuntime& runtime, const TString& query, const TString& database = {}) { diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index 5d982825973..2b14e090501 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -2263,6 +2263,120 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { UNIT_ASSERT_VALUES_EQUAL(volatileTxs, 2u); } + // Regression test for KIKIMR-21156 + Y_UNIT_TEST(VolatileCommitOnBlobStorageFailure) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000) + .SetEnableDataShardVolatileTransactions(true); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_TRACE); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + // Make sure read flags are persisted by performing a snapshot read + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/table-1` + UNION ALL + SELECT key, value FROM `/Root/table-2` + ORDER BY key + )"), + ""); + + // Insert initial values + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20);")); + + // Start blocking commits for table-1 + const auto shards1 = GetTableShards(server, sender, "/Root/table-1"); + UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u); + std::deque<TEvBlobStorage::TEvPut::TPtr> blockedPuts; + auto blockCommits = runtime.AddObserver<TEvBlobStorage::TEvPut>([&](TEvBlobStorage::TEvPut::TPtr& ev) { + auto* msg = ev->Get(); + // Drop all put requests for table-1 + if (msg->Id.TabletID() == shards1.at(0)) { + Cerr << "... blocking put " << msg->Id << Endl; + blockedPuts.push_back(std::move(ev)); + } + }); + + // Start an upsert to table-1, this will block further readonly localdb tx completions + Cerr << "... starting an upsert to table-1" << Endl; + auto firstUpsertFuture = KqpSimpleSend(runtime, R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 30); + )"); + + // Wait until puts are blocked + WaitFor(runtime, [&]{ return blockedPuts.size() > 0; }, "blocked puts"); + auto firstUpsertPuts = std::move(blockedPuts); + UNIT_ASSERT(blockedPuts.empty()); + + // Read from table-2 and write to table-1 based on the result + // This will result in a two-shard volatile tx writing to table-1 + Cerr << "... starting distributed tx between table-1 and table-2" << Endl; + auto volatileFuture = KqpSimpleSend(runtime, R"( + UPSERT INTO `/Root/table-1` + SELECT key + 2u AS key, value + 2u AS value + FROM `/Root/table-2`; + )"); + + // Wait until it also tries to commit + WaitFor(runtime, [&]{ return blockedPuts.size() > 0; }, "blocked puts"); + + // Now unblock the first upsert puts + blockCommits.Remove(); + for (auto& ev : firstUpsertPuts) { + runtime.Send(ev.Release(), 0, true); + } + firstUpsertPuts.clear(); + + // And wait for it to finish successfully + Cerr << "... waiting for first upsert result" << Endl; + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(firstUpsertFuture))), + "<empty>"); + + // Reply to everything previously blocked with an error, the shard will restart + for (auto& ev : blockedPuts) { + auto proxy = ev->Recipient; + ui32 groupId = GroupIDFromBlobStorageProxyID(proxy); + auto res = ev->Get()->MakeErrorResponse(NKikimrProto::ERROR, "Something went wrong", groupId); + runtime.Send(new IEventHandle(ev->Sender, proxy, res.release()), 0, true); + } + + // Wait for the volatile tx result + Cerr << "... waiting for volatile tx result" << Endl; + auto result = FormatResult(AwaitResponse(runtime, std::move(volatileFuture))); + if (result == "<empty>") { + // A success result is not ok now, but in the future we might migrate state + // Check that the supposedly committed row actually exists + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/table-1` ORDER BY key; + )"), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 30 } }, " + "{ items { uint32_value: 4 } items { uint32_value: 22 } }"); + } else { + // Otherwise the result must be undetermined + UNIT_ASSERT_VALUES_EQUAL(result, "ERROR: UNDETERMINED"); + } + } + } // Y_UNIT_TEST_SUITE(DataShardVolatile) } // namespace NKikimr diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp index 545a1f88b2c..3f76432c700 100644 --- a/ydb/core/tx/datashard/finish_propose_unit.cpp +++ b/ydb/core/tx/datashard/finish_propose_unit.cpp @@ -99,8 +99,11 @@ EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op, op->SetFinishProposeTs(DataShard.ConfirmReadOnlyLease()); } - if (!op->HasResultSentFlag() && (op->IsDirty() || op->HasVolatilePrepareFlag() || !Pipeline.WaitCompletion(op))) + if (!op->HasResultSentFlag() && (op->IsDirty() || op->HasVolatilePrepareFlag() || !Pipeline.WaitCompletion(op))) { + DataShard.IncCounter(COUNTER_PREPARE_COMPLETE); + op->SetProposeResultSentEarly(); CompleteRequest(op, ctx); + } if (!DataShard.IsFollower()) DataShard.PlanCleanup(ctx); @@ -128,7 +131,7 @@ EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op, void TFinishProposeUnit::Complete(TOperation::TPtr op, const TActorContext &ctx) { - if (!op->HasResultSentFlag()) { + if (!op->HasResultSentFlag() && !op->IsProposeResultSentEarly()) { DataShard.IncCounter(COUNTER_PREPARE_COMPLETE); if (op->Result()) diff --git a/ydb/core/tx/datashard/finish_propose_write_unit.cpp b/ydb/core/tx/datashard/finish_propose_write_unit.cpp index 5a1e4af8f1f..13aceb011c5 100644 --- a/ydb/core/tx/datashard/finish_propose_write_unit.cpp +++ b/ydb/core/tx/datashard/finish_propose_write_unit.cpp @@ -97,8 +97,11 @@ EExecutionStatus TFinishProposeWriteUnit::Execute(TOperation::TPtr op, op->SetFinishProposeTs(DataShard.ConfirmReadOnlyLease()); } - if (!op->HasResultSentFlag() && (op->IsDirty() || op->HasVolatilePrepareFlag() || !Pipeline.WaitCompletion(op))) + if (!op->HasResultSentFlag() && (op->IsDirty() || op->HasVolatilePrepareFlag() || !Pipeline.WaitCompletion(op))) { + DataShard.IncCounter(COUNTER_PREPARE_COMPLETE); + op->SetProposeResultSentEarly(); CompleteRequest(op, ctx); + } if (!DataShard.IsFollower()) DataShard.PlanCleanup(ctx); @@ -127,7 +130,7 @@ void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext { TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op); - if (!op->HasResultSentFlag()) { + if (!op->HasResultSentFlag() && !op->IsProposeResultSentEarly()) { DataShard.IncCounter(COUNTER_WRITE_COMPLETE); if (writeOp->GetWriteResult()) diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h index 0b8e13a271d..192661fb1fe 100644 --- a/ydb/core/tx/datashard/operation.h +++ b/ydb/core/tx/datashard/operation.h @@ -408,12 +408,15 @@ public: bool IsMvccSnapshotRead() const { return !MvccSnapshot.IsMax(); } const TRowVersion& GetMvccSnapshot() const { return MvccSnapshot; } - bool IsMvccSnapshotRepeatable() const { return MvccSnapshotRepeatable; } + bool IsMvccSnapshotRepeatable() const { return MvccSnapshotRepeatable_; } void SetMvccSnapshot(const TRowVersion& snapshot, bool isRepeatable = true) { MvccSnapshot = snapshot; - MvccSnapshotRepeatable = isRepeatable; + MvccSnapshotRepeatable_ = isRepeatable; } + bool IsProposeResultSentEarly() const { return ProposeResultSentEarly_; } + void SetProposeResultSentEarly(bool value = true) { ProposeResultSentEarly_ = value; } + /////////////////////////////////// // DEBUG AND MONITORING // /////////////////////////////////// @@ -435,7 +438,11 @@ protected: TSnapshotKey AcquiredSnapshotKey; TRowVersion MvccSnapshot = TRowVersion::Max(); - bool MvccSnapshotRepeatable = false; + +private: + // Runtime flags + ui8 MvccSnapshotRepeatable_ : 1 = 0; + ui8 ProposeResultSentEarly_ : 1 = 0; }; struct TRSData { |