diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-06-22 15:30:02 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-06-22 15:30:02 +0300 |
commit | c3748121232aaab487cd1623b9ce7e225f904c22 (patch) | |
tree | 5ce88417926ec0e2d48f0777dc117a4a3f4982a4 | |
parent | 17b977f039bd579d528a4fdd5a1d635088f99b9c (diff) | |
download | ydb-c3748121232aaab487cd1623b9ce7e225f904c22.tar.gz |
Add TKqpResourceInfoExchangerActor
-rw-r--r-- | ydb/core/kqp/common/simple/kqp_event_ids.h | 7 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/CMakeLists.darwin-x86_64.txt | 2 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt | 2 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/CMakeLists.linux-x86_64.txt | 2 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/CMakeLists.windows-x86_64.txt | 2 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp | 524 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_service.cpp | 77 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_service.h | 28 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_ut.cpp | 346 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/ya.make | 2 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 17 |
14 files changed, 926 insertions, 87 deletions
diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index 35648143ad2..2f800dcb005 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -141,5 +141,12 @@ struct TKqpScriptExecutionEvents { }; }; +struct TKqpResourceInfoExchangerEvents { + enum EKqpResourceInfoExchangerEvents { + EvPublishResource = EventSpaceBegin(TKikimrEvents::ES_KQP) + 600, + EvSendResources, + }; +}; + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index fb3f5bcd43d..6d352b049b2 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -757,6 +757,8 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co RmNotEnoughComputeActors = KqpGroup->GetCounter("RM/NotEnoughComputeActors", true); RmExtraMemAllocs = KqpGroup->GetCounter("RM/ExtraMemAllocs", true); RmInternalError = KqpGroup->GetCounter("RM/InternalError", true); + RmSnapshotLatency = KqpGroup->GetHistogram( + "RM/SnapshotLatency", NMonitoring::ExponentialHistogram(20, 2, 1)); /* Spilling */ SpillingWriteBlobs = KqpGroup->GetCounter("Spilling/WriteBlobs", true); diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index ec906a81fc6..0d2943e0fbe 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -356,6 +356,7 @@ public: ::NMonitoring::TDynamicCounters::TCounterPtr RmNotEnoughComputeActors; ::NMonitoring::TDynamicCounters::TCounterPtr RmExtraMemAllocs; ::NMonitoring::TDynamicCounters::TCounterPtr RmInternalError; + NMonitoring::THistogramPtr RmSnapshotLatency; // Spilling counters ::NMonitoring::TDynamicCounters::TCounterPtr SpillingWriteBlobs; diff --git a/ydb/core/kqp/rm_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/rm_service/CMakeLists.darwin-x86_64.txt index d7f93026d26..4f01ea52d22 100644 --- a/ydb/core/kqp/rm_service/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/rm_service/CMakeLists.darwin-x86_64.txt @@ -27,10 +27,12 @@ target_link_libraries(core-kqp-rm_service PUBLIC ydb-core-protos ydb-core-tablet ydb-core-node_whiteboard + ydb-core-util ) target_sources(core-kqp-rm_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_rm_service.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp ) diff --git a/ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt index 78bf95bf0fc..8b9fb7fe00b 100644 --- a/ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt @@ -28,10 +28,12 @@ target_link_libraries(core-kqp-rm_service PUBLIC ydb-core-protos ydb-core-tablet ydb-core-node_whiteboard + ydb-core-util ) target_sources(core-kqp-rm_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_rm_service.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp ) diff --git a/ydb/core/kqp/rm_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/rm_service/CMakeLists.linux-x86_64.txt index 78bf95bf0fc..8b9fb7fe00b 100644 --- a/ydb/core/kqp/rm_service/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/rm_service/CMakeLists.linux-x86_64.txt @@ -28,10 +28,12 @@ target_link_libraries(core-kqp-rm_service PUBLIC ydb-core-protos ydb-core-tablet ydb-core-node_whiteboard + ydb-core-util ) target_sources(core-kqp-rm_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_rm_service.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp ) diff --git a/ydb/core/kqp/rm_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/rm_service/CMakeLists.windows-x86_64.txt index d7f93026d26..4f01ea52d22 100644 --- a/ydb/core/kqp/rm_service/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/rm_service/CMakeLists.windows-x86_64.txt @@ -27,10 +27,12 @@ target_link_libraries(core-kqp-rm_service PUBLIC ydb-core-protos ydb-core-tablet ydb-core-node_whiteboard + ydb-core-util ) target_sources(core-kqp-rm_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_rm_service.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp ) diff --git a/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp b/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp new file mode 100644 index 00000000000..836c61262d0 --- /dev/null +++ b/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp @@ -0,0 +1,524 @@ +#include "kqp_rm_service.h" + +#include <ydb/core/base/location.h> +#include <ydb/core/base/statestorage.h> +#include <ydb/core/kqp/common/kqp.h> +#include <ydb/core/mind/tenant_pool.h> +#include <ydb/core/kqp/common/kqp_event_ids.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/interconnect/interconnect.h> + +#include <ydb/core/util/ulid.h> + +namespace NKikimr { +namespace NKqp { +namespace NRm { + +#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) +#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) +#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) +#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) + +class TKqpResourceInfoExchangerActor : public TActorBootstrapped<TKqpResourceInfoExchangerActor> { + using TBase = TActorBootstrapped<TKqpResourceInfoExchangerActor>; + + struct TEvPrivate { + enum EEv { + EvRetrySending = EventSpaceBegin(TEvents::ES_PRIVATE), + EvRetrySubscriber, + }; + + struct TEvRetrySending : + public TEventLocal<TEvRetrySending, EEv::EvRetrySending> { + ui32 NodeId; + + TEvRetrySending(ui32 nodeId) : NodeId(nodeId) { + } + }; + + struct TEvRetrySubscriber : + public TEventLocal<TEvRetrySubscriber, EEv::EvRetrySubscriber> { + }; + }; + + struct TRetryState { + bool IsScheduled = false; + NMonotonic::TMonotonic LastRetryAt = TMonotonic::Zero(); + TDuration CurrentDelay = TDuration::MilliSeconds(10); + }; + + struct TNodeState { + TRetryState RetryState; + NMonotonic::TMonotonic LastUpdateAt = TMonotonic::Zero(); + NKikimrKqp::TResourceExchangeNodeData NodeData; + }; + +public: + + TKqpResourceInfoExchangerActor(TIntrusivePtr<TKqpCounters> counters, + std::shared_ptr<TResourceSnapshotState> resourceSnapshotState) + : ResourceSnapshotState(std::move(resourceSnapshotState)) + , Counters(counters) + { + Y_UNUSED(counters); + } + + void Bootstrap() { + LOG_D("Start KqpResourceInfoExchangerActor at " << SelfId()); + + Become(&TKqpResourceInfoExchangerActor::WorkState); + + Send(MakeTenantPoolRootID(), new TEvents::TEvSubscribe); + } + +private: + + static TString MakeKqpInfoExchangerBoardPath(TStringBuf database) { + return TStringBuilder() << "kqpexch+" << database; + } + + void CreateSubscriber() { + auto& retryState = BoardState.RetryStateForSubscriber; + + auto now = TlsActivationContext->Monotonic(); + if (now - retryState.LastRetryAt < retryState.CurrentDelay) { + auto at = retryState.LastRetryAt + GetRetryDelay(retryState); + Schedule(at - now, new TEvPrivate::TEvRetrySubscriber); + return; + } + + if (BoardState.Subscriber) { + LOG_I("Kill previous info exchanger subscriber for '" << BoardState.Path + << "' at " << BoardState.Subscriber << ", reason: tenant updated"); + Send(BoardState.Subscriber, new TEvents::TEvPoison); + } + BoardState.Subscriber = TActorId(); + + auto subscriber = CreateBoardLookupActor( + BoardState.Path, SelfId(), BoardState.StateStorageGroupId, EBoardLookupMode::Subscription); + BoardState.Subscriber = Register(subscriber); + + retryState.LastRetryAt = now; + } + + void CreatePublisher() { + if (BoardState.Publisher) { + LOG_I("Kill previous info exchanger publisher for '" << BoardState.Path + << "' at " << BoardState.Publisher << ", reason: tenant updated"); + Send(BoardState.Publisher, new TEvents::TEvPoison); + } + BoardState.Publisher = TActorId(); + + auto publisher = CreateBoardPublishActor(BoardState.Path, SelfInfo, SelfId(), + BoardState.StateStorageGroupId, /* ttlMs */ 0, /* reg */ true); + BoardState.Publisher = Register(publisher); + + auto& nodeState = NodesState[SelfId().NodeId()]; + auto& info = nodeState.NodeData; + auto* boardData = info.MutableResourceExchangeBoardData(); + ActorIdToProto(BoardState.Publisher, boardData->MutablePublisher()); + } + + TVector<ui32> UpdateBoardInfo(const TMap<TActorId, TEvStateStorage::TBoardInfoEntry>& infos) { + auto nodeIds = TVector<ui32>(); + + auto now = TlsActivationContext->Monotonic(); + for (const auto& [id, entry] : infos) { + if (entry.Dropped) { + auto nodesStateIt = NodesState.find(id.NodeId()); + if (nodesStateIt == NodesState.end()) { + continue; + } + auto& currentBoardData = nodesStateIt->second.NodeData.GetResourceExchangeBoardData(); + auto currentPublisher = ActorIdFromProto(currentBoardData.GetPublisher()); + if (currentPublisher == id) { + NodesState.erase(nodesStateIt); + } + continue; + } + + NKikimrKqp::TResourceExchangeBoardData boardData; + Y_PROTOBUF_SUPPRESS_NODISCARD boardData.ParseFromString(entry.Payload); + auto owner = ActorIdFromProto(boardData.GetOwner()); + auto ulidString = boardData.GetUlid(); + + auto& nodeState = NodesState[owner.NodeId()]; + auto& currentInfo = nodeState.NodeData; + auto* currentBoardData = currentInfo.MutableResourceExchangeBoardData(); + auto currentUlidString = currentBoardData->GetUlid(); + + if (currentUlidString.empty()) { + *currentBoardData = std::move(boardData); + ActorIdToProto(id, currentBoardData->MutablePublisher()); + nodeIds.push_back(owner.NodeId()); + nodeState.LastUpdateAt = now; + continue; + } + + auto currentUlid = TULID::FromBinary(currentUlidString); + auto ulid = TULID::FromBinary(ulidString); + + if (currentUlid < ulid) { + *currentBoardData = std::move(boardData); + ActorIdToProto(id, currentBoardData->MutablePublisher()); + currentInfo.SetRound(0); + (*currentInfo.MutableResources()) = NKikimrKqp::TKqpNodeResources(); + nodeIds.push_back(owner.NodeId()); + nodeState.LastUpdateAt = now; + continue; + } + } + + return nodeIds; + } + + void UpdateResourceInfo(const TVector<NKikimrKqp::TResourceExchangeNodeData>& infos) { + auto now = TlsActivationContext->Monotonic(); + + for (const auto& info : infos) { + const auto& boardData = info.GetResourceExchangeBoardData(); + auto owner = ActorIdFromProto(boardData.GetOwner()); + + auto nodesStateIt = NodesState.find(owner.NodeId()); + if (nodesStateIt == NodesState.end()) { + continue; + } + + auto& nodeState = nodesStateIt->second; + auto& currentInfo = nodeState.NodeData; + auto* currentBoardData = currentInfo.MutableResourceExchangeBoardData(); + auto currentUlidString = (*currentBoardData).GetUlid(); + + auto round = info.GetRound(); + auto ulidString = boardData.GetUlid(); + auto ulid = TULID::FromBinary(boardData.GetUlid()); + + auto currentUlid = TULID::FromBinary(currentUlidString); + auto currentRound = currentInfo.GetRound(); + + if (currentUlid < ulid || (currentUlid == ulid && currentRound < round)) { + currentInfo = info; + auto latency = now - nodeState.LastUpdateAt; + Counters->RmSnapshotLatency->Collect(latency.MilliSeconds()); + nodeState.LastUpdateAt = now; + continue; + } + } + } + + void UpdateResourceSnapshotState() { + TVector<NKikimrKqp::TKqpNodeResources> resources; + resources.reserve(NodesState.size()); + for (const auto& [id, state] : NodesState) { + auto currentResources = state.NodeData.GetResources(); + if (currentResources.GetNodeId()) { + resources.push_back(currentResources); + } + } + + with_lock (ResourceSnapshotState->Lock) { + ResourceSnapshotState->Snapshot = + std::make_shared<TVector<NKikimrKqp::TKqpNodeResources>>(std::move(resources)); + } + } + + void SendInfos(TVector<ui32> infoNodeIds, TVector<ui32> nodeIds = {}) { + auto& nodeState = NodesState[SelfId().NodeId()]; + nodeState.NodeData.SetRound(Round++); + + if (!nodeIds.empty()) { + auto snapshotMsg = CreateSnapshotMessage(infoNodeIds, true); + + for (const auto& nodeId : nodeIds) { + auto nodesStateIt = NodesState.find(nodeId); + + if (nodesStateIt == NodesState.end()) { + return; + } + SendingToNode(nodesStateIt->second, false, {}, snapshotMsg); + } + } else { + auto snapshotMsg = CreateSnapshotMessage(infoNodeIds, false); + + for (auto& [id, state] : NodesState) { + SendingToNode(state, false, {}, snapshotMsg); + } + } + } + +private: + + void Handle(TEvTenantPool::TEvTenantPoolStatus::TPtr& ev) { + TString tenant; + for (auto &slot : ev->Get()->Record.GetSlots()) { + if (slot.HasAssignedTenant()) { + if (tenant.empty()) { + tenant = slot.GetAssignedTenant(); + } else { + LOG_E("Multiple tenants are served by the node: " << ev->Get()->Record.ShortDebugString()); + } + } + } + + BoardState.Tenant = tenant; + BoardState.Path = MakeKqpInfoExchangerBoardPath(tenant); + + if (auto* domainInfo = AppData()->DomainsInfo->GetDomainByName(ExtractDomain(tenant))) { + BoardState.StateStorageGroupId = domainInfo->DefaultStateStorageGroup; + } else { + BoardState.StateStorageGroupId = std::numeric_limits<ui32>::max(); + } + + if (BoardState.StateStorageGroupId == std::numeric_limits<ui32>::max()) { + LOG_E("Can not find default state storage group for database " << BoardState.Tenant); + return; + } + + TULIDGenerator ulidGen; + auto now = TInstant::Now(); + Ulid = ulidGen.Next(now); + UlidString = Ulid.ToBinary(); + + NKikimrKqp::TResourceExchangeBoardData payload; + ActorIdToProto(SelfId(), payload.MutableOwner()); + payload.SetUlid(UlidString); + + auto nowMonotic = TlsActivationContext->Monotonic(); + + auto& nodeState = NodesState[SelfId().NodeId()]; + nodeState.LastUpdateAt = nowMonotic; + + (*nodeState.NodeData.MutableResourceExchangeBoardData()) = payload; + + SelfInfo = payload.SerializeAsString(); + + CreatePublisher(); + CreateSubscriber(); + + LOG_I("Received tenant pool status for exchanger, serving tenant: " << BoardState.Tenant + << ", board: " << BoardState.Path + << ", ssGroupId: " << BoardState.StateStorageGroupId); + } + + + void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) { + if (ev->Get()->Status == TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable) { + LOG_I("Subcriber is not available for info exchanger, serving tenant: " << BoardState.Tenant + << ", board: " << BoardState.Path + << ", ssGroupId: " << BoardState.StateStorageGroupId); + CreateSubscriber(); + return; + } + LOG_D("Get board info from subscriber, serving tenant: " << BoardState.Tenant + << ", board: " << BoardState.Path + << ", ssGroupId: " << BoardState.StateStorageGroupId + << ", with size: " << ev->Get()->InfoEntries.size()); + + auto nodeIds = UpdateBoardInfo(ev->Get()->InfoEntries); + + SendInfos({SelfId().NodeId()}, std::move(nodeIds)); + + UpdateResourceSnapshotState(); + } + + void Handle(TEvStateStorage::TEvBoardInfoUpdate::TPtr& ev) { + if (ev->Get()->Status == TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable) { + LOG_I("Subcriber is not available for info exchanger, serving tenant: " << BoardState.Tenant + << ", board: " << BoardState.Path + << ", ssGroupId: " << BoardState.StateStorageGroupId); + CreateSubscriber(); + return; + } + LOG_D("Get board info update from subscriber, serving tenant: " << BoardState.Tenant + << ", board: " << BoardState.Path + << ", ssGroupId: " << BoardState.StateStorageGroupId + << ", with size: " << ev->Get()->Updates.size()); + + auto nodeIds = UpdateBoardInfo(ev->Get()->Updates); + + SendInfos({SelfId().NodeId()}, std::move(nodeIds)); + + UpdateResourceSnapshotState(); + } + + void Handle(TEvKqpResourceInfoExchanger::TEvPublishResource::TPtr& ev) { + auto& nodeState = NodesState[SelfId().NodeId()]; + + (*nodeState.NodeData.MutableResources()) = std::move(ev->Get()->Resources); + SendInfos({SelfId().NodeId()}); + + UpdateResourceSnapshotState(); + } + + void Handle(TEvKqpResourceInfoExchanger::TEvSendResources::TPtr& ev) { + auto nodeId = ev->Sender.NodeId(); + + LOG_D("Get resources info from node: " << nodeId); + + const TVector<NKikimrKqp::TResourceExchangeNodeData> resourceInfos( + ev->Get()->Record.GetSnapshot().begin(), ev->Get()->Record.GetSnapshot().end()); + + UpdateResourceInfo(resourceInfos); + + if (ev->Get()->Record.GetNeedResending()) { + auto nodesStateIt = NodesState.find(nodeId); + + if (nodesStateIt == NodesState.end()) { + return; + } + SendingToNode(nodesStateIt->second, false, {SelfId().NodeId()}); + } + + UpdateResourceSnapshotState(); + } + + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) { + const auto& nodeId = ev->Get()->NodeId; + auto nodesStateIt = NodesState.find(nodeId); + + if (nodesStateIt == NodesState.end()) { + return; + } + SendingToNode(nodesStateIt->second, true, {SelfId().NodeId()}); + } + + void Handle(TEvPrivate::TEvRetrySending::TPtr& ev) { + const auto& nodeId = ev->Get()->NodeId; + auto nodesStateIt = NodesState.find(nodeId); + + if (nodesStateIt == NodesState.end()) { + return; + } + nodesStateIt->second.RetryState.IsScheduled = false; + + SendingToNode(nodesStateIt->second, true, {SelfId().NodeId()}); + } + + void Handle(TEvPrivate::TEvRetrySubscriber::TPtr&) { + CreateSubscriber(); + } + + void Handle(TEvents::TEvPoison::TPtr&) { + PassAway(); + } + +private: + STATEFN(WorkState) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvStateStorage::TEvBoardInfo, Handle); + hFunc(TEvStateStorage::TEvBoardInfoUpdate, Handle); + hFunc(TEvTenantPool::TEvTenantPoolStatus, Handle); + hFunc(TEvKqpResourceInfoExchanger::TEvPublishResource, Handle); + hFunc(TEvKqpResourceInfoExchanger::TEvSendResources, Handle); + hFunc(TEvPrivate::TEvRetrySubscriber, Handle); + hFunc(TEvPrivate::TEvRetrySending, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); + hFunc(TEvents::TEvPoison, Handle); + } + } + +private: + + void PassAway() override { + Send(BoardState.Publisher, new TEvents::TEvPoison); + Send(BoardState.Subscriber, new TEvents::TEvPoison); + + TActor::PassAway(); + } + + TDuration GetRetryDelay(TRetryState& state) { + auto ret = state.CurrentDelay; + auto newDelay = state.CurrentDelay; + newDelay *= 2; + if (newDelay > TDuration::Seconds(1)) { + newDelay = TDuration::Seconds(1); + } + newDelay *= AppData()->RandomProvider->Uniform(100, 115); + newDelay /= 100; + state.CurrentDelay = newDelay; + return ret; + } + + void SendingToNode(TNodeState& state, bool retry, + TVector<ui32> nodeIdsForSnapshot = {}, NKikimrKqp::TResourceExchangeSnapshot snapshotMsg = {}) { + auto& retryState = state.RetryState; + if (retryState.IsScheduled) { + return; + } + + auto owner = ActorIdFromProto(state.NodeData.GetResourceExchangeBoardData().GetOwner()); + auto nodeId = owner.NodeId(); + + auto now = TlsActivationContext->Monotonic(); + if (retry && now - retryState.LastRetryAt < retryState.CurrentDelay) { + auto at = retryState.LastRetryAt + GetRetryDelay(retryState); + retryState.IsScheduled = true; + Schedule(at - now, new TEvPrivate::TEvRetrySending(nodeId)); + return; + } + + if (owner != SelfId()) { + auto msg = MakeHolder<TEvKqpResourceInfoExchanger::TEvSendResources>(); + if (nodeIdsForSnapshot.empty()) { + msg->Record = std::move(snapshotMsg); + } else { + msg->Record = CreateSnapshotMessage(nodeIdsForSnapshot); + } + Send(owner, msg.Release(), IEventHandle::FlagSubscribeOnSession); + } + + retryState.LastRetryAt = now; + } + + NKikimrKqp::TResourceExchangeSnapshot CreateSnapshotMessage(const TVector<ui32>& nodeIds, + bool needResending = false) { + NKikimrKqp::TResourceExchangeSnapshot snapshotMsg; + snapshotMsg.SetNeedResending(needResending); + auto snapshot = snapshotMsg.MutableSnapshot(); + snapshot->Reserve(nodeIds.size()); + + for (const auto& nodeId: nodeIds) { + auto* el = snapshot->Add(); + *el = NodesState[nodeId].NodeData; + } + + return snapshotMsg; + } + +private: + + TString SelfInfo; + TULID Ulid; + TString UlidString; + + ui64 Round = 0; + + struct TBoardState { + TString Tenant; + TString Path; + ui32 StateStorageGroupId = std::numeric_limits<ui32>::max(); + TActorId Publisher = TActorId(); + TActorId Subscriber = TActorId(); + TRetryState RetryStateForSubscriber; + std::optional<TInstant> LastPublishTime; + }; + TBoardState BoardState; + + THashMap<ui32, TNodeState> NodesState; + std::shared_ptr<TResourceSnapshotState> ResourceSnapshotState; + + TIntrusivePtr<TKqpCounters> Counters; +}; + +NActors::IActor* CreateKqpResourceInfoExchangerActor(TIntrusivePtr<TKqpCounters> counters, + std::shared_ptr<TResourceSnapshotState> resourceSnapshotState) +{ + return new TKqpResourceInfoExchangerActor(counters, std::move(resourceSnapshotState)); +} + +} // namespace NRm +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index 3b11a03ab7a..97d7fadf662 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -10,7 +10,9 @@ #include <ydb/core/node_whiteboard/node_whiteboard.h> #include <ydb/core/tablet/resource_broker.h> + #include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/interconnect/interconnect.h> #include <library/cpp/monlib/service/pages/templates.h> @@ -131,14 +133,14 @@ struct TEvPrivate { }; class TKqpResourceManager : public IKqpResourceManager { - public: TKqpResourceManager(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TIntrusivePtr<TKqpCounters> counters) : Config(config) , Counters(counters) , ExecutionUnitsResource(Config.GetComputeActorsCount()) - , ScanQueryMemoryResource(Config.GetQueryMemoryLimit()) { + , ScanQueryMemoryResource(Config.GetQueryMemoryLimit()) + , PublishResourcesByExchanger(Config.GetEnablePublishResourcesByExchanger()) { } @@ -149,6 +151,13 @@ public: ActorSystem = actorSystem; SelfId = selfId; UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes()); + + if (PublishResourcesByExchanger) { + ResourceSnapshotState = std::make_shared<TResourceSnapshotState>(); + auto exchanger = CreateKqpResourceInfoExchangerActor(Counters, ResourceSnapshotState); + ResourceInfoExchanger = ActorSystem->Register(exchanger); + return; + } } bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources, @@ -540,6 +549,15 @@ public: void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override { LOG_AS_D("Schedule Snapshot request"); + if (PublishResourcesByExchanger) { + std::shared_ptr<const TVector<NKikimrKqp::TKqpNodeResources>> infos; + with_lock (ResourceSnapshotState->Lock) { + infos = ResourceSnapshotState->Snapshot; + } + TVector<NKikimrKqp::TKqpNodeResources> resources = *infos; + callback(std::move(resources)); + return; + } auto ev = MakeHolder<TEvPrivate::TEvTakeResourcesSnapshot>(); ev->Callback = std::move(callback); TAutoPtr<IEventHandle> handle = new IEventHandle(SelfId, SelfId, ev.Release()); @@ -622,6 +640,11 @@ public: // pattern cache for different actors std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> PatternCache; + + // state for resource info exchanger + std::shared_ptr<TResourceSnapshotState> ResourceSnapshotState; + bool PublishResourcesByExchanger; + TActorId ResourceInfoExchanger = TActorId(); }; struct TResourceManagers { @@ -645,10 +668,11 @@ public: } TKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, - TIntrusivePtr<TKqpCounters> counters, const TActorId& resourceBrokerId, + TIntrusivePtr<TKqpCounters> counters, const TActorId& resourceBrokerId, std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources) : ResourceBrokerId(resourceBrokerId ? resourceBrokerId : MakeResourceBrokerID()) , KqpProxySharedResources(std::move(kqpProxySharedResources)) + , PublishResourcesByExchanger(config.GetEnablePublishResourcesByExchanger()) { ResourceManager = std::make_shared<TKqpResourceManager>(config, counters); } @@ -945,6 +969,13 @@ private: private: void PassAway() override { ToBroker(new TEvResourceBroker::TEvNotifyActorDied); + if (ResourceManager->ResourceInfoExchanger) { + Send(ResourceManager->ResourceInfoExchanger, new TEvents::TEvPoison); + } + if (WbState.BoardPublisherActorId) { + + Send(WbState.BoardPublisherActorId, new TEvents::TEvPoison); + } TActor::PassAway(); } @@ -982,19 +1013,6 @@ private: return; } - if (WbState.BoardPublisherActorId) { - LOG_I("Kill previous board publisher for '" << WbState.BoardPath - << "' at " << WbState.BoardPublisherActorId << ", reason: " << reason); - Send(WbState.BoardPublisherActorId, new TEvents::TEvPoison); - } - - WbState.BoardPublisherActorId = TActorId(); - - if (WbState.StateStorageGroupId == std::numeric_limits<ui32>::max()) { - LOG_E("Can not find default state storage group for database " << WbState.Tenant); - return; - } - NKikimrKqp::TKqpNodeResources payload; payload.SetNodeId(SelfId().NodeId()); payload.SetTimestamp(now.Seconds()); @@ -1017,6 +1035,29 @@ private: pool->SetAvailable(ResourceManager->ScanQueryMemoryResource.Available()); } + if (PublishResourcesByExchanger) { + LOG_I("Send to publish resource usage for " + << "reason: " << reason + << ", payload: " << payload.ShortDebugString()); + WbState.LastPublishTime = now; + Send(ResourceManager->ResourceInfoExchanger, + new TEvKqpResourceInfoExchanger::TEvPublishResource(std::move(payload))); + return; + } + + if (WbState.BoardPublisherActorId) { + LOG_I("Kill previous board publisher for '" << WbState.BoardPath + << "' at " << WbState.BoardPublisherActorId << ", reason: " << reason); + Send(WbState.BoardPublisherActorId, new TEvents::TEvPoison); + } + + WbState.BoardPublisherActorId = TActorId(); + + if (WbState.StateStorageGroupId == std::numeric_limits<ui32>::max()) { + LOG_E("Can not find default state storage group for database " << WbState.Tenant); + return; + } + auto boardPublisher = CreateBoardPublishActor(WbState.BoardPath, payload.SerializeAsString(), SelfId(), WbState.StateStorageGroupId, /* ttlMs */ 0, /* reg */ true); WbState.BoardPublisherActorId = Register(boardPublisher); @@ -1047,13 +1088,15 @@ private: TActorId WhiteBoardService; std::shared_ptr<TKqpResourceManager> ResourceManager; + + bool PublishResourcesByExchanger; }; } // namespace NRm NActors::IActor* CreateKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, - TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker, + TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker, std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources) { return new NRm::TKqpResourceManagerActor(config, counters, resourceBroker, std::move(kqpProxySharedResources)); diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h index 5b505e01b1c..2f149098b38 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.h +++ b/ydb/core/kqp/rm_service/kqp_rm_service.h @@ -1,6 +1,7 @@ #pragma once #include <ydb/core/protos/config.pb.h> +#include <ydb/core/kqp/common/simple/kqp_event_ids.h> #include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h> @@ -94,7 +95,7 @@ public: virtual std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> GetPatternCache() = 0; virtual ui32 GetNodeId() { - return 0; + return 0; } }; @@ -103,6 +104,29 @@ NActors::IActor* CreateTakeResourcesSnapshotActor( const TString& boardPath, ui32 stateStorageGroupId, std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)>&& callback); + +struct TResourceSnapshotState { + std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> Snapshot; + TMutex Lock; +}; + +struct TEvKqpResourceInfoExchanger { + struct TEvPublishResource : public TEventLocal<TEvPublishResource, + TKqpResourceInfoExchangerEvents::EvPublishResource> + { + const NKikimrKqp::TKqpNodeResources Resources; + TEvPublishResource(NKikimrKqp::TKqpNodeResources resources) : Resources(std::move(resources)) { + } + }; + + struct TEvSendResources : public TEventPB<TEvSendResources, NKikimrKqp::TResourceExchangeSnapshot, + TKqpResourceInfoExchangerEvents::EvSendResources> + {}; +}; + +NActors::IActor* CreateKqpResourceInfoExchangerActor(TIntrusivePtr<TKqpCounters> counters, + std::shared_ptr<TResourceSnapshotState> resourceSnapshotState); + } // namespace NRm struct TKqpProxySharedResources { @@ -110,7 +134,7 @@ struct TKqpProxySharedResources { }; NActors::IActor* CreateKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, - TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker = {}, + TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker = {}, std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources = nullptr); std::shared_ptr<NRm::IKqpResourceManager> GetKqpResourceManager(TMaybe<ui32> nodeId = Nothing()); diff --git a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp index 30eb41d7ca4..f109e925858 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp @@ -4,6 +4,10 @@ #include <ydb/core/testlib/actor_helpers.h> #include <ydb/core/testlib/tablet_helpers.h> #include <ydb/core/testlib/tenant_runtime.h> +#include <ydb/core/kqp/common/simple/services.h> + +#include <library/cpp/actors/core/interconnect.h> +#include <library/cpp/actors/interconnect/interconnect_impl.h> #include <library/cpp/testing/unittest/registar.h> @@ -43,6 +47,15 @@ TTenantTestConfig MakeTenantTestConfig() { {{{DOMAIN1_NAME, {1, 1, 1}}}}, "node-type" } + }, + // Node1 + { + // TenantPoolConfig + { + // Static slots {tenant, {cpu, memory, network}} + {{{DOMAIN1_NAME, {1, 1, 1}}}}, + "node-type" + } } }}, // DataCenterCount @@ -81,12 +94,14 @@ TResourceBrokerConfig MakeResourceBrokerTestConfig() { return config; } -NKikimrConfig::TTableServiceConfig::TResourceManager MakeKqpResourceManagerConfig() { +NKikimrConfig::TTableServiceConfig::TResourceManager MakeKqpResourceManagerConfig( + bool EnablePublishResourcesByExchanger = false) { NKikimrConfig::TTableServiceConfig::TResourceManager config; config.SetComputeActorsCount(100); config.SetPublishStatisticsIntervalSec(0); config.SetQueryMemoryLimit(1000); + config.SetEnablePublishResourcesByExchanger(EnablePublishResourcesByExchanger); return config; } @@ -107,31 +122,50 @@ public: Counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(); - auto resourceBrokerConfig = MakeResourceBrokerTestConfig(); - auto broker = CreateResourceBrokerActor(resourceBrokerConfig, Counters); - ResourceBrokerActorId = Runtime->Register(broker); + for (ui32 nodeIndex = 0; nodeIndex < Runtime->GetNodeCount(); ++nodeIndex) { + auto resourceBrokerConfig = MakeResourceBrokerTestConfig(); + auto broker = CreateResourceBrokerActor(resourceBrokerConfig, Counters); + auto resourceBrokerActorId = Runtime->Register(broker, nodeIndex); + ResourceBrokers.push_back(resourceBrokerActorId); + } WaitForBootstrap(); } + void TearDown() override { + ResourceBrokers.clear(); + ResourceManagers.clear(); + Runtime.Reset(); + } + void WaitForBootstrap() { TDispatchOptions options; options.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1); UNIT_ASSERT(Runtime->DispatchEvents(options)); } - IActor* CreateKqpResourceManager(const NKikimrConfig::TTableServiceConfig::TResourceManager& config = {}) { + void CreateKqpResourceManager( + const NKikimrConfig::TTableServiceConfig::TResourceManager& config, ui32 nodeInd = 0) { auto kqpCounters = MakeIntrusive<TKqpCounters>(Counters); - auto resman = CreateKqpResourceManagerActor(config, kqpCounters, ResourceBrokerActorId); - ResourceManagerActorId = Runtime->Register(resman); - Runtime->EnableScheduleForActor(ResourceManagerActorId, true); + auto resman = CreateKqpResourceManagerActor(config, kqpCounters, ResourceBrokers[nodeInd]); + ResourceManagers.push_back(Runtime->Register(resman, nodeInd)); + Runtime->RegisterService(MakeKqpResourceManagerServiceID( + Runtime->GetNodeId(nodeInd)), ResourceManagers.back(), nodeInd); + Runtime->EnableScheduleForActor(ResourceManagers.back(), true); + } + + void StartRms(const TVector<NKikimrConfig::TTableServiceConfig::TResourceManager>& configs = {}) { + for (ui32 nodeIndex = 0; nodeIndex < Runtime->GetNodeCount(); ++nodeIndex) { + if (configs.empty()) { + CreateKqpResourceManager(MakeKqpResourceManagerConfig(), nodeIndex); + } else { + CreateKqpResourceManager(configs[nodeIndex], nodeIndex); + } + } WaitForBootstrap(); - return resman; } void AssertResourceBrokerSensors(i64 cpu, i64 mem, i64 enqueued, i64 finished, i64 infly) { auto q = Counters->GetSubgroup("queue", "queue_kqp_resource_manager"); -// Cerr << "-- queue_kqp_resource_manager\n"; -// q->OutputPlainText(Cerr, " "); UNIT_ASSERT_VALUES_EQUAL(q->GetCounter("CPUConsumption")->Val(), cpu); UNIT_ASSERT_VALUES_EQUAL(q->GetCounter("MemoryConsumption")->Val(), mem); UNIT_ASSERT_VALUES_EQUAL(q->GetCounter("EnqueuedTasks")->Val(), enqueued); @@ -139,8 +173,6 @@ public: UNIT_ASSERT_VALUES_EQUAL(q->GetCounter("InFlyTasks")->Val(), infly); auto t = Counters->GetSubgroup("task", "kqp_query"); -// Cerr << "-- kqp_query\n"; -// t->OutputPlainText(Cerr, " "); UNIT_ASSERT_VALUES_EQUAL(t->GetCounter("CPUConsumption")->Val(), cpu); UNIT_ASSERT_VALUES_EQUAL(t->GetCounter("MemoryConsumption")->Val(), mem); UNIT_ASSERT_VALUES_EQUAL(t->GetCounter("EnqueuedTasks")->Val(), enqueued); @@ -148,20 +180,92 @@ public: UNIT_ASSERT_VALUES_EQUAL(t->GetCounter("InFlyTasks")->Val(), infly); } - void AssertResourceManagerStats(std::shared_ptr<NRm::IKqpResourceManager> rm, ui64 scanQueryMemory, ui32 executionUnits) { + void AssertResourceManagerStats( + std::shared_ptr<NRm::IKqpResourceManager> rm, ui64 scanQueryMemory, ui32 executionUnits) { auto stats = rm->GetLocalResources(); UNIT_ASSERT_VALUES_EQUAL(scanQueryMemory, stats.Memory[NRm::EKqpMemoryPool::ScanQuery]); UNIT_ASSERT_VALUES_EQUAL(executionUnits, stats.ExecutionUnits); } + void Disconnect(ui32 nodeIndexFrom, ui32 nodeIndexTo) { + const TActorId proxy = Runtime->GetInterconnectProxy(nodeIndexFrom, nodeIndexTo); + + Runtime->Send( + new IEventHandle( + proxy, TActorId(), new TEvInterconnect::TEvDisconnect(), 0, 0), + nodeIndexFrom, true); + + //Wait for event TEvInterconnect::EvNodeDisconnected + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvInterconnect::EvNodeDisconnected); + Runtime->DispatchEvents(options); + } + + struct TCheckedResources { + ui64 ScanQueryMemory; + ui32 ExecutionUnits; + + bool operator==(const TCheckedResources& other) const { + return ScanQueryMemory == other.ScanQueryMemory && + ExecutionUnits == other.ExecutionUnits; + } + }; + + void CheckSnapshot(ui32 nodeIndToCheck, TVector<TCheckedResources> verificationData, + std::shared_ptr<NRm::IKqpResourceManager> currentRm) { + TVector<NKikimrKqp::TKqpNodeResources> snapshot; + std::atomic<int> ready = 0; + + while(true) { + currentRm->RequestClusterResourcesInfo( + [&](TVector<NKikimrKqp::TKqpNodeResources>&& resources) { + snapshot = std::move(resources); + ready = 1; + }); + + while (ready.load() != 1) { + Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(100)); + } + + if (snapshot.size() != verificationData.size()) { + continue; + } + std::sort(snapshot.begin(), snapshot.end(), [](auto first, auto second) { + return first.GetNodeId() < second.GetNodeId(); + }); + + TVector<TCheckedResources> currentData; + std::transform(snapshot.cbegin(), snapshot.cend(), std::back_inserter(currentData), + [](const NKikimrKqp::TKqpNodeResources& cur) { + return TCheckedResources{cur.GetMemory()[0].GetAvailable(), cur.GetExecutionUnits()}; + }); + + if (verificationData[nodeIndToCheck] == currentData[nodeIndToCheck]) { + for (ui32 i = 0; i < verificationData.size(); i++) { + if (i != nodeIndToCheck) { + UNIT_ASSERT_VALUES_EQUAL(verificationData[i].ScanQueryMemory, currentData[i].ScanQueryMemory); + UNIT_ASSERT_VALUES_EQUAL(verificationData[i].ExecutionUnits, currentData[i].ExecutionUnits); + } + } + break; + } + } + } + UNIT_TEST_SUITE(KqpRm); UNIT_TEST(SingleTask); UNIT_TEST(ManyTasks); UNIT_TEST(NotEnoughMemory); UNIT_TEST(NotEnoughExecutionUnits); UNIT_TEST(ResourceBrokerNotEnoughResources); - UNIT_TEST(Snapshot); + UNIT_TEST(SingleSnapshotByStateStorage); + UNIT_TEST(SingleSnapshotByExchanger); UNIT_TEST(Reduce); + UNIT_TEST(SnapshotSharingByStateStorage); + UNIT_TEST(SnapshotSharingByExchanger); + UNIT_TEST(NodesMembershipByStateStorage); + UNIT_TEST(NodesMembershipByExchanger); + UNIT_TEST(DisonnectNodes); UNIT_TEST_SUITE_END(); void SingleTask(); @@ -169,24 +273,35 @@ public: void NotEnoughMemory(); void NotEnoughExecutionUnits(); void ResourceBrokerNotEnoughResources(); - void Snapshot(); + void Snapshot(bool byExchanger); + void SingleSnapshotByStateStorage(); + void SingleSnapshotByExchanger(); void Reduce(); + void SnapshotSharing(bool byExchanger); + void SnapshotSharingByStateStorage(); + void SnapshotSharingByExchanger(); + void NodesMembership(bool byExchanger); + void NodesMembershipByStateStorage(); + void NodesMembershipByExchanger(); + void DisonnectNodes(); private: THolder<TTestBasicRuntime> Runtime; TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; - // TIntrusivePtr<TKqpCounters> KqpCounters; - TActorId ResourceBrokerActorId; - TActorId ResourceManagerActorId; + TVector<TActorId> ResourceBrokers; + TVector<TActorId> ResourceManagers; }; UNIT_TEST_SUITE_REGISTRATION(KqpRm); void KqpRm::SingleTask() { - CreateKqpResourceManager(MakeKqpResourceManagerConfig()); + StartRms(); NKikimr::TActorSystemStub stub; - auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); + auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId()); + + auto stats = rm->GetLocalResources(); + UNIT_ASSERT_VALUES_EQUAL(1000, stats.Memory[NRm::EKqpMemoryPool::ScanQuery]); NRm::TKqpResourcesRequest request; request.ExecutionUnits = 10; @@ -205,10 +320,10 @@ void KqpRm::SingleTask() { } void KqpRm::ManyTasks() { - CreateKqpResourceManager(MakeKqpResourceManagerConfig()); + StartRms(); NKikimr::TActorSystemStub stub; - auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); + auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId()); NRm::TKqpResourcesRequest request; request.ExecutionUnits = 10; @@ -243,10 +358,10 @@ void KqpRm::ManyTasks() { } void KqpRm::NotEnoughMemory() { - CreateKqpResourceManager(MakeKqpResourceManagerConfig()); + StartRms(); NKikimr::TActorSystemStub stub; - auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); + auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId()); NRm::TKqpResourcesRequest request; request.ExecutionUnits = 10; @@ -261,10 +376,10 @@ void KqpRm::NotEnoughMemory() { } void KqpRm::NotEnoughExecutionUnits() { - CreateKqpResourceManager(MakeKqpResourceManagerConfig()); + StartRms(); NKikimr::TActorSystemStub stub; - auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); + auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId()); NRm::TKqpResourcesRequest request; request.ExecutionUnits = 1000; @@ -281,10 +396,11 @@ void KqpRm::NotEnoughExecutionUnits() { void KqpRm::ResourceBrokerNotEnoughResources() { auto config = MakeKqpResourceManagerConfig(); config.SetQueryMemoryLimit(100000000); - CreateKqpResourceManager(config); + + StartRms({config, MakeKqpResourceManagerConfig()}); NKikimr::TActorSystemStub stub; - auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); + auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId()); NRm::TKqpResourcesRequest request; request.ExecutionUnits = 10; @@ -303,11 +419,11 @@ void KqpRm::ResourceBrokerNotEnoughResources() { AssertResourceBrokerSensors(0, 1000, 0, 0, 1); } -void KqpRm::Snapshot() { - CreateKqpResourceManager(MakeKqpResourceManagerConfig()); +void KqpRm::Snapshot(bool byExchanger) { + StartRms({MakeKqpResourceManagerConfig(byExchanger), MakeKqpResourceManagerConfig(byExchanger)}); NKikimr::TActorSystemStub stub; - auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); + auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId()); NRm::TKqpResourcesRequest request; request.ExecutionUnits = 10; @@ -325,24 +441,7 @@ void KqpRm::Snapshot() { Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500)); - { - TVector<NKikimrKqp::TKqpNodeResources> snapshot; - std::atomic<int> ready = 0; - rm->RequestClusterResourcesInfo([&](TVector<NKikimrKqp::TKqpNodeResources>&& resources) { - snapshot = std::move(resources); - ready = 1; - }); - - while (ready.load() != 1) { - Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(100)); - } - - UNIT_ASSERT_VALUES_EQUAL(1, snapshot.size()); - UNIT_ASSERT_VALUES_EQUAL(80, snapshot[0].GetExecutionUnits()); - UNIT_ASSERT_VALUES_EQUAL(1, snapshot[0].GetMemory().size()); - UNIT_ASSERT_VALUES_EQUAL(1, snapshot[0].GetMemory()[0].GetPool()); - UNIT_ASSERT_VALUES_EQUAL(800, snapshot[0].GetMemory()[0].GetAvailable()); - } + CheckSnapshot(0, {{800, 80}, {1000, 100}}, rm); rm->FreeResources(1); rm->FreeResources(2); @@ -351,31 +450,22 @@ void KqpRm::Snapshot() { Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500)); - { - TVector<NKikimrKqp::TKqpNodeResources> snapshot; - std::atomic<int> ready = 0; - rm->RequestClusterResourcesInfo([&](TVector<NKikimrKqp::TKqpNodeResources>&& resources) { - snapshot = std::move(resources); - ready = 1; - }); + CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm); +} - while (ready.load() != 1) { - Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(100)); - } +void KqpRm::SingleSnapshotByStateStorage() { + Snapshot(false); +} - UNIT_ASSERT_VALUES_EQUAL(1, snapshot.size()); - UNIT_ASSERT_VALUES_EQUAL(100, snapshot[0].GetExecutionUnits()); - UNIT_ASSERT_VALUES_EQUAL(1, snapshot[0].GetMemory().size()); - UNIT_ASSERT_VALUES_EQUAL(1, snapshot[0].GetMemory()[0].GetPool()); - UNIT_ASSERT_VALUES_EQUAL(1000, snapshot[0].GetMemory()[0].GetAvailable()); - } +void KqpRm::SingleSnapshotByExchanger() { + Snapshot(true); } void KqpRm::Reduce() { - CreateKqpResourceManager(MakeKqpResourceManagerConfig()); + StartRms(); NKikimr::TActorSystemStub stub; - auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); + auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId()); NRm::TKqpResourcesRequest request; request.ExecutionUnits = 10; @@ -408,5 +498,125 @@ void KqpRm::Reduce() { AssertResourceBrokerSensors(0, 30, 0, 0, 1); } +void KqpRm::SnapshotSharing(bool byExchanger) { + StartRms({MakeKqpResourceManagerConfig(byExchanger), MakeKqpResourceManagerConfig(byExchanger)}); + NKikimr::TActorSystemStub stub; + + auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId()); + auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId()); + + CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first); + CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second); + + NRm::TKqpResourcesRequest request; + request.ExecutionUnits = 10; + request.MemoryPool = NRm::EKqpMemoryPool::ScanQuery; + request.Memory = 100; + + { + bool allocated = rm_first->AllocateResources(1, 2, request); + UNIT_ASSERT(allocated); + + allocated = rm_first->AllocateResources(2, 1, request); + UNIT_ASSERT(allocated); + + Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500)); + + CheckSnapshot(0, {{800, 80}, {1000, 100}}, rm_second); + } + + { + bool allocated = rm_second->AllocateResources(1, 2, request); + UNIT_ASSERT(allocated); + + allocated = rm_second->AllocateResources(2, 1, request); + UNIT_ASSERT(allocated); + + Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500)); + + CheckSnapshot(1, {{800, 80}, {800, 80}}, rm_first); + } + + { + rm_first->FreeResources(1); + rm_first->FreeResources(2); + + Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500)); + + CheckSnapshot(0, {{1000, 100}, {800, 80}}, rm_second); + } + + { + rm_second->FreeResources(1); + rm_second->FreeResources(2); + + Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500)); + + CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_first); + } +} + +void KqpRm::SnapshotSharingByStateStorage() { + SnapshotSharing(false); +} + +void KqpRm::SnapshotSharingByExchanger() { + SnapshotSharing(true); +} + +void KqpRm::NodesMembership(bool byExchanger) { + StartRms({MakeKqpResourceManagerConfig(byExchanger), MakeKqpResourceManagerConfig(byExchanger)}); + NKikimr::TActorSystemStub stub; + + auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId()); + auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId()); + + CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first); + CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second); + + const TActorId edge = Runtime->AllocateEdgeActor(1); + Runtime->Send(new IEventHandle( + ResourceManagers[1], edge, new TEvents::TEvPoison, IEventHandle::FlagTrackDelivery, 0), + 1, false); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvents::TSystem::Poison, 1); + UNIT_ASSERT(Runtime->DispatchEvents(options)); + + CheckSnapshot(0, {{1000, 100}}, rm_first); +} + +void KqpRm::NodesMembershipByStateStorage() { + NodesMembership(false); +} + +void KqpRm::NodesMembershipByExchanger() { + NodesMembership(true); +} + +void KqpRm::DisonnectNodes() { + StartRms({MakeKqpResourceManagerConfig(true), MakeKqpResourceManagerConfig(true)}); + NKikimr::TActorSystemStub stub; + + auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId()); + auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId()); + + CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first); + CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second); + + auto prevObserverFunc = Runtime->SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case NRm::TEvKqpResourceInfoExchanger::TEvSendResources::EventType: { + return TTestActorRuntime::EEventAction::DROP; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }); + + Disconnect(0, 1); + + CheckSnapshot(0, {{1000, 100}}, rm_first); +} + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/rm_service/ya.make b/ydb/core/kqp/rm_service/ya.make index 9fc7b2b5f89..46fe31663d1 100644 --- a/ydb/core/kqp/rm_service/ya.make +++ b/ydb/core/kqp/rm_service/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( kqp_resource_tracker.cpp kqp_resource_estimation.cpp + kqp_resource_info_exchanger.cpp kqp_rm_service.cpp kqp_snapshot_manager.cpp ) @@ -20,6 +21,7 @@ PEERDIR( ydb/core/protos ydb/core/tablet ydb/core/node_whiteboard + ydb/core/util ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index e09a4bd41be..c036d3de57d 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1126,6 +1126,7 @@ message TTableServiceConfig { reserved 14; optional TShardsScanningPolicy ShardsScanningPolicy = 16; optional uint64 KqpPatternCacheCapacityBytes = 17 [default = 104857600]; // 100 MiB, 0 is for disable + optional bool EnablePublishResourcesByExchanger = 18 [default = false]; } message TSpillingServiceConfig { diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 876d6de11b6..605e01cb57a 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -515,6 +515,12 @@ message TEvCancelKqpTasksRequest { message TEvCancelKqpTasksResponse { } +message TResourceExchangeBoardData { + optional NActorsProto.TActorId Owner = 1; + optional bytes Ulid = 2; + optional NActorsProto.TActorId Publisher = 3; +} + message TKqpNodeResources { optional NActorsProto.TActorId ResourceManagerActorId = 2; // legacy optional uint32 AvailableComputeActors = 4; // legacy @@ -629,6 +635,17 @@ message TEvFetchScriptResultsResponse { optional Ydb.ResultSet ResultSet = 4; } +message TResourceExchangeNodeData { + optional TKqpNodeResources Resources = 1; + optional TResourceExchangeBoardData ResourceExchangeBoardData = 2; + optional uint64 Round = 3; +} + +message TResourceExchangeSnapshot { + optional bool NeedResending = 1; + repeated TResourceExchangeNodeData Snapshot = 2; +} + // Request that is sent to run script actor to cancel execution and write finish status to database. message TEvCancelScriptExecutionRequest { } |