summary refs log tree commit diff stats
diff options
context:
space:
mode:
author shumkovnd <[email protected]> 2023-07-26 17:42:56 +0300
committer shumkovnd <[email protected]> 2023-07-26 17:42:56 +0300
commit 732af3036d7c8921f80eeb7127a8d817611224ab (patch)
tree 048e055b8a9e799de3ad45040048f2d10aa1bd56
parent 3785d5f97965bccf048718d8717904cf50f9f8f9 (diff)
KIKIMR-16187: fix gossip
-rw-r--r-- ydb/core/base/board_lookup.cpp 41
-rw-r--r-- ydb/core/base/board_publish.cpp 41
-rw-r--r-- ydb/core/base/statestorage.h 13
-rw-r--r-- ydb/core/kqp/counters/kqp_counters.cpp 1
-rw-r--r-- ydb/core/kqp/counters/kqp_counters.h 1
-rw-r--r-- ydb/core/kqp/node_service/kqp_node_ut.cpp 5
-rw-r--r-- ydb/core/kqp/proxy_service/kqp_proxy_service.cpp 91
-rw-r--r-- ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp 397
-rw-r--r-- ydb/core/kqp/rm_service/kqp_rm_service.cpp 71
-rw-r--r-- ydb/core/kqp/rm_service/kqp_rm_service.h 3
-rw-r--r-- ydb/core/kqp/rm_service/kqp_rm_ut.cpp 27
-rw-r--r-- ydb/core/kqp/ut/common/kqp_ut_common.h 5
-rw-r--r-- ydb/core/protos/config.proto 12
13 files changed, 526 insertions, 182 deletions
diff --git a/ydb/core/base/board_lookup.cpp b/ydb/core/base/board_lookup.cpp
index fc1e0f5fb28..c453dc754bb 100644
--- a/ydb/core/base/board_lookup.cpp
+++ b/ydb/core/base/board_lookup.cpp
@@ -27,6 +27,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
const EBoardLookupMode Mode;
const ui32 StateStorageGroupId;
const bool Subscriber;
+ TBoardRetrySettings BoardRetrySettings;
static constexpr int MAX_REPLICAS_COUNT_EXP = 32; // Replicas.size() <= 2**MAX_REPLICAS_COUNT_EXP
@@ -59,7 +60,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
bool IsScheduled = false;
ui32 ReconnectNumber = 0;
NMonotonic::TMonotonic LastReconnectAt = TMonotonic::Zero();
- TDuration CurrentDelay = TDuration::MilliSeconds(100);
+ TDuration CurrentDelay = TDuration::Zero();
};
TVector<TReplica> Replicas;
@@ -69,13 +70,20 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
ui32 WaitForReplicasToSuccess;
+ const TDuration& GetCurrentDelay(TReplica& replica) { // Returns the replica's current reconnect delay, seeding it on first use.
+ if (replica.CurrentDelay == TDuration::Zero()) { // Zero is the in-class initial value of CurrentDelay, i.e. "not set yet".
+ replica.CurrentDelay = BoardRetrySettings.StartDelayMs; // Seed from the configured start delay (a TDuration despite the Ms suffix).
+ }
+ return replica.CurrentDelay; // Reference into `replica`; valid as long as the replica entry is.
+ }
+
TDuration GetReconnectDelayForReplica(TReplica& replica) {
auto newDelay = replica.CurrentDelay;
newDelay *= 2;
- if (newDelay > TDuration::Seconds(5)) {
- newDelay = TDuration::Seconds(5);
+ if (newDelay > BoardRetrySettings.MaxDelayMs) {
+ newDelay = BoardRetrySettings.MaxDelayMs;
}
- newDelay *= AppData()->RandomProvider->Uniform(100, 115);
+ newDelay *= AppData()->RandomProvider->Uniform(50, 200);
newDelay /= 100;
replica.CurrentDelay = newDelay;
return replica.CurrentDelay;
@@ -402,7 +410,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
CheckCompletion();
}
- void ReconnectReplica(ui32 replicaIdx) {
+ void ReconnectReplica(ui32 replicaIdx, bool fromReconnect = false) {
auto& replica = Replicas[replicaIdx];
if (!Subscriber) {
@@ -413,12 +421,20 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
}
auto now = TlsActivationContext->Monotonic();
- if (now - replica.LastReconnectAt < replica.CurrentDelay) {
+ if (now - replica.LastReconnectAt < GetCurrentDelay(replica)) {
auto at = replica.LastReconnectAt + GetReconnectDelayForReplica(replica);
replica.IsScheduled = true;
Schedule(at - now, new TEvPrivate::TEvReconnectReplicas(replicaIdx));
return;
}
+ if (!fromReconnect) {
+ auto delay = TDuration::Seconds(1);
+ delay *= AppData()->RandomProvider->Uniform(10, 200);
+ delay /= 100;
+ replica.IsScheduled = true;
+ Schedule(delay, new TEvPrivate::TEvReconnectReplicas(replicaIdx));
+ return;
+ }
replica.ReconnectNumber++;
replica.State = EReplicaState::Reconnect;
@@ -433,7 +449,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
void Handle(TEvPrivate::TEvReconnectReplicas::TPtr& ev) { // Handles the TEvReconnectReplicas timer event that ReconnectReplica schedules.
const auto& idx = ev->Get()->ReplicaIdx;
Replicas[idx].IsScheduled = false; // The scheduled event has now been consumed.
- ReconnectReplica(idx);
+ ReconnectReplica(idx, true); // fromReconnect=true bypasses the extra randomised delay ReconnectReplica applies to other callers.
}
std::pair<ui64, ui64> DecodeCookie(ui64 cookie) {
@@ -477,12 +493,15 @@ public:
return NKikimrServices::TActivity::BOARD_LOOKUP_ACTOR;
}
- TBoardLookupActor(const TString &path, TActorId owner, EBoardLookupMode mode, ui32 groupId)
+ TBoardLookupActor( // Constructs a lookup actor for `path`; results are reported to `owner`.
+ const TString &path, TActorId owner, EBoardLookupMode mode, ui32 groupId,
+ TBoardRetrySettings boardRetrySettings) // Taken by value and moved into the member below.
: Path(path)
, Owner(owner)
, Mode(mode)
, StateStorageGroupId(groupId)
, Subscriber(Mode == EBoardLookupMode::Subscription) // Reads the member Mode, which is declared (and thus initialised) before Subscriber.
+ , BoardRetrySettings(std::move(boardRetrySettings))
{}
void Bootstrap() {
@@ -524,8 +543,10 @@ public:
}
};
-IActor* CreateBoardLookupActor(const TString &path, const TActorId &owner, ui32 groupId, EBoardLookupMode mode) {
- return new TBoardLookupActor(path, owner, mode, groupId);
+IActor* CreateBoardLookupActor( // Factory: allocates a TBoardLookupActor and returns it as a raw IActor*.
+ const TString &path, const TActorId &owner, ui32 groupId, EBoardLookupMode mode,
+ TBoardRetrySettings boardRetrySettings) {
+ return new TBoardLookupActor(path, owner, mode, groupId, std::move(boardRetrySettings)); // Note: the constructor takes (mode, groupId), the factory takes (groupId, mode).
}
}
diff --git a/ydb/core/base/board_publish.cpp b/ydb/core/base/board_publish.cpp
index 8b73b4ad407..c808bb0326c 100644
--- a/ydb/core/base/board_publish.cpp
+++ b/ydb/core/base/board_publish.cpp
@@ -100,10 +100,11 @@ class TBoardPublishActor : public TActorBootstrapped<TBoardPublishActor> {
const ui32 StateStorageGroupId;
const ui32 TtlMs;
const bool Register;
+ const TBoardRetrySettings BoardRetrySettings;
struct TRetryState { // Per-replica retry bookkeeping for the publish actor.
NMonotonic::TMonotonic LastRetryAt = TMonotonic::Zero();
- TDuration CurrentDelay = TDuration::MilliSeconds(100);
+ TDuration CurrentDelay = TDuration::Zero(); // Zero means "unset"; GetCurrentDelay() replaces it with BoardRetrySettings.StartDelayMs.
};
struct TReplicaPublishActorState {
@@ -113,13 +114,20 @@ class TBoardPublishActor : public TActorBootstrapped<TBoardPublishActor> {
THashMap<TActorId, TReplicaPublishActorState> ReplicaPublishActors; // replica -> publish actor
+ const TDuration& GetCurrentDelay(TRetryState& state) { // Returns the current retry delay for `state`, seeding it on first use.
+ if (state.CurrentDelay == TDuration::Zero()) { // Zero is the in-class initial value of TRetryState::CurrentDelay.
+ state.CurrentDelay = BoardRetrySettings.StartDelayMs; // Seed from the configured start delay (a TDuration despite the Ms suffix).
+ }
+ return state.CurrentDelay; // Reference into `state`, not a copy.
+ }
+
TDuration GetRetryDelay(TRetryState& state) {
auto newDelay = state.CurrentDelay;
newDelay *= 2;
- if (newDelay > TDuration::Seconds(1)) {
- newDelay = TDuration::Seconds(1);
+ if (newDelay > BoardRetrySettings.MaxDelayMs) {
+ newDelay = BoardRetrySettings.MaxDelayMs;
}
- newDelay *= AppData()->RandomProvider->Uniform(10, 200);
+ newDelay *= AppData()->RandomProvider->Uniform(50, 200);
newDelay /= 100;
state.CurrentDelay = newDelay;
return state.CurrentDelay;
@@ -183,7 +191,7 @@ class TBoardPublishActor : public TActorBootstrapped<TBoardPublishActor> {
const TActorId replica = ev->Get()->Replica;
auto& retryState = ReplicaPublishActors[replica].RetryState;
- if (now - retryState.LastRetryAt < retryState.CurrentDelay) {
+ if (now - retryState.LastRetryAt < GetCurrentDelay(retryState)) {
auto at = retryState.LastRetryAt + GetRetryDelay(retryState);
Schedule(at - now, new TEvPrivate::TEvRetryPublishActor(replica));
return;
@@ -196,11 +204,19 @@ class TBoardPublishActor : public TActorBootstrapped<TBoardPublishActor> {
void Handle(TEvPrivate::TEvRetryPublishActor::TPtr &ev) { // Handles the TEvRetryPublishActor timer event scheduled by the retry path.
const auto& replica = ev->Get()->Replica;
- RetryReplica(replica);
+ RetryReplica(replica, true); // fromRetry=true: skip the initial randomised delay that RetryReplica applies to direct callers.
}
- void RetryReplica(const TActorId& replica) {
+ void RetryReplica(const TActorId& replica, bool fromRetry = false) {
+ if (!fromRetry) {
+ auto delay = TDuration::Seconds(2);
+ delay *= AppData()->RandomProvider->Uniform(50, 200);
+ delay /= 100;
+ Schedule(delay, new TEvPrivate::TEvRetryPublishActor(replica));
+ return;
+ }
+
auto replicaPublishActorsIt = ReplicaPublishActors.find(replica);
if (replicaPublishActorsIt == ReplicaPublishActors.end()) {
return;
@@ -221,13 +237,16 @@ public:
return NKikimrServices::TActivity::BOARD_PUBLISH_ACTOR;
}
- TBoardPublishActor(const TString &path, const TString &payload, const TActorId &owner, ui32 groupId, ui32 ttlMs, bool reg)
+ TBoardPublishActor(
+ const TString &path, const TString &payload, const TActorId &owner, ui32 groupId, ui32 ttlMs, bool reg,
+ TBoardRetrySettings boardRetrySettings)
: Path(path)
, Payload(payload)
, Owner(owner)
, StateStorageGroupId(groupId)
, TtlMs(ttlMs)
, Register(reg)
+ , BoardRetrySettings(std::move(boardRetrySettings))
{
Y_UNUSED(TtlMs);
Y_UNUSED(Register);
@@ -257,8 +276,10 @@ public:
}
};
-IActor* CreateBoardPublishActor(const TString &path, const TString &payload, const TActorId &owner, ui32 groupId, ui32 ttlMs, bool reg) {
- return new TBoardPublishActor(path, payload, owner, groupId, ttlMs, reg);
+IActor* CreateBoardPublishActor( // Factory: allocates a TBoardPublishActor and returns it as a raw IActor*.
+ const TString &path, const TString &payload, const TActorId &owner, ui32 groupId, ui32 ttlMs, bool reg,
+ TBoardRetrySettings boardRetrySettings) { // Retry settings are taken by value and moved into the actor.
+ return new TBoardPublishActor(path, payload, owner, groupId, ttlMs, reg, std::move(boardRetrySettings));
}
TString MakeEndpointsBoardPath(const TString &database) {
diff --git a/ydb/core/base/statestorage.h b/ydb/core/base/statestorage.h
index 261477786ee..38b6babefd8 100644
--- a/ydb/core/base/statestorage.h
+++ b/ydb/core/base/statestorage.h
@@ -540,6 +540,11 @@ enum class EBoardLookupMode {
Subscription,
};
+struct TBoardRetrySettings { // Backoff settings passed to CreateBoardLookupActor / CreateBoardPublishActor.
+ TDuration StartDelayMs = TDuration::MilliSeconds(2000); // Initial delay; a TDuration despite the "Ms" suffix. Default 2 s.
+ TDuration MaxDelayMs = TDuration::MilliSeconds(5000); // Cap applied to the doubled delay before jitter is multiplied in. Default 5 s.
+};
+
TIntrusivePtr<TStateStorageInfo> BuildStateStorageInfo(char (&namePrefix)[TActorId::MaxServiceIDLength], const NKikimrConfig::TDomainsConfig::TStateStorage& config);
void BuildStateStorageInfos(const NKikimrConfig::TDomainsConfig::TStateStorage& config,
TIntrusivePtr<TStateStorageInfo> &stateStorageInfo,
@@ -556,8 +561,12 @@ IActor* CreateStateStorageTabletGuardian(ui64 tabletId, const TActorId &leader,
IActor* CreateStateStorageFollowerGuardian(ui64 tabletId, const TActorId &follower); // created as followerCandidate
IActor* CreateStateStorageBoardReplica(const TIntrusivePtr<TStateStorageInfo> &, ui32);
IActor* CreateSchemeBoardReplica(const TIntrusivePtr<TStateStorageInfo>&, ui32);
-IActor* CreateBoardLookupActor(const TString &path, const TActorId &owner, ui32 groupId, EBoardLookupMode mode);
-IActor* CreateBoardPublishActor(const TString &path, const TString &payload, const TActorId &owner, ui32 groupId, ui32 ttlMs, bool reg);
+IActor* CreateBoardLookupActor(
+ const TString &path, const TActorId &owner, ui32 groupId, EBoardLookupMode mode,
+ TBoardRetrySettings boardRetrySettings = {});
+IActor* CreateBoardPublishActor(
+ const TString &path, const TString &payload, const TActorId &owner, ui32 groupId, ui32 ttlMs, bool reg,
+ TBoardRetrySettings boardRetrySettings = {});
TString MakeEndpointsBoardPath(const TString &database);
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index bec70b79a1e..da37347c841 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -764,6 +764,7 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
RmSnapshotLatency = KqpGroup->GetHistogram(
"RM/SnapshotLatency", NMonitoring::ExponentialHistogram(20, 2, 1));
RmMaxSnapshotLatency = KqpGroup->GetCounter("RM/MaxSnapshotLatency", false);
+ RmNodeNumberInSnapshot = KqpGroup->GetCounter("RM/NodeNumberInSnapshot", false);
/* Spilling */
SpillingWriteBlobs = KqpGroup->GetCounter("Spilling/WriteBlobs", true);
diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h
index 7cfc101f010..ad885cac5f8 100644
--- a/ydb/core/kqp/counters/kqp_counters.h
+++ b/ydb/core/kqp/counters/kqp_counters.h
@@ -362,6 +362,7 @@ public:
::NMonitoring::TDynamicCounters::TCounterPtr RmInternalError;
NMonitoring::THistogramPtr RmSnapshotLatency;
::NMonitoring::TDynamicCounters::TCounterPtr RmMaxSnapshotLatency;
+ ::NMonitoring::TDynamicCounters::TCounterPtr RmNodeNumberInSnapshot;
// Spilling counters
::NMonitoring::TDynamicCounters::TCounterPtr SpillingWriteBlobs;
diff --git a/ydb/core/kqp/node_service/kqp_node_ut.cpp b/ydb/core/kqp/node_service/kqp_node_ut.cpp
index d55ae12d647..e4380cf0621 100644
--- a/ydb/core/kqp/node_service/kqp_node_ut.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_ut.cpp
@@ -99,6 +99,11 @@ NKikimrConfig::TTableServiceConfig MakeKqpResourceManagerConfig() {
config.MutableResourceManager()->SetPublishStatisticsIntervalSec(0);
config.MutableResourceManager()->SetEnableInstantMkqlMemoryAlloc(true);
+ auto* infoExchangerRetrySettings = config.MutableResourceManager()->MutableInfoExchangerSettings();
+ auto* exchangerSettings = infoExchangerRetrySettings->MutableExchangerSettings();
+ exchangerSettings->SetStartDelayMs(10);
+ exchangerSettings->SetMaxDelayMs(10);
+
return config;
}
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 150fa41f327..a6d9d2dc24b 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -194,7 +194,7 @@ public:
TString endpointAddress;
bool useSsl = false;
ParseGrpcEndpoint(TokenAccessorConfig.GetEndpoint(), endpointAddress, useSsl);
-
+
CredentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(endpointAddress, useSsl, caContent, TokenAccessorConfig.GetConnectionPoolSize());
}
@@ -363,17 +363,25 @@ public:
KQP_PROXY_LOG_D("Received node white board pool stats: " << pool.usage());
NodeResources.SetCpuUsage(pool.usage());
NodeResources.SetThreads(pool.threads());
+
+ PublishResourceUsage();
}
void DoPublishResources() {
- SendBoardPublishPoison();
-
SendWhiteboardRequest();
+
if (AppData()->TenantName.empty() || !SelfDataCenterId) {
KQP_PROXY_LOG_E("Cannot start publishing usage, tenants: " << AppData()->TenantName << ", " << SelfDataCenterId.value_or("empty"));
return;
}
+ SendBoardPublishPoison();
+
+ if (TableServiceConfig.GetEnablePublishKqpProxyByRM()) {
+ Send(KqpRmServiceActor, std::make_unique<TEvKqp::TEvKqpProxyPublishRequest>());
+ return;
+ }
+
auto groupId = GetDefaultStateStorageGroupId(AppData()->TenantName);
if (!groupId) {
KQP_PROXY_LOG_D("Unable to determine default state storage group id for database " <<
@@ -389,22 +397,14 @@ public:
}
void PublishResourceUsage() {
- const auto& sbs = TableServiceConfig.GetSessionBalancerSettings();
- auto now = TAppData::TimeProvider->Now();
- TDuration batchingInterval = TDuration::MilliSeconds(sbs.GetBoardPublishIntervalMs());
-
- if (TableServiceConfig.GetEnablePublishKqpProxyByRM()) {
- if (!LastPublishResourcesAt.has_value() || now - *LastPublishResourcesAt < batchingInterval) {
- LastPublishResourcesAt = TAppData::TimeProvider->Now();
- Send(KqpRmServiceActor, std::make_unique<TEvKqp::TEvKqpProxyPublishRequest>());
- }
- return;
- }
-
if (ResourcesPublishScheduled) {
return;
}
+ const auto& sbs = TableServiceConfig.GetSessionBalancerSettings();
+ auto now = TAppData::TimeProvider->Now();
+ TDuration batchingInterval = TDuration::MilliSeconds(sbs.GetBoardPublishIntervalMs());
+
if (LastPublishResourcesAt && now - *LastPublishResourcesAt < batchingInterval) {
ResourcesPublishScheduled = true;
Schedule(batchingInterval, new TEvPrivate::TEvReadyToPublishResources());
@@ -829,11 +829,14 @@ public:
if (!TableServiceConfig.GetEnablePublishKqpProxyByRM()) {
LookupPeerProxyData();
} else {
- GetKqpResourceManager()->RequestClusterResourcesInfo(
- [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
- TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new TEvPrivate::TEvResourcesSnapshot(std::move(resources)));
- as->Send(eh);
- });
+ if (SelfDataCenterId && !AppData()->TenantName.empty() && !IsLookupByRmScheduled) {
+ IsLookupByRmScheduled = true;
+ GetKqpResourceManager()->RequestClusterResourcesInfo(
+ [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
+ TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new TEvPrivate::TEvResourcesSnapshot(std::move(resources)));
+ as->Send(eh);
+ });
+ }
}
if (!ShutdownRequested) {
const auto& sbs = TableServiceConfig.GetSessionBalancerSettings();
@@ -875,33 +878,38 @@ public:
}
void Handle(TEvPrivate::TEvResourcesSnapshot::TPtr& ev) { // Rebuilds the peer proxy list from a resource-manager snapshot.
- if (ev->Get()->Snapshot.empty()) {
- PeerProxyNodeResources.clear();
- KQP_PROXY_LOG_E("Can not find default state storage group for database " <<
- AppData()->TenantName);
- return;
- }
+ IsLookupByRmScheduled = false; // Clears the in-flight guard that is set when the snapshot is requested.
+
+ TVector<NKikimrKqp::TKqpProxyNodeResources> proxyResources; // Built locally and swapped in only once the snapshot proves non-empty.
+ std::vector<ui64> localDatacenterProxies;
+ proxyResources.reserve(ev->Get()->Snapshot.size());
- Y_VERIFY(SelfDataCenterId);
- PeerProxyNodeResources.resize(ev->Get()->Snapshot.size());
- size_t idx = 0;
auto getDataCenterId = [](const auto& entry) { // Prefers the string id, falling back to the numeric id converted to a string.
return entry.HasDataCenterId() ? entry.GetDataCenterId() : DataCenterToString(entry.GetDataCenterNumId());
};
- LocalDatacenterProxies.clear();
- for(auto& nodeResource : ev->Get()->Snapshot) {
- auto* proxyNodeResources = nodeResource.MutableKqpProxyNodeResources();
- proxyNodeResources->SetNodeId(nodeResource.GetNodeId());
-
- PeerProxyNodeResources[idx] = std::move(*proxyNodeResources);
+ for(auto& nodeResources : ev->Get()->Snapshot) {
+ auto* proxyNodeResources = nodeResources.MutableKqpProxyNodeResources();
- if (getDataCenterId(PeerProxyNodeResources[idx]) == *SelfDataCenterId) {
- LocalDatacenterProxies.emplace_back(PeerProxyNodeResources[idx].GetNodeId());
+ if (proxyNodeResources->HasNodeId()) { // Entries whose proxy part carries no NodeId are skipped.
+ proxyResources.push_back(std::move(*proxyNodeResources));
+ if (getDataCenterId(proxyResources.back()) == *SelfDataCenterId) { // NOTE(review): SelfDataCenterId is dereferenced here, before the Y_VERIFY further down.
+ localDatacenterProxies.emplace_back(proxyResources.back().GetNodeId());
+ }
}
- ++idx;
}
+ if (proxyResources.empty()) { // No usable proxy entries: drop the peer list and stop (PeerStats is left untouched).
+ PeerProxyNodeResources.clear();
+ KQP_PROXY_LOG_D("Received unexpected data from rm for database " <<
+ AppData()->TenantName);
+ return;
+ }
+
+ Y_VERIFY(SelfDataCenterId); // NOTE(review): runs after *SelfDataCenterId was already used in the loop above; consider moving it to the top.
+ PeerProxyNodeResources = std::move(proxyResources);
+ LocalDatacenterProxies = std::move(localDatacenterProxies);
+
PeerStats = CalcPeerStats(PeerProxyNodeResources, *SelfDataCenterId);
TryKickSession();
}
@@ -1461,13 +1469,13 @@ private:
}
}
- void Handle(NKqp::TEvListScriptExecutionOperations::TPtr& ev) {
+ void Handle(NKqp::TEvListScriptExecutionOperations::TPtr& ev) { // Whitespace-only change in this hunk; behaviour is unchanged.
if (CheckScriptExecutionsTablesReady<TEvListScriptExecutionOperationsResponse>(ev)) { // Proceed only when the readiness check returns true.
Register(CreateListScriptExecutionOperationsActor(std::move(ev))); // The event is moved into the newly registered actor.
- }
+ }
}
- void Handle(NKqp::TEvCancelScriptExecutionOperation::TPtr& ev) {
+ void Handle(NKqp::TEvCancelScriptExecutionOperation::TPtr& ev) {
if (CheckScriptExecutionsTablesReady<TEvCancelScriptExecutionOperationResponse>(ev)) {
Register(CreateCancelScriptExecutionOperationActor(std::move(ev)));
}
@@ -1544,6 +1552,7 @@ private:
};
EScriptExecutionsCreationStatus ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::NotStarted;
std::deque<THolder<IEventHandle>> DelayedEventsQueue;
+ bool IsLookupByRmScheduled = false;
};
} // namespace
diff --git a/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp b/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp
index 50873cec8ce..cf48f3758c9 100644
--- a/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp
+++ b/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp
@@ -2,6 +2,8 @@
#include <ydb/core/base/location.h>
#include <ydb/core/base/statestorage.h>
+#include <ydb/core/cms/console/configs_dispatcher.h>
+#include <ydb/core/cms/console/console.h>
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/mind/tenant_pool.h>
#include <ydb/core/kqp/common/kqp_event_ids.h>
@@ -25,19 +27,30 @@ namespace NRm {
class TKqpResourceInfoExchangerActor : public TActorBootstrapped<TKqpResourceInfoExchangerActor> {
using TBase = TActorBootstrapped<TKqpResourceInfoExchangerActor>;
+ static constexpr ui32 BUCKETS_COUNT = 32;
+
struct TEvPrivate {
enum EEv {
- EvRetrySending = EventSpaceBegin(TEvents::ES_PRIVATE),
+ EvSendToNode = EventSpaceBegin(TEvents::ES_PRIVATE),
EvRegularSending,
EvRetrySubscriber,
EvUpdateSnapshotState,
+ EvRetryNode,
};
- struct TEvRetrySending :
- public TEventLocal<TEvRetrySending, EEv::EvRetrySending> {
- ui32 NodeId;
+ struct TEvSendToNode :
+ public TEventLocal<TEvSendToNode, EEv::EvSendToNode> {
+ ui32 BucketInd;
- TEvRetrySending(ui32 nodeId) : NodeId(nodeId) {
+ TEvSendToNode(ui32 bucketInd) : BucketInd(bucketInd) {
+ }
+ };
+
+ struct TEvRetryNode :
+ public TEventLocal<TEvRetryNode, EEv::EvRetryNode> {
+ ui32 BucketInd;
+
+ TEvRetryNode(ui32 bucketInd) : BucketInd(bucketInd) {
}
};
@@ -54,31 +67,64 @@ class TKqpResourceInfoExchangerActor : public TActorBootstrapped<TKqpResourceInf
};
};
- struct TRetryState {
+ struct TDelayState { // Renamed from TRetryState; used for both retry and send delay tracking (see TBucketState).
bool IsScheduled = false;
NMonotonic::TMonotonic LastRetryAt = TMonotonic::Zero();
- TDuration CurrentDelay = TDuration::MilliSeconds(50);
+ TDuration CurrentDelay = TDuration::Zero(); // Initial value is now zero instead of a hard-coded 50 ms.
+ };
+
+ struct TDelaySettings { // Delay bounds; presumably the type of the ExchangerSettings member - TODO confirm.
+ TDuration StartDelayMs = TDuration::MilliSeconds(500); // Default start delay: 500 ms (a TDuration despite the Ms suffix).
+ TDuration MaxDelayMs = TDuration::MilliSeconds(2000); // Default maximum delay: 2 s.
};
struct TNodeState { // Per-node record stored in NodesState, keyed by node id.
- TRetryState RetryState;
NMonotonic::TMonotonic LastUpdateAt = TMonotonic::Zero(); // Monotonic time of the last accepted update for this node.
NKikimrKqp::TResourceExchangeNodeData NodeData;
};
+ struct TBucketState { // State of one of the BUCKETS_COUNT buckets; delay state lives here, not in TNodeState.
+ TDelayState RetryState;
+ TDelayState SendState;
+ THashSet<ui32> NodeIdsToSend; // Node ids are inserted/erased by UpdateBoardInfo as board entries appear or are dropped.
+ THashSet<ui32> NodeIdsToRetry;
+ };
+
public:
TKqpResourceInfoExchangerActor(TIntrusivePtr<TKqpCounters> counters,
- std::shared_ptr<TResourceSnapshotState> resourceSnapshotState)
+ std::shared_ptr<TResourceSnapshotState> resourceSnapshotState,
+ const NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings& settings) // Initial exchanger/publisher/subscriber delay settings.
: ResourceSnapshotState(std::move(resourceSnapshotState))
, Counters(counters)
+ , Settings(settings)
{
- Y_UNUSED(counters);
+ Buckets.resize(BUCKETS_COUNT); // Fixed number of buckets, allocated once.
+
+ const auto& publisherSettings = settings.GetPublisherSettings();
+ const auto& subscriberSettings = settings.GetSubscriberSettings();
+ const auto& exchangerSettings = settings.GetExchangerSettings();
+
+ UpdateBoardRetrySettings(publisherSettings, PublisherSettings); // Return value deliberately ignored: nothing exists yet to recreate.
+ UpdateBoardRetrySettings(subscriberSettings, SubscriberSettings);
+
+ if (exchangerSettings.HasStartDelayMs()) { // Only fields present in the config override the built-in defaults.
+ ExchangerSettings.StartDelayMs = TDuration::MilliSeconds(exchangerSettings.GetStartDelayMs());
+ }
+ if (exchangerSettings.HasMaxDelayMs()) {
+ ExchangerSettings.MaxDelayMs = TDuration::MilliSeconds(exchangerSettings.GetMaxDelayMs());
+ }
}
void Bootstrap() {
LOG_D("Start KqpResourceInfoExchangerActor at " << SelfId());
+ ui32 tableServiceConfigKind = (ui32) NKikimrConsole::TConfigItem::TableServiceConfigItem;
+
+ Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()),
+ new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest({tableServiceConfigKind}),
+ IEventHandle::FlagTrackDelivery);
+
Become(&TKqpResourceInfoExchangerActor::WorkState);
Send(MakeTenantPoolRootID(), new TEvents::TEvSubscribe);
@@ -86,6 +132,21 @@ public:
private:
+ bool UpdateBoardRetrySettings(
+ const NKikimrConfig::TTableServiceConfig::TResourceManager::TRetrySettings& settings,
+ TBoardRetrySettings& retrySetting) {
+ // Copies the delay fields that are present in `settings` into `retrySetting`.
+ // Returns true only if a stored value actually changed. Callers use the result
+ // to decide whether to recreate the board publisher/subscriber, so reporting
+ // "changed" for identical values would restart those actors on every config
+ // notification that merely carries the same settings.
+ bool ret = false;
+ if (settings.HasStartDelayMs()) {
+ const auto startDelay = TDuration::MilliSeconds(settings.GetStartDelayMs());
+ if (retrySetting.StartDelayMs != startDelay) {
+ retrySetting.StartDelayMs = startDelay;
+ ret = true;
+ }
+ }
+ if (settings.HasMaxDelayMs()) {
+ const auto maxDelay = TDuration::MilliSeconds(settings.GetMaxDelayMs());
+ if (retrySetting.MaxDelayMs != maxDelay) {
+ retrySetting.MaxDelayMs = maxDelay;
+ ret = true;
+ }
+ }
+ return ret;
+ }
+
static TString MakeKqpInfoExchangerBoardPath(TStringBuf database) { // Builds the board path for the exchanger: "kqpexch+" followed by the database name.
return TStringBuilder() << "kqpexch+" << database;
}
@@ -94,8 +155,8 @@ private:
auto& retryState = BoardState.RetryStateForSubscriber;
auto now = TlsActivationContext->Monotonic();
- if (now - retryState.LastRetryAt < retryState.CurrentDelay) {
- auto at = retryState.LastRetryAt + GetRetryDelay(retryState);
+ if (now - retryState.LastRetryAt < GetCurrentDelay(retryState)) {
+ auto at = retryState.LastRetryAt + GetDelay(retryState);
Schedule(at - now, new TEvPrivate::TEvRetrySubscriber);
return;
}
@@ -108,7 +169,8 @@ private:
BoardState.Subscriber = TActorId();
auto subscriber = CreateBoardLookupActor(
- BoardState.Path, SelfId(), BoardState.StateStorageGroupId, EBoardLookupMode::Subscription);
+ BoardState.Path, SelfId(), BoardState.StateStorageGroupId, EBoardLookupMode::Subscription,
+ SubscriberSettings);
BoardState.Subscriber = Register(subscriber);
retryState.LastRetryAt = now;
@@ -123,7 +185,7 @@ private:
BoardState.Publisher = TActorId();
auto publisher = CreateBoardPublishActor(BoardState.Path, SelfInfo, SelfId(),
- BoardState.StateStorageGroupId, /* ttlMs */ 0, /* reg */ true);
+ BoardState.StateStorageGroupId, /* ttlMs */ 0, /* reg */ true, PublisherSettings);
BoardState.Publisher = Register(publisher);
auto& nodeState = NodesState[SelfId().NodeId()];
@@ -132,20 +194,29 @@ private:
ActorIdToProto(BoardState.Publisher, boardData->MutablePublisher());
}
- TVector<ui32> UpdateBoardInfo(const TMap<TActorId, TEvStateStorage::TBoardInfoEntry>& infos) {
+ std::pair<TVector<ui32>, bool> UpdateBoardInfo(const TMap<TActorId, TEvStateStorage::TBoardInfoEntry>& infos) {
auto nodeIds = TVector<ui32>();
+ bool isChanged = false;
auto now = TlsActivationContext->Monotonic();
- for (const auto& [id, entry] : infos) {
+ for (const auto& [publisherId, entry] : infos) {
if (entry.Dropped) {
- auto nodesStateIt = NodesState.find(id.NodeId());
+ auto nodesStateIt = NodesState.find(publisherId.NodeId());
+
if (nodesStateIt == NodesState.end()) {
continue;
}
+
+ auto bucketInd = GetBucketInd(publisherId.NodeId());
+ auto& bucket = Buckets[bucketInd];
+
auto& currentBoardData = nodesStateIt->second.NodeData.GetResourceExchangeBoardData();
auto currentPublisher = ActorIdFromProto(currentBoardData.GetPublisher());
- if (currentPublisher == id) {
+ if (currentPublisher == publisherId) {
NodesState.erase(nodesStateIt);
+ isChanged = true;
+ bucket.NodeIdsToSend.erase(publisherId.NodeId());
+ bucket.NodeIdsToRetry.erase(publisherId.NodeId());
}
continue;
}
@@ -160,11 +231,16 @@ private:
auto* currentBoardData = currentInfo.MutableResourceExchangeBoardData();
auto currentUlidString = currentBoardData->GetUlid();
+ auto bucketInd = GetBucketInd(owner.NodeId());
+ auto& bucket = Buckets[bucketInd];
+
if (currentUlidString.empty()) {
*currentBoardData = std::move(boardData);
- ActorIdToProto(id, currentBoardData->MutablePublisher());
+ ActorIdToProto(publisherId, currentBoardData->MutablePublisher());
nodeIds.push_back(owner.NodeId());
+ isChanged = true;
nodeState.LastUpdateAt = now;
+ bucket.NodeIdsToSend.insert(owner.NodeId());
continue;
}
@@ -173,19 +249,25 @@ private:
if (currentUlid < ulid) {
*currentBoardData = std::move(boardData);
- ActorIdToProto(id, currentBoardData->MutablePublisher());
+ ActorIdToProto(publisherId, currentBoardData->MutablePublisher());
currentInfo.SetRound(0);
+ isChanged = true;
(*currentInfo.MutableResources()) = NKikimrKqp::TKqpNodeResources();
nodeIds.push_back(owner.NodeId());
nodeState.LastUpdateAt = now;
+ bucket.NodeIdsToSend.insert(owner.NodeId());
continue;
}
}
- return nodeIds;
+ Counters->RmNodeNumberInSnapshot->Set(NodesState.size());
+
+ return {nodeIds, isChanged};
}
- void UpdateResourceInfo(const TVector<NKikimrKqp::TResourceExchangeNodeData>& infos) {
+ bool UpdateResourceInfo(const TVector<NKikimrKqp::TResourceExchangeNodeData>& infos) {
+ bool isChanged = false;
+
auto now = TlsActivationContext->Monotonic();
for (const auto& info : infos) {
@@ -214,9 +296,12 @@ private:
auto latency = now - nodeState.LastUpdateAt;
Counters->RmSnapshotLatency->Collect(latency.MilliSeconds());
nodeState.LastUpdateAt = now;
+ isChanged = true;
continue;
}
}
+
+ return isChanged;
}
void UpdateResourceSnapshotState() {
@@ -224,8 +309,8 @@ private:
return;
}
auto now = TlsActivationContext->Monotonic();
- if (now - ResourceSnapshotRetryState.LastRetryAt < ResourceSnapshotRetryState.CurrentDelay) {
- auto at = ResourceSnapshotRetryState.LastRetryAt + GetRetryDelay(ResourceSnapshotRetryState);
+ if (now - ResourceSnapshotRetryState.LastRetryAt < GetCurrentDelay(ResourceSnapshotRetryState)) {
+ auto at = ResourceSnapshotRetryState.LastRetryAt + GetDelay(ResourceSnapshotRetryState);
ResourceSnapshotRetryState.IsScheduled = true;
Schedule(at - now, new TEvPrivate::TEvUpdateSnapshotState);
return;
@@ -233,10 +318,10 @@ private:
TVector<NKikimrKqp::TKqpNodeResources> resources;
resources.reserve(NodesState.size());
- for (const auto& [id, state] : NodesState) {
- auto currentResources = state.NodeData.GetResources();
- if (currentResources.GetNodeId()) {
- resources.push_back(currentResources);
+ for (const auto& [nodeId, state] : NodesState) {
+ const auto& currentResources = state.NodeData.GetResources();
+ if (currentResources.HasNodeId()) {
+ resources.push_back(std::move(currentResources));
}
}
@@ -248,40 +333,92 @@ private:
ResourceSnapshotRetryState.LastRetryAt = now;
}
- void SendInfos(TVector<ui32> infoNodeIds, TVector<ui32> nodeIds = {}) {
+ void SendInfos(TVector<ui32> infoNodeIds, bool resending = false, TVector<ui32> nodeIds = {}) { // Sends a snapshot of `infoNodeIds` either to the listed `nodeIds` or, if none are given, bucket by bucket.
auto& nodeState = NodesState[SelfId().NodeId()]; // operator[] creates the self entry if it does not exist yet.
nodeState.NodeData.SetRound(Round++); // Every call stamps our own node data with a fresh round number.
if (!nodeIds.empty()) {
- auto snapshotMsg = CreateSnapshotMessage(infoNodeIds, true);
+ auto snapshotMsg = CreateSnapshotMessage(infoNodeIds, resending); // `resending` is only honoured on this targeted path.
for (const auto& nodeId : nodeIds) {
auto nodesStateIt = NodesState.find(nodeId);
if (nodesStateIt == NodesState.end()) {
- return;
+ continue; // Unknown node: skip it but keep sending to the remaining targets (previously this aborted the loop).
+ }
+
+ auto owner = ActorIdFromProto(nodesStateIt->second.NodeData.GetResourceExchangeBoardData().GetOwner());
+ if (owner != SelfId()) { // Never send the snapshot to ourselves.
+ auto msg = MakeHolder<TEvKqpResourceInfoExchanger::TEvSendResources>();
+ msg->Record = snapshotMsg; // Each recipient gets its own copy of the snapshot record.
+ Send(owner, msg.Release(), IEventHandle::FlagSubscribeOnSession);
}
- SendingToNode(nodesStateIt->second, true, {}, snapshotMsg);
}
} else {
auto snapshotMsg = CreateSnapshotMessage(infoNodeIds, false); // Broadcast path always passes false, regardless of `resending`.
- TDuration currentLatency = TDuration::MilliSeconds(0);
- auto nowMonotic = TlsActivationContext->Monotonic();
-
- for (auto& [id, state] : NodesState) {
- SendingToNode(state, true, {}, snapshotMsg);
- if (id != SelfId().NodeId()) {
- currentLatency = Max(currentLatency, nowMonotic - state.LastUpdateAt);
+ for (size_t idx = 0; idx < BUCKETS_COUNT; idx++) {
+ auto& bucket = Buckets[idx];
+ if (!bucket.NodeIdsToSend.empty()) { // Buckets with no pending recipients are skipped.
+ SendingToBucketNodes(idx, bucket, false, {}, snapshotMsg);
}
}
-
- Counters->RmMaxSnapshotLatency->Set(currentLatency.MilliSeconds());
}
}
private:
+ void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) { // Reply to the subscription request sent from Bootstrap(); it is only logged.
+ LOG_D("Subscribed for config changes.");
+ }
+
+ void Handle(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) { // Applies updated info-exchanger settings from a config notification and acknowledges it.
+ auto &event = ev->Get()->Record;
+
+ NKikimrConfig::TTableServiceConfig tableServiceConfig;
+
+ tableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig()); // Swap steals the config out of the event record instead of copying it.
+ LOG_D("Updated table service config.");
+
+ const auto& infoExchangerSettings = tableServiceConfig.GetResourceManager().GetInfoExchangerSettings();
+ const auto& publisherSettings = infoExchangerSettings.GetPublisherSettings();
+ const auto& subscriberSettings = infoExchangerSettings.GetSubscriberSettings();
+ const auto& exchangerSettings = infoExchangerSettings.GetExchangerSettings();
+
+ if (UpdateBoardRetrySettings(publisherSettings, PublisherSettings)) { // Publisher is recreated whenever the helper reports an update.
+ CreatePublisher();
+ }
+
+ if (UpdateBoardRetrySettings(subscriberSettings, SubscriberSettings)) { // Same for the subscriber.
+ CreateSubscriber();
+ }
+
+ if (exchangerSettings.HasStartDelayMs()) { // Exchanger delays are updated in place; no actor is recreated for them.
+ ExchangerSettings.StartDelayMs = TDuration::MilliSeconds(exchangerSettings.GetStartDelayMs());
+ }
+ if (exchangerSettings.HasMaxDelayMs()) {
+ ExchangerSettings.MaxDelayMs = TDuration::MilliSeconds(exchangerSettings.GetMaxDelayMs());
+ }
+
+ auto responseEv = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationResponse>(event); // NOTE(review): built from `event` after its table service config was swapped out above.
+ Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie); // Echo the request cookie back to the sender.
+ }
+
+ void Handle(TEvents::TEvUndelivered::TPtr& ev) { // Logs delivery failures for the two config messages sent with FlagTrackDelivery; takes no recovery action.
+ switch (ev->Get()->SourceType) {
+ case NConsole::TEvConfigsDispatcher::EvSetConfigSubscriptionRequest:
+ LOG_C("Failed to deliver subscription request to config dispatcher."); // Logged via LOG_C; the request is not retried here.
+ break;
+
+ case NConsole::TEvConsole::EvConfigNotificationResponse:
+ LOG_E("Failed to deliver config notification response.");
+ break;
+
+ default: // Any other undelivered event type is ignored silently.
+ break;
+ }
+ }
+
void Handle(TEvTenantPool::TEvTenantPoolStatus::TPtr& ev) {
TString tenant;
for (auto &slot : ev->Get()->Record.GetSlots()) {
@@ -332,8 +469,6 @@ private:
LOG_I("Received tenant pool status for exchanger, serving tenant: " << BoardState.Tenant
<< ", board: " << BoardState.Path
<< ", ssGroupId: " << BoardState.StateStorageGroupId);
-
- Schedule(TDuration::Seconds(2), new TEvPrivate::TEvRegularSending);
}
@@ -351,11 +486,15 @@ private:
<< ", ssGroupId: " << BoardState.StateStorageGroupId
<< ", with size: " << ev->Get()->InfoEntries.size());
- auto nodeIds = UpdateBoardInfo(ev->Get()->InfoEntries);
+ auto [nodeIds, isChanged] = UpdateBoardInfo(ev->Get()->InfoEntries);
- SendInfos({SelfId().NodeId()}, std::move(nodeIds));
+ if (!nodeIds.empty()) {
+ SendInfos({SelfId().NodeId()}, true, std::move(nodeIds));
+ }
- UpdateResourceSnapshotState();
+ if (isChanged) {
+ UpdateResourceSnapshotState();
+ }
}
void Handle(TEvStateStorage::TEvBoardInfoUpdate::TPtr& ev) {
@@ -371,11 +510,15 @@ private:
<< ", ssGroupId: " << BoardState.StateStorageGroupId
<< ", with size: " << ev->Get()->Updates.size());
- auto nodeIds = UpdateBoardInfo(ev->Get()->Updates);
+ auto [nodeIds, isChanged] = UpdateBoardInfo(ev->Get()->Updates);
- SendInfos({SelfId().NodeId()}, std::move(nodeIds));
+ if (!nodeIds.empty()) {
+ SendInfos({SelfId().NodeId()}, true, std::move(nodeIds));
+ }
- UpdateResourceSnapshotState();
+ if (isChanged) {
+ UpdateResourceSnapshotState();
+ }
}
void Handle(TEvKqpResourceInfoExchanger::TEvPublishResource::TPtr& ev) {
@@ -395,40 +538,56 @@ private:
const TVector<NKikimrKqp::TResourceExchangeNodeData> resourceInfos(
ev->Get()->Record.GetSnapshot().begin(), ev->Get()->Record.GetSnapshot().end());
- UpdateResourceInfo(resourceInfos);
+ bool isChanged = UpdateResourceInfo(resourceInfos);
if (ev->Get()->Record.GetNeedResending()) {
auto nodesStateIt = NodesState.find(nodeId);
- if (nodesStateIt == NodesState.end()) {
- return;
+ if (nodesStateIt != NodesState.end()) {
+ SendInfos({SelfId().NodeId()}, false, {nodesStateIt->first});
}
- SendingToNode(nodesStateIt->second, false, {SelfId().NodeId()});
}
- UpdateResourceSnapshotState();
+ if (isChanged) {
+ UpdateResourceSnapshotState();
+ }
}
void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) {
const auto& nodeId = ev->Get()->NodeId;
- auto nodesStateIt = NodesState.find(nodeId);
- if (nodesStateIt == NodesState.end()) {
+ if (NodesState.find(nodeId) == NodesState.end()) {
return;
}
- SendingToNode(nodesStateIt->second, true, {SelfId().NodeId()});
+
+ auto bucketInd = GetBucketInd(nodeId);
+ auto& bucket = Buckets[bucketInd];
+
+ bucket.NodeIdsToRetry.insert(nodeId);
+
+ SendingToBucketNodes(bucketInd, bucket, true, {SelfId().NodeId()});
}
- void Handle(TEvPrivate::TEvRetrySending::TPtr& ev) {
- const auto& nodeId = ev->Get()->NodeId;
- auto nodesStateIt = NodesState.find(nodeId);
+ void Handle(TEvPrivate::TEvSendToNode::TPtr& ev) {
+ const auto& bucketInd = ev->Get()->BucketInd;
+ auto& bucket = Buckets[bucketInd];
- if (nodesStateIt == NodesState.end()) {
- return;
+ bucket.SendState.IsScheduled = false;
+
+ if (!bucket.NodeIdsToSend.empty()) {
+ SendingToBucketNodes(bucketInd, bucket, false, {SelfId().NodeId()});
}
- nodesStateIt->second.RetryState.IsScheduled = false;
+ }
- SendingToNode(nodesStateIt->second, true, {SelfId().NodeId()});
+ void Handle(TEvPrivate::TEvRetryNode::TPtr& ev) {
+ const auto& bucketInd = ev->Get()->BucketInd;
+ auto& bucket = Buckets[bucketInd];
+
+ bucket.RetryState.IsScheduled = false;
+
+ if (!bucket.NodeIdsToRetry.empty()) {
+ SendingToBucketNodes(bucketInd, bucket, true, {SelfId().NodeId()});
+ }
}
void Handle(TEvPrivate::TEvRetrySubscriber::TPtr&) {
@@ -456,12 +615,16 @@ private:
STATEFN(WorkState) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvStateStorage::TEvBoardInfo, Handle);
+ hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle);
+ hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
hFunc(TEvStateStorage::TEvBoardInfoUpdate, Handle);
hFunc(TEvTenantPool::TEvTenantPoolStatus, Handle);
hFunc(TEvKqpResourceInfoExchanger::TEvPublishResource, Handle);
hFunc(TEvKqpResourceInfoExchanger::TEvSendResources, Handle);
hFunc(TEvPrivate::TEvRetrySubscriber, Handle);
- hFunc(TEvPrivate::TEvRetrySending, Handle);
+ hFunc(TEvPrivate::TEvSendToNode, Handle);
+ hFunc(TEvPrivate::TEvRetryNode, Handle);
hFunc(TEvPrivate::TEvRegularSending, Handle);
hFunc(TEvPrivate::TEvUpdateSnapshotState, Handle);
hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
@@ -478,47 +641,84 @@ private:
TActor::PassAway();
}
- TDuration GetRetryDelay(TRetryState& state) {
+ const TDuration& GetCurrentDelay(TDelayState& state) { // Returns a reference to state.CurrentDelay, initializing it first if it is still zero.
+ if (state.CurrentDelay == TDuration::Zero()) { // Zero is treated as "not yet initialized".
+ state.CurrentDelay = ExchangerSettings.StartDelayMs;
+ }
+ return state.CurrentDelay;
+ }
+
+ TDuration GetDelay(TDelayState& state) {
auto newDelay = state.CurrentDelay;
newDelay *= 2;
- if (newDelay > TDuration::Seconds(2)) {
- newDelay = TDuration::Seconds(2);
+ if (newDelay > ExchangerSettings.MaxDelayMs) {
+ newDelay = ExchangerSettings.MaxDelayMs;
}
- newDelay *= AppData()->RandomProvider->Uniform(10, 200);
+ newDelay *= AppData()->RandomProvider->Uniform(50, 200);
newDelay /= 100;
state.CurrentDelay = newDelay;
return state.CurrentDelay;
}
- void SendingToNode(TNodeState& state, bool retry,
+ ui32 GetBucketInd(ui32 nodeId) { // Maps a node id to a bucket index.
+ return nodeId & (BUCKETS_COUNT - 1); // Bit mask instead of modulo; NOTE(review): only covers every bucket if BUCKETS_COUNT is a power of two — confirm.
+ }
+
+ void SendingToBucketNodes(ui32 bucketInd, TBucketState& bucketState, bool retry,
TVector<ui32> nodeIdsForSnapshot = {}, NKikimrKqp::TResourceExchangeSnapshot snapshotMsg = {}) {
- auto& retryState = state.RetryState;
- if (retryState.IsScheduled) {
- return;
+ TDelayState* retryState;
+ if (retry) {
+ retryState = &bucketState.RetryState;
+ } else {
+ retryState = &bucketState.SendState;
}
- auto owner = ActorIdFromProto(state.NodeData.GetResourceExchangeBoardData().GetOwner());
- auto nodeId = owner.NodeId();
+ if (retryState->IsScheduled) {
+ return;
+ }
auto now = TlsActivationContext->Monotonic();
- if (retry && now - retryState.LastRetryAt < retryState.CurrentDelay) {
- auto at = retryState.LastRetryAt + GetRetryDelay(retryState);
- retryState.IsScheduled = true;
- Schedule(at - now, new TEvPrivate::TEvRetrySending(nodeId));
+ if (now - retryState->LastRetryAt < GetCurrentDelay(*retryState)) {
+ auto at = retryState->LastRetryAt + GetDelay(*retryState);
+ retryState->IsScheduled = true;
+ if (retry) {
+ Schedule(at - now, new TEvPrivate::TEvRetryNode(bucketInd));
+ } else {
+ Schedule(at - now, new TEvPrivate::TEvSendToNode(bucketInd));
+ }
return;
}
- if (owner != SelfId()) {
- auto msg = MakeHolder<TEvKqpResourceInfoExchanger::TEvSendResources>();
- if (nodeIdsForSnapshot.empty()) {
- msg->Record = std::move(snapshotMsg);
- } else {
- msg->Record = CreateSnapshotMessage(nodeIdsForSnapshot);
+ if (!nodeIdsForSnapshot.empty()) {
+ snapshotMsg = CreateSnapshotMessage(nodeIdsForSnapshot);
+ }
+
+ THashSet<ui32>* nodeIds;
+ if (retry) {
+ nodeIds = &bucketState.NodeIdsToRetry;
+ } else {
+ nodeIds = &bucketState.NodeIdsToSend;
+ }
+
+ for (const auto& nodeId: *nodeIds) {
+ auto nodesStateIt = NodesState.find(nodeId);
+ if (nodesStateIt == NodesState.end()) {
+ continue;
+ }
+ auto& nodeState = nodesStateIt->second;
+ auto owner = ActorIdFromProto(nodeState.NodeData.GetResourceExchangeBoardData().GetOwner());
+ if (owner != SelfId()) {
+ auto msg = MakeHolder<TEvKqpResourceInfoExchanger::TEvSendResources>();
+ msg->Record = snapshotMsg;
+ Send(owner, msg.Release(), IEventHandle::FlagSubscribeOnSession);
}
- Send(owner, msg.Release(), IEventHandle::FlagSubscribeOnSession);
}
- retryState.LastRetryAt = now;
+ if (retry) {
+ bucketState.NodeIdsToRetry.clear();
+ }
+
+ retryState->LastRetryAt = now;
}
NKikimrKqp::TResourceExchangeSnapshot CreateSnapshotMessage(const TVector<ui32>& nodeIds,
@@ -530,7 +730,13 @@ private:
for (const auto& nodeId: nodeIds) {
auto* el = snapshot->Add();
- *el = NodesState[nodeId].NodeData;
+
+ auto nodesStateIt = NodesState.find(nodeId);
+ if (nodesStateIt == NodesState.end()) {
+ continue;
+ }
+
+ *el = nodesStateIt->second.NodeData;
}
return snapshotMsg;
@@ -550,23 +756,30 @@ private:
ui32 StateStorageGroupId = std::numeric_limits<ui32>::max();
TActorId Publisher = TActorId();
TActorId Subscriber = TActorId();
- TRetryState RetryStateForSubscriber;
+ TDelayState RetryStateForSubscriber;
std::optional<TInstant> LastPublishTime;
};
TBoardState BoardState;
- TRetryState ResourceSnapshotRetryState;
+ TDelayState ResourceSnapshotRetryState;
+ TVector<TBucketState> Buckets;
THashMap<ui32, TNodeState> NodesState;
std::shared_ptr<TResourceSnapshotState> ResourceSnapshotState;
+ TBoardRetrySettings PublisherSettings;
+ TBoardRetrySettings SubscriberSettings;
+ TDelaySettings ExchangerSettings;
+
TIntrusivePtr<TKqpCounters> Counters;
+ NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings Settings;
};
NActors::IActor* CreateKqpResourceInfoExchangerActor(TIntrusivePtr<TKqpCounters> counters,
- std::shared_ptr<TResourceSnapshotState> resourceSnapshotState)
+ std::shared_ptr<TResourceSnapshotState> resourceSnapshotState,
+ const NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings& settings)
{
- return new TKqpResourceInfoExchangerActor(counters, std::move(resourceSnapshotState));
+ return new TKqpResourceInfoExchangerActor(counters, std::move(resourceSnapshotState), settings);
}
} // namespace NRm
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
index ef6dab2ac6e..73ec85b8336 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp
+++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
@@ -153,8 +153,18 @@ public:
UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes());
if (PublishResourcesByExchanger) {
+ CreateResourceInfoExchanger(Config.GetInfoExchangerSettings());
+ return;
+ }
+ }
+
+ void CreateResourceInfoExchanger(
+ const NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings& settings) {
+ PublishResourcesByExchanger = true;
+ if (!ResourceInfoExchanger) {
ResourceSnapshotState = std::make_shared<TResourceSnapshotState>();
- auto exchanger = CreateKqpResourceInfoExchangerActor(Counters, ResourceSnapshotState);
+ auto exchanger = CreateKqpResourceInfoExchangerActor(
+ Counters, ResourceSnapshotState, settings);
ResourceInfoExchanger = ActorSystem->Register(exchanger);
return;
}
@@ -550,11 +560,14 @@ public:
void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override {
LOG_AS_D("Schedule Snapshot request");
if (PublishResourcesByExchanger) {
- std::shared_ptr<const TVector<NKikimrKqp::TKqpNodeResources>> infos;
+ std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> infos;
with_lock (ResourceSnapshotState->Lock) {
infos = ResourceSnapshotState->Snapshot;
}
- TVector<NKikimrKqp::TKqpNodeResources> resources = *infos;
+ TVector<NKikimrKqp::TKqpNodeResources> resources;
+ if (infos != nullptr) {
+ resources = *infos;
+ }
callback(std::move(resources));
return;
}
@@ -778,6 +791,11 @@ private:
}
void HandleWork(TEvKqp::TEvKqpProxyPublishRequest::TPtr&) {
+ SendWhiteboardRequest();
+ if (AppData()->TenantName.empty() || !SelfDataCenterId) {
+ LOG_E("Cannot start publishing usage for kqp_proxy, tenants: " << AppData()->TenantName << ", " << SelfDataCenterId.value_or("empty"));
+ return;
+ }
PublishResourceUsage("kqp_proxy");
}
@@ -821,13 +839,14 @@ private:
}
void Handle(TEvInterconnect::TEvNodeInfo::TPtr& ev) {
- auto selfDataCenterId = TString();
+ SelfDataCenterId = TString();
if (const auto& node = ev->Get()->Node) {
- selfDataCenterId = node->Location.GetDataCenterId();
+ SelfDataCenterId = node->Location.GetDataCenterId();
}
- ProxyNodeResources.SetDataCenterNumId(DataCenterFromString(selfDataCenterId));
- ProxyNodeResources.SetDataCenterId(selfDataCenterId);
+ ProxyNodeResources.SetNodeId(SelfId().NodeId());
+ ProxyNodeResources.SetDataCenterNumId(DataCenterFromString(*SelfDataCenterId));
+ ProxyNodeResources.SetDataCenterId(*SelfDataCenterId);
PublishResourceUsage("data_center update");
}
@@ -869,6 +888,23 @@ private:
auto& config = *event.MutableConfig()->MutableTableServiceConfig()->MutableResourceManager();
ResourceManager->UpdatePatternCache(config.GetKqpPatternCacheCapacityBytes());
+ bool enablePublishResourcesByExchanger = config.GetEnablePublishResourcesByExchanger();
+ if (enablePublishResourcesByExchanger != PublishResourcesByExchanger) {
+ PublishResourcesByExchanger = enablePublishResourcesByExchanger;
+ if (enablePublishResourcesByExchanger) {
+ ResourceManager->CreateResourceInfoExchanger(config.GetInfoExchangerSettings());
+ PublishResourceUsage("exchanger enabled");
+ } else {
+ if (ResourceManager->ResourceInfoExchanger) {
+ Send(ResourceManager->ResourceInfoExchanger, new TEvents::TEvPoison);
+ ResourceManager->ResourceInfoExchanger = TActorId();
+ }
+ ResourceManager->PublishResourcesByExchanger = false;
+ ResourceManager->ResourceSnapshotState.reset();
+ PublishResourceUsage("exchanger disabled");
+ }
+ }
+
#define FORCE_VALUE(name) if (!config.Has ## name ()) config.Set ## name(config.Get ## name());
FORCE_VALUE(ComputeActorsCount)
FORCE_VALUE(ChannelBufferSize)
@@ -887,6 +923,7 @@ private:
ResourceManager->ExecutionUnitsResource.SetNewLimit(config.GetComputeActorsCount());
ResourceManager->Config.Swap(&config);
}
+
}
static void HandleWork(TEvents::TEvUndelivered::TPtr& ev) {
@@ -1010,9 +1047,10 @@ private:
ToBroker(new TEvResourceBroker::TEvNotifyActorDied);
if (ResourceManager->ResourceInfoExchanger) {
Send(ResourceManager->ResourceInfoExchanger, new TEvents::TEvPoison);
+ ResourceManager->ResourceInfoExchanger = TActorId();
}
+ ResourceManager->ResourceSnapshotState.reset();
if (WbState.BoardPublisherActorId) {
-
Send(WbState.BoardPublisherActorId, new TEvents::TEvPoison);
}
TActor::PassAway();
@@ -1055,13 +1093,17 @@ private:
NKikimrKqp::TKqpNodeResources payload;
payload.SetNodeId(SelfId().NodeId());
payload.SetTimestamp(now.Seconds());
- auto* proxyNodeResources = payload.MutableKqpProxyNodeResources();
if (KqpProxySharedResources) {
- ProxyNodeResources.SetActiveWorkersCount(KqpProxySharedResources->AtomicLocalSessionCount.load());
+ if (SelfDataCenterId) {
+ auto* proxyNodeResources = payload.MutableKqpProxyNodeResources();
+ ProxyNodeResources.SetActiveWorkersCount(KqpProxySharedResources->AtomicLocalSessionCount.load());
+ if (SelfDataCenterId) {
+ *proxyNodeResources = ProxyNodeResources;
+ }
+ }
} else {
LOG_D("Don't set KqpProxySharedResources");
}
- *proxyNodeResources = ProxyNodeResources;
ActorIdToProto(MakeKqpResourceManagerServiceID(SelfId().NodeId()), payload.MutableResourceManagerActorId()); // legacy
with_lock (ResourceManager->Lock) {
payload.SetAvailableComputeActors(ResourceManager->ExecutionUnitsResource.Available()); // legacy
@@ -1079,8 +1121,10 @@ private:
<< "reason: " << reason
<< ", payload: " << payload.ShortDebugString());
WbState.LastPublishTime = now;
- Send(ResourceManager->ResourceInfoExchanger,
- new TEvKqpResourceInfoExchanger::TEvPublishResource(std::move(payload)));
+ if (ResourceManager->ResourceInfoExchanger) {
+ Send(ResourceManager->ResourceInfoExchanger,
+ new TEvKqpResourceInfoExchanger::TEvPublishResource(std::move(payload)));
+ }
return;
}
@@ -1129,6 +1173,7 @@ private:
std::shared_ptr<TKqpResourceManager> ResourceManager;
bool PublishResourcesByExchanger;
+ std::optional<TString> SelfDataCenterId;
};
} // namespace NRm
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h
index 2f149098b38..fbcff7151cc 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_service.h
+++ b/ydb/core/kqp/rm_service/kqp_rm_service.h
@@ -125,7 +125,8 @@ struct TEvKqpResourceInfoExchanger {
};
NActors::IActor* CreateKqpResourceInfoExchangerActor(TIntrusivePtr<TKqpCounters> counters,
- std::shared_ptr<TResourceSnapshotState> resourceSnapshotState);
+ std::shared_ptr<TResourceSnapshotState> resourceSnapshotState,
+ const NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings& settings);
} // namespace NRm
diff --git a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp
index e634bd4e0a6..8057cba687b 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp
+++ b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp
@@ -103,6 +103,11 @@ NKikimrConfig::TTableServiceConfig::TResourceManager MakeKqpResourceManagerConfi
config.SetQueryMemoryLimit(1000);
config.SetEnablePublishResourcesByExchanger(EnablePublishResourcesByExchanger);
+ auto* infoExchangerRetrySettings = config.MutableInfoExchangerSettings();
+ auto* exchangerSettings = infoExchangerRetrySettings->MutableExchangerSettings();
+ exchangerSettings->SetStartDelayMs(50);
+ exchangerSettings->SetMaxDelayMs(50);
+
return config;
}
@@ -505,7 +510,7 @@ void KqpRm::SnapshotSharing(bool byExchanger) {
auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId());
auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId());
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first);
CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second);
@@ -519,12 +524,10 @@ void KqpRm::SnapshotSharing(bool byExchanger) {
bool allocated = rm_first->AllocateResources(1, 2, request);
UNIT_ASSERT(allocated);
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
-
allocated = rm_first->AllocateResources(2, 1, request);
UNIT_ASSERT(allocated);
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
CheckSnapshot(0, {{800, 80}, {1000, 100}}, rm_second);
}
@@ -533,12 +536,10 @@ void KqpRm::SnapshotSharing(bool byExchanger) {
bool allocated = rm_second->AllocateResources(1, 2, request);
UNIT_ASSERT(allocated);
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
-
allocated = rm_second->AllocateResources(2, 1, request);
UNIT_ASSERT(allocated);
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
CheckSnapshot(1, {{800, 80}, {800, 80}}, rm_first);
}
@@ -547,7 +548,7 @@ void KqpRm::SnapshotSharing(bool byExchanger) {
rm_first->FreeResources(1);
rm_first->FreeResources(2);
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
CheckSnapshot(0, {{1000, 100}, {800, 80}}, rm_second);
}
@@ -556,7 +557,7 @@ void KqpRm::SnapshotSharing(bool byExchanger) {
rm_second->FreeResources(1);
rm_second->FreeResources(2);
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_first);
}
@@ -577,7 +578,7 @@ void KqpRm::NodesMembership(bool byExchanger) {
auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId());
auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId());
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first);
CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second);
@@ -591,7 +592,7 @@ void KqpRm::NodesMembership(bool byExchanger) {
options.FinalEvents.emplace_back(TEvents::TSystem::Poison, 1);
UNIT_ASSERT(Runtime->DispatchEvents(options));
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4));
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
CheckSnapshot(0, {{1000, 100}}, rm_first);
}
@@ -611,7 +612,7 @@ void KqpRm::DisonnectNodes() {
auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId());
auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId());
- Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500));
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first);
CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second);
@@ -627,6 +628,8 @@ void KqpRm::DisonnectNodes() {
Disconnect(0, 1);
+ Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
+
CheckSnapshot(0, {{1000, 100}}, rm_first);
}
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index 910cee52d33..cc852fa465f 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -86,6 +86,11 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
TKikimrSettings()
{
FeatureFlags.SetForceColumnTablesCompositeMarks(true);
+ auto* tableServiceConfig = AppConfig.MutableTableServiceConfig();
+ auto* infoExchangerRetrySettings = tableServiceConfig->MutableResourceManager()->MutableInfoExchangerSettings();
+ auto* exchangerSettings = infoExchangerRetrySettings->MutableExchangerSettings();
+ exchangerSettings->SetStartDelayMs(10);
+ exchangerSettings->SetMaxDelayMs(10);
}
TKikimrSettings& SetAppConfig(const NKikimrConfig::TAppConfig& value) { AppConfig = value; return *this; }
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index b00ae5d0ce1..d0fabe1b370 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1113,6 +1113,15 @@ message TTableServiceConfig {
}
message TResourceManager {
+ message TRetrySettings { // Delay bounds, in milliseconds.
+ optional uint32 StartDelayMs = 1; // Starting delay.
+ optional uint32 MaxDelayMs = 2; // Cap applied to the delay as it grows.
+ }
+ message TInfoExchangerSettings { // One TRetrySettings per component of the resource info exchanger.
+ optional TRetrySettings PublisherSettings = 1; // Retry settings for the board publisher.
+ optional TRetrySettings SubscriberSettings = 2; // Retry settings for the board subscriber.
+ optional TRetrySettings ExchangerSettings = 3; // Delays for the exchanger's own sends and retries.
+ }
optional uint32 ComputeActorsCount = 1 [default = 1000];
optional uint64 ChannelBufferSize = 2 [default = 8388608]; // 8 MB
reserved 3;
@@ -1132,6 +1141,7 @@ message TTableServiceConfig {
optional TShardsScanningPolicy ShardsScanningPolicy = 16;
optional uint64 KqpPatternCacheCapacityBytes = 17 [default = 104857600]; // 100 MiB, 0 is for disable
optional bool EnablePublishResourcesByExchanger = 18 [default = true];
+ optional TInfoExchangerSettings InfoExchangerSettings = 19;
}
message TSpillingServiceConfig {
@@ -1311,7 +1321,7 @@ message TTableServiceConfig {
optional bool EnableKqpDataQueryStreamLookup = 31 [default = false];
optional TExecuterRetriesConfig ExecuterRetriesConfig = 32;
reserved 33; // optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false];
- optional bool EnablePublishKqpProxyByRM = 34 [default = false];
+ optional bool EnablePublishKqpProxyByRM = 34 [default = true];
optional bool EnableKqpScanQueryStreamIdxLookupJoin = 35 [default = false];
optional bool EnablePredicateExtractForScanQueries = 36 [default = true];
optional bool EnablePredicateExtractForDataQueries = 37 [default = true];