diff options
author | hcpp <[email protected]> | 2023-08-07 08:45:50 +0300 |
---|---|---|
committer | hcpp <[email protected]> | 2023-08-07 09:23:30 +0300 |
commit | 62028269a141aa22adcee991fe7a34e6a0a91012 (patch) | |
tree | 4cefa5acffad9e7bea0cc034bc34c371e50a87e4 | |
parent | 382ecfbdf714ad4e01c41c122c70ccd41e50730e (diff) |
pushing new parameters
9 files changed, 63 insertions, 6 deletions
diff --git a/ydb/core/fq/libs/actors/pending_fetcher.cpp b/ydb/core/fq/libs/actors/pending_fetcher.cpp index a041a9d9af1..3e7c660e63b 100644 --- a/ydb/core/fq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/fq/libs/actors/pending_fetcher.cpp @@ -393,7 +393,8 @@ private: resources, task.execution_id(), task.operation_id(), - computeConnection + computeConnection, + NProtoInterop::CastFromProto(task.result_ttl()) ); auto runActorId = diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.cpp b/ydb/core/fq/libs/compute/common/run_actor_params.cpp index a939a8969f0..b621f895c81 100644 --- a/ydb/core/fq/libs/compute/common/run_actor_params.cpp +++ b/ydb/core/fq/libs/compute/common/run_actor_params.cpp @@ -53,7 +53,8 @@ TRunActorParams::TRunActorParams( const Fq::Private::TaskResources& resources, const TString& executionId, const TString& operationId, - const NFq::NConfig::TYdbStorageConfig& computeConnection + const NFq::NConfig::TYdbStorageConfig& computeConnection, + TDuration resultTtl ) : YqSharedResources(yqSharedResources) , CredentialsProviderFactory(credentialsProviderFactory) @@ -104,6 +105,7 @@ TRunActorParams::TRunActorParams( , ExecutionId(executionId) , OperationId(operationId, true) , ComputeConnection(computeConnection) + , ResultTtl(resultTtl) { } @@ -128,6 +130,7 @@ IOutputStream& operator<<(IOutputStream& out, const TRunActorParams& params) { << " ExecutionId: " << params.ExecutionId << " OperationId: " << (params.OperationId.GetKind() != Ydb::TOperationId::UNUSED ? ProtoToString(params.OperationId) : "<empty>") << " ComputeConnection: " << params.ComputeConnection.ShortDebugString() + << " ResultTtl: " << params.ResultTtl << " }"; } diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.h b/ydb/core/fq/libs/compute/common/run_actor_params.h index 2e307bbb4e8..f3c1f54dbe1 100644 --- a/ydb/core/fq/libs/compute/common/run_actor_params.h +++ b/ydb/core/fq/libs/compute/common/run_actor_params.h @@ -73,7 +73,8 @@ struct TRunActorParams { // TODO2 : Change name const Fq::Private::TaskResources& resources, const TString& executionId, const TString& operationId, - const NFq::NConfig::TYdbStorageConfig& computeConnection + const NFq::NConfig::TYdbStorageConfig& computeConnection, + TDuration resultTtl ); TRunActorParams(const TRunActorParams& params) = default; @@ -133,6 +134,7 @@ struct TRunActorParams { // TODO2 : Change name TString ExecutionId; NYdb::TOperation::TOperationId OperationId; NFq::NConfig::TYdbStorageConfig ComputeConnection; + TDuration ResultTtl; }; } /* NFq */ diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h index b055ec586c2..6888ecca3bb 100644 --- a/ydb/core/fq/libs/compute/ydb/events/events.h +++ b/ydb/core/fq/libs/compute/ydb/events/events.h @@ -52,13 +52,21 @@ struct TEvYdbCompute { // Events struct TEvExecuteScriptRequest : public NActors::TEventLocal<TEvExecuteScriptRequest, EvExecuteScriptRequest> { - TEvExecuteScriptRequest(TString sql, TString idempotencyKey) + TEvExecuteScriptRequest(TString sql, TString idempotencyKey, const TDuration& resultTtl, const TDuration& operationTimeout, Ydb::Query::Syntax syntax, Ydb::Query::ExecMode execMode) : Sql(std::move(sql)) , IdempotencyKey(std::move(idempotencyKey)) + , ResultTtl(resultTtl) + , OperationTimeout(operationTimeout) + , Syntax(syntax) + , ExecMode(execMode) {} TString Sql; TString IdempotencyKey; + TDuration ResultTtl; + TDuration OperationTimeout; + Ydb::Query::Syntax Syntax = Ydb::Query::SYNTAX_YQL_V1; + Ydb::Query::ExecMode ExecMode = Ydb::Query::EXEC_MODE_EXECUTE; }; struct TEvExecuteScriptResponse : public NActors::TEventLocal<TEvExecuteScriptResponse, EvExecuteScriptResponse> { diff --git a/ydb/core/fq/libs/compute/ydb/executer_actor.cpp b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp index 49d5378ef87..27325504bfb 100644 --- a/ydb/core/fq/libs/compute/ydb/executer_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp @@ -114,7 +114,39 @@ public: } void SendExecuteScript() { - Register(new TRetryActor<TEvYdbCompute::TEvExecuteScriptRequest, TEvYdbCompute::TEvExecuteScriptResponse, TString, TString>(Counters.GetCounters(ERequestType::RT_EXECUTE_SCRIPT), SelfId(), Connector, Params.Sql, Params.JobId)); + Register(new TRetryActor<TEvYdbCompute::TEvExecuteScriptRequest, TEvYdbCompute::TEvExecuteScriptResponse, TString, TString, TDuration, TDuration, Ydb::Query::Syntax, Ydb::Query::ExecMode>(Counters.GetCounters(ERequestType::RT_EXECUTE_SCRIPT), SelfId(), Connector, Params.Sql, Params.JobId, Params.ResultTtl, Params.ExecutionTtl, GetSyntax(), GetExecuteMode())); + } + + Ydb::Query::Syntax GetSyntax() const { + switch (Params.QuerySyntax) { + case FederatedQuery::QueryContent::PG: + return Ydb::Query::SYNTAX_PG; + case FederatedQuery::QueryContent::YQL_V1: + return Ydb::Query::SYNTAX_YQL_V1; + case FederatedQuery::QueryContent::QUERY_SYNTAX_UNSPECIFIED: + case FederatedQuery::QueryContent_QuerySyntax_QueryContent_QuerySyntax_INT_MAX_SENTINEL_DO_NOT_USE_: + case FederatedQuery::QueryContent_QuerySyntax_QueryContent_QuerySyntax_INT_MIN_SENTINEL_DO_NOT_USE_: + return Ydb::Query::SYNTAX_UNSPECIFIED; + } + } + + Ydb::Query::ExecMode GetExecuteMode() const { + switch (Params.ExecuteMode) { + case FederatedQuery::RUN: + return Ydb::Query::ExecMode::EXEC_MODE_EXECUTE; + case FederatedQuery::PARSE: + return Ydb::Query::ExecMode::EXEC_MODE_PARSE; + case FederatedQuery::VALIDATE: + return Ydb::Query::ExecMode::EXEC_MODE_VALIDATE; + case FederatedQuery::EXPLAIN: + return Ydb::Query::ExecMode::EXEC_MODE_EXPLAIN; + case FederatedQuery::EXECUTE_MODE_UNSPECIFIED: + case FederatedQuery::COMPILE: + case FederatedQuery::SAVE: + case FederatedQuery::ExecuteMode_INT_MAX_SENTINEL_DO_NOT_USE_: + case FederatedQuery::ExecuteMode_INT_MIN_SENTINEL_DO_NOT_USE_: + return Ydb::Query::ExecMode::EXEC_MODE_UNSPECIFIED; + } } void SendPingTask() { diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp index 4765e00b236..01197e381e1 100644 --- a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp @@ -43,8 +43,14 @@ public: ) void Handle(const TEvYdbCompute::TEvExecuteScriptRequest::TPtr& ev) { + const auto& event = *ev->Get(); + NYdb::NQuery::TExecuteScriptSettings settings; + settings.ResultsTtl(event.ResultTtl); + settings.OperationTimeout(event.OperationTimeout); + settings.Syntax(event.Syntax); + settings.ExecMode(event.ExecMode); QueryClient - ->ExecuteScript(ev->Get()->Sql) + ->ExecuteScript(event.Sql, settings) .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) { try { auto response = future.ExtractValueSync(); diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp index b32f8e1b557..a778a751f85 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp @@ -199,6 +199,8 @@ std::tuple<TString, NYdb::TParams, std::function<std::pair<TString, NYdb::TParam throw TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support"; } + *task.Internal.mutable_result_ttl() = NProtoInterop::CastToProto(resultSetsTtl); + if (disableCurrentIam) { task.Internal.clear_token(); } @@ -500,6 +502,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ newTask->set_execution_id(task.Internal.execution_id()); newTask->set_operation_id(task.Internal.operation_id()); *newTask->mutable_compute_connection() = task.Internal.compute_connection(); + *newTask->mutable_result_ttl() = task.Internal.result_ttl(); } return result; diff --git a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto index 7cc0a85a64f..1d7f9f3b2f6 100644 --- a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto +++ b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto @@ -48,6 +48,7 @@ message QueryInternal { string operation_id = 24; string execution_id = 25; NFq.NConfig.TYdbStorageConfig compute_connection = 26; + google.protobuf.Duration result_ttl = 27; } message JobInternal { diff --git a/ydb/core/fq/libs/protos/fq_private.proto b/ydb/core/fq/libs/protos/fq_private.proto index b1e9003e876..5513f3a3eb5 100644 --- a/ydb/core/fq/libs/protos/fq_private.proto +++ b/ydb/core/fq/libs/protos/fq_private.proto @@ -118,6 +118,7 @@ message GetTaskResult { string operation_id = 35; string execution_id = 36; NFq.NConfig.TYdbStorageConfig compute_connection = 37; + google.protobuf.Duration result_ttl = 38; } repeated Task tasks = 1; } |