summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <[email protected]>2022-10-14 12:41:20 +0300
committerva-kuznecov <[email protected]>2022-10-14 12:41:20 +0300
commita61dc3f3f718d33a48fa0597c1376ad017921449 (patch)
treec96e7ab2acaa3f7560cec839b1e33e7f085be9a5
parent3a6da1a3e3ef57df3863e2884519ff1b9ec7b8e9 (diff)
Move some pure functions from kqp_session_actor.cpp
-rw-r--r--ydb/core/kqp/common/kqp_transform.h31
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp64
-rw-r--r--ydb/core/kqp/kqp_worker_common.cpp33
-rw-r--r--ydb/core/kqp/kqp_worker_common.h9
4 files changed, 71 insertions, 66 deletions
diff --git a/ydb/core/kqp/common/kqp_transform.h b/ydb/core/kqp/common/kqp_transform.h
index a9b1face3b0..320d0dffeeb 100644
--- a/ydb/core/kqp/common/kqp_transform.h
+++ b/ydb/core/kqp/common/kqp_transform.h
@@ -241,6 +241,37 @@ public:
ForceNewEngineSettings.ForceNewEngineLevel = level;
}
+ void SetIsolationLevel(const Ydb::Table::TransactionSettings& settings) {
+ switch (settings.tx_mode_case()) {
+ case Ydb::Table::TransactionSettings::kSerializableReadWrite:
+ EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
+ Readonly = false;
+ break;
+
+ case Ydb::Table::TransactionSettings::kOnlineReadOnly:
+ EffectiveIsolationLevel = settings.online_read_only().allow_inconsistent_reads()
+ ? NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED
+ : NKikimrKqp::ISOLATION_LEVEL_READ_COMMITTED;
+ Readonly = true;
+ break;
+
+ case Ydb::Table::TransactionSettings::kStaleReadOnly:
+ EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_READ_STALE;
+ Readonly = true;
+ break;
+
+ case Ydb::Table::TransactionSettings::kSnapshotReadOnly:
+ // TODO: (KIKIMR-3374) Use separate isolation mode to avoid optimistic locks.
+ EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
+ Readonly = true;
+ break;
+
+ case Ydb::Table::TransactionSettings::TX_MODE_NOT_SET:
+ YQL_ENSURE(false, "tx_mode not set, settings: " << settings);
+ break;
+ };
+ }
+
public:
struct TParamsState : public TThrRefBase {
TParamValueMap Values;
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 1743b8995b2..7cff1bf519e 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -110,35 +110,6 @@ struct TKqpCleanupCtx {
TInstant Start = TInstant::Now();
};
-EKikimrStatsMode GetStatsModeInt(const NKikimrKqp::TQueryRequest& queryRequest) {
- switch (queryRequest.GetCollectStats()) {
- case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE:
- return EKikimrStatsMode::None;
- case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_BASIC:
- return EKikimrStatsMode::Basic;
- case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL:
- return EKikimrStatsMode::Full;
- case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE:
- return EKikimrStatsMode::Profile;
- default:
- return EKikimrStatsMode::None;
- }
-}
-
-TKikimrQueryLimits GetQueryLimits(const TKqpWorkerSettings& settings) {
- const auto& queryLimitsProto = settings.Service.GetQueryLimits();
- const auto& phaseLimitsProto = queryLimitsProto.GetPhaseLimits();
-
- TKikimrQueryLimits queryLimits;
- auto& phaseLimits = queryLimits.PhaseLimits;
- phaseLimits.AffectedShardsLimit = phaseLimitsProto.GetAffectedShardsLimit();
- phaseLimits.ReadsetCountLimit = phaseLimitsProto.GetReadsetCountLimit();
- phaseLimits.ComputeNodeMemoryLimitBytes = phaseLimitsProto.GetComputeNodeMemoryLimitBytes();
- phaseLimits.TotalReadSizeLimitBytes = phaseLimitsProto.GetTotalReadSizeLimitBytes();
-
- return queryLimits;
-}
-
class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -626,39 +597,6 @@ public:
ExecuteOrDefer();
}
- void SetIsolationLevel(const Ydb::Table::TransactionSettings& settings) {
- YQL_ENSURE(QueryState->TxCtx);
- auto& txCtx = QueryState->TxCtx;
- switch (settings.tx_mode_case()) {
- case Ydb::Table::TransactionSettings::kSerializableReadWrite:
- txCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
- txCtx->Readonly = false;
- break;
-
- case Ydb::Table::TransactionSettings::kOnlineReadOnly:
- txCtx->EffectiveIsolationLevel = settings.online_read_only().allow_inconsistent_reads()
- ? NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED
- : NKikimrKqp::ISOLATION_LEVEL_READ_COMMITTED;
- txCtx->Readonly = true;
- break;
-
- case Ydb::Table::TransactionSettings::kStaleReadOnly:
- txCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_READ_STALE;
- txCtx->Readonly = true;
- break;
-
- case Ydb::Table::TransactionSettings::kSnapshotReadOnly:
- // TODO: (KIKIMR-3374) Use separate isolation mode to avoid optimistic locks.
- txCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
- txCtx->Readonly = true;
- break;
-
- case Ydb::Table::TransactionSettings::TX_MODE_NOT_SET:
- YQL_ENSURE(false, "tx_mode not set, settings: " << settings);
- break;
- };
- }
-
void RemoveOldTransactions() {
if (ExplicitTransactions.Size() == *Config->_KqpMaxActiveTxPerSession.Get()) {
auto it = ExplicitTransactions.FindOldest();
@@ -690,7 +628,7 @@ public:
QueryState->TxId = UlidGen.Next();
QueryState->TxId_Human = QueryState->TxId.ToString();
QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false);
- SetIsolationLevel(settings);
+ QueryState->TxCtx->SetIsolationLevel(settings);
CreateNewTx();
Counters->ReportTxCreated(Settings.DbCounters);
diff --git a/ydb/core/kqp/kqp_worker_common.cpp b/ydb/core/kqp/kqp_worker_common.cpp
index 29afa0c9dda..392156519da 100644
--- a/ydb/core/kqp/kqp_worker_common.cpp
+++ b/ydb/core/kqp/kqp_worker_common.cpp
@@ -2,7 +2,38 @@
namespace NKikimr::NKqp {
-void SlowLogQuery(const TActorContext &ctx, const NYql::TKikimrConfiguration* config, const TKqpRequestInfo& requestInfo,
+using namespace NYql;
+
+EKikimrStatsMode GetStatsModeInt(const NKikimrKqp::TQueryRequest& queryRequest) {
+ switch (queryRequest.GetCollectStats()) {
+ case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE:
+ return EKikimrStatsMode::None;
+ case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_BASIC:
+ return EKikimrStatsMode::Basic;
+ case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL:
+ return EKikimrStatsMode::Full;
+ case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE:
+ return EKikimrStatsMode::Profile;
+ default:
+ return EKikimrStatsMode::None;
+ }
+}
+
+TKikimrQueryLimits GetQueryLimits(const TKqpWorkerSettings& settings) {
+ const auto& queryLimitsProto = settings.Service.GetQueryLimits();
+ const auto& phaseLimitsProto = queryLimitsProto.GetPhaseLimits();
+
+ TKikimrQueryLimits queryLimits;
+ auto& phaseLimits = queryLimits.PhaseLimits;
+ phaseLimits.AffectedShardsLimit = phaseLimitsProto.GetAffectedShardsLimit();
+ phaseLimits.ReadsetCountLimit = phaseLimitsProto.GetReadsetCountLimit();
+ phaseLimits.ComputeNodeMemoryLimitBytes = phaseLimitsProto.GetComputeNodeMemoryLimitBytes();
+ phaseLimits.TotalReadSizeLimitBytes = phaseLimitsProto.GetTotalReadSizeLimitBytes();
+
+ return queryLimits;
+}
+
+void SlowLogQuery(const TActorContext &ctx, const TKikimrConfiguration* config, const TKqpRequestInfo& requestInfo,
const TDuration& duration, Ydb::StatusIds::StatusCode status, const TString& userToken, ui64 parametersSize,
NKikimrKqp::TEvQueryResponse *record, const std::function<TString()> extractQueryText)
{
diff --git a/ydb/core/kqp/kqp_worker_common.h b/ydb/core/kqp/kqp_worker_common.h
index 571f486d3fd..871c8be02a0 100644
--- a/ydb/core/kqp/kqp_worker_common.h
+++ b/ydb/core/kqp/kqp_worker_common.h
@@ -1,6 +1,8 @@
-#include <ydb/core/protos/kqp.pb.h>
-#include <ydb/core/kqp/provider/yql_kikimr_settings.h>
#include <ydb/core/kqp/kqp_impl.h>
+#include <ydb/core/kqp/provider/yql_kikimr_gateway.h>
+#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
+#include <ydb/core/kqp/provider/yql_kikimr_settings.h>
+#include <ydb/core/protos/kqp.pb.h>
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/log.h>
@@ -122,4 +124,7 @@ void SlowLogQuery(const TActorContext &ctx, const NYql::TKikimrConfiguration* co
const TDuration& duration, Ydb::StatusIds::StatusCode status, const TString& userToken, ui64 parametersSize,
NKikimrKqp::TEvQueryResponse *record, const std::function<TString()> extractQueryText);
+NYql::EKikimrStatsMode GetStatsModeInt(const NKikimrKqp::TQueryRequest& queryRequest);
+NYql::TKikimrQueryLimits GetQueryLimits(const TKqpWorkerSettings& settings);
+
}