diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-01-17 21:16:18 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-01-17 21:16:18 +0300 |
commit | c4c2cdbbda0d92a4adfb40515374132e1180a725 (patch) | |
tree | dc491ee809d8805ca6c2ad899696516227ed4044 | |
parent | 45939a7a0e41d208a1fd1519a8429c0a688f313a (diff) | |
download | ydb-c4c2cdbbda0d92a4adfb40515374132e1180a725.tar.gz |
Extract locks in session actor.
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_locks_helper.cpp | 45 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_locks_helper.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_gateway.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 11 |
7 files changed, 34 insertions, 33 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index eff1912d7cd..48266494cd5 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1858,7 +1858,7 @@ private: // Transactions with topics must always use generic readsets !topicTxs.empty()); - if (auto locksMap = ExtractLocks(Request.Locks); + if (auto locksMap = Request.DataShardLocks; !locksMap.empty() || VolatileTx || Request.TopicOperations.HasReadOperations()) diff --git a/ydb/core/kqp/executer_actor/kqp_locks_helper.cpp b/ydb/core/kqp/executer_actor/kqp_locks_helper.cpp index cb1178b74f6..19fbb4cd602 100644 --- a/ydb/core/kqp/executer_actor/kqp_locks_helper.cpp +++ b/ydb/core/kqp/executer_actor/kqp_locks_helper.cpp @@ -37,41 +37,36 @@ void BuildLocks(NKikimrMiniKQL::TResult& result, const TVector<NKikimrTxDataShar } } -TMap<ui64, TVector<NKikimrTxDataShard::TLock>> ExtractLocks(const TVector<NYql::NDq::TMkqlValueRef>& locks) { +NKikimrTxDataShard::TLock ExtractLock(const NYql::NDq::TMkqlValueRef& lock) { auto ensureMemberDataType = [] (const NKikimrMiniKQL::TMember& member, const TString& name, ui32 scheme) { YQL_ENSURE(member.GetName() == name); YQL_ENSURE(member.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::Data); YQL_ENSURE(member.GetType().GetData().GetScheme() == scheme); }; - TMap<ui64, TVector<NKikimrTxDataShard::TLock>> locksMap; - for (auto& lock : locks) { - const auto& type = lock.GetType(); - const auto& value = lock.GetValue(); - - YQL_ENSURE(type.GetKind() == NKikimrMiniKQL::ETypeKind::Struct); - auto& structType = type.GetStruct(); + const auto& type = lock.GetType(); + const auto& value = lock.GetValue(); - YQL_ENSURE(structType.MemberSize() == 6); - ensureMemberDataType(structType.GetMember(0), "Counter", NKikimr::NUdf::TDataType<ui64>::Id); - ensureMemberDataType(structType.GetMember(1), "DataShard", NKikimr::NUdf::TDataType<ui64>::Id); - ensureMemberDataType(structType.GetMember(2), "Generation", NKikimr::NUdf::TDataType<ui32>::Id); - ensureMemberDataType(structType.GetMember(3), "LockId", NKikimr::NUdf::TDataType<ui64>::Id); - ensureMemberDataType(structType.GetMember(4), "PathId", NKikimr::NUdf::TDataType<ui64>::Id); - ensureMemberDataType(structType.GetMember(5), "SchemeShard", NKikimr::NUdf::TDataType<ui64>::Id); + YQL_ENSURE(type.GetKind() == NKikimrMiniKQL::ETypeKind::Struct); + auto& structType = type.GetStruct(); - NKikimrTxDataShard::TLock dsLock; - dsLock.SetCounter(value.GetStruct(0).GetUint64()); - dsLock.SetDataShard(value.GetStruct(1).GetUint64()); - dsLock.SetGeneration(value.GetStruct(2).GetUint32()); - dsLock.SetLockId(value.GetStruct(3).GetUint64()); - dsLock.SetPathId(value.GetStruct(4).GetUint64()); - dsLock.SetSchemeShard(value.GetStruct(5).GetUint64()); + YQL_ENSURE(structType.MemberSize() == 6); + ensureMemberDataType(structType.GetMember(0), "Counter", NKikimr::NUdf::TDataType<ui64>::Id); + ensureMemberDataType(structType.GetMember(1), "DataShard", NKikimr::NUdf::TDataType<ui64>::Id); + ensureMemberDataType(structType.GetMember(2), "Generation", NKikimr::NUdf::TDataType<ui32>::Id); + ensureMemberDataType(structType.GetMember(3), "LockId", NKikimr::NUdf::TDataType<ui64>::Id); + ensureMemberDataType(structType.GetMember(4), "PathId", NKikimr::NUdf::TDataType<ui64>::Id); + ensureMemberDataType(structType.GetMember(5), "SchemeShard", NKikimr::NUdf::TDataType<ui64>::Id); - locksMap[dsLock.GetDataShard()].emplace_back(std::move(dsLock)); - } + NKikimrTxDataShard::TLock dsLock; + dsLock.SetCounter(value.GetStruct(0).GetUint64()); + dsLock.SetDataShard(value.GetStruct(1).GetUint64()); + dsLock.SetGeneration(value.GetStruct(2).GetUint32()); + dsLock.SetLockId(value.GetStruct(3).GetUint64()); + dsLock.SetPathId(value.GetStruct(4).GetUint64()); + dsLock.SetSchemeShard(value.GetStruct(5).GetUint64()); - return locksMap; + return dsLock; } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_locks_helper.h b/ydb/core/kqp/executer_actor/kqp_locks_helper.h index 1e8b175219d..f4ea90a0eb9 100644 --- a/ydb/core/kqp/executer_actor/kqp_locks_helper.h +++ b/ydb/core/kqp/executer_actor/kqp_locks_helper.h @@ -10,6 +10,6 @@ namespace NKikimr::NKqp { void BuildLocks(NKikimrMiniKQL::TResult& result, const TVector<NKikimrTxDataShard::TLock>& locks); -TMap<ui64, TVector<NKikimrTxDataShard::TLock>> ExtractLocks(const TVector<NYql::NDq::TMkqlValueRef>& locks); +NKikimrTxDataShard::TLock ExtractLock(const NYql::NDq::TMkqlValueRef& lock); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index bb3a39707fe..161c70545a3 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -49,7 +49,7 @@ public: : TBase(std::move(request), database, userToken, counters, TWilsonKqp::ScanExecuter, "ScanExecuter") { YQL_ENSURE(Request.Transactions.size() == 1); - YQL_ENSURE(Request.Locks.empty()); + YQL_ENSURE(Request.DataShardLocks.empty()); YQL_ENSURE(!Request.ValidateLocks); YQL_ENSURE(!Request.EraseLocks); YQL_ENSURE(Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_UNDEFINED); diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h index a28b3c406bc..c8ba81c82ae 100644 --- a/ydb/core/kqp/gateway/kqp_gateway.h +++ b/ydb/core/kqp/gateway/kqp_gateway.h @@ -4,6 +4,7 @@ #include <ydb/core/protos/kqp_physical.pb.h> #include <ydb/core/protos/tx_proxy.pb.h> +#include <ydb/core/protos/tx_datashard.pb.h> #include <ydb/library/yql/ast/yql_expr.h> #include <ydb/library/yql/dq/common/dq_value.h> @@ -109,7 +110,7 @@ public: {} TVector<TPhysicalTxData> Transactions; - TVector<NYql::NDq::TMkqlValueRef> Locks; + TMap<ui64, TVector<NKikimrTxDataShard::TLock>> DataShardLocks; NKikimr::NKqp::TTxAllocatorState::TPtr TxAlloc; bool ValidateLocks = false; bool EraseLocks = false; diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 1618da995e3..2cc4f28835d 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -1669,7 +1669,7 @@ public: TFuture<TExecPhysicalResult> ExecutePure(TExecPhysicalRequest&& request, TQueryData::TPtr params) override { YQL_ENSURE(!request.Transactions.empty()); - YQL_ENSURE(request.Locks.empty()); + YQL_ENSURE(request.DataShardLocks.empty()); YQL_ENSURE(!request.NeedTxId); auto containOnlyPureStages = [](const auto& request) { diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 5eda7bcb2b0..3f27a14b1dc 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -7,6 +7,7 @@ #include <ydb/core/kqp/common/kqp_timeouts.h> #include <ydb/core/kqp/compile_service/kqp_compile_service.h> #include <ydb/core/kqp/executer_actor/kqp_executer.h> +#include <ydb/core/kqp/executer_actor/kqp_locks_helper.h> #include <ydb/core/kqp/host/kqp_host_impl.h> #include <ydb/core/kqp/opt/kqp_query_plan.h> #include <ydb/core/kqp/provider/yql_kikimr_provider.h> @@ -1275,8 +1276,10 @@ public: << " EraseLocks: " << request.EraseLocks); for (auto& [lockId, lock] : txCtx.Locks.LocksMap) { - request.Locks.emplace_back(lock.GetValueRef(txCtx.Locks.LockType)); + auto dsLock = ExtractLock(lock.GetValueRef(txCtx.Locks.LockType)); + request.DataShardLocks[dsLock.GetDataShard()].emplace_back(dsLock); } + } request.TopicOperations = std::move(txCtx.TopicOperations); @@ -1288,7 +1291,7 @@ public: QueryState->Orbit, QueryState->CurrentTx, request.Transactions.size(), - request.Locks.size(), + txCtx.Locks.Size(), request.AcquireLocksTxId.Defined()); SendToExecuter(std::move(request)); @@ -1886,8 +1889,10 @@ public: // Should tx with empty LocksMap be aborted? for (auto& [lockId, lock] : txCtx->Locks.LocksMap) { - request.Locks.emplace_back(lock.GetValueRef(txCtx->Locks.LockType)); + auto dsLock = ExtractLock(lock.GetValueRef(txCtx->Locks.LockType)); + request.DataShardLocks[dsLock.GetDataShard()].emplace_back(dsLock); } + SendToExecuter(std::move(request), true); } |