diff options
author | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-04-20 23:18:40 +0300 |
---|---|---|
committer | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-04-20 23:18:40 +0300 |
commit | 0c9516d6478f79f48d05fc65a1e18ef76b3c00d7 (patch) | |
tree | 4857fa8c9d9a205fd1f46076c6ca34f8c333eaad | |
parent | aeb42d9215cd131948447bbef79b6afdcba6399c (diff) | |
download | ydb-0c9516d6478f79f48d05fc65a1e18ef76b3c00d7.tar.gz |
Rework SessionActor's cleanup and reply mechanisms KIKIMR-11938
ref:08f21405d3231143cc9e3e51c7898bba35487e2d
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 180 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_tx_ut.cpp | 53 |
2 files changed, 145 insertions, 88 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index aa54c02f18f..c7e509654ae 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -44,6 +44,7 @@ namespace { #define LOG_N(msg) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) #define LOG_I(msg) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) #define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) +#define LOG_T(msg) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) class TRequestFail : public yexception { public: @@ -242,11 +243,10 @@ public: } else { QueryState->TxCtx = txCtx; txCtx->Invalidate(); - AbortedTransactions.emplace_back(txCtx); + TransactionsToBeAborted.emplace_back(txCtx); RemoveTransaction(txId); - SendRollbackRequest(txCtx.Get()); - Become(&TKqpSessionActor::ExecuteState); + ReplySuccess(); } } @@ -435,12 +435,7 @@ public: LWTRACK(KqpQueryCompiled, QueryState->Orbit, TStringBuilder() << compileResult->Status); if (compileResult->Status != Ydb::StatusIds::SUCCESS) { - if (ReplyQueryCompileError(compileResult)) { - Cleanup(); - StartIdleTimer(); - } else { - FinalCleanup(); - } + ReplyQueryCompileError(compileResult); return; } @@ -456,12 +451,7 @@ public: auto& queryRequest = QueryState->Request; if (queryRequest.GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE) { - if (ReplyPrepareResult(compileResult)) { - Cleanup(); - StartIdleTimer(); - } else { - FinalCleanup(); - } + ReplyPrepareResult(compileResult); return; } @@ -563,7 +553,7 @@ public: auto idleDuration = TInstant::Now() - it.Value()->LastAccessTime; if (idleDuration.Seconds() >= *Config->_KqpTxIdleTimeoutSec.Get()) { it.Value()->Invalidate(); - AbortedTransactions.emplace_back(std::move(it.Value())); + TransactionsToBeAborted.emplace_back(std::move(it.Value())); ExplicitTransactions.Erase(it); } else { auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); @@ -687,14 +677,15 @@ public: } } - bool ReplyPrepareResult(const TKqpCompileResult::TConstPtr& compileResult) { - auto responseEv = std::make_unique<TEvKqp::TEvQueryResponse>(); - FillCompileStatus(compileResult, responseEv->Record); + void ReplyPrepareResult(const TKqpCompileResult::TConstPtr& compileResult) { + QueryResponse = std::make_unique<TEvKqp::TEvQueryResponse>(); + FillCompileStatus(compileResult, QueryResponse->Record); auto ru = NRuCalc::CpuTimeToUnit(TDuration::MicroSeconds(QueryState->CompileStats.GetCpuTimeUs())); - responseEv->Record.GetRef().SetConsumedRu(ru); + auto& record = QueryResponse->Record.GetRef(); + record.SetConsumedRu(ru); - return Reply(std::move(responseEv)); + Cleanup(IsFatalError(record.GetYdbStatus())); } IKqpGateway::TExecPhysicalRequest PreparePhysicalRequest(TKqpQueryState *queryState) { @@ -1023,7 +1014,7 @@ public: void HandleExecute(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { auto* response = ev->Get()->Record.MutableResponse(); - LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << " response: " << response->DebugString()); + LOG_T("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << " response: " << response->DebugString()); ExecuterId = TActorId{}; if (response->GetStatus() != Ydb::StatusIds::SUCCESS) { @@ -1032,7 +1023,7 @@ public: auto& txCtx = QueryState->TxCtx; txCtx->Invalidate(); - AbortedTransactions.emplace_back(txCtx); + TransactionsToBeAborted.emplace_back(txCtx); RemoveTransaction(QueryState->TxId); auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); @@ -1268,7 +1259,6 @@ public: } void ReplySuccess() { - // return result auto resEv = std::make_unique<TEvKqp::TEvQueryResponse>(); std::shared_ptr<google::protobuf::Arena> arena(new google::protobuf::Arena()); resEv->Record.Realloc(arena); @@ -1277,6 +1267,7 @@ public: FillStats(record); + YQL_ENSURE(QueryState); if (QueryState->Commit) { ResetTxState(); } @@ -1343,16 +1334,15 @@ public: resEv->Record.GetRef().SetYdbStatus(Ydb::StatusIds::SUCCESS); LOG_D("Reply for action: " << queryRequest.GetAction() << " with SUCCESS status"); - Reply(std::move(resEv)); - LWTRACK(KqpQueryReplySuccess, QueryState->Orbit, arena->SpaceUsed()); + QueryResponse = std::move(resEv); Cleanup(); } - bool ReplyQueryCompileError(const TKqpCompileResult::TConstPtr& compileResult) { - auto responseEv = std::make_unique<TEvKqp::TEvQueryResponse>(); - FillCompileStatus(compileResult, responseEv->Record); + void ReplyQueryCompileError(const TKqpCompileResult::TConstPtr& compileResult) { + QueryResponse = std::make_unique<TEvKqp::TEvQueryResponse>(); + FillCompileStatus(compileResult, QueryResponse->Record); auto& queryRequest = QueryState->Request; TString txId = ""; @@ -1368,16 +1358,16 @@ public: auto txCtx = FindTransaction(txId); if (txCtx) { txCtx->Invalidate(); - AbortedTransactions.emplace_back(txCtx); + TransactionsToBeAborted.emplace_back(txCtx); RemoveTransaction(txId); } txId = ""; - auto* record = &responseEv->Record.GetRef(); + auto* record = &QueryResponse->Record.GetRef(); FillTxInfo(record->MutableResponse()); record->SetConsumedRu(1); - return Reply(std::move(responseEv)); + Cleanup(IsFatalError(record->GetYdbStatus())); } void ReplyBusy(TEvKqp::TEvQueryRequest::TPtr& ev) { @@ -1400,9 +1390,18 @@ public: Send(ev->Sender, response.Release(), 0, proxyRequestId); } - bool Reply(std::unique_ptr<TEvKqp::TEvQueryResponse> responseEv) { - YQL_ENSURE(QueryState); + static bool IsFatalError(const Ydb::StatusIds::StatusCode status) { + switch (status) { + case Ydb::StatusIds::INTERNAL_ERROR: + case Ydb::StatusIds::BAD_SESSION: + return true; + default: + return false; + } + } + void Reply() { + Y_VERIFY(QueryState); auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); auto& queryRequest = QueryState->Request; @@ -1410,28 +1409,28 @@ public: YQL_ENSURE(Counters); Counters->ReportQueryLatency(Settings.DbCounters, queryRequest.GetAction(), queryDuration); - auto& record = responseEv->Record.GetRef(); + auto& record = QueryResponse->Record.GetRef(); auto& response = *record.MutableResponse(); const auto& status = record.GetYdbStatus(); response.SetSessionId(SessionId); - Send(QueryState->Sender, responseEv.release(), 0, QueryState->ProxyRequestId); + if (status == Ydb::StatusIds::SUCCESS) { + LWTRACK(KqpQueryReplySuccess, QueryState->Orbit, record.GetArena()->SpaceUsed()); + } else { + LWTRACK(KqpQueryReplyError, QueryState->Orbit, TStringBuilder() << status); + } + Send(QueryState->Sender, QueryResponse.release(), 0, QueryState->ProxyRequestId); LOG_D(requestInfo << "Sent query response back to proxy, proxyRequestId: " << QueryState->ProxyRequestId << ", proxyId: " << QueryState->Sender.ToString()); if (status == Ydb::StatusIds::INTERNAL_ERROR) { LOG_D(requestInfo << "SessionActor destroyed due to internal error"); Counters->ReportSessionActorClosedError(Settings.DbCounters); - return false; - } - if (status == Ydb::StatusIds::BAD_SESSION) { + } else if (status == Ydb::StatusIds::BAD_SESSION) { LOG_D(requestInfo << "SessionActor destroyed due to session error"); Counters->ReportSessionActorClosedError(Settings.DbCounters); - return false; } - - return true; } void FillCompileStatus(const TKqpCompileResult::TConstPtr& compileResult, @@ -1481,8 +1480,6 @@ public: auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); ReplyQueryError(requestInfo, Ydb::StatusIds::SESSION_EXPIRED, "Request cancelled due to explicit session close request"); - // TODO Remove cleanup from ReplyQueryError since now it is possible - // for TxCtx in ExplicitTransactions to leak } if (CleanupCtx) { CleanupCtx->Final = true; @@ -1542,11 +1539,42 @@ public: } } + void HandleNoop(TEvKqp::TEvIdleTimeout::TPtr&) { + } + + void HandleCleanup(TEvKqp::TEvQueryRequest::TPtr& ev) { + ReplyBusy(ev); + } + void FinalCleanup() { Cleanup(true); } - void HandleNoop(TEvKqp::TEvIdleTimeout::TPtr&) { + void Cleanup(bool isFinal = false) { + if (isFinal) { + for (auto it = ExplicitTransactions.Begin(); it != ExplicitTransactions.End(); ++it) { + it.Value()->Invalidate(); + TransactionsToBeAborted.emplace_back(std::move(it.Value())); + } + ExplicitTransactions.Clear(); + } + + if (TransactionsToBeAborted.size()) { + YQL_ENSURE(!CleanupCtx); + CleanupCtx.reset(new TKqpCleanupCtx); + CleanupCtx->Final = isFinal; + CleanupCtx->AbortedTransactionsCount = 0; + CleanupCtx->TransactionsToBeAborted = TransactionsToBeAborted.size(); + SendRollbackRequest(TransactionsToBeAborted.front().Get()); + } + + LOG_I("Cleanup start, isFinal: " << isFinal << " CleanupCtx: " << bool{CleanupCtx} + << " TransactionsToBeAborted.size(): " << TransactionsToBeAborted.size()); + if (CleanupCtx) { + Become(&TKqpSessionActor::CleanupState); + } else { + EndCleanup(isFinal); + } } void HandleCleanup(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { @@ -1555,13 +1583,20 @@ public: YQL_ENSURE(response.GetStatus() == Ydb::StatusIds::SUCCESS); YQL_ENSURE(CleanupCtx); ++CleanupCtx->AbortedTransactionsCount; - if (CleanupCtx->AbortedTransactionsCount == CleanupCtx->TransactionsToBeAborted) { + if (CleanupCtx->AbortedTransactionsCount < CleanupCtx->TransactionsToBeAborted) { + auto& txCtx = TransactionsToBeAborted[CleanupCtx->AbortedTransactionsCount]; + SendRollbackRequest(txCtx.Get()); + } else { EndCleanup(CleanupCtx->Final); } } void EndCleanup(bool isFinal) { LOG_D("EndCleanup, isFinal: " << isFinal); + + if (QueryResponse) + Reply(); + if (isFinal) { auto lifeSpan = TInstant::Now() - CreationTime; Counters->ReportSessionActorFinished(Settings.DbCounters, lifeSpan); @@ -1575,44 +1610,14 @@ public: PassAway(); } else { - AbortedTransactions.clear(); + TransactionsToBeAborted.clear(); CleanupCtx.reset(); + QueryState.reset(); StartIdleTimer(); Become(&TKqpSessionActor::ReadyState); } } - void Cleanup(bool isFinal = false) { - if (isFinal) { - for (auto it = ExplicitTransactions.Begin(); it != ExplicitTransactions.End(); ++it) { - it.Value()->Invalidate(); - AbortedTransactions.emplace_back(std::move(it.Value())); - } - ExplicitTransactions.Clear(); - } - - if (AbortedTransactions.size()) { - YQL_ENSURE(!CleanupCtx); - CleanupCtx.reset(new TKqpCleanupCtx); - CleanupCtx->Final = isFinal; - CleanupCtx->AbortedTransactionsCount = 0; - CleanupCtx->TransactionsToBeAborted = AbortedTransactions.size(); - // TODO Rollback one-by-one to avoid burst - for (auto& txCtx : AbortedTransactions) { - SendRollbackRequest(txCtx.Get()); - } - } - - LOG_I("Cleanup start, isFinal: " << isFinal << " CleanupCtx: " << bool{CleanupCtx} - << " AbortedTransactions.size(): " << AbortedTransactions.size()); - QueryState.reset(); - if (CleanupCtx) { - Become(&TKqpSessionActor::CleanupState); - } else { - EndCleanup(isFinal); - } - } - template<class T> static google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> MessageFromIssues(const T& issues) { google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> issueMessage; @@ -1623,16 +1628,15 @@ public: return issueMessage; } - bool ReplyQueryError(const TKqpRequestInfo& requestInfo, Ydb::StatusIds::StatusCode ydbStatus, + void ReplyQueryError(const TKqpRequestInfo& requestInfo, Ydb::StatusIds::StatusCode ydbStatus, const TString& message, std::optional<google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>> issues = {}) { LOG_W("Reply error on query request: " << requestInfo << " msg: " << message); - auto ev = std::make_unique<TEvKqp::TEvQueryResponse>(); - ev->Record.GetRef().SetYdbStatus(ydbStatus); + QueryResponse = std::make_unique<TEvKqp::TEvQueryResponse>(); + QueryResponse->Record.GetRef().SetYdbStatus(ydbStatus); - LWTRACK(KqpQueryReplyError, QueryState->Orbit, message); - auto* response = ev->Record.GetRef().MutableResponse(); + auto* response = QueryResponse->Record.GetRef().MutableResponse(); auto *queryIssue = response->AddQueryIssues(); if (issues) { @@ -1648,9 +1652,7 @@ public: FillTxInfo(response); } - bool canContinue = Reply(std::move(ev)); - Cleanup(!canContinue); - return canContinue; + Cleanup(IsFatalError(ydbStatus)); } STATEFN(ReadyState) { @@ -1718,10 +1720,11 @@ public: } } - // optional -- only if there were any AbortedTransactions + // optional -- only if there were any TransactionsToBeAborted STATEFN(CleanupState) { try { switch (ev->GetTypeRewrite()) { + hFunc(TEvKqp::TEvQueryRequest, HandleCleanup); hFunc(TEvKqpExecuter::TEvTxResponse, HandleCleanup); hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop); @@ -1769,7 +1772,8 @@ private: ui32 QueryId = 0; TKikimrConfiguration::TPtr Config; TLRUCache<TString, TIntrusivePtr<TKqpTransactionContext>> ExplicitTransactions; - std::vector<TIntrusivePtr<TKqpTransactionContext>> AbortedTransactions; + std::vector<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted; + std::unique_ptr<TEvKqp::TEvQueryResponse> QueryResponse; TActorId IdleTimerActorId; ui32 IdleTimerId = 0; diff --git a/ydb/core/kqp/ut/kqp_tx_ut.cpp b/ydb/core/kqp/ut/kqp_tx_ut.cpp index 5837331604c..3cd43ec57e8 100644 --- a/ydb/core/kqp/ut/kqp_tx_ut.cpp +++ b/ydb/core/kqp/ut/kqp_tx_ut.cpp @@ -203,6 +203,59 @@ Y_UNIT_TEST_SUITE(KqpTx) { UNIT_ASSERT(HasIssue(rollbackResult.GetIssues(), NYql::TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND)); } + Y_UNIT_TEST(RollbackManyTx) { + auto setting = NKikimrKqp::TKqpSetting(); + setting.SetName("_KqpMaxActiveTxPerSession"); + setting.SetValue("10"); + + TKikimrRunner kikimr({setting}); + auto db = kikimr.GetTableClient(); + + auto query = R"( + --!syntax_v1 + PRAGMA kikimr.UseNewEngine = "true"; + + DECLARE $key AS Uint64; + + UPDATE `/Root/TwoShard` SET Value1 = "Updated" WHERE Key = $key; + )"; + + auto session = db.CreateSession().GetValueSync().GetSession(); + auto beginTx = [&](ui32 idx) { + auto params = kikimr.GetTableClient().GetParamsBuilder() + .AddParam("$key").Uint64(302 + idx).Build() + .Build(); + + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + auto result = session.ExecuteDataQuery(query, + TTxControl::BeginTx(TTxSettings::SerializableRW()), params, execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + return result; + }; + + // Explicit rollback + for (ui32 i = 0; i < 10; ++i) { + auto result = beginTx(i); + + auto tx = result.GetTransaction(); + UNIT_ASSERT(tx); + UNIT_ASSERT(tx->IsActive()); + + auto rollbackResult = tx->Rollback().ExtractValueSync(); + UNIT_ASSERT(rollbackResult.IsSuccess()); + } + session.Close(); + + // Implicit rollback + session = db.CreateSession().GetValueSync().GetSession(); + for (ui32 i = 0; i < 10; ++i) { + beginTx(i); + } + session.Close(); + } + Y_UNIT_TEST_NEW_ENGINE(RollbackRoTx) { TKikimrRunner kikimr; auto db = kikimr.GetTableClient(); |