aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-06-29 13:22:51 +0300
committerhcpp <hcpp@ydb.tech>2023-06-29 13:22:51 +0300
commit81ed8b7a026c3cc783d5d3f143f5e742d84076b1 (patch)
treec16c472ee16171f2738e4c987cafe8abf405f176
parent73a2c5edc8221c6b78f738254db6718adbb96f35 (diff)
downloadydb-81ed8b7a026c3cc783d5d3f143f5e742d84076b1.tar.gz
compute database control plane has been added
-rw-r--r--ydb/core/fq/libs/actors/pending_fetcher.cpp8
-rw-r--r--ydb/core/fq/libs/compute/common/config.h31
-rw-r--r--ydb/core/fq/libs/compute/common/run_actor_params.cpp5
-rw-r--r--ydb/core/fq/libs/compute/common/run_actor_params.h5
-rw-r--r--ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.darwin-x86_64.txt32
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.linux-aarch64.txt33
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.linux-x86_64.txt33
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.txt17
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.windows-x86_64.txt32
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp116
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp216
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h20
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/ya.make24
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/ydbcp_grpc_client_actor.cpp38
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/CMakeLists.darwin-x86_64.txt23
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/CMakeLists.linux-aarch64.txt24
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/CMakeLists.linux-x86_64.txt24
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/CMakeLists.txt17
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/CMakeLists.windows-x86_64.txt23
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/events.cpp1
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/events.h57
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/ya.make16
-rw-r--r--ydb/core/fq/libs/compute/ydb/executer_actor.cpp12
-rw-r--r--ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp4
-rw-r--r--ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp10
-rw-r--r--ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp15
-rw-r--r--ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp14
-rw-r--r--ydb/core/fq/libs/compute/ydb/stopper_actor.cpp10
-rw-r--r--ydb/core/fq/libs/compute/ydb/ya.make5
-rw-r--r--ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp76
-rw-r--r--ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp24
-rw-r--r--ydb/core/fq/libs/config/protos/compute.proto33
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt4
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt4
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt4
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt4
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/config.cpp4
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/config.h8
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp274
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h2
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/events/events.h1
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp1
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/ya.make4
-rw-r--r--ydb/core/fq/libs/control_plane_storage/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/control_plane_storage/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/control_plane_storage/events/events.h95
-rw-r--r--ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp3
-rw-r--r--ydb/core/fq/libs/control_plane_storage/probes.h8
-rw-r--r--ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto17
-rw-r--r--ydb/core/fq/libs/control_plane_storage/schema.h1
-rw-r--r--ydb/core/fq/libs/control_plane_storage/ya.make1
-rw-r--r--ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp13
-rw-r--r--ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp118
-rw-r--r--ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h16
-rw-r--r--ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp5
-rw-r--r--ydb/core/fq/libs/events/event_subspace.h1
-rw-r--r--ydb/core/fq/libs/init/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/init/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/fq/libs/init/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/init/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/init/init.cpp14
-rw-r--r--ydb/core/fq/libs/init/ya.make1
-rw-r--r--ydb/core/fq/libs/protos/CMakeLists.darwin-x86_64.txt3
-rw-r--r--ydb/core/fq/libs/protos/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/core/fq/libs/protos/CMakeLists.linux-x86_64.txt3
-rw-r--r--ydb/core/fq/libs/protos/CMakeLists.windows-x86_64.txt3
-rw-r--r--ydb/core/fq/libs/protos/fq_private.proto2
-rw-r--r--ydb/core/fq/libs/protos/ya.make3
-rw-r--r--ydb/core/fq/libs/test_connection/events/events.h5
-rw-r--r--ydb/core/fq/libs/ydb/ydb.cpp18
-rw-r--r--ydb/core/fq/libs/ydb/ydb.h22
-rw-r--r--ydb/library/ycloud/impl/grpc_service_client.h9
78 files changed, 1510 insertions, 178 deletions
diff --git a/ydb/core/fq/libs/actors/pending_fetcher.cpp b/ydb/core/fq/libs/actors/pending_fetcher.cpp
index 8def74bf714..0da5ba789a5 100644
--- a/ydb/core/fq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/fq/libs/actors/pending_fetcher.cpp
@@ -349,6 +349,11 @@ private:
*resources.mutable_topic_consumers() = task.created_topic_consumers();
}
+ NFq::NConfig::TYdbStorageConfig computeConnection = ComputeConfig.GetConnection(task.scope());
+ computeConnection.set_endpoint(task.compute_connection().endpoint());
+ computeConnection.set_database(task.compute_connection().database());
+ computeConnection.set_usessl(task.compute_connection().usessl());
+
TRunActorParams params(
YqSharedResources, CredentialsProviderFactory, S3Gateway,
FunctionRegistry, RandomProvider,
@@ -386,7 +391,8 @@ private:
task.job_id().value(),
resources,
task.execution_id(),
- task.operation_id()
+ task.operation_id(),
+ computeConnection
);
auto runActorId =
diff --git a/ydb/core/fq/libs/compute/common/config.h b/ydb/core/fq/libs/compute/common/config.h
index c2b02c7ea16..ddce2995513 100644
--- a/ydb/core/fq/libs/compute/common/config.h
+++ b/ydb/core/fq/libs/compute/common/config.h
@@ -3,6 +3,10 @@
#include <ydb/core/fq/libs/config/protos/compute.pb.h>
#include <ydb/core/fq/libs/protos/fq_private.pb.h>
+#include <util/generic/algorithm.h>
+
+#include <util/digest/multi.h>
+
namespace NFq {
class TComputeConfig {
@@ -28,6 +32,33 @@ public:
}
return NFq::NConfig::EComputeType::IN_PLACE;
}
+
+ NFq::NConfig::TYdbStorageConfig GetConnection(const TString& scope) const {
+ const auto& controlPlane = ComputeConfig.GetYdb().GetControlPlane();
+ switch (controlPlane.type_case()) {
+ case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET:
+ return {};
+ case NConfig::TYdbComputeControlPlane::kSingle:
+ return controlPlane.GetSingle().GetConnection();
+ case NConfig::TYdbComputeControlPlane::kCms:
+ return GetConnection(scope, controlPlane.GetCms().GetDatabaseMapping());
+ case NConfig::TYdbComputeControlPlane::kYdbcp:
+ return GetConnection(scope, controlPlane.GetYdbcp().GetDatabaseMapping());
+ }
+ }
+
+ NFq::NConfig::TYdbStorageConfig GetConnection(const TString& scope, const ::NFq::NConfig::TDatabaseMapping& databaseMapping) const {
+ auto it = databaseMapping.GetScopeToComputeDatabase().find(scope);
+ if (it != databaseMapping.GetScopeToComputeDatabase().end()) {
+ return it->second.GetConnection();
+ }
+ return databaseMapping.GetCommon().empty() ? NFq::NConfig::TYdbStorageConfig{} : databaseMapping.GetCommon(MultiHash(scope) % databaseMapping.GetCommon().size()).GetConnection();
+ }
+
+ bool YdbComputeControlPlaneEnabled() const {
+ return ComputeConfig.GetYdb().GetEnable() && ComputeConfig.GetYdb().GetControlPlane().GetEnable();
+ }
+
private:
NFq::NConfig::TComputeConfig ComputeConfig;
};
diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.cpp b/ydb/core/fq/libs/compute/common/run_actor_params.cpp
index 21ed96052ef..e0c67efc743 100644
--- a/ydb/core/fq/libs/compute/common/run_actor_params.cpp
+++ b/ydb/core/fq/libs/compute/common/run_actor_params.cpp
@@ -51,7 +51,8 @@ TRunActorParams::TRunActorParams(
const TString& jobId,
const Fq::Private::TaskResources& resources,
const TString& executionId,
- const TString& operationId
+ const TString& operationId,
+ const NFq::NConfig::TYdbStorageConfig& computeConnection
)
: YqSharedResources(yqSharedResources)
, CredentialsProviderFactory(credentialsProviderFactory)
@@ -100,6 +101,7 @@ TRunActorParams::TRunActorParams(
, Resources(resources)
, ExecutionId(executionId)
, OperationId(operationId, true)
+ , ComputeConnection(computeConnection)
{
}
@@ -123,6 +125,7 @@ IOutputStream& operator<<(IOutputStream& out, const TRunActorParams& params) {
<< " Resource.TopicConsumers: " << params.Resources.topic_consumers().size()
<< " ExecutionId: " << params.ExecutionId
<< " OperationId: " << (params.OperationId.GetKind() != Ydb::TOperationId::UNUSED ? ProtoToString(params.OperationId) : "<empty>")
+ << " ComputeConnection: " << params.ComputeConnection.ShortDebugString()
<< " }";
}
diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.h b/ydb/core/fq/libs/compute/common/run_actor_params.h
index b5f0843d601..db5e480dddc 100644
--- a/ydb/core/fq/libs/compute/common/run_actor_params.h
+++ b/ydb/core/fq/libs/compute/common/run_actor_params.h
@@ -3,6 +3,7 @@
#include <ydb/core/fq/libs/config/protos/common.pb.h>
#include <ydb/core/fq/libs/config/protos/fq_config.pb.h>
#include <ydb/core/fq/libs/config/protos/pinger.pb.h>
+#include <ydb/core/fq/libs/config/protos/storage.pb.h>
#include <ydb/core/fq/libs/events/events.h>
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>
@@ -69,7 +70,8 @@ struct TRunActorParams { // TODO2 : Change name
const TString& jobId,
const Fq::Private::TaskResources& resources,
const TString& executionId,
- const TString& operationId
+ const TString& operationId,
+ const NFq::NConfig::TYdbStorageConfig& computeConnection
);
TRunActorParams(const TRunActorParams& params) = default;
@@ -127,6 +129,7 @@ struct TRunActorParams { // TODO2 : Change name
Fq::Private::TaskResources Resources;
TString ExecutionId;
NYdb::TOperation::TOperationId OperationId;
+ NFq::NConfig::TYdbStorageConfig ComputeConnection;
};
} /* NFq */
diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt
index 0dce7fd471e..a89b1eff250 100644
--- a/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt
@@ -6,6 +6,8 @@
# original buildsystem will not be accepted.
+add_subdirectory(control_plane)
+add_subdirectory(events)
add_library(libs-compute-ydb)
target_compile_options(libs-compute-ydb PRIVATE
diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt
index cef154a1ebc..fb3627b9597 100644
--- a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt
@@ -6,6 +6,8 @@
# original buildsystem will not be accepted.
+add_subdirectory(control_plane)
+add_subdirectory(events)
add_library(libs-compute-ydb)
target_compile_options(libs-compute-ydb PRIVATE
diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt
index cef154a1ebc..fb3627b9597 100644
--- a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt
@@ -6,6 +6,8 @@
# original buildsystem will not be accepted.
+add_subdirectory(control_plane)
+add_subdirectory(events)
add_library(libs-compute-ydb)
target_compile_options(libs-compute-ydb PRIVATE
diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt
index 0dce7fd471e..a89b1eff250 100644
--- a/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt
@@ -6,6 +6,8 @@
# original buildsystem will not be accepted.
+add_subdirectory(control_plane)
+add_subdirectory(events)
add_library(libs-compute-ydb)
target_compile_options(libs-compute-ydb PRIVATE
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..7b5b602ee1b
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,32 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(compute-ydb-control_plane)
+target_compile_options(compute-ydb-control_plane PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(compute-ydb-control_plane PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-actors-protos
+ libs-control_plane_storage-proto
+ libs-quota_manager-proto
+ ydb-core-protos
+ library-db_pool-protos
+ yql-public-issue
+ api-grpc
+ api-grpc-draft
+ lib-operation_id-protos
+)
+target_sources(compute-ydb-control_plane PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/ydbcp_grpc_client_actor.cpp
+)
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..7c699add786
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,33 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(compute-ydb-control_plane)
+target_compile_options(compute-ydb-control_plane PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(compute-ydb-control_plane PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-actors-protos
+ libs-control_plane_storage-proto
+ libs-quota_manager-proto
+ ydb-core-protos
+ library-db_pool-protos
+ yql-public-issue
+ api-grpc
+ api-grpc-draft
+ lib-operation_id-protos
+)
+target_sources(compute-ydb-control_plane PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/ydbcp_grpc_client_actor.cpp
+)
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..7c699add786
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,33 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(compute-ydb-control_plane)
+target_compile_options(compute-ydb-control_plane PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(compute-ydb-control_plane PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-actors-protos
+ libs-control_plane_storage-proto
+ libs-quota_manager-proto
+ ydb-core-protos
+ library-db_pool-protos
+ yql-public-issue
+ api-grpc
+ api-grpc-draft
+ lib-operation_id-protos
+)
+target_sources(compute-ydb-control_plane PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/ydbcp_grpc_client_actor.cpp
+)
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.txt b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..7b5b602ee1b
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,32 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(compute-ydb-control_plane)
+target_compile_options(compute-ydb-control_plane PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(compute-ydb-control_plane PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-actors-protos
+ libs-control_plane_storage-proto
+ libs-quota_manager-proto
+ ydb-core-protos
+ library-db_pool-protos
+ yql-public-issue
+ api-grpc
+ api-grpc-draft
+ lib-operation_id-protos
+)
+target_sources(compute-ydb-control_plane PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/control_plane/ydbcp_grpc_client_actor.cpp
+)
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp
new file mode 100644
index 00000000000..0d9233e7367
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp
@@ -0,0 +1,116 @@
+#include <ydb/public/api/grpc/ydb_cms_v1.grpc.pb.h>
+
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+#include <ydb/core/protos/services.pb.h>
+
+#include <ydb/library/security/ydb_credentials_provider_factory.h>
+#include <ydb/library/ycloud/api/events.h>
+#include <ydb/library/ycloud/impl/grpc_service_client.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/event.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [CmsGrpcClient]: " << stream)
+#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [CmsGrpcClient]: " << stream)
+#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [CmsGrpcClient]: " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [CmsGrpcClient]: " << stream)
+#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [CmsGrpcClient]: " << stream)
+
+namespace NFq {
+
+using namespace NActors;
+
+namespace {
+
+struct TEvPrivate {
+ enum EEv {
+ // requests
+ EvCreateDatabaseRequest = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+
+ // replies
+ EvCreateDatabaseResponse,
+
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
+
+ struct TEvCreateDatabaseRequest : NCloud::TEvGrpcProtoRequest<TEvCreateDatabaseRequest, EvCreateDatabaseRequest, Ydb::Cms::CreateDatabaseRequest> {};
+ struct TEvCreateDatabaseResponse : NCloud::TEvGrpcProtoResponse<TEvCreateDatabaseResponse, EvCreateDatabaseResponse, Ydb::Cms::CreateDatabaseResponse> {};
+};
+
+}
+
+class TCmsGrpcServiceActor : public NActors::TActor<TCmsGrpcServiceActor>, TGrpcServiceClient<Ydb::Cms::V1::CmsService> {
+public:
+ using TBase = NActors::TActor<TCmsGrpcServiceActor>;
+ struct TCreateDatabaseGrpcRequest : TGrpcRequest {
+ static constexpr auto Request = &Ydb::Cms::V1::CmsService::Stub::AsyncCreateDatabase;
+ using TRequestEventType = TEvPrivate::TEvCreateDatabaseRequest;
+ using TResponseEventType = TEvPrivate::TEvCreateDatabaseResponse;
+ };
+
+ TCmsGrpcServiceActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider)
+ : TBase(&TCmsGrpcServiceActor::StateFunc)
+ , TGrpcServiceClient(settings)
+ , Settings(settings)
+ , CredentialsProvider(credentialsProvider)
+ {}
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvYdbCompute::TEvCreateDatabaseRequest, Handle);
+ hFunc(TEvPrivate::TEvCreateDatabaseResponse, Handle);
+ )
+
+ void Handle(TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& ev) {
+ const auto& request = *ev.Get()->Get();
+ auto forwardRequest = std::make_unique<TEvPrivate::TEvCreateDatabaseRequest>();
+ forwardRequest->Request.mutable_serverless_resources()->set_shared_database_path(request.BasePath);
+ forwardRequest->Request.set_path(request.Path);
+ forwardRequest->Token = CredentialsProvider->GetAuthInfo();
+ TEvPrivate::TEvCreateDatabaseRequest::TPtr forwardEvent = (NActors::TEventHandle<TEvPrivate::TEvCreateDatabaseRequest>*)new IEventHandle(SelfId(), SelfId(), forwardRequest.release(), 0, Cookie);
+ MakeCall<TCreateDatabaseGrpcRequest>(std::move(forwardEvent));
+ Requests[Cookie++] = ev;
+ }
+
+ void Handle(TEvPrivate::TEvCreateDatabaseResponse::TPtr& ev) {
+ const auto& status = ev->Get()->Status;
+ auto it = Requests.find(ev->Cookie);
+ if (it == Requests.end()) {
+ LOG_E("Request doesn't exist. Need to fix this bug urgently");
+ return;
+ }
+ auto request = it->second;
+ Requests.erase(it);
+
+ auto forwardResponse = std::make_unique<TEvYdbCompute::TEvCreateDatabaseResponse>();
+ if (!status.Ok() && status.GRpcStatusCode != grpc::StatusCode::ALREADY_EXISTS) {
+ forwardResponse->Issues.AddIssue("GrpcCode: " + ToString(status.GRpcStatusCode));
+ forwardResponse->Issues.AddIssue("Message: " + status.Msg);
+ forwardResponse->Issues.AddIssue("Details: " + status.Details);
+ Send(request->Sender, forwardResponse.release(), 0, request->Cookie);
+ return;
+ }
+
+ forwardResponse->Result.set_id(request.Get()->Get()->Path);
+ forwardResponse->Result.mutable_connection()->set_endpoint(Settings.Endpoint);
+ forwardResponse->Result.mutable_connection()->set_database(request.Get()->Get()->Path);
+ forwardResponse->Result.mutable_connection()->set_usessl(Settings.EnableSsl);
+
+ Send(request->Sender, forwardResponse.release(), 0, request->Cookie);
+ }
+
+private:
+ NCloud::TGrpcClientSettings Settings;
+ TMap<uint64_t, TEvYdbCompute::TEvCreateDatabaseRequest::TPtr> Requests;
+ NYdb::TCredentialsProviderPtr CredentialsProvider;
+ int64_t Cookie = 0;
+};
+
+std::unique_ptr<NActors::IActor> CreateCmsGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider) {
+ return std::make_unique<TCmsGrpcServiceActor>(settings, credentialsProvider);
+}
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp
new file mode 100644
index 00000000000..1881a49e2a6
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp
@@ -0,0 +1,216 @@
+#include "compute_database_control_plane_service.h"
+
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
+#include <ydb/core/fq/libs/config/protos/compute.pb.h>
+#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h>
+#include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
+#include <ydb/core/fq/libs/ydb/ydb.h>
+
+#include <ydb/library/security/ydb_credentials_provider_factory.h>
+
+#include <ydb/public/lib/fq/scope.h>
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h>
+#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseControlPlane]: " << stream)
+#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseControlPlane]: " << stream)
+#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseControlPlane]: " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseControlPlane]: " << stream)
+#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ComputeDatabaseControlPlane]: " << stream)
+
+namespace NFq {
+
+using namespace NActors;
+using namespace NFq;
+
+class TCreateDatabaseRequestActor : public NActors::TActorBootstrapped<TCreateDatabaseRequestActor> {
+public:
+ TCreateDatabaseRequestActor(const TActorId& databaseClientActorId, TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& request)
+ : DatabaseClientActorId(databaseClientActorId)
+ , Request(request)
+ {}
+
+ static constexpr char ActorName[] = "FQ_CREATE_DATABASE_REQUEST_ACTOR";
+
+ void Bootstrap() {
+ Send(NFq::ControlPlaneStorageServiceActorId(), new TEvControlPlaneStorage::TEvDescribeDatabaseRequest{Request.Get()->Get()->CloudId, Request.Get()->Get()->Scope});
+ Become(&TCreateDatabaseRequestActor::StateFunc);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvControlPlaneStorage::TEvDescribeDatabaseResponse, Handle);
+ hFunc(TEvYdbCompute::TEvCreateDatabaseResponse, Handle);
+ hFunc(TEvControlPlaneStorage::TEvCreateDatabaseResponse, Handle);
+ )
+
+ void Handle(TEvControlPlaneStorage::TEvDescribeDatabaseResponse::TPtr& ev) {
+ const auto issues = ev->Get()->Issues;
+ const auto result = ev->Get()->Record;
+
+ if (issues && issues.back().IssueCode == TIssuesIds::ACCESS_DENIED) {
+ Send(DatabaseClientActorId, new TEvYdbCompute::TEvCreateDatabaseRequest{Request->Get()->CloudId, Request->Get()->Scope, Request->Get()->BasePath, Request->Get()->Path});
+ return;
+ }
+
+ if (issues) {
+ FailedAndPassAway(issues);
+ return;
+ }
+
+ Send(Request->Sender, new TEvYdbCompute::TEvCreateDatabaseResponse{result});
+ PassAway();
+ }
+
+ void Handle(TEvYdbCompute::TEvCreateDatabaseResponse::TPtr& ev) {
+ const auto issues = ev->Get()->Issues;
+ if (issues) {
+ FailedAndPassAway(issues);
+ return;
+ }
+
+ Result = ev->Get()->Result;
+ Send(ControlPlaneStorageServiceActorId(), new TEvControlPlaneStorage::TEvCreateDatabaseRequest{Request->Get()->CloudId, Request->Get()->Scope, Result});
+ }
+
+ void Handle(TEvControlPlaneStorage::TEvCreateDatabaseResponse::TPtr& ev) {
+ const auto issues = ev->Get()->Issues;
+ if (issues) {
+ FailedAndPassAway(issues);
+ return;
+ }
+
+ Send(Request->Sender, new TEvYdbCompute::TEvCreateDatabaseResponse{Result});
+ PassAway();
+ }
+
+ void FailedAndPassAway(const NYql::TIssues& issues) {
+ Send(Request->Sender, new TEvYdbCompute::TEvCreateDatabaseResponse{issues});
+ PassAway();
+ }
+
+ TActorId DatabaseClientActorId;
+ TEvYdbCompute::TEvCreateDatabaseRequest::TPtr Request;
+ FederatedQuery::Internal::ComputeDatabaseInternal Result;
+};
+
+class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrapped<TComputeDatabaseControlPlaneServiceActor> {
+ struct TClientConfig {
+ TActorId ActorId;
+ NConfig::TComputeDatabaseConfig Config;
+ };
+
+public:
+ TComputeDatabaseControlPlaneServiceActor(const NFq::NConfig::TComputeConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
+ : Config(config)
+ , CredentialsProviderFactory(credentialsProviderFactory)
+ {}
+
+ static constexpr char ActorName[] = "FQ_COMPUTE_DATABASE_SERVICE_ACTOR";
+
+ void Bootstrap() {
+ const auto& controlPlane = Config.GetYdb().GetControlPlane();
+ switch (controlPlane.type_case()) {
+ case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET:
+ case NConfig::TYdbComputeControlPlane::kSingle:
+ break;
+ case NConfig::TYdbComputeControlPlane::kCms:
+ CreateCmsClientActors(controlPlane.GetCms());
+ break;
+ case NConfig::TYdbComputeControlPlane::kYdbcp:
+ CreateControlPlaneClientActors(controlPlane.GetYdbcp());
+ break;
+ }
+ Become(&TComputeDatabaseControlPlaneServiceActor::StateFunc);
+ }
+
+ static NCloud::TGrpcClientSettings CreateGrpcClientSettings(const NConfig::TComputeDatabaseConfig& config) {
+ NCloud::TGrpcClientSettings settings;
+ const auto& connection = config.GetConnection();
+ settings.Endpoint = connection.GetEndpoint();
+ settings.EnableSsl = connection.GetUseSsl();
+ if (connection.GetCertificateFile()) {
+ settings.CertificateRootCA = StripString(TFileInput(connection.GetCertificateFile()).ReadAll());
+ }
+ return settings;
+ }
+
+ void CreateCmsClientActors(const NConfig::TYdbComputeControlPlane::TCms& cmsConfig) {
+ const auto& mapping = cmsConfig.GetDatabaseMapping();
+ for (const auto& config: mapping.GetCommon()) {
+ CommonDatabaseClients.push_back({Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.connection()))->CreateProvider()).release()), config});
+ }
+
+ Y_VERIFY(CommonDatabaseClients);
+
+ for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) {
+ ScopeToDatabaseClient[scope] = {Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.connection()))->CreateProvider()).release()), config};
+ }
+ }
+
+ void CreateControlPlaneClientActors(const NConfig::TYdbComputeControlPlane::TYdbcp& controlPlaneConfig) {
+ const auto& mapping = controlPlaneConfig.GetDatabaseMapping();
+ for (const auto& config: mapping.GetCommon()) {
+ CommonDatabaseClients.push_back({Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.connection()))->CreateProvider()).release()), config});
+ }
+
+ Y_VERIFY(CommonDatabaseClients);
+
+ for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) {
+ ScopeToDatabaseClient[scope] = {Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.connection()))->CreateProvider()).release()), config};
+ }
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvYdbCompute::TEvCreateDatabaseRequest, Handle);
+ )
+
+ void Handle(TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& ev) {
+ if (Config.GetYdb().GetControlPlane().HasSingle()) {
+ FederatedQuery::Internal::ComputeDatabaseInternal result;
+ *result.mutable_connection() = Config.GetYdb().GetControlPlane().GetSingle().GetConnection();
+ Send(ev->Sender, new TEvYdbCompute::TEvCreateDatabaseResponse(result));
+ return;
+ }
+
+ const auto& scope = ev->Get()->Scope;
+ auto it = ScopeToDatabaseClient.find(scope);
+ if (it != ScopeToDatabaseClient.end()) {
+ FillRequest(ev, it->second.Config);
+ Register(new TCreateDatabaseRequestActor(it->second.ActorId, ev));
+ return;
+ }
+ const auto& clientConfig = CommonDatabaseClients[MultiHash(scope) % CommonDatabaseClients.size()];
+ FillRequest(ev, clientConfig.Config);
+ Register(new TCreateDatabaseRequestActor(clientConfig.ActorId, ev));
+ }
+
+ void FillRequest(TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& ev, const NConfig::TComputeDatabaseConfig& config) {
+ NYdb::NFq::TScope scope(ev.Get()->Get()->Scope);
+ ev.Get()->Get()->BasePath = config.GetConnection().GetDatabase();
+ ev.Get()->Get()->Path = config.GetTenant() ? config.GetTenant() + "/" + scope.ParseFolder() : scope.ParseFolder();
+ }
+
+private:
+ NFq::NConfig::TComputeConfig Config;
+ TVector<TClientConfig> CommonDatabaseClients;
+ TMap<TString, TClientConfig> ScopeToDatabaseClient;
+ NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
+};
+
+std::unique_ptr<NActors::IActor> CreateComputeDatabaseControlPlaneServiceActor(const NFq::NConfig::TComputeConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) {
+ return std::make_unique<TComputeDatabaseControlPlaneServiceActor>(config, credentialsProviderFactory);
+}
+
+NActors::TActorId ComputeDatabaseControlPlaneServiceActorId() {
+ constexpr TStringBuf name = "COMDBSRV";
+ return NActors::TActorId(0, name);
+}
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h
new file mode 100644
index 00000000000..7b2a13dfcb9
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h
@@ -0,0 +1,20 @@
+#pragma once
+
+#include <ydb/core/fq/libs/config/protos/compute.pb.h>
+
+#include <ydb/library/security/ydb_credentials_provider_factory.h>
+#include <ydb/library/ycloud/impl/grpc_service_settings.h>
+
+#include <library/cpp/actors/core/actor.h>
+
+namespace NFq {
+
+NActors::TActorId ComputeDatabaseControlPlaneServiceActorId();
+
+std::unique_ptr<NActors::IActor> CreateComputeDatabaseControlPlaneServiceActor(const NFq::NConfig::TComputeConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory);
+
+std::unique_ptr<NActors::IActor> CreateYdbcpGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider);
+
+std::unique_ptr<NActors::IActor> CreateCmsGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider);
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/ya.make b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make
new file mode 100644
index 00000000000..5692cef4366
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make
@@ -0,0 +1,24 @@
+LIBRARY()
+
+SRCS(
+ cms_grpc_client_actor.cpp
+ compute_database_control_plane_service.cpp
+ ydbcp_grpc_client_actor.cpp
+)
+
+PEERDIR(
+ library/cpp/actors/core
+ library/cpp/actors/protos
+ ydb/core/fq/libs/control_plane_storage/proto
+ ydb/core/fq/libs/quota_manager/proto
+ ydb/core/protos
+ ydb/library/db_pool/protos
+ ydb/library/yql/public/issue
+ ydb/public/api/grpc
+ ydb/public/api/grpc/draft
+ ydb/public/lib/operation_id/protos
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/ydbcp_grpc_client_actor.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/ydbcp_grpc_client_actor.cpp
new file mode 100644
index 00000000000..22acd0a55ff
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/ydbcp_grpc_client_actor.cpp
@@ -0,0 +1,38 @@
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+
+#include <ydb/core/protos/services.pb.h>
+
+#include <ydb/library/security/ydb_credentials_provider_factory.h>
+#include <ydb/library/ycloud/api/events.h>
+#include <ydb/library/ycloud/impl/grpc_service_client.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/event.h>
+#include <library/cpp/actors/core/hfunc.h>
+
+namespace NFq {
+
+class TYdbcpGrpcServiceActor : public NActors::TActor<TYdbcpGrpcServiceActor> {
+public:
+ using TBase = NActors::TActor<TYdbcpGrpcServiceActor>;
+ TYdbcpGrpcServiceActor(const NCloud::TGrpcClientSettings&,
+ const NYdb::TCredentialsProviderPtr&)
+ : TBase(&TYdbcpGrpcServiceActor::StateFunc)
+ {}
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvYdbCompute::TEvCreateDatabaseRequest, Handle);
+ )
+
+ void Handle(TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& ev) {
+ auto forwardResponse = std::make_unique<TEvYdbCompute::TEvCreateDatabaseResponse>();
+ forwardResponse->Issues.AddIssue("Ydbcp grpc client hasn't supported yet");
+ Send(ev->Sender, forwardResponse.release(), 0, ev->Cookie);
+ }
+};
+
+std::unique_ptr<NActors::IActor> CreateYdbcpGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider) {
+ return std::make_unique<TYdbcpGrpcServiceActor>(settings, credentialsProvider);
+}
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/events/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..d42e81090a6
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(compute-ydb-events)
+target_link_libraries(compute-ydb-events PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ libs-config-protos
+ libs-control_plane_storage-proto
+ fq-libs-protos
+ api-grpc-draft
+ lib-operation_id-protos
+)
+target_sources(compute-ydb-events PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/events/events.cpp
+)
diff --git a/ydb/core/fq/libs/compute/ydb/events/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..086155d27af
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(compute-ydb-events)
+target_link_libraries(compute-ydb-events PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ libs-config-protos
+ libs-control_plane_storage-proto
+ fq-libs-protos
+ api-grpc-draft
+ lib-operation_id-protos
+)
+target_sources(compute-ydb-events PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/events/events.cpp
+)
diff --git a/ydb/core/fq/libs/compute/ydb/events/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..086155d27af
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(compute-ydb-events)
+target_link_libraries(compute-ydb-events PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ libs-config-protos
+ libs-control_plane_storage-proto
+ fq-libs-protos
+ api-grpc-draft
+ lib-operation_id-protos
+)
+target_sources(compute-ydb-events PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/events/events.cpp
+)
diff --git a/ydb/core/fq/libs/compute/ydb/events/CMakeLists.txt b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/fq/libs/compute/ydb/events/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..d42e81090a6
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/events/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(compute-ydb-events)
+target_link_libraries(compute-ydb-events PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ libs-config-protos
+ libs-control_plane_storage-proto
+ fq-libs-protos
+ api-grpc-draft
+ lib-operation_id-protos
+)
+target_sources(compute-ydb-events PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/events/events.cpp
+)
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.cpp b/ydb/core/fq/libs/compute/ydb/events/events.cpp
new file mode 100644
index 00000000000..6c3d2603e7e
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/events/events.cpp
@@ -0,0 +1 @@
+#include "events.h"
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h
index de40ba19cbf..c593c9af0f9 100644
--- a/ydb/core/fq/libs/compute/ydb/events/events.h
+++ b/ydb/core/fq/libs/compute/ydb/events/events.h
@@ -1,23 +1,24 @@
#pragma once
-#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
-#include <ydb/core/fq/libs/quota_manager/events/events.h>
-
-#include <ydb/public/api/protos/draft/fq.pb.h>
+#include <ydb/core/fq/libs/control_plane_storage/proto/yq_internal.pb.h>
+#include <ydb/core/fq/libs/events/event_subspace.h>
+#include <ydb/core/fq/libs/protos/fq_private.pb.h>
#include <ydb/public/sdk/cpp/client/draft/ydb_query/query.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
+
+#include <ydb/library/yql/public/issue/yql_issue.h>
#include <library/cpp/actors/core/event_pb.h>
#include <library/cpp/actors/core/events.h>
#include <library/cpp/actors/interconnect/events_local.h>
-#include <ydb/library/yql/public/issue/yql_issue.h>
-
namespace NFq {
-struct TEvPrivate {
+struct TEvYdbCompute {
// Event ids
enum EEv : ui32 {
- EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+ EvBegin = YqEventSubspaceBegin(NFq::TYqEventSubspace::YdbCompute),
EvExecuteScriptRequest = EvBegin,
EvExecuteScriptResponse,
@@ -29,6 +30,8 @@ struct TEvPrivate {
EvCancelOperationResponse,
EvForgetOperationRequest,
EvForgetOperationResponse,
+ EvCreateDatabaseRequest,
+ EvCreateDatabaseResponse,
EvExecuterResponse,
EvStatusTrackerResponse,
@@ -160,6 +163,44 @@ struct TEvPrivate {
NYdb::EStatus Status;
};
+ struct TEvCreateDatabaseRequest : public NActors::TEventLocal<TEvCreateDatabaseRequest, EvCreateDatabaseRequest> {
+ TEvCreateDatabaseRequest(const TString& cloudId, const TString& scope)
+ : CloudId(cloudId)
+ , Scope(scope)
+ {}
+
+ TEvCreateDatabaseRequest(const TString& cloudId,
+ const TString& scope,
+ const TString& basePath,
+ const TString& path)
+ : CloudId(cloudId)
+ , Scope(scope)
+ , BasePath(basePath)
+ , Path(path)
+ {}
+
+ TString CloudId;
+ TString Scope;
+ TString BasePath;
+ TString Path;
+ };
+
+ struct TEvCreateDatabaseResponse : public NActors::TEventLocal<TEvCreateDatabaseResponse, EvCreateDatabaseResponse> {
+ TEvCreateDatabaseResponse()
+ {}
+
+ explicit TEvCreateDatabaseResponse(NYql::TIssues issues)
+ : Issues(issues)
+ {}
+
+ TEvCreateDatabaseResponse(const FederatedQuery::Internal::ComputeDatabaseInternal& result)
+ : Result(result)
+ {}
+
+ FederatedQuery::Internal::ComputeDatabaseInternal Result;
+ NYql::TIssues Issues;
+ };
+
struct TEvExecuterResponse : public NActors::TEventLocal<TEvExecuterResponse, EvExecuterResponse> {
TEvExecuterResponse(NYdb::TOperation::TOperationId operationId, const TString& executionId)
: OperationId(operationId)
diff --git a/ydb/core/fq/libs/compute/ydb/events/ya.make b/ydb/core/fq/libs/compute/ydb/events/ya.make
new file mode 100644
index 00000000000..5bc14a3a972
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/events/ya.make
@@ -0,0 +1,16 @@
+LIBRARY()
+
+SRCS(
+ events.cpp
+)
+
+PEERDIR(
+ library/cpp/actors/core
+ ydb/core/fq/libs/config/protos
+ ydb/core/fq/libs/control_plane_storage/proto
+ ydb/core/fq/libs/protos
+ ydb/public/api/grpc/draft
+ ydb/public/lib/operation_id/protos
+)
+
+END()
diff --git a/ydb/core/fq/libs/compute/ydb/executer_actor.cpp b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
index 563c3151cda..feed9c68d5a 100644
--- a/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
@@ -77,7 +77,7 @@ public:
}
STRICT_STFUNC(StateFunc,
- hFunc(TEvPrivate::TEvExecuteScriptResponse, Handle);
+ hFunc(TEvYdbCompute::TEvExecuteScriptResponse, Handle);
hFunc(TEvents::TEvForwardPingResponse, Handle);
)
@@ -88,22 +88,22 @@ public:
if (ev.Get()->Get()->Success) {
pingCounters->Ok->Inc();
LOG_I("Information about the operation id and execution id is stored. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId));
- Send(Parent, new TEvPrivate::TEvExecuterResponse(OperationId, ExecutionId));
+ Send(Parent, new TEvYdbCompute::TEvExecuterResponse(OperationId, ExecutionId));
CompleteAndPassAway();
} else {
pingCounters->Error->Inc();
// Without the idempotency key, we lose the running operation here
LOG_E("Error saving information about the operation id and execution id. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId));
- Send(Parent, new TEvPrivate::TEvExecuterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the operation id and execution id. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)}}));
+ Send(Parent, new TEvYdbCompute::TEvExecuterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the operation id and execution id. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)}}));
FailedAndPassAway();
}
}
- void Handle(const TEvPrivate::TEvExecuteScriptResponse::TPtr& ev) {
+ void Handle(const TEvYdbCompute::TEvExecuteScriptResponse::TPtr& ev) {
const auto& response = *ev.Get()->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_E("Can't execute script: " << ev->Get()->Issues.ToOneLineString());
- Send(Parent, new TEvPrivate::TEvExecuterResponse(ev->Get()->Issues));
+ Send(Parent, new TEvYdbCompute::TEvExecuterResponse(ev->Get()->Issues));
FailedAndPassAway();
return;
}
@@ -114,7 +114,7 @@ public:
}
void SendExecuteScript() {
- Register(new TRetryActor<TEvPrivate::TEvExecuteScriptRequest, TEvPrivate::TEvExecuteScriptResponse, TString, TString>(Counters.GetCounters(ERequestType::RT_EXECUTE_SCRIPT), SelfId(), Connector, Params.Sql, Params.JobId));
+ Register(new TRetryActor<TEvYdbCompute::TEvExecuteScriptRequest, TEvYdbCompute::TEvExecuteScriptResponse, TString, TString>(Counters.GetCounters(ERequestType::RT_EXECUTE_SCRIPT), SelfId(), Connector, Params.Sql, Params.JobId));
}
void SendPingTask() {
diff --git a/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp b/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
index 137169651c4..41563e65e90 100644
--- a/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
@@ -94,12 +94,12 @@ public:
if (ev.Get()->Get()->Success) {
pingCounters->Ok->Inc();
LOG_I("Query moved to terminal state ");
- Send(Parent, new TEvPrivate::TEvFinalizerResponse({}, NYdb::EStatus::SUCCESS));
+ Send(Parent, new TEvYdbCompute::TEvFinalizerResponse({}, NYdb::EStatus::SUCCESS));
CompleteAndPassAway();
} else {
pingCounters->Error->Inc();
LOG_E("Error moving the query to the terminal state");
- Send(Parent, new TEvPrivate::TEvFinalizerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error moving the query to the terminal state"}}, NYdb::EStatus::INTERNAL_ERROR));
+ Send(Parent, new TEvYdbCompute::TEvFinalizerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error moving the query to the terminal state"}}, NYdb::EStatus::INTERNAL_ERROR));
FailedAndPassAway();
}
}
diff --git a/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp b/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp
index 90c0fda4cac..4a7884cb314 100644
--- a/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp
@@ -72,23 +72,23 @@ public:
void Start() {
LOG_I("Start resources cleaner actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status));
Become(&TResourcesCleanerActor::StateFunc);
- Register(new TRetryActor<TEvPrivate::TEvForgetOperationRequest, TEvPrivate::TEvForgetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_FORGET_OPERATION), SelfId(), Connector, OperationId));
+ Register(new TRetryActor<TEvYdbCompute::TEvForgetOperationRequest, TEvYdbCompute::TEvForgetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_FORGET_OPERATION), SelfId(), Connector, OperationId));
}
STRICT_STFUNC(StateFunc,
- hFunc(TEvPrivate::TEvForgetOperationResponse, Handle);
+ hFunc(TEvYdbCompute::TEvForgetOperationResponse, Handle);
)
- void Handle(const TEvPrivate::TEvForgetOperationResponse::TPtr& ev) {
+ void Handle(const TEvYdbCompute::TEvForgetOperationResponse::TPtr& ev) {
const auto& response = *ev.Get()->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_E("Can't forget operation: " << ev->Get()->Issues.ToOneLineString());
- Send(Parent, new TEvPrivate::TEvResourcesCleanerResponse(ev->Get()->Issues, ev->Get()->Status));
+ Send(Parent, new TEvYdbCompute::TEvResourcesCleanerResponse(ev->Get()->Issues, ev->Get()->Status));
FailedAndPassAway();
return;
}
LOG_I("Operation successfully forgotten");
- Send(Parent, new TEvPrivate::TEvResourcesCleanerResponse({}, NYdb::EStatus::SUCCESS));
+ Send(Parent, new TEvYdbCompute::TEvResourcesCleanerResponse({}, NYdb::EStatus::SUCCESS));
CompleteAndPassAway();
}
diff --git a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
index 04d5f1df558..603ddb62878 100644
--- a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
@@ -13,6 +13,7 @@
#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -83,7 +84,7 @@ public:
}
STRICT_STFUNC(StateFunc,
- hFunc(TEvPrivate::TEvFetchScriptResultResponse, Handle);
+ hFunc(TEvYdbCompute::TEvFetchScriptResultResponse, Handle);
hFunc(NFq::TEvInternalService::TEvWriteResultResponse, Handle);
hFunc(TEvents::TEvForwardPingResponse, Handle);
)
@@ -95,21 +96,21 @@ public:
if (ev.Get()->Get()->Success) {
pingCounters->Ok->Inc();
LOG_I("The result has been moved");
- Send(Parent, new TEvPrivate::TEvResultWriterResponse({}, NYdb::EStatus::SUCCESS));
+ Send(Parent, new TEvYdbCompute::TEvResultWriterResponse({}, NYdb::EStatus::SUCCESS));
CompleteAndPassAway();
} else {
pingCounters->Error->Inc();
LOG_E("Move result error");
- Send(Parent, new TEvPrivate::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Move result error. ExecutionId: " << ExecutionId}}, NYdb::EStatus::INTERNAL_ERROR));
+ Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Move result error. ExecutionId: " << ExecutionId}}, NYdb::EStatus::INTERNAL_ERROR));
FailedAndPassAway();
}
}
- void Handle(const TEvPrivate::TEvFetchScriptResultResponse::TPtr& ev) {
+ void Handle(const TEvYdbCompute::TEvFetchScriptResultResponse::TPtr& ev) {
const auto& response = *ev.Get()->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_E("Can't fetch script result: " << ev->Get()->Issues.ToOneLineString());
- Send(Parent, new TEvPrivate::TEvResultWriterResponse(ev->Get()->Issues, NYdb::EStatus::INTERNAL_ERROR));
+ Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(ev->Get()->Issues, NYdb::EStatus::INTERNAL_ERROR));
FailedAndPassAway();
return;
}
@@ -150,7 +151,7 @@ public:
} else {
writeResultCounters->Error->Inc();
LOG_E("Error writing result for offset " << Offset);
- Send(Parent, new TEvPrivate::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error writing result for offset " << Offset}}, NYdb::EStatus::INTERNAL_ERROR));
+ Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error writing result for offset " << Offset}}, NYdb::EStatus::INTERNAL_ERROR));
FailedAndPassAway();
}
}
@@ -159,7 +160,7 @@ public:
auto fetchScriptResultCounters = Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT);
fetchScriptResultCounters->InFly->Inc();
StartTime = TInstant::Now();
- Register(new TRetryActor<TEvPrivate::TEvFetchScriptResultRequest, TEvPrivate::TEvFetchScriptResultResponse, TString, int64_t, int64_t>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, ExecutionId, 0, Offset));
+ Register(new TRetryActor<TEvYdbCompute::TEvFetchScriptResultRequest, TEvYdbCompute::TEvFetchScriptResultResponse, TString, int64_t, int64_t>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, ExecutionId, 0, Offset));
}
Fq::Private::WriteTaskResultRequest CreateProtoRequestWithoutResultSet(ui64 startRowIndex) {
diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
index 19f428fe3d0..f062d0671a4 100644
--- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
@@ -34,7 +34,7 @@ using namespace NFq;
class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
public:
- using IRetryPolicy = IRetryPolicy<const TEvPrivate::TEvGetOperationResponse::TPtr&>;
+ using IRetryPolicy = IRetryPolicy<const TEvYdbCompute::TEvGetOperationResponse::TPtr&>;
enum ERequestType {
RT_GET_OPERATION,
@@ -83,7 +83,7 @@ public:
}
STRICT_STFUNC(StateFunc,
- hFunc(TEvPrivate::TEvGetOperationResponse, Handle);
+ hFunc(TEvYdbCompute::TEvGetOperationResponse, Handle);
hFunc(TEvents::TEvForwardPingResponse, Handle);
)
@@ -94,21 +94,21 @@ public:
if (ev.Get()->Get()->Success) {
pingCounters->Ok->Inc();
LOG_I("Information about the status of operation is stored");
- Send(Parent, new TEvPrivate::TEvStatusTrackerResponse(Issues, Status, ExecStatus));
+ Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(Issues, Status, ExecStatus));
CompleteAndPassAway();
} else {
pingCounters->Error->Inc();
LOG_E("Error saving information about the status of operation");
- Send(Parent, new TEvPrivate::TEvStatusTrackerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the status of operation: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR, ExecStatus));
+ Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the status of operation: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR, ExecStatus));
FailedAndPassAway();
}
}
- void Handle(const TEvPrivate::TEvGetOperationResponse::TPtr& ev) {
+ void Handle(const TEvYdbCompute::TEvGetOperationResponse::TPtr& ev) {
const auto& response = *ev.Get()->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_E("Can't get operation: " << ev->Get()->Issues.ToOneLineString());
- Send(Parent, new TEvPrivate::TEvStatusTrackerResponse(ev->Get()->Issues, ev->Get()->Status, ExecStatus));
+ Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(ev->Get()->Issues, ev->Get()->Status, ExecStatus));
FailedAndPassAway();
return;
}
@@ -138,7 +138,7 @@ public:
}
void SendGetOperation(const TDuration& delay = TDuration::Zero()) {
- Register(new TRetryActor<TEvPrivate::TEvGetOperationRequest, TEvPrivate::TEvGetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_GET_OPERATION), delay, SelfId(), Connector, OperationId));
+ Register(new TRetryActor<TEvYdbCompute::TEvGetOperationRequest, TEvYdbCompute::TEvGetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_GET_OPERATION), delay, SelfId(), Connector, OperationId));
}
void Failed() {
diff --git a/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp b/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp
index 59e60c6128b..b7d0596e7ef 100644
--- a/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp
@@ -72,23 +72,23 @@ public:
void Start() {
LOG_I("Start stopper actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status));
Become(&TStopperActor::StateFunc);
- Register(new TRetryActor<TEvPrivate::TEvCancelOperationRequest, TEvPrivate::TEvCancelOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_CANCEL_OPERATION), SelfId(), Connector, OperationId));
+ Register(new TRetryActor<TEvYdbCompute::TEvCancelOperationRequest, TEvYdbCompute::TEvCancelOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_CANCEL_OPERATION), SelfId(), Connector, OperationId));
}
STRICT_STFUNC(StateFunc,
- hFunc(TEvPrivate::TEvCancelOperationResponse, Handle);
+ hFunc(TEvYdbCompute::TEvCancelOperationResponse, Handle);
)
- void Handle(const TEvPrivate::TEvCancelOperationResponse::TPtr& ev) {
+ void Handle(const TEvYdbCompute::TEvCancelOperationResponse::TPtr& ev) {
const auto& response = *ev.Get()->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_E("Can't cancel operation: " << ev->Get()->Issues.ToOneLineString());
- Send(Parent, new TEvPrivate::TEvStopperResponse(ev->Get()->Issues, ev->Get()->Status));
+ Send(Parent, new TEvYdbCompute::TEvStopperResponse(ev->Get()->Issues, ev->Get()->Status));
FailedAndPassAway();
return;
}
LOG_I("Operation successfully canceled");
- Send(Parent, new TEvPrivate::TEvStopperResponse({}, NYdb::EStatus::SUCCESS));
+ Send(Parent, new TEvYdbCompute::TEvStopperResponse({}, NYdb::EStatus::SUCCESS));
CompleteAndPassAway();
}
diff --git a/ydb/core/fq/libs/compute/ydb/ya.make b/ydb/core/fq/libs/compute/ydb/ya.make
index c4f65784190..62fba34c4d1 100644
--- a/ydb/core/fq/libs/compute/ydb/ya.make
+++ b/ydb/core/fq/libs/compute/ydb/ya.make
@@ -35,3 +35,8 @@ PEERDIR(
YQL_LAST_ABI_VERSION()
END()
+
+RECURSE(
+ control_plane
+ events
+)
diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
index 50a820d1721..4c76f624c27 100644
--- a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
@@ -1,12 +1,16 @@
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+#include <ydb/core/fq/libs/ydb/ydb.h>
+
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h>
+#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/hfunc.h>
-#include <ydb/core/fq/libs/compute/ydb/events/events.h>
-#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
-#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h>
-#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
-#include <ydb/core/fq/libs/ydb/ydb.h>
namespace NFq {
@@ -16,61 +20,63 @@ using namespace NFq;
class TYdbConnectorActor : public NActors::TActorBootstrapped<TYdbConnectorActor> {
public:
explicit TYdbConnectorActor(const TRunActorParams& params)
- : Params(params)
+ : YqSharedResources(params.YqSharedResources)
+ , CredentialsProviderFactory(params.CredentialsProviderFactory)
+ , ComputeConnection(params.ComputeConnection)
{}
void Bootstrap() {
- auto querySettings = NFq::GetClientSettings<NYdb::NQuery::TClientSettings>(Params.Config.GetCompute().GetYdb().GetConnection(), Params.CredentialsProviderFactory);
- QueryClient = std::make_unique<NYdb::NQuery::TQueryClient>(Params.YqSharedResources->UserSpaceYdbDriver, querySettings);
- auto operationSettings = NFq::GetClientSettings<NYdb::TCommonClientSettings>(Params.Config.GetCompute().GetYdb().GetConnection(), Params.CredentialsProviderFactory);
- OperationClient = std::make_unique<NYdb::NOperation::TOperationClient>(Params.YqSharedResources->UserSpaceYdbDriver, operationSettings);
+ auto querySettings = NFq::GetClientSettings<NYdb::NQuery::TClientSettings>(ComputeConnection, CredentialsProviderFactory);
+ QueryClient = std::make_unique<NYdb::NQuery::TQueryClient>(YqSharedResources->UserSpaceYdbDriver, querySettings);
+ auto operationSettings = NFq::GetClientSettings<NYdb::TCommonClientSettings>(ComputeConnection, CredentialsProviderFactory);
+ OperationClient = std::make_unique<NYdb::NOperation::TOperationClient>(YqSharedResources->UserSpaceYdbDriver, operationSettings);
Become(&TYdbConnectorActor::StateFunc);
}
STRICT_STFUNC(StateFunc,
- hFunc(TEvPrivate::TEvExecuteScriptRequest, Handle);
- hFunc(TEvPrivate::TEvGetOperationRequest, Handle);
- hFunc(TEvPrivate::TEvFetchScriptResultRequest, Handle);
- hFunc(TEvPrivate::TEvCancelOperationRequest, Handle);
- hFunc(TEvPrivate::TEvForgetOperationRequest, Handle);
+ hFunc(TEvYdbCompute::TEvExecuteScriptRequest, Handle);
+ hFunc(TEvYdbCompute::TEvGetOperationRequest, Handle);
+ hFunc(TEvYdbCompute::TEvFetchScriptResultRequest, Handle);
+ hFunc(TEvYdbCompute::TEvCancelOperationRequest, Handle);
+ hFunc(TEvYdbCompute::TEvForgetOperationRequest, Handle);
cFunc(NActors::TEvents::TEvPoisonPill::EventType, PassAway);
)
- void Handle(const TEvPrivate::TEvExecuteScriptRequest::TPtr& ev) {
+ void Handle(const TEvYdbCompute::TEvExecuteScriptRequest::TPtr& ev) {
QueryClient
->ExecuteScript(ev->Get()->Sql)
.Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) {
try {
auto response = future.ExtractValueSync();
if (response.Status().IsSuccess()) {
- actorSystem->Send(recipient, new TEvPrivate::TEvExecuteScriptResponse(response.Id(), response.Metadata().ExecutionId), 0, cookie);
+ actorSystem->Send(recipient, new TEvYdbCompute::TEvExecuteScriptResponse(response.Id(), response.Metadata().ExecutionId), 0, cookie);
} else {
- actorSystem->Send(recipient, new TEvPrivate::TEvExecuteScriptResponse(response.Status().GetIssues(), response.Status().GetStatus()), 0, cookie);
+ actorSystem->Send(recipient, new TEvYdbCompute::TEvExecuteScriptResponse(response.Status().GetIssues(), response.Status().GetStatus()), 0, cookie);
}
} catch (...) {
- actorSystem->Send(recipient, new TEvPrivate::TEvExecuteScriptResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie);
+ actorSystem->Send(recipient, new TEvYdbCompute::TEvExecuteScriptResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie);
}
});
}
- void Handle(const TEvPrivate::TEvGetOperationRequest::TPtr& ev) {
+ void Handle(const TEvYdbCompute::TEvGetOperationRequest::TPtr& ev) {
OperationClient
->Get<NYdb::NQuery::TScriptExecutionOperation>(ev->Get()->OperationId)
.Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) {
try {
auto response = future.ExtractValueSync();
if (response.Status().IsSuccess()) {
- actorSystem->Send(recipient, new TEvPrivate::TEvGetOperationResponse(response.Metadata().ExecStatus, response.Status().GetIssues()), 0, cookie);
+ actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Metadata().ExecStatus, response.Status().GetIssues()), 0, cookie);
} else {
- actorSystem->Send(recipient, new TEvPrivate::TEvGetOperationResponse(response.Status().GetIssues(), response.Status().GetStatus()), 0, cookie);
+ actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Status().GetIssues(), response.Status().GetStatus()), 0, cookie);
}
} catch (...) {
- actorSystem->Send(recipient, new TEvPrivate::TEvGetOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie);
+ actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie);
}
});
}
- void Handle(const TEvPrivate::TEvFetchScriptResultRequest::TPtr& ev) {
+ void Handle(const TEvYdbCompute::TEvFetchScriptResultRequest::TPtr& ev) {
NYdb::NQuery::TFetchScriptResultsSettings settings;
settings.RowsOffset(ev->Get()->RowOffset);
QueryClient
@@ -79,44 +85,46 @@ public:
try {
auto response = future.ExtractValueSync();
if (response.IsSuccess()) {
- actorSystem->Send(recipient, new TEvPrivate::TEvFetchScriptResultResponse(response.ExtractResultSet()), 0, cookie);
+ actorSystem->Send(recipient, new TEvYdbCompute::TEvFetchScriptResultResponse(response.ExtractResultSet()), 0, cookie);
} else {
- actorSystem->Send(recipient, new TEvPrivate::TEvFetchScriptResultResponse(response.GetIssues(), response.GetStatus()), 0, cookie);
+ actorSystem->Send(recipient, new TEvYdbCompute::TEvFetchScriptResultResponse(response.GetIssues(), response.GetStatus()), 0, cookie);
}
} catch (...) {
- actorSystem->Send(recipient, new TEvPrivate::TEvFetchScriptResultResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie);
+ actorSystem->Send(recipient, new TEvYdbCompute::TEvFetchScriptResultResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie);
}
});
}
- void Handle(const TEvPrivate::TEvCancelOperationRequest::TPtr& ev) {
+ void Handle(const TEvYdbCompute::TEvCancelOperationRequest::TPtr& ev) {
OperationClient
->Cancel(ev->Get()->OperationId)
.Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) {
try {
auto response = future.ExtractValueSync();
- actorSystem->Send(recipient, new TEvPrivate::TEvCancelOperationResponse(response.GetIssues(), response.GetStatus()), 0, cookie);
+ actorSystem->Send(recipient, new TEvYdbCompute::TEvCancelOperationResponse(response.GetIssues(), response.GetStatus()), 0, cookie);
} catch (...) {
- actorSystem->Send(recipient, new TEvPrivate::TEvCancelOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie);
+ actorSystem->Send(recipient, new TEvYdbCompute::TEvCancelOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie);
}
});
}
- void Handle(const TEvPrivate::TEvForgetOperationRequest::TPtr& ev) {
+ void Handle(const TEvYdbCompute::TEvForgetOperationRequest::TPtr& ev) {
OperationClient
->Forget(ev->Get()->OperationId)
.Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) {
try {
auto response = future.ExtractValueSync();
- actorSystem->Send(recipient, new TEvPrivate::TEvForgetOperationResponse(response.GetIssues(), response.GetStatus()), 0, cookie);
+ actorSystem->Send(recipient, new TEvYdbCompute::TEvForgetOperationResponse(response.GetIssues(), response.GetStatus()), 0, cookie);
} catch (...) {
- actorSystem->Send(recipient, new TEvPrivate::TEvForgetOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie);
+ actorSystem->Send(recipient, new TEvYdbCompute::TEvForgetOperationResponse(NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}, NYdb::EStatus::GENERIC_ERROR), 0, cookie);
}
});
}
private:
- TRunActorParams Params;
+ TYqSharedResources::TPtr YqSharedResources;
+ NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
+ NConfig::TYdbStorageConfig ComputeConnection;
std::unique_ptr<NYdb::NQuery::TQueryClient> QueryClient;
std::unique_ptr<NYdb::NOperation::TOperationClient> OperationClient;
};
diff --git a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
index c1da57d093c..c00b1b0669e 100644
--- a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
@@ -61,15 +61,15 @@ public:
}
STRICT_STFUNC(StateFunc,
- hFunc(TEvPrivate::TEvExecuterResponse, Handle);
- hFunc(TEvPrivate::TEvStatusTrackerResponse, Handle);
- hFunc(TEvPrivate::TEvResultWriterResponse, Handle);
- hFunc(TEvPrivate::TEvResourcesCleanerResponse, Handle);
- hFunc(TEvPrivate::TEvFinalizerResponse, Handle);
- hFunc(TEvPrivate::TEvStopperResponse, Handle);
+ hFunc(TEvYdbCompute::TEvExecuterResponse, Handle);
+ hFunc(TEvYdbCompute::TEvStatusTrackerResponse, Handle);
+ hFunc(TEvYdbCompute::TEvResultWriterResponse, Handle);
+ hFunc(TEvYdbCompute::TEvResourcesCleanerResponse, Handle);
+ hFunc(TEvYdbCompute::TEvFinalizerResponse, Handle);
+ hFunc(TEvYdbCompute::TEvStopperResponse, Handle);
)
- void Handle(const TEvPrivate::TEvExecuterResponse::TPtr& ev) {
+ void Handle(const TEvYdbCompute::TEvExecuterResponse::TPtr& ev) {
auto& response = *ev->Get();
if (!response.Success) {
LOG_I("ExecuterResponse (failed). Issues: " << response.Issues.ToOneLineString());
@@ -82,7 +82,7 @@ public:
Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release());
}
- void Handle(const TEvPrivate::TEvStatusTrackerResponse::TPtr& ev) {
+ void Handle(const TEvYdbCompute::TEvStatusTrackerResponse::TPtr& ev) {
auto& response = *ev->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_I("StatusTrackerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
@@ -98,7 +98,7 @@ public:
}
}
- void Handle(const TEvPrivate::TEvResultWriterResponse::TPtr& ev) {
+ void Handle(const TEvYdbCompute::TEvResultWriterResponse::TPtr& ev) {
auto& response = *ev->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_I("ResultWriterResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
@@ -109,7 +109,7 @@ public:
Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release());
}
- void Handle(const TEvPrivate::TEvResourcesCleanerResponse::TPtr& ev) {
+ void Handle(const TEvYdbCompute::TEvResourcesCleanerResponse::TPtr& ev) {
auto& response = *ev->Get();
if (response.Status != NYdb::EStatus::SUCCESS && response.Status != NYdb::EStatus::UNSUPPORTED) {
LOG_I("ResourcesCleanerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
@@ -120,7 +120,7 @@ public:
Register(ActorFactory->CreateFinalizer(SelfId(), Pinger, ExecStatus).release());
}
- void Handle(const TEvPrivate::TEvFinalizerResponse::TPtr ev) {
+ void Handle(const TEvYdbCompute::TEvFinalizerResponse::TPtr ev) {
auto& response = *ev->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_I("FinalizerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
@@ -139,7 +139,7 @@ public:
}
}
- void Handle(const TEvPrivate::TEvStopperResponse::TPtr& ev) {
+ void Handle(const TEvYdbCompute::TEvStopperResponse::TPtr& ev) {
auto& response = *ev->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_I("StopperResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
diff --git a/ydb/core/fq/libs/config/protos/compute.proto b/ydb/core/fq/libs/config/protos/compute.proto
index 90ece7388c7..385120d3ff4 100644
--- a/ydb/core/fq/libs/config/protos/compute.proto
+++ b/ydb/core/fq/libs/config/protos/compute.proto
@@ -12,9 +12,40 @@ import "ydb/public/api/protos/draft/fq.proto";
message TInPlaceCompute {
}
+message TComputeDatabaseConfig {
+ TYdbStorageConfig Connection = 1;
+ string Tenant = 2;
+}
+
+message TDatabaseMapping {
+ repeated TComputeDatabaseConfig Common = 1;
+ map<string, TComputeDatabaseConfig> ScopeToComputeDatabase = 2;
+}
+
+message TYdbComputeControlPlane {
+ message TSingle {
+ TYdbStorageConfig Connection = 1;
+ }
+
+ message TCms {
+ TDatabaseMapping DatabaseMapping = 1;
+ }
+
+ message TYdbcp {
+ TDatabaseMapping DatabaseMapping = 2;
+ }
+
+ bool Enable = 1;
+ oneof type {
+ TSingle Single = 2;
+ TCms Cms = 3;
+ TYdbcp Ydbcp = 4;
+ }
+}
+
message TYdbCompute {
bool Enable = 1;
- TYdbStorageConfig Connection = 2;
+ TYdbComputeControlPlane ControlPlane = 2;
}
enum EComputeType {
diff --git a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt
index 1fb66e6dbd0..e8fc9d59713 100644
--- a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.darwin-x86_64.txt
@@ -18,8 +18,10 @@ target_link_libraries(fq-libs-control_plane_proxy PUBLIC
yutil
cpp-actors-core
ydb-core-base
- libs-actors-logging
fq-libs-actors
+ libs-actors-logging
+ libs-compute-ydb
+ compute-ydb-control_plane
fq-libs-control_plane_config
libs-control_plane_proxy-events
fq-libs-control_plane_storage
diff --git a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt
index 8ac43676221..ef702e4856c 100644
--- a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-aarch64.txt
@@ -19,8 +19,10 @@ target_link_libraries(fq-libs-control_plane_proxy PUBLIC
yutil
cpp-actors-core
ydb-core-base
- libs-actors-logging
fq-libs-actors
+ libs-actors-logging
+ libs-compute-ydb
+ compute-ydb-control_plane
fq-libs-control_plane_config
libs-control_plane_proxy-events
fq-libs-control_plane_storage
diff --git a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt
index 8ac43676221..ef702e4856c 100644
--- a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.linux-x86_64.txt
@@ -19,8 +19,10 @@ target_link_libraries(fq-libs-control_plane_proxy PUBLIC
yutil
cpp-actors-core
ydb-core-base
- libs-actors-logging
fq-libs-actors
+ libs-actors-logging
+ libs-compute-ydb
+ compute-ydb-control_plane
fq-libs-control_plane_config
libs-control_plane_proxy-events
fq-libs-control_plane_storage
diff --git a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt
index 1fb66e6dbd0..e8fc9d59713 100644
--- a/ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/fq/libs/control_plane_proxy/CMakeLists.windows-x86_64.txt
@@ -18,8 +18,10 @@ target_link_libraries(fq-libs-control_plane_proxy PUBLIC
yutil
cpp-actors-core
ydb-core-base
- libs-actors-logging
fq-libs-actors
+ libs-actors-logging
+ libs-compute-ydb
+ compute-ydb-control_plane
fq-libs-control_plane_config
libs-control_plane_proxy-events
fq-libs-control_plane_storage
diff --git a/ydb/core/fq/libs/control_plane_proxy/config.cpp b/ydb/core/fq/libs/control_plane_proxy/config.cpp
index 0a2a63fe298..37ad16e8365 100644
--- a/ydb/core/fq/libs/control_plane_proxy/config.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/config.cpp
@@ -42,8 +42,4 @@ TControlPlaneProxyConfig::TControlPlaneProxyConfig(
, ConfigRetryPeriod(
GetDuration(Proto.GetConfigRetryPeriod(), TDuration::MilliSeconds(100))) { }
-bool TControlPlaneProxyConfig::IsYDBComputeEngineEnabled() const {
- return ComputeConfig.HasYdb() && ComputeConfig.GetYdb().GetEnable();
-}
-
} // NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/config.h b/ydb/core/fq/libs/control_plane_proxy/config.h
index 32f09af9e4c..ae98bb989bc 100644
--- a/ydb/core/fq/libs/control_plane_proxy/config.h
+++ b/ydb/core/fq/libs/control_plane_proxy/config.h
@@ -1,7 +1,8 @@
#pragma once
-#include "ydb/core/fq/libs/config/protos/common.pb.h"
-#include "ydb/core/fq/libs/config/protos/compute.pb.h"
+#include <ydb/core/fq/libs/compute/common/config.h>
+#include <ydb/core/fq/libs/config/protos/common.pb.h>
+#include <ydb/core/fq/libs/config/protos/compute.pb.h>
#include <ydb/core/fq/libs/config/protos/control_plane_proxy.pb.h>
#include <util/datetime/base.h>
@@ -10,7 +11,7 @@ namespace NFq {
struct TControlPlaneProxyConfig {
NConfig::TControlPlaneProxyConfig Proto;
- NConfig::TComputeConfig ComputeConfig;
+ TComputeConfig ComputeConfig;
NConfig::TCommonConfig CommonConfig;
TDuration RequestTimeout;
TDuration MetricsTtl;
@@ -20,7 +21,6 @@ struct TControlPlaneProxyConfig {
const NConfig::TControlPlaneProxyConfig& config,
const NConfig::TComputeConfig& computeConfig,
const NConfig::TCommonConfig& commonConfig);
- bool IsYDBComputeEngineEnabled() const;
};
} // NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp
index 617a425fff4..c01bf40ac2f 100644
--- a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp
@@ -5,6 +5,8 @@
#include <ydb/core/fq/libs/actors/logging/log.h>
#include <ydb/core/fq/libs/common/cache.h>
+#include <ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h>
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
#include <ydb/core/fq/libs/control_plane_config/control_plane_config.h>
#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h>
#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
@@ -423,19 +425,21 @@ class TCreateConnectionInYDBActor :
ui32 Cookie;
TDuration RequestTimeout;
TInstant StartTime;
+ TString Scope;
TTableClientPtr TableClient;
TString ObjectStorageEndpoint;
public:
TCreateConnectionInYDBActor(
const TRequestCommonCountersPtr& counters,
- const NConfig::TYdbCompute& ydbComputeConfig,
+ const NFq::TComputeConfig& computeConfig,
const TYqSharedResources::TPtr& yqSharedResources,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const TString& objectStorageEndpoint,
TActorId sender,
TEventRequest event,
ui32 cookie,
+ const TString& scope,
TDuration requestTimeout)
: Sender(sender)
, Counters(counters)
@@ -443,8 +447,9 @@ public:
, Cookie(cookie)
, RequestTimeout(requestTimeout)
, StartTime(TInstant::Now())
+ , Scope(scope)
, TableClient(CreateNewTableClient(
- ydbComputeConfig, yqSharedResources, credentialsProviderFactory))
+ computeConfig, yqSharedResources, credentialsProviderFactory))
, ObjectStorageEndpoint(objectStorageEndpoint) { }
static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_CREATE_CONNECTION_IN_YDB";
@@ -593,14 +598,20 @@ public:
}
private:
- static TTableClientPtr CreateNewTableClient(
- const NConfig::TYdbCompute& ydbComputeConfig,
+ TTableClientPtr CreateNewTableClient(
+ const NFq::TComputeConfig& computeConfig,
const TYqSharedResources::TPtr& yqSharedResources,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) {
- auto tableSettigns = GetClientSettings<NYdb::NTable::TClientSettings>(
- ydbComputeConfig.connection(), credentialsProviderFactory);
+
+ NFq::NConfig::TYdbStorageConfig computeConnection = computeConfig.GetConnection(Scope);
+ computeConnection.set_endpoint(Event->Get()->ComputeDatabase->connection().endpoint());
+ computeConnection.set_database(Event->Get()->ComputeDatabase->connection().database());
+ computeConnection.set_usessl(Event->Get()->ComputeDatabase->connection().usessl());
+
+ auto tableSettings = GetClientSettings<NYdb::NTable::TClientSettings>(
+ computeConnection, credentialsProviderFactory);
return std::make_unique<NYdb::NTable::TTableClient>(
- yqSharedResources->UserSpaceYdbDriver, tableSettigns);
+ yqSharedResources->UserSpaceYdbDriver, tableSettings);
}
};
@@ -655,19 +666,21 @@ class TCreateBindingInYDBActor :
TInstant StartTime;
TPermissions Permissions;
TDuration RequestTimeout;
+ TString Scope;
TTableClientPtr TableClient;
TString ConnectionName;
public:
TCreateBindingInYDBActor(
const TRequestCommonCountersPtr& counters,
- const NConfig::TYdbCompute& ydbComputeConfig,
+ const NFq::TComputeConfig& computeConfig,
const TYqSharedResources::TPtr& yqSharedResources,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
TActorId sender,
TEventRequest event,
ui32 cookie,
TPermissions permissions,
+ const TString& scope,
TDuration requestTimeout)
: Sender(sender)
, Counters(counters)
@@ -676,8 +689,9 @@ public:
, StartTime(TInstant::Now())
, Permissions(std::move(permissions))
, RequestTimeout(requestTimeout)
+ , Scope(scope)
, TableClient(CreateNewTableClient(
- ydbComputeConfig, yqSharedResources, credentialsProviderFactory)) { }
+ computeConfig, yqSharedResources, credentialsProviderFactory)) { }
static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_CREATE_BINDING_IN_YDB";
@@ -710,14 +724,15 @@ public:
"Create external table in YDB. Resolving connection id. Actor id: "
<< SelfId() << " connection_id: " << connectionId);
auto event = new TEvControlPlaneStorage::TEvDescribeConnectionRequest(
- "yandexcloud://" + Event->Get()->FolderId,
+ Scope,
request,
Event->Get()->User,
Event->Get()->Token,
Event->Get()->CloudId,
Permissions,
Event->Get()->Quotas,
- Event->Get()->TenantInfo);
+ Event->Get()->TenantInfo,
+ {});
Send(ControlPlaneStorageServiceActorId(), event);
}
@@ -955,14 +970,20 @@ public:
}
private:
- static TTableClientPtr CreateNewTableClient(
- const NConfig::TYdbCompute& ydbComputeConfig,
+ TTableClientPtr CreateNewTableClient(
+ const NFq::TComputeConfig& computeConfig,
const TYqSharedResources::TPtr& yqSharedResources,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) {
- auto tableSettigns = GetClientSettings<NYdb::NTable::TClientSettings>(
- ydbComputeConfig.connection(), credentialsProviderFactory);
+
+ NFq::NConfig::TYdbStorageConfig computeConnection = computeConfig.GetConnection(Scope);
+ computeConnection.set_endpoint(Event->Get()->ComputeDatabase->connection().endpoint());
+ computeConnection.set_database(Event->Get()->ComputeDatabase->connection().database());
+ computeConnection.set_usessl(Event->Get()->ComputeDatabase->connection().usessl());
+
+ auto tableSettings = GetClientSettings<NYdb::NTable::TClientSettings>(
+ computeConnection, credentialsProviderFactory);
return std::make_unique<NYdb::NTable::TTableClient>(
- yqSharedResources->UserSpaceYdbDriver, tableSettigns);
+ yqSharedResources->UserSpaceYdbDriver, tableSettings);
}
};
@@ -1061,7 +1082,6 @@ public:
NYql::TIssues issues;
NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Resolve folder error");
issues.AddIssue(issue);
- Counters->Error->Inc();
const TDuration delta = TInstant::Now() - StartTime;
Probe(delta, false, false);
Send(Sender, new TResponseProxy(issues, {}), 0, Cookie);
@@ -1092,6 +1112,104 @@ private:
}
};
+template<class TEventRequest, class TResponseProxy>
+class TCreateComputeDatabaseActor : public NActors::TActorBootstrapped<TCreateComputeDatabaseActor<TEventRequest, TResponseProxy>> {
+ using TBase = NActors::TActorBootstrapped<TCreateComputeDatabaseActor<TEventRequest, TResponseProxy>>;
+ using TBase::SelfId;
+ using TBase::Send;
+ using TBase::PassAway;
+ using TBase::Become;
+ using TBase::Register;
+
+ ::NFq::TControlPlaneProxyConfig Config;
+ ::NFq::TComputeConfig ComputeConfig;
+ TActorId Sender;
+ TRequestCommonCountersPtr Counters;
+ TString CloudId;
+ TString FolderId;
+ TString Scope;
+ TString Token;
+ std::function<void(const TDuration&, bool, bool)> Probe;
+ TEventRequest Event;
+ ui32 Cookie;
+ TInstant StartTime;
+
+public:
+ TCreateComputeDatabaseActor(const TRequestCommonCountersPtr& counters,
+ TActorId sender, const ::NFq::TControlPlaneProxyConfig& config,
+ const ::NFq::TComputeConfig& computeConfig, const TString& cloudId,
+ const TString& folderId, const TString& scope,
+ const std::function<void(const TDuration&, bool, bool)>& probe,
+ TEventRequest event, ui32 cookie)
+ : Config(config)
+ , ComputeConfig(computeConfig)
+ , Sender(sender)
+ , Counters(counters)
+ , CloudId(cloudId)
+ , FolderId(folderId)
+ , Scope(scope)
+ , Probe(probe)
+ , Event(event)
+ , Cookie(cookie)
+ , StartTime(TInstant::Now())
+ {}
+
+ static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_CREATE_DATABASE";
+
+ void Bootstrap() {
+ CPP_LOG_T("Create database bootstrap. CloudId: " << CloudId << " FolderId: " << FolderId << " Scope: " << Scope << " Actor id: " << SelfId());
+ if (!ComputeConfig.YdbComputeControlPlaneEnabled()) {
+ Event->Get()->ComputeDatabase = FederatedQuery::Internal::ComputeDatabaseInternal{};
+ TActivationContext::Send(Event->Forward(ControlPlaneProxyActorId()));
+ PassAway();
+ return;
+ }
+ Become(&TCreateComputeDatabaseActor::StateFunc, Config.RequestTimeout, new NActors::TEvents::TEvWakeup());
+ Counters->InFly->Inc();
+ Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), CreateRequest().release(), 0, 0);
+ }
+
+ std::unique_ptr<TEvYdbCompute::TEvCreateDatabaseRequest> CreateRequest() {
+ return std::make_unique<TEvYdbCompute::TEvCreateDatabaseRequest>(CloudId, Scope);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ cFunc(NActors::TEvents::TSystem::Wakeup, HandleTimeout);
+ hFunc(TEvYdbCompute::TEvCreateDatabaseResponse, Handle);
+ )
+
+ void HandleTimeout() {
+ CPP_LOG_D("Create database timeout. CloudId: " << CloudId << " FolderId: " << FolderId << " Scope: " << Scope << " Actor id: " << SelfId());
+ NYql::TIssues issues;
+ NYql::TIssue issue = MakeErrorIssue(TIssuesIds::TIMEOUT, "Create database: request timeout. Try repeating the request later");
+ issues.AddIssue(issue);
+ Counters->Error->Inc();
+ Counters->Timeout->Inc();
+ const TDuration delta = TInstant::Now() - StartTime;
+ Probe(delta, false, true);
+ Send(Sender, new TResponseProxy(issues, {}), 0, Cookie);
+ PassAway();
+ }
+
+ void Handle(TEvYdbCompute::TEvCreateDatabaseResponse::TPtr& ev) {
+ Counters->InFly->Dec();
+ Counters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
+ if (ev->Get()->Issues) {
+ Counters->Error->Inc();
+ CPP_LOG_E(ev->Get()->Issues.ToOneLineString());
+ const TDuration delta = TInstant::Now() - StartTime;
+ Probe(delta, false, false);
+ Send(Sender, new TResponseProxy(ev->Get()->Issues, {}), 0, Cookie);
+ PassAway();
+ return;
+ }
+ Counters->Ok->Inc();
+ Event->Get()->ComputeDatabase = ev->Get()->Result;
+ TActivationContext::Send(Event->Forward(ControlPlaneProxyActorId()));
+ PassAway();
+ }
+};
+
template<class TRequestProto, class TRequest, class TResponse, class TResponseProxy>
class TRequestActor : public NActors::TActorBootstrapped<TRequestActor<TRequestProto, TRequest, TResponse, TResponseProxy>> {
protected:
@@ -1119,6 +1237,7 @@ protected:
TString SubjectType;
const TMaybe<TQuotaMap> Quotas;
TTenantInfo::TPtr TenantInfo;
+ TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal> ComputeDatabase;
ui32 RetryCount = 0;
public:
@@ -1131,7 +1250,8 @@ public:
const TRequestCounters& counters,
const std::function<void(const TDuration&, bool, bool)>& probe,
TPermissions permissions,
- const TString& cloudId, const TString& subjectType, TMaybe<TQuotaMap>&& quotas = Nothing())
+ const TString& cloudId, const TString& subjectType, TMaybe<TQuotaMap>&& quotas = Nothing(),
+ TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal>&& computeDatabase = Nothing())
: Config(config)
, RequestProto(std::forward<TRequestProto>(requestProto))
, Scope(scope)
@@ -1148,6 +1268,7 @@ public:
, CloudId(cloudId)
, SubjectType(subjectType)
, Quotas(std::move(quotas))
+ , ComputeDatabase(std::move(computeDatabase))
{
Counters.IncInFly();
}
@@ -1239,7 +1360,7 @@ public:
void SendRequestIfCan() {
if (CanSendRequest()) {
- Send(ServiceId, new TRequest(Scope, RequestProto, User, Token, CloudId, Permissions, Quotas, TenantInfo), 0, Cookie);
+ Send(ServiceId, new TRequest(Scope, RequestProto, User, Token, CloudId, Permissions, Quotas, TenantInfo, ComputeDatabase.GetOrElse({})), 0, Cookie);
}
}
@@ -1366,6 +1487,7 @@ class TControlPlaneProxyActor : public NActors::TActorBootstrapped<TControlPlane
RTC_RESOLVE_SUBJECT_TYPE,
RTC_CREATE_CONNECTION_IN_YDB,
RTC_CREATE_BINDING_IN_YDB,
+ RTC_CREATE_COMPUTE_DATABASE,
RTC_MAX,
};
@@ -1414,7 +1536,8 @@ class TControlPlaneProxyActor : public NActors::TActorBootstrapped<TControlPlane
{ MakeIntrusive<TRequestCommonCounters>("DeleteBinding") },
{ MakeIntrusive<TRequestCommonCounters>("ResolveSubjectType") },
{ MakeIntrusive<TRequestCommonCounters>("CreateConnectionInYDB") },
- { MakeIntrusive<TRequestCommonCounters>("CreateBindingInYDB") }
+ { MakeIntrusive<TRequestCommonCounters>("CreateBindingInYDB") },
+ { MakeIntrusive<TRequestCommonCounters>("CreateComputeDatabase") },
});
TTtlCache<TMetricsScope, TScopeCountersPtr, TMap> ScopeCounters{TTtlCacheSettings{}.SetTtl(TDuration::Days(1))};
@@ -1487,6 +1610,7 @@ class TControlPlaneProxyActor : public NActors::TActorBootstrapped<TControlPlane
const TYqSharedResources::TPtr YqSharedResources;
const NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
const bool QuotaManagerEnabled;
+ NConfig::TComputeConfig ComputeConfig;
TActorId AccessService;
public:
@@ -1647,17 +1771,28 @@ private:
return;
}
+ if (!ev->Get()->ComputeDatabase) {
+ Register(new TCreateComputeDatabaseActor<TEvControlPlaneProxy::TEvCreateQueryRequest::TPtr,
+ TEvControlPlaneProxy::TEvCreateQueryResponse>
+ (Counters.GetCommonCounters(RTC_CREATE_COMPUTE_DATABASE),
+ sender, Config, Config.ComputeConfig, cloudId,
+ folderId, scope, probe, ev, cookie));
+ return;
+ }
+
static const TPermissions availablePermissions {
TPermissions::TPermission::QUERY_INVOKE
| TPermissions::TPermission::MANAGE_PUBLIC
};
Register(new TCreateQueryRequestActor
- (Config, ev->Sender, ev->Cookie, scope, folderId,
+ (Config, sender, cookie, scope, folderId,
std::move(request), std::move(user), std::move(token),
ControlPlaneStorageServiceActorId(),
requestCounters,
- probe, ExtractPermissions(ev, availablePermissions), cloudId, subjectType, std::move(ev->Get()->Quotas)));
+ probe, ExtractPermissions(ev, availablePermissions),
+ cloudId, subjectType, std::move(ev->Get()->Quotas),
+ std::move(ev->Get()->ComputeDatabase)));
}
void Handle(TEvControlPlaneProxy::TEvListQueriesRequest::TPtr& ev) {
@@ -1908,6 +2043,15 @@ private:
return;
}
+ if (!ev->Get()->ComputeDatabase) {
+ Register(new TCreateComputeDatabaseActor<TEvControlPlaneProxy::TEvModifyQueryRequest::TPtr,
+ TEvControlPlaneProxy::TEvModifyQueryResponse>
+ (Counters.GetCommonCounters(RTC_CREATE_COMPUTE_DATABASE),
+ sender, Config, Config.ComputeConfig, cloudId,
+ folderId, scope, probe, ev, cookie));
+ return;
+ }
+
static const TPermissions availablePermissions {
TPermissions::TPermission::QUERY_INVOKE
| TPermissions::TPermission::MANAGE_PUBLIC
@@ -1922,8 +2066,8 @@ private:
std::move(request), std::move(user), std::move(token),
ControlPlaneStorageServiceActorId(),
requestCounters,
- probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ probe, ExtractPermissions(ev, availablePermissions),
+ cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase)));
}
void Handle(TEvControlPlaneProxy::TEvDeleteQueryRequest::TPtr& ev) {
@@ -2315,20 +2459,30 @@ private:
return;
}
+ if (!ev->Get()->ComputeDatabase) {
+ Register(new TCreateComputeDatabaseActor<TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr,
+ TEvControlPlaneProxy::TEvCreateConnectionResponse>
+ (Counters.GetCommonCounters(RTC_CREATE_COMPUTE_DATABASE),
+ sender, Config, Config.ComputeConfig, cloudId,
+ folderId, scope, probe, ev, cookie));
+ return;
+ }
+
static const TPermissions availablePermissions {
TPermissions::TPermission::MANAGE_PUBLIC
};
- if (Config.IsYDBComputeEngineEnabled() && !ydbOperationWasPerformed) {
+ if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && !ydbOperationWasPerformed) {
Register(new TCreateConnectionInYDBActor(
Counters.GetCommonCounters(RTC_CREATE_CONNECTION_IN_YDB),
- Config.ComputeConfig.GetYdb(),
+ Config.ComputeConfig,
YqSharedResources,
CredentialsProviderFactory,
Config.CommonConfig.GetObjectStorageEndpoint(),
sender,
ev,
cookie,
+ scope,
Config.RequestTimeout));
return;
}
@@ -2341,7 +2495,8 @@ private:
std::move(request), std::move(user), std::move(token),
ControlPlaneStorageServiceActorId(),
requestCounters,
- probe, ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ probe, ExtractPermissions(ev, availablePermissions),
+ cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase)));
}
void Handle(TEvControlPlaneProxy::TEvListConnectionsRequest::TPtr& ev) {
@@ -2529,6 +2684,15 @@ private:
return;
}
+ if (!ev->Get()->ComputeDatabase) {
+ Register(new TCreateComputeDatabaseActor<TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr,
+ TEvControlPlaneProxy::TEvModifyConnectionResponse>
+ (Counters.GetCommonCounters(RTC_CREATE_COMPUTE_DATABASE),
+ sender, Config, Config.ComputeConfig, cloudId,
+ folderId, scope, probe, ev, cookie));
+ return;
+ }
+
static const TPermissions availablePermissions {
TPermissions::TPermission::MANAGE_PUBLIC
| TPermissions::TPermission::MANAGE_PRIVATE
@@ -2543,7 +2707,8 @@ private:
ControlPlaneStorageServiceActorId(),
requestCounters,
probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ ExtractPermissions(ev, availablePermissions),
+ cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase)));
}
void Handle(TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& ev) {
@@ -2595,6 +2760,15 @@ private:
return;
}
+ if (!ev->Get()->ComputeDatabase) {
+ Register(new TCreateComputeDatabaseActor<TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr,
+ TEvControlPlaneProxy::TEvDeleteConnectionResponse>
+ (Counters.GetCommonCounters(RTC_CREATE_COMPUTE_DATABASE),
+ sender, Config, Config.ComputeConfig, cloudId,
+ folderId, scope, probe, ev, cookie));
+ return;
+ }
+
static const TPermissions availablePermissions {
TPermissions::TPermission::MANAGE_PUBLIC
| TPermissions::TPermission::MANAGE_PRIVATE
@@ -2609,7 +2783,8 @@ private:
ControlPlaneStorageServiceActorId(),
requestCounters,
probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ ExtractPermissions(ev, availablePermissions),
+ cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase)));
}
void Handle(TEvControlPlaneProxy::TEvTestConnectionRequest::TPtr& ev) {
@@ -2728,23 +2903,33 @@ private:
return;
}
+ if (!ev->Get()->ComputeDatabase) {
+ Register(new TCreateComputeDatabaseActor<TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr,
+ TEvControlPlaneProxy::TEvCreateBindingResponse>
+ (Counters.GetCommonCounters(RTC_CREATE_COMPUTE_DATABASE),
+ sender, Config, Config.ComputeConfig, cloudId,
+ folderId, scope, probe, ev, cookie));
+ return;
+ }
+
static const TPermissions availablePermissions {
TPermissions::TPermission::VIEW_PUBLIC
| TPermissions::TPermission::MANAGE_PUBLIC
| TPermissions::TPermission::MANAGE_PRIVATE
};
- if (Config.IsYDBComputeEngineEnabled() && !ydbOperationWasPerformed) {
+ if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && !ydbOperationWasPerformed) {
auto permissions = ExtractPermissions(ev, availablePermissions);
Register(new TCreateBindingInYDBActor(
Counters.GetCommonCounters(RTC_CREATE_BINDING_IN_YDB),
- Config.ComputeConfig.GetYdb(),
+ Config.ComputeConfig,
YqSharedResources,
CredentialsProviderFactory,
sender,
ev,
cookie,
std::move(permissions),
+ scope,
Config.RequestTimeout));
return;
}
@@ -2757,7 +2942,8 @@ private:
std::move(request), std::move(user), std::move(token),
ControlPlaneStorageServiceActorId(),
requestCounters,
- probe, ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ probe, ExtractPermissions(ev, availablePermissions),
+ cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase)));
}
void Handle(TEvControlPlaneProxy::TEvListBindingsRequest::TPtr& ev) {
@@ -2940,6 +3126,15 @@ private:
return;
}
+ if (!ev->Get()->ComputeDatabase) {
+ Register(new TCreateComputeDatabaseActor<TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr,
+ TEvControlPlaneProxy::TEvModifyBindingResponse>
+ (Counters.GetCommonCounters(RTC_CREATE_COMPUTE_DATABASE),
+ sender, Config, Config.ComputeConfig, cloudId,
+ folderId, scope, probe, ev, cookie));
+ return;
+ }
+
static const TPermissions availablePermissions {
TPermissions::TPermission::MANAGE_PUBLIC
| TPermissions::TPermission::MANAGE_PRIVATE
@@ -2954,7 +3149,8 @@ private:
ControlPlaneStorageServiceActorId(),
requestCounters,
probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ ExtractPermissions(ev, availablePermissions),
+ cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase)));
}
void Handle(TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& ev) {
@@ -3006,6 +3202,15 @@ private:
return;
}
+ if (!ev->Get()->ComputeDatabase) {
+ Register(new TCreateComputeDatabaseActor<TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr,
+ TEvControlPlaneProxy::TEvDeleteBindingResponse>
+ (Counters.GetCommonCounters(RTC_CREATE_COMPUTE_DATABASE),
+ sender, Config, Config.ComputeConfig, cloudId,
+ folderId, scope, probe, ev, cookie));
+ return;
+ }
+
static const TPermissions availablePermissions {
TPermissions::TPermission::MANAGE_PUBLIC
| TPermissions::TPermission::MANAGE_PRIVATE
@@ -3020,7 +3225,8 @@ private:
ControlPlaneStorageServiceActorId(),
requestCounters,
probe,
- ExtractPermissions(ev, availablePermissions), cloudId, subjectType));
+ ExtractPermissions(ev, availablePermissions),
+ cloudId, subjectType, {}, std::move(ev->Get()->ComputeDatabase)));
}
void Handle(NMon::TEvHttpInfo::TPtr& ev) {
diff --git a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h
index 128f97aa9fb..f624ae93952 100644
--- a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h
+++ b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h
@@ -1,7 +1,7 @@
#pragma once
#include <ydb/core/fq/libs/actors/logging/log.h>
-#include "ydb/core/fq/libs/config/protos/compute.pb.h"
+#include <ydb/core/fq/libs/config/protos/compute.pb.h>
#include <ydb/core/fq/libs/config/protos/control_plane_proxy.pb.h>
#include <ydb/library/security/ydb_credentials_provider_factory.h>
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>
diff --git a/ydb/core/fq/libs/control_plane_proxy/events/events.h b/ydb/core/fq/libs/control_plane_proxy/events/events.h
index e01117000d7..e027245b724 100644
--- a/ydb/core/fq/libs/control_plane_proxy/events/events.h
+++ b/ydb/core/fq/libs/control_plane_proxy/events/events.h
@@ -93,6 +93,7 @@ struct TEvControlPlaneProxy {
TTenantInfo::TPtr TenantInfo;
TString SubjectType;
bool ComputeYDBOperationWasPerformed;
+ TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal> ComputeDatabase;
};
template<typename TDerived, typename ProtoMessage, ui32 EventType>
diff --git a/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp b/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp
index d1c22bb089f..f0aeea941be 100644
--- a/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp
@@ -380,7 +380,6 @@ private:
{
TRuntimePtr runtime(new TTestBasicRuntime());
runtime->SetLogPriority(NKikimrServices::STREAMS_CONTROL_PLANE_SERVICE, NLog::PRI_DEBUG);
-
auto controlPlaneProxy = CreateControlPlaneProxyActor(
Config,
ComputeConfig,
diff --git a/ydb/core/fq/libs/control_plane_proxy/ya.make b/ydb/core/fq/libs/control_plane_proxy/ya.make
index 980fde857fc..264f857556a 100644
--- a/ydb/core/fq/libs/control_plane_proxy/ya.make
+++ b/ydb/core/fq/libs/control_plane_proxy/ya.make
@@ -9,8 +9,10 @@ SRCS(
PEERDIR(
library/cpp/actors/core
ydb/core/base
- ydb/core/fq/libs/actors/logging
ydb/core/fq/libs/actors
+ ydb/core/fq/libs/actors/logging
+ ydb/core/fq/libs/compute/ydb
+ ydb/core/fq/libs/compute/ydb/control_plane
ydb/core/fq/libs/control_plane_config
ydb/core/fq/libs/control_plane_proxy/events
ydb/core/fq/libs/control_plane_storage
diff --git a/ydb/core/fq/libs/control_plane_storage/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/control_plane_storage/CMakeLists.darwin-x86_64.txt
index 2d7c639ac17..be2bb947e2e 100644
--- a/ydb/core/fq/libs/control_plane_storage/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/fq/libs/control_plane_storage/CMakeLists.darwin-x86_64.txt
@@ -51,6 +51,7 @@ target_sources(fq-libs-control_plane_storage PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/validators.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
diff --git a/ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-aarch64.txt
index 02563d00a11..a1e8304a36d 100644
--- a/ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-aarch64.txt
@@ -52,6 +52,7 @@ target_sources(fq-libs-control_plane_storage PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/validators.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
diff --git a/ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-x86_64.txt
index 02563d00a11..a1e8304a36d 100644
--- a/ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/fq/libs/control_plane_storage/CMakeLists.linux-x86_64.txt
@@ -52,6 +52,7 @@ target_sources(fq-libs-control_plane_storage PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/validators.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
diff --git a/ydb/core/fq/libs/control_plane_storage/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/control_plane_storage/CMakeLists.windows-x86_64.txt
index 2d7c639ac17..be2bb947e2e 100644
--- a/ydb/core/fq/libs/control_plane_storage/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/fq/libs/control_plane_storage/CMakeLists.windows-x86_64.txt
@@ -51,6 +51,7 @@ target_sources(fq-libs-control_plane_storage PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/validators.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
diff --git a/ydb/core/fq/libs/control_plane_storage/events/events.h b/ydb/core/fq/libs/control_plane_storage/events/events.h
index 1a6b902e791..bc3f49e3eb2 100644
--- a/ydb/core/fq/libs/control_plane_storage/events/events.h
+++ b/ydb/core/fq/libs/control_plane_storage/events/events.h
@@ -164,6 +164,10 @@ struct TEvControlPlaneStorage {
EvDeleteRateLimiterResourceRequest,
EvDeleteRateLimiterResourceResponse,
EvDbRequestResult, // private // internal_events.h
+ EvCreateDatabaseRequest,
+ EvCreateDatabaseResponse,
+ EvDescribeDatabaseRequest,
+ EvDescribeDatabaseResponse,
EvEnd,
};
@@ -180,7 +184,8 @@ struct TEvControlPlaneStorage {
const TString& cloudId,
TPermissions permissions,
TMaybe<TQuotaMap> quotas,
- TTenantInfo::TPtr tenantInfo)
+ TTenantInfo::TPtr tenantInfo,
+ const FederatedQuery::Internal::ComputeDatabaseInternal& computeDatabase)
: Scope(scope)
, Request(request)
, User(user)
@@ -189,6 +194,7 @@ struct TEvControlPlaneStorage {
, Permissions(permissions)
, Quotas(std::move(quotas))
, TenantInfo(tenantInfo)
+ , ComputeDatabase(computeDatabase)
{
}
@@ -209,6 +215,7 @@ struct TEvControlPlaneStorage {
TPermissions Permissions;
TMaybe<TQuotaMap> Quotas;
TTenantInfo::TPtr TenantInfo;
+ FederatedQuery::Internal::ComputeDatabaseInternal ComputeDatabase;
};
template<typename TDerived, typename ProtoMessage, ui32 EventType>
@@ -612,6 +619,92 @@ struct TEvControlPlaneStorage {
NYql::TIssues Issues;
TDebugInfoPtr DebugInfo;
};
+
+ struct TEvCreateDatabaseRequest : NActors::TEventLocal<TEvCreateDatabaseRequest, EvCreateDatabaseRequest> {
+ TEvCreateDatabaseRequest() = default;
+
+ explicit TEvCreateDatabaseRequest(const TString& cloudId, const TString& scope, const FederatedQuery::Internal::ComputeDatabaseInternal& record)
+ : CloudId(cloudId)
+ , Scope(scope)
+ , Record(record)
+ {}
+
+ size_t GetByteSize() const {
+ return sizeof(*this)
+ + Scope.Size();
+ }
+
+ TString CloudId;
+ TString Scope;
+ FederatedQuery::Internal::ComputeDatabaseInternal Record;
+ };
+
+ struct TEvCreateDatabaseResponse : NActors::TEventLocal<TEvCreateDatabaseResponse, EvCreateDatabaseResponse> {
+ static constexpr bool Auditable = false;
+
+ explicit TEvCreateDatabaseResponse()
+ {}
+
+ explicit TEvCreateDatabaseResponse(
+ const NYql::TIssues& issues
+ )
+ : Issues(issues)
+ {}
+
+ size_t GetByteSize() const {
+ return sizeof(*this)
+ + GetIssuesByteSize(Issues)
+ + GetDebugInfoByteSize(DebugInfo);
+ }
+
+ NYql::TIssues Issues;
+ TDebugInfoPtr DebugInfo;
+ };
+
+ struct TEvDescribeDatabaseRequest : NActors::TEventLocal<TEvDescribeDatabaseRequest, EvDescribeDatabaseRequest> {
+
+ TEvDescribeDatabaseRequest() = default;
+
+ explicit TEvDescribeDatabaseRequest(const TString& cloudId, const TString& scope)
+ : CloudId(cloudId)
+ , Scope(scope)
+ {}
+
+ size_t GetByteSize() const {
+ return sizeof(*this)
+ + Scope.Size();
+ }
+
+ google::protobuf::Empty Request;
+ TString CloudId;
+ TString Scope;
+ };
+
+ struct TEvDescribeDatabaseResponse : NActors::TEventLocal<TEvDescribeDatabaseResponse, EvDescribeDatabaseResponse> {
+ static constexpr bool Auditable = false;
+
+ explicit TEvDescribeDatabaseResponse(
+ const FederatedQuery::Internal::ComputeDatabaseInternal& record)
+ : Record(record)
+ {}
+
+ explicit TEvDescribeDatabaseResponse(
+ const NYql::TIssues& issues
+ )
+ : Issues(issues)
+ {}
+
+ size_t GetByteSize() const {
+ return sizeof(*this)
+ + Record.ByteSizeLong()
+ + GetIssuesByteSize(Issues)
+ + GetDebugInfoByteSize(DebugInfo);
+ }
+
+ FederatedQuery::Internal::ComputeDatabaseInternal Record;
+ NYql::TIssues Issues;
+ TDebugInfoPtr DebugInfo;
+ };
};
}
diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp
index a86d817e87d..cda14c2a5c4 100644
--- a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp
@@ -7,6 +7,8 @@
#include <ydb/core/fq/libs/control_plane_storage/schema.h>
#include <ydb/core/fq/libs/db_schema/db_schema.h>
+#include <ydb/public/lib/fq/scope.h>
+
#include <library/cpp/protobuf/interop/cast.h>
namespace NFq {
@@ -494,6 +496,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
newTask->set_execution_id(task.Internal.execution_id());
newTask->set_operation_id(task.Internal.operation_id());
+ *newTask->mutable_compute_connection() = task.Internal.compute_connection();
}
return result;
diff --git a/ydb/core/fq/libs/control_plane_storage/probes.h b/ydb/core/fq/libs/control_plane_storage/probes.h
index 92a4dceb2c6..c0f1881cc2b 100644
--- a/ydb/core/fq/libs/control_plane_storage/probes.h
+++ b/ydb/core/fq/libs/control_plane_storage/probes.h
@@ -107,6 +107,14 @@
GROUPS(), \
TYPES(TString, TDuration, bool), \
NAMES("queryId", "latencyMs", "success")) \
+ PROBE(CreateDatabaseRequest, \
+ GROUPS(), \
+ TYPES(TString, TString, TDuration, i64, bool), \
+ NAMES("scope", "user", "latencyMs", "size", "success")) \
+ PROBE(DescribeDatabaseRequest, \
+ GROUPS(), \
+ TYPES(TString, TString, TDuration, i64, bool), \
+ NAMES("scope", "user", "latencyMs", "size", "success")) \
// YQ_CONTROL_PLANE_STORAGE_PROVIDER
diff --git a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto
index ff7d943d412..5d30311219c 100644
--- a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto
+++ b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto
@@ -5,12 +5,15 @@ package FederatedQuery.Internal;
option java_package = "com.yandex.query.internal";
option java_outer_classname = "YandexQueryInternalProtos";
-import "ydb/library/yql/providers/dq/api/protos/service.proto";
-import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto";
-import "ydb/library/yql/dq/proto/dq_tasks.proto";
-import "ydb/public/api/protos/ydb_issue_message.proto";
+import "ydb/core/fq/libs/config/protos/storage.proto";
import "ydb/core/fq/libs/protos/fq_private.proto";
+
import "ydb/public/api/protos/draft/fq.proto";
+import "ydb/public/api/protos/ydb_issue_message.proto";
+
+import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto";
+import "ydb/library/yql/dq/proto/dq_tasks.proto";
+import "ydb/library/yql/providers/dq/api/protos/service.proto";
import "google/protobuf/duration.proto";
@@ -44,6 +47,7 @@ message QueryInternal {
NYql.NDqProto.StatusIds.StatusCode status_code = 23;
string operation_id = 24;
string execution_id = 25;
+ NFq.NConfig.TYdbStorageConfig compute_connection = 26;
}
message JobInternal {
@@ -59,3 +63,8 @@ message ConnectionInternal {
message BindingInternal {
string cloud_id = 2;
}
+
+message ComputeDatabaseInternal {
+ string id = 1;
+ NFq.NConfig.TYdbStorageConfig connection = 2;
+}
diff --git a/ydb/core/fq/libs/control_plane_storage/schema.h b/ydb/core/fq/libs/control_plane_storage/schema.h
index 3d7e541f7d9..5cbcc6b7b7a 100644
--- a/ydb/core/fq/libs/control_plane_storage/schema.h
+++ b/ydb/core/fq/libs/control_plane_storage/schema.h
@@ -15,6 +15,7 @@ namespace NFq {
#define TENANTS_TABLE_NAME "tenants"
#define TENANT_ACKS_TABLE_NAME "tenant_acks"
#define MAPPINGS_TABLE_NAME "mappings"
+#define COMPUTE_DATABASES_TABLE_NAME "compute_databases"
// columns
#define SCOPE_COLUMN_NAME "scope"
diff --git a/ydb/core/fq/libs/control_plane_storage/ya.make b/ydb/core/fq/libs/control_plane_storage/ya.make
index 8912d9e9ccf..269be37204c 100644
--- a/ydb/core/fq/libs/control_plane_storage/ya.make
+++ b/ydb/core/fq/libs/control_plane_storage/ya.make
@@ -10,6 +10,7 @@ SRCS(
validators.cpp
ydb_control_plane_storage.cpp
ydb_control_plane_storage_bindings.cpp
+ ydb_control_plane_storage_compute_database.cpp
ydb_control_plane_storage_connections.cpp
ydb_control_plane_storage_queries.cpp
ydb_control_plane_storage_quotas.cpp
diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp
index b5e4636e130..2a9aa170e66 100644
--- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp
@@ -53,6 +53,7 @@ void TYdbControlPlaneStorageActor::Bootstrap() {
CreateTenantsTable();
CreateTenantAcksTable();
CreateMappingsTable();
+ CreateComputeDatabasesTable();
Become(&TThis::StateFunc);
}
@@ -304,6 +305,18 @@ void TYdbControlPlaneStorageActor::CreateMappingsTable()
RunCreateTableActor(tablePath, TTableDescription(description));
}
+void TYdbControlPlaneStorageActor::CreateComputeDatabasesTable()
+{
+ auto tablePath = JoinPath(YdbConnection->TablePathPrefix, COMPUTE_DATABASES_TABLE_NAME);
+ auto description = TTableBuilder()
+ .AddNullableColumn(SCOPE_COLUMN_NAME, EPrimitiveType::String)
+ .AddNullableColumn(INTERNAL_COLUMN_NAME, EPrimitiveType::String)
+ .SetPrimaryKeyColumns({SCOPE_COLUMN_NAME})
+ .Build();
+
+ RunCreateTableActor(tablePath, TTableDescription(description));
+}
+
void TYdbControlPlaneStorageActor::AfterTablesCreated() {
// Schedule(TDuration::Zero(), new NActors::TEvents::TEvWakeup());
}
diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp
new file mode 100644
index 00000000000..6b0d32b32fd
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp
@@ -0,0 +1,118 @@
+#include "validators.h"
+#include "ydb_control_plane_storage_impl.h"
+
+#include <util/string/join.h>
+
+#include <ydb/public/api/protos/draft/fq.pb.h>
+
+#include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
+#include <ydb/core/fq/libs/db_schema/db_schema.h>
+
+namespace NFq {
+
+void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateDatabaseRequest::TPtr& ev)
+{
+ TInstant startTime = TInstant::Now();
+ const TEvControlPlaneStorage::TEvCreateDatabaseRequest& event = *ev->Get();
+ const TString cloudId = event.CloudId;
+ const TString scope = event.Scope;
+ TRequestCounters requestCounters = Counters.GetCounters(cloudId, scope, RTS_CREATE_DATABASE, RTC_CREATE_DATABASE);
+ requestCounters.IncInFly();
+ requestCounters.Common->RequestBytes->Add(event.GetByteSize());
+ const FederatedQuery::Internal::ComputeDatabaseInternal& request = event.Record;
+ const int byteSize = request.ByteSize();
+
+ CPS_LOG_T(MakeLogPrefix(scope, "internal", request.id())
+ << "CreateDatabaseRequest: "
+ << request.DebugString());
+
+ TSqlQueryBuilder queryBuilder(YdbConnection->TablePathPrefix, "CreateDatabase");
+ queryBuilder.AddString("scope", scope);
+ queryBuilder.AddString("internal", request.SerializeAsString());
+
+ queryBuilder.AddText(
+ "INSERT INTO `" COMPUTE_DATABASES_TABLE_NAME "` (`" SCOPE_COLUMN_NAME "`, `" INTERNAL_COLUMN_NAME "`) VALUES\n"
+ " ($scope, $internal);"
+ );
+
+ const auto query = queryBuilder.Build();
+ auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
+ TAsyncStatus result = Write(query.Sql, query.Params, requestCounters, debugInfo);
+ auto prepare = [] { return std::make_tuple<NYql::TIssues>(NYql::TIssues{}); };
+ auto success = SendResponseTuple<TEvControlPlaneStorage::TEvCreateDatabaseResponse, std::tuple<NYql::TIssues>>(
+ MakeLogPrefix(scope, "internal", request.id()) + "CreateDatabaseRequest",
+ NActors::TActivationContext::ActorSystem(),
+ result,
+ SelfId(),
+ ev,
+ startTime,
+ requestCounters,
+ prepare,
+ debugInfo);
+
+ success.Apply([=](const auto& future) {
+ TDuration delta = TInstant::Now() - startTime;
+ LWPROBE(CreateDatabaseRequest, scope, "internal", delta, byteSize, future.GetValue());
+ });
+}
+
+void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeDatabaseRequest::TPtr& ev)
+{
+ TInstant startTime = TInstant::Now();
+ const TEvControlPlaneStorage::TEvDescribeDatabaseRequest& event = *ev->Get();
+ const TString cloudId = event.CloudId;
+ const TString scope = event.Scope;
+ TRequestCounters requestCounters = Counters.GetCounters(cloudId, scope, RTS_DESCRIBE_DATABASE, RTC_DESCRIBE_DATABASE);
+ requestCounters.IncInFly();
+ requestCounters.Common->RequestBytes->Add(event.GetByteSize());
+ const auto byteSize = event.GetByteSize();
+
+ CPS_LOG_T(MakeLogPrefix(scope, "internal", scope)
+ << "DescribeDatabaseRequest");
+
+ TSqlQueryBuilder queryBuilder(YdbConnection->TablePathPrefix, "DescribeDatabase");
+ queryBuilder.AddString("scope", scope);
+ queryBuilder.AddText(
+ "SELECT `" INTERNAL_COLUMN_NAME "` FROM `" COMPUTE_DATABASES_TABLE_NAME "`\n"
+ "WHERE `" SCOPE_COLUMN_NAME "` = $scope;"
+ );
+
+ const auto query = queryBuilder.Build();
+ auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
+ auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo);
+ auto prepare = [=, resultSets=resultSets] {
+ if (resultSets->size() != 1) {
+ ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support";
+ }
+
+ TResultSetParser parser(resultSets->front());
+ if (!parser.TryNextRow()) {
+ ythrow TCodeLineException(TIssuesIds::ACCESS_DENIED) << "Database does not exist or permission denied. Please check the id database or your access rights";
+ }
+
+ FederatedQuery::Internal::ComputeDatabaseInternal result;
+ if (!result.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
+ ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for internal compute database. Please contact internal support";
+ }
+
+ return result;
+ };
+
+ auto success = SendResponse<TEvControlPlaneStorage::TEvDescribeDatabaseResponse, FederatedQuery::Internal::ComputeDatabaseInternal>(
+ MakeLogPrefix(scope, "internal", scope) + "DescribeDatabaseRequest",
+ NActors::TActivationContext::ActorSystem(),
+ result,
+ SelfId(),
+ ev,
+ startTime,
+ requestCounters,
+ prepare,
+ debugInfo);
+
+ success.Apply([=](const auto& future) {
+ TDuration delta = TInstant::Now() - startTime;
+ LWPROBE(DescribeDatabaseRequest, scope, "internal", delta, byteSize, future.GetValue());
+ });
+}
+
+} // NFq
diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
index f8f9ad6e46a..408413ee902 100644
--- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
+++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
@@ -381,6 +381,8 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont
RTS_MODIFY_BINDING,
RTS_DELETE_BINDING,
RTS_PING_TASK,
+ RTS_CREATE_DATABASE,
+ RTS_DESCRIBE_DATABASE,
RTS_MAX,
};
@@ -405,7 +407,9 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont
"DescribeBinding",
"ModifyBinding",
"DeleteBinding",
- "PingTask"
+ "PingTask",
+ "CreateDatabase",
+ "DescribeDatabase"
};
enum ERequestTypeCommon {
@@ -436,6 +440,8 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont
RTC_MODIFY_BINDING,
RTC_DELETE_BINDING,
RTC_PING_TASK,
+ RTC_CREATE_DATABASE,
+ RTC_DESCRIBE_DATABASE,
RTC_MAX,
};
@@ -487,6 +493,8 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont
{ MakeIntrusive<TRequestCommonCounters>("ModifyBinding") },
{ MakeIntrusive<TRequestCommonCounters>("DeleteBinding") },
{ MakeIntrusive<TRequestCommonCounters>("PingTask") },
+ { MakeIntrusive<TRequestCommonCounters>("CreateDatabase") },
+ { MakeIntrusive<TRequestCommonCounters>("DescribeDatabase") }
});
TTtlCache<TMetricsScope, TScopeCountersPtr, TMap> ScopeCounters{TTtlCacheSettings{}.SetTtl(TDuration::Days(1))};
@@ -621,6 +629,8 @@ public:
hFunc(TEvQuotaService::TQuotaLimitChangeRequest, Handle);
hFunc(TEvents::TEvCallback, [](TEvents::TEvCallback::TPtr& ev) { ev->Get()->Callback(); } );
hFunc(TEvents::TEvSchemaCreated, Handle);
+ hFunc(TEvControlPlaneStorage::TEvCreateDatabaseRequest, Handle);
+ hFunc(TEvControlPlaneStorage::TEvDescribeDatabaseRequest, Handle);
)
void Handle(TEvControlPlaneStorage::TEvCreateQueryRequest::TPtr& ev);
@@ -659,6 +669,9 @@ public:
void Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev);
void Handle(TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev);
+ void Handle(TEvControlPlaneStorage::TEvCreateDatabaseRequest::TPtr& ev);
+ void Handle(TEvControlPlaneStorage::TEvDescribeDatabaseRequest::TPtr& ev);
+
template <class TEventPtr, class TRequestActor, ERequestTypeCommon requestType>
void HandleRateLimiterImpl(TEventPtr& ev);
@@ -690,6 +703,7 @@ public:
void CreateTenantsTable();
void CreateTenantAcksTable();
void CreateMappingsTable();
+ void CreateComputeDatabasesTable();
void RunCreateTableActor(const TString& path, NYdb::NTable::TTableDescription desc);
void AfterTablesCreated();
diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
index 5a8fe10221a..9c8764e5df8 100644
--- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
@@ -52,6 +52,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
const TEvControlPlaneStorage::TEvCreateQueryRequest& event = *ev->Get();
const TString cloudId = event.CloudId;
const FederatedQuery::CreateQueryRequest& request = event.Request;
+ const FederatedQuery::Internal::ComputeDatabaseInternal& computeDatabase = event.ComputeDatabase;
ui64 resultLimit = 0;
if (event.Quotas) {
if (auto it = event.Quotas->find(QUOTA_QUERY_RESULT_LIMIT); it != event.Quotas->end()) {
@@ -212,7 +213,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
if (request.execute_mode() != FederatedQuery::SAVE) {
// TODO: move to run actor priority selection
-
+ *queryInternal.mutable_compute_connection() = computeDatabase.connection();
TSet<TString> disabledConnections;
for (const auto& connection: GetEntities<FederatedQuery::Connection>(resultSets[resultSets.size() - 2], CONNECTION_COLUMN_NAME)) {
if (!Config->AvailableConnections.contains(connection.content().setting().connection_case())) {
@@ -735,6 +736,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery
permissions.SetAll();
}
FederatedQuery::ModifyQueryRequest& request = event.Request;
+ FederatedQuery::Internal::ComputeDatabaseInternal& computeDatabase = event.ComputeDatabase;
const TString queryId = request.query_id();
const int byteSize = request.ByteSize();
const int64_t previousRevision = request.previous_revision();
@@ -873,6 +875,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery
internal.clear_binding();
internal.clear_connection();
internal.clear_resources();
+ *internal.mutable_compute_connection() = computeDatabase.connection();
// TODO: move to run actor priority selection
TSet<TString> disabledConnections;
diff --git a/ydb/core/fq/libs/events/event_subspace.h b/ydb/core/fq/libs/events/event_subspace.h
index 2a9f2117dee..b7c8f34a54a 100644
--- a/ydb/core/fq/libs/events/event_subspace.h
+++ b/ydb/core/fq/libs/events/event_subspace.h
@@ -30,6 +30,7 @@ struct TYqEventSubspace {
QuotaService,
RateLimiter,
ControlPlaneConfig,
+ YdbCompute,
SubspacesEnd,
};
diff --git a/ydb/core/fq/libs/init/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/init/CMakeLists.darwin-x86_64.txt
index 6972b937899..588923a63f0 100644
--- a/ydb/core/fq/libs/init/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/fq/libs/init/CMakeLists.darwin-x86_64.txt
@@ -23,6 +23,7 @@ target_link_libraries(fq-libs-init PUBLIC
fq-libs-checkpointing
fq-libs-cloud_audit
fq-libs-common
+ compute-ydb-control_plane
fq-libs-control_plane_config
fq-libs-control_plane_proxy
fq-libs-control_plane_storage
diff --git a/ydb/core/fq/libs/init/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/init/CMakeLists.linux-aarch64.txt
index edfaf53c423..8bbcde4f247 100644
--- a/ydb/core/fq/libs/init/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/fq/libs/init/CMakeLists.linux-aarch64.txt
@@ -24,6 +24,7 @@ target_link_libraries(fq-libs-init PUBLIC
fq-libs-checkpointing
fq-libs-cloud_audit
fq-libs-common
+ compute-ydb-control_plane
fq-libs-control_plane_config
fq-libs-control_plane_proxy
fq-libs-control_plane_storage
diff --git a/ydb/core/fq/libs/init/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/init/CMakeLists.linux-x86_64.txt
index edfaf53c423..8bbcde4f247 100644
--- a/ydb/core/fq/libs/init/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/fq/libs/init/CMakeLists.linux-x86_64.txt
@@ -24,6 +24,7 @@ target_link_libraries(fq-libs-init PUBLIC
fq-libs-checkpointing
fq-libs-cloud_audit
fq-libs-common
+ compute-ydb-control_plane
fq-libs-control_plane_config
fq-libs-control_plane_proxy
fq-libs-control_plane_storage
diff --git a/ydb/core/fq/libs/init/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/init/CMakeLists.windows-x86_64.txt
index 6972b937899..588923a63f0 100644
--- a/ydb/core/fq/libs/init/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/fq/libs/init/CMakeLists.windows-x86_64.txt
@@ -23,6 +23,7 @@ target_link_libraries(fq-libs-init PUBLIC
fq-libs-checkpointing
fq-libs-cloud_audit
fq-libs-common
+ compute-ydb-control_plane
fq-libs-control_plane_config
fq-libs-control_plane_proxy
fq-libs-control_plane_storage
diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp
index 20a61112756..ee21d26d4a0 100644
--- a/ydb/core/fq/libs/init/init.cpp
+++ b/ydb/core/fq/libs/init/init.cpp
@@ -1,15 +1,14 @@
#include "init.h"
-#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h>
-#include <ydb/core/fq/libs/test_connection/test_connection.h>
-
#include <ydb/core/fq/libs/audit/yq_audit_service.h>
#include <ydb/core/fq/libs/checkpoint_storage/storage_service.h>
+#include <ydb/core/fq/libs/checkpoint_storage/storage_service.h>
#include <ydb/core/fq/libs/cloud_audit/yq_cloud_audit_service.h>
+#include <ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h>
#include <ydb/core/fq/libs/control_plane_config/control_plane_config.h>
#include <ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h>
+#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h>
#include <ydb/core/fq/libs/health/health.h>
-#include <ydb/core/fq/libs/checkpoint_storage/storage_service.h>
#include <ydb/core/fq/libs/private_client/internal_service.h>
#include <ydb/core/fq/libs/private_client/loopback_service.h>
#include <ydb/core/fq/libs/quota_manager/quota_manager.h>
@@ -19,6 +18,8 @@
#include <ydb/core/fq/libs/rate_limiter/events/data_plane.h>
#include <ydb/core/fq/libs/rate_limiter/quoter_service/quoter_service.h>
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>
+#include <ydb/core/fq/libs/test_connection/test_connection.h>
+
#include <ydb/library/folder_service/folder_service.h>
#include <ydb/library/yql/providers/common/metrics/service_counters.h>
@@ -107,6 +108,11 @@ void Init(
actorRegistrator(NFq::ControlPlaneProxyActorId(), controlPlaneProxy);
}
+ if (protoConfig.GetCompute().GetYdb().GetEnable() && protoConfig.GetCompute().GetYdb().GetControlPlane().GetEnable()) {
+ auto computeDatabaseService = NFq::CreateComputeDatabaseControlPlaneServiceActor(protoConfig.GetCompute(), NKikimr::CreateYdbCredentialsProviderFactory);
+ actorRegistrator(NFq::ComputeDatabaseControlPlaneServiceActorId(), computeDatabaseService.release());
+ }
+
if (protoConfig.GetRateLimiter().GetControlPlaneEnabled()) {
Y_VERIFY(protoConfig.GetQuotasManager().GetEnabled()); // Rate limiter resources want to know CPU quota on creation
NActors::IActor* rateLimiterService = NFq::CreateRateLimiterControlPlaneService(protoConfig.GetRateLimiter(), yqSharedResources, NKikimr::CreateYdbCredentialsProviderFactory);
diff --git a/ydb/core/fq/libs/init/ya.make b/ydb/core/fq/libs/init/ya.make
index 9693c2038f1..3278a3f60c0 100644
--- a/ydb/core/fq/libs/init/ya.make
+++ b/ydb/core/fq/libs/init/ya.make
@@ -14,6 +14,7 @@ PEERDIR(
ydb/core/fq/libs/checkpointing
ydb/core/fq/libs/cloud_audit
ydb/core/fq/libs/common
+ ydb/core/fq/libs/compute/ydb/control_plane
ydb/core/fq/libs/control_plane_config
ydb/core/fq/libs/control_plane_proxy
ydb/core/fq/libs/control_plane_storage
diff --git a/ydb/core/fq/libs/protos/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/protos/CMakeLists.darwin-x86_64.txt
index 3cd22906186..7e39a3f60b1 100644
--- a/ydb/core/fq/libs/protos/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/fq/libs/protos/CMakeLists.darwin-x86_64.txt
@@ -35,8 +35,9 @@ add_library(fq-libs-protos)
target_link_libraries(fq-libs-protos PUBLIC
contrib-libs-cxxsupp
yutil
- api-protos
+ libs-config-protos
dq-actors-protos
+ api-protos
contrib-libs-protobuf
)
target_proto_messages(fq-libs-protos PRIVATE
diff --git a/ydb/core/fq/libs/protos/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/protos/CMakeLists.linux-aarch64.txt
index 7a522343447..4ebf71e3764 100644
--- a/ydb/core/fq/libs/protos/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/fq/libs/protos/CMakeLists.linux-aarch64.txt
@@ -36,8 +36,9 @@ target_link_libraries(fq-libs-protos PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
- api-protos
+ libs-config-protos
dq-actors-protos
+ api-protos
contrib-libs-protobuf
)
target_proto_messages(fq-libs-protos PRIVATE
diff --git a/ydb/core/fq/libs/protos/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/protos/CMakeLists.linux-x86_64.txt
index 7a522343447..4ebf71e3764 100644
--- a/ydb/core/fq/libs/protos/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/fq/libs/protos/CMakeLists.linux-x86_64.txt
@@ -36,8 +36,9 @@ target_link_libraries(fq-libs-protos PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
- api-protos
+ libs-config-protos
dq-actors-protos
+ api-protos
contrib-libs-protobuf
)
target_proto_messages(fq-libs-protos PRIVATE
diff --git a/ydb/core/fq/libs/protos/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/protos/CMakeLists.windows-x86_64.txt
index 3cd22906186..7e39a3f60b1 100644
--- a/ydb/core/fq/libs/protos/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/fq/libs/protos/CMakeLists.windows-x86_64.txt
@@ -35,8 +35,9 @@ add_library(fq-libs-protos)
target_link_libraries(fq-libs-protos PUBLIC
contrib-libs-cxxsupp
yutil
- api-protos
+ libs-config-protos
dq-actors-protos
+ api-protos
contrib-libs-protobuf
)
target_proto_messages(fq-libs-protos PRIVATE
diff --git a/ydb/core/fq/libs/protos/fq_private.proto b/ydb/core/fq/libs/protos/fq_private.proto
index 9320cdaf024..b1e9003e876 100644
--- a/ydb/core/fq/libs/protos/fq_private.proto
+++ b/ydb/core/fq/libs/protos/fq_private.proto
@@ -3,6 +3,7 @@ option cc_enable_arenas = true;
package Fq.Private;
+import "ydb/core/fq/libs/config/protos/storage.proto";
import "ydb/core/fq/libs/protos/dq_effects.proto";
import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto";
@@ -116,6 +117,7 @@ message GetTaskResult {
FederatedQuery.QueryContent.QuerySyntax query_syntax = 34;
string operation_id = 35;
string execution_id = 36;
+ NFq.NConfig.TYdbStorageConfig compute_connection = 37;
}
repeated Task tasks = 1;
}
diff --git a/ydb/core/fq/libs/protos/ya.make b/ydb/core/fq/libs/protos/ya.make
index f53349ac5b8..c9ae3ae3da1 100644
--- a/ydb/core/fq/libs/protos/ya.make
+++ b/ydb/core/fq/libs/protos/ya.make
@@ -1,8 +1,9 @@
PROTO_LIBRARY()
PEERDIR(
- ydb/public/api/protos
+ ydb/core/fq/libs/config/protos
ydb/library/yql/dq/actors/protos
+ ydb/public/api/protos
)
SRCS(
diff --git a/ydb/core/fq/libs/test_connection/events/events.h b/ydb/core/fq/libs/test_connection/events/events.h
index 875077e96eb..743c706cc45 100644
--- a/ydb/core/fq/libs/test_connection/events/events.h
+++ b/ydb/core/fq/libs/test_connection/events/events.h
@@ -32,7 +32,8 @@ struct TEvTestConnection {
const TString& cloudId,
const TPermissions& permissions,
TMaybe<TQuotaMap> quotas,
- TTenantInfo::TPtr tenantInfo)
+ TTenantInfo::TPtr tenantInfo,
+ const FederatedQuery::Internal::ComputeDatabaseInternal& computeDatabase)
: CloudId(cloudId)
, Scope(scope)
, Request(request)
@@ -41,6 +42,7 @@ struct TEvTestConnection {
, Permissions(permissions)
, Quotas(std::move(quotas))
, TenantInfo(tenantInfo)
+ , ComputeDatabase(computeDatabase)
{
}
@@ -52,6 +54,7 @@ struct TEvTestConnection {
TPermissions Permissions;
const TMaybe<TQuotaMap> Quotas;
TTenantInfo::TPtr TenantInfo;
+ TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal> ComputeDatabase;
};
struct TEvTestConnectionResponse : NActors::TEventLocal<TEvTestConnectionResponse, EvTestConnectionResponse> {
diff --git a/ydb/core/fq/libs/ydb/ydb.cpp b/ydb/core/fq/libs/ydb/ydb.cpp
index b20d38c672c..ca72290bccc 100644
--- a/ydb/core/fq/libs/ydb/ydb.cpp
+++ b/ydb/core/fq/libs/ydb/ydb.cpp
@@ -296,4 +296,22 @@ TFuture<TStatus> RollbackTransaction(const TGenerationContextPtr& context) {
return future;
}
+NKikimr::TYdbCredentialsSettings GetYdbCredentialSettings(const NConfig::TYdbStorageConfig& config) {
+ TString oauth;
+ if (config.GetToken()) {
+ oauth = config.GetToken();
+ } else if (config.GetOAuthFile()) {
+ oauth = StripString(TFileInput(config.GetOAuthFile()).ReadAll());
+ } else {
+ oauth = GetEnv("YDB_TOKEN");
+ }
+
+ NKikimr::TYdbCredentialsSettings credSettings;
+ credSettings.UseLocalMetadata = config.GetUseLocalMetadataService();
+ credSettings.OAuthToken = oauth;
+ credSettings.SaKeyFile = config.GetSaKeyFile();
+ credSettings.IamEndpoint = config.GetIamEndpoint();
+ return credSettings;
+}
+
} // namespace NFq
diff --git a/ydb/core/fq/libs/ydb/ydb.h b/ydb/core/fq/libs/ydb/ydb.h
index 85891e336bd..200de1df104 100644
--- a/ydb/core/fq/libs/ydb/ydb.h
+++ b/ydb/core/fq/libs/ydb/ydb.h
@@ -134,33 +134,17 @@ NThreading::TFuture<NYdb::TStatus> CheckGeneration(const TGenerationContextPtr&
NThreading::TFuture<NYdb::TStatus> RollbackTransaction(const TGenerationContextPtr& context);
+NKikimr::TYdbCredentialsSettings GetYdbCredentialSettings(const NConfig::TYdbStorageConfig& config);
+
template <class TSettings>
TSettings GetClientSettings(const NConfig::TYdbStorageConfig& config,
const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory) {
- TString oauth;
- if (config.GetToken()) {
- oauth = config.GetToken();
- } else if (config.GetOAuthFile()) {
- oauth = StripString(TFileInput(config.GetOAuthFile()).ReadAll());
- } else {
- oauth = GetEnv("YDB_TOKEN");
- }
-
- const TString iamEndpoint = config.GetIamEndpoint();
- const TString saKeyFile = config.GetSaKeyFile();
-
TSettings settings;
settings
.DiscoveryEndpoint(config.GetEndpoint())
.Database(config.GetDatabase());
- NKikimr::TYdbCredentialsSettings credSettings;
- credSettings.UseLocalMetadata = config.GetUseLocalMetadataService();
- credSettings.OAuthToken = oauth;
- credSettings.SaKeyFile = config.GetSaKeyFile();
- credSettings.IamEndpoint = config.GetIamEndpoint();
-
- settings.CredentialsProviderFactory(credProviderFactory(credSettings));
+ settings.CredentialsProviderFactory(credProviderFactory(GetYdbCredentialSettings(config)));
if (config.GetUseLocalMetadataService()) {
settings.SslCredentials(NYdb::TSslCredentials(true));
diff --git a/ydb/library/ycloud/impl/grpc_service_client.h b/ydb/library/ycloud/impl/grpc_service_client.h
index d2837a69264..a9c2ff571c7 100644
--- a/ydb/library/ycloud/impl/grpc_service_client.h
+++ b/ydb/library/ycloud/impl/grpc_service_client.h
@@ -1,9 +1,10 @@
#pragma once
+#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/log.h>
-#include <library/cpp/actors/core/actor_bootstrapped.h>
-#include <library/cpp/grpc/client/grpc_client_low.h>
#include <library/cpp/digest/crc32c/crc32c.h>
+#include <library/cpp/grpc/client/grpc_client_low.h>
+#include <ydb/core/protos/services.pb.h>
#include "grpc_service_settings.h"
#define BLOG_GRPC_D(stream) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::GRPC_CLIENT, stream)
@@ -97,10 +98,12 @@ public:
BLOG_GRPC_DC(*actorSystem, prefix << "Status " << status);
}
auto respEv = MakeHolder<typename TCallType::TResponseEventType>();
+ const auto sender = request->Sender;
+ const auto cookie = request->Cookie;
respEv->Request = request;
respEv->Status = status;
respEv->Response = response;
- actorSystem->Send(respEv->Request->Sender, respEv.Release());
+ actorSystem->Send(sender, respEv.Release(), 0, cookie);
};
BLOG_GRPC_D(Prefix(requestId) << "Request " << Trim(TCallType::Obfuscate(request)));