diff options
author | gvit <gvit@ydb.tech> | 2022-10-21 22:34:14 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2022-10-21 22:34:14 +0300 |
commit | bc055f986c600b51998110bd2d3e6a58a51a568d (patch) | |
tree | 74e6ada2a6820a3eedf6e850383d73d1d631d8a6 | |
parent | 1f1966ecbf022c02cbfd0afbbdb3c6aafec6fbb3 (diff) | |
download | ydb-bc055f986c600b51998110bd2d3e6a58a51a568d.tar.gz |
use tenant name from app data in kqp proxy
-rw-r--r-- | ydb/core/kqp/proxy/kqp_proxy_service.cpp | 69 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.cpp | 1 |
2 files changed, 17 insertions, 53 deletions
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp index b4898e7916..3ccce5cef4 100644 --- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp @@ -7,7 +7,6 @@ #include <ydb/core/base/statestorage.h> #include <ydb/core/cms/console/configs_dispatcher.h> #include <ydb/core/cms/console/console.h> -#include <ydb/core/mind/tenant_pool.h> #include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/core/kqp/common/kqp_lwtrace_probes.h> #include <ydb/core/kqp/common/kqp_timeouts.h> @@ -152,8 +151,6 @@ public: , KqpSettings(std::make_shared<const TKqpSettings>(std::move(settings))) , QueryReplayFactory(std::move(queryReplayFactory)) , PendingRequests() - , TenantsReady(false) - , Tenants() , ModuleResolverState() {} @@ -186,8 +183,6 @@ public: IEventHandle::FlagTrackDelivery); WhiteBoardService = NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()); - // Subscribe for tenant changes - Send(MakeTenantPoolRootID(), new TEvents::TEvSubscribe()); if (auto& cfg = TableServiceConfig.GetSpillingServiceConfig().GetLocalFileConfig(); cfg.GetEnable()) { SpillingService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpLocalFileSpillingService(cfg, Counters)); @@ -278,20 +273,19 @@ public: SendBoardPublishPoison(); SendWhiteboardRequest(); - if (Tenants.empty() || !SelfDataCenterId) { - KQP_PROXY_LOG_E("Cannot start publishing usage, tenants: " << Tenants.size() << ", " << SelfDataCenterId.value_or("empty")); + if (AppData()->TenantName.empty() || !SelfDataCenterId) { + KQP_PROXY_LOG_E("Cannot start publishing usage, tenants: " << AppData()->TenantName << ", " << SelfDataCenterId.value_or("empty")); return; } - const TString& database = *Tenants.begin(); - auto groupId = GetDefaultStateStorageGroupId(database); + auto groupId = GetDefaultStateStorageGroupId(AppData()->TenantName); if (!groupId) { - KQP_PROXY_LOG_D("Unable to determine default state storage group id for database " << database); + KQP_PROXY_LOG_D("Unable to determine default state storage group id for database " << AppData()->TenantName); return; } NodeResources.SetActiveWorkersCount(LocalSessions->size()); - PublishBoardPath = MakeKqpProxyBoardPath(database); + PublishBoardPath = MakeKqpProxyBoardPath(AppData()->TenantName); auto actor = CreateBoardPublishActor(PublishBoardPath, NodeResources.SerializeAsString(), SelfId(), *groupId, 0, true); BoardPublishActor = Register(actor); LastPublishResourcesAt = TAppData::TimeProvider->Now(); @@ -329,27 +323,6 @@ public: return TActor::PassAway(); } - void Handle(TEvTenantPool::TEvTenantPoolStatus::TPtr& ev) { - const auto &event = ev->Get()->Record; - - TenantsReady = true; - Tenants.clear(); - for (auto &slot : event.GetSlots()) { - Tenants.insert(slot.GetAssignedTenant()); - } - - KQP_PROXY_LOG_I("Received tenant pool status, serving tenants: " << JoinRange(", ", Tenants.begin(), Tenants.end())); - for (auto& [_, sessionInfo] : *LocalSessions) { - if (!sessionInfo.Database.empty() && !Tenants.contains(sessionInfo.Database)) { - auto closeSessionEv = MakeHolder<TEvKqp::TEvCloseSessionRequest>(); - closeSessionEv->Record.MutableRequest()->SetSessionId(sessionInfo.SessionId); - Send(sessionInfo.WorkerId, closeSessionEv.Release()); - } - } - - PublishResourceUsage(); - } - void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) { KQP_PROXY_LOG_D("Subscribed for config changes."); } @@ -647,12 +620,11 @@ public: } void LookupPeerProxyData() { - if (!SelfDataCenterId || BoardLookupActor || Tenants.empty()) { + if (!SelfDataCenterId || BoardLookupActor || AppData()->TenantName.empty()) { return; } - const TString& database = *Tenants.begin(); - auto groupId = GetDefaultStateStorageGroupId(database); + auto groupId = GetDefaultStateStorageGroupId(AppData()->TenantName); if (!groupId) { KQP_PROXY_LOG_W("Unable to determine default state storage group id"); return; @@ -848,11 +820,7 @@ public: str << " - DataCenterId: " << *SelfDataCenterId << Endl; } - str << "Serving tenants: " << Endl; - for(auto& tenant: Tenants) { - str << " - " << tenant << Endl; - } - str << Endl; + str << "Serving tenant: " << AppData()->TenantName << Endl; { auto cgiTmp = cgi; @@ -982,7 +950,6 @@ public: hFunc(TEvents::TEvUndelivered, Handle); hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle); hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle); - hFunc(TEvTenantPool::TEvTenantPoolStatus, Handle); hFunc(TEvKqp::TEvQueryRequest, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, Handle); hFunc(TEvKqp::TEvQueryResponse, ForwardEvent); @@ -1108,16 +1075,14 @@ private: bool CreateNewSessionWorker(const TKqpRequestInfo& requestInfo, const TString& cluster, bool longSession, const TString& database, bool supportsBalancing, TProcessResult<TKqpSessionInfo*>& result) { - if (!database.empty()) { - if (!TenantsReady) { - TString error = TStringBuilder() << "Node isn't ready to serve database requests."; + if (!database.empty() && AppData()->TenantName.empty()) { + TString error = TStringBuilder() << "Node isn't ready to serve database requests."; - KQP_PROXY_LOG_E(requestInfo << error); + KQP_PROXY_LOG_E(requestInfo << error); - result.YdbStatus = Ydb::StatusIds::UNAVAILABLE; - result.Error = error; - return false; - } + result.YdbStatus = Ydb::StatusIds::UNAVAILABLE; + result.Error = error; + return false; } if (ShutdownRequested) { @@ -1188,8 +1153,8 @@ private: return TActorId(); } - if (!Tenants.empty()) { - auto counters = Counters->GetDbCounters(*Tenants.begin()); + if (!AppData()->TenantName.empty()) { + auto counters = Counters->GetDbCounters(AppData()->TenantName); Counters->ReportProxyForwardedRequest(counters); } @@ -1242,11 +1207,9 @@ private: std::optional<TPeerStats> PeerStats; TKqpProxyRequestTracker PendingRequests; - bool TenantsReady; bool ShutdownRequested = false; THashMap<ui64, NKikimrConsole::TConfigItem::EKind> ConfigSubscriptions; THashMap<ui64, TActorId> TimeoutTimers; - THashSet<TString> Tenants; TIntrusivePtr<TKqpShutdownState> ShutdownState; TIntrusivePtr<TModuleResolverState> ModuleResolverState; diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 3a3980fa19..c82ee285c2 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -569,6 +569,7 @@ namespace Tests { TTenantPoolConfig::TPtr tenantPoolConfig = new TTenantPoolConfig(localConfig); tenantPoolConfig->AddStaticSlot(domainName); + appData.TenantName = domainName; auto poolId = Runtime->Register(CreateTenantPool(tenantPoolConfig), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); |