diff options
author | hcpp <hcpp@ydb.tech> | 2023-06-09 15:09:10 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-06-09 15:09:10 +0300 |
commit | 4e48f42bb1da68482885363f334d1f317a51f78c (patch) | |
tree | 64bba2dd0af928f62eceae57edf83b22301af1c6 | |
parent | 771b2d98e7a5045e038d42923b65879135603368 (diff) | |
download | ydb-4e48f42bb1da68482885363f334d1f317a51f78c.tar.gz |
yqv2 fetcher
cleanup
the first version of yqv2
56 files changed, 2585 insertions, 128 deletions
diff --git a/ydb/core/fq/libs/CMakeLists.txt b/ydb/core/fq/libs/CMakeLists.txt index bda2195d98..bf260a6276 100644 --- a/ydb/core/fq/libs/CMakeLists.txt +++ b/ydb/core/fq/libs/CMakeLists.txt @@ -13,6 +13,7 @@ add_subdirectory(checkpointing) add_subdirectory(checkpointing_common) add_subdirectory(cloud_audit) add_subdirectory(common) +add_subdirectory(compute) add_subdirectory(config) add_subdirectory(control_plane_config) add_subdirectory(control_plane_proxy) diff --git a/ydb/core/fq/libs/actors/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/actors/CMakeLists.darwin-x86_64.txt index 116fc09710..9dfe2f8f63 100644 --- a/ydb/core/fq/libs/actors/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/fq/libs/actors/CMakeLists.darwin-x86_64.txt @@ -29,10 +29,12 @@ target_link_libraries(fq-libs-actors PUBLIC libs-actors-logging fq-libs-checkpointing fq-libs-checkpointing_common - fq-libs-db_id_async_resolver_impl fq-libs-common + libs-compute-common + libs-compute-ydb fq-libs-control_plane_storage libs-control_plane_storage-events + fq-libs-db_id_async_resolver_impl fq-libs-db_schema fq-libs-events fq-libs-grpc @@ -47,14 +49,9 @@ target_link_libraries(fq-libs-actors PUBLIC library-yql-ast yql-core-facade core-services-mounts + dq-integration-transform library-yql-minikql yql-minikql-comp_nodes - common-token_accessor-client - yql-public-issue - public-issue-protos - yql-sql-settings - yql-utils-actor_log - dq-integration-transform providers-clickhouse-provider providers-common-codec providers-common-comp_nodes @@ -62,6 +59,7 @@ target_link_libraries(fq-libs-actors PUBLIC providers-common-metrics providers-common-provider common-schema-mkql + common-token_accessor-client providers-common-udf_resolve providers-dq-actors providers-dq-common @@ -74,9 +72,15 @@ target_link_libraries(fq-libs-actors PUBLIC providers-pq-task_meta providers-s3-provider providers-ydb-provider + yql-public-issue + public-issue-protos + yql-sql-settings library-yql-utils + yql-utils-actor_log api-protos 
public-lib-fq + client-draft-ydb_query + cpp-client-ydb_operation cpp-client-ydb_table ) target_sources(fq-libs-actors PRIVATE @@ -86,14 +90,12 @@ target_sources(fq-libs-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/nodes_health_check.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/nodes_manager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/pending_fetcher.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/pinger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/proxy.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/proxy_private.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter_resources.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter_resources.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/result_writer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/run_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/run_actor_params.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/table_bindings_from_bindings.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/task_get.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/task_ping.cpp diff --git a/ydb/core/fq/libs/actors/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/actors/CMakeLists.linux-aarch64.txt index 587f709afd..2e61ddafd6 100644 --- a/ydb/core/fq/libs/actors/CMakeLists.linux-aarch64.txt +++ b/ydb/core/fq/libs/actors/CMakeLists.linux-aarch64.txt @@ -30,10 +30,12 @@ target_link_libraries(fq-libs-actors PUBLIC libs-actors-logging fq-libs-checkpointing fq-libs-checkpointing_common - fq-libs-db_id_async_resolver_impl fq-libs-common + libs-compute-common + libs-compute-ydb fq-libs-control_plane_storage libs-control_plane_storage-events + fq-libs-db_id_async_resolver_impl fq-libs-db_schema fq-libs-events fq-libs-grpc @@ -48,14 +50,9 @@ target_link_libraries(fq-libs-actors PUBLIC library-yql-ast yql-core-facade core-services-mounts + dq-integration-transform library-yql-minikql yql-minikql-comp_nodes - 
common-token_accessor-client - yql-public-issue - public-issue-protos - yql-sql-settings - yql-utils-actor_log - dq-integration-transform providers-clickhouse-provider providers-common-codec providers-common-comp_nodes @@ -63,6 +60,7 @@ target_link_libraries(fq-libs-actors PUBLIC providers-common-metrics providers-common-provider common-schema-mkql + common-token_accessor-client providers-common-udf_resolve providers-dq-actors providers-dq-common @@ -75,9 +73,15 @@ target_link_libraries(fq-libs-actors PUBLIC providers-pq-task_meta providers-s3-provider providers-ydb-provider + yql-public-issue + public-issue-protos + yql-sql-settings library-yql-utils + yql-utils-actor_log api-protos public-lib-fq + client-draft-ydb_query + cpp-client-ydb_operation cpp-client-ydb_table ) target_sources(fq-libs-actors PRIVATE @@ -87,14 +91,12 @@ target_sources(fq-libs-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/nodes_health_check.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/nodes_manager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/pending_fetcher.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/pinger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/proxy.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/proxy_private.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter_resources.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter_resources.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/result_writer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/run_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/run_actor_params.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/table_bindings_from_bindings.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/task_get.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/task_ping.cpp diff --git a/ydb/core/fq/libs/actors/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/actors/CMakeLists.linux-x86_64.txt index 587f709afd..2e61ddafd6 100644 --- 
a/ydb/core/fq/libs/actors/CMakeLists.linux-x86_64.txt +++ b/ydb/core/fq/libs/actors/CMakeLists.linux-x86_64.txt @@ -30,10 +30,12 @@ target_link_libraries(fq-libs-actors PUBLIC libs-actors-logging fq-libs-checkpointing fq-libs-checkpointing_common - fq-libs-db_id_async_resolver_impl fq-libs-common + libs-compute-common + libs-compute-ydb fq-libs-control_plane_storage libs-control_plane_storage-events + fq-libs-db_id_async_resolver_impl fq-libs-db_schema fq-libs-events fq-libs-grpc @@ -48,14 +50,9 @@ target_link_libraries(fq-libs-actors PUBLIC library-yql-ast yql-core-facade core-services-mounts + dq-integration-transform library-yql-minikql yql-minikql-comp_nodes - common-token_accessor-client - yql-public-issue - public-issue-protos - yql-sql-settings - yql-utils-actor_log - dq-integration-transform providers-clickhouse-provider providers-common-codec providers-common-comp_nodes @@ -63,6 +60,7 @@ target_link_libraries(fq-libs-actors PUBLIC providers-common-metrics providers-common-provider common-schema-mkql + common-token_accessor-client providers-common-udf_resolve providers-dq-actors providers-dq-common @@ -75,9 +73,15 @@ target_link_libraries(fq-libs-actors PUBLIC providers-pq-task_meta providers-s3-provider providers-ydb-provider + yql-public-issue + public-issue-protos + yql-sql-settings library-yql-utils + yql-utils-actor_log api-protos public-lib-fq + client-draft-ydb_query + cpp-client-ydb_operation cpp-client-ydb_table ) target_sources(fq-libs-actors PRIVATE @@ -87,14 +91,12 @@ target_sources(fq-libs-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/nodes_health_check.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/nodes_manager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/pending_fetcher.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/pinger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/proxy.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/proxy_private.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter_resources.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter_resources.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/result_writer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/run_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/run_actor_params.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/table_bindings_from_bindings.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/task_get.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/task_ping.cpp diff --git a/ydb/core/fq/libs/actors/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/actors/CMakeLists.windows-x86_64.txt index 116fc09710..9dfe2f8f63 100644 --- a/ydb/core/fq/libs/actors/CMakeLists.windows-x86_64.txt +++ b/ydb/core/fq/libs/actors/CMakeLists.windows-x86_64.txt @@ -29,10 +29,12 @@ target_link_libraries(fq-libs-actors PUBLIC libs-actors-logging fq-libs-checkpointing fq-libs-checkpointing_common - fq-libs-db_id_async_resolver_impl fq-libs-common + libs-compute-common + libs-compute-ydb fq-libs-control_plane_storage libs-control_plane_storage-events + fq-libs-db_id_async_resolver_impl fq-libs-db_schema fq-libs-events fq-libs-grpc @@ -47,14 +49,9 @@ target_link_libraries(fq-libs-actors PUBLIC library-yql-ast yql-core-facade core-services-mounts + dq-integration-transform library-yql-minikql yql-minikql-comp_nodes - common-token_accessor-client - yql-public-issue - public-issue-protos - yql-sql-settings - yql-utils-actor_log - dq-integration-transform providers-clickhouse-provider providers-common-codec providers-common-comp_nodes @@ -62,6 +59,7 @@ target_link_libraries(fq-libs-actors PUBLIC providers-common-metrics providers-common-provider common-schema-mkql + common-token_accessor-client providers-common-udf_resolve providers-dq-actors providers-dq-common @@ -74,9 +72,15 @@ target_link_libraries(fq-libs-actors PUBLIC providers-pq-task_meta providers-s3-provider providers-ydb-provider + yql-public-issue + public-issue-protos + yql-sql-settings 
library-yql-utils + yql-utils-actor_log api-protos public-lib-fq + client-draft-ydb_query + cpp-client-ydb_operation cpp-client-ydb_table ) target_sources(fq-libs-actors PRIVATE @@ -86,14 +90,12 @@ target_sources(fq-libs-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/nodes_health_check.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/nodes_manager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/pending_fetcher.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/pinger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/proxy.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/proxy_private.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter_resources.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/rate_limiter_resources.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/result_writer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/run_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/run_actor_params.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/table_bindings_from_bindings.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/task_get.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/actors/task_ping.cpp diff --git a/ydb/core/fq/libs/actors/pending_fetcher.cpp b/ydb/core/fq/libs/actors/pending_fetcher.cpp index 4abcf3ed50..8def74bf71 100644 --- a/ydb/core/fq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/fq/libs/actors/pending_fetcher.cpp @@ -46,11 +46,14 @@ #include <ydb/core/fq/libs/common/compression.h> #include <ydb/core/fq/libs/common/entity_id.h> #include <ydb/core/fq/libs/common/util.h> -#include <ydb/core/fq/libs/events/events.h> +#include <ydb/core/fq/libs/compute/common/config.h> +#include <ydb/core/fq/libs/compute/ydb/actors_factory.h> +#include <ydb/core/fq/libs/compute/ydb/ydb_run_actor.h> #include <ydb/core/fq/libs/config/protos/fq_config.pb.h> #include <ydb/core/fq/libs/config/protos/pinger.pb.h> #include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h> #include 
<ydb/core/fq/libs/control_plane_storage/events/events.h> +#include <ydb/core/fq/libs/events/events.h> #include <ydb/core/fq/libs/private_client/internal_service.h> #include <library/cpp/actors/core/log.h> @@ -138,6 +141,7 @@ public: , TenantName(tenantName) , InternalServiceId(MakeInternalServiceActorId()) , Monitoring(monitoring) + , ComputeConfig(config.GetCompute()) { Y_ENSURE(GetYqlDefaultModuleResolverWithContext(ModuleResolver)); } @@ -380,10 +384,15 @@ private: NProtoInterop::CastFromProto(task.request_started_at()), task.restart_count(), task.job_id().value(), - resources + resources, + task.execution_id(), + task.operation_id() ); - auto runActorId = Register(CreateRunActor(SelfId(), queryCounters, std::move(params))); + auto runActorId = + ComputeConfig.GetComputeType(task) == NConfig::EComputeType::YDB + ? Register(CreateYdbRunActor(SelfId(), queryCounters, std::move(params), CreateActorFactory(params, queryCounters))) + : Register(CreateRunActor(SelfId(), queryCounters, std::move(params))); RunActorMap[runActorId] = TRunActorInfo { .QueryId = queryId, .QueryName = task.query_name() }; if (!task.automatic()) { @@ -444,6 +453,7 @@ private: TString TenantName; TActorId InternalServiceId; NActors::TMon* Monitoring; + TComputeConfig ComputeConfig; }; diff --git a/ydb/core/fq/libs/actors/proxy.h b/ydb/core/fq/libs/actors/proxy.h index 27dde3798a..94ccd2e2f0 100644 --- a/ydb/core/fq/libs/actors/proxy.h +++ b/ydb/core/fq/libs/actors/proxy.h @@ -1,8 +1,8 @@ #pragma once -#include "run_actor_params.h" #include <ydb/core/mon/mon.h> +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> #include <ydb/core/fq/libs/config/protos/pinger.pb.h> #include <ydb/core/fq/libs/events/events.h> #include <ydb/core/fq/libs/private_client/private_client.h> @@ -75,18 +75,6 @@ NActors::IActor* CreateResultWriter( const TInstant& deadline, ui64 resultBytesLimit); -NActors::IActor* CreatePingerActor( - const TString& tenantName, - const NYdb::NFq::TScope& scope, - const 
TString& userId, - const TString& id, - const TString& owner, - const NActors::TActorId parent, - const NConfig::TPingerConfig& config, - TInstant deadline, - const ::NYql::NCommon::TServiceCounters& queryCounters, - TInstant createdAt); - NActors::IActor* CreateRateLimiterResourceCreator( const NActors::TActorId& parent, const TString& ownerId, diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index 5eff64fb87..c69cae91bc 100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -49,16 +49,17 @@ #include <ydb/core/protos/services.pb.h> #include <ydb/core/fq/libs/actors/nodes_manager.h> +#include <ydb/core/fq/libs/checkpoint_storage/storage_service.h> +#include <ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h> +#include <ydb/core/fq/libs/checkpointing_common/defs.h> #include <ydb/core/fq/libs/common/compression.h> #include <ydb/core/fq/libs/common/entity_id.h> +#include <ydb/core/fq/libs/compute/common/pinger.h> #include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h> #include <ydb/core/fq/libs/control_plane_storage/events/events.h> #include <ydb/core/fq/libs/control_plane_storage/util.h> #include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h> #include <ydb/core/fq/libs/gateway/empty_gateway.h> -#include <ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h> -#include <ydb/core/fq/libs/checkpointing_common/defs.h> -#include <ydb/core/fq/libs/checkpoint_storage/storage_service.h> #include <ydb/core/fq/libs/private_client/events.h> #include <ydb/core/fq/libs/private_client/private_client.h> #include <ydb/core/fq/libs/rate_limiter/utils/path.h> @@ -1249,7 +1250,7 @@ private: if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) { LOG_E("StatusCode == NYql::NDqProto::StatusIds::UNSPECIFIED, it is not expected, the query will be failed."); } - + if (statusCode != NYql::NDqProto::StatusIds::SUCCESS) { // Error ResignQuery(statusCode); 
diff --git a/ydb/core/fq/libs/compute/CMakeLists.txt b/ydb/core/fq/libs/compute/CMakeLists.txt new file mode 100644 index 0000000000..5809a856aa --- /dev/null +++ b/ydb/core/fq/libs/compute/CMakeLists.txt @@ -0,0 +1,10 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(common) +add_subdirectory(ydb) diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/compute/common/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..470f41558c --- /dev/null +++ b/ydb/core/fq/libs/compute/common/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(libs-compute-common) +target_compile_options(libs-compute-common PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(libs-compute-common PUBLIC + contrib-libs-cxxsupp + yutil + libs-config-protos + fq-libs-grpc + fq-libs-shared_resources + providers-dq-provider +) +target_sources(libs-compute-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/common/pinger.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/common/run_actor_params.cpp +) diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/compute/common/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..ae037da489 --- /dev/null +++ b/ydb/core/fq/libs/compute/common/CMakeLists.linux-aarch64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(libs-compute-common) +target_compile_options(libs-compute-common PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(libs-compute-common PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-config-protos + fq-libs-grpc + fq-libs-shared_resources + providers-dq-provider +) +target_sources(libs-compute-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/common/pinger.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/common/run_actor_params.cpp +) diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/compute/common/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..ae037da489 --- /dev/null +++ b/ydb/core/fq/libs/compute/common/CMakeLists.linux-x86_64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(libs-compute-common) +target_compile_options(libs-compute-common PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(libs-compute-common PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-config-protos + fq-libs-grpc + fq-libs-shared_resources + providers-dq-provider +) +target_sources(libs-compute-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/common/pinger.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/common/run_actor_params.cpp +) diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.txt b/ydb/core/fq/libs/compute/common/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/fq/libs/compute/common/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/compute/common/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..470f41558c --- /dev/null +++ b/ydb/core/fq/libs/compute/common/CMakeLists.windows-x86_64.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(libs-compute-common) +target_compile_options(libs-compute-common PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(libs-compute-common PUBLIC + contrib-libs-cxxsupp + yutil + libs-config-protos + fq-libs-grpc + fq-libs-shared_resources + providers-dq-provider +) +target_sources(libs-compute-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/common/pinger.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/common/run_actor_params.cpp +) diff --git a/ydb/core/fq/libs/compute/common/config.h b/ydb/core/fq/libs/compute/common/config.h new file mode 100644 index 0000000000..c2b02c7ea1 --- /dev/null +++ b/ydb/core/fq/libs/compute/common/config.h @@ -0,0 +1,35 @@ +#pragma once + +#include <ydb/core/fq/libs/config/protos/compute.pb.h> +#include <ydb/core/fq/libs/protos/fq_private.pb.h> + +namespace NFq { + +class TComputeConfig { +public: + explicit TComputeConfig(const NFq::NConfig::TComputeConfig& computeConfig) + : ComputeConfig(computeConfig) + {} + + NFq::NConfig::EComputeType GetComputeType(const Fq::Private::GetTaskResult::Task& task) const { + for (const auto& rule: ComputeConfig.GetComputeMapping().GetRule()) { + const bool isMatched = AllOf(rule.GetKey(), + [&task](const auto& key) { + switch (key.key_case()) { + case NConfig::TComputeMappingRuleKey::kQueryType: + return key.GetQueryType() == task.query_type(); + case NConfig::TComputeMappingRuleKey::KEY_NOT_SET: + return false; + } + }); + if (isMatched) { + return rule.GetCompute(); + } + } + return NFq::NConfig::EComputeType::IN_PLACE; + } +private: + NFq::NConfig::TComputeConfig ComputeConfig; +}; + +} /* NFq */ diff --git a/ydb/core/fq/libs/compute/common/metrics.h b/ydb/core/fq/libs/compute/common/metrics.h new file mode 100644 index 0000000000..b13e4c81e7 --- /dev/null +++ b/ydb/core/fq/libs/compute/common/metrics.h @@ -0,0 +1,44 @@ +#pragma once + +#include <library/cpp/monlib/dynamic_counters/counters.h> + +namespace NFq { + +struct TComputeRequestCounters: 
public virtual TThrRefBase { + const TString Name; + + ::NMonitoring::TDynamicCounterPtr Counters; + ::NMonitoring::TDynamicCounters::TCounterPtr InFly; + ::NMonitoring::TDynamicCounters::TCounterPtr Ok; + ::NMonitoring::TDynamicCounters::TCounterPtr Error; + ::NMonitoring::TDynamicCounters::TCounterPtr Retry; + ::NMonitoring::THistogramPtr LatencyMs; + + explicit TComputeRequestCounters(const TString& name, const ::NMonitoring::TDynamicCounterPtr& counters = nullptr) + : Name(name) + , Counters(counters) + { } + + void Register(const ::NMonitoring::TDynamicCounterPtr& counters) { + Counters = counters; + Register(); + } + + void Register() { + ::NMonitoring::TDynamicCounterPtr subgroup = Counters->GetSubgroup("request", Name); + InFly = subgroup->GetCounter("InFly", false); + Ok = subgroup->GetCounter("Ok", true); + Error = subgroup->GetCounter("Error", true); + Retry = subgroup->GetCounter("Retry", true); + LatencyMs = subgroup->GetHistogram("LatencyMs", GetLatencyHistogramBuckets()); + } + +private: + static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() { + return ::NMonitoring::ExplicitHistogram({0, 1, 2, 5, 10, 20, 50, 100, 500, 1000, 2000, 5000, 10000, 30000, 50000, 500000}); + } +}; + +using TComputeRequestCountersPtr = TIntrusivePtr<TComputeRequestCounters>; + +} /* NFq */ diff --git a/ydb/core/fq/libs/actors/pinger.cpp b/ydb/core/fq/libs/compute/common/pinger.cpp index c9fefa9479..e4eeb9fe4f 100644 --- a/ydb/core/fq/libs/actors/pinger.cpp +++ b/ydb/core/fq/libs/compute/common/pinger.cpp @@ -1,4 +1,4 @@ -#include "proxy.h" +#include "pinger.h" #include <ydb/core/fq/libs/config/protos/pinger.pb.h> #include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h> @@ -148,7 +148,8 @@ public: const NConfig::TPingerConfig& config, TInstant deadline, const ::NYql::NCommon::TServiceCounters& queryCounters, - TInstant createdAt) + TInstant createdAt, + bool replyToSender) : Config(config) , TenantName(tenantName) , Scope(scope) @@ 
-160,6 +161,7 @@ public: , QueryCounters(queryCounters) , CreatedAt(createdAt) , InternalServiceId(MakeInternalServiceActorId()) + , ReplyToSender(replyToSender) { } @@ -339,7 +341,7 @@ private: if (continueLeaseRequest) { ScheduleNextPing(); } else { - Send(Parent, new TEvents::TEvForwardPingResponse(true, action), 0, ev->Cookie); + Send(ReplyToSender ? ForwardRequests.front().Request->Sender : Parent, new TEvents::TEvForwardPingResponse(true, action), 0, ev->Cookie); ForwardRequests.pop_front(); // Process next forward ping request. @@ -357,7 +359,13 @@ private: } LOG_E("Ping response error: " << errorMessage << ". Retried " << retryStateForLogging->GetRetriesCount() << " times during " << retryStateForLogging->GetRetryTime(now)); auto action = ev->Get()->Status.IsSuccess() ? ev->Get()->Result.action() : FederatedQuery::QUERY_ACTION_UNSPECIFIED; - Send(Parent, new TEvents::TEvForwardPingResponse(false, action), 0, ev->Cookie); + if (ReplyToSender) { + for (const auto& forwardRequest: ForwardRequests) { + Send(forwardRequest.Request->Sender, new TEvents::TEvForwardPingResponse(false, action), 0, ev->Cookie); + } + } else { + Send(Parent, new TEvents::TEvForwardPingResponse(false, action), 0, ev->Cookie); + } FatalError = true; ForwardRequests.clear(); } @@ -437,6 +445,7 @@ private: TSchedulerCookieHolder SchedulerCookieHolder; TActorId InternalServiceId; + bool ReplyToSender = false; }; IActor* CreatePingerActor( @@ -449,7 +458,8 @@ IActor* CreatePingerActor( const NConfig::TPingerConfig& config, TInstant deadline, const ::NYql::NCommon::TServiceCounters& queryCounters, - TInstant createdAt) + TInstant createdAt, + bool replyToSender) { return new TPingerActor( tenantName, @@ -461,7 +471,8 @@ IActor* CreatePingerActor( config, deadline, queryCounters, - createdAt); + createdAt, + replyToSender); } } /* NFq */ diff --git a/ydb/core/fq/libs/compute/common/pinger.h b/ydb/core/fq/libs/compute/common/pinger.h new file mode 100644 index 0000000000..50567377dc --- 
/dev/null +++ b/ydb/core/fq/libs/compute/common/pinger.h @@ -0,0 +1,23 @@ +#pragma once + +#include <library/cpp/actors/core/actorsystem.h> +#include <ydb/core/fq/libs/config/protos/pinger.pb.h> +#include <ydb/library/yql/providers/common/metrics/service_counters.h> +#include <ydb/public/lib/fq/scope.h> + +namespace NFq { + +NActors::IActor* CreatePingerActor( + const TString& tenantName, + const NYdb::NFq::TScope& scope, + const TString& userId, + const TString& id, + const TString& owner, + const NActors::TActorId parent, + const NConfig::TPingerConfig& config, + TInstant deadline, + const ::NYql::NCommon::TServiceCounters& queryCounters, + TInstant createdAt, + bool replyToSender = false); + +} /* NFq */ diff --git a/ydb/core/fq/libs/compute/common/retry_actor.h b/ydb/core/fq/libs/compute/common/retry_actor.h new file mode 100644 index 0000000000..8ffceb6357 --- /dev/null +++ b/ydb/core/fq/libs/compute/common/retry_actor.h @@ -0,0 +1,132 @@ +#pragma once + +#include "metrics.h" + +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> + +#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/retry/retry_policy.h> + +namespace NFq { + +template<typename TRequest, typename TResponse, typename ...TArgs> +class TRetryActor : public NActors::TActorBootstrapped<TRetryActor<TRequest, TResponse, TArgs...>> { +public: + using TBase = NActors::TActorBootstrapped<TRetryActor<TRequest, TResponse, TArgs...>>; + using TBase::Become; + using TBase::PassAway; + using TBase::SelfId; + using TBase::Send; + + using IRetryPolicy = IRetryPolicy<const typename TResponse::TPtr&>; + + TRetryActor(const TComputeRequestCountersPtr& counters, const NActors::TActorId& sender, const NActors::TActorId& recipient, const TArgs&... 
args) + : Sender(sender) + , Recipient(recipient) + , CreateMessage([=]() { + return new TRequest(args...); + }) + , RetryState(GetRetryPolicy()->CreateRetryState()) + , Delay(TDuration::Zero()) + , StartTime(TInstant::Now()) + , Counters(counters) + {} + + TRetryActor(const TComputeRequestCountersPtr& counters, const TDuration& delay, const NActors::TActorId& sender, const NActors::TActorId& recipient, const TArgs&... args) + : Sender(sender) + , Recipient(recipient) + , CreateMessage([=]() { + return new TRequest(args...); + }) + , RetryState(GetRetryPolicy()->CreateRetryState()) + , Delay(delay) + , StartTime(TInstant::Now()) + , Counters(counters) + {} + + void Bootstrap() { + Counters->InFly->Inc(); + Become(&TRetryActor::StateFunc); + NActors::TActivationContext::Schedule(Delay, new NActors::IEventHandle(Recipient, static_cast<const NActors::TActorId&>(SelfId()), CreateMessage())); + } + + STRICT_STFUNC(StateFunc, + hFunc(TResponse, Handle); + ) + + void Handle(const typename TResponse::TPtr& ev) { + const auto& response = *ev.Get()->Get(); + auto delay = RetryState->GetNextRetryDelay(ev); + if (delay) { + Counters->Retry->Inc(); + NActors::TActivationContext::Schedule(*delay, new NActors::IEventHandle(Recipient, static_cast<const NActors::TActorId&>(SelfId()), CreateMessage())); + return; + } + Counters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); + if (response.Status != NYdb::EStatus::SUCCESS) { + Counters->Error->Inc(); + } else { + Counters->Ok->Inc(); + } + Send(ev->Forward(Sender)); + PassAway(); + } + + static const typename IRetryPolicy::TPtr& GetRetryPolicy() { + static typename IRetryPolicy::TPtr policy = IRetryPolicy::GetExponentialBackoffPolicy([](const typename TResponse::TPtr& ev) { + const auto& status = ev->Get()->Status; + return RetryComputeClass(status); + }, TDuration::MilliSeconds(10), TDuration::MilliSeconds(200), TDuration::Seconds(30), 5); + return policy; + } + + static bool IsTransportError(const NYdb::EStatus& 
status) { + return static_cast<size_t>(status) >= NYdb::TRANSPORT_STATUSES_FIRST + && static_cast<size_t>(status) <= NYdb::TRANSPORT_STATUSES_LAST; + } + + static ERetryErrorClass RetryComputeClass(const NYdb::EStatus& status) { + if (IsTransportError(status)) { + return ERetryErrorClass::ShortRetry; + } + + if (status == NYdb::EStatus::INTERNAL_ERROR + || status == NYdb::EStatus::UNAVAILABLE + || status == NYdb::EStatus::TIMEOUT + || status == NYdb::EStatus::BAD_SESSION + || status == NYdb::EStatus::SESSION_EXPIRED + || status == NYdb::EStatus::UNDETERMINED + || status == NYdb::EStatus::TRANSPORT_UNAVAILABLE + || status == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED + || status == NYdb::EStatus::CLIENT_INTERNAL_ERROR + || status == NYdb::EStatus::CLIENT_OUT_OF_RANGE + || status == NYdb::EStatus::CLIENT_DISCOVERY_FAILED) { + return ERetryErrorClass::ShortRetry; + } + + if (status == NYdb::EStatus::OVERLOADED + || status == NYdb::EStatus::SESSION_BUSY + || status == NYdb::EStatus::CLIENT_RESOURCE_EXHAUSTED + || status == NYdb::EStatus::CLIENT_LIMITS_REACHED) { + return ERetryErrorClass::LongRetry; + } + return ERetryErrorClass::NoRetry; + } + + virtual ~TRetryActor() { + Counters->InFly->Dec(); + } + +private: + NActors::TActorId Sender; + NActors::TActorId Recipient; + std::function<TRequest*()> CreateMessage; + typename IRetryPolicy::IRetryState::TPtr RetryState; + TDuration Delay; + TInstant StartTime; + TComputeRequestCountersPtr Counters; +}; + +} /* NFq */ diff --git a/ydb/core/fq/libs/actors/run_actor_params.cpp b/ydb/core/fq/libs/compute/common/run_actor_params.cpp index 3728a0ddbc..21ed96052e 100644 --- a/ydb/core/fq/libs/actors/run_actor_params.cpp +++ b/ydb/core/fq/libs/compute/common/run_actor_params.cpp @@ -49,7 +49,9 @@ TRunActorParams::TRunActorParams( TInstant requestStartedAt, ui32 restartCount, const TString& jobId, - const Fq::Private::TaskResources& resources + const Fq::Private::TaskResources& resources, + const TString& executionId, + const 
TString& operationId ) : YqSharedResources(yqSharedResources) , CredentialsProviderFactory(credentialsProviderFactory) @@ -96,7 +98,32 @@ TRunActorParams::TRunActorParams( , RestartCount(restartCount) , JobId(jobId) , Resources(resources) + , ExecutionId(executionId) + , OperationId(operationId, true) { } +IOutputStream& operator<<(IOutputStream& out, const TRunActorParams& params) { + return out << "Run actors params: { QueryId: " << params.QueryId + << " CloudId: " << params.CloudId + << " UserId: " << params.UserId + << " Owner: " << params.Owner + << " PreviousQueryRevision: " << params.PreviousQueryRevision + << " Connections: " << params.Connections.size() + << " Bindings: " << params.Bindings.size() + << " AccountIdSignatures: " << params.AccountIdSignatures.size() + << " QueryType: " << FederatedQuery::QueryContent::QueryType_Name(params.QueryType) + << " ExecuteMode: " << FederatedQuery::ExecuteMode_Name(params.ExecuteMode) + << " ResultId: " << params.ResultId + << " StateLoadMode: " << FederatedQuery::StateLoadMode_Name(params.StateLoadMode) + << " StreamingDisposition: " << params.StreamingDisposition + << " Status: " << FederatedQuery::QueryMeta::ComputeStatus_Name(params.Status) + << " DqGraphs: " << params.DqGraphs.size() + << " DqGraphIndex: " << params.DqGraphIndex + << " Resource.TopicConsumers: " << params.Resources.topic_consumers().size() + << " ExecutionId: " << params.ExecutionId + << " OperationId: " << (params.OperationId.GetKind() != Ydb::TOperationId::UNUSED ? 
ProtoToString(params.OperationId) : "<empty>") + << " }"; +} + } /* NFq */ diff --git a/ydb/core/fq/libs/actors/run_actor_params.h b/ydb/core/fq/libs/compute/common/run_actor_params.h index 4114f62fa6..b5f0843d60 100644 --- a/ydb/core/fq/libs/actors/run_actor_params.h +++ b/ydb/core/fq/libs/compute/common/run_actor_params.h @@ -1,22 +1,23 @@ #pragma once + #include <ydb/core/fq/libs/config/protos/common.pb.h> -#include <ydb/core/fq/libs/config/protos/pinger.pb.h> #include <ydb/core/fq/libs/config/protos/fq_config.pb.h> +#include <ydb/core/fq/libs/config/protos/pinger.pb.h> #include <ydb/core/fq/libs/events/events.h> #include <ydb/core/fq/libs/shared_resources/shared_resources.h> -#include <ydb/library/yql/providers/common/token_accessor/client/factory.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node.h> +#include <ydb/library/yql/providers/common/token_accessor/client/factory.h> #include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h> #include <ydb/library/yql/providers/dq/worker_manager/interface/counters.h> -#include <ydb/library/yql/providers/solomon/provider/yql_solomon_gateway.h> #include <ydb/library/yql/providers/pq/cm_client/client.h> +#include <ydb/library/yql/providers/solomon/provider/yql_solomon_gateway.h> #include <ydb/public/lib/fq/scope.h> #include <library/cpp/actors/core/actorsystem.h> -#include <library/cpp/time_provider/time_provider.h> #include <library/cpp/random_provider/random_provider.h> +#include <library/cpp/time_provider/time_provider.h> namespace NFq { @@ -66,12 +67,16 @@ struct TRunActorParams { // TODO2 : Change name TInstant requestStartedAt, ui32 restartCount, const TString& jobId, - const Fq::Private::TaskResources& resources + const Fq::Private::TaskResources& resources, + const TString& executionId, + const TString& operationId ); TRunActorParams(const TRunActorParams& params) = default; TRunActorParams(TRunActorParams&& params) = default; + friend IOutputStream& operator<<(IOutputStream& out, const 
TRunActorParams& params); + TYqSharedResources::TPtr YqSharedResources; NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; NYql::IHTTPGateway::TPtr S3Gateway; @@ -120,6 +125,8 @@ struct TRunActorParams { // TODO2 : Change name const ui32 RestartCount; const TString JobId; Fq::Private::TaskResources Resources; + TString ExecutionId; + NYdb::TOperation::TOperationId OperationId; }; } /* NFq */ diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..0dce7fd471 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,45 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(libs-compute-ydb) +target_compile_options(libs-compute-ydb PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(libs-compute-ydb PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-protos + cpp-lwtrace-protos + libs-compute-common + libs-config-protos + libs-control_plane_storage-proto + libs-graph_params-proto + fq-libs-grpc + libs-quota_manager-proto + ydb-core-protos + library-db_pool-protos + yql-core-expr_nodes + yql-dq-expr_nodes + yql-minikql-arrow + dq-api-protos + api-grpc + api-grpc-draft + lib-operation_id-protos +) +target_sources(libs-compute-ydb PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/actors_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/executer_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp +) diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..cef154a1eb --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt @@ -0,0 +1,46 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(libs-compute-ydb) +target_compile_options(libs-compute-ydb PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(libs-compute-ydb PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-protos + cpp-lwtrace-protos + libs-compute-common + libs-config-protos + libs-control_plane_storage-proto + libs-graph_params-proto + fq-libs-grpc + libs-quota_manager-proto + ydb-core-protos + library-db_pool-protos + yql-core-expr_nodes + yql-dq-expr_nodes + yql-minikql-arrow + dq-api-protos + api-grpc + api-grpc-draft + lib-operation_id-protos +) +target_sources(libs-compute-ydb PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/actors_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/executer_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp +) diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..cef154a1eb --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt @@ -0,0 +1,46 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(libs-compute-ydb) +target_compile_options(libs-compute-ydb PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(libs-compute-ydb PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-protos + cpp-lwtrace-protos + libs-compute-common + libs-config-protos + libs-control_plane_storage-proto + libs-graph_params-proto + fq-libs-grpc + libs-quota_manager-proto + ydb-core-protos + library-db_pool-protos + yql-core-expr_nodes + yql-dq-expr_nodes + yql-minikql-arrow + dq-api-protos + api-grpc + api-grpc-draft + lib-operation_id-protos +) +target_sources(libs-compute-ydb PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/actors_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/executer_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp +) diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..0dce7fd471 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt @@ -0,0 +1,45 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(libs-compute-ydb) +target_compile_options(libs-compute-ydb PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(libs-compute-ydb PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-protos + cpp-lwtrace-protos + libs-compute-common + libs-config-protos + libs-control_plane_storage-proto + libs-graph_params-proto + fq-libs-grpc + libs-quota_manager-proto + ydb-core-protos + library-db_pool-protos + yql-core-expr_nodes + yql-dq-expr_nodes + yql-minikql-arrow + dq-api-protos + api-grpc + api-grpc-draft + lib-operation_id-protos +) +target_sources(libs-compute-ydb PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/actors_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/executer_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp +) diff --git a/ydb/core/fq/libs/compute/ydb/actors_factory.cpp b/ydb/core/fq/libs/compute/ydb/actors_factory.cpp new file mode 100644 index 0000000000..6f299dd6aa --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/actors_factory.cpp @@ -0,0 +1,87 @@ +#include "actors_factory.h" +#include "executer_actor.h" +#include "finalizer_actor.h" +#include "resources_cleaner_actor.h" +#include "result_writer_actor.h" +#include "status_tracker_actor.h" +#include "stopper_actor.h" +#include "ydb_connector_actor.h" + +#include <ydb/core/fq/libs/compute/common/pinger.h> + +namespace NFq { + +struct TActorFactory : public IActorFactory { + TActorFactory(const NFq::TRunActorParams& params, const ::NYql::NCommon::TServiceCounters& counters) + : 
Params(params) + , Counters(counters) + {} + + std::unique_ptr<NActors::IActor> CreatePinger(const NActors::TActorId& parent) const override { + return std::unique_ptr<NActors::IActor>(CreatePingerActor( + Params.TenantName, + Params.Scope, + Params.UserId, + Params.QueryId, + Params.Owner, + parent, + Params.Config.GetPinger(), + Params.Deadline, + Counters, + Params.CreatedAt, + true + )); + } + + std::unique_ptr<NActors::IActor> CreateConnector() const override { + return CreateConnectorActor(Params); + } + + std::unique_ptr<NActors::IActor> CreateExecuter(const NActors::TActorId &parent, + const NActors::TActorId &connector, + const NActors::TActorId &pinger) const override { + return CreateExecuterActor(Params, parent, connector, pinger, Counters); + } + + std::unique_ptr<NActors::IActor> CreateStatusTracker(const NActors::TActorId &parent, + const NActors::TActorId &connector, + const NActors::TActorId &pinger, + const NYdb::TOperation::TOperationId& operationId) const override { + return CreateStatusTrackerActor(Params, parent, connector, pinger, operationId, Counters); + } + + std::unique_ptr<NActors::IActor> CreateResultWriter(const NActors::TActorId& parent, + const NActors::TActorId& connector, + const NActors::TActorId& pinger, + const TString& executionId) const override { + return CreateResultWriterActor(Params, parent, connector, pinger, executionId, Counters); + } + + std::unique_ptr<NActors::IActor> CreateResourcesCleaner(const NActors::TActorId& parent, + const NActors::TActorId& connector, + const NYdb::TOperation::TOperationId& operationId) const override { + return CreateResourcesCleanerActor(Params, parent, connector, operationId, Counters); + } + + std::unique_ptr<NActors::IActor> CreateFinalizer(const NActors::TActorId& parent, + const NActors::TActorId& pinger, + NYdb::NQuery::EExecStatus execStatus) const override { + return CreateFinalizerActor(Params, parent, pinger, execStatus, Counters); + } + + std::unique_ptr<NActors::IActor> 
CreateStopper(const NActors::TActorId& parent, + const NActors::TActorId& connector, + const NYdb::TOperation::TOperationId& operationId) const override { + return CreateStopperActor(Params, parent, connector, operationId, Counters); + } + +private: + NFq::TRunActorParams Params; + ::NYql::NCommon::TServiceCounters Counters; +}; + +IActorFactory::TPtr CreateActorFactory(const NFq::TRunActorParams& params, const ::NYql::NCommon::TServiceCounters& counters) { + return MakeIntrusive<TActorFactory>(params, counters); +} + +} diff --git a/ydb/core/fq/libs/compute/ydb/actors_factory.h b/ydb/core/fq/libs/compute/ydb/actors_factory.h new file mode 100644 index 0000000000..77dd76b7e9 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/actors_factory.h @@ -0,0 +1,43 @@ +#pragma once + +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> + +#include <ydb/library/yql/providers/common/metrics/service_counters.h> + +#include <ydb/public/sdk/cpp/client/draft/ydb_query/query.h> + +#include <util/generic/ptr.h> + +namespace NFq { + +struct IActorFactory : public TThrRefBase { + using TPtr = TIntrusivePtr<IActorFactory>; + + virtual std::unique_ptr<NActors::IActor> CreatePinger(const NActors::TActorId& parent) const = 0; + virtual std::unique_ptr<NActors::IActor> CreateConnector() const = 0; + + virtual std::unique_ptr<NActors::IActor> CreateExecuter(const NActors::TActorId &parent, + const NActors::TActorId &connector, + const NActors::TActorId &pinger) const = 0; + virtual std::unique_ptr<NActors::IActor> CreateStatusTracker(const NActors::TActorId &parent, + const NActors::TActorId &connector, + const NActors::TActorId &pinger, + const NYdb::TOperation::TOperationId& operationId) const = 0; + virtual std::unique_ptr<NActors::IActor> CreateResultWriter(const NActors::TActorId& parent, + const NActors::TActorId& connector, + const NActors::TActorId& pinger, + const TString& executionId) const = 0; + virtual std::unique_ptr<NActors::IActor> CreateResourcesCleaner(const 
NActors::TActorId& parent, + const NActors::TActorId& connector, + const NYdb::TOperation::TOperationId& operationId) const = 0; + virtual std::unique_ptr<NActors::IActor> CreateFinalizer(const NActors::TActorId& parent, + const NActors::TActorId& pinger, + NYdb::NQuery::EExecStatus execStatus) const = 0; + virtual std::unique_ptr<NActors::IActor> CreateStopper(const NActors::TActorId& parent, + const NActors::TActorId& connector, + const NYdb::TOperation::TOperationId& operationId) const = 0; +}; + +IActorFactory::TPtr CreateActorFactory(const TRunActorParams& params, const ::NYql::NCommon::TServiceCounters& counters); + +} diff --git a/ydb/core/fq/libs/compute/ydb/base_compute_actor.h b/ydb/core/fq/libs/compute/ydb/base_compute_actor.h new file mode 100644 index 0000000000..8148271ec2 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/base_compute_actor.h @@ -0,0 +1,61 @@ +#pragma once + +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> + +#include <ydb/core/fq/libs/compute/common/metrics.h> + +#include <ydb/library/yql/providers/common/metrics/service_counters.h> + +#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/retry/retry_policy.h> + +namespace NFq { + +template<typename TDerived> +class TBaseComputeActor : public NActors::TActorBootstrapped<TDerived> { +public: + using TBase = NActors::TActorBootstrapped<TDerived>; + using TBase::PassAway; + + TBaseComputeActor(const ::NYql::NCommon::TServiceCounters& queryCounters, const TString& stepName) + : Counters(MakeIntrusive<TComputeRequestCounters>("Total", queryCounters.Counters->GetSubgroup("step", stepName))) + , TotalStartTime(TInstant::Now()) + {} + + void Bootstrap() { + Counters->Register(); + Counters->InFly->Inc(); + AsDerived()->Start(); + } + + TDerived* AsDerived() { + return static_cast<TDerived*>(this); + } + + void CompleteAndPassAway() { + Counters->Ok->Inc(); + PassAway(); + } + + void 
FailedAndPassAway() { + Counters->Error->Inc(); + PassAway(); + } + + virtual ~TBaseComputeActor() { + Counters->InFly->Dec(); + Counters->LatencyMs->Collect((TInstant::Now() - TotalStartTime).MilliSeconds()); + } + + ::NMonitoring::TDynamicCounterPtr GetStepCountersSubgroup() const { + return Counters->Counters; + } + +private: + TComputeRequestCountersPtr Counters; + TInstant TotalStartTime; +}; + +} /* NFq */ diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h new file mode 100644 index 0000000000..1c041a59d2 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/events/events.h @@ -0,0 +1,232 @@ +#pragma once + +#include <ydb/core/fq/libs/control_plane_storage/events/events.h> +#include <ydb/core/fq/libs/quota_manager/events/events.h> + +#include <ydb/public/api/protos/draft/fq.pb.h> +#include <ydb/public/sdk/cpp/client/draft/ydb_query/query.h> + +#include <library/cpp/actors/core/event_pb.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/interconnect/events_local.h> + +#include <ydb/library/yql/public/issue/yql_issue.h> + +namespace NFq { + +struct TEvPrivate { + // Event ids + enum EEv : ui32 { + EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + + EvExecuteScriptRequest = EvBegin, + EvExecuteScriptResponse, + EvGetOperationRequest, + EvGetOperationResponse, + EvFetchScriptResultRequest, + EvFetchScriptResultResponse, + EvCancelOperationRequest, + EvCancelOperationResponse, + EvForgetOperationRequest, + EvForgetOperationResponse, + + EvExecuterResponse, + EvStatusTrackerResponse, + EvResultWriterResponse, + EvResourcesCleanerResponse, + EvFinalizerResponse, + EvStopperResponse, + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + + // Events + struct TEvExecuteScriptRequest : public NActors::TEventLocal<TEvExecuteScriptRequest, EvExecuteScriptRequest> { + 
TEvExecuteScriptRequest(TString sql, TString idempotencyKey) + : Sql(std::move(sql)) + , IdempotencyKey(std::move(idempotencyKey)) + {} + + TString Sql; + TString IdempotencyKey; + }; + + struct TEvExecuteScriptResponse : public NActors::TEventLocal<TEvExecuteScriptResponse, EvExecuteScriptResponse> { + TEvExecuteScriptResponse(NYql::TIssues issues, NYdb::EStatus status) + : Issues(issues) + , Status(status) + {} + + TEvExecuteScriptResponse(NYdb::TOperation::TOperationId operationId, const TString& executionId) + : OperationId(operationId) + , ExecutionId(executionId) + , Status(NYdb::EStatus::SUCCESS) + {} + + NYdb::TOperation::TOperationId OperationId; + TString ExecutionId; + NYql::TIssues Issues; + NYdb::EStatus Status; + }; + + struct TEvGetOperationRequest : public NActors::TEventLocal<TEvGetOperationRequest, EvGetOperationRequest> { + explicit TEvGetOperationRequest(const NYdb::TOperation::TOperationId& operationId) + : OperationId(operationId) + {} + + NYdb::TOperation::TOperationId OperationId; + }; + + struct TEvGetOperationResponse : public NActors::TEventLocal<TEvGetOperationResponse, EvGetOperationResponse> { + TEvGetOperationResponse(NYql::TIssues issues, NYdb::EStatus status) + : Issues(issues) + , Status(status) + {} + + TEvGetOperationResponse(NYdb::NQuery::EExecStatus execStatus, NYql::TIssues issues) + : ExecStatus(execStatus) + , Issues(issues) + , Status(NYdb::EStatus::SUCCESS) + {} + + NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified; + NYql::TIssues Issues; + NYdb::EStatus Status; + }; + + struct TEvFetchScriptResultRequest : public NActors::TEventLocal<TEvFetchScriptResultRequest, EvFetchScriptResultRequest> { + TEvFetchScriptResultRequest(int64_t rowOffset, TString executionId) + : RowOffset(rowOffset) + , ExecutionId(std::move(executionId)) + {} + + int64_t RowOffset = 0; + TString ExecutionId; + }; + + struct TEvFetchScriptResultResponse : public NActors::TEventLocal<TEvFetchScriptResultResponse, 
EvFetchScriptResultResponse> { + // Failure form: stores the issues and the given status; ResultSet stays empty. + // `issues` is a by-value sink parameter, so it is moved into the member rather than copied again. + TEvFetchScriptResultResponse(NYql::TIssues issues, NYdb::EStatus status) + : Issues(std::move(issues)) + , Status(status) + {} + + explicit TEvFetchScriptResultResponse(NYdb::TResultSet resultSet) + : ResultSet(std::move(resultSet)) + , Status(NYdb::EStatus::SUCCESS) + {} + + TMaybe<NYdb::TResultSet> ResultSet; + NYql::TIssues Issues; + NYdb::EStatus Status; + }; + + struct TEvCancelOperationRequest : public NActors::TEventLocal<TEvCancelOperationRequest, EvCancelOperationRequest> { + explicit TEvCancelOperationRequest(const NYdb::TOperation::TOperationId& operationId) + : OperationId(operationId) + {} + + NYdb::TOperation::TOperationId OperationId; + }; + + struct TEvCancelOperationResponse : public NActors::TEventLocal<TEvCancelOperationResponse, EvCancelOperationResponse> { + TEvCancelOperationResponse(NYql::TIssues issues, NYdb::EStatus status) + : Issues(std::move(issues)) + , Status(status) + {} + + NYql::TIssues Issues; + NYdb::EStatus Status; + }; + + struct TEvForgetOperationRequest : public NActors::TEventLocal<TEvForgetOperationRequest, EvForgetOperationRequest> { + explicit TEvForgetOperationRequest(const NYdb::TOperation::TOperationId& operationId) + : OperationId(operationId) + {} + + NYdb::TOperation::TOperationId OperationId; + }; + + struct TEvForgetOperationResponse : public NActors::TEventLocal<TEvForgetOperationResponse, EvForgetOperationResponse> { + TEvForgetOperationResponse(NYql::TIssues issues, NYdb::EStatus status) + : Issues(std::move(issues)) + , Status(status) + {} + + NYql::TIssues Issues; + NYdb::EStatus Status; + }; + + // Success form carries the operation/execution ids; failure form carries only the issues (Success == false). + struct TEvExecuterResponse : public NActors::TEventLocal<TEvExecuterResponse, EvExecuterResponse> { + TEvExecuterResponse(NYdb::TOperation::TOperationId operationId, const TString& executionId) + : OperationId(std::move(operationId)) + , ExecutionId(executionId) + , Success(true) + {} + + explicit TEvExecuterResponse(const NYql::TIssues& issues) + : Success(false) + , Issues(issues) + {} + + NYdb::TOperation::TOperationId 
OperationId; + TString ExecutionId; + bool Success = true; + NYql::TIssues Issues; + }; + + struct TEvStatusTrackerResponse : public NActors::TEventLocal<TEvStatusTrackerResponse, EvStatusTrackerResponse> { + TEvStatusTrackerResponse(NYql::TIssues issues, NYdb::EStatus status, NYdb::NQuery::EExecStatus execStatus) + : Issues(issues) + , Status(status) + , ExecStatus(execStatus) + {} + + NYql::TIssues Issues; + NYdb::EStatus Status; + NYdb::NQuery::EExecStatus ExecStatus; + }; + + struct TEvResultWriterResponse : public NActors::TEventLocal<TEvResultWriterResponse, EvResultWriterResponse> { + TEvResultWriterResponse(NYql::TIssues issues, NYdb::EStatus status) + : Issues(issues) + , Status(status) + {} + + NYql::TIssues Issues; + NYdb::EStatus Status; + }; + + struct TEvResourcesCleanerResponse : public NActors::TEventLocal<TEvResourcesCleanerResponse, EvResourcesCleanerResponse> { + TEvResourcesCleanerResponse(NYql::TIssues issues, NYdb::EStatus status) + : Issues(issues) + , Status(status) + {} + + NYql::TIssues Issues; + NYdb::EStatus Status; + }; + + struct TEvFinalizerResponse : public NActors::TEventLocal<TEvFinalizerResponse, EvFinalizerResponse> { + TEvFinalizerResponse(NYql::TIssues issues, NYdb::EStatus status) + : Issues(issues) + , Status(status) + {} + + NYql::TIssues Issues; + NYdb::EStatus Status; + }; + + struct TEvStopperResponse : public NActors::TEventLocal<TEvStopperResponse, EvStopperResponse> { + TEvStopperResponse(NYql::TIssues issues, NYdb::EStatus status) + : Issues(issues) + , Status(status) + {} + + NYql::TIssues Issues; + NYdb::EStatus Status; + }; +}; + +} diff --git a/ydb/core/fq/libs/compute/ydb/executer_actor.cpp b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp new file mode 100644 index 0000000000..563c3151cd --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp @@ -0,0 +1,150 @@ +#include "base_compute_actor.h" + +#include <ydb/core/fq/libs/common/util.h> +#include <ydb/core/fq/libs/compute/common/metrics.h> +#include 
<ydb/core/fq/libs/compute/common/retry_actor.h> +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> +#include <ydb/core/fq/libs/compute/ydb/events/events.h> +#include <ydb/core/fq/libs/ydb/ydb.h> +#include <ydb/core/protos/services.pb.h> + +#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h> +#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> + + +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ExecuterActor] QueryId: " << Params.QueryId << " " << stream) +#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ExecuterActor] QueryId: " << Params.QueryId << " " << stream) +#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ExecuterActor] QueryId: " << Params.QueryId << " " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ExecuterActor] QueryId: " << Params.QueryId << " " << stream) +#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ExecuterActor] QueryId: " << Params.QueryId << " " << stream) + +namespace NFq { + +using namespace NActors; +using namespace NFq; + +class TExecuterActor : public TBaseComputeActor<TExecuterActor> { +public: + enum ERequestType { + RT_EXECUTE_SCRIPT, + RT_PING, + RT_MAX + }; + + class TCounters: public virtual TThrRefBase { + std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({ + { MakeIntrusive<TComputeRequestCounters>("ExecuteScript") }, + { MakeIntrusive<TComputeRequestCounters>("Ping") } + }); + + ::NMonitoring::TDynamicCounterPtr Counters; + + public: + explicit TCounters(const 
::NMonitoring::TDynamicCounterPtr& counters) + : Counters(counters) + { + for (auto& request: Requests) { + request->Register(Counters); + } + } + + TComputeRequestCountersPtr GetCounters(ERequestType type) { + return Requests[type]; + } + }; + + TExecuterActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const ::NYql::NCommon::TServiceCounters& queryCounters) + : TBaseComputeActor(queryCounters, "Executer") + , Params(params) + , Parent(parent) + , Connector(connector) + , Pinger(pinger) + , Counters(GetStepCountersSubgroup()) + {} + + static constexpr char ActorName[] = "FQ_EXECUTER_ACTOR"; + + void Start() { + LOG_I("Bootstrap"); + Become(&TExecuterActor::StateFunc); + SendExecuteScript(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvExecuteScriptResponse, Handle); + hFunc(TEvents::TEvForwardPingResponse, Handle); + ) + + void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) { + auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); + pingCounters->InFly->Dec(); + pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); + if (ev.Get()->Get()->Success) { + pingCounters->Ok->Inc(); + LOG_I("Information about the operation id and execution id is stored. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)); + Send(Parent, new TEvPrivate::TEvExecuterResponse(OperationId, ExecutionId)); + CompleteAndPassAway(); + } else { + pingCounters->Error->Inc(); + // Without the idempotency key, we lose the running operation here + LOG_E("Error saving information about the operation id and execution id. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)); + Send(Parent, new TEvPrivate::TEvExecuterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the operation id and execution id. 
ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)}})); + FailedAndPassAway(); + } + } + + void Handle(const TEvPrivate::TEvExecuteScriptResponse::TPtr& ev) { + const auto& response = *ev.Get()->Get(); + if (response.Status != NYdb::EStatus::SUCCESS) { + LOG_E("Can't execute script: " << ev->Get()->Issues.ToOneLineString()); + Send(Parent, new TEvPrivate::TEvExecuterResponse(ev->Get()->Issues)); + FailedAndPassAway(); + return; + } + ExecutionId = response.ExecutionId; + OperationId = response.OperationId; + LOG_I("Execution has been created. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)); + SendPingTask(); + } + + void SendExecuteScript() { + Register(new TRetryActor<TEvPrivate::TEvExecuteScriptRequest, TEvPrivate::TEvExecuteScriptResponse, TString, TString>(Counters.GetCounters(ERequestType::RT_EXECUTE_SCRIPT), SelfId(), Connector, Params.Sql, Params.JobId)); + } + + void SendPingTask() { + Fq::Private::PingTaskRequest pingTaskRequest; + pingTaskRequest.set_execution_id(ExecutionId); + pingTaskRequest.set_operation_id(ProtoToString(OperationId)); + pingTaskRequest.set_status(::FederatedQuery::QueryMeta::RUNNING); + auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); + pingCounters->InFly->Inc(); + StartTime = TInstant::Now(); + Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest)); + } + +private: + TRunActorParams Params; + TActorId Parent; + TActorId Connector; + TActorId Pinger; + NYdb::TOperation::TOperationId OperationId; + TString ExecutionId; + TCounters Counters; + TInstant StartTime; +}; + +std::unique_ptr<NActors::IActor> CreateExecuterActor(const TRunActorParams& params, + const TActorId& parent, + const TActorId& connector, + const TActorId& pinger, + const ::NYql::NCommon::TServiceCounters& queryCounters) { + return std::make_unique<TExecuterActor>(params, parent, connector, pinger, queryCounters); +} + +} diff --git a/ydb/core/fq/libs/compute/ydb/executer_actor.h 
b/ydb/core/fq/libs/compute/ydb/executer_actor.h new file mode 100644 index 0000000000..7d50f267ff --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/executer_actor.h @@ -0,0 +1,17 @@ +#pragma once + +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> + +#include <ydb/library/yql/providers/common/metrics/service_counters.h> + +#include <library/cpp/actors/core/actor.h> + +namespace NFq { + +std::unique_ptr<NActors::IActor> CreateExecuterActor(const TRunActorParams& params, + const NActors::TActorId& parent, + const NActors::TActorId& connector, + const NActors::TActorId& pinger, + const ::NYql::NCommon::TServiceCounters& queryCounters); + +} diff --git a/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp b/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp new file mode 100644 index 0000000000..137169651c --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp @@ -0,0 +1,124 @@ +#include "base_compute_actor.h" + +#include <ydb/core/fq/libs/common/util.h> +#include <ydb/core/fq/libs/compute/common/metrics.h> +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> +#include <ydb/core/fq/libs/compute/ydb/events/events.h> +#include <ydb/core/fq/libs/ydb/ydb.h> +#include <ydb/core/protos/services.pb.h> + +#include <ydb/library/yql/providers/common/metrics/service_counters.h> + +#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h> +#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> + + +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Finalizer] QueryId: " << Params.QueryId << " " << stream) +#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Finalizer] QueryId: " << Params.QueryId << " " << stream) 
+#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Finalizer] QueryId: " << Params.QueryId << " " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Finalizer] QueryId: " << Params.QueryId << " " << stream) +#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Finalizer] QueryId: " << Params.QueryId << " " << stream) + +namespace NFq { + +using namespace NActors; +using namespace NFq; + +class TFinalizerActor : public TBaseComputeActor<TFinalizerActor> { +public: + enum ERequestType { + RT_PING, + RT_MAX + }; + + class TCounters: public virtual TThrRefBase { + std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({ + { MakeIntrusive<TComputeRequestCounters>("Ping") } + }); + + ::NMonitoring::TDynamicCounterPtr Counters; + + public: + explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters) + : Counters(counters) + { + for (auto& request: Requests) { + request->Register(Counters); + } + } + + TComputeRequestCountersPtr GetCounters(ERequestType type) { + return Requests[type]; + } + }; + + TFinalizerActor(const TRunActorParams& params, const TActorId& parent, const TActorId& pinger, NYdb::NQuery::EExecStatus execStatus, const ::NYql::NCommon::TServiceCounters& queryCounters) + : TBaseComputeActor(queryCounters, "Finalizer") + , Params(params) + , Parent(parent) + , Pinger(pinger) + , ExecStatus(execStatus) + , Counters(GetStepCountersSubgroup()) + , StartTime(TInstant::Now()) + {} + + static constexpr char ActorName[] = "FQ_FINALIZER_ACTOR"; + + void Start() { + LOG_I("Start finalizer actor. 
Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status)); + auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); + pingCounters->InFly->Inc(); + Become(&TFinalizerActor::StateFunc); + Fq::Private::PingTaskRequest pingTaskRequest; + if (ExecStatus == NYdb::NQuery::EExecStatus::Completed || Params.Status == FederatedQuery::QueryMeta::COMPLETING) { + pingTaskRequest.mutable_result_id()->set_value(Params.ResultId); + } + pingTaskRequest.set_status(ExecStatus == NYdb::NQuery::EExecStatus::Completed || Params.Status == FederatedQuery::QueryMeta::COMPLETING ? ::FederatedQuery::QueryMeta::COMPLETED : ::FederatedQuery::QueryMeta::FAILED); + Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest, true)); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvents::TEvForwardPingResponse, Handle); + ) + + void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) { + auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); + pingCounters->InFly->Dec(); + pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); + if (ev.Get()->Get()->Success) { + pingCounters->Ok->Inc(); + LOG_I("Query moved to terminal state "); + Send(Parent, new TEvPrivate::TEvFinalizerResponse({}, NYdb::EStatus::SUCCESS)); + CompleteAndPassAway(); + } else { + pingCounters->Error->Inc(); + LOG_E("Error moving the query to the terminal state"); + Send(Parent, new TEvPrivate::TEvFinalizerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error moving the query to the terminal state"}}, NYdb::EStatus::INTERNAL_ERROR)); + FailedAndPassAway(); + } + } + +private: + TRunActorParams Params; + TActorId Parent; + TActorId Pinger; + NYdb::NQuery::EExecStatus ExecStatus; + TCounters Counters; + TInstant StartTime; +}; + +std::unique_ptr<NActors::IActor> CreateFinalizerActor(const TRunActorParams& params, + const TActorId& parent, + const TActorId& pinger, + NYdb::NQuery::EExecStatus execStatus, + const ::NYql::NCommon::TServiceCounters& 
queryCounters) { + return std::make_unique<TFinalizerActor>(params, parent, pinger, execStatus, queryCounters); +} + +} diff --git a/ydb/core/fq/libs/compute/ydb/finalizer_actor.h b/ydb/core/fq/libs/compute/ydb/finalizer_actor.h new file mode 100644 index 0000000000..1435328285 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/finalizer_actor.h @@ -0,0 +1,19 @@ +#pragma once + +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> + +#include <ydb/library/yql/providers/common/metrics/service_counters.h> + +#include <ydb/public/sdk/cpp/client/draft/ydb_query/query.h> + +#include <library/cpp/actors/core/actor.h> + +namespace NFq { + +std::unique_ptr<NActors::IActor> CreateFinalizerActor(const TRunActorParams& params, + const NActors::TActorId& parent, + const NActors::TActorId& pinger, + NYdb::NQuery::EExecStatus execStatus, + const ::NYql::NCommon::TServiceCounters& queryCounters); + +} diff --git a/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp b/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp new file mode 100644 index 0000000000..90c0fda4ca --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp @@ -0,0 +1,111 @@ +#include "base_compute_actor.h" +#include "resources_cleaner_actor.h" + +#include <ydb/core/fq/libs/common/util.h> +#include <ydb/core/fq/libs/compute/common/metrics.h> +#include <ydb/core/fq/libs/compute/common/retry_actor.h> +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> +#include <ydb/core/fq/libs/compute/ydb/events/events.h> +#include <ydb/core/fq/libs/ydb/ydb.h> +#include <ydb/core/protos/services.pb.h> + +#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h> +#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> + + +#define 
LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResourcesCleaner] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream) +#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResourcesCleaner] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream) +#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResourcesCleaner] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResourcesCleaner] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream) +#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResourcesCleaner] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream) + +namespace NFq { + +using namespace NActors; +using namespace NFq; + +class TResourcesCleanerActor : public TBaseComputeActor<TResourcesCleanerActor> { +public: + enum ERequestType { + RT_FORGET_OPERATION, + RT_MAX + }; + + class TCounters: public virtual TThrRefBase { + std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({ + { MakeIntrusive<TComputeRequestCounters>("ForgetOperation") } + }); + + ::NMonitoring::TDynamicCounterPtr Counters; + + public: + explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters) + : Counters(counters) + { + for (auto& request: Requests) { + request->Register(Counters); + } + } + + TComputeRequestCountersPtr GetCounters(ERequestType type) { + return Requests[type]; + } + }; + + TResourcesCleanerActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const NYdb::TOperation::TOperationId& operationId, const 
::NYql::NCommon::TServiceCounters& queryCounters) + : TBaseComputeActor(queryCounters, "ResourcesCleaner") + , Params(params) + , Parent(parent) + , Connector(connector) + , OperationId(operationId) + , Counters(GetStepCountersSubgroup()) + {} + + static constexpr char ActorName[] = "FQ_RESOURCES_CLEANER_ACTOR"; + + void Start() { + LOG_I("Start resources cleaner actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status)); + Become(&TResourcesCleanerActor::StateFunc); + Register(new TRetryActor<TEvPrivate::TEvForgetOperationRequest, TEvPrivate::TEvForgetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_FORGET_OPERATION), SelfId(), Connector, OperationId)); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvForgetOperationResponse, Handle); + ) + + void Handle(const TEvPrivate::TEvForgetOperationResponse::TPtr& ev) { + const auto& response = *ev.Get()->Get(); + if (response.Status != NYdb::EStatus::SUCCESS) { + LOG_E("Can't forget operation: " << ev->Get()->Issues.ToOneLineString()); + Send(Parent, new TEvPrivate::TEvResourcesCleanerResponse(ev->Get()->Issues, ev->Get()->Status)); + FailedAndPassAway(); + return; + } + LOG_I("Operation successfully forgotten"); + Send(Parent, new TEvPrivate::TEvResourcesCleanerResponse({}, NYdb::EStatus::SUCCESS)); + CompleteAndPassAway(); + } + +private: + TRunActorParams Params; + TActorId Parent; + TActorId Connector; + NYdb::TOperation::TOperationId OperationId; + TCounters Counters; +}; + +std::unique_ptr<NActors::IActor> CreateResourcesCleanerActor(const TRunActorParams& params, + const TActorId& parent, + const TActorId& connector, + const NYdb::TOperation::TOperationId& operationId, + const ::NYql::NCommon::TServiceCounters& queryCounters) { + return std::make_unique<TResourcesCleanerActor>(params, parent, connector, operationId, queryCounters); +} + +} diff --git a/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.h 
b/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.h new file mode 100644 index 0000000000..0d4e796b43 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.h @@ -0,0 +1,17 @@ +#pragma once + +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> + +#include <ydb/library/yql/providers/common/metrics/service_counters.h> + +#include <library/cpp/actors/core/actor.h> + +namespace NFq { + +std::unique_ptr<NActors::IActor> CreateResourcesCleanerActor(const TRunActorParams& params, + const NActors::TActorId& parent, + const NActors::TActorId& connector, + const NYdb::TOperation::TOperationId& operationId, + const ::NYql::NCommon::TServiceCounters& queryCounters); + +} diff --git a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp new file mode 100644 index 0000000000..6f46f98cf8 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp @@ -0,0 +1,196 @@ +#include "base_compute_actor.h" + +#include <ydb/core/fq/libs/common/util.h> +#include <ydb/core/fq/libs/compute/common/metrics.h> +#include <ydb/core/fq/libs/compute/common/retry_actor.h> +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> +#include <ydb/core/fq/libs/compute/ydb/events/events.h> +#include <ydb/core/fq/libs/private_client/events.h> +#include <ydb/core/fq/libs/ydb/ydb.h> +#include <ydb/core/protos/services.pb.h> + +#include <ydb/library/yql/providers/common/metrics/service_counters.h> + +#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h> +#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> +#include <library/cpp/protobuf/interop/cast.h> + +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] 
[ResultWriter] QueryId: " << Params.QueryId << " ExecutionId: " << ExecutionId << " " << stream) +#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResultWriter] QueryId: " << Params.QueryId << " ExecutionId: " << ExecutionId << " " << stream) +#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResultWriter] QueryId: " << Params.QueryId << " ExecutionId: " << ExecutionId << " " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResultWriter] QueryId: " << Params.QueryId << " ExecutionId: " << ExecutionId << " " << stream) +#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResultWriter] QueryId: " << Params.QueryId << " ExecutionId: " << ExecutionId << " " << stream) + +namespace NFq { + +using namespace NActors; +using namespace NFq; + +class TResultWriterActor : public TBaseComputeActor<TResultWriterActor> { +public: + enum ERequestType { + RT_FETCH_SCRIPT_RESULT, + RT_WRITE_RESULT, + RT_PING, + RT_MAX + }; + + class TCounters: public virtual TThrRefBase { + std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({ + { MakeIntrusive<TComputeRequestCounters>("FetchScriptResult") }, + { MakeIntrusive<TComputeRequestCounters>("WriteResult") }, + { MakeIntrusive<TComputeRequestCounters>("Ping") } + }); + + ::NMonitoring::TDynamicCounterPtr Counters; + + public: + explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters) + : Counters(counters) + { + for (auto& request: Requests) { + request->Register(Counters); + } + } + + TComputeRequestCountersPtr GetCounters(ERequestType type) { + return Requests[type]; + } + }; + + TResultWriterActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const TString& executionId, const ::NYql::NCommon::TServiceCounters& 
queryCounters) + : TBaseComputeActor(queryCounters, "ResultWriter") + , Params(params) + , Parent(parent) + , Connector(connector) + , Pinger(pinger) + , ExecutionId(executionId) + , Counters(GetStepCountersSubgroup()) + {} + + static constexpr char ActorName[] = "FQ_RESULT_WRITER_ACTOR"; + + void Start() { + LOG_I("Start result writer actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status)); + Become(&TResultWriterActor::StateFunc); + SendFetchScriptResultRequest(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvFetchScriptResultResponse, Handle); + hFunc(NFq::TEvInternalService::TEvWriteResultResponse, Handle); + hFunc(TEvents::TEvForwardPingResponse, Handle); + ) + + void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) { + auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); + pingCounters->InFly->Dec(); + pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); + if (ev.Get()->Get()->Success) { + pingCounters->Ok->Inc(); + LOG_I("The result has been moved"); + Send(Parent, new TEvPrivate::TEvResultWriterResponse({}, NYdb::EStatus::SUCCESS)); + CompleteAndPassAway(); + } else { + pingCounters->Error->Inc(); + LOG_E("Move result error"); + Send(Parent, new TEvPrivate::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Move result error. 
ExecutionId: " << ExecutionId}}, NYdb::EStatus::INTERNAL_ERROR)); + FailedAndPassAway(); + } + } + + // Handles one fetched page of the script result. + // On a non-success status the issues are reported to Parent as INTERNAL_ERROR and the actor stops. + // A page with zero rows ends the fetch loop: the result-set metadata (rows_count = Offset, columns) is sent to Pinger. + // Otherwise the page is sent to the internal service as a write request and Offset advances by the page's row count. + void Handle(const TEvPrivate::TEvFetchScriptResultResponse::TPtr& ev) { + const auto& response = *ev.Get()->Get(); + if (response.Status != NYdb::EStatus::SUCCESS) { + LOG_E("Can't fetch script result: " << ev->Get()->Issues.ToOneLineString()); + Send(Parent, new TEvPrivate::TEvResultWriterResponse(ev->Get()->Issues, NYdb::EStatus::INTERNAL_ERROR)); + FailedAndPassAway(); + return; + } + + // StartTime is the reference point for the latency collected in the ping / write-result handlers. + StartTime = TInstant::Now(); + // Bind by reference: a by-value copy here would duplicate the whole result-set protobuf for every page, + // while the only copy actually needed is the one made below into the write request. + // (Should GetProto return by value, the temporary lives as long as this reference, so this is safe either way.) + const auto& resultSetProto = NYdb::TProtoAccessor::GetProto(*response.ResultSet); + if (response.ResultSet->RowsCount() == 0) { + Fq::Private::PingTaskRequest pingTaskRequest; + pingTaskRequest.mutable_result_id()->set_value(Params.ResultId); + pingTaskRequest.set_result_set_count(1); + auto& resultSetMeta = *pingTaskRequest.add_result_set_meta(); + resultSetMeta.set_rows_count(Offset); + for (const auto& column: resultSetProto.columns()) { + *resultSetMeta.add_column() = column; + } + auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); + pingCounters->InFly->Inc(); + Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest)); + return; + } + + auto chunk = CreateProtoRequestWithoutResultSet(Offset); + Offset += response.ResultSet->RowsCount(); + *chunk.mutable_result_set() = resultSetProto; + auto writeResultCounters = Counters.GetCounters(ERequestType::RT_WRITE_RESULT); + writeResultCounters->InFly->Inc(); + Send(NFq::MakeInternalServiceActorId(), new NFq::TEvInternalService::TEvWriteResultRequest(std::move(chunk))); + } + + void Handle(const NFq::TEvInternalService::TEvWriteResultResponse::TPtr& ev) { + auto writeResultCounters = Counters.GetCounters(ERequestType::RT_WRITE_RESULT); + writeResultCounters->InFly->Dec(); + writeResultCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); + if (ev.Get()->Get()->Status.IsSuccess()) { + writeResultCounters->Ok->Inc(); + LOG_I("Result successfully written for 
offset " << Offset); + SendFetchScriptResultRequest(); + } else { + writeResultCounters->Error->Inc(); + LOG_E("Error writing result for offset " << Offset); + Send(Parent, new TEvPrivate::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error writing result for offset " << Offset}}, NYdb::EStatus::INTERNAL_ERROR)); + FailedAndPassAway(); + } + } + + void SendFetchScriptResultRequest() { + auto fetchScriptResultCounters = Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT); + fetchScriptResultCounters->InFly->Inc(); + StartTime = TInstant::Now(); + Register(new TRetryActor<TEvPrivate::TEvFetchScriptResultRequest, TEvPrivate::TEvFetchScriptResultResponse, int64_t, TString>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, Offset, ExecutionId)); + } + + Fq::Private::WriteTaskResultRequest CreateProtoRequestWithoutResultSet(ui64 startRowIndex) { + Fq::Private::WriteTaskResultRequest protoReq; + protoReq.set_owner_id(Params.Owner); + protoReq.mutable_result_id()->set_value(Params.ResultId); + protoReq.set_offset(startRowIndex); + protoReq.set_result_set_id(0); + protoReq.set_request_id(0); + *protoReq.mutable_deadline() = NProtoInterop::CastToProto(Params.Deadline); + return protoReq; + } + +private: + TRunActorParams Params; + TActorId Parent; + TActorId Connector; + TActorId Pinger; + TString ExecutionId; + TCounters Counters; + TInstant StartTime; + int64_t Offset = 0; +}; + +std::unique_ptr<NActors::IActor> CreateResultWriterActor(const TRunActorParams& params, + const TActorId& parent, + const TActorId& connector, + const TActorId& pinger, + const TString& executionId, + const ::NYql::NCommon::TServiceCounters& queryCounters) { + return std::make_unique<TResultWriterActor>(params, parent, connector, pinger, executionId, queryCounters); +} + +} diff --git a/ydb/core/fq/libs/compute/ydb/result_writer_actor.h b/ydb/core/fq/libs/compute/ydb/result_writer_actor.h new file mode 100644 index 
0000000000..82418704f4 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/result_writer_actor.h @@ -0,0 +1,18 @@ +#pragma once + +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> + +#include <ydb/library/yql/providers/common/metrics/service_counters.h> + +#include <library/cpp/actors/core/actor.h> + +namespace NFq { + +std::unique_ptr<NActors::IActor> CreateResultWriterActor(const TRunActorParams& params, + const NActors::TActorId& parent, + const NActors::TActorId& connector, + const NActors::TActorId& pinger, + const TString& executionId, + const ::NYql::NCommon::TServiceCounters& queryCounters); + +} diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp new file mode 100644 index 0000000000..19f428fe3d --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp @@ -0,0 +1,186 @@ +#include "base_compute_actor.h" + +#include <ydb/core/fq/libs/common/util.h> +#include <ydb/core/fq/libs/compute/common/metrics.h> +#include <ydb/core/fq/libs/compute/common/retry_actor.h> +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> +#include <ydb/core/fq/libs/compute/ydb/events/events.h> +#include <ydb/core/fq/libs/ydb/ydb.h> +#include <ydb/core/protos/services.pb.h> + +#include <ydb/library/yql/providers/common/metrics/service_counters.h> +#include <ydb/library/yql/public/issue/yql_issue_message.h> + +#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h> +#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> + + +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [StatusTracker] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream) 
+#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [StatusTracker] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream) +#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [StatusTracker] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [StatusTracker] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream) +#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [StatusTracker] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream) + +namespace NFq { + +using namespace NActors; +using namespace NFq; + +class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> { +public: + using IRetryPolicy = IRetryPolicy<const TEvPrivate::TEvGetOperationResponse::TPtr&>; + + enum ERequestType { + RT_GET_OPERATION, + RT_PING, + RT_MAX + }; + + class TCounters: public virtual TThrRefBase { + std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({ + { MakeIntrusive<TComputeRequestCounters>("GetOperation") }, + { MakeIntrusive<TComputeRequestCounters>("Ping") } + }); + + ::NMonitoring::TDynamicCounterPtr Counters; + + public: + explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters) + : Counters(counters) + { + for (auto& request: Requests) { + request->Register(Counters); + } + } + + TComputeRequestCountersPtr GetCounters(ERequestType type) { + return Requests[type]; + } + }; + + TStatusTrackerActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const NYdb::TOperation::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters) + 
: TBaseComputeActor(queryCounters, "StatusTracker") + , Params(params) + , Parent(parent) + , Connector(connector) + , Pinger(pinger) + , OperationId(operationId) + , Counters(GetStepCountersSubgroup()) + {} + + static constexpr char ActorName[] = "FQ_STATUS_TRACKER"; + + void Start() { + LOG_I("Become"); + Become(&TStatusTrackerActor::StateFunc); + SendGetOperation(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvGetOperationResponse, Handle); + hFunc(TEvents::TEvForwardPingResponse, Handle); + ) + + void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) { + auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); + pingCounters->InFly->Dec(); + pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); + if (ev.Get()->Get()->Success) { + pingCounters->Ok->Inc(); + LOG_I("Information about the status of operation is stored"); + Send(Parent, new TEvPrivate::TEvStatusTrackerResponse(Issues, Status, ExecStatus)); + CompleteAndPassAway(); + } else { + pingCounters->Error->Inc(); + LOG_E("Error saving information about the status of operation"); + Send(Parent, new TEvPrivate::TEvStatusTrackerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the status of operation: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR, ExecStatus)); + FailedAndPassAway(); + } + } + + void Handle(const TEvPrivate::TEvGetOperationResponse::TPtr& ev) { + const auto& response = *ev.Get()->Get(); + if (response.Status != NYdb::EStatus::SUCCESS) { + LOG_E("Can't get operation: " << ev->Get()->Issues.ToOneLineString()); + Send(Parent, new TEvPrivate::TEvStatusTrackerResponse(ev->Get()->Issues, ev->Get()->Status, ExecStatus)); + FailedAndPassAway(); + return; + } + + StartTime = TInstant::Now(); + LOG_D("Execution status: " << static_cast<int>(response.ExecStatus)); + switch (response.ExecStatus) { + case NYdb::NQuery::EExecStatus::Unspecified: + case NYdb::NQuery::EExecStatus::Starting: + 
SendGetOperation(TDuration::Seconds(1)); + break; + case NYdb::NQuery::EExecStatus::Aborted: + case NYdb::NQuery::EExecStatus::Canceled: + case NYdb::NQuery::EExecStatus::Failed: + Issues = response.Issues; + Status = response.Status; + ExecStatus = response.ExecStatus; + Failed(); + break; + case NYdb::NQuery::EExecStatus::Completed: + Issues = response.Issues; + Status = response.Status; + ExecStatus = response.ExecStatus; + Complete(); + break; + } + } + + /* Registers a retry actor that asks the connector for the operation state, optionally after a delay. */ + void SendGetOperation(const TDuration& delay = TDuration::Zero()) { + Register(new TRetryActor<TEvPrivate::TEvGetOperationRequest, TEvPrivate::TEvGetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_GET_OPERATION), delay, SelfId(), Connector, OperationId)); + } + + /* Forwards the collected issues to the pinger with query status FAILING. */ + void Failed() { + LOG_I("Execution status: Failed, " << Status); + auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); + pingCounters->InFly->Inc(); + Fq::Private::PingTaskRequest pingTaskRequest; + NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues()); + pingTaskRequest.set_status(::FederatedQuery::QueryMeta::FAILING); + Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest)); + } + + /* Forwards the collected issues to the pinger with query status COMPLETING. The log line uses the same ", " separator as Failed() so the status is not glued to the word "Complete". */ + void Complete() { + LOG_I("Execution status: Complete, " << Status); + auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); + pingCounters->InFly->Inc(); + Fq::Private::PingTaskRequest pingTaskRequest; + NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues()); + pingTaskRequest.set_status(::FederatedQuery::QueryMeta::COMPLETING); + Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest)); + } + +private: + TRunActorParams Params; + TActorId Parent; + TActorId Connector; + TActorId Pinger; + NYdb::TOperation::TOperationId OperationId; + TCounters Counters; + TInstant StartTime; + NYql::TIssues Issues; + NYdb::EStatus Status = NYdb::EStatus::SUCCESS; + NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified; +}; + +std::unique_ptr<NActors::IActor> 
CreateStatusTrackerActor(const TRunActorParams& params, + const TActorId& parent, + const TActorId& connector, + const TActorId& pinger, + const NYdb::TOperation::TOperationId& operationId, + const ::NYql::NCommon::TServiceCounters& queryCounters) { + return std::make_unique<TStatusTrackerActor>(params, parent, connector, pinger, operationId, queryCounters); +} + +} diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.h b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.h new file mode 100644 index 0000000000..e330be962a --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.h @@ -0,0 +1,18 @@ +#pragma once + +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> + +#include <ydb/library/yql/providers/common/metrics/service_counters.h> + +#include <library/cpp/actors/core/actor.h> + +namespace NFq { + +std::unique_ptr<NActors::IActor> CreateStatusTrackerActor(const TRunActorParams& params, + const NActors::TActorId& parent, + const NActors::TActorId& connector, + const NActors::TActorId& pinger, + const NYdb::TOperation::TOperationId& operationId, + const ::NYql::NCommon::TServiceCounters& queryCounters); + +} diff --git a/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp b/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp new file mode 100644 index 0000000000..59e60c6128 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp @@ -0,0 +1,111 @@ +#include "base_compute_actor.h" +#include "resources_cleaner_actor.h" + +#include <ydb/core/fq/libs/common/util.h> +#include <ydb/core/fq/libs/compute/common/metrics.h> +#include <ydb/core/fq/libs/compute/common/retry_actor.h> +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> +#include <ydb/core/fq/libs/compute/ydb/events/events.h> +#include <ydb/core/fq/libs/ydb/ydb.h> +#include <ydb/core/protos/services.pb.h> + +#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h> +#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> + +#include 
<library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> + + +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Stopper] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream) +#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Stopper] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream) +#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Stopper] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Stopper] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream) +#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Stopper] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream) + +namespace NFq { + +using namespace NActors; +using namespace NFq; + +class TStopperActor : public TBaseComputeActor<TStopperActor> { +public: + enum ERequestType { + RT_CANCEL_OPERATION, + RT_MAX + }; + + class TCounters: public virtual TThrRefBase { + std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({ + { MakeIntrusive<TComputeRequestCounters>("CancelOperation") } + }); + + ::NMonitoring::TDynamicCounterPtr Counters; + + public: + explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters) + : Counters(counters) + { + for (auto& request: Requests) { + request->Register(Counters); + } + } + + TComputeRequestCountersPtr GetCounters(ERequestType type) { + return 
Requests[type]; + } + }; + + TStopperActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const NYdb::TOperation::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters) + : TBaseComputeActor(queryCounters, "Stopper") + , Params(params) + , Parent(parent) + , Connector(connector) + , OperationId(operationId) + , Counters(GetStepCountersSubgroup()) + {} + + static constexpr char ActorName[] = "FQ_STOPPER_ACTOR"; + + void Start() { + LOG_I("Start stopper actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status)); + Become(&TStopperActor::StateFunc); + Register(new TRetryActor<TEvPrivate::TEvCancelOperationRequest, TEvPrivate::TEvCancelOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_CANCEL_OPERATION), SelfId(), Connector, OperationId)); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvCancelOperationResponse, Handle); + ) + + void Handle(const TEvPrivate::TEvCancelOperationResponse::TPtr& ev) { + const auto& response = *ev.Get()->Get(); + if (response.Status != NYdb::EStatus::SUCCESS) { + LOG_E("Can't cancel operation: " << ev->Get()->Issues.ToOneLineString()); + Send(Parent, new TEvPrivate::TEvStopperResponse(ev->Get()->Issues, ev->Get()->Status)); + FailedAndPassAway(); + return; + } + LOG_I("Operation successfully canceled"); + Send(Parent, new TEvPrivate::TEvStopperResponse({}, NYdb::EStatus::SUCCESS)); + CompleteAndPassAway(); + } + +private: + TRunActorParams Params; + TActorId Parent; + TActorId Connector; + NYdb::TOperation::TOperationId OperationId; + TCounters Counters; +}; + +std::unique_ptr<NActors::IActor> CreateStopperActor(const TRunActorParams& params, + const TActorId& parent, + const TActorId& connector, + const NYdb::TOperation::TOperationId& operationId, + const ::NYql::NCommon::TServiceCounters& queryCounters) { + return std::make_unique<TStopperActor>(params, parent, connector, operationId, queryCounters); +} 
+ +} diff --git a/ydb/core/fq/libs/compute/ydb/stopper_actor.h b/ydb/core/fq/libs/compute/ydb/stopper_actor.h new file mode 100644 index 0000000000..a66c3ba7e0 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/stopper_actor.h @@ -0,0 +1,17 @@ +#pragma once + +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> + +#include <ydb/library/yql/providers/common/metrics/service_counters.h> + +#include <library/cpp/actors/core/actor.h> + +namespace NFq { + +std::unique_ptr<NActors::IActor> CreateStopperActor(const TRunActorParams& params, + const NActors::TActorId& parent, + const NActors::TActorId& connector, + const NYdb::TOperation::TOperationId& operationId, + const ::NYql::NCommon::TServiceCounters& queryCounters); + +} diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp new file mode 100644 index 0000000000..164fe79513 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp @@ -0,0 +1,128 @@ +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/hfunc.h> +#include <ydb/core/fq/libs/compute/ydb/events/events.h> +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> +#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h> +#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> +#include <ydb/core/fq/libs/ydb/ydb.h> + +namespace NFq { + +using namespace NActors; +using namespace NFq; + +class TYdbConnectorActor : public NActors::TActorBootstrapped<TYdbConnectorActor> { +public: + explicit TYdbConnectorActor(const TRunActorParams& params) + : Params(params) + {} + + void Bootstrap() { + auto querySettings = NFq::GetClientSettings<NYdb::NQuery::TClientSettings>(Params.Config.GetCompute().GetYdb().GetConnection(), Params.CredentialsProviderFactory); + QueryClient = 
std::make_unique<NYdb::NQuery::TQueryClient>(Params.YqSharedResources->UserSpaceYdbDriver, querySettings); + auto operationSettings = NFq::GetClientSettings<NYdb::TCommonClientSettings>(Params.Config.GetCompute().GetYdb().GetConnection(), Params.CredentialsProviderFactory); + OperationClient = std::make_unique<NYdb::NOperation::TOperationClient>(Params.YqSharedResources->UserSpaceYdbDriver, operationSettings); + Become(&TYdbConnectorActor::StateFunc); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvExecuteScriptRequest, Handle); + hFunc(TEvPrivate::TEvGetOperationRequest, Handle); + hFunc(TEvPrivate::TEvFetchScriptResultRequest, Handle); + hFunc(TEvPrivate::TEvCancelOperationRequest, Handle); + hFunc(TEvPrivate::TEvForgetOperationRequest, Handle); + cFunc(NActors::TEvents::TEvPoisonPill::EventType, PassAway); + ) + + void Handle(const TEvPrivate::TEvExecuteScriptRequest::TPtr& ev) { + QueryClient + ->ExecuteScript(ev->Get()->Sql) + .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) { + try { + auto response = future.ExtractValueSync(); + if (response.Status().IsSuccess()) { + actorSystem->Send(recipient, new TEvPrivate::TEvExecuteScriptResponse(response.Id(), response.Metadata().ExecutionId), 0, cookie); + } else { + actorSystem->Send(recipient, new TEvPrivate::TEvExecuteScriptResponse(response.Status().GetIssues(), response.Status().GetStatus()), 0, cookie); + } + } catch (...) 
{ + actorSystem->Send(recipient, new TEvPrivate::TEvExecuteScriptResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie); + } + }); + } + + void Handle(const TEvPrivate::TEvGetOperationRequest::TPtr& ev) { + OperationClient + ->Get<NYdb::NQuery::TScriptExecutionOperation>(ev->Get()->OperationId) + .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) { + try { + auto response = future.ExtractValueSync(); + if (response.Status().IsSuccess()) { + actorSystem->Send(recipient, new TEvPrivate::TEvGetOperationResponse(response.Metadata().ExecStatus, response.Status().GetIssues()), 0, cookie); + } else { + actorSystem->Send(recipient, new TEvPrivate::TEvGetOperationResponse(response.Status().GetIssues(), response.Status().GetStatus()), 0, cookie); + } + } catch (...) { + actorSystem->Send(recipient, new TEvPrivate::TEvGetOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie); + } + }); + } + + void Handle(const TEvPrivate::TEvFetchScriptResultRequest::TPtr& ev) { + NYdb::NQuery::TFetchScriptResultsSettings settings; + settings.RowsOffset(ev->Get()->RowOffset); + QueryClient + ->FetchScriptResults(ev->Get()->ExecutionId, settings) + .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) { + try { + auto response = future.ExtractValueSync(); + if (response.IsSuccess()) { + actorSystem->Send(recipient, new TEvPrivate::TEvFetchScriptResultResponse(response.ExtractResultSet()), 0, cookie); + } else { + actorSystem->Send(recipient, new TEvPrivate::TEvFetchScriptResultResponse(response.GetIssues(), response.GetStatus()), 0, cookie); + } + } catch (...) 
{ + actorSystem->Send(recipient, new TEvPrivate::TEvFetchScriptResultResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie); + } + }); + } + + void Handle(const TEvPrivate::TEvCancelOperationRequest::TPtr& ev) { + OperationClient + ->Cancel(ev->Get()->OperationId) + .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) { + try { + auto response = future.ExtractValueSync(); + actorSystem->Send(recipient, new TEvPrivate::TEvCancelOperationResponse(response.GetIssues(), response.GetStatus()), 0, cookie); + } catch (...) { + actorSystem->Send(recipient, new TEvPrivate::TEvCancelOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie); + } + }); + } + + void Handle(const TEvPrivate::TEvForgetOperationRequest::TPtr& ev) { + OperationClient + ->Forget(ev->Get()->OperationId) + .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) { + try { + auto response = future.ExtractValueSync(); + actorSystem->Send(recipient, new TEvPrivate::TEvForgetOperationResponse(response.GetIssues(), response.GetStatus()), 0, cookie); + } catch (...) 
{ + actorSystem->Send(recipient, new TEvPrivate::TEvForgetOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie); + } + }); + } + +private: + TRunActorParams Params; + std::unique_ptr<NYdb::NQuery::TQueryClient> QueryClient; + std::unique_ptr<NYdb::NOperation::TOperationClient> OperationClient; +}; + +std::unique_ptr<NActors::IActor> CreateConnectorActor(const TRunActorParams& params) { + return std::make_unique<TYdbConnectorActor>(params); +} + +} diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.h b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.h new file mode 100644 index 0000000000..ae1ea7accf --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.h @@ -0,0 +1,9 @@ +#pragma once + +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> + +namespace NFq { + +std::unique_ptr<NActors::IActor> CreateConnectorActor(const TRunActorParams& params); + +} diff --git a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp new file mode 100644 index 0000000000..c1da57d093 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp @@ -0,0 +1,217 @@ +#include "ydb_run_actor.h" + +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/log.h> + +#include <google/protobuf/util/time_util.h> + +#include <util/string/split.h> +#include <util/system/hostname.h> + +#include <library/cpp/actors/core/log.h> +#include <library/cpp/actors/core/mon.h> +#include <library/cpp/protobuf/interop/cast.h> +#include <ydb/core/fq/libs/compute/common/pinger.h> +#include <ydb/core/fq/libs/compute/ydb/events/events.h> +#include <ydb/core/fq/libs/control_plane_storage/util.h> +#include <ydb/core/fq/libs/private_client/events.h> +#include <ydb/core/fq/libs/ydb/ydb.h> +#include <ydb/core/protos/services.pb.h> +#include 
<ydb/library/yql/public/issue/yql_issue_message.h> +#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h> +#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> + + +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] QueryId: " << Params.QueryId << " " << stream) +#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] QueryId: " << Params.QueryId << " " << stream) +#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] QueryId: " << Params.QueryId << " " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] QueryId: " << Params.QueryId << " " << stream) +#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] QueryId: " << Params.QueryId << " " << stream) + +namespace NFq { + +using namespace NActors; +using namespace NFq; + +/* Drives one query through the executer, status tracker, result writer, resources cleaner and finalizer steps: each step is a child actor created via ActorFactory, and the next step is chosen when its response arrives. */ +class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> { +public: + explicit TYdbRunActor( + const TActorId& fetcherId, + const ::NYql::NCommon::TServiceCounters& queryCounters, + TRunActorParams&& params, + const IActorFactory::TPtr& actorFactory) + : FetcherId(fetcherId) + , Params(std::move(params)) + , CreatedAt(Params.CreatedAt) + , QueryCounters(queryCounters) + , ActorFactory(actorFactory) + {} + + static constexpr char ActorName[] = "YDB_RUN_ACTOR"; + + /* Logs the run parameters, registers the pinger and connector helper actors, switches to StateFunc and starts the step selected by Run(). */ + void Bootstrap() { + LOG_I("Bootstrap. " << Params); + Pinger = Register(ActorFactory->CreatePinger(SelfId()).release()); + Connector = Register(ActorFactory->CreateConnector().release()); + Become(&TYdbRunActor::StateFunc); + Run(); + } + + /* Event dispatch for every step response. TEvQueryActionResult is registered as well so that its Handle overload (which starts the stopper) is reachable; without the entry this strict state function treats the event as unexpected. */ + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvExecuterResponse, Handle); + hFunc(TEvPrivate::TEvStatusTrackerResponse, Handle); + hFunc(TEvPrivate::TEvResultWriterResponse, Handle); + hFunc(TEvPrivate::TEvResourcesCleanerResponse, Handle); + hFunc(TEvPrivate::TEvFinalizerResponse, Handle); + hFunc(TEvPrivate::TEvStopperResponse, Handle); + hFunc(TEvents::TEvQueryActionResult, Handle); + ) + + /* Executer finished: on failure resign the query; otherwise remember the execution and operation ids and start the status tracker. */ + void Handle(const TEvPrivate::TEvExecuterResponse::TPtr& ev) { + auto& response = *ev->Get(); + if (!response.Success) { + LOG_I("ExecuterResponse (failed). Issues: " << response.Issues.ToOneLineString()); + ResignAndPassAway(response.Issues); + return; + } + Params.ExecutionId = response.ExecutionId; + Params.OperationId = response.OperationId; + LOG_I("ExecuterResponse (success). ExecutionId: " << Params.ExecutionId << " OperationId: " << ProtoToString(Params.OperationId)); + Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release()); + } + + /* Status tracker finished: on failure resign the query; otherwise continue according to the reported execution status. */ + void Handle(const TEvPrivate::TEvStatusTrackerResponse::TPtr& ev) { + auto& response = *ev->Get(); + if (response.Status != NYdb::EStatus::SUCCESS) { + LOG_I("StatusTrackerResponse (failed). 
Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); + ResignAndPassAway(response.Issues); + return; + } + ExecStatus = response.ExecStatus; + LOG_I("StatusTrackerResponse (success) " << response.Status << " ExecStatus: " << static_cast<int>(response.ExecStatus) << " Issues: " << response.Issues.ToOneLineString()); + if (response.ExecStatus == NYdb::NQuery::EExecStatus::Completed) { + Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.ExecutionId).release()); + } else { + Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release()); + } + } + + /* Result writer finished: on failure resign the query; otherwise move on to resource cleanup. */ + void Handle(const TEvPrivate::TEvResultWriterResponse::TPtr& ev) { + auto& response = *ev->Get(); + if (response.Status != NYdb::EStatus::SUCCESS) { + LOG_I("ResultWriterResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); + ResignAndPassAway(response.Issues); + return; + } + LOG_I("ResultWriterResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString()); + Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release()); + } + + /* Resources cleaner finished: UNSUPPORTED is accepted alongside SUCCESS; any other status resigns the query. On acceptance the finalizer is started. */ + void Handle(const TEvPrivate::TEvResourcesCleanerResponse::TPtr& ev) { + auto& response = *ev->Get(); + if (response.Status != NYdb::EStatus::SUCCESS && response.Status != NYdb::EStatus::UNSUPPORTED) { + LOG_I("ResourcesCleanerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); + ResignAndPassAway(response.Issues); + return; + } + LOG_I("ResourcesCleanerResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString()); + Register(ActorFactory->CreateFinalizer(SelfId(), Pinger, ExecStatus).release()); + } + + /* Finalizer finished: on failure resign the query; otherwise stop the helpers and this actor. The event is taken by const reference, like in the other handlers, instead of by value. */ + void Handle(const TEvPrivate::TEvFinalizerResponse::TPtr& ev) { + auto& response = *ev->Get(); + if (response.Status != NYdb::EStatus::SUCCESS) { + LOG_I("FinalizerResponse (failed). 
Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); + ResignAndPassAway(response.Issues); + return; + } + LOG_I("FinalizerResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString()); + FinishAndPassAway(); + } + + void Handle(TEvents::TEvQueryActionResult::TPtr& ev) { + LOG_I("QueryActionResult: " << ev->Get()->Action); + if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED && !IsAborted) { + IsAborted = true; + Register(ActorFactory->CreateStopper(SelfId(), Connector, Params.OperationId).release()); + } + } + + void Handle(const TEvPrivate::TEvStopperResponse::TPtr& ev) { + auto& response = *ev->Get(); + if (response.Status != NYdb::EStatus::SUCCESS) { + LOG_I("StopperResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); + ResignAndPassAway(response.Issues); + return; + } + LOG_I("StopperResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString()); + Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release()); + } + + void Run() { + switch (Params.Status) { + case FederatedQuery::QueryMeta::ABORTING_BY_USER: + case FederatedQuery::QueryMeta::ABORTING_BY_SYSTEM: + case FederatedQuery::QueryMeta::FAILING: + if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) { + Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release()); + } else { + Register(ActorFactory->CreateFinalizer(SelfId(), Pinger, ExecStatus).release()); + } + break; + case FederatedQuery::QueryMeta::COMPLETING: + if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) { + Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.ExecutionId).release()); + } else { + Register(ActorFactory->CreateFinalizer(SelfId(), Pinger, ExecStatus).release()); + } + break; + case FederatedQuery::QueryMeta::STARTING: + 
Register(ActorFactory->CreateExecuter(SelfId(), Connector, Pinger).release()); + break; + case FederatedQuery::QueryMeta::RUNNING: + Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release()); + break; + default: + break; + } + } + + void ResignAndPassAway(const NYql::TIssues& issues) { + Fq::Private::PingTaskRequest pingTaskRequest; + NYql::IssuesToMessage(issues, pingTaskRequest.mutable_transient_issues()); + pingTaskRequest.set_resign_query(true); + Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest, true)); + FinishAndPassAway(); + } + + void FinishAndPassAway() { + Send(Connector, new NActors::TEvents::TEvPoisonPill()); + PassAway(); + } + +private: + bool IsAborted = false; + TActorId FetcherId; + NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified; + TRunActorParams Params; + TInstant CreatedAt; + NYql::TIssues TransientIssues; + ::NYql::NCommon::TServiceCounters QueryCounters; + TActorId Pinger; + TActorId Connector; + IActorFactory::TPtr ActorFactory; +}; + +IActor* CreateYdbRunActor( + const NActors::TActorId& fetcherId, + const ::NYql::NCommon::TServiceCounters& serviceCounters, + TRunActorParams&& params, + const IActorFactory::TPtr& actorFactory +) { + return new TYdbRunActor(fetcherId, serviceCounters, std::move(params), actorFactory); +} + +} /* NFq */ diff --git a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.h b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.h new file mode 100644 index 0000000000..2b738db143 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.h @@ -0,0 +1,38 @@ +#pragma once + +#include "actors_factory.h" + +#include <ydb/core/mon/mon.h> + +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> +#include <ydb/core/fq/libs/config/protos/pinger.pb.h> +#include <ydb/core/fq/libs/events/events.h> +#include <ydb/core/fq/libs/private_client/private_client.h> +#include <ydb/core/fq/libs/shared_resources/shared_resources.h> +#include 
<ydb/core/fq/libs/signer/signer.h> + +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> +#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h> +#include <ydb/library/yql/providers/dq/worker_manager/interface/counters.h> +#include <ydb/library/yql/providers/dq/actors/proto_builder.h> +#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> +#include <ydb/library/yql/providers/common/metrics/service_counters.h> +#include <ydb/library/yql/providers/pq/cm_client/client.h> + +#include <ydb/public/lib/fq/scope.h> + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/random_provider/random_provider.h> +#include <library/cpp/time_provider/time_provider.h> + +#include <util/datetime/base.h> + +namespace NFq { + +NActors::IActor* CreateYdbRunActor( + const NActors::TActorId& fetcherId, + const ::NYql::NCommon::TServiceCounters& serviceCounters, + TRunActorParams&& params, + const IActorFactory::TPtr& actorFactory); + +} /* NFq */ diff --git a/ydb/core/fq/libs/config/protos/compute.proto b/ydb/core/fq/libs/config/protos/compute.proto index 2bed58d5d4..90ece7388c 100644 --- a/ydb/core/fq/libs/config/protos/compute.proto +++ b/ydb/core/fq/libs/config/protos/compute.proto @@ -5,6 +5,7 @@ package NFq.NConfig; option java_package = "ru.yandex.kikimr.proto"; import "ydb/core/fq/libs/config/protos/storage.proto"; +import "ydb/public/api/protos/draft/fq.proto"; //////////////////////////////////////////////////////////// @@ -12,12 +13,34 @@ message TInPlaceCompute { } message TYdbCompute { - TYdbStorageConfig Connection = 1; + bool Enable = 1; + TYdbStorageConfig Connection = 2; } -message TComputeConfig { - oneof type { - TInPlaceCompute InPlace = 1; - TYdbCompute Ydb = 2; +enum EComputeType { + UNKNOWN = 0; + IN_PLACE = 1; + YDB = 2; +} + +message TComputeMappingRuleKey { + oneof key { + FederatedQuery.QueryContent.QueryType QueryType = 1; + // FederatedQuery.QueryContent.EngineType EngineType = 2; } } + 
+message TComputeMappingRule { + repeated TComputeMappingRuleKey Key = 1; + EComputeType Compute = 2; +} + +message TComputeMapping { + repeated TComputeMappingRule Rule = 1; +} + +message TComputeConfig { + TInPlaceCompute InPlace = 1; + TYdbCompute Ydb = 2; + TComputeMapping ComputeMapping = 3; +} diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp index 8ecb120c6f..a86d817e87 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp @@ -491,6 +491,9 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ *newTask->mutable_result_set_meta() = task.Query.result_set_meta(); newTask->set_scope(task.Scope); *newTask->mutable_resources() = task.Internal.resources(); + + newTask->set_execution_id(task.Internal.execution_id()); + newTask->set_operation_id(task.Internal.operation_id()); } return result; diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp index d41d5f2650..829155c6fd 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp @@ -170,7 +170,7 @@ TPingTaskParams ConstructHardPingTask( } if (request.status_code() != NYql::NDqProto::StatusIds::UNSPECIFIED) { - internal.set_status_code(request.status_code()); + internal.set_status_code(request.status_code()); } if (issues) { @@ -277,6 +277,8 @@ TPingTaskParams ConstructHardPingTask( internal.clear_created_topic_consumers(); // internal.clear_dq_graph(); keep for debug internal.clear_dq_graph_index(); + // internal.clear_execution_id(); keep for debug + // internal.clear_operation_id(); keep for debug } if (!request.created_topic_consumers().empty()) { @@ -293,6 +295,14 @@ TPingTaskParams ConstructHardPingTask( } } + if 
(!request.execution_id().empty()) { + internal.set_execution_id(request.execution_id()); + } + + if (!request.operation_id().empty()) { + internal.set_operation_id(request.operation_id()); + } + if (!request.dq_graph().empty()) { *internal.mutable_dq_graph() = request.dq_graph(); } diff --git a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto index dcb545065e..ff7d943d41 100644 --- a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto +++ b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto @@ -41,7 +41,9 @@ message QueryInternal { repeated Fq.Private.CompressedData dq_graph_compressed = 20; Fq.Private.TaskResources resources = 21; repeated Ydb.Issue.IssueMessage internal_issue = 22; - NYql.NDqProto.StatusIds.StatusCode status_code = 23; + NYql.NDqProto.StatusIds.StatusCode status_code = 23; + string operation_id = 24; + string execution_id = 25; } message JobInternal { diff --git a/ydb/core/fq/libs/control_plane_storage/util.cpp b/ydb/core/fq/libs/control_plane_storage/util.cpp index 61eaa149bf..c31aa8e20b 100644 --- a/ydb/core/fq/libs/control_plane_storage/util.cpp +++ b/ydb/core/fq/libs/control_plane_storage/util.cpp @@ -185,6 +185,9 @@ bool DoesPingTaskUpdateQueriesTable(const Fq::Private::PingTaskRequest& request) || request.has_disposition() || request.has_resources() || !request.internal_issues().empty() + || !request.execution_id().empty() + || !request.operation_id().empty() + || !request.result_id().value().empty() ; } diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index fc1097df9d..db24cda40e 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -955,6 +955,8 @@ void 
TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery internal.clear_plan_compressed(); internal.clear_ast_compressed(); internal.clear_dq_graph_compressed(); + internal.clear_execution_id(); + internal.clear_operation_id(); auto& jobMeta = *job.mutable_meta(); jobMeta.set_id(jobId); diff --git a/ydb/core/fq/libs/events/events.h b/ydb/core/fq/libs/events/events.h index 18e7724946..3d5e4e6e2c 100644 --- a/ydb/core/fq/libs/events/events.h +++ b/ydb/core/fq/libs/events/events.h @@ -243,7 +243,7 @@ struct TEvents { struct TEvEffectApplicationResult : public NActors::TEventLocal<TEvEffectApplicationResult, TEventIds::EvEffectApplicationResult> { explicit TEvEffectApplicationResult(const NYql::TIssues& issues, bool fataError = false) : Issues(issues), FatalError(fataError) { - } + } NYql::TIssues Issues; const bool FatalError; }; diff --git a/ydb/core/fq/libs/protos/fq_private.proto b/ydb/core/fq/libs/protos/fq_private.proto index 80b507f18d..9320cdaf02 100644 --- a/ydb/core/fq/libs/protos/fq_private.proto +++ b/ydb/core/fq/libs/protos/fq_private.proto @@ -114,6 +114,8 @@ message GetTaskResult { TaskResources resources = 33; FederatedQuery.QueryContent.QuerySyntax query_syntax = 34; + string operation_id = 35; + string execution_id = 36; } repeated Task tasks = 1; } @@ -154,6 +156,8 @@ message PingTaskRequest { string tenant = 104; TaskResources resources = 33; repeated Ydb.Issue.IssueMessage internal_issues = 34; + string operation_id = 35; + string execution_id = 36; } message PingTaskResult { diff --git a/ydb/core/fq/libs/ydb/ydb.cpp b/ydb/core/fq/libs/ydb/ydb.cpp index 117a4905f8..b20d38c672 100644 --- a/ydb/core/fq/libs/ydb/ydb.cpp +++ b/ydb/core/fq/libs/ydb/ydb.cpp @@ -4,11 +4,6 @@ #include <util/stream/str.h> #include <util/string/printf.h> -#include <util/stream/file.h> -#include <util/string/strip.h> -#include <util/system/env.h> - -#include <ydb/library/security/ydb_credentials_provider_factory.h> namespace NFq { @@ -22,9 +17,6 @@ 
namespace { //////////////////////////////////////////////////////////////////////////////// -template <class TSettings> -TSettings GetClientSettings(const NConfig::TYdbStorageConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory); - TFuture<TDataQueryResult> SelectGeneration(const TGenerationContextPtr& context) { // TODO: use prepared queries @@ -169,46 +161,6 @@ TFuture<TStatus> RegisterGenerationWrapper( }); } -template <class TSettings> -TSettings GetClientSettings(const NConfig::TYdbStorageConfig& config, - const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory) { - TString oauth; - if (config.GetToken()) { - oauth = config.GetToken(); - } else if (config.GetOAuthFile()) { - oauth = StripString(TFileInput(config.GetOAuthFile()).ReadAll()); - } else { - oauth = GetEnv("YDB_TOKEN"); - } - - const TString iamEndpoint = config.GetIamEndpoint(); - const TString saKeyFile = config.GetSaKeyFile(); - - TSettings settings; - settings - .DiscoveryEndpoint(config.GetEndpoint()) - .Database(config.GetDatabase()); - - NKikimr::TYdbCredentialsSettings credSettings; - credSettings.UseLocalMetadata = config.GetUseLocalMetadataService(); - credSettings.OAuthToken = oauth; - credSettings.SaKeyFile = config.GetSaKeyFile(); - credSettings.IamEndpoint = config.GetIamEndpoint(); - - settings.CredentialsProviderFactory(credProviderFactory(credSettings)); - - if (config.GetUseLocalMetadataService()) { - settings.SslCredentials(TSslCredentials(true)); - } - - if (config.GetCertificateFile()) { - auto cert = StripString(TFileInput(config.GetCertificateFile()).ReadAll()); - settings.SslCredentials(TSslCredentials(true, cert)); - } - - return settings; -} - } // namespace //////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/fq/libs/ydb/ydb.h b/ydb/core/fq/libs/ydb/ydb.h index 6c4b99e09f..85891e336b 100644 --- a/ydb/core/fq/libs/ydb/ydb.h +++ b/ydb/core/fq/libs/ydb/ydb.h @@ -8,6 +8,10 @@ #include 
<ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> +#include <util/stream/file.h> +#include <util/string/strip.h> +#include <util/system/env.h> + namespace NFq { //////////////////////////////////////////////////////////////////////////////// @@ -130,4 +134,44 @@ NThreading::TFuture<NYdb::TStatus> CheckGeneration(const TGenerationContextPtr& NThreading::TFuture<NYdb::TStatus> RollbackTransaction(const TGenerationContextPtr& context); +template <class TSettings> +TSettings GetClientSettings(const NConfig::TYdbStorageConfig& config, + const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory) { + TString oauth; + if (config.GetToken()) { + oauth = config.GetToken(); + } else if (config.GetOAuthFile()) { + oauth = StripString(TFileInput(config.GetOAuthFile()).ReadAll()); + } else { + oauth = GetEnv("YDB_TOKEN"); + } + + const TString iamEndpoint = config.GetIamEndpoint(); + const TString saKeyFile = config.GetSaKeyFile(); + + TSettings settings; + settings + .DiscoveryEndpoint(config.GetEndpoint()) + .Database(config.GetDatabase()); + + NKikimr::TYdbCredentialsSettings credSettings; + credSettings.UseLocalMetadata = config.GetUseLocalMetadataService(); + credSettings.OAuthToken = oauth; + credSettings.SaKeyFile = config.GetSaKeyFile(); + credSettings.IamEndpoint = config.GetIamEndpoint(); + + settings.CredentialsProviderFactory(credProviderFactory(credSettings)); + + if (config.GetUseLocalMetadataService()) { + settings.SslCredentials(NYdb::TSslCredentials(true)); + } + + if (config.GetCertificateFile()) { + auto cert = StripString(TFileInput(config.GetCertificateFile()).ReadAll()); + settings.SslCredentials(NYdb::TSslCredentials(true, cert)); + } + + return settings; +} + } // namespace NFq |