diff options
| author | va-kuznecov <[email protected]> | 2022-10-14 12:41:20 +0300 |
|---|---|---|
| committer | va-kuznecov <[email protected]> | 2022-10-14 12:41:20 +0300 |
| commit | a61dc3f3f718d33a48fa0597c1376ad017921449 (patch) | |
| tree | c96e7ab2acaa3f7560cec839b1e33e7f085be9a5 | |
| parent | 3a6da1a3e3ef57df3863e2884519ff1b9ec7b8e9 (diff) | |
Move some pure functions from kqp_session_actor.cpp
| -rw-r--r-- | ydb/core/kqp/common/kqp_transform.h | 31 | ||||
| -rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 64 | ||||
| -rw-r--r-- | ydb/core/kqp/kqp_worker_common.cpp | 33 | ||||
| -rw-r--r-- | ydb/core/kqp/kqp_worker_common.h | 9 |
4 files changed, 71 insertions, 66 deletions
diff --git a/ydb/core/kqp/common/kqp_transform.h b/ydb/core/kqp/common/kqp_transform.h index a9b1face3b0..320d0dffeeb 100644 --- a/ydb/core/kqp/common/kqp_transform.h +++ b/ydb/core/kqp/common/kqp_transform.h @@ -241,6 +241,37 @@ public: ForceNewEngineSettings.ForceNewEngineLevel = level; } + void SetIsolationLevel(const Ydb::Table::TransactionSettings& settings) { + switch (settings.tx_mode_case()) { + case Ydb::Table::TransactionSettings::kSerializableReadWrite: + EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE; + Readonly = false; + break; + + case Ydb::Table::TransactionSettings::kOnlineReadOnly: + EffectiveIsolationLevel = settings.online_read_only().allow_inconsistent_reads() + ? NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED + : NKikimrKqp::ISOLATION_LEVEL_READ_COMMITTED; + Readonly = true; + break; + + case Ydb::Table::TransactionSettings::kStaleReadOnly: + EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_READ_STALE; + Readonly = true; + break; + + case Ydb::Table::TransactionSettings::kSnapshotReadOnly: + // TODO: (KIKIMR-3374) Use separate isolation mode to avoid optimistic locks. + EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE; + Readonly = true; + break; + + case Ydb::Table::TransactionSettings::TX_MODE_NOT_SET: + YQL_ENSURE(false, "tx_mode not set, settings: " << settings); + break; + }; + } + public: struct TParamsState : public TThrRefBase { TParamValueMap Values; diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 1743b8995b2..7cff1bf519e 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -110,35 +110,6 @@ struct TKqpCleanupCtx { TInstant Start = TInstant::Now(); }; -EKikimrStatsMode GetStatsModeInt(const NKikimrKqp::TQueryRequest& queryRequest) { - switch (queryRequest.GetCollectStats()) { - case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE: - return EKikimrStatsMode::None; - case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_BASIC: - return EKikimrStatsMode::Basic; - case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL: - return EKikimrStatsMode::Full; - case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE: - return EKikimrStatsMode::Profile; - default: - return EKikimrStatsMode::None; - } -} - -TKikimrQueryLimits GetQueryLimits(const TKqpWorkerSettings& settings) { - const auto& queryLimitsProto = settings.Service.GetQueryLimits(); - const auto& phaseLimitsProto = queryLimitsProto.GetPhaseLimits(); - - TKikimrQueryLimits queryLimits; - auto& phaseLimits = queryLimits.PhaseLimits; - phaseLimits.AffectedShardsLimit = phaseLimitsProto.GetAffectedShardsLimit(); - phaseLimits.ReadsetCountLimit = phaseLimitsProto.GetReadsetCountLimit(); - phaseLimits.ComputeNodeMemoryLimitBytes = phaseLimitsProto.GetComputeNodeMemoryLimitBytes(); - phaseLimits.TotalReadSizeLimitBytes = phaseLimitsProto.GetTotalReadSizeLimitBytes(); - - return queryLimits; -} - class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> { public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { @@ -626,39 +597,6 @@ public: ExecuteOrDefer(); } - void SetIsolationLevel(const Ydb::Table::TransactionSettings& settings) { - YQL_ENSURE(QueryState->TxCtx); - auto& txCtx = QueryState->TxCtx; - switch (settings.tx_mode_case()) { - case Ydb::Table::TransactionSettings::kSerializableReadWrite: - txCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE; - txCtx->Readonly = false; - break; - - case Ydb::Table::TransactionSettings::kOnlineReadOnly: - txCtx->EffectiveIsolationLevel = settings.online_read_only().allow_inconsistent_reads() - ? NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED - : NKikimrKqp::ISOLATION_LEVEL_READ_COMMITTED; - txCtx->Readonly = true; - break; - - case Ydb::Table::TransactionSettings::kStaleReadOnly: - txCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_READ_STALE; - txCtx->Readonly = true; - break; - - case Ydb::Table::TransactionSettings::kSnapshotReadOnly: - // TODO: (KIKIMR-3374) Use separate isolation mode to avoid optimistic locks. - txCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE; - txCtx->Readonly = true; - break; - - case Ydb::Table::TransactionSettings::TX_MODE_NOT_SET: - YQL_ENSURE(false, "tx_mode not set, settings: " << settings); - break; - }; - } - void RemoveOldTransactions() { if (ExplicitTransactions.Size() == *Config->_KqpMaxActiveTxPerSession.Get()) { auto it = ExplicitTransactions.FindOldest(); @@ -690,7 +628,7 @@ public: QueryState->TxId = UlidGen.Next(); QueryState->TxId_Human = QueryState->TxId.ToString(); QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false); - SetIsolationLevel(settings); + QueryState->TxCtx->SetIsolationLevel(settings); CreateNewTx(); Counters->ReportTxCreated(Settings.DbCounters); diff --git a/ydb/core/kqp/kqp_worker_common.cpp b/ydb/core/kqp/kqp_worker_common.cpp index 29afa0c9dda..392156519da 100644 --- a/ydb/core/kqp/kqp_worker_common.cpp +++ b/ydb/core/kqp/kqp_worker_common.cpp @@ -2,7 +2,38 @@ namespace NKikimr::NKqp { -void SlowLogQuery(const TActorContext &ctx, const NYql::TKikimrConfiguration* config, const TKqpRequestInfo& requestInfo, +using namespace NYql; + +EKikimrStatsMode GetStatsModeInt(const NKikimrKqp::TQueryRequest& queryRequest) { + switch (queryRequest.GetCollectStats()) { + case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE: + return EKikimrStatsMode::None; + case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_BASIC: + return EKikimrStatsMode::Basic; + case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL: + return EKikimrStatsMode::Full; + case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE: + return EKikimrStatsMode::Profile; + default: + return EKikimrStatsMode::None; + } +} + +TKikimrQueryLimits GetQueryLimits(const TKqpWorkerSettings& settings) { + const auto& queryLimitsProto = settings.Service.GetQueryLimits(); + const auto& phaseLimitsProto = queryLimitsProto.GetPhaseLimits(); + + TKikimrQueryLimits queryLimits; + auto& phaseLimits = queryLimits.PhaseLimits; + phaseLimits.AffectedShardsLimit = phaseLimitsProto.GetAffectedShardsLimit(); + phaseLimits.ReadsetCountLimit = phaseLimitsProto.GetReadsetCountLimit(); + phaseLimits.ComputeNodeMemoryLimitBytes = phaseLimitsProto.GetComputeNodeMemoryLimitBytes(); + phaseLimits.TotalReadSizeLimitBytes = phaseLimitsProto.GetTotalReadSizeLimitBytes(); + + return queryLimits; +} + +void SlowLogQuery(const TActorContext &ctx, const TKikimrConfiguration* config, const TKqpRequestInfo& requestInfo, const TDuration& duration, Ydb::StatusIds::StatusCode status, const TString& userToken, ui64 parametersSize, NKikimrKqp::TEvQueryResponse *record, const std::function<TString()> extractQueryText) { diff --git a/ydb/core/kqp/kqp_worker_common.h b/ydb/core/kqp/kqp_worker_common.h index 571f486d3fd..871c8be02a0 100644 --- a/ydb/core/kqp/kqp_worker_common.h +++ b/ydb/core/kqp/kqp_worker_common.h @@ -1,6 +1,8 @@ -#include <ydb/core/protos/kqp.pb.h> -#include <ydb/core/kqp/provider/yql_kikimr_settings.h> #include <ydb/core/kqp/kqp_impl.h> +#include <ydb/core/kqp/provider/yql_kikimr_gateway.h> +#include <ydb/core/kqp/provider/yql_kikimr_provider.h> +#include <ydb/core/kqp/provider/yql_kikimr_settings.h> +#include <ydb/core/protos/kqp.pb.h> #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/log.h> @@ -122,4 +124,7 @@ void SlowLogQuery(const TActorContext &ctx, const NYql::TKikimrConfiguration* co const TDuration& duration, Ydb::StatusIds::StatusCode status, const TString& userToken, ui64 parametersSize, NKikimrKqp::TEvQueryResponse *record, const std::function<TString()> extractQueryText); +NYql::EKikimrStatsMode GetStatsModeInt(const NKikimrKqp::TQueryRequest& queryRequest); +NYql::TKikimrQueryLimits GetQueryLimits(const TKqpWorkerSettings& settings); + } |
