aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2022-11-06 09:37:55 +0300
committerhor911 <hor911@ydb.tech>2022-11-06 09:37:55 +0300
commit5ce93e61d8e4c82be5f018623b234a572e4461dc (patch)
treeab59655e07fc444da9d172076aee0e44da348bb9
parent734abe9d61f771fc2f32c707117305271f0a14a4 (diff)
downloadydb-5ce93e61d8e4c82be5f018623b234a572e4461dc.tar.gz
V-tenants and mappings in DB
-rw-r--r--ydb/core/protos/services.proto1
-rw-r--r--ydb/core/yq/libs/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/actors/logging/log.h53
-rw-r--r--ydb/core/yq/libs/actors/task_get.cpp16
-rw-r--r--ydb/core/yq/libs/actors/task_ping.cpp22
-rw-r--r--ydb/core/yq/libs/config/protos/control_plane_proxy.proto1
-rw-r--r--ydb/core/yq/libs/config/protos/control_plane_storage.proto2
-rw-r--r--ydb/core/yq/libs/control_plane_config/CMakeLists.txt38
-rw-r--r--ydb/core/yq/libs/control_plane_config/control_plane_config.cpp259
-rw-r--r--ydb/core/yq/libs/control_plane_config/control_plane_config.h28
-rw-r--r--ydb/core/yq/libs/control_plane_config/events/CMakeLists.txt22
-rw-r--r--ydb/core/yq/libs/control_plane_config/events/events.cpp1
-rw-r--r--ydb/core/yq/libs/control_plane_config/events/events.h68
-rw-r--r--ydb/core/yq/libs/control_plane_proxy/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/control_plane_proxy/config.cpp5
-rw-r--r--ydb/core/yq/libs/control_plane_proxy/config.h1
-rw-r--r--ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp24
-rw-r--r--ydb/core/yq/libs/control_plane_proxy/events/events.h5
-rw-r--r--ydb/core/yq/libs/control_plane_storage/events/events.h8
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp149
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp7
-rw-r--r--ydb/core/yq/libs/control_plane_storage/message_builders.h23
-rw-r--r--ydb/core/yq/libs/control_plane_storage/message_builders_yq.h9
-rw-r--r--ydb/core/yq/libs/control_plane_storage/schema.h8
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp58
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h9
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp42
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp6
-rw-r--r--ydb/core/yq/libs/events/event_subspace.h1
-rw-r--r--ydb/core/yq/libs/init/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/init/init.cpp6
-rw-r--r--ydb/core/yq/libs/private_client/loopback_service.cpp24
-rw-r--r--ydb/core/yq/libs/shared_resources/db_exec.h1
-rw-r--r--ydb/core/yq/libs/test_connection/events/events.h10
34 files changed, 780 insertions, 130 deletions
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index c2c24e2f068..4dd801466b4 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -309,6 +309,7 @@ enum EServiceKikimr {
YQ_RATE_LIMITER = 1155;
FQ_QUOTA_PROXY = 1156;
PUBLIC_HTTP = 1157;
+ FQ_CONTROL_PLANE_CONFIG = 1158;
// 1024 - 1099 is reserved for nbs
diff --git a/ydb/core/yq/libs/CMakeLists.txt b/ydb/core/yq/libs/CMakeLists.txt
index c451003f013..15ccfbffe4a 100644
--- a/ydb/core/yq/libs/CMakeLists.txt
+++ b/ydb/core/yq/libs/CMakeLists.txt
@@ -13,6 +13,7 @@ add_subdirectory(checkpointing)
add_subdirectory(checkpointing_common)
add_subdirectory(common)
add_subdirectory(config)
+add_subdirectory(control_plane_config)
add_subdirectory(control_plane_proxy)
add_subdirectory(control_plane_storage)
add_subdirectory(db_id_async_resolver_impl)
diff --git a/ydb/core/yq/libs/actors/logging/log.h b/ydb/core/yq/libs/actors/logging/log.h
index 78270e26eda..e5923ed150d 100644
--- a/ydb/core/yq/libs/actors/logging/log.h
+++ b/ydb/core/yq/libs/actors/logging/log.h
@@ -88,38 +88,16 @@
#define LOG_STREAMS_CONTROL_PLANE_SERVICE_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_CONTROL_PLANE_SERVICE, logRecordStream)
#define LOG_STREAMS_CONTROL_PLANE_SERVICE_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_CONTROL_PLANE_SERVICE, logRecordStream)
-// Component: STREAMS_GRAND_LEADER_SERVICE.
-#define LOG_STREAMS_GRAND_LEADER_SERVICE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_GRAND_LEADER_SERVICE, logRecordStream)
-#define LOG_STREAMS_GRAND_LEADER_SERVICE_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_GRAND_LEADER_SERVICE, logRecordStream)
-#define LOG_STREAMS_GRAND_LEADER_SERVICE_CRIT(logRecordStream) LOG_STREAMS_IMPL(CRIT, STREAMS_GRAND_LEADER_SERVICE, logRecordStream)
-#define LOG_STREAMS_GRAND_LEADER_SERVICE_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, STREAMS_GRAND_LEADER_SERVICE, logRecordStream)
-#define LOG_STREAMS_GRAND_LEADER_SERVICE_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, STREAMS_GRAND_LEADER_SERVICE, logRecordStream)
-#define LOG_STREAMS_GRAND_LEADER_SERVICE_NOTICE(logRecordStream) LOG_STREAMS_IMPL(NOTICE, STREAMS_GRAND_LEADER_SERVICE, logRecordStream)
-#define LOG_STREAMS_GRAND_LEADER_SERVICE_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_GRAND_LEADER_SERVICE, logRecordStream)
-#define LOG_STREAMS_GRAND_LEADER_SERVICE_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_GRAND_LEADER_SERVICE, logRecordStream)
-#define LOG_STREAMS_GRAND_LEADER_SERVICE_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_GRAND_LEADER_SERVICE, logRecordStream)
-
-// Component: STREAMS_META_STORAGE_SERVICE.
-#define LOG_STREAMS_META_STORAGE_SERVICE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_META_STORAGE_SERVICE, logRecordStream)
-#define LOG_STREAMS_META_STORAGE_SERVICE_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_META_STORAGE_SERVICE, logRecordStream)
-#define LOG_STREAMS_META_STORAGE_SERVICE_CRIT(logRecordStream) LOG_STREAMS_IMPL(CRIT, STREAMS_META_STORAGE_SERVICE, logRecordStream)
-#define LOG_STREAMS_META_STORAGE_SERVICE_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, STREAMS_META_STORAGE_SERVICE, logRecordStream)
-#define LOG_STREAMS_META_STORAGE_SERVICE_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, STREAMS_META_STORAGE_SERVICE, logRecordStream)
-#define LOG_STREAMS_META_STORAGE_SERVICE_NOTICE(logRecordStream) LOG_STREAMS_IMPL(NOTICE, STREAMS_META_STORAGE_SERVICE, logRecordStream)
-#define LOG_STREAMS_META_STORAGE_SERVICE_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_META_STORAGE_SERVICE, logRecordStream)
-#define LOG_STREAMS_META_STORAGE_SERVICE_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_META_STORAGE_SERVICE, logRecordStream)
-#define LOG_STREAMS_META_STORAGE_SERVICE_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_META_STORAGE_SERVICE, logRecordStream)
-
-// Component: STREAMS_GRAPH_LEADER.
-#define LOG_STREAMS_GRAPH_LEADER_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_GRAPH_LEADER, logRecordStream)
-#define LOG_STREAMS_GRAPH_LEADER_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_GRAPH_LEADER, logRecordStream)
-#define LOG_STREAMS_GRAPH_LEADER_CRIT(logRecordStream) LOG_STREAMS_IMPL(CRIT, STREAMS_GRAPH_LEADER, logRecordStream)
-#define LOG_STREAMS_GRAPH_LEADER_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, STREAMS_GRAPH_LEADER, logRecordStream)
-#define LOG_STREAMS_GRAPH_LEADER_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, STREAMS_GRAPH_LEADER, logRecordStream)
-#define LOG_STREAMS_GRAPH_LEADER_NOTICE(logRecordStream) LOG_STREAMS_IMPL(NOTICE, STREAMS_GRAPH_LEADER, logRecordStream)
-#define LOG_STREAMS_GRAPH_LEADER_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_GRAPH_LEADER, logRecordStream)
-#define LOG_STREAMS_GRAPH_LEADER_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_GRAPH_LEADER, logRecordStream)
-#define LOG_STREAMS_GRAPH_LEADER_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_GRAPH_LEADER, logRecordStream)
+// Component: STREAMS_GRAPH_LEADER.
+#define LOG_STREAMS_GRAPH_LEADER_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_GRAPH_LEADER, logRecordStream)
+#define LOG_STREAMS_GRAPH_LEADER_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_GRAPH_LEADER, logRecordStream)
+#define LOG_STREAMS_GRAPH_LEADER_CRIT(logRecordStream) LOG_STREAMS_IMPL(CRIT, STREAMS_GRAPH_LEADER, logRecordStream)
+#define LOG_STREAMS_GRAPH_LEADER_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, STREAMS_GRAPH_LEADER, logRecordStream)
+#define LOG_STREAMS_GRAPH_LEADER_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, STREAMS_GRAPH_LEADER, logRecordStream)
+#define LOG_STREAMS_GRAPH_LEADER_NOTICE(logRecordStream) LOG_STREAMS_IMPL(NOTICE, STREAMS_GRAPH_LEADER, logRecordStream)
+#define LOG_STREAMS_GRAPH_LEADER_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_GRAPH_LEADER, logRecordStream)
+#define LOG_STREAMS_GRAPH_LEADER_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_GRAPH_LEADER, logRecordStream)
+#define LOG_STREAMS_GRAPH_LEADER_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_GRAPH_LEADER, logRecordStream)
// Component: YQ_CONTROL_PLANE_STORAGE.
#define LOG_YQ_CONTROL_PLANE_STORAGE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, YQ_CONTROL_PLANE_STORAGE, logRecordStream)
@@ -196,3 +174,14 @@
#define LOG_YQ_HEALTH_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, YQ_HEALTH, logRecordStream)
#define LOG_YQ_HEALTH_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, YQ_HEALTH, logRecordStream)
#define LOG_YQ_HEALTH_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, YQ_HEALTH, logRecordStream)
+
+// Component: FQ_CONTROL_PLANE_CONFIG.
+#define LOG_FQ_CONTROL_PLANE_CONFIG_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, FQ_CONTROL_PLANE_CONFIG, logRecordStream)
+#define LOG_FQ_CONTROL_PLANE_CONFIG_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, FQ_CONTROL_PLANE_CONFIG, logRecordStream)
+#define LOG_FQ_CONTROL_PLANE_CONFIG_CRIT(logRecordStream) LOG_STREAMS_IMPL(CRIT, FQ_CONTROL_PLANE_CONFIG, logRecordStream)
+#define LOG_FQ_CONTROL_PLANE_CONFIG_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, FQ_CONTROL_PLANE_CONFIG, logRecordStream)
+#define LOG_FQ_CONTROL_PLANE_CONFIG_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, FQ_CONTROL_PLANE_CONFIG, logRecordStream)
+#define LOG_FQ_CONTROL_PLANE_CONFIG_NOTICE(logRecordStream) LOG_STREAMS_IMPL(NOTICE, FQ_CONTROL_PLANE_CONFIG, logRecordStream)
+#define LOG_FQ_CONTROL_PLANE_CONFIG_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, FQ_CONTROL_PLANE_CONFIG, logRecordStream)
+#define LOG_FQ_CONTROL_PLANE_CONFIG_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, FQ_CONTROL_PLANE_CONFIG, logRecordStream)
+#define LOG_FQ_CONTROL_PLANE_CONFIG_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, FQ_CONTROL_PLANE_CONFIG, logRecordStream)
diff --git a/ydb/core/yq/libs/actors/task_get.cpp b/ydb/core/yq/libs/actors/task_get.cpp
index c4eec33e2f8..607eb4a0962 100644
--- a/ydb/core/yq/libs/actors/task_get.cpp
+++ b/ydb/core/yq/libs/actors/task_get.cpp
@@ -13,6 +13,7 @@
#include <ydb/core/yq/libs/common/entity_id.h>
+#include <ydb/core/yq/libs/control_plane_config/control_plane_config.h>
#include <ydb/core/yq/libs/control_plane_storage/control_plane_storage.h>
#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
#include <ydb/library/security/util.h>
@@ -83,13 +84,12 @@ public:
void Bootstrap() {
Become(&TGetTaskRequestActor::StateFunc);
auto request = Ev->Record;
- LOG_D("Request CP::GetTask with size: " << request.ByteSize() << " bytes");
- RequestedMBytes->Collect(request.ByteSize() / 1024 / 1024);
OwnerId = request.owner_id();
Host = request.host();
Tenant = request.tenant();
- Send(NYq::ControlPlaneStorageServiceActorId(),
- new NYq::TEvControlPlaneStorage::TEvGetTaskRequest(std::move(request)));
+ LOG_D("Request CP::GetTask with size: " << request.ByteSize() << " bytes");
+ RequestedMBytes->Collect(request.ByteSize() / 1024 / 1024);
+ Send(ControlPlaneConfigActorId(), new TEvControlPlaneConfig::TEvGetTenantInfoRequest());
}
private:
@@ -128,12 +128,20 @@ private:
}
}
+ void Handle(TEvControlPlaneConfig::TEvGetTenantInfoResponse::TPtr& ev) {
+ auto request = Ev->Record;
+ auto event = std::make_unique<NYq::TEvControlPlaneStorage::TEvGetTaskRequest>(std::move(request));
+ event->TenantInfo = ev->Get()->TenantInfo;
+ Send(NYq::ControlPlaneStorageServiceActorId(), event.release());
+ }
+
private:
STRICT_STFUNC(
StateFunc,
cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
hFunc(NYq::TEvControlPlaneStorage::TEvGetTaskResponse, HandleResponse)
+ hFunc(TEvControlPlaneConfig::TEvGetTenantInfoResponse, Handle)
)
const NConfig::TTokenAccessorConfig TokenAccessorConfig;
diff --git a/ydb/core/yq/libs/actors/task_ping.cpp b/ydb/core/yq/libs/actors/task_ping.cpp
index d7b9cf1a7ff..d35e64918cb 100644
--- a/ydb/core/yq/libs/actors/task_ping.cpp
+++ b/ydb/core/yq/libs/actors/task_ping.cpp
@@ -11,8 +11,10 @@
#include <library/cpp/actors/core/log.h>
#include <library/cpp/protobuf/interop/cast.h>
+#include <ydb/core/yq/libs/control_plane_config/control_plane_config.h>
#include <ydb/core/yq/libs/control_plane_storage/control_plane_storage.h>
#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
+
#include <google/protobuf/util/time_util.h>
#define LOG_E(stream) \
@@ -82,13 +84,7 @@ public:
Deadline = NProtoInterop::CastFromProto(req.deadline());
LOG_D("Request CP::PingTask with size: " << req.ByteSize() << " bytes");
RequestedMBytes->Collect(req.ByteSize() / 1024 / 1024);
- try {
- auto event = CreateControlPlaneEvent();
- Send(NYq::ControlPlaneStorageServiceActorId(), event.release());
- } catch (const std::exception& err) {
- const auto msg = TStringBuilder() << "PingTask Boostrap Error: " << CurrentExceptionMessage();
- Fail(msg);
- }
+ Send(ControlPlaneConfigActorId(), new TEvControlPlaneConfig::TEvGetTenantInfoRequest());
}
private:
@@ -97,6 +93,7 @@ private:
cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
hFunc(NYq::TEvControlPlaneStorage::TEvPingTaskResponse, HandleResponse)
hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
+ hFunc(TEvControlPlaneConfig::TEvGetTenantInfoResponse, Handle);
)
std::unique_ptr<NYq::TEvControlPlaneStorage::TEvPingTaskRequest> CreateControlPlaneEvent() {
@@ -120,6 +117,17 @@ private:
PassAway();
}
+ void Handle(TEvControlPlaneConfig::TEvGetTenantInfoResponse::TPtr& ev) {
+ try {
+ auto event = CreateControlPlaneEvent();
+ event->TenantInfo = ev->Get()->TenantInfo;
+ Send(NYq::ControlPlaneStorageServiceActorId(), event.release());
+ } catch (const std::exception& err) {
+ const auto msg = TStringBuilder() << "PingTask Boostrap Error: " << CurrentExceptionMessage();
+ Fail(msg);
+ }
+ }
+
private:
const TActorId Sender;
TIntrusivePtr<ITimeProvider> TimeProvider;
diff --git a/ydb/core/yq/libs/config/protos/control_plane_proxy.proto b/ydb/core/yq/libs/config/protos/control_plane_proxy.proto
index b7fa0c54647..f4ce0db813c 100644
--- a/ydb/core/yq/libs/config/protos/control_plane_proxy.proto
+++ b/ydb/core/yq/libs/config/protos/control_plane_proxy.proto
@@ -11,4 +11,5 @@ message TControlPlaneProxyConfig {
string RequestTimeout = 2;
bool EnablePermissions = 3;
string MetricsTtl = 4;
+ string ConfigRetryPeriod = 31;
}
diff --git a/ydb/core/yq/libs/config/protos/control_plane_storage.proto b/ydb/core/yq/libs/config/protos/control_plane_storage.proto
index f2417d42444..20d0e66814c 100644
--- a/ydb/core/yq/libs/config/protos/control_plane_storage.proto
+++ b/ydb/core/yq/libs/config/protos/control_plane_storage.proto
@@ -67,4 +67,6 @@ message TControlPlaneStorageConfig {
repeated TRetryPolicyMapping RetryPolicyMapping = 26;
string QuotaTtl = 28;
string MetricsTtl = 29;
+ bool UseDbMapping = 30;
+ string DbReloadPeriod = 31;
}
diff --git a/ydb/core/yq/libs/control_plane_config/CMakeLists.txt b/ydb/core/yq/libs/control_plane_config/CMakeLists.txt
new file mode 100644
index 00000000000..f36c4d681b8
--- /dev/null
+++ b/ydb/core/yq/libs/control_plane_config/CMakeLists.txt
@@ -0,0 +1,38 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(events)
+
+add_library(yq-libs-control_plane_config)
+target_compile_options(yq-libs-control_plane_config PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(yq-libs-control_plane_config PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-lwtrace-mon
+ monlib-service-pages
+ ydb-core-base
+ ydb-core-mon
+ yq-libs-common
+ yq-libs-config
+ libs-control_plane_config-events
+ yq-libs-quota_manager
+ libs-quota_manager-events
+ libs-rate_limiter-events
+ yq-libs-ydb
+ ydb-library-security
+ cpp-client-ydb_scheme
+ cpp-client-ydb_value
+ ydb-library-protobuf_printer
+ yql-public-issue
+)
+target_sources(yq-libs-control_plane_config PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_config/control_plane_config.cpp
+)
diff --git a/ydb/core/yq/libs/control_plane_config/control_plane_config.cpp b/ydb/core/yq/libs/control_plane_config/control_plane_config.cpp
new file mode 100644
index 00000000000..6f22f18875c
--- /dev/null
+++ b/ydb/core/yq/libs/control_plane_config/control_plane_config.cpp
@@ -0,0 +1,259 @@
+#include "control_plane_config.h"
+
+#include <ydb/core/yq/libs/actors/logging/log.h>
+#include <ydb/core/yq/libs/config/yq_issue.h>
+#include <ydb/core/yq/libs/common/cache.h>
+#include <ydb/core/yq/libs/common/entity_id.h>
+#include <ydb/core/yq/libs/control_plane_storage/control_plane_storage.h>
+#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
+#include <ydb/core/yq/libs/control_plane_storage/util.h>
+#include <ydb/core/yq/libs/quota_manager/quota_manager.h>
+#include <ydb/core/yq/libs/shared_resources/db_exec.h>
+#include <ydb/core/yq/libs/test_connection/events/events.h>
+//#include <ydb/core/yq/libs/test_connection/test_connection.h>
+#include <ydb/core/yq/libs/ydb/util.h>
+#include <ydb/core/yq/libs/ydb/ydb.h>
+#include <ydb/core/yq/libs/control_plane_storage/schema.h>
+#include <ydb/core/yq/libs/db_schema/db_schema.h>
+#include <ydb/core/yq/libs/quota_manager/quota_manager.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actor.h>
+
+#include <ydb/core/base/kikimr_issue.h>
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+#include <ydb/library/security/util.h>
+#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
+
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+#include <util/datetime/base.h>
+#include <util/digest/multi.h>
+#include <util/generic/yexception.h>
+#include <util/string/join.h>
+#include <util/system/hostname.h>
+
+namespace NYq {
+
+using TTenantExecuter = TDbExecuter<TTenantInfo::TPtr>;
+using TStateTimeExecuter = TDbExecuter<TInstant>;
+
+class TControlPlaneConfigActor : public NActors::TActorBootstrapped<TControlPlaneConfigActor> {
+
+ ::NYq::TYqSharedResources::TPtr YqSharedResources;
+ NKikimr::TYdbCredentialsProviderFactory CredProviderFactory;
+ TYdbConnectionPtr YdbConnection;
+ TDbPool::TPtr DbPool;
+ ::NMonitoring::TDynamicCounterPtr Counters;
+ NConfig::TControlPlaneStorageConfig Config;
+ TTenantInfo::TPtr TenantInfo;
+ bool LoadInProgress = false;
+ TDuration DbReloadPeriod;
+
+public:
+ TControlPlaneConfigActor(const ::NYq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const NConfig::TControlPlaneStorageConfig& config, const ::NMonitoring::TDynamicCounterPtr& counters)
+ : YqSharedResources(yqSharedResources)
+ , CredProviderFactory(credProviderFactory)
+ , Counters(counters)
+ , Config(config)
+ {
+ DbReloadPeriod = GetDuration(Config.GetDbReloadPeriod(), TDuration::Seconds(3));
+ }
+
+ static constexpr char ActorName[] = "FQ_CONTROL_PLANE_CONFIG";
+
+ void Bootstrap() {
+ CPC_LOG_D("STARTING: " << SelfId());
+ Become(&TControlPlaneConfigActor::StateFunc);
+ YdbConnection = NewYdbConnection(Config.GetStorage(), CredProviderFactory, YqSharedResources->CoreYdbDriver);
+ DbPool = YqSharedResources->DbPoolHolder->GetOrCreate(EDbPoolId::MAIN, 10, YdbConnection->TablePathPrefix);
+ if (Config.GetUseDbMapping()) {
+ Schedule(TDuration::Zero(), new NActors::TEvents::TEvWakeup());
+ } else {
+ TenantInfo.reset(new TTenantInfo());
+ const auto& mapping = Config.GetMapping();
+ for (const auto& cloudToTenant : mapping.GetCloudIdToTenantName()) {
+ TenantInfo->SubjectMapping[SUBJECT_TYPE_CLOUD].emplace(cloudToTenant.GetKey(), cloudToTenant.GetValue());
+ TenantInfo->TenantMapping.emplace(cloudToTenant.GetValue(), cloudToTenant.GetValue());
+ }
+ for (const auto& commonTenantName : mapping.GetCommonTenantName()) {
+ TenantInfo->TenantMapping.emplace(commonTenantName, commonTenantName);
+ TenantInfo->CommonVTenants.push_back(commonTenantName);
+ }
+ }
+ }
+
+private:
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(NActors::TEvents::TEvWakeup, Handle);
+ hFunc(TEvents::TEvCallback, [](TEvents::TEvCallback::TPtr& ev) { ev->Get()->Callback(); } );
+ hFunc(TEvControlPlaneStorage::TEvGetTaskResponse, Handle); // self ping response
+ hFunc(TEvControlPlaneConfig::TEvGetTenantInfoRequest, Handle);
+ )
+
+ void Handle(NActors::TEvents::TEvWakeup::TPtr&) {
+ if (!LoadInProgress) {
+ LoadTenantsAndMapping();
+ }
+ Schedule(DbReloadPeriod, new NActors::TEvents::TEvWakeup());
+ }
+
+ void Handle(TEvControlPlaneStorage::TEvGetTaskResponse::TPtr& ev) {
+ if (ev->Get()->Issues) {
+ CPC_LOG_E("TEvGetTaskResponse (Self Ping): " << ev->Get()->Issues.ToOneLineString());
+ } else if (ev->Get()->Record.tasks().size()) {
+ CPC_LOG_E("TEvGetTaskResponse (Self Ping) returned : " << ev->Get()->Record.tasks().size() << " tasks, empty list expected");
+ }
+ }
+
+ void Handle(TEvControlPlaneConfig::TEvGetTenantInfoRequest::TPtr& ev) {
+ if (!TenantInfo) {
+ CPC_LOG_W("TEvGetTenantInfoRequest: IS NOT READY yet");
+ }
+ Send(ev->Sender, new TEvControlPlaneConfig::TEvGetTenantInfoResponse(TenantInfo), 0, ev->Cookie);
+ }
+
+ void LoadTenantsAndMapping() {
+
+ LoadInProgress = true;
+ TDbExecutable::TPtr executable;
+ auto& executer = TTenantExecuter::Create(executable, true, [](TTenantExecuter& executer) { executer.State.reset(new TTenantInfo()); } );
+
+ executer.Read(
+ [=](TTenantExecuter&, TSqlQueryBuilder& builder) {
+ builder.AddText(
+ "SELECT `" TENANT_COLUMN_NAME "`, `" VTENANT_COLUMN_NAME "`, `" COMMON_COLUMN_NAME "`, `" STATE_COLUMN_NAME "`, `" STATE_TIME_COLUMN_NAME "`\n"
+ "FROM `" TENANTS_TABLE_NAME "`;\n"
+ "SELECT `" SUBJECT_TYPE_COLUMN_NAME "`, `" SUBJECT_ID_COLUMN_NAME "`, `" VTENANT_COLUMN_NAME "`\n"
+ "FROM `" MAPPINGS_TABLE_NAME "`;\n"
+ );
+ },
+ [=](TTenantExecuter& executer, const TVector<NYdb::TResultSet>& resultSets) {
+
+ auto& info = *executer.State;
+
+ {
+ info.CommonVTenants.clear();
+ TResultSetParser parser(resultSets.front());
+ while (parser.TryNextRow()) {
+ auto tenant = *parser.ColumnParser(TENANT_COLUMN_NAME).GetOptionalString();
+ auto vtenantColumn = parser.ColumnParser(VTENANT_COLUMN_NAME).GetOptionalString();
+ TString vtenant = vtenantColumn ? *vtenantColumn : "";
+ auto common = *parser.ColumnParser(COMMON_COLUMN_NAME).GetOptionalBool();
+ auto state = *parser.ColumnParser(STATE_COLUMN_NAME).GetOptionalUint32();
+ auto stateTime = *parser.ColumnParser(STATE_TIME_COLUMN_NAME).GetOptionalTimestamp();
+
+ info.TenantState.emplace(tenant, state);
+ if (info.StateTime < stateTime) {
+ info.StateTime = stateTime;
+ }
+
+ if (vtenant) {
+ info.TenantMapping.emplace(vtenant, tenant);
+ }
+
+ if (vtenant && common) {
+ info.CommonVTenants.push_back(vtenant);
+ }
+ }
+ }
+
+ {
+ TResultSetParser parser(resultSets[1]);
+ while (parser.TryNextRow()) {
+ auto subject_type = *parser.ColumnParser(SUBJECT_TYPE_COLUMN_NAME).GetOptionalString();
+ auto subject_id = *parser.ColumnParser(SUBJECT_ID_COLUMN_NAME).GetOptionalString();
+ auto vtenant = *parser.ColumnParser(VTENANT_COLUMN_NAME).GetOptionalString();
+ info.SubjectMapping[subject_type].emplace(subject_id, vtenant);
+ }
+ }
+ },
+ "ReadTenants", true
+ ).Process(SelfId(),
+ [=, this](TTenantExecuter& executer) {
+ if (executer.State->CommonVTenants.size()) {
+ std::sort(executer.State->CommonVTenants.begin(), executer.State->CommonVTenants.end());
+ }
+ bool refreshed = !this->TenantInfo || (this->TenantInfo->StateTime < executer.State->StateTime);
+ auto oldInfo = this->TenantInfo;
+ this->TenantInfo = executer.State;
+
+ if (refreshed) {
+ CPC_LOG_D("LOADED TenantInfo: State CHANGED at " << this->TenantInfo->StateTime);
+ } else {
+ CPC_LOG_T("LOADED TenantInfo: State NOT changed");
+ }
+
+ if (refreshed) {
+ // write ack only after all members are assigned
+ TDbExecutable::TPtr executable;
+ auto& executer = TStateTimeExecuter::Create(executable, true, nullptr);
+ executer.State = this->TenantInfo->StateTime;
+
+ executer.Write(
+ [=, nodeId = this->SelfId().NodeId()](TStateTimeExecuter& executer, TSqlQueryBuilder& builder) {
+ builder.AddUint32("node_id", nodeId);
+ builder.AddTimestamp("state_time", executer.State);
+ builder.AddText(
+ "UPSERT INTO `" TENANT_ACKS_TABLE_NAME "` (`" NODE_ID_COLUMN_NAME "`, `" STATE_TIME_COLUMN_NAME "`)\n"
+ " VALUES ($node_id, $state_time);\n"
+ );
+ },
+ "WriteStateTime", true
+ );
+
+ if (oldInfo) {
+ executer.Process(SelfId(),
+ [this, oldInfo=oldInfo](TStateTimeExecuter&) {
+ this->ReflectTenantChanges(oldInfo);
+ }
+ );
+ }
+
+ Exec(DbPool, executable);
+ }
+
+ LoadInProgress = false;
+ }
+ );
+
+ Exec(DbPool, executable);
+ }
+
+ void ReflectTenantChanges(TTenantInfo::TPtr oldInfo) {
+ for (auto& p : TenantInfo->TenantState) {
+ auto& tenant = p.first;
+ auto state = p.second;
+ if (oldInfo->TenantState.Value(tenant, state) != state) {
+ CPC_LOG_D("Tenant " << tenant << " state CHANGED to " << state);
+ switch (state) {
+ case TenantState::Idle:
+ ReassignPending(tenant);
+ break;
+ }
+ }
+ }
+ }
+
+ void ReassignPending(const TString& tenant) {
+ Fq::Private::GetTaskRequest request;
+ request.set_tenant(tenant);
+ request.set_owner_id("Self-Ping-To-Move-Jobs");
+ request.set_host(HostName());
+ auto event = std::make_unique<TEvControlPlaneStorage::TEvGetTaskRequest>(std::move(request));
+ event->TenantInfo = TenantInfo;
+ Send(ControlPlaneStorageServiceActorId(), event.release());
+ }
+};
+
+TActorId ControlPlaneConfigActorId() {
+ constexpr TStringBuf name = "FQCTLCFG";
+ return NActors::TActorId(0, name);
+}
+
+IActor* CreateControlPlaneConfigActor(const ::NYq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const NConfig::TControlPlaneStorageConfig& config, const ::NMonitoring::TDynamicCounterPtr& counters) {
+ return new TControlPlaneConfigActor(yqSharedResources, credProviderFactory, config, counters);
+}
+
+} // namespace NYq
diff --git a/ydb/core/yq/libs/control_plane_config/control_plane_config.h b/ydb/core/yq/libs/control_plane_config/control_plane_config.h
new file mode 100644
index 00000000000..6894f6bc2e2
--- /dev/null
+++ b/ydb/core/yq/libs/control_plane_config/control_plane_config.h
@@ -0,0 +1,28 @@
+#pragma once
+
+#include <ydb/core/yq/libs/actors/logging/log.h>
+#include <ydb/core/yq/libs/config/protos/control_plane_storage.pb.h>
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+
+#define CPC_LOG_D(s) \
+ LOG_FQ_CONTROL_PLANE_CONFIG_DEBUG(s)
+#define CPC_LOG_I(s) \
+ LOG_FQ_CONTROL_PLANE_CONFIG_INFO(s)
+#define CPC_LOG_W(s) \
+ LOG_FQ_CONTROL_PLANE_CONFIG_WARN(s)
+#define CPC_LOG_E(s) \
+ LOG_FQ_CONTROL_PLANE_CONFIG_ERROR(s)
+#define CPC_LOG_T(s) \
+ LOG_FQ_CONTROL_PLANE_CONFIG_TRACE(s)
+
+namespace NYq {
+
+NActors::TActorId ControlPlaneConfigActorId();
+
+NActors::IActor* CreateControlPlaneConfigActor(const ::NYq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const NConfig::TControlPlaneStorageConfig& config, const ::NMonitoring::TDynamicCounterPtr& counters);
+
+} // namespace NYq
diff --git a/ydb/core/yq/libs/control_plane_config/events/CMakeLists.txt b/ydb/core/yq/libs/control_plane_config/events/CMakeLists.txt
new file mode 100644
index 00000000000..b33491aad2e
--- /dev/null
+++ b/ydb/core/yq/libs/control_plane_config/events/CMakeLists.txt
@@ -0,0 +1,22 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(libs-control_plane_config-events)
+target_link_libraries(libs-control_plane_config-events PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-actors-interconnect
+ libs-control_plane_storage-proto
+ yq-libs-events
+ libs-quota_manager-events
+)
+target_sources(libs-control_plane_config-events PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_config/events/events.cpp
+)
diff --git a/ydb/core/yq/libs/control_plane_config/events/events.cpp b/ydb/core/yq/libs/control_plane_config/events/events.cpp
new file mode 100644
index 00000000000..6c3d2603e7e
--- /dev/null
+++ b/ydb/core/yq/libs/control_plane_config/events/events.cpp
@@ -0,0 +1 @@
+#include "events.h"
diff --git a/ydb/core/yq/libs/control_plane_config/events/events.h b/ydb/core/yq/libs/control_plane_config/events/events.h
new file mode 100644
index 00000000000..67ff3df811b
--- /dev/null
+++ b/ydb/core/yq/libs/control_plane_config/events/events.h
@@ -0,0 +1,68 @@
+#pragma once
+
+#include <ydb/core/yq/libs/events/event_subspace.h>
+#include <ydb/core/yq/libs/quota_manager/events/events.h>
+
+#include <ydb/public/api/protos/yq.pb.h>
+
+#include <library/cpp/actors/core/event_pb.h>
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/interconnect/events_local.h>
+
+#include <ydb/library/yql/public/issue/yql_issue.h>
+
+#include <util/digest/multi.h>
+
+namespace NYq {
+
+enum TenantState : ui64 {
+ Active = 0,
+ Pending = 1,
+ Idle = 2,
+};
+
+struct TTenantInfo {
+
+ using TPtr = std::shared_ptr<TTenantInfo>;
+
+ THashMap<TString /* subject type */, THashMap<TString /* subject id */, TString /* vtenant */>> SubjectMapping;
+ TVector<TString> CommonVTenants;
+ THashMap<TString /* vtenant */, TString /* tenant */> TenantMapping;
+ THashMap<TString /* tenant */, ui32 /* state */> TenantState;
+ TInstant StateTime;
+
+ TString Assign(const TString& cloudId, const TString& /* scope */, const TString& DefaultTenantName = "") {
+
+ auto vTenant = SubjectMapping[SUBJECT_TYPE_CLOUD].Value(cloudId, "");
+ if (!vTenant && CommonVTenants.size()) {
+ vTenant = CommonVTenants[MultiHash(cloudId) % CommonVTenants.size()];
+ }
+
+ auto tenant = vTenant ? TenantMapping.Value(vTenant, DefaultTenantName) : DefaultTenantName;
+ // CPS_LOG_D("AssignTenantName: {" << cloudId << ", " << scope << "} => " << tenant);
+ // Cerr << "AssignTenantName: {" << cloudId << ", " << scope << "} => " << tenant << Endl;
+ return tenant;
+ }
+};
+
+struct TEvControlPlaneConfig {
+ // Event ids.
+ enum EEv : ui32 {
+ EvGetTenantInfoRequest = YqEventSubspaceBegin(NYq::TYqEventSubspace::ControlPlaneConfig),
+ EvGetTenantInfoResponse,
+ EvEnd,
+ };
+
+ static_assert(EvEnd <= YqEventSubspaceEnd(NYq::TYqEventSubspace::ControlPlaneConfig), "All events must be in their subspace");
+
+ struct TEvGetTenantInfoRequest : NActors::TEventLocal<TEvGetTenantInfoRequest, EvGetTenantInfoRequest> {
+ };
+
+ struct TEvGetTenantInfoResponse : NActors::TEventLocal<TEvGetTenantInfoResponse, EvGetTenantInfoResponse> {
+ TEvGetTenantInfoResponse(TTenantInfo::TPtr tenantInfo) : TenantInfo(tenantInfo) {
+ }
+ TTenantInfo::TPtr TenantInfo;
+ };
+};
+
+}
diff --git a/ydb/core/yq/libs/control_plane_proxy/CMakeLists.txt b/ydb/core/yq/libs/control_plane_proxy/CMakeLists.txt
index 1fb348a7995..5b7328b0288 100644
--- a/ydb/core/yq/libs/control_plane_proxy/CMakeLists.txt
+++ b/ydb/core/yq/libs/control_plane_proxy/CMakeLists.txt
@@ -21,6 +21,7 @@ target_link_libraries(yq-libs-control_plane_proxy PUBLIC
ydb-core-mon
libs-actors-logging
yq-libs-actors
+ yq-libs-control_plane_config
libs-control_plane_proxy-events
yq-libs-control_plane_storage
ydb-library-folder_service
diff --git a/ydb/core/yq/libs/control_plane_proxy/config.cpp b/ydb/core/yq/libs/control_plane_proxy/config.cpp
index f89cf1e2019..afd948cc645 100644
--- a/ydb/core/yq/libs/control_plane_proxy/config.cpp
+++ b/ydb/core/yq/libs/control_plane_proxy/config.cpp
@@ -21,6 +21,10 @@ NConfig::TControlPlaneProxyConfig FillDefaultParameters(NConfig::TControlPlanePr
config.SetMetricsTtl("1d");
}
+ if (!config.GetConfigRetryPeriod()) {
+ config.SetConfigRetryPeriod("100ms");
+ }
+
return config;
}
@@ -30,6 +34,7 @@ TControlPlaneProxyConfig::TControlPlaneProxyConfig(const NConfig::TControlPlaneP
: Proto(FillDefaultParameters(config))
, RequestTimeout(GetDuration(Proto.GetRequestTimeout(), TDuration::Seconds(30)))
, MetricsTtl(GetDuration(Proto.GetMetricsTtl(), TDuration::Days(1)))
+ , ConfigRetryPeriod(GetDuration(Proto.GetConfigRetryPeriod(), TDuration::MilliSeconds(100)))
{
}
diff --git a/ydb/core/yq/libs/control_plane_proxy/config.h b/ydb/core/yq/libs/control_plane_proxy/config.h
index 8cb2d46dacf..af6342cfc12 100644
--- a/ydb/core/yq/libs/control_plane_proxy/config.h
+++ b/ydb/core/yq/libs/control_plane_proxy/config.h
@@ -10,6 +10,7 @@ struct TControlPlaneProxyConfig {
NConfig::TControlPlaneProxyConfig Proto;
TDuration RequestTimeout;
TDuration MetricsTtl;
+ TDuration ConfigRetryPeriod;
TControlPlaneProxyConfig(const NConfig::TControlPlaneProxyConfig& config);
};
diff --git a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp
index 57013f959b2..ebc34c2b6cf 100644
--- a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp
+++ b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/yq/libs/actors/logging/log.h>
#include <ydb/core/yq/libs/common/cache.h>
+#include <ydb/core/yq/libs/control_plane_config/control_plane_config.h>
#include <ydb/core/yq/libs/control_plane_storage/control_plane_storage.h>
#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
#include <ydb/core/yq/libs/control_plane_storage/util.h>
@@ -16,6 +17,7 @@
#include <ydb/core/yq/libs/config/yq_issue.h>
#include <ydb/core/yq/libs/control_plane_proxy/events/events.h>
+#include <ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/actor.h>
@@ -321,6 +323,7 @@ class TRequestActor : public NActors::TActorBootstrapped<TRequestActor<TRequestP
using TBase::Send;
using TBase::PassAway;
using TBase::Become;
+ using TBase::Schedule;
::NYq::TControlPlaneProxyConfig Config;
TRequestProto RequestProto;
@@ -333,10 +336,11 @@ class TRequestActor : public NActors::TActorBootstrapped<TRequestActor<TRequestP
TActorId ServiceId;
TRequestCounters Counters;
TInstant StartTime;
- std::function<void(const TDuration&, bool, bool)> Probe;
+ std::function<void(const TDuration&, bool /* isSuccess */, bool /* isTimeout */)> Probe;
TPermissions Permissions;
TString CloudId;
TQuotaMap Quotas;
+ ui32 RetryCount = 0;
public:
static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_REQUEST_ACTOR";
@@ -373,14 +377,30 @@ public:
void Bootstrap() {
CPP_LOG_T("Request actor. Actor id: " << SelfId());
Become(&TRequestActor::StateFunc, Config.RequestTimeout, new NActors::TEvents::TEvWakeup());
- Send(ServiceId, new TRequest(Scope, RequestProto, User, Token, CloudId, Permissions, Quotas), 0, Cookie);
+ Send(ControlPlaneConfigActorId(), new TEvControlPlaneConfig::TEvGetTenantInfoRequest());
}
STRICT_STFUNC(StateFunc,
cFunc(NActors::TEvents::TSystem::Wakeup, HandleTimeout);
hFunc(TResponse, Handle);
+ cFunc(TEvControlPlaneConfig::EvGetTenantInfoRequest, HandleRetry);
+ hFunc(TEvControlPlaneConfig::TEvGetTenantInfoResponse, Handle);
)
+ void HandleRetry() {
+ Send(ControlPlaneConfigActorId(), new TEvControlPlaneConfig::TEvGetTenantInfoRequest());
+ }
+
+ void Handle(TEvControlPlaneConfig::TEvGetTenantInfoResponse::TPtr& ev) {
+ auto tenantInfo = ev->Get()->TenantInfo;
+ if (tenantInfo) {
+ Send(ServiceId, new TRequest(Scope, RequestProto, User, Token, CloudId, Permissions, Quotas, tenantInfo), 0, Cookie);
+ } else {
+ RetryCount++;
+ Schedule(Now() + Config.ConfigRetryPeriod * (1 << RetryCount), new TEvControlPlaneConfig::TEvGetTenantInfoRequest());
+ }
+ }
+
void HandleTimeout() {
CPP_LOG_D("Request timeout. " << RequestProto.DebugString());
NYql::TIssues issues;
diff --git a/ydb/core/yq/libs/control_plane_proxy/events/events.h b/ydb/core/yq/libs/control_plane_proxy/events/events.h
index bf0d956865f..cb6279d2871 100644
--- a/ydb/core/yq/libs/control_plane_proxy/events/events.h
+++ b/ydb/core/yq/libs/control_plane_proxy/events/events.h
@@ -70,13 +70,15 @@ struct TEvControlPlaneProxy {
const TString& user,
const TString& token,
const TVector<TString>& permissions,
- const TQuotaMap& quotas = {})
+ const TQuotaMap& quotas = {},
+ TTenantInfo::TPtr tenantInfo = nullptr)
: FolderId(folderId)
, Request(request)
, User(user)
, Token(token)
, Permissions(permissions)
, Quotas(quotas)
+ , TenantInfo(tenantInfo)
{
}
@@ -87,6 +89,7 @@ struct TEvControlPlaneProxy {
TString Token;
TVector<TString> Permissions;
TQuotaMap Quotas;
+ TTenantInfo::TPtr TenantInfo;
};
template<typename TDerived, typename ProtoMessage, ui32 EventType>
diff --git a/ydb/core/yq/libs/control_plane_storage/events/events.h b/ydb/core/yq/libs/control_plane_storage/events/events.h
index a142d1b565d..99a570b52a5 100644
--- a/ydb/core/yq/libs/control_plane_storage/events/events.h
+++ b/ydb/core/yq/libs/control_plane_storage/events/events.h
@@ -14,6 +14,7 @@
#include <ydb/library/yql/public/issue/yql_issue.h>
+#include <ydb/core/yq/libs/control_plane_config/events/events.h>
#include <ydb/core/yq/libs/control_plane_storage/proto/yq_internal.pb.h>
#include <ydb/core/yq/libs/events/event_subspace.h>
#include <ydb/core/yq/libs/quota_manager/events/events.h>
@@ -214,7 +215,8 @@ struct TEvControlPlaneStorage {
const TString& token,
const TString& cloudId,
TPermissions permissions,
- const TQuotaMap& quotas)
+ const TQuotaMap& quotas,
+ TTenantInfo::TPtr tenantInfo)
: Scope(scope)
, Request(request)
, User(user)
@@ -222,6 +224,7 @@ struct TEvControlPlaneStorage {
, CloudId(cloudId)
, Permissions(permissions)
, Quotas(quotas)
+ , TenantInfo(tenantInfo)
{
}
@@ -241,6 +244,7 @@ struct TEvControlPlaneStorage {
TString CloudId;
TPermissions Permissions;
TQuotaMap Quotas;
+ TTenantInfo::TPtr TenantInfo;
};
template<typename TDerived, typename ProtoMessage, ui32 EventType>
@@ -417,6 +421,7 @@ struct TEvControlPlaneStorage {
}
Fq::Private::GetTaskRequest Request;
+ TTenantInfo::TPtr TenantInfo;
};
struct TEvGetTaskResponse : NActors::TEventLocal<TEvGetTaskResponse, EvGetTaskResponse> {
@@ -480,6 +485,7 @@ struct TEvControlPlaneStorage {
}
Fq::Private::PingTaskRequest Request;
+ TTenantInfo::TPtr TenantInfo;
};
struct TEvPingTaskResponse : NActors::TEventLocal<TEvPingTaskResponse, EvPingTaskResponse> {
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
index e5860b8c724..79f18095922 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
@@ -16,13 +16,15 @@ namespace {
struct TTaskInternal {
TTask Task;
TRetryLimiter RetryLimiter;
- bool ShouldAbortTask = false;
+ bool ShouldAbortTask = false; // force ABORTED_BY_SYSTEM
+ bool ShouldSkipTask = false; // tenant fetch denied or tenant must be changed
TString TablePathPrefix;
TString Owner;
TString HostName;
TMaybe<YandexQuery::Job> Job;
TInstant Deadline;
TString TenantName;
+ TString NewTenantName;
};
TString GetServiceAccountId(const YandexQuery::IamAuth& auth) {
@@ -56,38 +58,88 @@ struct TTaskInternal {
}
std::pair<TString, NYdb::TParams> MakeSql(const TTaskInternal& taskInternal, const TInstant& nowTimestamp, const TInstant& taskLeaseUntil) {
- const auto& task = taskInternal.Task;
- TSqlQueryBuilder queryBuilder(taskInternal.TablePathPrefix, "GetTask(write)");
- queryBuilder.AddString("tenant", taskInternal.TenantName);
- queryBuilder.AddString("scope", task.Scope);
- queryBuilder.AddString("query_id", task.QueryId);
- queryBuilder.AddString("query", task.Query.SerializeAsString());
- queryBuilder.AddString("internal", task.Internal.SerializeAsString());
- queryBuilder.AddString("host", taskInternal.HostName);
- queryBuilder.AddString("owner", taskInternal.Owner);
- queryBuilder.AddTimestamp("now", nowTimestamp);
- queryBuilder.AddTimestamp("ttl", taskLeaseUntil);
- queryBuilder.AddUint64("retry_counter", taskInternal.RetryLimiter.RetryCount);
- queryBuilder.AddUint64("generation", task.Generation);
- queryBuilder.AddTimestamp("retry_counter_update_time", taskInternal.RetryLimiter.RetryCounterUpdatedAt);
- queryBuilder.AddDouble("retry_rate", taskInternal.RetryLimiter.RetryRate);
- // update queries
- queryBuilder.AddText(
- "UPDATE `" QUERIES_TABLE_NAME "` SET `" GENERATION_COLUMN_NAME "` = $generation, `" QUERY_COLUMN_NAME "` = $query, `" INTERNAL_COLUMN_NAME "` = $internal\n"
- "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
- );
+ if (taskInternal.ShouldSkipTask) {
+
+ if (taskInternal.NewTenantName) {
+ const auto& task = taskInternal.Task;
+ TSqlQueryBuilder queryBuilder(taskInternal.TablePathPrefix, "GetTask(move)");
+ queryBuilder.AddString("tenant", taskInternal.TenantName);
+ queryBuilder.AddString("new_tenant", taskInternal.NewTenantName);
+ queryBuilder.AddString("scope", task.Scope);
+ queryBuilder.AddString("query_id", task.QueryId);
+ queryBuilder.AddInt64("query_type", task.Query.content().type());
+ queryBuilder.AddString("query", task.Query.SerializeAsString());
+ queryBuilder.AddString("internal", task.Internal.SerializeAsString());
+ queryBuilder.AddTimestamp("now", nowTimestamp);
+ queryBuilder.AddTimestamp("zero_timestamp", TInstant::Zero());
+ queryBuilder.AddUint64("retry_counter", taskInternal.RetryLimiter.RetryCount);
+ queryBuilder.AddUint64("generation", task.Generation);
+ queryBuilder.AddTimestamp("retry_counter_update_time", taskInternal.RetryLimiter.RetryCounterUpdatedAt);
+ queryBuilder.AddDouble("retry_rate", taskInternal.RetryLimiter.RetryRate);
+
+ // update queries
+ queryBuilder.AddText(
+ "UPDATE `" QUERIES_TABLE_NAME "` SET `" GENERATION_COLUMN_NAME "` = $generation, `" QUERY_COLUMN_NAME "` = $query, `" INTERNAL_COLUMN_NAME "` = $internal, `" TENANT_COLUMN_NAME "` = $new_tenant\n"
+ "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
+ );
- // update pending small
- queryBuilder.AddText(
- "UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl,\n"
- "`" RETRY_COUNTER_COLUMN_NAME "` = $retry_counter, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "` = $retry_counter_update_time, `" RETRY_RATE_COLUMN_NAME "` = $retry_rate,\n"
- "`" HOST_NAME_COLUMN_NAME "` = $host, `" OWNER_COLUMN_NAME "` = $owner\n"
- "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
- );
+ // delete old record
+ queryBuilder.AddText(
+ "DELETE FROM `" PENDING_SMALL_TABLE_NAME "`\n"
+ "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
+ );
- const auto query = queryBuilder.Build();
- return std::make_pair(query.Sql, query.Params);
+ // insert for new tenant
+ queryBuilder.AddText(
+ "UPSERT INTO `" PENDING_SMALL_TABLE_NAME "`\n"
+ "(`" TENANT_COLUMN_NAME "`, `" SCOPE_COLUMN_NAME "`, `" QUERY_ID_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`, `" LAST_SEEN_AT_COLUMN_NAME "`, `" ASSIGNED_UNTIL_COLUMN_NAME "`,\n"
+ "`" RETRY_RATE_COLUMN_NAME "`, `" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" HOST_NAME_COLUMN_NAME "`, `" OWNER_COLUMN_NAME "`)\n"
+ "VALUES\n"
+ " ($new_tenant, $scope, $query_id, $query_type, $now, $zero_timestamp, $retry_rate, $retry_counter, $retry_counter_update_time, \"\", \"\");"
+ );
+
+ const auto query = queryBuilder.Build();
+ return std::make_pair(query.Sql, query.Params);
+
+ } else {
+ return std::make_pair("", NYdb::TParamsBuilder().Build());
+ }
+
+ } else {
+ const auto& task = taskInternal.Task;
+ TSqlQueryBuilder queryBuilder(taskInternal.TablePathPrefix, "GetTask(write)");
+ queryBuilder.AddString("tenant", taskInternal.TenantName);
+ queryBuilder.AddString("scope", task.Scope);
+ queryBuilder.AddString("query_id", task.QueryId);
+ queryBuilder.AddString("query", task.Query.SerializeAsString());
+ queryBuilder.AddString("internal", task.Internal.SerializeAsString());
+ queryBuilder.AddString("host", taskInternal.HostName);
+ queryBuilder.AddString("owner", taskInternal.Owner);
+ queryBuilder.AddTimestamp("now", nowTimestamp);
+ queryBuilder.AddTimestamp("ttl", taskLeaseUntil);
+ queryBuilder.AddUint64("retry_counter", taskInternal.RetryLimiter.RetryCount);
+ queryBuilder.AddUint64("generation", task.Generation);
+ queryBuilder.AddTimestamp("retry_counter_update_time", taskInternal.RetryLimiter.RetryCounterUpdatedAt);
+ queryBuilder.AddDouble("retry_rate", taskInternal.RetryLimiter.RetryRate);
+
+ // update queries
+ queryBuilder.AddText(
+ "UPDATE `" QUERIES_TABLE_NAME "` SET `" GENERATION_COLUMN_NAME "` = $generation, `" QUERY_COLUMN_NAME "` = $query, `" INTERNAL_COLUMN_NAME "` = $internal\n"
+ "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
+ );
+
+ // update pending small
+ queryBuilder.AddText(
+ "UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl,\n"
+ "`" RETRY_COUNTER_COLUMN_NAME "` = $retry_counter, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "` = $retry_counter_update_time, `" RETRY_RATE_COLUMN_NAME "` = $retry_rate,\n"
+ "`" HOST_NAME_COLUMN_NAME "` = $host, `" OWNER_COLUMN_NAME "` = $owner\n"
+ "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
+ );
+
+ const auto query = queryBuilder.Build();
+ return std::make_pair(query.Sql, query.Params);
+ }
}
} // namespace
@@ -99,7 +151,8 @@ std::tuple<TString, NYdb::TParams, std::function<std::pair<TString, NYdb::TParam
const TInstant& taskLeaseUntil,
bool disableCurrentIam,
const TDuration& automaticQueriesTtl,
- const TDuration& resultSetsTtl)
+ const TDuration& resultSetsTtl,
+ std::shared_ptr<TTenantInfo> tenantInfo)
{
const auto& task = taskInternal.Task;
@@ -118,7 +171,7 @@ std::tuple<TString, NYdb::TParams, std::function<std::pair<TString, NYdb::TParam
"WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id AND `" ASSIGNED_UNTIL_COLUMN_NAME "` < $now;\n"
);
- auto prepareParams = [=, taskInternal=taskInternal, responseTasks=responseTasks](const TVector<TResultSet>& resultSets) mutable {
+ auto prepareParams = [=, taskInternal=taskInternal, responseTasks=responseTasks, tenantInfo=tenantInfo](const TVector<TResultSet>& resultSets) mutable {
auto& task = taskInternal.Task;
const auto shouldAbortTask = taskInternal.ShouldAbortTask;
constexpr size_t expectedResultSetsSize = 2;
@@ -155,8 +208,23 @@ std::tuple<TString, NYdb::TParams, std::function<std::pair<TString, NYdb::TParam
task.Query.mutable_meta()->set_status(YandexQuery::QueryMeta::ABORTING_BY_SYSTEM);
}
- responseTasks->AddTaskBlocking(task.QueryId, task);
+ if (tenantInfo->TenantState.Value(taskInternal.TenantName, TenantState::Active) != TenantState::Active) {
+ // tenant graceful shutdown, task fetch prohibited
+ taskInternal.ShouldSkipTask = true;
+ }
+ if (tenantInfo) {
+ auto tenant = tenantInfo->Assign(taskInternal.Task.Internal.cloud_id(), task.Scope, taskInternal.TenantName);
+ if (tenant != taskInternal.TenantName) {
+ // mapping changed, reassign tenant
+ taskInternal.ShouldSkipTask = true;
+ taskInternal.NewTenantName = tenant;
+ }
+ }
+
+ if (!taskInternal.ShouldSkipTask) {
+ responseTasks->AddTaskBlocking(task.QueryId, task);
+ }
return MakeSql(taskInternal, nowTimestamp, taskLeaseUntil);
};
@@ -217,8 +285,13 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
CPS_LOG_T("GetTaskRequest: {" << request.DebugString() << "}");
NYql::TIssues issues = ValidateGetTask(owner, hostName);
+
+ if (!ev->Get()->TenantInfo) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::NOT_READY, "Control Plane is not ready yet. Please retry later."));
+ }
+
if (issues) {
- CPS_LOG_W("GetTaskRequest: {" << request.DebugString() << "} validation FAILED: " << issues.ToOneLineString());
+ CPS_LOG_W("GetTaskRequest: {" << request.DebugString() << "} FAILED: " << issues.ToOneLineString());
const TDuration delta = TInstant::Now() - startTime;
SendResponseIssues<TEvControlPlaneStorage::TEvGetTaskResponse>(ev->Sender, issues, ev->Cookie, delta, requestCounters);
LWPROBE(GetTaskRequest, owner, hostName, delta, false);
@@ -243,7 +316,11 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
auto responseTasks = std::make_shared<TResponseTasks>();
- auto prepareParams = [=, rootCounters=Counters.Counters, actorSystem=NActors::TActivationContext::ActorSystem(), responseTasks=responseTasks](const TVector<TResultSet>& resultSets) mutable {
+ auto prepareParams = [=, rootCounters=Counters.Counters,
+ actorSystem=NActors::TActivationContext::ActorSystem(),
+ responseTasks=responseTasks,
+ tenantInfo=ev->Get()->TenantInfo
+ ](const TVector<TResultSet>& resultSets) mutable {
TVector<TTaskInternal> tasks;
TVector<TPickTaskParams> pickTaskParams;
const auto now = TInstant::Now();
@@ -290,7 +367,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
for (size_t i = 0; i < numTasks; ++i) {
auto tupleParams = MakeGetTaskUpdateQuery(tasks[i],
responseTasks, now, now + Config.TaskLeaseTtl, Config.Proto.GetDisableCurrentIam(),
- Config.AutomaticQueriesTtl, Config.ResultSetsTtl); // using for win32 build
+ Config.AutomaticQueriesTtl, Config.ResultSetsTtl, tenantInfo); // using for win32 build
auto readQuery = std::get<0>(tupleParams);
auto readParams = std::get<1>(tupleParams);
auto prepareParams = std::get<2>(tupleParams);
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
index bc4544042e6..c0d28bdfca2 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
@@ -455,10 +455,17 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq
const TString queryId = request.query_id().value();
const TString owner = request.owner_id();
const TInstant deadline = NProtoInterop::CastFromProto(request.deadline());
+ const TString tenant = request.tenant();
CPS_LOG_T("PingTaskRequest: {" << request.DebugString() << "}");
NYql::TIssues issues = ValidatePingTask(scope, queryId, owner, deadline, Config.ResultSetsTtl);
+
+ auto tenantInfo = ev->Get()->TenantInfo;
+ if (tenantInfo && tenantInfo->TenantState.Value(tenant, TenantState::Active) == TenantState::Idle) {
+ issues.AddIssue("Tenant is idle, no processing is allowed");
+ }
+
if (issues) {
CPS_LOG_W("PingTaskRequest: {" << request.DebugString() << "} validation FAILED: " << issues.ToOneLineString());
const TDuration delta = TInstant::Now() - startTime;
diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders.h b/ydb/core/yq/libs/control_plane_storage/message_builders.h
index 9b92c7c14f5..27578bd843d 100644
--- a/ydb/core/yq/libs/control_plane_storage/message_builders.h
+++ b/ydb/core/yq/libs/control_plane_storage/message_builders.h
@@ -975,6 +975,7 @@ class TGetTaskBuilder {
TString Owner;
TString HostName;
TString TenantName;
+ NYq::TTenantInfo::TPtr TenantInfo;
public:
TGetTaskBuilder()
@@ -982,6 +983,7 @@ public:
SetOwner(DefaultOwner());
SetHostName("localhost");
SetTenantName("/root/tenant");
+ SetTenantInfo(std::make_shared<NYq::TTenantInfo>());
}
static TString DefaultOwner() {
@@ -1006,12 +1008,19 @@ public:
return *this;
}
+ TGetTaskBuilder& SetTenantInfo(NYq::TTenantInfo::TPtr tenantInfo)
+ {
+ TenantInfo = tenantInfo;
+ return *this;
+ }
+
std::unique_ptr<NYq::TEvControlPlaneStorage::TEvGetTaskRequest> Build()
{
auto request = std::make_unique<NYq::TEvControlPlaneStorage::TEvGetTaskRequest>();
request->Request.set_tenant(TenantName);
request->Request.set_owner_id(Owner);
request->Request.set_host(HostName);
+ request->TenantInfo = TenantInfo;
return request;
}
};
@@ -1038,12 +1047,14 @@ class TPingTaskBuilder {
TVector<NYq::TEvControlPlaneStorage::TTopicConsumer> CreatedTopicConsumers;
TVector<TString> DqGraphs;
i32 DqGraphIndex = 0;
+ NYq::TTenantInfo::TPtr TenantInfo;
public:
TPingTaskBuilder()
{
SetDeadline(TInstant::Now() + TDuration::Minutes(5));
SetTenantName("/root/tenant");
+ SetTenantInfo(std::make_shared<NYq::TTenantInfo>());
}
TPingTaskBuilder& SetTenantName(const TString& tenantName)
@@ -1172,7 +1183,13 @@ public:
return *this;
}
- std::unique_ptr<NYq::TEvControlPlaneStorage::TEvPingTaskRequest> Build()
+ TPingTaskBuilder& SetTenantInfo(NYq::TTenantInfo::TPtr tenantInfo)
+ {
+ TenantInfo = tenantInfo;
+ return *this;
+ }
+
+std::unique_ptr<NYq::TEvControlPlaneStorage::TEvPingTaskRequest> Build()
{
Fq::Private::PingTaskRequest request;
request.set_owner_id(Owner);
@@ -1230,7 +1247,9 @@ public:
*request.mutable_finished_at() = NProtoInterop::CastToProto(*FinishedAt);
}
- return std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request));
+ auto pingRequest = std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request));
+ pingRequest->TenantInfo = TenantInfo;
+ return pingRequest;
}
};
diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h b/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h
index bee788a83f1..ae5f68c56d8 100644
--- a/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h
+++ b/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h
@@ -975,6 +975,7 @@ class TGetTaskBuilder {
TString Owner;
TString HostName;
TString TenantName;
+ NYq::TTenantInfo::TPtr TenantInfo;
public:
TGetTaskBuilder()
@@ -982,6 +983,7 @@ public:
SetOwner(DefaultOwner());
SetHostName("localhost");
SetTenantName("/root/tenant");
+ SetTenantInfo(std::make_shared<NYq::TTenantInfo>());
}
static TString DefaultOwner() {
@@ -1006,12 +1008,19 @@ public:
return *this;
}
+ TGetTaskBuilder& SetTenantInfo(NYq::TTenantInfo::TPtr tenantInfo)
+ {
+ TenantInfo = tenantInfo;
+ return *this;
+ }
+
std::unique_ptr<TEvControlPlaneStorage::TEvGetTaskRequest> Build()
{
auto request = std::make_unique<TEvControlPlaneStorage::TEvGetTaskRequest>();
request->Request.set_tenant(TenantName);
request->Request.set_owner_id(Owner);
request->Request.set_host(HostName);
+ request->TenantInfo = TenantInfo;
return request;
}
};
diff --git a/ydb/core/yq/libs/control_plane_storage/schema.h b/ydb/core/yq/libs/control_plane_storage/schema.h
index 33b9ede7a45..5dcde8db98d 100644
--- a/ydb/core/yq/libs/control_plane_storage/schema.h
+++ b/ydb/core/yq/libs/control_plane_storage/schema.h
@@ -12,6 +12,9 @@ namespace NYq {
#define JOBS_TABLE_NAME "jobs"
#define NODES_TABLE_NAME "nodes"
#define QUOTAS_TABLE_NAME "quotas"
+#define TENANTS_TABLE_NAME "tenants"
+#define TENANT_ACKS_TABLE_NAME "tenant_acks"
+#define MAPPINGS_TABLE_NAME "mappings"
// columns
#define SCOPE_COLUMN_NAME "scope"
@@ -79,4 +82,9 @@ namespace NYq {
#define METRIC_USAGE_COLUMN_NAME "metric_usage"
#define USAGE_UPDATED_AT_COLUMN_NAME "usage_updated_at"
+#define VTENANT_COLUMN_NAME "vtenant"
+#define COMMON_COLUMN_NAME "common"
+#define STATE_COLUMN_NAME "state"
+#define STATE_TIME_COLUMN_NAME "state_time"
+
} // namespace NYq
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
index 3d099b0a6f4..ea0418a0efc 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
@@ -38,6 +38,7 @@ void TYdbControlPlaneStorageActor::Bootstrap() {
YdbConnection = NewYdbConnection(Config.Proto.GetStorage(), CredProviderFactory, YqSharedResources->CoreYdbDriver);
DbPool = YqSharedResources->DbPoolHolder->GetOrCreate(EDbPoolId::MAIN, 10, YdbConnection->TablePathPrefix);
CreateDirectory();
+
CreateQueriesTable();
CreatePendingSmallTable();
CreateConnectionsTable();
@@ -47,6 +48,10 @@ void TYdbControlPlaneStorageActor::Bootstrap() {
CreateJobsTable();
CreateNodesTable();
CreateQuotasTable();
+ CreateTenantsTable();
+ CreateTenantAcksTable();
+ CreateMappingsTable();
+
Become(&TThis::StateFunc);
}
@@ -54,7 +59,11 @@ void TYdbControlPlaneStorageActor::Bootstrap() {
* Creating tables
*/
void TYdbControlPlaneStorageActor::RunCreateTableActor(const TString& path, NYdb::NTable::TTableDescription desc) {
- Register(MakeCreateTableActor({}, NKikimrServices::YQ_CONTROL_PLANE_STORAGE, YdbConnection, path, std::move(desc), MakeCreateSchemaRetryPolicy()));
+ Register(MakeCreateTableActor(SelfId(), NKikimrServices::YQ_CONTROL_PLANE_STORAGE, YdbConnection, path, std::move(desc), MakeCreateSchemaRetryPolicy()));
+}
+
+void TYdbControlPlaneStorageActor::Handle(TEvents::TEvSchemaCreated::TPtr&) {
+ // skip for now
}
void TYdbControlPlaneStorageActor::CreateQueriesTable()
@@ -250,6 +259,53 @@ void TYdbControlPlaneStorageActor::CreateQuotasTable()
RunCreateTableActor(tablePath, TTableDescription(description));
}
+void TYdbControlPlaneStorageActor::CreateTenantsTable()
+{
+ auto tablePath = JoinPath(YdbConnection->TablePathPrefix, TENANTS_TABLE_NAME);
+
+ auto description = TTableBuilder()
+ .AddNullableColumn(TENANT_COLUMN_NAME, EPrimitiveType::String)
+ .AddNullableColumn(VTENANT_COLUMN_NAME, EPrimitiveType::String)
+ .AddNullableColumn(COMMON_COLUMN_NAME, EPrimitiveType::Bool)
+ .AddNullableColumn(STATE_COLUMN_NAME, EPrimitiveType::Uint32)
+ .AddNullableColumn(STATE_TIME_COLUMN_NAME, EPrimitiveType::Timestamp)
+ .SetPrimaryKeyColumns({TENANT_COLUMN_NAME})
+ .Build();
+
+ RunCreateTableActor(tablePath, TTableDescription(description));
+}
+
+void TYdbControlPlaneStorageActor::CreateTenantAcksTable()
+{
+ auto tablePath = JoinPath(YdbConnection->TablePathPrefix, TENANT_ACKS_TABLE_NAME);
+
+ auto description = TTableBuilder()
+ .AddNullableColumn(NODE_ID_COLUMN_NAME, EPrimitiveType::Uint32)
+ .AddNullableColumn(STATE_TIME_COLUMN_NAME, EPrimitiveType::Timestamp)
+ .SetPrimaryKeyColumns({NODE_ID_COLUMN_NAME})
+ .Build();
+
+ RunCreateTableActor(tablePath, TTableDescription(description));
+}
+
+void TYdbControlPlaneStorageActor::CreateMappingsTable()
+{
+ auto tablePath = JoinPath(YdbConnection->TablePathPrefix, MAPPINGS_TABLE_NAME);
+
+ auto description = TTableBuilder()
+ .AddNullableColumn(SUBJECT_TYPE_COLUMN_NAME, EPrimitiveType::String)
+ .AddNullableColumn(SUBJECT_ID_COLUMN_NAME, EPrimitiveType::String)
+ .AddNullableColumn(VTENANT_COLUMN_NAME, EPrimitiveType::String)
+ .SetPrimaryKeyColumns({SUBJECT_TYPE_COLUMN_NAME, SUBJECT_ID_COLUMN_NAME})
+ .Build();
+
+ RunCreateTableActor(tablePath, TTableDescription(description));
+}
+
+void TYdbControlPlaneStorageActor::AfterTablesCreated() {
+ // Schedule(TDuration::Zero(), new NActors::TEvents::TEvWakeup());
+}
+
bool TYdbControlPlaneStorageActor::IsSuperUser(const TString& user)
{
return AnyOf(Config.Proto.GetSuperUsers(), [&user](const auto& superUser) {
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
index f91618ecf3b..81f6fa4ccd0 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
@@ -523,6 +523,7 @@ public:
hFunc(TEvQuotaService::TQuotaUsageRequest, Handle);
hFunc(TEvQuotaService::TQuotaLimitChangeRequest, Handle);
hFunc(TEvents::TEvCallback, [](TEvents::TEvCallback::TPtr& ev) { ev->Get()->Callback(); } );
+ hFunc(TEvents::TEvSchemaCreated, Handle);
)
void Handle(TEvControlPlaneStorage::TEvCreateQueryRequest::TPtr& ev);
@@ -556,6 +557,8 @@ public:
void Handle(TEvControlPlaneStorage::TEvNodesHealthCheckRequest::TPtr& ev);
+ void Handle(TEvents::TEvSchemaCreated::TPtr& ev);
+
void Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev);
void Handle(TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev);
@@ -613,8 +616,12 @@ public:
void CreateIdempotencyKeysTable();
void CreateResultSetsTable();
void CreateQuotasTable();
+ void CreateTenantsTable();
+ void CreateTenantAcksTable();
+ void CreateMappingsTable();
void RunCreateTableActor(const TString& path, NYdb::NTable::TTableDescription desc);
+ void AfterTablesCreated();
private:
/*
@@ -892,8 +899,6 @@ private:
std::shared_ptr<TResponseTasks> responseTasks,
const TVector<TValidationQuery>& validators = {},
TTxSettings transactionMode = TTxSettings::SerializableRW());
-
- TString AssignTenantName(const TString& cloudId, const TString& scope);
};
}
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
index 8a175b4f555..2a6752f6505 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
@@ -45,35 +45,6 @@ YandexQuery::IamAuth::IdentityCase GetIamAuth(const YandexQuery::Connection& con
namespace NYq {
-TString TYdbControlPlaneStorageActor::AssignTenantName(const TString& cloudId, const TString& scope) {
- const auto& mapping = Config.Proto.GetMapping();
-
- if (scope) {
- for (const auto& scopeToTenant: mapping.GetScopeToTenantName()) {
- if (scopeToTenant.GetKey() == scope) {
- return scopeToTenant.GetValue();
- }
- }
- }
-
- if (cloudId) {
- for (const auto& cloudToTenant: mapping.GetCloudIdToTenantName()) {
- if (cloudToTenant.GetKey() == cloudId) {
- return cloudToTenant.GetValue();
- }
- }
- }
-
- auto size = mapping.CommonTenantNameSize();
-
- if (size) {
- auto index = MultiHash(cloudId, scope) % size;
- return mapping.GetCommonTenantName(index);
- } else {
- return TenantName;
- }
-}
-
void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQueryRequest::TPtr& ev)
{
TInstant startTime = TInstant::Now();
@@ -124,6 +95,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Streaming disposition \"from_last_checkpoint\" is not allowed in CreateQuery request"));
}
+ auto tenant = ev->Get()->TenantInfo->Assign(cloudId, scope, TenantName);
+
{
TQuotaMap::const_iterator it = event.Quotas.end();
if (queryType == YandexQuery::QueryContent::ANALYTICS) {
@@ -290,7 +263,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
response->second.After.ConstructInPlace().CopyFrom(query);
TSqlQueryBuilder writeQueryBuilder(YdbConnection->TablePathPrefix, "CreateQuery(write)");
- writeQueryBuilder.AddString("tenant", AssignTenantName(cloudId, scope));
+ writeQueryBuilder.AddString("tenant", tenant);
writeQueryBuilder.AddString("scope", scope);
writeQueryBuilder.AddString("query_id", queryId);
writeQueryBuilder.AddString("name", query.content().name());
@@ -325,9 +298,9 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
writeQueryBuilder.AddText(
"INSERT INTO `" PENDING_SMALL_TABLE_NAME "`\n"
"(`" TENANT_COLUMN_NAME "`, `" SCOPE_COLUMN_NAME "`, `" QUERY_ID_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`, `" LAST_SEEN_AT_COLUMN_NAME "`, `" ASSIGNED_UNTIL_COLUMN_NAME "`,\n"
- "`" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" HOST_NAME_COLUMN_NAME "`, `" OWNER_COLUMN_NAME "`)\n"
+ "`" RETRY_RATE_COLUMN_NAME "`, `" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" HOST_NAME_COLUMN_NAME "`, `" OWNER_COLUMN_NAME "`)\n"
"VALUES\n"
- " ($tenant, $scope, $query_id, $query_type, $zero_timestamp, $zero_timestamp, 0, $now, \"\", \"\");"
+ " ($tenant, $scope, $query_id, $query_type, $zero_timestamp, $zero_timestamp, 0, 0, $now, \"\", \"\");"
);
}
@@ -778,6 +751,9 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery
if (request.state_load_mode() == YandexQuery::FROM_LAST_CHECKPOINT) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::UNSUPPORTED, "State load mode \"FROM_LAST_CHECKPOINT\" is not supported"));
}
+
+ auto tenant = ev->Get()->TenantInfo->Assign(cloudId, scope, TenantName);
+
if (issues) {
CPS_LOG_W("ModifyQueryRequest: {" << request.DebugString() << "} " << MakeUserInfo(user, token) << "validation FAILED: " << issues.ToOneLineString());
const TDuration delta = TInstant::Now() - startTime;
@@ -991,7 +967,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery
response->second.CloudId = internal.cloud_id();
TSqlQueryBuilder writeQueryBuilder(YdbConnection->TablePathPrefix, "ModifyQuery(write)");
- writeQueryBuilder.AddString("tenant", AssignTenantName(internal.cloud_id(), scope));
+ writeQueryBuilder.AddString("tenant", tenant);
writeQueryBuilder.AddString("scope", scope);
writeQueryBuilder.AddString("query_id", queryId);
writeQueryBuilder.AddUint64("max_count_jobs", Config.Proto.GetMaxCountJobs());
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
index 0cfba053b2f..128301adc25 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
@@ -44,7 +44,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::T
QueryQuotas.clear();
TDbExecutable::TPtr executable;
- auto& executer = TQuotaCountExecuter::Create(executable, [](TQuotaCountExecuter& executer) { executer.State.clear(); } );
+ auto& executer = TQuotaCountExecuter::Create(executable, false, [](TQuotaCountExecuter& executer) { executer.State.clear(); } );
executer.Read(
[=](TQuotaCountExecuter&, TSqlQueryBuilder& builder) {
@@ -76,11 +76,11 @@ void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::T
executer.State[internal.cloud_id()] += count;
}
},
- "GetScopeCloud_" + scope
+ "GetScopeCloud_" + scope, true
);
}
},
- "GroupByPendingSmall"
+ "GroupByPendingSmall", true
).Process(SelfId(),
[=, this](TQuotaCountExecuter& executer) {
this->QuotasUpdatedAt = Now();
diff --git a/ydb/core/yq/libs/events/event_subspace.h b/ydb/core/yq/libs/events/event_subspace.h
index e283d642539..ea0323aa8f4 100644
--- a/ydb/core/yq/libs/events/event_subspace.h
+++ b/ydb/core/yq/libs/events/event_subspace.h
@@ -29,6 +29,7 @@ struct TYqEventSubspace {
InternalService,
QuotaService,
RateLimiter,
+ ControlPlaneConfig,
SubspacesEnd,
};
diff --git a/ydb/core/yq/libs/init/CMakeLists.txt b/ydb/core/yq/libs/init/CMakeLists.txt
index bad3af568fe..11142bedfd2 100644
--- a/ydb/core/yq/libs/init/CMakeLists.txt
+++ b/ydb/core/yq/libs/init/CMakeLists.txt
@@ -23,6 +23,7 @@ target_link_libraries(yq-libs-init PUBLIC
yq-libs-checkpoint_storage
yq-libs-checkpointing
yq-libs-common
+ yq-libs-control_plane_config
yq-libs-control_plane_proxy
yq-libs-control_plane_storage
yq-libs-events
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 744828532ff..bfc26130148 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/yq/libs/audit/yq_audit_service.h>
#include <ydb/core/yq/libs/checkpoint_storage/storage_service.h>
+#include <ydb/core/yq/libs/control_plane_config/control_plane_config.h>
#include <ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.h>
#include <ydb/core/yq/libs/health/health.h>
#include <ydb/core/yq/libs/checkpoint_storage/storage_service.h>
@@ -83,6 +84,11 @@ void Init(
credentialsProviderFactory,
tenant);
actorRegistrator(NYq::ControlPlaneStorageServiceActorId(), controlPlaneStorage);
+
+ actorRegistrator(NYq::ControlPlaneConfigActorId(),
+ CreateControlPlaneConfigActor(yqSharedResources, credentialsProviderFactory, protoConfig.GetControlPlaneStorage(),
+ appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "ControlPlaneConfig"))
+ );
}
if (protoConfig.GetControlPlaneProxy().GetEnabled()) {
diff --git a/ydb/core/yq/libs/private_client/loopback_service.cpp b/ydb/core/yq/libs/private_client/loopback_service.cpp
index e47f7231b6e..4d211d20ea3 100644
--- a/ydb/core/yq/libs/private_client/loopback_service.cpp
+++ b/ydb/core/yq/libs/private_client/loopback_service.cpp
@@ -6,6 +6,8 @@
#include <ydb/core/protos/services.pb.h>
+#include <ydb/core/yq/libs/control_plane_config/control_plane_config.h>
+#include <ydb/core/yq/libs/control_plane_config/events/events.h>
#include <ydb/core/yq/libs/control_plane_storage/control_plane_storage.h>
#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
@@ -39,6 +41,7 @@ private:
STRICT_STFUNC(StateFunc,
hFunc(TEvInternalService::TEvHealthCheckRequest, Handle)
hFunc(TEvInternalService::TEvGetTaskRequest, Handle)
+ hFunc(NYq::TEvControlPlaneConfig::TEvGetTenantInfoResponse, Handle)
hFunc(TEvInternalService::TEvPingTaskRequest, Handle)
hFunc(TEvInternalService::TEvWriteResultRequest, Handle)
hFunc(TEvInternalService::TEvCreateRateLimiterResourceRequest, Handle)
@@ -77,7 +80,20 @@ private:
Cookie++;
Senders[Cookie] = ev->Sender;
auto request = ev->Get()->Request;
- Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvGetTaskRequest(std::move(request)), 0, Cookie);
+ GetRequests.emplace(Cookie, std::move(request));
+ Send(NYq::ControlPlaneConfigActorId(), new NYq::TEvControlPlaneConfig::TEvGetTenantInfoRequest(), 0, Cookie);
+ }
+
+ void Handle(NYq::TEvControlPlaneConfig::TEvGetTenantInfoResponse::TPtr& ev) {
+ TenantInfo = ev->Get()->TenantInfo;
+ auto it = GetRequests.find(ev->Cookie);
+ if (it != GetRequests.end()) {
+ auto request = it->second;
+ GetRequests.erase(it);
+ auto event = std::make_unique<NYq::TEvControlPlaneStorage::TEvGetTaskRequest>(std::move(request));
+ event->TenantInfo = TenantInfo;
+ Send(NYq::ControlPlaneStorageServiceActorId(), event.release(), 0, ev->Cookie);
+ }
}
void Handle(NYq::TEvControlPlaneStorage::TEvGetTaskResponse::TPtr& ev) {
@@ -99,7 +115,9 @@ private:
Senders[Cookie] = ev->Sender;
OriginalCookies[Cookie] = ev->Cookie;
auto request = ev->Get()->Request;
- Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvPingTaskRequest(std::move(request)), 0, Cookie);
+ auto event = std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request));
+ event->TenantInfo = TenantInfo;
+ Send(NYq::ControlPlaneStorageServiceActorId(), event.release(), 0, Cookie);
}
void Handle(NYq::TEvControlPlaneStorage::TEvPingTaskResponse::TPtr& ev) {
@@ -185,6 +203,8 @@ private:
ui64 Cookie = 0;
THashMap<ui64, NActors::TActorId> Senders;
THashMap<ui64, ui64> OriginalCookies;
+ THashMap<ui64, Fq::Private::GetTaskRequest> GetRequests;
+ NYq::TTenantInfo::TPtr TenantInfo;
};
NActors::IActor* CreateLoopbackServiceActor(
diff --git a/ydb/core/yq/libs/shared_resources/db_exec.h b/ydb/core/yq/libs/shared_resources/db_exec.h
index da99cd7dbb6..1add5e82b3c 100644
--- a/ydb/core/yq/libs/shared_resources/db_exec.h
+++ b/ydb/core/yq/libs/shared_resources/db_exec.h
@@ -135,7 +135,6 @@ public:
TCommitTransactionResult result = future.GetValue();
auto status = static_cast<TStatus>(result);
-
if (!status.IsSuccess()) {
return MakeFuture(status);
} else {
diff --git a/ydb/core/yq/libs/test_connection/events/events.h b/ydb/core/yq/libs/test_connection/events/events.h
index 7cbb61bdccc..ef2c4368c16 100644
--- a/ydb/core/yq/libs/test_connection/events/events.h
+++ b/ydb/core/yq/libs/test_connection/events/events.h
@@ -3,6 +3,7 @@
#include <ydb/public/api/protos/yq.pb.h>
#include <ydb/core/yq/libs/events/event_subspace.h>
+#include <ydb/core/yq/libs/control_plane_config/events/events.h>
#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
#include <library/cpp/actors/core/event_pb.h>
@@ -16,12 +17,12 @@ namespace NYq {
struct TEvTestConnection {
// Event ids.
enum EEv : ui32 {
- EvTestConnectionRequest = YqEventSubspaceBegin(NYq::TYqEventSubspace::TestConnection),
+ EvTestConnectionRequest = YqEventSubspaceBegin(::NYq::TYqEventSubspace::TestConnection),
EvTestConnectionResponse,
EvEnd,
};
- static_assert(EvEnd <= YqEventSubspaceEnd(NYq::TYqEventSubspace::TestConnection), "All events must be in their subspace");
+ static_assert(EvEnd <= YqEventSubspaceEnd(::NYq::TYqEventSubspace::TestConnection), "All events must be in their subspace");
struct TEvTestConnectionRequest : NActors::TEventLocal<TEvTestConnectionRequest, EvTestConnectionRequest> {
explicit TEvTestConnectionRequest(const TString& scope,
@@ -30,7 +31,8 @@ struct TEvTestConnection {
const TString& token,
const TString& cloudId,
const TPermissions& permissions,
- const TQuotaMap& quotas)
+ const TQuotaMap& quotas,
+ TTenantInfo::TPtr tenantInfo)
: CloudId(cloudId)
, Scope(scope)
, Request(request)
@@ -38,6 +40,7 @@ struct TEvTestConnection {
, Token(token)
, Permissions(permissions)
, Quotas(quotas)
+ , TenantInfo(tenantInfo)
{
}
@@ -48,6 +51,7 @@ struct TEvTestConnection {
TString Token;
TPermissions Permissions;
const TQuotaMap Quotas;
+ TTenantInfo::TPtr TenantInfo;
};
struct TEvTestConnectionResponse : NActors::TEventLocal<TEvTestConnectionResponse, EvTestConnectionResponse> {