aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Kardymon <kardymon-d@ydb.tech>2024-11-12 14:29:02 +0300
committerGitHub <noreply@github.com>2024-11-12 14:29:02 +0300
commita431c945bbf0d0a8a1f57e9c552fc09021cde4ee (patch)
tree6aab199b20773b47d21fbd735f10f6fac6b438d4
parenta2e65eecd27f4f81c196baae55e23637933714b4 (diff)
downloadydb-a431c945bbf0d0a8a1f57e9c552fc09021cde4ee.tar.gz
YQ-3850 Shared reading: fix interconnect subscribing (#11506)
-rw-r--r--ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp135
-rw-r--r--ydb/library/yql/dq/actors/common/retry_queue.cpp7
2 files changed, 111 insertions, 31 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
index 2bfde99d43..90b7f88977 100644
--- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
@@ -54,6 +54,7 @@ struct TEvPrivate {
EvCoordinatorPing = EvBegin + 20,
EvUpdateMetrics,
EvPrintStateToLog,
+ EvTryConnect,
EvEnd
};
@@ -61,6 +62,11 @@ struct TEvPrivate {
struct TEvCoordinatorPing : NActors::TEventLocal<TEvCoordinatorPing, EvCoordinatorPing> {};
struct TEvUpdateMetrics : public NActors::TEventLocal<TEvUpdateMetrics, EvUpdateMetrics> {};
struct TEvPrintStateToLog : public NActors::TEventLocal<TEvPrintStateToLog, EvPrintStateToLog> {};
+ struct TEvTryConnect : public NActors::TEventLocal<TEvTryConnect, EvTryConnect> {
+ TEvTryConnect(ui32 nodeId = 0)
+ : NodeId(nodeId) {}
+ ui32 NodeId = 0;
+ };
};
struct TQueryStat {
@@ -119,6 +125,92 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
}
};
+ struct TNodesTracker{
+ class TRetryState {
+ public:
+ TDuration GetNextDelay() {
+ constexpr TDuration MaxDelay = TDuration::Seconds(10);
+ constexpr TDuration MinDelay = TDuration::MilliSeconds(100); // from second retry
+ TDuration ret = Delay; // The first delay is zero
+ Delay = ClampVal(Delay * 2, MinDelay, MaxDelay);
+ return ret ? RandomizeDelay(ret) : ret;
+ }
+ private:
+ static TDuration RandomizeDelay(TDuration baseDelay) {
+ const TDuration::TValue half = baseDelay.GetValue() / 2;
+ return TDuration::FromValue(half + RandomNumber<TDuration::TValue>(half));
+ }
+ private:
+ TDuration Delay; // The first time retry will be done instantly.
+ };
+
+ struct TNodeState {
+ bool Connected = false;
+ bool RetryScheduled = false;
+ TMaybe<TRetryState> RetryState;
+ };
+ public:
+ void Init(const NActors::TActorId& selfId) {
+ SelfId = selfId;
+ }
+
+ void AddNode(ui32 nodeId) {
+ if (Nodes.contains(nodeId)) {
+ return;
+ }
+ HandleNodeDisconnected(nodeId);
+ }
+
+ void TryConnect(ui32 nodeId) {
+ auto& state = Nodes[nodeId];
+ state.RetryScheduled = false;
+ if (state.Connected) {
+ return;
+ }
+ auto connectEvent = MakeHolder<NActors::TEvInterconnect::TEvConnectNode>();
+ auto proxyId = NActors::TActivationContext::InterconnectProxy(nodeId);
+ NActors::TActivationContext::Send(
+ new NActors::IEventHandle(proxyId, SelfId, connectEvent.Release(), 0, 0));
+ }
+
+ bool GetNodeConnected(ui32 nodeId) {
+ return Nodes[nodeId].Connected;
+ }
+
+ void HandleNodeConnected(ui32 nodeId) {
+ auto& state = Nodes[nodeId];
+ state.Connected = true;
+ state.RetryState = Nothing();
+ }
+
+ void HandleNodeDisconnected(ui32 nodeId) {
+ auto& state = Nodes[nodeId];
+ state.Connected = false;
+ if (state.RetryScheduled) {
+ return;
+ }
+ state.RetryScheduled = true;
+ if (!state.RetryState) {
+ state.RetryState.ConstructInPlace();
+ }
+ auto ev = MakeHolder<TEvPrivate::TEvTryConnect>(nodeId);
+ auto delay = state.RetryState->GetNextDelay();
+ NActors::TActivationContext::Schedule(delay, new NActors::IEventHandle(SelfId, SelfId, ev.Release()));
+ }
+
+ void PrintInternalState(TStringStream& stream) const {
+ stream << "Nodes states: \n";
+ for (const auto& [nodeId, state] : Nodes) {
+ stream << " id " << nodeId << " connected " << state.Connected << " retry scheduled " << state.RetryScheduled << "\n";
+ }
+ }
+
+ private:
+ TMap<ui32, TNodeState> Nodes;
+ NActors::TActorId SelfId;
+ TString LogPrefix = "RowDispatcher: ";
+ };
+
NConfig::TRowDispatcherConfig Config;
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
@@ -134,8 +226,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
const ::NMonitoring::TDynamicCounterPtr Counters;
TRowDispatcherMetrics Metrics;
NYql::IPqGateway::TPtr PqGateway;
- THashSet<TActorId> InterconnectSessions;
- TMap<ui32, bool> NodeConnected;
+ TNodesTracker NodesTracker;
struct ConsumerCounters {
ui64 NewDataArrived = 0;
@@ -222,7 +313,7 @@ public:
void Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev);
void Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev);
- void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&);
+ void Handle(const TEvPrivate::TEvTryConnect::TPtr&);
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr&);
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
@@ -230,7 +321,6 @@ public:
void Handle(const NMon::TEvHttpInfo::TPtr&);
void DeleteConsumer(const ConsumerSessionKey& key);
- void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
void UpdateMetrics();
TString GetInternalState();
@@ -250,7 +340,7 @@ public:
hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle);
hFunc(NFq::TEvRowDispatcher::TEvStatus, Handle);
hFunc(NFq::TEvRowDispatcher::TEvSessionStatistic, Handle);
- hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
+ hFunc(TEvPrivate::TEvTryConnect, Handle);
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle);
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle);
hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle);
@@ -300,6 +390,7 @@ void TRowDispatcher::Bootstrap() {
mon->RegisterActorPage(actorsMonPage, "row_dispatcher", "Row Dispatcher", false,
TlsActivationContext->ExecutorThread.ActorSystem, SelfId());
}
+ NodesTracker.Init(SelfId());
}
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
@@ -317,7 +408,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr&
void TRowDispatcher::HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev) {
LOG_ROW_DISPATCHER_DEBUG("EvNodeConnected, node id " << ev->Get()->NodeId);
- NodeConnected[ev->Get()->NodeId] = true;
+ NodesTracker.HandleNodeConnected(ev->Get()->NodeId);
for (auto& [actorId, consumer] : Consumers) {
consumer->EventsQueue.HandleNodeConnected(ev->Get()->NodeId);
}
@@ -325,7 +416,7 @@ void TRowDispatcher::HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev
void TRowDispatcher::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
LOG_ROW_DISPATCHER_DEBUG("TEvNodeDisconnected, node id " << ev->Get()->NodeId);
- NodeConnected[ev->Get()->NodeId] = false;
+ NodesTracker.HandleNodeDisconnected(ev->Get()->NodeId);
for (auto& [actorId, consumer] : Consumers) {
consumer->EventsQueue.HandleNodeDisconnected(ev->Get()->NodeId);
}
@@ -353,7 +444,7 @@ void TRowDispatcher::Handle(NActors::TEvents::TEvPong::TPtr&) {
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe::TPtr& ev) {
LOG_ROW_DISPATCHER_DEBUG("TEvCoordinatorChangesSubscribe from " << ev->Sender);
- UpdateInterconnectSessions(ev->InterconnectSession);
+ NodesTracker.AddNode(ev->Sender.NodeId());
CoordinatorChangedSubscribers.insert(ev->Sender);
if (!CoordinatorActorId) {
return;
@@ -387,6 +478,7 @@ void TRowDispatcher::UpdateMetrics() {
TString TRowDispatcher::GetInternalState() {
TStringStream str;
+ NodesTracker.PrintInternalState(str);
str << "Statistics:\n";
for (auto& [key, sessionsInfo] : TopicSessions) {
str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicPath << " / " << key.PartitionId;
@@ -410,7 +502,7 @@ TString TRowDispatcher::GetInternalState() {
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
LOG_ROW_DISPATCHER_DEBUG("TEvStartSession from " << ev->Sender << ", topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
" partitionId " << ev->Get()->Record.GetPartitionId());
- UpdateInterconnectSessions(ev->InterconnectSession);
+ NodesTracker.AddNode(ev->Sender.NodeId());
TMaybe<ui64> readOffset;
if (ev->Get()->Record.HasOffset()) {
readOffset = ev->Get()->Record.GetOffset();
@@ -430,7 +522,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
LOG_ROW_DISPATCHER_DEBUG("Topic session count " << topicSessionInfo.Sessions.size());
Y_ENSURE(topicSessionInfo.Sessions.size() <= 1);
- auto consumerInfo = MakeAtomicShared<ConsumerInfo>(ev->Sender, SelfId(), NextEventQueueId++, ev->Get()->Record, TActorId(), NodeConnected[ev->Sender.NodeId()]);
+ auto consumerInfo = MakeAtomicShared<ConsumerInfo>(ev->Sender, SelfId(), NextEventQueueId++, ev->Get()->Record, TActorId(), NodesTracker.GetNodeConnected(ev->Sender.NodeId()));
Consumers[key] = consumerInfo;
ConsumersByEventQueueId[consumerInfo->EventQueueId] = consumerInfo;
if (!consumerInfo->EventsQueue.OnEventReceived(ev)) {
@@ -575,14 +667,9 @@ void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClo
}
}
-void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev) {
- LOG_ROW_DISPATCHER_TRACE("TEvRetry " << ev->Get()->EventQueueId);
- auto it = ConsumersByEventQueueId.find(ev->Get()->EventQueueId);
- if (it == ConsumersByEventQueueId.end()) {
- LOG_ROW_DISPATCHER_WARN("No consumer with EventQueueId = " << ev->Get()->EventQueueId);
- return;
- }
- it->second->EventsQueue.Retry();
+void TRowDispatcher::Handle(const TEvPrivate::TEvTryConnect::TPtr& ev) {
+ LOG_ROW_DISPATCHER_TRACE("TEvTryConnect to node id " << ev->Get()->NodeId);
+ NodesTracker.TryConnect(ev->Get()->NodeId);
}
void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr& ev) {
@@ -705,18 +792,6 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev
}
}
-void TRowDispatcher::UpdateInterconnectSessions(const NActors::TActorId& interconnectSession) {
- if (!interconnectSession) {
- return;
- }
- auto sessionsIt = InterconnectSessions.find(interconnectSession);
- if (sessionsIt != InterconnectSessions.end()) {
- return;
- }
- Send(interconnectSession, new NActors::TEvents::TEvSubscribe, IEventHandle::FlagTrackDelivery);
- InterconnectSessions.insert(interconnectSession);
-}
-
} // namespace
////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/library/yql/dq/actors/common/retry_queue.cpp b/ydb/library/yql/dq/actors/common/retry_queue.cpp
index 2d0a61e0b0..2f232b6685 100644
--- a/ydb/library/yql/dq/actors/common/retry_queue.cpp
+++ b/ydb/library/yql/dq/actors/common/retry_queue.cpp
@@ -168,7 +168,12 @@ TDuration TRetryEventsQueue::TRetryState::RandomizeDelay(TDuration baseDelay) {
}
void TRetryEventsQueue::PrintInternalState(TStringStream& stream) const {
- stream << "id " << EventQueueId << ", NextSeqNo "
+ stream << "id " << EventQueueId;
+ if (LocalRecipient) {
+ stream << ", LocalRecipient\n";
+ return;
+ }
+ stream << ", NextSeqNo "
<< NextSeqNo << ", MyConfSeqNo " << MyConfirmedSeqNo << ", SeqNos " << ReceivedEventsSeqNos.size() << ", events size " << Events.size() << ", connected " << Connected << "\n";
}