aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-01-17 21:16:18 +0300
committerdcherednik <dcherednik@ydb.tech>2023-01-17 21:16:18 +0300
commitc4c2cdbbda0d92a4adfb40515374132e1180a725 (patch)
treedc491ee809d8805ca6c2ad899696516227ed4044
parent45939a7a0e41d208a1fd1519a8429c0a688f313a (diff)
downloadydb-c4c2cdbbda0d92a4adfb40515374132e1180a725.tar.gz
Extract locks in session actor.
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_locks_helper.cpp45
-rw-r--r--ydb/core/kqp/executer_actor/kqp_locks_helper.h2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp2
-rw-r--r--ydb/core/kqp/gateway/kqp_gateway.h3
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp2
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp11
7 files changed, 34 insertions, 33 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index eff1912d7cd..48266494cd5 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1858,7 +1858,7 @@ private:
// Transactions with topics must always use generic readsets
!topicTxs.empty());
- if (auto locksMap = ExtractLocks(Request.Locks);
+ if (auto locksMap = Request.DataShardLocks;
!locksMap.empty() ||
VolatileTx ||
Request.TopicOperations.HasReadOperations())
diff --git a/ydb/core/kqp/executer_actor/kqp_locks_helper.cpp b/ydb/core/kqp/executer_actor/kqp_locks_helper.cpp
index cb1178b74f6..19fbb4cd602 100644
--- a/ydb/core/kqp/executer_actor/kqp_locks_helper.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_locks_helper.cpp
@@ -37,41 +37,36 @@ void BuildLocks(NKikimrMiniKQL::TResult& result, const TVector<NKikimrTxDataShar
}
}
-TMap<ui64, TVector<NKikimrTxDataShard::TLock>> ExtractLocks(const TVector<NYql::NDq::TMkqlValueRef>& locks) {
+NKikimrTxDataShard::TLock ExtractLock(const NYql::NDq::TMkqlValueRef& lock) {
auto ensureMemberDataType = [] (const NKikimrMiniKQL::TMember& member, const TString& name, ui32 scheme) {
YQL_ENSURE(member.GetName() == name);
YQL_ENSURE(member.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::Data);
YQL_ENSURE(member.GetType().GetData().GetScheme() == scheme);
};
- TMap<ui64, TVector<NKikimrTxDataShard::TLock>> locksMap;
- for (auto& lock : locks) {
- const auto& type = lock.GetType();
- const auto& value = lock.GetValue();
-
- YQL_ENSURE(type.GetKind() == NKikimrMiniKQL::ETypeKind::Struct);
- auto& structType = type.GetStruct();
+ const auto& type = lock.GetType();
+ const auto& value = lock.GetValue();
- YQL_ENSURE(structType.MemberSize() == 6);
- ensureMemberDataType(structType.GetMember(0), "Counter", NKikimr::NUdf::TDataType<ui64>::Id);
- ensureMemberDataType(structType.GetMember(1), "DataShard", NKikimr::NUdf::TDataType<ui64>::Id);
- ensureMemberDataType(structType.GetMember(2), "Generation", NKikimr::NUdf::TDataType<ui32>::Id);
- ensureMemberDataType(structType.GetMember(3), "LockId", NKikimr::NUdf::TDataType<ui64>::Id);
- ensureMemberDataType(structType.GetMember(4), "PathId", NKikimr::NUdf::TDataType<ui64>::Id);
- ensureMemberDataType(structType.GetMember(5), "SchemeShard", NKikimr::NUdf::TDataType<ui64>::Id);
+ YQL_ENSURE(type.GetKind() == NKikimrMiniKQL::ETypeKind::Struct);
+ auto& structType = type.GetStruct();
- NKikimrTxDataShard::TLock dsLock;
- dsLock.SetCounter(value.GetStruct(0).GetUint64());
- dsLock.SetDataShard(value.GetStruct(1).GetUint64());
- dsLock.SetGeneration(value.GetStruct(2).GetUint32());
- dsLock.SetLockId(value.GetStruct(3).GetUint64());
- dsLock.SetPathId(value.GetStruct(4).GetUint64());
- dsLock.SetSchemeShard(value.GetStruct(5).GetUint64());
+ YQL_ENSURE(structType.MemberSize() == 6);
+ ensureMemberDataType(structType.GetMember(0), "Counter", NKikimr::NUdf::TDataType<ui64>::Id);
+ ensureMemberDataType(structType.GetMember(1), "DataShard", NKikimr::NUdf::TDataType<ui64>::Id);
+ ensureMemberDataType(structType.GetMember(2), "Generation", NKikimr::NUdf::TDataType<ui32>::Id);
+ ensureMemberDataType(structType.GetMember(3), "LockId", NKikimr::NUdf::TDataType<ui64>::Id);
+ ensureMemberDataType(structType.GetMember(4), "PathId", NKikimr::NUdf::TDataType<ui64>::Id);
+ ensureMemberDataType(structType.GetMember(5), "SchemeShard", NKikimr::NUdf::TDataType<ui64>::Id);
- locksMap[dsLock.GetDataShard()].emplace_back(std::move(dsLock));
- }
+ NKikimrTxDataShard::TLock dsLock;
+ dsLock.SetCounter(value.GetStruct(0).GetUint64());
+ dsLock.SetDataShard(value.GetStruct(1).GetUint64());
+ dsLock.SetGeneration(value.GetStruct(2).GetUint32());
+ dsLock.SetLockId(value.GetStruct(3).GetUint64());
+ dsLock.SetPathId(value.GetStruct(4).GetUint64());
+ dsLock.SetSchemeShard(value.GetStruct(5).GetUint64());
- return locksMap;
+ return dsLock;
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_locks_helper.h b/ydb/core/kqp/executer_actor/kqp_locks_helper.h
index 1e8b175219d..f4ea90a0eb9 100644
--- a/ydb/core/kqp/executer_actor/kqp_locks_helper.h
+++ b/ydb/core/kqp/executer_actor/kqp_locks_helper.h
@@ -10,6 +10,6 @@ namespace NKikimr::NKqp {
void BuildLocks(NKikimrMiniKQL::TResult& result, const TVector<NKikimrTxDataShard::TLock>& locks);
-TMap<ui64, TVector<NKikimrTxDataShard::TLock>> ExtractLocks(const TVector<NYql::NDq::TMkqlValueRef>& locks);
+NKikimrTxDataShard::TLock ExtractLock(const NYql::NDq::TMkqlValueRef& lock);
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index bb3a39707fe..161c70545a3 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -49,7 +49,7 @@ public:
: TBase(std::move(request), database, userToken, counters, TWilsonKqp::ScanExecuter, "ScanExecuter")
{
YQL_ENSURE(Request.Transactions.size() == 1);
- YQL_ENSURE(Request.Locks.empty());
+ YQL_ENSURE(Request.DataShardLocks.empty());
YQL_ENSURE(!Request.ValidateLocks);
YQL_ENSURE(!Request.EraseLocks);
YQL_ENSURE(Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_UNDEFINED);
diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h
index a28b3c406bc..c8ba81c82ae 100644
--- a/ydb/core/kqp/gateway/kqp_gateway.h
+++ b/ydb/core/kqp/gateway/kqp_gateway.h
@@ -4,6 +4,7 @@
#include <ydb/core/protos/kqp_physical.pb.h>
#include <ydb/core/protos/tx_proxy.pb.h>
+#include <ydb/core/protos/tx_datashard.pb.h>
#include <ydb/library/yql/ast/yql_expr.h>
#include <ydb/library/yql/dq/common/dq_value.h>
@@ -109,7 +110,7 @@ public:
{}
TVector<TPhysicalTxData> Transactions;
- TVector<NYql::NDq::TMkqlValueRef> Locks;
+ TMap<ui64, TVector<NKikimrTxDataShard::TLock>> DataShardLocks;
NKikimr::NKqp::TTxAllocatorState::TPtr TxAlloc;
bool ValidateLocks = false;
bool EraseLocks = false;
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index 1618da995e3..2cc4f28835d 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -1669,7 +1669,7 @@ public:
TFuture<TExecPhysicalResult> ExecutePure(TExecPhysicalRequest&& request, TQueryData::TPtr params) override {
YQL_ENSURE(!request.Transactions.empty());
- YQL_ENSURE(request.Locks.empty());
+ YQL_ENSURE(request.DataShardLocks.empty());
YQL_ENSURE(!request.NeedTxId);
auto containOnlyPureStages = [](const auto& request) {
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 5eda7bcb2b0..3f27a14b1dc 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -7,6 +7,7 @@
#include <ydb/core/kqp/common/kqp_timeouts.h>
#include <ydb/core/kqp/compile_service/kqp_compile_service.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
+#include <ydb/core/kqp/executer_actor/kqp_locks_helper.h>
#include <ydb/core/kqp/host/kqp_host_impl.h>
#include <ydb/core/kqp/opt/kqp_query_plan.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
@@ -1275,8 +1276,10 @@ public:
<< " EraseLocks: " << request.EraseLocks);
for (auto& [lockId, lock] : txCtx.Locks.LocksMap) {
- request.Locks.emplace_back(lock.GetValueRef(txCtx.Locks.LockType));
+ auto dsLock = ExtractLock(lock.GetValueRef(txCtx.Locks.LockType));
+ request.DataShardLocks[dsLock.GetDataShard()].emplace_back(dsLock);
}
+
}
request.TopicOperations = std::move(txCtx.TopicOperations);
@@ -1288,7 +1291,7 @@ public:
QueryState->Orbit,
QueryState->CurrentTx,
request.Transactions.size(),
- request.Locks.size(),
+ txCtx.Locks.Size(),
request.AcquireLocksTxId.Defined());
SendToExecuter(std::move(request));
@@ -1886,8 +1889,10 @@ public:
// Should tx with empty LocksMap be aborted?
for (auto& [lockId, lock] : txCtx->Locks.LocksMap) {
- request.Locks.emplace_back(lock.GetValueRef(txCtx->Locks.LockType));
+ auto dsLock = ExtractLock(lock.GetValueRef(txCtx->Locks.LockType));
+ request.DataShardLocks[dsLock.GetDataShard()].emplace_back(dsLock);
}
+
SendToExecuter(std::move(request), true);
}