diff options
author | andrewproni <andrewproni@yandex-team.com> | 2023-08-30 19:37:26 +0300 |
---|---|---|
committer | andrewproni <andrewproni@yandex-team.com> | 2023-08-30 19:54:14 +0300 |
commit | e6a04952dc423a4290e2c3b2bc9b5faff1c3a214 (patch) | |
tree | ad7c9fd16c95121d6a1cca35a1fcc50f856f0417 | |
parent | 5205b9dff65d60847cdbfd4479b9b8462c33d148 (diff) | |
download | ydb-e6a04952dc423a4290e2c3b2bc9b5faff1c3a214.tar.gz |
KIKIMR-19038: config support for QueryService
21 files changed, 113 insertions, 60 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 161733d738..607364d78e 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -2104,7 +2104,7 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu new NYql::NLog::TTlsLogBackend(new TNullLogBackend()))); auto proxy = NKqp::CreateKqpProxyService(Config.GetLogConfig(), Config.GetTableServiceConfig(), Config.GetAuthConfig().GetTokenAccessorConfig(), - std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources)); + Config.GetQueryServiceConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources)); setup->LocalServices.push_back(std::make_pair( NKqp::MakeKqpProxyID(NodeId), TActorSetupCmd(proxy, TMailboxType::HTSwap, appData->UserPoolId))); diff --git a/ydb/core/kqp/common/kqp_timeouts.cpp b/ydb/core/kqp/common/kqp_timeouts.cpp index 7124e11943..66f8ba5103 100644 --- a/ydb/core/kqp/common/kqp_timeouts.cpp +++ b/ydb/core/kqp/common/kqp_timeouts.cpp @@ -5,7 +5,9 @@ namespace NKikimr::NKqp { namespace { -ui64 GetDefaultQueryTimeoutMs(NKikimrKqp::EQueryType queryType, const NKikimrConfig::TTableServiceConfig& tableServiceConfig) { +ui64 GetDefaultQueryTimeoutMs(NKikimrKqp::EQueryType queryType, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, + const NKikimrConfig::TQueryServiceConfig& queryServiceConfig) { const auto& queryLimits = tableServiceConfig.GetQueryLimits(); switch (queryType) { @@ -17,9 +19,13 @@ ui64 GetDefaultQueryTimeoutMs(NKikimrKqp::EQueryType queryType, const NKikimrCon case NKikimrKqp::QUERY_TYPE_AST_DML: case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY: - case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: return queryLimits.GetDataQueryTimeoutMs(); + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: + return queryServiceConfig.GetScriptOperationTimeoutDefaultSeconds() + ? queryServiceConfig.GetScriptOperationTimeoutDefaultSeconds() + : SCRIPT_TIMEOUT_LIMIT.MilliSeconds(); + case NKikimrKqp::QUERY_TYPE_SQL_SCAN: case NKikimrKqp::QUERY_TYPE_AST_SCAN: return queryLimits.GetScanQueryTimeoutMs(); @@ -31,8 +37,11 @@ ui64 GetDefaultQueryTimeoutMs(NKikimrKqp::EQueryType queryType, const NKikimrCon } -TDuration GetQueryTimeout(NKikimrKqp::EQueryType queryType, ui64 timeoutMs, const NKikimrConfig::TTableServiceConfig& tableServiceConfig) { - ui64 defaultTimeoutMs = GetDefaultQueryTimeoutMs(queryType, tableServiceConfig); +TDuration GetQueryTimeout(NKikimrKqp::EQueryType queryType, + ui64 timeoutMs, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, + const NKikimrConfig::TQueryServiceConfig& queryServiceConfig) { + ui64 defaultTimeoutMs = GetDefaultQueryTimeoutMs(queryType, tableServiceConfig, queryServiceConfig); return timeoutMs diff --git a/ydb/core/kqp/common/kqp_timeouts.h b/ydb/core/kqp/common/kqp_timeouts.h index 1a710d49d1..00cd245e29 100644 --- a/ydb/core/kqp/common/kqp_timeouts.h +++ b/ydb/core/kqp/common/kqp_timeouts.h @@ -6,6 +6,11 @@ namespace NKikimr::NKqp { -TDuration GetQueryTimeout(NKikimrKqp::EQueryType queryType, ui64 timeoutMs, const NKikimrConfig::TTableServiceConfig& tableServiceConfig); +constexpr TDuration SCRIPT_TIMEOUT_LIMIT = TDuration::Days(365); + +TDuration GetQueryTimeout(NKikimrKqp::EQueryType queryType, + ui64 timeoutMs, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, + const NKikimrConfig::TQueryServiceConfig& queryServiceConfig); } diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 053556b262..9451e3193e 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -170,6 +170,7 @@ public: TKqpProxyService(const NKikimrConfig::TLogConfig& logConfig, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig, + const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TVector<NKikimrKqp::TKqpSetting>&& settings, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources) @@ -177,6 +178,7 @@ public: , LogConfig(logConfig) , TableServiceConfig(tableServiceConfig) , TokenAccessorConfig(tokenAccessorConfig) + , QueryServiceConfig(queryServiceConfig) , KqpSettings(std::make_shared<const TKqpSettings>(std::move(settings))) , QueryReplayFactory(std::move(queryReplayFactory)) , HttpGateway(NYql::IHTTPGateway::Make(&HttpGatewayConfig)) // TODO: pass config and counters @@ -645,7 +647,7 @@ public: auto cancelAfter = ev->Get()->GetCancelAfter(); auto timeout = ev->Get()->GetOperationTimeout(); - auto timerDuration = GetQueryTimeout(queryType, timeout.MilliSeconds(), TableServiceConfig); + auto timerDuration = GetQueryTimeout(queryType, timeout.MilliSeconds(), TableServiceConfig, QueryServiceConfig); if (cancelAfter) { timerDuration = Min(timerDuration, cancelAfter); } @@ -659,7 +661,13 @@ public: void Handle(TEvKqp::TEvScriptRequest::TPtr& ev) { if (CheckScriptExecutionsTablesReady<TEvKqp::TEvScriptResponse>(ev)) { - Register(CreateScriptExecutionCreatorActor(std::move(ev)), TMailboxType::HTSwap, AppData()->SystemPoolId); + auto req = ev->Get()->Record.MutableRequest(); + auto maxRunTime = GetQueryTimeout(req->GetType(), req->GetTimeoutMs(), TableServiceConfig, QueryServiceConfig); + req->SetTimeoutMs(maxRunTime.MilliSeconds()); + if (req->GetCancelAfterMs()) { + maxRunTime = TDuration::MilliSeconds(Min(req->GetCancelAfterMs(), maxRunTime.MilliSeconds())); + } + Register(CreateScriptExecutionCreatorActor(std::move(ev), QueryServiceConfig, maxRunTime), TMailboxType::HTSwap, AppData()->SystemPoolId); } } @@ -1350,7 +1358,7 @@ private: auto dbCounters = Counters->GetDbCounters(database); - TKqpWorkerSettings workerSettings(cluster, database, TableServiceConfig, dbCounters); + TKqpWorkerSettings workerSettings(cluster, database, TableServiceConfig, QueryServiceConfig, dbCounters); workerSettings.LongSession = longSession; auto config = CreateConfig(KqpSettings, workerSettings); @@ -1550,6 +1558,7 @@ private: NKikimrConfig::TLogConfig LogConfig; NKikimrConfig::TTableServiceConfig TableServiceConfig; NKikimrProto::TTokenAccessorConfig TokenAccessorConfig; + NKikimrConfig::TQueryServiceConfig QueryServiceConfig; TKqpSettings::TConstPtr KqpSettings; NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; std::shared_ptr<IQueryReplayBackendFactory> QueryReplayFactory; @@ -1603,11 +1612,12 @@ private: IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig, + const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TVector<NKikimrKqp::TKqpSetting>&& settings, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources) { - return new TKqpProxyService(logConfig, tableServiceConfig, tokenAccessorConfig, std::move(settings), + return new TKqpProxyService(logConfig, tableServiceConfig, tokenAccessorConfig, queryServiceConfig, std::move(settings), std::move(queryReplayFactory),std::move(kqpProxySharedResources)); } diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.h b/ydb/core/kqp/proxy_service/kqp_proxy_service.h index 868bab3975..c2ac2de2c8 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.h @@ -50,6 +50,7 @@ TPeerStats CalcPeerStats(const TVector<NKikimrKqp::TKqpProxyNodeResources>& data IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig, + const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TVector<NKikimrKqp::TKqpSetting>&& settings, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources); diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index d226a491d9..f33e4d100e 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -44,8 +44,6 @@ namespace { constexpr TDuration LEASE_DURATION = TDuration::Seconds(30); constexpr TDuration DEADLINE_OFFSET = TDuration::Minutes(20); constexpr TDuration BRO_RUN_INTERVAL = TDuration::Minutes(60); -constexpr TDuration DEFAULT_OPERATION_TTL = TDuration::Days(1); -constexpr TDuration DEFAULT_RESULTS_TTL = TDuration::Days(1); TString SerializeIssues(const NYql::TIssues& issues) { NYql::TIssue root; @@ -229,14 +227,16 @@ Ydb::Query::ExecMode GetExecModeFromAction(NKikimrKqp::EQueryAction action) { class TCreateScriptOperationQuery : public TQueryBase { public: - TCreateScriptOperationQuery(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& req, TDuration operationTtl, TDuration resultsTtl, TDuration leaseDuration = TDuration::Zero()) + TCreateScriptOperationQuery(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& req, TDuration operationTtl, TDuration resultsTtl, TDuration leaseDuration = TDuration::Zero(), TDuration maxRunTime = SCRIPT_TIMEOUT_LIMIT) : ExecutionId(executionId) , RunScriptActorId(runScriptActorId) , Request(req) , OperationTtl(operationTtl) , ResultsTtl(resultsTtl) , LeaseDuration(leaseDuration ? leaseDuration : LEASE_DURATION) + , MaxRunTime(Max(maxRunTime, TDuration::Days(1))) { + Y_ENSURE(MaxRunTime); } void OnRunQuery() override { @@ -251,10 +251,11 @@ public: DECLARE $syntax AS Int32; DECLARE $meta AS JsonDocument; DECLARE $lease_duration AS Interval; + DECLARE $max_run_time AS Interval; UPSERT INTO `.metadata/script_executions` - (database, execution_id, run_script_actor_id, execution_status, execution_mode, start_ts, query_text, syntax, meta) - VALUES ($database, $execution_id, $run_script_actor_id, $execution_status, $execution_mode, CurrentUtcTimestamp(), $query_text, $syntax, $meta); + (database, execution_id, run_script_actor_id, execution_status, execution_mode, start_ts, query_text, syntax, meta, expire_at) + VALUES ($database, $execution_id, $run_script_actor_id, $execution_status, $execution_mode, CurrentUtcTimestamp(), $query_text, $syntax, $meta, CurrentUtcTimestamp() + $max_run_time); UPSERT INTO `.metadata/script_execution_leases` (database, execution_id, lease_deadline, lease_generation) @@ -293,6 +294,9 @@ public: .Build() .AddParam("$lease_duration") .Interval(static_cast<i64>(LeaseDuration.MicroSeconds())) + .Build() + .AddParam("$max_run_time") + .Interval(static_cast<i64>(MaxRunTime.MicroSeconds())) .Build(); RunDataQuery(sql, ¶ms); @@ -318,12 +322,15 @@ private: const TDuration OperationTtl; const TDuration ResultsTtl; const TDuration LeaseDuration; + const TDuration MaxRunTime; }; struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExecutionActor> { - TCreateScriptExecutionActor(TEvKqp::TEvScriptRequest::TPtr&& ev, TDuration leaseDuration = TDuration::Zero()) + TCreateScriptExecutionActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TDuration maxRunTime = SCRIPT_TIMEOUT_LIMIT, TDuration leaseDuration = TDuration::Zero()) : Event(std::move(ev)) + , QueryServiceConfig(queryServiceConfig) , LeaseDuration(leaseDuration ? leaseDuration : LEASE_DURATION) + , MaxRunTime(maxRunTime) { } @@ -331,12 +338,16 @@ struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExec Become(&TCreateScriptExecutionActor::StateFunc); ExecutionId = CreateGuidAsString(); - auto operationTtl = Event->Get()->ForgetAfter ? Event->Get()->ForgetAfter : DEFAULT_OPERATION_TTL; - auto resultsTtl = Event->Get()->ResultsTtl ? Event->Get()->ResultsTtl : DEFAULT_RESULTS_TTL; - resultsTtl = Min(operationTtl, resultsTtl); + + auto operationTtl = Event->Get()->ForgetAfter ? Event->Get()->ForgetAfter : TDuration::Seconds(QueryServiceConfig.GetScriptForgetAfterDefaultSeconds()); + auto resultsTtl = Event->Get()->ResultsTtl ? Event->Get()->ResultsTtl : TDuration::Seconds(QueryServiceConfig.GetScriptResultsTtlDefaultSeconds()); + if (operationTtl) { + resultsTtl = Min(operationTtl, resultsTtl); + } + // Start request - RunScriptActorId = Register(CreateRunScriptActor(ExecutionId, Event->Get()->Record, Event->Get()->Record.GetRequest().GetDatabase(), 1, LeaseDuration)); - Register(new TCreateScriptOperationQuery(ExecutionId, RunScriptActorId, Event->Get()->Record, operationTtl, resultsTtl, LeaseDuration)); + RunScriptActorId = Register(CreateRunScriptActor(ExecutionId, Event->Get()->Record, Event->Get()->Record.GetRequest().GetDatabase(), 1, LeaseDuration, QueryServiceConfig)); + Register(new TCreateScriptOperationQuery(ExecutionId, RunScriptActorId, Event->Get()->Record, operationTtl, resultsTtl, LeaseDuration, MaxRunTime)); } void Handle(TEvPrivate::TEvCreateScriptOperationResponse::TPtr& ev) { @@ -355,9 +366,11 @@ struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExec private: TEvKqp::TEvScriptRequest::TPtr Event; + const NKikimrConfig::TQueryServiceConfig QueryServiceConfig; TString ExecutionId; NActors::TActorId RunScriptActorId; - TDuration LeaseDuration; + const TDuration LeaseDuration; + const TDuration MaxRunTime; }; class TScriptLeaseUpdater : public TQueryBase { @@ -566,7 +579,7 @@ public: end_ts = CurrentUtcTimestamp(), stats = $stats, ast = $ast, - expire_at = CurrentUtcTimestamp() + $operation_ttl + expire_at = IF($operation_ttl > CAST(0 AS Interval), CurrentUtcTimestamp() + $operation_ttl, NULL) WHERE database = $database AND execution_id = $execution_id; DELETE FROM `.metadata/script_execution_leases` @@ -574,7 +587,7 @@ public: UPDATE `.metadata/result_sets` SET - expire_at = CurrentUtcTimestamp() + $results_ttl + expire_at = IF($results_ttl > CAST(0 AS Interval), CurrentUtcTimestamp() + $results_ttl, NULL) where database = $database AND execution_id = $execution_id; )"; @@ -1948,12 +1961,12 @@ public: } const auto ttl = GetTtlFromSerializedMeta(*serializedMeta); - if (!ttl) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted"); - return; - } - const auto [_, resultsTtl] = *ttl; - if ((*endTs + resultsTtl) < TInstant::Now()){ + if (!ttl) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted"); + return; + } + const auto [_, resultsTtl] = *ttl; + if (resultsTtl && (*endTs + resultsTtl) < TInstant::Now()){ Finish(Ydb::StatusIds::NOT_FOUND, "Results are expired"); return; } @@ -2068,8 +2081,8 @@ private: } // anonymous namespace -NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev) { - return new TCreateScriptExecutionActor(std::move(ev)); +NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TDuration maxRunTime) { + return new TCreateScriptExecutionActor(std::move(ev), queryServiceConfig, maxRunTime); } NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent) { diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h index 2aaaa65f9c..84c6594d31 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.h +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h @@ -1,6 +1,7 @@ #pragma once #include <ydb/core/kqp/common/events/script_executions.h> #include <ydb/core/kqp/common/kqp.h> +#include <ydb/core/kqp/common/kqp_timeouts.h> #include <ydb/library/yql/public/issue/yql_issue.h> #include <library/cpp/actors/core/actor.h> @@ -14,7 +15,7 @@ namespace NKikimr::NKqp { NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent); // Create script execution and run it. -NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev); +NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TDuration maxRunTime = SCRIPT_TIMEOUT_LIMIT); // Operation API impl. NActors::IActor* CreateForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev); diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp index 29c8795c85..37427c9372 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp @@ -30,8 +30,6 @@ namespace NKikimr::NKqp { namespace { -constexpr ui64 RESULT_SIZE_LIMIT = 10_MB; -constexpr int RESULT_ROWS_LIMIT = 1000; constexpr ui32 LEASE_UPDATE_FREQUENCY = 2; class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> { @@ -50,12 +48,13 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> { }; public: - TRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration) + TRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig&& queryServiceConfig) : ExecutionId(executionId) , Request(request) , Database(database) , LeaseGeneration(leaseGeneration) , LeaseDuration(leaseDuration) + , QueryServiceConfig(queryServiceConfig) {} static constexpr char ActorName[] = "KQP_RUN_SCRIPT_ACTOR"; @@ -220,7 +219,9 @@ private: } auto resp = MakeHolder<TEvKqpExecuter::TEvStreamDataAck>(); resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); - resp->Record.SetFreeSpace(RESULT_SIZE_LIMIT); + resp->Record.SetFreeSpace(QueryServiceConfig.GetScriptResultSizeLimit() + ? QueryServiceConfig.GetScriptResultSizeLimit() + : std::numeric_limits<ui64>::max()); LOG_D("Send stream data ack" << ", seqNo: " << ev->Get()->Record.GetSeqNo() @@ -252,13 +253,13 @@ private: std::vector<TString> serializedRows; for (const auto& row : ev->Get()->Record.GetResultSet().rows()) { - if (rowCount > RESULT_ROWS_LIMIT) { + if (QueryServiceConfig.GetScriptResultRowsLimit() && rowCount > QueryServiceConfig.GetScriptResultRowsLimit()) { Truncated[resultSetIndex] = true; break; } auto serializedSize = row.ByteSizeLong(); - if (byteCount + serializedSize > RESULT_SIZE_LIMIT) { + if (QueryServiceConfig.GetScriptResultSizeLimit() && byteCount + serializedSize > QueryServiceConfig.GetScriptResultSizeLimit()) { Truncated[resultSetIndex] = true; break; } @@ -476,6 +477,7 @@ private: const TString Database; const ui64 LeaseGeneration; const TDuration LeaseDuration; + const NKikimrConfig::TQueryServiceConfig QueryServiceConfig; TString SessionId; bool LeaseUpdateQueryRunning = false; bool FinalStatusIsSaved = false; @@ -504,8 +506,8 @@ private: } // namespace -NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration) { - return new TRunScriptActor(executionId, request, database, leaseGeneration, leaseDuration); +NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig queryServiceConfig) { + return new TRunScriptActor(executionId, request, database, leaseGeneration, leaseDuration, std::move(queryServiceConfig)); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h index f87ba6ffd3..aeffbe6c49 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h @@ -1,6 +1,7 @@ #pragma once #include <ydb/core/protos/kqp.pb.h> +#include <ydb/core/base/appdata.h> #include <library/cpp/actors/core/actor.h> @@ -9,6 +10,6 @@ namespace NKikimr::NKqp { struct TEvKqpRunScriptActor { }; -NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration); +NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig queryServiceConfig); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 4cc5e80927..21bdaafaaf 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -31,7 +31,8 @@ class TKqpQueryState : public TNonCopyable { public: TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database, const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession, - const NKikimrConfig::TTableServiceConfig& config, NWilson::TTraceId&& traceId) + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, + NWilson::TTraceId&& traceId) : QueryId(queryId) , Database(database) , Cluster(cluster) @@ -55,7 +56,7 @@ public: } } - SetQueryDeadlines(config); + SetQueryDeadlines(tableServiceConfig, queryServiceConfig); auto action = GetAction(); KqpSessionSpan = NWilson::TSpan( TWilsonKqp::KqpSession, std::move(traceId), @@ -151,7 +152,7 @@ public: TempTablesState = std::make_shared<const TKqpTempTablesState>(tempTablesState); } - void SetQueryDeadlines(const NKikimrConfig::TTableServiceConfig& service) { + void SetQueryDeadlines(const NKikimrConfig::TTableServiceConfig& tableService, const NKikimrConfig::TQueryServiceConfig& queryService) { auto now = TAppData::TimeProvider->Now(); auto cancelAfter = RequestEv->GetCancelAfter(); auto timeout = RequestEv->GetOperationTimeout(); @@ -159,7 +160,7 @@ public: QueryDeadlines.CancelAt = now + cancelAfter; } - auto timeoutMs = GetQueryTimeout(GetType(), timeout.MilliSeconds(), service); + auto timeoutMs = GetQueryTimeout(GetType(), timeout.MilliSeconds(), tableService, queryService); QueryDeadlines.TimeoutAt = now + timeoutMs; } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 75a6cd2ae4..c28d4d4620 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -214,7 +214,7 @@ public: ev->Get()->SetClientLostAction(selfId, as); QueryState = std::make_shared<TKqpQueryState>( ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession, - Settings.Service, std::move(id)); + Settings.TableService, Settings.QueryService, std::move(id)); } void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) { @@ -1045,8 +1045,8 @@ public: auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(), - RequestCounters, Settings.Service.GetAggregationConfig(), Settings.Service.GetExecuterRetriesConfig(), - AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.Service.GetChannelTransportVersion(), SelfId()); + RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(), + AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.TableService.GetChannelTransportVersion(), SelfId()); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); @@ -1525,7 +1525,7 @@ public: void ReplyBusy(TEvKqp::TEvQueryRequest::TPtr& ev) { ui64 proxyRequestId = ev->Cookie; - auto busyStatus = Settings.Service.GetUseSessionBusyStatus() + auto busyStatus = Settings.TableService.GetUseSessionBusyStatus() ? Ydb::StatusIds::SESSION_BUSY : Ydb::StatusIds::PRECONDITION_FAILED; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.h b/ydb/core/kqp/session_actor/kqp_session_actor.h index 1478b9aae9..9015272288 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.h +++ b/ydb/core/kqp/session_actor/kqp_session_actor.h @@ -17,15 +17,19 @@ struct TKqpWorkerSettings { TString Database; bool LongSession = false; - NKikimrConfig::TTableServiceConfig Service; + NKikimrConfig::TTableServiceConfig TableService; + NKikimrConfig::TQueryServiceConfig QueryService; TKqpDbCountersPtr DbCounters; TKqpWorkerSettings(const TString& cluster, const TString& database, - const NKikimrConfig::TTableServiceConfig& serviceConfig, TKqpDbCountersPtr dbCounters) + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, + const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, + TKqpDbCountersPtr dbCounters) : Cluster(cluster) , Database(database) - , Service(serviceConfig) + , TableService(tableServiceConfig) + , QueryService(queryServiceConfig) , DbCounters(dbCounters) {} }; diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index e17ab10cdc..3fe99ef21f 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -119,7 +119,7 @@ public: Config->_KqpTablePathPrefix = Settings.Database; } - ApplyServiceConfig(*Config, Settings.Service); + ApplyServiceConfig(*Config, Settings.TableService); Config->FreezeDefaults(); @@ -203,7 +203,7 @@ public: QueryState->QueryDeadlines.CancelAt = now + QueryState->RequestEv->GetCancelAfter(); } - auto timeoutMs = GetQueryTimeout(QueryState->RequestEv->GetType(), QueryState->RequestEv->GetOperationTimeout().MilliSeconds(), Settings.Service); + auto timeoutMs = GetQueryTimeout(QueryState->RequestEv->GetType(), QueryState->RequestEv->GetOperationTimeout().MilliSeconds(), Settings.TableService, Settings.QueryService); QueryState->QueryDeadlines.TimeoutAt = now + timeoutMs; auto onError = [this, &ctx] (Ydb::StatusIds::StatusCode status, const TString& message) { @@ -329,7 +329,7 @@ public: if (CleanupState->Final) { ReplyProcessError(ev->Sender, proxyRequestId, Ydb::StatusIds::BAD_SESSION, "Session is being closed"); } else { - auto busyStatus = Settings.Service.GetUseSessionBusyStatus() + auto busyStatus = Settings.TableService.GetUseSessionBusyStatus() ? Ydb::StatusIds::SESSION_BUSY : Ydb::StatusIds::PRECONDITION_FAILED; @@ -874,7 +874,7 @@ private: return; } - auto busyStatus = Settings.Service.GetUseSessionBusyStatus() + auto busyStatus = Settings.TableService.GetUseSessionBusyStatus() ? Ydb::StatusIds::SESSION_BUSY : Ydb::StatusIds::PRECONDITION_FAILED; @@ -1014,7 +1014,7 @@ private: } static TKikimrQueryLimits GetQueryLimits(const TKqpWorkerSettings& settings) { - const auto& queryLimitsProto = settings.Service.GetQueryLimits(); + const auto& queryLimitsProto = settings.TableService.GetQueryLimits(); const auto& phaseLimitsProto = queryLimitsProto.GetPhaseLimits(); TKikimrQueryLimits queryLimits; diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.cpp b/ydb/core/kqp/session_actor/kqp_worker_common.cpp index 901fed55c0..2237192d18 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_common.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_common.cpp @@ -67,7 +67,7 @@ bool IsSameProtoTypeImpl(const NKikimrMiniKQL::TVariantType& actual, const NKiki } // namespace TKikimrQueryLimits GetQueryLimits(const TKqpWorkerSettings& settings) { - const auto& queryLimitsProto = settings.Service.GetQueryLimits(); + const auto& queryLimitsProto = settings.TableService.GetQueryLimits(); const auto& phaseLimitsProto = queryLimitsProto.GetPhaseLimits(); TKikimrQueryLimits queryLimits; diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.h b/ydb/core/kqp/session_actor/kqp_worker_common.h index ad32008494..1fecdefbcc 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_common.h +++ b/ydb/core/kqp/session_actor/kqp_worker_common.h @@ -89,7 +89,7 @@ inline TIntrusivePtr<NYql::TKikimrConfiguration> CreateConfig(const TKqpSettings cfg->_KqpTablePathPrefix = workerSettings.Database; } - ApplyServiceConfig(*cfg, workerSettings.Service); + ApplyServiceConfig(*cfg, workerSettings.TableService); cfg->FreezeDefaults(); return cfg; diff --git a/ydb/core/protos/CMakeLists.darwin-x86_64.txt b/ydb/core/protos/CMakeLists.darwin-x86_64.txt index 4e2496f3b6..88fdf1dbaf 100644 --- a/ydb/core/protos/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/protos/CMakeLists.darwin-x86_64.txt @@ -26,6 +26,7 @@ target_link_libraries(ydb-core-protos PUBLIC core-issue-protos dq-actors-protos yql-dq-proto + providers-common-proto public-issue-protos yql-public-types ydb-library-services diff --git a/ydb/core/protos/CMakeLists.linux-aarch64.txt b/ydb/core/protos/CMakeLists.linux-aarch64.txt index ec9fe6c704..5d7ad322df 100644 --- a/ydb/core/protos/CMakeLists.linux-aarch64.txt +++ b/ydb/core/protos/CMakeLists.linux-aarch64.txt @@ -27,6 +27,7 @@ target_link_libraries(ydb-core-protos PUBLIC core-issue-protos dq-actors-protos yql-dq-proto + providers-common-proto public-issue-protos yql-public-types ydb-library-services diff --git a/ydb/core/protos/CMakeLists.linux-x86_64.txt b/ydb/core/protos/CMakeLists.linux-x86_64.txt index ec9fe6c704..5d7ad322df 100644 --- a/ydb/core/protos/CMakeLists.linux-x86_64.txt +++ b/ydb/core/protos/CMakeLists.linux-x86_64.txt @@ -27,6 +27,7 @@ target_link_libraries(ydb-core-protos PUBLIC core-issue-protos dq-actors-protos yql-dq-proto + providers-common-proto public-issue-protos yql-public-types ydb-library-services diff --git a/ydb/core/protos/CMakeLists.windows-x86_64.txt b/ydb/core/protos/CMakeLists.windows-x86_64.txt index 4e2496f3b6..88fdf1dbaf 100644 --- a/ydb/core/protos/CMakeLists.windows-x86_64.txt +++ b/ydb/core/protos/CMakeLists.windows-x86_64.txt @@ -26,6 +26,7 @@ target_link_libraries(ydb-core-protos PUBLIC core-issue-protos dq-actors-protos yql-dq-proto + providers-common-proto public-issue-protos yql-public-types ydb-library-services diff --git a/ydb/core/protos/ya.make b/ydb/core/protos/ya.make index b1ef42be64..7736817dbc 100644 --- a/ydb/core/protos/ya.make +++ b/ydb/core/protos/ya.make @@ -148,6 +148,7 @@ PEERDIR( ydb/library/yql/core/issue/protos ydb/library/yql/dq/actors/protos ydb/library/yql/dq/proto + ydb/library/yql/providers/common/proto ydb/library/yql/public/issue/protos ydb/library/yql/public/types ydb/library/services diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index c53a12b2c2..904f24f6b6 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -818,6 +818,7 @@ namespace Tests { IActor* kqpProxyService = NKqp::CreateKqpProxyService(Settings->AppConfig.GetLogConfig(), Settings->AppConfig.GetTableServiceConfig(), Settings->AppConfig.GetAuthConfig().GetTokenAccessorConfig(), + Settings->AppConfig.GetQueryServiceConfig(), TVector<NKikimrKqp::TKqpSetting>(Settings->KqpSettings), nullptr, std::move(kqpProxySharedResources)); TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx); |