aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2023-03-06 13:58:53 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2023-03-06 13:58:53 +0300
commit49189329e118c0d747c9f5977496bc5003fb7b49 (patch)
treed815f50985b2dcb1b12ed3fcd0900ebc08bb49fd
parentc845dcd89fe965d6ead1089ed80373ce5da12723 (diff)
downloadydb-49189329e118c0d747c9f5977496bc5003fb7b49.tar.gz
Move ShouldAcquireLocks into TKqpTransactionContext
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp35
-rw-r--r--ydb/core/kqp/session_actor/kqp_tx.cpp20
-rw-r--r--ydb/core/kqp/session_actor/kqp_tx.h34
3 files changed, 32 insertions, 57 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 2bdd5b08ea..0b1f8175af 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1054,39 +1054,6 @@ public:
return QueryState->QueryData;
}
- bool ShouldAcquireLocks(const NKqpProto::TKqpPhyQuery* query) {
- auto& txCtx = *QueryState->TxCtx;
-
- if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) {
- return false;
- }
-
- if (txCtx.Locks.Broken()) {
- return false; // Do not acquire locks after first lock issue
- }
-
- if (!txCtx.DeferredEffects.Empty()) {
- return true; // Acquire locks in read write tx
- }
-
- YQL_ENSURE(query);
- for (auto& tx : query->GetTransactions()) {
- if (tx.GetHasEffects()) {
- return true; // Acquire locks in read write tx
- }
- }
-
- if (!QueryState->Commit) {
- return true; // Is not a commit tx
- }
-
- if (txCtx.GetSnapshot().IsValid()) {
- return false; // It is a read only tx with snapshot, no need to acquire locks
- }
-
- return true;
- }
-
TQueryData::TPtr CreateKqpValueMap(const TKqpPhyTxHolder::TConstPtr& tx) {
for (const auto& paramBinding : tx->GetParamBindings()) {
QueryState->QueryData->MaterializeParamValue(true, paramBinding);
@@ -1248,7 +1215,7 @@ public:
}
request.TopicOperations = std::move(txCtx.TopicOperations);
- } else if (ShouldAcquireLocks(query)) {
+ } else if (txCtx.ShouldAcquireLocks(query, QueryState->Commit)) {
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
}
diff --git a/ydb/core/kqp/session_actor/kqp_tx.cpp b/ydb/core/kqp/session_actor/kqp_tx.cpp
index 201e051cd8..8ae8e01429 100644
--- a/ydb/core/kqp/session_actor/kqp_tx.cpp
+++ b/ydb/core/kqp/session_actor/kqp_tx.cpp
@@ -78,26 +78,6 @@ std::pair<bool, std::vector<TIssue>> MergeLocks(const NKikimrMiniKQL::TType& typ
return res;
}
-bool MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx,
- TExprContext& ctx) {
- auto [success, issues] = MergeLocks(type, value, txCtx);
- if (!success) {
- if (!txCtx.GetSnapshot().IsValid()) {
- for (auto& issue : issues) {
- ctx.AddError(std::move(issue));
- }
- return false;
- } else {
- txCtx.Locks.MarkBroken(issues.back());
- if (txCtx.TxHasEffects()) {
- txCtx.Locks.ReportIssues(ctx);
- return false;
- }
- }
- }
- return true;
-}
-
TKqpTransactionInfo TKqpTransactionContext::GetInfo() const {
TKqpTransactionInfo txInfo;
diff --git a/ydb/core/kqp/session_actor/kqp_tx.h b/ydb/core/kqp/session_actor/kqp_tx.h
index 0d63338bb7..f56ac6cf34 100644
--- a/ydb/core/kqp/session_actor/kqp_tx.h
+++ b/ydb/core/kqp/session_actor/kqp_tx.h
@@ -217,6 +217,37 @@ public:
};
}
+ bool ShouldAcquireLocks(const NKqpProto::TKqpPhyQuery* query, bool commit) {
+ if (*EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) {
+ return false;
+ }
+
+ if (Locks.Broken()) {
+ return false; // Do not acquire locks after first lock issue
+ }
+
+ if (!DeferredEffects.Empty()) {
+ return true; // Acquire locks in read write tx
+ }
+
+ YQL_ENSURE(query);
+ for (auto& tx : query->GetTransactions()) {
+ if (tx.GetHasEffects()) {
+ return true; // Acquire locks in read write tx
+ }
+ }
+
+ if (!commit) {
+ return true; // Is not a commit tx
+ }
+
+ if (GetSnapshot().IsValid()) {
+ return false; // It is a read only tx with snapshot, no need to acquire locks
+ }
+
+ return true;
+ }
+
public:
struct TParamsState : public TThrRefBase {
ui32 LastIndex = 0;
@@ -357,9 +388,6 @@ public:
}
};
-bool MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx,
- NYql::TExprContext& ctx);
-
std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type,
const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx);