summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <[email protected]>2023-08-07 08:45:50 +0300
committerhcpp <[email protected]>2023-08-07 09:23:30 +0300
commit62028269a141aa22adcee991fe7a34e6a0a91012 (patch)
tree4cefa5acffad9e7bea0cc034bc34c371e50a87e4
parent382ecfbdf714ad4e01c41c122c70ccd41e50730e (diff)
pushing new parameters
-rw-r--r--ydb/core/fq/libs/actors/pending_fetcher.cpp3
-rw-r--r--ydb/core/fq/libs/compute/common/run_actor_params.cpp5
-rw-r--r--ydb/core/fq/libs/compute/common/run_actor_params.h4
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/events.h10
-rw-r--r--ydb/core/fq/libs/compute/ydb/executer_actor.cpp34
-rw-r--r--ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp8
-rw-r--r--ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp3
-rw-r--r--ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto1
-rw-r--r--ydb/core/fq/libs/protos/fq_private.proto1
9 files changed, 63 insertions, 6 deletions
diff --git a/ydb/core/fq/libs/actors/pending_fetcher.cpp b/ydb/core/fq/libs/actors/pending_fetcher.cpp
index a041a9d9af1..3e7c660e63b 100644
--- a/ydb/core/fq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/fq/libs/actors/pending_fetcher.cpp
@@ -393,7 +393,8 @@ private:
resources,
task.execution_id(),
task.operation_id(),
- computeConnection
+ computeConnection,
+ NProtoInterop::CastFromProto(task.result_ttl())
);
auto runActorId =
diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.cpp b/ydb/core/fq/libs/compute/common/run_actor_params.cpp
index a939a8969f0..b621f895c81 100644
--- a/ydb/core/fq/libs/compute/common/run_actor_params.cpp
+++ b/ydb/core/fq/libs/compute/common/run_actor_params.cpp
@@ -53,7 +53,8 @@ TRunActorParams::TRunActorParams(
const Fq::Private::TaskResources& resources,
const TString& executionId,
const TString& operationId,
- const NFq::NConfig::TYdbStorageConfig& computeConnection
+ const NFq::NConfig::TYdbStorageConfig& computeConnection,
+ TDuration resultTtl
)
: YqSharedResources(yqSharedResources)
, CredentialsProviderFactory(credentialsProviderFactory)
@@ -104,6 +105,7 @@ TRunActorParams::TRunActorParams(
, ExecutionId(executionId)
, OperationId(operationId, true)
, ComputeConnection(computeConnection)
+ , ResultTtl(resultTtl)
{
}
@@ -128,6 +130,7 @@ IOutputStream& operator<<(IOutputStream& out, const TRunActorParams& params) {
<< " ExecutionId: " << params.ExecutionId
<< " OperationId: " << (params.OperationId.GetKind() != Ydb::TOperationId::UNUSED ? ProtoToString(params.OperationId) : "<empty>")
<< " ComputeConnection: " << params.ComputeConnection.ShortDebugString()
+ << " ResultTtl: " << params.ResultTtl
<< " }";
}
diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.h b/ydb/core/fq/libs/compute/common/run_actor_params.h
index 2e307bbb4e8..f3c1f54dbe1 100644
--- a/ydb/core/fq/libs/compute/common/run_actor_params.h
+++ b/ydb/core/fq/libs/compute/common/run_actor_params.h
@@ -73,7 +73,8 @@ struct TRunActorParams { // TODO2 : Change name
const Fq::Private::TaskResources& resources,
const TString& executionId,
const TString& operationId,
- const NFq::NConfig::TYdbStorageConfig& computeConnection
+ const NFq::NConfig::TYdbStorageConfig& computeConnection,
+ TDuration resultTtl
);
TRunActorParams(const TRunActorParams& params) = default;
@@ -133,6 +134,7 @@ struct TRunActorParams { // TODO2 : Change name
TString ExecutionId;
NYdb::TOperation::TOperationId OperationId;
NFq::NConfig::TYdbStorageConfig ComputeConnection;
+ TDuration ResultTtl;
};
} /* NFq */
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h
index b055ec586c2..6888ecca3bb 100644
--- a/ydb/core/fq/libs/compute/ydb/events/events.h
+++ b/ydb/core/fq/libs/compute/ydb/events/events.h
@@ -52,13 +52,21 @@ struct TEvYdbCompute {
// Events
struct TEvExecuteScriptRequest : public NActors::TEventLocal<TEvExecuteScriptRequest, EvExecuteScriptRequest> {
- TEvExecuteScriptRequest(TString sql, TString idempotencyKey)
+ TEvExecuteScriptRequest(TString sql, TString idempotencyKey, const TDuration& resultTtl, const TDuration& operationTimeout, Ydb::Query::Syntax syntax, Ydb::Query::ExecMode execMode)
: Sql(std::move(sql))
, IdempotencyKey(std::move(idempotencyKey))
+ , ResultTtl(resultTtl)
+ , OperationTimeout(operationTimeout)
+ , Syntax(syntax)
+ , ExecMode(execMode)
{}
TString Sql;
TString IdempotencyKey;
+ TDuration ResultTtl;
+ TDuration OperationTimeout;
+ Ydb::Query::Syntax Syntax = Ydb::Query::SYNTAX_YQL_V1;
+ Ydb::Query::ExecMode ExecMode = Ydb::Query::EXEC_MODE_EXECUTE;
};
struct TEvExecuteScriptResponse : public NActors::TEventLocal<TEvExecuteScriptResponse, EvExecuteScriptResponse> {
diff --git a/ydb/core/fq/libs/compute/ydb/executer_actor.cpp b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
index 49d5378ef87..27325504bfb 100644
--- a/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
@@ -114,7 +114,39 @@ public:
}
void SendExecuteScript() {
- Register(new TRetryActor<TEvYdbCompute::TEvExecuteScriptRequest, TEvYdbCompute::TEvExecuteScriptResponse, TString, TString>(Counters.GetCounters(ERequestType::RT_EXECUTE_SCRIPT), SelfId(), Connector, Params.Sql, Params.JobId));
+ Register(new TRetryActor<TEvYdbCompute::TEvExecuteScriptRequest, TEvYdbCompute::TEvExecuteScriptResponse, TString, TString, TDuration, TDuration, Ydb::Query::Syntax, Ydb::Query::ExecMode>(Counters.GetCounters(ERequestType::RT_EXECUTE_SCRIPT), SelfId(), Connector, Params.Sql, Params.JobId, Params.ResultTtl, Params.ExecutionTtl, GetSyntax(), GetExecuteMode()));
+ }
+
+ Ydb::Query::Syntax GetSyntax() const {
+ switch (Params.QuerySyntax) {
+ case FederatedQuery::QueryContent::PG:
+ return Ydb::Query::SYNTAX_PG;
+ case FederatedQuery::QueryContent::YQL_V1:
+ return Ydb::Query::SYNTAX_YQL_V1;
+ case FederatedQuery::QueryContent::QUERY_SYNTAX_UNSPECIFIED:
+ case FederatedQuery::QueryContent_QuerySyntax_QueryContent_QuerySyntax_INT_MAX_SENTINEL_DO_NOT_USE_:
+ case FederatedQuery::QueryContent_QuerySyntax_QueryContent_QuerySyntax_INT_MIN_SENTINEL_DO_NOT_USE_:
+ return Ydb::Query::SYNTAX_UNSPECIFIED;
+ }
+ }
+
+ Ydb::Query::ExecMode GetExecuteMode() const {
+ switch (Params.ExecuteMode) {
+ case FederatedQuery::RUN:
+ return Ydb::Query::ExecMode::EXEC_MODE_EXECUTE;
+ case FederatedQuery::PARSE:
+ return Ydb::Query::ExecMode::EXEC_MODE_PARSE;
+ case FederatedQuery::VALIDATE:
+ return Ydb::Query::ExecMode::EXEC_MODE_VALIDATE;
+ case FederatedQuery::EXPLAIN:
+ return Ydb::Query::ExecMode::EXEC_MODE_EXPLAIN;
+ case FederatedQuery::EXECUTE_MODE_UNSPECIFIED:
+ case FederatedQuery::COMPILE:
+ case FederatedQuery::SAVE:
+ case FederatedQuery::ExecuteMode_INT_MAX_SENTINEL_DO_NOT_USE_:
+ case FederatedQuery::ExecuteMode_INT_MIN_SENTINEL_DO_NOT_USE_:
+ return Ydb::Query::ExecMode::EXEC_MODE_UNSPECIFIED;
+ }
}
void SendPingTask() {
diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
index 4765e00b236..01197e381e1 100644
--- a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
@@ -43,8 +43,14 @@ public:
)
void Handle(const TEvYdbCompute::TEvExecuteScriptRequest::TPtr& ev) {
+ const auto& event = *ev->Get();
+ NYdb::NQuery::TExecuteScriptSettings settings;
+ settings.ResultsTtl(event.ResultTtl);
+ settings.OperationTimeout(event.OperationTimeout);
+ settings.Syntax(event.Syntax);
+ settings.ExecMode(event.ExecMode);
QueryClient
- ->ExecuteScript(ev->Get()->Sql)
+ ->ExecuteScript(event.Sql, settings)
.Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) {
try {
auto response = future.ExtractValueSync();
diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp
index b32f8e1b557..a778a751f85 100644
--- a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp
@@ -199,6 +199,8 @@ std::tuple<TString, NYdb::TParams, std::function<std::pair<TString, NYdb::TParam
throw TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support";
}
+ *task.Internal.mutable_result_ttl() = NProtoInterop::CastToProto(resultSetsTtl);
+
if (disableCurrentIam) {
task.Internal.clear_token();
}
@@ -500,6 +502,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
newTask->set_execution_id(task.Internal.execution_id());
newTask->set_operation_id(task.Internal.operation_id());
*newTask->mutable_compute_connection() = task.Internal.compute_connection();
+ *newTask->mutable_result_ttl() = task.Internal.result_ttl();
}
return result;
diff --git a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto
index 7cc0a85a64f..1d7f9f3b2f6 100644
--- a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto
+++ b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto
@@ -48,6 +48,7 @@ message QueryInternal {
string operation_id = 24;
string execution_id = 25;
NFq.NConfig.TYdbStorageConfig compute_connection = 26;
+ google.protobuf.Duration result_ttl = 27;
}
message JobInternal {
diff --git a/ydb/core/fq/libs/protos/fq_private.proto b/ydb/core/fq/libs/protos/fq_private.proto
index b1e9003e876..5513f3a3eb5 100644
--- a/ydb/core/fq/libs/protos/fq_private.proto
+++ b/ydb/core/fq/libs/protos/fq_private.proto
@@ -118,6 +118,7 @@ message GetTaskResult {
string operation_id = 35;
string execution_id = 36;
NFq.NConfig.TYdbStorageConfig compute_connection = 37;
+ google.protobuf.Duration result_ttl = 38;
}
repeated Task tasks = 1;
}