diff options
author | Oleg Doronin <dorooleg@yandex.ru> | 2024-11-28 10:00:48 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-28 12:00:48 +0300 |
commit | 31e0e4623992fbd723f2a304f98dd0d5bbb34ebe (patch) | |
tree | 87b9a8995d78473e42d74142beb411dfaa06db87 | |
parent | 5c44bcafe0448a4655aab7a1f65a3a4530d987be (diff) | |
download | ydb-31e0e4623992fbd723f2a304f98dd0d5bbb34ebe.tar.gz |
mon page has been added for rd read actors (#12074)
4 files changed, 87 insertions, 1 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h index 57e0301b160..7be5a12c9c3 100644 --- a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h +++ b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h @@ -29,6 +29,8 @@ struct TEvRowDispatcher { EvCoordinatorResult, EvSessionStatistic, EvHeartbeat, + EvGetInternalStateRequest, + EvGetInternalStateResponse, EvEnd, }; @@ -138,6 +140,16 @@ struct TEvRowDispatcher { Record.SetPartitionId(partitionId); } }; + + struct TEvGetInternalStateRequest : public NActors::TEventPB<TEvGetInternalStateRequest, + NFq::NRowDispatcherProto::TEvGetInternalStateRequest, EEv::EvGetInternalStateRequest> { + TEvGetInternalStateRequest() = default; + }; + + struct TEvGetInternalStateResponse : public NActors::TEventPB<TEvGetInternalStateResponse, + NFq::NRowDispatcherProto::TEvGetInternalStateResponse, EEv::EvGetInternalStateResponse> { + TEvGetInternalStateResponse() = default; + }; }; } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/protos/events.proto b/ydb/core/fq/libs/row_dispatcher/protos/events.proto index 547476f6dfb..8a981fd7c69 100644 --- a/ydb/core/fq/libs/row_dispatcher/protos/events.proto +++ b/ydb/core/fq/libs/row_dispatcher/protos/events.proto @@ -81,3 +81,10 @@ message TEvHeartbeat { uint32 PartitionId = 1; optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100; } + +message TEvGetInternalStateRequest { +} + +message TEvGetInternalStateResponse { + string InternalState = 1; +} diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index f3d0c8eed42..b70da20a0d8 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -316,9 +316,16 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> { TMap<TActorId, SessionInfo> Sessions; // key - TopicSession actor id }; + struct ReadActorInfo { + TString InternalState; + TInstant RequestTime; + TInstant ResponseTime; + }; + THashMap<ConsumerSessionKey, TAtomicSharedPtr<ConsumerInfo>, ConsumerSessionKeyHash> Consumers; TMap<ui64, TAtomicSharedPtr<ConsumerInfo>> ConsumersByEventQueueId; THashMap<TopicSessionKey, TopicSessionInfo, TopicSessionKeyHash> TopicSessions; + TMap<TActorId, ReadActorInfo> ReadActorsInternalState; public: explicit TRowDispatcher( @@ -352,6 +359,7 @@ public: void Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev); + void Handle(NFq::TEvRowDispatcher::TEvGetInternalStateResponse::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev); void Handle(const TEvPrivate::TEvTryConnect::TPtr&); @@ -363,6 +371,8 @@ public: void DeleteConsumer(const ConsumerSessionKey& key); void UpdateMetrics(); TString GetInternalState(); + TString GetReadActorsInternalState(); + void UpdateReadActorsInternalState(); template <class TEventPtr> bool CheckSession(TAtomicSharedPtr<ConsumerInfo>& consumer, const TEventPtr& ev); void SetQueryMetrics(const TQueryStatKey& queryKey, ui64 unreadBytesMax, ui64 unreadBytesAvg, i64 readLagMessagesMax); @@ -384,6 +394,7 @@ public: hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle); hFunc(NFq::TEvRowDispatcher::TEvStatistics, Handle); hFunc(NFq::TEvRowDispatcher::TEvSessionStatistic, Handle); + hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateResponse, Handle); hFunc(TEvPrivate::TEvTryConnect, Handle); hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle); hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle); @@ -656,6 +667,39 @@ TString TRowDispatcher::GetInternalState() { return str.Str(); } +TString TRowDispatcher::GetReadActorsInternalState() { + TStringStream str; + for (const auto& [_, internalState]: ReadActorsInternalState) { + str << "ResponseTime: " << internalState.ResponseTime << " " << internalState.InternalState << Endl; + } + return str.Str(); +} + +void TRowDispatcher::UpdateReadActorsInternalState() { + TSet<TActorId> ReadActors; + for (const auto& [key, _]: Consumers) { + ReadActors.insert(key.ReadActorId); + } + + for(auto it = ReadActorsInternalState.begin(); it != ReadActorsInternalState.end();) { + if (!ReadActors.contains(it->first)) { + it = ReadActorsInternalState.erase(it); + } else { + ++it; + } + } + + auto now = TInstant::Now(); + for (const auto& readActor: ReadActors) { + auto& internalStateInfo = ReadActorsInternalState[readActor]; + if (now - internalStateInfo.RequestTime < TDuration::Seconds(30)) { + continue; + } + internalStateInfo.RequestTime = now; + Send(readActor, new NFq::TEvRowDispatcher::TEvGetInternalStateRequest{}, 0, 0); + } +} + void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { LOG_ROW_DISPATCHER_DEBUG("Received TEvStartSession from " << ev->Sender << ", topicPath " << ev->Get()->Record.GetSource().GetTopicPath() << " part id " << ev->Get()->Record.GetPartitionId() << " query id " << ev->Get()->Record.GetQueryId() << " cookie " << ev->Cookie); @@ -916,11 +960,15 @@ void TRowDispatcher::PrintStateToLog() { } void TRowDispatcher::Handle(const NMon::TEvHttpInfo::TPtr& ev) { + UpdateReadActorsInternalState(); TStringStream str; HTML(str) { PRE() { + str << "Current Time: " << TInstant::Now() << Endl; str << "Current state:" << Endl; str << GetInternalState() << Endl; + str << "Read actors state: " << Endl; + str << GetReadActorsInternalState() << Endl; str << Endl; } } @@ -954,6 +1002,12 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev } } +void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvGetInternalStateResponse::TPtr& ev) { + auto& readActorInternalState = ReadActorsInternalState[ev->Sender]; + readActorInternalState.InternalState = ev->Get()->Record.GetInternalState(); + readActorInternalState.ResponseTime = TInstant::Now(); +} + } // namespace //////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index 81b1820193d..426a4ab92f7 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -226,6 +226,7 @@ public: void Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev); + void Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest::TPtr& ev); void HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev); void HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev); @@ -245,6 +246,7 @@ public: hFunc(NFq::TEvRowDispatcher::TEvStartSessionAck, Handle); hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle); hFunc(NFq::TEvRowDispatcher::TEvStatistics, Handle); + hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, Handle); hFunc(NActors::TEvents::TEvPong, Handle); hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected); @@ -269,6 +271,7 @@ public: void StopSessions(); void ReInit(const TString& reason); void PrintInternalState(); + TString GetInternalState(); void TrySendGetNextBatch(SessionInfo& sessionInfo); template <class TEventPtr> bool CheckSession(SessionInfo& session, const TEventPtr& ev, ui64 partitionId); @@ -532,6 +535,12 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev) { } } +void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest::TPtr& ev) { + auto response = std::make_unique<NFq::TEvRowDispatcher::TEvGetInternalStateResponse>(); + response->Record.SetInternalState(GetInternalState()); + Send(ev->Sender, response.release(), 0, ev->Cookie); +} + void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) { const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta(); SRC_LOG_T("TEvNewDataArrived from " << ev->Sender << ", part id " << ev->Get()->Record.GetPartitionId() << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo()); @@ -772,6 +781,10 @@ void TDqPqRdReadActor::Handle(TEvPrivate::TEvPrintState::TPtr&) { } void TDqPqRdReadActor::PrintInternalState() { + SRC_LOG_I(GetInternalState()); +} + +TString TDqPqRdReadActor::GetInternalState() { TStringStream str; str << "State: used buffer size " << ReadyBufferSizeBytes << " ready buffer event size " << ReadyBuffer.size() << " state " << static_cast<ui64>(State) << " InFlyAsyncInputData " << InFlyAsyncInputData << "\n"; str << "Counters: GetAsyncInputData " << Counters.GetAsyncInputData << " CoordinatorChanged " << Counters.CoordinatorChanged << " CoordinatorResult " << Counters.CoordinatorResult @@ -789,7 +802,7 @@ void TDqPqRdReadActor::PrintInternalState() { << " has pending data " << sessionInfo.HasPendingData << " connection id " << sessionInfo.Generation << " "; sessionInfo.EventsQueue.PrintInternalState(str); } - SRC_LOG_I(str.Str()); + return str.Str(); } void TDqPqRdReadActor::Handle(TEvPrivate::TEvProcessState::TPtr&) { |