diff options
author | auzhegov <auzhegov@yandex-team.com> | 2023-07-25 10:37:07 +0300 |
---|---|---|
committer | auzhegov <auzhegov@yandex-team.com> | 2023-07-25 10:37:07 +0300 |
commit | a8d09bf9e0b3e44c19dbfcc49e7f4d558513897b (patch) | |
tree | e74a45937ad4198be462b5f907e01e0d9f80d8cc | |
parent | 8bc959020d0214c816dea46289a86351455cc27b (diff) | |
download | ydb-a8d09bf9e0b3e44c19dbfcc49e7f4d558513897b.tar.gz |
Delete/Modify Connection/Binding
25 files changed, 2787 insertions, 1260 deletions
diff --git a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt index 631673b30cc..959fb597690 100644 --- a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(actors) add_subdirectory(events) add_subdirectory(ut) @@ -23,6 +24,7 @@ target_link_libraries(fq-libs-control_plane_proxy PUBLIC libs-compute-ydb compute-ydb-control_plane fq-libs-control_plane_config + libs-control_plane_proxy-actors libs-control_plane_proxy-events fq-libs-control_plane_storage libs-rate_limiter-events diff --git a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt index df189785d00..d24f05ca436 100644 --- a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt +++ b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(actors) add_subdirectory(events) add_subdirectory(ut) @@ -24,6 +25,7 @@ target_link_libraries(fq-libs-control_plane_proxy PUBLIC libs-compute-ydb compute-ydb-control_plane fq-libs-control_plane_config + libs-control_plane_proxy-actors libs-control_plane_proxy-events fq-libs-control_plane_storage libs-rate_limiter-events diff --git a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt index df189785d00..d24f05ca436 100644 --- a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt +++ b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(actors) add_subdirectory(events) add_subdirectory(ut) @@ -24,6 +25,7 @@ target_link_libraries(fq-libs-control_plane_proxy PUBLIC libs-compute-ydb compute-ydb-control_plane fq-libs-control_plane_config + libs-control_plane_proxy-actors libs-control_plane_proxy-events fq-libs-control_plane_storage libs-rate_limiter-events diff --git a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt index 631673b30cc..959fb597690 100644 --- a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt +++ b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(actors) add_subdirectory(events) add_subdirectory(ut) @@ -23,6 +24,7 @@ target_link_libraries(fq-libs-control_plane_proxy PUBLIC libs-compute-ydb compute-ydb-control_plane fq-libs-control_plane_config + libs-control_plane_proxy-actors libs-control_plane_proxy-events fq-libs-control_plane_storage libs-rate_limiter-events diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..870e205dcc4 --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,29 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(libs-control_plane_proxy-actors) +target_compile_options(libs-control_plane_proxy-actors PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(libs-control_plane_proxy-actors PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-fmt + library-cpp-iterator + libs-control_plane_proxy-events + libs-control_plane_storage-events + fq-libs-result_formatter + core-kqp-provider + library-db_pool-protos +) +target_sources(libs-control_plane_proxy-actors PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp +) diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..5429166e90d --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt @@ -0,0 +1,30 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(libs-control_plane_proxy-actors) +target_compile_options(libs-control_plane_proxy-actors PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(libs-control_plane_proxy-actors PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-fmt + library-cpp-iterator + libs-control_plane_proxy-events + libs-control_plane_storage-events + fq-libs-result_formatter + core-kqp-provider + library-db_pool-protos +) +target_sources(libs-control_plane_proxy-actors PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp +) diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..5429166e90d --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt @@ -0,0 +1,30 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(libs-control_plane_proxy-actors) +target_compile_options(libs-control_plane_proxy-actors PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(libs-control_plane_proxy-actors PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-fmt + library-cpp-iterator + libs-control_plane_proxy-events + libs-control_plane_storage-events + fq-libs-result_formatter + core-kqp-provider + library-db_pool-protos +) +target_sources(libs-control_plane_proxy-actors PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp +) diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..870e205dcc4 --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt @@ -0,0 +1,29 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(libs-control_plane_proxy-actors) +target_compile_options(libs-control_plane_proxy-actors PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(libs-control_plane_proxy-actors PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-fmt + library-cpp-iterator + libs-control_plane_proxy-events + libs-control_plane_storage-events + fq-libs-result_formatter + core-kqp-provider + library-db_pool-protos +) +target_sources(libs-control_plane_proxy-actors PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp +) diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h new file mode 100644 index 00000000000..aeb6ec9409a --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h @@ -0,0 +1,105 @@ +#pragma once + +#include "counters.h" + +#include <contrib/libs/fmt/include/fmt/format.h> +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> + +#include <ydb/core/fq/libs/actors/logging/log.h> +#include <ydb/core/fq/libs/config/protos/issue_id.pb.h> +#include <ydb/core/fq/libs/config/yq_issue.h> +#include <ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h> +#include <ydb/library/yql/public/issue/yql_issue.h> + +namespace NFq { +namespace NPrivate { + +using namespace NActors; +using namespace NThreading; +using namespace NYdb; + +template<typename T> +struct TBaseActorTypeTag { + using TRequest = typename T::TRequest; + using TResponse = typename T::TResponse; +}; + +template<typename TDerived> +class TBaseActor : public NActors::TActorBootstrapped<TDerived> { +public: + using TBase = NActors::TActorBootstrapped<TDerived>; + using TBase::Become; + using TBase::PassAway; + using TBase::SelfId; + using TBase::Send; + + using TEventRequestPtr = typename TBaseActorTypeTag<TDerived>::TRequest::TPtr; + using TEventResponse = typename TBaseActorTypeTag<TDerived>::TResponse; + +public: + TBaseActor(const TActorId& sender, + const TEventRequestPtr request, + TDuration requestTimeout, + const NPrivate::TRequestCommonCountersPtr& counters) + : Request(std::move(request)) + , Counters(counters) + , Sender(sender) + , RequestTimeout(requestTimeout) { } + + void Bootstrap() { + CPP_LOG_T("TBaseActor Bootstrap started. Actor id: " << SelfId()); + Become(&TDerived::StateFunc, RequestTimeout, new NActors::TEvents::TEvWakeup()); + Counters->InFly->Inc(); + BootstrapImpl(); + } + + void SendErrorMessageToSender(const NYql::TIssue& issue) { + Counters->Error->Inc(); + NYql::TIssues issues; + issues.AddIssue(issue); + Send(Sender, new TEventResponse(issues, {}), 0, Request->Cookie); + PassAway(); + } + + void SendRequestToSender() { + Counters->Ok->Inc(); + Send(Request->Forward(ControlPlaneProxyActorId())); + PassAway(); + } + + void HandleTimeout() { + CPP_LOG_D("TBaseActor Timeout occurred. Actor id: " + << SelfId()); + Counters->Timeout->Inc(); + SendErrorMessageToSender(MakeErrorIssue( + TIssuesIds::TIMEOUT, + "Timeout occurred. Try repeating the request later")); + } + + void HandleError(const TString& message, EStatus status, const NYql::TIssues& issues) { + TString errorMessage = TStringBuilder{} << message << ". Status " << status; + HandleError(errorMessage, issues); + } + + void HandleError(const TString& message, const NYql::TIssues& issues) { + CPP_LOG_E(message); + NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, message); + for (auto& subIssue : issues) { + issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); + } + SendErrorMessageToSender(std::move(issue)); + } + +private: + virtual void BootstrapImpl() = 0; + +protected: + const TEventRequestPtr Request; + const NPrivate::TRequestCommonCountersPtr Counters; + const TActorId Sender; + const TDuration RequestTimeout; +}; + +} // namespace NPrivate +} // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp new file mode 100644 index 00000000000..168e4f6fa2c --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp @@ -0,0 +1,331 @@ +#include "control_plane_storage_requester_actor.h" +#include "base_actor.h" + +#include <contrib/libs/fmt/include/fmt/format.h> +#include <library/cpp/actors/core/event.h> + +#include <ydb/core/fq/libs/control_plane_proxy/events/events.h> +#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h> +#include <ydb/core/fq/libs/control_plane_storage/events/events.h> +#include <ydb/library/yql/public/issue/yql_issue.h> +#include <ydb/public/api/protos/draft/fq.pb.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +namespace NFq { +namespace NPrivate { + +using namespace NActors; +using namespace NFq::NConfig; +using namespace NKikimr; +using namespace NThreading; + +template<class TEventRequest, class TEventResponse, class TCPSEventRequest, class TCPSEventResponse> +class TControlPlaneStorageRequesterActor; + +template<class TEventRequest, class TEventResponse, class TCPSEventRequest, class TCPSEventResponse> +struct TBaseActorTypeTag< + TControlPlaneStorageRequesterActor<TEventRequest, TEventResponse, TCPSEventRequest, TCPSEventResponse>> { + using TRequest = TEventRequest; + using TResponse = TEventResponse; +}; + +template<class TEventRequest, class TEventResponse, class TCPSEventRequest, class TCPSEventResponse> +class TControlPlaneStorageRequesterActor : + public TBaseActor< + TControlPlaneStorageRequesterActor<TEventRequest, TEventResponse, TCPSEventRequest, TCPSEventResponse>> { +private: + using TBase = TBaseActor< + TControlPlaneStorageRequesterActor<TEventRequest, TEventResponse, TCPSEventRequest, TCPSEventResponse>>; + using TBase::SelfId; + using TBase::Send; + using TBase::Request; + + using TEventRequestPtr = typename TEventRequest::TPtr; + +public: + using TCPSRequestFactory = + std::function<typename TCPSEventRequest::TProto(const TEventRequestPtr& request)>; + using TErrorMessageFactoryMethod = std::function<TString(const NYql::TIssues& issues)>; + using TEntityNameExtractorFactoryMethod = + std::function<void(const TEventRequestPtr& request, + const typename TCPSEventResponse::TProto& response)>; + + TControlPlaneStorageRequesterActor(const TActorId& sender, + const TEventRequestPtr request, + TDuration requestTimeout, + const NPrivate::TRequestCommonCountersPtr& counters, + TPermissions permissions, + TCPSRequestFactory cpsRequestFactory, + TErrorMessageFactoryMethod errorMessageFactoryMethod, + TEntityNameExtractorFactoryMethod entityNameExtractorFactoryMethod) + : TBaseActor< + TControlPlaneStorageRequesterActor<TEventRequest, TEventResponse, TCPSEventRequest, TCPSEventResponse>>( + sender, std::move(request), requestTimeout, counters) + , Permissions(permissions) + , CPSRequestFactory(cpsRequestFactory) + , ErrorMessageFactoryMethod(errorMessageFactoryMethod) + , EntityNameExtractorFactoryMethod(entityNameExtractorFactoryMethod) { } + + static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_REQUEST_CONTROL_PLANE_STORAGE"; + + void BootstrapImpl() override { + SendCPSRequest(); + } + + void SendCPSRequest() { + CPP_LOG_I("TControlPlaneStorageRequesterActor Sending CPS request. Actor id: " << TBase::SelfId()); + const auto& request = Request; + auto event = new TCPSEventRequest("yandexcloud://" + request->Get()->FolderId, + CPSRequestFactory(request), + request->Get()->User, + request->Get()->Token, + request->Get()->CloudId, + Permissions, + request->Get()->Quotas, + request->Get()->TenantInfo, + {}); + Send(ControlPlaneStorageServiceActorId(), event); + } + + STRICT_STFUNC(StateFunc, + cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout); + hFunc(TCPSEventResponse, Handle); + ) + + void Handle(typename TCPSEventResponse::TPtr& event) { + CPP_LOG_I("TControlPlaneStorageRequesterActor Handling CPS response. Actor id: " << TBase::SelfId()); + auto issues = event->Get()->Issues; + if (!issues.Empty()) { + CPP_LOG_I("TControlPlaneStorageRequesterActor Handling CPS response. Request finished with issues. Actor id: " << TBase::SelfId()); + TString errorMessage = ErrorMessageFactoryMethod(issues); + TBase::HandleError(errorMessage, issues); + return; + } + + CPP_LOG_I("TControlPlaneStorageRequesterActor Handling CPS response. Request finished successfully. Actor id: " << TBase::SelfId()); + EntityNameExtractorFactoryMethod(Request, event->Get()->Result); + TBase::SendRequestToSender(); + } + +private: + TPermissions Permissions; + TCPSRequestFactory CPSRequestFactory; + TErrorMessageFactoryMethod ErrorMessageFactoryMethod; + TEntityNameExtractorFactoryMethod EntityNameExtractorFactoryMethod; +}; + +/// Discover connection_name +TString DescribeConnectionErrorMessageFactoryMethod(const NYql::TIssues& issues) { + Y_UNUSED(issues); + return "Couldn't resolve connection"; +}; +NActors::IActor* MakeDiscoverYDBConnectionName( + const TActorId& sender, + const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions) { + auto cpsRequestFactory = + [](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& event) { + FederatedQuery::DescribeConnectionRequest result; + auto connectionId = event->Get()->Request.content().connection_id(); + result.set_connection_id(connectionId); + return result; + }; + auto entityNameExtractorFactoryMethod = + [](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& event, + const FederatedQuery::DescribeConnectionResult& result) { + event->Get()->ConnectionName = result.connection().content().name(); + }; + + return new TControlPlaneStorageRequesterActor<TEvControlPlaneProxy::TEvCreateBindingRequest, + TEvControlPlaneProxy::TEvCreateBindingResponse, + TEvControlPlaneStorage::TEvDescribeConnectionRequest, + TEvControlPlaneStorage::TEvDescribeConnectionResponse>( + sender, + request, + requestTimeout, + counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY), + permissions, + cpsRequestFactory, + DescribeConnectionErrorMessageFactoryMethod, + entityNameExtractorFactoryMethod); +} + +NActors::IActor* MakeDiscoverYDBConnectionName( + const TActorId& sender, + const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions) { + auto cpsRequestFactory = + [](const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& event) { + FederatedQuery::DescribeConnectionRequest result; + auto connectionId = event->Get()->Request.connection_id(); + result.set_connection_id(connectionId); + return result; + }; + auto entityNameExtractorFactoryMethod = + [](const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& event, + const FederatedQuery::DescribeConnectionResult& result) { + event->Get()->OldConnectionContent = result.connection().content(); + }; + + return new TControlPlaneStorageRequesterActor<TEvControlPlaneProxy::TEvModifyConnectionRequest, + TEvControlPlaneProxy::TEvModifyConnectionResponse, + TEvControlPlaneStorage::TEvDescribeConnectionRequest, + TEvControlPlaneStorage::TEvDescribeConnectionResponse>( + sender, + request, + requestTimeout, + counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY), + permissions, + cpsRequestFactory, + DescribeConnectionErrorMessageFactoryMethod, + entityNameExtractorFactoryMethod); +} + +NActors::IActor* MakeDiscoverYDBConnectionName( + const TActorId& sender, + const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions) { + auto cpsRequestFactory = + [](const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& event) { + FederatedQuery::DescribeConnectionRequest result; + auto connectionId = event->Get()->Request.connection_id(); + result.set_connection_id(connectionId); + return result; + }; + auto entityNameExtractorFactoryMethod = + [](const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& event, + const FederatedQuery::DescribeConnectionResult& result) { + event->Get()->ConnectionContent = result.connection().content(); + }; + + return new TControlPlaneStorageRequesterActor<TEvControlPlaneProxy::TEvDeleteConnectionRequest, + TEvControlPlaneProxy::TEvDeleteConnectionResponse, + TEvControlPlaneStorage::TEvDescribeConnectionRequest, + TEvControlPlaneStorage::TEvDescribeConnectionResponse>( + sender, + request, + requestTimeout, + counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY), + permissions, + cpsRequestFactory, + DescribeConnectionErrorMessageFactoryMethod, + entityNameExtractorFactoryMethod); +} + +NActors::IActor* MakeDiscoverYDBConnectionName( + const TActorId& sender, + const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions) { + auto cpsRequestFactory = + [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& event) { + FederatedQuery::DescribeConnectionRequest result; + auto connectionId = *event->Get()->ConnectionId; + result.set_connection_id(connectionId); + return result; + }; + auto entityNameExtractorFactoryMethod = + [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& event, + const FederatedQuery::DescribeConnectionResult& result) { + event->Get()->ConnectionName = result.connection().content().name(); + }; + + return new TControlPlaneStorageRequesterActor<TEvControlPlaneProxy::TEvModifyBindingRequest, + TEvControlPlaneProxy::TEvModifyBindingResponse, + TEvControlPlaneStorage::TEvDescribeConnectionRequest, + TEvControlPlaneStorage::TEvDescribeConnectionResponse>( + sender, + request, + requestTimeout, + counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY), + permissions, + cpsRequestFactory, + DescribeConnectionErrorMessageFactoryMethod, + entityNameExtractorFactoryMethod); +} + +/// Discover binding_name + +NActors::IActor* MakeDiscoverYDBBindingName( + const TActorId& sender, + const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions) { + auto cpsRequestFactory = + [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& event) { + FederatedQuery::DescribeBindingRequest result; + result.set_binding_id(event->Get()->Request.binding_id()); + return result; + }; + + auto errorMessageFactoryMethod = [](const NYql::TIssues& issues) -> TString { + Y_UNUSED(issues); + return "Couldn't resolve binding name"; + }; + auto entityNameExtractorFactoryMethod = + [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& event, + const FederatedQuery::DescribeBindingResult& result) { + event->Get()->OldBindingName = result.binding().content().name(); + event->Get()->ConnectionId = result.binding().content().connection_id(); + }; + + return new TControlPlaneStorageRequesterActor<TEvControlPlaneProxy::TEvModifyBindingRequest, + TEvControlPlaneProxy::TEvModifyBindingResponse, + TEvControlPlaneStorage::TEvDescribeBindingRequest, + TEvControlPlaneStorage::TEvDescribeBindingResponse>( + sender, + request, + requestTimeout, + counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY), + permissions, + cpsRequestFactory, + errorMessageFactoryMethod, + entityNameExtractorFactoryMethod); +} + +NActors::IActor* MakeDiscoverYDBBindingName( + const TActorId& sender, + const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions) { + auto cpsRequestFactory = + [](const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& event) { + FederatedQuery::DescribeBindingRequest result; + result.set_binding_id(event->Get()->Request.binding_id()); + return result; + }; + + auto errorMessageFactoryMethod = [](const NYql::TIssues& issues) -> TString { + Y_UNUSED(issues); + return "Couldn't resolve binding name"; + }; + auto entityNameExtractorFactoryMethod = + [](const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& event, + const FederatedQuery::DescribeBindingResult& result) { + event->Get()->OldBindingName = result.binding().content().name(); + }; + + return new TControlPlaneStorageRequesterActor<TEvControlPlaneProxy::TEvDeleteBindingRequest, + TEvControlPlaneProxy::TEvDeleteBindingResponse, + TEvControlPlaneStorage::TEvDescribeBindingRequest, + TEvControlPlaneStorage::TEvDescribeBindingResponse>( + sender, + request, + requestTimeout, + counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY), + permissions, + cpsRequestFactory, + errorMessageFactoryMethod, + entityNameExtractorFactoryMethod); +} +} // namespace NPrivate +} // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h new file mode 100644 index 00000000000..24995807ea2 --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h @@ -0,0 +1,58 @@ +#pragma once + +#include "counters.h" + +#include <library/cpp/actors/core/actor.h> +#include <ydb/core/fq/libs/control_plane_proxy/events/events.h> + +namespace NFq { +namespace NPrivate { + +using namespace NActors; + +/// Discover connection_name +NActors::IActor* MakeDiscoverYDBConnectionName( + const TActorId& sender, + const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions); + +NActors::IActor* MakeDiscoverYDBConnectionName( + const TActorId& sender, + const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions); + +NActors::IActor* MakeDiscoverYDBConnectionName( + const TActorId& sender, + const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions); + +NActors::IActor* MakeDiscoverYDBConnectionName( + const TActorId& sender, + const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions); + +/// Discover binding_name + +NActors::IActor* MakeDiscoverYDBBindingName( + const TActorId& sender, + const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions); + +NActors::IActor* MakeDiscoverYDBBindingName( + const TActorId& sender, + const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions); +} // namespace NPrivate +} // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/counters.h b/ydb/core/fq/libs/control_plane_proxy/actors/counters.h new file mode 100644 index 00000000000..8ab57302955 --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/counters.h @@ -0,0 +1,297 @@ +#pragma once + +#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <ydb/core/fq/libs/common/util.h> +#include <ydb/core/fq/libs/common/cache.h> + +namespace NFq { +namespace NPrivate { + +struct TRequestScopeCounters : public virtual TThrRefBase { + const TString Name; + + ::NMonitoring::TDynamicCounterPtr Counters; + ::NMonitoring::TDynamicCounters::TCounterPtr InFly; + ::NMonitoring::TDynamicCounters::TCounterPtr Ok; + ::NMonitoring::TDynamicCounters::TCounterPtr Error; + ::NMonitoring::TDynamicCounters::TCounterPtr Timeout; + ::NMonitoring::TDynamicCounters::TCounterPtr Retry; + + explicit TRequestScopeCounters(const TString& name) + : Name(name) { } + + void Register(const ::NMonitoring::TDynamicCounterPtr& counters) { + Counters = counters; + ::NMonitoring::TDynamicCounterPtr subgroup = + counters->GetSubgroup("request_scope", Name); + InFly = subgroup->GetCounter("InFly", false); + Ok = subgroup->GetCounter("Ok", true); + Error = subgroup->GetCounter("Error", true); + Timeout = subgroup->GetCounter("Timeout", true); + Timeout = subgroup->GetCounter("Retry", true); + } + + virtual ~TRequestScopeCounters() override { + Counters->RemoveSubgroup("request_scope", Name); + } + +private: + static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() { + return ::NMonitoring::ExplicitHistogram( + {0, 1, 2, 5, 10, 20, 50, 100, 500, 1000, 2000, 5000, 10000, 30000, 50000, 500000}); + } +}; + +struct TRequestCommonCounters : public virtual TThrRefBase { + const TString Name; + + ::NMonitoring::TDynamicCounterPtr Counters; + ::NMonitoring::TDynamicCounters::TCounterPtr InFly; + ::NMonitoring::TDynamicCounters::TCounterPtr Ok; + ::NMonitoring::TDynamicCounters::TCounterPtr Error; + ::NMonitoring::TDynamicCounters::TCounterPtr Timeout; + ::NMonitoring::TDynamicCounters::TCounterPtr Retry; + ::NMonitoring::THistogramPtr LatencyMs; + + explicit TRequestCommonCounters(const TString& name) + : Name(name) { } + + void Register(const ::NMonitoring::TDynamicCounterPtr& counters) { + Counters = counters; + ::NMonitoring::TDynamicCounterPtr subgroup = + counters->GetSubgroup("request_common", Name); + InFly = subgroup->GetCounter("InFly", false); + Ok = subgroup->GetCounter("Ok", true); + Error = subgroup->GetCounter("Error", true); + Timeout = subgroup->GetCounter("Timeout", true); + Retry = subgroup->GetCounter("Retry", true); + LatencyMs = subgroup->GetHistogram("LatencyMs", GetLatencyHistogramBuckets()); + } + + virtual ~TRequestCommonCounters() override { + Counters->RemoveSubgroup("request_common", Name); + } + +private: + static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() { + return ::NMonitoring::ExplicitHistogram( + {0, 1, 2, 5, 10, 20, 50, 100, 500, 1000, 2000, 5000, 10000, 30000, 50000, 500000}); + } +}; + +using TRequestScopeCountersPtr = TIntrusivePtr<TRequestScopeCounters>; +using TRequestCommonCountersPtr = TIntrusivePtr<TRequestCommonCounters>; + +struct TRequestCounters { + TRequestScopeCountersPtr Scope; + TRequestCommonCountersPtr Common; + + void IncInFly() { + Scope->InFly->Inc(); + Common->InFly->Inc(); + } + + void DecInFly() { + Scope->InFly->Dec(); + Common->InFly->Dec(); + } + + void IncOk() { + Scope->Ok->Inc(); + Common->Ok->Inc(); + } + + void IncError() { + Scope->Error->Inc(); + Common->Error->Inc(); + } + + void IncTimeout() { + Scope->Timeout->Inc(); + Common->Timeout->Inc(); + } +}; + +enum ERequestTypeScope { + RTS_CREATE_QUERY, + RTS_LIST_QUERIES, + RTS_DESCRIBE_QUERY, + RTS_GET_QUERY_STATUS, + RTS_MODIFY_QUERY, + RTS_DELETE_QUERY, + RTS_CONTROL_QUERY, + RTS_GET_RESULT_DATA, + RTS_LIST_JOBS, + RTS_DESCRIBE_JOB, + RTS_CREATE_CONNECTION, + RTS_LIST_CONNECTIONS, + RTS_DESCRIBE_CONNECTION, + RTS_MODIFY_CONNECTION, + RTS_DELETE_CONNECTION, + RTS_TEST_CONNECTION, + RTS_CREATE_BINDING, + RTS_LIST_BINDINGS, + RTS_DESCRIBE_BINDING, + RTS_MODIFY_BINDING, + RTS_DELETE_BINDING, + RTS_MAX, +}; + +enum ERequestTypeCommon { + RTC_RESOLVE_FOLDER, + RTC_CREATE_QUERY, + RTC_LIST_QUERIES, + RTC_DESCRIBE_QUERY, + RTC_GET_QUERY_STATUS, + RTC_MODIFY_QUERY, + RTC_DELETE_QUERY, + RTC_CONTROL_QUERY, + RTC_GET_RESULT_DATA, + RTC_LIST_JOBS, + RTC_DESCRIBE_JOB, + RTC_CREATE_CONNECTION, + RTC_LIST_CONNECTIONS, + RTC_DESCRIBE_CONNECTION, + RTC_MODIFY_CONNECTION, + RTC_DELETE_CONNECTION, + RTC_TEST_CONNECTION, + RTC_CREATE_BINDING, + RTC_LIST_BINDINGS, + RTC_DESCRIBE_BINDING, + RTC_MODIFY_BINDING, + RTC_DELETE_BINDING, + RTC_RESOLVE_SUBJECT_TYPE, + RTC_DESCRIBE_CPS_ENTITY, + RTC_CREATE_YDB_SESSION, + RTC_CREATE_CONNECTION_IN_YDB, + RTC_CREATE_BINDING_IN_YDB, + RTC_MODIFY_CONNECTION_IN_YDB, + RTC_MODIFY_BINDING_IN_YDB, + RTC_DELETE_CONNECTION_IN_YDB, + RTC_DELETE_BINDING_IN_YDB, + RTC_CREATE_COMPUTE_DATABASE, + RTC_MAX, +}; + +class TCounters : public virtual TThrRefBase { + struct TMetricsScope { + TString CloudId; + TString Scope; + + TMetricsScope() = default; + + TMetricsScope(const TString& cloudId, const TString& scope) + : CloudId(cloudId) + , Scope(scope) + { } + + bool operator<(const TMetricsScope& right) const { + return std::tie(CloudId, Scope) < std::tie(right.CloudId, right.Scope); + } + }; + + using TScopeCounters = std::array<TRequestScopeCountersPtr, RTS_MAX>; + using TScopeCountersPtr = std::shared_ptr<TScopeCounters>; + + std::array<TRequestCommonCountersPtr, RTC_MAX> CommonRequests = CreateArray<RTC_MAX, TRequestCommonCountersPtr>({ + {MakeIntrusive<TRequestCommonCounters>("ResolveFolder")}, + {MakeIntrusive<TRequestCommonCounters>("CreateQuery")}, + {MakeIntrusive<TRequestCommonCounters>("ListQueries")}, + {MakeIntrusive<TRequestCommonCounters>("DescribeQuery")}, + {MakeIntrusive<TRequestCommonCounters>("GetQueryStatus")}, + {MakeIntrusive<TRequestCommonCounters>("ModifyQuery")}, + {MakeIntrusive<TRequestCommonCounters>("DeleteQuery")}, + {MakeIntrusive<TRequestCommonCounters>("ControlQuery")}, + {MakeIntrusive<TRequestCommonCounters>("GetResultData")}, + {MakeIntrusive<TRequestCommonCounters>("ListJobs")}, + {MakeIntrusive<TRequestCommonCounters>("DescribeJob")}, + {MakeIntrusive<TRequestCommonCounters>("CreateConnection")}, + {MakeIntrusive<TRequestCommonCounters>("ListConnections")}, + {MakeIntrusive<TRequestCommonCounters>("DescribeConnection")}, + {MakeIntrusive<TRequestCommonCounters>("ModifyConnection")}, + {MakeIntrusive<TRequestCommonCounters>("DeleteConnection")}, + {MakeIntrusive<TRequestCommonCounters>("TestConnection")}, + {MakeIntrusive<TRequestCommonCounters>("CreateBinding")}, + {MakeIntrusive<TRequestCommonCounters>("ListBindings")}, + {MakeIntrusive<TRequestCommonCounters>("DescribeBinding")}, + {MakeIntrusive<TRequestCommonCounters>("ModifyBinding")}, + {MakeIntrusive<TRequestCommonCounters>("DeleteBinding")}, + {MakeIntrusive<TRequestCommonCounters>("ResolveSubjectType")}, + {MakeIntrusive<TRequestCommonCounters>("DescribeCPSEntity")}, + {MakeIntrusive<TRequestCommonCounters>("CreateYDBSession")}, + {MakeIntrusive<TRequestCommonCounters>("CreateConnectionInYDB")}, + {MakeIntrusive<TRequestCommonCounters>("CreateBindingInYDB")}, + {MakeIntrusive<TRequestCommonCounters>("ModifyConnectionInYDB")}, + {MakeIntrusive<TRequestCommonCounters>("ModifyBindingInYDB")}, + {MakeIntrusive<TRequestCommonCounters>("DeleteConnectionInYDB")}, + {MakeIntrusive<TRequestCommonCounters>("DeleteBindingInYDB")}, + {MakeIntrusive<TRequestCommonCounters>("CreateComputeDatabase")}, + }); + + TTtlCache<TMetricsScope, TScopeCountersPtr, TMap> ScopeCounters{TTtlCacheSettings{}.SetTtl(TDuration::Days(1))}; + ::NMonitoring::TDynamicCounterPtr Counters; + +public: + explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters) + : Counters(counters) + { + for (auto& request : CommonRequests) { + request->Register(Counters); + } + } + + TRequestCounters GetCounters(const TString& cloudId, const TString& scope, ERequestTypeScope scopeType, ERequestTypeCommon commonType) { + return {GetScopeCounters(cloudId, scope, scopeType), GetCommonCounters(commonType)}; + } + + TRequestCommonCountersPtr GetCommonCounters(ERequestTypeCommon type) { + return CommonRequests[type]; + } + + TRequestScopeCountersPtr GetScopeCounters(const TString& cloudId, const TString& scope, ERequestTypeScope type) { + TMetricsScope key{cloudId, scope}; + TMaybe<TScopeCountersPtr> cacheVal; + ScopeCounters.Get(key, &cacheVal); + if (cacheVal) { + return (**cacheVal)[type]; + } + + auto scopeRequests = std::make_shared<TScopeCounters>(CreateArray<RTS_MAX, TRequestScopeCountersPtr>({ + {MakeIntrusive<TRequestScopeCounters>("CreateQuery")}, + {MakeIntrusive<TRequestScopeCounters>("ListQueries")}, + {MakeIntrusive<TRequestScopeCounters>("DescribeQuery")}, + {MakeIntrusive<TRequestScopeCounters>("GetQueryStatus")}, + {MakeIntrusive<TRequestScopeCounters>("ModifyQuery")}, + {MakeIntrusive<TRequestScopeCounters>("DeleteQuery")}, + {MakeIntrusive<TRequestScopeCounters>("ControlQuery")}, + {MakeIntrusive<TRequestScopeCounters>("GetResultData")}, + {MakeIntrusive<TRequestScopeCounters>("ListJobs")}, + {MakeIntrusive<TRequestScopeCounters>("DescribeJob")}, + {MakeIntrusive<TRequestScopeCounters>("CreateConnection")}, + {MakeIntrusive<TRequestScopeCounters>("ListConnections")}, + {MakeIntrusive<TRequestScopeCounters>("DescribeConnection")}, + {MakeIntrusive<TRequestScopeCounters>("ModifyConnection")}, + {MakeIntrusive<TRequestScopeCounters>("DeleteConnection")}, + {MakeIntrusive<TRequestScopeCounters>("TestConnection")}, + {MakeIntrusive<TRequestScopeCounters>("CreateBinding")}, + {MakeIntrusive<TRequestScopeCounters>("ListBindings")}, + {MakeIntrusive<TRequestScopeCounters>("DescribeBinding")}, + {MakeIntrusive<TRequestScopeCounters>("ModifyBinding")}, + {MakeIntrusive<TRequestScopeCounters>("DeleteBinding")}, + })); + + auto scopeCounters = Counters + ->GetSubgroup("cloud_id", cloudId) + ->GetSubgroup("scope", scope); + + for (auto& request : *scopeRequests) { + request->Register(scopeCounters); + } + cacheVal = scopeRequests; + ScopeCounters.Put(key, cacheVal); + return (*scopeRequests)[type]; + } +}; + +} // namespace NPrivate +} // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp new file mode 100644 index 00000000000..9859dac2f31 --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp @@ -0,0 +1,256 @@ +#include "query_utils.h" +#include "ydb/public/api/protos/draft/fq.pb.h" + +#include <contrib/libs/fmt/include/fmt/format.h> +#include <library/cpp/iterator/mapped.h> +#include <util/generic/maybe.h> +#include <util/string/join.h> + +#include <ydb/core/fq/libs/result_formatter/result_formatter.h> +#include <ydb/core/kqp/provider/yql_kikimr_results.h> + +namespace NFq { +namespace NPrivate { + +TString EscapeString(const TString& value, + const TString& enclosingSeq, + const TString& replaceWith) { + auto escapedValue = value; + SubstGlobal(escapedValue, enclosingSeq, replaceWith); + return escapedValue; +} +TString EscapeString(const TString& value, char enclosingChar) { + auto escapedValue = value; + SubstGlobal(escapedValue, + TString{enclosingChar}, + TStringBuilder{} << '\\' << enclosingChar); + return escapedValue; +} + +TString EncloseAndEscapeString(const TString& value, char enclosingChar) { + return TStringBuilder{} << enclosingChar + << EscapeString(value, + enclosingChar) + << enclosingChar; +} + +TString EncloseAndEscapeString(const TString& value, + const TString& enclosingSeq, + const TString& replaceWith) { + return TStringBuilder{} << enclosingSeq + << EscapeString(value, enclosingSeq, replaceWith) + << enclosingSeq; +} + +TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& content, + const TString& connectionName) { + using namespace fmt::literals; + + auto bindingName = content.name(); + auto objectStorageParams = content.setting().object_storage(); + const auto& subset = objectStorageParams.subset(0); + + // Schema + NYql::TExprContext context; + auto columnsTransformFunction = [&](const Ydb::Column& column) -> TString { + NYdb::TTypeParser typeParser(column.type()); + auto node = MakeType(typeParser, context); + auto typeName = NYql::FormatType(node); + const TString notNull = + (node->GetKind() == NYql::ETypeAnnotationKind::Optional) ? "" : "NOT NULL"; + return fmt::format(" {columnName} {columnType} {notNull}", + "columnName"_a = EncloseAndEscapeString(column.name(), '`'), + "columnType"_a = typeName, + "notNull"_a = notNull); + }; + auto columnsBegin = + MakeMappedIterator(subset.schema().column().begin(), columnsTransformFunction); + auto columnsEnd = + MakeMappedIterator(subset.schema().column().end(), columnsTransformFunction); + + // WithOptions + auto withOptions = std::unordered_map<TString, TString>{}; + withOptions.insert({"DATA_SOURCE", TStringBuilder{} << '"' << connectionName << '"'}); + withOptions.insert({"LOCATION", EncloseAndEscapeString(subset.path_pattern(), '"')}); + if (!subset.format().Empty()) { + withOptions.insert({"FORMAT", EncloseAndEscapeString(subset.format(), '"')}); + } + if (!subset.compression().Empty()) { + withOptions.insert( + {"COMPRESSION", EncloseAndEscapeString(subset.compression(), '"')}); + } + for (auto& kv : subset.format_setting()) { + withOptions.insert({EncloseAndEscapeString(kv.first, '`'), + EncloseAndEscapeString(kv.second, '"')}); + } + + if (!subset.partitioned_by().empty()) { + auto stringEscapeMapper = [](const TString& value) { + return EscapeString(value, '"'); + }; + + auto partitionBy = TStringBuilder{} + << "\"[" + << JoinRange(", ", + MakeMappedIterator(subset.partitioned_by().begin(), + stringEscapeMapper), + MakeMappedIterator(subset.partitioned_by().end(), + stringEscapeMapper)) + << "]\""; + withOptions.insert({"PARTITIONED_BY", partitionBy}); + } + + for (auto& kv : subset.projection()) { + withOptions.insert({EncloseAndEscapeString(kv.first, '`'), + EncloseAndEscapeString(kv.second, '"')}); + } + + auto concatEscapedKeyValueMapper = [](const std::pair<TString, TString>& kv) -> TString { + return TStringBuilder{} << " " << kv.first << " = " << kv.second; + }; + + auto withOptionsBegin = + MakeMappedIterator(withOptions.begin(), concatEscapedKeyValueMapper); + auto withOptionsEnd = + MakeMappedIterator(withOptions.end(), concatEscapedKeyValueMapper); + + return fmt::format( + R"( + CREATE EXTERNAL TABLE {externalTableName} ( + {columns} + ) WITH ( + {withOptions} + );)", + "externalTableName"_a = EncloseAndEscapeString(bindingName, '`'), + "columns"_a = JoinRange(",\n", columnsBegin, columnsEnd), + "withOptions"_a = JoinRange(",\n", withOptionsBegin, withOptionsEnd)); +} + +TString SignAccountId(const TString& id, const TSigner::TPtr& signer) { + return signer ? signer->SignAccountId(id) : TString{}; +} + +TString CreateSecretObjectQuery(const FederatedQuery::IamAuth& auth, + const TString& name, + const TSigner::TPtr& signer) { + using namespace fmt::literals; + switch (auth.identity_case()) { + case FederatedQuery::IamAuth::kServiceAccount: { + if (!signer) { + return {}; + } + return fmt::format(R"( + UPSERT OBJECT {external_source} (TYPE SECRET) WITH value={signature}; + )", + "external_source"_a = EncloseAndEscapeString(name, '`'), + "signature"_a = EncloseAndEscapeString( + SignAccountId(auth.service_account().id(), signer), '`')); + } + case FederatedQuery::IamAuth::kNone: + case FederatedQuery::IamAuth::kCurrentIam: + // Do not replace with default. Adding a new auth item should cause a compilation error + case FederatedQuery::IamAuth::IDENTITY_NOT_SET: + return {}; + } +} + +TString CreateAuthParamsQuery(const FederatedQuery::IamAuth& auth, + const TString& name, + const TSigner::TPtr& signer) { + using namespace fmt::literals; + switch (auth.identity_case()) { + case FederatedQuery::IamAuth::kNone: + return R"(, AUTH_METHOD="NONE")"; + case FederatedQuery::IamAuth::kServiceAccount: + return fmt::format(R"(, + AUTH_METHOD="SERVICE_ACCOUNT", + SERVICE_ACCOUNT_ID={service_account_id}, + SERVICE_ACCOUNT_SECRET_NAME={secret_name} + )", + "service_account_id"_a = + EncloseAndEscapeString(auth.service_account().id(), '"'), + "external_source"_a = EncloseAndEscapeString(name, '"'), + "secret_name"_a = + EncloseAndEscapeString(signer ? name : TString{}, '"')); + case FederatedQuery::IamAuth::kCurrentIam: + // Do not replace with default. Adding a new auth item should cause a compilation error + case FederatedQuery::IamAuth::IDENTITY_NOT_SET: + return {}; + } +} + +TString MakeCreateExternalDataSourceQuery( + const FederatedQuery::ConnectionContent& connectionContent, + const TString& objectStorageEndpoint, + const TSigner::TPtr& signer) { + using namespace fmt::literals; + + auto sourceName = connectionContent.name(); + auto bucketName = connectionContent.setting().object_storage().bucket(); + + return fmt::format( + R"( + {upsert_object}; + CREATE EXTERNAL DATA SOURCE {external_source} WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}" + {auth_params} + ); + )", + "external_source"_a = EncloseAndEscapeString(sourceName, '`'), + "location"_a = objectStorageEndpoint + "/" + EscapeString(bucketName, '"') + "/", + "upsert_object"_a = + CreateSecretObjectQuery(connectionContent.setting().object_storage().auth(), + connectionContent.name(), + signer), + "auth_params"_a = + CreateAuthParamsQuery(connectionContent.setting().object_storage().auth(), + connectionContent.name(), + signer)); +} + +TString DropSecretObjectQuery(const FederatedQuery::IamAuth& auth, + const TString& name, + const TSigner::TPtr& signer) { + using namespace fmt::literals; + switch (auth.identity_case()) { + case FederatedQuery::IamAuth::kServiceAccount: { + if (!signer) { + return {}; + } + return fmt::format("DROP OBJECT {secret_name} (TYPE SECRET);", + "secret_name"_a = + EncloseAndEscapeString(name, '`')); + } + case FederatedQuery::IamAuth::kNone: + case FederatedQuery::IamAuth::kCurrentIam: + // Do not replace with default. Adding a new auth item should cause a compilation error + case FederatedQuery::IamAuth::IDENTITY_NOT_SET: + return {}; + } +} + +TString MakeDeleteExternalDataSourceQuery( + const FederatedQuery::ConnectionContent& connectionContent, + const TSigner::TPtr& signer) { + using namespace fmt::literals; + return fmt::format( + R"( + {drop_secret_statement}; + DROP EXTERNAL DATA SOURCE {external_source}; + )", + "drop_secret_statement"_a = + DropSecretObjectQuery(connectionContent.setting().object_storage().auth(), + connectionContent.name(), + signer), + "external_source"_a = EncloseAndEscapeString(connectionContent.name(), '`')); +} + +TString MakeDeleteExternalDataTableQuery(const TString& tableName) { + using namespace fmt::literals; + return fmt::format("DROP EXTERNAL TABLE {external_table};", + "external_table"_a = EncloseAndEscapeString(tableName, '`')); +} + +} // namespace NPrivate +} // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h new file mode 100644 index 00000000000..b5824ce6852 --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h @@ -0,0 +1,29 @@ +#pragma once + +#include <util/generic/string.h> +#include <ydb/core/fq/libs/control_plane_storage/events/events.h> +#include <ydb/core/fq/libs/signer/signer.h> + +namespace NFq { +namespace NPrivate { + +TString EscapeString(const TString& value, char enclosingChar); + +TString EncloseAndEscapeString(const TString& value, char enclosingChar); + +TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& content, + const TString& connectionName); + +TString MakeCreateExternalDataSourceQuery( + const FederatedQuery::ConnectionContent& connectionContent, + const TString& objectStorageEndpoint, + const TSigner::TPtr& signer); + +TString MakeDeleteExternalDataSourceQuery( + const FederatedQuery::ConnectionContent& connectionContent, + const TSigner::TPtr& signer); + +TString MakeDeleteExternalDataTableQuery(const TString& tableName); + +} // namespace NPrivate +} // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ya.make b/ydb/core/fq/libs/control_plane_proxy/actors/ya.make new file mode 100644 index 00000000000..212a8d0f3c4 --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ya.make @@ -0,0 +1,21 @@ +LIBRARY() + +SRCS( + control_plane_storage_requester_actor.cpp + query_utils.cpp + ydb_schema_query_actor.cpp +) + +PEERDIR( + contrib/libs/fmt + library/cpp/iterator + ydb/core/fq/libs/control_plane_proxy/events + ydb/core/fq/libs/control_plane_storage/events + ydb/core/fq/libs/result_formatter + ydb/core/kqp/provider + ydb/library/db_pool/protos +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h new file mode 100644 index 00000000000..3345263e6df --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h @@ -0,0 +1,181 @@ +#pragma once + +#include <ydb/core/fq/libs/actors/logging/log.h> +#include <ydb/core/fq/libs/compute/common/config.h> +#include <ydb/core/fq/libs/config/yq_issue.h> +#include <ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h> +#include <ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h> +#include <ydb/core/fq/libs/control_plane_proxy/actors/counters.h> +#include <ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h> +#include <ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h> +#include <ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h> +#include <ydb/core/fq/libs/control_plane_proxy/events/events.h> +#include <ydb/core/fq/libs/control_plane_storage/events/events.h> +#include <ydb/core/fq/libs/ydb/ydb.h> +#include <ydb/library/yql/public/issue/yql_issue.h> +#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> + +#include <contrib/libs/fmt/include/fmt/format.h> +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/lwtrace/mon/mon_lwtrace.h> + +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> +#include <util/string/join.h> +#include <util/system/types.h> + +namespace NFq { +namespace NPrivate { + +using namespace NActors; +using namespace NFq::NConfig; +using namespace NKikimr; +using namespace NThreading; +using namespace NYdb; +using namespace NYdb::NTable; + +using TTableClientPtr = std::shared_ptr<NYdb::NTable::TTableClient>; + +struct TEvPrivate { + enum EEv { + EvCreateSessionResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvEnd + }; + + struct TEvCreateSessionResponse : + NActors::TEventLocal<TEvCreateSessionResponse, EvCreateSessionResponse> { + TCreateSessionResult Result; + + TEvCreateSessionResponse(TCreateSessionResult result) + : Result(std::move(result)) { } + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), + "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); +}; + +template<typename TEventRequest, typename TEventResponse> +class TCreateYdbComputeSessionActor : + public TBaseActor<TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>> { +private: + using TBase = TBaseActor<TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>>; + using TBase::SelfId; + using TBase::SendRequestToSender; + using TBase::Request; + using TEventRequestPtr = typename TEventRequest::TPtr; + +public: + TCreateYdbComputeSessionActor( + const TActorId& sender, + const TEventRequestPtr request, + TDuration requestTimeout, + const NPrivate::TRequestCommonCountersPtr& counters, + const NFq::TComputeConfig& computeConfig, + const TYqSharedResources::TPtr& yqSharedResources, + const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) + : TBaseActor<TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>>( + sender, std::move(request), requestTimeout, counters) + , ComputeConfig(computeConfig) + , YqSharedResources(yqSharedResources) + , CredentialsProviderFactory(credentialsProviderFactory) { } + + static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_DELETE_CONNECTION_IN_YDB"; + + void BootstrapImpl() override { + InitiateConnectionCreation(); + } + + STRICT_STFUNC(StateFunc, + cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout); + hFunc(TEvPrivate::TEvCreateSessionResponse, Handle); + ) + + void InitiateConnectionCreation() { + CPP_LOG_D("TCreateYdbComputeSessionActor InitiateConnectionCreation called. Actor id: " + << SelfId()); + auto tableClient = CreateNewTableClient(ComputeConfig, + YqSharedResources, + CredentialsProviderFactory); + tableClient->CreateSession().Subscribe( + [actorSystem = NActors::TActivationContext::ActorSystem(), + self = SelfId()](const TAsyncCreateSessionResult& future) { + actorSystem->Send(self, + new TEvPrivate::TEvCreateSessionResponse( + future.GetValueSync())); + }); + } + + void Handle(TEvPrivate::TEvCreateSessionResponse::TPtr& event) { + CPP_LOG_D("TCreateYdbComputeSessionActor " + "Handle for TEvCreateSessionResponse called. Actor id: " + << SelfId()); + const auto& createSessionResult = event->Get()->Result; + if (!createSessionResult.IsSuccess()) { + TBase::HandleError("Couldn't create YDB session", + createSessionResult.GetStatus(), + createSessionResult.GetIssues()); + return; + } + + CPP_LOG_D("TCreateYdbComputeSessionActor Session was successfully acquired. Actor id: " + << SelfId()); + Request->Get()->YDBSession = createSessionResult.GetSession(); + SendRequestToSender(); + } + +private: + const NFq::TComputeConfig ComputeConfig; + const TYqSharedResources::TPtr YqSharedResources; + const NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; + + TTableClientPtr CreateNewTableClient( + const NFq::TComputeConfig& computeConfig, + const TYqSharedResources::TPtr& yqSharedResources, + const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) { + + auto scope = "yandexcloud://" + Request->Get()->FolderId; + NFq::NConfig::TYdbStorageConfig computeConnection = + computeConfig.GetConnection(scope); + computeConnection.set_endpoint( + Request->Get()->ComputeDatabase->connection().endpoint()); + computeConnection.set_database( + Request->Get()->ComputeDatabase->connection().database()); + computeConnection.set_usessl( + Request->Get()->ComputeDatabase->connection().usessl()); + + auto tableSettings = + GetClientSettings<NYdb::NTable::TClientSettings>(computeConnection, + credentialsProviderFactory); + return std::make_shared<NYdb::NTable::TTableClient>( + yqSharedResources->UserSpaceYdbDriver, tableSettings); + } +}; + +template<typename TEventRequest, typename TEventResponse> +struct TBaseActorTypeTag<TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>> { + using TRequest = TEventRequest; + using TResponse = TEventResponse; +}; + +template<typename TEventRequest, typename TEventResponse> +NActors::IActor* MakeComputeYDBSessionActor( + const NActors::TActorId sender, + const typename TEventRequest::TPtr request, + TDuration requestTimeout, + const NPrivate::TRequestCommonCountersPtr& counters, + const NFq::TComputeConfig& computeConfig, + const TYqSharedResources::TPtr& yqSharedResources, + const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) { + return new NPrivate::TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>( + sender, + std::move(request), + requestTimeout, + counters, + computeConfig, + yqSharedResources, + credentialsProviderFactory); +} + +} // namespace NPrivate +} // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp new file mode 100644 index 00000000000..dd5e2a634b6 --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp @@ -0,0 +1,328 @@ +#include "base_actor.h" +#include "query_utils.h" + +#include <contrib/libs/fmt/include/fmt/format.h> +#include <ydb/core/fq/libs/control_plane_proxy/events/events.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +namespace NFq { +namespace NPrivate { + +using namespace NActors; +using namespace NFq::NConfig; +using namespace NKikimr; +using namespace NThreading; +using namespace NYdb; +using namespace NYdb::NTable; + +template<class TEventRequest, class TEventResponse> +class TSchemaQueryYDBActor; + +template<class TEventRequest, class TEventResponse> +struct TBaseActorTypeTag<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> { + using TRequest = TEventRequest; + using TResponse = TEventResponse; +}; + +template<class TEventRequest, class TEventResponse> +class TSchemaQueryYDBActor : + public TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> { +private: + struct TEvPrivate { + enum EEv { + EvQueryExecutionResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), + "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); + + struct TEvQueryExecutionResponse : + NActors::TEventLocal<TEvQueryExecutionResponse, EvQueryExecutionResponse> { + TStatus Result; + + TEvQueryExecutionResponse(TStatus result) + : Result(std::move(result)) { } + }; + }; + + using TBase = TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>; + using TBase::SelfId; + using TBase::Request; + + using TEventRequestPtr = typename TEventRequest::TPtr; + +public: + using TQueryFactoryMethod = std::function<TString(const TEventRequestPtr& request)>; + using TErrorMessageFactoryMethod = std::function<TString(const TStatus& status)>; + + TSchemaQueryYDBActor(const TActorId& sender, + const TEventRequestPtr request, + TDuration requestTimeout, + const NPrivate::TRequestCommonCountersPtr& counters, + TQueryFactoryMethod queryFactoryMethod, + TErrorMessageFactoryMethod errorMessageFactoryMethod) + : TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>( + sender, std::move(request), requestTimeout, counters) + , QueryFactoryMethod(queryFactoryMethod) + , ErrorMessageFactoryMethod(errorMessageFactoryMethod) { } + + static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_DELETE_CONNECTION_IN_YDB"; + + void BootstrapImpl() override { + CPP_LOG_I("TSchemaQueryYDBActor BootstrapImpl. Actor id: " << TBase::SelfId()); + InitiateSchemaQueryExecution(); + } + + void InitiateSchemaQueryExecution() { + CPP_LOG_I("TSchemaQueryYDBActor Executing schema query. Actor id: " << TBase::SelfId()); + + const auto& request = Request; + TString schemeQuery = QueryFactoryMethod(request); + request->Get() + ->YDBSession->ExecuteSchemeQuery(schemeQuery) + .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), + self = SelfId()](const TAsyncStatus& future) { + actorSystem->Send(self, + new typename TEvPrivate::TEvQueryExecutionResponse( + std::move(future.GetValueSync()))); + }); + } + + STRICT_STFUNC(StateFunc, + cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout); + hFunc(TEvPrivate::TEvQueryExecutionResponse, Handle); + ) + + void Handle(typename TEvPrivate::TEvQueryExecutionResponse::TPtr& event) { + CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Actor id: " << TBase::SelfId()); + const auto& executeSchemeQueryStatus = event->Get()->Result; + if (!executeSchemeQueryStatus.IsSuccess()) { + CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished with issues. Actor id: " << TBase::SelfId()); + TString errorMessage = ErrorMessageFactoryMethod(executeSchemeQueryStatus); + + TBase::HandleError(errorMessage, + executeSchemeQueryStatus.GetStatus(), + executeSchemeQueryStatus.GetIssues()); + return; + } + + CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished successfully. Actor id: " << TBase::SelfId()); + Request->Get()->ComputeYDBOperationWasPerformed = true; + TBase::SendRequestToSender(); + } + +private: + TQueryFactoryMethod QueryFactoryMethod; + TErrorMessageFactoryMethod ErrorMessageFactoryMethod; +}; + +/// Connection actors +NActors::IActor* MakeCreateConnectionActor( + const TActorId& sender, + TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters, + const TString& objectStorageEndpoint, + TSigner::TPtr signer) { + auto queryFactoryMethod = + [objectStorageEndpoint, signer = std::move(signer)]( + const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request) -> TString { + using namespace fmt::literals; + + return MakeCreateExternalDataSourceQuery(request->Get()->Request.content(), + objectStorageEndpoint, + signer); + }; + + auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { + if (queryStatus.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) { + return "External data source with such name already exists"; + } else { + return "Couldn't create external data source in YDB"; + } + }; + + return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvCreateConnectionRequest, + TEvControlPlaneProxy::TEvCreateConnectionResponse>( + sender, + std::move(request), + requestTimeout, + counters.GetCommonCounters(RTC_CREATE_CONNECTION_IN_YDB), + queryFactoryMethod, + errorMessageFactoryMethod); +} + +NActors::IActor* MakeModifyConnectionActor( + const TActorId& sender, + TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters, + const TString& objectStorageEndpoint, + TSigner::TPtr signer) { + auto queryFactoryMethod = + [objectStorageEndpoint, signer = std::move(signer)]( + const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request) -> TString { + using namespace fmt::literals; + + auto& oldConnectionContent = (*request->Get()->OldConnectionContent); + auto& newConnectionContent = request->Get()->Request.content(); + return fmt::format( + R"( + {delete_external_data_source}; + {create_external_data_source}; + )", + "delete_external_data_source"_a = + MakeDeleteExternalDataSourceQuery(oldConnectionContent, signer), + "create_external_data_source"_a = MakeCreateExternalDataSourceQuery( + newConnectionContent, objectStorageEndpoint, signer)); + }; + + auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { + Y_UNUSED(queryStatus); + return "Couldn't modify external data source in YDB"; + }; + + return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvModifyConnectionRequest, + TEvControlPlaneProxy::TEvModifyConnectionResponse>( + sender, + std::move(request), + requestTimeout, + counters.GetCommonCounters(RTC_MODIFY_CONNECTION_IN_YDB), + queryFactoryMethod, + errorMessageFactoryMethod); +} + +NActors::IActor* MakeDeleteConnectionActor( + const TActorId& sender, + TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters, + TSigner::TPtr signer) { + auto queryFactoryMethod = + [signer = std::move(signer)]( + const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& request) -> TString { + using namespace fmt::literals; + return MakeDeleteExternalDataSourceQuery(*request->Get()->ConnectionContent, + signer); + }; + + auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { + if (queryStatus.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) { + return "External data source with such name already exists"; + } else { + return "Couldn't delete external data source in YDB"; + } + }; + + return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvDeleteConnectionRequest, + TEvControlPlaneProxy::TEvDeleteConnectionResponse>( + sender, + std::move(request), + requestTimeout, + counters.GetCommonCounters(RTC_DELETE_CONNECTION_IN_YDB), + queryFactoryMethod, + errorMessageFactoryMethod); +} + +/// Bindings actors +NActors::IActor* MakeCreateBindingActor( + const TActorId& sender, + TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters) { + auto queryFactoryMethod = + [](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request) -> TString { + using namespace fmt::literals; + + auto externalSourceName = *request->Get()->ConnectionName; + return MakeCreateExternalDataTableQuery(request->Get()->Request.content(), + externalSourceName); + }; + + auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { + if (queryStatus.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) { + return "External data table with such name already exists"; + } else { + return "Couldn't create external data table in YDB"; + } + }; + + return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvCreateBindingRequest, + TEvControlPlaneProxy::TEvCreateBindingResponse>( + sender, + std::move(request), + requestTimeout, + counters.GetCommonCounters(RTC_CREATE_BINDING_IN_YDB), + queryFactoryMethod, + errorMessageFactoryMethod); +} + +NActors::IActor* MakeModifyBindingActor( + const TActorId& sender, + TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters) { + auto queryFactoryMethod = + [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request) -> TString { + using namespace fmt::literals; + + auto sourceName = *request->Get()->ConnectionName; + auto oldTableName = *request->Get()->OldBindingName; + return fmt::format( + R"( + {delete_external_data_table}; + {create_external_data_table}; + )", + "delete_external_data_table"_a = MakeDeleteExternalDataTableQuery(oldTableName), + "create_external_data_table"_a = + MakeCreateExternalDataTableQuery(request->Get()->Request.content(), + sourceName)); + }; + + auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { + Y_UNUSED(queryStatus); + return "Couldn't modify external data table in YDB"; + }; + + return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvModifyBindingRequest, + TEvControlPlaneProxy::TEvModifyBindingResponse>( + sender, + std::move(request), + requestTimeout, + counters.GetCommonCounters(RTC_MODIFY_BINDING_IN_YDB), + queryFactoryMethod, + errorMessageFactoryMethod); +} + +NActors::IActor* MakeDeleteBindingActor( + const TActorId& sender, + TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters) { + auto queryFactoryMethod = + [](const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request) -> TString { + using namespace fmt::literals; + return MakeDeleteExternalDataTableQuery(*request->Get()->OldBindingName); + }; + + auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { + if (queryStatus.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) { + return "External data source with such name already exists"; + } else { + return "Couldn't delete external data source in YDB"; + } + }; + + return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvDeleteBindingRequest, + TEvControlPlaneProxy::TEvDeleteBindingResponse>( + sender, + std::move(request), + requestTimeout, + counters.GetCommonCounters(RTC_DELETE_BINDING_IN_YDB), + queryFactoryMethod, + errorMessageFactoryMethod); +} + +} // namespace NPrivate +} // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h new file mode 100644 index 00000000000..7ba6a06038c --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h @@ -0,0 +1,58 @@ +#pragma once + +#include "counters.h" + +#include <library/cpp/actors/core/actor.h> +#include <ydb/core/fq/libs/control_plane_proxy/events/events.h> +#include <ydb/core/fq/libs/signer/signer.h> + +namespace NFq { +namespace NPrivate { + +using namespace NActors; + +/// Connection manipulation actors +NActors::IActor* MakeCreateConnectionActor( + const TActorId& sender, + TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters, + const TString& objectStorageEndpoint, + TSigner::TPtr signer); + +NActors::IActor* MakeModifyConnectionActor( + const TActorId& sender, + TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters, + const TString& objectStorageEndpoint, + TSigner::TPtr signer); + +NActors::IActor* MakeDeleteConnectionActor( + const TActorId& sender, + TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters, + TSigner::TPtr signer); + +/// Binding manipulation actors +NActors::IActor* MakeCreateBindingActor( + const TActorId& sender, + TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters); + +NActors::IActor* MakeModifyBindingActor( + const TActorId& sender, + TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters); + +NActors::IActor* MakeDeleteBindingActor( + const TActorId& sender, + TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters); + +} // namespace NPrivate +} // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp index 6f5f1ce0a46..c81e34bee9d 100644 --- a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp @@ -4,30 +4,26 @@ #include "utils.h" #include <ydb/core/fq/libs/actors/logging/log.h> -#include <ydb/core/fq/libs/common/cache.h> #include <ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h> #include <ydb/core/fq/libs/compute/ydb/events/events.h> #include <ydb/core/fq/libs/control_plane_config/control_plane_config.h> #include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h> #include <ydb/core/fq/libs/control_plane_storage/events/events.h> -#include <ydb/core/fq/libs/control_plane_storage/util.h> #include <ydb/core/fq/libs/quota_manager/quota_manager.h> #include <ydb/core/fq/libs/rate_limiter/events/control_plane_events.h> #include <ydb/core/fq/libs/result_formatter/result_formatter.h> #include <ydb/core/fq/libs/test_connection/events/events.h> #include <ydb/core/fq/libs/test_connection/test_connection.h> -#include <ydb/core/fq/libs/ydb/util.h> #include <ydb/core/fq/libs/ydb/ydb.h> #include <ydb/core/fq/libs/config/yq_issue.h> +#include <ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h> +#include <ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h> +#include <ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h> #include <ydb/core/fq/libs/control_plane_proxy/events/events.h> -#include <ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h> -#include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/actor.h> - -#include <ydb/library/ydb_issue/issue_helpers.h> -#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> #include <ydb/library/ycloud/api/access_service.h> #include <ydb/library/ycloud/impl/access_service.h> @@ -61,137 +57,10 @@ using namespace NKikimr; using namespace NThreading; using namespace NYdb; using namespace NYdb::NTable; +using namespace NFq::NPrivate; LWTRACE_USING(YQ_CONTROL_PLANE_PROXY_PROVIDER); -struct TRequestScopeCounters: public virtual TThrRefBase { - const TString Name; - - ::NMonitoring::TDynamicCounterPtr Counters; - ::NMonitoring::TDynamicCounters::TCounterPtr InFly; - ::NMonitoring::TDynamicCounters::TCounterPtr Ok; - ::NMonitoring::TDynamicCounters::TCounterPtr Error; - ::NMonitoring::TDynamicCounters::TCounterPtr Timeout; - ::NMonitoring::TDynamicCounters::TCounterPtr Retry; - - explicit TRequestScopeCounters(const TString& name) - : Name(name) - { } - - void Register(const ::NMonitoring::TDynamicCounterPtr& counters) { - Counters = counters; - ::NMonitoring::TDynamicCounterPtr subgroup = counters->GetSubgroup("request_scope", Name); - InFly = subgroup->GetCounter("InFly", false); - Ok = subgroup->GetCounter("Ok", true); - Error = subgroup->GetCounter("Error", true); - Timeout = subgroup->GetCounter("Timeout", true); - Timeout = subgroup->GetCounter("Retry", true); - } - - virtual ~TRequestScopeCounters() override { - Counters->RemoveSubgroup("request_scope", Name); - } - -private: - static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() { - return ::NMonitoring::ExplicitHistogram({0, 1, 2, 5, 10, 20, 50, 100, 500, 1000, 2000, 5000, 10000, 30000, 50000, 500000}); - } -}; - -struct TRequestCommonCounters: public virtual TThrRefBase { - const TString Name; - - ::NMonitoring::TDynamicCounterPtr Counters; - ::NMonitoring::TDynamicCounters::TCounterPtr InFly; - ::NMonitoring::TDynamicCounters::TCounterPtr Ok; - ::NMonitoring::TDynamicCounters::TCounterPtr Error; - ::NMonitoring::TDynamicCounters::TCounterPtr Timeout; - ::NMonitoring::TDynamicCounters::TCounterPtr Retry; - ::NMonitoring::THistogramPtr LatencyMs; - - explicit TRequestCommonCounters(const TString& name) - : Name(name) - { } - - void Register(const ::NMonitoring::TDynamicCounterPtr& counters) { - Counters = counters; - ::NMonitoring::TDynamicCounterPtr subgroup = counters->GetSubgroup("request_common", Name); - InFly = subgroup->GetCounter("InFly", false); - Ok = subgroup->GetCounter("Ok", true); - Error = subgroup->GetCounter("Error", true); - Timeout = subgroup->GetCounter("Timeout", true); - Retry = subgroup->GetCounter("Retry", true); - LatencyMs = subgroup->GetHistogram("LatencyMs", GetLatencyHistogramBuckets()); - } - - virtual ~TRequestCommonCounters() override { - Counters->RemoveSubgroup("request_common", Name); - } - -private: - static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() { - return ::NMonitoring::ExplicitHistogram({0, 1, 2, 5, 10, 20, 50, 100, 500, 1000, 2000, 5000, 10000, 30000, 50000, 500000}); - } -}; - -using TRequestScopeCountersPtr = TIntrusivePtr<TRequestScopeCounters>; -using TRequestCommonCountersPtr = TIntrusivePtr<TRequestCommonCounters>; - -struct TRequestCounters { - TRequestScopeCountersPtr Scope; - TRequestCommonCountersPtr Common; - - void IncInFly() { - Scope->InFly->Inc(); - Common->InFly->Inc(); - } - - void DecInFly() { - Scope->InFly->Dec(); - Common->InFly->Dec(); - } - - void IncOk() { - Scope->Ok->Inc(); - Common->Ok->Inc(); - } - - void DecOk() { - Scope->Ok->Dec(); - Common->Ok->Dec(); - } - - void IncError() { - Scope->Error->Inc(); - Common->Error->Inc(); - } - - void DecError() { - Scope->Error->Dec(); - Common->Error->Dec(); - } - - void IncTimeout() { - Scope->Timeout->Inc(); - Common->Timeout->Inc(); - } - - void DecTimeout() { - Scope->Timeout->Dec(); - Common->Timeout->Dec(); - } - - void IncRetry() { - Scope->Retry->Inc(); - Common->Retry->Inc(); - } - - void DecRetry() { - Scope->Retry->Dec(); - Common->Retry->Dec(); - } -}; - template<class TEventRequest, class TResponseProxy> class TGetQuotaActor : public NActors::TActorBootstrapped<TGetQuotaActor<TEventRequest, TResponseProxy>> { using TBase = NActors::TActorBootstrapped<TGetQuotaActor<TEventRequest, TResponseProxy>>; @@ -346,7 +215,6 @@ public: PassAway(); } - private: static TString GetSubjectType(const yandex::cloud::priv::servicecontrol::v1::Subject& subject) { switch (subject.type_case()) { @@ -370,671 +238,6 @@ private: } }; -TString EscapeString(const TString& value, char enclosingChar) { - auto escapedValue = value; - SubstGlobal( - escapedValue, TString{enclosingChar}, TStringBuilder{} << '\\' << enclosingChar); - return escapedValue; -} - -TString EncloseAndEscapeString(const TString& value, char enclosingChar) { - return TStringBuilder{} << enclosingChar << EscapeString(value, enclosingChar) << enclosingChar; -} - -class TCreateConnectionInYDBActor : - public NActors::TActorBootstrapped<TCreateConnectionInYDBActor> { - struct TEvPrivate { - enum EEv { - EvCreateSessionResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), - EvCreateConnectionExecutionResponse, - EvEnd - }; - - static_assert( - EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), - "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); - - struct TEvCreateSessionResponse : - NActors::TEventLocal<TEvCreateSessionResponse, EvCreateSessionResponse> { - TAsyncCreateSessionResult Result; - - TEvCreateSessionResponse(TAsyncCreateSessionResult result) - : Result(std::move(result)) { } - }; - - struct TEvCreateConnectionExecutionResponse : - NActors::TEventLocal<TEvCreateConnectionExecutionResponse, EvCreateConnectionExecutionResponse> { - TAsyncStatus Result; - - TEvCreateConnectionExecutionResponse(TAsyncStatus result) - : Result(std::move(result)) { } - }; - }; - - using TBase = NActors::TActorBootstrapped<TCreateConnectionInYDBActor>; - using TBase::Become; - using TBase::PassAway; - using TBase::Register; - using TBase::SelfId; - using TBase::Send; - using IRetryPolicy = - IRetryPolicy<NCloud::TEvAccessService::TEvAuthenticateResponse::TPtr&>; - using TTableClientPtr = std::unique_ptr<NYdb::NTable::TTableClient>; - - using TEventRequest = TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr; - using TResponce = TEvControlPlaneProxy::TEvCreateConnectionResponse; - - TActorId Sender; - TRequestCommonCountersPtr Counters; - TEventRequest Event; - ui32 Cookie; - TDuration RequestTimeout; - TInstant StartTime; - TString Scope; - TTableClientPtr TableClient; - TString ObjectStorageEndpoint; - NFq::TSigner::TPtr Signer; - -public: - TCreateConnectionInYDBActor( - const TRequestCommonCountersPtr& counters, - const NFq::TComputeConfig& computeConfig, - const TYqSharedResources::TPtr& yqSharedResources, - const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, - const TString& objectStorageEndpoint, - TActorId sender, - TEventRequest event, - ui32 cookie, - const TString& scope, - const NFq::TSigner::TPtr& signer, - TDuration requestTimeout) - : Sender(sender) - , Counters(counters) - , Event(event) - , Cookie(cookie) - , RequestTimeout(requestTimeout) - , StartTime(TInstant::Now()) - , Scope(scope) - , TableClient(CreateNewTableClient( - computeConfig, yqSharedResources, credentialsProviderFactory)) - , ObjectStorageEndpoint(objectStorageEndpoint) - , Signer(signer) { } - - static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_CREATE_CONNECTION_IN_YDB"; - - void Bootstrap() { - CPP_LOG_T("Create connection in YDB. Actor id: " << SelfId()); - - if (auto issues = ValidateRequest(Event->Get()->Request); !issues.Empty()) { - NYql::TIssue issue = MakeErrorIssue( - TIssuesIds::INTERNAL_ERROR, "CreateConnectionRequest is not valid"); - for (auto& subIssue : issues) { - issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); - } - SendErrorMessageToSender(std::move(issue)); - return; - } - - Become( - &TCreateConnectionInYDBActor::StateFunc, - RequestTimeout, - new NActors::TEvents::TEvWakeup()); - Counters->InFly->Inc(); - InitiateConnectionCreation(); - } - - NYql::TIssues ValidateRequest(const FederatedQuery::CreateConnectionRequest& request) { - NYql::TIssues issues; - if (request.content().name().Contains('/')) { - issues.AddIssue(MakeErrorIssue( - TIssuesIds::INTERNAL_ERROR, "'/' is not allowed in connection name")); - } - return issues; - } - - void InitiateConnectionCreation() { - auto request = Event->Get()->Request; - TableClient->CreateSession().Subscribe( - [actorSystem = NActors::TActivationContext::ActorSystem(), - self = SelfId()](const TAsyncCreateSessionResult& future) { - actorSystem->Send( - self, new TEvPrivate::TEvCreateSessionResponse(std::move(future))); - }); - } - - void Handle(TEvPrivate::TEvCreateSessionResponse::TPtr& event) { - using namespace fmt::literals; - auto createSessionResult = event->Get()->Result.GetValueSync(); - if (!createSessionResult.IsSuccess()) { - TString errorMessage = TStringBuilder{} - << "Couldn't create YDB session. Status" - << createSessionResult.GetStatus(); - CPP_LOG_E(errorMessage); - - NYql::TIssue issue = - MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Couldn't create YDB session"); - for (auto& subIssue : createSessionResult.GetIssues()) { - issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); - } - SendErrorMessageToSender(std::move(issue)); - return; - } - - auto request = Event->Get()->Request; - auto session = createSessionResult.GetSession(); - auto bucketName = request.content().setting().object_storage().bucket(); - SubstGlobal(bucketName, TString{'"'}, "\\\""); - session - .ExecuteSchemeQuery(fmt::format( - R"( - {upsert_object}; - CREATE EXTERNAL DATA SOURCE {external_source} WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}" - {auth_params} - ); - )", - "external_source"_a = EncloseAndEscapeString(request.content().name(), '`'), - "location"_a = ObjectStorageEndpoint + "/" + bucketName + "/", - "upsert_object"_a = CreateSecretObjectQuery(request.content().setting().object_storage().auth(), request.content().name()), - "auth_params"_a = CreateAuthParamsQuery(request.content().setting().object_storage().auth(), request.content().name()))) - .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), - self = SelfId()](const TAsyncStatus& future) { - actorSystem->Send( - self, - new TEvPrivate::TEvCreateConnectionExecutionResponse(std::move(future))); - }); - } - - TString CreateSecretObjectQuery(const FederatedQuery::IamAuth& auth, const TString& name) const { - using namespace fmt::literals; - switch (auth.identity_case()) { - case FederatedQuery::IamAuth::kServiceAccount: { - if (!Signer) { - return {}; - } - return fmt::format(R"( - UPSERT OBJECT {external_source} (TYPE SECRET) WITH value={signature}; - )", - "external_source"_a = EncloseAndEscapeString(name, '`'), - "signature"_a = EncloseAndEscapeString(SignAccountId(auth.service_account().id()), '`')); - } - case FederatedQuery::IamAuth::kNone: - case FederatedQuery::IamAuth::kCurrentIam: - // Do not replace with default. Adding a new auth item should cause a compilation error - case FederatedQuery::IamAuth::IDENTITY_NOT_SET: - return {}; - } - } - - TString CreateAuthParamsQuery(const FederatedQuery::IamAuth& auth, const TString& name) const { - using namespace fmt::literals; - switch (auth.identity_case()) { - case FederatedQuery::IamAuth::kNone: - return R"(, AUTH_METHOD="NONE")"; - case FederatedQuery::IamAuth::kServiceAccount: - return fmt::format(R"(, - AUTH_METHOD="SERVICE_ACCOUNT", - SERVICE_ACCOUNT_ID={service_account_id}, - SERVICE_ACCOUNT_SECRET_NAME={secret_name} - )", - "service_account_id"_a = EncloseAndEscapeString(auth.service_account().id(), '"'), - "external_source"_a = EncloseAndEscapeString(name, '"'), - "secret_name"_a = EncloseAndEscapeString(Signer ? name : TString{}, '"')); - case FederatedQuery::IamAuth::kCurrentIam: - // Do not replace with default. Adding a new auth item should cause a compilation error - case FederatedQuery::IamAuth::IDENTITY_NOT_SET: - return {}; - } - } - - TString SignAccountId(const TString& id) const { - return Signer ? Signer->SignAccountId(id) : TString{}; - } - - void Handle(TEvPrivate::TEvCreateConnectionExecutionResponse::TPtr& event) { - Counters->InFly->Dec(); - Counters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); - - const auto& status = event->Get()->Result.GetValueSync(); - if (!status.IsSuccess()) { - TString errorMessage; - if (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) { - errorMessage = "External data source with such name already exists"; - } else { - errorMessage = TStringBuilder{} - << "Couldn't create external data source in YDB. Status" - << status.GetStatus(); - } - - CPP_LOG_E(errorMessage); - - NYql::TIssue issue = MakeErrorIssue( - TIssuesIds::INTERNAL_ERROR, "Couldn't create external data source in YDB"); - for (auto& subIssue : status.GetIssues()) { - issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); - } - SendErrorMessageToSender(std::move(issue)); - return; - } - - CPP_LOG_T("External data source in YDB was successfully created"); - Counters->Ok->Inc(); - Event->Get()->ComputeYDBOperationWasPerformed = true; - - TActivationContext::Send(Event->Forward(ControlPlaneProxyActorId())); - PassAway(); - } - - void PassAway() override { TActor::PassAway(); } - - STRICT_STFUNC(StateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, HandleTimeout); - hFunc(TEvPrivate::TEvCreateSessionResponse, Handle); - hFunc(TEvPrivate::TEvCreateConnectionExecutionResponse, Handle);) - - void HandleTimeout() { - CPP_LOG_D( - "Timeout occurred while creating external data source in YDB. Actor id: " - << SelfId()); - Counters->Timeout->Inc(); - SendErrorMessageToSender(MakeErrorIssue( - TIssuesIds::TIMEOUT, - "Timeout occurred while creating external data source in YDB. Try repeating the request later")); - } - - void SendErrorMessageToSender(NYql::TIssue issue) { - Counters->Error->Inc(); - NYql::TIssues issues; - issues.AddIssue(issue); - Send( - Sender, - new TEvControlPlaneProxy::TEvCreateConnectionResponse(issues, {}), - 0, - Cookie); // Change to template - PassAway(); - } - -private: - TTableClientPtr CreateNewTableClient( - const NFq::TComputeConfig& computeConfig, - const TYqSharedResources::TPtr& yqSharedResources, - const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) { - - NFq::NConfig::TYdbStorageConfig computeConnection = computeConfig.GetConnection(Scope); - computeConnection.set_endpoint(Event->Get()->ComputeDatabase->connection().endpoint()); - computeConnection.set_database(Event->Get()->ComputeDatabase->connection().database()); - computeConnection.set_usessl(Event->Get()->ComputeDatabase->connection().usessl()); - - auto tableSettings = GetClientSettings<NYdb::NTable::TClientSettings>( - computeConnection, credentialsProviderFactory); - return std::make_unique<NYdb::NTable::TTableClient>( - yqSharedResources->UserSpaceYdbDriver, tableSettings); - } -}; - -class TCreateBindingInYDBActor : - public NActors::TActorBootstrapped<TCreateBindingInYDBActor> { - struct TEvPrivate { - enum EEv { - EvCreateSessionResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), - EvCreateBindingExecutionResponse, - EvEnd - }; - - static_assert( - EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), - "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); - - struct TEvCreateSessionResponse : - NActors::TEventLocal<TEvCreateSessionResponse, EvCreateSessionResponse> { - TAsyncCreateSessionResult Result; - - TEvCreateSessionResponse(TAsyncCreateSessionResult result) - : Result(std::move(result)) { } - }; - - struct TEvCreateBindingExecutionResponse : - NActors::TEventLocal<TEvCreateBindingExecutionResponse, EvCreateBindingExecutionResponse> { - TAsyncStatus Result; - - TEvCreateBindingExecutionResponse(TAsyncStatus result) - : Result(std::move(result)) { } - }; - }; - - using TBase = NActors::TActorBootstrapped<TCreateBindingInYDBActor>; - using TBase::Become; - using TBase::PassAway; - using TBase::Register; - using TBase::SelfId; - using TBase::Send; - using IRetryPolicy = - IRetryPolicy<NCloud::TEvAccessService::TEvAuthenticateResponse::TPtr&>; - using TTableClientPtr = std::unique_ptr<NYdb::NTable::TTableClient>; - - using TEventRequest = TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr; - using TResponce = TEvControlPlaneProxy::TEvCreateBindingResponse; - using TResponseProxy = TEvControlPlaneProxy::TEvCreateBindingResponse; - - TActorId Sender; - TRequestCommonCountersPtr Counters; - TEventRequest Event; - ui32 Cookie; - TInstant StartTime; - TPermissions Permissions; - TDuration RequestTimeout; - TString Scope; - TTableClientPtr TableClient; - TString ConnectionName; - -public: - TCreateBindingInYDBActor( - const TRequestCommonCountersPtr& counters, - const NFq::TComputeConfig& computeConfig, - const TYqSharedResources::TPtr& yqSharedResources, - const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, - TActorId sender, - TEventRequest event, - ui32 cookie, - TPermissions permissions, - const TString& scope, - TDuration requestTimeout) - : Sender(sender) - , Counters(counters) - , Event(event) - , Cookie(cookie) - , StartTime(TInstant::Now()) - , Permissions(std::move(permissions)) - , RequestTimeout(requestTimeout) - , Scope(scope) - , TableClient(CreateNewTableClient( - computeConfig, yqSharedResources, credentialsProviderFactory)) { } - - static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_CREATE_BINDING_IN_YDB"; - - void Bootstrap() { - CPP_LOG_T("Create external table in YDB. Actor id: " << SelfId()); - - if (auto issues = ValidateRequest(Event->Get()->Request); !issues.Empty()) { - NYql::TIssue issue = MakeErrorIssue( - TIssuesIds::INTERNAL_ERROR, "CreateBindingRequest is not valid"); - for (auto& subIssue : issues) { - issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); - } - SendErrorMessageToSender(std::move(issue)); - return; - } - - Become( - &TCreateBindingInYDBActor::StateFunc, - RequestTimeout, - new NActors::TEvents::TEvWakeup()); - Counters->InFly->Inc(); - ResolveConnectionId(); - } - - void ResolveConnectionId() { - FederatedQuery::DescribeConnectionRequest request; - auto connectionId = Event->Get()->Request.content().connection_id(); - request.set_connection_id(connectionId); - CPP_LOG_T( - "Create external table in YDB. Resolving connection id. Actor id: " - << SelfId() << " connection_id: " << connectionId); - auto event = new TEvControlPlaneStorage::TEvDescribeConnectionRequest( - Scope, - request, - Event->Get()->User, - Event->Get()->Token, - Event->Get()->CloudId, - Permissions, - Event->Get()->Quotas, - Event->Get()->TenantInfo, - {}); - Send(ControlPlaneStorageServiceActorId(), event); - } - - void Handle(TEvControlPlaneStorage::TEvDescribeConnectionResponse::TPtr& event) { - const auto& issues = event->Get()->Issues; - if (!issues.Empty()) { - CPP_LOG_E( - "Couldn't resolve connection id. Actor id: " << SelfId() << " Status: " - << issues.ToOneLineString()); - - NYql::TIssue issue = - MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Couldn't resolve connection id"); - for (auto& subIssue : issues) { - issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); - } - SendErrorMessageToSender(std::move(issue)); - return; - } - - ConnectionName = event->Get()->Result.connection().content().name(); - CPP_LOG_T( - "Create external table in YDB. Resolved connection name. Actor id: " - << SelfId() << " Connection name: " << ConnectionName); - InitiateConnectionCreation(); - } - - void InitiateConnectionCreation() { - auto request = Event->Get()->Request; - TableClient->CreateSession().Subscribe( - [actorSystem = NActors::TActivationContext::ActorSystem(), - self = SelfId()](const TAsyncCreateSessionResult& future) { - actorSystem->Send( - self, new TEvPrivate::TEvCreateSessionResponse(std::move(future))); - }); - } - - void Handle(TEvPrivate::TEvCreateSessionResponse::TPtr& event) { - auto createSessionResult = event->Get()->Result.GetValueSync(); - if (!createSessionResult.IsSuccess()) { - CPP_LOG_E( - "Couldn't create YDB session. Actor id: " - << SelfId() << " Status: " << createSessionResult.GetStatus()); - - NYql::TIssue issue = - MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Couldn't create YDB session"); - for (auto& subIssue : createSessionResult.GetIssues()) { - issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); - } - SendErrorMessageToSender(std::move(issue)); - return; - } - - auto session = createSessionResult.GetSession(); - auto query = CreateSchemaQuery(Event->Get()->Request); - CPP_LOG_T( - "Create external table in YDB. Actor id: " << SelfId() << " Query: " << query); - session.ExecuteSchemeQuery(query).Subscribe( - [actorSystem = NActors::TActivationContext::ActorSystem(), - self = SelfId()](const TAsyncStatus& future) { - actorSystem->Send( - self, new TEvPrivate::TEvCreateBindingExecutionResponse(future)); - }); - } - - NYql::TIssues ValidateRequest(const FederatedQuery::CreateBindingRequest& request) { - NYql::TIssues issues; - if (request.content().setting().binding_case() != - FederatedQuery::BindingSetting::BindingCase::kObjectStorage) { - issues.AddIssue( - MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Unsupported binding type")); - } - if (request.content().setting().object_storage().subset().size() != 1) { - issues.AddIssue(MakeErrorIssue( - TIssuesIds::INTERNAL_ERROR, - "Cannot create external table due to wrong amount of subsets in request")); - } - if (request.content().name().Contains('/')) { - issues.AddIssue(MakeErrorIssue( - TIssuesIds::INTERNAL_ERROR, "'/' is not allowed in binding name")); - } - return issues; - } - - TString CreateSchemaQuery(const FederatedQuery::CreateBindingRequest& request) { - using namespace fmt::literals; - - auto bindingName = request.content().name(); - auto objectStorageParams = request.content().setting().object_storage(); - const auto& subset = objectStorageParams.subset(0); - - // Schema - NYql::TExprContext context; - auto columnsTransformFunction = [&](const Ydb::Column& column) -> TString { - NYdb::TTypeParser typeParser(column.type()); - auto node = MakeType(typeParser, context); - auto typeName = NYql::FormatType(node); - const TString notNull = (node->GetKind() == NYql::ETypeAnnotationKind::Optional) ? "" : "NOT NULL"; - return fmt::format( - " {columnName} {columnType} {notNull}", - "columnName"_a = EncloseAndEscapeString(column.name(), '`'), - "columnType"_a = typeName, - "notNull"_a = notNull - ); - }; - auto columnsBegin = - MakeMappedIterator(subset.schema().column().begin(), columnsTransformFunction); - auto columnsEnd = - MakeMappedIterator(subset.schema().column().end(), columnsTransformFunction); - - // WithOptions - auto withOptions = std::unordered_map<TString, TString>{}; - withOptions.insert( - {"DATA_SOURCE", TStringBuilder{} << '"' << ConnectionName << '"'}); - withOptions.insert({"LOCATION", EncloseAndEscapeString(subset.path_pattern(), '"')}); - if (!subset.format().Empty()) { - withOptions.insert({"FORMAT", EncloseAndEscapeString(subset.format(), '"')}); - } - if (!subset.compression().Empty()) { - withOptions.insert( - {"COMPRESSION", EncloseAndEscapeString(subset.compression(), '"')}); - } - for (auto& kv : subset.format_setting()) { - withOptions.insert( - {EncloseAndEscapeString(kv.first, '`'), - EncloseAndEscapeString(kv.second, '"')}); - } - - if (!subset.partitioned_by().empty()) { - auto stringEscapeMapper = [](const TString& value) { - return EscapeString(value, '"'); - }; - - auto partitionBy = - TStringBuilder{} - << "\"[" - << JoinRange( - ", ", - MakeMappedIterator(subset.partitioned_by().begin(), stringEscapeMapper), - MakeMappedIterator(subset.partitioned_by().end(), stringEscapeMapper)) - << "]\""; - withOptions.insert({"PARTITIONED_BY", partitionBy}); - } - - for (auto& kv : subset.projection()) { - withOptions.insert( - {EncloseAndEscapeString(kv.first, '`'), - EncloseAndEscapeString(kv.second, '"')}); - } - - auto concatEscapedKeyValueMapper = - [](const std::pair<TString, TString>& kv) -> TString { - return TStringBuilder{} << " " << kv.first << " = " << kv.second; - }; - - auto withOptionsBegin = - MakeMappedIterator(withOptions.begin(), concatEscapedKeyValueMapper); - auto withOptionsEnd = - MakeMappedIterator(withOptions.end(), concatEscapedKeyValueMapper); - - return fmt::format( - R"( - CREATE EXTERNAL TABLE {externalTableName} ( - {columns} - ) WITH ( - {withOptions} - );)", - "externalTableName"_a = EncloseAndEscapeString(bindingName, '`'), - "columns"_a = JoinRange(",\n", columnsBegin, columnsEnd), - "withOptions"_a = JoinRange(",\n", withOptionsBegin, withOptionsEnd)); - } - - void Handle(TEvPrivate::TEvCreateBindingExecutionResponse::TPtr& event) { - Counters->InFly->Dec(); - Counters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); - - const auto& status = event->Get()->Result.GetValueSync(); - if (!status.IsSuccess()) { - TString errorMessage; - if (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) { - errorMessage = "External table with such name already exists"; - } else { - errorMessage = TStringBuilder{} - << "Couldn't create external table in YDB. Status: " - << status.GetStatus(); - } - - CPP_LOG_E(errorMessage); - - NYql::TIssue issue = MakeErrorIssue( - TIssuesIds::INTERNAL_ERROR, "Couldn't create external table in YDB"); - for (auto& subIssue : status.GetIssues()) { - issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); - } - SendErrorMessageToSender(std::move(issue)); - return; - } - - Counters->Ok->Inc(); - Event->Get()->ComputeYDBOperationWasPerformed = true; - CPP_LOG_T("External table in YDB was successfully created"); - - TActivationContext::Send(Event->Forward(ControlPlaneProxyActorId())); - PassAway(); - } - - STRICT_STFUNC(StateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, HandleTimeout); - hFunc(TEvControlPlaneStorage::TEvDescribeConnectionResponse, Handle); - hFunc(TEvPrivate::TEvCreateSessionResponse, Handle); - hFunc(TEvPrivate::TEvCreateBindingExecutionResponse, Handle);) - - void HandleTimeout() { - CPP_LOG_D("Create external table in YDB timeout. Actor id: " << SelfId()); - Counters->Timeout->Inc(); - SendErrorMessageToSender(MakeErrorIssue( - TIssuesIds::TIMEOUT, - "Create external table in YDB timeout. Try repeating the request later")); - } - - void SendErrorMessageToSender(NYql::TIssue issue) { - Counters->Error->Inc(); - NYql::TIssues issues; - issues.AddIssue(issue); - Send(Sender, new TResponseProxy(issues, {}), 0, - Cookie); // Change to template - PassAway(); - } - -private: - TTableClientPtr CreateNewTableClient( - const NFq::TComputeConfig& computeConfig, - const TYqSharedResources::TPtr& yqSharedResources, - const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) { - - NFq::NConfig::TYdbStorageConfig computeConnection = computeConfig.GetConnection(Scope); - computeConnection.set_endpoint(Event->Get()->ComputeDatabase->connection().endpoint()); - computeConnection.set_database(Event->Get()->ComputeDatabase->connection().database()); - computeConnection.set_usessl(Event->Get()->ComputeDatabase->connection().usessl()); - - auto tableSettings = GetClientSettings<NYdb::NTable::TClientSettings>( - computeConnection, credentialsProviderFactory); - return std::make_unique<NYdb::NTable::TTableClient>( - yqSharedResources->UserSpaceYdbDriver, tableSettings); - } -}; - template<class TEventRequest, class TResponseProxy> class TResolveFolderActor : public NActors::TActorBootstrapped<TResolveFolderActor<TEventRequest, TResponseProxy>> { using TBase = NActors::TActorBootstrapped<TResolveFolderActor<TEventRequest, TResponseProxy>>; @@ -1258,16 +461,17 @@ public: } }; -template<class TRequestProto, class TRequest, class TResponse, class TResponseProxy> -class TRequestActor : public NActors::TActorBootstrapped<TRequestActor<TRequestProto, TRequest, TResponse, TResponseProxy>> { +template<class TRequestProto, class TRequest, class TResponse, class TRequestProxy, class TResponseProxy> +class TRequestActor : public NActors::TActorBootstrapped<TRequestActor<TRequestProto, TRequest, TResponse, TRequestProxy, TResponseProxy>> { protected: - using TBase = NActors::TActorBootstrapped<TRequestActor<TRequestProto, TRequest, TResponse, TResponseProxy>>; + using TBase = NActors::TActorBootstrapped<TRequestActor<TRequestProto, TRequest, TResponse, TRequestProxy, TResponseProxy>>; using TBase::SelfId; using TBase::Send; using TBase::PassAway; using TBase::Become; using TBase::Schedule; + typename TRequestProxy::TPtr RequestProxy; ::NFq::TControlPlaneProxyConfig Config; TRequestProto RequestProto; TString Scope; @@ -1287,20 +491,31 @@ protected: TTenantInfo::TPtr TenantInfo; TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal> ComputeDatabase; ui32 RetryCount = 0; + bool ReplyWithResponseOnSuccess = true; public: static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_REQUEST_ACTOR"; - explicit TRequestActor(const ::NFq::TControlPlaneProxyConfig& config, - TActorId sender, ui32 cookie, - const TString& scope, const TString& folderId, TRequestProto&& requestProto, - TString&& user, TString&& token, const TActorId& serviceId, + explicit TRequestActor(typename TRequestProxy::TPtr requestProxy, + const ::NFq::TControlPlaneProxyConfig& config, + TActorId sender, + ui32 cookie, + const TString& scope, + const TString& folderId, + TRequestProto&& requestProto, + TString&& user, + TString&& token, + const TActorId& serviceId, const TRequestCounters& counters, const std::function<void(const TDuration&, bool, bool)>& probe, TPermissions permissions, - const TString& cloudId, const TString& subjectType, TMaybe<TQuotaMap>&& quotas = Nothing(), - TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal>&& computeDatabase = Nothing()) - : Config(config) + const TString& cloudId, + const TString& subjectType, + TMaybe<TQuotaMap>&& quotas = Nothing(), + TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal>&& computeDatabase = Nothing(), + bool replyWithResponseOnSuccess = true) + : RequestProxy(requestProxy) + , Config(config) , RequestProto(std::forward<TRequestProto>(requestProto)) , Scope(scope) , FolderId(folderId) @@ -1317,6 +532,7 @@ public: , SubjectType(subjectType) , Quotas(std::move(quotas)) , ComputeDatabase(std::move(computeDatabase)) + , ReplyWithResponseOnSuccess(replyWithResponseOnSuccess) { Counters.IncInFly(); } @@ -1398,7 +614,13 @@ public: const TDuration delta = TInstant::Now() - StartTime; Counters.IncOk(); Probe(delta, true, false); - Send(Sender, new TResponseProxy(std::forward<TArgs>(args)..., SubjectType), 0, Cookie); + if (ReplyWithResponseOnSuccess) { // constexpr + Send(Sender, new TResponseProxy(std::forward<TArgs>(args)..., SubjectType), 0, Cookie); + } else { + RequestProxy->Get()->Response = std::make_unique<TResponseProxy>(std::forward<TArgs>(args)..., SubjectType); + RequestProxy->Get()->ControlPlaneYDBOperationWasPerformed = true; + Send(RequestProxy->Forward(ControlPlaneProxyActorId())); + } PassAway(); } @@ -1421,13 +643,16 @@ public: class TCreateQueryRequestActor : public TRequestActor<FederatedQuery::CreateQueryRequest, TEvControlPlaneStorage::TEvCreateQueryRequest, TEvControlPlaneStorage::TEvCreateQueryResponse, + TEvControlPlaneProxy::TEvCreateQueryRequest, TEvControlPlaneProxy::TEvCreateQueryResponse> { bool QuoterResourceCreated = false; + public: using TBaseRequestActor = TRequestActor<FederatedQuery::CreateQueryRequest, TEvControlPlaneStorage::TEvCreateQueryRequest, TEvControlPlaneStorage::TEvCreateQueryResponse, + TEvControlPlaneProxy::TEvCreateQueryRequest, TEvControlPlaneProxy::TEvCreateQueryResponse>; using TBaseRequestActor::TBaseRequestActor; @@ -1484,175 +709,7 @@ public: }; class TControlPlaneProxyActor : public NActors::TActorBootstrapped<TControlPlaneProxyActor> { - enum ERequestTypeScope { - RTS_CREATE_QUERY, - RTS_LIST_QUERIES, - RTS_DESCRIBE_QUERY, - RTS_GET_QUERY_STATUS, - RTS_MODIFY_QUERY, - RTS_DELETE_QUERY, - RTS_CONTROL_QUERY, - RTS_GET_RESULT_DATA, - RTS_LIST_JOBS, - RTS_DESCRIBE_JOB, - RTS_CREATE_CONNECTION, - RTS_LIST_CONNECTIONS, - RTS_DESCRIBE_CONNECTION, - RTS_MODIFY_CONNECTION, - RTS_DELETE_CONNECTION, - RTS_TEST_CONNECTION, - RTS_CREATE_BINDING, - RTS_LIST_BINDINGS, - RTS_DESCRIBE_BINDING, - RTS_MODIFY_BINDING, - RTS_DELETE_BINDING, - RTS_MAX, - }; - - enum ERequestTypeCommon { - RTC_RESOLVE_FOLDER, - RTC_CREATE_QUERY, - RTC_LIST_QUERIES, - RTC_DESCRIBE_QUERY, - RTC_GET_QUERY_STATUS, - RTC_MODIFY_QUERY, - RTC_DELETE_QUERY, - RTC_CONTROL_QUERY, - RTC_GET_RESULT_DATA, - RTC_LIST_JOBS, - RTC_DESCRIBE_JOB, - RTC_CREATE_CONNECTION, - RTC_LIST_CONNECTIONS, - RTC_DESCRIBE_CONNECTION, - RTC_MODIFY_CONNECTION, - RTC_DELETE_CONNECTION, - RTC_TEST_CONNECTION, - RTC_CREATE_BINDING, - RTC_LIST_BINDINGS, - RTC_DESCRIBE_BINDING, - RTC_MODIFY_BINDING, - RTC_DELETE_BINDING, - RTC_RESOLVE_SUBJECT_TYPE, - RTC_CREATE_CONNECTION_IN_YDB, - RTC_CREATE_BINDING_IN_YDB, - RTC_CREATE_COMPUTE_DATABASE, - RTC_MAX, - }; - - class TCounters: public virtual TThrRefBase { - struct TMetricsScope { - TString CloudId; - TString Scope; - - TMetricsScope() = default; - - TMetricsScope(const TString& cloudId, const TString& scope) - : CloudId(cloudId) - , Scope(scope) - {} - - bool operator<(const TMetricsScope& right) const { - return std::make_pair(CloudId, Scope) < std::make_pair(right.CloudId, right.Scope); - } - }; - - using TScopeCounters = std::array<TRequestScopeCountersPtr, RTS_MAX>; - using TScopeCountersPtr = std::shared_ptr<TScopeCounters>; - - std::array<TRequestCommonCountersPtr, RTC_MAX> CommonRequests = CreateArray<RTC_MAX, TRequestCommonCountersPtr>({ - { MakeIntrusive<TRequestCommonCounters>("ResolveFolder") }, - { MakeIntrusive<TRequestCommonCounters>("CreateQuery") }, - { MakeIntrusive<TRequestCommonCounters>("ListQueries") }, - { MakeIntrusive<TRequestCommonCounters>("DescribeQuery") }, - { MakeIntrusive<TRequestCommonCounters>("GetQueryStatus") }, - { MakeIntrusive<TRequestCommonCounters>("ModifyQuery") }, - { MakeIntrusive<TRequestCommonCounters>("DeleteQuery") }, - { MakeIntrusive<TRequestCommonCounters>("ControlQuery") }, - { MakeIntrusive<TRequestCommonCounters>("GetResultData") }, - { MakeIntrusive<TRequestCommonCounters>("ListJobs") }, - { MakeIntrusive<TRequestCommonCounters>("DescribeJob") }, - { MakeIntrusive<TRequestCommonCounters>("CreateConnection") }, - { MakeIntrusive<TRequestCommonCounters>("ListConnections") }, - { MakeIntrusive<TRequestCommonCounters>("DescribeConnection") }, - { MakeIntrusive<TRequestCommonCounters>("ModifyConnection") }, - { MakeIntrusive<TRequestCommonCounters>("DeleteConnection") }, - { MakeIntrusive<TRequestCommonCounters>("TestConnection") }, - { MakeIntrusive<TRequestCommonCounters>("CreateBinding") }, - { MakeIntrusive<TRequestCommonCounters>("ListBindings") }, - { MakeIntrusive<TRequestCommonCounters>("DescribeBinding") }, - { MakeIntrusive<TRequestCommonCounters>("ModifyBinding") }, - { MakeIntrusive<TRequestCommonCounters>("DeleteBinding") }, - { MakeIntrusive<TRequestCommonCounters>("ResolveSubjectType") }, - { MakeIntrusive<TRequestCommonCounters>("CreateConnectionInYDB") }, - { MakeIntrusive<TRequestCommonCounters>("CreateBindingInYDB") }, - { MakeIntrusive<TRequestCommonCounters>("CreateComputeDatabase") }, - }); - - TTtlCache<TMetricsScope, TScopeCountersPtr, TMap> ScopeCounters{TTtlCacheSettings{}.SetTtl(TDuration::Days(1))}; - ::NMonitoring::TDynamicCounterPtr Counters; - - public: - explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters) - : Counters(counters) - { - for (auto& request: CommonRequests) { - request->Register(Counters); - } - } - - TRequestCounters GetCounters(const TString& cloudId, const TString& scope, ERequestTypeScope scopeType, ERequestTypeCommon commonType) { - return {GetScopeCounters(cloudId, scope, scopeType), GetCommonCounters(commonType)}; - } - - TRequestCommonCountersPtr GetCommonCounters(ERequestTypeCommon type) { - return CommonRequests[type]; - } - - TRequestScopeCountersPtr GetScopeCounters(const TString& cloudId, const TString& scope, ERequestTypeScope type) { - TMetricsScope key{cloudId, scope}; - TMaybe<TScopeCountersPtr> cacheVal; - ScopeCounters.Get(key, &cacheVal); - if (cacheVal) { - return (**cacheVal)[type]; - } - - auto scopeRequests = std::make_shared<TScopeCounters>(CreateArray<RTS_MAX, TRequestScopeCountersPtr>({ - { MakeIntrusive<TRequestScopeCounters>("CreateQuery") }, - { MakeIntrusive<TRequestScopeCounters>("ListQueries") }, - { MakeIntrusive<TRequestScopeCounters>("DescribeQuery") }, - { MakeIntrusive<TRequestScopeCounters>("GetQueryStatus") }, - { MakeIntrusive<TRequestScopeCounters>("ModifyQuery") }, - { MakeIntrusive<TRequestScopeCounters>("DeleteQuery") }, - { MakeIntrusive<TRequestScopeCounters>("ControlQuery") }, - { MakeIntrusive<TRequestScopeCounters>("GetResultData") }, - { MakeIntrusive<TRequestScopeCounters>("ListJobs") }, - { MakeIntrusive<TRequestScopeCounters>("DescribeJob") }, - { MakeIntrusive<TRequestScopeCounters>("CreateConnection") }, - { MakeIntrusive<TRequestScopeCounters>("ListConnections") }, - { MakeIntrusive<TRequestScopeCounters>("DescribeConnection") }, - { MakeIntrusive<TRequestScopeCounters>("ModifyConnection") }, - { MakeIntrusive<TRequestScopeCounters>("DeleteConnection") }, - { MakeIntrusive<TRequestScopeCounters>("TestConnection") }, - { MakeIntrusive<TRequestScopeCounters>("CreateBinding") }, - { MakeIntrusive<TRequestScopeCounters>("ListBindings") }, - { MakeIntrusive<TRequestScopeCounters>("DescribeBinding") }, - { MakeIntrusive<TRequestScopeCounters>("ModifyBinding") }, - { MakeIntrusive<TRequestScopeCounters>("DeleteBinding") }, - })); - - auto scopeCounters = Counters - ->GetSubgroup("cloud_id", cloudId) - ->GetSubgroup("scope", scope); - - for (auto& request: *scopeRequests) { - request->Register(scopeCounters); - } - cacheVal = scopeRequests; - ScopeCounters.Put(key, cacheVal); - return (*scopeRequests)[type]; - } - }; - +private: TCounters Counters; const ::NFq::TControlPlaneProxyConfig Config; const TYqSharedResources::TPtr YqSharedResources; @@ -1748,7 +805,7 @@ private: template<typename T> TPermissions ExtractPermissions(T& ev, const TPermissions& availablePermissions) { TPermissions permissions; - for (const auto& permission: ev->Get()->Permissions) { + for (const auto& permission : ev->Get()->Permissions) { if (auto it = PermissionsItems.find(permission); it != PermissionsItems.end()) { // cut off permissions that should not be used in other services if (availablePermissions.Check(it->second)) { @@ -1766,7 +823,7 @@ private: return issues; } - for (const auto& requiredPermission: requiredPermissions) { + for (const auto& requiredPermission : requiredPermissions) { if (!IsIn(ev->Get()->Permissions, requiredPermission)) { issues.AddIssue(MakeErrorIssue(TIssuesIds::ACCESS_DENIED, "No permission " + requiredPermission + " in a given scope yandexcloud://" + ev->Get()->FolderId)); } @@ -1837,14 +894,26 @@ private: | TPermissions::TPermission::MANAGE_PUBLIC }; - Register(new TCreateQueryRequestActor - (Config, sender, cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, ExtractPermissions(ev, availablePermissions), - cloudId, subjectType, std::move(ev->Get()->Quotas), - std::move(ev->Get()->ComputeDatabase))); + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + auto computeDatabase = ev->Get()->ComputeDatabase; + auto quotas = ev->Get()->Quotas; + Register(new TCreateQueryRequestActor + (ev, Config, sender, cookie, scope, folderId, + std::move(request), + std::move(user), + std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, + subjectType, + std::move(quotas), + std::move(computeDatabase))); + } } void Handle(TEvControlPlaneProxy::TEvListQueriesRequest::TPtr& ev) { @@ -1900,16 +969,26 @@ private: | TPermissions::TPermission::VIEW_PRIVATE }; - Register(new TRequestActor<FederatedQuery::ListQueriesRequest, - TEvControlPlaneStorage::TEvListQueriesRequest, - TEvControlPlaneStorage::TEvListQueriesResponse, - TEvControlPlaneProxy::TEvListQueriesResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(new TRequestActor<FederatedQuery::ListQueriesRequest, + TEvControlPlaneStorage::TEvListQueriesRequest, + TEvControlPlaneStorage::TEvListQueriesResponse, + TEvControlPlaneProxy::TEvListQueriesRequest, + TEvControlPlaneProxy::TEvListQueriesResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), + std::move(user), + std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, + subjectType)); + } } void Handle(TEvControlPlaneProxy::TEvDescribeQueryRequest::TPtr& ev) { @@ -1968,16 +1047,26 @@ private: | TPermissions::VIEW_QUERY_TEXT }; - Register(new TRequestActor<FederatedQuery::DescribeQueryRequest, - TEvControlPlaneStorage::TEvDescribeQueryRequest, - TEvControlPlaneStorage::TEvDescribeQueryResponse, - TEvControlPlaneProxy::TEvDescribeQueryResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(new TRequestActor<FederatedQuery::DescribeQueryRequest, + TEvControlPlaneStorage::TEvDescribeQueryRequest, + TEvControlPlaneStorage::TEvDescribeQueryResponse, + TEvControlPlaneProxy::TEvDescribeQueryRequest, + TEvControlPlaneProxy::TEvDescribeQueryResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), + std::move(user), + std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, + subjectType)); + } } void Handle(TEvControlPlaneProxy::TEvGetQueryStatusRequest::TPtr& ev) { @@ -2034,16 +1123,26 @@ private: | TPermissions::TPermission::VIEW_PRIVATE }; - Register(new TRequestActor<FederatedQuery::GetQueryStatusRequest, - TEvControlPlaneStorage::TEvGetQueryStatusRequest, - TEvControlPlaneStorage::TEvGetQueryStatusResponse, - TEvControlPlaneProxy::TEvGetQueryStatusResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(new TRequestActor<FederatedQuery::GetQueryStatusRequest, + TEvControlPlaneStorage::TEvGetQueryStatusRequest, + TEvControlPlaneStorage::TEvGetQueryStatusResponse, + TEvControlPlaneProxy::TEvGetQueryStatusRequest, + TEvControlPlaneProxy::TEvGetQueryStatusResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), + std::move(user), + std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, + subjectType)); + } } void Handle(TEvControlPlaneProxy::TEvModifyQueryRequest::TPtr& ev) { @@ -2110,16 +1209,29 @@ private: | TPermissions::TPermission::MANAGE_PRIVATE }; - Register(new TRequestActor<FederatedQuery::ModifyQueryRequest, - TEvControlPlaneStorage::TEvModifyQueryRequest, - TEvControlPlaneStorage::TEvModifyQueryResponse, - TEvControlPlaneProxy::TEvModifyQueryResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, ExtractPermissions(ev, availablePermissions), - cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase))); + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + auto computeDatabase = ev->Get()->ComputeDatabase; + Register(new TRequestActor<FederatedQuery::ModifyQueryRequest, + TEvControlPlaneStorage::TEvModifyQueryRequest, + TEvControlPlaneStorage::TEvModifyQueryResponse, + TEvControlPlaneProxy::TEvModifyQueryRequest, + TEvControlPlaneProxy::TEvModifyQueryResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), + std::move(user), + std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, + subjectType, + {}, + std::move(computeDatabase))); + } } void Handle(TEvControlPlaneProxy::TEvDeleteQueryRequest::TPtr& ev) { @@ -2176,16 +1288,26 @@ private: | TPermissions::TPermission::MANAGE_PRIVATE }; - Register(new TRequestActor<FederatedQuery::DeleteQueryRequest, - TEvControlPlaneStorage::TEvDeleteQueryRequest, - TEvControlPlaneStorage::TEvDeleteQueryResponse, - TEvControlPlaneProxy::TEvDeleteQueryResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(new TRequestActor<FederatedQuery::DeleteQueryRequest, + TEvControlPlaneStorage::TEvDeleteQueryRequest, + TEvControlPlaneStorage::TEvDeleteQueryResponse, + TEvControlPlaneProxy::TEvDeleteQueryRequest, + TEvControlPlaneProxy::TEvDeleteQueryResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), + std::move(user), + std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, + subjectType)); + } } void Handle(TEvControlPlaneProxy::TEvControlQueryRequest::TPtr& ev) { @@ -2242,16 +1364,26 @@ private: | TPermissions::TPermission::MANAGE_PRIVATE }; - Register(new TRequestActor<FederatedQuery::ControlQueryRequest, - TEvControlPlaneStorage::TEvControlQueryRequest, - TEvControlPlaneStorage::TEvControlQueryResponse, - TEvControlPlaneProxy::TEvControlQueryResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(new TRequestActor<FederatedQuery::ControlQueryRequest, + TEvControlPlaneStorage::TEvControlQueryRequest, + TEvControlPlaneStorage::TEvControlQueryResponse, + TEvControlPlaneProxy::TEvControlQueryRequest, + TEvControlPlaneProxy::TEvControlQueryResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), + std::move(user), + std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, + subjectType)); + } } void Handle(TEvControlPlaneProxy::TEvGetResultDataRequest::TPtr& ev) { @@ -2311,16 +1443,26 @@ private: | TPermissions::TPermission::VIEW_PRIVATE }; - Register(new TRequestActor<FederatedQuery::GetResultDataRequest, - TEvControlPlaneStorage::TEvGetResultDataRequest, - TEvControlPlaneStorage::TEvGetResultDataResponse, - TEvControlPlaneProxy::TEvGetResultDataResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(new TRequestActor<FederatedQuery::GetResultDataRequest, + TEvControlPlaneStorage::TEvGetResultDataRequest, + TEvControlPlaneStorage::TEvGetResultDataResponse, + TEvControlPlaneProxy::TEvGetResultDataRequest, + TEvControlPlaneProxy::TEvGetResultDataResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), + std::move(user), + std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, + subjectType)); + } } void Handle(TEvControlPlaneProxy::TEvListJobsRequest::TPtr& ev) { @@ -2377,16 +1519,26 @@ private: | TPermissions::TPermission::VIEW_PRIVATE }; - Register(new TRequestActor<FederatedQuery::ListJobsRequest, - TEvControlPlaneStorage::TEvListJobsRequest, - TEvControlPlaneStorage::TEvListJobsResponse, - TEvControlPlaneProxy::TEvListJobsResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(new TRequestActor<FederatedQuery::ListJobsRequest, + TEvControlPlaneStorage::TEvListJobsRequest, + TEvControlPlaneStorage::TEvListJobsResponse, + TEvControlPlaneProxy::TEvListJobsRequest, + TEvControlPlaneProxy::TEvListJobsResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), + std::move(user), + std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, + subjectType)); + } } void Handle(TEvControlPlaneProxy::TEvDescribeJobRequest::TPtr& ev) { @@ -2445,16 +1597,26 @@ private: | TPermissions::VIEW_QUERY_TEXT }; - Register(new TRequestActor<FederatedQuery::DescribeJobRequest, - TEvControlPlaneStorage::TEvDescribeJobRequest, - TEvControlPlaneStorage::TEvDescribeJobResponse, - TEvControlPlaneProxy::TEvDescribeJobResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(new TRequestActor<FederatedQuery::DescribeJobRequest, + TEvControlPlaneStorage::TEvDescribeJobRequest, + TEvControlPlaneStorage::TEvDescribeJobResponse, + TEvControlPlaneProxy::TEvDescribeJobRequest, + TEvControlPlaneProxy::TEvDescribeJobResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), + std::move(user), + std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, + subjectType)); + } } void Handle(TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& ev) { @@ -2525,31 +1687,53 @@ private: }; if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && !ydbOperationWasPerformed) { - Register(new TCreateConnectionInYDBActor( - Counters.GetCommonCounters(RTC_CREATE_CONNECTION_IN_YDB), - Config.ComputeConfig, - YqSharedResources, - CredentialsProviderFactory, - Config.CommonConfig.GetObjectStorageEndpoint(), + if (!ev->Get()->YDBSession) { + Register(MakeComputeYDBSessionActor< + TEvControlPlaneProxy::TEvCreateConnectionRequest, + TEvControlPlaneProxy::TEvCreateConnectionResponse>( + sender, + std::move(ev), + Config.RequestTimeout, + Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION), + Config.ComputeConfig, + YqSharedResources, + CredentialsProviderFactory)); + return; + } + + Register(NPrivate::MakeCreateConnectionActor( sender, - ev, - cookie, - scope, - Signer, - Config.RequestTimeout)); - return; - } - - Register(new TRequestActor<FederatedQuery::CreateConnectionRequest, - TEvControlPlaneStorage::TEvCreateConnectionRequest, - TEvControlPlaneStorage::TEvCreateConnectionResponse, - TEvControlPlaneProxy::TEvCreateConnectionResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, ExtractPermissions(ev, availablePermissions), - cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase))); + std::move(ev), + Config.RequestTimeout, + Counters, + Config.CommonConfig.GetObjectStorageEndpoint(), + Signer)); + return; + } + + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + auto computeDatabase = ev->Get()->ComputeDatabase; + Register(new TRequestActor<FederatedQuery::CreateConnectionRequest, + TEvControlPlaneStorage::TEvCreateConnectionRequest, + TEvControlPlaneStorage::TEvCreateConnectionResponse, + TEvControlPlaneProxy::TEvCreateConnectionRequest, + TEvControlPlaneProxy::TEvCreateConnectionResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), + std::move(user), + std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, + subjectType, + {}, + std::move(computeDatabase))); + } } void Handle(TEvControlPlaneProxy::TEvListConnectionsRequest::TPtr& ev) { @@ -2605,16 +1789,26 @@ private: | TPermissions::TPermission::VIEW_PRIVATE }; - Register(new TRequestActor<FederatedQuery::ListConnectionsRequest, - TEvControlPlaneStorage::TEvListConnectionsRequest, - TEvControlPlaneStorage::TEvListConnectionsResponse, - TEvControlPlaneProxy::TEvListConnectionsResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(new TRequestActor<FederatedQuery::ListConnectionsRequest, + TEvControlPlaneStorage::TEvListConnectionsRequest, + TEvControlPlaneStorage::TEvListConnectionsResponse, + TEvControlPlaneProxy::TEvListConnectionsRequest, + TEvControlPlaneProxy::TEvListConnectionsResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), + std::move(user), + std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, + subjectType)); + } } void Handle(TEvControlPlaneProxy::TEvDescribeConnectionRequest::TPtr& ev) { @@ -2671,16 +1865,26 @@ private: | TPermissions::TPermission::VIEW_PRIVATE }; - Register(new TRequestActor<FederatedQuery::DescribeConnectionRequest, - TEvControlPlaneStorage::TEvDescribeConnectionRequest, - TEvControlPlaneStorage::TEvDescribeConnectionResponse, - TEvControlPlaneProxy::TEvDescribeConnectionResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(new TRequestActor<FederatedQuery::DescribeConnectionRequest, + TEvControlPlaneStorage::TEvDescribeConnectionRequest, + TEvControlPlaneStorage::TEvDescribeConnectionResponse, + TEvControlPlaneProxy::TEvDescribeConnectionRequest, + TEvControlPlaneProxy::TEvDescribeConnectionResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), + std::move(user), + std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, + subjectType)); + } } void Handle(TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& ev) { @@ -2749,19 +1953,73 @@ private: static const TPermissions availablePermissions { TPermissions::TPermission::MANAGE_PUBLIC | TPermissions::TPermission::MANAGE_PRIVATE + | TPermissions::TPermission::VIEW_PUBLIC + | TPermissions::TPermission::VIEW_PRIVATE }; - Register(new TRequestActor<FederatedQuery::ModifyConnectionRequest, - TEvControlPlaneStorage::TEvModifyConnectionRequest, - TEvControlPlaneStorage::TEvModifyConnectionResponse, - TEvControlPlaneProxy::TEvModifyConnectionResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), - cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase))); + if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && !ev->Get()->OldConnectionContent) { + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(MakeDiscoverYDBConnectionName( + sender, ev, Counters, Config.RequestTimeout, permissions)); + return; + } + + const bool controlPlaneYDBOperationWasPerformed = ev->Get()->ControlPlaneYDBOperationWasPerformed; + if (!controlPlaneYDBOperationWasPerformed) { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + auto computeDatabase = ev->Get()->ComputeDatabase; + Register(new TRequestActor<FederatedQuery::ModifyConnectionRequest, + TEvControlPlaneStorage::TEvModifyConnectionRequest, + TEvControlPlaneStorage::TEvModifyConnectionResponse, + TEvControlPlaneProxy::TEvModifyConnectionRequest, + TEvControlPlaneProxy::TEvModifyConnectionResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), + std::move(user), + std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, + subjectType, + {}, + std::move(computeDatabase), + !Config.ComputeConfig.YdbComputeControlPlaneEnabled())); + return; + } + + if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) { + if (!ev->Get()->YDBSession) { + Register(MakeComputeYDBSessionActor< + TEvControlPlaneProxy::TEvModifyConnectionRequest, + TEvControlPlaneProxy::TEvModifyConnectionResponse>( + sender, + std::move(ev), + Config.RequestTimeout, + Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION), + Config.ComputeConfig, + YqSharedResources, + CredentialsProviderFactory)); + return; + } + + if (!ev->Get()->ComputeYDBOperationWasPerformed) { + Register(MakeModifyConnectionActor( + sender, + ev, + Config.RequestTimeout, + Counters, + Config.CommonConfig.GetObjectStorageEndpoint(), + Signer)); + return; + } + + Send(sender, ev->Get()->Response.release()); + return; + } } void Handle(TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& ev) { @@ -2825,19 +2083,59 @@ private: static const TPermissions availablePermissions { TPermissions::TPermission::MANAGE_PUBLIC | TPermissions::TPermission::MANAGE_PRIVATE + | TPermissions::TPermission::VIEW_PUBLIC + | TPermissions::TPermission::VIEW_PRIVATE }; - Register(new TRequestActor<FederatedQuery::DeleteConnectionRequest, - TEvControlPlaneStorage::TEvDeleteConnectionRequest, - TEvControlPlaneStorage::TEvDeleteConnectionResponse, - TEvControlPlaneProxy::TEvDeleteConnectionResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), - cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase))); + if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && !ev->Get()->ConnectionContent) { + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(MakeDiscoverYDBConnectionName( + sender, ev, Counters, Config.RequestTimeout, permissions)); + return; + } + + const bool controlPlaneYDBOperationWasPerformed = ev->Get()->ControlPlaneYDBOperationWasPerformed; + if (!controlPlaneYDBOperationWasPerformed) { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + auto computeDatabase = ev->Get()->ComputeDatabase; + Register(new TRequestActor<FederatedQuery::DeleteConnectionRequest, + TEvControlPlaneStorage::TEvDeleteConnectionRequest, + TEvControlPlaneStorage::TEvDeleteConnectionResponse, + TEvControlPlaneProxy::TEvDeleteConnectionRequest, + TEvControlPlaneProxy::TEvDeleteConnectionResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), std::move(user), std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, subjectType, {}, std::move(computeDatabase), !Config.ComputeConfig.YdbComputeControlPlaneEnabled())); + return; + } + + if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) { + if (!ev->Get()->YDBSession) { + Register(MakeComputeYDBSessionActor< + TEvControlPlaneProxy::TEvDeleteConnectionRequest, + TEvControlPlaneProxy::TEvDeleteConnectionResponse>( + sender, + std::move(ev), + Config.RequestTimeout, + Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION), + Config.ComputeConfig, YqSharedResources, CredentialsProviderFactory)); + return; + } + + if (!ev->Get()->ComputeYDBOperationWasPerformed) { + Register(MakeDeleteConnectionActor( + sender, ev, Config.RequestTimeout, Counters, Signer)); + return; + } + + Send(sender, ev->Get()->Response.release()); + } } void Handle(TEvControlPlaneProxy::TEvTestConnectionRequest::TPtr& ev) { @@ -2893,15 +2191,21 @@ private: return; } - Register(new TRequestActor<FederatedQuery::TestConnectionRequest, - TEvTestConnection::TEvTestConnectionRequest, - TEvTestConnection::TEvTestConnectionResponse, - TEvControlPlaneProxy::TEvTestConnectionResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - TestConnectionActorId(), - requestCounters, - probe, ExtractPermissions(ev, {}), cloudId, subjectType)); + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, {}); + Register(new TRequestActor<FederatedQuery::TestConnectionRequest, + TEvTestConnection::TEvTestConnectionRequest, + TEvTestConnection::TEvTestConnectionResponse, + TEvControlPlaneProxy::TEvTestConnectionRequest, + TEvControlPlaneProxy::TEvTestConnectionResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), std::move(user), std::move(token), + TestConnectionActorId(), + requestCounters, + probe, permissions, cloudId, subjectType)); + } } void Handle(TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& ev) { @@ -2972,31 +2276,47 @@ private: }; if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && !ydbOperationWasPerformed) { + if (!ev->Get()->ConnectionName) { + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(MakeDiscoverYDBConnectionName( + sender, ev, Counters, Config.RequestTimeout, permissions)); + return; + } + + if (!ev->Get()->YDBSession) { + Register(MakeComputeYDBSessionActor< + TEvControlPlaneProxy::TEvCreateBindingRequest, + TEvControlPlaneProxy::TEvCreateBindingResponse>( + sender, + std::move(ev), + Config.RequestTimeout, + Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION), + Config.ComputeConfig, YqSharedResources, CredentialsProviderFactory)); + return; + } + + Register(MakeCreateBindingActor( + sender, std::move(ev), Config.RequestTimeout, Counters)); + return; + } + + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; auto permissions = ExtractPermissions(ev, availablePermissions); - Register(new TCreateBindingInYDBActor( - Counters.GetCommonCounters(RTC_CREATE_BINDING_IN_YDB), - Config.ComputeConfig, - YqSharedResources, - CredentialsProviderFactory, - sender, - ev, - cookie, - std::move(permissions), - scope, - Config.RequestTimeout)); - return; - } - - Register(new TRequestActor<FederatedQuery::CreateBindingRequest, - TEvControlPlaneStorage::TEvCreateBindingRequest, - TEvControlPlaneStorage::TEvCreateBindingResponse, - TEvControlPlaneProxy::TEvCreateBindingResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, ExtractPermissions(ev, availablePermissions), - cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase))); + auto computeDatabase = ev->Get()->ComputeDatabase; + Register(new TRequestActor<FederatedQuery::CreateBindingRequest, + TEvControlPlaneStorage::TEvCreateBindingRequest, + TEvControlPlaneStorage::TEvCreateBindingResponse, + TEvControlPlaneProxy::TEvCreateBindingRequest, + TEvControlPlaneProxy::TEvCreateBindingResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), std::move(user), std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, permissions, + cloudId, subjectType, {}, std::move(computeDatabase))); + } } void Handle(TEvControlPlaneProxy::TEvListBindingsRequest::TPtr& ev) { @@ -3052,16 +2372,22 @@ private: | TPermissions::TPermission::VIEW_PRIVATE }; - Register(new TRequestActor<FederatedQuery::ListBindingsRequest, - TEvControlPlaneStorage::TEvListBindingsRequest, - TEvControlPlaneStorage::TEvListBindingsResponse, - TEvControlPlaneProxy::TEvListBindingsResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(new TRequestActor<FederatedQuery::ListBindingsRequest, + TEvControlPlaneStorage::TEvListBindingsRequest, + TEvControlPlaneStorage::TEvListBindingsResponse, + TEvControlPlaneProxy::TEvListBindingsRequest, + TEvControlPlaneProxy::TEvListBindingsResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), std::move(user), std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, cloudId, subjectType)); + } } void Handle(TEvControlPlaneProxy::TEvDescribeBindingRequest::TPtr& ev) { @@ -3118,16 +2444,22 @@ private: | TPermissions::TPermission::VIEW_PRIVATE }; - Register(new TRequestActor<FederatedQuery::DescribeBindingRequest, - TEvControlPlaneStorage::TEvDescribeBindingRequest, - TEvControlPlaneStorage::TEvDescribeBindingResponse, - TEvControlPlaneProxy::TEvDescribeBindingResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), cloudId, subjectType)); + { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(new TRequestActor<FederatedQuery::DescribeBindingRequest, + TEvControlPlaneStorage::TEvDescribeBindingRequest, + TEvControlPlaneStorage::TEvDescribeBindingResponse, + TEvControlPlaneProxy::TEvDescribeBindingRequest, + TEvControlPlaneProxy::TEvDescribeBindingResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), std::move(user), std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, cloudId, subjectType)); + } } void Handle(TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& ev) { @@ -3191,19 +2523,72 @@ private: static const TPermissions availablePermissions { TPermissions::TPermission::MANAGE_PUBLIC | TPermissions::TPermission::MANAGE_PRIVATE + | TPermissions::TPermission::VIEW_PUBLIC + | TPermissions::TPermission::VIEW_PRIVATE }; - Register(new TRequestActor<FederatedQuery::ModifyBindingRequest, - TEvControlPlaneStorage::TEvModifyBindingRequest, - TEvControlPlaneStorage::TEvModifyBindingResponse, - TEvControlPlaneProxy::TEvModifyBindingResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), - cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase))); + if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) { + if (!ev->Get()->OldBindingName) { + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(MakeDiscoverYDBBindingName( + sender, ev, Counters, Config.RequestTimeout, permissions)); + return; + } + if (!ev->Get()->ConnectionName) { + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(MakeDiscoverYDBConnectionName( + sender, ev, Counters, Config.RequestTimeout, permissions)); + return; + } + } + + const bool controlPlaneYDBOperationWasPerformed = ev->Get()->ControlPlaneYDBOperationWasPerformed; + if (!controlPlaneYDBOperationWasPerformed) { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + auto computeDatabase = ev->Get()->ComputeDatabase; + Register(new TRequestActor<FederatedQuery::ModifyBindingRequest, + TEvControlPlaneStorage::TEvModifyBindingRequest, + TEvControlPlaneStorage::TEvModifyBindingResponse, + TEvControlPlaneProxy::TEvModifyBindingRequest, + TEvControlPlaneProxy::TEvModifyBindingResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), std::move(user), std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, subjectType, {}, std::move(computeDatabase), !Config.ComputeConfig.YdbComputeControlPlaneEnabled())); + return; + } + + if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) { + if (!ev->Get()->YDBSession) { + Register(MakeComputeYDBSessionActor< + TEvControlPlaneProxy::TEvModifyBindingRequest, + TEvControlPlaneProxy::TEvModifyBindingResponse>( + sender, + std::move(ev), + Config.RequestTimeout, + Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION), + Config.ComputeConfig, + YqSharedResources, + CredentialsProviderFactory)); + return; + } + + if (!ev->Get()->ComputeYDBOperationWasPerformed) { + Register(MakeModifyBindingActor( + sender, + std::move(ev), + Config.RequestTimeout, + Counters)); + return; + } + + Send(sender, ev->Get()->Response.release()); + } } void Handle(TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& ev) { @@ -3267,19 +2652,62 @@ private: static const TPermissions availablePermissions { TPermissions::TPermission::MANAGE_PUBLIC | TPermissions::TPermission::MANAGE_PRIVATE + | TPermissions::TPermission::VIEW_PUBLIC + | TPermissions::TPermission::VIEW_PRIVATE }; - Register(new TRequestActor<FederatedQuery::DeleteBindingRequest, - TEvControlPlaneStorage::TEvDeleteBindingRequest, - TEvControlPlaneStorage::TEvDeleteBindingResponse, - TEvControlPlaneProxy::TEvDeleteBindingResponse> - (Config, ev->Sender, ev->Cookie, scope, folderId, - std::move(request), std::move(user), std::move(token), - ControlPlaneStorageServiceActorId(), - requestCounters, - probe, - ExtractPermissions(ev, availablePermissions), - cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase))); + if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && + !ev->Get()->OldBindingName) { + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(MakeDiscoverYDBBindingName( + sender, ev, Counters, Config.RequestTimeout, permissions)); + return; + } + + const bool controlPlaneYDBOperationWasPerformed = ev->Get()->ControlPlaneYDBOperationWasPerformed; + if (!controlPlaneYDBOperationWasPerformed) { + auto sender = ev->Sender; + auto cookie = ev->Cookie; + auto permissions = ExtractPermissions(ev, availablePermissions); + auto computeDatabase = ev->Get()->ComputeDatabase; + Register(new TRequestActor<FederatedQuery::DeleteBindingRequest, + TEvControlPlaneStorage::TEvDeleteBindingRequest, + TEvControlPlaneStorage::TEvDeleteBindingResponse, + TEvControlPlaneProxy::TEvDeleteBindingRequest, + TEvControlPlaneProxy::TEvDeleteBindingResponse> + (ev, Config, sender, cookie, scope, folderId, + std::move(request), std::move(user), std::move(token), + ControlPlaneStorageServiceActorId(), + requestCounters, + probe, + permissions, + cloudId, subjectType, {}, std::move(computeDatabase), !Config.ComputeConfig.YdbComputeControlPlaneEnabled())); + return; + } + + if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) { + if (!ev->Get()->YDBSession) { + Register(MakeComputeYDBSessionActor< + TEvControlPlaneProxy::TEvDeleteBindingRequest, + TEvControlPlaneProxy::TEvDeleteBindingResponse>( + sender, + std::move(ev), + Config.RequestTimeout, + Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION), + Config.ComputeConfig, + YqSharedResources, + CredentialsProviderFactory)); + return; + } + + if (!ev->Get()->ComputeYDBOperationWasPerformed) { + Register(MakeDeleteBindingActor( + sender, std::move(ev), Config.RequestTimeout, Counters)); + return; + } + + Send(sender, ev->Get()->Response.release()); + } } void Handle(NMon::TEvHttpInfo::TPtr& ev) { diff --git a/ydb/core/fq/libs/control_plane_proxy/events/events.h b/ydb/core/fq/libs/control_plane_proxy/events/events.h index e027245b724..bde54c6a8a0 100644 --- a/ydb/core/fq/libs/control_plane_proxy/events/events.h +++ b/ydb/core/fq/libs/control_plane_proxy/events/events.h @@ -1,5 +1,9 @@ #pragma once +#include "type_traits.h" + +#include <util/generic/maybe.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/core/fq/libs/control_plane_storage/events/events.h> #include <ydb/core/fq/libs/quota_manager/events/events.h> @@ -10,6 +14,7 @@ #include <library/cpp/actors/interconnect/events_local.h> #include <ydb/library/yql/public/issue/yql_issue.h> +#include <memory> namespace NFq { @@ -63,15 +68,20 @@ struct TEvControlPlaneProxy { static_assert(EvEnd <= YqEventSubspaceEnd(NFq::TYqEventSubspace::ControlPlaneProxy), "All events must be in their subspace"); - template<typename ProtoMessage, ui32 EventType> - struct TControlPlaneRequest : NActors::TEventLocal<TControlPlaneRequest<ProtoMessage, EventType>, EventType> { - TControlPlaneRequest(const TString& folderId, - const ProtoMessage& request, - const TString& user, - const TString& token, - const TVector<TString>& permissions, - TMaybe<TQuotaMap> quotas = Nothing(), - TTenantInfo::TPtr tenantInfo = nullptr) + template<class Request> + struct TResponseSelector; + + template<typename TDerived, typename ProtoMessage, ui32 EventType> + struct TBaseControlPlaneRequest : NActors::TEventLocal<TDerived, EventType> { + using TProxyResponse = typename TResponseSelector<TDerived>::type; + + TBaseControlPlaneRequest(const TString& folderId, + const ProtoMessage& request, + const TString& user, + const TString& token, + const TVector<TString>& permissions, + TMaybe<TQuotaMap> quotas = Nothing(), + TTenantInfo::TPtr tenantInfo = nullptr) : FolderId(folderId) , Request(request) , User(user) @@ -80,8 +90,7 @@ struct TEvControlPlaneProxy { , Quotas(std::move(quotas)) , TenantInfo(tenantInfo) , ComputeYDBOperationWasPerformed(false) - { - } + , ControlPlaneYDBOperationWasPerformed(false) { } TString FolderId; TString CloudId; @@ -93,9 +102,15 @@ struct TEvControlPlaneProxy { TTenantInfo::TPtr TenantInfo; TString SubjectType; bool ComputeYDBOperationWasPerformed; + bool ControlPlaneYDBOperationWasPerformed; + std::unique_ptr<TProxyResponse> Response; + TMaybe<NYdb::NTable::TSession> YDBSession; TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal> ComputeDatabase; }; + template<typename ProtoMessage, ui32 EventType> + struct TControlPlaneRequest; + template<typename TDerived, typename ProtoMessage, ui32 EventType> struct TControlPlaneResponse : NActors::TEventLocal<TDerived, EventType> { TControlPlaneResponse(const ProtoMessage& result, const TString& subjectType) @@ -188,6 +203,168 @@ struct TEvControlPlaneProxy { using TEvModifyBindingResponse = TControlPlaneAuditableResponse<FederatedQuery::ModifyBindingResult, FederatedQuery::Binding, EvModifyBindingResponse>; using TEvDeleteBindingRequest = TControlPlaneRequest<FederatedQuery::DeleteBindingRequest, EvDeleteBindingRequest>; using TEvDeleteBindingResponse = TControlPlaneAuditableResponse<FederatedQuery::DeleteBindingResult, FederatedQuery::Binding, EvDeleteBindingResponse>; + + template<> + struct TResponseSelector<TEvCreateQueryRequest> { + using type = TEvCreateQueryResponse; + }; + template<> + struct TResponseSelector<TEvListQueriesRequest> { + using type = TEvListQueriesResponse; + }; + template<> + struct TResponseSelector<TEvDescribeQueryRequest> { + using type = TEvDescribeQueryResponse; + }; + template<> + struct TResponseSelector<TEvGetQueryStatusRequest> { + using type = TEvGetQueryStatusResponse; + }; + template<> + struct TResponseSelector<TEvModifyQueryRequest> { + using type = TEvModifyQueryResponse; + }; + template<> + struct TResponseSelector<TEvDeleteQueryRequest> { + using type = TEvDeleteQueryResponse; + }; + template<> + struct TResponseSelector<TEvControlQueryRequest> { + using type = TEvControlQueryResponse; + }; + template<> + struct TResponseSelector<TEvGetResultDataRequest> { + using type = TEvGetResultDataResponse; + }; + template<> + struct TResponseSelector<TEvListJobsRequest> { + using type = TEvListJobsResponse; + }; + template<> + struct TResponseSelector<TEvDescribeJobRequest> { + using type = TEvDescribeJobResponse; + }; + template<> + struct TResponseSelector<TEvCreateConnectionRequest> { + using type = TEvCreateConnectionResponse; + }; + template<> + struct TResponseSelector<TEvListConnectionsRequest> { + using type = TEvListConnectionsResponse; + }; + template<> + struct TResponseSelector<TEvDescribeConnectionRequest> { + using type = TEvDescribeConnectionResponse; + }; + template<> + struct TResponseSelector<TEvModifyConnectionRequest> { + using type = TEvModifyConnectionResponse; + }; + template<> + struct TResponseSelector<TEvDeleteConnectionRequest> { + using type = TEvDeleteConnectionResponse; + }; + template<> + struct TResponseSelector<TEvTestConnectionRequest> { + using type = TEvTestConnectionResponse; + }; + template<> + struct TResponseSelector<TEvCreateBindingRequest> { + using type = TEvCreateBindingResponse; + }; + template<> + struct TResponseSelector<TEvListBindingsRequest> { + using type = TEvListBindingsResponse; + }; + template<> + struct TResponseSelector<TEvDescribeBindingRequest> { + using type = TEvDescribeBindingResponse; + }; + template<> + struct TResponseSelector<TEvModifyBindingRequest> { + using type = TEvModifyBindingResponse; + }; + template<> + struct TResponseSelector<TEvDeleteBindingRequest> { + using type = TEvDeleteBindingResponse; + }; + + template<typename ProtoMessage, ui32 EventType> + struct TControlPlaneRequest : + public TBaseControlPlaneRequest<TControlPlaneRequest<ProtoMessage, EventType>, + ProtoMessage, + EventType> { + using TBaseControlPlaneRequest<TControlPlaneRequest<ProtoMessage, EventType>, + ProtoMessage, + EventType>::TBaseControlPlaneRequest; + }; + + template<> + struct TControlPlaneRequest<FederatedQuery::ModifyConnectionRequest, EvModifyConnectionRequest> : + public TBaseControlPlaneRequest<TEvModifyConnectionRequest, + FederatedQuery::ModifyConnectionRequest, + EvModifyConnectionRequest> { + using TBaseControlPlaneRequest< + TControlPlaneRequest<FederatedQuery::ModifyConnectionRequest, EvModifyConnectionRequest>, + FederatedQuery::ModifyConnectionRequest, + EvModifyConnectionRequest>::TBaseControlPlaneRequest; + + TMaybe<FederatedQuery::ConnectionContent> OldConnectionContent; + }; + + template<> + struct TControlPlaneRequest<FederatedQuery::DeleteConnectionRequest, EvDeleteConnectionRequest> : + public TBaseControlPlaneRequest<TEvDeleteConnectionRequest, + FederatedQuery::DeleteConnectionRequest, + EvDeleteConnectionRequest> { + using TBaseControlPlaneRequest< + TControlPlaneRequest<FederatedQuery::DeleteConnectionRequest, EvDeleteConnectionRequest>, + FederatedQuery::DeleteConnectionRequest, + EvDeleteConnectionRequest>::TBaseControlPlaneRequest; + + TMaybe<FederatedQuery::ConnectionContent> ConnectionContent; + }; + + template<> + struct TControlPlaneRequest<FederatedQuery::CreateBindingRequest, EvCreateBindingRequest> : + public TBaseControlPlaneRequest<TEvCreateBindingRequest, + FederatedQuery::CreateBindingRequest, + EvCreateBindingRequest> { + using TBaseControlPlaneRequest< + TControlPlaneRequest<FederatedQuery::CreateBindingRequest, EvCreateBindingRequest>, + FederatedQuery::CreateBindingRequest, + EvCreateBindingRequest>::TBaseControlPlaneRequest; + + TMaybe<TString> ConnectionName; + }; + + template<> + struct TControlPlaneRequest<FederatedQuery::ModifyBindingRequest, EvModifyBindingRequest> : + public TBaseControlPlaneRequest<TEvModifyBindingRequest, + FederatedQuery::ModifyBindingRequest, + EvModifyBindingRequest> { + using TBaseControlPlaneRequest< + TControlPlaneRequest<FederatedQuery::ModifyBindingRequest, EvModifyBindingRequest>, + FederatedQuery::ModifyBindingRequest, + EvModifyBindingRequest>::TBaseControlPlaneRequest; + + TMaybe<TString> OldBindingName; + TMaybe<TString> ConnectionId; + TMaybe<TString> ConnectionName; + }; + + template<> + struct TControlPlaneRequest<FederatedQuery::DeleteBindingRequest, EvDeleteBindingRequest> : + public TBaseControlPlaneRequest<TEvDeleteBindingRequest, + FederatedQuery::DeleteBindingRequest, + EvDeleteBindingRequest> { + using TBaseControlPlaneRequest< + TControlPlaneRequest<FederatedQuery::DeleteBindingRequest, EvDeleteBindingRequest>, + FederatedQuery::DeleteBindingRequest, + EvDeleteBindingRequest>::TBaseControlPlaneRequest; + + TMaybe<TString> OldBindingName; + }; }; } diff --git a/ydb/core/fq/libs/control_plane_proxy/events/type_traits.h b/ydb/core/fq/libs/control_plane_proxy/events/type_traits.h new file mode 100644 index 00000000000..029c34cfcc4 --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/events/type_traits.h @@ -0,0 +1,100 @@ +#pragma once + +#include <ydb/public/api/protos/draft/fq.pb.h> + +namespace NFq { +namespace NEvControlPlaneProxy { + +template<class RequestProtoMessage> +struct TResponseProtoMessage; + +template<> +struct TResponseProtoMessage<FederatedQuery::CreateQueryRequest> { + using type = FederatedQuery::CreateQueryResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::ListQueriesRequest> { + using type = FederatedQuery::ListQueriesResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::DescribeQueryRequest> { + using type = FederatedQuery::DescribeQueryResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::GetQueryStatusRequest> { + using type = FederatedQuery::GetQueryStatusResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::ModifyQueryRequest> { + using type = FederatedQuery::ModifyQueryResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::DeleteQueryRequest> { + using type = FederatedQuery::DeleteQueryResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::ControlQueryRequest> { + using type = FederatedQuery::ControlQueryResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::GetResultDataRequest> { + using type = FederatedQuery::GetResultDataResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::ListJobsRequest> { + using type = FederatedQuery::ListJobsResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::DescribeJobRequest> { + using type = FederatedQuery::DescribeJobResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::CreateConnectionRequest> { + using type = FederatedQuery::CreateConnectionResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::ListConnectionsRequest> { + using type = FederatedQuery::ListConnectionsResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::DescribeConnectionRequest> { + using type = FederatedQuery::DescribeConnectionResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::ModifyConnectionRequest> { + using type = FederatedQuery::ModifyConnectionResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::DeleteConnectionRequest> { + using type = FederatedQuery::DeleteConnectionResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::TestConnectionRequest> { + using type = FederatedQuery::TestConnectionResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::CreateBindingRequest> { + using type = FederatedQuery::CreateBindingResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::ListBindingsRequest> { + using type = FederatedQuery::ListBindingsResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::DescribeBindingRequest> { + using type = FederatedQuery::DescribeBindingResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::ModifyBindingRequest> { + using type = FederatedQuery::ModifyBindingResult; +}; +template<> +struct TResponseProtoMessage<FederatedQuery::DeleteBindingRequest> { + using type = FederatedQuery::DeleteBindingResult; +}; + +template<class Request> +struct ResponseSelector; + +} // namespace NEvControlPlaneProxy +} // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp b/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp index c0a70b40789..6c36450e35d 100644 --- a/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp @@ -2468,8 +2468,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyCheckNegativePermissionsSuccess) { auto event = request->Get<TEvControlPlaneStorage::TEvModifyConnectionRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE)); @@ -2487,8 +2487,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyCheckNegativePermissionsSuccess) { auto event = request->Get<TEvControlPlaneStorage::TEvModifyConnectionRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE)); @@ -2506,8 +2506,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyCheckNegativePermissionsSuccess) { auto event = request->Get<TEvControlPlaneStorage::TEvDeleteConnectionRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE)); @@ -2620,8 +2620,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyCheckNegativePermissionsSuccess) { auto event = request->Get<TEvControlPlaneStorage::TEvModifyBindingRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE)); @@ -2639,8 +2639,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyCheckNegativePermissionsSuccess) { auto event = request->Get<TEvControlPlaneStorage::TEvDeleteBindingRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE)); @@ -2931,8 +2931,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) { auto event = request->Get<TEvControlPlaneStorage::TEvModifyConnectionRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE)); @@ -2949,8 +2949,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) { auto event = request->Get<TEvControlPlaneStorage::TEvDeleteConnectionRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE)); @@ -3039,8 +3039,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) { auto event = request->Get<TEvControlPlaneStorage::TEvModifyBindingRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE)); @@ -3057,8 +3057,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) { auto event = request->Get<TEvControlPlaneStorage::TEvDeleteBindingRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE)); @@ -3339,7 +3339,7 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) { auto event = request->Get<TEvControlPlaneStorage::TEvModifyConnectionRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); @@ -3357,7 +3357,7 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) { auto event = request->Get<TEvControlPlaneStorage::TEvDeleteConnectionRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); @@ -3447,7 +3447,7 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) { auto event = request->Get<TEvControlPlaneStorage::TEvModifyBindingRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); @@ -3465,7 +3465,7 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) { auto event = request->Get<TEvControlPlaneStorage::TEvDeleteBindingRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); diff --git a/ydb/core/fq/libs/control_plane_proxy/ya.make b/ydb/core/fq/libs/control_plane_proxy/ya.make index ebcb92eadae..44f5c3c6348 100644 --- a/ydb/core/fq/libs/control_plane_proxy/ya.make +++ b/ydb/core/fq/libs/control_plane_proxy/ya.make @@ -14,6 +14,7 @@ PEERDIR( ydb/core/fq/libs/compute/ydb ydb/core/fq/libs/compute/ydb/control_plane ydb/core/fq/libs/control_plane_config + ydb/core/fq/libs/control_plane_proxy/actors ydb/core/fq/libs/control_plane_proxy/events ydb/core/fq/libs/control_plane_storage ydb/core/fq/libs/rate_limiter/events @@ -30,6 +31,7 @@ YQL_LAST_ABI_VERSION() END() RECURSE( + actors events ) diff --git a/ydb/core/grpc_services/rpc_fq.cpp b/ydb/core/grpc_services/rpc_fq.cpp index b68d76d2e0b..1e6c59ea339 100644 --- a/ydb/core/grpc_services/rpc_fq.cpp +++ b/ydb/core/grpc_services/rpc_fq.cpp @@ -545,7 +545,10 @@ std::unique_ptr<TEvProxyRuntimeEvent> CreateFederatedQueryModifyConnectionReques static const std::function permissions{ [](const FederatedQuery::ModifyConnectionRequest& request) -> TVector<NPerms::TPermission> { TVector<NPerms::TPermission> basePermissions{ NPerms::Required("yq.connections.update"), - NPerms::Optional("yq.resources.managePrivate") + NPerms::Required("yq.connections.get"), + NPerms::Optional("yq.resources.managePrivate"), + NPerms::Optional("yq.resources.viewPublic"), + NPerms::Optional("yq.resources.viewPrivate") }; if (request.content().acl().visibility() == FederatedQuery::Acl::SCOPE) { basePermissions.push_back(NPerms::Required("yq.resources.managePublic")); @@ -560,8 +563,11 @@ std::unique_ptr<TEvProxyRuntimeEvent> CreateFederatedQueryDeleteConnectionReques static const std::function permissions{ [](const FederatedQuery::DeleteConnectionRequest&) -> TVector<NPerms::TPermission> { return { NPerms::Required("yq.connections.delete"), + NPerms::Required("yq.connections.get"), NPerms::Optional("yq.resources.managePublic"), - NPerms::Optional("yq.resources.managePrivate") + NPerms::Optional("yq.resources.managePrivate"), + NPerms::Optional("yq.resources.viewPublic"), + NPerms::Optional("yq.resources.viewPrivate") }; } }; @@ -628,8 +634,12 @@ std::unique_ptr<TEvProxyRuntimeEvent> CreateFederatedQueryModifyBindingRequestOp // so yq.resources.managePublic is always requested as optional return { NPerms::Required("yq.bindings.update"), + NPerms::Required("yq.bindings.get"), + NPerms::Required("yq.connections.get"), NPerms::Optional("yq.resources.managePrivate"), - NPerms::Optional("yq.resources.managePublic") + NPerms::Optional("yq.resources.managePublic"), + NPerms::Optional("yq.resources.viewPublic"), + NPerms::Optional("yq.resources.viewPrivate") }; } }; @@ -640,8 +650,11 @@ std::unique_ptr<TEvProxyRuntimeEvent> CreateFederatedQueryDeleteBindingRequestOp static const std::function permissions{ [](const FederatedQuery::DeleteBindingRequest&) -> TVector<NPerms::TPermission> { return { NPerms::Required("yq.bindings.delete"), + NPerms::Required("yq.bindings.get"), NPerms::Optional("yq.resources.managePublic"), - NPerms::Optional("yq.resources.managePrivate") + NPerms::Optional("yq.resources.managePrivate"), + NPerms::Optional("yq.resources.viewPublic"), + NPerms::Optional("yq.resources.viewPrivate") }; } }; |