aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorauzhegov <auzhegov@yandex-team.com>2023-07-25 10:37:07 +0300
committerauzhegov <auzhegov@yandex-team.com>2023-07-25 10:37:07 +0300
commita8d09bf9e0b3e44c19dbfcc49e7f4d558513897b (patch)
treee74a45937ad4198be462b5f907e01e0d9f80d8cc
parent8bc959020d0214c816dea46289a86351455cc27b (diff)
downloadydb-a8d09bf9e0b3e44c19dbfcc49e7f4d558513897b.tar.gz
Delete/Modify Connection/Binding
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt29
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt30
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt30
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.txt17
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt29
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h105
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp331
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h58
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/counters.h297
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp256
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h29
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ya.make21
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h181
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp328
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h58
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp1874
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/events/events.h199
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/events/type_traits.h100
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp44
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/ya.make2
-rw-r--r--ydb/core/grpc_services/rpc_fq.cpp21
25 files changed, 2787 insertions, 1260 deletions
diff --git a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt
index 631673b30cc..959fb597690 100644
--- a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(actors)
add_subdirectory(events)
add_subdirectory(ut)
@@ -23,6 +24,7 @@ target_link_libraries(fq-libs-control_plane_proxy PUBLIC
libs-compute-ydb
compute-ydb-control_plane
fq-libs-control_plane_config
+ libs-control_plane_proxy-actors
libs-control_plane_proxy-events
fq-libs-control_plane_storage
libs-rate_limiter-events
diff --git a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt
index df189785d00..d24f05ca436 100644
--- a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(actors)
add_subdirectory(events)
add_subdirectory(ut)
@@ -24,6 +25,7 @@ target_link_libraries(fq-libs-control_plane_proxy PUBLIC
libs-compute-ydb
compute-ydb-control_plane
fq-libs-control_plane_config
+ libs-control_plane_proxy-actors
libs-control_plane_proxy-events
fq-libs-control_plane_storage
libs-rate_limiter-events
diff --git a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt
index df189785d00..d24f05ca436 100644
--- a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(actors)
add_subdirectory(events)
add_subdirectory(ut)
@@ -24,6 +25,7 @@ target_link_libraries(fq-libs-control_plane_proxy PUBLIC
libs-compute-ydb
compute-ydb-control_plane
fq-libs-control_plane_config
+ libs-control_plane_proxy-actors
libs-control_plane_proxy-events
fq-libs-control_plane_storage
libs-rate_limiter-events
diff --git a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt
index 631673b30cc..959fb597690 100644
--- a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(actors)
add_subdirectory(events)
add_subdirectory(ut)
@@ -23,6 +24,7 @@ target_link_libraries(fq-libs-control_plane_proxy PUBLIC
libs-compute-ydb
compute-ydb-control_plane
fq-libs-control_plane_config
+ libs-control_plane_proxy-actors
libs-control_plane_proxy-events
fq-libs-control_plane_storage
libs-rate_limiter-events
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..870e205dcc4
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,29 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(libs-control_plane_proxy-actors)
+target_compile_options(libs-control_plane_proxy-actors PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(libs-control_plane_proxy-actors PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-fmt
+ library-cpp-iterator
+ libs-control_plane_proxy-events
+ libs-control_plane_storage-events
+ fq-libs-result_formatter
+ core-kqp-provider
+ library-db_pool-protos
+)
+target_sources(libs-control_plane_proxy-actors PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
+)
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..5429166e90d
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,30 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(libs-control_plane_proxy-actors)
+target_compile_options(libs-control_plane_proxy-actors PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(libs-control_plane_proxy-actors PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-fmt
+ library-cpp-iterator
+ libs-control_plane_proxy-events
+ libs-control_plane_storage-events
+ fq-libs-result_formatter
+ core-kqp-provider
+ library-db_pool-protos
+)
+target_sources(libs-control_plane_proxy-actors PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
+)
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..5429166e90d
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,30 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(libs-control_plane_proxy-actors)
+target_compile_options(libs-control_plane_proxy-actors PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(libs-control_plane_proxy-actors PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-fmt
+ library-cpp-iterator
+ libs-control_plane_proxy-events
+ libs-control_plane_storage-events
+ fq-libs-result_formatter
+ core-kqp-provider
+ library-db_pool-protos
+)
+target_sources(libs-control_plane_proxy-actors PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
+)
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..870e205dcc4
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,29 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(libs-control_plane_proxy-actors)
+target_compile_options(libs-control_plane_proxy-actors PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(libs-control_plane_proxy-actors PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-fmt
+ library-cpp-iterator
+ libs-control_plane_proxy-events
+ libs-control_plane_storage-events
+ fq-libs-result_formatter
+ core-kqp-provider
+ library-db_pool-protos
+)
+target_sources(libs-control_plane_proxy-actors PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
+)
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h
new file mode 100644
index 00000000000..aeb6ec9409a
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h
@@ -0,0 +1,105 @@
+#pragma once
+
+#include "counters.h"
+
+#include <contrib/libs/fmt/include/fmt/format.h>
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+#include <ydb/core/fq/libs/actors/logging/log.h>
+#include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
+#include <ydb/core/fq/libs/config/yq_issue.h>
+#include <ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h>
+#include <ydb/library/yql/public/issue/yql_issue.h>
+
+namespace NFq {
+namespace NPrivate {
+
+using namespace NActors;
+using namespace NThreading;
+using namespace NYdb;
+
+template<typename T>
+struct TBaseActorTypeTag {
+ using TRequest = typename T::TRequest;
+ using TResponse = typename T::TResponse;
+};
+
+template<typename TDerived>
+class TBaseActor : public NActors::TActorBootstrapped<TDerived> {
+public:
+ using TBase = NActors::TActorBootstrapped<TDerived>;
+ using TBase::Become;
+ using TBase::PassAway;
+ using TBase::SelfId;
+ using TBase::Send;
+
+ using TEventRequestPtr = typename TBaseActorTypeTag<TDerived>::TRequest::TPtr;
+ using TEventResponse = typename TBaseActorTypeTag<TDerived>::TResponse;
+
+public:
+ TBaseActor(const TActorId& sender,
+ const TEventRequestPtr request,
+ TDuration requestTimeout,
+ const NPrivate::TRequestCommonCountersPtr& counters)
+ : Request(std::move(request))
+ , Counters(counters)
+ , Sender(sender)
+ , RequestTimeout(requestTimeout) { }
+
+ void Bootstrap() {
+ CPP_LOG_T("TBaseActor Bootstrap started. Actor id: " << SelfId());
+ Become(&TDerived::StateFunc, RequestTimeout, new NActors::TEvents::TEvWakeup());
+ Counters->InFly->Inc();
+ BootstrapImpl();
+ }
+
+ void SendErrorMessageToSender(const NYql::TIssue& issue) {
+ Counters->Error->Inc();
+ NYql::TIssues issues;
+ issues.AddIssue(issue);
+ Send(Sender, new TEventResponse(issues, {}), 0, Request->Cookie);
+ PassAway();
+ }
+
+ void SendRequestToSender() {
+ Counters->Ok->Inc();
+ Send(Request->Forward(ControlPlaneProxyActorId()));
+ PassAway();
+ }
+
+ void HandleTimeout() {
+ CPP_LOG_D("TBaseActor Timeout occurred. Actor id: "
+ << SelfId());
+ Counters->Timeout->Inc();
+ SendErrorMessageToSender(MakeErrorIssue(
+ TIssuesIds::TIMEOUT,
+ "Timeout occurred. Try repeating the request later"));
+ }
+
+ void HandleError(const TString& message, EStatus status, const NYql::TIssues& issues) {
+ TString errorMessage = TStringBuilder{} << message << ". Status " << status;
+ HandleError(errorMessage, issues);
+ }
+
+ void HandleError(const TString& message, const NYql::TIssues& issues) {
+ CPP_LOG_E(message);
+ NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, message);
+ for (auto& subIssue : issues) {
+ issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
+ }
+ SendErrorMessageToSender(std::move(issue));
+ }
+
+private:
+ virtual void BootstrapImpl() = 0;
+
+protected:
+ const TEventRequestPtr Request;
+ const NPrivate::TRequestCommonCountersPtr Counters;
+ const TActorId Sender;
+ const TDuration RequestTimeout;
+};
+
+} // namespace NPrivate
+} // namespace NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
new file mode 100644
index 00000000000..168e4f6fa2c
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
@@ -0,0 +1,331 @@
+#include "control_plane_storage_requester_actor.h"
+#include "base_actor.h"
+
+#include <contrib/libs/fmt/include/fmt/format.h>
+#include <library/cpp/actors/core/event.h>
+
+#include <ydb/core/fq/libs/control_plane_proxy/events/events.h>
+#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h>
+#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
+#include <ydb/library/yql/public/issue/yql_issue.h>
+#include <ydb/public/api/protos/draft/fq.pb.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+namespace NFq {
+namespace NPrivate {
+
+using namespace NActors;
+using namespace NFq::NConfig;
+using namespace NKikimr;
+using namespace NThreading;
+
+template<class TEventRequest, class TEventResponse, class TCPSEventRequest, class TCPSEventResponse>
+class TControlPlaneStorageRequesterActor;
+
+template<class TEventRequest, class TEventResponse, class TCPSEventRequest, class TCPSEventResponse>
+struct TBaseActorTypeTag<
+ TControlPlaneStorageRequesterActor<TEventRequest, TEventResponse, TCPSEventRequest, TCPSEventResponse>> {
+ using TRequest = TEventRequest;
+ using TResponse = TEventResponse;
+};
+
+template<class TEventRequest, class TEventResponse, class TCPSEventRequest, class TCPSEventResponse>
+class TControlPlaneStorageRequesterActor :
+ public TBaseActor<
+ TControlPlaneStorageRequesterActor<TEventRequest, TEventResponse, TCPSEventRequest, TCPSEventResponse>> {
+private:
+ using TBase = TBaseActor<
+ TControlPlaneStorageRequesterActor<TEventRequest, TEventResponse, TCPSEventRequest, TCPSEventResponse>>;
+ using TBase::SelfId;
+ using TBase::Send;
+ using TBase::Request;
+
+ using TEventRequestPtr = typename TEventRequest::TPtr;
+
+public:
+ using TCPSRequestFactory =
+ std::function<typename TCPSEventRequest::TProto(const TEventRequestPtr& request)>;
+ using TErrorMessageFactoryMethod = std::function<TString(const NYql::TIssues& issues)>;
+ using TEntityNameExtractorFactoryMethod =
+ std::function<void(const TEventRequestPtr& request,
+ const typename TCPSEventResponse::TProto& response)>;
+
+ TControlPlaneStorageRequesterActor(const TActorId& sender,
+ const TEventRequestPtr request,
+ TDuration requestTimeout,
+ const NPrivate::TRequestCommonCountersPtr& counters,
+ TPermissions permissions,
+ TCPSRequestFactory cpsRequestFactory,
+ TErrorMessageFactoryMethod errorMessageFactoryMethod,
+ TEntityNameExtractorFactoryMethod entityNameExtractorFactoryMethod)
+ : TBaseActor<
+ TControlPlaneStorageRequesterActor<TEventRequest, TEventResponse, TCPSEventRequest, TCPSEventResponse>>(
+ sender, std::move(request), requestTimeout, counters)
+ , Permissions(permissions)
+ , CPSRequestFactory(cpsRequestFactory)
+ , ErrorMessageFactoryMethod(errorMessageFactoryMethod)
+ , EntityNameExtractorFactoryMethod(entityNameExtractorFactoryMethod) { }
+
+ static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_REQUEST_CONTROL_PLANE_STORAGE";
+
+ void BootstrapImpl() override {
+ SendCPSRequest();
+ }
+
+ void SendCPSRequest() {
+ CPP_LOG_I("TControlPlaneStorageRequesterActor Sending CPS request. Actor id: " << TBase::SelfId());
+ const auto& request = Request;
+ auto event = new TCPSEventRequest("yandexcloud://" + request->Get()->FolderId,
+ CPSRequestFactory(request),
+ request->Get()->User,
+ request->Get()->Token,
+ request->Get()->CloudId,
+ Permissions,
+ request->Get()->Quotas,
+ request->Get()->TenantInfo,
+ {});
+ Send(ControlPlaneStorageServiceActorId(), event);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout);
+ hFunc(TCPSEventResponse, Handle);
+ )
+
+ void Handle(typename TCPSEventResponse::TPtr& event) {
+ CPP_LOG_I("TControlPlaneStorageRequesterActor Handling CPS response. Actor id: " << TBase::SelfId());
+ auto issues = event->Get()->Issues;
+ if (!issues.Empty()) {
+ CPP_LOG_I("TControlPlaneStorageRequesterActor Handling CPS response. Request finished with issues. Actor id: " << TBase::SelfId());
+ TString errorMessage = ErrorMessageFactoryMethod(issues);
+ TBase::HandleError(errorMessage, issues);
+ return;
+ }
+
+ CPP_LOG_I("TControlPlaneStorageRequesterActor Handling CPS response. Request finished successfully. Actor id: " << TBase::SelfId());
+ EntityNameExtractorFactoryMethod(Request, event->Get()->Result);
+ TBase::SendRequestToSender();
+ }
+
+private:
+ TPermissions Permissions;
+ TCPSRequestFactory CPSRequestFactory;
+ TErrorMessageFactoryMethod ErrorMessageFactoryMethod;
+ TEntityNameExtractorFactoryMethod EntityNameExtractorFactoryMethod;
+};
+
+/// Discover connection_name
+TString DescribeConnectionErrorMessageFactoryMethod(const NYql::TIssues& issues) {
+ Y_UNUSED(issues);
+ return "Couldn't resolve connection";
+};
+NActors::IActor* MakeDiscoverYDBConnectionName(
+ const TActorId& sender,
+ const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions) {
+ auto cpsRequestFactory =
+ [](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& event) {
+ FederatedQuery::DescribeConnectionRequest result;
+ auto connectionId = event->Get()->Request.content().connection_id();
+ result.set_connection_id(connectionId);
+ return result;
+ };
+ auto entityNameExtractorFactoryMethod =
+ [](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& event,
+ const FederatedQuery::DescribeConnectionResult& result) {
+ event->Get()->ConnectionName = result.connection().content().name();
+ };
+
+ return new TControlPlaneStorageRequesterActor<TEvControlPlaneProxy::TEvCreateBindingRequest,
+ TEvControlPlaneProxy::TEvCreateBindingResponse,
+ TEvControlPlaneStorage::TEvDescribeConnectionRequest,
+ TEvControlPlaneStorage::TEvDescribeConnectionResponse>(
+ sender,
+ request,
+ requestTimeout,
+ counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY),
+ permissions,
+ cpsRequestFactory,
+ DescribeConnectionErrorMessageFactoryMethod,
+ entityNameExtractorFactoryMethod);
+}
+
+NActors::IActor* MakeDiscoverYDBConnectionName(
+ const TActorId& sender,
+ const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions) {
+ auto cpsRequestFactory =
+ [](const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& event) {
+ FederatedQuery::DescribeConnectionRequest result;
+ auto connectionId = event->Get()->Request.connection_id();
+ result.set_connection_id(connectionId);
+ return result;
+ };
+ auto entityNameExtractorFactoryMethod =
+ [](const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& event,
+ const FederatedQuery::DescribeConnectionResult& result) {
+ event->Get()->OldConnectionContent = result.connection().content();
+ };
+
+ return new TControlPlaneStorageRequesterActor<TEvControlPlaneProxy::TEvModifyConnectionRequest,
+ TEvControlPlaneProxy::TEvModifyConnectionResponse,
+ TEvControlPlaneStorage::TEvDescribeConnectionRequest,
+ TEvControlPlaneStorage::TEvDescribeConnectionResponse>(
+ sender,
+ request,
+ requestTimeout,
+ counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY),
+ permissions,
+ cpsRequestFactory,
+ DescribeConnectionErrorMessageFactoryMethod,
+ entityNameExtractorFactoryMethod);
+}
+
+NActors::IActor* MakeDiscoverYDBConnectionName(
+ const TActorId& sender,
+ const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions) {
+ auto cpsRequestFactory =
+ [](const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& event) {
+ FederatedQuery::DescribeConnectionRequest result;
+ auto connectionId = event->Get()->Request.connection_id();
+ result.set_connection_id(connectionId);
+ return result;
+ };
+ auto entityNameExtractorFactoryMethod =
+ [](const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& event,
+ const FederatedQuery::DescribeConnectionResult& result) {
+ event->Get()->ConnectionContent = result.connection().content();
+ };
+
+ return new TControlPlaneStorageRequesterActor<TEvControlPlaneProxy::TEvDeleteConnectionRequest,
+ TEvControlPlaneProxy::TEvDeleteConnectionResponse,
+ TEvControlPlaneStorage::TEvDescribeConnectionRequest,
+ TEvControlPlaneStorage::TEvDescribeConnectionResponse>(
+ sender,
+ request,
+ requestTimeout,
+ counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY),
+ permissions,
+ cpsRequestFactory,
+ DescribeConnectionErrorMessageFactoryMethod,
+ entityNameExtractorFactoryMethod);
+}
+
+NActors::IActor* MakeDiscoverYDBConnectionName(
+ const TActorId& sender,
+ const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions) {
+ auto cpsRequestFactory =
+ [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& event) {
+ FederatedQuery::DescribeConnectionRequest result;
+ auto connectionId = *event->Get()->ConnectionId;
+ result.set_connection_id(connectionId);
+ return result;
+ };
+ auto entityNameExtractorFactoryMethod =
+ [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& event,
+ const FederatedQuery::DescribeConnectionResult& result) {
+ event->Get()->ConnectionName = result.connection().content().name();
+ };
+
+ return new TControlPlaneStorageRequesterActor<TEvControlPlaneProxy::TEvModifyBindingRequest,
+ TEvControlPlaneProxy::TEvModifyBindingResponse,
+ TEvControlPlaneStorage::TEvDescribeConnectionRequest,
+ TEvControlPlaneStorage::TEvDescribeConnectionResponse>(
+ sender,
+ request,
+ requestTimeout,
+ counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY),
+ permissions,
+ cpsRequestFactory,
+ DescribeConnectionErrorMessageFactoryMethod,
+ entityNameExtractorFactoryMethod);
+}
+
+/// Discover binding_name
+
+NActors::IActor* MakeDiscoverYDBBindingName(
+ const TActorId& sender,
+ const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions) {
+ auto cpsRequestFactory =
+ [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& event) {
+ FederatedQuery::DescribeBindingRequest result;
+ result.set_binding_id(event->Get()->Request.binding_id());
+ return result;
+ };
+
+ auto errorMessageFactoryMethod = [](const NYql::TIssues& issues) -> TString {
+ Y_UNUSED(issues);
+ return "Couldn't resolve binding name";
+ };
+ auto entityNameExtractorFactoryMethod =
+ [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& event,
+ const FederatedQuery::DescribeBindingResult& result) {
+ event->Get()->OldBindingName = result.binding().content().name();
+ event->Get()->ConnectionId = result.binding().content().connection_id();
+ };
+
+ return new TControlPlaneStorageRequesterActor<TEvControlPlaneProxy::TEvModifyBindingRequest,
+ TEvControlPlaneProxy::TEvModifyBindingResponse,
+ TEvControlPlaneStorage::TEvDescribeBindingRequest,
+ TEvControlPlaneStorage::TEvDescribeBindingResponse>(
+ sender,
+ request,
+ requestTimeout,
+ counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY),
+ permissions,
+ cpsRequestFactory,
+ errorMessageFactoryMethod,
+ entityNameExtractorFactoryMethod);
+}
+
+NActors::IActor* MakeDiscoverYDBBindingName(
+ const TActorId& sender,
+ const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions) {
+ auto cpsRequestFactory =
+ [](const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& event) {
+ FederatedQuery::DescribeBindingRequest result;
+ result.set_binding_id(event->Get()->Request.binding_id());
+ return result;
+ };
+
+ auto errorMessageFactoryMethod = [](const NYql::TIssues& issues) -> TString {
+ Y_UNUSED(issues);
+ return "Couldn't resolve binding name";
+ };
+ auto entityNameExtractorFactoryMethod =
+ [](const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& event,
+ const FederatedQuery::DescribeBindingResult& result) {
+ event->Get()->OldBindingName = result.binding().content().name();
+ };
+
+ return new TControlPlaneStorageRequesterActor<TEvControlPlaneProxy::TEvDeleteBindingRequest,
+ TEvControlPlaneProxy::TEvDeleteBindingResponse,
+ TEvControlPlaneStorage::TEvDescribeBindingRequest,
+ TEvControlPlaneStorage::TEvDescribeBindingResponse>(
+ sender,
+ request,
+ requestTimeout,
+ counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY),
+ permissions,
+ cpsRequestFactory,
+ errorMessageFactoryMethod,
+ entityNameExtractorFactoryMethod);
+}
+} // namespace NPrivate
+} // namespace NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h
new file mode 100644
index 00000000000..24995807ea2
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h
@@ -0,0 +1,58 @@
+#pragma once
+
+#include "counters.h"
+
+#include <library/cpp/actors/core/actor.h>
+#include <ydb/core/fq/libs/control_plane_proxy/events/events.h>
+
+namespace NFq {
+namespace NPrivate {
+
+using namespace NActors;
+
+/// Discover connection_name
+NActors::IActor* MakeDiscoverYDBConnectionName(
+ const TActorId& sender,
+ const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions);
+
+NActors::IActor* MakeDiscoverYDBConnectionName(
+ const TActorId& sender,
+ const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions);
+
+NActors::IActor* MakeDiscoverYDBConnectionName(
+ const TActorId& sender,
+ const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions);
+
+NActors::IActor* MakeDiscoverYDBConnectionName(
+ const TActorId& sender,
+ const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions);
+
+/// Discover binding_name
+
+NActors::IActor* MakeDiscoverYDBBindingName(
+ const TActorId& sender,
+ const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions);
+
+NActors::IActor* MakeDiscoverYDBBindingName(
+ const TActorId& sender,
+ const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions);
+} // namespace NPrivate
+} // namespace NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/counters.h b/ydb/core/fq/libs/control_plane_proxy/actors/counters.h
new file mode 100644
index 00000000000..8ab57302955
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/counters.h
@@ -0,0 +1,297 @@
+#pragma once
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <ydb/core/fq/libs/common/util.h>
+#include <ydb/core/fq/libs/common/cache.h>
+
+namespace NFq {
+namespace NPrivate {
+
+struct TRequestScopeCounters : public virtual TThrRefBase {
+ const TString Name;
+
+ ::NMonitoring::TDynamicCounterPtr Counters;
+ ::NMonitoring::TDynamicCounters::TCounterPtr InFly;
+ ::NMonitoring::TDynamicCounters::TCounterPtr Ok;
+ ::NMonitoring::TDynamicCounters::TCounterPtr Error;
+ ::NMonitoring::TDynamicCounters::TCounterPtr Timeout;
+ ::NMonitoring::TDynamicCounters::TCounterPtr Retry;
+
+ explicit TRequestScopeCounters(const TString& name)
+ : Name(name) { }
+
+ void Register(const ::NMonitoring::TDynamicCounterPtr& counters) {
+ Counters = counters;
+ ::NMonitoring::TDynamicCounterPtr subgroup =
+ counters->GetSubgroup("request_scope", Name);
+ InFly = subgroup->GetCounter("InFly", false);
+ Ok = subgroup->GetCounter("Ok", true);
+ Error = subgroup->GetCounter("Error", true);
+ Timeout = subgroup->GetCounter("Timeout", true);
+ Timeout = subgroup->GetCounter("Retry", true);
+ }
+
+ virtual ~TRequestScopeCounters() override {
+ Counters->RemoveSubgroup("request_scope", Name);
+ }
+
+private:
+ static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() {
+ return ::NMonitoring::ExplicitHistogram(
+ {0, 1, 2, 5, 10, 20, 50, 100, 500, 1000, 2000, 5000, 10000, 30000, 50000, 500000});
+ }
+};
+
+struct TRequestCommonCounters : public virtual TThrRefBase {
+ const TString Name;
+
+ ::NMonitoring::TDynamicCounterPtr Counters;
+ ::NMonitoring::TDynamicCounters::TCounterPtr InFly;
+ ::NMonitoring::TDynamicCounters::TCounterPtr Ok;
+ ::NMonitoring::TDynamicCounters::TCounterPtr Error;
+ ::NMonitoring::TDynamicCounters::TCounterPtr Timeout;
+ ::NMonitoring::TDynamicCounters::TCounterPtr Retry;
+ ::NMonitoring::THistogramPtr LatencyMs;
+
+ explicit TRequestCommonCounters(const TString& name)
+ : Name(name) { }
+
+ void Register(const ::NMonitoring::TDynamicCounterPtr& counters) {
+ Counters = counters;
+ ::NMonitoring::TDynamicCounterPtr subgroup =
+ counters->GetSubgroup("request_common", Name);
+ InFly = subgroup->GetCounter("InFly", false);
+ Ok = subgroup->GetCounter("Ok", true);
+ Error = subgroup->GetCounter("Error", true);
+ Timeout = subgroup->GetCounter("Timeout", true);
+ Retry = subgroup->GetCounter("Retry", true);
+ LatencyMs = subgroup->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
+ }
+
+ virtual ~TRequestCommonCounters() override {
+ Counters->RemoveSubgroup("request_common", Name);
+ }
+
+private:
+ static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() {
+ return ::NMonitoring::ExplicitHistogram(
+ {0, 1, 2, 5, 10, 20, 50, 100, 500, 1000, 2000, 5000, 10000, 30000, 50000, 500000});
+ }
+};
+
+using TRequestScopeCountersPtr = TIntrusivePtr<TRequestScopeCounters>;
+using TRequestCommonCountersPtr = TIntrusivePtr<TRequestCommonCounters>;
+
+struct TRequestCounters {
+ TRequestScopeCountersPtr Scope;
+ TRequestCommonCountersPtr Common;
+
+ void IncInFly() {
+ Scope->InFly->Inc();
+ Common->InFly->Inc();
+ }
+
+ void DecInFly() {
+ Scope->InFly->Dec();
+ Common->InFly->Dec();
+ }
+
+ void IncOk() {
+ Scope->Ok->Inc();
+ Common->Ok->Inc();
+ }
+
+ void IncError() {
+ Scope->Error->Inc();
+ Common->Error->Inc();
+ }
+
+ void IncTimeout() {
+ Scope->Timeout->Inc();
+ Common->Timeout->Inc();
+ }
+};
+
+enum ERequestTypeScope {
+ RTS_CREATE_QUERY,
+ RTS_LIST_QUERIES,
+ RTS_DESCRIBE_QUERY,
+ RTS_GET_QUERY_STATUS,
+ RTS_MODIFY_QUERY,
+ RTS_DELETE_QUERY,
+ RTS_CONTROL_QUERY,
+ RTS_GET_RESULT_DATA,
+ RTS_LIST_JOBS,
+ RTS_DESCRIBE_JOB,
+ RTS_CREATE_CONNECTION,
+ RTS_LIST_CONNECTIONS,
+ RTS_DESCRIBE_CONNECTION,
+ RTS_MODIFY_CONNECTION,
+ RTS_DELETE_CONNECTION,
+ RTS_TEST_CONNECTION,
+ RTS_CREATE_BINDING,
+ RTS_LIST_BINDINGS,
+ RTS_DESCRIBE_BINDING,
+ RTS_MODIFY_BINDING,
+ RTS_DELETE_BINDING,
+ RTS_MAX,
+};
+
+enum ERequestTypeCommon {
+ RTC_RESOLVE_FOLDER,
+ RTC_CREATE_QUERY,
+ RTC_LIST_QUERIES,
+ RTC_DESCRIBE_QUERY,
+ RTC_GET_QUERY_STATUS,
+ RTC_MODIFY_QUERY,
+ RTC_DELETE_QUERY,
+ RTC_CONTROL_QUERY,
+ RTC_GET_RESULT_DATA,
+ RTC_LIST_JOBS,
+ RTC_DESCRIBE_JOB,
+ RTC_CREATE_CONNECTION,
+ RTC_LIST_CONNECTIONS,
+ RTC_DESCRIBE_CONNECTION,
+ RTC_MODIFY_CONNECTION,
+ RTC_DELETE_CONNECTION,
+ RTC_TEST_CONNECTION,
+ RTC_CREATE_BINDING,
+ RTC_LIST_BINDINGS,
+ RTC_DESCRIBE_BINDING,
+ RTC_MODIFY_BINDING,
+ RTC_DELETE_BINDING,
+ RTC_RESOLVE_SUBJECT_TYPE,
+ RTC_DESCRIBE_CPS_ENTITY,
+ RTC_CREATE_YDB_SESSION,
+ RTC_CREATE_CONNECTION_IN_YDB,
+ RTC_CREATE_BINDING_IN_YDB,
+ RTC_MODIFY_CONNECTION_IN_YDB,
+ RTC_MODIFY_BINDING_IN_YDB,
+ RTC_DELETE_CONNECTION_IN_YDB,
+ RTC_DELETE_BINDING_IN_YDB,
+ RTC_CREATE_COMPUTE_DATABASE,
+ RTC_MAX,
+};
+
+class TCounters : public virtual TThrRefBase {
+ struct TMetricsScope {
+ TString CloudId;
+ TString Scope;
+
+ TMetricsScope() = default;
+
+ TMetricsScope(const TString& cloudId, const TString& scope)
+ : CloudId(cloudId)
+ , Scope(scope)
+ { }
+
+ bool operator<(const TMetricsScope& right) const {
+ return std::tie(CloudId, Scope) < std::tie(right.CloudId, right.Scope);
+ }
+ };
+
+ using TScopeCounters = std::array<TRequestScopeCountersPtr, RTS_MAX>;
+ using TScopeCountersPtr = std::shared_ptr<TScopeCounters>;
+
+ std::array<TRequestCommonCountersPtr, RTC_MAX> CommonRequests = CreateArray<RTC_MAX, TRequestCommonCountersPtr>({
+ {MakeIntrusive<TRequestCommonCounters>("ResolveFolder")},
+ {MakeIntrusive<TRequestCommonCounters>("CreateQuery")},
+ {MakeIntrusive<TRequestCommonCounters>("ListQueries")},
+ {MakeIntrusive<TRequestCommonCounters>("DescribeQuery")},
+ {MakeIntrusive<TRequestCommonCounters>("GetQueryStatus")},
+ {MakeIntrusive<TRequestCommonCounters>("ModifyQuery")},
+ {MakeIntrusive<TRequestCommonCounters>("DeleteQuery")},
+ {MakeIntrusive<TRequestCommonCounters>("ControlQuery")},
+ {MakeIntrusive<TRequestCommonCounters>("GetResultData")},
+ {MakeIntrusive<TRequestCommonCounters>("ListJobs")},
+ {MakeIntrusive<TRequestCommonCounters>("DescribeJob")},
+ {MakeIntrusive<TRequestCommonCounters>("CreateConnection")},
+ {MakeIntrusive<TRequestCommonCounters>("ListConnections")},
+ {MakeIntrusive<TRequestCommonCounters>("DescribeConnection")},
+ {MakeIntrusive<TRequestCommonCounters>("ModifyConnection")},
+ {MakeIntrusive<TRequestCommonCounters>("DeleteConnection")},
+ {MakeIntrusive<TRequestCommonCounters>("TestConnection")},
+ {MakeIntrusive<TRequestCommonCounters>("CreateBinding")},
+ {MakeIntrusive<TRequestCommonCounters>("ListBindings")},
+ {MakeIntrusive<TRequestCommonCounters>("DescribeBinding")},
+ {MakeIntrusive<TRequestCommonCounters>("ModifyBinding")},
+ {MakeIntrusive<TRequestCommonCounters>("DeleteBinding")},
+ {MakeIntrusive<TRequestCommonCounters>("ResolveSubjectType")},
+ {MakeIntrusive<TRequestCommonCounters>("DescribeCPSEntity")},
+ {MakeIntrusive<TRequestCommonCounters>("CreateYDBSession")},
+ {MakeIntrusive<TRequestCommonCounters>("CreateConnectionInYDB")},
+ {MakeIntrusive<TRequestCommonCounters>("CreateBindingInYDB")},
+ {MakeIntrusive<TRequestCommonCounters>("ModifyConnectionInYDB")},
+ {MakeIntrusive<TRequestCommonCounters>("ModifyBindingInYDB")},
+ {MakeIntrusive<TRequestCommonCounters>("DeleteConnectionInYDB")},
+ {MakeIntrusive<TRequestCommonCounters>("DeleteBindingInYDB")},
+ {MakeIntrusive<TRequestCommonCounters>("CreateComputeDatabase")},
+ });
+
+ TTtlCache<TMetricsScope, TScopeCountersPtr, TMap> ScopeCounters{TTtlCacheSettings{}.SetTtl(TDuration::Days(1))};
+ ::NMonitoring::TDynamicCounterPtr Counters;
+
+public:
+ explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters)
+ : Counters(counters)
+ {
+ for (auto& request : CommonRequests) {
+ request->Register(Counters);
+ }
+ }
+
+ TRequestCounters GetCounters(const TString& cloudId, const TString& scope, ERequestTypeScope scopeType, ERequestTypeCommon commonType) {
+ return {GetScopeCounters(cloudId, scope, scopeType), GetCommonCounters(commonType)};
+ }
+
+ TRequestCommonCountersPtr GetCommonCounters(ERequestTypeCommon type) {
+ return CommonRequests[type];
+ }
+
+ TRequestScopeCountersPtr GetScopeCounters(const TString& cloudId, const TString& scope, ERequestTypeScope type) {
+ TMetricsScope key{cloudId, scope};
+ TMaybe<TScopeCountersPtr> cacheVal;
+ ScopeCounters.Get(key, &cacheVal);
+ if (cacheVal) {
+ return (**cacheVal)[type];
+ }
+
+ auto scopeRequests = std::make_shared<TScopeCounters>(CreateArray<RTS_MAX, TRequestScopeCountersPtr>({
+ {MakeIntrusive<TRequestScopeCounters>("CreateQuery")},
+ {MakeIntrusive<TRequestScopeCounters>("ListQueries")},
+ {MakeIntrusive<TRequestScopeCounters>("DescribeQuery")},
+ {MakeIntrusive<TRequestScopeCounters>("GetQueryStatus")},
+ {MakeIntrusive<TRequestScopeCounters>("ModifyQuery")},
+ {MakeIntrusive<TRequestScopeCounters>("DeleteQuery")},
+ {MakeIntrusive<TRequestScopeCounters>("ControlQuery")},
+ {MakeIntrusive<TRequestScopeCounters>("GetResultData")},
+ {MakeIntrusive<TRequestScopeCounters>("ListJobs")},
+ {MakeIntrusive<TRequestScopeCounters>("DescribeJob")},
+ {MakeIntrusive<TRequestScopeCounters>("CreateConnection")},
+ {MakeIntrusive<TRequestScopeCounters>("ListConnections")},
+ {MakeIntrusive<TRequestScopeCounters>("DescribeConnection")},
+ {MakeIntrusive<TRequestScopeCounters>("ModifyConnection")},
+ {MakeIntrusive<TRequestScopeCounters>("DeleteConnection")},
+ {MakeIntrusive<TRequestScopeCounters>("TestConnection")},
+ {MakeIntrusive<TRequestScopeCounters>("CreateBinding")},
+ {MakeIntrusive<TRequestScopeCounters>("ListBindings")},
+ {MakeIntrusive<TRequestScopeCounters>("DescribeBinding")},
+ {MakeIntrusive<TRequestScopeCounters>("ModifyBinding")},
+ {MakeIntrusive<TRequestScopeCounters>("DeleteBinding")},
+ }));
+
+ auto scopeCounters = Counters
+ ->GetSubgroup("cloud_id", cloudId)
+ ->GetSubgroup("scope", scope);
+
+ for (auto& request : *scopeRequests) {
+ request->Register(scopeCounters);
+ }
+ cacheVal = scopeRequests;
+ ScopeCounters.Put(key, cacheVal);
+ return (*scopeRequests)[type];
+ }
+};
+
+} // namespace NPrivate
+} // namespace NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
new file mode 100644
index 00000000000..9859dac2f31
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
@@ -0,0 +1,256 @@
+#include "query_utils.h"
+#include "ydb/public/api/protos/draft/fq.pb.h"
+
+#include <contrib/libs/fmt/include/fmt/format.h>
+#include <library/cpp/iterator/mapped.h>
+#include <util/generic/maybe.h>
+#include <util/string/join.h>
+
+#include <ydb/core/fq/libs/result_formatter/result_formatter.h>
+#include <ydb/core/kqp/provider/yql_kikimr_results.h>
+
+namespace NFq {
+namespace NPrivate {
+
+TString EscapeString(const TString& value,
+ const TString& enclosingSeq,
+ const TString& replaceWith) {
+ auto escapedValue = value;
+ SubstGlobal(escapedValue, enclosingSeq, replaceWith);
+ return escapedValue;
+}
+TString EscapeString(const TString& value, char enclosingChar) {
+ auto escapedValue = value;
+ SubstGlobal(escapedValue,
+ TString{enclosingChar},
+ TStringBuilder{} << '\\' << enclosingChar);
+ return escapedValue;
+}
+
+TString EncloseAndEscapeString(const TString& value, char enclosingChar) {
+ return TStringBuilder{} << enclosingChar
+ << EscapeString(value,
+ enclosingChar)
+ << enclosingChar;
+}
+
+TString EncloseAndEscapeString(const TString& value,
+ const TString& enclosingSeq,
+ const TString& replaceWith) {
+ return TStringBuilder{} << enclosingSeq
+ << EscapeString(value, enclosingSeq, replaceWith)
+ << enclosingSeq;
+}
+
+TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& content,
+ const TString& connectionName) {
+ using namespace fmt::literals;
+
+ auto bindingName = content.name();
+ auto objectStorageParams = content.setting().object_storage();
+ const auto& subset = objectStorageParams.subset(0);
+
+ // Schema
+ NYql::TExprContext context;
+ auto columnsTransformFunction = [&](const Ydb::Column& column) -> TString {
+ NYdb::TTypeParser typeParser(column.type());
+ auto node = MakeType(typeParser, context);
+ auto typeName = NYql::FormatType(node);
+ const TString notNull =
+ (node->GetKind() == NYql::ETypeAnnotationKind::Optional) ? "" : "NOT NULL";
+ return fmt::format(" {columnName} {columnType} {notNull}",
+ "columnName"_a = EncloseAndEscapeString(column.name(), '`'),
+ "columnType"_a = typeName,
+ "notNull"_a = notNull);
+ };
+ auto columnsBegin =
+ MakeMappedIterator(subset.schema().column().begin(), columnsTransformFunction);
+ auto columnsEnd =
+ MakeMappedIterator(subset.schema().column().end(), columnsTransformFunction);
+
+ // WithOptions
+ auto withOptions = std::unordered_map<TString, TString>{};
+ withOptions.insert({"DATA_SOURCE", TStringBuilder{} << '"' << connectionName << '"'});
+ withOptions.insert({"LOCATION", EncloseAndEscapeString(subset.path_pattern(), '"')});
+ if (!subset.format().Empty()) {
+ withOptions.insert({"FORMAT", EncloseAndEscapeString(subset.format(), '"')});
+ }
+ if (!subset.compression().Empty()) {
+ withOptions.insert(
+ {"COMPRESSION", EncloseAndEscapeString(subset.compression(), '"')});
+ }
+ for (auto& kv : subset.format_setting()) {
+ withOptions.insert({EncloseAndEscapeString(kv.first, '`'),
+ EncloseAndEscapeString(kv.second, '"')});
+ }
+
+ if (!subset.partitioned_by().empty()) {
+ auto stringEscapeMapper = [](const TString& value) {
+ return EscapeString(value, '"');
+ };
+
+ auto partitionBy = TStringBuilder{}
+ << "\"["
+ << JoinRange(", ",
+ MakeMappedIterator(subset.partitioned_by().begin(),
+ stringEscapeMapper),
+ MakeMappedIterator(subset.partitioned_by().end(),
+ stringEscapeMapper))
+ << "]\"";
+ withOptions.insert({"PARTITIONED_BY", partitionBy});
+ }
+
+ for (auto& kv : subset.projection()) {
+ withOptions.insert({EncloseAndEscapeString(kv.first, '`'),
+ EncloseAndEscapeString(kv.second, '"')});
+ }
+
+ auto concatEscapedKeyValueMapper = [](const std::pair<TString, TString>& kv) -> TString {
+ return TStringBuilder{} << " " << kv.first << " = " << kv.second;
+ };
+
+ auto withOptionsBegin =
+ MakeMappedIterator(withOptions.begin(), concatEscapedKeyValueMapper);
+ auto withOptionsEnd =
+ MakeMappedIterator(withOptions.end(), concatEscapedKeyValueMapper);
+
+ return fmt::format(
+ R"(
+ CREATE EXTERNAL TABLE {externalTableName} (
+ {columns}
+ ) WITH (
+ {withOptions}
+ );)",
+ "externalTableName"_a = EncloseAndEscapeString(bindingName, '`'),
+ "columns"_a = JoinRange(",\n", columnsBegin, columnsEnd),
+ "withOptions"_a = JoinRange(",\n", withOptionsBegin, withOptionsEnd));
+}
+
+TString SignAccountId(const TString& id, const TSigner::TPtr& signer) {
+ return signer ? signer->SignAccountId(id) : TString{};
+}
+
+TString CreateSecretObjectQuery(const FederatedQuery::IamAuth& auth,
+ const TString& name,
+ const TSigner::TPtr& signer) {
+ using namespace fmt::literals;
+ switch (auth.identity_case()) {
+ case FederatedQuery::IamAuth::kServiceAccount: {
+ if (!signer) {
+ return {};
+ }
+ return fmt::format(R"(
+ UPSERT OBJECT {external_source} (TYPE SECRET) WITH value={signature};
+ )",
+ "external_source"_a = EncloseAndEscapeString(name, '`'),
+ "signature"_a = EncloseAndEscapeString(
+ SignAccountId(auth.service_account().id(), signer), '`'));
+ }
+ case FederatedQuery::IamAuth::kNone:
+ case FederatedQuery::IamAuth::kCurrentIam:
+ // Do not replace with default. Adding a new auth item should cause a compilation error
+ case FederatedQuery::IamAuth::IDENTITY_NOT_SET:
+ return {};
+ }
+}
+
+TString CreateAuthParamsQuery(const FederatedQuery::IamAuth& auth,
+ const TString& name,
+ const TSigner::TPtr& signer) {
+ using namespace fmt::literals;
+ switch (auth.identity_case()) {
+ case FederatedQuery::IamAuth::kNone:
+ return R"(, AUTH_METHOD="NONE")";
+ case FederatedQuery::IamAuth::kServiceAccount:
+ return fmt::format(R"(,
+ AUTH_METHOD="SERVICE_ACCOUNT",
+ SERVICE_ACCOUNT_ID={service_account_id},
+ SERVICE_ACCOUNT_SECRET_NAME={secret_name}
+ )",
+ "service_account_id"_a =
+ EncloseAndEscapeString(auth.service_account().id(), '"'),
+ "external_source"_a = EncloseAndEscapeString(name, '"'),
+ "secret_name"_a =
+ EncloseAndEscapeString(signer ? name : TString{}, '"'));
+ case FederatedQuery::IamAuth::kCurrentIam:
+ // Do not replace with default. Adding a new auth item should cause a compilation error
+ case FederatedQuery::IamAuth::IDENTITY_NOT_SET:
+ return {};
+ }
+}
+
+TString MakeCreateExternalDataSourceQuery(
+ const FederatedQuery::ConnectionContent& connectionContent,
+ const TString& objectStorageEndpoint,
+ const TSigner::TPtr& signer) {
+ using namespace fmt::literals;
+
+ auto sourceName = connectionContent.name();
+ auto bucketName = connectionContent.setting().object_storage().bucket();
+
+ return fmt::format(
+ R"(
+ {upsert_object};
+ CREATE EXTERNAL DATA SOURCE {external_source} WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{location}"
+ {auth_params}
+ );
+ )",
+ "external_source"_a = EncloseAndEscapeString(sourceName, '`'),
+ "location"_a = objectStorageEndpoint + "/" + EscapeString(bucketName, '"') + "/",
+ "upsert_object"_a =
+ CreateSecretObjectQuery(connectionContent.setting().object_storage().auth(),
+ connectionContent.name(),
+ signer),
+ "auth_params"_a =
+ CreateAuthParamsQuery(connectionContent.setting().object_storage().auth(),
+ connectionContent.name(),
+ signer));
+}
+
+TString DropSecretObjectQuery(const FederatedQuery::IamAuth& auth,
+ const TString& name,
+ const TSigner::TPtr& signer) {
+ using namespace fmt::literals;
+ switch (auth.identity_case()) {
+ case FederatedQuery::IamAuth::kServiceAccount: {
+ if (!signer) {
+ return {};
+ }
+ return fmt::format("DROP OBJECT {secret_name} (TYPE SECRET);",
+ "secret_name"_a =
+ EncloseAndEscapeString(name, '`'));
+ }
+ case FederatedQuery::IamAuth::kNone:
+ case FederatedQuery::IamAuth::kCurrentIam:
+ // Do not replace with default. Adding a new auth item should cause a compilation error
+ case FederatedQuery::IamAuth::IDENTITY_NOT_SET:
+ return {};
+ }
+}
+
+TString MakeDeleteExternalDataSourceQuery(
+ const FederatedQuery::ConnectionContent& connectionContent,
+ const TSigner::TPtr& signer) {
+ using namespace fmt::literals;
+ return fmt::format(
+ R"(
+ {drop_secret_statement};
+ DROP EXTERNAL DATA SOURCE {external_source};
+ )",
+ "drop_secret_statement"_a =
+ DropSecretObjectQuery(connectionContent.setting().object_storage().auth(),
+ connectionContent.name(),
+ signer),
+ "external_source"_a = EncloseAndEscapeString(connectionContent.name(), '`'));
+}
+
+TString MakeDeleteExternalDataTableQuery(const TString& tableName) {
+ using namespace fmt::literals;
+ return fmt::format("DROP EXTERNAL TABLE {external_table};",
+ "external_table"_a = EncloseAndEscapeString(tableName, '`'));
+}
+
+} // namespace NPrivate
+} // namespace NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h
new file mode 100644
index 00000000000..b5824ce6852
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include <util/generic/string.h>
+#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
+#include <ydb/core/fq/libs/signer/signer.h>
+
+namespace NFq {
+namespace NPrivate {
+
+TString EscapeString(const TString& value, char enclosingChar);
+
+TString EncloseAndEscapeString(const TString& value, char enclosingChar);
+
+TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& content,
+ const TString& connectionName);
+
+TString MakeCreateExternalDataSourceQuery(
+ const FederatedQuery::ConnectionContent& connectionContent,
+ const TString& objectStorageEndpoint,
+ const TSigner::TPtr& signer);
+
+TString MakeDeleteExternalDataSourceQuery(
+ const FederatedQuery::ConnectionContent& connectionContent,
+ const TSigner::TPtr& signer);
+
+TString MakeDeleteExternalDataTableQuery(const TString& tableName);
+
+} // namespace NPrivate
+} // namespace NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ya.make b/ydb/core/fq/libs/control_plane_proxy/actors/ya.make
new file mode 100644
index 00000000000..212a8d0f3c4
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ya.make
@@ -0,0 +1,21 @@
+LIBRARY()
+
+SRCS(
+ control_plane_storage_requester_actor.cpp
+ query_utils.cpp
+ ydb_schema_query_actor.cpp
+)
+
+PEERDIR(
+ contrib/libs/fmt
+ library/cpp/iterator
+ ydb/core/fq/libs/control_plane_proxy/events
+ ydb/core/fq/libs/control_plane_storage/events
+ ydb/core/fq/libs/result_formatter
+ ydb/core/kqp/provider
+ ydb/library/db_pool/protos
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h
new file mode 100644
index 00000000000..3345263e6df
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h
@@ -0,0 +1,181 @@
+#pragma once
+
+#include <ydb/core/fq/libs/actors/logging/log.h>
+#include <ydb/core/fq/libs/compute/common/config.h>
+#include <ydb/core/fq/libs/config/yq_issue.h>
+#include <ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h>
+#include <ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h>
+#include <ydb/core/fq/libs/control_plane_proxy/actors/counters.h>
+#include <ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h>
+#include <ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h>
+#include <ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h>
+#include <ydb/core/fq/libs/control_plane_proxy/events/events.h>
+#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
+#include <ydb/core/fq/libs/ydb/ydb.h>
+#include <ydb/library/yql/public/issue/yql_issue.h>
+#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
+
+#include <contrib/libs/fmt/include/fmt/format.h>
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/lwtrace/mon/mon_lwtrace.h>
+
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+#include <util/string/join.h>
+#include <util/system/types.h>
+
+namespace NFq {
+namespace NPrivate {
+
+using namespace NActors;
+using namespace NFq::NConfig;
+using namespace NKikimr;
+using namespace NThreading;
+using namespace NYdb;
+using namespace NYdb::NTable;
+
+using TTableClientPtr = std::shared_ptr<NYdb::NTable::TTableClient>;
+
+struct TEvPrivate {
+ enum EEv {
+ EvCreateSessionResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+ EvEnd
+ };
+
+ struct TEvCreateSessionResponse :
+ NActors::TEventLocal<TEvCreateSessionResponse, EvCreateSessionResponse> {
+ TCreateSessionResult Result;
+
+ TEvCreateSessionResponse(TCreateSessionResult result)
+ : Result(std::move(result)) { }
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE),
+ "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
+};
+
+template<typename TEventRequest, typename TEventResponse>
+class TCreateYdbComputeSessionActor :
+ public TBaseActor<TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>> {
+private:
+ using TBase = TBaseActor<TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>>;
+ using TBase::SelfId;
+ using TBase::SendRequestToSender;
+ using TBase::Request;
+ using TEventRequestPtr = typename TEventRequest::TPtr;
+
+public:
+ TCreateYdbComputeSessionActor(
+ const TActorId& sender,
+ const TEventRequestPtr request,
+ TDuration requestTimeout,
+ const NPrivate::TRequestCommonCountersPtr& counters,
+ const NFq::TComputeConfig& computeConfig,
+ const TYqSharedResources::TPtr& yqSharedResources,
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
+ : TBaseActor<TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>>(
+ sender, std::move(request), requestTimeout, counters)
+ , ComputeConfig(computeConfig)
+ , YqSharedResources(yqSharedResources)
+ , CredentialsProviderFactory(credentialsProviderFactory) { }
+
+ static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_DELETE_CONNECTION_IN_YDB";
+
+ void BootstrapImpl() override {
+ InitiateConnectionCreation();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout);
+ hFunc(TEvPrivate::TEvCreateSessionResponse, Handle);
+ )
+
+ void InitiateConnectionCreation() {
+ CPP_LOG_D("TCreateYdbComputeSessionActor InitiateConnectionCreation called. Actor id: "
+ << SelfId());
+ auto tableClient = CreateNewTableClient(ComputeConfig,
+ YqSharedResources,
+ CredentialsProviderFactory);
+ tableClient->CreateSession().Subscribe(
+ [actorSystem = NActors::TActivationContext::ActorSystem(),
+ self = SelfId()](const TAsyncCreateSessionResult& future) {
+ actorSystem->Send(self,
+ new TEvPrivate::TEvCreateSessionResponse(
+ future.GetValueSync()));
+ });
+ }
+
+ void Handle(TEvPrivate::TEvCreateSessionResponse::TPtr& event) {
+ CPP_LOG_D("TCreateYdbComputeSessionActor "
+ "Handle for TEvCreateSessionResponse called. Actor id: "
+ << SelfId());
+ const auto& createSessionResult = event->Get()->Result;
+ if (!createSessionResult.IsSuccess()) {
+ TBase::HandleError("Couldn't create YDB session",
+ createSessionResult.GetStatus(),
+ createSessionResult.GetIssues());
+ return;
+ }
+
+ CPP_LOG_D("TCreateYdbComputeSessionActor Session was successfully acquired. Actor id: "
+ << SelfId());
+ Request->Get()->YDBSession = createSessionResult.GetSession();
+ SendRequestToSender();
+ }
+
+private:
+ const NFq::TComputeConfig ComputeConfig;
+ const TYqSharedResources::TPtr YqSharedResources;
+ const NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
+
+ TTableClientPtr CreateNewTableClient(
+ const NFq::TComputeConfig& computeConfig,
+ const TYqSharedResources::TPtr& yqSharedResources,
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) {
+
+ auto scope = "yandexcloud://" + Request->Get()->FolderId;
+ NFq::NConfig::TYdbStorageConfig computeConnection =
+ computeConfig.GetConnection(scope);
+ computeConnection.set_endpoint(
+ Request->Get()->ComputeDatabase->connection().endpoint());
+ computeConnection.set_database(
+ Request->Get()->ComputeDatabase->connection().database());
+ computeConnection.set_usessl(
+ Request->Get()->ComputeDatabase->connection().usessl());
+
+ auto tableSettings =
+ GetClientSettings<NYdb::NTable::TClientSettings>(computeConnection,
+ credentialsProviderFactory);
+ return std::make_shared<NYdb::NTable::TTableClient>(
+ yqSharedResources->UserSpaceYdbDriver, tableSettings);
+ }
+};
+
+template<typename TEventRequest, typename TEventResponse>
+struct TBaseActorTypeTag<TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>> {
+ using TRequest = TEventRequest;
+ using TResponse = TEventResponse;
+};
+
+template<typename TEventRequest, typename TEventResponse>
+NActors::IActor* MakeComputeYDBSessionActor(
+ const NActors::TActorId sender,
+ const typename TEventRequest::TPtr request,
+ TDuration requestTimeout,
+ const NPrivate::TRequestCommonCountersPtr& counters,
+ const NFq::TComputeConfig& computeConfig,
+ const TYqSharedResources::TPtr& yqSharedResources,
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) {
+ return new NPrivate::TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>(
+ sender,
+ std::move(request),
+ requestTimeout,
+ counters,
+ computeConfig,
+ yqSharedResources,
+ credentialsProviderFactory);
+}
+
+} // namespace NPrivate
+} // namespace NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
new file mode 100644
index 00000000000..dd5e2a634b6
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
@@ -0,0 +1,328 @@
+#include "base_actor.h"
+#include "query_utils.h"
+
+#include <contrib/libs/fmt/include/fmt/format.h>
+#include <ydb/core/fq/libs/control_plane_proxy/events/events.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+namespace NFq {
+namespace NPrivate {
+
+using namespace NActors;
+using namespace NFq::NConfig;
+using namespace NKikimr;
+using namespace NThreading;
+using namespace NYdb;
+using namespace NYdb::NTable;
+
+template<class TEventRequest, class TEventResponse>
+class TSchemaQueryYDBActor;
+
+template<class TEventRequest, class TEventResponse>
+struct TBaseActorTypeTag<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> {
+ using TRequest = TEventRequest;
+ using TResponse = TEventResponse;
+};
+
+template<class TEventRequest, class TEventResponse>
+class TSchemaQueryYDBActor :
+ public TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> {
+private:
+ struct TEvPrivate {
+ enum EEv {
+ EvQueryExecutionResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE),
+ "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
+
+ struct TEvQueryExecutionResponse :
+ NActors::TEventLocal<TEvQueryExecutionResponse, EvQueryExecutionResponse> {
+ TStatus Result;
+
+ TEvQueryExecutionResponse(TStatus result)
+ : Result(std::move(result)) { }
+ };
+ };
+
+ using TBase = TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>;
+ using TBase::SelfId;
+ using TBase::Request;
+
+ using TEventRequestPtr = typename TEventRequest::TPtr;
+
+public:
+ using TQueryFactoryMethod = std::function<TString(const TEventRequestPtr& request)>;
+ using TErrorMessageFactoryMethod = std::function<TString(const TStatus& status)>;
+
+ TSchemaQueryYDBActor(const TActorId& sender,
+ const TEventRequestPtr request,
+ TDuration requestTimeout,
+ const NPrivate::TRequestCommonCountersPtr& counters,
+ TQueryFactoryMethod queryFactoryMethod,
+ TErrorMessageFactoryMethod errorMessageFactoryMethod)
+ : TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>(
+ sender, std::move(request), requestTimeout, counters)
+ , QueryFactoryMethod(queryFactoryMethod)
+ , ErrorMessageFactoryMethod(errorMessageFactoryMethod) { }
+
+ static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_DELETE_CONNECTION_IN_YDB";
+
+ void BootstrapImpl() override {
+ CPP_LOG_I("TSchemaQueryYDBActor BootstrapImpl. Actor id: " << TBase::SelfId());
+ InitiateSchemaQueryExecution();
+ }
+
+ void InitiateSchemaQueryExecution() {
+ CPP_LOG_I("TSchemaQueryYDBActor Executing schema query. Actor id: " << TBase::SelfId());
+
+ const auto& request = Request;
+ TString schemeQuery = QueryFactoryMethod(request);
+ request->Get()
+ ->YDBSession->ExecuteSchemeQuery(schemeQuery)
+ .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(),
+ self = SelfId()](const TAsyncStatus& future) {
+ actorSystem->Send(self,
+ new typename TEvPrivate::TEvQueryExecutionResponse(
+ std::move(future.GetValueSync())));
+ });
+ }
+
+ STRICT_STFUNC(StateFunc,
+ cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout);
+ hFunc(TEvPrivate::TEvQueryExecutionResponse, Handle);
+ )
+
+ void Handle(typename TEvPrivate::TEvQueryExecutionResponse::TPtr& event) {
+ CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Actor id: " << TBase::SelfId());
+ const auto& executeSchemeQueryStatus = event->Get()->Result;
+ if (!executeSchemeQueryStatus.IsSuccess()) {
+ CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished with issues. Actor id: " << TBase::SelfId());
+ TString errorMessage = ErrorMessageFactoryMethod(executeSchemeQueryStatus);
+
+ TBase::HandleError(errorMessage,
+ executeSchemeQueryStatus.GetStatus(),
+ executeSchemeQueryStatus.GetIssues());
+ return;
+ }
+
+ CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished successfully. Actor id: " << TBase::SelfId());
+ Request->Get()->ComputeYDBOperationWasPerformed = true;
+ TBase::SendRequestToSender();
+ }
+
+private:
+ TQueryFactoryMethod QueryFactoryMethod;
+ TErrorMessageFactoryMethod ErrorMessageFactoryMethod;
+};
+
+/// Connection actors
+NActors::IActor* MakeCreateConnectionActor(
+ const TActorId& sender,
+ TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters,
+ const TString& objectStorageEndpoint,
+ TSigner::TPtr signer) {
+ auto queryFactoryMethod =
+ [objectStorageEndpoint, signer = std::move(signer)](
+ const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request) -> TString {
+ using namespace fmt::literals;
+
+ return MakeCreateExternalDataSourceQuery(request->Get()->Request.content(),
+ objectStorageEndpoint,
+ signer);
+ };
+
+ auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
+ if (queryStatus.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) {
+ return "External data source with such name already exists";
+ } else {
+ return "Couldn't create external data source in YDB";
+ }
+ };
+
+ return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvCreateConnectionRequest,
+ TEvControlPlaneProxy::TEvCreateConnectionResponse>(
+ sender,
+ std::move(request),
+ requestTimeout,
+ counters.GetCommonCounters(RTC_CREATE_CONNECTION_IN_YDB),
+ queryFactoryMethod,
+ errorMessageFactoryMethod);
+}
+
+NActors::IActor* MakeModifyConnectionActor(
+ const TActorId& sender,
+ TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters,
+ const TString& objectStorageEndpoint,
+ TSigner::TPtr signer) {
+ auto queryFactoryMethod =
+ [objectStorageEndpoint, signer = std::move(signer)](
+ const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request) -> TString {
+ using namespace fmt::literals;
+
+ auto& oldConnectionContent = (*request->Get()->OldConnectionContent);
+ auto& newConnectionContent = request->Get()->Request.content();
+ return fmt::format(
+ R"(
+ {delete_external_data_source};
+ {create_external_data_source};
+ )",
+ "delete_external_data_source"_a =
+ MakeDeleteExternalDataSourceQuery(oldConnectionContent, signer),
+ "create_external_data_source"_a = MakeCreateExternalDataSourceQuery(
+ newConnectionContent, objectStorageEndpoint, signer));
+ };
+
+ auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
+ Y_UNUSED(queryStatus);
+ return "Couldn't modify external data source in YDB";
+ };
+
+ return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvModifyConnectionRequest,
+ TEvControlPlaneProxy::TEvModifyConnectionResponse>(
+ sender,
+ std::move(request),
+ requestTimeout,
+ counters.GetCommonCounters(RTC_MODIFY_CONNECTION_IN_YDB),
+ queryFactoryMethod,
+ errorMessageFactoryMethod);
+}
+
+NActors::IActor* MakeDeleteConnectionActor(
+ const TActorId& sender,
+ TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters,
+ TSigner::TPtr signer) {
+ auto queryFactoryMethod =
+ [signer = std::move(signer)](
+ const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& request) -> TString {
+ using namespace fmt::literals;
+ return MakeDeleteExternalDataSourceQuery(*request->Get()->ConnectionContent,
+ signer);
+ };
+
+ auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
+ if (queryStatus.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) {
+ return "External data source with such name already exists";
+ } else {
+ return "Couldn't delete external data source in YDB";
+ }
+ };
+
+ return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvDeleteConnectionRequest,
+ TEvControlPlaneProxy::TEvDeleteConnectionResponse>(
+ sender,
+ std::move(request),
+ requestTimeout,
+ counters.GetCommonCounters(RTC_DELETE_CONNECTION_IN_YDB),
+ queryFactoryMethod,
+ errorMessageFactoryMethod);
+}
+
+/// Bindings actors
+NActors::IActor* MakeCreateBindingActor(
+ const TActorId& sender,
+ TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters) {
+ auto queryFactoryMethod =
+ [](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request) -> TString {
+ using namespace fmt::literals;
+
+ auto externalSourceName = *request->Get()->ConnectionName;
+ return MakeCreateExternalDataTableQuery(request->Get()->Request.content(),
+ externalSourceName);
+ };
+
+ auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
+ if (queryStatus.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) {
+ return "External data table with such name already exists";
+ } else {
+ return "Couldn't create external data table in YDB";
+ }
+ };
+
+ return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvCreateBindingRequest,
+ TEvControlPlaneProxy::TEvCreateBindingResponse>(
+ sender,
+ std::move(request),
+ requestTimeout,
+ counters.GetCommonCounters(RTC_CREATE_BINDING_IN_YDB),
+ queryFactoryMethod,
+ errorMessageFactoryMethod);
+}
+
+NActors::IActor* MakeModifyBindingActor(
+ const TActorId& sender,
+ TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters) {
+ auto queryFactoryMethod =
+ [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request) -> TString {
+ using namespace fmt::literals;
+
+ auto sourceName = *request->Get()->ConnectionName;
+ auto oldTableName = *request->Get()->OldBindingName;
+ return fmt::format(
+ R"(
+ {delete_external_data_table};
+ {create_external_data_table};
+ )",
+ "delete_external_data_table"_a = MakeDeleteExternalDataTableQuery(oldTableName),
+ "create_external_data_table"_a =
+ MakeCreateExternalDataTableQuery(request->Get()->Request.content(),
+ sourceName));
+ };
+
+ auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
+ Y_UNUSED(queryStatus);
+ return "Couldn't modify external data table in YDB";
+ };
+
+ return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvModifyBindingRequest,
+ TEvControlPlaneProxy::TEvModifyBindingResponse>(
+ sender,
+ std::move(request),
+ requestTimeout,
+ counters.GetCommonCounters(RTC_MODIFY_BINDING_IN_YDB),
+ queryFactoryMethod,
+ errorMessageFactoryMethod);
+}
+
+NActors::IActor* MakeDeleteBindingActor(
+ const TActorId& sender,
+ TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters) {
+ auto queryFactoryMethod =
+ [](const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request) -> TString {
+ using namespace fmt::literals;
+ return MakeDeleteExternalDataTableQuery(*request->Get()->OldBindingName);
+ };
+
+ auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
+ if (queryStatus.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) {
+ return "External data source with such name already exists";
+ } else {
+ return "Couldn't delete external data source in YDB";
+ }
+ };
+
+ return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvDeleteBindingRequest,
+ TEvControlPlaneProxy::TEvDeleteBindingResponse>(
+ sender,
+ std::move(request),
+ requestTimeout,
+ counters.GetCommonCounters(RTC_DELETE_BINDING_IN_YDB),
+ queryFactoryMethod,
+ errorMessageFactoryMethod);
+}
+
+} // namespace NPrivate
+} // namespace NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
new file mode 100644
index 00000000000..7ba6a06038c
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
@@ -0,0 +1,58 @@
+#pragma once
+
+#include "counters.h"
+
+#include <library/cpp/actors/core/actor.h>
+#include <ydb/core/fq/libs/control_plane_proxy/events/events.h>
+#include <ydb/core/fq/libs/signer/signer.h>
+
+namespace NFq {
+namespace NPrivate {
+
+using namespace NActors;
+
+/// Connection manipulation actors
+NActors::IActor* MakeCreateConnectionActor(
+ const TActorId& sender,
+ TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters,
+ const TString& objectStorageEndpoint,
+ TSigner::TPtr signer);
+
+NActors::IActor* MakeModifyConnectionActor(
+ const TActorId& sender,
+ TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters,
+ const TString& objectStorageEndpoint,
+ TSigner::TPtr signer);
+
+NActors::IActor* MakeDeleteConnectionActor(
+ const TActorId& sender,
+ TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters,
+ TSigner::TPtr signer);
+
+/// Binding manipulation actors
+NActors::IActor* MakeCreateBindingActor(
+ const TActorId& sender,
+ TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters);
+
+NActors::IActor* MakeModifyBindingActor(
+ const TActorId& sender,
+ TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters);
+
+NActors::IActor* MakeDeleteBindingActor(
+ const TActorId& sender,
+ TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters);
+
+} // namespace NPrivate
+} // namespace NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp
index 6f5f1ce0a46..c81e34bee9d 100644
--- a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp
@@ -4,30 +4,26 @@
#include "utils.h"
#include <ydb/core/fq/libs/actors/logging/log.h>
-#include <ydb/core/fq/libs/common/cache.h>
#include <ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h>
#include <ydb/core/fq/libs/compute/ydb/events/events.h>
#include <ydb/core/fq/libs/control_plane_config/control_plane_config.h>
#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h>
#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
-#include <ydb/core/fq/libs/control_plane_storage/util.h>
#include <ydb/core/fq/libs/quota_manager/quota_manager.h>
#include <ydb/core/fq/libs/rate_limiter/events/control_plane_events.h>
#include <ydb/core/fq/libs/result_formatter/result_formatter.h>
#include <ydb/core/fq/libs/test_connection/events/events.h>
#include <ydb/core/fq/libs/test_connection/test_connection.h>
-#include <ydb/core/fq/libs/ydb/util.h>
#include <ydb/core/fq/libs/ydb/ydb.h>
#include <ydb/core/fq/libs/config/yq_issue.h>
+#include <ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h>
+#include <ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h>
+#include <ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h>
#include <ydb/core/fq/libs/control_plane_proxy/events/events.h>
-#include <ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h>
-#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/actor.h>
-
-#include <ydb/library/ydb_issue/issue_helpers.h>
-#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <ydb/library/ycloud/api/access_service.h>
#include <ydb/library/ycloud/impl/access_service.h>
@@ -61,137 +57,10 @@ using namespace NKikimr;
using namespace NThreading;
using namespace NYdb;
using namespace NYdb::NTable;
+using namespace NFq::NPrivate;
LWTRACE_USING(YQ_CONTROL_PLANE_PROXY_PROVIDER);
-struct TRequestScopeCounters: public virtual TThrRefBase {
- const TString Name;
-
- ::NMonitoring::TDynamicCounterPtr Counters;
- ::NMonitoring::TDynamicCounters::TCounterPtr InFly;
- ::NMonitoring::TDynamicCounters::TCounterPtr Ok;
- ::NMonitoring::TDynamicCounters::TCounterPtr Error;
- ::NMonitoring::TDynamicCounters::TCounterPtr Timeout;
- ::NMonitoring::TDynamicCounters::TCounterPtr Retry;
-
- explicit TRequestScopeCounters(const TString& name)
- : Name(name)
- { }
-
- void Register(const ::NMonitoring::TDynamicCounterPtr& counters) {
- Counters = counters;
- ::NMonitoring::TDynamicCounterPtr subgroup = counters->GetSubgroup("request_scope", Name);
- InFly = subgroup->GetCounter("InFly", false);
- Ok = subgroup->GetCounter("Ok", true);
- Error = subgroup->GetCounter("Error", true);
- Timeout = subgroup->GetCounter("Timeout", true);
- Timeout = subgroup->GetCounter("Retry", true);
- }
-
- virtual ~TRequestScopeCounters() override {
- Counters->RemoveSubgroup("request_scope", Name);
- }
-
-private:
- static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() {
- return ::NMonitoring::ExplicitHistogram({0, 1, 2, 5, 10, 20, 50, 100, 500, 1000, 2000, 5000, 10000, 30000, 50000, 500000});
- }
-};
-
-struct TRequestCommonCounters: public virtual TThrRefBase {
- const TString Name;
-
- ::NMonitoring::TDynamicCounterPtr Counters;
- ::NMonitoring::TDynamicCounters::TCounterPtr InFly;
- ::NMonitoring::TDynamicCounters::TCounterPtr Ok;
- ::NMonitoring::TDynamicCounters::TCounterPtr Error;
- ::NMonitoring::TDynamicCounters::TCounterPtr Timeout;
- ::NMonitoring::TDynamicCounters::TCounterPtr Retry;
- ::NMonitoring::THistogramPtr LatencyMs;
-
- explicit TRequestCommonCounters(const TString& name)
- : Name(name)
- { }
-
- void Register(const ::NMonitoring::TDynamicCounterPtr& counters) {
- Counters = counters;
- ::NMonitoring::TDynamicCounterPtr subgroup = counters->GetSubgroup("request_common", Name);
- InFly = subgroup->GetCounter("InFly", false);
- Ok = subgroup->GetCounter("Ok", true);
- Error = subgroup->GetCounter("Error", true);
- Timeout = subgroup->GetCounter("Timeout", true);
- Retry = subgroup->GetCounter("Retry", true);
- LatencyMs = subgroup->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
- }
-
- virtual ~TRequestCommonCounters() override {
- Counters->RemoveSubgroup("request_common", Name);
- }
-
-private:
- static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() {
- return ::NMonitoring::ExplicitHistogram({0, 1, 2, 5, 10, 20, 50, 100, 500, 1000, 2000, 5000, 10000, 30000, 50000, 500000});
- }
-};
-
-using TRequestScopeCountersPtr = TIntrusivePtr<TRequestScopeCounters>;
-using TRequestCommonCountersPtr = TIntrusivePtr<TRequestCommonCounters>;
-
-struct TRequestCounters {
- TRequestScopeCountersPtr Scope;
- TRequestCommonCountersPtr Common;
-
- void IncInFly() {
- Scope->InFly->Inc();
- Common->InFly->Inc();
- }
-
- void DecInFly() {
- Scope->InFly->Dec();
- Common->InFly->Dec();
- }
-
- void IncOk() {
- Scope->Ok->Inc();
- Common->Ok->Inc();
- }
-
- void DecOk() {
- Scope->Ok->Dec();
- Common->Ok->Dec();
- }
-
- void IncError() {
- Scope->Error->Inc();
- Common->Error->Inc();
- }
-
- void DecError() {
- Scope->Error->Dec();
- Common->Error->Dec();
- }
-
- void IncTimeout() {
- Scope->Timeout->Inc();
- Common->Timeout->Inc();
- }
-
- void DecTimeout() {
- Scope->Timeout->Dec();
- Common->Timeout->Dec();
- }
-
- void IncRetry() {
- Scope->Retry->Inc();
- Common->Retry->Inc();
- }
-
- void DecRetry() {
- Scope->Retry->Dec();
- Common->Retry->Dec();
- }
-};
-
template<class TEventRequest, class TResponseProxy>
class TGetQuotaActor : public NActors::TActorBootstrapped<TGetQuotaActor<TEventRequest, TResponseProxy>> {
using TBase = NActors::TActorBootstrapped<TGetQuotaActor<TEventRequest, TResponseProxy>>;
@@ -346,7 +215,6 @@ public:
PassAway();
}
-
private:
static TString GetSubjectType(const yandex::cloud::priv::servicecontrol::v1::Subject& subject) {
switch (subject.type_case()) {
@@ -370,671 +238,6 @@ private:
}
};
-TString EscapeString(const TString& value, char enclosingChar) {
- auto escapedValue = value;
- SubstGlobal(
- escapedValue, TString{enclosingChar}, TStringBuilder{} << '\\' << enclosingChar);
- return escapedValue;
-}
-
-TString EncloseAndEscapeString(const TString& value, char enclosingChar) {
- return TStringBuilder{} << enclosingChar << EscapeString(value, enclosingChar) << enclosingChar;
-}
-
-class TCreateConnectionInYDBActor :
- public NActors::TActorBootstrapped<TCreateConnectionInYDBActor> {
- struct TEvPrivate {
- enum EEv {
- EvCreateSessionResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
- EvCreateConnectionExecutionResponse,
- EvEnd
- };
-
- static_assert(
- EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE),
- "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
-
- struct TEvCreateSessionResponse :
- NActors::TEventLocal<TEvCreateSessionResponse, EvCreateSessionResponse> {
- TAsyncCreateSessionResult Result;
-
- TEvCreateSessionResponse(TAsyncCreateSessionResult result)
- : Result(std::move(result)) { }
- };
-
- struct TEvCreateConnectionExecutionResponse :
- NActors::TEventLocal<TEvCreateConnectionExecutionResponse, EvCreateConnectionExecutionResponse> {
- TAsyncStatus Result;
-
- TEvCreateConnectionExecutionResponse(TAsyncStatus result)
- : Result(std::move(result)) { }
- };
- };
-
- using TBase = NActors::TActorBootstrapped<TCreateConnectionInYDBActor>;
- using TBase::Become;
- using TBase::PassAway;
- using TBase::Register;
- using TBase::SelfId;
- using TBase::Send;
- using IRetryPolicy =
- IRetryPolicy<NCloud::TEvAccessService::TEvAuthenticateResponse::TPtr&>;
- using TTableClientPtr = std::unique_ptr<NYdb::NTable::TTableClient>;
-
- using TEventRequest = TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr;
- using TResponce = TEvControlPlaneProxy::TEvCreateConnectionResponse;
-
- TActorId Sender;
- TRequestCommonCountersPtr Counters;
- TEventRequest Event;
- ui32 Cookie;
- TDuration RequestTimeout;
- TInstant StartTime;
- TString Scope;
- TTableClientPtr TableClient;
- TString ObjectStorageEndpoint;
- NFq::TSigner::TPtr Signer;
-
-public:
- TCreateConnectionInYDBActor(
- const TRequestCommonCountersPtr& counters,
- const NFq::TComputeConfig& computeConfig,
- const TYqSharedResources::TPtr& yqSharedResources,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
- const TString& objectStorageEndpoint,
- TActorId sender,
- TEventRequest event,
- ui32 cookie,
- const TString& scope,
- const NFq::TSigner::TPtr& signer,
- TDuration requestTimeout)
- : Sender(sender)
- , Counters(counters)
- , Event(event)
- , Cookie(cookie)
- , RequestTimeout(requestTimeout)
- , StartTime(TInstant::Now())
- , Scope(scope)
- , TableClient(CreateNewTableClient(
- computeConfig, yqSharedResources, credentialsProviderFactory))
- , ObjectStorageEndpoint(objectStorageEndpoint)
- , Signer(signer) { }
-
- static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_CREATE_CONNECTION_IN_YDB";
-
- void Bootstrap() {
- CPP_LOG_T("Create connection in YDB. Actor id: " << SelfId());
-
- if (auto issues = ValidateRequest(Event->Get()->Request); !issues.Empty()) {
- NYql::TIssue issue = MakeErrorIssue(
- TIssuesIds::INTERNAL_ERROR, "CreateConnectionRequest is not valid");
- for (auto& subIssue : issues) {
- issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
- }
- SendErrorMessageToSender(std::move(issue));
- return;
- }
-
- Become(
- &TCreateConnectionInYDBActor::StateFunc,
- RequestTimeout,
- new NActors::TEvents::TEvWakeup());
- Counters->InFly->Inc();
- InitiateConnectionCreation();
- }
-
- NYql::TIssues ValidateRequest(const FederatedQuery::CreateConnectionRequest& request) {
- NYql::TIssues issues;
- if (request.content().name().Contains('/')) {
- issues.AddIssue(MakeErrorIssue(
- TIssuesIds::INTERNAL_ERROR, "'/' is not allowed in connection name"));
- }
- return issues;
- }
-
- void InitiateConnectionCreation() {
- auto request = Event->Get()->Request;
- TableClient->CreateSession().Subscribe(
- [actorSystem = NActors::TActivationContext::ActorSystem(),
- self = SelfId()](const TAsyncCreateSessionResult& future) {
- actorSystem->Send(
- self, new TEvPrivate::TEvCreateSessionResponse(std::move(future)));
- });
- }
-
- void Handle(TEvPrivate::TEvCreateSessionResponse::TPtr& event) {
- using namespace fmt::literals;
- auto createSessionResult = event->Get()->Result.GetValueSync();
- if (!createSessionResult.IsSuccess()) {
- TString errorMessage = TStringBuilder{}
- << "Couldn't create YDB session. Status"
- << createSessionResult.GetStatus();
- CPP_LOG_E(errorMessage);
-
- NYql::TIssue issue =
- MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Couldn't create YDB session");
- for (auto& subIssue : createSessionResult.GetIssues()) {
- issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
- }
- SendErrorMessageToSender(std::move(issue));
- return;
- }
-
- auto request = Event->Get()->Request;
- auto session = createSessionResult.GetSession();
- auto bucketName = request.content().setting().object_storage().bucket();
- SubstGlobal(bucketName, TString{'"'}, "\\\"");
- session
- .ExecuteSchemeQuery(fmt::format(
- R"(
- {upsert_object};
- CREATE EXTERNAL DATA SOURCE {external_source} WITH (
- SOURCE_TYPE="ObjectStorage",
- LOCATION="{location}"
- {auth_params}
- );
- )",
- "external_source"_a = EncloseAndEscapeString(request.content().name(), '`'),
- "location"_a = ObjectStorageEndpoint + "/" + bucketName + "/",
- "upsert_object"_a = CreateSecretObjectQuery(request.content().setting().object_storage().auth(), request.content().name()),
- "auth_params"_a = CreateAuthParamsQuery(request.content().setting().object_storage().auth(), request.content().name())))
- .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(),
- self = SelfId()](const TAsyncStatus& future) {
- actorSystem->Send(
- self,
- new TEvPrivate::TEvCreateConnectionExecutionResponse(std::move(future)));
- });
- }
-
- TString CreateSecretObjectQuery(const FederatedQuery::IamAuth& auth, const TString& name) const {
- using namespace fmt::literals;
- switch (auth.identity_case()) {
- case FederatedQuery::IamAuth::kServiceAccount: {
- if (!Signer) {
- return {};
- }
- return fmt::format(R"(
- UPSERT OBJECT {external_source} (TYPE SECRET) WITH value={signature};
- )",
- "external_source"_a = EncloseAndEscapeString(name, '`'),
- "signature"_a = EncloseAndEscapeString(SignAccountId(auth.service_account().id()), '`'));
- }
- case FederatedQuery::IamAuth::kNone:
- case FederatedQuery::IamAuth::kCurrentIam:
- // Do not replace with default. Adding a new auth item should cause a compilation error
- case FederatedQuery::IamAuth::IDENTITY_NOT_SET:
- return {};
- }
- }
-
- TString CreateAuthParamsQuery(const FederatedQuery::IamAuth& auth, const TString& name) const {
- using namespace fmt::literals;
- switch (auth.identity_case()) {
- case FederatedQuery::IamAuth::kNone:
- return R"(, AUTH_METHOD="NONE")";
- case FederatedQuery::IamAuth::kServiceAccount:
- return fmt::format(R"(,
- AUTH_METHOD="SERVICE_ACCOUNT",
- SERVICE_ACCOUNT_ID={service_account_id},
- SERVICE_ACCOUNT_SECRET_NAME={secret_name}
- )",
- "service_account_id"_a = EncloseAndEscapeString(auth.service_account().id(), '"'),
- "external_source"_a = EncloseAndEscapeString(name, '"'),
- "secret_name"_a = EncloseAndEscapeString(Signer ? name : TString{}, '"'));
- case FederatedQuery::IamAuth::kCurrentIam:
- // Do not replace with default. Adding a new auth item should cause a compilation error
- case FederatedQuery::IamAuth::IDENTITY_NOT_SET:
- return {};
- }
- }
-
- TString SignAccountId(const TString& id) const {
- return Signer ? Signer->SignAccountId(id) : TString{};
- }
-
- void Handle(TEvPrivate::TEvCreateConnectionExecutionResponse::TPtr& event) {
- Counters->InFly->Dec();
- Counters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
-
- const auto& status = event->Get()->Result.GetValueSync();
- if (!status.IsSuccess()) {
- TString errorMessage;
- if (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) {
- errorMessage = "External data source with such name already exists";
- } else {
- errorMessage = TStringBuilder{}
- << "Couldn't create external data source in YDB. Status"
- << status.GetStatus();
- }
-
- CPP_LOG_E(errorMessage);
-
- NYql::TIssue issue = MakeErrorIssue(
- TIssuesIds::INTERNAL_ERROR, "Couldn't create external data source in YDB");
- for (auto& subIssue : status.GetIssues()) {
- issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
- }
- SendErrorMessageToSender(std::move(issue));
- return;
- }
-
- CPP_LOG_T("External data source in YDB was successfully created");
- Counters->Ok->Inc();
- Event->Get()->ComputeYDBOperationWasPerformed = true;
-
- TActivationContext::Send(Event->Forward(ControlPlaneProxyActorId()));
- PassAway();
- }
-
- void PassAway() override { TActor::PassAway(); }
-
- STRICT_STFUNC(StateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, HandleTimeout);
- hFunc(TEvPrivate::TEvCreateSessionResponse, Handle);
- hFunc(TEvPrivate::TEvCreateConnectionExecutionResponse, Handle);)
-
- void HandleTimeout() {
- CPP_LOG_D(
- "Timeout occurred while creating external data source in YDB. Actor id: "
- << SelfId());
- Counters->Timeout->Inc();
- SendErrorMessageToSender(MakeErrorIssue(
- TIssuesIds::TIMEOUT,
- "Timeout occurred while creating external data source in YDB. Try repeating the request later"));
- }
-
- void SendErrorMessageToSender(NYql::TIssue issue) {
- Counters->Error->Inc();
- NYql::TIssues issues;
- issues.AddIssue(issue);
- Send(
- Sender,
- new TEvControlPlaneProxy::TEvCreateConnectionResponse(issues, {}),
- 0,
- Cookie); // Change to template
- PassAway();
- }
-
-private:
- TTableClientPtr CreateNewTableClient(
- const NFq::TComputeConfig& computeConfig,
- const TYqSharedResources::TPtr& yqSharedResources,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) {
-
- NFq::NConfig::TYdbStorageConfig computeConnection = computeConfig.GetConnection(Scope);
- computeConnection.set_endpoint(Event->Get()->ComputeDatabase->connection().endpoint());
- computeConnection.set_database(Event->Get()->ComputeDatabase->connection().database());
- computeConnection.set_usessl(Event->Get()->ComputeDatabase->connection().usessl());
-
- auto tableSettings = GetClientSettings<NYdb::NTable::TClientSettings>(
- computeConnection, credentialsProviderFactory);
- return std::make_unique<NYdb::NTable::TTableClient>(
- yqSharedResources->UserSpaceYdbDriver, tableSettings);
- }
-};
-
-class TCreateBindingInYDBActor :
- public NActors::TActorBootstrapped<TCreateBindingInYDBActor> {
- struct TEvPrivate {
- enum EEv {
- EvCreateSessionResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
- EvCreateBindingExecutionResponse,
- EvEnd
- };
-
- static_assert(
- EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE),
- "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
-
- struct TEvCreateSessionResponse :
- NActors::TEventLocal<TEvCreateSessionResponse, EvCreateSessionResponse> {
- TAsyncCreateSessionResult Result;
-
- TEvCreateSessionResponse(TAsyncCreateSessionResult result)
- : Result(std::move(result)) { }
- };
-
- struct TEvCreateBindingExecutionResponse :
- NActors::TEventLocal<TEvCreateBindingExecutionResponse, EvCreateBindingExecutionResponse> {
- TAsyncStatus Result;
-
- TEvCreateBindingExecutionResponse(TAsyncStatus result)
- : Result(std::move(result)) { }
- };
- };
-
- using TBase = NActors::TActorBootstrapped<TCreateBindingInYDBActor>;
- using TBase::Become;
- using TBase::PassAway;
- using TBase::Register;
- using TBase::SelfId;
- using TBase::Send;
- using IRetryPolicy =
- IRetryPolicy<NCloud::TEvAccessService::TEvAuthenticateResponse::TPtr&>;
- using TTableClientPtr = std::unique_ptr<NYdb::NTable::TTableClient>;
-
- using TEventRequest = TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr;
- using TResponce = TEvControlPlaneProxy::TEvCreateBindingResponse;
- using TResponseProxy = TEvControlPlaneProxy::TEvCreateBindingResponse;
-
- TActorId Sender;
- TRequestCommonCountersPtr Counters;
- TEventRequest Event;
- ui32 Cookie;
- TInstant StartTime;
- TPermissions Permissions;
- TDuration RequestTimeout;
- TString Scope;
- TTableClientPtr TableClient;
- TString ConnectionName;
-
-public:
- TCreateBindingInYDBActor(
- const TRequestCommonCountersPtr& counters,
- const NFq::TComputeConfig& computeConfig,
- const TYqSharedResources::TPtr& yqSharedResources,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
- TActorId sender,
- TEventRequest event,
- ui32 cookie,
- TPermissions permissions,
- const TString& scope,
- TDuration requestTimeout)
- : Sender(sender)
- , Counters(counters)
- , Event(event)
- , Cookie(cookie)
- , StartTime(TInstant::Now())
- , Permissions(std::move(permissions))
- , RequestTimeout(requestTimeout)
- , Scope(scope)
- , TableClient(CreateNewTableClient(
- computeConfig, yqSharedResources, credentialsProviderFactory)) { }
-
- static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_CREATE_BINDING_IN_YDB";
-
- void Bootstrap() {
- CPP_LOG_T("Create external table in YDB. Actor id: " << SelfId());
-
- if (auto issues = ValidateRequest(Event->Get()->Request); !issues.Empty()) {
- NYql::TIssue issue = MakeErrorIssue(
- TIssuesIds::INTERNAL_ERROR, "CreateBindingRequest is not valid");
- for (auto& subIssue : issues) {
- issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
- }
- SendErrorMessageToSender(std::move(issue));
- return;
- }
-
- Become(
- &TCreateBindingInYDBActor::StateFunc,
- RequestTimeout,
- new NActors::TEvents::TEvWakeup());
- Counters->InFly->Inc();
- ResolveConnectionId();
- }
-
- void ResolveConnectionId() {
- FederatedQuery::DescribeConnectionRequest request;
- auto connectionId = Event->Get()->Request.content().connection_id();
- request.set_connection_id(connectionId);
- CPP_LOG_T(
- "Create external table in YDB. Resolving connection id. Actor id: "
- << SelfId() << " connection_id: " << connectionId);
- auto event = new TEvControlPlaneStorage::TEvDescribeConnectionRequest(
- Scope,
- request,
- Event->Get()->User,
- Event->Get()->Token,
- Event->Get()->CloudId,
- Permissions,
- Event->Get()->Quotas,
- Event->Get()->TenantInfo,
- {});
- Send(ControlPlaneStorageServiceActorId(), event);
- }
-
- void Handle(TEvControlPlaneStorage::TEvDescribeConnectionResponse::TPtr& event) {
- const auto& issues = event->Get()->Issues;
- if (!issues.Empty()) {
- CPP_LOG_E(
- "Couldn't resolve connection id. Actor id: " << SelfId() << " Status: "
- << issues.ToOneLineString());
-
- NYql::TIssue issue =
- MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Couldn't resolve connection id");
- for (auto& subIssue : issues) {
- issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
- }
- SendErrorMessageToSender(std::move(issue));
- return;
- }
-
- ConnectionName = event->Get()->Result.connection().content().name();
- CPP_LOG_T(
- "Create external table in YDB. Resolved connection name. Actor id: "
- << SelfId() << " Connection name: " << ConnectionName);
- InitiateConnectionCreation();
- }
-
- void InitiateConnectionCreation() {
- auto request = Event->Get()->Request;
- TableClient->CreateSession().Subscribe(
- [actorSystem = NActors::TActivationContext::ActorSystem(),
- self = SelfId()](const TAsyncCreateSessionResult& future) {
- actorSystem->Send(
- self, new TEvPrivate::TEvCreateSessionResponse(std::move(future)));
- });
- }
-
- void Handle(TEvPrivate::TEvCreateSessionResponse::TPtr& event) {
- auto createSessionResult = event->Get()->Result.GetValueSync();
- if (!createSessionResult.IsSuccess()) {
- CPP_LOG_E(
- "Couldn't create YDB session. Actor id: "
- << SelfId() << " Status: " << createSessionResult.GetStatus());
-
- NYql::TIssue issue =
- MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Couldn't create YDB session");
- for (auto& subIssue : createSessionResult.GetIssues()) {
- issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
- }
- SendErrorMessageToSender(std::move(issue));
- return;
- }
-
- auto session = createSessionResult.GetSession();
- auto query = CreateSchemaQuery(Event->Get()->Request);
- CPP_LOG_T(
- "Create external table in YDB. Actor id: " << SelfId() << " Query: " << query);
- session.ExecuteSchemeQuery(query).Subscribe(
- [actorSystem = NActors::TActivationContext::ActorSystem(),
- self = SelfId()](const TAsyncStatus& future) {
- actorSystem->Send(
- self, new TEvPrivate::TEvCreateBindingExecutionResponse(future));
- });
- }
-
- NYql::TIssues ValidateRequest(const FederatedQuery::CreateBindingRequest& request) {
- NYql::TIssues issues;
- if (request.content().setting().binding_case() !=
- FederatedQuery::BindingSetting::BindingCase::kObjectStorage) {
- issues.AddIssue(
- MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Unsupported binding type"));
- }
- if (request.content().setting().object_storage().subset().size() != 1) {
- issues.AddIssue(MakeErrorIssue(
- TIssuesIds::INTERNAL_ERROR,
- "Cannot create external table due to wrong amount of subsets in request"));
- }
- if (request.content().name().Contains('/')) {
- issues.AddIssue(MakeErrorIssue(
- TIssuesIds::INTERNAL_ERROR, "'/' is not allowed in binding name"));
- }
- return issues;
- }
-
- TString CreateSchemaQuery(const FederatedQuery::CreateBindingRequest& request) {
- using namespace fmt::literals;
-
- auto bindingName = request.content().name();
- auto objectStorageParams = request.content().setting().object_storage();
- const auto& subset = objectStorageParams.subset(0);
-
- // Schema
- NYql::TExprContext context;
- auto columnsTransformFunction = [&](const Ydb::Column& column) -> TString {
- NYdb::TTypeParser typeParser(column.type());
- auto node = MakeType(typeParser, context);
- auto typeName = NYql::FormatType(node);
- const TString notNull = (node->GetKind() == NYql::ETypeAnnotationKind::Optional) ? "" : "NOT NULL";
- return fmt::format(
- " {columnName} {columnType} {notNull}",
- "columnName"_a = EncloseAndEscapeString(column.name(), '`'),
- "columnType"_a = typeName,
- "notNull"_a = notNull
- );
- };
- auto columnsBegin =
- MakeMappedIterator(subset.schema().column().begin(), columnsTransformFunction);
- auto columnsEnd =
- MakeMappedIterator(subset.schema().column().end(), columnsTransformFunction);
-
- // WithOptions
- auto withOptions = std::unordered_map<TString, TString>{};
- withOptions.insert(
- {"DATA_SOURCE", TStringBuilder{} << '"' << ConnectionName << '"'});
- withOptions.insert({"LOCATION", EncloseAndEscapeString(subset.path_pattern(), '"')});
- if (!subset.format().Empty()) {
- withOptions.insert({"FORMAT", EncloseAndEscapeString(subset.format(), '"')});
- }
- if (!subset.compression().Empty()) {
- withOptions.insert(
- {"COMPRESSION", EncloseAndEscapeString(subset.compression(), '"')});
- }
- for (auto& kv : subset.format_setting()) {
- withOptions.insert(
- {EncloseAndEscapeString(kv.first, '`'),
- EncloseAndEscapeString(kv.second, '"')});
- }
-
- if (!subset.partitioned_by().empty()) {
- auto stringEscapeMapper = [](const TString& value) {
- return EscapeString(value, '"');
- };
-
- auto partitionBy =
- TStringBuilder{}
- << "\"["
- << JoinRange(
- ", ",
- MakeMappedIterator(subset.partitioned_by().begin(), stringEscapeMapper),
- MakeMappedIterator(subset.partitioned_by().end(), stringEscapeMapper))
- << "]\"";
- withOptions.insert({"PARTITIONED_BY", partitionBy});
- }
-
- for (auto& kv : subset.projection()) {
- withOptions.insert(
- {EncloseAndEscapeString(kv.first, '`'),
- EncloseAndEscapeString(kv.second, '"')});
- }
-
- auto concatEscapedKeyValueMapper =
- [](const std::pair<TString, TString>& kv) -> TString {
- return TStringBuilder{} << " " << kv.first << " = " << kv.second;
- };
-
- auto withOptionsBegin =
- MakeMappedIterator(withOptions.begin(), concatEscapedKeyValueMapper);
- auto withOptionsEnd =
- MakeMappedIterator(withOptions.end(), concatEscapedKeyValueMapper);
-
- return fmt::format(
- R"(
- CREATE EXTERNAL TABLE {externalTableName} (
- {columns}
- ) WITH (
- {withOptions}
- );)",
- "externalTableName"_a = EncloseAndEscapeString(bindingName, '`'),
- "columns"_a = JoinRange(",\n", columnsBegin, columnsEnd),
- "withOptions"_a = JoinRange(",\n", withOptionsBegin, withOptionsEnd));
- }
-
- void Handle(TEvPrivate::TEvCreateBindingExecutionResponse::TPtr& event) {
- Counters->InFly->Dec();
- Counters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
-
- const auto& status = event->Get()->Result.GetValueSync();
- if (!status.IsSuccess()) {
- TString errorMessage;
- if (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) {
- errorMessage = "External table with such name already exists";
- } else {
- errorMessage = TStringBuilder{}
- << "Couldn't create external table in YDB. Status: "
- << status.GetStatus();
- }
-
- CPP_LOG_E(errorMessage);
-
- NYql::TIssue issue = MakeErrorIssue(
- TIssuesIds::INTERNAL_ERROR, "Couldn't create external table in YDB");
- for (auto& subIssue : status.GetIssues()) {
- issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
- }
- SendErrorMessageToSender(std::move(issue));
- return;
- }
-
- Counters->Ok->Inc();
- Event->Get()->ComputeYDBOperationWasPerformed = true;
- CPP_LOG_T("External table in YDB was successfully created");
-
- TActivationContext::Send(Event->Forward(ControlPlaneProxyActorId()));
- PassAway();
- }
-
- STRICT_STFUNC(StateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, HandleTimeout);
- hFunc(TEvControlPlaneStorage::TEvDescribeConnectionResponse, Handle);
- hFunc(TEvPrivate::TEvCreateSessionResponse, Handle);
- hFunc(TEvPrivate::TEvCreateBindingExecutionResponse, Handle);)
-
- void HandleTimeout() {
- CPP_LOG_D("Create external table in YDB timeout. Actor id: " << SelfId());
- Counters->Timeout->Inc();
- SendErrorMessageToSender(MakeErrorIssue(
- TIssuesIds::TIMEOUT,
- "Create external table in YDB timeout. Try repeating the request later"));
- }
-
- void SendErrorMessageToSender(NYql::TIssue issue) {
- Counters->Error->Inc();
- NYql::TIssues issues;
- issues.AddIssue(issue);
- Send(Sender, new TResponseProxy(issues, {}), 0,
- Cookie); // Change to template
- PassAway();
- }
-
-private:
- TTableClientPtr CreateNewTableClient(
- const NFq::TComputeConfig& computeConfig,
- const TYqSharedResources::TPtr& yqSharedResources,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) {
-
- NFq::NConfig::TYdbStorageConfig computeConnection = computeConfig.GetConnection(Scope);
- computeConnection.set_endpoint(Event->Get()->ComputeDatabase->connection().endpoint());
- computeConnection.set_database(Event->Get()->ComputeDatabase->connection().database());
- computeConnection.set_usessl(Event->Get()->ComputeDatabase->connection().usessl());
-
- auto tableSettings = GetClientSettings<NYdb::NTable::TClientSettings>(
- computeConnection, credentialsProviderFactory);
- return std::make_unique<NYdb::NTable::TTableClient>(
- yqSharedResources->UserSpaceYdbDriver, tableSettings);
- }
-};
-
template<class TEventRequest, class TResponseProxy>
class TResolveFolderActor : public NActors::TActorBootstrapped<TResolveFolderActor<TEventRequest, TResponseProxy>> {
using TBase = NActors::TActorBootstrapped<TResolveFolderActor<TEventRequest, TResponseProxy>>;
@@ -1258,16 +461,17 @@ public:
}
};
-template<class TRequestProto, class TRequest, class TResponse, class TResponseProxy>
-class TRequestActor : public NActors::TActorBootstrapped<TRequestActor<TRequestProto, TRequest, TResponse, TResponseProxy>> {
+template<class TRequestProto, class TRequest, class TResponse, class TRequestProxy, class TResponseProxy>
+class TRequestActor : public NActors::TActorBootstrapped<TRequestActor<TRequestProto, TRequest, TResponse, TRequestProxy, TResponseProxy>> {
protected:
- using TBase = NActors::TActorBootstrapped<TRequestActor<TRequestProto, TRequest, TResponse, TResponseProxy>>;
+ using TBase = NActors::TActorBootstrapped<TRequestActor<TRequestProto, TRequest, TResponse, TRequestProxy, TResponseProxy>>;
using TBase::SelfId;
using TBase::Send;
using TBase::PassAway;
using TBase::Become;
using TBase::Schedule;
+ typename TRequestProxy::TPtr RequestProxy;
::NFq::TControlPlaneProxyConfig Config;
TRequestProto RequestProto;
TString Scope;
@@ -1287,20 +491,31 @@ protected:
TTenantInfo::TPtr TenantInfo;
TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal> ComputeDatabase;
ui32 RetryCount = 0;
+ bool ReplyWithResponseOnSuccess = true;
public:
static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_REQUEST_ACTOR";
- explicit TRequestActor(const ::NFq::TControlPlaneProxyConfig& config,
- TActorId sender, ui32 cookie,
- const TString& scope, const TString& folderId, TRequestProto&& requestProto,
- TString&& user, TString&& token, const TActorId& serviceId,
+ explicit TRequestActor(typename TRequestProxy::TPtr requestProxy,
+ const ::NFq::TControlPlaneProxyConfig& config,
+ TActorId sender,
+ ui32 cookie,
+ const TString& scope,
+ const TString& folderId,
+ TRequestProto&& requestProto,
+ TString&& user,
+ TString&& token,
+ const TActorId& serviceId,
const TRequestCounters& counters,
const std::function<void(const TDuration&, bool, bool)>& probe,
TPermissions permissions,
- const TString& cloudId, const TString& subjectType, TMaybe<TQuotaMap>&& quotas = Nothing(),
- TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal>&& computeDatabase = Nothing())
- : Config(config)
+ const TString& cloudId,
+ const TString& subjectType,
+ TMaybe<TQuotaMap>&& quotas = Nothing(),
+ TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal>&& computeDatabase = Nothing(),
+ bool replyWithResponseOnSuccess = true)
+ : RequestProxy(requestProxy)
+ , Config(config)
, RequestProto(std::forward<TRequestProto>(requestProto))
, Scope(scope)
, FolderId(folderId)
@@ -1317,6 +532,7 @@ public:
, SubjectType(subjectType)
, Quotas(std::move(quotas))
, ComputeDatabase(std::move(computeDatabase))
+ , ReplyWithResponseOnSuccess(replyWithResponseOnSuccess)
{
Counters.IncInFly();
}
@@ -1398,7 +614,13 @@ public:
const TDuration delta = TInstant::Now() - StartTime;
Counters.IncOk();
Probe(delta, true, false);
- Send(Sender, new TResponseProxy(std::forward<TArgs>(args)..., SubjectType), 0, Cookie);
+ if (ReplyWithResponseOnSuccess) { // constexpr
+ Send(Sender, new TResponseProxy(std::forward<TArgs>(args)..., SubjectType), 0, Cookie);
+ } else {
+ RequestProxy->Get()->Response = std::make_unique<TResponseProxy>(std::forward<TArgs>(args)..., SubjectType);
+ RequestProxy->Get()->ControlPlaneYDBOperationWasPerformed = true;
+ Send(RequestProxy->Forward(ControlPlaneProxyActorId()));
+ }
PassAway();
}
@@ -1421,13 +643,16 @@ public:
class TCreateQueryRequestActor : public TRequestActor<FederatedQuery::CreateQueryRequest,
TEvControlPlaneStorage::TEvCreateQueryRequest,
TEvControlPlaneStorage::TEvCreateQueryResponse,
+ TEvControlPlaneProxy::TEvCreateQueryRequest,
TEvControlPlaneProxy::TEvCreateQueryResponse>
{
bool QuoterResourceCreated = false;
+
public:
using TBaseRequestActor = TRequestActor<FederatedQuery::CreateQueryRequest,
TEvControlPlaneStorage::TEvCreateQueryRequest,
TEvControlPlaneStorage::TEvCreateQueryResponse,
+ TEvControlPlaneProxy::TEvCreateQueryRequest,
TEvControlPlaneProxy::TEvCreateQueryResponse>;
using TBaseRequestActor::TBaseRequestActor;
@@ -1484,175 +709,7 @@ public:
};
class TControlPlaneProxyActor : public NActors::TActorBootstrapped<TControlPlaneProxyActor> {
- enum ERequestTypeScope {
- RTS_CREATE_QUERY,
- RTS_LIST_QUERIES,
- RTS_DESCRIBE_QUERY,
- RTS_GET_QUERY_STATUS,
- RTS_MODIFY_QUERY,
- RTS_DELETE_QUERY,
- RTS_CONTROL_QUERY,
- RTS_GET_RESULT_DATA,
- RTS_LIST_JOBS,
- RTS_DESCRIBE_JOB,
- RTS_CREATE_CONNECTION,
- RTS_LIST_CONNECTIONS,
- RTS_DESCRIBE_CONNECTION,
- RTS_MODIFY_CONNECTION,
- RTS_DELETE_CONNECTION,
- RTS_TEST_CONNECTION,
- RTS_CREATE_BINDING,
- RTS_LIST_BINDINGS,
- RTS_DESCRIBE_BINDING,
- RTS_MODIFY_BINDING,
- RTS_DELETE_BINDING,
- RTS_MAX,
- };
-
- enum ERequestTypeCommon {
- RTC_RESOLVE_FOLDER,
- RTC_CREATE_QUERY,
- RTC_LIST_QUERIES,
- RTC_DESCRIBE_QUERY,
- RTC_GET_QUERY_STATUS,
- RTC_MODIFY_QUERY,
- RTC_DELETE_QUERY,
- RTC_CONTROL_QUERY,
- RTC_GET_RESULT_DATA,
- RTC_LIST_JOBS,
- RTC_DESCRIBE_JOB,
- RTC_CREATE_CONNECTION,
- RTC_LIST_CONNECTIONS,
- RTC_DESCRIBE_CONNECTION,
- RTC_MODIFY_CONNECTION,
- RTC_DELETE_CONNECTION,
- RTC_TEST_CONNECTION,
- RTC_CREATE_BINDING,
- RTC_LIST_BINDINGS,
- RTC_DESCRIBE_BINDING,
- RTC_MODIFY_BINDING,
- RTC_DELETE_BINDING,
- RTC_RESOLVE_SUBJECT_TYPE,
- RTC_CREATE_CONNECTION_IN_YDB,
- RTC_CREATE_BINDING_IN_YDB,
- RTC_CREATE_COMPUTE_DATABASE,
- RTC_MAX,
- };
-
- class TCounters: public virtual TThrRefBase {
- struct TMetricsScope {
- TString CloudId;
- TString Scope;
-
- TMetricsScope() = default;
-
- TMetricsScope(const TString& cloudId, const TString& scope)
- : CloudId(cloudId)
- , Scope(scope)
- {}
-
- bool operator<(const TMetricsScope& right) const {
- return std::make_pair(CloudId, Scope) < std::make_pair(right.CloudId, right.Scope);
- }
- };
-
- using TScopeCounters = std::array<TRequestScopeCountersPtr, RTS_MAX>;
- using TScopeCountersPtr = std::shared_ptr<TScopeCounters>;
-
- std::array<TRequestCommonCountersPtr, RTC_MAX> CommonRequests = CreateArray<RTC_MAX, TRequestCommonCountersPtr>({
- { MakeIntrusive<TRequestCommonCounters>("ResolveFolder") },
- { MakeIntrusive<TRequestCommonCounters>("CreateQuery") },
- { MakeIntrusive<TRequestCommonCounters>("ListQueries") },
- { MakeIntrusive<TRequestCommonCounters>("DescribeQuery") },
- { MakeIntrusive<TRequestCommonCounters>("GetQueryStatus") },
- { MakeIntrusive<TRequestCommonCounters>("ModifyQuery") },
- { MakeIntrusive<TRequestCommonCounters>("DeleteQuery") },
- { MakeIntrusive<TRequestCommonCounters>("ControlQuery") },
- { MakeIntrusive<TRequestCommonCounters>("GetResultData") },
- { MakeIntrusive<TRequestCommonCounters>("ListJobs") },
- { MakeIntrusive<TRequestCommonCounters>("DescribeJob") },
- { MakeIntrusive<TRequestCommonCounters>("CreateConnection") },
- { MakeIntrusive<TRequestCommonCounters>("ListConnections") },
- { MakeIntrusive<TRequestCommonCounters>("DescribeConnection") },
- { MakeIntrusive<TRequestCommonCounters>("ModifyConnection") },
- { MakeIntrusive<TRequestCommonCounters>("DeleteConnection") },
- { MakeIntrusive<TRequestCommonCounters>("TestConnection") },
- { MakeIntrusive<TRequestCommonCounters>("CreateBinding") },
- { MakeIntrusive<TRequestCommonCounters>("ListBindings") },
- { MakeIntrusive<TRequestCommonCounters>("DescribeBinding") },
- { MakeIntrusive<TRequestCommonCounters>("ModifyBinding") },
- { MakeIntrusive<TRequestCommonCounters>("DeleteBinding") },
- { MakeIntrusive<TRequestCommonCounters>("ResolveSubjectType") },
- { MakeIntrusive<TRequestCommonCounters>("CreateConnectionInYDB") },
- { MakeIntrusive<TRequestCommonCounters>("CreateBindingInYDB") },
- { MakeIntrusive<TRequestCommonCounters>("CreateComputeDatabase") },
- });
-
- TTtlCache<TMetricsScope, TScopeCountersPtr, TMap> ScopeCounters{TTtlCacheSettings{}.SetTtl(TDuration::Days(1))};
- ::NMonitoring::TDynamicCounterPtr Counters;
-
- public:
- explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters)
- : Counters(counters)
- {
- for (auto& request: CommonRequests) {
- request->Register(Counters);
- }
- }
-
- TRequestCounters GetCounters(const TString& cloudId, const TString& scope, ERequestTypeScope scopeType, ERequestTypeCommon commonType) {
- return {GetScopeCounters(cloudId, scope, scopeType), GetCommonCounters(commonType)};
- }
-
- TRequestCommonCountersPtr GetCommonCounters(ERequestTypeCommon type) {
- return CommonRequests[type];
- }
-
- TRequestScopeCountersPtr GetScopeCounters(const TString& cloudId, const TString& scope, ERequestTypeScope type) {
- TMetricsScope key{cloudId, scope};
- TMaybe<TScopeCountersPtr> cacheVal;
- ScopeCounters.Get(key, &cacheVal);
- if (cacheVal) {
- return (**cacheVal)[type];
- }
-
- auto scopeRequests = std::make_shared<TScopeCounters>(CreateArray<RTS_MAX, TRequestScopeCountersPtr>({
- { MakeIntrusive<TRequestScopeCounters>("CreateQuery") },
- { MakeIntrusive<TRequestScopeCounters>("ListQueries") },
- { MakeIntrusive<TRequestScopeCounters>("DescribeQuery") },
- { MakeIntrusive<TRequestScopeCounters>("GetQueryStatus") },
- { MakeIntrusive<TRequestScopeCounters>("ModifyQuery") },
- { MakeIntrusive<TRequestScopeCounters>("DeleteQuery") },
- { MakeIntrusive<TRequestScopeCounters>("ControlQuery") },
- { MakeIntrusive<TRequestScopeCounters>("GetResultData") },
- { MakeIntrusive<TRequestScopeCounters>("ListJobs") },
- { MakeIntrusive<TRequestScopeCounters>("DescribeJob") },
- { MakeIntrusive<TRequestScopeCounters>("CreateConnection") },
- { MakeIntrusive<TRequestScopeCounters>("ListConnections") },
- { MakeIntrusive<TRequestScopeCounters>("DescribeConnection") },
- { MakeIntrusive<TRequestScopeCounters>("ModifyConnection") },
- { MakeIntrusive<TRequestScopeCounters>("DeleteConnection") },
- { MakeIntrusive<TRequestScopeCounters>("TestConnection") },
- { MakeIntrusive<TRequestScopeCounters>("CreateBinding") },
- { MakeIntrusive<TRequestScopeCounters>("ListBindings") },
- { MakeIntrusive<TRequestScopeCounters>("DescribeBinding") },
- { MakeIntrusive<TRequestScopeCounters>("ModifyBinding") },
- { MakeIntrusive<TRequestScopeCounters>("DeleteBinding") },
- }));
-
- auto scopeCounters = Counters
- ->GetSubgroup("cloud_id", cloudId)
- ->GetSubgroup("scope", scope);
-
- for (auto& request: *scopeRequests) {
- request->Register(scopeCounters);
- }
- cacheVal = scopeRequests;
- ScopeCounters.Put(key, cacheVal);
- return (*scopeRequests)[type];
- }
- };
-
+private:
TCounters Counters;
const ::NFq::TControlPlaneProxyConfig Config;
const TYqSharedResources::TPtr YqSharedResources;
@@ -1748,7 +805,7 @@ private:
template<typename T>
TPermissions ExtractPermissions(T& ev, const TPermissions& availablePermissions) {
TPermissions permissions;
- for (const auto& permission: ev->Get()->Permissions) {
+ for (const auto& permission : ev->Get()->Permissions) {
if (auto it = PermissionsItems.find(permission); it != PermissionsItems.end()) {
// cut off permissions that should not be used in other services
if (availablePermissions.Check(it->second)) {
@@ -1766,7 +823,7 @@ private:
return issues;
}
- for (const auto& requiredPermission: requiredPermissions) {
+ for (const auto& requiredPermission : requiredPermissions) {
if (!IsIn(ev->Get()->Permissions, requiredPermission)) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::ACCESS_DENIED, "No permission " + requiredPermission + " in a given scope yandexcloud://" + ev->Get()->FolderId));
}
@@ -1837,14 +894,26 @@ private:
| TPermissions::TPermission::MANAGE_PUBLIC
};
- Register(new TCreateQueryRequestActor
- (Config, sender, cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe, ExtractPermissions(ev, availablePermissions),
- cloudId, subjectType, std::move(ev->Get()->Quotas),
- std::move(ev->Get()->ComputeDatabase)));
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ auto computeDatabase = ev->Get()->ComputeDatabase;
+ auto quotas = ev->Get()->Quotas;
+ Register(new TCreateQueryRequestActor
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request),
+ std::move(user),
+ std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId,
+ subjectType,
+ std::move(quotas),
+ std::move(computeDatabase)));
+ }
}
void Handle(TEvControlPlaneProxy::TEvListQueriesRequest::TPtr& ev) {
@@ -1900,16 +969,26 @@ private:
| TPermissions::TPermission::VIEW_PRIVATE
};
- Register(new TRequestActor<FederatedQuery::ListQueriesRequest,
- TEvControlPlaneStorage::TEvListQueriesRequest,
- TEvControlPlaneStorage::TEvListQueriesResponse,
- TEvControlPlaneProxy::TEvListQueriesResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(new TRequestActor<FederatedQuery::ListQueriesRequest,
+ TEvControlPlaneStorage::TEvListQueriesRequest,
+ TEvControlPlaneStorage::TEvListQueriesResponse,
+ TEvControlPlaneProxy::TEvListQueriesRequest,
+ TEvControlPlaneProxy::TEvListQueriesResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request),
+ std::move(user),
+ std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId,
+ subjectType));
+ }
}
void Handle(TEvControlPlaneProxy::TEvDescribeQueryRequest::TPtr& ev) {
@@ -1968,16 +1047,26 @@ private:
| TPermissions::VIEW_QUERY_TEXT
};
- Register(new TRequestActor<FederatedQuery::DescribeQueryRequest,
- TEvControlPlaneStorage::TEvDescribeQueryRequest,
- TEvControlPlaneStorage::TEvDescribeQueryResponse,
- TEvControlPlaneProxy::TEvDescribeQueryResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(new TRequestActor<FederatedQuery::DescribeQueryRequest,
+ TEvControlPlaneStorage::TEvDescribeQueryRequest,
+ TEvControlPlaneStorage::TEvDescribeQueryResponse,
+ TEvControlPlaneProxy::TEvDescribeQueryRequest,
+ TEvControlPlaneProxy::TEvDescribeQueryResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request),
+ std::move(user),
+ std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId,
+ subjectType));
+ }
}
void Handle(TEvControlPlaneProxy::TEvGetQueryStatusRequest::TPtr& ev) {
@@ -2034,16 +1123,26 @@ private:
| TPermissions::TPermission::VIEW_PRIVATE
};
- Register(new TRequestActor<FederatedQuery::GetQueryStatusRequest,
- TEvControlPlaneStorage::TEvGetQueryStatusRequest,
- TEvControlPlaneStorage::TEvGetQueryStatusResponse,
- TEvControlPlaneProxy::TEvGetQueryStatusResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(new TRequestActor<FederatedQuery::GetQueryStatusRequest,
+ TEvControlPlaneStorage::TEvGetQueryStatusRequest,
+ TEvControlPlaneStorage::TEvGetQueryStatusResponse,
+ TEvControlPlaneProxy::TEvGetQueryStatusRequest,
+ TEvControlPlaneProxy::TEvGetQueryStatusResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request),
+ std::move(user),
+ std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId,
+ subjectType));
+ }
}
void Handle(TEvControlPlaneProxy::TEvModifyQueryRequest::TPtr& ev) {
@@ -2110,16 +1209,29 @@ private:
| TPermissions::TPermission::MANAGE_PRIVATE
};
- Register(new TRequestActor<FederatedQuery::ModifyQueryRequest,
- TEvControlPlaneStorage::TEvModifyQueryRequest,
- TEvControlPlaneStorage::TEvModifyQueryResponse,
- TEvControlPlaneProxy::TEvModifyQueryResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe, ExtractPermissions(ev, availablePermissions),
- cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase)));
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ auto computeDatabase = ev->Get()->ComputeDatabase;
+ Register(new TRequestActor<FederatedQuery::ModifyQueryRequest,
+ TEvControlPlaneStorage::TEvModifyQueryRequest,
+ TEvControlPlaneStorage::TEvModifyQueryResponse,
+ TEvControlPlaneProxy::TEvModifyQueryRequest,
+ TEvControlPlaneProxy::TEvModifyQueryResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request),
+ std::move(user),
+ std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId,
+ subjectType,
+ {},
+ std::move(computeDatabase)));
+ }
}
void Handle(TEvControlPlaneProxy::TEvDeleteQueryRequest::TPtr& ev) {
@@ -2176,16 +1288,26 @@ private:
| TPermissions::TPermission::MANAGE_PRIVATE
};
- Register(new TRequestActor<FederatedQuery::DeleteQueryRequest,
- TEvControlPlaneStorage::TEvDeleteQueryRequest,
- TEvControlPlaneStorage::TEvDeleteQueryResponse,
- TEvControlPlaneProxy::TEvDeleteQueryResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(new TRequestActor<FederatedQuery::DeleteQueryRequest,
+ TEvControlPlaneStorage::TEvDeleteQueryRequest,
+ TEvControlPlaneStorage::TEvDeleteQueryResponse,
+ TEvControlPlaneProxy::TEvDeleteQueryRequest,
+ TEvControlPlaneProxy::TEvDeleteQueryResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request),
+ std::move(user),
+ std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId,
+ subjectType));
+ }
}
void Handle(TEvControlPlaneProxy::TEvControlQueryRequest::TPtr& ev) {
@@ -2242,16 +1364,26 @@ private:
| TPermissions::TPermission::MANAGE_PRIVATE
};
- Register(new TRequestActor<FederatedQuery::ControlQueryRequest,
- TEvControlPlaneStorage::TEvControlQueryRequest,
- TEvControlPlaneStorage::TEvControlQueryResponse,
- TEvControlPlaneProxy::TEvControlQueryResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(new TRequestActor<FederatedQuery::ControlQueryRequest,
+ TEvControlPlaneStorage::TEvControlQueryRequest,
+ TEvControlPlaneStorage::TEvControlQueryResponse,
+ TEvControlPlaneProxy::TEvControlQueryRequest,
+ TEvControlPlaneProxy::TEvControlQueryResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request),
+ std::move(user),
+ std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId,
+ subjectType));
+ }
}
void Handle(TEvControlPlaneProxy::TEvGetResultDataRequest::TPtr& ev) {
@@ -2311,16 +1443,26 @@ private:
| TPermissions::TPermission::VIEW_PRIVATE
};
- Register(new TRequestActor<FederatedQuery::GetResultDataRequest,
- TEvControlPlaneStorage::TEvGetResultDataRequest,
- TEvControlPlaneStorage::TEvGetResultDataResponse,
- TEvControlPlaneProxy::TEvGetResultDataResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(new TRequestActor<FederatedQuery::GetResultDataRequest,
+ TEvControlPlaneStorage::TEvGetResultDataRequest,
+ TEvControlPlaneStorage::TEvGetResultDataResponse,
+ TEvControlPlaneProxy::TEvGetResultDataRequest,
+ TEvControlPlaneProxy::TEvGetResultDataResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request),
+ std::move(user),
+ std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId,
+ subjectType));
+ }
}
void Handle(TEvControlPlaneProxy::TEvListJobsRequest::TPtr& ev) {
@@ -2377,16 +1519,26 @@ private:
| TPermissions::TPermission::VIEW_PRIVATE
};
- Register(new TRequestActor<FederatedQuery::ListJobsRequest,
- TEvControlPlaneStorage::TEvListJobsRequest,
- TEvControlPlaneStorage::TEvListJobsResponse,
- TEvControlPlaneProxy::TEvListJobsResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(new TRequestActor<FederatedQuery::ListJobsRequest,
+ TEvControlPlaneStorage::TEvListJobsRequest,
+ TEvControlPlaneStorage::TEvListJobsResponse,
+ TEvControlPlaneProxy::TEvListJobsRequest,
+ TEvControlPlaneProxy::TEvListJobsResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request),
+ std::move(user),
+ std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId,
+ subjectType));
+ }
}
void Handle(TEvControlPlaneProxy::TEvDescribeJobRequest::TPtr& ev) {
@@ -2445,16 +1597,26 @@ private:
| TPermissions::VIEW_QUERY_TEXT
};
- Register(new TRequestActor<FederatedQuery::DescribeJobRequest,
- TEvControlPlaneStorage::TEvDescribeJobRequest,
- TEvControlPlaneStorage::TEvDescribeJobResponse,
- TEvControlPlaneProxy::TEvDescribeJobResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(new TRequestActor<FederatedQuery::DescribeJobRequest,
+ TEvControlPlaneStorage::TEvDescribeJobRequest,
+ TEvControlPlaneStorage::TEvDescribeJobResponse,
+ TEvControlPlaneProxy::TEvDescribeJobRequest,
+ TEvControlPlaneProxy::TEvDescribeJobResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request),
+ std::move(user),
+ std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId,
+ subjectType));
+ }
}
void Handle(TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& ev) {
@@ -2525,31 +1687,53 @@ private:
};
if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && !ydbOperationWasPerformed) {
- Register(new TCreateConnectionInYDBActor(
- Counters.GetCommonCounters(RTC_CREATE_CONNECTION_IN_YDB),
- Config.ComputeConfig,
- YqSharedResources,
- CredentialsProviderFactory,
- Config.CommonConfig.GetObjectStorageEndpoint(),
+ if (!ev->Get()->YDBSession) {
+ Register(MakeComputeYDBSessionActor<
+ TEvControlPlaneProxy::TEvCreateConnectionRequest,
+ TEvControlPlaneProxy::TEvCreateConnectionResponse>(
+ sender,
+ std::move(ev),
+ Config.RequestTimeout,
+ Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION),
+ Config.ComputeConfig,
+ YqSharedResources,
+ CredentialsProviderFactory));
+ return;
+ }
+
+ Register(NPrivate::MakeCreateConnectionActor(
sender,
- ev,
- cookie,
- scope,
- Signer,
- Config.RequestTimeout));
- return;
- }
-
- Register(new TRequestActor<FederatedQuery::CreateConnectionRequest,
- TEvControlPlaneStorage::TEvCreateConnectionRequest,
- TEvControlPlaneStorage::TEvCreateConnectionResponse,
- TEvControlPlaneProxy::TEvCreateConnectionResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe, ExtractPermissions(ev, availablePermissions),
- cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase)));
+ std::move(ev),
+ Config.RequestTimeout,
+ Counters,
+ Config.CommonConfig.GetObjectStorageEndpoint(),
+ Signer));
+ return;
+ }
+
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ auto computeDatabase = ev->Get()->ComputeDatabase;
+ Register(new TRequestActor<FederatedQuery::CreateConnectionRequest,
+ TEvControlPlaneStorage::TEvCreateConnectionRequest,
+ TEvControlPlaneStorage::TEvCreateConnectionResponse,
+ TEvControlPlaneProxy::TEvCreateConnectionRequest,
+ TEvControlPlaneProxy::TEvCreateConnectionResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request),
+ std::move(user),
+ std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId,
+ subjectType,
+ {},
+ std::move(computeDatabase)));
+ }
}
void Handle(TEvControlPlaneProxy::TEvListConnectionsRequest::TPtr& ev) {
@@ -2605,16 +1789,26 @@ private:
| TPermissions::TPermission::VIEW_PRIVATE
};
- Register(new TRequestActor<FederatedQuery::ListConnectionsRequest,
- TEvControlPlaneStorage::TEvListConnectionsRequest,
- TEvControlPlaneStorage::TEvListConnectionsResponse,
- TEvControlPlaneProxy::TEvListConnectionsResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(new TRequestActor<FederatedQuery::ListConnectionsRequest,
+ TEvControlPlaneStorage::TEvListConnectionsRequest,
+ TEvControlPlaneStorage::TEvListConnectionsResponse,
+ TEvControlPlaneProxy::TEvListConnectionsRequest,
+ TEvControlPlaneProxy::TEvListConnectionsResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request),
+ std::move(user),
+ std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId,
+ subjectType));
+ }
}
void Handle(TEvControlPlaneProxy::TEvDescribeConnectionRequest::TPtr& ev) {
@@ -2671,16 +1865,26 @@ private:
| TPermissions::TPermission::VIEW_PRIVATE
};
- Register(new TRequestActor<FederatedQuery::DescribeConnectionRequest,
- TEvControlPlaneStorage::TEvDescribeConnectionRequest,
- TEvControlPlaneStorage::TEvDescribeConnectionResponse,
- TEvControlPlaneProxy::TEvDescribeConnectionResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(new TRequestActor<FederatedQuery::DescribeConnectionRequest,
+ TEvControlPlaneStorage::TEvDescribeConnectionRequest,
+ TEvControlPlaneStorage::TEvDescribeConnectionResponse,
+ TEvControlPlaneProxy::TEvDescribeConnectionRequest,
+ TEvControlPlaneProxy::TEvDescribeConnectionResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request),
+ std::move(user),
+ std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId,
+ subjectType));
+ }
}
void Handle(TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& ev) {
@@ -2749,19 +1953,73 @@ private:
static const TPermissions availablePermissions {
TPermissions::TPermission::MANAGE_PUBLIC
| TPermissions::TPermission::MANAGE_PRIVATE
+ | TPermissions::TPermission::VIEW_PUBLIC
+ | TPermissions::TPermission::VIEW_PRIVATE
};
- Register(new TRequestActor<FederatedQuery::ModifyConnectionRequest,
- TEvControlPlaneStorage::TEvModifyConnectionRequest,
- TEvControlPlaneStorage::TEvModifyConnectionResponse,
- TEvControlPlaneProxy::TEvModifyConnectionResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions),
- cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase)));
+ if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && !ev->Get()->OldConnectionContent) {
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(MakeDiscoverYDBConnectionName(
+ sender, ev, Counters, Config.RequestTimeout, permissions));
+ return;
+ }
+
+ const bool controlPlaneYDBOperationWasPerformed = ev->Get()->ControlPlaneYDBOperationWasPerformed;
+ if (!controlPlaneYDBOperationWasPerformed) {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ auto computeDatabase = ev->Get()->ComputeDatabase;
+ Register(new TRequestActor<FederatedQuery::ModifyConnectionRequest,
+ TEvControlPlaneStorage::TEvModifyConnectionRequest,
+ TEvControlPlaneStorage::TEvModifyConnectionResponse,
+ TEvControlPlaneProxy::TEvModifyConnectionRequest,
+ TEvControlPlaneProxy::TEvModifyConnectionResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request),
+ std::move(user),
+ std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId,
+ subjectType,
+ {},
+ std::move(computeDatabase),
+ !Config.ComputeConfig.YdbComputeControlPlaneEnabled()));
+ return;
+ }
+
+ if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) {
+ if (!ev->Get()->YDBSession) {
+ Register(MakeComputeYDBSessionActor<
+ TEvControlPlaneProxy::TEvModifyConnectionRequest,
+ TEvControlPlaneProxy::TEvModifyConnectionResponse>(
+ sender,
+ std::move(ev),
+ Config.RequestTimeout,
+ Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION),
+ Config.ComputeConfig,
+ YqSharedResources,
+ CredentialsProviderFactory));
+ return;
+ }
+
+ if (!ev->Get()->ComputeYDBOperationWasPerformed) {
+ Register(MakeModifyConnectionActor(
+ sender,
+ ev,
+ Config.RequestTimeout,
+ Counters,
+ Config.CommonConfig.GetObjectStorageEndpoint(),
+ Signer));
+ return;
+ }
+
+ Send(sender, ev->Get()->Response.release());
+ return;
+ }
}
void Handle(TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& ev) {
@@ -2825,19 +2083,59 @@ private:
static const TPermissions availablePermissions {
TPermissions::TPermission::MANAGE_PUBLIC
| TPermissions::TPermission::MANAGE_PRIVATE
+ | TPermissions::TPermission::VIEW_PUBLIC
+ | TPermissions::TPermission::VIEW_PRIVATE
};
- Register(new TRequestActor<FederatedQuery::DeleteConnectionRequest,
- TEvControlPlaneStorage::TEvDeleteConnectionRequest,
- TEvControlPlaneStorage::TEvDeleteConnectionResponse,
- TEvControlPlaneProxy::TEvDeleteConnectionResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions),
- cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase)));
+ if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && !ev->Get()->ConnectionContent) {
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(MakeDiscoverYDBConnectionName(
+ sender, ev, Counters, Config.RequestTimeout, permissions));
+ return;
+ }
+
+ const bool controlPlaneYDBOperationWasPerformed = ev->Get()->ControlPlaneYDBOperationWasPerformed;
+ if (!controlPlaneYDBOperationWasPerformed) {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ auto computeDatabase = ev->Get()->ComputeDatabase;
+ Register(new TRequestActor<FederatedQuery::DeleteConnectionRequest,
+ TEvControlPlaneStorage::TEvDeleteConnectionRequest,
+ TEvControlPlaneStorage::TEvDeleteConnectionResponse,
+ TEvControlPlaneProxy::TEvDeleteConnectionRequest,
+ TEvControlPlaneProxy::TEvDeleteConnectionResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request), std::move(user), std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId, subjectType, {}, std::move(computeDatabase), !Config.ComputeConfig.YdbComputeControlPlaneEnabled()));
+ return;
+ }
+
+ if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) {
+ if (!ev->Get()->YDBSession) {
+ Register(MakeComputeYDBSessionActor<
+ TEvControlPlaneProxy::TEvDeleteConnectionRequest,
+ TEvControlPlaneProxy::TEvDeleteConnectionResponse>(
+ sender,
+ std::move(ev),
+ Config.RequestTimeout,
+ Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION),
+ Config.ComputeConfig, YqSharedResources, CredentialsProviderFactory));
+ return;
+ }
+
+ if (!ev->Get()->ComputeYDBOperationWasPerformed) {
+ Register(MakeDeleteConnectionActor(
+ sender, ev, Config.RequestTimeout, Counters, Signer));
+ return;
+ }
+
+ Send(sender, ev->Get()->Response.release());
+ }
}
void Handle(TEvControlPlaneProxy::TEvTestConnectionRequest::TPtr& ev) {
@@ -2893,15 +2191,21 @@ private:
return;
}
- Register(new TRequestActor<FederatedQuery::TestConnectionRequest,
- TEvTestConnection::TEvTestConnectionRequest,
- TEvTestConnection::TEvTestConnectionResponse,
- TEvControlPlaneProxy::TEvTestConnectionResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- TestConnectionActorId(),
- requestCounters,
- probe, ExtractPermissions(ev, {}), cloudId, subjectType));
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, {});
+ Register(new TRequestActor<FederatedQuery::TestConnectionRequest,
+ TEvTestConnection::TEvTestConnectionRequest,
+ TEvTestConnection::TEvTestConnectionResponse,
+ TEvControlPlaneProxy::TEvTestConnectionRequest,
+ TEvControlPlaneProxy::TEvTestConnectionResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request), std::move(user), std::move(token),
+ TestConnectionActorId(),
+ requestCounters,
+ probe, permissions, cloudId, subjectType));
+ }
}
void Handle(TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& ev) {
@@ -2972,31 +2276,47 @@ private:
};
if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && !ydbOperationWasPerformed) {
+ if (!ev->Get()->ConnectionName) {
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(MakeDiscoverYDBConnectionName(
+ sender, ev, Counters, Config.RequestTimeout, permissions));
+ return;
+ }
+
+ if (!ev->Get()->YDBSession) {
+ Register(MakeComputeYDBSessionActor<
+ TEvControlPlaneProxy::TEvCreateBindingRequest,
+ TEvControlPlaneProxy::TEvCreateBindingResponse>(
+ sender,
+ std::move(ev),
+ Config.RequestTimeout,
+ Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION),
+ Config.ComputeConfig, YqSharedResources, CredentialsProviderFactory));
+ return;
+ }
+
+ Register(MakeCreateBindingActor(
+ sender, std::move(ev), Config.RequestTimeout, Counters));
+ return;
+ }
+
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
auto permissions = ExtractPermissions(ev, availablePermissions);
- Register(new TCreateBindingInYDBActor(
- Counters.GetCommonCounters(RTC_CREATE_BINDING_IN_YDB),
- Config.ComputeConfig,
- YqSharedResources,
- CredentialsProviderFactory,
- sender,
- ev,
- cookie,
- std::move(permissions),
- scope,
- Config.RequestTimeout));
- return;
- }
-
- Register(new TRequestActor<FederatedQuery::CreateBindingRequest,
- TEvControlPlaneStorage::TEvCreateBindingRequest,
- TEvControlPlaneStorage::TEvCreateBindingResponse,
- TEvControlPlaneProxy::TEvCreateBindingResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe, ExtractPermissions(ev, availablePermissions),
- cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase)));
+ auto computeDatabase = ev->Get()->ComputeDatabase;
+ Register(new TRequestActor<FederatedQuery::CreateBindingRequest,
+ TEvControlPlaneStorage::TEvCreateBindingRequest,
+ TEvControlPlaneStorage::TEvCreateBindingResponse,
+ TEvControlPlaneProxy::TEvCreateBindingRequest,
+ TEvControlPlaneProxy::TEvCreateBindingResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request), std::move(user), std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe, permissions,
+ cloudId, subjectType, {}, std::move(computeDatabase)));
+ }
}
void Handle(TEvControlPlaneProxy::TEvListBindingsRequest::TPtr& ev) {
@@ -3052,16 +2372,22 @@ private:
| TPermissions::TPermission::VIEW_PRIVATE
};
- Register(new TRequestActor<FederatedQuery::ListBindingsRequest,
- TEvControlPlaneStorage::TEvListBindingsRequest,
- TEvControlPlaneStorage::TEvListBindingsResponse,
- TEvControlPlaneProxy::TEvListBindingsResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(new TRequestActor<FederatedQuery::ListBindingsRequest,
+ TEvControlPlaneStorage::TEvListBindingsRequest,
+ TEvControlPlaneStorage::TEvListBindingsResponse,
+ TEvControlPlaneProxy::TEvListBindingsRequest,
+ TEvControlPlaneProxy::TEvListBindingsResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request), std::move(user), std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions, cloudId, subjectType));
+ }
}
void Handle(TEvControlPlaneProxy::TEvDescribeBindingRequest::TPtr& ev) {
@@ -3118,16 +2444,22 @@ private:
| TPermissions::TPermission::VIEW_PRIVATE
};
- Register(new TRequestActor<FederatedQuery::DescribeBindingRequest,
- TEvControlPlaneStorage::TEvDescribeBindingRequest,
- TEvControlPlaneStorage::TEvDescribeBindingResponse,
- TEvControlPlaneProxy::TEvDescribeBindingResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(new TRequestActor<FederatedQuery::DescribeBindingRequest,
+ TEvControlPlaneStorage::TEvDescribeBindingRequest,
+ TEvControlPlaneStorage::TEvDescribeBindingResponse,
+ TEvControlPlaneProxy::TEvDescribeBindingRequest,
+ TEvControlPlaneProxy::TEvDescribeBindingResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request), std::move(user), std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions, cloudId, subjectType));
+ }
}
void Handle(TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& ev) {
@@ -3191,19 +2523,72 @@ private:
static const TPermissions availablePermissions {
TPermissions::TPermission::MANAGE_PUBLIC
| TPermissions::TPermission::MANAGE_PRIVATE
+ | TPermissions::TPermission::VIEW_PUBLIC
+ | TPermissions::TPermission::VIEW_PRIVATE
};
- Register(new TRequestActor<FederatedQuery::ModifyBindingRequest,
- TEvControlPlaneStorage::TEvModifyBindingRequest,
- TEvControlPlaneStorage::TEvModifyBindingResponse,
- TEvControlPlaneProxy::TEvModifyBindingResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions),
- cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase)));
+ if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) {
+ if (!ev->Get()->OldBindingName) {
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(MakeDiscoverYDBBindingName(
+ sender, ev, Counters, Config.RequestTimeout, permissions));
+ return;
+ }
+ if (!ev->Get()->ConnectionName) {
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(MakeDiscoverYDBConnectionName(
+ sender, ev, Counters, Config.RequestTimeout, permissions));
+ return;
+ }
+ }
+
+ const bool controlPlaneYDBOperationWasPerformed = ev->Get()->ControlPlaneYDBOperationWasPerformed;
+ if (!controlPlaneYDBOperationWasPerformed) {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ auto computeDatabase = ev->Get()->ComputeDatabase;
+ Register(new TRequestActor<FederatedQuery::ModifyBindingRequest,
+ TEvControlPlaneStorage::TEvModifyBindingRequest,
+ TEvControlPlaneStorage::TEvModifyBindingResponse,
+ TEvControlPlaneProxy::TEvModifyBindingRequest,
+ TEvControlPlaneProxy::TEvModifyBindingResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request), std::move(user), std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId, subjectType, {}, std::move(computeDatabase), !Config.ComputeConfig.YdbComputeControlPlaneEnabled()));
+ return;
+ }
+
+ if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) {
+ if (!ev->Get()->YDBSession) {
+ Register(MakeComputeYDBSessionActor<
+ TEvControlPlaneProxy::TEvModifyBindingRequest,
+ TEvControlPlaneProxy::TEvModifyBindingResponse>(
+ sender,
+ std::move(ev),
+ Config.RequestTimeout,
+ Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION),
+ Config.ComputeConfig,
+ YqSharedResources,
+ CredentialsProviderFactory));
+ return;
+ }
+
+ if (!ev->Get()->ComputeYDBOperationWasPerformed) {
+ Register(MakeModifyBindingActor(
+ sender,
+ std::move(ev),
+ Config.RequestTimeout,
+ Counters));
+ return;
+ }
+
+ Send(sender, ev->Get()->Response.release());
+ }
}
void Handle(TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& ev) {
@@ -3267,19 +2652,62 @@ private:
static const TPermissions availablePermissions {
TPermissions::TPermission::MANAGE_PUBLIC
| TPermissions::TPermission::MANAGE_PRIVATE
+ | TPermissions::TPermission::VIEW_PUBLIC
+ | TPermissions::TPermission::VIEW_PRIVATE
};
- Register(new TRequestActor<FederatedQuery::DeleteBindingRequest,
- TEvControlPlaneStorage::TEvDeleteBindingRequest,
- TEvControlPlaneStorage::TEvDeleteBindingResponse,
- TEvControlPlaneProxy::TEvDeleteBindingResponse>
- (Config, ev->Sender, ev->Cookie, scope, folderId,
- std::move(request), std::move(user), std::move(token),
- ControlPlaneStorageServiceActorId(),
- requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions),
- cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase)));
+ if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() &&
+ !ev->Get()->OldBindingName) {
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(MakeDiscoverYDBBindingName(
+ sender, ev, Counters, Config.RequestTimeout, permissions));
+ return;
+ }
+
+ const bool controlPlaneYDBOperationWasPerformed = ev->Get()->ControlPlaneYDBOperationWasPerformed;
+ if (!controlPlaneYDBOperationWasPerformed) {
+ auto sender = ev->Sender;
+ auto cookie = ev->Cookie;
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ auto computeDatabase = ev->Get()->ComputeDatabase;
+ Register(new TRequestActor<FederatedQuery::DeleteBindingRequest,
+ TEvControlPlaneStorage::TEvDeleteBindingRequest,
+ TEvControlPlaneStorage::TEvDeleteBindingResponse,
+ TEvControlPlaneProxy::TEvDeleteBindingRequest,
+ TEvControlPlaneProxy::TEvDeleteBindingResponse>
+ (ev, Config, sender, cookie, scope, folderId,
+ std::move(request), std::move(user), std::move(token),
+ ControlPlaneStorageServiceActorId(),
+ requestCounters,
+ probe,
+ permissions,
+ cloudId, subjectType, {}, std::move(computeDatabase), !Config.ComputeConfig.YdbComputeControlPlaneEnabled()));
+ return;
+ }
+
+ if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) {
+ if (!ev->Get()->YDBSession) {
+ Register(MakeComputeYDBSessionActor<
+ TEvControlPlaneProxy::TEvDeleteBindingRequest,
+ TEvControlPlaneProxy::TEvDeleteBindingResponse>(
+ sender,
+ std::move(ev),
+ Config.RequestTimeout,
+ Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION),
+ Config.ComputeConfig,
+ YqSharedResources,
+ CredentialsProviderFactory));
+ return;
+ }
+
+ if (!ev->Get()->ComputeYDBOperationWasPerformed) {
+ Register(MakeDeleteBindingActor(
+ sender, std::move(ev), Config.RequestTimeout, Counters));
+ return;
+ }
+
+ Send(sender, ev->Get()->Response.release());
+ }
}
void Handle(NMon::TEvHttpInfo::TPtr& ev) {
diff --git a/ydb/core/fq/libs/control_plane_proxy/events/events.h b/ydb/core/fq/libs/control_plane_proxy/events/events.h
index e027245b724..bde54c6a8a0 100644
--- a/ydb/core/fq/libs/control_plane_proxy/events/events.h
+++ b/ydb/core/fq/libs/control_plane_proxy/events/events.h
@@ -1,5 +1,9 @@
#pragma once
+#include "type_traits.h"
+
+#include <util/generic/maybe.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
#include <ydb/core/fq/libs/quota_manager/events/events.h>
@@ -10,6 +14,7 @@
#include <library/cpp/actors/interconnect/events_local.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
+#include <memory>
namespace NFq {
@@ -63,15 +68,20 @@ struct TEvControlPlaneProxy {
static_assert(EvEnd <= YqEventSubspaceEnd(NFq::TYqEventSubspace::ControlPlaneProxy), "All events must be in their subspace");
- template<typename ProtoMessage, ui32 EventType>
- struct TControlPlaneRequest : NActors::TEventLocal<TControlPlaneRequest<ProtoMessage, EventType>, EventType> {
- TControlPlaneRequest(const TString& folderId,
- const ProtoMessage& request,
- const TString& user,
- const TString& token,
- const TVector<TString>& permissions,
- TMaybe<TQuotaMap> quotas = Nothing(),
- TTenantInfo::TPtr tenantInfo = nullptr)
+ template<class Request>
+ struct TResponseSelector;
+
+ template<typename TDerived, typename ProtoMessage, ui32 EventType>
+ struct TBaseControlPlaneRequest : NActors::TEventLocal<TDerived, EventType> {
+ using TProxyResponse = typename TResponseSelector<TDerived>::type;
+
+ TBaseControlPlaneRequest(const TString& folderId,
+ const ProtoMessage& request,
+ const TString& user,
+ const TString& token,
+ const TVector<TString>& permissions,
+ TMaybe<TQuotaMap> quotas = Nothing(),
+ TTenantInfo::TPtr tenantInfo = nullptr)
: FolderId(folderId)
, Request(request)
, User(user)
@@ -80,8 +90,7 @@ struct TEvControlPlaneProxy {
, Quotas(std::move(quotas))
, TenantInfo(tenantInfo)
, ComputeYDBOperationWasPerformed(false)
- {
- }
+ , ControlPlaneYDBOperationWasPerformed(false) { }
TString FolderId;
TString CloudId;
@@ -93,9 +102,15 @@ struct TEvControlPlaneProxy {
TTenantInfo::TPtr TenantInfo;
TString SubjectType;
bool ComputeYDBOperationWasPerformed;
+ bool ControlPlaneYDBOperationWasPerformed;
+ std::unique_ptr<TProxyResponse> Response;
+ TMaybe<NYdb::NTable::TSession> YDBSession;
TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal> ComputeDatabase;
};
+ template<typename ProtoMessage, ui32 EventType>
+ struct TControlPlaneRequest;
+
template<typename TDerived, typename ProtoMessage, ui32 EventType>
struct TControlPlaneResponse : NActors::TEventLocal<TDerived, EventType> {
TControlPlaneResponse(const ProtoMessage& result, const TString& subjectType)
@@ -188,6 +203,168 @@ struct TEvControlPlaneProxy {
using TEvModifyBindingResponse = TControlPlaneAuditableResponse<FederatedQuery::ModifyBindingResult, FederatedQuery::Binding, EvModifyBindingResponse>;
using TEvDeleteBindingRequest = TControlPlaneRequest<FederatedQuery::DeleteBindingRequest, EvDeleteBindingRequest>;
using TEvDeleteBindingResponse = TControlPlaneAuditableResponse<FederatedQuery::DeleteBindingResult, FederatedQuery::Binding, EvDeleteBindingResponse>;
+
+ template<>
+ struct TResponseSelector<TEvCreateQueryRequest> {
+ using type = TEvCreateQueryResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvListQueriesRequest> {
+ using type = TEvListQueriesResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvDescribeQueryRequest> {
+ using type = TEvDescribeQueryResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvGetQueryStatusRequest> {
+ using type = TEvGetQueryStatusResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvModifyQueryRequest> {
+ using type = TEvModifyQueryResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvDeleteQueryRequest> {
+ using type = TEvDeleteQueryResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvControlQueryRequest> {
+ using type = TEvControlQueryResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvGetResultDataRequest> {
+ using type = TEvGetResultDataResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvListJobsRequest> {
+ using type = TEvListJobsResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvDescribeJobRequest> {
+ using type = TEvDescribeJobResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvCreateConnectionRequest> {
+ using type = TEvCreateConnectionResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvListConnectionsRequest> {
+ using type = TEvListConnectionsResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvDescribeConnectionRequest> {
+ using type = TEvDescribeConnectionResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvModifyConnectionRequest> {
+ using type = TEvModifyConnectionResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvDeleteConnectionRequest> {
+ using type = TEvDeleteConnectionResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvTestConnectionRequest> {
+ using type = TEvTestConnectionResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvCreateBindingRequest> {
+ using type = TEvCreateBindingResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvListBindingsRequest> {
+ using type = TEvListBindingsResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvDescribeBindingRequest> {
+ using type = TEvDescribeBindingResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvModifyBindingRequest> {
+ using type = TEvModifyBindingResponse;
+ };
+ template<>
+ struct TResponseSelector<TEvDeleteBindingRequest> {
+ using type = TEvDeleteBindingResponse;
+ };
+
+ template<typename ProtoMessage, ui32 EventType>
+ struct TControlPlaneRequest :
+ public TBaseControlPlaneRequest<TControlPlaneRequest<ProtoMessage, EventType>,
+ ProtoMessage,
+ EventType> {
+ using TBaseControlPlaneRequest<TControlPlaneRequest<ProtoMessage, EventType>,
+ ProtoMessage,
+ EventType>::TBaseControlPlaneRequest;
+ };
+
+ template<>
+ struct TControlPlaneRequest<FederatedQuery::ModifyConnectionRequest, EvModifyConnectionRequest> :
+ public TBaseControlPlaneRequest<TEvModifyConnectionRequest,
+ FederatedQuery::ModifyConnectionRequest,
+ EvModifyConnectionRequest> {
+ using TBaseControlPlaneRequest<
+ TControlPlaneRequest<FederatedQuery::ModifyConnectionRequest, EvModifyConnectionRequest>,
+ FederatedQuery::ModifyConnectionRequest,
+ EvModifyConnectionRequest>::TBaseControlPlaneRequest;
+
+ TMaybe<FederatedQuery::ConnectionContent> OldConnectionContent;
+ };
+
+ template<>
+ struct TControlPlaneRequest<FederatedQuery::DeleteConnectionRequest, EvDeleteConnectionRequest> :
+ public TBaseControlPlaneRequest<TEvDeleteConnectionRequest,
+ FederatedQuery::DeleteConnectionRequest,
+ EvDeleteConnectionRequest> {
+ using TBaseControlPlaneRequest<
+ TControlPlaneRequest<FederatedQuery::DeleteConnectionRequest, EvDeleteConnectionRequest>,
+ FederatedQuery::DeleteConnectionRequest,
+ EvDeleteConnectionRequest>::TBaseControlPlaneRequest;
+
+ TMaybe<FederatedQuery::ConnectionContent> ConnectionContent;
+ };
+
+ template<>
+ struct TControlPlaneRequest<FederatedQuery::CreateBindingRequest, EvCreateBindingRequest> :
+ public TBaseControlPlaneRequest<TEvCreateBindingRequest,
+ FederatedQuery::CreateBindingRequest,
+ EvCreateBindingRequest> {
+ using TBaseControlPlaneRequest<
+ TControlPlaneRequest<FederatedQuery::CreateBindingRequest, EvCreateBindingRequest>,
+ FederatedQuery::CreateBindingRequest,
+ EvCreateBindingRequest>::TBaseControlPlaneRequest;
+
+ TMaybe<TString> ConnectionName;
+ };
+
+ template<>
+ struct TControlPlaneRequest<FederatedQuery::ModifyBindingRequest, EvModifyBindingRequest> :
+ public TBaseControlPlaneRequest<TEvModifyBindingRequest,
+ FederatedQuery::ModifyBindingRequest,
+ EvModifyBindingRequest> {
+ using TBaseControlPlaneRequest<
+ TControlPlaneRequest<FederatedQuery::ModifyBindingRequest, EvModifyBindingRequest>,
+ FederatedQuery::ModifyBindingRequest,
+ EvModifyBindingRequest>::TBaseControlPlaneRequest;
+
+ TMaybe<TString> OldBindingName;
+ TMaybe<TString> ConnectionId;
+ TMaybe<TString> ConnectionName;
+ };
+
+ template<>
+ struct TControlPlaneRequest<FederatedQuery::DeleteBindingRequest, EvDeleteBindingRequest> :
+ public TBaseControlPlaneRequest<TEvDeleteBindingRequest,
+ FederatedQuery::DeleteBindingRequest,
+ EvDeleteBindingRequest> {
+ using TBaseControlPlaneRequest<
+ TControlPlaneRequest<FederatedQuery::DeleteBindingRequest, EvDeleteBindingRequest>,
+ FederatedQuery::DeleteBindingRequest,
+ EvDeleteBindingRequest>::TBaseControlPlaneRequest;
+
+ TMaybe<TString> OldBindingName;
+ };
};
}
diff --git a/ydb/core/fq/libs/control_plane_proxy/events/type_traits.h b/ydb/core/fq/libs/control_plane_proxy/events/type_traits.h
new file mode 100644
index 00000000000..029c34cfcc4
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/events/type_traits.h
@@ -0,0 +1,100 @@
+#pragma once
+
+#include <ydb/public/api/protos/draft/fq.pb.h>
+
+namespace NFq {
+namespace NEvControlPlaneProxy {
+
+template<class RequestProtoMessage>
+struct TResponseProtoMessage;
+
+template<>
+struct TResponseProtoMessage<FederatedQuery::CreateQueryRequest> {
+ using type = FederatedQuery::CreateQueryResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::ListQueriesRequest> {
+ using type = FederatedQuery::ListQueriesResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::DescribeQueryRequest> {
+ using type = FederatedQuery::DescribeQueryResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::GetQueryStatusRequest> {
+ using type = FederatedQuery::GetQueryStatusResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::ModifyQueryRequest> {
+ using type = FederatedQuery::ModifyQueryResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::DeleteQueryRequest> {
+ using type = FederatedQuery::DeleteQueryResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::ControlQueryRequest> {
+ using type = FederatedQuery::ControlQueryResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::GetResultDataRequest> {
+ using type = FederatedQuery::GetResultDataResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::ListJobsRequest> {
+ using type = FederatedQuery::ListJobsResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::DescribeJobRequest> {
+ using type = FederatedQuery::DescribeJobResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::CreateConnectionRequest> {
+ using type = FederatedQuery::CreateConnectionResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::ListConnectionsRequest> {
+ using type = FederatedQuery::ListConnectionsResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::DescribeConnectionRequest> {
+ using type = FederatedQuery::DescribeConnectionResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::ModifyConnectionRequest> {
+ using type = FederatedQuery::ModifyConnectionResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::DeleteConnectionRequest> {
+ using type = FederatedQuery::DeleteConnectionResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::TestConnectionRequest> {
+ using type = FederatedQuery::TestConnectionResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::CreateBindingRequest> {
+ using type = FederatedQuery::CreateBindingResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::ListBindingsRequest> {
+ using type = FederatedQuery::ListBindingsResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::DescribeBindingRequest> {
+ using type = FederatedQuery::DescribeBindingResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::ModifyBindingRequest> {
+ using type = FederatedQuery::ModifyBindingResult;
+};
+template<>
+struct TResponseProtoMessage<FederatedQuery::DeleteBindingRequest> {
+ using type = FederatedQuery::DeleteBindingResult;
+};
+
+template<class Request>
+struct ResponseSelector;
+
+} // namespace NEvControlPlaneProxy
+} // namespace NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp b/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp
index c0a70b40789..6c36450e35d 100644
--- a/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp
@@ -2468,8 +2468,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyCheckNegativePermissionsSuccess) {
auto event = request->Get<TEvControlPlaneStorage::TEvModifyConnectionRequest>();
auto permissions = event->Permissions;
UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder");
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC));
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE));
@@ -2487,8 +2487,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyCheckNegativePermissionsSuccess) {
auto event = request->Get<TEvControlPlaneStorage::TEvModifyConnectionRequest>();
auto permissions = event->Permissions;
UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder");
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC));
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE));
@@ -2506,8 +2506,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyCheckNegativePermissionsSuccess) {
auto event = request->Get<TEvControlPlaneStorage::TEvDeleteConnectionRequest>();
auto permissions = event->Permissions;
UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder");
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC));
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE));
@@ -2620,8 +2620,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyCheckNegativePermissionsSuccess) {
auto event = request->Get<TEvControlPlaneStorage::TEvModifyBindingRequest>();
auto permissions = event->Permissions;
UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder");
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC));
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE));
@@ -2639,8 +2639,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyCheckNegativePermissionsSuccess) {
auto event = request->Get<TEvControlPlaneStorage::TEvDeleteBindingRequest>();
auto permissions = event->Permissions;
UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder");
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC));
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE));
@@ -2931,8 +2931,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) {
auto event = request->Get<TEvControlPlaneStorage::TEvModifyConnectionRequest>();
auto permissions = event->Permissions;
UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder");
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC));
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE));
@@ -2949,8 +2949,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) {
auto event = request->Get<TEvControlPlaneStorage::TEvDeleteConnectionRequest>();
auto permissions = event->Permissions;
UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder");
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC));
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE));
@@ -3039,8 +3039,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) {
auto event = request->Get<TEvControlPlaneStorage::TEvModifyBindingRequest>();
auto permissions = event->Permissions;
UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder");
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC));
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE));
@@ -3057,8 +3057,8 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) {
auto event = request->Get<TEvControlPlaneStorage::TEvDeleteBindingRequest>();
auto permissions = event->Permissions;
UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder");
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC));
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PRIVATE));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE));
@@ -3339,7 +3339,7 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) {
auto event = request->Get<TEvControlPlaneStorage::TEvModifyConnectionRequest>();
auto permissions = event->Permissions;
UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder");
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC));
@@ -3357,7 +3357,7 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) {
auto event = request->Get<TEvControlPlaneStorage::TEvDeleteConnectionRequest>();
auto permissions = event->Permissions;
UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder");
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC));
@@ -3447,7 +3447,7 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) {
auto event = request->Get<TEvControlPlaneStorage::TEvModifyBindingRequest>();
auto permissions = event->Permissions;
UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder");
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC));
@@ -3465,7 +3465,7 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) {
auto event = request->Get<TEvControlPlaneStorage::TEvDeleteBindingRequest>();
auto permissions = event->Permissions;
UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder");
- UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC));
+ UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE));
UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST));
UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC));
diff --git a/ydb/core/fq/libs/control_plane_proxy/ya.make b/ydb/core/fq/libs/control_plane_proxy/ya.make
index ebcb92eadae..44f5c3c6348 100644
--- a/ydb/core/fq/libs/control_plane_proxy/ya.make
+++ b/ydb/core/fq/libs/control_plane_proxy/ya.make
@@ -14,6 +14,7 @@ PEERDIR(
ydb/core/fq/libs/compute/ydb
ydb/core/fq/libs/compute/ydb/control_plane
ydb/core/fq/libs/control_plane_config
+ ydb/core/fq/libs/control_plane_proxy/actors
ydb/core/fq/libs/control_plane_proxy/events
ydb/core/fq/libs/control_plane_storage
ydb/core/fq/libs/rate_limiter/events
@@ -30,6 +31,7 @@ YQL_LAST_ABI_VERSION()
END()
RECURSE(
+ actors
events
)
diff --git a/ydb/core/grpc_services/rpc_fq.cpp b/ydb/core/grpc_services/rpc_fq.cpp
index b68d76d2e0b..1e6c59ea339 100644
--- a/ydb/core/grpc_services/rpc_fq.cpp
+++ b/ydb/core/grpc_services/rpc_fq.cpp
@@ -545,7 +545,10 @@ std::unique_ptr<TEvProxyRuntimeEvent> CreateFederatedQueryModifyConnectionReques
static const std::function permissions{ [](const FederatedQuery::ModifyConnectionRequest& request) -> TVector<NPerms::TPermission> {
TVector<NPerms::TPermission> basePermissions{
NPerms::Required("yq.connections.update"),
- NPerms::Optional("yq.resources.managePrivate")
+ NPerms::Required("yq.connections.get"),
+ NPerms::Optional("yq.resources.managePrivate"),
+ NPerms::Optional("yq.resources.viewPublic"),
+ NPerms::Optional("yq.resources.viewPrivate")
};
if (request.content().acl().visibility() == FederatedQuery::Acl::SCOPE) {
basePermissions.push_back(NPerms::Required("yq.resources.managePublic"));
@@ -560,8 +563,11 @@ std::unique_ptr<TEvProxyRuntimeEvent> CreateFederatedQueryDeleteConnectionReques
static const std::function permissions{ [](const FederatedQuery::DeleteConnectionRequest&) -> TVector<NPerms::TPermission> {
return {
NPerms::Required("yq.connections.delete"),
+ NPerms::Required("yq.connections.get"),
NPerms::Optional("yq.resources.managePublic"),
- NPerms::Optional("yq.resources.managePrivate")
+ NPerms::Optional("yq.resources.managePrivate"),
+ NPerms::Optional("yq.resources.viewPublic"),
+ NPerms::Optional("yq.resources.viewPrivate")
};
} };
@@ -628,8 +634,12 @@ std::unique_ptr<TEvProxyRuntimeEvent> CreateFederatedQueryModifyBindingRequestOp
// so yq.resources.managePublic is always requested as optional
return {
NPerms::Required("yq.bindings.update"),
+ NPerms::Required("yq.bindings.get"),
+ NPerms::Required("yq.connections.get"),
NPerms::Optional("yq.resources.managePrivate"),
- NPerms::Optional("yq.resources.managePublic")
+ NPerms::Optional("yq.resources.managePublic"),
+ NPerms::Optional("yq.resources.viewPublic"),
+ NPerms::Optional("yq.resources.viewPrivate")
};
} };
@@ -640,8 +650,11 @@ std::unique_ptr<TEvProxyRuntimeEvent> CreateFederatedQueryDeleteBindingRequestOp
static const std::function permissions{ [](const FederatedQuery::DeleteBindingRequest&) -> TVector<NPerms::TPermission> {
return {
NPerms::Required("yq.bindings.delete"),
+ NPerms::Required("yq.bindings.get"),
NPerms::Optional("yq.resources.managePublic"),
- NPerms::Optional("yq.resources.managePrivate")
+ NPerms::Optional("yq.resources.managePrivate"),
+ NPerms::Optional("yq.resources.viewPublic"),
+ NPerms::Optional("yq.resources.viewPrivate")
};
} };