aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-11-14 22:34:42 +0300
committerhcpp <hcpp@ydb.tech>2023-11-14 22:53:03 +0300
commit37e4d5ec858a3a5ea9994e9213cd742f565142ae (patch)
tree6a4f092a061a3f4bf560b3b7fb587d01f560643f
parent8ef1d62f9e385f0b29a23d93bfd8b3d05555c370 (diff)
downloadydb-37e4d5ec858a3a5ea9994e9213cd742f565142ae.tar.gz
new metrics have been added to connector
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/grpc_metrics.go48
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/service_connector.go13
2 files changed, 52 insertions, 9 deletions
diff --git a/ydb/library/yql/providers/generic/connector/app/server/grpc_metrics.go b/ydb/library/yql/providers/generic/connector/app/server/grpc_metrics.go
index 6e1ff6badaf..de71f7167cc 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/grpc_metrics.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/grpc_metrics.go
@@ -8,6 +8,7 @@ import (
"github.com/ydb-platform/ydb/library/go/core/metrics/solomon"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
)
func UnaryServerMetrics(registry metrics.Registry) grpc.UnaryServerInterceptor {
@@ -15,12 +16,14 @@ func UnaryServerMetrics(registry metrics.Registry) grpc.UnaryServerInterceptor {
requestDuration := registry.DurationHistogramVec("request_duration_seconds", metrics.MakeExponentialDurationBuckets(250*time.Microsecond, 1.5, 35), []string{"protocol", "endpoint"})
panicsCount := registry.CounterVec("panics_total", []string{"protocol", "endpoint"})
inflightRequests := registry.GaugeVec("inflight_requests", []string{"protocol", "endpoint"})
- responseCountCount := registry.CounterVec("status_total", []string{"protocol", "endpoint", "status"})
+ statusCount := registry.CounterVec("status_total", []string{"protocol", "endpoint", "status"})
+ requestBytes := registry.CounterVec("request_bytes", []string{"protocol", "endpoint"})
+ responseBytes := registry.CounterVec("response_bytes", []string{"protocol", "endpoint"})
solomon.Rated(requestCount)
solomon.Rated(requestDuration)
solomon.Rated(panicsCount)
- solomon.Rated(responseCountCount)
+ solomon.Rated(statusCount)
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ any, err error) {
deferFunc := func(startTime time.Time, opName string) {
@@ -45,6 +48,11 @@ func UnaryServerMetrics(registry metrics.Registry) grpc.UnaryServerInterceptor {
opName := info.FullMethod
+ requestBytes.With(map[string]string{
+ "protocol": "grpc",
+ "endpoint": opName,
+ }).Add(int64(proto.Size(req.(proto.Message))))
+
requestCount.With(map[string]string{
"protocol": "grpc",
"endpoint": opName,
@@ -61,12 +69,17 @@ func UnaryServerMetrics(registry metrics.Registry) grpc.UnaryServerInterceptor {
resp, err := handler(ctx, req)
code := status.Code(err)
- responseCountCount.With(map[string]string{
+ statusCount.With(map[string]string{
"protocol": "grpc",
"endpoint": opName,
"status": code.String(),
}).Inc()
+ responseBytes.With(map[string]string{
+ "protocol": "grpc",
+ "endpoint": opName,
+ }).Add(int64(proto.Size(resp.(proto.Message))))
+
return resp, err
}
}
@@ -77,6 +90,9 @@ func StreamServerMetrics(registry metrics.Registry) grpc.StreamServerInterceptor
inflightStreams := registry.GaugeVec("inflight_streams", []string{"protocol", "endpoint"})
panicsCount := registry.CounterVec("panics_total", []string{"protocol", "endpoint"})
sentStreamMessages := registry.CounterVec("sent_stream_messages_total", []string{"protocol", "endpoint"})
+ receivedBytes := registry.CounterVec("received_bytes", []string{"protocol", "endpoint"})
+ sentBytes := registry.CounterVec("sent_bytes", []string{"protocol", "endpoint"})
+ statusCount := registry.CounterVec("stream_status_total", []string{"protocol", "endpoint", "status"})
receivedStreamMessages := registry.CounterVec("received_stream_messages_total", []string{"protocol", "endpoint"})
solomon.Rated(streamCount)
@@ -131,6 +147,21 @@ func StreamServerMetrics(registry metrics.Registry) grpc.StreamServerInterceptor
"protocol": "grpc",
"endpoint": opName,
}),
+ sentBytes: sentBytes.With(map[string]string{
+ "protocol": "grpc",
+ "endpoint": opName,
+ }),
+ receivedBytes: receivedBytes.With(map[string]string{
+ "protocol": "grpc",
+ "endpoint": opName,
+ }),
+ getStatusCounter: func(code string) metrics.Counter {
+ return statusCount.With(map[string]string{
+ "protocol": "grpc",
+ "endpoint": opName,
+ "status": code,
+ })
+ },
})
}
}
@@ -139,6 +170,9 @@ type serverStreamWithMessagesCount struct {
grpc.ServerStream
sentStreamMessages metrics.Counter
receivedStreamMessages metrics.Counter
+ sentBytes metrics.Counter
+ receivedBytes metrics.Counter
+ getStatusCounter func(string) metrics.Counter
}
func (s serverStreamWithMessagesCount) SendMsg(m any) error {
@@ -146,8 +180,12 @@ func (s serverStreamWithMessagesCount) SendMsg(m any) error {
if err == nil {
s.sentStreamMessages.Inc()
+ s.sentBytes.Add(int64(proto.Size(m.(proto.Message))))
}
+ code := status.Code(err)
+ s.getStatusCounter(code.String()).Inc()
+
return err
}
@@ -156,7 +194,11 @@ func (s serverStreamWithMessagesCount) RecvMsg(m any) error {
if err == nil {
s.receivedStreamMessages.Inc()
+ s.receivedBytes.Add(int64(proto.Size(m.(proto.Message))))
}
+ code := status.Code(err)
+ s.getStatusCounter(code.String()).Inc()
+
return err
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/service_connector.go b/ydb/library/yql/providers/generic/connector/app/server/service_connector.go
index 54ac7a1aa47..d4a80b444a6 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/service_connector.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/service_connector.go
@@ -263,6 +263,12 @@ func makeGRPCOptions(logger log.Logger, cfg *config.TServerConfig, registry *sol
tlsConfig *config.TServerTLSConfig
)
+ unaryInterceptors := []grpc.UnaryServerInterceptor{UnaryServerMetrics(registry)}
+
+ streamInterceptors := []grpc.StreamServerInterceptor{StreamServerMetrics(registry)}
+
+ opts = append(opts, grpc.ChainUnaryInterceptor(unaryInterceptors...), grpc.ChainStreamInterceptor(streamInterceptors...))
+
// TODO: drop deprecated fields after YQ-2057
switch {
case cfg.GetConnectorServer().GetTls() != nil:
@@ -286,12 +292,7 @@ func makeGRPCOptions(logger log.Logger, cfg *config.TServerConfig, registry *sol
// for security reasons we do not allow TLS < 1.2, see YQ-1877
creds := credentials.NewTLS(&tls.Config{Certificates: []tls.Certificate{cert}, MinVersion: tls.VersionTLS12})
-
- unaryInterceptors := []grpc.UnaryServerInterceptor{UnaryServerMetrics(registry)}
-
- streamInterceptors := []grpc.StreamServerInterceptor{StreamServerMetrics(registry)}
-
- opts = append(opts, grpc.Creds(creds), grpc.ChainUnaryInterceptor(unaryInterceptors...), grpc.ChainStreamInterceptor(streamInterceptors...))
+ opts = append(opts, grpc.Creds(creds))
return opts, nil
}