aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2022-10-02 18:46:07 +0300
committergvit <gvit@ydb.tech>2022-10-02 18:46:07 +0300
commitd67ad9fb32df5591cfb9ab995bcbe20830f2f96f (patch)
tree438d3a733d14d0c770700dff5a5e6fc0c5814a96
parent4c3d3d183e0584e0dbe29ccffe4a6788c569f4d9 (diff)
downloadydb-d67ad9fb32df5591cfb9ab995bcbe20830f2f96f.tar.gz
refactoring resource manager actors and planner process
-rw-r--r--ydb/core/kqp/rm/CMakeLists.txt1
-rw-r--r--ydb/core/kqp/rm/kqp_resource_tracker.cpp107
-rw-r--r--ydb/core/kqp/rm/kqp_rm.cpp71
-rw-r--r--ydb/core/kqp/rm/kqp_rm.h9
4 files changed, 119 insertions, 69 deletions
diff --git a/ydb/core/kqp/rm/CMakeLists.txt b/ydb/core/kqp/rm/CMakeLists.txt
index c5c4bcab4f5..b280ff2c469 100644
--- a/ydb/core/kqp/rm/CMakeLists.txt
+++ b/ydb/core/kqp/rm/CMakeLists.txt
@@ -27,6 +27,7 @@ target_link_libraries(core-kqp-rm PUBLIC
ydb-core-tablet
)
target_sources(core-kqp-rm PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm/kqp_resource_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm/kqp_resource_estimation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm/kqp_snapshot_manager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm/kqp_rm.cpp
diff --git a/ydb/core/kqp/rm/kqp_resource_tracker.cpp b/ydb/core/kqp/rm/kqp_resource_tracker.cpp
new file mode 100644
index 00000000000..8bf97523b24
--- /dev/null
+++ b/ydb/core/kqp/rm/kqp_resource_tracker.cpp
@@ -0,0 +1,107 @@
+#include "kqp_rm.h"
+
+#include <ydb/core/base/statestorage.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+#include <ydb/core/cms/console/configs_dispatcher.h>
+#include <ydb/core/cms/console/console.h>
+#include <ydb/core/mind/tenant_pool.h>
+#include <ydb/core/mon/mon.h>
+#include <ydb/core/tablet/resource_broker.h>
+#include <ydb/core/kqp/kqp.h>
+
+#include <library/cpp/monlib/service/pages/templates.h>
+
+#include <ydb/library/yql/utils/yql_panic.h>
+
+
+namespace NKikimr::NKqp::NRm {
+
+namespace {
+
+#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
+#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
+#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
+#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
+
+using namespace NKikimr;
+using namespace NActors;
+
+
+class TTakeResourcesSnapshotActor : public TActorBootstrapped<TTakeResourcesSnapshotActor> {
+public:
+ TTakeResourcesSnapshotActor(const TString& boardPath, ui32 stateStorageGroupId,
+ TOnResourcesSnapshotCallback&& callback)
+ : BoardPath(boardPath)
+ , StateStorageGroupId(stateStorageGroupId)
+ , Callback(std::move(callback)) {}
+
+ void Bootstrap() {
+ auto boardLookup = CreateBoardLookupActor(BoardPath, SelfId(), StateStorageGroupId, EBoardLookupMode::Majority,
+ false, false);
+ BoardLookupId = Register(boardLookup);
+
+ Become(&TTakeResourcesSnapshotActor::WorkState);
+ }
+
+ STATEFN(WorkState) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvStateStorage::TEvBoardInfo, HandleWait);
+ cFunc(TEvents::TSystem::Poison, PassAway);
+ default:
+ LOG_C("Unexpected event type: " << ev->GetTypeRewrite()
+ << ", event: " << (ev->HasEvent() ? ev->GetBase()->ToString().data() : "<serialized>"));
+ }
+ }
+
+ void HandleWait(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
+ BoardLookupId = {};
+
+ TEvStateStorage::TEvBoardInfo* event = ev->Get();
+ TVector<NKikimrKqp::TKqpNodeResources> resources;
+
+ if (event->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
+ LOG_I("WhiteBoard entries: " << event->InfoEntries.size());
+ resources.resize(event->InfoEntries.size());
+
+ int i = 0;
+ for (auto& [_, entry] : event->InfoEntries) {
+ Y_PROTOBUF_SUPPRESS_NODISCARD resources[i].ParseFromString(entry.Payload);
+ LOG_D("WhiteBoard [" << i << "]: " << resources[i].ShortDebugString());
+ i++;
+ }
+ } else {
+ LOG_E("WhiteBoard error: " << (int) event->Status << ", path: " << event->Path);
+ }
+
+ Callback(std::move(resources));
+
+ PassAway();
+ }
+
+ void PassAway() {
+ if (BoardLookupId) {
+ Send(BoardLookupId, new TEvents::TEvPoison);
+ }
+ IActor::PassAway();
+ }
+
+private:
+ const TString BoardPath;
+ const ui32 StateStorageGroupId;
+ TOnResourcesSnapshotCallback Callback;
+ TActorId BoardLookupId;
+};
+
+} // namespace
+
+NActors::IActor* CreateTakeResourcesSnapshotActor(
+ const TString& boardPath, ui32 stateStorageGroupId,TOnResourcesSnapshotCallback&& callback)
+{
+ return new TTakeResourcesSnapshotActor(boardPath, stateStorageGroupId, std::move(callback));
+}
+
+} // namespace NKikimr::NKqp::NRm
diff --git a/ydb/core/kqp/rm/kqp_rm.cpp b/ydb/core/kqp/rm/kqp_rm.cpp
index e3fa8998c95..13ccc51f7c3 100644
--- a/ydb/core/kqp/rm/kqp_rm.cpp
+++ b/ydb/core/kqp/rm/kqp_rm.cpp
@@ -100,71 +100,6 @@ struct TTxStatesBucket {
constexpr ui64 BucketsCount = 64;
-class TTakeResourcesSnapshotActor : public TActorBootstrapped<TTakeResourcesSnapshotActor> {
-public:
- TTakeResourcesSnapshotActor(const TString& boardPath, ui32 stateStorageGroupId,
- std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)>&& callback)
- : BoardPath(boardPath)
- , StateStorageGroupId(stateStorageGroupId)
- , Callback(std::move(callback)) {}
-
- void Bootstrap() {
- auto boardLookup = CreateBoardLookupActor(BoardPath, SelfId(), StateStorageGroupId, EBoardLookupMode::Majority,
- false, false);
- BoardLookupId = Register(boardLookup);
-
- Become(&TTakeResourcesSnapshotActor::WorkState);
- }
-
- STATEFN(WorkState) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvStateStorage::TEvBoardInfo, HandleWait);
- cFunc(TEvents::TSystem::Poison, PassAway);
- default:
- LOG_C("Unexpected event type: " << ev->GetTypeRewrite()
- << ", event: " << (ev->HasEvent() ? ev->GetBase()->ToString().data() : "<serialized>"));
- }
- }
-
- void HandleWait(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
- BoardLookupId = {};
-
- TEvStateStorage::TEvBoardInfo* event = ev->Get();
- TVector<NKikimrKqp::TKqpNodeResources> resources;
-
- if (event->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
- LOG_I("WhiteBoard entries: " << event->InfoEntries.size());
- resources.resize(event->InfoEntries.size());
-
- int i = 0;
- for (auto& [_, entry] : event->InfoEntries) {
- Y_PROTOBUF_SUPPRESS_NODISCARD resources[i].ParseFromString(entry.Payload);
- LOG_D("WhiteBoard [" << i << "]: " << resources[i].ShortDebugString());
- i++;
- }
- } else {
- LOG_E("WhiteBoard error: " << (int) event->Status << ", path: " << event->Path);
- }
-
- Callback(std::move(resources));
-
- PassAway();
- }
-
- void PassAway() {
- if (BoardLookupId) {
- Send(BoardLookupId, new TEvents::TEvPoison);
- }
- IActor::PassAway();
- }
-
-private:
- const TString BoardPath;
- const ui32 StateStorageGroupId;
- std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)> Callback;
- TActorId BoardLookupId;
-};
-
struct TKqpNodeResourceManager {
ui32 NodeId;
IKqpResourceManager* Instance;
@@ -567,7 +502,7 @@ public:
FireResourcesPublishing();
}
- void RequestClusterResourcesInfo(std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)>&& callback) override {
+ void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override {
LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, "Schedule Snapshot request");
auto ev = MakeHolder<TEvPrivate::TEvTakeResourcesSnapshot>();
ev->Callback = std::move(callback);
@@ -647,8 +582,8 @@ private:
LOG_D("Create Snapshot actor, board: " << WbState.BoardPath << ", ssGroupId: " << WbState.StateStorageGroupId);
- Register(new TTakeResourcesSnapshotActor(
- WbState.BoardPath, WbState.StateStorageGroupId, std::move(ev->Get()->Callback)));
+ Register(
+ CreateTakeResourcesSnapshotActor(WbState.BoardPath, WbState.StateStorageGroupId, std::move(ev->Get()->Callback)));
}
void HandleWork(TEvResourceBroker::TEvConfigResponse::TPtr& ev) {
diff --git a/ydb/core/kqp/rm/kqp_rm.h b/ydb/core/kqp/rm/kqp_rm.h
index f52eed96ac7..fad2fe81118 100644
--- a/ydb/core/kqp/rm/kqp_rm.h
+++ b/ydb/core/kqp/rm/kqp_rm.h
@@ -28,6 +28,8 @@ enum EKqpMemoryPool : ui32 {
Count = 3
};
+using TOnResourcesSnapshotCallback = std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)>;
+
/// resources request
struct TKqpResourcesRequest {
ui32 ExecutionUnits = 0;
@@ -83,7 +85,7 @@ public:
virtual void NotifyExternalResourcesAllocated(ui64 queryId, ui64 taskId, const TKqpResourcesRequest& resources) = 0;
virtual void NotifyExternalResourcesFreed(ui64 queryId, ui64 taskId) = 0;
- virtual void RequestClusterResourcesInfo(std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&& resources)>&& callback) = 0;
+ virtual void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) = 0;
virtual TKqpLocalNodeResources GetLocalResources() const = 0;
virtual NKikimrConfig::TTableServiceConfig::TResourceManager GetConfig() = 0;
@@ -92,6 +94,11 @@ public:
virtual std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> GetComputeActorPatternCache() = 0;
};
+
+NActors::IActor* CreateTakeResourcesSnapshotActor(
+ const TString& boardPath, ui32 stateStorageGroupId,
+ std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)>&& callback);
+
} // namespace NRm
NActors::IActor* CreateKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,