about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: hor911 <hor911@ydb.tech> 2023-10-24 21:26:00 +0300
committer: hor911 <hor911@ydb.tech> 2023-10-24 21:41:48 +0300
commit: 32537eb24d6375aa4032ba0e6ca4c3b6c64ac439 (patch)
tree: 43854ff0bfb0527a301bdf2da85e2bbc34990b5e
parent: 621ec2027f2f3c9ce8813fd72a39e29a89fc9bcb (diff)
download: ydb-32537eb24d6375aa4032ba0e6ca4c3b6c64ac439.tar.gz
Generic Connector Traffic Split and Stats
-rw-r--r-- ydb/core/fq/libs/compute/common/utils.cpp 12
-rw-r--r-- ydb/core/fq/libs/control_plane_storage/internal/utils.cpp 28
-rw-r--r-- ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp 4
-rw-r--r-- ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp 12
-rw-r--r-- ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp 12
5 files changed, 49 insertions, 19 deletions
diff --git a/ydb/core/fq/libs/compute/common/utils.cpp b/ydb/core/fq/libs/compute/common/utils.cpp
index c8514927ded..f5864b5bf5a 100644
--- a/ydb/core/fq/libs/compute/common/utils.cpp
+++ b/ydb/core/fq/libs/compute/common/utils.cpp
@@ -16,6 +16,7 @@ struct TTotalStatistics {
ui64 TotalOutputRows = 0;
ui64 TotalOutputBytes = 0;
ui64 TotalIngressBytes = 0;
+ ui64 TotalEgressBytes = 0;
TAggregates Aggregates;
};
@@ -92,6 +93,10 @@ void WriteNamedNode(NYson::TYsonWriter& writer, NJson::TJsonValue& node, const T
if (auto* ingressNode = item.GetValueByPath("Ingress.Bytes.Sum")) {
totals.TotalIngressBytes += ingressNode->GetIntegerSafe();
}
+ } else if (name == "Egress") {
+ if (auto* egressNode = item.GetValueByPath("Egress.Bytes.Sum")) {
+ totals.TotalEgressBytes += egressNode->GetIntegerSafe();
+ }
}
}
break;
@@ -312,6 +317,13 @@ TString GetV1StatFromV2Plan(const TString& plan) {
writer.OnInt64Scalar(totals.TotalIngressBytes);
writer.OnEndMap();
}
+ if (totals.TotalEgressBytes) {
+ writer.OnKeyedItem("TotalEgressBytes");
+ writer.OnBeginMap();
+ writer.OnKeyedItem("sum");
+ writer.OnInt64Scalar(totals.TotalEgressBytes);
+ writer.OnEndMap();
+ }
writer.OnEndMap();
}
}
diff --git a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
index e782aff1cec..35b63968e87 100644
--- a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
@@ -131,24 +131,22 @@ std::vector<TString> GetMeteringRecords(const TString& statistics, bool billable
//
if (graph.first.StartsWith("Graph=") || graph.first.StartsWith("Precompute=")) {
// YQv1 raw
- if (auto* ingressNode = graph.second.GetValueByPath("TaskRunner.Stage=Total.IngressS3SourceBytes.sum")) {
- ingress += ingressNode->GetIntegerSafe();
+ if (auto* stageTotalNode = graph.second.GetValueByPath("TaskRunner.Stage=Total")) {
+ if (stageTotalNode->GetType() == NJson::JSON_MAP) {
+ for (const auto& metric : stageTotalNode->GetMapSafe()) {
+ // Ingress.....Bytes i.e. IngressS3SourceBytes
+ if (metric.first.StartsWith("Ingress") && metric.first.EndsWith("Bytes")) {
+ if (auto* sumNode = metric.second.GetValueByPath("sum")) {
+ ingress += sumNode->GetIntegerSafe();
+ }
+ }
+ }
+ }
}
} else if (graph.first.StartsWith("ResultSet") || graph.first.StartsWith("Sink") || graph.first.StartsWith("Precompute_")) {
// YQv2
- if (auto* ingressNode = graph.second.GetValueByPath("IngressObjectStorageBytes.sum")) {
- // prettyfied
+ if (auto* ingressNode = graph.second.GetValueByPath("TotalIngressBytes.sum")) {
ingress += ingressNode->GetIntegerSafe();
- } else if (graph.second.GetType() == NJson::JSON_MAP) {
- for (const auto& stage : graph.second.GetMapSafe()) {
- if (auto* ingressNode = stage.second.GetValueByPath("IngressBytes=S3Source.sum")) {
- // raw old
- ingress += ingressNode->GetIntegerSafe();
- } else if (auto* ingressNode = stage.second.GetValueByPath("IngressBytes=S3Source.Ingress.Bytes.sum")) {
- // raw new
- ingress += ingressNode->GetIntegerSafe();
- }
- }
}
}
}
@@ -312,6 +310,8 @@ TString GetPrettyStatistics(const TString& statistics) {
AggregateNode(writer, p.second, "TotalCpuTimeUs", "CpuTimeUs");
AggregateNode(writer, p.second, "Ingress=S3Source.Ingress.Bytes", "IngressObjectStorageBytes");
AggregateNode(writer, p.second, "Egress=S3Sink.Egress.Bytes", "EgressObjectStorageBytes");
+ RemapNode(writer, p.second, "TotalIngressBytes", "TotalIngressBytes");
+ RemapNode(writer, p.second, "TotalEgressBytes", "TotalEgressBytes");
writer.OnEndMap();
}
}
diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
index 713e8e91bc5..c15a69eca47 100644
--- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
+++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
@@ -278,6 +278,9 @@ namespace NYql::NDq {
// Preserve stream message to return it to ComputeActor later
LastReadSplitsResponse_ = std::move(ev->Get()->Response);
+ IngressStats_.Bytes += LastReadSplitsResponse_->ByteSizeLong();
+ IngressStats_.Chunks++;
+ IngressStats_.Resume();
NotifyComputeActorWithData();
YQL_CLOG(TRACE, ProviderGeneric) << "Handle :: EvReadSplitsPart :: event handling finished";
@@ -379,6 +382,7 @@ namespace NYql::NDq {
YQL_CLOG(INFO, ProviderGeneric) << "GetAsyncInputData :: data reading finished";
}
+ IngressStats_.TryPause();
return 0;
}
diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp
index 6299e1c277a..b3570987744 100644
--- a/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp
+++ b/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp
@@ -9,12 +9,16 @@ namespace NYql::NDq {
void RegisterGenericReadActorFactory(TDqAsyncIoFactory& factory,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NYql::NConnector::IClient::TPtr genericClient) {
- factory.RegisterSource<Generic::TSource>("GenericSource", [credentialsFactory, genericClient](
- Generic::TSource&& settings,
- IDqAsyncIoFactory::TSourceArguments&& args) {
+ auto genericFactory = [credentialsFactory, genericClient](
+ Generic::TSource&& settings,
+ IDqAsyncIoFactory::TSourceArguments&& args) {
return CreateGenericReadActor(genericClient, std::move(settings), args.InputIndex, args.StatsLevel,
args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, args.HolderFactory);
- });
+ };
+
+ for (auto& sourceName : {"ClickHouseGeneric", "PostgreSqlGeneric"}) {
+ factory.RegisterSource<Generic::TSource>(sourceName, genericFactory);
+ }
}
}
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
index 39adc3d0e80..7b66033ca78 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
@@ -154,7 +154,17 @@ namespace NYql {
// preserve source description for read actor
protoSettings.PackFrom(srcDesc);
- sourceType = "GenericSource";
+ switch (srcDesc.data_source_instance().kind()) {
+ case NYql::NConnector::NApi::CLICKHOUSE:
+ sourceType = "ClickHouseGeneric";
+ break;
+ case NYql::NConnector::NApi::POSTGRESQL:
+ sourceType = "PostgreSqlGeneric";
+ break;
+ default:
+ ythrow yexception() << "Data source kind is unknown or not specified";
+ break;
+ }
}
}