diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2023-03-06 13:58:53 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2023-03-06 13:58:53 +0300 |
commit | 49189329e118c0d747c9f5977496bc5003fb7b49 (patch) | |
tree | d815f50985b2dcb1b12ed3fcd0900ebc08bb49fd | |
parent | c845dcd89fe965d6ead1089ed80373ce5da12723 (diff) | |
download | ydb-49189329e118c0d747c9f5977496bc5003fb7b49.tar.gz |
Move ShouldAcquireLocks into TKqpTransactionContext
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 35 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_tx.cpp | 20 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_tx.h | 34 |
3 files changed, 32 insertions, 57 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 2bdd5b08ea..0b1f8175af 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1054,39 +1054,6 @@ public: return QueryState->QueryData; } - bool ShouldAcquireLocks(const NKqpProto::TKqpPhyQuery* query) { - auto& txCtx = *QueryState->TxCtx; - - if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) { - return false; - } - - if (txCtx.Locks.Broken()) { - return false; // Do not acquire locks after first lock issue - } - - if (!txCtx.DeferredEffects.Empty()) { - return true; // Acquire locks in read write tx - } - - YQL_ENSURE(query); - for (auto& tx : query->GetTransactions()) { - if (tx.GetHasEffects()) { - return true; // Acquire locks in read write tx - } - } - - if (!QueryState->Commit) { - return true; // Is not a commit tx - } - - if (txCtx.GetSnapshot().IsValid()) { - return false; // It is a read only tx with snapshot, no need to acquire locks - } - - return true; - } - TQueryData::TPtr CreateKqpValueMap(const TKqpPhyTxHolder::TConstPtr& tx) { for (const auto& paramBinding : tx->GetParamBindings()) { QueryState->QueryData->MaterializeParamValue(true, paramBinding); @@ -1248,7 +1215,7 @@ public: } request.TopicOperations = std::move(txCtx.TopicOperations); - } else if (ShouldAcquireLocks(query)) { + } else if (txCtx.ShouldAcquireLocks(query, QueryState->Commit)) { request.AcquireLocksTxId = txCtx.Locks.GetLockTxId(); } diff --git a/ydb/core/kqp/session_actor/kqp_tx.cpp b/ydb/core/kqp/session_actor/kqp_tx.cpp index 201e051cd8..8ae8e01429 100644 --- a/ydb/core/kqp/session_actor/kqp_tx.cpp +++ b/ydb/core/kqp/session_actor/kqp_tx.cpp @@ -78,26 +78,6 @@ std::pair<bool, std::vector<TIssue>> MergeLocks(const NKikimrMiniKQL::TType& typ return res; } -bool MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx, - TExprContext& ctx) { - auto [success, issues] = MergeLocks(type, value, txCtx); - if (!success) { - if (!txCtx.GetSnapshot().IsValid()) { - for (auto& issue : issues) { - ctx.AddError(std::move(issue)); - } - return false; - } else { - txCtx.Locks.MarkBroken(issues.back()); - if (txCtx.TxHasEffects()) { - txCtx.Locks.ReportIssues(ctx); - return false; - } - } - } - return true; -} - TKqpTransactionInfo TKqpTransactionContext::GetInfo() const { TKqpTransactionInfo txInfo; diff --git a/ydb/core/kqp/session_actor/kqp_tx.h b/ydb/core/kqp/session_actor/kqp_tx.h index 0d63338bb7..f56ac6cf34 100644 --- a/ydb/core/kqp/session_actor/kqp_tx.h +++ b/ydb/core/kqp/session_actor/kqp_tx.h @@ -217,6 +217,37 @@ public: }; } + bool ShouldAcquireLocks(const NKqpProto::TKqpPhyQuery* query, bool commit) { + if (*EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) { + return false; + } + + if (Locks.Broken()) { + return false; // Do not acquire locks after first lock issue + } + + if (!DeferredEffects.Empty()) { + return true; // Acquire locks in read write tx + } + + YQL_ENSURE(query); + for (auto& tx : query->GetTransactions()) { + if (tx.GetHasEffects()) { + return true; // Acquire locks in read write tx + } + } + + if (!commit) { + return true; // Is not a commit tx + } + + if (GetSnapshot().IsValid()) { + return false; // It is a read only tx with snapshot, no need to acquire locks + } + + return true; + } + public: struct TParamsState : public TThrRefBase { ui32 LastIndex = 0; @@ -357,9 +388,6 @@ public: } }; -bool MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx, - NYql::TExprContext& ctx); - std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx); |