aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-06-09 15:09:10 +0300
committerhcpp <hcpp@ydb.tech>2023-06-09 15:09:10 +0300
commit4e48f42bb1da68482885363f334d1f317a51f78c (patch)
tree64bba2dd0af928f62eceae57edf83b22301af1c6
parent771b2d98e7a5045e038d42923b65879135603368 (diff)
downloadydb-4e48f42bb1da68482885363f334d1f317a51f78c.tar.gz
yqv2 fetcher
cleanup the first version of yqv2
-rw-r--r--ydb/core/fq/libs/CMakeLists.txt1
-rw-r--r--ydb/core/fq/libs/actors/CMakeLists.darwin-x86_64.txt22
-rw-r--r--ydb/core/fq/libs/actors/CMakeLists.linux-aarch64.txt22
-rw-r--r--ydb/core/fq/libs/actors/CMakeLists.linux-x86_64.txt22
-rw-r--r--ydb/core/fq/libs/actors/CMakeLists.windows-x86_64.txt22
-rw-r--r--ydb/core/fq/libs/actors/pending_fetcher.cpp16
-rw-r--r--ydb/core/fq/libs/actors/proxy.h14
-rw-r--r--ydb/core/fq/libs/actors/run_actor.cpp9
-rw-r--r--ydb/core/fq/libs/compute/CMakeLists.txt10
-rw-r--r--ydb/core/fq/libs/compute/common/CMakeLists.darwin-x86_64.txt25
-rw-r--r--ydb/core/fq/libs/compute/common/CMakeLists.linux-aarch64.txt26
-rw-r--r--ydb/core/fq/libs/compute/common/CMakeLists.linux-x86_64.txt26
-rw-r--r--ydb/core/fq/libs/compute/common/CMakeLists.txt17
-rw-r--r--ydb/core/fq/libs/compute/common/CMakeLists.windows-x86_64.txt25
-rw-r--r--ydb/core/fq/libs/compute/common/config.h35
-rw-r--r--ydb/core/fq/libs/compute/common/metrics.h44
-rw-r--r--ydb/core/fq/libs/compute/common/pinger.cpp (renamed from ydb/core/fq/libs/actors/pinger.cpp)23
-rw-r--r--ydb/core/fq/libs/compute/common/pinger.h23
-rw-r--r--ydb/core/fq/libs/compute/common/retry_actor.h132
-rw-r--r--ydb/core/fq/libs/compute/common/run_actor_params.cpp (renamed from ydb/core/fq/libs/actors/run_actor_params.cpp)29
-rw-r--r--ydb/core/fq/libs/compute/common/run_actor_params.h (renamed from ydb/core/fq/libs/actors/run_actor_params.h)17
-rw-r--r--ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt45
-rw-r--r--ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt46
-rw-r--r--ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt46
-rw-r--r--ydb/core/fq/libs/compute/ydb/CMakeLists.txt17
-rw-r--r--ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt45
-rw-r--r--ydb/core/fq/libs/compute/ydb/actors_factory.cpp87
-rw-r--r--ydb/core/fq/libs/compute/ydb/actors_factory.h43
-rw-r--r--ydb/core/fq/libs/compute/ydb/base_compute_actor.h61
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/events.h232
-rw-r--r--ydb/core/fq/libs/compute/ydb/executer_actor.cpp150
-rw-r--r--ydb/core/fq/libs/compute/ydb/executer_actor.h17
-rw-r--r--ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp124
-rw-r--r--ydb/core/fq/libs/compute/ydb/finalizer_actor.h19
-rw-r--r--ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp111
-rw-r--r--ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.h17
-rw-r--r--ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp196
-rw-r--r--ydb/core/fq/libs/compute/ydb/result_writer_actor.h18
-rw-r--r--ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp186
-rw-r--r--ydb/core/fq/libs/compute/ydb/status_tracker_actor.h18
-rw-r--r--ydb/core/fq/libs/compute/ydb/stopper_actor.cpp111
-rw-r--r--ydb/core/fq/libs/compute/ydb/stopper_actor.h17
-rw-r--r--ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp128
-rw-r--r--ydb/core/fq/libs/compute/ydb/ydb_connector_actor.h9
-rw-r--r--ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp217
-rw-r--r--ydb/core/fq/libs/compute/ydb/ydb_run_actor.h38
-rw-r--r--ydb/core/fq/libs/config/protos/compute.proto33
-rw-r--r--ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp3
-rw-r--r--ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp12
-rw-r--r--ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto4
-rw-r--r--ydb/core/fq/libs/control_plane_storage/util.cpp3
-rw-r--r--ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp2
-rw-r--r--ydb/core/fq/libs/events/events.h2
-rw-r--r--ydb/core/fq/libs/protos/fq_private.proto4
-rw-r--r--ydb/core/fq/libs/ydb/ydb.cpp48
-rw-r--r--ydb/core/fq/libs/ydb/ydb.h44
56 files changed, 2585 insertions, 128 deletions
diff --git a/ydb/core/fq/libs/CMakeLists.txt b/ydb/core/fq/libs/CMakeLists.txt
index bda2195d98..bf260a6276 100644
--- a/ydb/core/fq/libs/CMakeLists.txt
+++ b/ydb/core/fq/libs/CMakeLists.txt
@@ -13,6 +13,7 @@ add_subdirectory(checkpointing)
add_subdirectory(checkpointing_common)
add_subdirectory(cloud_audit)
add_subdirectory(common)
+add_subdirectory(compute)
add_subdirectory(config)
add_subdirectory(control_plane_config)
add_subdirectory(control_plane_proxy)
diff --git a/ydb/core/fq/libs/actors/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/actors/CMakeLists.darwin-x86_64.txt
index 116fc09710..9dfe2f8f63 100644
--- a/ydb/core/fq/libs/actors/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/fq/libs/actors/CMakeLists.darwin-x86_64.txt
@@ -29,10 +29,12 @@ target_link_libraries(fq-libs-actors PUBLIC
libs-actors-logging
fq-libs-checkpointing
fq-libs-checkpointing_common
- fq-libs-db_id_async_resolver_impl
fq-libs-common
+ libs-compute-common
+ libs-compute-ydb
fq-libs-control_plane_storage
libs-control_plane_storage-events
+ fq-libs-db_id_async_resolver_impl
fq-libs-db_schema
fq-libs-events
fq-libs-grpc
@@ -47,14 +49,9 @@ target_link_libraries(fq-libs-actors PUBLIC
library-yql-ast
yql-core-facade
core-services-mounts
+ dq-integration-transform
library-yql-minikql
yql-minikql-comp_nodes
- common-token_accessor-client
- yql-public-issue
- public-issue-protos
- yql-sql-settings
- yql-utils-actor_log
- dq-integration-transform
providers-clickhouse-provider
providers-common-codec
providers-common-comp_nodes
@@ -62,6 +59,7 @@ target_link_libraries(fq-libs-actors PUBLIC
providers-common-metrics
providers-common-provider
common-schema-mkql
+ common-token_accessor-client
providers-common-udf_resolve
providers-dq-actors
providers-dq-common
@@ -74,9 +72,15 @@ target_link_libraries(fq-libs-actors PUBLIC
providers-pq-task_meta
providers-s3-provider
providers-ydb-provider
+ yql-public-issue
+ public-issue-protos
+ yql-sql-settings
library-yql-utils
+ yql-utils-actor_log
api-protos
public-lib-fq
+ client-draft-ydb_query
+ cpp-client-ydb_operation
cpp-client-ydb_table
)
target_sources(fq-libs-actors PRIVATE
@@ -86,14 +90,12 @@ target_sources(fq-libs-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/nodes_health_check.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/nodes_manager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/pending_fetcher.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/pinger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/proxy.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/proxy_private.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter_resources.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter_resources.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/result_writer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/run_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/run_actor_params.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/table_bindings_from_bindings.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/task_get.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/task_ping.cpp
diff --git a/ydb/core/fq/libs/actors/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/actors/CMakeLists.linux-aarch64.txt
index 587f709afd..2e61ddafd6 100644
--- a/ydb/core/fq/libs/actors/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/fq/libs/actors/CMakeLists.linux-aarch64.txt
@@ -30,10 +30,12 @@ target_link_libraries(fq-libs-actors PUBLIC
libs-actors-logging
fq-libs-checkpointing
fq-libs-checkpointing_common
- fq-libs-db_id_async_resolver_impl
fq-libs-common
+ libs-compute-common
+ libs-compute-ydb
fq-libs-control_plane_storage
libs-control_plane_storage-events
+ fq-libs-db_id_async_resolver_impl
fq-libs-db_schema
fq-libs-events
fq-libs-grpc
@@ -48,14 +50,9 @@ target_link_libraries(fq-libs-actors PUBLIC
library-yql-ast
yql-core-facade
core-services-mounts
+ dq-integration-transform
library-yql-minikql
yql-minikql-comp_nodes
- common-token_accessor-client
- yql-public-issue
- public-issue-protos
- yql-sql-settings
- yql-utils-actor_log
- dq-integration-transform
providers-clickhouse-provider
providers-common-codec
providers-common-comp_nodes
@@ -63,6 +60,7 @@ target_link_libraries(fq-libs-actors PUBLIC
providers-common-metrics
providers-common-provider
common-schema-mkql
+ common-token_accessor-client
providers-common-udf_resolve
providers-dq-actors
providers-dq-common
@@ -75,9 +73,15 @@ target_link_libraries(fq-libs-actors PUBLIC
providers-pq-task_meta
providers-s3-provider
providers-ydb-provider
+ yql-public-issue
+ public-issue-protos
+ yql-sql-settings
library-yql-utils
+ yql-utils-actor_log
api-protos
public-lib-fq
+ client-draft-ydb_query
+ cpp-client-ydb_operation
cpp-client-ydb_table
)
target_sources(fq-libs-actors PRIVATE
@@ -87,14 +91,12 @@ target_sources(fq-libs-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/nodes_health_check.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/nodes_manager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/pending_fetcher.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/pinger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/proxy.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/proxy_private.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter_resources.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter_resources.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/result_writer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/run_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/run_actor_params.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/table_bindings_from_bindings.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/task_get.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/task_ping.cpp
diff --git a/ydb/core/fq/libs/actors/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/actors/CMakeLists.linux-x86_64.txt
index 587f709afd..2e61ddafd6 100644
--- a/ydb/core/fq/libs/actors/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/fq/libs/actors/CMakeLists.linux-x86_64.txt
@@ -30,10 +30,12 @@ target_link_libraries(fq-libs-actors PUBLIC
libs-actors-logging
fq-libs-checkpointing
fq-libs-checkpointing_common
- fq-libs-db_id_async_resolver_impl
fq-libs-common
+ libs-compute-common
+ libs-compute-ydb
fq-libs-control_plane_storage
libs-control_plane_storage-events
+ fq-libs-db_id_async_resolver_impl
fq-libs-db_schema
fq-libs-events
fq-libs-grpc
@@ -48,14 +50,9 @@ target_link_libraries(fq-libs-actors PUBLIC
library-yql-ast
yql-core-facade
core-services-mounts
+ dq-integration-transform
library-yql-minikql
yql-minikql-comp_nodes
- common-token_accessor-client
- yql-public-issue
- public-issue-protos
- yql-sql-settings
- yql-utils-actor_log
- dq-integration-transform
providers-clickhouse-provider
providers-common-codec
providers-common-comp_nodes
@@ -63,6 +60,7 @@ target_link_libraries(fq-libs-actors PUBLIC
providers-common-metrics
providers-common-provider
common-schema-mkql
+ common-token_accessor-client
providers-common-udf_resolve
providers-dq-actors
providers-dq-common
@@ -75,9 +73,15 @@ target_link_libraries(fq-libs-actors PUBLIC
providers-pq-task_meta
providers-s3-provider
providers-ydb-provider
+ yql-public-issue
+ public-issue-protos
+ yql-sql-settings
library-yql-utils
+ yql-utils-actor_log
api-protos
public-lib-fq
+ client-draft-ydb_query
+ cpp-client-ydb_operation
cpp-client-ydb_table
)
target_sources(fq-libs-actors PRIVATE
@@ -87,14 +91,12 @@ target_sources(fq-libs-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/nodes_health_check.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/nodes_manager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/pending_fetcher.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/pinger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/proxy.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/proxy_private.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter_resources.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter_resources.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/result_writer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/run_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/run_actor_params.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/table_bindings_from_bindings.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/task_get.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/task_ping.cpp
diff --git a/ydb/core/fq/libs/actors/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/actors/CMakeLists.windows-x86_64.txt
index 116fc09710..9dfe2f8f63 100644
--- a/ydb/core/fq/libs/actors/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/fq/libs/actors/CMakeLists.windows-x86_64.txt
@@ -29,10 +29,12 @@ target_link_libraries(fq-libs-actors PUBLIC
libs-actors-logging
fq-libs-checkpointing
fq-libs-checkpointing_common
- fq-libs-db_id_async_resolver_impl
fq-libs-common
+ libs-compute-common
+ libs-compute-ydb
fq-libs-control_plane_storage
libs-control_plane_storage-events
+ fq-libs-db_id_async_resolver_impl
fq-libs-db_schema
fq-libs-events
fq-libs-grpc
@@ -47,14 +49,9 @@ target_link_libraries(fq-libs-actors PUBLIC
library-yql-ast
yql-core-facade
core-services-mounts
+ dq-integration-transform
library-yql-minikql
yql-minikql-comp_nodes
- common-token_accessor-client
- yql-public-issue
- public-issue-protos
- yql-sql-settings
- yql-utils-actor_log
- dq-integration-transform
providers-clickhouse-provider
providers-common-codec
providers-common-comp_nodes
@@ -62,6 +59,7 @@ target_link_libraries(fq-libs-actors PUBLIC
providers-common-metrics
providers-common-provider
common-schema-mkql
+ common-token_accessor-client
providers-common-udf_resolve
providers-dq-actors
providers-dq-common
@@ -74,9 +72,15 @@ target_link_libraries(fq-libs-actors PUBLIC
providers-pq-task_meta
providers-s3-provider
providers-ydb-provider
+ yql-public-issue
+ public-issue-protos
+ yql-sql-settings
library-yql-utils
+ yql-utils-actor_log
api-protos
public-lib-fq
+ client-draft-ydb_query
+ cpp-client-ydb_operation
cpp-client-ydb_table
)
target_sources(fq-libs-actors PRIVATE
@@ -86,14 +90,12 @@ target_sources(fq-libs-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/nodes_health_check.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/nodes_manager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/pending_fetcher.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/pinger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/proxy.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/proxy_private.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter_resources.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter_resources.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/result_writer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/run_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/run_actor_params.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/table_bindings_from_bindings.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/task_get.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/task_ping.cpp
diff --git a/ydb/core/fq/libs/actors/pending_fetcher.cpp b/ydb/core/fq/libs/actors/pending_fetcher.cpp
index 4abcf3ed50..8def74bf71 100644
--- a/ydb/core/fq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/fq/libs/actors/pending_fetcher.cpp
@@ -46,11 +46,14 @@
#include <ydb/core/fq/libs/common/compression.h>
#include <ydb/core/fq/libs/common/entity_id.h>
#include <ydb/core/fq/libs/common/util.h>
-#include <ydb/core/fq/libs/events/events.h>
+#include <ydb/core/fq/libs/compute/common/config.h>
+#include <ydb/core/fq/libs/compute/ydb/actors_factory.h>
+#include <ydb/core/fq/libs/compute/ydb/ydb_run_actor.h>
#include <ydb/core/fq/libs/config/protos/fq_config.pb.h>
#include <ydb/core/fq/libs/config/protos/pinger.pb.h>
#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h>
#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
+#include <ydb/core/fq/libs/events/events.h>
#include <ydb/core/fq/libs/private_client/internal_service.h>
#include <library/cpp/actors/core/log.h>
@@ -138,6 +141,7 @@ public:
, TenantName(tenantName)
, InternalServiceId(MakeInternalServiceActorId())
, Monitoring(monitoring)
+ , ComputeConfig(config.GetCompute())
{
Y_ENSURE(GetYqlDefaultModuleResolverWithContext(ModuleResolver));
}
@@ -380,10 +384,15 @@ private:
NProtoInterop::CastFromProto(task.request_started_at()),
task.restart_count(),
task.job_id().value(),
- resources
+ resources,
+ task.execution_id(),
+ task.operation_id()
);
- auto runActorId = Register(CreateRunActor(SelfId(), queryCounters, std::move(params)));
+ auto runActorId =
+ ComputeConfig.GetComputeType(task) == NConfig::EComputeType::YDB
+ ? Register(CreateYdbRunActor(SelfId(), queryCounters, std::move(params), CreateActorFactory(params, queryCounters)))
+ : Register(CreateRunActor(SelfId(), queryCounters, std::move(params)));
RunActorMap[runActorId] = TRunActorInfo { .QueryId = queryId, .QueryName = task.query_name() };
if (!task.automatic()) {
@@ -444,6 +453,7 @@ private:
TString TenantName;
TActorId InternalServiceId;
NActors::TMon* Monitoring;
+ TComputeConfig ComputeConfig;
};
diff --git a/ydb/core/fq/libs/actors/proxy.h b/ydb/core/fq/libs/actors/proxy.h
index 27dde3798a..94ccd2e2f0 100644
--- a/ydb/core/fq/libs/actors/proxy.h
+++ b/ydb/core/fq/libs/actors/proxy.h
@@ -1,8 +1,8 @@
#pragma once
-#include "run_actor_params.h"
#include <ydb/core/mon/mon.h>
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
#include <ydb/core/fq/libs/config/protos/pinger.pb.h>
#include <ydb/core/fq/libs/events/events.h>
#include <ydb/core/fq/libs/private_client/private_client.h>
@@ -75,18 +75,6 @@ NActors::IActor* CreateResultWriter(
const TInstant& deadline,
ui64 resultBytesLimit);
-NActors::IActor* CreatePingerActor(
- const TString& tenantName,
- const NYdb::NFq::TScope& scope,
- const TString& userId,
- const TString& id,
- const TString& owner,
- const NActors::TActorId parent,
- const NConfig::TPingerConfig& config,
- TInstant deadline,
- const ::NYql::NCommon::TServiceCounters& queryCounters,
- TInstant createdAt);
-
NActors::IActor* CreateRateLimiterResourceCreator(
const NActors::TActorId& parent,
const TString& ownerId,
diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp
index 5eff64fb87..c69cae91bc 100644
--- a/ydb/core/fq/libs/actors/run_actor.cpp
+++ b/ydb/core/fq/libs/actors/run_actor.cpp
@@ -49,16 +49,17 @@
#include <ydb/core/protos/services.pb.h>
#include <ydb/core/fq/libs/actors/nodes_manager.h>
+#include <ydb/core/fq/libs/checkpoint_storage/storage_service.h>
+#include <ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h>
+#include <ydb/core/fq/libs/checkpointing_common/defs.h>
#include <ydb/core/fq/libs/common/compression.h>
#include <ydb/core/fq/libs/common/entity_id.h>
+#include <ydb/core/fq/libs/compute/common/pinger.h>
#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h>
#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
#include <ydb/core/fq/libs/control_plane_storage/util.h>
#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
#include <ydb/core/fq/libs/gateway/empty_gateway.h>
-#include <ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h>
-#include <ydb/core/fq/libs/checkpointing_common/defs.h>
-#include <ydb/core/fq/libs/checkpoint_storage/storage_service.h>
#include <ydb/core/fq/libs/private_client/events.h>
#include <ydb/core/fq/libs/private_client/private_client.h>
#include <ydb/core/fq/libs/rate_limiter/utils/path.h>
@@ -1249,7 +1250,7 @@ private:
if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) {
LOG_E("StatusCode == NYql::NDqProto::StatusIds::UNSPECIFIED, it is not expected, the query will be failed.");
}
-
+
if (statusCode != NYql::NDqProto::StatusIds::SUCCESS) {
// Error
ResignQuery(statusCode);
diff --git a/ydb/core/fq/libs/compute/CMakeLists.txt b/ydb/core/fq/libs/compute/CMakeLists.txt
new file mode 100644
index 0000000000..5809a856aa
--- /dev/null
+++ b/ydb/core/fq/libs/compute/CMakeLists.txt
@@ -0,0 +1,10 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(common)
+add_subdirectory(ydb)
diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/compute/common/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..470f41558c
--- /dev/null
+++ b/ydb/core/fq/libs/compute/common/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,25 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(libs-compute-common)
+target_compile_options(libs-compute-common PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(libs-compute-common PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-config-protos
+ fq-libs-grpc
+ fq-libs-shared_resources
+ providers-dq-provider
+)
+target_sources(libs-compute-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/common/pinger.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/common/run_actor_params.cpp
+)
diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/compute/common/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..ae037da489
--- /dev/null
+++ b/ydb/core/fq/libs/compute/common/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,26 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(libs-compute-common)
+target_compile_options(libs-compute-common PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(libs-compute-common PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-config-protos
+ fq-libs-grpc
+ fq-libs-shared_resources
+ providers-dq-provider
+)
+target_sources(libs-compute-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/common/pinger.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/common/run_actor_params.cpp
+)
diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/compute/common/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..ae037da489
--- /dev/null
+++ b/ydb/core/fq/libs/compute/common/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,26 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(libs-compute-common)
+target_compile_options(libs-compute-common PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(libs-compute-common PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-config-protos
+ fq-libs-grpc
+ fq-libs-shared_resources
+ providers-dq-provider
+)
+target_sources(libs-compute-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/common/pinger.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/common/run_actor_params.cpp
+)
diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.txt b/ydb/core/fq/libs/compute/common/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/fq/libs/compute/common/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/compute/common/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..470f41558c
--- /dev/null
+++ b/ydb/core/fq/libs/compute/common/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,25 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(libs-compute-common)
+target_compile_options(libs-compute-common PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(libs-compute-common PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-config-protos
+ fq-libs-grpc
+ fq-libs-shared_resources
+ providers-dq-provider
+)
+target_sources(libs-compute-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/common/pinger.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/common/run_actor_params.cpp
+)
diff --git a/ydb/core/fq/libs/compute/common/config.h b/ydb/core/fq/libs/compute/common/config.h
new file mode 100644
index 0000000000..c2b02c7ea1
--- /dev/null
+++ b/ydb/core/fq/libs/compute/common/config.h
@@ -0,0 +1,35 @@
+#pragma once
+
+#include <ydb/core/fq/libs/config/protos/compute.pb.h>
+#include <ydb/core/fq/libs/protos/fq_private.pb.h>
+
+namespace NFq {
+
+class TComputeConfig {
+public:
+ explicit TComputeConfig(const NFq::NConfig::TComputeConfig& computeConfig)
+ : ComputeConfig(computeConfig)
+ {}
+
+ NFq::NConfig::EComputeType GetComputeType(const Fq::Private::GetTaskResult::Task& task) const {
+ for (const auto& rule: ComputeConfig.GetComputeMapping().GetRule()) {
+ const bool isMatched = AllOf(rule.GetKey(),
+ [&task](const auto& key) {
+ switch (key.key_case()) {
+ case NConfig::TComputeMappingRuleKey::kQueryType:
+ return key.GetQueryType() == task.query_type();
+ case NConfig::TComputeMappingRuleKey::KEY_NOT_SET:
+ return false;
+ }
+ });
+ if (isMatched) {
+ return rule.GetCompute();
+ }
+ }
+ return NFq::NConfig::EComputeType::IN_PLACE;
+ }
+private:
+ NFq::NConfig::TComputeConfig ComputeConfig;
+};
+
+} /* NFq */
diff --git a/ydb/core/fq/libs/compute/common/metrics.h b/ydb/core/fq/libs/compute/common/metrics.h
new file mode 100644
index 0000000000..b13e4c81e7
--- /dev/null
+++ b/ydb/core/fq/libs/compute/common/metrics.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+namespace NFq {
+
+struct TComputeRequestCounters: public virtual TThrRefBase {
+ const TString Name;
+
+ ::NMonitoring::TDynamicCounterPtr Counters;
+ ::NMonitoring::TDynamicCounters::TCounterPtr InFly;
+ ::NMonitoring::TDynamicCounters::TCounterPtr Ok;
+ ::NMonitoring::TDynamicCounters::TCounterPtr Error;
+ ::NMonitoring::TDynamicCounters::TCounterPtr Retry;
+ ::NMonitoring::THistogramPtr LatencyMs;
+
+ explicit TComputeRequestCounters(const TString& name, const ::NMonitoring::TDynamicCounterPtr& counters = nullptr)
+ : Name(name)
+ , Counters(counters)
+ { }
+
+ void Register(const ::NMonitoring::TDynamicCounterPtr& counters) {
+ Counters = counters;
+ Register();
+ }
+
+ void Register() {
+ ::NMonitoring::TDynamicCounterPtr subgroup = Counters->GetSubgroup("request", Name);
+ InFly = subgroup->GetCounter("InFly", false);
+ Ok = subgroup->GetCounter("Ok", true);
+ Error = subgroup->GetCounter("Error", true);
+ Retry = subgroup->GetCounter("Retry", true);
+ LatencyMs = subgroup->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
+ }
+
+private:
+ static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() {
+ return ::NMonitoring::ExplicitHistogram({0, 1, 2, 5, 10, 20, 50, 100, 500, 1000, 2000, 5000, 10000, 30000, 50000, 500000});
+ }
+};
+
+using TComputeRequestCountersPtr = TIntrusivePtr<TComputeRequestCounters>;
+
+} /* NFq */
diff --git a/ydb/core/fq/libs/actors/pinger.cpp b/ydb/core/fq/libs/compute/common/pinger.cpp
index c9fefa9479..e4eeb9fe4f 100644
--- a/ydb/core/fq/libs/actors/pinger.cpp
+++ b/ydb/core/fq/libs/compute/common/pinger.cpp
@@ -1,4 +1,4 @@
-#include "proxy.h"
+#include "pinger.h"
#include <ydb/core/fq/libs/config/protos/pinger.pb.h>
#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h>
@@ -148,7 +148,8 @@ public:
const NConfig::TPingerConfig& config,
TInstant deadline,
const ::NYql::NCommon::TServiceCounters& queryCounters,
- TInstant createdAt)
+ TInstant createdAt,
+ bool replyToSender)
: Config(config)
, TenantName(tenantName)
, Scope(scope)
@@ -160,6 +161,7 @@ public:
, QueryCounters(queryCounters)
, CreatedAt(createdAt)
, InternalServiceId(MakeInternalServiceActorId())
+ , ReplyToSender(replyToSender)
{
}
@@ -339,7 +341,7 @@ private:
if (continueLeaseRequest) {
ScheduleNextPing();
} else {
- Send(Parent, new TEvents::TEvForwardPingResponse(true, action), 0, ev->Cookie);
+ Send(ReplyToSender ? ForwardRequests.front().Request->Sender : Parent, new TEvents::TEvForwardPingResponse(true, action), 0, ev->Cookie);
ForwardRequests.pop_front();
// Process next forward ping request.
@@ -357,7 +359,13 @@ private:
}
LOG_E("Ping response error: " << errorMessage << ". Retried " << retryStateForLogging->GetRetriesCount() << " times during " << retryStateForLogging->GetRetryTime(now));
auto action = ev->Get()->Status.IsSuccess() ? ev->Get()->Result.action() : FederatedQuery::QUERY_ACTION_UNSPECIFIED;
- Send(Parent, new TEvents::TEvForwardPingResponse(false, action), 0, ev->Cookie);
+ if (ReplyToSender) {
+ for (const auto& forwardRequest: ForwardRequests) {
+ Send(forwardRequest.Request->Sender, new TEvents::TEvForwardPingResponse(false, action), 0, ev->Cookie);
+ }
+ } else {
+ Send(Parent, new TEvents::TEvForwardPingResponse(false, action), 0, ev->Cookie);
+ }
FatalError = true;
ForwardRequests.clear();
}
@@ -437,6 +445,7 @@ private:
TSchedulerCookieHolder SchedulerCookieHolder;
TActorId InternalServiceId;
+ bool ReplyToSender = false;
};
IActor* CreatePingerActor(
@@ -449,7 +458,8 @@ IActor* CreatePingerActor(
const NConfig::TPingerConfig& config,
TInstant deadline,
const ::NYql::NCommon::TServiceCounters& queryCounters,
- TInstant createdAt)
+ TInstant createdAt,
+ bool replyToSender)
{
return new TPingerActor(
tenantName,
@@ -461,7 +471,8 @@ IActor* CreatePingerActor(
config,
deadline,
queryCounters,
- createdAt);
+ createdAt,
+ replyToSender);
}
} /* NFq */
diff --git a/ydb/core/fq/libs/compute/common/pinger.h b/ydb/core/fq/libs/compute/common/pinger.h
new file mode 100644
index 0000000000..50567377dc
--- /dev/null
+++ b/ydb/core/fq/libs/compute/common/pinger.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <ydb/core/fq/libs/config/protos/pinger.pb.h>
+#include <ydb/library/yql/providers/common/metrics/service_counters.h>
+#include <ydb/public/lib/fq/scope.h>
+
+namespace NFq {
+
+NActors::IActor* CreatePingerActor(
+ const TString& tenantName,
+ const NYdb::NFq::TScope& scope,
+ const TString& userId,
+ const TString& id,
+ const TString& owner,
+ const NActors::TActorId parent,
+ const NConfig::TPingerConfig& config,
+ TInstant deadline,
+ const ::NYql::NCommon::TServiceCounters& queryCounters,
+ TInstant createdAt,
+ bool replyToSender = false);
+
+} /* NFq */
diff --git a/ydb/core/fq/libs/compute/common/retry_actor.h b/ydb/core/fq/libs/compute/common/retry_actor.h
new file mode 100644
index 0000000000..8ffceb6357
--- /dev/null
+++ b/ydb/core/fq/libs/compute/common/retry_actor.h
@@ -0,0 +1,132 @@
+#pragma once
+
+#include "metrics.h"
+
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/retry/retry_policy.h>
+
+namespace NFq {
+
+template<typename TRequest, typename TResponse, typename ...TArgs>
+class TRetryActor : public NActors::TActorBootstrapped<TRetryActor<TRequest, TResponse, TArgs...>> {
+public:
+ using TBase = NActors::TActorBootstrapped<TRetryActor<TRequest, TResponse, TArgs...>>;
+ using TBase::Become;
+ using TBase::PassAway;
+ using TBase::SelfId;
+ using TBase::Send;
+
+ using IRetryPolicy = IRetryPolicy<const typename TResponse::TPtr&>;
+
+ TRetryActor(const TComputeRequestCountersPtr& counters, const NActors::TActorId& sender, const NActors::TActorId& recipient, const TArgs&... args)
+ : Sender(sender)
+ , Recipient(recipient)
+ , CreateMessage([=]() {
+ return new TRequest(args...);
+ })
+ , RetryState(GetRetryPolicy()->CreateRetryState())
+ , Delay(TDuration::Zero())
+ , StartTime(TInstant::Now())
+ , Counters(counters)
+ {}
+
+ TRetryActor(const TComputeRequestCountersPtr& counters, const TDuration& delay, const NActors::TActorId& sender, const NActors::TActorId& recipient, const TArgs&... args)
+ : Sender(sender)
+ , Recipient(recipient)
+ , CreateMessage([=]() {
+ return new TRequest(args...);
+ })
+ , RetryState(GetRetryPolicy()->CreateRetryState())
+ , Delay(delay)
+ , StartTime(TInstant::Now())
+ , Counters(counters)
+ {}
+
+ void Bootstrap() {
+ Counters->InFly->Inc();
+ Become(&TRetryActor::StateFunc);
+ NActors::TActivationContext::Schedule(Delay, new NActors::IEventHandle(Recipient, static_cast<const NActors::TActorId&>(SelfId()), CreateMessage()));
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TResponse, Handle);
+ )
+
+ void Handle(const typename TResponse::TPtr& ev) {
+ const auto& response = *ev.Get()->Get();
+ auto delay = RetryState->GetNextRetryDelay(ev);
+ if (delay) {
+ Counters->Retry->Inc();
+ NActors::TActivationContext::Schedule(*delay, new NActors::IEventHandle(Recipient, static_cast<const NActors::TActorId&>(SelfId()), CreateMessage()));
+ return;
+ }
+ Counters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
+ if (response.Status != NYdb::EStatus::SUCCESS) {
+ Counters->Error->Inc();
+ } else {
+ Counters->Ok->Inc();
+ }
+ Send(ev->Forward(Sender));
+ PassAway();
+ }
+
+ static const typename IRetryPolicy::TPtr& GetRetryPolicy() {
+ static typename IRetryPolicy::TPtr policy = IRetryPolicy::GetExponentialBackoffPolicy([](const typename TResponse::TPtr& ev) {
+ const auto& status = ev->Get()->Status;
+ return RetryComputeClass(status);
+ }, TDuration::MilliSeconds(10), TDuration::MilliSeconds(200), TDuration::Seconds(30), 5);
+ return policy;
+ }
+
+ static bool IsTransportError(const NYdb::EStatus& status) {
+ return static_cast<size_t>(status) >= NYdb::TRANSPORT_STATUSES_FIRST
+ && static_cast<size_t>(status) <= NYdb::TRANSPORT_STATUSES_LAST;
+ }
+
+ static ERetryErrorClass RetryComputeClass(const NYdb::EStatus& status) {
+ if (IsTransportError(status)) {
+ return ERetryErrorClass::ShortRetry;
+ }
+
+ if (status == NYdb::EStatus::INTERNAL_ERROR
+ || status == NYdb::EStatus::UNAVAILABLE
+ || status == NYdb::EStatus::TIMEOUT
+ || status == NYdb::EStatus::BAD_SESSION
+ || status == NYdb::EStatus::SESSION_EXPIRED
+ || status == NYdb::EStatus::UNDETERMINED
+ || status == NYdb::EStatus::TRANSPORT_UNAVAILABLE
+ || status == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED
+ || status == NYdb::EStatus::CLIENT_INTERNAL_ERROR
+ || status == NYdb::EStatus::CLIENT_OUT_OF_RANGE
+ || status == NYdb::EStatus::CLIENT_DISCOVERY_FAILED) {
+ return ERetryErrorClass::ShortRetry;
+ }
+
+ if (status == NYdb::EStatus::OVERLOADED
+ || status == NYdb::EStatus::SESSION_BUSY
+ || status == NYdb::EStatus::CLIENT_RESOURCE_EXHAUSTED
+ || status == NYdb::EStatus::CLIENT_LIMITS_REACHED) {
+ return ERetryErrorClass::LongRetry;
+ }
+ return ERetryErrorClass::NoRetry;
+ }
+
+ virtual ~TRetryActor() {
+ Counters->InFly->Dec();
+ }
+
+private:
+ NActors::TActorId Sender;
+ NActors::TActorId Recipient;
+ std::function<TRequest*()> CreateMessage;
+ typename IRetryPolicy::IRetryState::TPtr RetryState;
+ TDuration Delay;
+ TInstant StartTime;
+ TComputeRequestCountersPtr Counters;
+};
+
+} /* NFq */
diff --git a/ydb/core/fq/libs/actors/run_actor_params.cpp b/ydb/core/fq/libs/compute/common/run_actor_params.cpp
index 3728a0ddbc..21ed96052e 100644
--- a/ydb/core/fq/libs/actors/run_actor_params.cpp
+++ b/ydb/core/fq/libs/compute/common/run_actor_params.cpp
@@ -49,7 +49,9 @@ TRunActorParams::TRunActorParams(
TInstant requestStartedAt,
ui32 restartCount,
const TString& jobId,
- const Fq::Private::TaskResources& resources
+ const Fq::Private::TaskResources& resources,
+ const TString& executionId,
+ const TString& operationId
)
: YqSharedResources(yqSharedResources)
, CredentialsProviderFactory(credentialsProviderFactory)
@@ -96,7 +98,32 @@ TRunActorParams::TRunActorParams(
, RestartCount(restartCount)
, JobId(jobId)
, Resources(resources)
+ , ExecutionId(executionId)
+ , OperationId(operationId, true)
{
}
+IOutputStream& operator<<(IOutputStream& out, const TRunActorParams& params) {
+ return out << "Run actors params: { QueryId: " << params.QueryId
+ << " CloudId: " << params.CloudId
+ << " UserId: " << params.UserId
+ << " Owner: " << params.Owner
+ << " PreviousQueryRevision: " << params.PreviousQueryRevision
+ << " Connections: " << params.Connections.size()
+ << " Bindings: " << params.Bindings.size()
+ << " AccountIdSignatures: " << params.AccountIdSignatures.size()
+ << " QueryType: " << FederatedQuery::QueryContent::QueryType_Name(params.QueryType)
+ << " ExecuteMode: " << FederatedQuery::ExecuteMode_Name(params.ExecuteMode)
+ << " ResultId: " << params.ResultId
+ << " StateLoadMode: " << FederatedQuery::StateLoadMode_Name(params.StateLoadMode)
+ << " StreamingDisposition: " << params.StreamingDisposition
+ << " Status: " << FederatedQuery::QueryMeta::ComputeStatus_Name(params.Status)
+ << " DqGraphs: " << params.DqGraphs.size()
+ << " DqGraphIndex: " << params.DqGraphIndex
+ << " Resource.TopicConsumers: " << params.Resources.topic_consumers().size()
+ << " ExecutionId: " << params.ExecutionId
+ << " OperationId: " << (params.OperationId.GetKind() != Ydb::TOperationId::UNUSED ? ProtoToString(params.OperationId) : "<empty>")
+ << " }";
+}
+
} /* NFq */
diff --git a/ydb/core/fq/libs/actors/run_actor_params.h b/ydb/core/fq/libs/compute/common/run_actor_params.h
index 4114f62fa6..b5f0843d60 100644
--- a/ydb/core/fq/libs/actors/run_actor_params.h
+++ b/ydb/core/fq/libs/compute/common/run_actor_params.h
@@ -1,22 +1,23 @@
#pragma once
+
#include <ydb/core/fq/libs/config/protos/common.pb.h>
-#include <ydb/core/fq/libs/config/protos/pinger.pb.h>
#include <ydb/core/fq/libs/config/protos/fq_config.pb.h>
+#include <ydb/core/fq/libs/config/protos/pinger.pb.h>
#include <ydb/core/fq/libs/events/events.h>
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>
-#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
#include <ydb/library/yql/providers/dq/worker_manager/interface/counters.h>
-#include <ydb/library/yql/providers/solomon/provider/yql_solomon_gateway.h>
#include <ydb/library/yql/providers/pq/cm_client/client.h>
+#include <ydb/library/yql/providers/solomon/provider/yql_solomon_gateway.h>
#include <ydb/public/lib/fq/scope.h>
#include <library/cpp/actors/core/actorsystem.h>
-#include <library/cpp/time_provider/time_provider.h>
#include <library/cpp/random_provider/random_provider.h>
+#include <library/cpp/time_provider/time_provider.h>
namespace NFq {
@@ -66,12 +67,16 @@ struct TRunActorParams { // TODO2 : Change name
TInstant requestStartedAt,
ui32 restartCount,
const TString& jobId,
- const Fq::Private::TaskResources& resources
+ const Fq::Private::TaskResources& resources,
+ const TString& executionId,
+ const TString& operationId
);
TRunActorParams(const TRunActorParams& params) = default;
TRunActorParams(TRunActorParams&& params) = default;
+ friend IOutputStream& operator<<(IOutputStream& out, const TRunActorParams& params);
+
TYqSharedResources::TPtr YqSharedResources;
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
NYql::IHTTPGateway::TPtr S3Gateway;
@@ -120,6 +125,8 @@ struct TRunActorParams { // TODO2 : Change name
const ui32 RestartCount;
const TString JobId;
Fq::Private::TaskResources Resources;
+ TString ExecutionId;
+ NYdb::TOperation::TOperationId OperationId;
};
} /* NFq */
diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..0dce7fd471
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,45 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(libs-compute-ydb)
+target_compile_options(libs-compute-ydb PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(libs-compute-ydb PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-protos
+ cpp-lwtrace-protos
+ libs-compute-common
+ libs-config-protos
+ libs-control_plane_storage-proto
+ libs-graph_params-proto
+ fq-libs-grpc
+ libs-quota_manager-proto
+ ydb-core-protos
+ library-db_pool-protos
+ yql-core-expr_nodes
+ yql-dq-expr_nodes
+ yql-minikql-arrow
+ dq-api-protos
+ api-grpc
+ api-grpc-draft
+ lib-operation_id-protos
+)
+target_sources(libs-compute-ydb PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/actors_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
+)
diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..cef154a1eb
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,46 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(libs-compute-ydb)
+target_compile_options(libs-compute-ydb PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(libs-compute-ydb PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-protos
+ cpp-lwtrace-protos
+ libs-compute-common
+ libs-config-protos
+ libs-control_plane_storage-proto
+ libs-graph_params-proto
+ fq-libs-grpc
+ libs-quota_manager-proto
+ ydb-core-protos
+ library-db_pool-protos
+ yql-core-expr_nodes
+ yql-dq-expr_nodes
+ yql-minikql-arrow
+ dq-api-protos
+ api-grpc
+ api-grpc-draft
+ lib-operation_id-protos
+)
+target_sources(libs-compute-ydb PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/actors_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
+)
diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..cef154a1eb
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,46 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(libs-compute-ydb)
+target_compile_options(libs-compute-ydb PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(libs-compute-ydb PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-protos
+ cpp-lwtrace-protos
+ libs-compute-common
+ libs-config-protos
+ libs-control_plane_storage-proto
+ libs-graph_params-proto
+ fq-libs-grpc
+ libs-quota_manager-proto
+ ydb-core-protos
+ library-db_pool-protos
+ yql-core-expr_nodes
+ yql-dq-expr_nodes
+ yql-minikql-arrow
+ dq-api-protos
+ api-grpc
+ api-grpc-draft
+ lib-operation_id-protos
+)
+target_sources(libs-compute-ydb PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/actors_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
+)
diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..0dce7fd471
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,45 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(libs-compute-ydb)
+target_compile_options(libs-compute-ydb PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(libs-compute-ydb PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-protos
+ cpp-lwtrace-protos
+ libs-compute-common
+ libs-config-protos
+ libs-control_plane_storage-proto
+ libs-graph_params-proto
+ fq-libs-grpc
+ libs-quota_manager-proto
+ ydb-core-protos
+ library-db_pool-protos
+ yql-core-expr_nodes
+ yql-dq-expr_nodes
+ yql-minikql-arrow
+ dq-api-protos
+ api-grpc
+ api-grpc-draft
+ lib-operation_id-protos
+)
+target_sources(libs-compute-ydb PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/actors_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
+)
diff --git a/ydb/core/fq/libs/compute/ydb/actors_factory.cpp b/ydb/core/fq/libs/compute/ydb/actors_factory.cpp
new file mode 100644
index 0000000000..6f299dd6aa
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/actors_factory.cpp
@@ -0,0 +1,87 @@
+#include "actors_factory.h"
+#include "executer_actor.h"
+#include "finalizer_actor.h"
+#include "resources_cleaner_actor.h"
+#include "result_writer_actor.h"
+#include "status_tracker_actor.h"
+#include "stopper_actor.h"
+#include "ydb_connector_actor.h"
+
+#include <ydb/core/fq/libs/compute/common/pinger.h>
+
+namespace NFq {
+
+struct TActorFactory : public IActorFactory {
+ TActorFactory(const NFq::TRunActorParams& params, const ::NYql::NCommon::TServiceCounters& counters)
+ : Params(params)
+ , Counters(counters)
+ {}
+
+ std::unique_ptr<NActors::IActor> CreatePinger(const NActors::TActorId& parent) const override {
+ return std::unique_ptr<NActors::IActor>(CreatePingerActor(
+ Params.TenantName,
+ Params.Scope,
+ Params.UserId,
+ Params.QueryId,
+ Params.Owner,
+ parent,
+ Params.Config.GetPinger(),
+ Params.Deadline,
+ Counters,
+ Params.CreatedAt,
+ true
+ ));
+ }
+
+ std::unique_ptr<NActors::IActor> CreateConnector() const override {
+ return CreateConnectorActor(Params);
+ }
+
+ std::unique_ptr<NActors::IActor> CreateExecuter(const NActors::TActorId &parent,
+ const NActors::TActorId &connector,
+ const NActors::TActorId &pinger) const override {
+ return CreateExecuterActor(Params, parent, connector, pinger, Counters);
+ }
+
+ std::unique_ptr<NActors::IActor> CreateStatusTracker(const NActors::TActorId &parent,
+ const NActors::TActorId &connector,
+ const NActors::TActorId &pinger,
+ const NYdb::TOperation::TOperationId& operationId) const override {
+ return CreateStatusTrackerActor(Params, parent, connector, pinger, operationId, Counters);
+ }
+
+ std::unique_ptr<NActors::IActor> CreateResultWriter(const NActors::TActorId& parent,
+ const NActors::TActorId& connector,
+ const NActors::TActorId& pinger,
+ const TString& executionId) const override {
+ return CreateResultWriterActor(Params, parent, connector, pinger, executionId, Counters);
+ }
+
+ std::unique_ptr<NActors::IActor> CreateResourcesCleaner(const NActors::TActorId& parent,
+ const NActors::TActorId& connector,
+ const NYdb::TOperation::TOperationId& operationId) const override {
+ return CreateResourcesCleanerActor(Params, parent, connector, operationId, Counters);
+ }
+
+ std::unique_ptr<NActors::IActor> CreateFinalizer(const NActors::TActorId& parent,
+ const NActors::TActorId& pinger,
+ NYdb::NQuery::EExecStatus execStatus) const override {
+ return CreateFinalizerActor(Params, parent, pinger, execStatus, Counters);
+ }
+
+ std::unique_ptr<NActors::IActor> CreateStopper(const NActors::TActorId& parent,
+ const NActors::TActorId& connector,
+ const NYdb::TOperation::TOperationId& operationId) const override {
+ return CreateStopperActor(Params, parent, connector, operationId, Counters);
+ }
+
+private:
+ NFq::TRunActorParams Params;
+ ::NYql::NCommon::TServiceCounters Counters;
+};
+
+IActorFactory::TPtr CreateActorFactory(const NFq::TRunActorParams& params, const ::NYql::NCommon::TServiceCounters& counters) {
+ return MakeIntrusive<TActorFactory>(params, counters);
+}
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/actors_factory.h b/ydb/core/fq/libs/compute/ydb/actors_factory.h
new file mode 100644
index 0000000000..77dd76b7e9
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/actors_factory.h
@@ -0,0 +1,43 @@
+#pragma once
+
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+
+#include <ydb/library/yql/providers/common/metrics/service_counters.h>
+
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/query.h>
+
+#include <util/generic/ptr.h>
+
+namespace NFq {
+
+struct IActorFactory : public TThrRefBase {
+ using TPtr = TIntrusivePtr<IActorFactory>;
+
+ virtual std::unique_ptr<NActors::IActor> CreatePinger(const NActors::TActorId& parent) const = 0;
+ virtual std::unique_ptr<NActors::IActor> CreateConnector() const = 0;
+
+ virtual std::unique_ptr<NActors::IActor> CreateExecuter(const NActors::TActorId &parent,
+ const NActors::TActorId &connector,
+ const NActors::TActorId &pinger) const = 0;
+ virtual std::unique_ptr<NActors::IActor> CreateStatusTracker(const NActors::TActorId &parent,
+ const NActors::TActorId &connector,
+ const NActors::TActorId &pinger,
+ const NYdb::TOperation::TOperationId& operationId) const = 0;
+ virtual std::unique_ptr<NActors::IActor> CreateResultWriter(const NActors::TActorId& parent,
+ const NActors::TActorId& connector,
+ const NActors::TActorId& pinger,
+ const TString& executionId) const = 0;
+ virtual std::unique_ptr<NActors::IActor> CreateResourcesCleaner(const NActors::TActorId& parent,
+ const NActors::TActorId& connector,
+ const NYdb::TOperation::TOperationId& operationId) const = 0;
+ virtual std::unique_ptr<NActors::IActor> CreateFinalizer(const NActors::TActorId& parent,
+ const NActors::TActorId& pinger,
+ NYdb::NQuery::EExecStatus execStatus) const = 0;
+ virtual std::unique_ptr<NActors::IActor> CreateStopper(const NActors::TActorId& parent,
+ const NActors::TActorId& connector,
+ const NYdb::TOperation::TOperationId& operationId) const = 0;
+};
+
+IActorFactory::TPtr CreateActorFactory(const TRunActorParams& params, const ::NYql::NCommon::TServiceCounters& counters);
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/base_compute_actor.h b/ydb/core/fq/libs/compute/ydb/base_compute_actor.h
new file mode 100644
index 0000000000..8148271ec2
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/base_compute_actor.h
@@ -0,0 +1,61 @@
+#pragma once
+
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+
+#include <ydb/core/fq/libs/compute/common/metrics.h>
+
+#include <ydb/library/yql/providers/common/metrics/service_counters.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/retry/retry_policy.h>
+
+namespace NFq {
+
+template<typename TDerived>
+class TBaseComputeActor : public NActors::TActorBootstrapped<TDerived> {
+public:
+ using TBase = NActors::TActorBootstrapped<TDerived>;
+ using TBase::PassAway;
+
+ TBaseComputeActor(const ::NYql::NCommon::TServiceCounters& queryCounters, const TString& stepName)
+ : Counters(MakeIntrusive<TComputeRequestCounters>("Total", queryCounters.Counters->GetSubgroup("step", stepName)))
+ , TotalStartTime(TInstant::Now())
+ {}
+
+ void Bootstrap() {
+ Counters->Register();
+ Counters->InFly->Inc();
+ AsDerived()->Start();
+ }
+
+ TDerived* AsDerived() {
+ return static_cast<TDerived*>(this);
+ }
+
+ void CompleteAndPassAway() {
+ Counters->Ok->Inc();
+ PassAway();
+ }
+
+ void FailedAndPassAway() {
+ Counters->Error->Inc();
+ PassAway();
+ }
+
+ virtual ~TBaseComputeActor() {
+ Counters->InFly->Dec();
+ Counters->LatencyMs->Collect((TInstant::Now() - TotalStartTime).MilliSeconds());
+ }
+
+ ::NMonitoring::TDynamicCounterPtr GetStepCountersSubgroup() const {
+ return Counters->Counters;
+ }
+
+private:
+ TComputeRequestCountersPtr Counters;
+ TInstant TotalStartTime;
+};
+
+} /* NFq */
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h
new file mode 100644
index 0000000000..1c041a59d2
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/events/events.h
@@ -0,0 +1,232 @@
+#pragma once
+
+#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
+#include <ydb/core/fq/libs/quota_manager/events/events.h>
+
+#include <ydb/public/api/protos/draft/fq.pb.h>
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/query.h>
+
+#include <library/cpp/actors/core/event_pb.h>
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/interconnect/events_local.h>
+
+#include <ydb/library/yql/public/issue/yql_issue.h>
+
+namespace NFq {
+
+struct TEvPrivate {
+ // Event ids
+ enum EEv : ui32 {
+ EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+
+ EvExecuteScriptRequest = EvBegin,
+ EvExecuteScriptResponse,
+ EvGetOperationRequest,
+ EvGetOperationResponse,
+ EvFetchScriptResultRequest,
+ EvFetchScriptResultResponse,
+ EvCancelOperationRequest,
+ EvCancelOperationResponse,
+ EvForgetOperationRequest,
+ EvForgetOperationResponse,
+
+ EvExecuterResponse,
+ EvStatusTrackerResponse,
+ EvResultWriterResponse,
+ EvResourcesCleanerResponse,
+ EvFinalizerResponse,
+ EvStopperResponse,
+
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
+
+ // Events
+ struct TEvExecuteScriptRequest : public NActors::TEventLocal<TEvExecuteScriptRequest, EvExecuteScriptRequest> {
+ TEvExecuteScriptRequest(TString sql, TString idempotencyKey)
+ : Sql(std::move(sql))
+ , IdempotencyKey(std::move(idempotencyKey))
+ {}
+
+ TString Sql;
+ TString IdempotencyKey;
+ };
+
+ struct TEvExecuteScriptResponse : public NActors::TEventLocal<TEvExecuteScriptResponse, EvExecuteScriptResponse> {
+ TEvExecuteScriptResponse(NYql::TIssues issues, NYdb::EStatus status)
+ : Issues(issues)
+ , Status(status)
+ {}
+
+ TEvExecuteScriptResponse(NYdb::TOperation::TOperationId operationId, const TString& executionId)
+ : OperationId(operationId)
+ , ExecutionId(executionId)
+ , Status(NYdb::EStatus::SUCCESS)
+ {}
+
+ NYdb::TOperation::TOperationId OperationId;
+ TString ExecutionId;
+ NYql::TIssues Issues;
+ NYdb::EStatus Status;
+ };
+
+ struct TEvGetOperationRequest : public NActors::TEventLocal<TEvGetOperationRequest, EvGetOperationRequest> {
+ explicit TEvGetOperationRequest(const NYdb::TOperation::TOperationId& operationId)
+ : OperationId(operationId)
+ {}
+
+ NYdb::TOperation::TOperationId OperationId;
+ };
+
+ struct TEvGetOperationResponse : public NActors::TEventLocal<TEvGetOperationResponse, EvGetOperationResponse> {
+ TEvGetOperationResponse(NYql::TIssues issues, NYdb::EStatus status)
+ : Issues(issues)
+ , Status(status)
+ {}
+
+ TEvGetOperationResponse(NYdb::NQuery::EExecStatus execStatus, NYql::TIssues issues)
+ : ExecStatus(execStatus)
+ , Issues(issues)
+ , Status(NYdb::EStatus::SUCCESS)
+ {}
+
+ NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified;
+ NYql::TIssues Issues;
+ NYdb::EStatus Status;
+ };
+
+ struct TEvFetchScriptResultRequest : public NActors::TEventLocal<TEvFetchScriptResultRequest, EvFetchScriptResultRequest> {
+ TEvFetchScriptResultRequest(int64_t rowOffset, TString executionId)
+ : RowOffset(rowOffset)
+ , ExecutionId(std::move(executionId))
+ {}
+
+ int64_t RowOffset = 0;
+ TString ExecutionId;
+ };
+
+ struct TEvFetchScriptResultResponse : public NActors::TEventLocal<TEvFetchScriptResultResponse, EvFetchScriptResultResponse> {
+ TEvFetchScriptResultResponse(NYql::TIssues issues, NYdb::EStatus status)
+ : Issues(issues)
+ , Status(status)
+ {}
+
+ explicit TEvFetchScriptResultResponse(NYdb::TResultSet resultSet)
+ : ResultSet(std::move(resultSet))
+ , Status(NYdb::EStatus::SUCCESS)
+ {}
+
+ TMaybe<NYdb::TResultSet> ResultSet;
+ NYql::TIssues Issues;
+ NYdb::EStatus Status;
+ };
+
+ struct TEvCancelOperationRequest : public NActors::TEventLocal<TEvCancelOperationRequest, EvCancelOperationRequest> {
+ explicit TEvCancelOperationRequest(const NYdb::TOperation::TOperationId& operationId)
+ : OperationId(operationId)
+ {}
+
+ NYdb::TOperation::TOperationId OperationId;
+ };
+
+ struct TEvCancelOperationResponse : public NActors::TEventLocal<TEvCancelOperationResponse, EvCancelOperationResponse> {
+ TEvCancelOperationResponse(NYql::TIssues issues, NYdb::EStatus status)
+ : Issues(issues)
+ , Status(status)
+ {}
+
+ NYql::TIssues Issues;
+ NYdb::EStatus Status;
+ };
+
+ struct TEvForgetOperationRequest : public NActors::TEventLocal<TEvForgetOperationRequest, EvForgetOperationRequest> {
+ explicit TEvForgetOperationRequest(const NYdb::TOperation::TOperationId& operationId)
+ : OperationId(operationId)
+ {}
+
+ NYdb::TOperation::TOperationId OperationId;
+ };
+
+ struct TEvForgetOperationResponse : public NActors::TEventLocal<TEvForgetOperationResponse, EvForgetOperationResponse> {
+ TEvForgetOperationResponse(NYql::TIssues issues, NYdb::EStatus status)
+ : Issues(issues)
+ , Status(status)
+ {}
+
+ NYql::TIssues Issues;
+ NYdb::EStatus Status;
+ };
+
+ struct TEvExecuterResponse : public NActors::TEventLocal<TEvExecuterResponse, EvExecuterResponse> {
+ TEvExecuterResponse(NYdb::TOperation::TOperationId operationId, const TString& executionId)
+ : OperationId(operationId)
+ , ExecutionId(executionId)
+ , Success(true)
+ {}
+
+ explicit TEvExecuterResponse(const NYql::TIssues& issues)
+ : Success(false)
+ , Issues(issues)
+ {}
+
+ NYdb::TOperation::TOperationId OperationId;
+ TString ExecutionId;
+ bool Success = true;
+ NYql::TIssues Issues;
+ };
+
+ struct TEvStatusTrackerResponse : public NActors::TEventLocal<TEvStatusTrackerResponse, EvStatusTrackerResponse> {
+ TEvStatusTrackerResponse(NYql::TIssues issues, NYdb::EStatus status, NYdb::NQuery::EExecStatus execStatus)
+ : Issues(issues)
+ , Status(status)
+ , ExecStatus(execStatus)
+ {}
+
+ NYql::TIssues Issues;
+ NYdb::EStatus Status;
+ NYdb::NQuery::EExecStatus ExecStatus;
+ };
+
+ struct TEvResultWriterResponse : public NActors::TEventLocal<TEvResultWriterResponse, EvResultWriterResponse> {
+ TEvResultWriterResponse(NYql::TIssues issues, NYdb::EStatus status)
+ : Issues(issues)
+ , Status(status)
+ {}
+
+ NYql::TIssues Issues;
+ NYdb::EStatus Status;
+ };
+
+ struct TEvResourcesCleanerResponse : public NActors::TEventLocal<TEvResourcesCleanerResponse, EvResourcesCleanerResponse> {
+ TEvResourcesCleanerResponse(NYql::TIssues issues, NYdb::EStatus status)
+ : Issues(issues)
+ , Status(status)
+ {}
+
+ NYql::TIssues Issues;
+ NYdb::EStatus Status;
+ };
+
+ struct TEvFinalizerResponse : public NActors::TEventLocal<TEvFinalizerResponse, EvFinalizerResponse> {
+ TEvFinalizerResponse(NYql::TIssues issues, NYdb::EStatus status)
+ : Issues(issues)
+ , Status(status)
+ {}
+
+ NYql::TIssues Issues;
+ NYdb::EStatus Status;
+ };
+
+ struct TEvStopperResponse : public NActors::TEventLocal<TEvStopperResponse, EvStopperResponse> {
+ TEvStopperResponse(NYql::TIssues issues, NYdb::EStatus status)
+ : Issues(issues)
+ , Status(status)
+ {}
+
+ NYql::TIssues Issues;
+ NYdb::EStatus Status;
+ };
+};
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/executer_actor.cpp b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
new file mode 100644
index 0000000000..563c3151cd
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
@@ -0,0 +1,150 @@
+#include "base_compute_actor.h"
+
+#include <ydb/core/fq/libs/common/util.h>
+#include <ydb/core/fq/libs/compute/common/metrics.h>
+#include <ydb/core/fq/libs/compute/common/retry_actor.h>
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+#include <ydb/core/fq/libs/ydb/ydb.h>
+#include <ydb/core/protos/services.pb.h>
+
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h>
+#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+
+
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ExecuterActor] QueryId: " << Params.QueryId << " " << stream)
+#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ExecuterActor] QueryId: " << Params.QueryId << " " << stream)
+#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ExecuterActor] QueryId: " << Params.QueryId << " " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ExecuterActor] QueryId: " << Params.QueryId << " " << stream)
+#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ExecuterActor] QueryId: " << Params.QueryId << " " << stream)
+
+namespace NFq {
+
+using namespace NActors;
+using namespace NFq;
+
+class TExecuterActor : public TBaseComputeActor<TExecuterActor> {
+public:
+ enum ERequestType {
+ RT_EXECUTE_SCRIPT,
+ RT_PING,
+ RT_MAX
+ };
+
+ class TCounters: public virtual TThrRefBase {
+ std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({
+ { MakeIntrusive<TComputeRequestCounters>("ExecuteScript") },
+ { MakeIntrusive<TComputeRequestCounters>("Ping") }
+ });
+
+ ::NMonitoring::TDynamicCounterPtr Counters;
+
+ public:
+ explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters)
+ : Counters(counters)
+ {
+ for (auto& request: Requests) {
+ request->Register(Counters);
+ }
+ }
+
+ TComputeRequestCountersPtr GetCounters(ERequestType type) {
+ return Requests[type];
+ }
+ };
+
+ TExecuterActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const ::NYql::NCommon::TServiceCounters& queryCounters)
+ : TBaseComputeActor(queryCounters, "Executer")
+ , Params(params)
+ , Parent(parent)
+ , Connector(connector)
+ , Pinger(pinger)
+ , Counters(GetStepCountersSubgroup())
+ {}
+
+ static constexpr char ActorName[] = "FQ_EXECUTER_ACTOR";
+
+ void Start() {
+ LOG_I("Bootstrap");
+ Become(&TExecuterActor::StateFunc);
+ SendExecuteScript();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvPrivate::TEvExecuteScriptResponse, Handle);
+ hFunc(TEvents::TEvForwardPingResponse, Handle);
+ )
+
+ void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) {
+ auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
+ pingCounters->InFly->Dec();
+ pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
+ if (ev.Get()->Get()->Success) {
+ pingCounters->Ok->Inc();
+ LOG_I("Information about the operation id and execution id is stored. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId));
+ Send(Parent, new TEvPrivate::TEvExecuterResponse(OperationId, ExecutionId));
+ CompleteAndPassAway();
+ } else {
+ pingCounters->Error->Inc();
+ // Without the idempotency key, we lose the running operation here
+ LOG_E("Error saving information about the operation id and execution id. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId));
+ Send(Parent, new TEvPrivate::TEvExecuterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the operation id and execution id. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)}}));
+ FailedAndPassAway();
+ }
+ }
+
+ void Handle(const TEvPrivate::TEvExecuteScriptResponse::TPtr& ev) {
+ const auto& response = *ev.Get()->Get();
+ if (response.Status != NYdb::EStatus::SUCCESS) {
+ LOG_E("Can't execute script: " << ev->Get()->Issues.ToOneLineString());
+ Send(Parent, new TEvPrivate::TEvExecuterResponse(ev->Get()->Issues));
+ FailedAndPassAway();
+ return;
+ }
+ ExecutionId = response.ExecutionId;
+ OperationId = response.OperationId;
+ LOG_I("Execution has been created. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId));
+ SendPingTask();
+ }
+
+ void SendExecuteScript() {
+ Register(new TRetryActor<TEvPrivate::TEvExecuteScriptRequest, TEvPrivate::TEvExecuteScriptResponse, TString, TString>(Counters.GetCounters(ERequestType::RT_EXECUTE_SCRIPT), SelfId(), Connector, Params.Sql, Params.JobId));
+ }
+
+ void SendPingTask() {
+ Fq::Private::PingTaskRequest pingTaskRequest;
+ pingTaskRequest.set_execution_id(ExecutionId);
+ pingTaskRequest.set_operation_id(ProtoToString(OperationId));
+ pingTaskRequest.set_status(::FederatedQuery::QueryMeta::RUNNING);
+ auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
+ pingCounters->InFly->Inc();
+ StartTime = TInstant::Now();
+ Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
+ }
+
+private:
+ TRunActorParams Params;
+ TActorId Parent;
+ TActorId Connector;
+ TActorId Pinger;
+ NYdb::TOperation::TOperationId OperationId;
+ TString ExecutionId;
+ TCounters Counters;
+ TInstant StartTime;
+};
+
+std::unique_ptr<NActors::IActor> CreateExecuterActor(const TRunActorParams& params,
+ const TActorId& parent,
+ const TActorId& connector,
+ const TActorId& pinger,
+ const ::NYql::NCommon::TServiceCounters& queryCounters) {
+ return std::make_unique<TExecuterActor>(params, parent, connector, pinger, queryCounters);
+}
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/executer_actor.h b/ydb/core/fq/libs/compute/ydb/executer_actor.h
new file mode 100644
index 0000000000..7d50f267ff
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/executer_actor.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+
+#include <ydb/library/yql/providers/common/metrics/service_counters.h>
+
+#include <library/cpp/actors/core/actor.h>
+
+namespace NFq {
+
+std::unique_ptr<NActors::IActor> CreateExecuterActor(const TRunActorParams& params,
+ const NActors::TActorId& parent,
+ const NActors::TActorId& connector,
+ const NActors::TActorId& pinger,
+ const ::NYql::NCommon::TServiceCounters& queryCounters);
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp b/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
new file mode 100644
index 0000000000..137169651c
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
@@ -0,0 +1,124 @@
+#include "base_compute_actor.h"
+
+#include <ydb/core/fq/libs/common/util.h>
+#include <ydb/core/fq/libs/compute/common/metrics.h>
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+#include <ydb/core/fq/libs/ydb/ydb.h>
+#include <ydb/core/protos/services.pb.h>
+
+#include <ydb/library/yql/providers/common/metrics/service_counters.h>
+
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h>
+#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+
+
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Finalizer] QueryId: " << Params.QueryId << " " << stream)
+#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Finalizer] QueryId: " << Params.QueryId << " " << stream)
+#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Finalizer] QueryId: " << Params.QueryId << " " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Finalizer] QueryId: " << Params.QueryId << " " << stream)
+#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Finalizer] QueryId: " << Params.QueryId << " " << stream)
+
+namespace NFq {
+
+using namespace NActors;
+using namespace NFq;
+
+class TFinalizerActor : public TBaseComputeActor<TFinalizerActor> {
+public:
+ enum ERequestType {
+ RT_PING,
+ RT_MAX
+ };
+
+ class TCounters: public virtual TThrRefBase {
+ std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({
+ { MakeIntrusive<TComputeRequestCounters>("Ping") }
+ });
+
+ ::NMonitoring::TDynamicCounterPtr Counters;
+
+ public:
+ explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters)
+ : Counters(counters)
+ {
+ for (auto& request: Requests) {
+ request->Register(Counters);
+ }
+ }
+
+ TComputeRequestCountersPtr GetCounters(ERequestType type) {
+ return Requests[type];
+ }
+ };
+
+ TFinalizerActor(const TRunActorParams& params, const TActorId& parent, const TActorId& pinger, NYdb::NQuery::EExecStatus execStatus, const ::NYql::NCommon::TServiceCounters& queryCounters)
+ : TBaseComputeActor(queryCounters, "Finalizer")
+ , Params(params)
+ , Parent(parent)
+ , Pinger(pinger)
+ , ExecStatus(execStatus)
+ , Counters(GetStepCountersSubgroup())
+ , StartTime(TInstant::Now())
+ {}
+
+ static constexpr char ActorName[] = "FQ_FINALIZER_ACTOR";
+
+ void Start() {
+ LOG_I("Start finalizer actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status));
+ auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
+ pingCounters->InFly->Inc();
+ Become(&TFinalizerActor::StateFunc);
+ Fq::Private::PingTaskRequest pingTaskRequest;
+ if (ExecStatus == NYdb::NQuery::EExecStatus::Completed || Params.Status == FederatedQuery::QueryMeta::COMPLETING) {
+ pingTaskRequest.mutable_result_id()->set_value(Params.ResultId);
+ }
+ pingTaskRequest.set_status(ExecStatus == NYdb::NQuery::EExecStatus::Completed || Params.Status == FederatedQuery::QueryMeta::COMPLETING ? ::FederatedQuery::QueryMeta::COMPLETED : ::FederatedQuery::QueryMeta::FAILED);
+ Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest, true));
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvents::TEvForwardPingResponse, Handle);
+ )
+
+ void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) {
+ auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
+ pingCounters->InFly->Dec();
+ pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
+ if (ev.Get()->Get()->Success) {
+ pingCounters->Ok->Inc();
+ LOG_I("Query moved to terminal state ");
+ Send(Parent, new TEvPrivate::TEvFinalizerResponse({}, NYdb::EStatus::SUCCESS));
+ CompleteAndPassAway();
+ } else {
+ pingCounters->Error->Inc();
+ LOG_E("Error moving the query to the terminal state");
+ Send(Parent, new TEvPrivate::TEvFinalizerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error moving the query to the terminal state"}}, NYdb::EStatus::INTERNAL_ERROR));
+ FailedAndPassAway();
+ }
+ }
+
+private:
+ TRunActorParams Params;
+ TActorId Parent;
+ TActorId Pinger;
+ NYdb::NQuery::EExecStatus ExecStatus;
+ TCounters Counters;
+ TInstant StartTime;
+};
+
+std::unique_ptr<NActors::IActor> CreateFinalizerActor(const TRunActorParams& params,
+ const TActorId& parent,
+ const TActorId& pinger,
+ NYdb::NQuery::EExecStatus execStatus,
+ const ::NYql::NCommon::TServiceCounters& queryCounters) {
+ return std::make_unique<TFinalizerActor>(params, parent, pinger, execStatus, queryCounters);
+}
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/finalizer_actor.h b/ydb/core/fq/libs/compute/ydb/finalizer_actor.h
new file mode 100644
index 0000000000..1435328285
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/finalizer_actor.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+
+#include <ydb/library/yql/providers/common/metrics/service_counters.h>
+
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/query.h>
+
+#include <library/cpp/actors/core/actor.h>
+
+namespace NFq {
+
+std::unique_ptr<NActors::IActor> CreateFinalizerActor(const TRunActorParams& params,
+ const NActors::TActorId& parent,
+ const NActors::TActorId& pinger,
+ NYdb::NQuery::EExecStatus execStatus,
+ const ::NYql::NCommon::TServiceCounters& queryCounters);
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp b/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp
new file mode 100644
index 0000000000..90c0fda4ca
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp
@@ -0,0 +1,111 @@
+#include "base_compute_actor.h"
+#include "resources_cleaner_actor.h"
+
+#include <ydb/core/fq/libs/common/util.h>
+#include <ydb/core/fq/libs/compute/common/metrics.h>
+#include <ydb/core/fq/libs/compute/common/retry_actor.h>
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+#include <ydb/core/fq/libs/ydb/ydb.h>
+#include <ydb/core/protos/services.pb.h>
+
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h>
+#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+
+
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResourcesCleaner] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
+#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResourcesCleaner] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
+#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResourcesCleaner] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResourcesCleaner] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
+#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResourcesCleaner] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
+
+namespace NFq {
+
+using namespace NActors;
+using namespace NFq;
+
+class TResourcesCleanerActor : public TBaseComputeActor<TResourcesCleanerActor> {
+public:
+ enum ERequestType {
+ RT_FORGET_OPERATION,
+ RT_MAX
+ };
+
+ class TCounters: public virtual TThrRefBase {
+ std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({
+ { MakeIntrusive<TComputeRequestCounters>("ForgetOperation") }
+ });
+
+ ::NMonitoring::TDynamicCounterPtr Counters;
+
+ public:
+ explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters)
+ : Counters(counters)
+ {
+ for (auto& request: Requests) {
+ request->Register(Counters);
+ }
+ }
+
+ TComputeRequestCountersPtr GetCounters(ERequestType type) {
+ return Requests[type];
+ }
+ };
+
+ TResourcesCleanerActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const NYdb::TOperation::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters)
+ : TBaseComputeActor(queryCounters, "ResourcesCleaner")
+ , Params(params)
+ , Parent(parent)
+ , Connector(connector)
+ , OperationId(operationId)
+ , Counters(GetStepCountersSubgroup())
+ {}
+
+ static constexpr char ActorName[] = "FQ_RESOURCES_CLEANER_ACTOR";
+
+ void Start() {
+ LOG_I("Start resources cleaner actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status));
+ Become(&TResourcesCleanerActor::StateFunc);
+ Register(new TRetryActor<TEvPrivate::TEvForgetOperationRequest, TEvPrivate::TEvForgetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_FORGET_OPERATION), SelfId(), Connector, OperationId));
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvPrivate::TEvForgetOperationResponse, Handle);
+ )
+
+ void Handle(const TEvPrivate::TEvForgetOperationResponse::TPtr& ev) {
+ const auto& response = *ev.Get()->Get();
+ if (response.Status != NYdb::EStatus::SUCCESS) {
+ LOG_E("Can't forget operation: " << ev->Get()->Issues.ToOneLineString());
+ Send(Parent, new TEvPrivate::TEvResourcesCleanerResponse(ev->Get()->Issues, ev->Get()->Status));
+ FailedAndPassAway();
+ return;
+ }
+ LOG_I("Operation successfully forgotten");
+ Send(Parent, new TEvPrivate::TEvResourcesCleanerResponse({}, NYdb::EStatus::SUCCESS));
+ CompleteAndPassAway();
+ }
+
+private:
+ TRunActorParams Params;
+ TActorId Parent;
+ TActorId Connector;
+ NYdb::TOperation::TOperationId OperationId;
+ TCounters Counters;
+};
+
+std::unique_ptr<NActors::IActor> CreateResourcesCleanerActor(const TRunActorParams& params,
+ const TActorId& parent,
+ const TActorId& connector,
+ const NYdb::TOperation::TOperationId& operationId,
+ const ::NYql::NCommon::TServiceCounters& queryCounters) {
+ return std::make_unique<TResourcesCleanerActor>(params, parent, connector, operationId, queryCounters);
+}
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.h b/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.h
new file mode 100644
index 0000000000..0d4e796b43
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+
+#include <ydb/library/yql/providers/common/metrics/service_counters.h>
+
+#include <library/cpp/actors/core/actor.h>
+
+namespace NFq {
+
+std::unique_ptr<NActors::IActor> CreateResourcesCleanerActor(const TRunActorParams& params,
+ const NActors::TActorId& parent,
+ const NActors::TActorId& connector,
+ const NYdb::TOperation::TOperationId& operationId,
+ const ::NYql::NCommon::TServiceCounters& queryCounters);
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
new file mode 100644
index 0000000000..6f46f98cf8
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
@@ -0,0 +1,196 @@
+#include "base_compute_actor.h"
+
+#include <ydb/core/fq/libs/common/util.h>
+#include <ydb/core/fq/libs/compute/common/metrics.h>
+#include <ydb/core/fq/libs/compute/common/retry_actor.h>
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+#include <ydb/core/fq/libs/private_client/events.h>
+#include <ydb/core/fq/libs/ydb/ydb.h>
+#include <ydb/core/protos/services.pb.h>
+
+#include <ydb/library/yql/providers/common/metrics/service_counters.h>
+
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h>
+#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/protobuf/interop/cast.h>
+
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResultWriter] QueryId: " << Params.QueryId << " ExecutionId: " << ExecutionId << " " << stream)
+#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResultWriter] QueryId: " << Params.QueryId << " ExecutionId: " << ExecutionId << " " << stream)
+#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResultWriter] QueryId: " << Params.QueryId << " ExecutionId: " << ExecutionId << " " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResultWriter] QueryId: " << Params.QueryId << " ExecutionId: " << ExecutionId << " " << stream)
+#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResultWriter] QueryId: " << Params.QueryId << " ExecutionId: " << ExecutionId << " " << stream)
+
+namespace NFq {
+
+using namespace NActors;
+using namespace NFq;
+
+class TResultWriterActor : public TBaseComputeActor<TResultWriterActor> {
+public:
+ enum ERequestType {
+ RT_FETCH_SCRIPT_RESULT,
+ RT_WRITE_RESULT,
+ RT_PING,
+ RT_MAX
+ };
+
+ class TCounters: public virtual TThrRefBase {
+ std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({
+ { MakeIntrusive<TComputeRequestCounters>("FetchScriptResult") },
+ { MakeIntrusive<TComputeRequestCounters>("WriteResult") },
+ { MakeIntrusive<TComputeRequestCounters>("Ping") }
+ });
+
+ ::NMonitoring::TDynamicCounterPtr Counters;
+
+ public:
+ explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters)
+ : Counters(counters)
+ {
+ for (auto& request: Requests) {
+ request->Register(Counters);
+ }
+ }
+
+ TComputeRequestCountersPtr GetCounters(ERequestType type) {
+ return Requests[type];
+ }
+ };
+
+ TResultWriterActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const TString& executionId, const ::NYql::NCommon::TServiceCounters& queryCounters)
+ : TBaseComputeActor(queryCounters, "ResultWriter")
+ , Params(params)
+ , Parent(parent)
+ , Connector(connector)
+ , Pinger(pinger)
+ , ExecutionId(executionId)
+ , Counters(GetStepCountersSubgroup())
+ {}
+
+ static constexpr char ActorName[] = "FQ_RESULT_WRITER_ACTOR";
+
+ void Start() {
+ LOG_I("Start result writer actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status));
+ Become(&TResultWriterActor::StateFunc);
+ SendFetchScriptResultRequest();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvPrivate::TEvFetchScriptResultResponse, Handle);
+ hFunc(NFq::TEvInternalService::TEvWriteResultResponse, Handle);
+ hFunc(TEvents::TEvForwardPingResponse, Handle);
+ )
+
+ void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) {
+ auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
+ pingCounters->InFly->Dec();
+ pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
+ if (ev.Get()->Get()->Success) {
+ pingCounters->Ok->Inc();
+ LOG_I("The result has been moved");
+ Send(Parent, new TEvPrivate::TEvResultWriterResponse({}, NYdb::EStatus::SUCCESS));
+ CompleteAndPassAway();
+ } else {
+ pingCounters->Error->Inc();
+ LOG_E("Move result error");
+ Send(Parent, new TEvPrivate::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Move result error. ExecutionId: " << ExecutionId}}, NYdb::EStatus::INTERNAL_ERROR));
+ FailedAndPassAway();
+ }
+ }
+
+ void Handle(const TEvPrivate::TEvFetchScriptResultResponse::TPtr& ev) {
+ const auto& response = *ev.Get()->Get();
+ if (response.Status != NYdb::EStatus::SUCCESS) {
+ LOG_E("Can't fetch script result: " << ev->Get()->Issues.ToOneLineString());
+ Send(Parent, new TEvPrivate::TEvResultWriterResponse(ev->Get()->Issues, NYdb::EStatus::INTERNAL_ERROR));
+ FailedAndPassAway();
+ return;
+ }
+
+ StartTime = TInstant::Now();
+ const auto resultSetProto = NYdb::TProtoAccessor::GetProto(*response.ResultSet);
+ if (response.ResultSet->RowsCount() == 0) {
+ Fq::Private::PingTaskRequest pingTaskRequest;
+ pingTaskRequest.mutable_result_id()->set_value(Params.ResultId);
+ pingTaskRequest.set_result_set_count(1);
+ auto& resultSetMeta = *pingTaskRequest.add_result_set_meta();
+ resultSetMeta.set_rows_count(Offset);
+ for (const auto& column: resultSetProto.columns()) {
+ *resultSetMeta.add_column() = column;
+ }
+ auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
+ pingCounters->InFly->Inc();
+ Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
+ return;
+ }
+
+ auto chunk = CreateProtoRequestWithoutResultSet(Offset);
+ Offset += response.ResultSet->RowsCount();
+ *chunk.mutable_result_set() = resultSetProto;
+ auto writeResultCounters = Counters.GetCounters(ERequestType::RT_WRITE_RESULT);
+ writeResultCounters->InFly->Inc();
+ Send(NFq::MakeInternalServiceActorId(), new NFq::TEvInternalService::TEvWriteResultRequest(std::move(chunk)));
+ }
+
+ void Handle(const NFq::TEvInternalService::TEvWriteResultResponse::TPtr& ev) {
+ auto writeResultCounters = Counters.GetCounters(ERequestType::RT_WRITE_RESULT);
+ writeResultCounters->InFly->Dec();
+ writeResultCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
+ if (ev.Get()->Get()->Status.IsSuccess()) {
+ writeResultCounters->Ok->Inc();
+ LOG_I("Result successfully written for offset " << Offset);
+ SendFetchScriptResultRequest();
+ } else {
+ writeResultCounters->Error->Inc();
+ LOG_E("Error writing result for offset " << Offset);
+ Send(Parent, new TEvPrivate::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error writing result for offset " << Offset}}, NYdb::EStatus::INTERNAL_ERROR));
+ FailedAndPassAway();
+ }
+ }
+
+ void SendFetchScriptResultRequest() {
+ auto fetchScriptResultCounters = Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT);
+ fetchScriptResultCounters->InFly->Inc();
+ StartTime = TInstant::Now();
+ Register(new TRetryActor<TEvPrivate::TEvFetchScriptResultRequest, TEvPrivate::TEvFetchScriptResultResponse, int64_t, TString>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, Offset, ExecutionId));
+ }
+
+ Fq::Private::WriteTaskResultRequest CreateProtoRequestWithoutResultSet(ui64 startRowIndex) {
+ Fq::Private::WriteTaskResultRequest protoReq;
+ protoReq.set_owner_id(Params.Owner);
+ protoReq.mutable_result_id()->set_value(Params.ResultId);
+ protoReq.set_offset(startRowIndex);
+ protoReq.set_result_set_id(0);
+ protoReq.set_request_id(0);
+ *protoReq.mutable_deadline() = NProtoInterop::CastToProto(Params.Deadline);
+ return protoReq;
+ }
+
+private:
+ TRunActorParams Params;
+ TActorId Parent;
+ TActorId Connector;
+ TActorId Pinger;
+ TString ExecutionId;
+ TCounters Counters;
+ TInstant StartTime;
+ int64_t Offset = 0;
+};
+
+std::unique_ptr<NActors::IActor> CreateResultWriterActor(const TRunActorParams& params,
+ const TActorId& parent,
+ const TActorId& connector,
+ const TActorId& pinger,
+ const TString& executionId,
+ const ::NYql::NCommon::TServiceCounters& queryCounters) {
+ return std::make_unique<TResultWriterActor>(params, parent, connector, pinger, executionId, queryCounters);
+}
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/result_writer_actor.h b/ydb/core/fq/libs/compute/ydb/result_writer_actor.h
new file mode 100644
index 0000000000..82418704f4
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/result_writer_actor.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+
+#include <ydb/library/yql/providers/common/metrics/service_counters.h>
+
+#include <library/cpp/actors/core/actor.h>
+
+namespace NFq {
+
+std::unique_ptr<NActors::IActor> CreateResultWriterActor(const TRunActorParams& params,
+ const NActors::TActorId& parent,
+ const NActors::TActorId& connector,
+ const NActors::TActorId& pinger,
+ const TString& executionId,
+ const ::NYql::NCommon::TServiceCounters& queryCounters);
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
new file mode 100644
index 0000000000..19f428fe3d
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
@@ -0,0 +1,186 @@
+#include "base_compute_actor.h"
+
+#include <ydb/core/fq/libs/common/util.h>
+#include <ydb/core/fq/libs/compute/common/metrics.h>
+#include <ydb/core/fq/libs/compute/common/retry_actor.h>
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+#include <ydb/core/fq/libs/ydb/ydb.h>
+#include <ydb/core/protos/services.pb.h>
+
+#include <ydb/library/yql/providers/common/metrics/service_counters.h>
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h>
+#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+
+
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [StatusTracker] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
+#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [StatusTracker] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
+#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [StatusTracker] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [StatusTracker] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
+#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [StatusTracker] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
+
+namespace NFq {
+
+using namespace NActors;
+using namespace NFq;
+
+class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
+public:
+ using IRetryPolicy = IRetryPolicy<const TEvPrivate::TEvGetOperationResponse::TPtr&>;
+
+ enum ERequestType {
+ RT_GET_OPERATION,
+ RT_PING,
+ RT_MAX
+ };
+
+ class TCounters: public virtual TThrRefBase {
+ std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({
+ { MakeIntrusive<TComputeRequestCounters>("GetOperation") },
+ { MakeIntrusive<TComputeRequestCounters>("Ping") }
+ });
+
+ ::NMonitoring::TDynamicCounterPtr Counters;
+
+ public:
+ explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters)
+ : Counters(counters)
+ {
+ for (auto& request: Requests) {
+ request->Register(Counters);
+ }
+ }
+
+ TComputeRequestCountersPtr GetCounters(ERequestType type) {
+ return Requests[type];
+ }
+ };
+
+ TStatusTrackerActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const NYdb::TOperation::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters)
+ : TBaseComputeActor(queryCounters, "StatusTracker")
+ , Params(params)
+ , Parent(parent)
+ , Connector(connector)
+ , Pinger(pinger)
+ , OperationId(operationId)
+ , Counters(GetStepCountersSubgroup())
+ {}
+
+ static constexpr char ActorName[] = "FQ_STATUS_TRACKER";
+
+ void Start() {
+ LOG_I("Become");
+ Become(&TStatusTrackerActor::StateFunc);
+ SendGetOperation();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvPrivate::TEvGetOperationResponse, Handle);
+ hFunc(TEvents::TEvForwardPingResponse, Handle);
+ )
+
+ void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) {
+ auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
+ pingCounters->InFly->Dec();
+ pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
+ if (ev.Get()->Get()->Success) {
+ pingCounters->Ok->Inc();
+ LOG_I("Information about the status of operation is stored");
+ Send(Parent, new TEvPrivate::TEvStatusTrackerResponse(Issues, Status, ExecStatus));
+ CompleteAndPassAway();
+ } else {
+ pingCounters->Error->Inc();
+ LOG_E("Error saving information about the status of operation");
+ Send(Parent, new TEvPrivate::TEvStatusTrackerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the status of operation: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR, ExecStatus));
+ FailedAndPassAway();
+ }
+ }
+
+ void Handle(const TEvPrivate::TEvGetOperationResponse::TPtr& ev) {
+ const auto& response = *ev.Get()->Get();
+ if (response.Status != NYdb::EStatus::SUCCESS) {
+ LOG_E("Can't get operation: " << ev->Get()->Issues.ToOneLineString());
+ Send(Parent, new TEvPrivate::TEvStatusTrackerResponse(ev->Get()->Issues, ev->Get()->Status, ExecStatus));
+ FailedAndPassAway();
+ return;
+ }
+
+ StartTime = TInstant::Now();
+ LOG_D("Execution status: " << static_cast<int>(response.ExecStatus));
+ switch (response.ExecStatus) {
+ case NYdb::NQuery::EExecStatus::Unspecified:
+ case NYdb::NQuery::EExecStatus::Starting:
+ SendGetOperation(TDuration::Seconds(1));
+ break;
+ case NYdb::NQuery::EExecStatus::Aborted:
+ case NYdb::NQuery::EExecStatus::Canceled:
+ case NYdb::NQuery::EExecStatus::Failed:
+ Issues = response.Issues;
+ Status = response.Status;
+ ExecStatus = response.ExecStatus;
+ Failed();
+ break;
+ case NYdb::NQuery::EExecStatus::Completed:
+ Issues = response.Issues;
+ Status = response.Status;
+ ExecStatus = response.ExecStatus;
+ Complete();
+ break;
+ }
+ }
+
+ void SendGetOperation(const TDuration& delay = TDuration::Zero()) {
+ Register(new TRetryActor<TEvPrivate::TEvGetOperationRequest, TEvPrivate::TEvGetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_GET_OPERATION), delay, SelfId(), Connector, OperationId));
+ }
+
+ void Failed() {
+ LOG_I("Execution status: Failed, " << Status);
+ auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
+ pingCounters->InFly->Inc();
+ Fq::Private::PingTaskRequest pingTaskRequest;
+ NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues());
+ pingTaskRequest.set_status(::FederatedQuery::QueryMeta::FAILING);
+ Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
+ }
+
+ void Complete() {
+ LOG_I("Execution status: Complete" << Status);
+ auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
+ pingCounters->InFly->Inc();
+ Fq::Private::PingTaskRequest pingTaskRequest;
+ NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues());
+ pingTaskRequest.set_status(::FederatedQuery::QueryMeta::COMPLETING);
+ Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
+ }
+
+private:
+ TRunActorParams Params;
+ TActorId Parent;
+ TActorId Connector;
+ TActorId Pinger;
+ NYdb::TOperation::TOperationId OperationId;
+ TCounters Counters;
+ TInstant StartTime;
+ NYql::TIssues Issues;
+ NYdb::EStatus Status = NYdb::EStatus::SUCCESS;
+ NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified;
+};
+
+std::unique_ptr<NActors::IActor> CreateStatusTrackerActor(const TRunActorParams& params,
+ const TActorId& parent,
+ const TActorId& connector,
+ const TActorId& pinger,
+ const NYdb::TOperation::TOperationId& operationId,
+ const ::NYql::NCommon::TServiceCounters& queryCounters) {
+ return std::make_unique<TStatusTrackerActor>(params, parent, connector, pinger, operationId, queryCounters);
+}
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.h b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.h
new file mode 100644
index 0000000000..e330be962a
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+
+#include <ydb/library/yql/providers/common/metrics/service_counters.h>
+
+#include <library/cpp/actors/core/actor.h>
+
+namespace NFq {
+
+std::unique_ptr<NActors::IActor> CreateStatusTrackerActor(const TRunActorParams& params,
+ const NActors::TActorId& parent,
+ const NActors::TActorId& connector,
+ const NActors::TActorId& pinger,
+ const NYdb::TOperation::TOperationId& operationId,
+ const ::NYql::NCommon::TServiceCounters& queryCounters);
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp b/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp
new file mode 100644
index 0000000000..59e60c6128
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp
@@ -0,0 +1,111 @@
+#include "base_compute_actor.h"
+#include "resources_cleaner_actor.h"
+
+#include <ydb/core/fq/libs/common/util.h>
+#include <ydb/core/fq/libs/compute/common/metrics.h>
+#include <ydb/core/fq/libs/compute/common/retry_actor.h>
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+#include <ydb/core/fq/libs/ydb/ydb.h>
+#include <ydb/core/protos/services.pb.h>
+
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h>
+#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+
+
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Stopper] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
+#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Stopper] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
+#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Stopper] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Stopper] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
+#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Stopper] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
+
+namespace NFq {
+
+using namespace NActors;
+using namespace NFq;
+
+class TStopperActor : public TBaseComputeActor<TStopperActor> {
+public:
+ enum ERequestType {
+ RT_CANCEL_OPERATION,
+ RT_MAX
+ };
+
+ class TCounters: public virtual TThrRefBase {
+ std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({
+ { MakeIntrusive<TComputeRequestCounters>("CancelOperation") }
+ });
+
+ ::NMonitoring::TDynamicCounterPtr Counters;
+
+ public:
+ explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters)
+ : Counters(counters)
+ {
+ for (auto& request: Requests) {
+ request->Register(Counters);
+ }
+ }
+
+ TComputeRequestCountersPtr GetCounters(ERequestType type) {
+ return Requests[type];
+ }
+ };
+
+ TStopperActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const NYdb::TOperation::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters)
+ : TBaseComputeActor(queryCounters, "Stopper")
+ , Params(params)
+ , Parent(parent)
+ , Connector(connector)
+ , OperationId(operationId)
+ , Counters(GetStepCountersSubgroup())
+ {}
+
+ static constexpr char ActorName[] = "FQ_STOPPER_ACTOR";
+
+ void Start() {
+ LOG_I("Start stopper actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status));
+ Become(&TStopperActor::StateFunc);
+ Register(new TRetryActor<TEvPrivate::TEvCancelOperationRequest, TEvPrivate::TEvCancelOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_CANCEL_OPERATION), SelfId(), Connector, OperationId));
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvPrivate::TEvCancelOperationResponse, Handle);
+ )
+
+ void Handle(const TEvPrivate::TEvCancelOperationResponse::TPtr& ev) {
+ const auto& response = *ev.Get()->Get();
+ if (response.Status != NYdb::EStatus::SUCCESS) {
+ LOG_E("Can't cancel operation: " << ev->Get()->Issues.ToOneLineString());
+ Send(Parent, new TEvPrivate::TEvStopperResponse(ev->Get()->Issues, ev->Get()->Status));
+ FailedAndPassAway();
+ return;
+ }
+ LOG_I("Operation successfully canceled");
+ Send(Parent, new TEvPrivate::TEvStopperResponse({}, NYdb::EStatus::SUCCESS));
+ CompleteAndPassAway();
+ }
+
+private:
+ TRunActorParams Params;
+ TActorId Parent;
+ TActorId Connector;
+ NYdb::TOperation::TOperationId OperationId;
+ TCounters Counters;
+};
+
+std::unique_ptr<NActors::IActor> CreateStopperActor(const TRunActorParams& params,
+ const TActorId& parent,
+ const TActorId& connector,
+ const NYdb::TOperation::TOperationId& operationId,
+ const ::NYql::NCommon::TServiceCounters& queryCounters) {
+ return std::make_unique<TStopperActor>(params, parent, connector, operationId, queryCounters);
+}
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/stopper_actor.h b/ydb/core/fq/libs/compute/ydb/stopper_actor.h
new file mode 100644
index 0000000000..a66c3ba7e0
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/stopper_actor.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+
+#include <ydb/library/yql/providers/common/metrics/service_counters.h>
+
+#include <library/cpp/actors/core/actor.h>
+
+namespace NFq {
+
+std::unique_ptr<NActors::IActor> CreateStopperActor(const TRunActorParams& params,
+ const NActors::TActorId& parent,
+ const NActors::TActorId& connector,
+ const NYdb::TOperation::TOperationId& operationId,
+ const ::NYql::NCommon::TServiceCounters& queryCounters);
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
new file mode 100644
index 0000000000..164fe79513
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
@@ -0,0 +1,128 @@
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h>
+#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+#include <ydb/core/fq/libs/ydb/ydb.h>
+
+namespace NFq {
+
+using namespace NActors;
+using namespace NFq;
+
+class TYdbConnectorActor : public NActors::TActorBootstrapped<TYdbConnectorActor> {
+public:
+ explicit TYdbConnectorActor(const TRunActorParams& params)
+ : Params(params)
+ {}
+
+ void Bootstrap() {
+ auto querySettings = NFq::GetClientSettings<NYdb::NQuery::TClientSettings>(Params.Config.GetCompute().GetYdb().GetConnection(), Params.CredentialsProviderFactory);
+ QueryClient = std::make_unique<NYdb::NQuery::TQueryClient>(Params.YqSharedResources->UserSpaceYdbDriver, querySettings);
+ auto operationSettings = NFq::GetClientSettings<NYdb::TCommonClientSettings>(Params.Config.GetCompute().GetYdb().GetConnection(), Params.CredentialsProviderFactory);
+ OperationClient = std::make_unique<NYdb::NOperation::TOperationClient>(Params.YqSharedResources->UserSpaceYdbDriver, operationSettings);
+ Become(&TYdbConnectorActor::StateFunc);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvPrivate::TEvExecuteScriptRequest, Handle);
+ hFunc(TEvPrivate::TEvGetOperationRequest, Handle);
+ hFunc(TEvPrivate::TEvFetchScriptResultRequest, Handle);
+ hFunc(TEvPrivate::TEvCancelOperationRequest, Handle);
+ hFunc(TEvPrivate::TEvForgetOperationRequest, Handle);
+ cFunc(NActors::TEvents::TEvPoisonPill::EventType, PassAway);
+ )
+
+ void Handle(const TEvPrivate::TEvExecuteScriptRequest::TPtr& ev) {
+ QueryClient
+ ->ExecuteScript(ev->Get()->Sql)
+ .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) {
+ try {
+ auto response = future.ExtractValueSync();
+ if (response.Status().IsSuccess()) {
+ actorSystem->Send(recipient, new TEvPrivate::TEvExecuteScriptResponse(response.Id(), response.Metadata().ExecutionId), 0, cookie);
+ } else {
+ actorSystem->Send(recipient, new TEvPrivate::TEvExecuteScriptResponse(response.Status().GetIssues(), response.Status().GetStatus()), 0, cookie);
+ }
+ } catch (...) {
+ actorSystem->Send(recipient, new TEvPrivate::TEvExecuteScriptResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie);
+ }
+ });
+ }
+
+ void Handle(const TEvPrivate::TEvGetOperationRequest::TPtr& ev) {
+ OperationClient
+ ->Get<NYdb::NQuery::TScriptExecutionOperation>(ev->Get()->OperationId)
+ .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) {
+ try {
+ auto response = future.ExtractValueSync();
+ if (response.Status().IsSuccess()) {
+ actorSystem->Send(recipient, new TEvPrivate::TEvGetOperationResponse(response.Metadata().ExecStatus, response.Status().GetIssues()), 0, cookie);
+ } else {
+ actorSystem->Send(recipient, new TEvPrivate::TEvGetOperationResponse(response.Status().GetIssues(), response.Status().GetStatus()), 0, cookie);
+ }
+ } catch (...) {
+ actorSystem->Send(recipient, new TEvPrivate::TEvGetOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie);
+ }
+ });
+ }
+
+ void Handle(const TEvPrivate::TEvFetchScriptResultRequest::TPtr& ev) {
+ NYdb::NQuery::TFetchScriptResultsSettings settings;
+ settings.RowsOffset(ev->Get()->RowOffset);
+ QueryClient
+ ->FetchScriptResults(ev->Get()->ExecutionId, settings)
+ .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) {
+ try {
+ auto response = future.ExtractValueSync();
+ if (response.IsSuccess()) {
+ actorSystem->Send(recipient, new TEvPrivate::TEvFetchScriptResultResponse(response.ExtractResultSet()), 0, cookie);
+ } else {
+ actorSystem->Send(recipient, new TEvPrivate::TEvFetchScriptResultResponse(response.GetIssues(), response.GetStatus()), 0, cookie);
+ }
+ } catch (...) {
+ actorSystem->Send(recipient, new TEvPrivate::TEvFetchScriptResultResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie);
+ }
+ });
+ }
+
+ void Handle(const TEvPrivate::TEvCancelOperationRequest::TPtr& ev) {
+ OperationClient
+ ->Cancel(ev->Get()->OperationId)
+ .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) {
+ try {
+ auto response = future.ExtractValueSync();
+ actorSystem->Send(recipient, new TEvPrivate::TEvCancelOperationResponse(response.GetIssues(), response.GetStatus()), 0, cookie);
+ } catch (...) {
+ actorSystem->Send(recipient, new TEvPrivate::TEvCancelOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie);
+ }
+ });
+ }
+
+ void Handle(const TEvPrivate::TEvForgetOperationRequest::TPtr& ev) {
+ OperationClient
+ ->Forget(ev->Get()->OperationId)
+ .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) {
+ try {
+ auto response = future.ExtractValueSync();
+ actorSystem->Send(recipient, new TEvPrivate::TEvForgetOperationResponse(response.GetIssues(), response.GetStatus()), 0, cookie);
+ } catch (...) {
+ actorSystem->Send(recipient, new TEvPrivate::TEvForgetOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie);
+ }
+ });
+ }
+
+private:
+ TRunActorParams Params;
+ std::unique_ptr<NYdb::NQuery::TQueryClient> QueryClient;
+ std::unique_ptr<NYdb::NOperation::TOperationClient> OperationClient;
+};
+
+std::unique_ptr<NActors::IActor> CreateConnectorActor(const TRunActorParams& params) {
+ return std::make_unique<TYdbConnectorActor>(params);
+}
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.h b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.h
new file mode 100644
index 0000000000..ae1ea7accf
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+
+namespace NFq {
+
+std::unique_ptr<NActors::IActor> CreateConnectorActor(const TRunActorParams& params);
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
new file mode 100644
index 0000000000..c1da57d093
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
@@ -0,0 +1,217 @@
+#include "ydb_run_actor.h"
+
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/log.h>
+
+#include <google/protobuf/util/time_util.h>
+
+#include <util/string/split.h>
+#include <util/system/hostname.h>
+
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/actors/core/mon.h>
+#include <library/cpp/protobuf/interop/cast.h>
+#include <ydb/core/fq/libs/compute/common/pinger.h>
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+#include <ydb/core/fq/libs/control_plane_storage/util.h>
+#include <ydb/core/fq/libs/private_client/events.h>
+#include <ydb/core/fq/libs/ydb/ydb.h>
+#include <ydb/core/protos/services.pb.h>
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h>
+#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
+
+
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] QueryId: " << Params.QueryId << " " << stream)
+#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] QueryId: " << Params.QueryId << " " << stream)
+#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] QueryId: " << Params.QueryId << " " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] QueryId: " << Params.QueryId << " " << stream)
+#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] QueryId: " << Params.QueryId << " " << stream)
+
+namespace NFq {
+
+using namespace NActors;
+using namespace NFq;
+
+class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
+public:
+ explicit TYdbRunActor(
+ const TActorId& fetcherId,
+ const ::NYql::NCommon::TServiceCounters& queryCounters,
+ TRunActorParams&& params,
+ const IActorFactory::TPtr& actorFactory)
+ : FetcherId(fetcherId)
+ , Params(std::move(params))
+ , CreatedAt(Params.CreatedAt)
+ , QueryCounters(queryCounters)
+ , ActorFactory(actorFactory)
+ {}
+
+ static constexpr char ActorName[] = "YDB_RUN_ACTOR";
+
+ void Bootstrap() {
+ LOG_I("Boostrap. " << Params);
+ Pinger = Register(ActorFactory->CreatePinger(SelfId()).release());
+ Connector = Register(ActorFactory->CreateConnector().release());
+ Become(&TYdbRunActor::StateFunc);
+ Run();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvPrivate::TEvExecuterResponse, Handle);
+ hFunc(TEvPrivate::TEvStatusTrackerResponse, Handle);
+ hFunc(TEvPrivate::TEvResultWriterResponse, Handle);
+ hFunc(TEvPrivate::TEvResourcesCleanerResponse, Handle);
+ hFunc(TEvPrivate::TEvFinalizerResponse, Handle);
+ hFunc(TEvPrivate::TEvStopperResponse, Handle);
+ )
+
+ void Handle(const TEvPrivate::TEvExecuterResponse::TPtr& ev) {
+ auto& response = *ev->Get();
+ if (!response.Success) {
+ LOG_I("ExecuterResponse (failed). Issues: " << response.Issues.ToOneLineString());
+ ResignAndPassAway(response.Issues);
+ return;
+ }
+ Params.ExecutionId = response.ExecutionId;
+ Params.OperationId = response.OperationId;
+ LOG_I("ExecuterResponse (success). ExecutionId: " << Params.ExecutionId << " OperationId: " << ProtoToString(Params.OperationId));
+ Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release());
+ }
+
+ void Handle(const TEvPrivate::TEvStatusTrackerResponse::TPtr& ev) {
+ auto& response = *ev->Get();
+ if (response.Status != NYdb::EStatus::SUCCESS) {
+ LOG_I("StatusTrackerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
+ ResignAndPassAway(response.Issues);
+ return;
+ }
+ ExecStatus = response.ExecStatus;
+ LOG_I("StatusTrackerResponse (success) " << response.Status << " ExecStatus: " << static_cast<int>(response.ExecStatus) << " Issues: " << response.Issues.ToOneLineString());
+ if (response.ExecStatus == NYdb::NQuery::EExecStatus::Completed) {
+ Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.ExecutionId).release());
+ } else {
+ Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release());
+ }
+ }
+
+ void Handle(const TEvPrivate::TEvResultWriterResponse::TPtr& ev) {
+ auto& response = *ev->Get();
+ if (response.Status != NYdb::EStatus::SUCCESS) {
+ LOG_I("ResultWriterResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
+ ResignAndPassAway(response.Issues);
+ return;
+ }
+ LOG_I("ResultWriterResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString());
+ Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release());
+ }
+
+ void Handle(const TEvPrivate::TEvResourcesCleanerResponse::TPtr& ev) {
+ auto& response = *ev->Get();
+ if (response.Status != NYdb::EStatus::SUCCESS && response.Status != NYdb::EStatus::UNSUPPORTED) {
+ LOG_I("ResourcesCleanerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
+ ResignAndPassAway(response.Issues);
+ return;
+ }
+ LOG_I("ResourcesCleanerResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString());
+ Register(ActorFactory->CreateFinalizer(SelfId(), Pinger, ExecStatus).release());
+ }
+
+ void Handle(const TEvPrivate::TEvFinalizerResponse::TPtr ev) {
+ auto& response = *ev->Get();
+ if (response.Status != NYdb::EStatus::SUCCESS) {
+ LOG_I("FinalizerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
+ ResignAndPassAway(response.Issues);
+ return;
+ }
+ LOG_I("FinalizerResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString());
+ FinishAndPassAway();
+ }
+
+ void Handle(TEvents::TEvQueryActionResult::TPtr& ev) {
+ LOG_I("QueryActionResult: " << ev->Get()->Action);
+ if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED && !IsAborted) {
+ IsAborted = true;
+ Register(ActorFactory->CreateStopper(SelfId(), Connector, Params.OperationId).release());
+ }
+ }
+
+ void Handle(const TEvPrivate::TEvStopperResponse::TPtr& ev) {
+ auto& response = *ev->Get();
+ if (response.Status != NYdb::EStatus::SUCCESS) {
+ LOG_I("StopperResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
+ ResignAndPassAway(response.Issues);
+ return;
+ }
+ LOG_I("StopperResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString());
+ Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release());
+ }
+
+ void Run() {
+ switch (Params.Status) {
+ case FederatedQuery::QueryMeta::ABORTING_BY_USER:
+ case FederatedQuery::QueryMeta::ABORTING_BY_SYSTEM:
+ case FederatedQuery::QueryMeta::FAILING:
+ if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) {
+ Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release());
+ } else {
+ Register(ActorFactory->CreateFinalizer(SelfId(), Pinger, ExecStatus).release());
+ }
+ break;
+ case FederatedQuery::QueryMeta::COMPLETING:
+ if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) {
+ Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.ExecutionId).release());
+ } else {
+ Register(ActorFactory->CreateFinalizer(SelfId(), Pinger, ExecStatus).release());
+ }
+ break;
+ case FederatedQuery::QueryMeta::STARTING:
+ Register(ActorFactory->CreateExecuter(SelfId(), Connector, Pinger).release());
+ break;
+ case FederatedQuery::QueryMeta::RUNNING:
+ Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release());
+ break;
+ default:
+ break;
+ }
+ }
+
+ void ResignAndPassAway(const NYql::TIssues& issues) {
+ Fq::Private::PingTaskRequest pingTaskRequest;
+ NYql::IssuesToMessage(issues, pingTaskRequest.mutable_transient_issues());
+ pingTaskRequest.set_resign_query(true);
+ Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest, true));
+ FinishAndPassAway();
+ }
+
+ void FinishAndPassAway() {
+ Send(Connector, new NActors::TEvents::TEvPoisonPill());
+ PassAway();
+ }
+
+private:
+ bool IsAborted = false;
+ TActorId FetcherId;
+ NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified;
+ TRunActorParams Params;
+ TInstant CreatedAt;
+ NYql::TIssues TransientIssues;
+ ::NYql::NCommon::TServiceCounters QueryCounters;
+ TActorId Pinger;
+ TActorId Connector;
+ IActorFactory::TPtr ActorFactory;
+};
+
+IActor* CreateYdbRunActor(
+ const NActors::TActorId& fetcherId,
+ const ::NYql::NCommon::TServiceCounters& serviceCounters,
+ TRunActorParams&& params,
+ const IActorFactory::TPtr& actorFactory
+) {
+ return new TYdbRunActor(fetcherId, serviceCounters, std::move(params), actorFactory);
+}
+
+} /* NFq */
diff --git a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.h b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.h
new file mode 100644
index 0000000000..2b738db143
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include "actors_factory.h"
+
+#include <ydb/core/mon/mon.h>
+
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+#include <ydb/core/fq/libs/config/protos/pinger.pb.h>
+#include <ydb/core/fq/libs/events/events.h>
+#include <ydb/core/fq/libs/private_client/private_client.h>
+#include <ydb/core/fq/libs/shared_resources/shared_resources.h>
+#include <ydb/core/fq/libs/signer/signer.h>
+
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
+#include <ydb/library/yql/providers/dq/worker_manager/interface/counters.h>
+#include <ydb/library/yql/providers/dq/actors/proto_builder.h>
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+#include <ydb/library/yql/providers/common/metrics/service_counters.h>
+#include <ydb/library/yql/providers/pq/cm_client/client.h>
+
+#include <ydb/public/lib/fq/scope.h>
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/random_provider/random_provider.h>
+#include <library/cpp/time_provider/time_provider.h>
+
+#include <util/datetime/base.h>
+
+namespace NFq {
+
+NActors::IActor* CreateYdbRunActor(
+ const NActors::TActorId& fetcherId,
+ const ::NYql::NCommon::TServiceCounters& serviceCounters,
+ TRunActorParams&& params,
+ const IActorFactory::TPtr& actorFactory);
+
+} /* NFq */
diff --git a/ydb/core/fq/libs/config/protos/compute.proto b/ydb/core/fq/libs/config/protos/compute.proto
index 2bed58d5d4..90ece7388c 100644
--- a/ydb/core/fq/libs/config/protos/compute.proto
+++ b/ydb/core/fq/libs/config/protos/compute.proto
@@ -5,6 +5,7 @@ package NFq.NConfig;
option java_package = "ru.yandex.kikimr.proto";
import "ydb/core/fq/libs/config/protos/storage.proto";
+import "ydb/public/api/protos/draft/fq.proto";
////////////////////////////////////////////////////////////
@@ -12,12 +13,34 @@ message TInPlaceCompute {
}
message TYdbCompute {
- TYdbStorageConfig Connection = 1;
+ bool Enable = 1;
+ TYdbStorageConfig Connection = 2;
}
-message TComputeConfig {
- oneof type {
- TInPlaceCompute InPlace = 1;
- TYdbCompute Ydb = 2;
+enum EComputeType {
+ UNKNOWN = 0;
+ IN_PLACE = 1;
+ YDB = 2;
+}
+
+message TComputeMappingRuleKey {
+ oneof key {
+ FederatedQuery.QueryContent.QueryType QueryType = 1;
+ // FederatedQuery.QueryContent.EngineType EngineType = 2;
}
}
+
+message TComputeMappingRule {
+ repeated TComputeMappingRuleKey Key = 1;
+ EComputeType Compute = 2;
+}
+
+message TComputeMapping {
+ repeated TComputeMappingRule Rule = 1;
+}
+
+message TComputeConfig {
+ TInPlaceCompute InPlace = 1;
+ TYdbCompute Ydb = 2;
+ TComputeMapping ComputeMapping = 3;
+}
diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp
index 8ecb120c6f..a86d817e87 100644
--- a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp
@@ -491,6 +491,9 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
*newTask->mutable_result_set_meta() = task.Query.result_set_meta();
newTask->set_scope(task.Scope);
*newTask->mutable_resources() = task.Internal.resources();
+
+ newTask->set_execution_id(task.Internal.execution_id());
+ newTask->set_operation_id(task.Internal.operation_id());
}
return result;
diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp
index d41d5f2650..829155c6fd 100644
--- a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp
@@ -170,7 +170,7 @@ TPingTaskParams ConstructHardPingTask(
}
if (request.status_code() != NYql::NDqProto::StatusIds::UNSPECIFIED) {
- internal.set_status_code(request.status_code());
+ internal.set_status_code(request.status_code());
}
if (issues) {
@@ -277,6 +277,8 @@ TPingTaskParams ConstructHardPingTask(
internal.clear_created_topic_consumers();
// internal.clear_dq_graph(); keep for debug
internal.clear_dq_graph_index();
+ // internal.clear_execution_id(); keep for debug
+ // internal.clear_operation_id(); keep for debug
}
if (!request.created_topic_consumers().empty()) {
@@ -293,6 +295,14 @@ TPingTaskParams ConstructHardPingTask(
}
}
+ if (!request.execution_id().empty()) {
+ internal.set_execution_id(request.execution_id());
+ }
+
+ if (!request.operation_id().empty()) {
+ internal.set_operation_id(request.operation_id());
+ }
+
if (!request.dq_graph().empty()) {
*internal.mutable_dq_graph() = request.dq_graph();
}
diff --git a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto
index dcb545065e..ff7d943d41 100644
--- a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto
+++ b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto
@@ -41,7 +41,9 @@ message QueryInternal {
repeated Fq.Private.CompressedData dq_graph_compressed = 20;
Fq.Private.TaskResources resources = 21;
repeated Ydb.Issue.IssueMessage internal_issue = 22;
- NYql.NDqProto.StatusIds.StatusCode status_code = 23;
+ NYql.NDqProto.StatusIds.StatusCode status_code = 23;
+ string operation_id = 24;
+ string execution_id = 25;
}
message JobInternal {
diff --git a/ydb/core/fq/libs/control_plane_storage/util.cpp b/ydb/core/fq/libs/control_plane_storage/util.cpp
index 61eaa149bf..c31aa8e20b 100644
--- a/ydb/core/fq/libs/control_plane_storage/util.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/util.cpp
@@ -185,6 +185,9 @@ bool DoesPingTaskUpdateQueriesTable(const Fq::Private::PingTaskRequest& request)
|| request.has_disposition()
|| request.has_resources()
|| !request.internal_issues().empty()
+ || !request.execution_id().empty()
+ || !request.operation_id().empty()
+ || !request.result_id().value().empty()
;
}
diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
index fc1097df9d..db24cda40e 100644
--- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
@@ -955,6 +955,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery
internal.clear_plan_compressed();
internal.clear_ast_compressed();
internal.clear_dq_graph_compressed();
+ internal.clear_execution_id();
+ internal.clear_operation_id();
auto& jobMeta = *job.mutable_meta();
jobMeta.set_id(jobId);
diff --git a/ydb/core/fq/libs/events/events.h b/ydb/core/fq/libs/events/events.h
index 18e7724946..3d5e4e6e2c 100644
--- a/ydb/core/fq/libs/events/events.h
+++ b/ydb/core/fq/libs/events/events.h
@@ -243,7 +243,7 @@ struct TEvents {
struct TEvEffectApplicationResult : public NActors::TEventLocal<TEvEffectApplicationResult, TEventIds::EvEffectApplicationResult> {
explicit TEvEffectApplicationResult(const NYql::TIssues& issues, bool fataError = false)
: Issues(issues), FatalError(fataError) {
- }
+ }
NYql::TIssues Issues;
const bool FatalError;
};
diff --git a/ydb/core/fq/libs/protos/fq_private.proto b/ydb/core/fq/libs/protos/fq_private.proto
index 80b507f18d..9320cdaf02 100644
--- a/ydb/core/fq/libs/protos/fq_private.proto
+++ b/ydb/core/fq/libs/protos/fq_private.proto
@@ -114,6 +114,8 @@ message GetTaskResult {
TaskResources resources = 33;
FederatedQuery.QueryContent.QuerySyntax query_syntax = 34;
+ string operation_id = 35;
+ string execution_id = 36;
}
repeated Task tasks = 1;
}
@@ -154,6 +156,8 @@ message PingTaskRequest {
string tenant = 104;
TaskResources resources = 33;
repeated Ydb.Issue.IssueMessage internal_issues = 34;
+ string operation_id = 35;
+ string execution_id = 36;
}
message PingTaskResult {
diff --git a/ydb/core/fq/libs/ydb/ydb.cpp b/ydb/core/fq/libs/ydb/ydb.cpp
index 117a4905f8..b20d38c672 100644
--- a/ydb/core/fq/libs/ydb/ydb.cpp
+++ b/ydb/core/fq/libs/ydb/ydb.cpp
@@ -4,11 +4,6 @@
#include <util/stream/str.h>
#include <util/string/printf.h>
-#include <util/stream/file.h>
-#include <util/string/strip.h>
-#include <util/system/env.h>
-
-#include <ydb/library/security/ydb_credentials_provider_factory.h>
namespace NFq {
@@ -22,9 +17,6 @@ namespace {
////////////////////////////////////////////////////////////////////////////////
-template <class TSettings>
-TSettings GetClientSettings(const NConfig::TYdbStorageConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory);
-
TFuture<TDataQueryResult> SelectGeneration(const TGenerationContextPtr& context) {
// TODO: use prepared queries
@@ -169,46 +161,6 @@ TFuture<TStatus> RegisterGenerationWrapper(
});
}
-template <class TSettings>
-TSettings GetClientSettings(const NConfig::TYdbStorageConfig& config,
- const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory) {
- TString oauth;
- if (config.GetToken()) {
- oauth = config.GetToken();
- } else if (config.GetOAuthFile()) {
- oauth = StripString(TFileInput(config.GetOAuthFile()).ReadAll());
- } else {
- oauth = GetEnv("YDB_TOKEN");
- }
-
- const TString iamEndpoint = config.GetIamEndpoint();
- const TString saKeyFile = config.GetSaKeyFile();
-
- TSettings settings;
- settings
- .DiscoveryEndpoint(config.GetEndpoint())
- .Database(config.GetDatabase());
-
- NKikimr::TYdbCredentialsSettings credSettings;
- credSettings.UseLocalMetadata = config.GetUseLocalMetadataService();
- credSettings.OAuthToken = oauth;
- credSettings.SaKeyFile = config.GetSaKeyFile();
- credSettings.IamEndpoint = config.GetIamEndpoint();
-
- settings.CredentialsProviderFactory(credProviderFactory(credSettings));
-
- if (config.GetUseLocalMetadataService()) {
- settings.SslCredentials(TSslCredentials(true));
- }
-
- if (config.GetCertificateFile()) {
- auto cert = StripString(TFileInput(config.GetCertificateFile()).ReadAll());
- settings.SslCredentials(TSslCredentials(true, cert));
- }
-
- return settings;
-}
-
} // namespace
////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/fq/libs/ydb/ydb.h b/ydb/core/fq/libs/ydb/ydb.h
index 6c4b99e09f..85891e336b 100644
--- a/ydb/core/fq/libs/ydb/ydb.h
+++ b/ydb/core/fq/libs/ydb/ydb.h
@@ -8,6 +8,10 @@
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
+#include <util/stream/file.h>
+#include <util/string/strip.h>
+#include <util/system/env.h>
+
namespace NFq {
////////////////////////////////////////////////////////////////////////////////
@@ -130,4 +134,44 @@ NThreading::TFuture<NYdb::TStatus> CheckGeneration(const TGenerationContextPtr&
NThreading::TFuture<NYdb::TStatus> RollbackTransaction(const TGenerationContextPtr& context);
+template <class TSettings>
+TSettings GetClientSettings(const NConfig::TYdbStorageConfig& config,
+ const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory) {
+ TString oauth;
+ if (config.GetToken()) {
+ oauth = config.GetToken();
+ } else if (config.GetOAuthFile()) {
+ oauth = StripString(TFileInput(config.GetOAuthFile()).ReadAll());
+ } else {
+ oauth = GetEnv("YDB_TOKEN");
+ }
+
+ const TString iamEndpoint = config.GetIamEndpoint();
+ const TString saKeyFile = config.GetSaKeyFile();
+
+ TSettings settings;
+ settings
+ .DiscoveryEndpoint(config.GetEndpoint())
+ .Database(config.GetDatabase());
+
+ NKikimr::TYdbCredentialsSettings credSettings;
+ credSettings.UseLocalMetadata = config.GetUseLocalMetadataService();
+ credSettings.OAuthToken = oauth;
+ credSettings.SaKeyFile = config.GetSaKeyFile();
+ credSettings.IamEndpoint = config.GetIamEndpoint();
+
+ settings.CredentialsProviderFactory(credProviderFactory(credSettings));
+
+ if (config.GetUseLocalMetadataService()) {
+ settings.SslCredentials(NYdb::TSslCredentials(true));
+ }
+
+ if (config.GetCertificateFile()) {
+ auto cert = StripString(TFileInput(config.GetCertificateFile()).ReadAll());
+ settings.SslCredentials(NYdb::TSslCredentials(true, cert));
+ }
+
+ return settings;
+}
+
} // namespace NFq