diff options
author | hcpp <hcpp@ydb.tech> | 2023-11-14 22:34:42 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-11-14 22:53:03 +0300 |
commit | 37e4d5ec858a3a5ea9994e9213cd742f565142ae (patch) | |
tree | 6a4f092a061a3f4bf560b3b7fb587d01f560643f | |
parent | 8ef1d62f9e385f0b29a23d93bfd8b3d05555c370 (diff) | |
download | ydb-37e4d5ec858a3a5ea9994e9213cd742f565142ae.tar.gz |
new metrics have been added to connector
-rw-r--r-- | ydb/library/yql/providers/generic/connector/app/server/grpc_metrics.go | 48 | ||||
-rw-r--r-- | ydb/library/yql/providers/generic/connector/app/server/service_connector.go | 13 |
2 files changed, 52 insertions, 9 deletions
diff --git a/ydb/library/yql/providers/generic/connector/app/server/grpc_metrics.go b/ydb/library/yql/providers/generic/connector/app/server/grpc_metrics.go index 6e1ff6badaf..de71f7167cc 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/grpc_metrics.go +++ b/ydb/library/yql/providers/generic/connector/app/server/grpc_metrics.go @@ -8,6 +8,7 @@ import ( "github.com/ydb-platform/ydb/library/go/core/metrics/solomon" "google.golang.org/grpc" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" ) func UnaryServerMetrics(registry metrics.Registry) grpc.UnaryServerInterceptor { @@ -15,12 +16,14 @@ func UnaryServerMetrics(registry metrics.Registry) grpc.UnaryServerInterceptor { requestDuration := registry.DurationHistogramVec("request_duration_seconds", metrics.MakeExponentialDurationBuckets(250*time.Microsecond, 1.5, 35), []string{"protocol", "endpoint"}) panicsCount := registry.CounterVec("panics_total", []string{"protocol", "endpoint"}) inflightRequests := registry.GaugeVec("inflight_requests", []string{"protocol", "endpoint"}) - responseCountCount := registry.CounterVec("status_total", []string{"protocol", "endpoint", "status"}) + statusCount := registry.CounterVec("status_total", []string{"protocol", "endpoint", "status"}) + requestBytes := registry.CounterVec("request_bytes", []string{"protocol", "endpoint"}) + responseBytes := registry.CounterVec("response_bytes", []string{"protocol", "endpoint"}) solomon.Rated(requestCount) solomon.Rated(requestDuration) solomon.Rated(panicsCount) - solomon.Rated(responseCountCount) + solomon.Rated(statusCount) return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ any, err error) { deferFunc := func(startTime time.Time, opName string) { @@ -45,6 +48,11 @@ func UnaryServerMetrics(registry metrics.Registry) grpc.UnaryServerInterceptor { opName := info.FullMethod + requestBytes.With(map[string]string{ + "protocol": "grpc", + "endpoint": opName, + }).Add(int64(proto.Size(req.(proto.Message)))) + requestCount.With(map[string]string{ "protocol": "grpc", "endpoint": opName, @@ -61,12 +69,17 @@ func UnaryServerMetrics(registry metrics.Registry) grpc.UnaryServerInterceptor { resp, err := handler(ctx, req) code := status.Code(err) - responseCountCount.With(map[string]string{ + statusCount.With(map[string]string{ "protocol": "grpc", "endpoint": opName, "status": code.String(), }).Inc() + responseBytes.With(map[string]string{ + "protocol": "grpc", + "endpoint": opName, + }).Add(int64(proto.Size(resp.(proto.Message)))) + return resp, err } } @@ -77,6 +90,9 @@ func StreamServerMetrics(registry metrics.Registry) grpc.StreamServerInterceptor inflightStreams := registry.GaugeVec("inflight_streams", []string{"protocol", "endpoint"}) panicsCount := registry.CounterVec("panics_total", []string{"protocol", "endpoint"}) sentStreamMessages := registry.CounterVec("sent_stream_messages_total", []string{"protocol", "endpoint"}) + receivedBytes := registry.CounterVec("received_bytes", []string{"protocol", "endpoint"}) + sentBytes := registry.CounterVec("sent_bytes", []string{"protocol", "endpoint"}) + statusCount := registry.CounterVec("stream_status_total", []string{"protocol", "endpoint", "status"}) receivedStreamMessages := registry.CounterVec("received_stream_messages_total", []string{"protocol", "endpoint"}) solomon.Rated(streamCount) @@ -131,6 +147,21 @@ func StreamServerMetrics(registry metrics.Registry) grpc.StreamServerInterceptor "protocol": "grpc", "endpoint": opName, }), + sentBytes: sentBytes.With(map[string]string{ + "protocol": "grpc", + "endpoint": opName, + }), + receivedBytes: receivedBytes.With(map[string]string{ + "protocol": "grpc", + "endpoint": opName, + }), + getStatusCounter: func(code string) metrics.Counter { + return statusCount.With(map[string]string{ + "protocol": "grpc", + "endpoint": opName, + "status": code, + }) + }, }) } } @@ -139,6 +170,9 @@ type serverStreamWithMessagesCount struct { grpc.ServerStream sentStreamMessages metrics.Counter receivedStreamMessages metrics.Counter + sentBytes metrics.Counter + receivedBytes metrics.Counter + getStatusCounter func(string) metrics.Counter } func (s serverStreamWithMessagesCount) SendMsg(m any) error { @@ -146,8 +180,12 @@ func (s serverStreamWithMessagesCount) SendMsg(m any) error { if err == nil { s.sentStreamMessages.Inc() + s.sentBytes.Add(int64(proto.Size(m.(proto.Message)))) } + code := status.Code(err) + s.getStatusCounter(code.String()).Inc() + return err } @@ -156,7 +194,11 @@ func (s serverStreamWithMessagesCount) RecvMsg(m any) error { if err == nil { s.receivedStreamMessages.Inc() + s.receivedBytes.Add(int64(proto.Size(m.(proto.Message)))) } + code := status.Code(err) + s.getStatusCounter(code.String()).Inc() + return err } diff --git a/ydb/library/yql/providers/generic/connector/app/server/service_connector.go b/ydb/library/yql/providers/generic/connector/app/server/service_connector.go index 54ac7a1aa47..d4a80b444a6 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/service_connector.go +++ b/ydb/library/yql/providers/generic/connector/app/server/service_connector.go @@ -263,6 +263,12 @@ func makeGRPCOptions(logger log.Logger, cfg *config.TServerConfig, registry *sol tlsConfig *config.TServerTLSConfig ) + unaryInterceptors := []grpc.UnaryServerInterceptor{UnaryServerMetrics(registry)} + + streamInterceptors := []grpc.StreamServerInterceptor{StreamServerMetrics(registry)} + + opts = append(opts, grpc.ChainUnaryInterceptor(unaryInterceptors...), grpc.ChainStreamInterceptor(streamInterceptors...)) + // TODO: drop deprecated fields after YQ-2057 switch { case cfg.GetConnectorServer().GetTls() != nil: @@ -286,12 +292,7 @@ func makeGRPCOptions(logger log.Logger, cfg *config.TServerConfig, registry *sol // for security reasons we do not allow TLS < 1.2, see YQ-1877 creds := credentials.NewTLS(&tls.Config{Certificates: []tls.Certificate{cert}, MinVersion: tls.VersionTLS12}) - - unaryInterceptors := []grpc.UnaryServerInterceptor{UnaryServerMetrics(registry)} - - streamInterceptors := []grpc.StreamServerInterceptor{StreamServerMetrics(registry)} - - opts = append(opts, grpc.Creds(creds), grpc.ChainUnaryInterceptor(unaryInterceptors...), grpc.ChainStreamInterceptor(streamInterceptors...)) + opts = append(opts, grpc.Creds(creds)) return opts, nil } |