aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorandrewproni <andrewproni@yandex-team.com>2023-08-30 19:37:26 +0300
committerandrewproni <andrewproni@yandex-team.com>2023-08-30 19:54:14 +0300
commite6a04952dc423a4290e2c3b2bc9b5faff1c3a214 (patch)
treead7c9fd16c95121d6a1cca35a1fcc50f856f0417
parent5205b9dff65d60847cdbfd4479b9b8462c33d148 (diff)
downloadydb-e6a04952dc423a4290e2c3b2bc9b5faff1c3a214.tar.gz
KIKIMR-19038: config support for QueryService
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp2
-rw-r--r--ydb/core/kqp/common/kqp_timeouts.cpp17
-rw-r--r--ydb/core/kqp/common/kqp_timeouts.h7
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp18
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.h1
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.cpp57
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.h3
-rw-r--r--ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp18
-rw-r--r--ydb/core/kqp/run_script_actor/kqp_run_script_actor.h3
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h9
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp8
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.h10
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_actor.cpp10
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_common.cpp2
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_common.h2
-rw-r--r--ydb/core/protos/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/protos/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/protos/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/protos/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/protos/ya.make1
-rw-r--r--ydb/core/testlib/test_client.cpp1
21 files changed, 113 insertions, 60 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 161733d738..607364d78e 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -2104,7 +2104,7 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
new NYql::NLog::TTlsLogBackend(new TNullLogBackend())));
auto proxy = NKqp::CreateKqpProxyService(Config.GetLogConfig(), Config.GetTableServiceConfig(), Config.GetAuthConfig().GetTokenAccessorConfig(),
- std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources));
+ Config.GetQueryServiceConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources));
setup->LocalServices.push_back(std::make_pair(
NKqp::MakeKqpProxyID(NodeId),
TActorSetupCmd(proxy, TMailboxType::HTSwap, appData->UserPoolId)));
diff --git a/ydb/core/kqp/common/kqp_timeouts.cpp b/ydb/core/kqp/common/kqp_timeouts.cpp
index 7124e11943..66f8ba5103 100644
--- a/ydb/core/kqp/common/kqp_timeouts.cpp
+++ b/ydb/core/kqp/common/kqp_timeouts.cpp
@@ -5,7 +5,9 @@ namespace NKikimr::NKqp {
namespace {
-ui64 GetDefaultQueryTimeoutMs(NKikimrKqp::EQueryType queryType, const NKikimrConfig::TTableServiceConfig& tableServiceConfig) {
+ui64 GetDefaultQueryTimeoutMs(NKikimrKqp::EQueryType queryType,
+ const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
+ const NKikimrConfig::TQueryServiceConfig& queryServiceConfig) {
const auto& queryLimits = tableServiceConfig.GetQueryLimits();
switch (queryType) {
@@ -17,9 +19,13 @@ ui64 GetDefaultQueryTimeoutMs(NKikimrKqp::EQueryType queryType, const NKikimrCon
case NKikimrKqp::QUERY_TYPE_AST_DML:
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY:
- case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
return queryLimits.GetDataQueryTimeoutMs();
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
+ return queryServiceConfig.GetScriptOperationTimeoutDefaultSeconds()
+ ? queryServiceConfig.GetScriptOperationTimeoutDefaultSeconds()
+ : SCRIPT_TIMEOUT_LIMIT.MilliSeconds();
+
case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
case NKikimrKqp::QUERY_TYPE_AST_SCAN:
return queryLimits.GetScanQueryTimeoutMs();
@@ -31,8 +37,11 @@ ui64 GetDefaultQueryTimeoutMs(NKikimrKqp::EQueryType queryType, const NKikimrCon
}
-TDuration GetQueryTimeout(NKikimrKqp::EQueryType queryType, ui64 timeoutMs, const NKikimrConfig::TTableServiceConfig& tableServiceConfig) {
- ui64 defaultTimeoutMs = GetDefaultQueryTimeoutMs(queryType, tableServiceConfig);
+TDuration GetQueryTimeout(NKikimrKqp::EQueryType queryType,
+ ui64 timeoutMs,
+ const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
+ const NKikimrConfig::TQueryServiceConfig& queryServiceConfig) {
+ ui64 defaultTimeoutMs = GetDefaultQueryTimeoutMs(queryType, tableServiceConfig, queryServiceConfig);
return timeoutMs
diff --git a/ydb/core/kqp/common/kqp_timeouts.h b/ydb/core/kqp/common/kqp_timeouts.h
index 1a710d49d1..00cd245e29 100644
--- a/ydb/core/kqp/common/kqp_timeouts.h
+++ b/ydb/core/kqp/common/kqp_timeouts.h
@@ -6,6 +6,11 @@
namespace NKikimr::NKqp {
-TDuration GetQueryTimeout(NKikimrKqp::EQueryType queryType, ui64 timeoutMs, const NKikimrConfig::TTableServiceConfig& tableServiceConfig);
+constexpr TDuration SCRIPT_TIMEOUT_LIMIT = TDuration::Days(365);
+
+TDuration GetQueryTimeout(NKikimrKqp::EQueryType queryType,
+ ui64 timeoutMs,
+ const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
+ const NKikimrConfig::TQueryServiceConfig& queryServiceConfig);
}
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 053556b262..9451e3193e 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -170,6 +170,7 @@ public:
TKqpProxyService(const NKikimrConfig::TLogConfig& logConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig,
+ const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
TVector<NKikimrKqp::TKqpSetting>&& settings,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources)
@@ -177,6 +178,7 @@ public:
, LogConfig(logConfig)
, TableServiceConfig(tableServiceConfig)
, TokenAccessorConfig(tokenAccessorConfig)
+ , QueryServiceConfig(queryServiceConfig)
, KqpSettings(std::make_shared<const TKqpSettings>(std::move(settings)))
, QueryReplayFactory(std::move(queryReplayFactory))
, HttpGateway(NYql::IHTTPGateway::Make(&HttpGatewayConfig)) // TODO: pass config and counters
@@ -645,7 +647,7 @@ public:
auto cancelAfter = ev->Get()->GetCancelAfter();
auto timeout = ev->Get()->GetOperationTimeout();
- auto timerDuration = GetQueryTimeout(queryType, timeout.MilliSeconds(), TableServiceConfig);
+ auto timerDuration = GetQueryTimeout(queryType, timeout.MilliSeconds(), TableServiceConfig, QueryServiceConfig);
if (cancelAfter) {
timerDuration = Min(timerDuration, cancelAfter);
}
@@ -659,7 +661,13 @@ public:
void Handle(TEvKqp::TEvScriptRequest::TPtr& ev) {
if (CheckScriptExecutionsTablesReady<TEvKqp::TEvScriptResponse>(ev)) {
- Register(CreateScriptExecutionCreatorActor(std::move(ev)), TMailboxType::HTSwap, AppData()->SystemPoolId);
+ auto req = ev->Get()->Record.MutableRequest();
+ auto maxRunTime = GetQueryTimeout(req->GetType(), req->GetTimeoutMs(), TableServiceConfig, QueryServiceConfig);
+ req->SetTimeoutMs(maxRunTime.MilliSeconds());
+ if (req->GetCancelAfterMs()) {
+ maxRunTime = TDuration::MilliSeconds(Min(req->GetCancelAfterMs(), maxRunTime.MilliSeconds()));
+ }
+ Register(CreateScriptExecutionCreatorActor(std::move(ev), QueryServiceConfig, maxRunTime), TMailboxType::HTSwap, AppData()->SystemPoolId);
}
}
@@ -1350,7 +1358,7 @@ private:
auto dbCounters = Counters->GetDbCounters(database);
- TKqpWorkerSettings workerSettings(cluster, database, TableServiceConfig, dbCounters);
+ TKqpWorkerSettings workerSettings(cluster, database, TableServiceConfig, QueryServiceConfig, dbCounters);
workerSettings.LongSession = longSession;
auto config = CreateConfig(KqpSettings, workerSettings);
@@ -1550,6 +1558,7 @@ private:
NKikimrConfig::TLogConfig LogConfig;
NKikimrConfig::TTableServiceConfig TableServiceConfig;
NKikimrProto::TTokenAccessorConfig TokenAccessorConfig;
+ NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
TKqpSettings::TConstPtr KqpSettings;
NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
std::shared_ptr<IQueryReplayBackendFactory> QueryReplayFactory;
@@ -1603,11 +1612,12 @@ private:
IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig,
+ const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
TVector<NKikimrKqp::TKqpSetting>&& settings,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources)
{
- return new TKqpProxyService(logConfig, tableServiceConfig, tokenAccessorConfig, std::move(settings),
+ return new TKqpProxyService(logConfig, tableServiceConfig, tokenAccessorConfig, queryServiceConfig, std::move(settings),
std::move(queryReplayFactory),std::move(kqpProxySharedResources));
}
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.h b/ydb/core/kqp/proxy_service/kqp_proxy_service.h
index 868bab3975..c2ac2de2c8 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.h
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.h
@@ -50,6 +50,7 @@ TPeerStats CalcPeerStats(const TVector<NKikimrKqp::TKqpProxyNodeResources>& data
IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig,
+ const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
TVector<NKikimrKqp::TKqpSetting>&& settings,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources);
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index d226a491d9..f33e4d100e 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -44,8 +44,6 @@ namespace {
constexpr TDuration LEASE_DURATION = TDuration::Seconds(30);
constexpr TDuration DEADLINE_OFFSET = TDuration::Minutes(20);
constexpr TDuration BRO_RUN_INTERVAL = TDuration::Minutes(60);
-constexpr TDuration DEFAULT_OPERATION_TTL = TDuration::Days(1);
-constexpr TDuration DEFAULT_RESULTS_TTL = TDuration::Days(1);
TString SerializeIssues(const NYql::TIssues& issues) {
NYql::TIssue root;
@@ -229,14 +227,16 @@ Ydb::Query::ExecMode GetExecModeFromAction(NKikimrKqp::EQueryAction action) {
class TCreateScriptOperationQuery : public TQueryBase {
public:
- TCreateScriptOperationQuery(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& req, TDuration operationTtl, TDuration resultsTtl, TDuration leaseDuration = TDuration::Zero())
+ TCreateScriptOperationQuery(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& req, TDuration operationTtl, TDuration resultsTtl, TDuration leaseDuration = TDuration::Zero(), TDuration maxRunTime = SCRIPT_TIMEOUT_LIMIT)
: ExecutionId(executionId)
, RunScriptActorId(runScriptActorId)
, Request(req)
, OperationTtl(operationTtl)
, ResultsTtl(resultsTtl)
, LeaseDuration(leaseDuration ? leaseDuration : LEASE_DURATION)
+ , MaxRunTime(Max(maxRunTime, TDuration::Days(1)))
{
+ Y_ENSURE(MaxRunTime);
}
void OnRunQuery() override {
@@ -251,10 +251,11 @@ public:
DECLARE $syntax AS Int32;
DECLARE $meta AS JsonDocument;
DECLARE $lease_duration AS Interval;
+ DECLARE $max_run_time AS Interval;
UPSERT INTO `.metadata/script_executions`
- (database, execution_id, run_script_actor_id, execution_status, execution_mode, start_ts, query_text, syntax, meta)
- VALUES ($database, $execution_id, $run_script_actor_id, $execution_status, $execution_mode, CurrentUtcTimestamp(), $query_text, $syntax, $meta);
+ (database, execution_id, run_script_actor_id, execution_status, execution_mode, start_ts, query_text, syntax, meta, expire_at)
+ VALUES ($database, $execution_id, $run_script_actor_id, $execution_status, $execution_mode, CurrentUtcTimestamp(), $query_text, $syntax, $meta, CurrentUtcTimestamp() + $max_run_time);
UPSERT INTO `.metadata/script_execution_leases`
(database, execution_id, lease_deadline, lease_generation)
@@ -293,6 +294,9 @@ public:
.Build()
.AddParam("$lease_duration")
.Interval(static_cast<i64>(LeaseDuration.MicroSeconds()))
+ .Build()
+ .AddParam("$max_run_time")
+ .Interval(static_cast<i64>(MaxRunTime.MicroSeconds()))
.Build();
RunDataQuery(sql, &params);
@@ -318,12 +322,15 @@ private:
const TDuration OperationTtl;
const TDuration ResultsTtl;
const TDuration LeaseDuration;
+ const TDuration MaxRunTime;
};
struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExecutionActor> {
- TCreateScriptExecutionActor(TEvKqp::TEvScriptRequest::TPtr&& ev, TDuration leaseDuration = TDuration::Zero())
+ TCreateScriptExecutionActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TDuration maxRunTime = SCRIPT_TIMEOUT_LIMIT, TDuration leaseDuration = TDuration::Zero())
: Event(std::move(ev))
+ , QueryServiceConfig(queryServiceConfig)
, LeaseDuration(leaseDuration ? leaseDuration : LEASE_DURATION)
+ , MaxRunTime(maxRunTime)
{
}
@@ -331,12 +338,16 @@ struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExec
Become(&TCreateScriptExecutionActor::StateFunc);
ExecutionId = CreateGuidAsString();
- auto operationTtl = Event->Get()->ForgetAfter ? Event->Get()->ForgetAfter : DEFAULT_OPERATION_TTL;
- auto resultsTtl = Event->Get()->ResultsTtl ? Event->Get()->ResultsTtl : DEFAULT_RESULTS_TTL;
- resultsTtl = Min(operationTtl, resultsTtl);
+
+ auto operationTtl = Event->Get()->ForgetAfter ? Event->Get()->ForgetAfter : TDuration::Seconds(QueryServiceConfig.GetScriptForgetAfterDefaultSeconds());
+ auto resultsTtl = Event->Get()->ResultsTtl ? Event->Get()->ResultsTtl : TDuration::Seconds(QueryServiceConfig.GetScriptResultsTtlDefaultSeconds());
+ if (operationTtl) {
+ resultsTtl = Min(operationTtl, resultsTtl);
+ }
+
// Start request
- RunScriptActorId = Register(CreateRunScriptActor(ExecutionId, Event->Get()->Record, Event->Get()->Record.GetRequest().GetDatabase(), 1, LeaseDuration));
- Register(new TCreateScriptOperationQuery(ExecutionId, RunScriptActorId, Event->Get()->Record, operationTtl, resultsTtl, LeaseDuration));
+ RunScriptActorId = Register(CreateRunScriptActor(ExecutionId, Event->Get()->Record, Event->Get()->Record.GetRequest().GetDatabase(), 1, LeaseDuration, QueryServiceConfig));
+ Register(new TCreateScriptOperationQuery(ExecutionId, RunScriptActorId, Event->Get()->Record, operationTtl, resultsTtl, LeaseDuration, MaxRunTime));
}
void Handle(TEvPrivate::TEvCreateScriptOperationResponse::TPtr& ev) {
@@ -355,9 +366,11 @@ struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExec
private:
TEvKqp::TEvScriptRequest::TPtr Event;
+ const NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
TString ExecutionId;
NActors::TActorId RunScriptActorId;
- TDuration LeaseDuration;
+ const TDuration LeaseDuration;
+ const TDuration MaxRunTime;
};
class TScriptLeaseUpdater : public TQueryBase {
@@ -566,7 +579,7 @@ public:
end_ts = CurrentUtcTimestamp(),
stats = $stats,
ast = $ast,
- expire_at = CurrentUtcTimestamp() + $operation_ttl
+ expire_at = IF($operation_ttl > CAST(0 AS Interval), CurrentUtcTimestamp() + $operation_ttl, NULL)
WHERE database = $database AND execution_id = $execution_id;
DELETE FROM `.metadata/script_execution_leases`
@@ -574,7 +587,7 @@ public:
UPDATE `.metadata/result_sets`
SET
- expire_at = CurrentUtcTimestamp() + $results_ttl
+ expire_at = IF($results_ttl > CAST(0 AS Interval), CurrentUtcTimestamp() + $results_ttl, NULL)
where database = $database AND execution_id = $execution_id;
)";
@@ -1948,12 +1961,12 @@ public:
}
const auto ttl = GetTtlFromSerializedMeta(*serializedMeta);
- if (!ttl) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted");
- return;
- }
- const auto [_, resultsTtl] = *ttl;
- if ((*endTs + resultsTtl) < TInstant::Now()){
+ if (!ttl) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted");
+ return;
+ }
+ const auto [_, resultsTtl] = *ttl;
+ if (resultsTtl && (*endTs + resultsTtl) < TInstant::Now()){
Finish(Ydb::StatusIds::NOT_FOUND, "Results are expired");
return;
}
@@ -2068,8 +2081,8 @@ private:
} // anonymous namespace
-NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev) {
- return new TCreateScriptExecutionActor(std::move(ev));
+NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TDuration maxRunTime) {
+ return new TCreateScriptExecutionActor(std::move(ev), queryServiceConfig, maxRunTime);
}
NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent) {
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h
index 2aaaa65f9c..84c6594d31 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.h
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/core/kqp/common/events/script_executions.h>
#include <ydb/core/kqp/common/kqp.h>
+#include <ydb/core/kqp/common/kqp_timeouts.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <library/cpp/actors/core/actor.h>
@@ -14,7 +15,7 @@ namespace NKikimr::NKqp {
NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent);
// Create script execution and run it.
-NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev);
+NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TDuration maxRunTime = SCRIPT_TIMEOUT_LIMIT);
// Operation API impl.
NActors::IActor* CreateForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev);
diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
index 29c8795c85..37427c9372 100644
--- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
+++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
@@ -30,8 +30,6 @@ namespace NKikimr::NKqp {
namespace {
-constexpr ui64 RESULT_SIZE_LIMIT = 10_MB;
-constexpr int RESULT_ROWS_LIMIT = 1000;
constexpr ui32 LEASE_UPDATE_FREQUENCY = 2;
class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
@@ -50,12 +48,13 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
};
public:
- TRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration)
+ TRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig&& queryServiceConfig)
: ExecutionId(executionId)
, Request(request)
, Database(database)
, LeaseGeneration(leaseGeneration)
, LeaseDuration(leaseDuration)
+ , QueryServiceConfig(queryServiceConfig)
{}
static constexpr char ActorName[] = "KQP_RUN_SCRIPT_ACTOR";
@@ -220,7 +219,9 @@ private:
}
auto resp = MakeHolder<TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
- resp->Record.SetFreeSpace(RESULT_SIZE_LIMIT);
+ resp->Record.SetFreeSpace(QueryServiceConfig.GetScriptResultSizeLimit()
+ ? QueryServiceConfig.GetScriptResultSizeLimit()
+ : std::numeric_limits<ui64>::max());
LOG_D("Send stream data ack"
<< ", seqNo: " << ev->Get()->Record.GetSeqNo()
@@ -252,13 +253,13 @@ private:
std::vector<TString> serializedRows;
for (const auto& row : ev->Get()->Record.GetResultSet().rows()) {
- if (rowCount > RESULT_ROWS_LIMIT) {
+ if (QueryServiceConfig.GetScriptResultRowsLimit() && rowCount > QueryServiceConfig.GetScriptResultRowsLimit()) {
Truncated[resultSetIndex] = true;
break;
}
auto serializedSize = row.ByteSizeLong();
- if (byteCount + serializedSize > RESULT_SIZE_LIMIT) {
+ if (QueryServiceConfig.GetScriptResultSizeLimit() && byteCount + serializedSize > QueryServiceConfig.GetScriptResultSizeLimit()) {
Truncated[resultSetIndex] = true;
break;
}
@@ -476,6 +477,7 @@ private:
const TString Database;
const ui64 LeaseGeneration;
const TDuration LeaseDuration;
+ const NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
TString SessionId;
bool LeaseUpdateQueryRunning = false;
bool FinalStatusIsSaved = false;
@@ -504,8 +506,8 @@ private:
} // namespace
-NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration) {
- return new TRunScriptActor(executionId, request, database, leaseGeneration, leaseDuration);
+NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig queryServiceConfig) {
+ return new TRunScriptActor(executionId, request, database, leaseGeneration, leaseDuration, std::move(queryServiceConfig));
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h
index f87ba6ffd3..aeffbe6c49 100644
--- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h
+++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/core/protos/kqp.pb.h>
+#include <ydb/core/base/appdata.h>
#include <library/cpp/actors/core/actor.h>
@@ -9,6 +10,6 @@ namespace NKikimr::NKqp {
struct TEvKqpRunScriptActor {
};
-NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration);
+NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig queryServiceConfig);
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index 4cc5e80927..21bdaafaaf 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -31,7 +31,8 @@ class TKqpQueryState : public TNonCopyable {
public:
TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database,
const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession,
- const NKikimrConfig::TTableServiceConfig& config, NWilson::TTraceId&& traceId)
+ const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
+ NWilson::TTraceId&& traceId)
: QueryId(queryId)
, Database(database)
, Cluster(cluster)
@@ -55,7 +56,7 @@ public:
}
}
- SetQueryDeadlines(config);
+ SetQueryDeadlines(tableServiceConfig, queryServiceConfig);
auto action = GetAction();
KqpSessionSpan = NWilson::TSpan(
TWilsonKqp::KqpSession, std::move(traceId),
@@ -151,7 +152,7 @@ public:
TempTablesState = std::make_shared<const TKqpTempTablesState>(tempTablesState);
}
- void SetQueryDeadlines(const NKikimrConfig::TTableServiceConfig& service) {
+ void SetQueryDeadlines(const NKikimrConfig::TTableServiceConfig& tableService, const NKikimrConfig::TQueryServiceConfig& queryService) {
auto now = TAppData::TimeProvider->Now();
auto cancelAfter = RequestEv->GetCancelAfter();
auto timeout = RequestEv->GetOperationTimeout();
@@ -159,7 +160,7 @@ public:
QueryDeadlines.CancelAt = now + cancelAfter;
}
- auto timeoutMs = GetQueryTimeout(GetType(), timeout.MilliSeconds(), service);
+ auto timeoutMs = GetQueryTimeout(GetType(), timeout.MilliSeconds(), tableService, queryService);
QueryDeadlines.TimeoutAt = now + timeoutMs;
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 75a6cd2ae4..c28d4d4620 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -214,7 +214,7 @@ public:
ev->Get()->SetClientLostAction(selfId, as);
QueryState = std::make_shared<TKqpQueryState>(
ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession,
- Settings.Service, std::move(id));
+ Settings.TableService, Settings.QueryService, std::move(id));
}
void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) {
@@ -1045,8 +1045,8 @@ public:
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(),
- RequestCounters, Settings.Service.GetAggregationConfig(), Settings.Service.GetExecuterRetriesConfig(),
- AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.Service.GetChannelTransportVersion(), SelfId());
+ RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(),
+ AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.TableService.GetChannelTransportVersion(), SelfId());
auto exId = RegisterWithSameMailbox(executerActor);
LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback);
@@ -1525,7 +1525,7 @@ public:
void ReplyBusy(TEvKqp::TEvQueryRequest::TPtr& ev) {
ui64 proxyRequestId = ev->Cookie;
- auto busyStatus = Settings.Service.GetUseSessionBusyStatus()
+ auto busyStatus = Settings.TableService.GetUseSessionBusyStatus()
? Ydb::StatusIds::SESSION_BUSY
: Ydb::StatusIds::PRECONDITION_FAILED;
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.h b/ydb/core/kqp/session_actor/kqp_session_actor.h
index 1478b9aae9..9015272288 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.h
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.h
@@ -17,15 +17,19 @@ struct TKqpWorkerSettings {
TString Database;
bool LongSession = false;
- NKikimrConfig::TTableServiceConfig Service;
+ NKikimrConfig::TTableServiceConfig TableService;
+ NKikimrConfig::TQueryServiceConfig QueryService;
TKqpDbCountersPtr DbCounters;
TKqpWorkerSettings(const TString& cluster, const TString& database,
- const NKikimrConfig::TTableServiceConfig& serviceConfig, TKqpDbCountersPtr dbCounters)
+ const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
+ const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
+ TKqpDbCountersPtr dbCounters)
: Cluster(cluster)
, Database(database)
- , Service(serviceConfig)
+ , TableService(tableServiceConfig)
+ , QueryService(queryServiceConfig)
, DbCounters(dbCounters) {}
};
diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
index e17ab10cdc..3fe99ef21f 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
@@ -119,7 +119,7 @@ public:
Config->_KqpTablePathPrefix = Settings.Database;
}
- ApplyServiceConfig(*Config, Settings.Service);
+ ApplyServiceConfig(*Config, Settings.TableService);
Config->FreezeDefaults();
@@ -203,7 +203,7 @@ public:
QueryState->QueryDeadlines.CancelAt = now + QueryState->RequestEv->GetCancelAfter();
}
- auto timeoutMs = GetQueryTimeout(QueryState->RequestEv->GetType(), QueryState->RequestEv->GetOperationTimeout().MilliSeconds(), Settings.Service);
+ auto timeoutMs = GetQueryTimeout(QueryState->RequestEv->GetType(), QueryState->RequestEv->GetOperationTimeout().MilliSeconds(), Settings.TableService, Settings.QueryService);
QueryState->QueryDeadlines.TimeoutAt = now + timeoutMs;
auto onError = [this, &ctx] (Ydb::StatusIds::StatusCode status, const TString& message) {
@@ -329,7 +329,7 @@ public:
if (CleanupState->Final) {
ReplyProcessError(ev->Sender, proxyRequestId, Ydb::StatusIds::BAD_SESSION, "Session is being closed");
} else {
- auto busyStatus = Settings.Service.GetUseSessionBusyStatus()
+ auto busyStatus = Settings.TableService.GetUseSessionBusyStatus()
? Ydb::StatusIds::SESSION_BUSY
: Ydb::StatusIds::PRECONDITION_FAILED;
@@ -874,7 +874,7 @@ private:
return;
}
- auto busyStatus = Settings.Service.GetUseSessionBusyStatus()
+ auto busyStatus = Settings.TableService.GetUseSessionBusyStatus()
? Ydb::StatusIds::SESSION_BUSY
: Ydb::StatusIds::PRECONDITION_FAILED;
@@ -1014,7 +1014,7 @@ private:
}
static TKikimrQueryLimits GetQueryLimits(const TKqpWorkerSettings& settings) {
- const auto& queryLimitsProto = settings.Service.GetQueryLimits();
+ const auto& queryLimitsProto = settings.TableService.GetQueryLimits();
const auto& phaseLimitsProto = queryLimitsProto.GetPhaseLimits();
TKikimrQueryLimits queryLimits;
diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.cpp b/ydb/core/kqp/session_actor/kqp_worker_common.cpp
index 901fed55c0..2237192d18 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_common.cpp
+++ b/ydb/core/kqp/session_actor/kqp_worker_common.cpp
@@ -67,7 +67,7 @@ bool IsSameProtoTypeImpl(const NKikimrMiniKQL::TVariantType& actual, const NKiki
} // namespace
TKikimrQueryLimits GetQueryLimits(const TKqpWorkerSettings& settings) {
- const auto& queryLimitsProto = settings.Service.GetQueryLimits();
+ const auto& queryLimitsProto = settings.TableService.GetQueryLimits();
const auto& phaseLimitsProto = queryLimitsProto.GetPhaseLimits();
TKikimrQueryLimits queryLimits;
diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.h b/ydb/core/kqp/session_actor/kqp_worker_common.h
index ad32008494..1fecdefbcc 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_common.h
+++ b/ydb/core/kqp/session_actor/kqp_worker_common.h
@@ -89,7 +89,7 @@ inline TIntrusivePtr<NYql::TKikimrConfiguration> CreateConfig(const TKqpSettings
cfg->_KqpTablePathPrefix = workerSettings.Database;
}
- ApplyServiceConfig(*cfg, workerSettings.Service);
+ ApplyServiceConfig(*cfg, workerSettings.TableService);
cfg->FreezeDefaults();
return cfg;
diff --git a/ydb/core/protos/CMakeLists.darwin-x86_64.txt b/ydb/core/protos/CMakeLists.darwin-x86_64.txt
index 4e2496f3b6..88fdf1dbaf 100644
--- a/ydb/core/protos/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/protos/CMakeLists.darwin-x86_64.txt
@@ -26,6 +26,7 @@ target_link_libraries(ydb-core-protos PUBLIC
core-issue-protos
dq-actors-protos
yql-dq-proto
+ providers-common-proto
public-issue-protos
yql-public-types
ydb-library-services
diff --git a/ydb/core/protos/CMakeLists.linux-aarch64.txt b/ydb/core/protos/CMakeLists.linux-aarch64.txt
index ec9fe6c704..5d7ad322df 100644
--- a/ydb/core/protos/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/protos/CMakeLists.linux-aarch64.txt
@@ -27,6 +27,7 @@ target_link_libraries(ydb-core-protos PUBLIC
core-issue-protos
dq-actors-protos
yql-dq-proto
+ providers-common-proto
public-issue-protos
yql-public-types
ydb-library-services
diff --git a/ydb/core/protos/CMakeLists.linux-x86_64.txt b/ydb/core/protos/CMakeLists.linux-x86_64.txt
index ec9fe6c704..5d7ad322df 100644
--- a/ydb/core/protos/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/protos/CMakeLists.linux-x86_64.txt
@@ -27,6 +27,7 @@ target_link_libraries(ydb-core-protos PUBLIC
core-issue-protos
dq-actors-protos
yql-dq-proto
+ providers-common-proto
public-issue-protos
yql-public-types
ydb-library-services
diff --git a/ydb/core/protos/CMakeLists.windows-x86_64.txt b/ydb/core/protos/CMakeLists.windows-x86_64.txt
index 4e2496f3b6..88fdf1dbaf 100644
--- a/ydb/core/protos/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/protos/CMakeLists.windows-x86_64.txt
@@ -26,6 +26,7 @@ target_link_libraries(ydb-core-protos PUBLIC
core-issue-protos
dq-actors-protos
yql-dq-proto
+ providers-common-proto
public-issue-protos
yql-public-types
ydb-library-services
diff --git a/ydb/core/protos/ya.make b/ydb/core/protos/ya.make
index b1ef42be64..7736817dbc 100644
--- a/ydb/core/protos/ya.make
+++ b/ydb/core/protos/ya.make
@@ -148,6 +148,7 @@ PEERDIR(
ydb/library/yql/core/issue/protos
ydb/library/yql/dq/actors/protos
ydb/library/yql/dq/proto
+ ydb/library/yql/providers/common/proto
ydb/library/yql/public/issue/protos
ydb/library/yql/public/types
ydb/library/services
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index c53a12b2c2..904f24f6b6 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -818,6 +818,7 @@ namespace Tests {
IActor* kqpProxyService = NKqp::CreateKqpProxyService(Settings->AppConfig.GetLogConfig(),
Settings->AppConfig.GetTableServiceConfig(),
Settings->AppConfig.GetAuthConfig().GetTokenAccessorConfig(),
+ Settings->AppConfig.GetQueryServiceConfig(),
TVector<NKikimrKqp::TKqpSetting>(Settings->KqpSettings),
nullptr, std::move(kqpProxySharedResources));
TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx);