diff options
author | hor911 <hor911@ydb.tech> | 2023-10-24 21:26:00 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-10-24 21:41:48 +0300 |
commit | 32537eb24d6375aa4032ba0e6ca4c3b6c64ac439 (patch) | |
tree | 43854ff0bfb0527a301bdf2da85e2bbc34990b5e | |
parent | 621ec2027f2f3c9ce8813fd72a39e29a89fc9bcb (diff) | |
download | ydb-32537eb24d6375aa4032ba0e6ca4c3b6c64ac439.tar.gz |
Generic Connector Traffic Split and Stats
5 files changed, 49 insertions, 19 deletions
diff --git a/ydb/core/fq/libs/compute/common/utils.cpp b/ydb/core/fq/libs/compute/common/utils.cpp index c8514927ded..f5864b5bf5a 100644 --- a/ydb/core/fq/libs/compute/common/utils.cpp +++ b/ydb/core/fq/libs/compute/common/utils.cpp @@ -16,6 +16,7 @@ struct TTotalStatistics { ui64 TotalOutputRows = 0; ui64 TotalOutputBytes = 0; ui64 TotalIngressBytes = 0; + ui64 TotalEgressBytes = 0; TAggregates Aggregates; }; @@ -92,6 +93,10 @@ void WriteNamedNode(NYson::TYsonWriter& writer, NJson::TJsonValue& node, const T if (auto* ingressNode = item.GetValueByPath("Ingress.Bytes.Sum")) { totals.TotalIngressBytes += ingressNode->GetIntegerSafe(); } + } else if (name == "Egress") { + if (auto* egressNode = item.GetValueByPath("Egress.Bytes.Sum")) { + totals.TotalEgressBytes += egressNode->GetIntegerSafe(); + } } } break; @@ -312,6 +317,13 @@ TString GetV1StatFromV2Plan(const TString& plan) { writer.OnInt64Scalar(totals.TotalIngressBytes); writer.OnEndMap(); } + if (totals.TotalEgressBytes) { + writer.OnKeyedItem("TotalEgressBytes"); + writer.OnBeginMap(); + writer.OnKeyedItem("sum"); + writer.OnInt64Scalar(totals.TotalEgressBytes); + writer.OnEndMap(); + } writer.OnEndMap(); } } diff --git a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp index e782aff1cec..35b63968e87 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp @@ -131,24 +131,22 @@ std::vector<TString> GetMeteringRecords(const TString& statistics, bool billable // if (graph.first.StartsWith("Graph=") || graph.first.StartsWith("Precompute=")) { // YQv1 raw - if (auto* ingressNode = graph.second.GetValueByPath("TaskRunner.Stage=Total.IngressS3SourceBytes.sum")) { - ingress += ingressNode->GetIntegerSafe(); + if (auto* stageTotalNode = graph.second.GetValueByPath("TaskRunner.Stage=Total")) { + if (stageTotalNode->GetType() == NJson::JSON_MAP) { + for (const auto& metric : stageTotalNode->GetMapSafe()) { + // Ingress.....Bytes i.e. IngressS3SourceBytes + if (metric.first.StartsWith("Ingress") && metric.first.EndsWith("Bytes")) { + if (auto* sumNode = metric.second.GetValueByPath("sum")) { + ingress += sumNode->GetIntegerSafe(); + } + } + } + } } } else if (graph.first.StartsWith("ResultSet") || graph.first.StartsWith("Sink") || graph.first.StartsWith("Precompute_")) { // YQv2 - if (auto* ingressNode = graph.second.GetValueByPath("IngressObjectStorageBytes.sum")) { - // prettyfied + if (auto* ingressNode = graph.second.GetValueByPath("TotalIngressBytes.sum")) { ingress += ingressNode->GetIntegerSafe(); - } else if (graph.second.GetType() == NJson::JSON_MAP) { - for (const auto& stage : graph.second.GetMapSafe()) { - if (auto* ingressNode = stage.second.GetValueByPath("IngressBytes=S3Source.sum")) { - // raw old - ingress += ingressNode->GetIntegerSafe(); - } else if (auto* ingressNode = stage.second.GetValueByPath("IngressBytes=S3Source.Ingress.Bytes.sum")) { - // raw new - ingress += ingressNode->GetIntegerSafe(); - } - } } } } @@ -312,6 +310,8 @@ TString GetPrettyStatistics(const TString& statistics) { AggregateNode(writer, p.second, "TotalCpuTimeUs", "CpuTimeUs"); AggregateNode(writer, p.second, "Ingress=S3Source.Ingress.Bytes", "IngressObjectStorageBytes"); AggregateNode(writer, p.second, "Egress=S3Sink.Egress.Bytes", "EgressObjectStorageBytes"); + RemapNode(writer, p.second, "TotalIngressBytes", "TotalIngressBytes"); + RemapNode(writer, p.second, "TotalEgressBytes", "TotalEgressBytes"); writer.OnEndMap(); } } diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp index 713e8e91bc5..c15a69eca47 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp @@ -278,6 +278,9 @@ namespace NYql::NDq { // Preserve stream message to return it to ComputeActor later LastReadSplitsResponse_ = std::move(ev->Get()->Response); + IngressStats_.Bytes += LastReadSplitsResponse_->ByteSizeLong(); + IngressStats_.Chunks++; + IngressStats_.Resume(); NotifyComputeActorWithData(); YQL_CLOG(TRACE, ProviderGeneric) << "Handle :: EvReadSplitsPart :: event handling finished"; @@ -379,6 +382,7 @@ namespace NYql::NDq { YQL_CLOG(INFO, ProviderGeneric) << "GetAsyncInputData :: data reading finished"; } + IngressStats_.TryPause(); return 0; } diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp index 6299e1c277a..b3570987744 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp @@ -9,12 +9,16 @@ namespace NYql::NDq { void RegisterGenericReadActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NYql::NConnector::IClient::TPtr genericClient) { - factory.RegisterSource<Generic::TSource>("GenericSource", [credentialsFactory, genericClient]( - Generic::TSource&& settings, - IDqAsyncIoFactory::TSourceArguments&& args) { + auto genericFactory = [credentialsFactory, genericClient]( + Generic::TSource&& settings, + IDqAsyncIoFactory::TSourceArguments&& args) { return CreateGenericReadActor(genericClient, std::move(settings), args.InputIndex, args.StatsLevel, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, args.HolderFactory); - }); + }; + + for (auto& sourceName : {"ClickHouseGeneric", "PostgreSqlGeneric"}) { + factory.RegisterSource<Generic::TSource>(sourceName, genericFactory); + } } } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index 39adc3d0e80..7b66033ca78 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -154,7 +154,17 @@ namespace NYql { // preserve source description for read actor protoSettings.PackFrom(srcDesc); - sourceType = "GenericSource"; + switch (srcDesc.data_source_instance().kind()) { + case NYql::NConnector::NApi::CLICKHOUSE: + sourceType = "ClickHouseGeneric"; + break; + case NYql::NConnector::NApi::POSTGRESQL: + sourceType = "PostgreSqlGeneric"; + break; + default: + ythrow yexception() << "Data source kind is unknown or not specified"; + break; + } } } |