about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author shumkovnd <shumkovnd@yandex-team.com> 2023-07-06 19:23:12 +0300
committer shumkovnd <shumkovnd@yandex-team.com> 2023-07-06 19:23:12 +0300
commit 881a94d5b1395a4d439e6517f933f0253f3d6c80 (patch)
tree 02e8d37b2fd3993e9001ff53b17649308641b286
parent 8e303d9d16ea628a70ef50c435f225b8c4110641 (diff)
download ydb-881a94d5b1395a4d439e6517f933f0253f3d6c80.tar.gz
KIKIMR-16187: update TKqpResourceInfoExchangerActor
-rw-r--r-- ydb/core/kqp/counters/kqp_counters.cpp 1
-rw-r--r-- ydb/core/kqp/counters/kqp_counters.h 1
-rw-r--r-- ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp 66
-rw-r--r-- ydb/core/kqp/rm_service/kqp_rm_service.cpp 39
-rw-r--r-- ydb/core/kqp/rm_service/kqp_rm_ut.cpp 26
-rw-r--r-- ydb/core/protos/config.proto 2
6 files changed, 119 insertions, 16 deletions
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index 8efd5f0b86..9c8bee6885 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -761,6 +761,7 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
RmInternalError = KqpGroup->GetCounter("RM/InternalError", true);
RmSnapshotLatency = KqpGroup->GetHistogram(
"RM/SnapshotLatency", NMonitoring::ExponentialHistogram(20, 2, 1));
+ RmMaxSnapshotLatency = KqpGroup->GetCounter("RM/MaxSnapshotLatency", false);
/* Spilling */
SpillingWriteBlobs = KqpGroup->GetCounter("Spilling/WriteBlobs", true);
diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h
index 0d2943e0fb..5862787aaf 100644
--- a/ydb/core/kqp/counters/kqp_counters.h
+++ b/ydb/core/kqp/counters/kqp_counters.h
@@ -357,6 +357,7 @@ public:
::NMonitoring::TDynamicCounters::TCounterPtr RmExtraMemAllocs;
::NMonitoring::TDynamicCounters::TCounterPtr RmInternalError;
NMonitoring::THistogramPtr RmSnapshotLatency;
+ ::NMonitoring::TDynamicCounters::TCounterPtr RmMaxSnapshotLatency;
// Spilling counters
::NMonitoring::TDynamicCounters::TCounterPtr SpillingWriteBlobs;
diff --git a/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp b/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp
index 836c61262d..50873cec8c 100644
--- a/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp
+++ b/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp
@@ -28,7 +28,9 @@ class TKqpResourceInfoExchangerActor : public TActorBootstrapped<TKqpResourceInf
struct TEvPrivate {
enum EEv {
EvRetrySending = EventSpaceBegin(TEvents::ES_PRIVATE),
+ EvRegularSending,
EvRetrySubscriber,
+ EvUpdateSnapshotState,
};
struct TEvRetrySending :
@@ -39,15 +41,23 @@ class TKqpResourceInfoExchangerActor : public TActorBootstrapped<TKqpResourceInf
}
};
+ struct TEvRegularSending :
+ public TEventLocal<TEvRegularSending, EEv::EvRegularSending> {
+ };
+
struct TEvRetrySubscriber :
public TEventLocal<TEvRetrySubscriber, EEv::EvRetrySubscriber> {
};
+
+ struct TEvUpdateSnapshotState:
+ public TEventLocal<TEvUpdateSnapshotState, EEv::EvUpdateSnapshotState> {
+ };
};
struct TRetryState {
bool IsScheduled = false;
NMonotonic::TMonotonic LastRetryAt = TMonotonic::Zero();
- TDuration CurrentDelay = TDuration::MilliSeconds(10);
+ TDuration CurrentDelay = TDuration::MilliSeconds(50);
};
struct TNodeState {
@@ -210,8 +220,19 @@ private:
}
void UpdateResourceSnapshotState() {
+ if (ResourceSnapshotRetryState.IsScheduled) {
+ return;
+ }
+ auto now = TlsActivationContext->Monotonic();
+ if (now - ResourceSnapshotRetryState.LastRetryAt < ResourceSnapshotRetryState.CurrentDelay) {
+ auto at = ResourceSnapshotRetryState.LastRetryAt + GetRetryDelay(ResourceSnapshotRetryState);
+ ResourceSnapshotRetryState.IsScheduled = true;
+ Schedule(at - now, new TEvPrivate::TEvUpdateSnapshotState);
+ return;
+ }
TVector<NKikimrKqp::TKqpNodeResources> resources;
resources.reserve(NodesState.size());
+
for (const auto& [id, state] : NodesState) {
auto currentResources = state.NodeData.GetResources();
if (currentResources.GetNodeId()) {
@@ -223,6 +244,8 @@ private:
ResourceSnapshotState->Snapshot =
std::make_shared<TVector<NKikimrKqp::TKqpNodeResources>>(std::move(resources));
}
+
+ ResourceSnapshotRetryState.LastRetryAt = now;
}
void SendInfos(TVector<ui32> infoNodeIds, TVector<ui32> nodeIds = {}) {
@@ -238,14 +261,22 @@ private:
if (nodesStateIt == NodesState.end()) {
return;
}
- SendingToNode(nodesStateIt->second, false, {}, snapshotMsg);
+ SendingToNode(nodesStateIt->second, true, {}, snapshotMsg);
}
} else {
auto snapshotMsg = CreateSnapshotMessage(infoNodeIds, false);
+ TDuration currentLatency = TDuration::MilliSeconds(0);
+ auto nowMonotic = TlsActivationContext->Monotonic();
+
for (auto& [id, state] : NodesState) {
- SendingToNode(state, false, {}, snapshotMsg);
+ SendingToNode(state, true, {}, snapshotMsg);
+ if (id != SelfId().NodeId()) {
+ currentLatency = Max(currentLatency, nowMonotic - state.LastUpdateAt);
+ }
}
+
+ Counters->RmMaxSnapshotLatency->Set(currentLatency.MilliSeconds());
}
}
@@ -301,6 +332,8 @@ private:
LOG_I("Received tenant pool status for exchanger, serving tenant: " << BoardState.Tenant
<< ", board: " << BoardState.Path
<< ", ssGroupId: " << BoardState.StateStorageGroupId);
+
+ Schedule(TDuration::Seconds(2), new TEvPrivate::TEvRegularSending);
}
@@ -312,6 +345,7 @@ private:
CreateSubscriber();
return;
}
+
LOG_D("Get board info from subscriber, serving tenant: " << BoardState.Tenant
<< ", board: " << BoardState.Path
<< ", ssGroupId: " << BoardState.StateStorageGroupId
@@ -405,6 +439,19 @@ private:
PassAway();
}
+ void Handle(TEvPrivate::TEvUpdateSnapshotState::TPtr&) {
+ ResourceSnapshotRetryState.IsScheduled = false;
+
+ UpdateResourceSnapshotState();
+ }
+
+ void Handle(TEvPrivate::TEvRegularSending::TPtr&) {
+ SendInfos({SelfId().NodeId()});
+
+ Schedule(TDuration::Seconds(2), new TEvPrivate::TEvRegularSending);
+ }
+
+
private:
STATEFN(WorkState) {
switch (ev->GetTypeRewrite()) {
@@ -415,6 +462,8 @@ private:
hFunc(TEvKqpResourceInfoExchanger::TEvSendResources, Handle);
hFunc(TEvPrivate::TEvRetrySubscriber, Handle);
hFunc(TEvPrivate::TEvRetrySending, Handle);
+ hFunc(TEvPrivate::TEvRegularSending, Handle);
+ hFunc(TEvPrivate::TEvUpdateSnapshotState, Handle);
hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
hFunc(TEvents::TEvPoison, Handle);
}
@@ -430,16 +479,15 @@ private:
}
TDuration GetRetryDelay(TRetryState& state) {
- auto ret = state.CurrentDelay;
auto newDelay = state.CurrentDelay;
newDelay *= 2;
- if (newDelay > TDuration::Seconds(1)) {
- newDelay = TDuration::Seconds(1);
+ if (newDelay > TDuration::Seconds(2)) {
+ newDelay = TDuration::Seconds(2);
}
- newDelay *= AppData()->RandomProvider->Uniform(100, 115);
+ newDelay *= AppData()->RandomProvider->Uniform(10, 200);
newDelay /= 100;
state.CurrentDelay = newDelay;
- return ret;
+ return state.CurrentDelay;
}
void SendingToNode(TNodeState& state, bool retry,
@@ -507,6 +555,8 @@ private:
};
TBoardState BoardState;
+ TRetryState ResourceSnapshotRetryState;
+
THashMap<ui32, TNodeState> NodesState;
std::shared_ptr<TResourceSnapshotState> ResourceSnapshotState;
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
index 97d7fadf66..ef6dab2ac6 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp
+++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
@@ -912,6 +912,16 @@ private:
void HandleWork(NMon::TEvHttpInfo::TPtr& ev) {
TStringStream str;
str.Reserve(8 * 1024);
+
+ auto snapshot = TVector<NKikimrKqp::TKqpNodeResources>();
+
+ if (PublishResourcesByExchanger) {
+ ResourceManager->RequestClusterResourcesInfo(
+ [&snapshot](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
+ snapshot = std::move(resources);
+ });
+ }
+
HTML(str) {
PRE() {
str << "Current config:" << Endl;
@@ -960,6 +970,35 @@ private:
}
} // with_lock (bucket.Lock)
}
+
+ if (snapshot.empty()) {
+ str << "No nodes resource info" << Endl;
+ } else {
+ str << Endl << "Resources info: " << Endl;
+ str << "Nodes count: " << snapshot.size() << Endl;
+ str << Endl;
+ for(const auto& entry : snapshot) {
+ str << " NodeId: " << entry.GetNodeId() << Endl;
+ str << " ResourceManagerActorId: " << entry.GetResourceManagerActorId() << Endl;
+ str << " AvailableComputeActors: " << entry.GetAvailableComputeActors() << Endl;
+ str << " UsedMemory: " << entry.GetUsedMemory() << Endl;
+ str << " TotalMemory: " << entry.GetTotalMemory() << Endl;
+ str << " Transactions:" << Endl;
+ for (const auto& tx: entry.GetTransactions()) {
+ str << " TxId: " << tx.GetTxId() << Endl;
+ str << " ComputeActors: " << tx.GetComputeActors() << Endl;
+ str << " Memory: " << tx.GetMemory() << Endl;
+ str << " StartTimestamp: " << tx.GetStartTimestamp() << Endl;
+ }
+ str << " Timestamp: " << entry.GetTimestamp() << Endl;
+ str << " Memory:" << Endl;;
+ for (const auto& memoryInfo: entry.GetMemory()) {
+ str << " Pool: " << memoryInfo.GetPool() << Endl;
+ str << " Available: " << memoryInfo.GetAvailable() << Endl;
+ }
+ str << " ExecutionUnits: " << entry.GetExecutionUnits() << Endl;
+ }
+ }
} // PRE()
}
diff --git a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp
index f109e92585..e634bd4e0a 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp
+++ b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp
@@ -224,7 +224,7 @@ public:
});
while (ready.load() != 1) {
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(100));
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
}
if (snapshot.size() != verificationData.size()) {
@@ -439,7 +439,7 @@ void KqpRm::Snapshot(bool byExchanger) {
AssertResourceManagerStats(rm, 800, 80);
AssertResourceBrokerSensors(0, 200, 0, 0, 2);
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500));
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
CheckSnapshot(0, {{800, 80}, {1000, 100}}, rm);
@@ -448,7 +448,7 @@ void KqpRm::Snapshot(bool byExchanger) {
AssertResourceManagerStats(rm, 1000, 100);
AssertResourceBrokerSensors(0, 0, 0, 2, 0);
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500));
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm);
}
@@ -505,6 +505,8 @@ void KqpRm::SnapshotSharing(bool byExchanger) {
auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId());
auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId());
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
+
CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first);
CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second);
@@ -517,10 +519,12 @@ void KqpRm::SnapshotSharing(bool byExchanger) {
bool allocated = rm_first->AllocateResources(1, 2, request);
UNIT_ASSERT(allocated);
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
+
allocated = rm_first->AllocateResources(2, 1, request);
UNIT_ASSERT(allocated);
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500));
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
CheckSnapshot(0, {{800, 80}, {1000, 100}}, rm_second);
}
@@ -529,10 +533,12 @@ void KqpRm::SnapshotSharing(bool byExchanger) {
bool allocated = rm_second->AllocateResources(1, 2, request);
UNIT_ASSERT(allocated);
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
+
allocated = rm_second->AllocateResources(2, 1, request);
UNIT_ASSERT(allocated);
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500));
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
CheckSnapshot(1, {{800, 80}, {800, 80}}, rm_first);
}
@@ -541,7 +547,7 @@ void KqpRm::SnapshotSharing(bool byExchanger) {
rm_first->FreeResources(1);
rm_first->FreeResources(2);
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500));
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
CheckSnapshot(0, {{1000, 100}, {800, 80}}, rm_second);
}
@@ -550,7 +556,7 @@ void KqpRm::SnapshotSharing(bool byExchanger) {
rm_second->FreeResources(1);
rm_second->FreeResources(2);
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500));
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_first);
}
@@ -571,6 +577,8 @@ void KqpRm::NodesMembership(bool byExchanger) {
auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId());
auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId());
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
+
CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first);
CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second);
@@ -583,6 +591,8 @@ void KqpRm::NodesMembership(bool byExchanger) {
options.FinalEvents.emplace_back(TEvents::TSystem::Poison, 1);
UNIT_ASSERT(Runtime->DispatchEvents(options));
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
+
CheckSnapshot(0, {{1000, 100}}, rm_first);
}
@@ -601,6 +611,8 @@ void KqpRm::DisonnectNodes() {
auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId());
auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId());
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500));
+
CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first);
CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second);
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 5c784ee380..50dc1d313e 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1127,7 +1127,7 @@ message TTableServiceConfig {
reserved 14;
optional TShardsScanningPolicy ShardsScanningPolicy = 16;
optional uint64 KqpPatternCacheCapacityBytes = 17 [default = 104857600]; // 100 MiB, 0 is for disable
- optional bool EnablePublishResourcesByExchanger = 18 [default = false];
+ optional bool EnablePublishResourcesByExchanger = 18 [default = true];
}
message TSpillingServiceConfig {