diff options
author | uzhas <uzhas@ydb.tech> | 2023-12-13 19:01:18 +0300 |
---|---|---|
committer | uzhas <uzhas@ydb.tech> | 2023-12-13 22:41:50 +0300 |
commit | 0bfea175d9013a083f56cfbbfc39c230300baf73 (patch) | |
tree | ec515fd9ba982ab1efcb4cf65806982229463b8b | |
parent | 33b1f1a3020fa35f93c7b9d0a4bf2a61a47f45cc (diff) | |
download | ydb-0bfea175d9013a083f56cfbbfc39c230300baf73.tar.gz |
,allow optional timestamp in insert into monitoring
4 files changed, 48 insertions, 14 deletions
diff --git a/ydb/core/fq/libs/actors/clusters_from_connections.cpp b/ydb/core/fq/libs/actors/clusters_from_connections.cpp index a155f7a42c..fa505591c9 100644 --- a/ydb/core/fq/libs/actors/clusters_from_connections.cpp +++ b/ydb/core/fq/libs/actors/clusters_from_connections.cpp @@ -4,6 +4,7 @@ #include <ydb/library/yql/providers/generic/connector/api/common/data_source.pb.h> #include <ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.h> #include <ydb/library/yql/utils/url_builder.h> +#include <ydb/library/actors/http/http.h> #include <util/generic/hash.h> #include <util/string/builder.h> @@ -73,6 +74,17 @@ void FillS3ClusterConfig(NYql::TS3ClusterConfig& clusterConfig, FillClusterAuth(clusterConfig, s3.auth(), authToken, accountIdSignatures); } +std::pair<TString, bool> ParseHttpEndpoint(const TString& endpoint) { + TStringBuf scheme; + TStringBuf host; + TStringBuf uri; + NHttp::CrackURL(endpoint, scheme, host, uri); + + // by default useSsl is true + // explicit "http://" scheme should disable ssl usage + return std::make_pair(ToString(host), scheme != "http"); +} + void FillSolomonClusterConfig(NYql::TSolomonClusterConfig& clusterConfig, const TString& name, const TString& authToken, @@ -81,11 +93,13 @@ void FillSolomonClusterConfig(NYql::TSolomonClusterConfig& clusterConfig, const FederatedQuery::Monitoring& monitoring) { clusterConfig.SetName(name); - clusterConfig.SetCluster(endpoint); + auto [address, useSsl] = ParseHttpEndpoint(endpoint); + + clusterConfig.SetCluster(address); clusterConfig.SetClusterType(TSolomonClusterConfig::SCT_MONITORING); clusterConfig.MutablePath()->SetProject(monitoring.project()); clusterConfig.MutablePath()->SetCluster(monitoring.cluster()); - clusterConfig.SetUseSsl(true); + clusterConfig.SetUseSsl(useSsl); FillClusterAuth(clusterConfig, monitoring.auth(), authToken, accountIdSignatures); } diff --git a/ydb/library/yql/providers/solomon/async_io/metrics_encoder.cpp b/ydb/library/yql/providers/solomon/async_io/metrics_encoder.cpp index 0f1dd8f71d..bee50bc4fb 100644 --- a/ydb/library/yql/providers/solomon/async_io/metrics_encoder.cpp +++ b/ydb/library/yql/providers/solomon/async_io/metrics_encoder.cpp @@ -32,11 +32,15 @@ void BeginMetric( } } -TInstant ParseTimestamp( +TMaybe<TInstant> ParseTimestamp( const NUdf::TUnboxedValue& unboxedValue, const NYql::NSo::NProto::TDqSolomonSchemeItem& scheme) { - const auto& timestampValue = unboxedValue.GetElement(scheme.GetIndex()); + const NUdf::TUnboxedValue timestampValue = unboxedValue.GetElement(scheme.GetIndex()); + if (!timestampValue) { + return {}; + } + switch (scheme.GetDataTypeId()) { case NUdf::TDataType<NUdf::TDate>::Id: case NUdf::TDataType<NUdf::TTzDate>::Id: @@ -115,7 +119,11 @@ void TMetricsEncoder::BeginNew() { ui64 TMetricsEncoder::Append(const NUdf::TUnboxedValue& value) { - TInstant timestamp = ParseTimestamp(value, Scheme.GetTimestamp()); + TMaybe<TInstant> timestamp = ParseTimestamp(value, Scheme.GetTimestamp()); + + if (!timestamp) { + return Scheme.GetSensors().size(); + } for (const auto& sensor : Scheme.GetSensors()) { BeginMetric(SolomonEncoder, sensor); @@ -129,7 +137,7 @@ ui64 TMetricsEncoder::Append(const NUdf::TUnboxedValue& value) SolomonEncoder->OnLabelsEnd(); const auto& sensorValue = value.GetElement(sensor.GetIndex()); - EncodeSensorValue(SolomonEncoder, timestamp, sensorValue, sensor); + EncodeSensorValue(SolomonEncoder, *timestamp, sensorValue, sensor); SolomonEncoder->OnMetricEnd(); } diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink_type_ann.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink_type_ann.cpp index f34e27e757..28e55a4c67 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink_type_ann.cpp @@ -54,13 +54,14 @@ private: for (auto* structItem : structType->GetItems()) { const auto itemName = structItem->GetName(); - const auto* itemType = structItem->GetItemType(); - if (itemType->GetKind() == ETypeAnnotationKind::Optional) { - ctx.AddError(TIssue(ctx.GetPosition(write.Input().Pos()), TStringBuilder() << "Optional types are not supported in writing into Monitoring. FieldName: " << itemName)); + const TDataExprType* itemType = nullptr; + + bool isOptional = false; + if (!IsDataOrOptionalOfData(structItem->GetItemType(), isOptional, itemType)) { return TStatus::Error; } - const auto dataType = NUdf::GetDataTypeInfo(itemType->Cast<TDataExprType>()->GetSlot()); + const auto dataType = NUdf::GetDataTypeInfo(itemType->GetSlot()); if (dataType.Features & NUdf::DateType || dataType.Features & NUdf::TzDateType) { if (hasTimestampMember) { @@ -68,7 +69,15 @@ private: return TStatus::Error; } hasTimestampMember = true; - } else if (dataType.Features & NUdf::StringType) { + continue; + } + + if (isOptional) { + ctx.AddError(TIssue(ctx.GetPosition(write.Input().Pos()), TStringBuilder() << "Optional types for labels and metric values are not supported in writing into Monitoring. FieldName: " << itemName)); + return TStatus::Error; + } + + if (dataType.Features & NUdf::StringType) { labelMembers++; } else if (dataType.Features & NUdf::NumericType) { sensorMembers++; diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp index 04c03156e6..0383ee1466 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp @@ -46,9 +46,12 @@ void FillScheme(const TTypeAnnotationNode& itemType, NSo::NProto::TDqSolomonShar int index = 0; for (const TItemExprType* structItem : itemType.Cast<TStructExprType>()->GetItems()) { const auto itemName = structItem->GetName(); - const auto* itemType = structItem->GetItemType(); + const TDataExprType* itemType = nullptr; - const auto dataType = NUdf::GetDataTypeInfo(itemType->Cast<TDataExprType>()->GetSlot()); + bool isOptionalUnused = false; + YQL_ENSURE(IsDataOrOptionalOfData(structItem->GetItemType(), isOptionalUnused, itemType), "Failed to unwrap optional type"); + + const auto dataType = NUdf::GetDataTypeInfo(itemType->GetSlot()); NSo::NProto::TDqSolomonSchemeItem schemeItem; schemeItem.SetKey(TString(itemName)); @@ -62,7 +65,7 @@ void FillScheme(const TTypeAnnotationNode& itemType, NSo::NProto::TDqSolomonShar } else if (dataType.Features & NUdf::StringType) { scheme.MutableLabels()->Add(std::move(schemeItem)); } else { - YQL_ENSURE(false, "Ivalid data type for monitoing sink: " << dataType.Name); + YQL_ENSURE(false, "Invalid data type for monitoring sink: " << dataType.Name); } } } |