aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author shumkovnd <shumkovnd@yandex-team.com> 2023-06-22 15:30:02 +0300
committer shumkovnd <shumkovnd@yandex-team.com> 2023-06-22 15:30:02 +0300
commit c3748121232aaab487cd1623b9ce7e225f904c22 (patch)
tree 5ce88417926ec0e2d48f0777dc117a4a3f4982a4
parent 17b977f039bd579d528a4fdd5a1d635088f99b9c (diff)
download ydb-c3748121232aaab487cd1623b9ce7e225f904c22.tar.gz
Add TKqpResourceInfoExchangerActor
-rw-r--r-- ydb/core/kqp/common/simple/kqp_event_ids.h 7
-rw-r--r-- ydb/core/kqp/counters/kqp_counters.cpp 2
-rw-r--r-- ydb/core/kqp/counters/kqp_counters.h 1
-rw-r--r-- ydb/core/kqp/rm_service/CMakeLists.darwin-x86_64.txt 2
-rw-r--r-- ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt 2
-rw-r--r-- ydb/core/kqp/rm_service/CMakeLists.linux-x86_64.txt 2
-rw-r--r-- ydb/core/kqp/rm_service/CMakeLists.windows-x86_64.txt 2
-rw-r--r-- ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp 524
-rw-r--r-- ydb/core/kqp/rm_service/kqp_rm_service.cpp 77
-rw-r--r-- ydb/core/kqp/rm_service/kqp_rm_service.h 28
-rw-r--r-- ydb/core/kqp/rm_service/kqp_rm_ut.cpp 346
-rw-r--r-- ydb/core/kqp/rm_service/ya.make 2
-rw-r--r-- ydb/core/protos/config.proto 1
-rw-r--r-- ydb/core/protos/kqp.proto 17
14 files changed, 926 insertions, 87 deletions
diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h
index 35648143ad2..2f800dcb005 100644
--- a/ydb/core/kqp/common/simple/kqp_event_ids.h
+++ b/ydb/core/kqp/common/simple/kqp_event_ids.h
@@ -141,5 +141,12 @@ struct TKqpScriptExecutionEvents {
};
};
+// Event identifiers of the KQP resource info exchanger.
+// Ids are allocated inside the ES_KQP event space, starting at offset 600.
+struct TKqpResourceInfoExchangerEvents {
+ enum EKqpResourceInfoExchangerEvents {
+ EvPublishResource = EventSpaceBegin(TKikimrEvents::ES_KQP) + 600,
+ EvSendResources,
+ };
+};
+
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index fb3f5bcd43d..6d352b049b2 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -757,6 +757,8 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
RmNotEnoughComputeActors = KqpGroup->GetCounter("RM/NotEnoughComputeActors", true);
RmExtraMemAllocs = KqpGroup->GetCounter("RM/ExtraMemAllocs", true);
RmInternalError = KqpGroup->GetCounter("RM/InternalError", true);
+ RmSnapshotLatency = KqpGroup->GetHistogram(
+ "RM/SnapshotLatency", NMonitoring::ExponentialHistogram(20, 2, 1));
/* Spilling */
SpillingWriteBlobs = KqpGroup->GetCounter("Spilling/WriteBlobs", true);
diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h
index ec906a81fc6..0d2943e0fbe 100644
--- a/ydb/core/kqp/counters/kqp_counters.h
+++ b/ydb/core/kqp/counters/kqp_counters.h
@@ -356,6 +356,7 @@ public:
::NMonitoring::TDynamicCounters::TCounterPtr RmNotEnoughComputeActors;
::NMonitoring::TDynamicCounters::TCounterPtr RmExtraMemAllocs;
::NMonitoring::TDynamicCounters::TCounterPtr RmInternalError;
+ NMonitoring::THistogramPtr RmSnapshotLatency;
// Spilling counters
::NMonitoring::TDynamicCounters::TCounterPtr SpillingWriteBlobs;
diff --git a/ydb/core/kqp/rm_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/rm_service/CMakeLists.darwin-x86_64.txt
index d7f93026d26..4f01ea52d22 100644
--- a/ydb/core/kqp/rm_service/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/rm_service/CMakeLists.darwin-x86_64.txt
@@ -27,10 +27,12 @@ target_link_libraries(core-kqp-rm_service PUBLIC
ydb-core-protos
ydb-core-tablet
ydb-core-node_whiteboard
+ ydb-core-util
)
target_sources(core-kqp-rm_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_rm_service.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp
)
diff --git a/ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt
index 78bf95bf0fc..8b9fb7fe00b 100644
--- a/ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/rm_service/CMakeLists.linux-aarch64.txt
@@ -28,10 +28,12 @@ target_link_libraries(core-kqp-rm_service PUBLIC
ydb-core-protos
ydb-core-tablet
ydb-core-node_whiteboard
+ ydb-core-util
)
target_sources(core-kqp-rm_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_rm_service.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp
)
diff --git a/ydb/core/kqp/rm_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/rm_service/CMakeLists.linux-x86_64.txt
index 78bf95bf0fc..8b9fb7fe00b 100644
--- a/ydb/core/kqp/rm_service/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/rm_service/CMakeLists.linux-x86_64.txt
@@ -28,10 +28,12 @@ target_link_libraries(core-kqp-rm_service PUBLIC
ydb-core-protos
ydb-core-tablet
ydb-core-node_whiteboard
+ ydb-core-util
)
target_sources(core-kqp-rm_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_rm_service.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp
)
diff --git a/ydb/core/kqp/rm_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/rm_service/CMakeLists.windows-x86_64.txt
index d7f93026d26..4f01ea52d22 100644
--- a/ydb/core/kqp/rm_service/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/rm_service/CMakeLists.windows-x86_64.txt
@@ -27,10 +27,12 @@ target_link_libraries(core-kqp-rm_service PUBLIC
ydb-core-protos
ydb-core-tablet
ydb-core-node_whiteboard
+ ydb-core-util
)
target_sources(core-kqp-rm_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_rm_service.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp
)
diff --git a/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp b/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp
new file mode 100644
index 00000000000..836c61262d0
--- /dev/null
+++ b/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp
@@ -0,0 +1,524 @@
+#include "kqp_rm_service.h"
+
+#include <ydb/core/base/location.h>
+#include <ydb/core/base/statestorage.h>
+#include <ydb/core/kqp/common/kqp.h>
+#include <ydb/core/mind/tenant_pool.h>
+#include <ydb/core/kqp/common/kqp_event_ids.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/interconnect/interconnect.h>
+
+#include <ydb/core/util/ulid.h>
+
+namespace NKikimr {
+namespace NKqp {
+namespace NRm {
+
+// Logging shortcuts bound to the KQP_RESOURCE_MANAGER component of the current activation context.
+// The suffix selects the priority: C = crit, D = debug, I = info, E = error, W = warn, N = notice.
+#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
+#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
+#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
+#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
+
+class TKqpResourceInfoExchangerActor : public TActorBootstrapped<TKqpResourceInfoExchangerActor> {
+ using TBase = TActorBootstrapped<TKqpResourceInfoExchangerActor>;
+
+    // Actor-local events that drive the retry timers of the exchanger.
+    struct TEvPrivate {
+        enum EEv {
+            EvRetrySending = EventSpaceBegin(TEvents::ES_PRIVATE),
+            EvRetrySubscriber,
+        };
+
+        // Scheduled when a send to node `NodeId` was postponed and has to be attempted again.
+        struct TEvRetrySending :
+            public TEventLocal<TEvRetrySending, EEv::EvRetrySending> {
+            ui32 NodeId;
+
+            // explicit: a bare node id must never convert into an event implicitly.
+            explicit TEvRetrySending(ui32 nodeId) : NodeId(nodeId) {
+            }
+        };
+
+        // Scheduled when creation of the board subscriber was postponed.
+        struct TEvRetrySubscriber :
+            public TEventLocal<TEvRetrySubscriber, EEv::EvRetrySubscriber> {
+        };
+    };
+
+ // Bookkeeping for retrying an operation with a growing delay (see GetRetryDelay()).
+ struct TRetryState {
+ bool IsScheduled = false; // true while a TEvRetrySending timer for this state is pending
+ NMonotonic::TMonotonic LastRetryAt = TMonotonic::Zero(); // monotonic time of the last performed attempt
+ TDuration CurrentDelay = TDuration::MilliSeconds(10); // minimal interval required before the next attempt
+ };
+
+ // Everything the exchanger tracks about a single node (the local node included).
+ struct TNodeState {
+ TRetryState RetryState; // retry bookkeeping for sends addressed to this node
+ NMonotonic::TMonotonic LastUpdateAt = TMonotonic::Zero(); // monotonic time when NodeData was last replaced
+ NKikimrKqp::TResourceExchangeNodeData NodeData; // last accepted board data, round and resources of the node
+ };
+
+public:
+
+    // Creates the exchanger.
+    //   counters              - KQP counters; RmSnapshotLatency is updated when remote data is accepted.
+    //   resourceSnapshotState - shared holder into which the merged resources snapshot is published.
+    TKqpResourceInfoExchangerActor(TIntrusivePtr<TKqpCounters> counters,
+        std::shared_ptr<TResourceSnapshotState> resourceSnapshotState)
+        : ResourceSnapshotState(std::move(resourceSnapshotState))
+        , Counters(std::move(counters))
+    {
+    }
+
+    // Actor entry point: sends a subscribe request to the tenant pool root and
+    // switches the actor to WorkState.
+    void Bootstrap() {
+        LOG_D("Start KqpResourceInfoExchangerActor at " << SelfId());
+
+        // The tenant pool answers with TEvTenantPoolStatus, which is handled in WorkState.
+        Send(MakeTenantPoolRootID(), new TEvents::TEvSubscribe);
+
+        Become(&TKqpResourceInfoExchangerActor::WorkState);
+    }
+
+private:
+
+    // Builds the board path used by exchangers of the given database: "kqpexch+<database>".
+    static TString MakeKqpInfoExchangerBoardPath(TStringBuf database) {
+        TStringBuilder path;
+        path << "kqpexch+" << database;
+        return path;
+    }
+
+    // (Re)creates the board lookup actor subscribed to BoardState.Path.
+    // Attempts are rate-limited by BoardState.RetryStateForSubscriber: a call that comes sooner
+    // than CurrentDelay after the previous attempt only schedules TEvRetrySubscriber.
+    // NOTE(review): nothing prevents several retry timers from being queued when this is called
+    // repeatedly inside the delay window — confirm whether that is acceptable.
+    void CreateSubscriber() {
+        auto& retry = BoardState.RetryStateForSubscriber;
+        auto now = TlsActivationContext->Monotonic();
+
+        const bool tooEarly = (now - retry.LastRetryAt) < retry.CurrentDelay;
+        if (tooEarly) {
+            // GetRetryDelay() hands back the delay to wait now and grows the stored one.
+            auto retryAt = retry.LastRetryAt + GetRetryDelay(retry);
+            Schedule(retryAt - now, new TEvPrivate::TEvRetrySubscriber);
+            return;
+        }
+
+        // Get rid of the subscriber created for the previous tenant/path, if any.
+        if (BoardState.Subscriber) {
+            LOG_I("Kill previous info exchanger subscriber for '" << BoardState.Path
+                << "' at " << BoardState.Subscriber << ", reason: tenant updated");
+            Send(BoardState.Subscriber, new TEvents::TEvPoison);
+        }
+        BoardState.Subscriber = TActorId();
+
+        BoardState.Subscriber = Register(CreateBoardLookupActor(
+            BoardState.Path, SelfId(), BoardState.StateStorageGroupId, EBoardLookupMode::Subscription));
+
+        retry.LastRetryAt = now;
+    }
+
+    // (Re)creates the board publisher that announces SelfInfo on BoardState.Path and
+    // records the new publisher id in this node's own board data.
+    void CreatePublisher() {
+        // Get rid of the publisher created for the previous tenant/path, if any.
+        if (BoardState.Publisher) {
+            LOG_I("Kill previous info exchanger publisher for '" << BoardState.Path
+                << "' at " << BoardState.Publisher << ", reason: tenant updated");
+            Send(BoardState.Publisher, new TEvents::TEvPoison);
+        }
+        BoardState.Publisher = TActorId();
+
+        BoardState.Publisher = Register(CreateBoardPublishActor(BoardState.Path, SelfInfo, SelfId(),
+            BoardState.StateStorageGroupId, /* ttlMs */ 0, /* reg */ true));
+
+        // UpdateBoardInfo() compares this id with the id of a dropped board entry.
+        auto* selfBoardData = NodesState[SelfId().NodeId()].NodeData.MutableResourceExchangeBoardData();
+        ActorIdToProto(BoardState.Publisher, selfBoardData->MutablePublisher());
+    }
+
+    // Merges board entries into NodesState.
+    // Returns the ids of the nodes whose board data was (re)initialized by this call.
+    //  - A dropped entry removes the node only when it refers to the publisher currently tracked
+    //    for that node.
+    //  - A live entry replaces the tracked board data when the node has no ULID yet or when the
+    //    entry's ULID is greater than the tracked one; in the latter case the round and the
+    //    resources of the node are reset as well.
+    //  - An entry whose payload cannot be parsed is skipped, so that it cannot create a bogus
+    //    state keyed by a default-constructed owner.
+    // NOTE(review): ULID bytes received from the board are not validated before
+    // TULID::FromBinary() — confirm how it treats strings of unexpected length.
+    TVector<ui32> UpdateBoardInfo(const TMap<TActorId, TEvStateStorage::TBoardInfoEntry>& infos) {
+        TVector<ui32> nodeIds;
+
+        auto now = TlsActivationContext->Monotonic();
+        for (const auto& [id, entry] : infos) {
+            if (entry.Dropped) {
+                auto nodesStateIt = NodesState.find(id.NodeId());
+                if (nodesStateIt == NodesState.end()) {
+                    continue;
+                }
+                const auto& currentBoardData = nodesStateIt->second.NodeData.GetResourceExchangeBoardData();
+                if (ActorIdFromProto(currentBoardData.GetPublisher()) == id) {
+                    NodesState.erase(nodesStateIt);
+                }
+                continue;
+            }
+
+            NKikimrKqp::TResourceExchangeBoardData boardData;
+            if (!boardData.ParseFromString(entry.Payload)) {
+                LOG_W("Skip malformed info exchanger board entry published by " << id);
+                continue;
+            }
+            auto owner = ActorIdFromProto(boardData.GetOwner());
+            // Copied on purpose: boardData is moved from below.
+            auto ulidString = boardData.GetUlid();
+
+            auto& nodeState = NodesState[owner.NodeId()];
+            auto& currentInfo = nodeState.NodeData;
+            auto* currentBoardData = currentInfo.MutableResourceExchangeBoardData();
+            auto currentUlidString = currentBoardData->GetUlid();
+
+            const bool isNewNode = currentUlidString.empty();
+            if (!isNewNode && !(TULID::FromBinary(currentUlidString) < TULID::FromBinary(ulidString))) {
+                // The tracked board data is at least as recent as the entry.
+                continue;
+            }
+
+            *currentBoardData = std::move(boardData);
+            ActorIdToProto(id, currentBoardData->MutablePublisher());
+            if (!isNewNode) {
+                // Data tracked under the older ULID is dropped together with it.
+                currentInfo.SetRound(0);
+                (*currentInfo.MutableResources()) = NKikimrKqp::TKqpNodeResources();
+            }
+            nodeIds.push_back(owner.NodeId());
+            nodeState.LastUpdateAt = now;
+        }
+
+        return nodeIds;
+    }
+
+    // Applies node data received from other exchangers.
+    // An item is accepted only for an already known node and only when it is newer than the
+    // tracked data: greater ULID, or the same ULID with a greater round. For every accepted item
+    // the time elapsed since the previous update of that node is reported to RmSnapshotLatency.
+    void UpdateResourceInfo(const TVector<NKikimrKqp::TResourceExchangeNodeData>& infos) {
+        auto now = TlsActivationContext->Monotonic();
+
+        for (const auto& info : infos) {
+            const auto& boardData = info.GetResourceExchangeBoardData();
+            auto owner = ActorIdFromProto(boardData.GetOwner());
+
+            auto nodesStateIt = NodesState.find(owner.NodeId());
+            if (nodesStateIt == NodesState.end()) {
+                continue;
+            }
+
+            auto& nodeState = nodesStateIt->second;
+            auto& currentInfo = nodeState.NodeData;
+
+            // Both ULIDs are decoded before currentInfo is overwritten below; the read-only
+            // accessor is enough here, nothing is modified through the board data.
+            auto ulid = TULID::FromBinary(boardData.GetUlid());
+            auto currentUlid = TULID::FromBinary(currentInfo.GetResourceExchangeBoardData().GetUlid());
+
+            const bool isNewer = currentUlid < ulid
+                || (currentUlid == ulid && currentInfo.GetRound() < info.GetRound());
+            if (!isNewer) {
+                continue;
+            }
+
+            currentInfo = info;
+            auto latency = now - nodeState.LastUpdateAt;
+            Counters->RmSnapshotLatency->Collect(latency.MilliSeconds());
+            nodeState.LastUpdateAt = now;
+        }
+    }
+
+    // Publishes a new immutable snapshot built from the resources of all tracked nodes.
+    // Nodes whose resources carry no node id (nothing received for them yet) are left out.
+    void UpdateResourceSnapshotState() {
+        TVector<NKikimrKqp::TKqpNodeResources> resources;
+        resources.reserve(NodesState.size());
+        for (const auto& [id, state] : NodesState) {
+            // Bound by reference: the protobuf is copied once, straight into the vector.
+            const auto& currentResources = state.NodeData.GetResources();
+            if (currentResources.GetNodeId()) {
+                resources.push_back(currentResources);
+            }
+        }
+
+        // Allocate outside the critical section; under the lock only the pointers are swapped,
+        // and the previous snapshot is released after the lock is dropped.
+        auto snapshot = std::make_shared<TVector<NKikimrKqp::TKqpNodeResources>>(std::move(resources));
+        with_lock (ResourceSnapshotState->Lock) {
+            ResourceSnapshotState->Snapshot.swap(snapshot);
+        }
+    }
+
+    // Sends the data of `infoNodeIds` to other exchangers.
+    //   nodeIds non-empty - send only to these nodes and ask them to send their data back;
+    //   nodeIds empty     - send to every tracked node without asking for a reply.
+    // The round of the local node is advanced before the snapshot message is built.
+    void SendInfos(TVector<ui32> infoNodeIds, TVector<ui32> nodeIds = {}) {
+        auto& nodeState = NodesState[SelfId().NodeId()];
+        nodeState.NodeData.SetRound(Round++);
+
+        if (!nodeIds.empty()) {
+            auto snapshotMsg = CreateSnapshotMessage(infoNodeIds, true);
+
+            for (const auto& nodeId : nodeIds) {
+                auto nodesStateIt = NodesState.find(nodeId);
+
+                if (nodesStateIt == NodesState.end()) {
+                    // One unknown node must not prevent delivery to the remaining ones.
+                    continue;
+                }
+                SendingToNode(nodesStateIt->second, false, {}, snapshotMsg);
+            }
+        } else {
+            auto snapshotMsg = CreateSnapshotMessage(infoNodeIds, false);
+
+            for (auto& [id, state] : NodesState) {
+                SendingToNode(state, false, {}, snapshotMsg);
+            }
+        }
+    }
+
+private:
+
+    // Handles the tenant pool status: picks the tenant served by the node, resolves its state
+    // storage group, generates a new ULID for the local board data and (re)creates the board
+    // publisher and subscriber. Nothing is published when the state storage group is unknown.
+    void Handle(TEvTenantPool::TEvTenantPoolStatus::TPtr& ev) {
+        const auto& record = ev->Get()->Record;
+
+        // The first assigned tenant is used; any further one is only reported.
+        TString tenant;
+        for (const auto& slot : record.GetSlots()) {
+            if (!slot.HasAssignedTenant()) {
+                continue;
+            }
+            if (!tenant.empty()) {
+                LOG_E("Multiple tenants are served by the node: " << record.ShortDebugString());
+                continue;
+            }
+            tenant = slot.GetAssignedTenant();
+        }
+
+        BoardState.Tenant = tenant;
+        BoardState.Path = MakeKqpInfoExchangerBoardPath(tenant);
+
+        // The maximal ui32 value stands for "no state storage group".
+        BoardState.StateStorageGroupId = std::numeric_limits<ui32>::max();
+        if (auto* domainInfo = AppData()->DomainsInfo->GetDomainByName(ExtractDomain(tenant))) {
+            BoardState.StateStorageGroupId = domainInfo->DefaultStateStorageGroup;
+        }
+        if (BoardState.StateStorageGroupId == std::numeric_limits<ui32>::max()) {
+            LOG_E("Can not find default state storage group for database " << BoardState.Tenant);
+            return;
+        }
+
+        // Peers compare ULIDs to decide which board data of a node is the newer one.
+        TULIDGenerator generator;
+        Ulid = generator.Next(TInstant::Now());
+        UlidString = Ulid.ToBinary();
+
+        NKikimrKqp::TResourceExchangeBoardData selfBoardData;
+        ActorIdToProto(SelfId(), selfBoardData.MutableOwner());
+        selfBoardData.SetUlid(UlidString);
+
+        auto& selfState = NodesState[SelfId().NodeId()];
+        selfState.LastUpdateAt = TlsActivationContext->Monotonic();
+        (*selfState.NodeData.MutableResourceExchangeBoardData()) = selfBoardData;
+
+        // The serialized board data is what the publisher puts on the board.
+        SelfInfo = selfBoardData.SerializeAsString();
+
+        CreatePublisher();
+        CreateSubscriber();
+
+        LOG_I("Received tenant pool status for exchanger, serving tenant: " << BoardState.Tenant
+            << ", board: " << BoardState.Path
+            << ", ssGroupId: " << BoardState.StateStorageGroupId);
+    }
+
+
+    // Handles the initial board content delivered by the subscriber: merges it, sends the local
+    // node data to the nodes that were (re)initialized and refreshes the shared snapshot.
+    // When the board is not available the subscriber is re-created instead.
+    void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
+        auto* msg = ev->Get();
+
+        if (msg->Status == TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable) {
+            LOG_I("Subcriber is not available for info exchanger, serving tenant: " << BoardState.Tenant
+                << ", board: " << BoardState.Path
+                << ", ssGroupId: " << BoardState.StateStorageGroupId);
+            CreateSubscriber();
+            return;
+        }
+
+        LOG_D("Get board info from subscriber, serving tenant: " << BoardState.Tenant
+            << ", board: " << BoardState.Path
+            << ", ssGroupId: " << BoardState.StateStorageGroupId
+            << ", with size: " << msg->InfoEntries.size());
+
+        auto changedNodeIds = UpdateBoardInfo(msg->InfoEntries);
+        SendInfos({SelfId().NodeId()}, std::move(changedNodeIds));
+        UpdateResourceSnapshotState();
+    }
+
+    // Handles an incremental board update delivered by the subscriber: merges it, sends the local
+    // node data to the nodes that were (re)initialized and refreshes the shared snapshot.
+    // When the board is not available the subscriber is re-created instead.
+    void Handle(TEvStateStorage::TEvBoardInfoUpdate::TPtr& ev) {
+        auto* msg = ev->Get();
+
+        if (msg->Status == TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable) {
+            LOG_I("Subcriber is not available for info exchanger, serving tenant: " << BoardState.Tenant
+                << ", board: " << BoardState.Path
+                << ", ssGroupId: " << BoardState.StateStorageGroupId);
+            CreateSubscriber();
+            return;
+        }
+
+        LOG_D("Get board info update from subscriber, serving tenant: " << BoardState.Tenant
+            << ", board: " << BoardState.Path
+            << ", ssGroupId: " << BoardState.StateStorageGroupId
+            << ", with size: " << msg->Updates.size());
+
+        auto changedNodeIds = UpdateBoardInfo(msg->Updates);
+        SendInfos({SelfId().NodeId()}, std::move(changedNodeIds));
+        UpdateResourceSnapshotState();
+    }
+
+    // Stores the resources reported by the local resource manager, sends them to all tracked
+    // nodes and refreshes the shared snapshot.
+    void Handle(TEvKqpResourceInfoExchanger::TEvPublishResource::TPtr& ev) {
+        const ui32 selfNodeId = SelfId().NodeId();
+
+        auto* selfResources = NodesState[selfNodeId].NodeData.MutableResources();
+        *selfResources = std::move(ev->Get()->Resources);
+
+        SendInfos({selfNodeId});
+        UpdateResourceSnapshotState();
+    }
+
+    // Handles node data sent by another exchanger: merges it, answers with the local node data
+    // when the sender asked for it and is a tracked node, and refreshes the shared snapshot.
+    void Handle(TEvKqpResourceInfoExchanger::TEvSendResources::TPtr& ev) {
+        auto nodeId = ev->Sender.NodeId();
+
+        LOG_D("Get resources info from node: " << nodeId);
+
+        const auto& record = ev->Get()->Record;
+        const TVector<NKikimrKqp::TResourceExchangeNodeData> resourceInfos(
+            record.GetSnapshot().begin(), record.GetSnapshot().end());
+
+        UpdateResourceInfo(resourceInfos);
+
+        if (record.GetNeedResending()) {
+            auto nodesStateIt = NodesState.find(nodeId);
+            if (nodesStateIt != NodesState.end()) {
+                SendingToNode(nodesStateIt->second, false, {SelfId().NodeId()});
+            }
+        }
+
+        // Always refresh: merged data must reach the snapshot even when the sender itself is
+        // not (or no longer) tracked and therefore gets no reply.
+        UpdateResourceSnapshotState();
+    }
+
+    // A tracked node got disconnected: send the local node data to it again, in retry mode.
+    void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) {
+        auto peer = NodesState.find(ev->Get()->NodeId);
+        if (peer != NodesState.end()) {
+            SendingToNode(peer->second, true, {SelfId().NodeId()});
+        }
+    }
+
+    // The retry timer of a node fired: clear the pending flag and attempt the send again.
+    void Handle(TEvPrivate::TEvRetrySending::TPtr& ev) {
+        auto target = NodesState.find(ev->Get()->NodeId);
+        if (target == NodesState.end()) {
+            // The node is no longer tracked, nothing to retry.
+            return;
+        }
+
+        auto& nodeState = target->second;
+        nodeState.RetryState.IsScheduled = false;
+        SendingToNode(nodeState, true, {SelfId().NodeId()});
+    }
+
+ // The subscriber retry timer fired: try to create the board subscriber again.
+ void Handle(TEvPrivate::TEvRetrySubscriber::TPtr&) {
+ CreateSubscriber();
+ }
+
+ // Poison request: shut the actor down via PassAway().
+ void Handle(TEvents::TEvPoison::TPtr&) {
+ PassAway();
+ }
+
+private:
+ // The only state of the actor: dispatches every supported event type to its Handle() overload.
+ // The switch has no default branch.
+ STATEFN(WorkState) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvStateStorage::TEvBoardInfo, Handle);
+ hFunc(TEvStateStorage::TEvBoardInfoUpdate, Handle);
+ hFunc(TEvTenantPool::TEvTenantPoolStatus, Handle);
+ hFunc(TEvKqpResourceInfoExchanger::TEvPublishResource, Handle);
+ hFunc(TEvKqpResourceInfoExchanger::TEvSendResources, Handle);
+ hFunc(TEvPrivate::TEvRetrySubscriber, Handle);
+ hFunc(TEvPrivate::TEvRetrySending, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
+ hFunc(TEvents::TEvPoison, Handle);
+ }
+ }
+
+private:
+
+    // Stops the helper actors owned by the exchanger and then the exchanger itself.
+    void PassAway() override {
+        // The ids stay unset until a tenant pool status has been processed, so guard the sends
+        // the same way CreatePublisher()/CreateSubscriber() do.
+        if (BoardState.Publisher) {
+            Send(BoardState.Publisher, new TEvents::TEvPoison);
+        }
+        if (BoardState.Subscriber) {
+            Send(BoardState.Subscriber, new TEvents::TEvPoison);
+        }
+
+        TActor::PassAway();
+    }
+
+    // Returns the delay to wait before the next attempt and advances state.CurrentDelay:
+    // the stored delay is doubled, capped at one second and then scaled by a random factor
+    // of Uniform(100, 115) / 100.
+    TDuration GetRetryDelay(TRetryState& state) {
+        const TDuration delayToUse = state.CurrentDelay;
+
+        TDuration nextDelay = delayToUse;
+        nextDelay *= 2;
+        if (nextDelay > TDuration::Seconds(1)) {
+            nextDelay = TDuration::Seconds(1);
+        }
+
+        // The random factor is applied after the cap.
+        nextDelay *= AppData()->RandomProvider->Uniform(100, 115);
+        nextDelay /= 100;
+
+        state.CurrentDelay = nextDelay;
+        return delayToUse;
+    }
+
+    // Sends a snapshot message to the exchanger of the node described by `state`.
+    //   retry              - honour the retry delay: when the previous attempt was too recent,
+    //                        a TEvRetrySending timer is scheduled instead of sending;
+    //   nodeIdsForSnapshot - when non-empty, the message is built from the data of these nodes;
+    //   snapshotMsg        - the message to send when nodeIdsForSnapshot is empty.
+    // Nothing happens while a retry timer for the node is already pending. Nothing is sent to
+    // the local exchanger itself.
+    void SendingToNode(TNodeState& state, bool retry,
+        TVector<ui32> nodeIdsForSnapshot = {}, NKikimrKqp::TResourceExchangeSnapshot snapshotMsg = {}) {
+        auto& retryState = state.RetryState;
+        if (retryState.IsScheduled) {
+            return;
+        }
+
+        auto owner = ActorIdFromProto(state.NodeData.GetResourceExchangeBoardData().GetOwner());
+        auto now = TlsActivationContext->Monotonic();
+
+        const bool mustWait = retry && (now - retryState.LastRetryAt < retryState.CurrentDelay);
+        if (mustWait) {
+            auto retryAt = retryState.LastRetryAt + GetRetryDelay(retryState);
+            retryState.IsScheduled = true;
+            Schedule(retryAt - now, new TEvPrivate::TEvRetrySending(owner.NodeId()));
+            return;
+        }
+
+        if (owner != SelfId()) {
+            auto msg = MakeHolder<TEvKqpResourceInfoExchanger::TEvSendResources>();
+            if (!nodeIdsForSnapshot.empty()) {
+                msg->Record = CreateSnapshotMessage(nodeIdsForSnapshot);
+            } else {
+                msg->Record = std::move(snapshotMsg);
+            }
+            // Subscribing on the session makes interconnect report a disconnect of the peer.
+            Send(owner, msg.Release(), IEventHandle::FlagSubscribeOnSession);
+        }
+
+        retryState.LastRetryAt = now;
+    }
+
+    // Builds a snapshot message that carries the tracked data of the given nodes.
+    //   nodeIds       - nodes to include; ids that are not tracked are skipped;
+    //   needResending - asks the receiver to send its own data back.
+    NKikimrKqp::TResourceExchangeSnapshot CreateSnapshotMessage(const TVector<ui32>& nodeIds,
+        bool needResending = false) {
+        NKikimrKqp::TResourceExchangeSnapshot snapshotMsg;
+        snapshotMsg.SetNeedResending(needResending);
+        auto* snapshot = snapshotMsg.MutableSnapshot();
+        snapshot->Reserve(nodeIds.size());
+
+        for (const auto& nodeId : nodeIds) {
+            // find() instead of operator[]: building a message must not insert empty node states,
+            // which would later be treated as real nodes by the broadcast in SendInfos().
+            auto nodesStateIt = NodesState.find(nodeId);
+            if (nodesStateIt == NodesState.end()) {
+                continue;
+            }
+            *snapshot->Add() = nodesStateIt->second.NodeData;
+        }
+
+        return snapshotMsg;
+    }
+
+private:
+
+ // Serialized TResourceExchangeBoardData of the local node; passed to the board publisher.
+ TString SelfInfo;
+ // ULID generated in Handle(TEvTenantPoolStatus) and its binary form.
+ TULID Ulid;
+ TString UlidString;
+
+ // Round stamped on the local node data by SendInfos(); incremented on every call.
+ ui64 Round = 0;
+
+ // State of the board publisher/subscriber pair.
+ struct TBoardState {
+ TString Tenant; // tenant taken from the last tenant pool status
+ TString Path; // board path derived from Tenant
+ ui32 StateStorageGroupId = std::numeric_limits<ui32>::max(); // max value means "not resolved"
+ TActorId Publisher = TActorId(); // board publish actor, unset until created
+ TActorId Subscriber = TActorId(); // board lookup actor, unset until created
+ TRetryState RetryStateForSubscriber; // rate limit for re-creating the subscriber
+ std::optional<TInstant> LastPublishTime; // NOTE(review): not referenced anywhere in this class — confirm it is needed
+ };
+ TBoardState BoardState;
+
+ // Tracked nodes keyed by node id; contains the local node as well.
+ THashMap<ui32, TNodeState> NodesState;
+ // Shared holder into which UpdateResourceSnapshotState() publishes the merged snapshot.
+ std::shared_ptr<TResourceSnapshotState> ResourceSnapshotState;
+
+ TIntrusivePtr<TKqpCounters> Counters;
+};
+
+// Allocates a resource info exchanger actor; the caller is expected to register it.
+//   counters              - KQP counters used by the actor;
+//   resourceSnapshotState - shared holder for the merged resources snapshot.
+NActors::IActor* CreateKqpResourceInfoExchangerActor(TIntrusivePtr<TKqpCounters> counters,
+    std::shared_ptr<TResourceSnapshotState> resourceSnapshotState)
+{
+    // Both arguments are sinks: move them instead of bumping the reference counters again.
+    return new TKqpResourceInfoExchangerActor(std::move(counters), std::move(resourceSnapshotState));
+}
+
+} // namespace NRm
+} // namespace NKqp
+} // namespace NKikimr
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
index 3b11a03ab7a..97d7fadf662 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp
+++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
@@ -10,7 +10,9 @@
#include <ydb/core/node_whiteboard/node_whiteboard.h>
#include <ydb/core/tablet/resource_broker.h>
+
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/interconnect/interconnect.h>
#include <library/cpp/monlib/service/pages/templates.h>
@@ -131,14 +133,14 @@ struct TEvPrivate {
};
class TKqpResourceManager : public IKqpResourceManager {
-
public:
TKqpResourceManager(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TIntrusivePtr<TKqpCounters> counters)
: Config(config)
, Counters(counters)
, ExecutionUnitsResource(Config.GetComputeActorsCount())
- , ScanQueryMemoryResource(Config.GetQueryMemoryLimit()) {
+ , ScanQueryMemoryResource(Config.GetQueryMemoryLimit())
+ , PublishResourcesByExchanger(Config.GetEnablePublishResourcesByExchanger()) {
}
@@ -149,6 +151,13 @@ public:
ActorSystem = actorSystem;
SelfId = selfId;
UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes());
+
+ if (PublishResourcesByExchanger) {
+ ResourceSnapshotState = std::make_shared<TResourceSnapshotState>();
+ auto exchanger = CreateKqpResourceInfoExchangerActor(Counters, ResourceSnapshotState);
+ ResourceInfoExchanger = ActorSystem->Register(exchanger);
+ return;
+ }
}
bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources,
@@ -540,6 +549,15 @@ public:
void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override {
LOG_AS_D("Schedule Snapshot request");
+ if (PublishResourcesByExchanger) {
+ std::shared_ptr<const TVector<NKikimrKqp::TKqpNodeResources>> infos;
+ with_lock (ResourceSnapshotState->Lock) {
+ infos = ResourceSnapshotState->Snapshot;
+ }
+ TVector<NKikimrKqp::TKqpNodeResources> resources = *infos;
+ callback(std::move(resources));
+ return;
+ }
auto ev = MakeHolder<TEvPrivate::TEvTakeResourcesSnapshot>();
ev->Callback = std::move(callback);
TAutoPtr<IEventHandle> handle = new IEventHandle(SelfId, SelfId, ev.Release());
@@ -622,6 +640,11 @@ public:
// pattern cache for different actors
std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> PatternCache;
+
+ // state for resource info exchanger
+ std::shared_ptr<TResourceSnapshotState> ResourceSnapshotState;
+ bool PublishResourcesByExchanger;
+ TActorId ResourceInfoExchanger = TActorId();
};
struct TResourceManagers {
@@ -645,10 +668,11 @@ public:
}
TKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
- TIntrusivePtr<TKqpCounters> counters, const TActorId& resourceBrokerId,
+ TIntrusivePtr<TKqpCounters> counters, const TActorId& resourceBrokerId,
std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources)
: ResourceBrokerId(resourceBrokerId ? resourceBrokerId : MakeResourceBrokerID())
, KqpProxySharedResources(std::move(kqpProxySharedResources))
+ , PublishResourcesByExchanger(config.GetEnablePublishResourcesByExchanger())
{
ResourceManager = std::make_shared<TKqpResourceManager>(config, counters);
}
@@ -945,6 +969,13 @@ private:
private:
void PassAway() override {
ToBroker(new TEvResourceBroker::TEvNotifyActorDied);
+ if (ResourceManager->ResourceInfoExchanger) {
+ Send(ResourceManager->ResourceInfoExchanger, new TEvents::TEvPoison);
+ }
+ if (WbState.BoardPublisherActorId) {
+
+ Send(WbState.BoardPublisherActorId, new TEvents::TEvPoison);
+ }
TActor::PassAway();
}
@@ -982,19 +1013,6 @@ private:
return;
}
- if (WbState.BoardPublisherActorId) {
- LOG_I("Kill previous board publisher for '" << WbState.BoardPath
- << "' at " << WbState.BoardPublisherActorId << ", reason: " << reason);
- Send(WbState.BoardPublisherActorId, new TEvents::TEvPoison);
- }
-
- WbState.BoardPublisherActorId = TActorId();
-
- if (WbState.StateStorageGroupId == std::numeric_limits<ui32>::max()) {
- LOG_E("Can not find default state storage group for database " << WbState.Tenant);
- return;
- }
-
NKikimrKqp::TKqpNodeResources payload;
payload.SetNodeId(SelfId().NodeId());
payload.SetTimestamp(now.Seconds());
@@ -1017,6 +1035,29 @@ private:
pool->SetAvailable(ResourceManager->ScanQueryMemoryResource.Available());
}
+ if (PublishResourcesByExchanger) {
+ LOG_I("Send to publish resource usage for "
+ << "reason: " << reason
+ << ", payload: " << payload.ShortDebugString());
+ WbState.LastPublishTime = now;
+ Send(ResourceManager->ResourceInfoExchanger,
+ new TEvKqpResourceInfoExchanger::TEvPublishResource(std::move(payload)));
+ return;
+ }
+
+ if (WbState.BoardPublisherActorId) {
+ LOG_I("Kill previous board publisher for '" << WbState.BoardPath
+ << "' at " << WbState.BoardPublisherActorId << ", reason: " << reason);
+ Send(WbState.BoardPublisherActorId, new TEvents::TEvPoison);
+ }
+
+ WbState.BoardPublisherActorId = TActorId();
+
+ if (WbState.StateStorageGroupId == std::numeric_limits<ui32>::max()) {
+ LOG_E("Can not find default state storage group for database " << WbState.Tenant);
+ return;
+ }
+
auto boardPublisher = CreateBoardPublishActor(WbState.BoardPath, payload.SerializeAsString(), SelfId(),
WbState.StateStorageGroupId, /* ttlMs */ 0, /* reg */ true);
WbState.BoardPublisherActorId = Register(boardPublisher);
@@ -1047,13 +1088,15 @@ private:
TActorId WhiteBoardService;
std::shared_ptr<TKqpResourceManager> ResourceManager;
+
+ bool PublishResourcesByExchanger;
};
} // namespace NRm
NActors::IActor* CreateKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
- TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker,
+ TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker,
std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources)
{
return new NRm::TKqpResourceManagerActor(config, counters, resourceBroker, std::move(kqpProxySharedResources));
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h
index 5b505e01b1c..2f149098b38 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_service.h
+++ b/ydb/core/kqp/rm_service/kqp_rm_service.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/core/protos/config.pb.h>
+#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h>
@@ -94,7 +95,7 @@ public:
virtual std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> GetPatternCache() = 0;
virtual ui32 GetNodeId() {
- return 0;
+ return 0;
}
};
@@ -103,6 +104,29 @@ NActors::IActor* CreateTakeResourcesSnapshotActor(
const TString& boardPath, ui32 stateStorageGroupId,
std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)>&& callback);
+
+// Resources snapshot shared between the resource info exchanger (writer) and the resource
+// manager (reader). The Snapshot pointer is replaced and read under Lock.
+struct TResourceSnapshotState {
+    // Starts as an empty vector, never null: the reader dereferences the pointer without
+    // checking it and may run before the exchanger has published its first snapshot.
+    std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> Snapshot =
+        std::make_shared<TVector<NKikimrKqp::TKqpNodeResources>>();
+    TMutex Lock;
+};
+
+// Events of the resource info exchanger.
+struct TEvKqpResourceInfoExchanger {
+    // Local event: carries the current resources of this node to the exchanger.
+    struct TEvPublishResource : public TEventLocal<TEvPublishResource,
+        TKqpResourceInfoExchangerEvents::EvPublishResource>
+    {
+        // Deliberately non-const: std::move() on a const member silently degrades to a copy,
+        // and the receiver takes the payload out of the event.
+        NKikimrKqp::TKqpNodeResources Resources;
+
+        TEvPublishResource(NKikimrKqp::TKqpNodeResources resources) : Resources(std::move(resources)) {
+        }
+    };
+
+    // Protobuf event carrying a TResourceExchangeSnapshot between exchangers.
+    struct TEvSendResources : public TEventPB<TEvSendResources, NKikimrKqp::TResourceExchangeSnapshot,
+        TKqpResourceInfoExchangerEvents::EvSendResources>
+    {};
+};
+
+NActors::IActor* CreateKqpResourceInfoExchangerActor(TIntrusivePtr<TKqpCounters> counters,
+ std::shared_ptr<TResourceSnapshotState> resourceSnapshotState);
+
} // namespace NRm
struct TKqpProxySharedResources {
@@ -110,7 +134,7 @@ struct TKqpProxySharedResources {
};
NActors::IActor* CreateKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
- TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker = {},
+ TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker = {},
std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources = nullptr);
std::shared_ptr<NRm::IKqpResourceManager> GetKqpResourceManager(TMaybe<ui32> nodeId = Nothing());
diff --git a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp
index 30eb41d7ca4..f109e925858 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp
+++ b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp
@@ -4,6 +4,10 @@
#include <ydb/core/testlib/actor_helpers.h>
#include <ydb/core/testlib/tablet_helpers.h>
#include <ydb/core/testlib/tenant_runtime.h>
+#include <ydb/core/kqp/common/simple/services.h>
+
+#include <library/cpp/actors/core/interconnect.h>
+#include <library/cpp/actors/interconnect/interconnect_impl.h>
#include <library/cpp/testing/unittest/registar.h>
@@ -43,6 +47,15 @@ TTenantTestConfig MakeTenantTestConfig() {
{{{DOMAIN1_NAME, {1, 1, 1}}}},
"node-type"
}
+ },
+ // Node1
+ {
+ // TenantPoolConfig
+ {
+ // Static slots {tenant, {cpu, memory, network}}
+ {{{DOMAIN1_NAME, {1, 1, 1}}}},
+ "node-type"
+ }
}
}},
// DataCenterCount
@@ -81,12 +94,14 @@ TResourceBrokerConfig MakeResourceBrokerTestConfig() {
return config;
}
-NKikimrConfig::TTableServiceConfig::TResourceManager MakeKqpResourceManagerConfig() {
+NKikimrConfig::TTableServiceConfig::TResourceManager MakeKqpResourceManagerConfig(
+ bool EnablePublishResourcesByExchanger = false) {
NKikimrConfig::TTableServiceConfig::TResourceManager config;
config.SetComputeActorsCount(100);
config.SetPublishStatisticsIntervalSec(0);
config.SetQueryMemoryLimit(1000);
+ config.SetEnablePublishResourcesByExchanger(EnablePublishResourcesByExchanger);
return config;
}
@@ -107,31 +122,50 @@ public:
Counters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
- auto resourceBrokerConfig = MakeResourceBrokerTestConfig();
- auto broker = CreateResourceBrokerActor(resourceBrokerConfig, Counters);
- ResourceBrokerActorId = Runtime->Register(broker);
+ for (ui32 nodeIndex = 0; nodeIndex < Runtime->GetNodeCount(); ++nodeIndex) {
+ auto resourceBrokerConfig = MakeResourceBrokerTestConfig();
+ auto broker = CreateResourceBrokerActor(resourceBrokerConfig, Counters);
+ auto resourceBrokerActorId = Runtime->Register(broker, nodeIndex);
+ ResourceBrokers.push_back(resourceBrokerActorId);
+ }
WaitForBootstrap();
}
+ void TearDown() override { // Per-test cleanup of the fixture members.
+ ResourceBrokers.clear(); // Forget the broker actor ids registered for this test.
+ ResourceManagers.clear(); // Forget the resource manager actor ids registered for this test.
+ Runtime.Reset(); // Release the test runtime held by the fixture.
+ }
+
void WaitForBootstrap() {
TDispatchOptions options;
options.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1);
UNIT_ASSERT(Runtime->DispatchEvents(options));
}
- IActor* CreateKqpResourceManager(const NKikimrConfig::TTableServiceConfig::TResourceManager& config = {}) {
+ void CreateKqpResourceManager(
+ const NKikimrConfig::TTableServiceConfig::TResourceManager& config, ui32 nodeInd = 0) {
auto kqpCounters = MakeIntrusive<TKqpCounters>(Counters);
- auto resman = CreateKqpResourceManagerActor(config, kqpCounters, ResourceBrokerActorId);
- ResourceManagerActorId = Runtime->Register(resman);
- Runtime->EnableScheduleForActor(ResourceManagerActorId, true);
+ auto resman = CreateKqpResourceManagerActor(config, kqpCounters, ResourceBrokers[nodeInd]);
+ ResourceManagers.push_back(Runtime->Register(resman, nodeInd));
+ Runtime->RegisterService(MakeKqpResourceManagerServiceID(
+ Runtime->GetNodeId(nodeInd)), ResourceManagers.back(), nodeInd);
+ Runtime->EnableScheduleForActor(ResourceManagers.back(), true);
+ }
+
+ void StartRms(const TVector<NKikimrConfig::TTableServiceConfig::TResourceManager>& configs = {}) {
+ for (ui32 nodeIndex = 0; nodeIndex < Runtime->GetNodeCount(); ++nodeIndex) {
+ if (configs.empty()) {
+ CreateKqpResourceManager(MakeKqpResourceManagerConfig(), nodeIndex);
+ } else {
+ CreateKqpResourceManager(configs[nodeIndex], nodeIndex);
+ }
+ }
WaitForBootstrap();
- return resman;
}
void AssertResourceBrokerSensors(i64 cpu, i64 mem, i64 enqueued, i64 finished, i64 infly) {
auto q = Counters->GetSubgroup("queue", "queue_kqp_resource_manager");
-// Cerr << "-- queue_kqp_resource_manager\n";
-// q->OutputPlainText(Cerr, " ");
UNIT_ASSERT_VALUES_EQUAL(q->GetCounter("CPUConsumption")->Val(), cpu);
UNIT_ASSERT_VALUES_EQUAL(q->GetCounter("MemoryConsumption")->Val(), mem);
UNIT_ASSERT_VALUES_EQUAL(q->GetCounter("EnqueuedTasks")->Val(), enqueued);
@@ -139,8 +173,6 @@ public:
UNIT_ASSERT_VALUES_EQUAL(q->GetCounter("InFlyTasks")->Val(), infly);
auto t = Counters->GetSubgroup("task", "kqp_query");
-// Cerr << "-- kqp_query\n";
-// t->OutputPlainText(Cerr, " ");
UNIT_ASSERT_VALUES_EQUAL(t->GetCounter("CPUConsumption")->Val(), cpu);
UNIT_ASSERT_VALUES_EQUAL(t->GetCounter("MemoryConsumption")->Val(), mem);
UNIT_ASSERT_VALUES_EQUAL(t->GetCounter("EnqueuedTasks")->Val(), enqueued);
@@ -148,20 +180,92 @@ public:
UNIT_ASSERT_VALUES_EQUAL(t->GetCounter("InFlyTasks")->Val(), infly);
}
- void AssertResourceManagerStats(std::shared_ptr<NRm::IKqpResourceManager> rm, ui64 scanQueryMemory, ui32 executionUnits) {
+ void AssertResourceManagerStats(
+ std::shared_ptr<NRm::IKqpResourceManager> rm, ui64 scanQueryMemory, ui32 executionUnits) {
auto stats = rm->GetLocalResources();
UNIT_ASSERT_VALUES_EQUAL(scanQueryMemory, stats.Memory[NRm::EKqpMemoryPool::ScanQuery]);
UNIT_ASSERT_VALUES_EQUAL(executionUnits, stats.ExecutionUnits);
}
+    // Sends TEvInterconnect::TEvDisconnect to the interconnect proxy that links
+    // nodeIndexFrom to nodeIndexTo, then dispatches events until
+    // TEvInterconnect::EvNodeDisconnected is seen.
+    void Disconnect(ui32 nodeIndexFrom, ui32 nodeIndexTo) {
+        const TActorId sessionProxy = Runtime->GetInterconnectProxy(nodeIndexFrom, nodeIndexTo);
+
+        auto* disconnectHandle = new IEventHandle(
+            sessionProxy, TActorId(), new TEvInterconnect::TEvDisconnect(), 0, 0);
+        Runtime->Send(disconnectHandle, nodeIndexFrom, true);
+
+        // Wait for the TEvInterconnect::EvNodeDisconnected event.
+        TDispatchOptions waitForDisconnect;
+        waitForDisconnect.FinalEvents.emplace_back(TEvInterconnect::EvNodeDisconnected);
+        Runtime->DispatchEvents(waitForDisconnect);
+    }
+
+
+    // Expected per-node values that CheckSnapshot compares against a snapshot entry.
+    struct TCheckedResources {
+        ui64 ScanQueryMemory;
+        ui32 ExecutionUnits;
+
+        // Field-wise equality: both members must match.
+        bool operator==(const TCheckedResources& other) const {
+            if (ScanQueryMemory != other.ScanQueryMemory) {
+                return false;
+            }
+            return ExecutionUnits == other.ExecutionUnits;
+        }
+    };
+
+    // Polls the cluster resource snapshot reported by `currentRm` until the entry at
+    // position `nodeIndToCheck` (after sorting the snapshot by node id) equals the
+    // expected value, then asserts that every other entry matches as well.
+    //
+    // nodeIndToCheck   - index of the entry that is allowed to lag and is polled for.
+    // verificationData - expected {available memory of the first memory pool entry,
+    //                    execution units}, ordered by node id.
+    // currentRm        - resource manager whose view of the cluster is checked.
+    //
+    // NOTE(review): there is no retry limit; a snapshot that never converges keeps
+    // this loop running until the test harness stops it.
+    void CheckSnapshot(ui32 nodeIndToCheck, TVector<TCheckedResources> verificationData,
+            std::shared_ptr<NRm::IKqpResourceManager> currentRm) {
+        // Fail loudly instead of indexing past the end of the expectation list.
+        UNIT_ASSERT(nodeIndToCheck < verificationData.size());
+
+        TVector<NKikimrKqp::TKqpNodeResources> snapshot;
+        std::atomic<int> ready = 0;
+
+        while (true) {
+            // Re-arm the flag on every attempt. Without this, each retry after the
+            // first would skip the wait below, re-check the stale snapshot and never
+            // dispatch events again.
+            ready = 0;
+
+            currentRm->RequestClusterResourcesInfo(
+                [&](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
+                    snapshot = std::move(resources);
+                    ready = 1;
+                });
+
+            while (ready.load() != 1) {
+                Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(100));
+            }
+
+            // Membership has not converged yet: ask again.
+            if (snapshot.size() != verificationData.size()) {
+                continue;
+            }
+
+            // Compare by reference; the elements are protobuf messages and copying
+            // them for every comparison is wasteful.
+            std::sort(snapshot.begin(), snapshot.end(), [](const auto& first, const auto& second) {
+                return first.GetNodeId() < second.GetNodeId();
+            });
+
+            TVector<TCheckedResources> currentData;
+            std::transform(snapshot.cbegin(), snapshot.cend(), std::back_inserter(currentData),
+                [](const NKikimrKqp::TKqpNodeResources& cur) {
+                    return TCheckedResources{cur.GetMemory()[0].GetAvailable(), cur.GetExecutionUnits()};
+                });
+
+            if (verificationData[nodeIndToCheck] == currentData[nodeIndToCheck]) {
+                // The polled node is up to date; all other nodes must already match.
+                for (ui32 i = 0; i < verificationData.size(); i++) {
+                    if (i != nodeIndToCheck) {
+                        UNIT_ASSERT_VALUES_EQUAL(verificationData[i].ScanQueryMemory, currentData[i].ScanQueryMemory);
+                        UNIT_ASSERT_VALUES_EQUAL(verificationData[i].ExecutionUnits, currentData[i].ExecutionUnits);
+                    }
+                }
+                break;
+            }
+        }
+    }
+
UNIT_TEST_SUITE(KqpRm);
UNIT_TEST(SingleTask);
UNIT_TEST(ManyTasks);
UNIT_TEST(NotEnoughMemory);
UNIT_TEST(NotEnoughExecutionUnits);
UNIT_TEST(ResourceBrokerNotEnoughResources);
- UNIT_TEST(Snapshot);
+ UNIT_TEST(SingleSnapshotByStateStorage);
+ UNIT_TEST(SingleSnapshotByExchanger);
UNIT_TEST(Reduce);
+ UNIT_TEST(SnapshotSharingByStateStorage);
+ UNIT_TEST(SnapshotSharingByExchanger);
+ UNIT_TEST(NodesMembershipByStateStorage);
+ UNIT_TEST(NodesMembershipByExchanger);
+ UNIT_TEST(DisonnectNodes);
UNIT_TEST_SUITE_END();
void SingleTask();
@@ -169,24 +273,35 @@ public:
void NotEnoughMemory();
void NotEnoughExecutionUnits();
void ResourceBrokerNotEnoughResources();
- void Snapshot();
+ void Snapshot(bool byExchanger);
+ void SingleSnapshotByStateStorage();
+ void SingleSnapshotByExchanger();
void Reduce();
+ void SnapshotSharing(bool byExchanger);
+ void SnapshotSharingByStateStorage();
+ void SnapshotSharingByExchanger();
+ void NodesMembership(bool byExchanger);
+ void NodesMembershipByStateStorage();
+ void NodesMembershipByExchanger();
+ void DisonnectNodes();
private:
THolder<TTestBasicRuntime> Runtime;
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
- // TIntrusivePtr<TKqpCounters> KqpCounters;
- TActorId ResourceBrokerActorId;
- TActorId ResourceManagerActorId;
+ TVector<TActorId> ResourceBrokers;
+ TVector<TActorId> ResourceManagers;
};
UNIT_TEST_SUITE_REGISTRATION(KqpRm);
void KqpRm::SingleTask() {
- CreateKqpResourceManager(MakeKqpResourceManagerConfig());
+ StartRms();
NKikimr::TActorSystemStub stub;
- auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
+ auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId());
+
+ auto stats = rm->GetLocalResources();
+ UNIT_ASSERT_VALUES_EQUAL(1000, stats.Memory[NRm::EKqpMemoryPool::ScanQuery]);
NRm::TKqpResourcesRequest request;
request.ExecutionUnits = 10;
@@ -205,10 +320,10 @@ void KqpRm::SingleTask() {
}
void KqpRm::ManyTasks() {
- CreateKqpResourceManager(MakeKqpResourceManagerConfig());
+ StartRms();
NKikimr::TActorSystemStub stub;
- auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
+ auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId());
NRm::TKqpResourcesRequest request;
request.ExecutionUnits = 10;
@@ -243,10 +358,10 @@ void KqpRm::ManyTasks() {
}
void KqpRm::NotEnoughMemory() {
- CreateKqpResourceManager(MakeKqpResourceManagerConfig());
+ StartRms();
NKikimr::TActorSystemStub stub;
- auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
+ auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId());
NRm::TKqpResourcesRequest request;
request.ExecutionUnits = 10;
@@ -261,10 +376,10 @@ void KqpRm::NotEnoughMemory() {
}
void KqpRm::NotEnoughExecutionUnits() {
- CreateKqpResourceManager(MakeKqpResourceManagerConfig());
+ StartRms();
NKikimr::TActorSystemStub stub;
- auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
+ auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId());
NRm::TKqpResourcesRequest request;
request.ExecutionUnits = 1000;
@@ -281,10 +396,11 @@ void KqpRm::NotEnoughExecutionUnits() {
void KqpRm::ResourceBrokerNotEnoughResources() {
auto config = MakeKqpResourceManagerConfig();
config.SetQueryMemoryLimit(100000000);
- CreateKqpResourceManager(config);
+
+ StartRms({config, MakeKqpResourceManagerConfig()});
NKikimr::TActorSystemStub stub;
- auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
+ auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId());
NRm::TKqpResourcesRequest request;
request.ExecutionUnits = 10;
@@ -303,11 +419,11 @@ void KqpRm::ResourceBrokerNotEnoughResources() {
AssertResourceBrokerSensors(0, 1000, 0, 0, 1);
}
-void KqpRm::Snapshot() {
- CreateKqpResourceManager(MakeKqpResourceManagerConfig());
+void KqpRm::Snapshot(bool byExchanger) {
+ StartRms({MakeKqpResourceManagerConfig(byExchanger), MakeKqpResourceManagerConfig(byExchanger)});
NKikimr::TActorSystemStub stub;
- auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
+ auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId());
NRm::TKqpResourcesRequest request;
request.ExecutionUnits = 10;
@@ -325,24 +441,7 @@ void KqpRm::Snapshot() {
Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500));
- {
- TVector<NKikimrKqp::TKqpNodeResources> snapshot;
- std::atomic<int> ready = 0;
- rm->RequestClusterResourcesInfo([&](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
- snapshot = std::move(resources);
- ready = 1;
- });
-
- while (ready.load() != 1) {
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(100));
- }
-
- UNIT_ASSERT_VALUES_EQUAL(1, snapshot.size());
- UNIT_ASSERT_VALUES_EQUAL(80, snapshot[0].GetExecutionUnits());
- UNIT_ASSERT_VALUES_EQUAL(1, snapshot[0].GetMemory().size());
- UNIT_ASSERT_VALUES_EQUAL(1, snapshot[0].GetMemory()[0].GetPool());
- UNIT_ASSERT_VALUES_EQUAL(800, snapshot[0].GetMemory()[0].GetAvailable());
- }
+ CheckSnapshot(0, {{800, 80}, {1000, 100}}, rm);
rm->FreeResources(1);
rm->FreeResources(2);
@@ -351,31 +450,22 @@ void KqpRm::Snapshot() {
Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500));
- {
- TVector<NKikimrKqp::TKqpNodeResources> snapshot;
- std::atomic<int> ready = 0;
- rm->RequestClusterResourcesInfo([&](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
- snapshot = std::move(resources);
- ready = 1;
- });
+ CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm);
+}
- while (ready.load() != 1) {
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(100));
- }
+void KqpRm::SingleSnapshotByStateStorage() { // Runs the Snapshot scenario with byExchanger = false.
+ Snapshot(false);
+}
- UNIT_ASSERT_VALUES_EQUAL(1, snapshot.size());
- UNIT_ASSERT_VALUES_EQUAL(100, snapshot[0].GetExecutionUnits());
- UNIT_ASSERT_VALUES_EQUAL(1, snapshot[0].GetMemory().size());
- UNIT_ASSERT_VALUES_EQUAL(1, snapshot[0].GetMemory()[0].GetPool());
- UNIT_ASSERT_VALUES_EQUAL(1000, snapshot[0].GetMemory()[0].GetAvailable());
- }
+void KqpRm::SingleSnapshotByExchanger() { // Runs the Snapshot scenario with byExchanger = true.
+ Snapshot(true);
}
void KqpRm::Reduce() {
- CreateKqpResourceManager(MakeKqpResourceManagerConfig());
+ StartRms();
NKikimr::TActorSystemStub stub;
- auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
+ auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId());
NRm::TKqpResourcesRequest request;
request.ExecutionUnits = 10;
@@ -408,5 +498,125 @@ void KqpRm::Reduce() {
AssertResourceBrokerSensors(0, 30, 0, 0, 1);
}
+void KqpRm::SnapshotSharing(bool byExchanger) { // Checks that each node's allocations and releases show up in the other node's snapshot.
+ StartRms({MakeKqpResourceManagerConfig(byExchanger), MakeKqpResourceManagerConfig(byExchanger)}); // Two configs, both with the same byExchanger setting.
+ NKikimr::TActorSystemStub stub;
+
+ auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId());
+ auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId());
+
+ CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first); // Baseline: both entries untouched, as seen by the first manager...
+ CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second); // ...and by the second manager.
+
+ NRm::TKqpResourcesRequest request; // Each allocation below takes 10 execution units and 100 memory from the ScanQuery pool.
+ request.ExecutionUnits = 10;
+ request.MemoryPool = NRm::EKqpMemoryPool::ScanQuery;
+ request.Memory = 100;
+
+ { // Phase 1: two allocations through the first manager.
+ bool allocated = rm_first->AllocateResources(1, 2, request);
+ UNIT_ASSERT(allocated);
+
+ allocated = rm_first->AllocateResources(2, 1, request);
+ UNIT_ASSERT(allocated);
+
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500));
+
+ CheckSnapshot(0, {{800, 80}, {1000, 100}}, rm_second); // The second manager must see entry 0 reduced by both allocations.
+ }
+
+ { // Phase 2: two allocations through the second manager.
+ bool allocated = rm_second->AllocateResources(1, 2, request);
+ UNIT_ASSERT(allocated);
+
+ allocated = rm_second->AllocateResources(2, 1, request);
+ UNIT_ASSERT(allocated);
+
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500));
+
+ CheckSnapshot(1, {{800, 80}, {800, 80}}, rm_first); // The first manager must see entry 1 reduced too.
+ }
+
+ { // Phase 3: the first manager frees ids 1 and 2.
+ rm_first->FreeResources(1);
+ rm_first->FreeResources(2);
+
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500));
+
+ CheckSnapshot(0, {{1000, 100}, {800, 80}}, rm_second); // Entry 0 is back to the baseline in the second manager's view.
+ }
+
+ { // Phase 4: the second manager frees ids 1 and 2.
+ rm_second->FreeResources(1);
+ rm_second->FreeResources(2);
+
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500));
+
+ CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_first); // Both entries are back to the baseline.
+ }
+}
+
+void KqpRm::SnapshotSharingByStateStorage() { // Runs SnapshotSharing with byExchanger = false.
+ SnapshotSharing(false);
+}
+
+void KqpRm::SnapshotSharingByExchanger() { // Runs SnapshotSharing with byExchanger = true.
+ SnapshotSharing(true);
+}
+
+void KqpRm::NodesMembership(bool byExchanger) { // Checks that a poisoned resource manager disappears from the other node's snapshot.
+ StartRms({MakeKqpResourceManagerConfig(byExchanger), MakeKqpResourceManagerConfig(byExchanger)});
+ NKikimr::TActorSystemStub stub;
+
+ auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId());
+ auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId());
+
+ CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first); // Baseline: both managers report two entries.
+ CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second);
+
+ const TActorId edge = Runtime->AllocateEdgeActor(1); // Edge actor on node index 1, used as the sender of the poison.
+ Runtime->Send(new IEventHandle( // Poison the resource manager actor registered for node index 1.
+ ResourceManagers[1], edge, new TEvents::TEvPoison, IEventHandle::FlagTrackDelivery, 0),
+ 1, false);
+
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(TEvents::TSystem::Poison, 1); // Dispatch until one Poison event has been processed.
+ UNIT_ASSERT(Runtime->DispatchEvents(options));
+
+ CheckSnapshot(0, {{1000, 100}}, rm_first); // Only one entry is expected to remain in the first manager's snapshot.
+}
+
+void KqpRm::NodesMembershipByStateStorage() { // Runs NodesMembership with byExchanger = false.
+ NodesMembership(false);
+}
+
+void KqpRm::NodesMembershipByExchanger() { // Runs NodesMembership with byExchanger = true.
+ NodesMembership(true);
+}
+
+void KqpRm::DisonnectNodes() { // Exchanger-only scenario: after a disconnect the first manager's snapshot should shrink to one entry. NOTE(review): "Disonnect" is a typo for "Disconnect"; renaming also requires updating the UNIT_TEST registration and the declaration.
+ StartRms({MakeKqpResourceManagerConfig(true), MakeKqpResourceManagerConfig(true)});
+ NKikimr::TActorSystemStub stub;
+
+ auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId());
+ auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId());
+
+ CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first); // Baseline: both managers report two entries.
+ CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second);
+
+ auto prevObserverFunc = Runtime->SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { // NOTE(review): prevObserverFunc is stored but never used or restored in this function.
+ switch (ev->GetTypeRewrite()) {
+ case NRm::TEvKqpResourceInfoExchanger::TEvSendResources::EventType: { // Drop every TEvSendResources event from here on.
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS; // All other events are delivered normally.
+ });
+
+ Disconnect(0, 1); // Break the interconnect link from node index 0 to node index 1.
+
+ CheckSnapshot(0, {{1000, 100}}, rm_first); // Only one entry is expected to remain.
+}
+
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/rm_service/ya.make b/ydb/core/kqp/rm_service/ya.make
index 9fc7b2b5f89..46fe31663d1 100644
--- a/ydb/core/kqp/rm_service/ya.make
+++ b/ydb/core/kqp/rm_service/ya.make
@@ -3,6 +3,7 @@ LIBRARY()
SRCS(
kqp_resource_tracker.cpp
kqp_resource_estimation.cpp
+ kqp_resource_info_exchanger.cpp
kqp_rm_service.cpp
kqp_snapshot_manager.cpp
)
@@ -20,6 +21,7 @@ PEERDIR(
ydb/core/protos
ydb/core/tablet
ydb/core/node_whiteboard
+ ydb/core/util
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index e09a4bd41be..c036d3de57d 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1126,6 +1126,7 @@ message TTableServiceConfig {
reserved 14;
optional TShardsScanningPolicy ShardsScanningPolicy = 16;
optional uint64 KqpPatternCacheCapacityBytes = 17 [default = 104857600]; // 100 MiB, 0 is for disable
+ optional bool EnablePublishResourcesByExchanger = 18 [default = false];
}
message TSpillingServiceConfig {
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 876d6de11b6..605e01cb57a 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -515,6 +515,12 @@ message TEvCancelKqpTasksRequest {
message TEvCancelKqpTasksResponse {
}
+message TResourceExchangeBoardData { // Identification record: two actor ids plus a ULID in raw bytes.
+ optional NActorsProto.TActorId Owner = 1;
+ optional bytes Ulid = 2; // presumably a unique id of the published entry — TODO confirm against the exchanger actor.
+ optional NActorsProto.TActorId Publisher = 3;
+}
+
message TKqpNodeResources {
optional NActorsProto.TActorId ResourceManagerActorId = 2; // legacy
optional uint32 AvailableComputeActors = 4; // legacy
@@ -629,6 +635,17 @@ message TEvFetchScriptResultsResponse {
optional Ydb.ResultSet ResultSet = 4;
}
+message TResourceExchangeNodeData { // One snapshot element: a node's resources, its board data and a round number.
+ optional TKqpNodeResources Resources = 1;
+ optional TResourceExchangeBoardData ResourceExchangeBoardData = 2;
+ optional uint64 Round = 3; // presumably a version counter for this node's data — TODO confirm.
+}
+
+message TResourceExchangeSnapshot { // Record type carried by the TEvSendResources event.
+ optional bool NeedResending = 1; // presumably asks the receiver to send its own data back — TODO confirm.
+ repeated TResourceExchangeNodeData Snapshot = 2; // Zero or more per-node elements.
+}
+
// Request that is sent to run script actor to cancel execution and write finish status to database.
message TEvCancelScriptExecutionRequest {
}