aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshumkovnd <shumkovnd@yandex-team.com>2023-03-02 20:31:10 +0300
committershumkovnd <shumkovnd@yandex-team.com>2023-03-02 20:31:10 +0300
commit50f71db153aefbf57790eababbae66ee298f4ef1 (patch)
tree0d556ac878f3017307c8d336b66ddf2bb3c56b05
parente92cfd20e40d4577bafbae4a0fe7077d4df07bdd (diff)
downloadydb-50f71db153aefbf57790eababbae66ee298f4ef1.tar.gz
add publishing kqp proxy data by RM
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp7
-rw-r--r--ydb/core/kqp/common/kqp.h3
-rw-r--r--ydb/core/kqp/common/kqp_event_ids.h1
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp95
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.h5
-rw-r--r--ydb/core/kqp/rm_service/CMakeLists.darwin.txt2
-rw-r--r--ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/kqp/rm_service/CMakeLists.linux.txt2
-rw-r--r--ydb/core/kqp/rm_service/kqp_rm_service.cpp80
-rw-r--r--ydb/core/kqp/rm_service/kqp_rm_service.h7
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/protos/kqp.proto2
-rw-r--r--ydb/core/testlib/test_client.cpp12
13 files changed, 192 insertions, 27 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index d071e27eb0..9750acab39 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -2280,14 +2280,17 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
enableSpilling.SetValue(appData->EnableKqpSpilling ? "true" : "false");
settings.emplace_back(std::move(enableSpilling));
+ auto kqpProxySharedResources = std::make_shared<NKqp::TKqpProxySharedResources>();
+
// Crate resource manager
- auto rm = NKqp::CreateKqpResourceManagerActor(Config.GetTableServiceConfig().GetResourceManager(), nullptr);
+ auto rm = NKqp::CreateKqpResourceManagerActor(Config.GetTableServiceConfig().GetResourceManager(), nullptr,
+ {}, kqpProxySharedResources);
setup->LocalServices.push_back(std::make_pair(
NKqp::MakeKqpRmServiceID(NodeId),
TActorSetupCmd(rm, TMailboxType::HTSwap, appData->UserPoolId)));
auto proxy = NKqp::CreateKqpProxyService(Config.GetLogConfig(), Config.GetTableServiceConfig(),
- std::move(settings), Factories->QueryReplayBackendFactory);
+ std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources));
setup->LocalServices.push_back(std::make_pair(
NKqp::MakeKqpProxyID(NodeId),
TActorSetupCmd(proxy, TMailboxType::HTSwap, appData->UserPoolId)));
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index 16e4666af4..6140298401 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -718,6 +718,9 @@ struct TEvKqp {
NLWTrace::TOrbit Orbit;
};
+ // Payload-free local event. In this change it is sent by the KQP proxy to the
+ // KQP resource manager actor, which reacts by publishing its resource usage.
+ struct TEvKqpProxyPublishRequest :
+ public TEventLocal<TEvKqpProxyPublishRequest, TKqpEvents::EvKqpProxyPublishRequest> {};
+
struct TEvCompileInvalidateRequest : public TEventLocal<TEvCompileInvalidateRequest,
TKqpEvents::EvCompileInvalidateRequest>
{
diff --git a/ydb/core/kqp/common/kqp_event_ids.h b/ydb/core/kqp/common/kqp_event_ids.h
index 05012cbd21..526317996b 100644
--- a/ydb/core/kqp/common/kqp_event_ids.h
+++ b/ydb/core/kqp/common/kqp_event_ids.h
@@ -34,6 +34,7 @@ struct TKqpEvents {
EvScriptResponse,
EvFetchScriptResultsRequest,
EvFetchScriptResultsResponse,
+ EvKqpProxyPublishRequest,
};
static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution);
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index ee2a2969ab..a43d14955e 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -12,7 +12,6 @@
#include <ydb/core/kqp/compile_service/kqp_compile_service.h>
#include <ydb/core/kqp/session_actor/kqp_worker_common.h>
#include <ydb/core/kqp/node_service/kqp_node_service.h>
-#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/core/kqp/run_script_actor/kqp_run_script_actor.h>
#include <ydb/core/kqp/runtime/kqp_spilling_file.h>
#include <ydb/core/kqp/runtime/kqp_spilling.h>
@@ -109,6 +108,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
EvCollectPeerProxyData,
EvOnRequestTimeout,
EvCloseIdleSessions,
+ EvResourcesSnapshot,
};
struct TEvReadyToPublishResources : public TEventLocal<TEvReadyToPublishResources, EEv::EvReadyToPublishResources> {};
@@ -134,6 +134,13 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
};
struct TEvCloseIdleSessions : public TEventLocal<TEvCloseIdleSessions, EEv::EvCloseIdleSessions> {};
+
+ // Private event that carries a snapshot of per-node KQP resources back to the
+ // proxy actor. The vector is moved into the event, so the caller's container is
+ // left in a moved-from state.
+ struct TEvResourcesSnapshot : public TEventLocal<TEvResourcesSnapshot, EEv::EvResourcesSnapshot> {
+     TVector<NKikimrKqp::TKqpNodeResources> Snapshot;
+
+     // explicit: a vector of node resources must never convert to an event implicitly.
+     explicit TEvResourcesSnapshot(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot)
+         : Snapshot(std::move(snapshot)) {}
+ };
};
public:
@@ -144,7 +151,8 @@ public:
TKqpProxyService(const NKikimrConfig::TLogConfig& logConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
TVector<NKikimrKqp::TKqpSetting>&& settings,
- std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory)
+ std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
+ std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources)
: YqlLoggerScope(new NYql::NLog::TTlsLogBackend(new TNullLogBackend()))
, LogConfig(logConfig)
, TableServiceConfig(tableServiceConfig)
@@ -152,6 +160,7 @@ public:
, QueryReplayFactory(std::move(queryReplayFactory))
, PendingRequests()
, ModuleResolverState()
+ , KqpProxySharedResources(std::move(kqpProxySharedResources))
{}
void Bootstrap() {
@@ -202,10 +211,13 @@ public:
NActors::TMon* mon = AppData()->Mon;
if (mon) {
- NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors");
- mon->RegisterActorPage(actorsMonPage, "kqp_proxy", "KQP Proxy", false, TlsActivationContext->ExecutorThread.ActorSystem, SelfId());
+ NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors");
+ mon->RegisterActorPage(actorsMonPage, "kqp_proxy", "KQP Proxy", false,
+ TlsActivationContext->ExecutorThread.ActorSystem, SelfId());
}
+ KqpRmServiceActor = MakeKqpRmServiceID(SelfId().NodeId());
+
Become(&TKqpProxyService::MainState);
StartCollectPeerProxyData();
PublishResourceUsage();
@@ -310,13 +322,14 @@ public:
SendWhiteboardRequest();
if (AppData()->TenantName.empty() || !SelfDataCenterId) {
- KQP_PROXY_LOG_E("Cannot start publishing usage, tenants: " << AppData()->TenantName << ", " << SelfDataCenterId.value_or("empty"));
+ KQP_PROXY_LOG_E("Cannot start publishing usage, tenants: " << AppData()->TenantName << ", " << SelfDataCenterId.value_or("empty"));
return;
}
auto groupId = GetDefaultStateStorageGroupId(AppData()->TenantName);
if (!groupId) {
- KQP_PROXY_LOG_D("Unable to determine default state storage group id for database " << AppData()->TenantName);
+ KQP_PROXY_LOG_D("Unable to determine default state storage group id for database " <<
+ AppData()->TenantName);
return;
}
@@ -328,13 +341,22 @@ public:
}
void PublishResourceUsage() {
+ const auto& sbs = TableServiceConfig.GetSessionBalancerSettings();
+ auto now = TAppData::TimeProvider->Now();
+ TDuration batchingInterval = TDuration::MilliSeconds(sbs.GetBoardPublishIntervalMs());
+
+ if (TableServiceConfig.GetEnablePublishKqpProxyByRM()) {
+     // Publishing is delegated to the KQP resource manager actor; the proxy only
+     // notifies it, at most once per batching interval.
+     // The interval check must be '>=': with '<' a request was re-sent on every call
+     // made inside the interval (refreshing the timestamp each time) and was never
+     // sent again once a full interval had passed without a call.
+     // NOTE(review): calls arriving inside the interval are dropped, not deferred —
+     // confirm the resource manager's own publication cadence makes that acceptable.
+     if (!LastPublishResourcesAt.has_value() || now - *LastPublishResourcesAt >= batchingInterval) {
+         LastPublishResourcesAt = now;
+         Send(KqpRmServiceActor, std::make_unique<TEvKqp::TEvKqpProxyPublishRequest>());
+     }
+     return;
+ }
+
if (ResourcesPublishScheduled) {
return;
}
- const auto& sbs = TableServiceConfig.GetSessionBalancerSettings();
- auto now = TAppData::TimeProvider->Now();
- TDuration batchingInterval = TDuration::MilliSeconds(sbs.GetBoardPublishIntervalMs());
if (LastPublishResourcesAt && now - *LastPublishResourcesAt < batchingInterval) {
ResourcesPublishScheduled = true;
Schedule(batchingInterval, new TEvPrivate::TEvReadyToPublishResources());
@@ -692,7 +714,15 @@ public:
}
void Handle(TEvPrivate::TEvCollectPeerProxyData::TPtr&) {
- LookupPeerProxyData();
+ if (!TableServiceConfig.GetEnablePublishKqpProxyByRM()) {
+ LookupPeerProxyData();
+ } else {
+ GetKqpResourceManager()->RequestClusterResourcesInfo(
+ [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
+ TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new TEvPrivate::TEvResourcesSnapshot(std::move(resources)));
+ as->Send(eh);
+ });
+ }
if (!ShutdownRequested) {
const auto& sbs = TableServiceConfig.GetSessionBalancerSettings();
ui64 millis = sbs.GetBoardLookupIntervalMs();
@@ -732,6 +762,38 @@ public:
TryKickSession();
}
+ // Rebuilds the peer proxy view from a resources snapshot.
+ //
+ // For every node in the snapshot the embedded KqpProxyNodeResources record is
+ // stamped with the node id and moved into PeerProxyNodeResources; nodes whose data
+ // center matches ours are collected into LocalDatacenterProxies. Afterwards the
+ // peer statistics are recomputed and TryKickSession() is invoked.
+ //
+ // An empty snapshot only clears PeerProxyNodeResources and logs an error.
+ void Handle(TEvPrivate::TEvResourcesSnapshot::TPtr& ev) {
+     auto& snapshot = ev->Get()->Snapshot;
+     if (snapshot.empty()) {
+         PeerProxyNodeResources.clear();
+         KQP_PROXY_LOG_E("Can not find default state storage group for database " <<
+             AppData()->TenantName);
+         return;
+     }
+
+     // SelfDataCenterId is an optional that other paths of this actor treat as
+     // possibly unset, while the snapshot request itself is not gated on it.
+     // Skip the snapshot instead of aborting the process; the periodic collection
+     // will deliver a fresh one later.
+     if (!SelfDataCenterId) {
+         KQP_PROXY_LOG_D("Skip resources snapshot: self data center id is not known yet");
+         return;
+     }
+
+     // Prefer the string data center id, falling back to the numeric one.
+     const auto getDataCenterId = [](const auto& entry) {
+         return entry.HasDataCenterId() ? entry.GetDataCenterId() : DataCenterToString(entry.GetDataCenterNumId());
+     };
+
+     PeerProxyNodeResources.resize(snapshot.size());
+     LocalDatacenterProxies.clear();
+
+     size_t idx = 0;
+     for (auto& nodeResource : snapshot) {
+         auto* proxyNodeResources = nodeResource.MutableKqpProxyNodeResources();
+         proxyNodeResources->SetNodeId(nodeResource.GetNodeId());
+
+         auto& peer = PeerProxyNodeResources[idx];
+         peer = std::move(*proxyNodeResources);
+
+         if (getDataCenterId(peer) == *SelfDataCenterId) {
+             LocalDatacenterProxies.emplace_back(peer.GetNodeId());
+         }
+         ++idx;
+     }
+
+     PeerStats = CalcPeerStats(PeerProxyNodeResources, *SelfDataCenterId);
+     TryKickSession();
+ }
+
bool ShouldStartBalancing(const TSimpleResourceStats& stats, const double minResourceThreshold, const double currentResourceUsage) const {
const auto& sbs = TableServiceConfig.GetSessionBalancerSettings();
if (stats.CV < sbs.GetMinCVTreshold()) {
@@ -1028,6 +1090,7 @@ public:
hFunc(TEvKqp::TEvPingSessionResponse, ForwardEvent);
hFunc(TEvKqp::TEvInitiateShutdownRequest, Handle);
hFunc(TEvPrivate::TEvOnRequestTimeout, Handle);
+ hFunc(TEvPrivate::TEvResourcesSnapshot, Handle);
hFunc(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse, Handle);
hFunc(TEvKqp::TEvCreateSessionResponse, ForwardEvent);
hFunc(TEvPrivate::TEvCloseIdleSessions, Handle);
@@ -1156,6 +1219,7 @@ private:
auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId);
TKqpSessionInfo* sessionInfo = LocalSessions->Create(
sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration());
+ KqpProxySharedResources->AtomicLocalSessionCount.store(LocalSessions->size());
KQP_PROXY_LOG_D(requestInfo << "Created new session"
<< ", sessionId: " << sessionInfo->SessionId
@@ -1199,6 +1263,7 @@ private:
void RemoveSession(const TString& sessionId, const TActorId& workerId) {
if (!sessionId.empty()) {
LocalSessions->Erase(sessionId);
+ KqpProxySharedResources->AtomicLocalSessionCount.store(LocalSessions->size());
PublishResourceUsage();
if (ShutdownRequested) {
ShutdownState->Update(LocalSessions->size());
@@ -1208,6 +1273,7 @@ private:
}
LocalSessions->Erase(workerId);
+ KqpProxySharedResources->AtomicLocalSessionCount.store(LocalSessions->size());
PublishResourceUsage();
if (ShutdownRequested) {
ShutdownState->Update(LocalSessions->size());
@@ -1251,6 +1317,7 @@ private:
TIntrusivePtr<TKqpCounters> Counters;
std::unique_ptr<TLocalSessionsRegistry> LocalSessions;
+ std::shared_ptr<TKqpProxySharedResources> KqpProxySharedResources;
bool ServerWorkerBalancerComplete = false;
std::optional<TString> SelfDataCenterId;
@@ -1260,6 +1327,8 @@ private:
bool ResourcesPublishScheduled = false;
TString PublishBoardPath;
std::optional<TInstant> LastPublishResourcesAt;
+
+ TActorId KqpRmServiceActor;
TActorId BoardLookupActor;
TActorId BoardPublishActor;
TActorId CompileService;
@@ -1274,9 +1343,11 @@ private:
IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
TVector<NKikimrKqp::TKqpSetting>&& settings,
- std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory)
+ std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
+ std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources)
{
- return new TKqpProxyService(logConfig, tableServiceConfig, std::move(settings), std::move(queryReplayFactory));
+ return new TKqpProxyService(logConfig, tableServiceConfig, std::move(settings),
+ std::move(queryReplayFactory),std::move(kqpProxySharedResources));
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.h b/ydb/core/kqp/proxy_service/kqp_proxy_service.h
index 8dc1b9622d..f4ef31c29b 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.h
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.h
@@ -3,6 +3,7 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/core/protos/kqp.pb.h>
#include <library/cpp/actors/core/actorid.h>
@@ -317,7 +318,6 @@ private:
}
};
-
TSimpleResourceStats CalcPeerStats(
const TVector<NKikimrKqp::TKqpProxyNodeResources>& data, const TString& selfDataCenterId, bool localDatacenterPolicy,
std::function<double(const NKikimrKqp::TKqpProxyNodeResources& entry)> ExtractValue);
@@ -327,6 +327,7 @@ TPeerStats CalcPeerStats(const TVector<NKikimrKqp::TKqpProxyNodeResources>& data
IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
TVector<NKikimrKqp::TKqpSetting>&& settings,
- std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory);
+ std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
+ std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources);
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/rm_service/CMakeLists.darwin.txt b/ydb/core/kqp/rm_service/CMakeLists.darwin.txt
index 316317ff2f..d7f93026d2 100644
--- a/ydb/core/kqp/rm_service/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/rm_service/CMakeLists.darwin.txt
@@ -16,6 +16,7 @@ target_link_libraries(core-kqp-rm_service PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ cpp-actors-interconnect
ydb-core-actorlib_impl
ydb-core-base
core-cms-console
@@ -25,6 +26,7 @@ target_link_libraries(core-kqp-rm_service PUBLIC
ydb-core-mon
ydb-core-protos
ydb-core-tablet
+ ydb-core-node_whiteboard
)
target_sources(core-kqp-rm_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp
diff --git a/ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt
index 8f967d11b7..78bf95bf0f 100644
--- a/ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt
@@ -17,6 +17,7 @@ target_link_libraries(core-kqp-rm_service PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ cpp-actors-interconnect
ydb-core-actorlib_impl
ydb-core-base
core-cms-console
@@ -26,6 +27,7 @@ target_link_libraries(core-kqp-rm_service PUBLIC
ydb-core-mon
ydb-core-protos
ydb-core-tablet
+ ydb-core-node_whiteboard
)
target_sources(core-kqp-rm_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp
diff --git a/ydb/core/kqp/rm_service/CMakeLists.linux.txt b/ydb/core/kqp/rm_service/CMakeLists.linux.txt
index 8f967d11b7..78bf95bf0f 100644
--- a/ydb/core/kqp/rm_service/CMakeLists.linux.txt
+++ b/ydb/core/kqp/rm_service/CMakeLists.linux.txt
@@ -17,6 +17,7 @@ target_link_libraries(core-kqp-rm_service PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ cpp-actors-interconnect
ydb-core-actorlib_impl
ydb-core-base
core-cms-console
@@ -26,6 +27,7 @@ target_link_libraries(core-kqp-rm_service PUBLIC
ydb-core-mon
ydb-core-protos
ydb-core-tablet
+ ydb-core-node_whiteboard
)
target_sources(core-kqp-rm_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
index 45023d56a2..c68af8de8e 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp
+++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
@@ -1,14 +1,17 @@
#include "kqp_rm_service.h"
+#include <ydb/core/base/location.h>
#include <ydb/core/base/statestorage.h>
#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/cms/console/console.h>
+#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/mind/tenant_pool.h>
#include <ydb/core/mon/mon.h>
+#include <ydb/core/node_whiteboard/node_whiteboard.h>
#include <ydb/core/tablet/resource_broker.h>
-#include <ydb/core/kqp/common/kqp.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/interconnect/interconnect.h>
#include <library/cpp/monlib/service/pages/templates.h>
#include <ydb/library/yql/utils/yql_panic.h>
@@ -158,12 +161,14 @@ public:
}
TKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
- TIntrusivePtr<TKqpCounters> counters, const TActorId& resourceBrokerId)
+ TIntrusivePtr<TKqpCounters> counters, const TActorId& resourceBrokerId,
+ std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources)
: Config(config)
, Counters(counters)
, ResourceBrokerId(resourceBrokerId ? resourceBrokerId : MakeResourceBrokerID())
, ExecutionUnitsResource(Config.GetComputeActorsCount())
, ScanQueryMemoryResource(Config.GetQueryMemoryLimit())
+ , KqpProxySharedResources(std::move(kqpProxySharedResources))
{}
void Bootstrap() {
@@ -194,8 +199,13 @@ public:
ActorSystem, SelfId());
}
+ WhiteBoardService = NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId());
+
Become(&TKqpResourceManagerActor::WorkState);
+ AskSelfNodeInfo();
+ SendWhiteboardRequest();
+
auto rm = new TKqpNodeResourceManager(SelfId().NodeId(), this);
with_lock (ResourceManagers.Lock) {
ResourceManagers.ByNodeId[SelfId().NodeId()] = rm;
@@ -340,6 +350,33 @@ public:
return true;
}
+ // Sends a system state request to the whiteboard service resolved in Bootstrap().
+ // Delivery tracking is requested and this node's id is passed as the last Send argument.
+ void SendWhiteboardRequest() {
+     using TRequest = NNodeWhiteboard::TEvWhiteboard::TEvSystemStateRequest;
+
+     auto request = std::make_unique<TRequest>();
+     const auto selfNodeId = SelfId().NodeId();
+     Send(WhiteBoardService, request.release(), IEventHandle::FlagTrackDelivery, selfNodeId);
+ }
+
+ // Consumes the whiteboard system state response and records the user pool's CPU
+ // usage and thread count in ProxyNodeResources.
+ //
+ // The response is ignored (with a critical log entry) when it does not contain
+ // exactly one system state record, or when that record has no stats entry for
+ // AppData()->UserPoolId.
+ void Handle(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse::TPtr& ev) {
+     const auto& record = ev->Get()->Record;
+     if (record.SystemStateInfoSize() != 1) {
+         LOG_C("Unexpected whiteboard info");
+         return;
+     }
+
+     const auto& info = record.GetSystemStateInfo(0);
+     if (AppData()->UserPoolId >= info.PoolStatsSize()) {
+         LOG_C("Unexpected whiteboard info: pool size is smaller than user pool id"
+             << ", pool size: " << info.PoolStatsSize()
+             << ", user pool id: " << AppData()->UserPoolId);
+         return;
+     }
+
+     const auto& pool = info.GetPoolStats(AppData()->UserPoolId);
+
+     // Routine telemetry: log at debug level rather than critical, so a healthy
+     // node does not emit critical-severity records.
+     LOG_D("Received node white board pool stats: " << pool.usage());
+     ProxyNodeResources.SetCpuUsage(pool.usage());
+     ProxyNodeResources.SetThreads(pool.threads());
+ }
+
bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources,
TResourcesAllocatedCallback&& onSuccess, TNotEnoughtResourcesCallback&& onFail, TDuration timeout) override
{
@@ -588,9 +625,12 @@ public:
private:
STATEFN(WorkState) {
switch (ev->GetTypeRewrite()) {
+ hFunc(TEvInterconnect::TEvNodeInfo, Handle);
hFunc(TEvPrivate::TEvPublishResources, HandleWork);
hFunc(TEvPrivate::TEvSchedulePublishResources, HandleWork);
hFunc(TEvPrivate::TEvTakeResourcesSnapshot, HandleWork);
+ hFunc(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse, Handle);
+ hFunc(TEvKqp::TEvKqpProxyPublishRequest, HandleWork);
hFunc(TEvResourceBroker::TEvConfigResponse, HandleWork);
hFunc(TEvResourceBroker::TEvResourceBrokerResponse, HandleWork);
hFunc(TEvTenantPool::TEvTenantPoolStatus, HandleWork);
@@ -617,6 +657,10 @@ private:
PublishResourceUsage("alloc");
}
+ // Publish request coming from the KQP proxy: triggers PublishResourceUsage with
+ // the "kqp_proxy" tag (the string appears to name the reason for publishing).
+ void HandleWork(TEvKqp::TEvKqpProxyPublishRequest::TPtr&) {
+ PublishResourceUsage("kqp_proxy");
+ }
+
void HandleWork(TEvPrivate::TEvTakeResourcesSnapshot::TPtr& ev) {
if (WbState.StateStorageGroupId == std::numeric_limits<ui32>::max()) {
LOG_E("Can not take resources snapshot, ssGroupId not set. Tenant: " << WbState.Tenant
@@ -652,6 +696,21 @@ private:
}
}
+ // Requests this node's own record from the name service; the reply is a
+ // TEvInterconnect::TEvNodeInfo event dispatched from WorkState.
+ void AskSelfNodeInfo() {
+     const auto selfNodeId = SelfId().NodeId();
+     Send(GetNameserviceActorId(), new TEvInterconnect::TEvGetNode(selfNodeId));
+ }
+
+ // Records this node's data center (string and numeric form) in ProxyNodeResources
+ // and republishes resource usage. When the reply carries no node record, the data
+ // center id stays an empty string.
+ void Handle(TEvInterconnect::TEvNodeInfo::TPtr& ev) {
+     TString dataCenterId;
+     const auto& nodeInfo = ev->Get()->Node;
+     if (nodeInfo) {
+         dataCenterId = nodeInfo->Location.GetDataCenterId();
+     }
+
+     ProxyNodeResources.SetDataCenterId(dataCenterId);
+     ProxyNodeResources.SetDataCenterNumId(DataCenterFromString(dataCenterId));
+     PublishResourceUsage("data_center update");
+ }
+
void HandleWork(TEvTenantPool::TEvTenantPoolStatus::TPtr& ev) {
TString tenant;
for (auto &slot : ev->Get()->Record.GetSlots()) {
@@ -867,6 +926,13 @@ private:
NKikimrKqp::TKqpNodeResources payload;
payload.SetNodeId(SelfId().NodeId());
payload.SetTimestamp(now.Seconds());
+ auto* proxyNodeResources = payload.MutableKqpProxyNodeResources();
+ if (KqpProxySharedResources) {
+ ProxyNodeResources.SetActiveWorkersCount(KqpProxySharedResources->AtomicLocalSessionCount.load());
+ } else {
+ LOG_D("Don't set KqpProxySharedResources");
+ }
+ *proxyNodeResources = ProxyNodeResources;
ActorIdToProto(MakeKqpResourceManagerServiceID(SelfId().NodeId()), payload.MutableResourceManagerActorId()); // legacy
with_lock (Lock) {
payload.SetAvailableComputeActors(ExecutionUnitsResource.Available()); // legacy
@@ -924,15 +990,21 @@ private:
// pattern cache for different actors
std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> PatternCache;
+
+ std::shared_ptr<TKqpProxySharedResources> KqpProxySharedResources;
+ NKikimrKqp::TKqpProxyNodeResources ProxyNodeResources;
+
+ TActorId WhiteBoardService;
};
} // namespace NRm
NActors::IActor* CreateKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
- TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker)
+ TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker,
+ std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources)
{
- return new NRm::TKqpResourceManagerActor(config, counters, resourceBroker);
+ return new NRm::TKqpResourceManagerActor(config, counters, resourceBroker, std::move(kqpProxySharedResources));
}
NRm::IKqpResourceManager* GetKqpResourceManager(TMaybe<ui32> _nodeId) {
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h
index d83d53ab56..49f0d0db90 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_service.h
+++ b/ydb/core/kqp/rm_service/kqp_rm_service.h
@@ -100,8 +100,13 @@ NActors::IActor* CreateTakeResourcesSnapshotActor(
} // namespace NRm
+ // State shared (via std::shared_ptr) between the KQP proxy and the KQP resource
+ // manager actor. The proxy stores its current local session count here; the
+ // resource manager loads it when it builds the payload it publishes.
+ struct TKqpProxySharedResources {
+ // Number of local sessions, last stored by the proxy; starts at 0.
+ std::atomic<ui32> AtomicLocalSessionCount{0};
+ };
+
NActors::IActor* CreateKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
- TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker = {});
+ TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker = {},
+ std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources = nullptr);
NRm::IKqpResourceManager* GetKqpResourceManager(TMaybe<ui32> nodeId = Nothing());
NRm::IKqpResourceManager* TryGetKqpResourceManager(TMaybe<ui32> nodeId = Nothing());
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 596ca448cb..6ffb6805d8 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1260,6 +1260,7 @@ message TTableServiceConfig {
optional bool EnableKqpDataQueryStreamLookup = 31 [default = false];
optional TExecuterRetriesConfig ExecuterRetriesConfig = 32;
optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false];
+ optional bool EnablePublishKqpProxyByRM = 34 [default = false];
};
// Config describes immediate controls and allows
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 887c91aeb8..af80aa3eb1 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -522,6 +522,8 @@ message TKqpNodeResources {
}
repeated TMemory Memory = 8;
optional uint32 ExecutionUnits = 9;
+
+ optional TKqpProxyNodeResources KqpProxyNodeResources = 10;
}
/// Scans
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 1fe50ed10c..b3038e1d88 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -753,18 +753,18 @@ namespace Tests {
TActorId healthCheckId = Runtime->Register(healthCheck, nodeIdx);
Runtime->RegisterService(NHealthCheck::MakeHealthCheckID(), healthCheckId, nodeIdx);
}
-
{
- IActor* kqpRmService = NKqp::CreateKqpResourceManagerActor(Settings->AppConfig.GetTableServiceConfig().GetResourceManager(), nullptr);
+ auto kqpProxySharedResources = std::make_shared<NKqp::TKqpProxySharedResources>();
+
+ IActor* kqpRmService = NKqp::CreateKqpResourceManagerActor(
+ Settings->AppConfig.GetTableServiceConfig().GetResourceManager(), nullptr, {}, kqpProxySharedResources);
TActorId kqpRmServiceId = Runtime->Register(kqpRmService, nodeIdx);
Runtime->RegisterService(NKqp::MakeKqpRmServiceID(Runtime->GetNodeId(nodeIdx)), kqpRmServiceId, nodeIdx);
- }
-
- {
+
IActor* kqpProxyService = NKqp::CreateKqpProxyService(Settings->AppConfig.GetLogConfig(),
Settings->AppConfig.GetTableServiceConfig(),
TVector<NKikimrKqp::TKqpSetting>(Settings->KqpSettings),
- nullptr);
+ nullptr, std::move(kqpProxySharedResources));
TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx);
Runtime->RegisterService(NKqp::MakeKqpProxyID(Runtime->GetNodeId(nodeIdx)), kqpProxyServiceId, nodeIdx);
}