diff options
author | hcpp <hcpp@ydb.tech> | 2023-06-29 13:22:51 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-06-29 13:22:51 +0300 |
commit | 81ed8b7a026c3cc783d5d3f143f5e742d84076b1 (patch) | |
tree | c16c472ee16171f2738e4c987cafe8abf405f176 | |
parent | 73a2c5edc8221c6b78f738254db6718adbb96f35 (diff) | |
download | ydb-81ed8b7a026c3cc783d5d3f143f5e742d84076b1.tar.gz |
compute database control plane has been added
78 files changed, 1510 insertions, 178 deletions
diff --git a/ydb/core/fq/libs/actors/pending_fetcher.cpp b/ydb/core/fq/libs/actors/pending_fetcher.cpp index 8def74bf714..0da5ba789a5 100644 --- a/ydb/core/fq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/fq/libs/actors/pending_fetcher.cpp @@ -349,6 +349,11 @@ private: *resources.mutable_topic_consumers() = task.created_topic_consumers(); } + NFq::NConfig::TYdbStorageConfig computeConnection = ComputeConfig.GetConnection(task.scope()); + computeConnection.set_endpoint(task.compute_connection().endpoint()); + computeConnection.set_database(task.compute_connection().database()); + computeConnection.set_usessl(task.compute_connection().usessl()); + TRunActorParams params( YqSharedResources, CredentialsProviderFactory, S3Gateway, FunctionRegistry, RandomProvider, @@ -386,7 +391,8 @@ private: task.job_id().value(), resources, task.execution_id(), - task.operation_id() + task.operation_id(), + computeConnection ); auto runActorId = diff --git a/ydb/core/fq/libs/compute/common/config.h b/ydb/core/fq/libs/compute/common/config.h index c2b02c7ea16..ddce2995513 100644 --- a/ydb/core/fq/libs/compute/common/config.h +++ b/ydb/core/fq/libs/compute/common/config.h @@ -3,6 +3,10 @@ #include <ydb/core/fq/libs/config/protos/compute.pb.h> #include <ydb/core/fq/libs/protos/fq_private.pb.h> +#include <util/generic/algorithm.h> + +#include <util/digest/multi.h> + namespace NFq { class TComputeConfig { @@ -28,6 +32,33 @@ public: } return NFq::NConfig::EComputeType::IN_PLACE; } + + NFq::NConfig::TYdbStorageConfig GetConnection(const TString& scope) const { + const auto& controlPlane = ComputeConfig.GetYdb().GetControlPlane(); + switch (controlPlane.type_case()) { + case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET: + return {}; + case NConfig::TYdbComputeControlPlane::kSingle: + return controlPlane.GetSingle().GetConnection(); + case NConfig::TYdbComputeControlPlane::kCms: + return GetConnection(scope, controlPlane.GetCms().GetDatabaseMapping()); + case NConfig::TYdbComputeControlPlane::kYdbcp: + return GetConnection(scope, controlPlane.GetYdbcp().GetDatabaseMapping()); + } + } + + NFq::NConfig::TYdbStorageConfig GetConnection(const TString& scope, const ::NFq::NConfig::TDatabaseMapping& databaseMapping) const { + auto it = databaseMapping.GetScopeToComputeDatabase().find(scope); + if (it != databaseMapping.GetScopeToComputeDatabase().end()) { + return it->second.GetConnection(); + } + return databaseMapping.GetCommon().empty() ? NFq::NConfig::TYdbStorageConfig{} : databaseMapping.GetCommon(MultiHash(scope) % databaseMapping.GetCommon().size()).GetConnection(); + } + + bool YdbComputeControlPlaneEnabled() const { + return ComputeConfig.GetYdb().GetEnable() && ComputeConfig.GetYdb().GetControlPlane().GetEnable(); + } + private: NFq::NConfig::TComputeConfig ComputeConfig; }; diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.cpp b/ydb/core/fq/libs/compute/common/run_actor_params.cpp index 21ed96052ef..e0c67efc743 100644 --- a/ydb/core/fq/libs/compute/common/run_actor_params.cpp +++ b/ydb/core/fq/libs/compute/common/run_actor_params.cpp @@ -51,7 +51,8 @@ TRunActorParams::TRunActorParams( const TString& jobId, const Fq::Private::TaskResources& resources, const TString& executionId, - const TString& operationId + const TString& operationId, + const NFq::NConfig::TYdbStorageConfig& computeConnection ) : YqSharedResources(yqSharedResources) , CredentialsProviderFactory(credentialsProviderFactory) @@ -100,6 +101,7 @@ TRunActorParams::TRunActorParams( , Resources(resources) , ExecutionId(executionId) , OperationId(operationId, true) + , ComputeConnection(computeConnection) { } @@ -123,6 +125,7 @@ IOutputStream& operator<<(IOutputStream& out, const TRunActorParams& params) { << " Resource.TopicConsumers: " << params.Resources.topic_consumers().size() << " ExecutionId: " << params.ExecutionId << " OperationId: " << (params.OperationId.GetKind() != Ydb::TOperationId::UNUSED ? ProtoToString(params.OperationId) : "<empty>") + << " ComputeConnection: " << params.ComputeConnection.ShortDebugString() << " }"; } diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.h b/ydb/core/fq/libs/compute/common/run_actor_params.h index b5f0843d601..db5e480dddc 100644 --- a/ydb/core/fq/libs/compute/common/run_actor_params.h +++ b/ydb/core/fq/libs/compute/common/run_actor_params.h @@ -3,6 +3,7 @@ #include <ydb/core/fq/libs/config/protos/common.pb.h> #include <ydb/core/fq/libs/config/protos/fq_config.pb.h> #include <ydb/core/fq/libs/config/protos/pinger.pb.h> +#include <ydb/core/fq/libs/config/protos/storage.pb.h> #include <ydb/core/fq/libs/events/events.h> #include <ydb/core/fq/libs/shared_resources/shared_resources.h> @@ -69,7 +70,8 @@ struct TRunActorParams { // TODO2 : Change name const TString& jobId, const Fq::Private::TaskResources& resources, const TString& executionId, - const TString& operationId + const TString& operationId, + const NFq::NConfig::TYdbStorageConfig& computeConnection ); TRunActorParams(const TRunActorParams& params) = default; @@ -127,6 +129,7 @@ struct TRunActorParams { // TODO2 : Change name Fq::Private::TaskResources Resources; TString ExecutionId; NYdb::TOperation::TOperationId OperationId; + NFq::NConfig::TYdbStorageConfig ComputeConnection; }; } /* NFq */ diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt index 0dce7fd471e..a89b1eff250 100644 --- a/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,8 @@ # original buildsystem will not be accepted. +add_subdirectory(control_plane) +add_subdirectory(events) add_library(libs-compute-ydb) target_compile_options(libs-compute-ydb PRIVATE diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt index cef154a1ebc..fb3627b9597 100644 --- a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt +++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt @@ -6,6 +6,8 @@ # original buildsystem will not be accepted. +add_subdirectory(control_plane) +add_subdirectory(events) add_library(libs-compute-ydb) target_compile_options(libs-compute-ydb PRIVATE diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt index cef154a1ebc..fb3627b9597 100644 --- a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt +++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt @@ -6,6 +6,8 @@ # original buildsystem will not be accepted. +add_subdirectory(control_plane) +add_subdirectory(events) add_library(libs-compute-ydb) target_compile_options(libs-compute-ydb PRIVATE diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt index 0dce7fd471e..a89b1eff250 100644 --- a/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt +++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt @@ -6,6 +6,8 @@ # original buildsystem will not be accepted. +add_subdirectory(control_plane) +add_subdirectory(events) add_library(libs-compute-ydb) target_compile_options(libs-compute-ydb PRIVATE diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..7b5b602ee1b --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,32 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(compute-ydb-control_plane) +target_compile_options(compute-ydb-control_plane PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(compute-ydb-control_plane PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-actors-protos + libs-control_plane_storage-proto + libs-quota_manager-proto + ydb-core-protos + library-db_pool-protos + yql-public-issue + api-grpc + api-grpc-draft + lib-operation_id-protos +) +target_sources(compute-ydb-control_plane PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/ydbcp_grpc_client_actor.cpp +) diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..7c699add786 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.linux-aarch64.txt @@ -0,0 +1,33 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(compute-ydb-control_plane) +target_compile_options(compute-ydb-control_plane PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(compute-ydb-control_plane PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-actors-protos + libs-control_plane_storage-proto + libs-quota_manager-proto + ydb-core-protos + library-db_pool-protos + yql-public-issue + api-grpc + api-grpc-draft + lib-operation_id-protos +) +target_sources(compute-ydb-control_plane PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/ydbcp_grpc_client_actor.cpp +) diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..7c699add786 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.linux-x86_64.txt @@ -0,0 +1,33 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(compute-ydb-control_plane) +target_compile_options(compute-ydb-control_plane PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(compute-ydb-control_plane PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-actors-protos + libs-control_plane_storage-proto + libs-quota_manager-proto + ydb-core-protos + library-db_pool-protos + yql-public-issue + api-grpc + api-grpc-draft + lib-operation_id-protos +) +target_sources(compute-ydb-control_plane PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/ydbcp_grpc_client_actor.cpp +) diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.txt b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..7b5b602ee1b --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.windows-x86_64.txt @@ -0,0 +1,32 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(compute-ydb-control_plane) +target_compile_options(compute-ydb-control_plane PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(compute-ydb-control_plane PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-actors-protos + libs-control_plane_storage-proto + libs-quota_manager-proto + ydb-core-protos + library-db_pool-protos + yql-public-issue + api-grpc + api-grpc-draft + lib-operation_id-protos +) +target_sources(compute-ydb-control_plane PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/ydbcp_grpc_client_actor.cpp +) diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp new file mode 100644 index 00000000000..0d9233e7367 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp @@ -0,0 +1,116 @@ +#include <ydb/public/api/grpc/ydb_cms_v1.grpc.pb.h> + +#include <ydb/core/fq/libs/compute/ydb/events/events.h> +#include <ydb/core/protos/services.pb.h> + +#include <ydb/library/security/ydb_credentials_provider_factory.h> +#include <ydb/library/ycloud/api/events.h> +#include <ydb/library/ycloud/impl/grpc_service_client.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/event.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> + +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [CmsGrpcClient]: " << stream) +#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [CmsGrpcClient]: " << stream) +#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [CmsGrpcClient]: " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [CmsGrpcClient]: " << stream) +#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [CmsGrpcClient]: " << stream) + +namespace NFq { + +using namespace NActors; + +namespace { + +struct TEvPrivate { + enum EEv { + // requests + EvCreateDatabaseRequest = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + + // replies + EvCreateDatabaseResponse, + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); + + struct TEvCreateDatabaseRequest : NCloud::TEvGrpcProtoRequest<TEvCreateDatabaseRequest, EvCreateDatabaseRequest, Ydb::Cms::CreateDatabaseRequest> {}; + struct TEvCreateDatabaseResponse : NCloud::TEvGrpcProtoResponse<TEvCreateDatabaseResponse, EvCreateDatabaseResponse, Ydb::Cms::CreateDatabaseResponse> {}; +}; + +} + +class TCmsGrpcServiceActor : public NActors::TActor<TCmsGrpcServiceActor>, TGrpcServiceClient<Ydb::Cms::V1::CmsService> { +public: + using TBase = NActors::TActor<TCmsGrpcServiceActor>; + struct TCreateDatabaseGrpcRequest : TGrpcRequest { + static constexpr auto Request = &Ydb::Cms::V1::CmsService::Stub::AsyncCreateDatabase; + using TRequestEventType = TEvPrivate::TEvCreateDatabaseRequest; + using TResponseEventType = TEvPrivate::TEvCreateDatabaseResponse; + }; + + TCmsGrpcServiceActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider) + : TBase(&TCmsGrpcServiceActor::StateFunc) + , TGrpcServiceClient(settings) + , Settings(settings) + , CredentialsProvider(credentialsProvider) + {} + + STRICT_STFUNC(StateFunc, + hFunc(TEvYdbCompute::TEvCreateDatabaseRequest, Handle); + hFunc(TEvPrivate::TEvCreateDatabaseResponse, Handle); + ) + + void Handle(TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& ev) { + const auto& request = *ev.Get()->Get(); + auto forwardRequest = std::make_unique<TEvPrivate::TEvCreateDatabaseRequest>(); + forwardRequest->Request.mutable_serverless_resources()->set_shared_database_path(request.BasePath); + forwardRequest->Request.set_path(request.Path); + forwardRequest->Token = CredentialsProvider->GetAuthInfo(); + TEvPrivate::TEvCreateDatabaseRequest::TPtr forwardEvent = (NActors::TEventHandle<TEvPrivate::TEvCreateDatabaseRequest>*)new IEventHandle(SelfId(), SelfId(), forwardRequest.release(), 0, Cookie); + MakeCall<TCreateDatabaseGrpcRequest>(std::move(forwardEvent)); + Requests[Cookie++] = ev; + } + + void Handle(TEvPrivate::TEvCreateDatabaseResponse::TPtr& ev) { + const auto& status = ev->Get()->Status; + auto it = Requests.find(ev->Cookie); + if (it == Requests.end()) { + LOG_E("Request doesn't exist. Need to fix this bug urgently"); + return; + } + auto request = it->second; + Requests.erase(it); + + auto forwardResponse = std::make_unique<TEvYdbCompute::TEvCreateDatabaseResponse>(); + if (!status.Ok() && status.GRpcStatusCode != grpc::StatusCode::ALREADY_EXISTS) { + forwardResponse->Issues.AddIssue("GrpcCode: " + ToString(status.GRpcStatusCode)); + forwardResponse->Issues.AddIssue("Message: " + status.Msg); + forwardResponse->Issues.AddIssue("Details: " + status.Details); + Send(request->Sender, forwardResponse.release(), 0, request->Cookie); + return; + } + + forwardResponse->Result.set_id(request.Get()->Get()->Path); + forwardResponse->Result.mutable_connection()->set_endpoint(Settings.Endpoint); + forwardResponse->Result.mutable_connection()->set_database(request.Get()->Get()->Path); + forwardResponse->Result.mutable_connection()->set_usessl(Settings.EnableSsl); + + Send(request->Sender, forwardResponse.release(), 0, request->Cookie); + } + +private: + NCloud::TGrpcClientSettings Settings; + TMap<uint64_t, TEvYdbCompute::TEvCreateDatabaseRequest::TPtr> Requests; + NYdb::TCredentialsProviderPtr CredentialsProvider; + int64_t Cookie = 0; +}; + +std::unique_ptr<NActors::IActor> CreateCmsGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider) { + return std::make_unique<TCmsGrpcServiceActor>(settings, credentialsProvider); +} + +} diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp new file mode 100644 index 00000000000..1881a49e2a6 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp @@ -0,0 +1,216 @@ +#include "compute_database_control_plane_service.h" + +#include <ydb/core/fq/libs/compute/ydb/events/events.h> +#include <ydb/core/fq/libs/control_plane_storage/events/events.h> +#include <ydb/core/fq/libs/config/protos/compute.pb.h> +#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h> +#include <ydb/core/fq/libs/config/protos/issue_id.pb.h> +#include <ydb/core/fq/libs/ydb/ydb.h> + +#include <ydb/library/security/ydb_credentials_provider_factory.h> + +#include <ydb/public/lib/fq/scope.h> +#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h> +#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> + +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseControlPlane]: " << stream) +#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseControlPlane]: " << stream) +#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseControlPlane]: " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseControlPlane]: " << stream) +#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseControlPlane]: " << stream) + +namespace NFq { + +using namespace NActors; +using namespace NFq; + +class TCreateDatabaseRequestActor : public NActors::TActorBootstrapped<TCreateDatabaseRequestActor> { +public: + TCreateDatabaseRequestActor(const TActorId& databaseClientActorId, TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& request) + : DatabaseClientActorId(databaseClientActorId) + , Request(request) + {} + + static constexpr char ActorName[] = "FQ_CREATE_DATABASE_REQUEST_ACTOR"; + + void Bootstrap() { + Send(NFq::ControlPlaneStorageServiceActorId(), new TEvControlPlaneStorage::TEvDescribeDatabaseRequest{Request.Get()->Get()->CloudId, Request.Get()->Get()->Scope}); + Become(&TCreateDatabaseRequestActor::StateFunc); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvControlPlaneStorage::TEvDescribeDatabaseResponse, Handle); + hFunc(TEvYdbCompute::TEvCreateDatabaseResponse, Handle); + hFunc(TEvControlPlaneStorage::TEvCreateDatabaseResponse, Handle); + ) + + void Handle(TEvControlPlaneStorage::TEvDescribeDatabaseResponse::TPtr& ev) { + const auto issues = ev->Get()->Issues; + const auto result = ev->Get()->Record; + + if (issues && issues.back().IssueCode == TIssuesIds::ACCESS_DENIED) { + Send(DatabaseClientActorId, new TEvYdbCompute::TEvCreateDatabaseRequest{Request->Get()->CloudId, Request->Get()->Scope, Request->Get()->BasePath, Request->Get()->Path}); + return; + } + + if (issues) { + FailedAndPassAway(issues); + return; + } + + Send(Request->Sender, new TEvYdbCompute::TEvCreateDatabaseResponse{result}); + PassAway(); + } + + void Handle(TEvYdbCompute::TEvCreateDatabaseResponse::TPtr& ev) { + const auto issues = ev->Get()->Issues; + if (issues) { + FailedAndPassAway(issues); + return; + } + + Result = ev->Get()->Result; + Send(ControlPlaneStorageServiceActorId(), new TEvControlPlaneStorage::TEvCreateDatabaseRequest{Request->Get()->CloudId, Request->Get()->Scope, Result}); + } + + void Handle(TEvControlPlaneStorage::TEvCreateDatabaseResponse::TPtr& ev) { + const auto issues = ev->Get()->Issues; + if (issues) { + FailedAndPassAway(issues); + return; + } + + Send(Request->Sender, new TEvYdbCompute::TEvCreateDatabaseResponse{Result}); + PassAway(); + } + + void FailedAndPassAway(const NYql::TIssues& issues) { + Send(Request->Sender, new TEvYdbCompute::TEvCreateDatabaseResponse{issues}); + PassAway(); + } + + TActorId DatabaseClientActorId; + TEvYdbCompute::TEvCreateDatabaseRequest::TPtr Request; + FederatedQuery::Internal::ComputeDatabaseInternal Result; +}; + +class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrapped<TComputeDatabaseControlPlaneServiceActor> { + struct TClientConfig { + TActorId ActorId; + NConfig::TComputeDatabaseConfig Config; + }; + +public: + TComputeDatabaseControlPlaneServiceActor(const NFq::NConfig::TComputeConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) + : Config(config) + , CredentialsProviderFactory(credentialsProviderFactory) + {} + + static constexpr char ActorName[] = "FQ_COMPUTE_DATABASE_SERVICE_ACTOR"; + + void Bootstrap() { + const auto& controlPlane = Config.GetYdb().GetControlPlane(); + switch (controlPlane.type_case()) { + case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET: + case NConfig::TYdbComputeControlPlane::kSingle: + break; + case NConfig::TYdbComputeControlPlane::kCms: + CreateCmsClientActors(controlPlane.GetCms()); + break; + case NConfig::TYdbComputeControlPlane::kYdbcp: + CreateControlPlaneClientActors(controlPlane.GetYdbcp()); + break; + } + Become(&TComputeDatabaseControlPlaneServiceActor::StateFunc); + } + + static NCloud::TGrpcClientSettings CreateGrpcClientSettings(const NConfig::TComputeDatabaseConfig& config) { + NCloud::TGrpcClientSettings settings; + const auto& connection = config.GetConnection(); + settings.Endpoint = connection.GetEndpoint(); + settings.EnableSsl = connection.GetUseSsl(); + if (connection.GetCertificateFile()) { + settings.CertificateRootCA = StripString(TFileInput(connection.GetCertificateFile()).ReadAll()); + } + return settings; + } + + void CreateCmsClientActors(const NConfig::TYdbComputeControlPlane::TCms& cmsConfig) { + const auto& mapping = cmsConfig.GetDatabaseMapping(); + for (const auto& config: mapping.GetCommon()) { + CommonDatabaseClients.push_back({Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.connection()))->CreateProvider()).release()), config}); + } + + Y_VERIFY(CommonDatabaseClients); + + for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) { + ScopeToDatabaseClient[scope] = {Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.connection()))->CreateProvider()).release()), config}; + } + } + + void CreateControlPlaneClientActors(const NConfig::TYdbComputeControlPlane::TYdbcp& controlPlaneConfig) { + const auto& mapping = controlPlaneConfig.GetDatabaseMapping(); + for (const auto& config: mapping.GetCommon()) { + CommonDatabaseClients.push_back({Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.connection()))->CreateProvider()).release()), config}); + } + + Y_VERIFY(CommonDatabaseClients); + + for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) { + ScopeToDatabaseClient[scope] = {Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.connection()))->CreateProvider()).release()), config}; + } + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvYdbCompute::TEvCreateDatabaseRequest, Handle); + ) + + void Handle(TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& ev) { + if (Config.GetYdb().GetControlPlane().HasSingle()) { + FederatedQuery::Internal::ComputeDatabaseInternal result; + *result.mutable_connection() = Config.GetYdb().GetControlPlane().GetSingle().GetConnection(); + Send(ev->Sender, new TEvYdbCompute::TEvCreateDatabaseResponse(result)); + return; + } + + const auto& scope = ev->Get()->Scope; + auto it = ScopeToDatabaseClient.find(scope); + if (it != ScopeToDatabaseClient.end()) { + FillRequest(ev, it->second.Config); + Register(new TCreateDatabaseRequestActor(it->second.ActorId, ev)); + return; + } + const auto& clientConfig = CommonDatabaseClients[MultiHash(scope) % CommonDatabaseClients.size()]; + FillRequest(ev, clientConfig.Config); + Register(new TCreateDatabaseRequestActor(clientConfig.ActorId, ev)); + } + + void FillRequest(TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& ev, const NConfig::TComputeDatabaseConfig& config) { + NYdb::NFq::TScope scope(ev.Get()->Get()->Scope); + ev.Get()->Get()->BasePath = config.GetConnection().GetDatabase(); + ev.Get()->Get()->Path = config.GetTenant() ? config.GetTenant() + "/" + scope.ParseFolder() : scope.ParseFolder(); + } + +private: + NFq::NConfig::TComputeConfig Config; + TVector<TClientConfig> CommonDatabaseClients; + TMap<TString, TClientConfig> ScopeToDatabaseClient; + NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; +}; + +std::unique_ptr<NActors::IActor> CreateComputeDatabaseControlPlaneServiceActor(const NFq::NConfig::TComputeConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) { + return std::make_unique<TComputeDatabaseControlPlaneServiceActor>(config, credentialsProviderFactory); +} + +NActors::TActorId ComputeDatabaseControlPlaneServiceActorId() { + constexpr TStringBuf name = "COMDBSRV"; + return NActors::TActorId(0, name); +} + +} diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h new file mode 100644 index 00000000000..7b2a13dfcb9 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h @@ -0,0 +1,20 @@ +#pragma once + +#include <ydb/core/fq/libs/config/protos/compute.pb.h> + +#include <ydb/library/security/ydb_credentials_provider_factory.h> +#include <ydb/library/ycloud/impl/grpc_service_settings.h> + +#include <library/cpp/actors/core/actor.h> + +namespace NFq { + +NActors::TActorId ComputeDatabaseControlPlaneServiceActorId(); + +std::unique_ptr<NActors::IActor> CreateComputeDatabaseControlPlaneServiceActor(const NFq::NConfig::TComputeConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory); + +std::unique_ptr<NActors::IActor> CreateYdbcpGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider); + +std::unique_ptr<NActors::IActor> CreateCmsGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider); + +} diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/ya.make b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make new file mode 100644 index 00000000000..5692cef4366 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make @@ -0,0 +1,24 @@ +LIBRARY() + +SRCS( + cms_grpc_client_actor.cpp + compute_database_control_plane_service.cpp + ydbcp_grpc_client_actor.cpp +) + +PEERDIR( + library/cpp/actors/core + library/cpp/actors/protos + ydb/core/fq/libs/control_plane_storage/proto + ydb/core/fq/libs/quota_manager/proto + ydb/core/protos + ydb/library/db_pool/protos + ydb/library/yql/public/issue + ydb/public/api/grpc + ydb/public/api/grpc/draft + ydb/public/lib/operation_id/protos +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/ydbcp_grpc_client_actor.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/ydbcp_grpc_client_actor.cpp new file mode 100644 index 00000000000..22acd0a55ff --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/control_plane/ydbcp_grpc_client_actor.cpp @@ -0,0 +1,38 @@ +#include <ydb/core/fq/libs/compute/ydb/events/events.h> + +#include <ydb/core/protos/services.pb.h> + +#include <ydb/library/security/ydb_credentials_provider_factory.h> +#include <ydb/library/ycloud/api/events.h> +#include <ydb/library/ycloud/impl/grpc_service_client.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/event.h> +#include <library/cpp/actors/core/hfunc.h> + +namespace NFq { + +class TYdbcpGrpcServiceActor : public NActors::TActor<TYdbcpGrpcServiceActor> { +public: + using TBase = NActors::TActor<TYdbcpGrpcServiceActor>; + TYdbcpGrpcServiceActor(const NCloud::TGrpcClientSettings&, + const NYdb::TCredentialsProviderPtr&) + : TBase(&TYdbcpGrpcServiceActor::StateFunc) + {} + + STRICT_STFUNC(StateFunc, + hFunc(TEvYdbCompute::TEvCreateDatabaseRequest, Handle); + ) + + void Handle(TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& ev) { + auto forwardResponse = std::make_unique<TEvYdbCompute::TEvCreateDatabaseResponse>(); + forwardResponse->Issues.AddIssue("Ydbcp grpc client hasn't supported yet"); + Send(ev->Sender, forwardResponse.release(), 0, ev->Cookie); + } +}; + +std::unique_ptr<NActors::IActor> CreateYdbcpGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider) { + return std::make_unique<TYdbcpGrpcServiceActor>(settings, credentialsProvider); +} + +} diff --git a/ydb/core/fq/libs/compute/ydb/events/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..d42e81090a6 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(compute-ydb-events) +target_link_libraries(compute-ydb-events PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + libs-config-protos + libs-control_plane_storage-proto + fq-libs-protos + api-grpc-draft + lib-operation_id-protos +) +target_sources(compute-ydb-events PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/events/events.cpp +) diff --git a/ydb/core/fq/libs/compute/ydb/events/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..086155d27af --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.linux-aarch64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(compute-ydb-events) +target_link_libraries(compute-ydb-events PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + libs-config-protos + libs-control_plane_storage-proto + fq-libs-protos + api-grpc-draft + lib-operation_id-protos +) +target_sources(compute-ydb-events PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/events/events.cpp +) diff --git a/ydb/core/fq/libs/compute/ydb/events/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..086155d27af --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.linux-x86_64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(compute-ydb-events) +target_link_libraries(compute-ydb-events PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + libs-config-protos + libs-control_plane_storage-proto + fq-libs-protos + api-grpc-draft + lib-operation_id-protos +) +target_sources(compute-ydb-events PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/events/events.cpp +) diff --git a/ydb/core/fq/libs/compute/ydb/events/CMakeLists.txt b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/fq/libs/compute/ydb/events/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..d42e81090a6 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.windows-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(compute-ydb-events) +target_link_libraries(compute-ydb-events PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + libs-config-protos + libs-control_plane_storage-proto + fq-libs-protos + api-grpc-draft + lib-operation_id-protos +) +target_sources(compute-ydb-events PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/events/events.cpp +) diff --git a/ydb/core/fq/libs/compute/ydb/events/events.cpp b/ydb/core/fq/libs/compute/ydb/events/events.cpp new file mode 100644 index 00000000000..6c3d2603e7e --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/events/events.cpp @@ -0,0 +1 @@ +#include "events.h" diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h index de40ba19cbf..c593c9af0f9 100644 --- a/ydb/core/fq/libs/compute/ydb/events/events.h +++ b/ydb/core/fq/libs/compute/ydb/events/events.h @@ -1,23 +1,24 @@ #pragma once -#include <ydb/core/fq/libs/control_plane_storage/events/events.h> -#include <ydb/core/fq/libs/quota_manager/events/events.h> - -#include <ydb/public/api/protos/draft/fq.pb.h> +#include <ydb/core/fq/libs/control_plane_storage/proto/yq_internal.pb.h> +#include <ydb/core/fq/libs/events/event_subspace.h> +#include <ydb/core/fq/libs/protos/fq_private.pb.h> #include <ydb/public/sdk/cpp/client/draft/ydb_query/query.h> +#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h> +#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h> + +#include <ydb/library/yql/public/issue/yql_issue.h> #include <library/cpp/actors/core/event_pb.h> #include <library/cpp/actors/core/events.h> #include <library/cpp/actors/interconnect/events_local.h> -#include <ydb/library/yql/public/issue/yql_issue.h> - namespace NFq { -struct TEvPrivate { +struct TEvYdbCompute { // Event ids enum EEv : ui32 { - EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvBegin = YqEventSubspaceBegin(NFq::TYqEventSubspace::YdbCompute), EvExecuteScriptRequest = EvBegin, EvExecuteScriptResponse, @@ -29,6 +30,8 @@ struct TEvPrivate { EvCancelOperationResponse, EvForgetOperationRequest, EvForgetOperationResponse, + EvCreateDatabaseRequest, + EvCreateDatabaseResponse, EvExecuterResponse, EvStatusTrackerResponse, @@ -160,6 +163,44 @@ struct TEvPrivate { NYdb::EStatus Status; }; + struct TEvCreateDatabaseRequest : public NActors::TEventLocal<TEvCreateDatabaseRequest, EvCreateDatabaseRequest> { + TEvCreateDatabaseRequest(const TString& cloudId, const TString& scope) + : CloudId(cloudId) + , Scope(scope) + {} + + TEvCreateDatabaseRequest(const TString& cloudId, + const TString& scope, + const TString& basePath, + const TString& path) + : CloudId(cloudId) + , Scope(scope) + , BasePath(basePath) + , Path(path) + {} + + TString CloudId; + TString Scope; + TString BasePath; + TString Path; + }; + + struct TEvCreateDatabaseResponse : public NActors::TEventLocal<TEvCreateDatabaseResponse, EvCreateDatabaseResponse> { + TEvCreateDatabaseResponse() + {} + + explicit TEvCreateDatabaseResponse(NYql::TIssues issues) + : Issues(issues) + {} + + TEvCreateDatabaseResponse(const FederatedQuery::Internal::ComputeDatabaseInternal& result) + : Result(result) + {} + + FederatedQuery::Internal::ComputeDatabaseInternal Result; + NYql::TIssues Issues; + }; + struct TEvExecuterResponse : public NActors::TEventLocal<TEvExecuterResponse, EvExecuterResponse> { TEvExecuterResponse(NYdb::TOperation::TOperationId operationId, const TString& executionId) : OperationId(operationId) diff --git a/ydb/core/fq/libs/compute/ydb/events/ya.make b/ydb/core/fq/libs/compute/ydb/events/ya.make new file mode 100644 index 00000000000..5bc14a3a972 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/events/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + events.cpp +) + +PEERDIR( + library/cpp/actors/core + ydb/core/fq/libs/config/protos + ydb/core/fq/libs/control_plane_storage/proto + ydb/core/fq/libs/protos + ydb/public/api/grpc/draft + ydb/public/lib/operation_id/protos +) + +END() diff --git a/ydb/core/fq/libs/compute/ydb/executer_actor.cpp b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp index 563c3151cda..feed9c68d5a 100644 --- a/ydb/core/fq/libs/compute/ydb/executer_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp @@ -77,7 +77,7 @@ public: } STRICT_STFUNC(StateFunc, - hFunc(TEvPrivate::TEvExecuteScriptResponse, Handle); + hFunc(TEvYdbCompute::TEvExecuteScriptResponse, Handle); hFunc(TEvents::TEvForwardPingResponse, Handle); ) @@ -88,22 +88,22 @@ public: if (ev.Get()->Get()->Success) { pingCounters->Ok->Inc(); LOG_I("Information about the operation id and execution id is stored. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)); - Send(Parent, new TEvPrivate::TEvExecuterResponse(OperationId, ExecutionId)); + Send(Parent, new TEvYdbCompute::TEvExecuterResponse(OperationId, ExecutionId)); CompleteAndPassAway(); } else { pingCounters->Error->Inc(); // Without the idempotency key, we lose the running operation here LOG_E("Error saving information about the operation id and execution id. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)); - Send(Parent, new TEvPrivate::TEvExecuterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the operation id and execution id. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)}})); + Send(Parent, new TEvYdbCompute::TEvExecuterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the operation id and execution id. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)}})); FailedAndPassAway(); } } - void Handle(const TEvPrivate::TEvExecuteScriptResponse::TPtr& ev) { + void Handle(const TEvYdbCompute::TEvExecuteScriptResponse::TPtr& ev) { const auto& response = *ev.Get()->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_E("Can't execute script: " << ev->Get()->Issues.ToOneLineString()); - Send(Parent, new TEvPrivate::TEvExecuterResponse(ev->Get()->Issues)); + Send(Parent, new TEvYdbCompute::TEvExecuterResponse(ev->Get()->Issues)); FailedAndPassAway(); return; } @@ -114,7 +114,7 @@ public: } void SendExecuteScript() { - Register(new TRetryActor<TEvPrivate::TEvExecuteScriptRequest, TEvPrivate::TEvExecuteScriptResponse, TString, TString>(Counters.GetCounters(ERequestType::RT_EXECUTE_SCRIPT), SelfId(), Connector, Params.Sql, Params.JobId)); + Register(new TRetryActor<TEvYdbCompute::TEvExecuteScriptRequest, TEvYdbCompute::TEvExecuteScriptResponse, TString, TString>(Counters.GetCounters(ERequestType::RT_EXECUTE_SCRIPT), SelfId(), Connector, Params.Sql, Params.JobId)); } void SendPingTask() { diff --git a/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp b/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp index 137169651c4..41563e65e90 100644 --- a/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp @@ -94,12 +94,12 @@ public: if (ev.Get()->Get()->Success) { pingCounters->Ok->Inc(); LOG_I("Query moved to terminal state "); - Send(Parent, new TEvPrivate::TEvFinalizerResponse({}, NYdb::EStatus::SUCCESS)); + Send(Parent, new TEvYdbCompute::TEvFinalizerResponse({}, NYdb::EStatus::SUCCESS)); CompleteAndPassAway(); } else { pingCounters->Error->Inc(); LOG_E("Error moving the query to the terminal state"); - Send(Parent, new TEvPrivate::TEvFinalizerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error moving the query to the terminal state"}}, NYdb::EStatus::INTERNAL_ERROR)); + Send(Parent, new TEvYdbCompute::TEvFinalizerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error moving the query to the terminal state"}}, NYdb::EStatus::INTERNAL_ERROR)); FailedAndPassAway(); } } diff --git a/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp b/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp index 90c0fda4cac..4a7884cb314 100644 --- a/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp @@ -72,23 +72,23 @@ public: void Start() { LOG_I("Start resources cleaner actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status)); Become(&TResourcesCleanerActor::StateFunc); - Register(new TRetryActor<TEvPrivate::TEvForgetOperationRequest, TEvPrivate::TEvForgetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_FORGET_OPERATION), SelfId(), Connector, OperationId)); + Register(new TRetryActor<TEvYdbCompute::TEvForgetOperationRequest, TEvYdbCompute::TEvForgetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_FORGET_OPERATION), SelfId(), Connector, OperationId)); } STRICT_STFUNC(StateFunc, - hFunc(TEvPrivate::TEvForgetOperationResponse, Handle); + hFunc(TEvYdbCompute::TEvForgetOperationResponse, Handle); ) - void Handle(const TEvPrivate::TEvForgetOperationResponse::TPtr& ev) { + void Handle(const TEvYdbCompute::TEvForgetOperationResponse::TPtr& ev) { const auto& response = *ev.Get()->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_E("Can't forget operation: " << ev->Get()->Issues.ToOneLineString()); - Send(Parent, new TEvPrivate::TEvResourcesCleanerResponse(ev->Get()->Issues, ev->Get()->Status)); + Send(Parent, new TEvYdbCompute::TEvResourcesCleanerResponse(ev->Get()->Issues, ev->Get()->Status)); FailedAndPassAway(); return; } LOG_I("Operation successfully forgotten"); - Send(Parent, new TEvPrivate::TEvResourcesCleanerResponse({}, NYdb::EStatus::SUCCESS)); + Send(Parent, new TEvYdbCompute::TEvResourcesCleanerResponse({}, NYdb::EStatus::SUCCESS)); CompleteAndPassAway(); } diff --git a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp index 04d5f1df558..603ddb62878 100644 --- a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp @@ -13,6 +13,7 @@ #include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h> #include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -83,7 +84,7 @@ public: } STRICT_STFUNC(StateFunc, - hFunc(TEvPrivate::TEvFetchScriptResultResponse, Handle); + hFunc(TEvYdbCompute::TEvFetchScriptResultResponse, Handle); hFunc(NFq::TEvInternalService::TEvWriteResultResponse, Handle); hFunc(TEvents::TEvForwardPingResponse, Handle); ) @@ -95,21 +96,21 @@ public: if (ev.Get()->Get()->Success) { pingCounters->Ok->Inc(); LOG_I("The result has been moved"); - Send(Parent, new TEvPrivate::TEvResultWriterResponse({}, NYdb::EStatus::SUCCESS)); + Send(Parent, new TEvYdbCompute::TEvResultWriterResponse({}, NYdb::EStatus::SUCCESS)); CompleteAndPassAway(); } else { pingCounters->Error->Inc(); LOG_E("Move result error"); - Send(Parent, new TEvPrivate::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Move result error. ExecutionId: " << ExecutionId}}, NYdb::EStatus::INTERNAL_ERROR)); + Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Move result error. ExecutionId: " << ExecutionId}}, NYdb::EStatus::INTERNAL_ERROR)); FailedAndPassAway(); } } - void Handle(const TEvPrivate::TEvFetchScriptResultResponse::TPtr& ev) { + void Handle(const TEvYdbCompute::TEvFetchScriptResultResponse::TPtr& ev) { const auto& response = *ev.Get()->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_E("Can't fetch script result: " << ev->Get()->Issues.ToOneLineString()); - Send(Parent, new TEvPrivate::TEvResultWriterResponse(ev->Get()->Issues, NYdb::EStatus::INTERNAL_ERROR)); + Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(ev->Get()->Issues, NYdb::EStatus::INTERNAL_ERROR)); FailedAndPassAway(); return; } @@ -150,7 +151,7 @@ public: } else { writeResultCounters->Error->Inc(); LOG_E("Error writing result for offset " << Offset); - Send(Parent, new TEvPrivate::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error writing result for offset " << Offset}}, NYdb::EStatus::INTERNAL_ERROR)); + Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error writing result for offset " << Offset}}, NYdb::EStatus::INTERNAL_ERROR)); FailedAndPassAway(); } } @@ -159,7 +160,7 @@ public: auto fetchScriptResultCounters = Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT); fetchScriptResultCounters->InFly->Inc(); StartTime = TInstant::Now(); - Register(new TRetryActor<TEvPrivate::TEvFetchScriptResultRequest, TEvPrivate::TEvFetchScriptResultResponse, TString, int64_t, int64_t>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, ExecutionId, 0, Offset)); + Register(new TRetryActor<TEvYdbCompute::TEvFetchScriptResultRequest, TEvYdbCompute::TEvFetchScriptResultResponse, TString, int64_t, int64_t>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, ExecutionId, 0, Offset)); } Fq::Private::WriteTaskResultRequest CreateProtoRequestWithoutResultSet(ui64 startRowIndex) { diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp index 19f428fe3d0..f062d0671a4 100644 --- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp @@ -34,7 +34,7 @@ using namespace NFq; class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> { public: - using IRetryPolicy = IRetryPolicy<const TEvPrivate::TEvGetOperationResponse::TPtr&>; + using IRetryPolicy = IRetryPolicy<const TEvYdbCompute::TEvGetOperationResponse::TPtr&>; enum ERequestType { RT_GET_OPERATION, @@ -83,7 +83,7 @@ public: } STRICT_STFUNC(StateFunc, - hFunc(TEvPrivate::TEvGetOperationResponse, Handle); + hFunc(TEvYdbCompute::TEvGetOperationResponse, Handle); hFunc(TEvents::TEvForwardPingResponse, Handle); ) @@ -94,21 +94,21 @@ public: if (ev.Get()->Get()->Success) { pingCounters->Ok->Inc(); LOG_I("Information about the status of operation is stored"); - Send(Parent, new TEvPrivate::TEvStatusTrackerResponse(Issues, Status, ExecStatus)); + Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(Issues, Status, ExecStatus)); CompleteAndPassAway(); } else { pingCounters->Error->Inc(); LOG_E("Error saving information about the status of operation"); - Send(Parent, new TEvPrivate::TEvStatusTrackerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the status of operation: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR, ExecStatus)); + Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the status of operation: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR, ExecStatus)); FailedAndPassAway(); } } - void Handle(const TEvPrivate::TEvGetOperationResponse::TPtr& ev) { + void Handle(const TEvYdbCompute::TEvGetOperationResponse::TPtr& ev) { const auto& response = *ev.Get()->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_E("Can't get operation: " << ev->Get()->Issues.ToOneLineString()); - Send(Parent, new TEvPrivate::TEvStatusTrackerResponse(ev->Get()->Issues, ev->Get()->Status, ExecStatus)); + Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(ev->Get()->Issues, ev->Get()->Status, ExecStatus)); FailedAndPassAway(); return; } @@ -138,7 +138,7 @@ public: } void SendGetOperation(const TDuration& delay = TDuration::Zero()) { - Register(new TRetryActor<TEvPrivate::TEvGetOperationRequest, TEvPrivate::TEvGetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_GET_OPERATION), delay, SelfId(), Connector, OperationId)); + Register(new TRetryActor<TEvYdbCompute::TEvGetOperationRequest, TEvYdbCompute::TEvGetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_GET_OPERATION), delay, SelfId(), Connector, OperationId)); } void Failed() { diff --git a/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp b/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp index 59e60c6128b..b7d0596e7ef 100644 --- a/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp @@ -72,23 +72,23 @@ public: void Start() { LOG_I("Start stopper actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status)); Become(&TStopperActor::StateFunc); - Register(new TRetryActor<TEvPrivate::TEvCancelOperationRequest, TEvPrivate::TEvCancelOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_CANCEL_OPERATION), SelfId(), Connector, OperationId)); + Register(new TRetryActor<TEvYdbCompute::TEvCancelOperationRequest, TEvYdbCompute::TEvCancelOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_CANCEL_OPERATION), SelfId(), Connector, OperationId)); } STRICT_STFUNC(StateFunc, - hFunc(TEvPrivate::TEvCancelOperationResponse, Handle); + hFunc(TEvYdbCompute::TEvCancelOperationResponse, Handle); ) - void Handle(const TEvPrivate::TEvCancelOperationResponse::TPtr& ev) { + void Handle(const TEvYdbCompute::TEvCancelOperationResponse::TPtr& ev) { const auto& response = *ev.Get()->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_E("Can't cancel operation: " << ev->Get()->Issues.ToOneLineString()); - Send(Parent, new TEvPrivate::TEvStopperResponse(ev->Get()->Issues, ev->Get()->Status)); + Send(Parent, new TEvYdbCompute::TEvStopperResponse(ev->Get()->Issues, ev->Get()->Status)); FailedAndPassAway(); return; } LOG_I("Operation successfully canceled"); - Send(Parent, new TEvPrivate::TEvStopperResponse({}, NYdb::EStatus::SUCCESS)); + Send(Parent, new TEvYdbCompute::TEvStopperResponse({}, NYdb::EStatus::SUCCESS)); CompleteAndPassAway(); } diff --git a/ydb/core/fq/libs/compute/ydb/ya.make b/ydb/core/fq/libs/compute/ydb/ya.make index c4f65784190..62fba34c4d1 100644 --- a/ydb/core/fq/libs/compute/ydb/ya.make +++ b/ydb/core/fq/libs/compute/ydb/ya.make @@ -35,3 +35,8 @@ PEERDIR( YQL_LAST_ABI_VERSION() END() + +RECURSE( + control_plane + events +) diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp index 50a820d1721..4c76f624c27 100644 --- a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp @@ -1,12 +1,16 @@ +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> +#include <ydb/core/fq/libs/compute/ydb/events/events.h> +#include <ydb/core/fq/libs/ydb/ydb.h> + +#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h> +#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> + +#include <ydb/library/yql/public/issue/yql_issue_message.h> + #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/hfunc.h> -#include <ydb/core/fq/libs/compute/ydb/events/events.h> -#include <ydb/core/fq/libs/compute/common/run_actor_params.h> -#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h> -#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> -#include <ydb/core/fq/libs/ydb/ydb.h> namespace NFq { @@ -16,61 +20,63 @@ using namespace NFq; class TYdbConnectorActor : public NActors::TActorBootstrapped<TYdbConnectorActor> { public: explicit TYdbConnectorActor(const TRunActorParams& params) - : Params(params) + : YqSharedResources(params.YqSharedResources) + , CredentialsProviderFactory(params.CredentialsProviderFactory) + , ComputeConnection(params.ComputeConnection) {} void Bootstrap() { - auto querySettings = NFq::GetClientSettings<NYdb::NQuery::TClientSettings>(Params.Config.GetCompute().GetYdb().GetConnection(), Params.CredentialsProviderFactory); - QueryClient = std::make_unique<NYdb::NQuery::TQueryClient>(Params.YqSharedResources->UserSpaceYdbDriver, querySettings); - auto operationSettings = NFq::GetClientSettings<NYdb::TCommonClientSettings>(Params.Config.GetCompute().GetYdb().GetConnection(), Params.CredentialsProviderFactory); - OperationClient = std::make_unique<NYdb::NOperation::TOperationClient>(Params.YqSharedResources->UserSpaceYdbDriver, operationSettings); + auto querySettings = NFq::GetClientSettings<NYdb::NQuery::TClientSettings>(ComputeConnection, CredentialsProviderFactory); + QueryClient = std::make_unique<NYdb::NQuery::TQueryClient>(YqSharedResources->UserSpaceYdbDriver, querySettings); + auto operationSettings = NFq::GetClientSettings<NYdb::TCommonClientSettings>(ComputeConnection, CredentialsProviderFactory); + OperationClient = std::make_unique<NYdb::NOperation::TOperationClient>(YqSharedResources->UserSpaceYdbDriver, operationSettings); Become(&TYdbConnectorActor::StateFunc); } STRICT_STFUNC(StateFunc, - hFunc(TEvPrivate::TEvExecuteScriptRequest, Handle); - hFunc(TEvPrivate::TEvGetOperationRequest, Handle); - hFunc(TEvPrivate::TEvFetchScriptResultRequest, Handle); - hFunc(TEvPrivate::TEvCancelOperationRequest, Handle); - hFunc(TEvPrivate::TEvForgetOperationRequest, Handle); + hFunc(TEvYdbCompute::TEvExecuteScriptRequest, Handle); + hFunc(TEvYdbCompute::TEvGetOperationRequest, Handle); + hFunc(TEvYdbCompute::TEvFetchScriptResultRequest, Handle); + hFunc(TEvYdbCompute::TEvCancelOperationRequest, Handle); + hFunc(TEvYdbCompute::TEvForgetOperationRequest, Handle); cFunc(NActors::TEvents::TEvPoisonPill::EventType, PassAway); ) - void Handle(const TEvPrivate::TEvExecuteScriptRequest::TPtr& ev) { + void Handle(const TEvYdbCompute::TEvExecuteScriptRequest::TPtr& ev) { QueryClient ->ExecuteScript(ev->Get()->Sql) .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) { try { auto response = future.ExtractValueSync(); if (response.Status().IsSuccess()) { - actorSystem->Send(recipient, new TEvPrivate::TEvExecuteScriptResponse(response.Id(), response.Metadata().ExecutionId), 0, cookie); + actorSystem->Send(recipient, new TEvYdbCompute::TEvExecuteScriptResponse(response.Id(), response.Metadata().ExecutionId), 0, cookie); } else { - actorSystem->Send(recipient, new TEvPrivate::TEvExecuteScriptResponse(response.Status().GetIssues(), response.Status().GetStatus()), 0, cookie); + actorSystem->Send(recipient, new TEvYdbCompute::TEvExecuteScriptResponse(response.Status().GetIssues(), response.Status().GetStatus()), 0, cookie); } } catch (...) { - actorSystem->Send(recipient, new TEvPrivate::TEvExecuteScriptResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie); + actorSystem->Send(recipient, new TEvYdbCompute::TEvExecuteScriptResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie); } }); } - void Handle(const TEvPrivate::TEvGetOperationRequest::TPtr& ev) { + void Handle(const TEvYdbCompute::TEvGetOperationRequest::TPtr& ev) { OperationClient ->Get<NYdb::NQuery::TScriptExecutionOperation>(ev->Get()->OperationId) .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) { try { auto response = future.ExtractValueSync(); if (response.Status().IsSuccess()) { - actorSystem->Send(recipient, new TEvPrivate::TEvGetOperationResponse(response.Metadata().ExecStatus, response.Status().GetIssues()), 0, cookie); + actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Metadata().ExecStatus, response.Status().GetIssues()), 0, cookie); } else { - actorSystem->Send(recipient, new TEvPrivate::TEvGetOperationResponse(response.Status().GetIssues(), response.Status().GetStatus()), 0, cookie); + actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Status().GetIssues(), response.Status().GetStatus()), 0, cookie); } } catch (...) { - actorSystem->Send(recipient, new TEvPrivate::TEvGetOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie); + actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie); } }); } - void Handle(const TEvPrivate::TEvFetchScriptResultRequest::TPtr& ev) { + void Handle(const TEvYdbCompute::TEvFetchScriptResultRequest::TPtr& ev) { NYdb::NQuery::TFetchScriptResultsSettings settings; settings.RowsOffset(ev->Get()->RowOffset); QueryClient @@ -79,44 +85,46 @@ public: try { auto response = future.ExtractValueSync(); if (response.IsSuccess()) { - actorSystem->Send(recipient, new TEvPrivate::TEvFetchScriptResultResponse(response.ExtractResultSet()), 0, cookie); + actorSystem->Send(recipient, new TEvYdbCompute::TEvFetchScriptResultResponse(response.ExtractResultSet()), 0, cookie); } else { - actorSystem->Send(recipient, new TEvPrivate::TEvFetchScriptResultResponse(response.GetIssues(), response.GetStatus()), 0, cookie); + actorSystem->Send(recipient, new TEvYdbCompute::TEvFetchScriptResultResponse(response.GetIssues(), response.GetStatus()), 0, cookie); } } catch (...) { - actorSystem->Send(recipient, new TEvPrivate::TEvFetchScriptResultResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie); + actorSystem->Send(recipient, new TEvYdbCompute::TEvFetchScriptResultResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie); } }); } - void Handle(const TEvPrivate::TEvCancelOperationRequest::TPtr& ev) { + void Handle(const TEvYdbCompute::TEvCancelOperationRequest::TPtr& ev) { OperationClient ->Cancel(ev->Get()->OperationId) .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) { try { auto response = future.ExtractValueSync(); - actorSystem->Send(recipient, new TEvPrivate::TEvCancelOperationResponse(response.GetIssues(), response.GetStatus()), 0, cookie); + actorSystem->Send(recipient, new TEvYdbCompute::TEvCancelOperationResponse(response.GetIssues(), response.GetStatus()), 0, cookie); } catch (...) { - actorSystem->Send(recipient, new TEvPrivate::TEvCancelOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie); + actorSystem->Send(recipient, new TEvYdbCompute::TEvCancelOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie); } }); } - void Handle(const TEvPrivate::TEvForgetOperationRequest::TPtr& ev) { + void Handle(const TEvYdbCompute::TEvForgetOperationRequest::TPtr& ev) { OperationClient ->Forget(ev->Get()->OperationId) .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) { try { auto response = future.ExtractValueSync(); - actorSystem->Send(recipient, new TEvPrivate::TEvForgetOperationResponse(response.GetIssues(), response.GetStatus()), 0, cookie); + actorSystem->Send(recipient, new TEvYdbCompute::TEvForgetOperationResponse(response.GetIssues(), response.GetStatus()), 0, cookie); } catch (...) { - actorSystem->Send(recipient, new TEvPrivate::TEvForgetOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie); + actorSystem->Send(recipient, new TEvYdbCompute::TEvForgetOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie); } }); } private: - TRunActorParams Params; + TYqSharedResources::TPtr YqSharedResources; + NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; + NConfig::TYdbStorageConfig ComputeConnection; std::unique_ptr<NYdb::NQuery::TQueryClient> QueryClient; std::unique_ptr<NYdb::NOperation::TOperationClient> OperationClient; }; diff --git a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp index c1da57d093c..c00b1b0669e 100644 --- a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp @@ -61,15 +61,15 @@ public: } STRICT_STFUNC(StateFunc, - hFunc(TEvPrivate::TEvExecuterResponse, Handle); - hFunc(TEvPrivate::TEvStatusTrackerResponse, Handle); - hFunc(TEvPrivate::TEvResultWriterResponse, Handle); - hFunc(TEvPrivate::TEvResourcesCleanerResponse, Handle); - hFunc(TEvPrivate::TEvFinalizerResponse, Handle); - hFunc(TEvPrivate::TEvStopperResponse, Handle); + hFunc(TEvYdbCompute::TEvExecuterResponse, Handle); + hFunc(TEvYdbCompute::TEvStatusTrackerResponse, Handle); + hFunc(TEvYdbCompute::TEvResultWriterResponse, Handle); + hFunc(TEvYdbCompute::TEvResourcesCleanerResponse, Handle); + hFunc(TEvYdbCompute::TEvFinalizerResponse, Handle); + hFunc(TEvYdbCompute::TEvStopperResponse, Handle); ) - void Handle(const TEvPrivate::TEvExecuterResponse::TPtr& ev) { + void Handle(const TEvYdbCompute::TEvExecuterResponse::TPtr& ev) { auto& response = *ev->Get(); if (!response.Success) { LOG_I("ExecuterResponse (failed). Issues: " << response.Issues.ToOneLineString()); @@ -82,7 +82,7 @@ public: Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release()); } - void Handle(const TEvPrivate::TEvStatusTrackerResponse::TPtr& ev) { + void Handle(const TEvYdbCompute::TEvStatusTrackerResponse::TPtr& ev) { auto& response = *ev->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_I("StatusTrackerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); @@ -98,7 +98,7 @@ public: } } - void Handle(const TEvPrivate::TEvResultWriterResponse::TPtr& ev) { + void Handle(const TEvYdbCompute::TEvResultWriterResponse::TPtr& ev) { auto& response = *ev->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_I("ResultWriterResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); @@ -109,7 +109,7 @@ public: Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release()); } - void Handle(const TEvPrivate::TEvResourcesCleanerResponse::TPtr& ev) { + void Handle(const TEvYdbCompute::TEvResourcesCleanerResponse::TPtr& ev) { auto& response = *ev->Get(); if (response.Status != NYdb::EStatus::SUCCESS && response.Status != NYdb::EStatus::UNSUPPORTED) { LOG_I("ResourcesCleanerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); @@ -120,7 +120,7 @@ public: Register(ActorFactory->CreateFinalizer(SelfId(), Pinger, ExecStatus).release()); } - void Handle(const TEvPrivate::TEvFinalizerResponse::TPtr ev) { + void Handle(const TEvYdbCompute::TEvFinalizerResponse::TPtr ev) { auto& response = *ev->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_I("FinalizerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); @@ -139,7 +139,7 @@ public: } } - void Handle(const TEvPrivate::TEvStopperResponse::TPtr& ev) { + void Handle(const TEvYdbCompute::TEvStopperResponse::TPtr& ev) { auto& response = *ev->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_I("StopperResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); diff --git a/ydb/core/fq/libs/config/protos/compute.proto b/ydb/core/fq/libs/config/protos/compute.proto index 90ece7388c7..385120d3ff4 100644 --- a/ydb/core/fq/libs/config/protos/compute.proto +++ b/ydb/core/fq/libs/config/protos/compute.proto @@ -12,9 +12,40 @@ import "ydb/public/api/protos/draft/fq.proto"; message TInPlaceCompute { } +message TComputeDatabaseConfig { + TYdbStorageConfig Connection = 1; + string Tenant = 2; +} + +message TDatabaseMapping { + repeated TComputeDatabaseConfig Common = 1; + map<string, TComputeDatabaseConfig> ScopeToComputeDatabase = 2; +} + +message TYdbComputeControlPlane { + message TSingle { + TYdbStorageConfig Connection = 1; + } + + message TCms { + TDatabaseMapping DatabaseMapping = 1; + } + + message TYdbcp { + TDatabaseMapping DatabaseMapping = 2; + } + + bool Enable = 1; + oneof type { + TSingle Single = 2; + TCms Cms = 3; + TYdbcp Ydbcp = 4; + } +} + message TYdbCompute { bool Enable = 1; - TYdbStorageConfig Connection = 2; + TYdbComputeControlPlane ControlPlane = 2; } enum EComputeType { diff --git a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt index 1fb66e6dbd0..e8fc9d59713 100644 --- a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt @@ -18,8 +18,10 @@ target_link_libraries(fq-libs-control_plane_proxy PUBLIC yutil cpp-actors-core ydb-core-base - libs-actors-logging fq-libs-actors + libs-actors-logging + libs-compute-ydb + compute-ydb-control_plane fq-libs-control_plane_config libs-control_plane_proxy-events fq-libs-control_plane_storage diff --git a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt index 8ac43676221..ef702e4856c 100644 --- a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt +++ b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt @@ -19,8 +19,10 @@ target_link_libraries(fq-libs-control_plane_proxy PUBLIC yutil cpp-actors-core ydb-core-base - libs-actors-logging fq-libs-actors + libs-actors-logging + libs-compute-ydb + compute-ydb-control_plane fq-libs-control_plane_config libs-control_plane_proxy-events fq-libs-control_plane_storage diff --git a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt index 8ac43676221..ef702e4856c 100644 --- a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt +++ b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt @@ -19,8 +19,10 @@ target_link_libraries(fq-libs-control_plane_proxy PUBLIC yutil cpp-actors-core ydb-core-base - libs-actors-logging fq-libs-actors + libs-actors-logging + libs-compute-ydb + compute-ydb-control_plane fq-libs-control_plane_config libs-control_plane_proxy-events fq-libs-control_plane_storage diff --git a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt index 1fb66e6dbd0..e8fc9d59713 100644 --- a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt +++ b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt @@ -18,8 +18,10 @@ target_link_libraries(fq-libs-control_plane_proxy PUBLIC yutil cpp-actors-core ydb-core-base - libs-actors-logging fq-libs-actors + libs-actors-logging + libs-compute-ydb + compute-ydb-control_plane fq-libs-control_plane_config libs-control_plane_proxy-events fq-libs-control_plane_storage diff --git a/ydb/core/fq/libs/control_plane_proxy/config.cpp b/ydb/core/fq/libs/control_plane_proxy/config.cpp index 0a2a63fe298..37ad16e8365 100644 --- a/ydb/core/fq/libs/control_plane_proxy/config.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/config.cpp @@ -42,8 +42,4 @@ TControlPlaneProxyConfig::TControlPlaneProxyConfig( , ConfigRetryPeriod( GetDuration(Proto.GetConfigRetryPeriod(), TDuration::MilliSeconds(100))) { } -bool TControlPlaneProxyConfig::IsYDBComputeEngineEnabled() const { - return ComputeConfig.HasYdb() && ComputeConfig.GetYdb().GetEnable(); -} - } // NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/config.h b/ydb/core/fq/libs/control_plane_proxy/config.h index 32f09af9e4c..ae98bb989bc 100644 --- a/ydb/core/fq/libs/control_plane_proxy/config.h +++ b/ydb/core/fq/libs/control_plane_proxy/config.h @@ -1,7 +1,8 @@ #pragma once -#include "ydb/core/fq/libs/config/protos/common.pb.h" -#include "ydb/core/fq/libs/config/protos/compute.pb.h" +#include <ydb/core/fq/libs/compute/common/config.h> +#include <ydb/core/fq/libs/config/protos/common.pb.h> +#include <ydb/core/fq/libs/config/protos/compute.pb.h> #include <ydb/core/fq/libs/config/protos/control_plane_proxy.pb.h> #include <util/datetime/base.h> @@ -10,7 +11,7 @@ namespace NFq { struct TControlPlaneProxyConfig { NConfig::TControlPlaneProxyConfig Proto; - NConfig::TComputeConfig ComputeConfig; + TComputeConfig ComputeConfig; NConfig::TCommonConfig CommonConfig; TDuration RequestTimeout; TDuration MetricsTtl; @@ -20,7 +21,6 @@ struct TControlPlaneProxyConfig { const NConfig::TControlPlaneProxyConfig& config, const NConfig::TComputeConfig& computeConfig, const NConfig::TCommonConfig& commonConfig); - bool IsYDBComputeEngineEnabled() const; }; } // NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp index 617a425fff4..c01bf40ac2f 100644 --- a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp @@ -5,6 +5,8 @@ #include <ydb/core/fq/libs/actors/logging/log.h> #include <ydb/core/fq/libs/common/cache.h> +#include <ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h> +#include <ydb/core/fq/libs/compute/ydb/events/events.h> #include <ydb/core/fq/libs/control_plane_config/control_plane_config.h> #include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h> #include <ydb/core/fq/libs/control_plane_storage/events/events.h> @@ -423,19 +425,21 @@ class TCreateConnectionInYDBActor : ui32 Cookie; TDuration RequestTimeout; TInstant StartTime; + TString Scope; TTableClientPtr TableClient; TString ObjectStorageEndpoint; public: TCreateConnectionInYDBActor( const TRequestCommonCountersPtr& counters, - const NConfig::TYdbCompute& ydbComputeConfig, + const NFq::TComputeConfig& computeConfig, const TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const TString& objectStorageEndpoint, TActorId sender, TEventRequest event, ui32 cookie, + const TString& scope, TDuration requestTimeout) : Sender(sender) , Counters(counters) @@ -443,8 +447,9 @@ public: , Cookie(cookie) , RequestTimeout(requestTimeout) , StartTime(TInstant::Now()) + , Scope(scope) , TableClient(CreateNewTableClient( - ydbComputeConfig, yqSharedResources, credentialsProviderFactory)) + computeConfig, yqSharedResources, credentialsProviderFactory)) , ObjectStorageEndpoint(objectStorageEndpoint) { } static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_CREATE_CONNECTION_IN_YDB"; @@ -593,14 +598,20 @@ public: } private: - static TTableClientPtr CreateNewTableClient( - const NConfig::TYdbCompute& ydbComputeConfig, + TTableClientPtr CreateNewTableClient( + const NFq::TComputeConfig& computeConfig, const TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) { - auto tableSettigns = GetClientSettings<NYdb::NTable::TClientSettings>( - ydbComputeConfig.connection(), credentialsProviderFactory); + + NFq::NConfig::TYdbStorageConfig computeConnection = computeConfig.GetConnection(Scope); + computeConnection.set_endpoint(Event->Get()->ComputeDatabase->connection().endpoint()); + computeConnection.set_database(Event->Get()->ComputeDatabase->connection().database()); + computeConnection.set_usessl(Event->Get()->ComputeDatabase->connection().usessl()); + + auto tableSettings = GetClientSettings<NYdb::NTable::TClientSettings>( + computeConnection, credentialsProviderFactory); return std::make_unique<NYdb::NTable::TTableClient>( - yqSharedResources->UserSpaceYdbDriver, tableSettigns); + yqSharedResources->UserSpaceYdbDriver, tableSettings); } }; @@ -655,19 +666,21 @@ class TCreateBindingInYDBActor : TInstant StartTime; TPermissions Permissions; TDuration RequestTimeout; + TString Scope; TTableClientPtr TableClient; TString ConnectionName; public: TCreateBindingInYDBActor( const TRequestCommonCountersPtr& counters, - const NConfig::TYdbCompute& ydbComputeConfig, + const NFq::TComputeConfig& computeConfig, const TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, TActorId sender, TEventRequest event, ui32 cookie, TPermissions permissions, + const TString& scope, TDuration requestTimeout) : Sender(sender) , Counters(counters) @@ -676,8 +689,9 @@ public: , StartTime(TInstant::Now()) , Permissions(std::move(permissions)) , RequestTimeout(requestTimeout) + , Scope(scope) , TableClient(CreateNewTableClient( - ydbComputeConfig, yqSharedResources, credentialsProviderFactory)) { } + computeConfig, yqSharedResources, credentialsProviderFactory)) { } static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_CREATE_BINDING_IN_YDB"; @@ -710,14 +724,15 @@ public: "Create external table in YDB. Resolving connection id. Actor id: " << SelfId() << " connection_id: " << connectionId); auto event = new TEvControlPlaneStorage::TEvDescribeConnectionRequest( - "yandexcloud://" + Event->Get()->FolderId, + Scope, request, Event->Get()->User, Event->Get()->Token, Event->Get()->CloudId, Permissions, Event->Get()->Quotas, - Event->Get()->TenantInfo); + Event->Get()->TenantInfo, + {}); Send(ControlPlaneStorageServiceActorId(), event); } @@ -955,14 +970,20 @@ public: } private: - static TTableClientPtr CreateNewTableClient( - const NConfig::TYdbCompute& ydbComputeConfig, + TTableClientPtr CreateNewTableClient( + const NFq::TComputeConfig& computeConfig, const TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) { - auto tableSettigns = GetClientSettings<NYdb::NTable::TClientSettings>( - ydbComputeConfig.connection(), credentialsProviderFactory); + + NFq::NConfig::TYdbStorageConfig computeConnection = computeConfig.GetConnection(Scope); + computeConnection.set_endpoint(Event->Get()->ComputeDatabase->connection().endpoint()); + computeConnection.set_database(Event->Get()->ComputeDatabase->connection().database()); + computeConnection.set_usessl(Event->Get()->ComputeDatabase->connection().usessl()); + + auto tableSettings = GetClientSettings<NYdb::NTable::TClientSettings>( + computeConnection, credentialsProviderFactory); return std::make_unique<NYdb::NTable::TTableClient>( - yqSharedResources->UserSpaceYdbDriver, tableSettigns); + yqSharedResources->UserSpaceYdbDriver, tableSettings); } }; @@ -1061,7 +1082,6 @@ public: NYql::TIssues issues; NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Resolve folder error"); issues.AddIssue(issue); - Counters->Error->Inc(); const TDuration delta = TInstant::Now() - StartTime; Probe(delta, false, false); Send(Sender, new TResponseProxy(issues, {}), 0, Cookie); @@ -1092,6 +1112,104 @@ private: } }; +template<class TEventRequest, class TResponseProxy> +class TCreateComputeDatabaseActor : public NActors::TActorBootstrapped<TCreateComputeDatabaseActor<TEventRequest, TResponseProxy>> { + using TBase = NActors::TActorBootstrapped<TCreateComputeDatabaseActor<TEventRequest, TResponseProxy>>; + using TBase::SelfId; + using TBase::Send; + using TBase::PassAway; + using TBase::Become; + using TBase::Register; + + ::NFq::TControlPlaneProxyConfig Config; + ::NFq::TComputeConfig ComputeConfig; + TActorId Sender; + TRequestCommonCountersPtr Counters; + TString CloudId; + TString FolderId; + TString Scope; + TString Token; + std::function<void(const TDuration&, bool, bool)> Probe; + TEventRequest Event; + ui32 Cookie; + TInstant StartTime; + +public: + TCreateComputeDatabaseActor(const TRequestCommonCountersPtr& counters, + TActorId sender, const ::NFq::TControlPlaneProxyConfig& config, + const ::NFq::TComputeConfig& computeConfig, const TString& cloudId, + const TString& folderId, const TString& scope, + const std::function<void(const TDuration&, bool, bool)>& probe, + TEventRequest event, ui32 cookie) + : Config(config) + , ComputeConfig(computeConfig) + , Sender(sender) + , Counters(counters) + , CloudId(cloudId) + , FolderId(folderId) + , Scope(scope) + , Probe(probe) + , Event(event) + , Cookie(cookie) + , StartTime(TInstant::Now()) + {} + + static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_CREATE_DATABASE"; + + void Bootstrap() { + CPP_LOG_T("Create database bootstrap. CloudId: " << CloudId << " FolderId: " << FolderId << " Scope: " << Scope << " Actor id: " << SelfId()); + if (!ComputeConfig.YdbComputeControlPlaneEnabled()) { + Event->Get()->ComputeDatabase = FederatedQuery::Internal::ComputeDatabaseInternal{}; + TActivationContext::Send(Event->Forward(ControlPlaneProxyActorId())); + PassAway(); + return; + } + Become(&TCreateComputeDatabaseActor::StateFunc, Config.RequestTimeout, new NActors::TEvents::TEvWakeup()); + Counters->InFly->Inc(); + Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), CreateRequest().release(), 0, 0); + } + + std::unique_ptr<TEvYdbCompute::TEvCreateDatabaseRequest> CreateRequest() { + return std::make_unique<TEvYdbCompute::TEvCreateDatabaseRequest>(CloudId, Scope); + } + + STRICT_STFUNC(StateFunc, + cFunc(NActors::TEvents::TSystem::Wakeup, HandleTimeout); + hFunc(TEvYdbCompute::TEvCreateDatabaseResponse, Handle); + ) + + void HandleTimeout() { + CPP_LOG_D("Create database timeout. CloudId: " << CloudId << " FolderId: " << FolderId << " Scope: " << Scope << " Actor id: " << SelfId()); + NYql::TIssues issues; + NYql::TIssue issue = MakeErrorIssue(TIssuesIds::TIMEOUT, "Create database: request timeout. Try repeating the request later"); + issues.AddIssue(issue); + Counters->Error->Inc(); + Counters->Timeout->Inc(); + const TDuration delta = TInstant::Now() - StartTime; + Probe(delta, false, true); + Send(Sender, new TResponseProxy(issues, {}), 0, Cookie); + PassAway(); + } + + void Handle(TEvYdbCompute::TEvCreateDatabaseResponse::TPtr& ev) { + Counters->InFly->Dec(); + Counters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); + if (ev->Get()->Issues) { + Counters->Error->Inc(); + CPP_LOG_E(ev->Get()->Issues.ToOneLineString()); + const TDuration delta = TInstant::Now() - StartTime; + Probe(delta, false, false); + Send(Sender, new TResponseProxy(ev->Get()->Issues, {}), 0, Cookie); + PassAway(); + return; + } + Counters->Ok->Inc(); + Event->Get()->ComputeDatabase = ev->Get()->Result; + TActivationContext::Send(Event->Forward(ControlPlaneProxyActorId())); + PassAway(); + } +}; + template<class TRequestProto, class TRequest, class TResponse, class TResponseProxy> class TRequestActor : public NActors::TActorBootstrapped<TRequestActor<TRequestProto, TRequest, TResponse, TResponseProxy>> { protected: @@ -1119,6 +1237,7 @@ protected: TString SubjectType; const TMaybe<TQuotaMap> Quotas; TTenantInfo::TPtr TenantInfo; + TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal> ComputeDatabase; ui32 RetryCount = 0; public: @@ -1131,7 +1250,8 @@ public: const TRequestCounters& counters, const std::function<void(const TDuration&, bool, bool)>& probe, TPermissions permissions, - const TString& cloudId, const TString& subjectType, TMaybe<TQuotaMap>&& quotas = Nothing()) + const TString& cloudId, const TString& subjectType, TMaybe<TQuotaMap>&& quotas = Nothing(), + TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal>&& computeDatabase = Nothing()) : Config(config) , RequestProto(std::forward<TRequestProto>(requestProto)) , Scope(scope) @@ -1148,6 +1268,7 @@ public: , CloudId(cloudId) , SubjectType(subjectType) , Quotas(std::move(quotas)) + , ComputeDatabase(std::move(computeDatabase)) { Counters.IncInFly(); } @@ -1239,7 +1360,7 @@ public: void SendRequestIfCan() { if (CanSendRequest()) { - Send(ServiceId, new TRequest(Scope, RequestProto, User, Token, CloudId, Permissions, Quotas, TenantInfo), 0, Cookie); + Send(ServiceId, new TRequest(Scope, RequestProto, User, Token, CloudId, Permissions, Quotas, TenantInfo, ComputeDatabase.GetOrElse({})), 0, Cookie); } } @@ -1366,6 +1487,7 @@ class TControlPlaneProxyActor : public NActors::TActorBootstrapped<TControlPlane RTC_RESOLVE_SUBJECT_TYPE, RTC_CREATE_CONNECTION_IN_YDB, RTC_CREATE_BINDING_IN_YDB, + RTC_CREATE_COMPUTE_DATABASE, RTC_MAX, }; @@ -1414,7 +1536,8 @@ class TControlPlaneProxyActor : public NActors::TActorBootstrapped<TControlPlane { MakeIntrusive<TRequestCommonCounters>("DeleteBinding") }, { MakeIntrusive<TRequestCommonCounters>("ResolveSubjectType") }, { MakeIntrusive<TRequestCommonCounters>("CreateConnectionInYDB") }, - { MakeIntrusive<TRequestCommonCounters>("CreateBindingInYDB") } + { MakeIntrusive<TRequestCommonCounters>("CreateBindingInYDB") }, + { MakeIntrusive<TRequestCommonCounters>("CreateComputeDatabase") }, }); TTtlCache<TMetricsScope, TScopeCountersPtr, TMap> ScopeCounters{TTtlCacheSettings{}.SetTtl(TDuration::Days(1))}; @@ -1487,6 +1610,7 @@ class TControlPlaneProxyActor : public NActors::TActorBootstrapped<TControlPlane const TYqSharedResources::TPtr YqSharedResources; const NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; const bool QuotaManagerEnabled; + NConfig::TComputeConfig ComputeConfig; TActorId AccessService; public: @@ -1647,17 +1771,28 @@ private: return; } + if (!ev->Get()->ComputeDatabase) { + Register(new TCreateComputeDatabaseActor<TEvControlPlaneProxy::TEvCreateQueryRequest::TPtr, + TEvControlPlaneProxy::TEvCreateQueryResponse> + (Counters.GetCommonCounters(RTC_CREATE_COMPUTE_DATABASE), + sender, Config, Config.ComputeConfig, cloudId, + folderId, scope, probe, ev, cookie)); + return; + } + static const TPermissions availablePermissions { TPermissions::TPermission::QUERY_INVOKE | TPermissions::TPermission::MANAGE_PUBLIC }; Register(new TCreateQueryRequestActor - (Config, ev->Sender, ev->Cookie, scope, folderId, + (Config, sender, cookie, scope, folderId, std::move(request), std::move(user), std::move(token), ControlPlaneStorageServiceActorId(), requestCounters, - probe, ExtractPermissions(ev, availablePermissions), cloudId, subjectType, std::move(ev->Get()->Quotas))); + probe, ExtractPermissions(ev, availablePermissions), + cloudId, subjectType, std::move(ev->Get()->Quotas), + std::move(ev->Get()->ComputeDatabase))); } void Handle(TEvControlPlaneProxy::TEvListQueriesRequest::TPtr& ev) { @@ -1908,6 +2043,15 @@ private: return; } + if (!ev->Get()->ComputeDatabase) { + Register(new TCreateComputeDatabaseActor<TEvControlPlaneProxy::TEvModifyQueryRequest::TPtr, + TEvControlPlaneProxy::TEvModifyQueryResponse> + (Counters.GetCommonCounters(RTC_CREATE_COMPUTE_DATABASE), + sender, Config, Config.ComputeConfig, cloudId, + folderId, scope, probe, ev, cookie)); + return; + } + static const TPermissions availablePermissions { TPermissions::TPermission::QUERY_INVOKE | TPermissions::TPermission::MANAGE_PUBLIC @@ -1922,8 +2066,8 @@ private: std::move(request), std::move(user), std::move(token), ControlPlaneStorageServiceActorId(), requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + probe, ExtractPermissions(ev, availablePermissions), + cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase))); } void Handle(TEvControlPlaneProxy::TEvDeleteQueryRequest::TPtr& ev) { @@ -2315,20 +2459,30 @@ private: return; } + if (!ev->Get()->ComputeDatabase) { + Register(new TCreateComputeDatabaseActor<TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr, + TEvControlPlaneProxy::TEvCreateConnectionResponse> + (Counters.GetCommonCounters(RTC_CREATE_COMPUTE_DATABASE), + sender, Config, Config.ComputeConfig, cloudId, + folderId, scope, probe, ev, cookie)); + return; + } + static const TPermissions availablePermissions { TPermissions::TPermission::MANAGE_PUBLIC }; - if (Config.IsYDBComputeEngineEnabled() && !ydbOperationWasPerformed) { + if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && !ydbOperationWasPerformed) { Register(new TCreateConnectionInYDBActor( Counters.GetCommonCounters(RTC_CREATE_CONNECTION_IN_YDB), - Config.ComputeConfig.GetYdb(), + Config.ComputeConfig, YqSharedResources, CredentialsProviderFactory, Config.CommonConfig.GetObjectStorageEndpoint(), sender, ev, cookie, + scope, Config.RequestTimeout)); return; } @@ -2341,7 +2495,8 @@ private: std::move(request), std::move(user), std::move(token), ControlPlaneStorageServiceActorId(), requestCounters, - probe, ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + probe, ExtractPermissions(ev, availablePermissions), + cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase))); } void Handle(TEvControlPlaneProxy::TEvListConnectionsRequest::TPtr& ev) { @@ -2529,6 +2684,15 @@ private: return; } + if (!ev->Get()->ComputeDatabase) { + Register(new TCreateComputeDatabaseActor<TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr, + TEvControlPlaneProxy::TEvModifyConnectionResponse> + (Counters.GetCommonCounters(RTC_CREATE_COMPUTE_DATABASE), + sender, Config, Config.ComputeConfig, cloudId, + folderId, scope, probe, ev, cookie)); + return; + } + static const TPermissions availablePermissions { TPermissions::TPermission::MANAGE_PUBLIC | TPermissions::TPermission::MANAGE_PRIVATE @@ -2543,7 +2707,8 @@ private: ControlPlaneStorageServiceActorId(), requestCounters, probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + ExtractPermissions(ev, availablePermissions), + cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase))); } void Handle(TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& ev) { @@ -2595,6 +2760,15 @@ private: return; } + if (!ev->Get()->ComputeDatabase) { + Register(new TCreateComputeDatabaseActor<TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr, + TEvControlPlaneProxy::TEvDeleteConnectionResponse> + (Counters.GetCommonCounters(RTC_CREATE_COMPUTE_DATABASE), + sender, Config, Config.ComputeConfig, cloudId, + folderId, scope, probe, ev, cookie)); + return; + } + static const TPermissions availablePermissions { TPermissions::TPermission::MANAGE_PUBLIC | TPermissions::TPermission::MANAGE_PRIVATE @@ -2609,7 +2783,8 @@ private: ControlPlaneStorageServiceActorId(), requestCounters, probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + ExtractPermissions(ev, availablePermissions), + cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase))); } void Handle(TEvControlPlaneProxy::TEvTestConnectionRequest::TPtr& ev) { @@ -2728,23 +2903,33 @@ private: return; } + if (!ev->Get()->ComputeDatabase) { + Register(new TCreateComputeDatabaseActor<TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr, + TEvControlPlaneProxy::TEvCreateBindingResponse> + (Counters.GetCommonCounters(RTC_CREATE_COMPUTE_DATABASE), + sender, Config, Config.ComputeConfig, cloudId, + folderId, scope, probe, ev, cookie)); + return; + } + static const TPermissions availablePermissions { TPermissions::TPermission::VIEW_PUBLIC | TPermissions::TPermission::MANAGE_PUBLIC | TPermissions::TPermission::MANAGE_PRIVATE }; - if (Config.IsYDBComputeEngineEnabled() && !ydbOperationWasPerformed) { + if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && !ydbOperationWasPerformed) { auto permissions = ExtractPermissions(ev, availablePermissions); Register(new TCreateBindingInYDBActor( Counters.GetCommonCounters(RTC_CREATE_BINDING_IN_YDB), - Config.ComputeConfig.GetYdb(), + Config.ComputeConfig, YqSharedResources, CredentialsProviderFactory, sender, ev, cookie, std::move(permissions), + scope, Config.RequestTimeout)); return; } @@ -2757,7 +2942,8 @@ private: std::move(request), std::move(user), std::move(token), ControlPlaneStorageServiceActorId(), requestCounters, - probe, ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + probe, ExtractPermissions(ev, availablePermissions), + cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase))); } void Handle(TEvControlPlaneProxy::TEvListBindingsRequest::TPtr& ev) { @@ -2940,6 +3126,15 @@ private: return; } + if (!ev->Get()->ComputeDatabase) { + Register(new TCreateComputeDatabaseActor<TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr, + TEvControlPlaneProxy::TEvModifyBindingResponse> + (Counters.GetCommonCounters(RTC_CREATE_COMPUTE_DATABASE), + sender, Config, Config.ComputeConfig, cloudId, + folderId, scope, probe, ev, cookie)); + return; + } + static const TPermissions availablePermissions { TPermissions::TPermission::MANAGE_PUBLIC | TPermissions::TPermission::MANAGE_PRIVATE @@ -2954,7 +3149,8 @@ private: ControlPlaneStorageServiceActorId(), requestCounters, probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + ExtractPermissions(ev, availablePermissions), + cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase))); } void Handle(TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& ev) { @@ -3006,6 +3202,15 @@ private: return; } + if (!ev->Get()->ComputeDatabase) { + Register(new TCreateComputeDatabaseActor<TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr, + TEvControlPlaneProxy::TEvDeleteBindingResponse> + (Counters.GetCommonCounters(RTC_CREATE_COMPUTE_DATABASE), + sender, Config, Config.ComputeConfig, cloudId, + folderId, scope, probe, ev, cookie)); + return; + } + static const TPermissions availablePermissions { TPermissions::TPermission::MANAGE_PUBLIC | TPermissions::TPermission::MANAGE_PRIVATE @@ -3020,7 +3225,8 @@ private: ControlPlaneStorageServiceActorId(), requestCounters, probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + ExtractPermissions(ev, availablePermissions), + cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase))); } void Handle(NMon::TEvHttpInfo::TPtr& ev) { diff --git a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h index 128f97aa9fb..f624ae93952 100644 --- a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h +++ b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h @@ -1,7 +1,7 @@ #pragma once #include <ydb/core/fq/libs/actors/logging/log.h> -#include "ydb/core/fq/libs/config/protos/compute.pb.h" +#include <ydb/core/fq/libs/config/protos/compute.pb.h> #include <ydb/core/fq/libs/config/protos/control_plane_proxy.pb.h> #include <ydb/library/security/ydb_credentials_provider_factory.h> #include <ydb/core/fq/libs/shared_resources/shared_resources.h> diff --git a/ydb/core/fq/libs/control_plane_proxy/events/events.h b/ydb/core/fq/libs/control_plane_proxy/events/events.h index e01117000d7..e027245b724 100644 --- a/ydb/core/fq/libs/control_plane_proxy/events/events.h +++ b/ydb/core/fq/libs/control_plane_proxy/events/events.h @@ -93,6 +93,7 @@ struct TEvControlPlaneProxy { TTenantInfo::TPtr TenantInfo; TString SubjectType; bool ComputeYDBOperationWasPerformed; + TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal> ComputeDatabase; }; template<typename TDerived, typename ProtoMessage, ui32 EventType> diff --git a/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp b/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp index d1c22bb089f..f0aeea941be 100644 --- a/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp @@ -380,7 +380,6 @@ private: { TRuntimePtr runtime(new TTestBasicRuntime()); runtime->SetLogPriority(NKikimrServices::STREAMS_CONTROL_PLANE_SERVICE, NLog::PRI_DEBUG); - auto controlPlaneProxy = CreateControlPlaneProxyActor( Config, ComputeConfig, diff --git a/ydb/core/fq/libs/control_plane_proxy/ya.make b/ydb/core/fq/libs/control_plane_proxy/ya.make index 980fde857fc..264f857556a 100644 --- a/ydb/core/fq/libs/control_plane_proxy/ya.make +++ b/ydb/core/fq/libs/control_plane_proxy/ya.make @@ -9,8 +9,10 @@ SRCS( PEERDIR( library/cpp/actors/core ydb/core/base - ydb/core/fq/libs/actors/logging ydb/core/fq/libs/actors + ydb/core/fq/libs/actors/logging + ydb/core/fq/libs/compute/ydb + ydb/core/fq/libs/compute/ydb/control_plane ydb/core/fq/libs/control_plane_config ydb/core/fq/libs/control_plane_proxy/events ydb/core/fq/libs/control_plane_storage diff --git a/ydb/core/fq/libs/control_plane_storage/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/control_plane_storage/CMakeLists.darwin-x86_64.txt index 2d7c639ac17..be2bb947e2e 100644 --- a/ydb/core/fq/libs/control_plane_storage/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/fq/libs/control_plane_storage/CMakeLists.darwin-x86_64.txt @@ -51,6 +51,7 @@ target_sources(fq-libs-control_plane_storage PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/validators.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp diff --git a/ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-aarch64.txt index 02563d00a11..a1e8304a36d 100644 --- a/ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-aarch64.txt +++ b/ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-aarch64.txt @@ -52,6 +52,7 @@ target_sources(fq-libs-control_plane_storage PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/validators.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp diff --git a/ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-x86_64.txt index 02563d00a11..a1e8304a36d 100644 --- a/ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-x86_64.txt +++ b/ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-x86_64.txt @@ -52,6 +52,7 @@ target_sources(fq-libs-control_plane_storage PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/validators.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp diff --git a/ydb/core/fq/libs/control_plane_storage/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/control_plane_storage/CMakeLists.windows-x86_64.txt index 2d7c639ac17..be2bb947e2e 100644 --- a/ydb/core/fq/libs/control_plane_storage/CMakeLists.windows-x86_64.txt +++ b/ydb/core/fq/libs/control_plane_storage/CMakeLists.windows-x86_64.txt @@ -51,6 +51,7 @@ target_sources(fq-libs-control_plane_storage PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/validators.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp diff --git a/ydb/core/fq/libs/control_plane_storage/events/events.h b/ydb/core/fq/libs/control_plane_storage/events/events.h index 1a6b902e791..bc3f49e3eb2 100644 --- a/ydb/core/fq/libs/control_plane_storage/events/events.h +++ b/ydb/core/fq/libs/control_plane_storage/events/events.h @@ -164,6 +164,10 @@ struct TEvControlPlaneStorage { EvDeleteRateLimiterResourceRequest, EvDeleteRateLimiterResourceResponse, EvDbRequestResult, // private // internal_events.h + EvCreateDatabaseRequest, + EvCreateDatabaseResponse, + EvDescribeDatabaseRequest, + EvDescribeDatabaseResponse, EvEnd, }; @@ -180,7 +184,8 @@ struct TEvControlPlaneStorage { const TString& cloudId, TPermissions permissions, TMaybe<TQuotaMap> quotas, - TTenantInfo::TPtr tenantInfo) + TTenantInfo::TPtr tenantInfo, + const FederatedQuery::Internal::ComputeDatabaseInternal& computeDatabase) : Scope(scope) , Request(request) , User(user) @@ -189,6 +194,7 @@ struct TEvControlPlaneStorage { , Permissions(permissions) , Quotas(std::move(quotas)) , TenantInfo(tenantInfo) + , ComputeDatabase(computeDatabase) { } @@ -209,6 +215,7 @@ struct TEvControlPlaneStorage { TPermissions Permissions; TMaybe<TQuotaMap> Quotas; TTenantInfo::TPtr TenantInfo; + FederatedQuery::Internal::ComputeDatabaseInternal ComputeDatabase; }; template<typename TDerived, typename ProtoMessage, ui32 EventType> @@ -612,6 +619,92 @@ struct TEvControlPlaneStorage { NYql::TIssues Issues; TDebugInfoPtr DebugInfo; }; + + struct TEvCreateDatabaseRequest : NActors::TEventLocal<TEvCreateDatabaseRequest, EvCreateDatabaseRequest> { + TEvCreateDatabaseRequest() = default; + + explicit TEvCreateDatabaseRequest(const TString& cloudId, const TString& scope, const FederatedQuery::Internal::ComputeDatabaseInternal& record) + : CloudId(cloudId) + , Scope(scope) + , Record(record) + {} + + size_t GetByteSize() const { + return sizeof(*this) + + Scope.Size(); + } + + TString CloudId; + TString Scope; + FederatedQuery::Internal::ComputeDatabaseInternal Record; + }; + + struct TEvCreateDatabaseResponse : NActors::TEventLocal<TEvCreateDatabaseResponse, EvCreateDatabaseResponse> { + static constexpr bool Auditable = false; + + explicit TEvCreateDatabaseResponse() + {} + + explicit TEvCreateDatabaseResponse( + const NYql::TIssues& issues + ) + : Issues(issues) + {} + + size_t GetByteSize() const { + return sizeof(*this) + + GetIssuesByteSize(Issues) + + GetDebugInfoByteSize(DebugInfo); + } + + NYql::TIssues Issues; + TDebugInfoPtr DebugInfo; + }; + + struct TEvDescribeDatabaseRequest : NActors::TEventLocal<TEvDescribeDatabaseRequest, EvDescribeDatabaseRequest> { + + TEvDescribeDatabaseRequest() = default; + + explicit TEvDescribeDatabaseRequest(const TString& cloudId, const TString& scope) + : CloudId(cloudId) + , Scope(scope) + {} + + size_t GetByteSize() const { + return sizeof(*this) + + Scope.Size(); + } + + google::protobuf::Empty Request; + TString CloudId; + TString Scope; + }; + + struct TEvDescribeDatabaseResponse : NActors::TEventLocal<TEvDescribeDatabaseResponse, EvDescribeDatabaseResponse> { + static constexpr bool Auditable = false; + + explicit TEvDescribeDatabaseResponse( + const FederatedQuery::Internal::ComputeDatabaseInternal& record) + : Record(record) + {} + + explicit TEvDescribeDatabaseResponse( + const NYql::TIssues& issues + ) + : Issues(issues) + {} + + size_t GetByteSize() const { + return sizeof(*this) + + Record.ByteSizeLong() + + GetIssuesByteSize(Issues) + + GetDebugInfoByteSize(DebugInfo); + } + + FederatedQuery::Internal::ComputeDatabaseInternal Record; + NYql::TIssues Issues; + TDebugInfoPtr DebugInfo; + }; }; } diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp index a86d817e87d..cda14c2a5c4 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp @@ -7,6 +7,8 @@ #include <ydb/core/fq/libs/control_plane_storage/schema.h> #include <ydb/core/fq/libs/db_schema/db_schema.h> +#include <ydb/public/lib/fq/scope.h> + #include <library/cpp/protobuf/interop/cast.h> namespace NFq { @@ -494,6 +496,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ newTask->set_execution_id(task.Internal.execution_id()); newTask->set_operation_id(task.Internal.operation_id()); + *newTask->mutable_compute_connection() = task.Internal.compute_connection(); } return result; diff --git a/ydb/core/fq/libs/control_plane_storage/probes.h b/ydb/core/fq/libs/control_plane_storage/probes.h index 92a4dceb2c6..c0f1881cc2b 100644 --- a/ydb/core/fq/libs/control_plane_storage/probes.h +++ b/ydb/core/fq/libs/control_plane_storage/probes.h @@ -107,6 +107,14 @@ GROUPS(), \ TYPES(TString, TDuration, bool), \ NAMES("queryId", "latencyMs", "success")) \ + PROBE(CreateDatabaseRequest, \ + GROUPS(), \ + TYPES(TString, TString, TDuration, i64, bool), \ + NAMES("scope", "user", "latencyMs", "size", "success")) \ + PROBE(DescribeDatabaseRequest, \ + GROUPS(), \ + TYPES(TString, TString, TDuration, i64, bool), \ + NAMES("scope", "user", "latencyMs", "size", "success")) \ // YQ_CONTROL_PLANE_STORAGE_PROVIDER diff --git a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto index ff7d943d412..5d30311219c 100644 --- a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto +++ b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto @@ -5,12 +5,15 @@ package FederatedQuery.Internal; option java_package = "com.yandex.query.internal"; option java_outer_classname = "YandexQueryInternalProtos"; -import "ydb/library/yql/providers/dq/api/protos/service.proto"; -import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto"; -import "ydb/library/yql/dq/proto/dq_tasks.proto"; -import "ydb/public/api/protos/ydb_issue_message.proto"; +import "ydb/core/fq/libs/config/protos/storage.proto"; import "ydb/core/fq/libs/protos/fq_private.proto"; + import "ydb/public/api/protos/draft/fq.proto"; +import "ydb/public/api/protos/ydb_issue_message.proto"; + +import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto"; +import "ydb/library/yql/dq/proto/dq_tasks.proto"; +import "ydb/library/yql/providers/dq/api/protos/service.proto"; import "google/protobuf/duration.proto"; @@ -44,6 +47,7 @@ message QueryInternal { NYql.NDqProto.StatusIds.StatusCode status_code = 23; string operation_id = 24; string execution_id = 25; + NFq.NConfig.TYdbStorageConfig compute_connection = 26; } message JobInternal { @@ -59,3 +63,8 @@ message ConnectionInternal { message BindingInternal { string cloud_id = 2; } + +message ComputeDatabaseInternal { + string id = 1; + NFq.NConfig.TYdbStorageConfig connection = 2; +} diff --git a/ydb/core/fq/libs/control_plane_storage/schema.h b/ydb/core/fq/libs/control_plane_storage/schema.h index 3d7e541f7d9..5cbcc6b7b7a 100644 --- a/ydb/core/fq/libs/control_plane_storage/schema.h +++ b/ydb/core/fq/libs/control_plane_storage/schema.h @@ -15,6 +15,7 @@ namespace NFq { #define TENANTS_TABLE_NAME "tenants" #define TENANT_ACKS_TABLE_NAME "tenant_acks" #define MAPPINGS_TABLE_NAME "mappings" +#define COMPUTE_DATABASES_TABLE_NAME "compute_databases" // columns #define SCOPE_COLUMN_NAME "scope" diff --git a/ydb/core/fq/libs/control_plane_storage/ya.make b/ydb/core/fq/libs/control_plane_storage/ya.make index 8912d9e9ccf..269be37204c 100644 --- a/ydb/core/fq/libs/control_plane_storage/ya.make +++ b/ydb/core/fq/libs/control_plane_storage/ya.make @@ -10,6 +10,7 @@ SRCS( validators.cpp ydb_control_plane_storage.cpp ydb_control_plane_storage_bindings.cpp + ydb_control_plane_storage_compute_database.cpp ydb_control_plane_storage_connections.cpp ydb_control_plane_storage_queries.cpp ydb_control_plane_storage_quotas.cpp diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp index b5e4636e130..2a9aa170e66 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -53,6 +53,7 @@ void TYdbControlPlaneStorageActor::Bootstrap() { CreateTenantsTable(); CreateTenantAcksTable(); CreateMappingsTable(); + CreateComputeDatabasesTable(); Become(&TThis::StateFunc); } @@ -304,6 +305,18 @@ void TYdbControlPlaneStorageActor::CreateMappingsTable() RunCreateTableActor(tablePath, TTableDescription(description)); } +void TYdbControlPlaneStorageActor::CreateComputeDatabasesTable() +{ + auto tablePath = JoinPath(YdbConnection->TablePathPrefix, COMPUTE_DATABASES_TABLE_NAME); + auto description = TTableBuilder() + .AddNullableColumn(SCOPE_COLUMN_NAME, EPrimitiveType::String) + .AddNullableColumn(INTERNAL_COLUMN_NAME, EPrimitiveType::String) + .SetPrimaryKeyColumns({SCOPE_COLUMN_NAME}) + .Build(); + + RunCreateTableActor(tablePath, TTableDescription(description)); +} + void TYdbControlPlaneStorageActor::AfterTablesCreated() { // Schedule(TDuration::Zero(), new NActors::TEvents::TEvWakeup()); } diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp new file mode 100644 index 00000000000..6b0d32b32fd --- /dev/null +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp @@ -0,0 +1,118 @@ +#include "validators.h" +#include "ydb_control_plane_storage_impl.h" + +#include <util/string/join.h> + +#include <ydb/public/api/protos/draft/fq.pb.h> + +#include <ydb/core/fq/libs/config/protos/issue_id.pb.h> +#include <ydb/core/fq/libs/db_schema/db_schema.h> + +namespace NFq { + +void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateDatabaseRequest::TPtr& ev) +{ + TInstant startTime = TInstant::Now(); + const TEvControlPlaneStorage::TEvCreateDatabaseRequest& event = *ev->Get(); + const TString cloudId = event.CloudId; + const TString scope = event.Scope; + TRequestCounters requestCounters = Counters.GetCounters(cloudId, scope, RTS_CREATE_DATABASE, RTC_CREATE_DATABASE); + requestCounters.IncInFly(); + requestCounters.Common->RequestBytes->Add(event.GetByteSize()); + const FederatedQuery::Internal::ComputeDatabaseInternal& request = event.Record; + const int byteSize = request.ByteSize(); + + CPS_LOG_T(MakeLogPrefix(scope, "internal", request.id()) + << "CreateDatabaseRequest: " + << request.DebugString()); + + TSqlQueryBuilder queryBuilder(YdbConnection->TablePathPrefix, "CreateDatabase"); + queryBuilder.AddString("scope", scope); + queryBuilder.AddString("internal", request.SerializeAsString()); + + queryBuilder.AddText( + "INSERT INTO `" COMPUTE_DATABASES_TABLE_NAME "` (`" SCOPE_COLUMN_NAME "`, `" INTERNAL_COLUMN_NAME "`) VALUES\n" + " ($scope, $internal);" + ); + + const auto query = queryBuilder.Build(); + auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; + TAsyncStatus result = Write(query.Sql, query.Params, requestCounters, debugInfo); + auto prepare = [] { return std::make_tuple<NYql::TIssues>(NYql::TIssues{}); }; + auto success = SendResponseTuple<TEvControlPlaneStorage::TEvCreateDatabaseResponse, std::tuple<NYql::TIssues>>( + MakeLogPrefix(scope, "internal", request.id()) + "CreateDatabaseRequest", + NActors::TActivationContext::ActorSystem(), + result, + SelfId(), + ev, + startTime, + requestCounters, + prepare, + debugInfo); + + success.Apply([=](const auto& future) { + TDuration delta = TInstant::Now() - startTime; + LWPROBE(CreateDatabaseRequest, scope, "internal", delta, byteSize, future.GetValue()); + }); +} + +void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeDatabaseRequest::TPtr& ev) +{ + TInstant startTime = TInstant::Now(); + const TEvControlPlaneStorage::TEvDescribeDatabaseRequest& event = *ev->Get(); + const TString cloudId = event.CloudId; + const TString scope = event.Scope; + TRequestCounters requestCounters = Counters.GetCounters(cloudId, scope, RTS_DESCRIBE_DATABASE, RTC_DESCRIBE_DATABASE); + requestCounters.IncInFly(); + requestCounters.Common->RequestBytes->Add(event.GetByteSize()); + const auto byteSize = event.GetByteSize(); + + CPS_LOG_T(MakeLogPrefix(scope, "internal", scope) + << "DescribeDatabaseRequest"); + + TSqlQueryBuilder queryBuilder(YdbConnection->TablePathPrefix, "DescribeDatabase"); + queryBuilder.AddString("scope", scope); + queryBuilder.AddText( + "SELECT `" INTERNAL_COLUMN_NAME "` FROM `" COMPUTE_DATABASES_TABLE_NAME "`\n" + "WHERE `" SCOPE_COLUMN_NAME "` = $scope;" + ); + + const auto query = queryBuilder.Build(); + auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; + auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); + auto prepare = [=, resultSets=resultSets] { + if (resultSets->size() != 1) { + ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; + } + + TResultSetParser parser(resultSets->front()); + if (!parser.TryNextRow()) { + ythrow TCodeLineException(TIssuesIds::ACCESS_DENIED) << "Database does not exist or permission denied. Please check the id database or your access rights"; + } + + FederatedQuery::Internal::ComputeDatabaseInternal result; + if (!result.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { + ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for internal compute database. Please contact internal support"; + } + + return result; + }; + + auto success = SendResponse<TEvControlPlaneStorage::TEvDescribeDatabaseResponse, FederatedQuery::Internal::ComputeDatabaseInternal>( + MakeLogPrefix(scope, "internal", scope) + "DescribeDatabaseRequest", + NActors::TActivationContext::ActorSystem(), + result, + SelfId(), + ev, + startTime, + requestCounters, + prepare, + debugInfo); + + success.Apply([=](const auto& future) { + TDuration delta = TInstant::Now() - startTime; + LWPROBE(DescribeDatabaseRequest, scope, "internal", delta, byteSize, future.GetValue()); + }); +} + +} // NFq diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h index f8f9ad6e46a..408413ee902 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h @@ -381,6 +381,8 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont RTS_MODIFY_BINDING, RTS_DELETE_BINDING, RTS_PING_TASK, + RTS_CREATE_DATABASE, + RTS_DESCRIBE_DATABASE, RTS_MAX, }; @@ -405,7 +407,9 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont "DescribeBinding", "ModifyBinding", "DeleteBinding", - "PingTask" + "PingTask", + "CreateDatabase", + "DescribeDatabase" }; enum ERequestTypeCommon { @@ -436,6 +440,8 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont RTC_MODIFY_BINDING, RTC_DELETE_BINDING, RTC_PING_TASK, + RTC_CREATE_DATABASE, + RTC_DESCRIBE_DATABASE, RTC_MAX, }; @@ -487,6 +493,8 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont { MakeIntrusive<TRequestCommonCounters>("ModifyBinding") }, { MakeIntrusive<TRequestCommonCounters>("DeleteBinding") }, { MakeIntrusive<TRequestCommonCounters>("PingTask") }, + { MakeIntrusive<TRequestCommonCounters>("CreateDatabase") }, + { MakeIntrusive<TRequestCommonCounters>("DescribeDatabase") } }); TTtlCache<TMetricsScope, TScopeCountersPtr, TMap> ScopeCounters{TTtlCacheSettings{}.SetTtl(TDuration::Days(1))}; @@ -621,6 +629,8 @@ public: hFunc(TEvQuotaService::TQuotaLimitChangeRequest, Handle); hFunc(TEvents::TEvCallback, [](TEvents::TEvCallback::TPtr& ev) { ev->Get()->Callback(); } ); hFunc(TEvents::TEvSchemaCreated, Handle); + hFunc(TEvControlPlaneStorage::TEvCreateDatabaseRequest, Handle); + hFunc(TEvControlPlaneStorage::TEvDescribeDatabaseRequest, Handle); ) void Handle(TEvControlPlaneStorage::TEvCreateQueryRequest::TPtr& ev); @@ -659,6 +669,9 @@ public: void Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev); void Handle(TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev); + void Handle(TEvControlPlaneStorage::TEvCreateDatabaseRequest::TPtr& ev); + void Handle(TEvControlPlaneStorage::TEvDescribeDatabaseRequest::TPtr& ev); + template <class TEventPtr, class TRequestActor, ERequestTypeCommon requestType> void HandleRateLimiterImpl(TEventPtr& ev); @@ -690,6 +703,7 @@ public: void CreateTenantsTable(); void CreateTenantAcksTable(); void CreateMappingsTable(); + void CreateComputeDatabasesTable(); void RunCreateTableActor(const TString& path, NYdb::NTable::TTableDescription desc); void AfterTablesCreated(); diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index 5a8fe10221a..9c8764e5df8 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -52,6 +52,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery const TEvControlPlaneStorage::TEvCreateQueryRequest& event = *ev->Get(); const TString cloudId = event.CloudId; const FederatedQuery::CreateQueryRequest& request = event.Request; + const FederatedQuery::Internal::ComputeDatabaseInternal& computeDatabase = event.ComputeDatabase; ui64 resultLimit = 0; if (event.Quotas) { if (auto it = event.Quotas->find(QUOTA_QUERY_RESULT_LIMIT); it != event.Quotas->end()) { @@ -212,7 +213,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery if (request.execute_mode() != FederatedQuery::SAVE) { // TODO: move to run actor priority selection - + *queryInternal.mutable_compute_connection() = computeDatabase.connection(); TSet<TString> disabledConnections; for (const auto& connection: GetEntities<FederatedQuery::Connection>(resultSets[resultSets.size() - 2], CONNECTION_COLUMN_NAME)) { if (!Config->AvailableConnections.contains(connection.content().setting().connection_case())) { @@ -735,6 +736,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery permissions.SetAll(); } FederatedQuery::ModifyQueryRequest& request = event.Request; + FederatedQuery::Internal::ComputeDatabaseInternal& computeDatabase = event.ComputeDatabase; const TString queryId = request.query_id(); const int byteSize = request.ByteSize(); const int64_t previousRevision = request.previous_revision(); @@ -873,6 +875,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery internal.clear_binding(); internal.clear_connection(); internal.clear_resources(); + *internal.mutable_compute_connection() = computeDatabase.connection(); // TODO: move to run actor priority selection TSet<TString> disabledConnections; diff --git a/ydb/core/fq/libs/events/event_subspace.h b/ydb/core/fq/libs/events/event_subspace.h index 2a9f2117dee..b7c8f34a54a 100644 --- a/ydb/core/fq/libs/events/event_subspace.h +++ b/ydb/core/fq/libs/events/event_subspace.h @@ -30,6 +30,7 @@ struct TYqEventSubspace { QuotaService, RateLimiter, ControlPlaneConfig, + YdbCompute, SubspacesEnd, }; diff --git a/ydb/core/fq/libs/init/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/init/CMakeLists.darwin-x86_64.txt index 6972b937899..588923a63f0 100644 --- a/ydb/core/fq/libs/init/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/fq/libs/init/CMakeLists.darwin-x86_64.txt @@ -23,6 +23,7 @@ target_link_libraries(fq-libs-init PUBLIC fq-libs-checkpointing fq-libs-cloud_audit fq-libs-common + compute-ydb-control_plane fq-libs-control_plane_config fq-libs-control_plane_proxy fq-libs-control_plane_storage diff --git a/ydb/core/fq/libs/init/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/init/CMakeLists.linux-aarch64.txt index edfaf53c423..8bbcde4f247 100644 --- a/ydb/core/fq/libs/init/CMakeLists.linux-aarch64.txt +++ b/ydb/core/fq/libs/init/CMakeLists.linux-aarch64.txt @@ -24,6 +24,7 @@ target_link_libraries(fq-libs-init PUBLIC fq-libs-checkpointing fq-libs-cloud_audit fq-libs-common + compute-ydb-control_plane fq-libs-control_plane_config fq-libs-control_plane_proxy fq-libs-control_plane_storage diff --git a/ydb/core/fq/libs/init/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/init/CMakeLists.linux-x86_64.txt index edfaf53c423..8bbcde4f247 100644 --- a/ydb/core/fq/libs/init/CMakeLists.linux-x86_64.txt +++ b/ydb/core/fq/libs/init/CMakeLists.linux-x86_64.txt @@ -24,6 +24,7 @@ target_link_libraries(fq-libs-init PUBLIC fq-libs-checkpointing fq-libs-cloud_audit fq-libs-common + compute-ydb-control_plane fq-libs-control_plane_config fq-libs-control_plane_proxy fq-libs-control_plane_storage diff --git a/ydb/core/fq/libs/init/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/init/CMakeLists.windows-x86_64.txt index 6972b937899..588923a63f0 100644 --- a/ydb/core/fq/libs/init/CMakeLists.windows-x86_64.txt +++ b/ydb/core/fq/libs/init/CMakeLists.windows-x86_64.txt @@ -23,6 +23,7 @@ target_link_libraries(fq-libs-init PUBLIC fq-libs-checkpointing fq-libs-cloud_audit fq-libs-common + compute-ydb-control_plane fq-libs-control_plane_config fq-libs-control_plane_proxy fq-libs-control_plane_storage diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index 20a61112756..ee21d26d4a0 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -1,15 +1,14 @@ #include "init.h" -#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h> -#include <ydb/core/fq/libs/test_connection/test_connection.h> - #include <ydb/core/fq/libs/audit/yq_audit_service.h> #include <ydb/core/fq/libs/checkpoint_storage/storage_service.h> +#include <ydb/core/fq/libs/checkpoint_storage/storage_service.h> #include <ydb/core/fq/libs/cloud_audit/yq_cloud_audit_service.h> +#include <ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h> #include <ydb/core/fq/libs/control_plane_config/control_plane_config.h> #include <ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h> +#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h> #include <ydb/core/fq/libs/health/health.h> -#include <ydb/core/fq/libs/checkpoint_storage/storage_service.h> #include <ydb/core/fq/libs/private_client/internal_service.h> #include <ydb/core/fq/libs/private_client/loopback_service.h> #include <ydb/core/fq/libs/quota_manager/quota_manager.h> @@ -19,6 +18,8 @@ #include <ydb/core/fq/libs/rate_limiter/events/data_plane.h> #include <ydb/core/fq/libs/rate_limiter/quoter_service/quoter_service.h> #include <ydb/core/fq/libs/shared_resources/shared_resources.h> +#include <ydb/core/fq/libs/test_connection/test_connection.h> + #include <ydb/library/folder_service/folder_service.h> #include <ydb/library/yql/providers/common/metrics/service_counters.h> @@ -107,6 +108,11 @@ void Init( actorRegistrator(NFq::ControlPlaneProxyActorId(), controlPlaneProxy); } + if (protoConfig.GetCompute().GetYdb().GetEnable() && protoConfig.GetCompute().GetYdb().GetControlPlane().GetEnable()) { + auto computeDatabaseService = NFq::CreateComputeDatabaseControlPlaneServiceActor(protoConfig.GetCompute(), NKikimr::CreateYdbCredentialsProviderFactory); + actorRegistrator(NFq::ComputeDatabaseControlPlaneServiceActorId(), computeDatabaseService.release()); + } + if (protoConfig.GetRateLimiter().GetControlPlaneEnabled()) { Y_VERIFY(protoConfig.GetQuotasManager().GetEnabled()); // Rate limiter resources want to know CPU quota on creation NActors::IActor* rateLimiterService = NFq::CreateRateLimiterControlPlaneService(protoConfig.GetRateLimiter(), yqSharedResources, NKikimr::CreateYdbCredentialsProviderFactory); diff --git a/ydb/core/fq/libs/init/ya.make b/ydb/core/fq/libs/init/ya.make index 9693c2038f1..3278a3f60c0 100644 --- a/ydb/core/fq/libs/init/ya.make +++ b/ydb/core/fq/libs/init/ya.make @@ -14,6 +14,7 @@ PEERDIR( ydb/core/fq/libs/checkpointing ydb/core/fq/libs/cloud_audit ydb/core/fq/libs/common + ydb/core/fq/libs/compute/ydb/control_plane ydb/core/fq/libs/control_plane_config ydb/core/fq/libs/control_plane_proxy ydb/core/fq/libs/control_plane_storage diff --git a/ydb/core/fq/libs/protos/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/protos/CMakeLists.darwin-x86_64.txt index 3cd22906186..7e39a3f60b1 100644 --- a/ydb/core/fq/libs/protos/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/fq/libs/protos/CMakeLists.darwin-x86_64.txt @@ -35,8 +35,9 @@ add_library(fq-libs-protos) target_link_libraries(fq-libs-protos PUBLIC contrib-libs-cxxsupp yutil - api-protos + libs-config-protos dq-actors-protos + api-protos contrib-libs-protobuf ) target_proto_messages(fq-libs-protos PRIVATE diff --git a/ydb/core/fq/libs/protos/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/protos/CMakeLists.linux-aarch64.txt index 7a522343447..4ebf71e3764 100644 --- a/ydb/core/fq/libs/protos/CMakeLists.linux-aarch64.txt +++ b/ydb/core/fq/libs/protos/CMakeLists.linux-aarch64.txt @@ -36,8 +36,9 @@ target_link_libraries(fq-libs-protos PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil - api-protos + libs-config-protos dq-actors-protos + api-protos contrib-libs-protobuf ) target_proto_messages(fq-libs-protos PRIVATE diff --git a/ydb/core/fq/libs/protos/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/protos/CMakeLists.linux-x86_64.txt index 7a522343447..4ebf71e3764 100644 --- a/ydb/core/fq/libs/protos/CMakeLists.linux-x86_64.txt +++ b/ydb/core/fq/libs/protos/CMakeLists.linux-x86_64.txt @@ -36,8 +36,9 @@ target_link_libraries(fq-libs-protos PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil - api-protos + libs-config-protos dq-actors-protos + api-protos contrib-libs-protobuf ) target_proto_messages(fq-libs-protos PRIVATE diff --git a/ydb/core/fq/libs/protos/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/protos/CMakeLists.windows-x86_64.txt index 3cd22906186..7e39a3f60b1 100644 --- a/ydb/core/fq/libs/protos/CMakeLists.windows-x86_64.txt +++ b/ydb/core/fq/libs/protos/CMakeLists.windows-x86_64.txt @@ -35,8 +35,9 @@ add_library(fq-libs-protos) target_link_libraries(fq-libs-protos PUBLIC contrib-libs-cxxsupp yutil - api-protos + libs-config-protos dq-actors-protos + api-protos contrib-libs-protobuf ) target_proto_messages(fq-libs-protos PRIVATE diff --git a/ydb/core/fq/libs/protos/fq_private.proto b/ydb/core/fq/libs/protos/fq_private.proto index 9320cdaf024..b1e9003e876 100644 --- a/ydb/core/fq/libs/protos/fq_private.proto +++ b/ydb/core/fq/libs/protos/fq_private.proto @@ -3,6 +3,7 @@ option cc_enable_arenas = true; package Fq.Private; +import "ydb/core/fq/libs/config/protos/storage.proto"; import "ydb/core/fq/libs/protos/dq_effects.proto"; import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto"; @@ -116,6 +117,7 @@ message GetTaskResult { FederatedQuery.QueryContent.QuerySyntax query_syntax = 34; string operation_id = 35; string execution_id = 36; + NFq.NConfig.TYdbStorageConfig compute_connection = 37; } repeated Task tasks = 1; } diff --git a/ydb/core/fq/libs/protos/ya.make b/ydb/core/fq/libs/protos/ya.make index f53349ac5b8..c9ae3ae3da1 100644 --- a/ydb/core/fq/libs/protos/ya.make +++ b/ydb/core/fq/libs/protos/ya.make @@ -1,8 +1,9 @@ PROTO_LIBRARY() PEERDIR( - ydb/public/api/protos + ydb/core/fq/libs/config/protos ydb/library/yql/dq/actors/protos + ydb/public/api/protos ) SRCS( diff --git a/ydb/core/fq/libs/test_connection/events/events.h b/ydb/core/fq/libs/test_connection/events/events.h index 875077e96eb..743c706cc45 100644 --- a/ydb/core/fq/libs/test_connection/events/events.h +++ b/ydb/core/fq/libs/test_connection/events/events.h @@ -32,7 +32,8 @@ struct TEvTestConnection { const TString& cloudId, const TPermissions& permissions, TMaybe<TQuotaMap> quotas, - TTenantInfo::TPtr tenantInfo) + TTenantInfo::TPtr tenantInfo, + const FederatedQuery::Internal::ComputeDatabaseInternal& computeDatabase) : CloudId(cloudId) , Scope(scope) , Request(request) @@ -41,6 +42,7 @@ struct TEvTestConnection { , Permissions(permissions) , Quotas(std::move(quotas)) , TenantInfo(tenantInfo) + , ComputeDatabase(computeDatabase) { } @@ -52,6 +54,7 @@ struct TEvTestConnection { TPermissions Permissions; const TMaybe<TQuotaMap> Quotas; TTenantInfo::TPtr TenantInfo; + TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal> ComputeDatabase; }; struct TEvTestConnectionResponse : NActors::TEventLocal<TEvTestConnectionResponse, EvTestConnectionResponse> { diff --git a/ydb/core/fq/libs/ydb/ydb.cpp b/ydb/core/fq/libs/ydb/ydb.cpp index b20d38c672c..ca72290bccc 100644 --- a/ydb/core/fq/libs/ydb/ydb.cpp +++ b/ydb/core/fq/libs/ydb/ydb.cpp @@ -296,4 +296,22 @@ TFuture<TStatus> RollbackTransaction(const TGenerationContextPtr& context) { return future; } +NKikimr::TYdbCredentialsSettings GetYdbCredentialSettings(const NConfig::TYdbStorageConfig& config) { + TString oauth; + if (config.GetToken()) { + oauth = config.GetToken(); + } else if (config.GetOAuthFile()) { + oauth = StripString(TFileInput(config.GetOAuthFile()).ReadAll()); + } else { + oauth = GetEnv("YDB_TOKEN"); + } + + NKikimr::TYdbCredentialsSettings credSettings; + credSettings.UseLocalMetadata = config.GetUseLocalMetadataService(); + credSettings.OAuthToken = oauth; + credSettings.SaKeyFile = config.GetSaKeyFile(); + credSettings.IamEndpoint = config.GetIamEndpoint(); + return credSettings; +} + } // namespace NFq diff --git a/ydb/core/fq/libs/ydb/ydb.h b/ydb/core/fq/libs/ydb/ydb.h index 85891e336bd..200de1df104 100644 --- a/ydb/core/fq/libs/ydb/ydb.h +++ b/ydb/core/fq/libs/ydb/ydb.h @@ -134,33 +134,17 @@ NThreading::TFuture<NYdb::TStatus> CheckGeneration(const TGenerationContextPtr& NThreading::TFuture<NYdb::TStatus> RollbackTransaction(const TGenerationContextPtr& context); +NKikimr::TYdbCredentialsSettings GetYdbCredentialSettings(const NConfig::TYdbStorageConfig& config); + template <class TSettings> TSettings GetClientSettings(const NConfig::TYdbStorageConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory) { - TString oauth; - if (config.GetToken()) { - oauth = config.GetToken(); - } else if (config.GetOAuthFile()) { - oauth = StripString(TFileInput(config.GetOAuthFile()).ReadAll()); - } else { - oauth = GetEnv("YDB_TOKEN"); - } - - const TString iamEndpoint = config.GetIamEndpoint(); - const TString saKeyFile = config.GetSaKeyFile(); - TSettings settings; settings .DiscoveryEndpoint(config.GetEndpoint()) .Database(config.GetDatabase()); - NKikimr::TYdbCredentialsSettings credSettings; - credSettings.UseLocalMetadata = config.GetUseLocalMetadataService(); - credSettings.OAuthToken = oauth; - credSettings.SaKeyFile = config.GetSaKeyFile(); - credSettings.IamEndpoint = config.GetIamEndpoint(); - - settings.CredentialsProviderFactory(credProviderFactory(credSettings)); + settings.CredentialsProviderFactory(credProviderFactory(GetYdbCredentialSettings(config))); if (config.GetUseLocalMetadataService()) { settings.SslCredentials(NYdb::TSslCredentials(true)); diff --git a/ydb/library/ycloud/impl/grpc_service_client.h b/ydb/library/ycloud/impl/grpc_service_client.h index d2837a69264..a9c2ff571c7 100644 --- a/ydb/library/ycloud/impl/grpc_service_client.h +++ b/ydb/library/ycloud/impl/grpc_service_client.h @@ -1,9 +1,10 @@ #pragma once +#include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/log.h> -#include <library/cpp/actors/core/actor_bootstrapped.h> -#include <library/cpp/grpc/client/grpc_client_low.h> #include <library/cpp/digest/crc32c/crc32c.h> +#include <library/cpp/grpc/client/grpc_client_low.h> +#include <ydb/core/protos/services.pb.h> #include "grpc_service_settings.h" #define BLOG_GRPC_D(stream) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::GRPC_CLIENT, stream) @@ -97,10 +98,12 @@ public: BLOG_GRPC_DC(*actorSystem, prefix << "Status " << status); } auto respEv = MakeHolder<typename TCallType::TResponseEventType>(); + const auto sender = request->Sender; + const auto cookie = request->Cookie; respEv->Request = request; respEv->Status = status; respEv->Response = response; - actorSystem->Send(respEv->Request->Sender, respEv.Release()); + actorSystem->Send(sender, respEv.Release(), 0, cookie); }; BLOG_GRPC_D(Prefix(requestId) << "Request " << Trim(TCallType::Obfuscate(request))); |