summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <[email protected]>2023-01-23 17:26:08 +0300
committerva-kuznecov <[email protected]>2023-01-23 17:26:08 +0300
commitdb3ac1ca828c3f6b79f1f6832e4a52900a696ebe (patch)
tree5489ed9f378d616ebf6e1e29707cb8642bf1b52c
parenta65aef1c0441efcfc8721c8a67fd25195f34d402 (diff)
Move transactions cache code outsize of session_actor
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp156
-rw-r--r--ydb/core/kqp/session_actor/kqp_tx.h92
2 files changed, 140 insertions, 108 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 083c3035c1a..7864c67e648 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -295,8 +295,7 @@ struct TKqpQueryState {
};
struct TKqpCleanupCtx {
- ui64 AbortedTransactionsCount = 0;
- ui64 TransactionsToBeAborted = 0;
+ std::deque<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted;
bool IsWaitingForWorkerToClose = false;
bool Final = false;
TInstant Start = TInstant::Now();
@@ -340,7 +339,7 @@ public:
, ModuleResolverState(moduleResolverState)
, KqpSettings(kqpSettings)
, Config(CreateConfig(kqpSettings, workerSettings))
- , ExplicitTransactions(*Config->_KqpMaxActiveTxPerSession.Get())
+ , Transactions(*Config->_KqpMaxActiveTxPerSession.Get(), TDuration::Seconds(*Config->_KqpTxIdleTimeoutSec.Get()))
{
RequestCounters = MakeIntrusive<TKqpRequestCounters>();
RequestCounters->Counters = Counters;
@@ -419,41 +418,24 @@ public:
Cleanup();
}
- TIntrusivePtr<TKqpTransactionContext> FindTransaction(const TULID& id) {
- auto it = ExplicitTransactions.Find(id);
- if (it != ExplicitTransactions.End()) {
- auto& value = it.Value();
- value->Touch();
- return value;
- }
-
- return {};
- }
-
- void RemoveTransaction(const TULID& txId) {
- auto it = ExplicitTransactions.FindWithoutPromote(txId);
- if (it != ExplicitTransactions.End()) {
- ExplicitTransactions.Erase(it);
- }
+ void ReplyTransactionNotFound(const TString& txId) {
+ std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND,
+ TStringBuilder() << "Transaction not found: " << txId)};
+ ReplyQueryError(Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues));
}
void RollbackTx() {
- YQL_ENSURE(QueryState->HasTxControl(),
- "Can't perform ROLLBACK_TX: TxControl isn't set in TQueryRequest");
+ YQL_ENSURE(QueryState->HasTxControl(), "Can't perform ROLLBACK_TX: TxControl isn't set in TQueryRequest");
const auto& txControl = QueryState->GetTxControl();
QueryState->Commit = txControl.commit_tx();
TULID txId;
txId.ParseString(txControl.tx_id());
- auto txCtx = FindTransaction(txId);
- if (!txCtx) {
- std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND,
- TStringBuilder() << "Transaction not found: " << txControl.tx_id())};
- ReplyQueryError(Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues));
- } else {
- txCtx->Invalidate();
- InvalidateExplicitTransaction(txCtx, txId);
-
+ if (auto ctx = Transactions.ReleaseTransaction(txId)) {
+ ctx->Invalidate();
+ Transactions.AddToBeAborted(std::move(ctx));
ReplySuccess();
+ } else {
+ ReplyTransactionNotFound(txControl.tx_id());
}
}
@@ -467,12 +449,10 @@ public:
TULID txId;
txId.ParseString(txControl.tx_id());
- auto txCtx = FindTransaction(txId);
+ auto txCtx = Transactions.Find(txId);
LOG_D("queryRequest TxControl: " << txControl.DebugString() << " txCtx: " << (void*)txCtx.Get());
if (!txCtx) {
- std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND,
- TStringBuilder() << "Transaction not found: " << txControl.tx_id())};
- ReplyQueryError(Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues));
+ ReplyTransactionNotFound(txControl.tx_id());
return;
}
QueryState->TxCtx = std::move(txCtx);
@@ -865,43 +845,24 @@ public:
ExecuteOrDefer();
}
- void RemoveOldTransactions() {
- if (ExplicitTransactions.Size() == *Config->_KqpMaxActiveTxPerSession.Get()) {
- auto it = ExplicitTransactions.FindOldest();
- auto idleDuration = TInstant::Now() - it.Value()->LastAccessTime;
- if (idleDuration.Seconds() >= *Config->_KqpTxIdleTimeoutSec.Get()) {
- it.Value()->Invalidate();
- TransactionsToBeAborted.emplace_back(std::move(it.Value()));
- ExplicitTransactions.Erase(it);
- ++EvictedTx;
- } else {
- std::vector<TIssue> issues{
- YqlIssue({}, TIssuesIds::KIKIMR_TOO_MANY_TRANSACTIONS)
- };
- ythrow TRequestFail(Ydb::StatusIds::BAD_SESSION, MessageFromIssues(issues))
- << "Too many transactions, current active: " << ExplicitTransactions.Size()
- << " MaxTxPerSession: " << *Config->_KqpMaxActiveTxPerSession.Get();
- }
- }
- }
-
- void CreateNewTx() {
- RemoveOldTransactions();
- auto success = ExplicitTransactions.Insert(std::make_pair(QueryState->TxId, QueryState->TxCtx));
- YQL_ENSURE(success);
- }
-
void BeginTx(const Ydb::Table::TransactionSettings& settings) {
QueryState->TxId = UlidGen.Next();
QueryState->TxId_Human = QueryState->TxId.ToString();
QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider);
QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
QueryState->TxCtx->SetIsolationLevel(settings);
- CreateNewTx();
+
+ if (!Transactions.CreateNew(QueryState->TxId, QueryState->TxCtx)) {
+ std::vector<TIssue> issues{
+ YqlIssue({}, TIssuesIds::KIKIMR_TOO_MANY_TRANSACTIONS)};
+ ythrow TRequestFail(Ydb::StatusIds::BAD_SESSION,
+ MessageFromIssues(issues))
+ << "Too many transactions, current active: " << Transactions.Size()
+ << " MaxTxPerSession: " << Transactions.MaxSize();
+ }
Counters->ReportTxCreated(Settings.DbCounters);
- Counters->ReportBeginTransaction(Settings.DbCounters, EvictedTx, ExplicitTransactions.Size(),
- TransactionsToBeAborted.size());
+ Counters->ReportBeginTransaction(Settings.DbCounters, Transactions.EvictedTx, Transactions.Size(), Transactions.ToBeAbortedSize());
}
std::pair<bool, TIssues> ApplyTableOperations(TKqpTransactionContext* txCtx, const NKqpProto::TKqpPhyQuery& query) {
@@ -923,14 +884,12 @@ public:
case Ydb::Table::TransactionControl::kTxId: {
TULID txId;
txId.ParseString(txControl.tx_id());
- auto it = ExplicitTransactions.Find(txId);
- if (it == ExplicitTransactions.End()) {
- std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND,
- TStringBuilder() << "Transaction not found: " << txControl.tx_id())};
- ReplyQueryError(Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues));
+ auto txCtx = Transactions.Find(txId);
+ if (!txCtx) {
+ ReplyTransactionNotFound(txControl.tx_id());
return false;
}
- QueryState->TxCtx = *it;
+ QueryState->TxCtx = txCtx;
QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
QueryState->TxId = txId;
QueryState->TxId_Human = txControl.tx_id();
@@ -1616,7 +1575,10 @@ public:
void FillTxInfo(NKikimrKqp::TQueryResponse* response) {
YQL_ENSURE(QueryState);
if (QueryState->Commit) {
- RemoveTransaction(QueryState->TxId);
+ if (auto ctx = Transactions.ReleaseTransaction(QueryState->TxId)) {
+ ctx->Invalidate();
+ Transactions.AddToBeAborted(std::move(ctx));
+ }
QueryState->TxId = CreateEmptyULID();
QueryState->TxId_Human = "";
}
@@ -1782,10 +1744,9 @@ public:
}
LOG_W("ReplyQueryCompileError, status " << compileResult->Status << " remove tx with tx_id: " << txId_Human);
- auto txCtx = FindTransaction(txId);
- if (txCtx) {
- txCtx->Invalidate();
- InvalidateExplicitTransaction(txCtx, txId);
+ if (auto ctx = Transactions.ReleaseTransaction(txId)) {
+ ctx->Invalidate();
+ Transactions.AddToBeAborted(std::move(ctx));
}
txId = CreateEmptyULID();
txId_Human = "";
@@ -1967,18 +1928,14 @@ public:
Cleanup(true);
}
- void InvalidateExplicitTransaction(TIntrusivePtr<TKqpTransactionContext> txCtx, const TULID& txId) {
- TransactionsToBeAborted.emplace_back(std::move(txCtx));
- RemoveTransaction(txId);
- }
-
void Cleanup(bool isFinal = false) {
isFinal = isFinal || QueryState && !QueryState->KeepSession;
if (QueryState && QueryState->TxCtx) {
auto& txCtx = QueryState->TxCtx;
if (txCtx->IsInvalidated()) {
- InvalidateExplicitTransaction(QueryState->TxCtx, QueryState->TxId);
+ Transactions.AddToBeAborted(txCtx);
+ Transactions.ReleaseTransaction(QueryState->TxId);
}
DiscardPersistentSnapshot(txCtx->SnapshotHandle);
}
@@ -1987,12 +1944,8 @@ public:
Counters->ReportSessionActorClosedRequest(Settings.DbCounters);
if (isFinal) {
- for (auto it = ExplicitTransactions.Begin(); it != ExplicitTransactions.End(); ++it) {
- it.Value()->Invalidate();
- TransactionsToBeAborted.emplace_back(std::move(it.Value()));
- }
- Counters->ReportTxAborted(Settings.DbCounters, TransactionsToBeAborted.size());
- ExplicitTransactions.Clear();
+ Transactions.FinalCleanup();
+ Counters->ReportTxAborted(Settings.DbCounters, Transactions.ToBeAbortedSize());
}
if (WorkerId) {
@@ -2006,17 +1959,16 @@ public:
CleanupCtx->IsWaitingForWorkerToClose = true;
}
- if (TransactionsToBeAborted.size()) {
+ if (Transactions.ToBeAbortedSize()) {
if (!CleanupCtx)
CleanupCtx.reset(new TKqpCleanupCtx);
CleanupCtx->Final = isFinal;
- CleanupCtx->AbortedTransactionsCount = 0;
- CleanupCtx->TransactionsToBeAborted = TransactionsToBeAborted.size();
- SendRollbackRequest(TransactionsToBeAborted.front().Get());
+ CleanupCtx->TransactionsToBeAborted = Transactions.ReleaseToBeAborted();
+ SendRollbackRequest(CleanupCtx->TransactionsToBeAborted.front().Get());
}
LOG_I("Cleanup start, isFinal: " << isFinal << " CleanupCtx: " << bool{CleanupCtx}
- << " TransactionsToBeAborted.size(): " << TransactionsToBeAborted.size());
+ << " TransactionsToBeAborted.size(): " << (CleanupCtx ? CleanupCtx->TransactionsToBeAborted.size() : 0));
if (CleanupCtx) {
Become(&TKqpSessionActor::CleanupState);
} else {
@@ -2026,7 +1978,7 @@ public:
void HandleCleanup(TEvKqp::TEvCloseSessionResponse::TPtr&) {
CleanupCtx->IsWaitingForWorkerToClose = false;
- if (CleanupCtx->AbortedTransactionsCount == CleanupCtx->TransactionsToBeAborted) {
+ if (CleanupCtx->TransactionsToBeAborted.empty()) {
EndCleanup(CleanupCtx->Final);
}
}
@@ -2046,10 +1998,9 @@ public:
}
YQL_ENSURE(CleanupCtx);
- ++CleanupCtx->AbortedTransactionsCount;
- if (CleanupCtx->AbortedTransactionsCount < CleanupCtx->TransactionsToBeAborted) {
- auto& txCtx = TransactionsToBeAborted[CleanupCtx->AbortedTransactionsCount];
- SendRollbackRequest(txCtx.Get());
+ CleanupCtx->TransactionsToBeAborted.pop_front();
+ if (CleanupCtx->TransactionsToBeAborted.size()) {
+ SendRollbackRequest(CleanupCtx->TransactionsToBeAborted.front().Get());
} else {
if (!CleanupCtx->IsWaitingForWorkerToClose)
EndCleanup(CleanupCtx->Final);
@@ -2079,7 +2030,6 @@ public:
LOG_D("Session actor destroyed");
PassAway();
} else {
- TransactionsToBeAborted.clear();
CleanupCtx.reset();
QueryState.reset();
Become(&TKqpSessionActor::ReadyState);
@@ -2378,14 +2328,6 @@ private:
}
private:
-
- TULID CreateEmptyULID() {
- TULID next;
- memset(next.Data, 0, sizeof(next.Data));
- return next;
- }
-
-private:
TActorId Owner;
TString SessionId;
@@ -2403,9 +2345,7 @@ private:
ui32 QueryId = 0;
TKikimrConfiguration::TPtr Config;
IDataProvider::TFillSettings FillSettings;
- TLRUCache<TULID, TIntrusivePtr<TKqpTransactionContext>> ExplicitTransactions;
- std::vector<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted;
- ui64 EvictedTx = 0;
+ TTransactionsCache Transactions;
std::unique_ptr<TEvKqp::TEvQueryResponse> QueryResponse;
std::optional<TSessionShutdownState> ShutdownState;
TULIDGenerator UlidGen;
diff --git a/ydb/core/kqp/session_actor/kqp_tx.h b/ydb/core/kqp/session_actor/kqp_tx.h
index 377d8ddd58c..7629dc08f7d 100644
--- a/ydb/core/kqp/session_actor/kqp_tx.h
+++ b/ydb/core/kqp/session_actor/kqp_tx.h
@@ -4,6 +4,8 @@
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
+#include <ydb/core/util/ulid.h>
+
#include <ydb/library/mkql_proto/protos/minikql.pb.h>
#include <library/cpp/actors/core/actorid.h>
@@ -241,6 +243,96 @@ public:
IKqpGateway::TKqpSnapshotHandle SnapshotHandle;
};
+class TTransactionsCache {
+ TLRUCache<TULID, TIntrusivePtr<TKqpTransactionContext>> Active;
+ std::deque<TIntrusivePtr<TKqpTransactionContext>> ToBeAborted;
+public:
+ ui64 EvictedTx = 0;
+ TDuration IdleTimeout;
+
+ TTransactionsCache(size_t size, TDuration idleTimeout)
+ : Active(size)
+ , IdleTimeout(idleTimeout)
+ {}
+
+ size_t Size() {
+ return Active.Size();
+ }
+
+ size_t MaxSize() {
+ return Active.GetMaxSize();
+ }
+
+ TIntrusivePtr<TKqpTransactionContext> Find(const TULID& id) {
+ if (auto it = Active.Find(id); it != Active.End()) {
+ it.Value()->Touch();
+ return *it;
+ } else {
+ return {};
+ }
+ }
+
+ TIntrusivePtr<TKqpTransactionContext> ReleaseTransaction(const TULID& txId) {
+ if (auto it = Active.FindWithoutPromote(txId); it != Active.End()) {
+ auto ret = std::move(it.Value());
+ Active.Erase(it);
+ return ret;
+ }
+ return {};
+ }
+
+ void AddToBeAborted(TIntrusivePtr<TKqpTransactionContext> ctx) {
+ ToBeAborted.emplace_back(std::move(ctx));
+ }
+
+ bool RemoveOldTransactions() {
+ if (Active.Size() < Active.GetMaxSize()) {
+ return true;
+ } else {
+ auto it = Active.FindOldest();
+ auto currentIdle = TInstant::Now() - it.Value()->LastAccessTime;
+ if (currentIdle >= IdleTimeout) {
+ it.Value()->Invalidate();
+ ToBeAborted.emplace_back(std::move(it.Value()));
+ Active.Erase(it);
+ ++EvictedTx;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ bool CreateNew(const TULID& txId, TIntrusivePtr<TKqpTransactionContext> txCtx) {
+ if (!RemoveOldTransactions()) {
+ return false;
+ }
+ return Active.Insert(std::make_pair(txId, txCtx));
+ }
+
+ void FinalCleanup() {
+ for (auto it = Active.Begin(); it != Active.End(); ++it) {
+ it.Value()->Invalidate();
+ ToBeAborted.emplace_back(std::move(it.Value()));
+ }
+ Active.Clear();
+ }
+
+ size_t ToBeAbortedSize() {
+ return ToBeAborted.size();
+ }
+
+ std::deque<TIntrusivePtr<TKqpTransactionContext>> ReleaseToBeAborted() {
+ return std::exchange(ToBeAborted, {});
+ }
+};
+
+inline TULID CreateEmptyULID() {
+ TULID next;
+ memset(next.Data, 0, sizeof(next.Data));
+ return next;
+}
+
bool MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx,
NYql::TExprContext& ctx);