diff options
author | gvit <gvit@ydb.tech> | 2022-10-02 18:46:07 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2022-10-02 18:46:07 +0300 |
commit | d67ad9fb32df5591cfb9ab995bcbe20830f2f96f (patch) | |
tree | 438d3a733d14d0c770700dff5a5e6fc0c5814a96 | |
parent | 4c3d3d183e0584e0dbe29ccffe4a6788c569f4d9 (diff) | |
download | ydb-d67ad9fb32df5591cfb9ab995bcbe20830f2f96f.tar.gz |
refactoring resource manager actors and planner process
-rw-r--r-- | ydb/core/kqp/rm/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/rm/kqp_resource_tracker.cpp | 107 | ||||
-rw-r--r-- | ydb/core/kqp/rm/kqp_rm.cpp | 71 | ||||
-rw-r--r-- | ydb/core/kqp/rm/kqp_rm.h | 9 |
4 files changed, 119 insertions, 69 deletions
diff --git a/ydb/core/kqp/rm/CMakeLists.txt b/ydb/core/kqp/rm/CMakeLists.txt index c5c4bcab4f5..b280ff2c469 100644 --- a/ydb/core/kqp/rm/CMakeLists.txt +++ b/ydb/core/kqp/rm/CMakeLists.txt @@ -27,6 +27,7 @@ target_link_libraries(core-kqp-rm PUBLIC ydb-core-tablet ) target_sources(core-kqp-rm PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm/kqp_resource_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm/kqp_resource_estimation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm/kqp_snapshot_manager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm/kqp_rm.cpp diff --git a/ydb/core/kqp/rm/kqp_resource_tracker.cpp b/ydb/core/kqp/rm/kqp_resource_tracker.cpp new file mode 100644 index 00000000000..8bf97523b24 --- /dev/null +++ b/ydb/core/kqp/rm/kqp_resource_tracker.cpp @@ -0,0 +1,107 @@ +#include "kqp_rm.h" + +#include <ydb/core/base/statestorage.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> + +#include <ydb/core/cms/console/configs_dispatcher.h> +#include <ydb/core/cms/console/console.h> +#include <ydb/core/mind/tenant_pool.h> +#include <ydb/core/mon/mon.h> +#include <ydb/core/tablet/resource_broker.h> +#include <ydb/core/kqp/kqp.h> + +#include <library/cpp/monlib/service/pages/templates.h> + +#include <ydb/library/yql/utils/yql_panic.h> + + +namespace NKikimr::NKqp::NRm { + +namespace { + +#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) +#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) +#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) +#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) + +using namespace NKikimr; +using namespace NActors; + + +class TTakeResourcesSnapshotActor : public TActorBootstrapped<TTakeResourcesSnapshotActor> { +public: + TTakeResourcesSnapshotActor(const TString& boardPath, ui32 stateStorageGroupId, + TOnResourcesSnapshotCallback&& callback) + : BoardPath(boardPath) + , StateStorageGroupId(stateStorageGroupId) + , Callback(std::move(callback)) {} + + void Bootstrap() { + auto boardLookup = CreateBoardLookupActor(BoardPath, SelfId(), StateStorageGroupId, EBoardLookupMode::Majority, + false, false); + BoardLookupId = Register(boardLookup); + + Become(&TTakeResourcesSnapshotActor::WorkState); + } + + STATEFN(WorkState) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvStateStorage::TEvBoardInfo, HandleWait); + cFunc(TEvents::TSystem::Poison, PassAway); + default: + LOG_C("Unexpected event type: " << ev->GetTypeRewrite() + << ", event: " << (ev->HasEvent() ? ev->GetBase()->ToString().data() : "<serialized>")); + } + } + + void HandleWait(TEvStateStorage::TEvBoardInfo::TPtr& ev) { + BoardLookupId = {}; + + TEvStateStorage::TEvBoardInfo* event = ev->Get(); + TVector<NKikimrKqp::TKqpNodeResources> resources; + + if (event->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok) { + LOG_I("WhiteBoard entries: " << event->InfoEntries.size()); + resources.resize(event->InfoEntries.size()); + + int i = 0; + for (auto& [_, entry] : event->InfoEntries) { + Y_PROTOBUF_SUPPRESS_NODISCARD resources[i].ParseFromString(entry.Payload); + LOG_D("WhiteBoard [" << i << "]: " << resources[i].ShortDebugString()); + i++; + } + } else { + LOG_E("WhiteBoard error: " << (int) event->Status << ", path: " << event->Path); + } + + Callback(std::move(resources)); + + PassAway(); + } + + void PassAway() { + if (BoardLookupId) { + Send(BoardLookupId, new TEvents::TEvPoison); + } + IActor::PassAway(); + } + +private: + const TString BoardPath; + const ui32 StateStorageGroupId; + TOnResourcesSnapshotCallback Callback; + TActorId BoardLookupId; +}; + +} // namespace + +NActors::IActor* CreateTakeResourcesSnapshotActor( + const TString& boardPath, ui32 stateStorageGroupId,TOnResourcesSnapshotCallback&& callback) +{ + return new TTakeResourcesSnapshotActor(boardPath, stateStorageGroupId, std::move(callback)); +} + +} // namespace NKikimr::NKqp::NRm diff --git a/ydb/core/kqp/rm/kqp_rm.cpp b/ydb/core/kqp/rm/kqp_rm.cpp index e3fa8998c95..13ccc51f7c3 100644 --- a/ydb/core/kqp/rm/kqp_rm.cpp +++ b/ydb/core/kqp/rm/kqp_rm.cpp @@ -100,71 +100,6 @@ struct TTxStatesBucket { constexpr ui64 BucketsCount = 64; -class TTakeResourcesSnapshotActor : public TActorBootstrapped<TTakeResourcesSnapshotActor> { -public: - TTakeResourcesSnapshotActor(const TString& boardPath, ui32 stateStorageGroupId, - std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)>&& callback) - : BoardPath(boardPath) - , StateStorageGroupId(stateStorageGroupId) - , Callback(std::move(callback)) {} - - void Bootstrap() { - auto boardLookup = CreateBoardLookupActor(BoardPath, SelfId(), StateStorageGroupId, EBoardLookupMode::Majority, - false, false); - BoardLookupId = Register(boardLookup); - - Become(&TTakeResourcesSnapshotActor::WorkState); - } - - STATEFN(WorkState) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvStateStorage::TEvBoardInfo, HandleWait); - cFunc(TEvents::TSystem::Poison, PassAway); - default: - LOG_C("Unexpected event type: " << ev->GetTypeRewrite() - << ", event: " << (ev->HasEvent() ? ev->GetBase()->ToString().data() : "<serialized>")); - } - } - - void HandleWait(TEvStateStorage::TEvBoardInfo::TPtr& ev) { - BoardLookupId = {}; - - TEvStateStorage::TEvBoardInfo* event = ev->Get(); - TVector<NKikimrKqp::TKqpNodeResources> resources; - - if (event->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok) { - LOG_I("WhiteBoard entries: " << event->InfoEntries.size()); - resources.resize(event->InfoEntries.size()); - - int i = 0; - for (auto& [_, entry] : event->InfoEntries) { - Y_PROTOBUF_SUPPRESS_NODISCARD resources[i].ParseFromString(entry.Payload); - LOG_D("WhiteBoard [" << i << "]: " << resources[i].ShortDebugString()); - i++; - } - } else { - LOG_E("WhiteBoard error: " << (int) event->Status << ", path: " << event->Path); - } - - Callback(std::move(resources)); - - PassAway(); - } - - void PassAway() { - if (BoardLookupId) { - Send(BoardLookupId, new TEvents::TEvPoison); - } - IActor::PassAway(); - } - -private: - const TString BoardPath; - const ui32 StateStorageGroupId; - std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)> Callback; - TActorId BoardLookupId; -}; - struct TKqpNodeResourceManager { ui32 NodeId; IKqpResourceManager* Instance; @@ -567,7 +502,7 @@ public: FireResourcesPublishing(); } - void RequestClusterResourcesInfo(std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)>&& callback) override { + void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override { LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, "Schedule Snapshot request"); auto ev = MakeHolder<TEvPrivate::TEvTakeResourcesSnapshot>(); ev->Callback = std::move(callback); @@ -647,8 +582,8 @@ private: LOG_D("Create Snapshot actor, board: " << WbState.BoardPath << ", ssGroupId: " << WbState.StateStorageGroupId); - Register(new TTakeResourcesSnapshotActor( - WbState.BoardPath, WbState.StateStorageGroupId, std::move(ev->Get()->Callback))); + Register( + CreateTakeResourcesSnapshotActor(WbState.BoardPath, WbState.StateStorageGroupId, std::move(ev->Get()->Callback))); } void HandleWork(TEvResourceBroker::TEvConfigResponse::TPtr& ev) { diff --git a/ydb/core/kqp/rm/kqp_rm.h b/ydb/core/kqp/rm/kqp_rm.h index f52eed96ac7..fad2fe81118 100644 --- a/ydb/core/kqp/rm/kqp_rm.h +++ b/ydb/core/kqp/rm/kqp_rm.h @@ -28,6 +28,8 @@ enum EKqpMemoryPool : ui32 { Count = 3 }; +using TOnResourcesSnapshotCallback = std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)>; + /// resources request struct TKqpResourcesRequest { ui32 ExecutionUnits = 0; @@ -83,7 +85,7 @@ public: virtual void NotifyExternalResourcesAllocated(ui64 queryId, ui64 taskId, const TKqpResourcesRequest& resources) = 0; virtual void NotifyExternalResourcesFreed(ui64 queryId, ui64 taskId) = 0; - virtual void RequestClusterResourcesInfo(std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&& resources)>&& callback) = 0; + virtual void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) = 0; virtual TKqpLocalNodeResources GetLocalResources() const = 0; virtual NKikimrConfig::TTableServiceConfig::TResourceManager GetConfig() = 0; @@ -92,6 +94,11 @@ public: virtual std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> GetComputeActorPatternCache() = 0; }; + +NActors::IActor* CreateTakeResourcesSnapshotActor( + const TString& boardPath, ui32 stateStorageGroupId, + std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)>&& callback); + } // namespace NRm NActors::IActor* CreateKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, |