aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-01-22 02:44:34 +0300
committerhor911 <hor911@ydb.tech>2023-01-22 02:44:34 +0300
commita86f99d30771f6da1089bf2bfa53f1806b0c322c (patch)
tree9d07426987eea7cc2abbc07b64dd52d05d666fda
parent3b28556d63f5dd65bc3287ab9d18bf55e2e7ef8a (diff)
downloadydb-a86f99d30771f6da1089bf2bfa53f1806b0c322c.tar.gz
PQ Ingress/Egress statistics
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/utils.cpp2
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp11
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp11
-rw-r--r--ydb/library/yql/providers/pq/proto/dq_io_state.proto2
4 files changed, 24 insertions, 2 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp
index 8d2f537c01..3f02a17960 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp
@@ -207,6 +207,8 @@ TString GetPrettyStatistics(const TString& statistics) {
RemapNode(writer, p.second, "TaskRunner.Stage=Total.CpuTimeUs", "CpuTimeUs");
RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressS3SourceBytes", "IngressObjectStorageBytes");
RemapNode(writer, p.second, "TaskRunner.Stage=Total.EgressS3SinkBytes", "EgressObjectStorageBytes");
+ RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressPqSourceBytes", "IngressStreamBytes");
+ RemapNode(writer, p.second, "TaskRunner.Stage=Total.EgressPqSinkBytes", "EgressStreamBytes");
RemapNode(writer, p.second, "TaskRunner.Source=0.Stage=Total.RowsIn", "IngressRows");
writer.OnEndMap();
}
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
index ad097c9525..312d677c16 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
@@ -159,6 +159,7 @@ public:
}
stateProto.SetStartingMessageTimestampMs(StartingMessageTimestamp.MilliSeconds());
+ stateProto.SetIngressBytes(IngressBytes);
TString stateBlob;
YQL_ENSURE(stateProto.SerializeToString(&stateBlob));
@@ -173,6 +174,7 @@ public:
void LoadState(const NDqProto::TSourceState& state) override {
TInstant minStartingMessageTs = state.DataSize() ? TInstant::Max() : StartingMessageTimestamp;
+ ui64 ingressBytes = 0;
for (const auto& stateData : state.GetData()) {
const auto& data = stateData.GetStateData();
if (data.GetVersion() == StateVersion) { // Current version
@@ -189,11 +191,13 @@ public:
}
}
minStartingMessageTs = Min(minStartingMessageTs, TInstant::MilliSeconds(stateProto.GetStartingMessageTimestampMs()));
+ ingressBytes += stateProto.GetIngressBytes();
} else {
ythrow yexception() << "Invalid state version " << data.GetVersion();
}
}
StartingMessageTimestamp = minStartingMessageTs;
+ IngressBytes = ingressBytes;
InitWatermarkTracker();
if (ReadSession) {
@@ -210,6 +214,10 @@ public:
}
}
+ ui64 GetIngressBytes() override {
+ return IngressBytes;
+ }
+
ui64 GetInputIndex() const override {
return InputIndex;
};
@@ -433,7 +441,7 @@ private:
const auto partitionKey = MakePartitionKey(event.GetPartitionStream());
for (const auto& message : event.GetMessages()) {
const TString& data = message.GetData();
-
+ Self.IngressBytes += data.size();
LWPROBE(PqReadDataReceived, TString(TStringBuilder() << Self.TxId), Self.SourceParams.GetTopicPath(), data);
SRC_LOG_T("Data received: " << message.DebugString(true));
@@ -551,6 +559,7 @@ private:
NThreading::TFuture<void> EventFuture;
THashMap<TPartitionKey, ui64> PartitionToOffset; // {cluster, partition} -> offset of next event.
TInstant StartingMessageTimestamp;
+ ui64 IngressBytes = 0;
const NActors::TActorId ComputeActorId;
std::queue<std::pair<ui64, NYdb::NPersQueue::TDeferredCommit>> DeferredCommits;
NYdb::NPersQueue::TDeferredCommit CurrentDeferredCommit;
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
index be8916379a..434e37e678 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
@@ -197,11 +197,16 @@ public:
SourceId = stateProto.GetSourceId();
ConfirmedSeqNo = stateProto.GetConfirmedSeqNo();
NextSeqNo = ConfirmedSeqNo + 1;
+ EgressBytes = stateProto.GetEgressBytes();
return;
}
ythrow yexception() << "Invalid state version " << data.GetVersion();
}
+ ui64 GetEgressBytes() override {
+ return EgressBytes;
+ }
+
void CommitState(const NDqProto::TCheckpoint& checkpoint) override {
Y_UNUSED(checkpoint);
}
@@ -307,6 +312,7 @@ private:
NPq::NProto::TDqPqTopicSinkState stateProto;
stateProto.SetSourceId(GetSourceId());
stateProto.SetConfirmedSeqNo(ConfirmedSeqNo);
+ stateProto.SetEgressBytes(EgressBytes);
TString serializedState;
YQL_ENSURE(stateProto.SerializeToString(&serializedState));
@@ -321,7 +327,9 @@ private:
void WriteNextMessage(NYdb::NPersQueue::TContinuationToken&& token) {
SINK_LOG_T("Write data: \"" << Buffer.front() << "\" with seq no " << NextSeqNo);
WriteSession->Write(std::move(token), Buffer.front(), NextSeqNo++);
- WaitingAcks.push(GetItemSize(Buffer.front()));
+ auto itemSize = GetItemSize(Buffer.front());
+ WaitingAcks.push(itemSize);
+ EgressBytes += itemSize;
Buffer.pop();
}
@@ -400,6 +408,7 @@ private:
const TString LogPrefix;
i64 FreeSpace = 0;
bool Finished = false;
+ ui64 EgressBytes = 0;
NYdb::NPersQueue::TPersQueueClient PersQueueClient;
std::shared_ptr<NYdb::NPersQueue::IWriteSession> WriteSession;
diff --git a/ydb/library/yql/providers/pq/proto/dq_io_state.proto b/ydb/library/yql/providers/pq/proto/dq_io_state.proto
index 73861eb78c..ea936be6a9 100644
--- a/ydb/library/yql/providers/pq/proto/dq_io_state.proto
+++ b/ydb/library/yql/providers/pq/proto/dq_io_state.proto
@@ -7,6 +7,7 @@ message TDqPqTopicSourceState {
repeated TTopicDescription Topics = 1;
repeated TPartitionReadState Partitions = 2;
uint64 StartingMessageTimestampMs = 3; // StartingMessageTimestamp in ms for ReadSession settings
+ uint64 IngressBytes = 4;
message TTopicDescription {
string DatabaseId = 1;
@@ -26,4 +27,5 @@ message TDqPqTopicSourceState {
message TDqPqTopicSinkState {
string SourceId = 1;
uint64 ConfirmedSeqNo = 2;
+ uint64 EgressBytes = 3;
}