aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoruzhas <uzhas@ydb.tech>2023-12-13 19:01:18 +0300
committeruzhas <uzhas@ydb.tech>2023-12-13 22:41:50 +0300
commit0bfea175d9013a083f56cfbbfc39c230300baf73 (patch)
treeec515fd9ba982ab1efcb4cf65806982229463b8b
parent33b1f1a3020fa35f93c7b9d0a4bf2a61a47f45cc (diff)
downloadydb-0bfea175d9013a083f56cfbbfc39c230300baf73.tar.gz
,allow optional timestamp in insert into monitoring
-rw-r--r--ydb/core/fq/libs/actors/clusters_from_connections.cpp18
-rw-r--r--ydb/library/yql/providers/solomon/async_io/metrics_encoder.cpp16
-rw-r--r--ydb/library/yql/providers/solomon/provider/yql_solomon_datasink_type_ann.cpp19
-rw-r--r--ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp9
4 files changed, 48 insertions, 14 deletions
diff --git a/ydb/core/fq/libs/actors/clusters_from_connections.cpp b/ydb/core/fq/libs/actors/clusters_from_connections.cpp
index a155f7a42c..fa505591c9 100644
--- a/ydb/core/fq/libs/actors/clusters_from_connections.cpp
+++ b/ydb/core/fq/libs/actors/clusters_from_connections.cpp
@@ -4,6 +4,7 @@
#include <ydb/library/yql/providers/generic/connector/api/common/data_source.pb.h>
#include <ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.h>
#include <ydb/library/yql/utils/url_builder.h>
+#include <ydb/library/actors/http/http.h>
#include <util/generic/hash.h>
#include <util/string/builder.h>
@@ -73,6 +74,17 @@ void FillS3ClusterConfig(NYql::TS3ClusterConfig& clusterConfig,
FillClusterAuth(clusterConfig, s3.auth(), authToken, accountIdSignatures);
}
+std::pair<TString, bool> ParseHttpEndpoint(const TString& endpoint) {
+ TStringBuf scheme;
+ TStringBuf host;
+ TStringBuf uri;
+ NHttp::CrackURL(endpoint, scheme, host, uri);
+
+ // by default useSsl is true
+ // explicit "http://" scheme should disable ssl usage
+ return std::make_pair(ToString(host), scheme != "http");
+}
+
void FillSolomonClusterConfig(NYql::TSolomonClusterConfig& clusterConfig,
const TString& name,
const TString& authToken,
@@ -81,11 +93,13 @@ void FillSolomonClusterConfig(NYql::TSolomonClusterConfig& clusterConfig,
const FederatedQuery::Monitoring& monitoring) {
clusterConfig.SetName(name);
- clusterConfig.SetCluster(endpoint);
+ auto [address, useSsl] = ParseHttpEndpoint(endpoint);
+
+ clusterConfig.SetCluster(address);
clusterConfig.SetClusterType(TSolomonClusterConfig::SCT_MONITORING);
clusterConfig.MutablePath()->SetProject(monitoring.project());
clusterConfig.MutablePath()->SetCluster(monitoring.cluster());
- clusterConfig.SetUseSsl(true);
+ clusterConfig.SetUseSsl(useSsl);
FillClusterAuth(clusterConfig, monitoring.auth(), authToken, accountIdSignatures);
}
diff --git a/ydb/library/yql/providers/solomon/async_io/metrics_encoder.cpp b/ydb/library/yql/providers/solomon/async_io/metrics_encoder.cpp
index 0f1dd8f71d..bee50bc4fb 100644
--- a/ydb/library/yql/providers/solomon/async_io/metrics_encoder.cpp
+++ b/ydb/library/yql/providers/solomon/async_io/metrics_encoder.cpp
@@ -32,11 +32,15 @@ void BeginMetric(
}
}
-TInstant ParseTimestamp(
+TMaybe<TInstant> ParseTimestamp(
const NUdf::TUnboxedValue& unboxedValue,
const NYql::NSo::NProto::TDqSolomonSchemeItem& scheme)
{
- const auto& timestampValue = unboxedValue.GetElement(scheme.GetIndex());
+ const NUdf::TUnboxedValue timestampValue = unboxedValue.GetElement(scheme.GetIndex());
+ if (!timestampValue) {
+ return {};
+ }
+
switch (scheme.GetDataTypeId()) {
case NUdf::TDataType<NUdf::TDate>::Id:
case NUdf::TDataType<NUdf::TTzDate>::Id:
@@ -115,7 +119,11 @@ void TMetricsEncoder::BeginNew() {
ui64 TMetricsEncoder::Append(const NUdf::TUnboxedValue& value)
{
- TInstant timestamp = ParseTimestamp(value, Scheme.GetTimestamp());
+ TMaybe<TInstant> timestamp = ParseTimestamp(value, Scheme.GetTimestamp());
+
+ if (!timestamp) {
+ return Scheme.GetSensors().size();
+ }
for (const auto& sensor : Scheme.GetSensors()) {
BeginMetric(SolomonEncoder, sensor);
@@ -129,7 +137,7 @@ ui64 TMetricsEncoder::Append(const NUdf::TUnboxedValue& value)
SolomonEncoder->OnLabelsEnd();
const auto& sensorValue = value.GetElement(sensor.GetIndex());
- EncodeSensorValue(SolomonEncoder, timestamp, sensorValue, sensor);
+ EncodeSensorValue(SolomonEncoder, *timestamp, sensorValue, sensor);
SolomonEncoder->OnMetricEnd();
}
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink_type_ann.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink_type_ann.cpp
index f34e27e757..28e55a4c67 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink_type_ann.cpp
@@ -54,13 +54,14 @@ private:
for (auto* structItem : structType->GetItems()) {
const auto itemName = structItem->GetName();
- const auto* itemType = structItem->GetItemType();
- if (itemType->GetKind() == ETypeAnnotationKind::Optional) {
- ctx.AddError(TIssue(ctx.GetPosition(write.Input().Pos()), TStringBuilder() << "Optional types are not supported in writing into Monitoring. FieldName: " << itemName));
+ const TDataExprType* itemType = nullptr;
+
+ bool isOptional = false;
+ if (!IsDataOrOptionalOfData(structItem->GetItemType(), isOptional, itemType)) {
return TStatus::Error;
}
- const auto dataType = NUdf::GetDataTypeInfo(itemType->Cast<TDataExprType>()->GetSlot());
+ const auto dataType = NUdf::GetDataTypeInfo(itemType->GetSlot());
if (dataType.Features & NUdf::DateType || dataType.Features & NUdf::TzDateType) {
if (hasTimestampMember) {
@@ -68,7 +69,15 @@ private:
return TStatus::Error;
}
hasTimestampMember = true;
- } else if (dataType.Features & NUdf::StringType) {
+ continue;
+ }
+
+ if (isOptional) {
+ ctx.AddError(TIssue(ctx.GetPosition(write.Input().Pos()), TStringBuilder() << "Optional types for labels and metric values are not supported in writing into Monitoring. FieldName: " << itemName));
+ return TStatus::Error;
+ }
+
+ if (dataType.Features & NUdf::StringType) {
labelMembers++;
} else if (dataType.Features & NUdf::NumericType) {
sensorMembers++;
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
index 04c03156e6..0383ee1466 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
@@ -46,9 +46,12 @@ void FillScheme(const TTypeAnnotationNode& itemType, NSo::NProto::TDqSolomonShar
int index = 0;
for (const TItemExprType* structItem : itemType.Cast<TStructExprType>()->GetItems()) {
const auto itemName = structItem->GetName();
- const auto* itemType = structItem->GetItemType();
+ const TDataExprType* itemType = nullptr;
- const auto dataType = NUdf::GetDataTypeInfo(itemType->Cast<TDataExprType>()->GetSlot());
+ bool isOptionalUnused = false;
+ YQL_ENSURE(IsDataOrOptionalOfData(structItem->GetItemType(), isOptionalUnused, itemType), "Failed to unwrap optional type");
+
+ const auto dataType = NUdf::GetDataTypeInfo(itemType->GetSlot());
NSo::NProto::TDqSolomonSchemeItem schemeItem;
schemeItem.SetKey(TString(itemName));
@@ -62,7 +65,7 @@ void FillScheme(const TTypeAnnotationNode& itemType, NSo::NProto::TDqSolomonShar
} else if (dataType.Features & NUdf::StringType) {
scheme.MutableLabels()->Add(std::move(schemeItem));
} else {
- YQL_ENSURE(false, "Ivalid data type for monitoing sink: " << dataType.Name);
+ YQL_ENSURE(false, "Invalid data type for monitoring sink: " << dataType.Name);
}
}
}