aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authord-mokhnatkin <d-mokhnatkin@yandex-team.ru>2022-02-23 01:40:43 +0300
committerd-mokhnatkin <d-mokhnatkin@yandex-team.ru>2022-02-23 01:40:43 +0300
commit55ab50dbd92b8acfb46e9a1f3c9c4b35bd0a8fa5 (patch)
treea8bc2109a19f11776af0e86749b275661c028771
parent00db9ed6d258817b6909f570db747420081ec88c (diff)
downloadydb-55ab50dbd92b8acfb46e9a1f3c9c4b35bd0a8fa5.tar.gz
YQ-877: some monitoring sink improvements
ref:50137e03a4fdde7174def09307e6657f751753b6
-rw-r--r--ydb/library/yql/providers/solomon/provider/yql_solomon_datasink_type_ann.cpp54
-rw-r--r--ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp16
-rw-r--r--ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h3
3 files changed, 57 insertions, 16 deletions
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink_type_ann.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink_type_ann.cpp
index dc9ea45a8f6..f36e34ed4a7 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink_type_ann.cpp
@@ -1,6 +1,8 @@
#include "yql_solomon_provider_impl.h"
+#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
#include <ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.h>
+#include <ydb/library/yql/providers/solomon/proto/dq_solomon_shard.pb.h>
namespace NYql {
@@ -33,6 +35,53 @@ private:
if (!EnsureAtom(write.Shard().Ref(), ctx)) {
return TStatus::Error;
}
+
+ if (!State_->IsRtmrMode()) {
+ const TTypeAnnotationNode* inputItemType;
+ if (!EnsureNewSeqType<true, true, true>(write.Input().Pos(), *write.Input().Ref().GetTypeAnn(), ctx, &inputItemType)) {
+ return TStatus::Error;
+ }
+
+ if (!EnsureStructType(write.Input().Pos(), *inputItemType, ctx)) {
+ return TStatus::Error;
+ }
+
+ auto structType = inputItemType->Cast<TStructExprType>();
+
+ bool hasTimestampMember = false;
+ ui32 labelMembers = 0;
+ ui32 sensorMembers = 0;
+
+ for (auto* structItem : structType->GetItems()) {
+ const auto itemName = structItem->GetName();
+ const auto* itemType = structItem->GetItemType();
+ YQL_ENSURE(
+ itemType->GetKind() != ETypeAnnotationKind::Optional,
+ "Optional types are not supported in monitoring sink. FieldName: " << itemName);
+
+ const auto dataType = NUdf::GetDataTypeInfo(itemType->Cast<TDataExprType>()->GetSlot());
+
+ if (dataType.Features & NUdf::DateType || dataType.Features & NUdf::TzDateType) {
+ YQL_ENSURE(!hasTimestampMember, "Multiple timestamps were provided for monitoing sink");
+ hasTimestampMember = true;
+ } else if (dataType.Features & NUdf::StringType) {
+ labelMembers++;
+ } else if (dataType.Features & NUdf::NumericType) {
+ sensorMembers++;
+ } else {
+ YQL_ENSURE(false, "Ivalid data type for monitoing sink: " << dataType.Name);
+ }
+ }
+
+ YQL_ENSURE(hasTimestampMember, "Timestamp wasn't provided for monitoing sink");
+ YQL_ENSURE(sensorMembers != 0, "No sensors were provided for monitoing sink");
+
+ YQL_ENSURE(labelMembers <= SolomonMaxLabelsCount,
+ "Max labels count is " << SolomonMaxLabelsCount << " but " << labelMembers << " were provided");
+ YQL_ENSURE(sensorMembers <= SolomonMaxSensorsCount,
+ "Max sensors count is " << SolomonMaxSensorsCount << " but " << sensorMembers << " were provided");
+ }
+
input.Ptr()->SetTypeAnn(write.World().Ref().GetTypeAnn());
return TStatus::Ok;
}
@@ -66,6 +115,11 @@ private:
return TStatus::Error;
}
+ auto clusterType = shard.SolomonCluster().StringValue();
+ if (State_->Configuration->ClusterConfigs.at(clusterType).GetClusterType() == TSolomonClusterConfig::SCT_MONITORING) {
+ YQL_ENSURE(shard.Service().StringValue() == "custom", "Monitoring allows writing only to 'custom' service");
+ }
+
input.Ptr()->SetTypeAnn(ctx.MakeType<TUnitExprType>());
return TStatus::Ok;
}
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
index 773c9609914..683a206f128 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
@@ -18,9 +18,6 @@ using namespace NNodes;
namespace {
-constexpr i32 MaxLabelsCount = 16;
-constexpr i32 MaxSensorsCount = 50;
-
NSo::NProto::ESolomonClusterType MapClusterType(TSolomonClusterConfig::ESolomonClusterType clusterType) {
switch (clusterType) {
case TSolomonClusterConfig::SCT_SOLOMON:
@@ -50,9 +47,6 @@ void FillScheme(const TTypeAnnotationNode& itemType, NSo::NProto::TDqSolomonShar
for (const TItemExprType* structItem : itemType.Cast<TStructExprType>()->GetItems()) {
const auto itemName = structItem->GetName();
const auto* itemType = structItem->GetItemType();
- YQL_ENSURE(
- itemType->GetKind() != ETypeAnnotationKind::Optional,
- "Optional types are not supported in monitoring sink. FieldName: " << itemName);
const auto dataType = NUdf::GetDataTypeInfo(itemType->Cast<TDataExprType>()->GetSlot());
@@ -62,7 +56,6 @@ void FillScheme(const TTypeAnnotationNode& itemType, NSo::NProto::TDqSolomonShar
schemeItem.SetDataTypeId(dataType.TypeId);
if (dataType.Features & NUdf::DateType || dataType.Features & NUdf::TzDateType) {
- YQL_ENSURE(!scheme.HasTimestamp(), "Multiple timestamps were provided for monitoing sink");
*scheme.MutableTimestamp() = schemeItem;
} else if (dataType.Features & NUdf::NumericType) {
scheme.MutableSensors()->Add(std::move(schemeItem));
@@ -72,15 +65,6 @@ void FillScheme(const TTypeAnnotationNode& itemType, NSo::NProto::TDqSolomonShar
YQL_ENSURE(false, "Ivalid data type for monitoing sink: " << dataType.Name);
}
}
-
- YQL_ENSURE(scheme.HasTimestamp(), "Timestamp wasn't provided for monitoing sink");
- YQL_ENSURE(!scheme.GetSensors().empty(), "No sensors were provided for monitoing sink");
- YQL_ENSURE(!scheme.GetLabels().empty(), "No labels were provided for monitoing sink");
-
- YQL_ENSURE(scheme.GetLabels().size() <= MaxLabelsCount,
- "Max labels count is " << MaxLabelsCount << " but " << scheme.GetLabels().size() << " were provided");
- YQL_ENSURE(scheme.GetSensors().size() <= MaxSensorsCount,
- "Max sensors count is " << MaxSensorsCount << " but " << scheme.GetSensors().size() << " were provided");
}
class TSolomonDqIntegration: public TDqIntegrationBase {
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h b/ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h
index 7f4a88a1315..b9182a0305d 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h
@@ -7,6 +7,9 @@
namespace NYql {
+constexpr i32 SolomonMaxLabelsCount = 16;
+constexpr i32 SolomonMaxSensorsCount = 50;
+
struct TSolomonState : public TThrRefBase
{
using TPtr = TIntrusivePtr<TSolomonState>;