diff options
author | Dmitry Kardymon <kardymon-d@ydb.tech> | 2024-11-12 14:29:02 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-12 14:29:02 +0300 |
commit | a431c945bbf0d0a8a1f57e9c552fc09021cde4ee (patch) | |
tree | 6aab199b20773b47d21fbd735f10f6fac6b438d4 | |
parent | a2e65eecd27f4f81c196baae55e23637933714b4 (diff) | |
download | ydb-a431c945bbf0d0a8a1f57e9c552fc09021cde4ee.tar.gz |
YQ-3850 Shared reading: fix interconnect subscribing (#11506)
-rw-r--r-- | ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp | 135 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/common/retry_queue.cpp | 7 |
2 files changed, 111 insertions, 31 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index 2bfde99d43..90b7f88977 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -54,6 +54,7 @@ struct TEvPrivate { EvCoordinatorPing = EvBegin + 20, EvUpdateMetrics, EvPrintStateToLog, + EvTryConnect, EvEnd }; @@ -61,6 +62,11 @@ struct TEvPrivate { struct TEvCoordinatorPing : NActors::TEventLocal<TEvCoordinatorPing, EvCoordinatorPing> {}; struct TEvUpdateMetrics : public NActors::TEventLocal<TEvUpdateMetrics, EvUpdateMetrics> {}; struct TEvPrintStateToLog : public NActors::TEventLocal<TEvPrintStateToLog, EvPrintStateToLog> {}; + struct TEvTryConnect : public NActors::TEventLocal<TEvTryConnect, EvTryConnect> { + TEvTryConnect(ui32 nodeId = 0) + : NodeId(nodeId) {} + ui32 NodeId = 0; + }; }; struct TQueryStat { @@ -119,6 +125,92 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> { } }; + struct TNodesTracker{ + class TRetryState { + public: + TDuration GetNextDelay() { + constexpr TDuration MaxDelay = TDuration::Seconds(10); + constexpr TDuration MinDelay = TDuration::MilliSeconds(100); // from second retry + TDuration ret = Delay; // The first delay is zero + Delay = ClampVal(Delay * 2, MinDelay, MaxDelay); + return ret ? RandomizeDelay(ret) : ret; + } + private: + static TDuration RandomizeDelay(TDuration baseDelay) { + const TDuration::TValue half = baseDelay.GetValue() / 2; + return TDuration::FromValue(half + RandomNumber<TDuration::TValue>(half)); + } + private: + TDuration Delay; // The first time retry will be done instantly. + }; + + struct TNodeState { + bool Connected = false; + bool RetryScheduled = false; + TMaybe<TRetryState> RetryState; + }; + public: + void Init(const NActors::TActorId& selfId) { + SelfId = selfId; + } + + void AddNode(ui32 nodeId) { + if (Nodes.contains(nodeId)) { + return; + } + HandleNodeDisconnected(nodeId); + } + + void TryConnect(ui32 nodeId) { + auto& state = Nodes[nodeId]; + state.RetryScheduled = false; + if (state.Connected) { + return; + } + auto connectEvent = MakeHolder<NActors::TEvInterconnect::TEvConnectNode>(); + auto proxyId = NActors::TActivationContext::InterconnectProxy(nodeId); + NActors::TActivationContext::Send( + new NActors::IEventHandle(proxyId, SelfId, connectEvent.Release(), 0, 0)); + } + + bool GetNodeConnected(ui32 nodeId) { + return Nodes[nodeId].Connected; + } + + void HandleNodeConnected(ui32 nodeId) { + auto& state = Nodes[nodeId]; + state.Connected = true; + state.RetryState = Nothing(); + } + + void HandleNodeDisconnected(ui32 nodeId) { + auto& state = Nodes[nodeId]; + state.Connected = false; + if (state.RetryScheduled) { + return; + } + state.RetryScheduled = true; + if (!state.RetryState) { + state.RetryState.ConstructInPlace(); + } + auto ev = MakeHolder<TEvPrivate::TEvTryConnect>(nodeId); + auto delay = state.RetryState->GetNextDelay(); + NActors::TActivationContext::Schedule(delay, new NActors::IEventHandle(SelfId, SelfId, ev.Release())); + } + + void PrintInternalState(TStringStream& stream) const { + stream << "Nodes states: \n"; + for (const auto& [nodeId, state] : Nodes) { + stream << " id " << nodeId << " connected " << state.Connected << " retry scheduled " << state.RetryScheduled << "\n"; + } + } + + private: + TMap<ui32, TNodeState> Nodes; + NActors::TActorId SelfId; + TString LogPrefix = "RowDispatcher: "; + }; + NConfig::TRowDispatcherConfig Config; NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; @@ -134,8 +226,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> { const ::NMonitoring::TDynamicCounterPtr Counters; TRowDispatcherMetrics Metrics; NYql::IPqGateway::TPtr PqGateway; - THashSet<TActorId> InterconnectSessions; - TMap<ui32, bool> NodeConnected; + TNodesTracker NodesTracker; struct ConsumerCounters { ui64 NewDataArrived = 0; @@ -222,7 +313,7 @@ public: void Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev); - void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&); + void Handle(const TEvPrivate::TEvTryConnect::TPtr&); void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr&); void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&); void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&); @@ -230,7 +321,6 @@ public: void Handle(const NMon::TEvHttpInfo::TPtr&); void DeleteConsumer(const ConsumerSessionKey& key); - void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession); void UpdateMetrics(); TString GetInternalState(); @@ -250,7 +340,7 @@ public: hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle); hFunc(NFq::TEvRowDispatcher::TEvStatus, Handle); hFunc(NFq::TEvRowDispatcher::TEvSessionStatistic, Handle); - hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle); + hFunc(TEvPrivate::TEvTryConnect, Handle); hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle); hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle); hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle); @@ -300,6 +390,7 @@ void TRowDispatcher::Bootstrap() { mon->RegisterActorPage(actorsMonPage, "row_dispatcher", "Row Dispatcher", false, TlsActivationContext->ExecutorThread.ActorSystem, SelfId()); } + NodesTracker.Init(SelfId()); } void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) { @@ -317,7 +408,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& void TRowDispatcher::HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev) { LOG_ROW_DISPATCHER_DEBUG("EvNodeConnected, node id " << ev->Get()->NodeId); - NodeConnected[ev->Get()->NodeId] = true; + NodesTracker.HandleNodeConnected(ev->Get()->NodeId); for (auto& [actorId, consumer] : Consumers) { consumer->EventsQueue.HandleNodeConnected(ev->Get()->NodeId); } @@ -325,7 +416,7 @@ void TRowDispatcher::HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev void TRowDispatcher::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { LOG_ROW_DISPATCHER_DEBUG("TEvNodeDisconnected, node id " << ev->Get()->NodeId); - NodeConnected[ev->Get()->NodeId] = false; + NodesTracker.HandleNodeDisconnected(ev->Get()->NodeId); for (auto& [actorId, consumer] : Consumers) { consumer->EventsQueue.HandleNodeDisconnected(ev->Get()->NodeId); } @@ -353,7 +444,7 @@ void TRowDispatcher::Handle(NActors::TEvents::TEvPong::TPtr&) { void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe::TPtr& ev) { LOG_ROW_DISPATCHER_DEBUG("TEvCoordinatorChangesSubscribe from " << ev->Sender); - UpdateInterconnectSessions(ev->InterconnectSession); + NodesTracker.AddNode(ev->Sender.NodeId()); CoordinatorChangedSubscribers.insert(ev->Sender); if (!CoordinatorActorId) { return; @@ -387,6 +478,7 @@ void TRowDispatcher::UpdateMetrics() { TString TRowDispatcher::GetInternalState() { TStringStream str; + NodesTracker.PrintInternalState(str); str << "Statistics:\n"; for (auto& [key, sessionsInfo] : TopicSessions) { str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicPath << " / " << key.PartitionId; @@ -410,7 +502,7 @@ TString TRowDispatcher::GetInternalState() { void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { LOG_ROW_DISPATCHER_DEBUG("TEvStartSession from " << ev->Sender << ", topicPath " << ev->Get()->Record.GetSource().GetTopicPath() << " partitionId " << ev->Get()->Record.GetPartitionId()); - UpdateInterconnectSessions(ev->InterconnectSession); + NodesTracker.AddNode(ev->Sender.NodeId()); TMaybe<ui64> readOffset; if (ev->Get()->Record.HasOffset()) { readOffset = ev->Get()->Record.GetOffset(); @@ -430,7 +522,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { LOG_ROW_DISPATCHER_DEBUG("Topic session count " << topicSessionInfo.Sessions.size()); Y_ENSURE(topicSessionInfo.Sessions.size() <= 1); - auto consumerInfo = MakeAtomicShared<ConsumerInfo>(ev->Sender, SelfId(), NextEventQueueId++, ev->Get()->Record, TActorId(), NodeConnected[ev->Sender.NodeId()]); + auto consumerInfo = MakeAtomicShared<ConsumerInfo>(ev->Sender, SelfId(), NextEventQueueId++, ev->Get()->Record, TActorId(), NodesTracker.GetNodeConnected(ev->Sender.NodeId())); Consumers[key] = consumerInfo; ConsumersByEventQueueId[consumerInfo->EventQueueId] = consumerInfo; if (!consumerInfo->EventsQueue.OnEventReceived(ev)) { @@ -575,14 +667,9 @@ void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClo } } -void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev) { - LOG_ROW_DISPATCHER_TRACE("TEvRetry " << ev->Get()->EventQueueId); - auto it = ConsumersByEventQueueId.find(ev->Get()->EventQueueId); - if (it == ConsumersByEventQueueId.end()) { - LOG_ROW_DISPATCHER_WARN("No consumer with EventQueueId = " << ev->Get()->EventQueueId); - return; - } - it->second->EventsQueue.Retry(); +void TRowDispatcher::Handle(const TEvPrivate::TEvTryConnect::TPtr& ev) { + LOG_ROW_DISPATCHER_TRACE("TEvTryConnect to node id " << ev->Get()->NodeId); + NodesTracker.TryConnect(ev->Get()->NodeId); } void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr& ev) { @@ -705,18 +792,6 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev } } -void TRowDispatcher::UpdateInterconnectSessions(const NActors::TActorId& interconnectSession) { - if (!interconnectSession) { - return; - } - auto sessionsIt = InterconnectSessions.find(interconnectSession); - if (sessionsIt != InterconnectSessions.end()) { - return; - } - Send(interconnectSession, new NActors::TEvents::TEvSubscribe, IEventHandle::FlagTrackDelivery); - InterconnectSessions.insert(interconnectSession); -} - } // namespace //////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/library/yql/dq/actors/common/retry_queue.cpp b/ydb/library/yql/dq/actors/common/retry_queue.cpp index 2d0a61e0b0..2f232b6685 100644 --- a/ydb/library/yql/dq/actors/common/retry_queue.cpp +++ b/ydb/library/yql/dq/actors/common/retry_queue.cpp @@ -168,7 +168,12 @@ TDuration TRetryEventsQueue::TRetryState::RandomizeDelay(TDuration baseDelay) { } void TRetryEventsQueue::PrintInternalState(TStringStream& stream) const { - stream << "id " << EventQueueId << ", NextSeqNo " + stream << "id " << EventQueueId; + if (LocalRecipient) { + stream << ", LocalRecipient\n"; + return; + } + stream << ", NextSeqNo " << NextSeqNo << ", MyConfSeqNo " << MyConfirmedSeqNo << ", SeqNos " << ReceivedEventsSeqNos.size() << ", events size " << Events.size() << ", connected " << Connected << "\n"; } |