aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2022-10-21 22:34:14 +0300
committergvit <gvit@ydb.tech>2022-10-21 22:34:14 +0300
commitbc055f986c600b51998110bd2d3e6a58a51a568d (patch)
tree74e6ada2a6820a3eedf6e850383d73d1d631d8a6
parent1f1966ecbf022c02cbfd0afbbdb3c6aafec6fbb3 (diff)
downloadydb-bc055f986c600b51998110bd2d3e6a58a51a568d.tar.gz
use tenant name from app data in kqp proxy
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.cpp69
-rw-r--r--ydb/core/testlib/test_client.cpp1
2 files changed, 17 insertions, 53 deletions
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
index b4898e7916..3ccce5cef4 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
@@ -7,7 +7,6 @@
#include <ydb/core/base/statestorage.h>
#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/cms/console/console.h>
-#include <ydb/core/mind/tenant_pool.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
#include <ydb/core/kqp/common/kqp_timeouts.h>
@@ -152,8 +151,6 @@ public:
, KqpSettings(std::make_shared<const TKqpSettings>(std::move(settings)))
, QueryReplayFactory(std::move(queryReplayFactory))
, PendingRequests()
- , TenantsReady(false)
- , Tenants()
, ModuleResolverState()
{}
@@ -186,8 +183,6 @@ public:
IEventHandle::FlagTrackDelivery);
WhiteBoardService = NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId());
- // Subscribe for tenant changes
- Send(MakeTenantPoolRootID(), new TEvents::TEvSubscribe());
if (auto& cfg = TableServiceConfig.GetSpillingServiceConfig().GetLocalFileConfig(); cfg.GetEnable()) {
SpillingService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpLocalFileSpillingService(cfg, Counters));
@@ -278,20 +273,19 @@ public:
SendBoardPublishPoison();
SendWhiteboardRequest();
- if (Tenants.empty() || !SelfDataCenterId) {
- KQP_PROXY_LOG_E("Cannot start publishing usage, tenants: " << Tenants.size() << ", " << SelfDataCenterId.value_or("empty"));
+ if (AppData()->TenantName.empty() || !SelfDataCenterId) {
+ KQP_PROXY_LOG_E("Cannot start publishing usage, tenants: " << AppData()->TenantName << ", " << SelfDataCenterId.value_or("empty"));
return;
}
- const TString& database = *Tenants.begin();
- auto groupId = GetDefaultStateStorageGroupId(database);
+ auto groupId = GetDefaultStateStorageGroupId(AppData()->TenantName);
if (!groupId) {
- KQP_PROXY_LOG_D("Unable to determine default state storage group id for database " << database);
+ KQP_PROXY_LOG_D("Unable to determine default state storage group id for database " << AppData()->TenantName);
return;
}
NodeResources.SetActiveWorkersCount(LocalSessions->size());
- PublishBoardPath = MakeKqpProxyBoardPath(database);
+ PublishBoardPath = MakeKqpProxyBoardPath(AppData()->TenantName);
auto actor = CreateBoardPublishActor(PublishBoardPath, NodeResources.SerializeAsString(), SelfId(), *groupId, 0, true);
BoardPublishActor = Register(actor);
LastPublishResourcesAt = TAppData::TimeProvider->Now();
@@ -329,27 +323,6 @@ public:
return TActor::PassAway();
}
- void Handle(TEvTenantPool::TEvTenantPoolStatus::TPtr& ev) {
- const auto &event = ev->Get()->Record;
-
- TenantsReady = true;
- Tenants.clear();
- for (auto &slot : event.GetSlots()) {
- Tenants.insert(slot.GetAssignedTenant());
- }
-
- KQP_PROXY_LOG_I("Received tenant pool status, serving tenants: " << JoinRange(", ", Tenants.begin(), Tenants.end()));
- for (auto& [_, sessionInfo] : *LocalSessions) {
- if (!sessionInfo.Database.empty() && !Tenants.contains(sessionInfo.Database)) {
- auto closeSessionEv = MakeHolder<TEvKqp::TEvCloseSessionRequest>();
- closeSessionEv->Record.MutableRequest()->SetSessionId(sessionInfo.SessionId);
- Send(sessionInfo.WorkerId, closeSessionEv.Release());
- }
- }
-
- PublishResourceUsage();
- }
-
void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) {
KQP_PROXY_LOG_D("Subscribed for config changes.");
}
@@ -647,12 +620,11 @@ public:
}
void LookupPeerProxyData() {
- if (!SelfDataCenterId || BoardLookupActor || Tenants.empty()) {
+ if (!SelfDataCenterId || BoardLookupActor || AppData()->TenantName.empty()) {
return;
}
- const TString& database = *Tenants.begin();
- auto groupId = GetDefaultStateStorageGroupId(database);
+ auto groupId = GetDefaultStateStorageGroupId(AppData()->TenantName);
if (!groupId) {
KQP_PROXY_LOG_W("Unable to determine default state storage group id");
return;
@@ -848,11 +820,7 @@ public:
str << " - DataCenterId: " << *SelfDataCenterId << Endl;
}
- str << "Serving tenants: " << Endl;
- for(auto& tenant: Tenants) {
- str << " - " << tenant << Endl;
- }
- str << Endl;
+ str << "Serving tenant: " << AppData()->TenantName << Endl;
{
auto cgiTmp = cgi;
@@ -982,7 +950,6 @@ public:
hFunc(TEvents::TEvUndelivered, Handle);
hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle);
hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle);
- hFunc(TEvTenantPool::TEvTenantPoolStatus, Handle);
hFunc(TEvKqp::TEvQueryRequest, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, Handle);
hFunc(TEvKqp::TEvQueryResponse, ForwardEvent);
@@ -1108,16 +1075,14 @@ private:
bool CreateNewSessionWorker(const TKqpRequestInfo& requestInfo,
const TString& cluster, bool longSession, const TString& database, bool supportsBalancing, TProcessResult<TKqpSessionInfo*>& result)
{
- if (!database.empty()) {
- if (!TenantsReady) {
- TString error = TStringBuilder() << "Node isn't ready to serve database requests.";
+ if (!database.empty() && AppData()->TenantName.empty()) {
+ TString error = TStringBuilder() << "Node isn't ready to serve database requests.";
- KQP_PROXY_LOG_E(requestInfo << error);
+ KQP_PROXY_LOG_E(requestInfo << error);
- result.YdbStatus = Ydb::StatusIds::UNAVAILABLE;
- result.Error = error;
- return false;
- }
+ result.YdbStatus = Ydb::StatusIds::UNAVAILABLE;
+ result.Error = error;
+ return false;
}
if (ShutdownRequested) {
@@ -1188,8 +1153,8 @@ private:
return TActorId();
}
- if (!Tenants.empty()) {
- auto counters = Counters->GetDbCounters(*Tenants.begin());
+ if (!AppData()->TenantName.empty()) {
+ auto counters = Counters->GetDbCounters(AppData()->TenantName);
Counters->ReportProxyForwardedRequest(counters);
}
@@ -1242,11 +1207,9 @@ private:
std::optional<TPeerStats> PeerStats;
TKqpProxyRequestTracker PendingRequests;
- bool TenantsReady;
bool ShutdownRequested = false;
THashMap<ui64, NKikimrConsole::TConfigItem::EKind> ConfigSubscriptions;
THashMap<ui64, TActorId> TimeoutTimers;
- THashSet<TString> Tenants;
TIntrusivePtr<TKqpShutdownState> ShutdownState;
TIntrusivePtr<TModuleResolverState> ModuleResolverState;
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 3a3980fa19..c82ee285c2 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -569,6 +569,7 @@ namespace Tests {
TTenantPoolConfig::TPtr tenantPoolConfig = new TTenantPoolConfig(localConfig);
tenantPoolConfig->AddStaticSlot(domainName);
+ appData.TenantName = domainName;
auto poolId = Runtime->Register(CreateTenantPool(tenantPoolConfig), nodeIdx, appData.SystemPoolId,
TMailboxType::Revolving, 0);