diff options
author | va-kuznecov <[email protected]> | 2023-01-23 17:26:08 +0300 |
---|---|---|
committer | va-kuznecov <[email protected]> | 2023-01-23 17:26:08 +0300 |
commit | db3ac1ca828c3f6b79f1f6832e4a52900a696ebe (patch) | |
tree | 5489ed9f378d616ebf6e1e29707cb8642bf1b52c | |
parent | a65aef1c0441efcfc8721c8a67fd25195f34d402 (diff) |
Move transactions cache code outsize of session_actor
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 156 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_tx.h | 92 |
2 files changed, 140 insertions, 108 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 083c3035c1a..7864c67e648 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -295,8 +295,7 @@ struct TKqpQueryState { }; struct TKqpCleanupCtx { - ui64 AbortedTransactionsCount = 0; - ui64 TransactionsToBeAborted = 0; + std::deque<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted; bool IsWaitingForWorkerToClose = false; bool Final = false; TInstant Start = TInstant::Now(); @@ -340,7 +339,7 @@ public: , ModuleResolverState(moduleResolverState) , KqpSettings(kqpSettings) , Config(CreateConfig(kqpSettings, workerSettings)) - , ExplicitTransactions(*Config->_KqpMaxActiveTxPerSession.Get()) + , Transactions(*Config->_KqpMaxActiveTxPerSession.Get(), TDuration::Seconds(*Config->_KqpTxIdleTimeoutSec.Get())) { RequestCounters = MakeIntrusive<TKqpRequestCounters>(); RequestCounters->Counters = Counters; @@ -419,41 +418,24 @@ public: Cleanup(); } - TIntrusivePtr<TKqpTransactionContext> FindTransaction(const TULID& id) { - auto it = ExplicitTransactions.Find(id); - if (it != ExplicitTransactions.End()) { - auto& value = it.Value(); - value->Touch(); - return value; - } - - return {}; - } - - void RemoveTransaction(const TULID& txId) { - auto it = ExplicitTransactions.FindWithoutPromote(txId); - if (it != ExplicitTransactions.End()) { - ExplicitTransactions.Erase(it); - } + void ReplyTransactionNotFound(const TString& txId) { + std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, + TStringBuilder() << "Transaction not found: " << txId)}; + ReplyQueryError(Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues)); } void RollbackTx() { - YQL_ENSURE(QueryState->HasTxControl(), - "Can't perform ROLLBACK_TX: TxControl isn't set in TQueryRequest"); + YQL_ENSURE(QueryState->HasTxControl(), "Can't perform ROLLBACK_TX: TxControl isn't set in TQueryRequest"); const auto& txControl = QueryState->GetTxControl(); QueryState->Commit = txControl.commit_tx(); TULID txId; txId.ParseString(txControl.tx_id()); - auto txCtx = FindTransaction(txId); - if (!txCtx) { - std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, - TStringBuilder() << "Transaction not found: " << txControl.tx_id())}; - ReplyQueryError(Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues)); - } else { - txCtx->Invalidate(); - InvalidateExplicitTransaction(txCtx, txId); - + if (auto ctx = Transactions.ReleaseTransaction(txId)) { + ctx->Invalidate(); + Transactions.AddToBeAborted(std::move(ctx)); ReplySuccess(); + } else { + ReplyTransactionNotFound(txControl.tx_id()); } } @@ -467,12 +449,10 @@ public: TULID txId; txId.ParseString(txControl.tx_id()); - auto txCtx = FindTransaction(txId); + auto txCtx = Transactions.Find(txId); LOG_D("queryRequest TxControl: " << txControl.DebugString() << " txCtx: " << (void*)txCtx.Get()); if (!txCtx) { - std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, - TStringBuilder() << "Transaction not found: " << txControl.tx_id())}; - ReplyQueryError(Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues)); + ReplyTransactionNotFound(txControl.tx_id()); return; } QueryState->TxCtx = std::move(txCtx); @@ -865,43 +845,24 @@ public: ExecuteOrDefer(); } - void RemoveOldTransactions() { - if (ExplicitTransactions.Size() == *Config->_KqpMaxActiveTxPerSession.Get()) { - auto it = ExplicitTransactions.FindOldest(); - auto idleDuration = TInstant::Now() - it.Value()->LastAccessTime; - if (idleDuration.Seconds() >= *Config->_KqpTxIdleTimeoutSec.Get()) { - it.Value()->Invalidate(); - TransactionsToBeAborted.emplace_back(std::move(it.Value())); - ExplicitTransactions.Erase(it); - ++EvictedTx; - } else { - std::vector<TIssue> issues{ - YqlIssue({}, TIssuesIds::KIKIMR_TOO_MANY_TRANSACTIONS) - }; - ythrow TRequestFail(Ydb::StatusIds::BAD_SESSION, MessageFromIssues(issues)) - << "Too many transactions, current active: " << ExplicitTransactions.Size() - << " MaxTxPerSession: " << *Config->_KqpMaxActiveTxPerSession.Get(); - } - } - } - - void CreateNewTx() { - RemoveOldTransactions(); - auto success = ExplicitTransactions.Insert(std::make_pair(QueryState->TxId, QueryState->TxCtx)); - YQL_ENSURE(success); - } - void BeginTx(const Ydb::Table::TransactionSettings& settings) { QueryState->TxId = UlidGen.Next(); QueryState->TxId_Human = QueryState->TxId.ToString(); QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider); QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc); QueryState->TxCtx->SetIsolationLevel(settings); - CreateNewTx(); + + if (!Transactions.CreateNew(QueryState->TxId, QueryState->TxCtx)) { + std::vector<TIssue> issues{ + YqlIssue({}, TIssuesIds::KIKIMR_TOO_MANY_TRANSACTIONS)}; + ythrow TRequestFail(Ydb::StatusIds::BAD_SESSION, + MessageFromIssues(issues)) + << "Too many transactions, current active: " << Transactions.Size() + << " MaxTxPerSession: " << Transactions.MaxSize(); + } Counters->ReportTxCreated(Settings.DbCounters); - Counters->ReportBeginTransaction(Settings.DbCounters, EvictedTx, ExplicitTransactions.Size(), - TransactionsToBeAborted.size()); + Counters->ReportBeginTransaction(Settings.DbCounters, Transactions.EvictedTx, Transactions.Size(), Transactions.ToBeAbortedSize()); } std::pair<bool, TIssues> ApplyTableOperations(TKqpTransactionContext* txCtx, const NKqpProto::TKqpPhyQuery& query) { @@ -923,14 +884,12 @@ public: case Ydb::Table::TransactionControl::kTxId: { TULID txId; txId.ParseString(txControl.tx_id()); - auto it = ExplicitTransactions.Find(txId); - if (it == ExplicitTransactions.End()) { - std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, - TStringBuilder() << "Transaction not found: " << txControl.tx_id())}; - ReplyQueryError(Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues)); + auto txCtx = Transactions.Find(txId); + if (!txCtx) { + ReplyTransactionNotFound(txControl.tx_id()); return false; } - QueryState->TxCtx = *it; + QueryState->TxCtx = txCtx; QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc); QueryState->TxId = txId; QueryState->TxId_Human = txControl.tx_id(); @@ -1616,7 +1575,10 @@ public: void FillTxInfo(NKikimrKqp::TQueryResponse* response) { YQL_ENSURE(QueryState); if (QueryState->Commit) { - RemoveTransaction(QueryState->TxId); + if (auto ctx = Transactions.ReleaseTransaction(QueryState->TxId)) { + ctx->Invalidate(); + Transactions.AddToBeAborted(std::move(ctx)); + } QueryState->TxId = CreateEmptyULID(); QueryState->TxId_Human = ""; } @@ -1782,10 +1744,9 @@ public: } LOG_W("ReplyQueryCompileError, status " << compileResult->Status << " remove tx with tx_id: " << txId_Human); - auto txCtx = FindTransaction(txId); - if (txCtx) { - txCtx->Invalidate(); - InvalidateExplicitTransaction(txCtx, txId); + if (auto ctx = Transactions.ReleaseTransaction(txId)) { + ctx->Invalidate(); + Transactions.AddToBeAborted(std::move(ctx)); } txId = CreateEmptyULID(); txId_Human = ""; @@ -1967,18 +1928,14 @@ public: Cleanup(true); } - void InvalidateExplicitTransaction(TIntrusivePtr<TKqpTransactionContext> txCtx, const TULID& txId) { - TransactionsToBeAborted.emplace_back(std::move(txCtx)); - RemoveTransaction(txId); - } - void Cleanup(bool isFinal = false) { isFinal = isFinal || QueryState && !QueryState->KeepSession; if (QueryState && QueryState->TxCtx) { auto& txCtx = QueryState->TxCtx; if (txCtx->IsInvalidated()) { - InvalidateExplicitTransaction(QueryState->TxCtx, QueryState->TxId); + Transactions.AddToBeAborted(txCtx); + Transactions.ReleaseTransaction(QueryState->TxId); } DiscardPersistentSnapshot(txCtx->SnapshotHandle); } @@ -1987,12 +1944,8 @@ public: Counters->ReportSessionActorClosedRequest(Settings.DbCounters); if (isFinal) { - for (auto it = ExplicitTransactions.Begin(); it != ExplicitTransactions.End(); ++it) { - it.Value()->Invalidate(); - TransactionsToBeAborted.emplace_back(std::move(it.Value())); - } - Counters->ReportTxAborted(Settings.DbCounters, TransactionsToBeAborted.size()); - ExplicitTransactions.Clear(); + Transactions.FinalCleanup(); + Counters->ReportTxAborted(Settings.DbCounters, Transactions.ToBeAbortedSize()); } if (WorkerId) { @@ -2006,17 +1959,16 @@ public: CleanupCtx->IsWaitingForWorkerToClose = true; } - if (TransactionsToBeAborted.size()) { + if (Transactions.ToBeAbortedSize()) { if (!CleanupCtx) CleanupCtx.reset(new TKqpCleanupCtx); CleanupCtx->Final = isFinal; - CleanupCtx->AbortedTransactionsCount = 0; - CleanupCtx->TransactionsToBeAborted = TransactionsToBeAborted.size(); - SendRollbackRequest(TransactionsToBeAborted.front().Get()); + CleanupCtx->TransactionsToBeAborted = Transactions.ReleaseToBeAborted(); + SendRollbackRequest(CleanupCtx->TransactionsToBeAborted.front().Get()); } LOG_I("Cleanup start, isFinal: " << isFinal << " CleanupCtx: " << bool{CleanupCtx} - << " TransactionsToBeAborted.size(): " << TransactionsToBeAborted.size()); + << " TransactionsToBeAborted.size(): " << (CleanupCtx ? CleanupCtx->TransactionsToBeAborted.size() : 0)); if (CleanupCtx) { Become(&TKqpSessionActor::CleanupState); } else { @@ -2026,7 +1978,7 @@ public: void HandleCleanup(TEvKqp::TEvCloseSessionResponse::TPtr&) { CleanupCtx->IsWaitingForWorkerToClose = false; - if (CleanupCtx->AbortedTransactionsCount == CleanupCtx->TransactionsToBeAborted) { + if (CleanupCtx->TransactionsToBeAborted.empty()) { EndCleanup(CleanupCtx->Final); } } @@ -2046,10 +1998,9 @@ public: } YQL_ENSURE(CleanupCtx); - ++CleanupCtx->AbortedTransactionsCount; - if (CleanupCtx->AbortedTransactionsCount < CleanupCtx->TransactionsToBeAborted) { - auto& txCtx = TransactionsToBeAborted[CleanupCtx->AbortedTransactionsCount]; - SendRollbackRequest(txCtx.Get()); + CleanupCtx->TransactionsToBeAborted.pop_front(); + if (CleanupCtx->TransactionsToBeAborted.size()) { + SendRollbackRequest(CleanupCtx->TransactionsToBeAborted.front().Get()); } else { if (!CleanupCtx->IsWaitingForWorkerToClose) EndCleanup(CleanupCtx->Final); @@ -2079,7 +2030,6 @@ public: LOG_D("Session actor destroyed"); PassAway(); } else { - TransactionsToBeAborted.clear(); CleanupCtx.reset(); QueryState.reset(); Become(&TKqpSessionActor::ReadyState); @@ -2378,14 +2328,6 @@ private: } private: - - TULID CreateEmptyULID() { - TULID next; - memset(next.Data, 0, sizeof(next.Data)); - return next; - } - -private: TActorId Owner; TString SessionId; @@ -2403,9 +2345,7 @@ private: ui32 QueryId = 0; TKikimrConfiguration::TPtr Config; IDataProvider::TFillSettings FillSettings; - TLRUCache<TULID, TIntrusivePtr<TKqpTransactionContext>> ExplicitTransactions; - std::vector<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted; - ui64 EvictedTx = 0; + TTransactionsCache Transactions; std::unique_ptr<TEvKqp::TEvQueryResponse> QueryResponse; std::optional<TSessionShutdownState> ShutdownState; TULIDGenerator UlidGen; diff --git a/ydb/core/kqp/session_actor/kqp_tx.h b/ydb/core/kqp/session_actor/kqp_tx.h index 377d8ddd58c..7629dc08f7d 100644 --- a/ydb/core/kqp/session_actor/kqp_tx.h +++ b/ydb/core/kqp/session_actor/kqp_tx.h @@ -4,6 +4,8 @@ #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/kqp/provider/yql_kikimr_provider.h> +#include <ydb/core/util/ulid.h> + #include <ydb/library/mkql_proto/protos/minikql.pb.h> #include <library/cpp/actors/core/actorid.h> @@ -241,6 +243,96 @@ public: IKqpGateway::TKqpSnapshotHandle SnapshotHandle; }; +class TTransactionsCache { + TLRUCache<TULID, TIntrusivePtr<TKqpTransactionContext>> Active; + std::deque<TIntrusivePtr<TKqpTransactionContext>> ToBeAborted; +public: + ui64 EvictedTx = 0; + TDuration IdleTimeout; + + TTransactionsCache(size_t size, TDuration idleTimeout) + : Active(size) + , IdleTimeout(idleTimeout) + {} + + size_t Size() { + return Active.Size(); + } + + size_t MaxSize() { + return Active.GetMaxSize(); + } + + TIntrusivePtr<TKqpTransactionContext> Find(const TULID& id) { + if (auto it = Active.Find(id); it != Active.End()) { + it.Value()->Touch(); + return *it; + } else { + return {}; + } + } + + TIntrusivePtr<TKqpTransactionContext> ReleaseTransaction(const TULID& txId) { + if (auto it = Active.FindWithoutPromote(txId); it != Active.End()) { + auto ret = std::move(it.Value()); + Active.Erase(it); + return ret; + } + return {}; + } + + void AddToBeAborted(TIntrusivePtr<TKqpTransactionContext> ctx) { + ToBeAborted.emplace_back(std::move(ctx)); + } + + bool RemoveOldTransactions() { + if (Active.Size() < Active.GetMaxSize()) { + return true; + } else { + auto it = Active.FindOldest(); + auto currentIdle = TInstant::Now() - it.Value()->LastAccessTime; + if (currentIdle >= IdleTimeout) { + it.Value()->Invalidate(); + ToBeAborted.emplace_back(std::move(it.Value())); + Active.Erase(it); + ++EvictedTx; + return true; + } else { + return false; + } + } + } + + bool CreateNew(const TULID& txId, TIntrusivePtr<TKqpTransactionContext> txCtx) { + if (!RemoveOldTransactions()) { + return false; + } + return Active.Insert(std::make_pair(txId, txCtx)); + } + + void FinalCleanup() { + for (auto it = Active.Begin(); it != Active.End(); ++it) { + it.Value()->Invalidate(); + ToBeAborted.emplace_back(std::move(it.Value())); + } + Active.Clear(); + } + + size_t ToBeAbortedSize() { + return ToBeAborted.size(); + } + + std::deque<TIntrusivePtr<TKqpTransactionContext>> ReleaseToBeAborted() { + return std::exchange(ToBeAborted, {}); + } +}; + +inline TULID CreateEmptyULID() { + TULID next; + memset(next.Data, 0, sizeof(next.Data)); + return next; +} + bool MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx, NYql::TExprContext& ctx); |