aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorOleg Doronin <dorooleg@yandex.ru>2024-11-28 10:00:48 +0100
committerGitHub <noreply@github.com>2024-11-28 12:00:48 +0300
commit31e0e4623992fbd723f2a304f98dd0d5bbb34ebe (patch)
tree87b9a8995d78473e42d74142beb411dfaa06db87
parent5c44bcafe0448a4655aab7a1f65a3a4530d987be (diff)
downloadydb-31e0e4623992fbd723f2a304f98dd0d5bbb34ebe.tar.gz
mon page has been added for rd read actors (#12074)
-rw-r--r--ydb/core/fq/libs/row_dispatcher/events/data_plane.h12
-rw-r--r--ydb/core/fq/libs/row_dispatcher/protos/events.proto7
-rw-r--r--ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp54
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp15
4 files changed, 87 insertions, 1 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h
index 57e0301b160..7be5a12c9c3 100644
--- a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h
+++ b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h
@@ -29,6 +29,8 @@ struct TEvRowDispatcher {
EvCoordinatorResult,
EvSessionStatistic,
EvHeartbeat,
+ EvGetInternalStateRequest,
+ EvGetInternalStateResponse,
EvEnd,
};
@@ -138,6 +140,16 @@ struct TEvRowDispatcher {
Record.SetPartitionId(partitionId);
}
};
+
+ struct TEvGetInternalStateRequest : public NActors::TEventPB<TEvGetInternalStateRequest,
+ NFq::NRowDispatcherProto::TEvGetInternalStateRequest, EEv::EvGetInternalStateRequest> {
+ TEvGetInternalStateRequest() = default;
+ };
+
+ struct TEvGetInternalStateResponse : public NActors::TEventPB<TEvGetInternalStateResponse,
+ NFq::NRowDispatcherProto::TEvGetInternalStateResponse, EEv::EvGetInternalStateResponse> {
+ TEvGetInternalStateResponse() = default;
+ };
};
} // namespace NFq
diff --git a/ydb/core/fq/libs/row_dispatcher/protos/events.proto b/ydb/core/fq/libs/row_dispatcher/protos/events.proto
index 547476f6dfb..8a981fd7c69 100644
--- a/ydb/core/fq/libs/row_dispatcher/protos/events.proto
+++ b/ydb/core/fq/libs/row_dispatcher/protos/events.proto
@@ -81,3 +81,10 @@ message TEvHeartbeat {
uint32 PartitionId = 1;
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}
+
+message TEvGetInternalStateRequest {
+}
+
+message TEvGetInternalStateResponse {
+ string InternalState = 1;
+}
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
index f3d0c8eed42..b70da20a0d8 100644
--- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
@@ -316,9 +316,16 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
TMap<TActorId, SessionInfo> Sessions; // key - TopicSession actor id
};
+ struct ReadActorInfo {
+ TString InternalState;
+ TInstant RequestTime;
+ TInstant ResponseTime;
+ };
+
THashMap<ConsumerSessionKey, TAtomicSharedPtr<ConsumerInfo>, ConsumerSessionKeyHash> Consumers;
TMap<ui64, TAtomicSharedPtr<ConsumerInfo>> ConsumersByEventQueueId;
THashMap<TopicSessionKey, TopicSessionInfo, TopicSessionKeyHash> TopicSessions;
+ TMap<TActorId, ReadActorInfo> ReadActorsInternalState;
public:
explicit TRowDispatcher(
@@ -352,6 +359,7 @@ public:
void Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev);
void Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev);
void Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev);
+ void Handle(NFq::TEvRowDispatcher::TEvGetInternalStateResponse::TPtr& ev);
void Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev);
void Handle(const TEvPrivate::TEvTryConnect::TPtr&);
@@ -363,6 +371,8 @@ public:
void DeleteConsumer(const ConsumerSessionKey& key);
void UpdateMetrics();
TString GetInternalState();
+ TString GetReadActorsInternalState();
+ void UpdateReadActorsInternalState();
template <class TEventPtr>
bool CheckSession(TAtomicSharedPtr<ConsumerInfo>& consumer, const TEventPtr& ev);
void SetQueryMetrics(const TQueryStatKey& queryKey, ui64 unreadBytesMax, ui64 unreadBytesAvg, i64 readLagMessagesMax);
@@ -384,6 +394,7 @@ public:
hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle);
hFunc(NFq::TEvRowDispatcher::TEvStatistics, Handle);
hFunc(NFq::TEvRowDispatcher::TEvSessionStatistic, Handle);
+ hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateResponse, Handle);
hFunc(TEvPrivate::TEvTryConnect, Handle);
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle);
hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle);
@@ -656,6 +667,39 @@ TString TRowDispatcher::GetInternalState() {
return str.Str();
}
+TString TRowDispatcher::GetReadActorsInternalState() {
+ TStringStream str;
+ for (const auto& [_, internalState]: ReadActorsInternalState) {
+ str << "ResponseTime: " << internalState.ResponseTime << " " << internalState.InternalState << Endl;
+ }
+ return str.Str();
+}
+
+void TRowDispatcher::UpdateReadActorsInternalState() {
+ TSet<TActorId> ReadActors;
+ for (const auto& [key, _]: Consumers) {
+ ReadActors.insert(key.ReadActorId);
+ }
+
+ for(auto it = ReadActorsInternalState.begin(); it != ReadActorsInternalState.end();) {
+ if (!ReadActors.contains(it->first)) {
+ it = ReadActorsInternalState.erase(it);
+ } else {
+ ++it;
+ }
+ }
+
+ auto now = TInstant::Now();
+ for (const auto& readActor: ReadActors) {
+ auto& internalStateInfo = ReadActorsInternalState[readActor];
+ if (now - internalStateInfo.RequestTime < TDuration::Seconds(30)) {
+ continue;
+ }
+ internalStateInfo.RequestTime = now;
+ Send(readActor, new NFq::TEvRowDispatcher::TEvGetInternalStateRequest{}, 0, 0);
+ }
+}
+
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
LOG_ROW_DISPATCHER_DEBUG("Received TEvStartSession from " << ev->Sender << ", topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
" part id " << ev->Get()->Record.GetPartitionId() << " query id " << ev->Get()->Record.GetQueryId() << " cookie " << ev->Cookie);
@@ -916,11 +960,15 @@ void TRowDispatcher::PrintStateToLog() {
}
void TRowDispatcher::Handle(const NMon::TEvHttpInfo::TPtr& ev) {
+ UpdateReadActorsInternalState();
TStringStream str;
HTML(str) {
PRE() {
+ str << "Current Time: " << TInstant::Now() << Endl;
str << "Current state:" << Endl;
str << GetInternalState() << Endl;
+ str << "Read actors state: " << Endl;
+ str << GetReadActorsInternalState() << Endl;
str << Endl;
}
}
@@ -954,6 +1002,12 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev
}
}
+void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvGetInternalStateResponse::TPtr& ev) {
+ auto& readActorInternalState = ReadActorsInternalState[ev->Sender];
+ readActorInternalState.InternalState = ev->Get()->Record.GetInternalState();
+ readActorInternalState.ResponseTime = TInstant::Now();
+}
+
} // namespace
////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
index 81b1820193d..426a4ab92f7 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
@@ -226,6 +226,7 @@ public:
void Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev);
void Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev);
void Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev);
+ void Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest::TPtr& ev);
void HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev);
void HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev);
@@ -245,6 +246,7 @@ public:
hFunc(NFq::TEvRowDispatcher::TEvStartSessionAck, Handle);
hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle);
hFunc(NFq::TEvRowDispatcher::TEvStatistics, Handle);
+ hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, Handle);
hFunc(NActors::TEvents::TEvPong, Handle);
hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected);
@@ -269,6 +271,7 @@ public:
void StopSessions();
void ReInit(const TString& reason);
void PrintInternalState();
+ TString GetInternalState();
void TrySendGetNextBatch(SessionInfo& sessionInfo);
template <class TEventPtr>
bool CheckSession(SessionInfo& session, const TEventPtr& ev, ui64 partitionId);
@@ -532,6 +535,12 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev) {
}
}
+void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest::TPtr& ev) {
+ auto response = std::make_unique<NFq::TEvRowDispatcher::TEvGetInternalStateResponse>();
+ response->Record.SetInternalState(GetInternalState());
+ Send(ev->Sender, response.release(), 0, ev->Cookie);
+}
+
void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) {
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
SRC_LOG_T("TEvNewDataArrived from " << ev->Sender << ", part id " << ev->Get()->Record.GetPartitionId() << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo());
@@ -772,6 +781,10 @@ void TDqPqRdReadActor::Handle(TEvPrivate::TEvPrintState::TPtr&) {
}
void TDqPqRdReadActor::PrintInternalState() {
+ SRC_LOG_I(GetInternalState());
+}
+
+TString TDqPqRdReadActor::GetInternalState() {
TStringStream str;
str << "State: used buffer size " << ReadyBufferSizeBytes << " ready buffer event size " << ReadyBuffer.size() << " state " << static_cast<ui64>(State) << " InFlyAsyncInputData " << InFlyAsyncInputData << "\n";
str << "Counters: GetAsyncInputData " << Counters.GetAsyncInputData << " CoordinatorChanged " << Counters.CoordinatorChanged << " CoordinatorResult " << Counters.CoordinatorResult
@@ -789,7 +802,7 @@ void TDqPqRdReadActor::PrintInternalState() {
<< " has pending data " << sessionInfo.HasPendingData << " connection id " << sessionInfo.Generation << " ";
sessionInfo.EventsQueue.PrintInternalState(str);
}
- SRC_LOG_I(str.Str());
+ return str.Str();
}
void TDqPqRdReadActor::Handle(TEvPrivate::TEvProcessState::TPtr&) {