diff options
author | hor911 <hor911@ydb.tech> | 2022-11-06 09:37:55 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-11-06 09:37:55 +0300 |
commit | 5ce93e61d8e4c82be5f018623b234a572e4461dc (patch) | |
tree | ab59655e07fc444da9d172076aee0e44da348bb9 | |
parent | 734abe9d61f771fc2f32c707117305271f0a14a4 (diff) | |
download | ydb-5ce93e61d8e4c82be5f018623b234a572e4461dc.tar.gz |
V-tenants and mappings in DB
34 files changed, 780 insertions, 130 deletions
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index c2c24e2f068..4dd801466b4 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -309,6 +309,7 @@ enum EServiceKikimr { YQ_RATE_LIMITER = 1155; FQ_QUOTA_PROXY = 1156; PUBLIC_HTTP = 1157; + FQ_CONTROL_PLANE_CONFIG = 1158; // 1024 - 1099 is reserved for nbs diff --git a/ydb/core/yq/libs/CMakeLists.txt b/ydb/core/yq/libs/CMakeLists.txt index c451003f013..15ccfbffe4a 100644 --- a/ydb/core/yq/libs/CMakeLists.txt +++ b/ydb/core/yq/libs/CMakeLists.txt @@ -13,6 +13,7 @@ add_subdirectory(checkpointing) add_subdirectory(checkpointing_common) add_subdirectory(common) add_subdirectory(config) +add_subdirectory(control_plane_config) add_subdirectory(control_plane_proxy) add_subdirectory(control_plane_storage) add_subdirectory(db_id_async_resolver_impl) diff --git a/ydb/core/yq/libs/actors/logging/log.h b/ydb/core/yq/libs/actors/logging/log.h index 78270e26eda..e5923ed150d 100644 --- a/ydb/core/yq/libs/actors/logging/log.h +++ b/ydb/core/yq/libs/actors/logging/log.h @@ -88,38 +88,16 @@ #define LOG_STREAMS_CONTROL_PLANE_SERVICE_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_CONTROL_PLANE_SERVICE, logRecordStream) #define LOG_STREAMS_CONTROL_PLANE_SERVICE_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_CONTROL_PLANE_SERVICE, logRecordStream) -// Component: STREAMS_GRAND_LEADER_SERVICE. -#define LOG_STREAMS_GRAND_LEADER_SERVICE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_GRAND_LEADER_SERVICE, logRecordStream) -#define LOG_STREAMS_GRAND_LEADER_SERVICE_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_GRAND_LEADER_SERVICE, logRecordStream) -#define LOG_STREAMS_GRAND_LEADER_SERVICE_CRIT(logRecordStream) LOG_STREAMS_IMPL(CRIT, STREAMS_GRAND_LEADER_SERVICE, logRecordStream) -#define LOG_STREAMS_GRAND_LEADER_SERVICE_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, STREAMS_GRAND_LEADER_SERVICE, logRecordStream) -#define LOG_STREAMS_GRAND_LEADER_SERVICE_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, STREAMS_GRAND_LEADER_SERVICE, logRecordStream) -#define LOG_STREAMS_GRAND_LEADER_SERVICE_NOTICE(logRecordStream) LOG_STREAMS_IMPL(NOTICE, STREAMS_GRAND_LEADER_SERVICE, logRecordStream) -#define LOG_STREAMS_GRAND_LEADER_SERVICE_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_GRAND_LEADER_SERVICE, logRecordStream) -#define LOG_STREAMS_GRAND_LEADER_SERVICE_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_GRAND_LEADER_SERVICE, logRecordStream) -#define LOG_STREAMS_GRAND_LEADER_SERVICE_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_GRAND_LEADER_SERVICE, logRecordStream) - -// Component: STREAMS_META_STORAGE_SERVICE. -#define LOG_STREAMS_META_STORAGE_SERVICE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_META_STORAGE_SERVICE, logRecordStream) -#define LOG_STREAMS_META_STORAGE_SERVICE_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_META_STORAGE_SERVICE, logRecordStream) -#define LOG_STREAMS_META_STORAGE_SERVICE_CRIT(logRecordStream) LOG_STREAMS_IMPL(CRIT, STREAMS_META_STORAGE_SERVICE, logRecordStream) -#define LOG_STREAMS_META_STORAGE_SERVICE_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, STREAMS_META_STORAGE_SERVICE, logRecordStream) -#define LOG_STREAMS_META_STORAGE_SERVICE_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, STREAMS_META_STORAGE_SERVICE, logRecordStream) -#define LOG_STREAMS_META_STORAGE_SERVICE_NOTICE(logRecordStream) LOG_STREAMS_IMPL(NOTICE, STREAMS_META_STORAGE_SERVICE, logRecordStream) -#define LOG_STREAMS_META_STORAGE_SERVICE_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_META_STORAGE_SERVICE, logRecordStream) -#define LOG_STREAMS_META_STORAGE_SERVICE_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_META_STORAGE_SERVICE, logRecordStream) -#define LOG_STREAMS_META_STORAGE_SERVICE_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_META_STORAGE_SERVICE, logRecordStream) - -// Component: STREAMS_GRAPH_LEADER. -#define LOG_STREAMS_GRAPH_LEADER_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_GRAPH_LEADER, logRecordStream) -#define LOG_STREAMS_GRAPH_LEADER_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_GRAPH_LEADER, logRecordStream) -#define LOG_STREAMS_GRAPH_LEADER_CRIT(logRecordStream) LOG_STREAMS_IMPL(CRIT, STREAMS_GRAPH_LEADER, logRecordStream) -#define LOG_STREAMS_GRAPH_LEADER_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, STREAMS_GRAPH_LEADER, logRecordStream) -#define LOG_STREAMS_GRAPH_LEADER_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, STREAMS_GRAPH_LEADER, logRecordStream) -#define LOG_STREAMS_GRAPH_LEADER_NOTICE(logRecordStream) LOG_STREAMS_IMPL(NOTICE, STREAMS_GRAPH_LEADER, logRecordStream) -#define LOG_STREAMS_GRAPH_LEADER_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_GRAPH_LEADER, logRecordStream) -#define LOG_STREAMS_GRAPH_LEADER_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_GRAPH_LEADER, logRecordStream) -#define LOG_STREAMS_GRAPH_LEADER_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_GRAPH_LEADER, logRecordStream) +// Component: STREAMS_GRAPH_LEADER. +#define LOG_STREAMS_GRAPH_LEADER_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_GRAPH_LEADER, logRecordStream) +#define LOG_STREAMS_GRAPH_LEADER_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_GRAPH_LEADER, logRecordStream) +#define LOG_STREAMS_GRAPH_LEADER_CRIT(logRecordStream) LOG_STREAMS_IMPL(CRIT, STREAMS_GRAPH_LEADER, logRecordStream) +#define LOG_STREAMS_GRAPH_LEADER_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, STREAMS_GRAPH_LEADER, logRecordStream) +#define LOG_STREAMS_GRAPH_LEADER_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, STREAMS_GRAPH_LEADER, logRecordStream) +#define LOG_STREAMS_GRAPH_LEADER_NOTICE(logRecordStream) LOG_STREAMS_IMPL(NOTICE, STREAMS_GRAPH_LEADER, logRecordStream) +#define LOG_STREAMS_GRAPH_LEADER_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_GRAPH_LEADER, logRecordStream) +#define LOG_STREAMS_GRAPH_LEADER_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_GRAPH_LEADER, logRecordStream) +#define LOG_STREAMS_GRAPH_LEADER_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_GRAPH_LEADER, logRecordStream) // Component: YQ_CONTROL_PLANE_STORAGE. #define LOG_YQ_CONTROL_PLANE_STORAGE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, YQ_CONTROL_PLANE_STORAGE, logRecordStream) @@ -196,3 +174,14 @@ #define LOG_YQ_HEALTH_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, YQ_HEALTH, logRecordStream) #define LOG_YQ_HEALTH_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, YQ_HEALTH, logRecordStream) #define LOG_YQ_HEALTH_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, YQ_HEALTH, logRecordStream) + +// Component: FQ_CONTROL_PLANE_CONFIG. +#define LOG_FQ_CONTROL_PLANE_CONFIG_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, FQ_CONTROL_PLANE_CONFIG, logRecordStream) +#define LOG_FQ_CONTROL_PLANE_CONFIG_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, FQ_CONTROL_PLANE_CONFIG, logRecordStream) +#define LOG_FQ_CONTROL_PLANE_CONFIG_CRIT(logRecordStream) LOG_STREAMS_IMPL(CRIT, FQ_CONTROL_PLANE_CONFIG, logRecordStream) +#define LOG_FQ_CONTROL_PLANE_CONFIG_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, FQ_CONTROL_PLANE_CONFIG, logRecordStream) +#define LOG_FQ_CONTROL_PLANE_CONFIG_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, FQ_CONTROL_PLANE_CONFIG, logRecordStream) +#define LOG_FQ_CONTROL_PLANE_CONFIG_NOTICE(logRecordStream) LOG_STREAMS_IMPL(NOTICE, FQ_CONTROL_PLANE_CONFIG, logRecordStream) +#define LOG_FQ_CONTROL_PLANE_CONFIG_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, FQ_CONTROL_PLANE_CONFIG, logRecordStream) +#define LOG_FQ_CONTROL_PLANE_CONFIG_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, FQ_CONTROL_PLANE_CONFIG, logRecordStream) +#define LOG_FQ_CONTROL_PLANE_CONFIG_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, FQ_CONTROL_PLANE_CONFIG, logRecordStream) diff --git a/ydb/core/yq/libs/actors/task_get.cpp b/ydb/core/yq/libs/actors/task_get.cpp index c4eec33e2f8..607eb4a0962 100644 --- a/ydb/core/yq/libs/actors/task_get.cpp +++ b/ydb/core/yq/libs/actors/task_get.cpp @@ -13,6 +13,7 @@ #include <ydb/core/yq/libs/common/entity_id.h> +#include <ydb/core/yq/libs/control_plane_config/control_plane_config.h> #include <ydb/core/yq/libs/control_plane_storage/control_plane_storage.h> #include <ydb/core/yq/libs/control_plane_storage/events/events.h> #include <ydb/library/security/util.h> @@ -83,13 +84,12 @@ public: void Bootstrap() { Become(&TGetTaskRequestActor::StateFunc); auto request = Ev->Record; - LOG_D("Request CP::GetTask with size: " << request.ByteSize() << " bytes"); - RequestedMBytes->Collect(request.ByteSize() / 1024 / 1024); OwnerId = request.owner_id(); Host = request.host(); Tenant = request.tenant(); - Send(NYq::ControlPlaneStorageServiceActorId(), - new NYq::TEvControlPlaneStorage::TEvGetTaskRequest(std::move(request))); + LOG_D("Request CP::GetTask with size: " << request.ByteSize() << " bytes"); + RequestedMBytes->Collect(request.ByteSize() / 1024 / 1024); + Send(ControlPlaneConfigActorId(), new TEvControlPlaneConfig::TEvGetTenantInfoRequest()); } private: @@ -128,12 +128,20 @@ private: } } + void Handle(TEvControlPlaneConfig::TEvGetTenantInfoResponse::TPtr& ev) { + auto request = Ev->Record; + auto event = std::make_unique<NYq::TEvControlPlaneStorage::TEvGetTaskRequest>(std::move(request)); + event->TenantInfo = ev->Get()->TenantInfo; + Send(NYq::ControlPlaneStorageServiceActorId(), event.release()); + } + private: STRICT_STFUNC( StateFunc, cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) hFunc(NYq::TEvControlPlaneStorage::TEvGetTaskResponse, HandleResponse) + hFunc(TEvControlPlaneConfig::TEvGetTenantInfoResponse, Handle) ) const NConfig::TTokenAccessorConfig TokenAccessorConfig; diff --git a/ydb/core/yq/libs/actors/task_ping.cpp b/ydb/core/yq/libs/actors/task_ping.cpp index d7b9cf1a7ff..d35e64918cb 100644 --- a/ydb/core/yq/libs/actors/task_ping.cpp +++ b/ydb/core/yq/libs/actors/task_ping.cpp @@ -11,8 +11,10 @@ #include <library/cpp/actors/core/log.h> #include <library/cpp/protobuf/interop/cast.h> +#include <ydb/core/yq/libs/control_plane_config/control_plane_config.h> #include <ydb/core/yq/libs/control_plane_storage/control_plane_storage.h> #include <ydb/core/yq/libs/control_plane_storage/events/events.h> + #include <google/protobuf/util/time_util.h> #define LOG_E(stream) \ @@ -82,13 +84,7 @@ public: Deadline = NProtoInterop::CastFromProto(req.deadline()); LOG_D("Request CP::PingTask with size: " << req.ByteSize() << " bytes"); RequestedMBytes->Collect(req.ByteSize() / 1024 / 1024); - try { - auto event = CreateControlPlaneEvent(); - Send(NYq::ControlPlaneStorageServiceActorId(), event.release()); - } catch (const std::exception& err) { - const auto msg = TStringBuilder() << "PingTask Boostrap Error: " << CurrentExceptionMessage(); - Fail(msg); - } + Send(ControlPlaneConfigActorId(), new TEvControlPlaneConfig::TEvGetTenantInfoRequest()); } private: @@ -97,6 +93,7 @@ private: cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) hFunc(NYq::TEvControlPlaneStorage::TEvPingTaskResponse, HandleResponse) hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) + hFunc(TEvControlPlaneConfig::TEvGetTenantInfoResponse, Handle); ) std::unique_ptr<NYq::TEvControlPlaneStorage::TEvPingTaskRequest> CreateControlPlaneEvent() { @@ -120,6 +117,17 @@ private: PassAway(); } + void Handle(TEvControlPlaneConfig::TEvGetTenantInfoResponse::TPtr& ev) { + try { + auto event = CreateControlPlaneEvent(); + event->TenantInfo = ev->Get()->TenantInfo; + Send(NYq::ControlPlaneStorageServiceActorId(), event.release()); + } catch (const std::exception& err) { + const auto msg = TStringBuilder() << "PingTask Boostrap Error: " << CurrentExceptionMessage(); + Fail(msg); + } + } + private: const TActorId Sender; TIntrusivePtr<ITimeProvider> TimeProvider; diff --git a/ydb/core/yq/libs/config/protos/control_plane_proxy.proto b/ydb/core/yq/libs/config/protos/control_plane_proxy.proto index b7fa0c54647..f4ce0db813c 100644 --- a/ydb/core/yq/libs/config/protos/control_plane_proxy.proto +++ b/ydb/core/yq/libs/config/protos/control_plane_proxy.proto @@ -11,4 +11,5 @@ message TControlPlaneProxyConfig { string RequestTimeout = 2; bool EnablePermissions = 3; string MetricsTtl = 4; + string ConfigRetryPeriod = 31; } diff --git a/ydb/core/yq/libs/config/protos/control_plane_storage.proto b/ydb/core/yq/libs/config/protos/control_plane_storage.proto index f2417d42444..20d0e66814c 100644 --- a/ydb/core/yq/libs/config/protos/control_plane_storage.proto +++ b/ydb/core/yq/libs/config/protos/control_plane_storage.proto @@ -67,4 +67,6 @@ message TControlPlaneStorageConfig { repeated TRetryPolicyMapping RetryPolicyMapping = 26; string QuotaTtl = 28; string MetricsTtl = 29; + bool UseDbMapping = 30; + string DbReloadPeriod = 31; } diff --git a/ydb/core/yq/libs/control_plane_config/CMakeLists.txt b/ydb/core/yq/libs/control_plane_config/CMakeLists.txt new file mode 100644 index 00000000000..f36c4d681b8 --- /dev/null +++ b/ydb/core/yq/libs/control_plane_config/CMakeLists.txt @@ -0,0 +1,38 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(events) + +add_library(yq-libs-control_plane_config) +target_compile_options(yq-libs-control_plane_config PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(yq-libs-control_plane_config PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-lwtrace-mon + monlib-service-pages + ydb-core-base + ydb-core-mon + yq-libs-common + yq-libs-config + libs-control_plane_config-events + yq-libs-quota_manager + libs-quota_manager-events + libs-rate_limiter-events + yq-libs-ydb + ydb-library-security + cpp-client-ydb_scheme + cpp-client-ydb_value + ydb-library-protobuf_printer + yql-public-issue +) +target_sources(yq-libs-control_plane_config PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_config/control_plane_config.cpp +) diff --git a/ydb/core/yq/libs/control_plane_config/control_plane_config.cpp b/ydb/core/yq/libs/control_plane_config/control_plane_config.cpp new file mode 100644 index 00000000000..6f22f18875c --- /dev/null +++ b/ydb/core/yq/libs/control_plane_config/control_plane_config.cpp @@ -0,0 +1,259 @@ +#include "control_plane_config.h" + +#include <ydb/core/yq/libs/actors/logging/log.h> +#include <ydb/core/yq/libs/config/yq_issue.h> +#include <ydb/core/yq/libs/common/cache.h> +#include <ydb/core/yq/libs/common/entity_id.h> +#include <ydb/core/yq/libs/control_plane_storage/control_plane_storage.h> +#include <ydb/core/yq/libs/control_plane_storage/events/events.h> +#include <ydb/core/yq/libs/control_plane_storage/util.h> +#include <ydb/core/yq/libs/quota_manager/quota_manager.h> +#include <ydb/core/yq/libs/shared_resources/db_exec.h> +#include <ydb/core/yq/libs/test_connection/events/events.h> +//#include <ydb/core/yq/libs/test_connection/test_connection.h> +#include <ydb/core/yq/libs/ydb/util.h> +#include <ydb/core/yq/libs/ydb/ydb.h> +#include <ydb/core/yq/libs/control_plane_storage/schema.h> +#include <ydb/core/yq/libs/db_schema/db_schema.h> +#include <ydb/core/yq/libs/quota_manager/quota_manager.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actor.h> + +#include <ydb/core/base/kikimr_issue.h> +#include <ydb/library/yql/public/issue/yql_issue_message.h> +#include <ydb/library/security/util.h> +#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> + +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> +#include <util/datetime/base.h> +#include <util/digest/multi.h> +#include <util/generic/yexception.h> +#include <util/string/join.h> +#include <util/system/hostname.h> + +namespace NYq { + +using TTenantExecuter = TDbExecuter<TTenantInfo::TPtr>; +using TStateTimeExecuter = TDbExecuter<TInstant>; + +class TControlPlaneConfigActor : public NActors::TActorBootstrapped<TControlPlaneConfigActor> { + + ::NYq::TYqSharedResources::TPtr YqSharedResources; + NKikimr::TYdbCredentialsProviderFactory CredProviderFactory; + TYdbConnectionPtr YdbConnection; + TDbPool::TPtr DbPool; + ::NMonitoring::TDynamicCounterPtr Counters; + NConfig::TControlPlaneStorageConfig Config; + TTenantInfo::TPtr TenantInfo; + bool LoadInProgress = false; + TDuration DbReloadPeriod; + +public: + TControlPlaneConfigActor(const ::NYq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const NConfig::TControlPlaneStorageConfig& config, const ::NMonitoring::TDynamicCounterPtr& counters) + : YqSharedResources(yqSharedResources) + , CredProviderFactory(credProviderFactory) + , Counters(counters) + , Config(config) + { + DbReloadPeriod = GetDuration(Config.GetDbReloadPeriod(), TDuration::Seconds(3)); + } + + static constexpr char ActorName[] = "FQ_CONTROL_PLANE_CONFIG"; + + void Bootstrap() { + CPC_LOG_D("STARTING: " << SelfId()); + Become(&TControlPlaneConfigActor::StateFunc); + YdbConnection = NewYdbConnection(Config.GetStorage(), CredProviderFactory, YqSharedResources->CoreYdbDriver); + DbPool = YqSharedResources->DbPoolHolder->GetOrCreate(EDbPoolId::MAIN, 10, YdbConnection->TablePathPrefix); + if (Config.GetUseDbMapping()) { + Schedule(TDuration::Zero(), new NActors::TEvents::TEvWakeup()); + } else { + TenantInfo.reset(new TTenantInfo()); + const auto& mapping = Config.GetMapping(); + for (const auto& cloudToTenant : mapping.GetCloudIdToTenantName()) { + TenantInfo->SubjectMapping[SUBJECT_TYPE_CLOUD].emplace(cloudToTenant.GetKey(), cloudToTenant.GetValue()); + TenantInfo->TenantMapping.emplace(cloudToTenant.GetValue(), cloudToTenant.GetValue()); + } + for (const auto& commonTenantName : mapping.GetCommonTenantName()) { + TenantInfo->TenantMapping.emplace(commonTenantName, commonTenantName); + TenantInfo->CommonVTenants.push_back(commonTenantName); + } + } + } + +private: + + STRICT_STFUNC(StateFunc, + hFunc(NActors::TEvents::TEvWakeup, Handle); + hFunc(TEvents::TEvCallback, [](TEvents::TEvCallback::TPtr& ev) { ev->Get()->Callback(); } ); + hFunc(TEvControlPlaneStorage::TEvGetTaskResponse, Handle); // self ping response + hFunc(TEvControlPlaneConfig::TEvGetTenantInfoRequest, Handle); + ) + + void Handle(NActors::TEvents::TEvWakeup::TPtr&) { + if (!LoadInProgress) { + LoadTenantsAndMapping(); + } + Schedule(DbReloadPeriod, new NActors::TEvents::TEvWakeup()); + } + + void Handle(TEvControlPlaneStorage::TEvGetTaskResponse::TPtr& ev) { + if (ev->Get()->Issues) { + CPC_LOG_E("TEvGetTaskResponse (Self Ping): " << ev->Get()->Issues.ToOneLineString()); + } else if (ev->Get()->Record.tasks().size()) { + CPC_LOG_E("TEvGetTaskResponse (Self Ping) returned : " << ev->Get()->Record.tasks().size() << " tasks, empty list expected"); + } + } + + void Handle(TEvControlPlaneConfig::TEvGetTenantInfoRequest::TPtr& ev) { + if (!TenantInfo) { + CPC_LOG_W("TEvGetTenantInfoRequest: IS NOT READY yet"); + } + Send(ev->Sender, new TEvControlPlaneConfig::TEvGetTenantInfoResponse(TenantInfo), 0, ev->Cookie); + } + + void LoadTenantsAndMapping() { + + LoadInProgress = true; + TDbExecutable::TPtr executable; + auto& executer = TTenantExecuter::Create(executable, true, [](TTenantExecuter& executer) { executer.State.reset(new TTenantInfo()); } ); + + executer.Read( + [=](TTenantExecuter&, TSqlQueryBuilder& builder) { + builder.AddText( + "SELECT `" TENANT_COLUMN_NAME "`, `" VTENANT_COLUMN_NAME "`, `" COMMON_COLUMN_NAME "`, `" STATE_COLUMN_NAME "`, `" STATE_TIME_COLUMN_NAME "`\n" + "FROM `" TENANTS_TABLE_NAME "`;\n" + "SELECT `" SUBJECT_TYPE_COLUMN_NAME "`, `" SUBJECT_ID_COLUMN_NAME "`, `" VTENANT_COLUMN_NAME "`\n" + "FROM `" MAPPINGS_TABLE_NAME "`;\n" + ); + }, + [=](TTenantExecuter& executer, const TVector<NYdb::TResultSet>& resultSets) { + + auto& info = *executer.State; + + { + info.CommonVTenants.clear(); + TResultSetParser parser(resultSets.front()); + while (parser.TryNextRow()) { + auto tenant = *parser.ColumnParser(TENANT_COLUMN_NAME).GetOptionalString(); + auto vtenantColumn = parser.ColumnParser(VTENANT_COLUMN_NAME).GetOptionalString(); + TString vtenant = vtenantColumn ? *vtenantColumn : ""; + auto common = *parser.ColumnParser(COMMON_COLUMN_NAME).GetOptionalBool(); + auto state = *parser.ColumnParser(STATE_COLUMN_NAME).GetOptionalUint32(); + auto stateTime = *parser.ColumnParser(STATE_TIME_COLUMN_NAME).GetOptionalTimestamp(); + + info.TenantState.emplace(tenant, state); + if (info.StateTime < stateTime) { + info.StateTime = stateTime; + } + + if (vtenant) { + info.TenantMapping.emplace(vtenant, tenant); + } + + if (vtenant && common) { + info.CommonVTenants.push_back(vtenant); + } + } + } + + { + TResultSetParser parser(resultSets[1]); + while (parser.TryNextRow()) { + auto subject_type = *parser.ColumnParser(SUBJECT_TYPE_COLUMN_NAME).GetOptionalString(); + auto subject_id = *parser.ColumnParser(SUBJECT_ID_COLUMN_NAME).GetOptionalString(); + auto vtenant = *parser.ColumnParser(VTENANT_COLUMN_NAME).GetOptionalString(); + info.SubjectMapping[subject_type].emplace(subject_id, vtenant); + } + } + }, + "ReadTenants", true + ).Process(SelfId(), + [=, this](TTenantExecuter& executer) { + if (executer.State->CommonVTenants.size()) { + std::sort(executer.State->CommonVTenants.begin(), executer.State->CommonVTenants.end()); + } + bool refreshed = !this->TenantInfo || (this->TenantInfo->StateTime < executer.State->StateTime); + auto oldInfo = this->TenantInfo; + this->TenantInfo = executer.State; + + if (refreshed) { + CPC_LOG_D("LOADED TenantInfo: State CHANGED at " << this->TenantInfo->StateTime); + } else { + CPC_LOG_T("LOADED TenantInfo: State NOT changed"); + } + + if (refreshed) { + // write ack only after all members are assigned + TDbExecutable::TPtr executable; + auto& executer = TStateTimeExecuter::Create(executable, true, nullptr); + executer.State = this->TenantInfo->StateTime; + + executer.Write( + [=, nodeId = this->SelfId().NodeId()](TStateTimeExecuter& executer, TSqlQueryBuilder& builder) { + builder.AddUint32("node_id", nodeId); + builder.AddTimestamp("state_time", executer.State); + builder.AddText( + "UPSERT INTO `" TENANT_ACKS_TABLE_NAME "` (`" NODE_ID_COLUMN_NAME "`, `" STATE_TIME_COLUMN_NAME "`)\n" + " VALUES ($node_id, $state_time);\n" + ); + }, + "WriteStateTime", true + ); + + if (oldInfo) { + executer.Process(SelfId(), + [this, oldInfo=oldInfo](TStateTimeExecuter&) { + this->ReflectTenantChanges(oldInfo); + } + ); + } + + Exec(DbPool, executable); + } + + LoadInProgress = false; + } + ); + + Exec(DbPool, executable); + } + + void ReflectTenantChanges(TTenantInfo::TPtr oldInfo) { + for (auto& p : TenantInfo->TenantState) { + auto& tenant = p.first; + auto state = p.second; + if (oldInfo->TenantState.Value(tenant, state) != state) { + CPC_LOG_D("Tenant " << tenant << " state CHANGED to " << state); + switch (state) { + case TenantState::Idle: + ReassignPending(tenant); + break; + } + } + } + } + + void ReassignPending(const TString& tenant) { + Fq::Private::GetTaskRequest request; + request.set_tenant(tenant); + request.set_owner_id("Self-Ping-To-Move-Jobs"); + request.set_host(HostName()); + auto event = std::make_unique<TEvControlPlaneStorage::TEvGetTaskRequest>(std::move(request)); + event->TenantInfo = TenantInfo; + Send(ControlPlaneStorageServiceActorId(), event.release()); + } +}; + +TActorId ControlPlaneConfigActorId() { + constexpr TStringBuf name = "FQCTLCFG"; + return NActors::TActorId(0, name); +} + +IActor* CreateControlPlaneConfigActor(const ::NYq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const NConfig::TControlPlaneStorageConfig& config, const ::NMonitoring::TDynamicCounterPtr& counters) { + return new TControlPlaneConfigActor(yqSharedResources, credProviderFactory, config, counters); +} + +} // namespace NYq diff --git a/ydb/core/yq/libs/control_plane_config/control_plane_config.h b/ydb/core/yq/libs/control_plane_config/control_plane_config.h new file mode 100644 index 00000000000..6894f6bc2e2 --- /dev/null +++ b/ydb/core/yq/libs/control_plane_config/control_plane_config.h @@ -0,0 +1,28 @@ +#pragma once + +#include <ydb/core/yq/libs/actors/logging/log.h> +#include <ydb/core/yq/libs/config/protos/control_plane_storage.pb.h> +#include <ydb/core/yq/libs/shared_resources/shared_resources.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> + + +#define CPC_LOG_D(s) \ + LOG_FQ_CONTROL_PLANE_CONFIG_DEBUG(s) +#define CPC_LOG_I(s) \ + LOG_FQ_CONTROL_PLANE_CONFIG_INFO(s) +#define CPC_LOG_W(s) \ + LOG_FQ_CONTROL_PLANE_CONFIG_WARN(s) +#define CPC_LOG_E(s) \ + LOG_FQ_CONTROL_PLANE_CONFIG_ERROR(s) +#define CPC_LOG_T(s) \ + LOG_FQ_CONTROL_PLANE_CONFIG_TRACE(s) + +namespace NYq { + +NActors::TActorId ControlPlaneConfigActorId(); + +NActors::IActor* CreateControlPlaneConfigActor(const ::NYq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const NConfig::TControlPlaneStorageConfig& config, const ::NMonitoring::TDynamicCounterPtr& counters); + +} // namespace NYq diff --git a/ydb/core/yq/libs/control_plane_config/events/CMakeLists.txt b/ydb/core/yq/libs/control_plane_config/events/CMakeLists.txt new file mode 100644 index 00000000000..b33491aad2e --- /dev/null +++ b/ydb/core/yq/libs/control_plane_config/events/CMakeLists.txt @@ -0,0 +1,22 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(libs-control_plane_config-events) +target_link_libraries(libs-control_plane_config-events PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-actors-interconnect + libs-control_plane_storage-proto + yq-libs-events + libs-quota_manager-events +) +target_sources(libs-control_plane_config-events PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_config/events/events.cpp +) diff --git a/ydb/core/yq/libs/control_plane_config/events/events.cpp b/ydb/core/yq/libs/control_plane_config/events/events.cpp new file mode 100644 index 00000000000..6c3d2603e7e --- /dev/null +++ b/ydb/core/yq/libs/control_plane_config/events/events.cpp @@ -0,0 +1 @@ +#include "events.h" diff --git a/ydb/core/yq/libs/control_plane_config/events/events.h b/ydb/core/yq/libs/control_plane_config/events/events.h new file mode 100644 index 00000000000..67ff3df811b --- /dev/null +++ b/ydb/core/yq/libs/control_plane_config/events/events.h @@ -0,0 +1,68 @@ +#pragma once + +#include <ydb/core/yq/libs/events/event_subspace.h> +#include <ydb/core/yq/libs/quota_manager/events/events.h> + +#include <ydb/public/api/protos/yq.pb.h> + +#include <library/cpp/actors/core/event_pb.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/interconnect/events_local.h> + +#include <ydb/library/yql/public/issue/yql_issue.h> + +#include <util/digest/multi.h> + +namespace NYq { + +enum TenantState : ui64 { + Active = 0, + Pending = 1, + Idle = 2, +}; + +struct TTenantInfo { + + using TPtr = std::shared_ptr<TTenantInfo>; + + THashMap<TString /* subject type */, THashMap<TString /* subject id */, TString /* vtenant */>> SubjectMapping; + TVector<TString> CommonVTenants; + THashMap<TString /* vtenant */, TString /* tenant */> TenantMapping; + THashMap<TString /* tenant */, ui32 /* state */> TenantState; + TInstant StateTime; + + TString Assign(const TString& cloudId, const TString& /* scope */, const TString& DefaultTenantName = "") { + + auto vTenant = SubjectMapping[SUBJECT_TYPE_CLOUD].Value(cloudId, ""); + if (!vTenant && CommonVTenants.size()) { + vTenant = CommonVTenants[MultiHash(cloudId) % CommonVTenants.size()]; + } + + auto tenant = vTenant ? TenantMapping.Value(vTenant, DefaultTenantName) : DefaultTenantName; + // CPS_LOG_D("AssignTenantName: {" << cloudId << ", " << scope << "} => " << tenant); + // Cerr << "AssignTenantName: {" << cloudId << ", " << scope << "} => " << tenant << Endl; + return tenant; + } +}; + +struct TEvControlPlaneConfig { + // Event ids. + enum EEv : ui32 { + EvGetTenantInfoRequest = YqEventSubspaceBegin(NYq::TYqEventSubspace::ControlPlaneConfig), + EvGetTenantInfoResponse, + EvEnd, + }; + + static_assert(EvEnd <= YqEventSubspaceEnd(NYq::TYqEventSubspace::ControlPlaneConfig), "All events must be in their subspace"); + + struct TEvGetTenantInfoRequest : NActors::TEventLocal<TEvGetTenantInfoRequest, EvGetTenantInfoRequest> { + }; + + struct TEvGetTenantInfoResponse : NActors::TEventLocal<TEvGetTenantInfoResponse, EvGetTenantInfoResponse> { + TEvGetTenantInfoResponse(TTenantInfo::TPtr tenantInfo) : TenantInfo(tenantInfo) { + } + TTenantInfo::TPtr TenantInfo; + }; +}; + +} diff --git a/ydb/core/yq/libs/control_plane_proxy/CMakeLists.txt b/ydb/core/yq/libs/control_plane_proxy/CMakeLists.txt index 1fb348a7995..5b7328b0288 100644 --- a/ydb/core/yq/libs/control_plane_proxy/CMakeLists.txt +++ b/ydb/core/yq/libs/control_plane_proxy/CMakeLists.txt @@ -21,6 +21,7 @@ target_link_libraries(yq-libs-control_plane_proxy PUBLIC ydb-core-mon libs-actors-logging yq-libs-actors + yq-libs-control_plane_config libs-control_plane_proxy-events yq-libs-control_plane_storage ydb-library-folder_service diff --git a/ydb/core/yq/libs/control_plane_proxy/config.cpp b/ydb/core/yq/libs/control_plane_proxy/config.cpp index f89cf1e2019..afd948cc645 100644 --- a/ydb/core/yq/libs/control_plane_proxy/config.cpp +++ b/ydb/core/yq/libs/control_plane_proxy/config.cpp @@ -21,6 +21,10 @@ NConfig::TControlPlaneProxyConfig FillDefaultParameters(NConfig::TControlPlanePr config.SetMetricsTtl("1d"); } + if (!config.GetConfigRetryPeriod()) { + config.SetConfigRetryPeriod("100ms"); + } + return config; } @@ -30,6 +34,7 @@ TControlPlaneProxyConfig::TControlPlaneProxyConfig(const NConfig::TControlPlaneP : Proto(FillDefaultParameters(config)) , RequestTimeout(GetDuration(Proto.GetRequestTimeout(), TDuration::Seconds(30))) , MetricsTtl(GetDuration(Proto.GetMetricsTtl(), TDuration::Days(1))) + , ConfigRetryPeriod(GetDuration(Proto.GetConfigRetryPeriod(), TDuration::MilliSeconds(100))) { } diff --git a/ydb/core/yq/libs/control_plane_proxy/config.h b/ydb/core/yq/libs/control_plane_proxy/config.h index 8cb2d46dacf..af6342cfc12 100644 --- a/ydb/core/yq/libs/control_plane_proxy/config.h +++ b/ydb/core/yq/libs/control_plane_proxy/config.h @@ -10,6 +10,7 @@ struct TControlPlaneProxyConfig { NConfig::TControlPlaneProxyConfig Proto; TDuration RequestTimeout; TDuration MetricsTtl; + TDuration ConfigRetryPeriod; TControlPlaneProxyConfig(const NConfig::TControlPlaneProxyConfig& config); }; diff --git a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp index 57013f959b2..ebc34c2b6cf 100644 --- a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp +++ b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp @@ -5,6 +5,7 @@ #include <ydb/core/yq/libs/actors/logging/log.h> #include <ydb/core/yq/libs/common/cache.h> +#include <ydb/core/yq/libs/control_plane_config/control_plane_config.h> #include <ydb/core/yq/libs/control_plane_storage/control_plane_storage.h> #include <ydb/core/yq/libs/control_plane_storage/events/events.h> #include <ydb/core/yq/libs/control_plane_storage/util.h> @@ -16,6 +17,7 @@ #include <ydb/core/yq/libs/config/yq_issue.h> #include <ydb/core/yq/libs/control_plane_proxy/events/events.h> +#include <ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/actor.h> @@ -321,6 +323,7 @@ class TRequestActor : public NActors::TActorBootstrapped<TRequestActor<TRequestP using TBase::Send; using TBase::PassAway; using TBase::Become; + using TBase::Schedule; ::NYq::TControlPlaneProxyConfig Config; TRequestProto RequestProto; @@ -333,10 +336,11 @@ class TRequestActor : public NActors::TActorBootstrapped<TRequestActor<TRequestP TActorId ServiceId; TRequestCounters Counters; TInstant StartTime; - std::function<void(const TDuration&, bool, bool)> Probe; + std::function<void(const TDuration&, bool /* isSuccess */, bool /* isTimeout */)> Probe; TPermissions Permissions; TString CloudId; TQuotaMap Quotas; + ui32 RetryCount = 0; public: static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_REQUEST_ACTOR"; @@ -373,14 +377,30 @@ public: void Bootstrap() { CPP_LOG_T("Request actor. Actor id: " << SelfId()); Become(&TRequestActor::StateFunc, Config.RequestTimeout, new NActors::TEvents::TEvWakeup()); - Send(ServiceId, new TRequest(Scope, RequestProto, User, Token, CloudId, Permissions, Quotas), 0, Cookie); + Send(ControlPlaneConfigActorId(), new TEvControlPlaneConfig::TEvGetTenantInfoRequest()); } STRICT_STFUNC(StateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, HandleTimeout); hFunc(TResponse, Handle); + cFunc(TEvControlPlaneConfig::EvGetTenantInfoRequest, HandleRetry); + hFunc(TEvControlPlaneConfig::TEvGetTenantInfoResponse, Handle); ) + void HandleRetry() { + Send(ControlPlaneConfigActorId(), new TEvControlPlaneConfig::TEvGetTenantInfoRequest()); + } + + void Handle(TEvControlPlaneConfig::TEvGetTenantInfoResponse::TPtr& ev) { + auto tenantInfo = ev->Get()->TenantInfo; + if (tenantInfo) { + Send(ServiceId, new TRequest(Scope, RequestProto, User, Token, CloudId, Permissions, Quotas, tenantInfo), 0, Cookie); + } else { + RetryCount++; + Schedule(Now() + Config.ConfigRetryPeriod * (1 << RetryCount), new TEvControlPlaneConfig::TEvGetTenantInfoRequest()); + } + } + void HandleTimeout() { CPP_LOG_D("Request timeout. " << RequestProto.DebugString()); NYql::TIssues issues; diff --git a/ydb/core/yq/libs/control_plane_proxy/events/events.h b/ydb/core/yq/libs/control_plane_proxy/events/events.h index bf0d956865f..cb6279d2871 100644 --- a/ydb/core/yq/libs/control_plane_proxy/events/events.h +++ b/ydb/core/yq/libs/control_plane_proxy/events/events.h @@ -70,13 +70,15 @@ struct TEvControlPlaneProxy { const TString& user, const TString& token, const TVector<TString>& permissions, - const TQuotaMap& quotas = {}) + const TQuotaMap& quotas = {}, + TTenantInfo::TPtr tenantInfo = nullptr) : FolderId(folderId) , Request(request) , User(user) , Token(token) , Permissions(permissions) , Quotas(quotas) + , TenantInfo(tenantInfo) { } @@ -87,6 +89,7 @@ struct TEvControlPlaneProxy { TString Token; TVector<TString> Permissions; TQuotaMap Quotas; + TTenantInfo::TPtr TenantInfo; }; template<typename TDerived, typename ProtoMessage, ui32 EventType> diff --git a/ydb/core/yq/libs/control_plane_storage/events/events.h b/ydb/core/yq/libs/control_plane_storage/events/events.h index a142d1b565d..99a570b52a5 100644 --- a/ydb/core/yq/libs/control_plane_storage/events/events.h +++ b/ydb/core/yq/libs/control_plane_storage/events/events.h @@ -14,6 +14,7 @@ #include <ydb/library/yql/public/issue/yql_issue.h> +#include <ydb/core/yq/libs/control_plane_config/events/events.h> #include <ydb/core/yq/libs/control_plane_storage/proto/yq_internal.pb.h> #include <ydb/core/yq/libs/events/event_subspace.h> #include <ydb/core/yq/libs/quota_manager/events/events.h> @@ -214,7 +215,8 @@ struct TEvControlPlaneStorage { const TString& token, const TString& cloudId, TPermissions permissions, - const TQuotaMap& quotas) + const TQuotaMap& quotas, + TTenantInfo::TPtr tenantInfo) : Scope(scope) , Request(request) , User(user) @@ -222,6 +224,7 @@ struct TEvControlPlaneStorage { , CloudId(cloudId) , Permissions(permissions) , Quotas(quotas) + , TenantInfo(tenantInfo) { } @@ -241,6 +244,7 @@ struct TEvControlPlaneStorage { TString CloudId; TPermissions Permissions; TQuotaMap Quotas; + TTenantInfo::TPtr TenantInfo; }; template<typename TDerived, typename ProtoMessage, ui32 EventType> @@ -417,6 +421,7 @@ struct TEvControlPlaneStorage { } Fq::Private::GetTaskRequest Request; + TTenantInfo::TPtr TenantInfo; }; struct TEvGetTaskResponse : NActors::TEventLocal<TEvGetTaskResponse, EvGetTaskResponse> { @@ -480,6 +485,7 @@ struct TEvControlPlaneStorage { } Fq::Private::PingTaskRequest Request; + TTenantInfo::TPtr TenantInfo; }; struct TEvPingTaskResponse : NActors::TEventLocal<TEvPingTaskResponse, EvPingTaskResponse> { diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp index e5860b8c724..79f18095922 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp @@ -16,13 +16,15 @@ namespace { struct TTaskInternal { TTask Task; TRetryLimiter RetryLimiter; - bool ShouldAbortTask = false; + bool ShouldAbortTask = false; // force ABORTED_BY_SYSTEM + bool ShouldSkipTask = false; // tenant fetch denied or tenant must be changed TString TablePathPrefix; TString Owner; TString HostName; TMaybe<YandexQuery::Job> Job; TInstant Deadline; TString TenantName; + TString NewTenantName; }; TString GetServiceAccountId(const YandexQuery::IamAuth& auth) { @@ -56,38 +58,88 @@ struct TTaskInternal { } std::pair<TString, NYdb::TParams> MakeSql(const TTaskInternal& taskInternal, const TInstant& nowTimestamp, const TInstant& taskLeaseUntil) { - const auto& task = taskInternal.Task; - TSqlQueryBuilder queryBuilder(taskInternal.TablePathPrefix, "GetTask(write)"); - queryBuilder.AddString("tenant", taskInternal.TenantName); - queryBuilder.AddString("scope", task.Scope); - queryBuilder.AddString("query_id", task.QueryId); - queryBuilder.AddString("query", task.Query.SerializeAsString()); - queryBuilder.AddString("internal", task.Internal.SerializeAsString()); - queryBuilder.AddString("host", taskInternal.HostName); - queryBuilder.AddString("owner", taskInternal.Owner); - queryBuilder.AddTimestamp("now", nowTimestamp); - queryBuilder.AddTimestamp("ttl", taskLeaseUntil); - queryBuilder.AddUint64("retry_counter", taskInternal.RetryLimiter.RetryCount); - queryBuilder.AddUint64("generation", task.Generation); - queryBuilder.AddTimestamp("retry_counter_update_time", taskInternal.RetryLimiter.RetryCounterUpdatedAt); - queryBuilder.AddDouble("retry_rate", taskInternal.RetryLimiter.RetryRate); - // update queries - queryBuilder.AddText( - "UPDATE `" QUERIES_TABLE_NAME "` SET `" GENERATION_COLUMN_NAME "` = $generation, `" QUERY_COLUMN_NAME "` = $query, `" INTERNAL_COLUMN_NAME "` = $internal\n" - "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" - ); + if (taskInternal.ShouldSkipTask) { + + if (taskInternal.NewTenantName) { + const auto& task = taskInternal.Task; + TSqlQueryBuilder queryBuilder(taskInternal.TablePathPrefix, "GetTask(move)"); + queryBuilder.AddString("tenant", taskInternal.TenantName); + queryBuilder.AddString("new_tenant", taskInternal.NewTenantName); + queryBuilder.AddString("scope", task.Scope); + queryBuilder.AddString("query_id", task.QueryId); + queryBuilder.AddInt64("query_type", task.Query.content().type()); + queryBuilder.AddString("query", task.Query.SerializeAsString()); + queryBuilder.AddString("internal", task.Internal.SerializeAsString()); + queryBuilder.AddTimestamp("now", nowTimestamp); + queryBuilder.AddTimestamp("zero_timestamp", TInstant::Zero()); + queryBuilder.AddUint64("retry_counter", taskInternal.RetryLimiter.RetryCount); + queryBuilder.AddUint64("generation", task.Generation); + queryBuilder.AddTimestamp("retry_counter_update_time", taskInternal.RetryLimiter.RetryCounterUpdatedAt); + queryBuilder.AddDouble("retry_rate", taskInternal.RetryLimiter.RetryRate); + + // update queries + queryBuilder.AddText( + "UPDATE `" QUERIES_TABLE_NAME "` SET `" GENERATION_COLUMN_NAME "` = $generation, `" QUERY_COLUMN_NAME "` = $query, `" INTERNAL_COLUMN_NAME "` = $internal, `" TENANT_COLUMN_NAME "` = $new_tenant\n" + "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" + ); - // update pending small - queryBuilder.AddText( - "UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl,\n" - "`" RETRY_COUNTER_COLUMN_NAME "` = $retry_counter, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "` = $retry_counter_update_time, `" RETRY_RATE_COLUMN_NAME "` = $retry_rate,\n" - "`" HOST_NAME_COLUMN_NAME "` = $host, `" OWNER_COLUMN_NAME "` = $owner\n" - "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" - ); + // delete old record + queryBuilder.AddText( + "DELETE FROM `" PENDING_SMALL_TABLE_NAME "`\n" + "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" + ); - const auto query = queryBuilder.Build(); - return std::make_pair(query.Sql, query.Params); + // insert for new tenant + queryBuilder.AddText( + "UPSERT INTO `" PENDING_SMALL_TABLE_NAME "`\n" + "(`" TENANT_COLUMN_NAME "`, `" SCOPE_COLUMN_NAME "`, `" QUERY_ID_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`, `" LAST_SEEN_AT_COLUMN_NAME "`, `" ASSIGNED_UNTIL_COLUMN_NAME "`,\n" + "`" RETRY_RATE_COLUMN_NAME "`, `" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" HOST_NAME_COLUMN_NAME "`, `" OWNER_COLUMN_NAME "`)\n" + "VALUES\n" + " ($new_tenant, $scope, $query_id, $query_type, $now, $zero_timestamp, $retry_rate, $retry_counter, $retry_counter_update_time, \"\", \"\");" + ); + + const auto query = queryBuilder.Build(); + return std::make_pair(query.Sql, query.Params); + + } else { + return std::make_pair("", NYdb::TParamsBuilder().Build()); + } + + } else { + const auto& task = taskInternal.Task; + TSqlQueryBuilder queryBuilder(taskInternal.TablePathPrefix, "GetTask(write)"); + queryBuilder.AddString("tenant", taskInternal.TenantName); + queryBuilder.AddString("scope", task.Scope); + queryBuilder.AddString("query_id", task.QueryId); + queryBuilder.AddString("query", task.Query.SerializeAsString()); + queryBuilder.AddString("internal", task.Internal.SerializeAsString()); + queryBuilder.AddString("host", taskInternal.HostName); + queryBuilder.AddString("owner", taskInternal.Owner); + queryBuilder.AddTimestamp("now", nowTimestamp); + queryBuilder.AddTimestamp("ttl", taskLeaseUntil); + queryBuilder.AddUint64("retry_counter", taskInternal.RetryLimiter.RetryCount); + queryBuilder.AddUint64("generation", task.Generation); + queryBuilder.AddTimestamp("retry_counter_update_time", taskInternal.RetryLimiter.RetryCounterUpdatedAt); + queryBuilder.AddDouble("retry_rate", taskInternal.RetryLimiter.RetryRate); + + // update queries + queryBuilder.AddText( + "UPDATE `" QUERIES_TABLE_NAME "` SET `" GENERATION_COLUMN_NAME "` = $generation, `" QUERY_COLUMN_NAME "` = $query, `" INTERNAL_COLUMN_NAME "` = $internal\n" + "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" + ); + + // update pending small + queryBuilder.AddText( + "UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl,\n" + "`" RETRY_COUNTER_COLUMN_NAME "` = $retry_counter, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "` = $retry_counter_update_time, `" RETRY_RATE_COLUMN_NAME "` = $retry_rate,\n" + "`" HOST_NAME_COLUMN_NAME "` = $host, `" OWNER_COLUMN_NAME "` = $owner\n" + "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" + ); + + const auto query = queryBuilder.Build(); + return std::make_pair(query.Sql, query.Params); + } } } // namespace @@ -99,7 +151,8 @@ std::tuple<TString, NYdb::TParams, std::function<std::pair<TString, NYdb::TParam const TInstant& taskLeaseUntil, bool disableCurrentIam, const TDuration& automaticQueriesTtl, - const TDuration& resultSetsTtl) + const TDuration& resultSetsTtl, + std::shared_ptr<TTenantInfo> tenantInfo) { const auto& task = taskInternal.Task; @@ -118,7 +171,7 @@ std::tuple<TString, NYdb::TParams, std::function<std::pair<TString, NYdb::TParam "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id AND `" ASSIGNED_UNTIL_COLUMN_NAME "` < $now;\n" ); - auto prepareParams = [=, taskInternal=taskInternal, responseTasks=responseTasks](const TVector<TResultSet>& resultSets) mutable { + auto prepareParams = [=, taskInternal=taskInternal, responseTasks=responseTasks, tenantInfo=tenantInfo](const TVector<TResultSet>& resultSets) mutable { auto& task = taskInternal.Task; const auto shouldAbortTask = taskInternal.ShouldAbortTask; constexpr size_t expectedResultSetsSize = 2; @@ -155,8 +208,23 @@ std::tuple<TString, NYdb::TParams, std::function<std::pair<TString, NYdb::TParam task.Query.mutable_meta()->set_status(YandexQuery::QueryMeta::ABORTING_BY_SYSTEM); } - responseTasks->AddTaskBlocking(task.QueryId, task); + if (tenantInfo->TenantState.Value(taskInternal.TenantName, TenantState::Active) != TenantState::Active) { + // tenant graceful shutdown, task fetch prohibited + taskInternal.ShouldSkipTask = true; + } + if (tenantInfo) { + auto tenant = tenantInfo->Assign(taskInternal.Task.Internal.cloud_id(), task.Scope, taskInternal.TenantName); + if (tenant != taskInternal.TenantName) { + // mapping changed, reassign tenant + taskInternal.ShouldSkipTask = true; + taskInternal.NewTenantName = tenant; + } + } + + if (!taskInternal.ShouldSkipTask) { + responseTasks->AddTaskBlocking(task.QueryId, task); + } return MakeSql(taskInternal, nowTimestamp, taskLeaseUntil); }; @@ -217,8 +285,13 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ CPS_LOG_T("GetTaskRequest: {" << request.DebugString() << "}"); NYql::TIssues issues = ValidateGetTask(owner, hostName); + + if (!ev->Get()->TenantInfo) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::NOT_READY, "Control Plane is not ready yet. Please retry later.")); + } + if (issues) { - CPS_LOG_W("GetTaskRequest: {" << request.DebugString() << "} validation FAILED: " << issues.ToOneLineString()); + CPS_LOG_W("GetTaskRequest: {" << request.DebugString() << "} FAILED: " << issues.ToOneLineString()); const TDuration delta = TInstant::Now() - startTime; SendResponseIssues<TEvControlPlaneStorage::TEvGetTaskResponse>(ev->Sender, issues, ev->Cookie, delta, requestCounters); LWPROBE(GetTaskRequest, owner, hostName, delta, false); @@ -243,7 +316,11 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ auto responseTasks = std::make_shared<TResponseTasks>(); - auto prepareParams = [=, rootCounters=Counters.Counters, actorSystem=NActors::TActivationContext::ActorSystem(), responseTasks=responseTasks](const TVector<TResultSet>& resultSets) mutable { + auto prepareParams = [=, rootCounters=Counters.Counters, + actorSystem=NActors::TActivationContext::ActorSystem(), + responseTasks=responseTasks, + tenantInfo=ev->Get()->TenantInfo + ](const TVector<TResultSet>& resultSets) mutable { TVector<TTaskInternal> tasks; TVector<TPickTaskParams> pickTaskParams; const auto now = TInstant::Now(); @@ -290,7 +367,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ for (size_t i = 0; i < numTasks; ++i) { auto tupleParams = MakeGetTaskUpdateQuery(tasks[i], responseTasks, now, now + Config.TaskLeaseTtl, Config.Proto.GetDisableCurrentIam(), - Config.AutomaticQueriesTtl, Config.ResultSetsTtl); // using for win32 build + Config.AutomaticQueriesTtl, Config.ResultSetsTtl, tenantInfo); // using for win32 build auto readQuery = std::get<0>(tupleParams); auto readParams = std::get<1>(tupleParams); auto prepareParams = std::get<2>(tupleParams); diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp index bc4544042e6..c0d28bdfca2 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp @@ -455,10 +455,17 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq const TString queryId = request.query_id().value(); const TString owner = request.owner_id(); const TInstant deadline = NProtoInterop::CastFromProto(request.deadline()); + const TString tenant = request.tenant(); CPS_LOG_T("PingTaskRequest: {" << request.DebugString() << "}"); NYql::TIssues issues = ValidatePingTask(scope, queryId, owner, deadline, Config.ResultSetsTtl); + + auto tenantInfo = ev->Get()->TenantInfo; + if (tenantInfo && tenantInfo->TenantState.Value(tenant, TenantState::Active) == TenantState::Idle) { + issues.AddIssue("Tenant is idle, no processing is allowed"); + } + if (issues) { CPS_LOG_W("PingTaskRequest: {" << request.DebugString() << "} validation FAILED: " << issues.ToOneLineString()); const TDuration delta = TInstant::Now() - startTime; diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders.h b/ydb/core/yq/libs/control_plane_storage/message_builders.h index 9b92c7c14f5..27578bd843d 100644 --- a/ydb/core/yq/libs/control_plane_storage/message_builders.h +++ b/ydb/core/yq/libs/control_plane_storage/message_builders.h @@ -975,6 +975,7 @@ class TGetTaskBuilder { TString Owner; TString HostName; TString TenantName; + NYq::TTenantInfo::TPtr TenantInfo; public: TGetTaskBuilder() @@ -982,6 +983,7 @@ public: SetOwner(DefaultOwner()); SetHostName("localhost"); SetTenantName("/root/tenant"); + SetTenantInfo(std::make_shared<NYq::TTenantInfo>()); } static TString DefaultOwner() { @@ -1006,12 +1008,19 @@ public: return *this; } + TGetTaskBuilder& SetTenantInfo(NYq::TTenantInfo::TPtr tenantInfo) + { + TenantInfo = tenantInfo; + return *this; + } + std::unique_ptr<NYq::TEvControlPlaneStorage::TEvGetTaskRequest> Build() { auto request = std::make_unique<NYq::TEvControlPlaneStorage::TEvGetTaskRequest>(); request->Request.set_tenant(TenantName); request->Request.set_owner_id(Owner); request->Request.set_host(HostName); + request->TenantInfo = TenantInfo; return request; } }; @@ -1038,12 +1047,14 @@ class TPingTaskBuilder { TVector<NYq::TEvControlPlaneStorage::TTopicConsumer> CreatedTopicConsumers; TVector<TString> DqGraphs; i32 DqGraphIndex = 0; + NYq::TTenantInfo::TPtr TenantInfo; public: TPingTaskBuilder() { SetDeadline(TInstant::Now() + TDuration::Minutes(5)); SetTenantName("/root/tenant"); + SetTenantInfo(std::make_shared<NYq::TTenantInfo>()); } TPingTaskBuilder& SetTenantName(const TString& tenantName) @@ -1172,7 +1183,13 @@ public: return *this; } - std::unique_ptr<NYq::TEvControlPlaneStorage::TEvPingTaskRequest> Build() + TPingTaskBuilder& SetTenantInfo(NYq::TTenantInfo::TPtr tenantInfo) + { + TenantInfo = tenantInfo; + return *this; + } + +std::unique_ptr<NYq::TEvControlPlaneStorage::TEvPingTaskRequest> Build() { Fq::Private::PingTaskRequest request; request.set_owner_id(Owner); @@ -1230,7 +1247,9 @@ public: *request.mutable_finished_at() = NProtoInterop::CastToProto(*FinishedAt); } - return std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request)); + auto pingRequest = std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request)); + pingRequest->TenantInfo = TenantInfo; + return pingRequest; } }; diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h b/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h index bee788a83f1..ae5f68c56d8 100644 --- a/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h +++ b/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h @@ -975,6 +975,7 @@ class TGetTaskBuilder { TString Owner; TString HostName; TString TenantName; + NYq::TTenantInfo::TPtr TenantInfo; public: TGetTaskBuilder() @@ -982,6 +983,7 @@ public: SetOwner(DefaultOwner()); SetHostName("localhost"); SetTenantName("/root/tenant"); + SetTenantInfo(std::make_shared<NYq::TTenantInfo>()); } static TString DefaultOwner() { @@ -1006,12 +1008,19 @@ public: return *this; } + TGetTaskBuilder& SetTenantInfo(NYq::TTenantInfo::TPtr tenantInfo) + { + TenantInfo = tenantInfo; + return *this; + } + std::unique_ptr<TEvControlPlaneStorage::TEvGetTaskRequest> Build() { auto request = std::make_unique<TEvControlPlaneStorage::TEvGetTaskRequest>(); request->Request.set_tenant(TenantName); request->Request.set_owner_id(Owner); request->Request.set_host(HostName); + request->TenantInfo = TenantInfo; return request; } }; diff --git a/ydb/core/yq/libs/control_plane_storage/schema.h b/ydb/core/yq/libs/control_plane_storage/schema.h index 33b9ede7a45..5dcde8db98d 100644 --- a/ydb/core/yq/libs/control_plane_storage/schema.h +++ b/ydb/core/yq/libs/control_plane_storage/schema.h @@ -12,6 +12,9 @@ namespace NYq { #define JOBS_TABLE_NAME "jobs" #define NODES_TABLE_NAME "nodes" #define QUOTAS_TABLE_NAME "quotas" +#define TENANTS_TABLE_NAME "tenants" +#define TENANT_ACKS_TABLE_NAME "tenant_acks" +#define MAPPINGS_TABLE_NAME "mappings" // columns #define SCOPE_COLUMN_NAME "scope" @@ -79,4 +82,9 @@ namespace NYq { #define METRIC_USAGE_COLUMN_NAME "metric_usage" #define USAGE_UPDATED_AT_COLUMN_NAME "usage_updated_at" +#define VTENANT_COLUMN_NAME "vtenant" +#define COMMON_COLUMN_NAME "common" +#define STATE_COLUMN_NAME "state" +#define STATE_TIME_COLUMN_NAME "state_time" + } // namespace NYq diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp index 3d099b0a6f4..ea0418a0efc 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -38,6 +38,7 @@ void TYdbControlPlaneStorageActor::Bootstrap() { YdbConnection = NewYdbConnection(Config.Proto.GetStorage(), CredProviderFactory, YqSharedResources->CoreYdbDriver); DbPool = YqSharedResources->DbPoolHolder->GetOrCreate(EDbPoolId::MAIN, 10, YdbConnection->TablePathPrefix); CreateDirectory(); + CreateQueriesTable(); CreatePendingSmallTable(); CreateConnectionsTable(); @@ -47,6 +48,10 @@ void TYdbControlPlaneStorageActor::Bootstrap() { CreateJobsTable(); CreateNodesTable(); CreateQuotasTable(); + CreateTenantsTable(); + CreateTenantAcksTable(); + CreateMappingsTable(); + Become(&TThis::StateFunc); } @@ -54,7 +59,11 @@ void TYdbControlPlaneStorageActor::Bootstrap() { * Creating tables */ void TYdbControlPlaneStorageActor::RunCreateTableActor(const TString& path, NYdb::NTable::TTableDescription desc) { - Register(MakeCreateTableActor({}, NKikimrServices::YQ_CONTROL_PLANE_STORAGE, YdbConnection, path, std::move(desc), MakeCreateSchemaRetryPolicy())); + Register(MakeCreateTableActor(SelfId(), NKikimrServices::YQ_CONTROL_PLANE_STORAGE, YdbConnection, path, std::move(desc), MakeCreateSchemaRetryPolicy())); +} + +void TYdbControlPlaneStorageActor::Handle(TEvents::TEvSchemaCreated::TPtr&) { + // skip for now } void TYdbControlPlaneStorageActor::CreateQueriesTable() @@ -250,6 +259,53 @@ void TYdbControlPlaneStorageActor::CreateQuotasTable() RunCreateTableActor(tablePath, TTableDescription(description)); } +void TYdbControlPlaneStorageActor::CreateTenantsTable() +{ + auto tablePath = JoinPath(YdbConnection->TablePathPrefix, TENANTS_TABLE_NAME); + + auto description = TTableBuilder() + .AddNullableColumn(TENANT_COLUMN_NAME, EPrimitiveType::String) + .AddNullableColumn(VTENANT_COLUMN_NAME, EPrimitiveType::String) + .AddNullableColumn(COMMON_COLUMN_NAME, EPrimitiveType::Bool) + .AddNullableColumn(STATE_COLUMN_NAME, EPrimitiveType::Uint32) + .AddNullableColumn(STATE_TIME_COLUMN_NAME, EPrimitiveType::Timestamp) + .SetPrimaryKeyColumns({TENANT_COLUMN_NAME}) + .Build(); + + RunCreateTableActor(tablePath, TTableDescription(description)); +} + +void TYdbControlPlaneStorageActor::CreateTenantAcksTable() +{ + auto tablePath = JoinPath(YdbConnection->TablePathPrefix, TENANT_ACKS_TABLE_NAME); + + auto description = TTableBuilder() + .AddNullableColumn(NODE_ID_COLUMN_NAME, EPrimitiveType::Uint32) + .AddNullableColumn(STATE_TIME_COLUMN_NAME, EPrimitiveType::Timestamp) + .SetPrimaryKeyColumns({NODE_ID_COLUMN_NAME}) + .Build(); + + RunCreateTableActor(tablePath, TTableDescription(description)); +} + +void TYdbControlPlaneStorageActor::CreateMappingsTable() +{ + auto tablePath = JoinPath(YdbConnection->TablePathPrefix, MAPPINGS_TABLE_NAME); + + auto description = TTableBuilder() + .AddNullableColumn(SUBJECT_TYPE_COLUMN_NAME, EPrimitiveType::String) + .AddNullableColumn(SUBJECT_ID_COLUMN_NAME, EPrimitiveType::String) + .AddNullableColumn(VTENANT_COLUMN_NAME, EPrimitiveType::String) + .SetPrimaryKeyColumns({SUBJECT_TYPE_COLUMN_NAME, SUBJECT_ID_COLUMN_NAME}) + .Build(); + + RunCreateTableActor(tablePath, TTableDescription(description)); +} + +void TYdbControlPlaneStorageActor::AfterTablesCreated() { + // Schedule(TDuration::Zero(), new NActors::TEvents::TEvWakeup()); +} + bool TYdbControlPlaneStorageActor::IsSuperUser(const TString& user) { return AnyOf(Config.Proto.GetSuperUsers(), [&user](const auto& superUser) { diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h index f91618ecf3b..81f6fa4ccd0 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h @@ -523,6 +523,7 @@ public: hFunc(TEvQuotaService::TQuotaUsageRequest, Handle); hFunc(TEvQuotaService::TQuotaLimitChangeRequest, Handle); hFunc(TEvents::TEvCallback, [](TEvents::TEvCallback::TPtr& ev) { ev->Get()->Callback(); } ); + hFunc(TEvents::TEvSchemaCreated, Handle); ) void Handle(TEvControlPlaneStorage::TEvCreateQueryRequest::TPtr& ev); @@ -556,6 +557,8 @@ public: void Handle(TEvControlPlaneStorage::TEvNodesHealthCheckRequest::TPtr& ev); + void Handle(TEvents::TEvSchemaCreated::TPtr& ev); + void Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev); void Handle(TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev); @@ -613,8 +616,12 @@ public: void CreateIdempotencyKeysTable(); void CreateResultSetsTable(); void CreateQuotasTable(); + void CreateTenantsTable(); + void CreateTenantAcksTable(); + void CreateMappingsTable(); void RunCreateTableActor(const TString& path, NYdb::NTable::TTableDescription desc); + void AfterTablesCreated(); private: /* @@ -892,8 +899,6 @@ private: std::shared_ptr<TResponseTasks> responseTasks, const TVector<TValidationQuery>& validators = {}, TTxSettings transactionMode = TTxSettings::SerializableRW()); - - TString AssignTenantName(const TString& cloudId, const TString& scope); }; } diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index 8a175b4f555..2a6752f6505 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -45,35 +45,6 @@ YandexQuery::IamAuth::IdentityCase GetIamAuth(const YandexQuery::Connection& con namespace NYq { -TString TYdbControlPlaneStorageActor::AssignTenantName(const TString& cloudId, const TString& scope) { - const auto& mapping = Config.Proto.GetMapping(); - - if (scope) { - for (const auto& scopeToTenant: mapping.GetScopeToTenantName()) { - if (scopeToTenant.GetKey() == scope) { - return scopeToTenant.GetValue(); - } - } - } - - if (cloudId) { - for (const auto& cloudToTenant: mapping.GetCloudIdToTenantName()) { - if (cloudToTenant.GetKey() == cloudId) { - return cloudToTenant.GetValue(); - } - } - } - - auto size = mapping.CommonTenantNameSize(); - - if (size) { - auto index = MultiHash(cloudId, scope) % size; - return mapping.GetCommonTenantName(index); - } else { - return TenantName; - } -} - void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQueryRequest::TPtr& ev) { TInstant startTime = TInstant::Now(); @@ -124,6 +95,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Streaming disposition \"from_last_checkpoint\" is not allowed in CreateQuery request")); } + auto tenant = ev->Get()->TenantInfo->Assign(cloudId, scope, TenantName); + { TQuotaMap::const_iterator it = event.Quotas.end(); if (queryType == YandexQuery::QueryContent::ANALYTICS) { @@ -290,7 +263,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery response->second.After.ConstructInPlace().CopyFrom(query); TSqlQueryBuilder writeQueryBuilder(YdbConnection->TablePathPrefix, "CreateQuery(write)"); - writeQueryBuilder.AddString("tenant", AssignTenantName(cloudId, scope)); + writeQueryBuilder.AddString("tenant", tenant); writeQueryBuilder.AddString("scope", scope); writeQueryBuilder.AddString("query_id", queryId); writeQueryBuilder.AddString("name", query.content().name()); @@ -325,9 +298,9 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery writeQueryBuilder.AddText( "INSERT INTO `" PENDING_SMALL_TABLE_NAME "`\n" "(`" TENANT_COLUMN_NAME "`, `" SCOPE_COLUMN_NAME "`, `" QUERY_ID_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`, `" LAST_SEEN_AT_COLUMN_NAME "`, `" ASSIGNED_UNTIL_COLUMN_NAME "`,\n" - "`" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" HOST_NAME_COLUMN_NAME "`, `" OWNER_COLUMN_NAME "`)\n" + "`" RETRY_RATE_COLUMN_NAME "`, `" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" HOST_NAME_COLUMN_NAME "`, `" OWNER_COLUMN_NAME "`)\n" "VALUES\n" - " ($tenant, $scope, $query_id, $query_type, $zero_timestamp, $zero_timestamp, 0, $now, \"\", \"\");" + " ($tenant, $scope, $query_id, $query_type, $zero_timestamp, $zero_timestamp, 0, 0, $now, \"\", \"\");" ); } @@ -778,6 +751,9 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery if (request.state_load_mode() == YandexQuery::FROM_LAST_CHECKPOINT) { issues.AddIssue(MakeErrorIssue(TIssuesIds::UNSUPPORTED, "State load mode \"FROM_LAST_CHECKPOINT\" is not supported")); } + + auto tenant = ev->Get()->TenantInfo->Assign(cloudId, scope, TenantName); + if (issues) { CPS_LOG_W("ModifyQueryRequest: {" << request.DebugString() << "} " << MakeUserInfo(user, token) << "validation FAILED: " << issues.ToOneLineString()); const TDuration delta = TInstant::Now() - startTime; @@ -991,7 +967,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery response->second.CloudId = internal.cloud_id(); TSqlQueryBuilder writeQueryBuilder(YdbConnection->TablePathPrefix, "ModifyQuery(write)"); - writeQueryBuilder.AddString("tenant", AssignTenantName(internal.cloud_id(), scope)); + writeQueryBuilder.AddString("tenant", tenant); writeQueryBuilder.AddString("scope", scope); writeQueryBuilder.AddString("query_id", queryId); writeQueryBuilder.AddUint64("max_count_jobs", Config.Proto.GetMaxCountJobs()); diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp index 0cfba053b2f..128301adc25 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp @@ -44,7 +44,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::T QueryQuotas.clear(); TDbExecutable::TPtr executable; - auto& executer = TQuotaCountExecuter::Create(executable, [](TQuotaCountExecuter& executer) { executer.State.clear(); } ); + auto& executer = TQuotaCountExecuter::Create(executable, false, [](TQuotaCountExecuter& executer) { executer.State.clear(); } ); executer.Read( [=](TQuotaCountExecuter&, TSqlQueryBuilder& builder) { @@ -76,11 +76,11 @@ void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::T executer.State[internal.cloud_id()] += count; } }, - "GetScopeCloud_" + scope + "GetScopeCloud_" + scope, true ); } }, - "GroupByPendingSmall" + "GroupByPendingSmall", true ).Process(SelfId(), [=, this](TQuotaCountExecuter& executer) { this->QuotasUpdatedAt = Now(); diff --git a/ydb/core/yq/libs/events/event_subspace.h b/ydb/core/yq/libs/events/event_subspace.h index e283d642539..ea0323aa8f4 100644 --- a/ydb/core/yq/libs/events/event_subspace.h +++ b/ydb/core/yq/libs/events/event_subspace.h @@ -29,6 +29,7 @@ struct TYqEventSubspace { InternalService, QuotaService, RateLimiter, + ControlPlaneConfig, SubspacesEnd, }; diff --git a/ydb/core/yq/libs/init/CMakeLists.txt b/ydb/core/yq/libs/init/CMakeLists.txt index bad3af568fe..11142bedfd2 100644 --- a/ydb/core/yq/libs/init/CMakeLists.txt +++ b/ydb/core/yq/libs/init/CMakeLists.txt @@ -23,6 +23,7 @@ target_link_libraries(yq-libs-init PUBLIC yq-libs-checkpoint_storage yq-libs-checkpointing yq-libs-common + yq-libs-control_plane_config yq-libs-control_plane_proxy yq-libs-control_plane_storage yq-libs-events diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 744828532ff..bfc26130148 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -5,6 +5,7 @@ #include <ydb/core/yq/libs/audit/yq_audit_service.h> #include <ydb/core/yq/libs/checkpoint_storage/storage_service.h> +#include <ydb/core/yq/libs/control_plane_config/control_plane_config.h> #include <ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.h> #include <ydb/core/yq/libs/health/health.h> #include <ydb/core/yq/libs/checkpoint_storage/storage_service.h> @@ -83,6 +84,11 @@ void Init( credentialsProviderFactory, tenant); actorRegistrator(NYq::ControlPlaneStorageServiceActorId(), controlPlaneStorage); + + actorRegistrator(NYq::ControlPlaneConfigActorId(), + CreateControlPlaneConfigActor(yqSharedResources, credentialsProviderFactory, protoConfig.GetControlPlaneStorage(), + appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "ControlPlaneConfig")) + ); } if (protoConfig.GetControlPlaneProxy().GetEnabled()) { diff --git a/ydb/core/yq/libs/private_client/loopback_service.cpp b/ydb/core/yq/libs/private_client/loopback_service.cpp index e47f7231b6e..4d211d20ea3 100644 --- a/ydb/core/yq/libs/private_client/loopback_service.cpp +++ b/ydb/core/yq/libs/private_client/loopback_service.cpp @@ -6,6 +6,8 @@ #include <ydb/core/protos/services.pb.h> +#include <ydb/core/yq/libs/control_plane_config/control_plane_config.h> +#include <ydb/core/yq/libs/control_plane_config/events/events.h> #include <ydb/core/yq/libs/control_plane_storage/control_plane_storage.h> #include <ydb/core/yq/libs/control_plane_storage/events/events.h> @@ -39,6 +41,7 @@ private: STRICT_STFUNC(StateFunc, hFunc(TEvInternalService::TEvHealthCheckRequest, Handle) hFunc(TEvInternalService::TEvGetTaskRequest, Handle) + hFunc(NYq::TEvControlPlaneConfig::TEvGetTenantInfoResponse, Handle) hFunc(TEvInternalService::TEvPingTaskRequest, Handle) hFunc(TEvInternalService::TEvWriteResultRequest, Handle) hFunc(TEvInternalService::TEvCreateRateLimiterResourceRequest, Handle) @@ -77,7 +80,20 @@ private: Cookie++; Senders[Cookie] = ev->Sender; auto request = ev->Get()->Request; - Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvGetTaskRequest(std::move(request)), 0, Cookie); + GetRequests.emplace(Cookie, std::move(request)); + Send(NYq::ControlPlaneConfigActorId(), new NYq::TEvControlPlaneConfig::TEvGetTenantInfoRequest(), 0, Cookie); + } + + void Handle(NYq::TEvControlPlaneConfig::TEvGetTenantInfoResponse::TPtr& ev) { + TenantInfo = ev->Get()->TenantInfo; + auto it = GetRequests.find(ev->Cookie); + if (it != GetRequests.end()) { + auto request = it->second; + GetRequests.erase(it); + auto event = std::make_unique<NYq::TEvControlPlaneStorage::TEvGetTaskRequest>(std::move(request)); + event->TenantInfo = TenantInfo; + Send(NYq::ControlPlaneStorageServiceActorId(), event.release(), 0, ev->Cookie); + } } void Handle(NYq::TEvControlPlaneStorage::TEvGetTaskResponse::TPtr& ev) { @@ -99,7 +115,9 @@ private: Senders[Cookie] = ev->Sender; OriginalCookies[Cookie] = ev->Cookie; auto request = ev->Get()->Request; - Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvPingTaskRequest(std::move(request)), 0, Cookie); + auto event = std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request)); + event->TenantInfo = TenantInfo; + Send(NYq::ControlPlaneStorageServiceActorId(), event.release(), 0, Cookie); } void Handle(NYq::TEvControlPlaneStorage::TEvPingTaskResponse::TPtr& ev) { @@ -185,6 +203,8 @@ private: ui64 Cookie = 0; THashMap<ui64, NActors::TActorId> Senders; THashMap<ui64, ui64> OriginalCookies; + THashMap<ui64, Fq::Private::GetTaskRequest> GetRequests; + NYq::TTenantInfo::TPtr TenantInfo; }; NActors::IActor* CreateLoopbackServiceActor( diff --git a/ydb/core/yq/libs/shared_resources/db_exec.h b/ydb/core/yq/libs/shared_resources/db_exec.h index da99cd7dbb6..1add5e82b3c 100644 --- a/ydb/core/yq/libs/shared_resources/db_exec.h +++ b/ydb/core/yq/libs/shared_resources/db_exec.h @@ -135,7 +135,6 @@ public: TCommitTransactionResult result = future.GetValue(); auto status = static_cast<TStatus>(result); - if (!status.IsSuccess()) { return MakeFuture(status); } else { diff --git a/ydb/core/yq/libs/test_connection/events/events.h b/ydb/core/yq/libs/test_connection/events/events.h index 7cbb61bdccc..ef2c4368c16 100644 --- a/ydb/core/yq/libs/test_connection/events/events.h +++ b/ydb/core/yq/libs/test_connection/events/events.h @@ -3,6 +3,7 @@ #include <ydb/public/api/protos/yq.pb.h> #include <ydb/core/yq/libs/events/event_subspace.h> +#include <ydb/core/yq/libs/control_plane_config/events/events.h> #include <ydb/core/yq/libs/control_plane_storage/events/events.h> #include <library/cpp/actors/core/event_pb.h> @@ -16,12 +17,12 @@ namespace NYq { struct TEvTestConnection { // Event ids. enum EEv : ui32 { - EvTestConnectionRequest = YqEventSubspaceBegin(NYq::TYqEventSubspace::TestConnection), + EvTestConnectionRequest = YqEventSubspaceBegin(::NYq::TYqEventSubspace::TestConnection), EvTestConnectionResponse, EvEnd, }; - static_assert(EvEnd <= YqEventSubspaceEnd(NYq::TYqEventSubspace::TestConnection), "All events must be in their subspace"); + static_assert(EvEnd <= YqEventSubspaceEnd(::NYq::TYqEventSubspace::TestConnection), "All events must be in their subspace"); struct TEvTestConnectionRequest : NActors::TEventLocal<TEvTestConnectionRequest, EvTestConnectionRequest> { explicit TEvTestConnectionRequest(const TString& scope, @@ -30,7 +31,8 @@ struct TEvTestConnection { const TString& token, const TString& cloudId, const TPermissions& permissions, - const TQuotaMap& quotas) + const TQuotaMap& quotas, + TTenantInfo::TPtr tenantInfo) : CloudId(cloudId) , Scope(scope) , Request(request) @@ -38,6 +40,7 @@ struct TEvTestConnection { , Token(token) , Permissions(permissions) , Quotas(quotas) + , TenantInfo(tenantInfo) { } @@ -48,6 +51,7 @@ struct TEvTestConnection { TString Token; TPermissions Permissions; const TQuotaMap Quotas; + TTenantInfo::TPtr TenantInfo; }; struct TEvTestConnectionResponse : NActors::TEventLocal<TEvTestConnectionResponse, EvTestConnectionResponse> { |