aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-04-20 23:18:40 +0300
committerVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-04-20 23:18:40 +0300
commit0c9516d6478f79f48d05fc65a1e18ef76b3c00d7 (patch)
tree4857fa8c9d9a205fd1f46076c6ca34f8c333eaad
parentaeb42d9215cd131948447bbef79b6afdcba6399c (diff)
downloadydb-0c9516d6478f79f48d05fc65a1e18ef76b3c00d7.tar.gz
Rework SessionActor's cleanup and reply mechanisms KIKIMR-11938
ref:08f21405d3231143cc9e3e51c7898bba35487e2d
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp180
-rw-r--r--ydb/core/kqp/ut/kqp_tx_ut.cpp53
2 files changed, 145 insertions, 88 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index aa54c02f18f..c7e509654ae 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -44,6 +44,7 @@ namespace {
#define LOG_N(msg) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
#define LOG_I(msg) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
#define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
+#define LOG_T(msg) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
class TRequestFail : public yexception {
public:
@@ -242,11 +243,10 @@ public:
} else {
QueryState->TxCtx = txCtx;
txCtx->Invalidate();
- AbortedTransactions.emplace_back(txCtx);
+ TransactionsToBeAborted.emplace_back(txCtx);
RemoveTransaction(txId);
- SendRollbackRequest(txCtx.Get());
- Become(&TKqpSessionActor::ExecuteState);
+ ReplySuccess();
}
}
@@ -435,12 +435,7 @@ public:
LWTRACK(KqpQueryCompiled, QueryState->Orbit, TStringBuilder() << compileResult->Status);
if (compileResult->Status != Ydb::StatusIds::SUCCESS) {
- if (ReplyQueryCompileError(compileResult)) {
- Cleanup();
- StartIdleTimer();
- } else {
- FinalCleanup();
- }
+ ReplyQueryCompileError(compileResult);
return;
}
@@ -456,12 +451,7 @@ public:
auto& queryRequest = QueryState->Request;
if (queryRequest.GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE) {
- if (ReplyPrepareResult(compileResult)) {
- Cleanup();
- StartIdleTimer();
- } else {
- FinalCleanup();
- }
+ ReplyPrepareResult(compileResult);
return;
}
@@ -563,7 +553,7 @@ public:
auto idleDuration = TInstant::Now() - it.Value()->LastAccessTime;
if (idleDuration.Seconds() >= *Config->_KqpTxIdleTimeoutSec.Get()) {
it.Value()->Invalidate();
- AbortedTransactions.emplace_back(std::move(it.Value()));
+ TransactionsToBeAborted.emplace_back(std::move(it.Value()));
ExplicitTransactions.Erase(it);
} else {
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
@@ -687,14 +677,15 @@ public:
}
}
- bool ReplyPrepareResult(const TKqpCompileResult::TConstPtr& compileResult) {
- auto responseEv = std::make_unique<TEvKqp::TEvQueryResponse>();
- FillCompileStatus(compileResult, responseEv->Record);
+ void ReplyPrepareResult(const TKqpCompileResult::TConstPtr& compileResult) {
+ QueryResponse = std::make_unique<TEvKqp::TEvQueryResponse>();
+ FillCompileStatus(compileResult, QueryResponse->Record);
auto ru = NRuCalc::CpuTimeToUnit(TDuration::MicroSeconds(QueryState->CompileStats.GetCpuTimeUs()));
- responseEv->Record.GetRef().SetConsumedRu(ru);
+ auto& record = QueryResponse->Record.GetRef();
+ record.SetConsumedRu(ru);
- return Reply(std::move(responseEv));
+ Cleanup(IsFatalError(record.GetYdbStatus()));
}
IKqpGateway::TExecPhysicalRequest PreparePhysicalRequest(TKqpQueryState *queryState) {
@@ -1023,7 +1014,7 @@ public:
void HandleExecute(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
auto* response = ev->Get()->Record.MutableResponse();
- LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << " response: " << response->DebugString());
+ LOG_T("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << " response: " << response->DebugString());
ExecuterId = TActorId{};
if (response->GetStatus() != Ydb::StatusIds::SUCCESS) {
@@ -1032,7 +1023,7 @@ public:
auto& txCtx = QueryState->TxCtx;
txCtx->Invalidate();
- AbortedTransactions.emplace_back(txCtx);
+ TransactionsToBeAborted.emplace_back(txCtx);
RemoveTransaction(QueryState->TxId);
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
@@ -1268,7 +1259,6 @@ public:
}
void ReplySuccess() {
- // return result
auto resEv = std::make_unique<TEvKqp::TEvQueryResponse>();
std::shared_ptr<google::protobuf::Arena> arena(new google::protobuf::Arena());
resEv->Record.Realloc(arena);
@@ -1277,6 +1267,7 @@ public:
FillStats(record);
+ YQL_ENSURE(QueryState);
if (QueryState->Commit) {
ResetTxState();
}
@@ -1343,16 +1334,15 @@ public:
resEv->Record.GetRef().SetYdbStatus(Ydb::StatusIds::SUCCESS);
LOG_D("Reply for action: " << queryRequest.GetAction() << " with SUCCESS status");
- Reply(std::move(resEv));
- LWTRACK(KqpQueryReplySuccess, QueryState->Orbit, arena->SpaceUsed());
+ QueryResponse = std::move(resEv);
Cleanup();
}
- bool ReplyQueryCompileError(const TKqpCompileResult::TConstPtr& compileResult) {
- auto responseEv = std::make_unique<TEvKqp::TEvQueryResponse>();
- FillCompileStatus(compileResult, responseEv->Record);
+ void ReplyQueryCompileError(const TKqpCompileResult::TConstPtr& compileResult) {
+ QueryResponse = std::make_unique<TEvKqp::TEvQueryResponse>();
+ FillCompileStatus(compileResult, QueryResponse->Record);
auto& queryRequest = QueryState->Request;
TString txId = "";
@@ -1368,16 +1358,16 @@ public:
auto txCtx = FindTransaction(txId);
if (txCtx) {
txCtx->Invalidate();
- AbortedTransactions.emplace_back(txCtx);
+ TransactionsToBeAborted.emplace_back(txCtx);
RemoveTransaction(txId);
}
txId = "";
- auto* record = &responseEv->Record.GetRef();
+ auto* record = &QueryResponse->Record.GetRef();
FillTxInfo(record->MutableResponse());
record->SetConsumedRu(1);
- return Reply(std::move(responseEv));
+ Cleanup(IsFatalError(record->GetYdbStatus()));
}
void ReplyBusy(TEvKqp::TEvQueryRequest::TPtr& ev) {
@@ -1400,9 +1390,18 @@ public:
Send(ev->Sender, response.Release(), 0, proxyRequestId);
}
- bool Reply(std::unique_ptr<TEvKqp::TEvQueryResponse> responseEv) {
- YQL_ENSURE(QueryState);
+ static bool IsFatalError(const Ydb::StatusIds::StatusCode status) {
+ switch (status) {
+ case Ydb::StatusIds::INTERNAL_ERROR:
+ case Ydb::StatusIds::BAD_SESSION:
+ return true;
+ default:
+ return false;
+ }
+ }
+ void Reply() {
+ Y_VERIFY(QueryState);
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
auto& queryRequest = QueryState->Request;
@@ -1410,28 +1409,28 @@ public:
YQL_ENSURE(Counters);
Counters->ReportQueryLatency(Settings.DbCounters, queryRequest.GetAction(), queryDuration);
- auto& record = responseEv->Record.GetRef();
+ auto& record = QueryResponse->Record.GetRef();
auto& response = *record.MutableResponse();
const auto& status = record.GetYdbStatus();
response.SetSessionId(SessionId);
- Send(QueryState->Sender, responseEv.release(), 0, QueryState->ProxyRequestId);
+ if (status == Ydb::StatusIds::SUCCESS) {
+ LWTRACK(KqpQueryReplySuccess, QueryState->Orbit, record.GetArena()->SpaceUsed());
+ } else {
+ LWTRACK(KqpQueryReplyError, QueryState->Orbit, TStringBuilder() << status);
+ }
+ Send(QueryState->Sender, QueryResponse.release(), 0, QueryState->ProxyRequestId);
LOG_D(requestInfo << "Sent query response back to proxy, proxyRequestId: " << QueryState->ProxyRequestId
<< ", proxyId: " << QueryState->Sender.ToString());
if (status == Ydb::StatusIds::INTERNAL_ERROR) {
LOG_D(requestInfo << "SessionActor destroyed due to internal error");
Counters->ReportSessionActorClosedError(Settings.DbCounters);
- return false;
- }
- if (status == Ydb::StatusIds::BAD_SESSION) {
+ } else if (status == Ydb::StatusIds::BAD_SESSION) {
LOG_D(requestInfo << "SessionActor destroyed due to session error");
Counters->ReportSessionActorClosedError(Settings.DbCounters);
- return false;
}
-
- return true;
}
void FillCompileStatus(const TKqpCompileResult::TConstPtr& compileResult,
@@ -1481,8 +1480,6 @@ public:
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
ReplyQueryError(requestInfo, Ydb::StatusIds::SESSION_EXPIRED,
"Request cancelled due to explicit session close request");
- // TODO Remove cleanup from ReplyQueryError since now it is possible
- // for TxCtx in ExplicitTransactions to leak
}
if (CleanupCtx) {
CleanupCtx->Final = true;
@@ -1542,11 +1539,42 @@ public:
}
}
+ void HandleNoop(TEvKqp::TEvIdleTimeout::TPtr&) {
+ }
+
+ void HandleCleanup(TEvKqp::TEvQueryRequest::TPtr& ev) {
+ ReplyBusy(ev);
+ }
+
void FinalCleanup() {
Cleanup(true);
}
- void HandleNoop(TEvKqp::TEvIdleTimeout::TPtr&) {
+ void Cleanup(bool isFinal = false) {
+ if (isFinal) {
+ for (auto it = ExplicitTransactions.Begin(); it != ExplicitTransactions.End(); ++it) {
+ it.Value()->Invalidate();
+ TransactionsToBeAborted.emplace_back(std::move(it.Value()));
+ }
+ ExplicitTransactions.Clear();
+ }
+
+ if (TransactionsToBeAborted.size()) {
+ YQL_ENSURE(!CleanupCtx);
+ CleanupCtx.reset(new TKqpCleanupCtx);
+ CleanupCtx->Final = isFinal;
+ CleanupCtx->AbortedTransactionsCount = 0;
+ CleanupCtx->TransactionsToBeAborted = TransactionsToBeAborted.size();
+ SendRollbackRequest(TransactionsToBeAborted.front().Get());
+ }
+
+ LOG_I("Cleanup start, isFinal: " << isFinal << " CleanupCtx: " << bool{CleanupCtx}
+ << " TransactionsToBeAborted.size(): " << TransactionsToBeAborted.size());
+ if (CleanupCtx) {
+ Become(&TKqpSessionActor::CleanupState);
+ } else {
+ EndCleanup(isFinal);
+ }
}
void HandleCleanup(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
@@ -1555,13 +1583,20 @@ public:
YQL_ENSURE(response.GetStatus() == Ydb::StatusIds::SUCCESS);
YQL_ENSURE(CleanupCtx);
++CleanupCtx->AbortedTransactionsCount;
- if (CleanupCtx->AbortedTransactionsCount == CleanupCtx->TransactionsToBeAborted) {
+ if (CleanupCtx->AbortedTransactionsCount < CleanupCtx->TransactionsToBeAborted) {
+ auto& txCtx = TransactionsToBeAborted[CleanupCtx->AbortedTransactionsCount];
+ SendRollbackRequest(txCtx.Get());
+ } else {
EndCleanup(CleanupCtx->Final);
}
}
void EndCleanup(bool isFinal) {
LOG_D("EndCleanup, isFinal: " << isFinal);
+
+ if (QueryResponse)
+ Reply();
+
if (isFinal) {
auto lifeSpan = TInstant::Now() - CreationTime;
Counters->ReportSessionActorFinished(Settings.DbCounters, lifeSpan);
@@ -1575,44 +1610,14 @@ public:
PassAway();
} else {
- AbortedTransactions.clear();
+ TransactionsToBeAborted.clear();
CleanupCtx.reset();
+ QueryState.reset();
StartIdleTimer();
Become(&TKqpSessionActor::ReadyState);
}
}
- void Cleanup(bool isFinal = false) {
- if (isFinal) {
- for (auto it = ExplicitTransactions.Begin(); it != ExplicitTransactions.End(); ++it) {
- it.Value()->Invalidate();
- AbortedTransactions.emplace_back(std::move(it.Value()));
- }
- ExplicitTransactions.Clear();
- }
-
- if (AbortedTransactions.size()) {
- YQL_ENSURE(!CleanupCtx);
- CleanupCtx.reset(new TKqpCleanupCtx);
- CleanupCtx->Final = isFinal;
- CleanupCtx->AbortedTransactionsCount = 0;
- CleanupCtx->TransactionsToBeAborted = AbortedTransactions.size();
- // TODO Rollback one-by-one to avoid burst
- for (auto& txCtx : AbortedTransactions) {
- SendRollbackRequest(txCtx.Get());
- }
- }
-
- LOG_I("Cleanup start, isFinal: " << isFinal << " CleanupCtx: " << bool{CleanupCtx}
- << " AbortedTransactions.size(): " << AbortedTransactions.size());
- QueryState.reset();
- if (CleanupCtx) {
- Become(&TKqpSessionActor::CleanupState);
- } else {
- EndCleanup(isFinal);
- }
- }
-
template<class T>
static google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> MessageFromIssues(const T& issues) {
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> issueMessage;
@@ -1623,16 +1628,15 @@ public:
return issueMessage;
}
- bool ReplyQueryError(const TKqpRequestInfo& requestInfo, Ydb::StatusIds::StatusCode ydbStatus,
+ void ReplyQueryError(const TKqpRequestInfo& requestInfo, Ydb::StatusIds::StatusCode ydbStatus,
const TString& message, std::optional<google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>> issues = {})
{
LOG_W("Reply error on query request: " << requestInfo << " msg: " << message);
- auto ev = std::make_unique<TEvKqp::TEvQueryResponse>();
- ev->Record.GetRef().SetYdbStatus(ydbStatus);
+ QueryResponse = std::make_unique<TEvKqp::TEvQueryResponse>();
+ QueryResponse->Record.GetRef().SetYdbStatus(ydbStatus);
- LWTRACK(KqpQueryReplyError, QueryState->Orbit, message);
- auto* response = ev->Record.GetRef().MutableResponse();
+ auto* response = QueryResponse->Record.GetRef().MutableResponse();
auto *queryIssue = response->AddQueryIssues();
if (issues) {
@@ -1648,9 +1652,7 @@ public:
FillTxInfo(response);
}
- bool canContinue = Reply(std::move(ev));
- Cleanup(!canContinue);
- return canContinue;
+ Cleanup(IsFatalError(ydbStatus));
}
STATEFN(ReadyState) {
@@ -1718,10 +1720,11 @@ public:
}
}
- // optional -- only if there were any AbortedTransactions
+ // optional -- only if there were any TransactionsToBeAborted
STATEFN(CleanupState) {
try {
switch (ev->GetTypeRewrite()) {
+ hFunc(TEvKqp::TEvQueryRequest, HandleCleanup);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleCleanup);
hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop);
@@ -1769,7 +1772,8 @@ private:
ui32 QueryId = 0;
TKikimrConfiguration::TPtr Config;
TLRUCache<TString, TIntrusivePtr<TKqpTransactionContext>> ExplicitTransactions;
- std::vector<TIntrusivePtr<TKqpTransactionContext>> AbortedTransactions;
+ std::vector<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted;
+ std::unique_ptr<TEvKqp::TEvQueryResponse> QueryResponse;
TActorId IdleTimerActorId;
ui32 IdleTimerId = 0;
diff --git a/ydb/core/kqp/ut/kqp_tx_ut.cpp b/ydb/core/kqp/ut/kqp_tx_ut.cpp
index 5837331604c..3cd43ec57e8 100644
--- a/ydb/core/kqp/ut/kqp_tx_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_tx_ut.cpp
@@ -203,6 +203,59 @@ Y_UNIT_TEST_SUITE(KqpTx) {
UNIT_ASSERT(HasIssue(rollbackResult.GetIssues(), NYql::TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND));
}
+ Y_UNIT_TEST(RollbackManyTx) {
+ auto setting = NKikimrKqp::TKqpSetting();
+ setting.SetName("_KqpMaxActiveTxPerSession");
+ setting.SetValue("10");
+
+ TKikimrRunner kikimr({setting});
+ auto db = kikimr.GetTableClient();
+
+ auto query = R"(
+ --!syntax_v1
+ PRAGMA kikimr.UseNewEngine = "true";
+
+ DECLARE $key AS Uint64;
+
+ UPDATE `/Root/TwoShard` SET Value1 = "Updated" WHERE Key = $key;
+ )";
+
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ auto beginTx = [&](ui32 idx) {
+ auto params = kikimr.GetTableClient().GetParamsBuilder()
+ .AddParam("$key").Uint64(302 + idx).Build()
+ .Build();
+
+ NYdb::NTable::TExecDataQuerySettings execSettings;
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+
+ auto result = session.ExecuteDataQuery(query,
+ TTxControl::BeginTx(TTxSettings::SerializableRW()), params, execSettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ return result;
+ };
+
+ // Explicit rollback
+ for (ui32 i = 0; i < 10; ++i) {
+ auto result = beginTx(i);
+
+ auto tx = result.GetTransaction();
+ UNIT_ASSERT(tx);
+ UNIT_ASSERT(tx->IsActive());
+
+ auto rollbackResult = tx->Rollback().ExtractValueSync();
+ UNIT_ASSERT(rollbackResult.IsSuccess());
+ }
+ session.Close();
+
+ // Implicit rollback
+ session = db.CreateSession().GetValueSync().GetSession();
+ for (ui32 i = 0; i < 10; ++i) {
+ beginTx(i);
+ }
+ session.Close();
+ }
+
Y_UNIT_TEST_NEW_ENGINE(RollbackRoTx) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();