diff options
author | hor911 <hor911@ydb.tech> | 2023-01-22 02:44:34 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-01-22 02:44:34 +0300 |
commit | a86f99d30771f6da1089bf2bfa53f1806b0c322c (patch) | |
tree | 9d07426987eea7cc2abbc07b64dd52d05d666fda | |
parent | 3b28556d63f5dd65bc3287ab9d18bf55e2e7ef8a (diff) | |
download | ydb-a86f99d30771f6da1089bf2bfa53f1806b0c322c.tar.gz |
PQ Ingress/Egress statistics
4 files changed, 24 insertions, 2 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp index 8d2f537c01..3f02a17960 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp @@ -207,6 +207,8 @@ TString GetPrettyStatistics(const TString& statistics) { RemapNode(writer, p.second, "TaskRunner.Stage=Total.CpuTimeUs", "CpuTimeUs"); RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressS3SourceBytes", "IngressObjectStorageBytes"); RemapNode(writer, p.second, "TaskRunner.Stage=Total.EgressS3SinkBytes", "EgressObjectStorageBytes"); + RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressPqSourceBytes", "IngressStreamBytes"); + RemapNode(writer, p.second, "TaskRunner.Stage=Total.EgressPqSinkBytes", "EgressStreamBytes"); RemapNode(writer, p.second, "TaskRunner.Source=0.Stage=Total.RowsIn", "IngressRows"); writer.OnEndMap(); } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index ad097c9525..312d677c16 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -159,6 +159,7 @@ public: } stateProto.SetStartingMessageTimestampMs(StartingMessageTimestamp.MilliSeconds()); + stateProto.SetIngressBytes(IngressBytes); TString stateBlob; YQL_ENSURE(stateProto.SerializeToString(&stateBlob)); @@ -173,6 +174,7 @@ public: void LoadState(const NDqProto::TSourceState& state) override { TInstant minStartingMessageTs = state.DataSize() ? TInstant::Max() : StartingMessageTimestamp; + ui64 ingressBytes = 0; for (const auto& stateData : state.GetData()) { const auto& data = stateData.GetStateData(); if (data.GetVersion() == StateVersion) { // Current version @@ -189,11 +191,13 @@ public: } } minStartingMessageTs = Min(minStartingMessageTs, TInstant::MilliSeconds(stateProto.GetStartingMessageTimestampMs())); + ingressBytes += stateProto.GetIngressBytes(); } else { ythrow yexception() << "Invalid state version " << data.GetVersion(); } } StartingMessageTimestamp = minStartingMessageTs; + IngressBytes = ingressBytes; InitWatermarkTracker(); if (ReadSession) { @@ -210,6 +214,10 @@ public: } } + ui64 GetIngressBytes() override { + return IngressBytes; + } + ui64 GetInputIndex() const override { return InputIndex; }; @@ -433,7 +441,7 @@ private: const auto partitionKey = MakePartitionKey(event.GetPartitionStream()); for (const auto& message : event.GetMessages()) { const TString& data = message.GetData(); - + Self.IngressBytes += data.size(); LWPROBE(PqReadDataReceived, TString(TStringBuilder() << Self.TxId), Self.SourceParams.GetTopicPath(), data); SRC_LOG_T("Data received: " << message.DebugString(true)); @@ -551,6 +559,7 @@ private: NThreading::TFuture<void> EventFuture; THashMap<TPartitionKey, ui64> PartitionToOffset; // {cluster, partition} -> offset of next event. TInstant StartingMessageTimestamp; + ui64 IngressBytes = 0; const NActors::TActorId ComputeActorId; std::queue<std::pair<ui64, NYdb::NPersQueue::TDeferredCommit>> DeferredCommits; NYdb::NPersQueue::TDeferredCommit CurrentDeferredCommit; diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index be8916379a..434e37e678 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -197,11 +197,16 @@ public: SourceId = stateProto.GetSourceId(); ConfirmedSeqNo = stateProto.GetConfirmedSeqNo(); NextSeqNo = ConfirmedSeqNo + 1; + EgressBytes = stateProto.GetEgressBytes(); return; } ythrow yexception() << "Invalid state version " << data.GetVersion(); } + ui64 GetEgressBytes() override { + return EgressBytes; + } + void CommitState(const NDqProto::TCheckpoint& checkpoint) override { Y_UNUSED(checkpoint); } @@ -307,6 +312,7 @@ private: NPq::NProto::TDqPqTopicSinkState stateProto; stateProto.SetSourceId(GetSourceId()); stateProto.SetConfirmedSeqNo(ConfirmedSeqNo); + stateProto.SetEgressBytes(EgressBytes); TString serializedState; YQL_ENSURE(stateProto.SerializeToString(&serializedState)); @@ -321,7 +327,9 @@ private: void WriteNextMessage(NYdb::NPersQueue::TContinuationToken&& token) { SINK_LOG_T("Write data: \"" << Buffer.front() << "\" with seq no " << NextSeqNo); WriteSession->Write(std::move(token), Buffer.front(), NextSeqNo++); - WaitingAcks.push(GetItemSize(Buffer.front())); + auto itemSize = GetItemSize(Buffer.front()); + WaitingAcks.push(itemSize); + EgressBytes += itemSize; Buffer.pop(); } @@ -400,6 +408,7 @@ private: const TString LogPrefix; i64 FreeSpace = 0; bool Finished = false; + ui64 EgressBytes = 0; NYdb::NPersQueue::TPersQueueClient PersQueueClient; std::shared_ptr<NYdb::NPersQueue::IWriteSession> WriteSession; diff --git a/ydb/library/yql/providers/pq/proto/dq_io_state.proto b/ydb/library/yql/providers/pq/proto/dq_io_state.proto index 73861eb78c..ea936be6a9 100644 --- a/ydb/library/yql/providers/pq/proto/dq_io_state.proto +++ b/ydb/library/yql/providers/pq/proto/dq_io_state.proto @@ -7,6 +7,7 @@ message TDqPqTopicSourceState { repeated TTopicDescription Topics = 1; repeated TPartitionReadState Partitions = 2; uint64 StartingMessageTimestampMs = 3; // StartingMessageTimestamp in ms for ReadSession settings + uint64 IngressBytes = 4; message TTopicDescription { string DatabaseId = 1; @@ -26,4 +27,5 @@ message TDqPqTopicSourceState { message TDqPqTopicSinkState { string SourceId = 1; uint64 ConfirmedSeqNo = 2; + uint64 EgressBytes = 3; } |