diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-03-02 20:31:10 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-03-02 20:31:10 +0300 |
commit | 50f71db153aefbf57790eababbae66ee298f4ef1 (patch) | |
tree | 0d556ac878f3017307c8d336b66ddf2bb3c56b05 | |
parent | e92cfd20e40d4577bafbae4a0fe7077d4df07bdd (diff) | |
download | ydb-50f71db153aefbf57790eababbae66ee298f4ef1.tar.gz |
add publishing kqp proxy data by RM
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_event_ids.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 95 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.h | 5 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/CMakeLists.darwin.txt | 2 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt | 2 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/CMakeLists.linux.txt | 2 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_service.cpp | 80 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_service.h | 7 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 2 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.cpp | 12 |
13 files changed, 192 insertions, 27 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index d071e27eb0..9750acab39 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -2280,14 +2280,17 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu enableSpilling.SetValue(appData->EnableKqpSpilling ? "true" : "false"); settings.emplace_back(std::move(enableSpilling)); + auto kqpProxySharedResources = std::make_shared<NKqp::TKqpProxySharedResources>(); + // Crate resource manager - auto rm = NKqp::CreateKqpResourceManagerActor(Config.GetTableServiceConfig().GetResourceManager(), nullptr); + auto rm = NKqp::CreateKqpResourceManagerActor(Config.GetTableServiceConfig().GetResourceManager(), nullptr, + {}, kqpProxySharedResources); setup->LocalServices.push_back(std::make_pair( NKqp::MakeKqpRmServiceID(NodeId), TActorSetupCmd(rm, TMailboxType::HTSwap, appData->UserPoolId))); auto proxy = NKqp::CreateKqpProxyService(Config.GetLogConfig(), Config.GetTableServiceConfig(), - std::move(settings), Factories->QueryReplayBackendFactory); + std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources)); setup->LocalServices.push_back(std::make_pair( NKqp::MakeKqpProxyID(NodeId), TActorSetupCmd(proxy, TMailboxType::HTSwap, appData->UserPoolId))); diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index 16e4666af4..6140298401 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -718,6 +718,9 @@ struct TEvKqp { NLWTrace::TOrbit Orbit; }; + struct TEvKqpProxyPublishRequest : + public TEventLocal<TEvKqpProxyPublishRequest, TKqpEvents::EvKqpProxyPublishRequest> {}; + struct TEvCompileInvalidateRequest : public TEventLocal<TEvCompileInvalidateRequest, TKqpEvents::EvCompileInvalidateRequest> { diff --git a/ydb/core/kqp/common/kqp_event_ids.h b/ydb/core/kqp/common/kqp_event_ids.h index 05012cbd21..526317996b 100644 --- a/ydb/core/kqp/common/kqp_event_ids.h +++ b/ydb/core/kqp/common/kqp_event_ids.h @@ -34,6 +34,7 @@ struct TKqpEvents { EvScriptResponse, EvFetchScriptResultsRequest, EvFetchScriptResultsResponse, + EvKqpProxyPublishRequest, }; static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index ee2a2969ab..a43d14955e 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -12,7 +12,6 @@ #include <ydb/core/kqp/compile_service/kqp_compile_service.h> #include <ydb/core/kqp/session_actor/kqp_worker_common.h> #include <ydb/core/kqp/node_service/kqp_node_service.h> -#include <ydb/core/kqp/rm_service/kqp_rm_service.h> #include <ydb/core/kqp/run_script_actor/kqp_run_script_actor.h> #include <ydb/core/kqp/runtime/kqp_spilling_file.h> #include <ydb/core/kqp/runtime/kqp_spilling.h> @@ -109,6 +108,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> { EvCollectPeerProxyData, EvOnRequestTimeout, EvCloseIdleSessions, + EvResourcesSnapshot, }; struct TEvReadyToPublishResources : public TEventLocal<TEvReadyToPublishResources, EEv::EvReadyToPublishResources> {}; @@ -134,6 +134,13 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> { }; struct TEvCloseIdleSessions : public TEventLocal<TEvCloseIdleSessions, EEv::EvCloseIdleSessions> {}; + + struct TEvResourcesSnapshot : public TEventLocal<TEvResourcesSnapshot, EEv::EvResourcesSnapshot> { + TVector<NKikimrKqp::TKqpNodeResources> Snapshot; + + TEvResourcesSnapshot(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) + : Snapshot(std::move(snapshot)) {} + }; }; public: @@ -144,7 +151,8 @@ public: TKqpProxyService(const NKikimrConfig::TLogConfig& logConfig, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, TVector<NKikimrKqp::TKqpSetting>&& settings, - std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory) + std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, + std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources) : YqlLoggerScope(new NYql::NLog::TTlsLogBackend(new TNullLogBackend())) , LogConfig(logConfig) , TableServiceConfig(tableServiceConfig) @@ -152,6 +160,7 @@ public: , QueryReplayFactory(std::move(queryReplayFactory)) , PendingRequests() , ModuleResolverState() + , KqpProxySharedResources(std::move(kqpProxySharedResources)) {} void Bootstrap() { @@ -202,10 +211,13 @@ public: NActors::TMon* mon = AppData()->Mon; if (mon) { - NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors"); - mon->RegisterActorPage(actorsMonPage, "kqp_proxy", "KQP Proxy", false, TlsActivationContext->ExecutorThread.ActorSystem, SelfId()); + NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors"); + mon->RegisterActorPage(actorsMonPage, "kqp_proxy", "KQP Proxy", false, + TlsActivationContext->ExecutorThread.ActorSystem, SelfId()); } + KqpRmServiceActor = MakeKqpRmServiceID(SelfId().NodeId()); + Become(&TKqpProxyService::MainState); StartCollectPeerProxyData(); PublishResourceUsage(); @@ -310,13 +322,14 @@ public: SendWhiteboardRequest(); if (AppData()->TenantName.empty() || !SelfDataCenterId) { - KQP_PROXY_LOG_E("Cannot start publishing usage, tenants: " << AppData()->TenantName << ", " << SelfDataCenterId.value_or("empty")); + KQP_PROXY_LOG_E("Cannot start publishing usage, tenants: " << AppData()->TenantName << ", " << SelfDataCenterId.value_or("empty")); return; } auto groupId = GetDefaultStateStorageGroupId(AppData()->TenantName); if (!groupId) { - KQP_PROXY_LOG_D("Unable to determine default state storage group id for database " << AppData()->TenantName); + KQP_PROXY_LOG_D("Unable to determine default state storage group id for database " << + AppData()->TenantName); return; } @@ -328,13 +341,22 @@ public: } void PublishResourceUsage() { + const auto& sbs = TableServiceConfig.GetSessionBalancerSettings(); + auto now = TAppData::TimeProvider->Now(); + TDuration batchingInterval = TDuration::MilliSeconds(sbs.GetBoardPublishIntervalMs()); + + if (TableServiceConfig.GetEnablePublishKqpProxyByRM()) { + if (!LastPublishResourcesAt.has_value() || now - *LastPublishResourcesAt < batchingInterval) { + LastPublishResourcesAt = TAppData::TimeProvider->Now(); + Send(KqpRmServiceActor, std::make_unique<TEvKqp::TEvKqpProxyPublishRequest>()); + } + return; + } + if (ResourcesPublishScheduled) { return; } - const auto& sbs = TableServiceConfig.GetSessionBalancerSettings(); - auto now = TAppData::TimeProvider->Now(); - TDuration batchingInterval = TDuration::MilliSeconds(sbs.GetBoardPublishIntervalMs()); if (LastPublishResourcesAt && now - *LastPublishResourcesAt < batchingInterval) { ResourcesPublishScheduled = true; Schedule(batchingInterval, new TEvPrivate::TEvReadyToPublishResources()); @@ -692,7 +714,15 @@ public: } void Handle(TEvPrivate::TEvCollectPeerProxyData::TPtr&) { - LookupPeerProxyData(); + if (!TableServiceConfig.GetEnablePublishKqpProxyByRM()) { + LookupPeerProxyData(); + } else { + GetKqpResourceManager()->RequestClusterResourcesInfo( + [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) { + TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new TEvPrivate::TEvResourcesSnapshot(std::move(resources))); + as->Send(eh); + }); + } if (!ShutdownRequested) { const auto& sbs = TableServiceConfig.GetSessionBalancerSettings(); ui64 millis = sbs.GetBoardLookupIntervalMs(); @@ -732,6 +762,38 @@ public: TryKickSession(); } + void Handle(TEvPrivate::TEvResourcesSnapshot::TPtr& ev) { + if (ev->Get()->Snapshot.empty()) { + PeerProxyNodeResources.clear(); + KQP_PROXY_LOG_E("Can not find default state storage group for database " << + AppData()->TenantName); + return; + } + + Y_VERIFY(SelfDataCenterId); + PeerProxyNodeResources.resize(ev->Get()->Snapshot.size()); + size_t idx = 0; + auto getDataCenterId = [](const auto& entry) { + return entry.HasDataCenterId() ? entry.GetDataCenterId() : DataCenterToString(entry.GetDataCenterNumId()); + }; + + LocalDatacenterProxies.clear(); + for(auto& nodeResource : ev->Get()->Snapshot) { + auto* proxyNodeResources = nodeResource.MutableKqpProxyNodeResources(); + proxyNodeResources->SetNodeId(nodeResource.GetNodeId()); + + PeerProxyNodeResources[idx] = std::move(*proxyNodeResources); + + if (getDataCenterId(PeerProxyNodeResources[idx]) == *SelfDataCenterId) { + LocalDatacenterProxies.emplace_back(PeerProxyNodeResources[idx].GetNodeId()); + } + ++idx; + } + + PeerStats = CalcPeerStats(PeerProxyNodeResources, *SelfDataCenterId); + TryKickSession(); + } + bool ShouldStartBalancing(const TSimpleResourceStats& stats, const double minResourceThreshold, const double currentResourceUsage) const { const auto& sbs = TableServiceConfig.GetSessionBalancerSettings(); if (stats.CV < sbs.GetMinCVTreshold()) { @@ -1028,6 +1090,7 @@ public: hFunc(TEvKqp::TEvPingSessionResponse, ForwardEvent); hFunc(TEvKqp::TEvInitiateShutdownRequest, Handle); hFunc(TEvPrivate::TEvOnRequestTimeout, Handle); + hFunc(TEvPrivate::TEvResourcesSnapshot, Handle); hFunc(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse, Handle); hFunc(TEvKqp::TEvCreateSessionResponse, ForwardEvent); hFunc(TEvPrivate::TEvCloseIdleSessions, Handle); @@ -1156,6 +1219,7 @@ private: auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId); TKqpSessionInfo* sessionInfo = LocalSessions->Create( sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration()); + KqpProxySharedResources->AtomicLocalSessionCount.store(LocalSessions->size()); KQP_PROXY_LOG_D(requestInfo << "Created new session" << ", sessionId: " << sessionInfo->SessionId @@ -1199,6 +1263,7 @@ private: void RemoveSession(const TString& sessionId, const TActorId& workerId) { if (!sessionId.empty()) { LocalSessions->Erase(sessionId); + KqpProxySharedResources->AtomicLocalSessionCount.store(LocalSessions->size()); PublishResourceUsage(); if (ShutdownRequested) { ShutdownState->Update(LocalSessions->size()); @@ -1208,6 +1273,7 @@ private: } LocalSessions->Erase(workerId); + KqpProxySharedResources->AtomicLocalSessionCount.store(LocalSessions->size()); PublishResourceUsage(); if (ShutdownRequested) { ShutdownState->Update(LocalSessions->size()); @@ -1251,6 +1317,7 @@ private: TIntrusivePtr<TKqpCounters> Counters; std::unique_ptr<TLocalSessionsRegistry> LocalSessions; + std::shared_ptr<TKqpProxySharedResources> KqpProxySharedResources; bool ServerWorkerBalancerComplete = false; std::optional<TString> SelfDataCenterId; @@ -1260,6 +1327,8 @@ private: bool ResourcesPublishScheduled = false; TString PublishBoardPath; std::optional<TInstant> LastPublishResourcesAt; + + TActorId KqpRmServiceActor; TActorId BoardLookupActor; TActorId BoardPublishActor; TActorId CompileService; @@ -1274,9 +1343,11 @@ private: IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, TVector<NKikimrKqp::TKqpSetting>&& settings, - std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory) + std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, + std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources) { - return new TKqpProxyService(logConfig, tableServiceConfig, std::move(settings), std::move(queryReplayFactory)); + return new TKqpProxyService(logConfig, tableServiceConfig, std::move(settings), + std::move(queryReplayFactory),std::move(kqpProxySharedResources)); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.h b/ydb/core/kqp/proxy_service/kqp_proxy_service.h index 8dc1b9622d..f4ef31c29b 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.h @@ -3,6 +3,7 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/kqp/counters/kqp_counters.h> +#include <ydb/core/kqp/rm_service/kqp_rm_service.h> #include <ydb/core/protos/kqp.pb.h> #include <library/cpp/actors/core/actorid.h> @@ -317,7 +318,6 @@ private: } }; - TSimpleResourceStats CalcPeerStats( const TVector<NKikimrKqp::TKqpProxyNodeResources>& data, const TString& selfDataCenterId, bool localDatacenterPolicy, std::function<double(const NKikimrKqp::TKqpProxyNodeResources& entry)> ExtractValue); @@ -327,6 +327,7 @@ TPeerStats CalcPeerStats(const TVector<NKikimrKqp::TKqpProxyNodeResources>& data IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, TVector<NKikimrKqp::TKqpSetting>&& settings, - std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory); + std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, + std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/rm_service/CMakeLists.darwin.txt b/ydb/core/kqp/rm_service/CMakeLists.darwin.txt index 316317ff2f..d7f93026d2 100644 --- a/ydb/core/kqp/rm_service/CMakeLists.darwin.txt +++ b/ydb/core/kqp/rm_service/CMakeLists.darwin.txt @@ -16,6 +16,7 @@ target_link_libraries(core-kqp-rm_service PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core + cpp-actors-interconnect ydb-core-actorlib_impl ydb-core-base core-cms-console @@ -25,6 +26,7 @@ target_link_libraries(core-kqp-rm_service PUBLIC ydb-core-mon ydb-core-protos ydb-core-tablet + ydb-core-node_whiteboard ) target_sources(core-kqp-rm_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp diff --git a/ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt index 8f967d11b7..78bf95bf0f 100644 --- a/ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt @@ -17,6 +17,7 @@ target_link_libraries(core-kqp-rm_service PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core + cpp-actors-interconnect ydb-core-actorlib_impl ydb-core-base core-cms-console @@ -26,6 +27,7 @@ target_link_libraries(core-kqp-rm_service PUBLIC ydb-core-mon ydb-core-protos ydb-core-tablet + ydb-core-node_whiteboard ) target_sources(core-kqp-rm_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp diff --git a/ydb/core/kqp/rm_service/CMakeLists.linux.txt b/ydb/core/kqp/rm_service/CMakeLists.linux.txt index 8f967d11b7..78bf95bf0f 100644 --- a/ydb/core/kqp/rm_service/CMakeLists.linux.txt +++ b/ydb/core/kqp/rm_service/CMakeLists.linux.txt @@ -17,6 +17,7 @@ target_link_libraries(core-kqp-rm_service PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core + cpp-actors-interconnect ydb-core-actorlib_impl ydb-core-base core-cms-console @@ -26,6 +27,7 @@ target_link_libraries(core-kqp-rm_service PUBLIC ydb-core-mon ydb-core-protos ydb-core-tablet + ydb-core-node_whiteboard ) target_sources(core-kqp-rm_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index 45023d56a2..c68af8de8e 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -1,14 +1,17 @@ #include "kqp_rm_service.h" +#include <ydb/core/base/location.h> #include <ydb/core/base/statestorage.h> #include <ydb/core/cms/console/configs_dispatcher.h> #include <ydb/core/cms/console/console.h> +#include <ydb/core/kqp/common/kqp.h> #include <ydb/core/mind/tenant_pool.h> #include <ydb/core/mon/mon.h> +#include <ydb/core/node_whiteboard/node_whiteboard.h> #include <ydb/core/tablet/resource_broker.h> -#include <ydb/core/kqp/common/kqp.h> #include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/interconnect/interconnect.h> #include <library/cpp/monlib/service/pages/templates.h> #include <ydb/library/yql/utils/yql_panic.h> @@ -158,12 +161,14 @@ public: } TKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, - TIntrusivePtr<TKqpCounters> counters, const TActorId& resourceBrokerId) + TIntrusivePtr<TKqpCounters> counters, const TActorId& resourceBrokerId, + std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources) : Config(config) , Counters(counters) , ResourceBrokerId(resourceBrokerId ? resourceBrokerId : MakeResourceBrokerID()) , ExecutionUnitsResource(Config.GetComputeActorsCount()) , ScanQueryMemoryResource(Config.GetQueryMemoryLimit()) + , KqpProxySharedResources(std::move(kqpProxySharedResources)) {} void Bootstrap() { @@ -194,8 +199,13 @@ public: ActorSystem, SelfId()); } + WhiteBoardService = NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()); + Become(&TKqpResourceManagerActor::WorkState); + AskSelfNodeInfo(); + SendWhiteboardRequest(); + auto rm = new TKqpNodeResourceManager(SelfId().NodeId(), this); with_lock (ResourceManagers.Lock) { ResourceManagers.ByNodeId[SelfId().NodeId()] = rm; @@ -340,6 +350,33 @@ public: return true; } + void SendWhiteboardRequest() { + auto ev = std::make_unique<NNodeWhiteboard::TEvWhiteboard::TEvSystemStateRequest>(); + Send(WhiteBoardService, ev.release(), IEventHandle::FlagTrackDelivery, SelfId().NodeId()); + } + + void Handle(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse::TPtr& ev) { + const auto& record = ev->Get()->Record; + if (record.SystemStateInfoSize() != 1) { + LOG_C("Unexpected whiteboard info"); + return; + } + + const auto& info = record.GetSystemStateInfo(0); + if (AppData()->UserPoolId >= info.PoolStatsSize()) { + LOG_C("Unexpected whiteboard info: pool size is smaller than user pool id" + << ", pool size: " << info.PoolStatsSize() + << ", user pool id: " << AppData()->UserPoolId); + return; + } + + const auto& pool = info.GetPoolStats(AppData()->UserPoolId); + + LOG_C("Received node white board pool stats: " << pool.usage()); + ProxyNodeResources.SetCpuUsage(pool.usage()); + ProxyNodeResources.SetThreads(pool.threads()); + } + bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources, TResourcesAllocatedCallback&& onSuccess, TNotEnoughtResourcesCallback&& onFail, TDuration timeout) override { @@ -588,9 +625,12 @@ public: private: STATEFN(WorkState) { switch (ev->GetTypeRewrite()) { + hFunc(TEvInterconnect::TEvNodeInfo, Handle); hFunc(TEvPrivate::TEvPublishResources, HandleWork); hFunc(TEvPrivate::TEvSchedulePublishResources, HandleWork); hFunc(TEvPrivate::TEvTakeResourcesSnapshot, HandleWork); + hFunc(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse, Handle); + hFunc(TEvKqp::TEvKqpProxyPublishRequest, HandleWork); hFunc(TEvResourceBroker::TEvConfigResponse, HandleWork); hFunc(TEvResourceBroker::TEvResourceBrokerResponse, HandleWork); hFunc(TEvTenantPool::TEvTenantPoolStatus, HandleWork); @@ -617,6 +657,10 @@ private: PublishResourceUsage("alloc"); } + void HandleWork(TEvKqp::TEvKqpProxyPublishRequest::TPtr&) { + PublishResourceUsage("kqp_proxy"); + } + void HandleWork(TEvPrivate::TEvTakeResourcesSnapshot::TPtr& ev) { if (WbState.StateStorageGroupId == std::numeric_limits<ui32>::max()) { LOG_E("Can not take resources snapshot, ssGroupId not set. Tenant: " << WbState.Tenant @@ -652,6 +696,21 @@ private: } } + void AskSelfNodeInfo() { + Send(GetNameserviceActorId(), new TEvInterconnect::TEvGetNode(SelfId().NodeId())); + } + + void Handle(TEvInterconnect::TEvNodeInfo::TPtr& ev) { + auto selfDataCenterId = TString(); + if (const auto& node = ev->Get()->Node) { + selfDataCenterId = node->Location.GetDataCenterId(); + } + + ProxyNodeResources.SetDataCenterNumId(DataCenterFromString(selfDataCenterId)); + ProxyNodeResources.SetDataCenterId(selfDataCenterId); + PublishResourceUsage("data_center update"); + } + void HandleWork(TEvTenantPool::TEvTenantPoolStatus::TPtr& ev) { TString tenant; for (auto &slot : ev->Get()->Record.GetSlots()) { @@ -867,6 +926,13 @@ private: NKikimrKqp::TKqpNodeResources payload; payload.SetNodeId(SelfId().NodeId()); payload.SetTimestamp(now.Seconds()); + auto* proxyNodeResources = payload.MutableKqpProxyNodeResources(); + if (KqpProxySharedResources) { + ProxyNodeResources.SetActiveWorkersCount(KqpProxySharedResources->AtomicLocalSessionCount.load()); + } else { + LOG_D("Don't set KqpProxySharedResources"); + } + *proxyNodeResources = ProxyNodeResources; ActorIdToProto(MakeKqpResourceManagerServiceID(SelfId().NodeId()), payload.MutableResourceManagerActorId()); // legacy with_lock (Lock) { payload.SetAvailableComputeActors(ExecutionUnitsResource.Available()); // legacy @@ -924,15 +990,21 @@ private: // pattern cache for different actors std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> PatternCache; + + std::shared_ptr<TKqpProxySharedResources> KqpProxySharedResources; + NKikimrKqp::TKqpProxyNodeResources ProxyNodeResources; + + TActorId WhiteBoardService; }; } // namespace NRm NActors::IActor* CreateKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, - TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker) + TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker, + std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources) { - return new NRm::TKqpResourceManagerActor(config, counters, resourceBroker); + return new NRm::TKqpResourceManagerActor(config, counters, resourceBroker, std::move(kqpProxySharedResources)); } NRm::IKqpResourceManager* GetKqpResourceManager(TMaybe<ui32> _nodeId) { diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h index d83d53ab56..49f0d0db90 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.h +++ b/ydb/core/kqp/rm_service/kqp_rm_service.h @@ -100,8 +100,13 @@ NActors::IActor* CreateTakeResourcesSnapshotActor( } // namespace NRm +struct TKqpProxySharedResources { + std::atomic<ui32> AtomicLocalSessionCount{0}; +}; + NActors::IActor* CreateKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, - TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker = {}); + TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker = {}, + std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources = nullptr); NRm::IKqpResourceManager* GetKqpResourceManager(TMaybe<ui32> nodeId = Nothing()); NRm::IKqpResourceManager* TryGetKqpResourceManager(TMaybe<ui32> nodeId = Nothing()); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 596ca448cb..6ffb6805d8 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1260,6 +1260,7 @@ message TTableServiceConfig { optional bool EnableKqpDataQueryStreamLookup = 31 [default = false]; optional TExecuterRetriesConfig ExecuterRetriesConfig = 32; optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false]; + optional bool EnablePublishKqpProxyByRM = 34 [default = false]; }; // Config describes immediate controls and allows diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 887c91aeb8..af80aa3eb1 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -522,6 +522,8 @@ message TKqpNodeResources { } repeated TMemory Memory = 8; optional uint32 ExecutionUnits = 9; + + optional TKqpProxyNodeResources KqpProxyNodeResources = 10; } /// Scans diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 1fe50ed10c..b3038e1d88 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -753,18 +753,18 @@ namespace Tests { TActorId healthCheckId = Runtime->Register(healthCheck, nodeIdx); Runtime->RegisterService(NHealthCheck::MakeHealthCheckID(), healthCheckId, nodeIdx); } - { - IActor* kqpRmService = NKqp::CreateKqpResourceManagerActor(Settings->AppConfig.GetTableServiceConfig().GetResourceManager(), nullptr); + auto kqpProxySharedResources = std::make_shared<NKqp::TKqpProxySharedResources>(); + + IActor* kqpRmService = NKqp::CreateKqpResourceManagerActor( + Settings->AppConfig.GetTableServiceConfig().GetResourceManager(), nullptr, {}, kqpProxySharedResources); TActorId kqpRmServiceId = Runtime->Register(kqpRmService, nodeIdx); Runtime->RegisterService(NKqp::MakeKqpRmServiceID(Runtime->GetNodeId(nodeIdx)), kqpRmServiceId, nodeIdx); - } - - { + IActor* kqpProxyService = NKqp::CreateKqpProxyService(Settings->AppConfig.GetLogConfig(), Settings->AppConfig.GetTableServiceConfig(), TVector<NKikimrKqp::TKqpSetting>(Settings->KqpSettings), - nullptr); + nullptr, std::move(kqpProxySharedResources)); TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx); Runtime->RegisterService(NKqp::MakeKqpProxyID(Runtime->GetNodeId(nodeIdx)), kqpProxyServiceId, nodeIdx); } |