aboutsummaryrefslogtreecommitdiffstats
path: root/library/go/yandex/solomon/reporters/puller
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-11-08 12:09:41 +0300
committerhcpp <hcpp@ydb.tech>2023-11-08 12:56:14 +0300
commita361f5b98b98b44ea510d274f6769164640dd5e1 (patch)
treec47c80962c6e2e7b06798238752fd3da0191a3f6 /library/go/yandex/solomon/reporters/puller
parent9478806fde1f4d40bd5a45e7cbe77237dab613e9 (diff)
downloadydb-a361f5b98b98b44ea510d274f6769164640dd5e1.tar.gz
metrics have been added
Diffstat (limited to 'library/go/yandex/solomon/reporters/puller')
-rw-r--r--library/go/yandex/solomon/reporters/puller/httppuller/example_test.go40
-rw-r--r--library/go/yandex/solomon/reporters/puller/httppuller/gotest/ya.make3
-rw-r--r--library/go/yandex/solomon/reporters/puller/httppuller/handler.go120
-rw-r--r--library/go/yandex/solomon/reporters/puller/httppuller/handler_test.go197
-rw-r--r--library/go/yandex/solomon/reporters/puller/httppuller/logger.go15
-rw-r--r--library/go/yandex/solomon/reporters/puller/httppuller/spack.go10
-rw-r--r--library/go/yandex/solomon/reporters/puller/httppuller/tvm.go27
-rw-r--r--library/go/yandex/solomon/reporters/puller/httppuller/tvm_test.go80
-rw-r--r--library/go/yandex/solomon/reporters/puller/httppuller/ya.make19
9 files changed, 511 insertions, 0 deletions
diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/example_test.go b/library/go/yandex/solomon/reporters/puller/httppuller/example_test.go
new file mode 100644
index 0000000000..c04c81168d
--- /dev/null
+++ b/library/go/yandex/solomon/reporters/puller/httppuller/example_test.go
@@ -0,0 +1,40 @@
+package httppuller_test
+
+import (
+ "net/http"
+ "time"
+
+ "github.com/ydb-platform/ydb/library/go/core/metrics/solomon"
+ "github.com/ydb-platform/ydb/library/go/yandex/solomon/reporters/puller/httppuller"
+ "github.com/ydb-platform/ydb/library/go/yandex/tvm"
+)
+
+func ExampleNewHandler() {
+ // create metrics registry
+ opts := solomon.NewRegistryOpts().
+ SetSeparator('_').
+ SetPrefix("myprefix")
+
+ reg := solomon.NewRegistry(opts)
+
+ // register new metric
+ cnt := reg.Counter("cyclesCount")
+
+ // pass metric to your function and do job
+ go func() {
+ for {
+ cnt.Inc()
+ time.Sleep(1 * time.Second)
+ }
+ }()
+
+ // start HTTP server with handler on /metrics URI
+ mux := http.NewServeMux()
+ mux.Handle("/metrics", httppuller.NewHandler(reg))
+
+ // Or start
+ var tvm tvm.Client
+ mux.Handle("/secure_metrics", httppuller.NewHandler(reg, httppuller.WithTVM(tvm)))
+
+ _ = http.ListenAndServe(":80", mux)
+}
diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/gotest/ya.make b/library/go/yandex/solomon/reporters/puller/httppuller/gotest/ya.make
new file mode 100644
index 0000000000..cf11e75a33
--- /dev/null
+++ b/library/go/yandex/solomon/reporters/puller/httppuller/gotest/ya.make
@@ -0,0 +1,3 @@
+GO_TEST_FOR(library/go/yandex/solomon/reporters/puller/httppuller)
+
+END()
diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/handler.go b/library/go/yandex/solomon/reporters/puller/httppuller/handler.go
new file mode 100644
index 0000000000..9521d41bdc
--- /dev/null
+++ b/library/go/yandex/solomon/reporters/puller/httppuller/handler.go
@@ -0,0 +1,120 @@
+package httppuller
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "net/http"
+ "reflect"
+
+ "github.com/ydb-platform/ydb/library/go/core/log"
+ "github.com/ydb-platform/ydb/library/go/core/log/nop"
+ "github.com/ydb-platform/ydb/library/go/core/metrics/solomon"
+ "github.com/ydb-platform/ydb/library/go/httputil/headers"
+ "github.com/ydb-platform/ydb/library/go/httputil/middleware/tvm"
+)
+
+const nilRegistryPanicMsg = "nil registry given"
+
+type MetricsStreamer interface {
+ StreamJSON(context.Context, io.Writer) (int, error)
+ StreamSpack(context.Context, io.Writer, solomon.CompressionType) (int, error)
+}
+
+type handler struct {
+ registry MetricsStreamer
+ streamFormat headers.ContentType
+ checkTicket func(h http.Handler) http.Handler
+ logger log.Logger
+}
+
+type Option interface {
+ isOption()
+}
+
+// NewHandler returns new HTTP handler to expose gathered metrics using metrics dumper
+func NewHandler(r MetricsStreamer, opts ...Option) http.Handler {
+ if v := reflect.ValueOf(r); !v.IsValid() || v.Kind() == reflect.Ptr && v.IsNil() {
+ panic(nilRegistryPanicMsg)
+ }
+
+ h := handler{
+ registry: r,
+ streamFormat: headers.TypeApplicationJSON,
+ checkTicket: func(h http.Handler) http.Handler {
+ return h
+ },
+ logger: &nop.Logger{},
+ }
+
+ for _, opt := range opts {
+ switch o := opt.(type) {
+ case *tvmOption:
+ h.checkTicket = tvm.CheckServiceTicket(o.client, tvm.WithAllowedClients(AllFetchers))
+ case *spackOption:
+ h.streamFormat = headers.TypeApplicationXSolomonSpack
+ case *loggerOption:
+ h.logger = o.logger
+ default:
+ panic(fmt.Sprintf("unsupported option %T", opt))
+ }
+ }
+
+ return h.checkTicket(h)
+}
+
+func (h handler) okSpack(header http.Header) bool {
+ if h.streamFormat != headers.TypeApplicationXSolomonSpack {
+ return false
+ }
+ for _, header := range header[headers.AcceptKey] {
+ types, err := headers.ParseAccept(header)
+ if err != nil {
+ h.logger.Warn("Can't parse accept header", log.Error(err), log.String("header", header))
+ continue
+ }
+ for _, acceptableType := range types {
+ if acceptableType.Type == headers.TypeApplicationXSolomonSpack {
+ return true
+ }
+ }
+ }
+ return false
+}
+
+func (h handler) okLZ4Compression(header http.Header) bool {
+ for _, header := range header[headers.AcceptEncodingKey] {
+ encodings, err := headers.ParseAcceptEncoding(header)
+ if err != nil {
+ h.logger.Warn("Can't parse accept-encoding header", log.Error(err), log.String("header", header))
+ continue
+ }
+ for _, acceptableEncoding := range encodings {
+ if acceptableEncoding.Encoding == headers.EncodingLZ4 {
+ return true
+ }
+ }
+ }
+ return false
+}
+
+func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if h.okSpack(r.Header) {
+ compression := solomon.CompressionNone
+ if h.okLZ4Compression(r.Header) {
+ compression = solomon.CompressionLz4
+ }
+ w.Header().Set(headers.ContentTypeKey, headers.TypeApplicationXSolomonSpack.String())
+ _, err := h.registry.StreamSpack(r.Context(), w, compression)
+ if err != nil {
+ h.logger.Error("Failed to write compressed spack", log.Error(err))
+ }
+ return
+ }
+
+ w.Header().Set(headers.ContentTypeKey, headers.TypeApplicationJSON.String())
+ _, err := h.registry.StreamJSON(r.Context(), w)
+ if err != nil {
+ h.logger.Error("Failed to write json", log.Error(err))
+ }
+}
diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/handler_test.go b/library/go/yandex/solomon/reporters/puller/httppuller/handler_test.go
new file mode 100644
index 0000000000..686f9b60f9
--- /dev/null
+++ b/library/go/yandex/solomon/reporters/puller/httppuller/handler_test.go
@@ -0,0 +1,197 @@
+package httppuller
+
+import (
+ "context"
+ "encoding/json"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "sort"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/ydb-platform/ydb/library/go/core/metrics"
+ "github.com/ydb-platform/ydb/library/go/core/metrics/solomon"
+ "github.com/ydb-platform/ydb/library/go/httputil/headers"
+)
+
+type testMetricsData struct {
+ Metrics []struct {
+ Type string `json:"type"`
+ Labels map[string]string `json:"labels"`
+ Value float64 `json:"value"`
+ Histogram struct {
+ Bounds []float64 `json:"bounds"`
+ Buckets []int64 `json:"buckets"`
+ Inf int64 `json:"inf"`
+ } `json:"hist"`
+ } `json:"metrics"`
+}
+
+type testStreamer struct{}
+
+func (s testStreamer) StreamJSON(context.Context, io.Writer) (int, error) { return 0, nil }
+func (s testStreamer) StreamSpack(context.Context, io.Writer, solomon.CompressionType) (int, error) {
+ return 0, nil
+}
+
+func TestHandler_NewHandler(t *testing.T) {
+ assert.PanicsWithValue(t, nilRegistryPanicMsg, func() { NewHandler(nil) })
+ assert.PanicsWithValue(t, nilRegistryPanicMsg, func() { var s *solomon.Registry; NewHandler(s) })
+ assert.PanicsWithValue(t, nilRegistryPanicMsg, func() { var ts *testStreamer; NewHandler(ts) })
+ assert.NotPanics(t, func() { NewHandler(&solomon.Registry{}) })
+ assert.NotPanics(t, func() { NewHandler(&testStreamer{}) })
+ assert.NotPanics(t, func() { NewHandler(testStreamer{}) })
+}
+
+func TestHandler_ServeHTTP(t *testing.T) {
+ testCases := []struct {
+ name string
+ registry *solomon.Registry
+ expectStatus int
+ expectedApplicationType headers.ContentType
+ expectBody []byte
+ }{
+ {
+ "success_json",
+ func() *solomon.Registry {
+ r := solomon.NewRegistry(solomon.NewRegistryOpts())
+
+ cnt := r.Counter("mycounter")
+ cnt.Add(42)
+
+ gg := r.Gauge("mygauge")
+ gg.Set(2.4)
+
+ hs := r.Histogram("myhistogram", metrics.NewBuckets(1, 2, 3))
+ hs.RecordValue(0.5)
+ hs.RecordValue(1.5)
+ hs.RecordValue(1.7)
+ hs.RecordValue(2.2)
+ hs.RecordValue(42)
+
+ return r
+ }(),
+ http.StatusOK,
+ headers.TypeApplicationJSON,
+ []byte(`
+ {
+ "metrics": [
+ {
+ "type": "COUNTER",
+ "labels": {
+ "sensor": "mycounter"
+ },
+ "value": 42
+ },
+ {
+ "type": "DGAUGE",
+ "labels": {
+ "sensor": "mygauge"
+ },
+ "value": 2.4
+ },
+ {
+ "type": "HIST",
+ "labels": {
+ "sensor": "myhistogram"
+ },
+ "hist": {
+ "bounds": [
+ 1,
+ 2,
+ 3
+ ],
+ "buckets": [
+ 1,
+ 2,
+ 1
+ ],
+ "inf": 1
+ }
+ }
+ ]
+ }
+ `),
+ },
+ {
+ "success_spack",
+ func() *solomon.Registry {
+ r := solomon.NewRegistry(solomon.NewRegistryOpts())
+ _ = r.Histogram("histogram", metrics.NewBuckets(0, 0.1, 0.11))
+ return r
+ }(),
+ http.StatusOK,
+ headers.TypeApplicationXSolomonSpack,
+ []byte{
+ 0x53, 0x50, // magic
+ 0x01, 0x01, // version
+ 0x18, 0x00, // header size
+ 0x0, // time precision
+ 0x0, // compression algorithm
+ 0x7, 0x0, 0x0, 0x0, // label names size
+ 0xa, 0x0, 0x0, 0x0, // label values size
+ 0x1, 0x0, 0x0, 0x0, // metric count
+ 0x1, 0x0, 0x0, 0x0, // point count
+ // label names pool
+ 0x73, 0x65, 0x6e, 0x73, 0x6f, 0x72, 0x0, // "sensor"
+ // label values pool
+ 0x68, 0x69, 0x73, 0x74, 0x6F, 0x67, 0x72, 0x61, 0x6D, 0x0, // "histogram"
+ // common time
+ 0x0, 0x0, 0x0, 0x0,
+ // common labels
+ 0x0,
+ /*types*/ 0x15,
+ /*flags*/ 0x0,
+ /*labels*/ 0x1, // ?
+ /*name*/ 0x0,
+ /*value*/ 0x0,
+ /*buckets count*/ 0x3,
+ /*upper bound 0*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ /*upper bound 1*/ 0x9a, 0x99, 0x99, 0x99, 0x99, 0x99, 0xb9, 0x3f,
+ /*upper bound 2*/ 0x29, 0x5c, 0x8f, 0xc2, 0xf5, 0x28, 0xbc, 0x3f,
+ /*counter 0*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ /*counter 1*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ /*counter 2*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ },
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ w := httptest.NewRecorder()
+ r, _ := http.NewRequest("GET", "/metrics", nil)
+
+ var h http.Handler
+ if tc.expectedApplicationType == headers.TypeApplicationXSolomonSpack {
+ h = NewHandler(tc.registry, WithSpack())
+ } else {
+ h = NewHandler(tc.registry)
+ }
+
+ r.Header.Set(headers.AcceptKey, tc.expectedApplicationType.String())
+ h.ServeHTTP(w, r)
+ assert.Equal(t, tc.expectStatus, w.Code)
+ assert.Equal(t, tc.expectedApplicationType.String(), w.Header().Get(headers.ContentTypeKey))
+
+ if tc.expectedApplicationType == headers.TypeApplicationXSolomonSpack {
+ assert.EqualValues(t, tc.expectBody, w.Body.Bytes())
+ } else {
+ var expectedObj, givenObj testMetricsData
+ err := json.Unmarshal(tc.expectBody, &expectedObj)
+ assert.NoError(t, err)
+ err = json.Unmarshal(w.Body.Bytes(), &givenObj)
+ assert.NoError(t, err)
+
+ sort.Slice(expectedObj.Metrics, func(i, j int) bool {
+ return expectedObj.Metrics[i].Type < expectedObj.Metrics[j].Type
+ })
+ sort.Slice(givenObj.Metrics, func(i, j int) bool {
+ return givenObj.Metrics[i].Type < givenObj.Metrics[j].Type
+ })
+
+ assert.EqualValues(t, expectedObj, givenObj)
+ }
+ })
+ }
+}
diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/logger.go b/library/go/yandex/solomon/reporters/puller/httppuller/logger.go
new file mode 100644
index 0000000000..19fe4bf733
--- /dev/null
+++ b/library/go/yandex/solomon/reporters/puller/httppuller/logger.go
@@ -0,0 +1,15 @@
+package httppuller
+
+import "github.com/ydb-platform/ydb/library/go/core/log"
+
+type loggerOption struct {
+ logger log.Logger
+}
+
+func (*loggerOption) isOption() {}
+
+func WithLogger(logger log.Logger) Option {
+ return &loggerOption{
+ logger: logger,
+ }
+}
diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/spack.go b/library/go/yandex/solomon/reporters/puller/httppuller/spack.go
new file mode 100644
index 0000000000..cf59abd52a
--- /dev/null
+++ b/library/go/yandex/solomon/reporters/puller/httppuller/spack.go
@@ -0,0 +1,10 @@
+package httppuller
+
+type spackOption struct {
+}
+
+func (*spackOption) isOption() {}
+
+func WithSpack() Option {
+ return &spackOption{}
+}
diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/tvm.go b/library/go/yandex/solomon/reporters/puller/httppuller/tvm.go
new file mode 100644
index 0000000000..e6afeec115
--- /dev/null
+++ b/library/go/yandex/solomon/reporters/puller/httppuller/tvm.go
@@ -0,0 +1,27 @@
+package httppuller
+
+import "github.com/ydb-platform/ydb/library/go/yandex/tvm"
+
+const (
+ FetcherPreTVMID = 2012024
+ FetcherTestTVMID = 2012026
+ FetcherProdTVMID = 2012028
+)
+
+var (
+ AllFetchers = []tvm.ClientID{
+ FetcherPreTVMID,
+ FetcherTestTVMID,
+ FetcherProdTVMID,
+ }
+)
+
+type tvmOption struct {
+ client tvm.Client
+}
+
+func (*tvmOption) isOption() {}
+
+func WithTVM(tvm tvm.Client) Option {
+ return &tvmOption{client: tvm}
+}
diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/tvm_test.go b/library/go/yandex/solomon/reporters/puller/httppuller/tvm_test.go
new file mode 100644
index 0000000000..8eb4d27942
--- /dev/null
+++ b/library/go/yandex/solomon/reporters/puller/httppuller/tvm_test.go
@@ -0,0 +1,80 @@
+package httppuller_test
+
+import (
+ "context"
+ "errors"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/ydb-platform/ydb/library/go/core/metrics/solomon"
+ "github.com/ydb-platform/ydb/library/go/yandex/solomon/reporters/puller/httppuller"
+ "github.com/ydb-platform/ydb/library/go/yandex/tvm"
+)
+
+type fakeTVMClient struct{}
+
+func (f *fakeTVMClient) GetServiceTicketForAlias(ctx context.Context, alias string) (string, error) {
+ return "", &tvm.Error{Code: tvm.ErrorMissingServiceTicket}
+}
+
+func (f *fakeTVMClient) GetServiceTicketForID(ctx context.Context, dstID tvm.ClientID) (string, error) {
+ return "", &tvm.Error{Code: tvm.ErrorMissingServiceTicket}
+}
+
+func (f *fakeTVMClient) CheckServiceTicket(ctx context.Context, ticket string) (*tvm.CheckedServiceTicket, error) {
+ if ticket == "qwerty" {
+ return &tvm.CheckedServiceTicket{SrcID: httppuller.FetcherProdTVMID}, nil
+ }
+
+ return nil, &tvm.Error{Code: tvm.ErrorMissingServiceTicket}
+}
+
+func (f *fakeTVMClient) CheckUserTicket(ctx context.Context, ticket string, opts ...tvm.CheckUserTicketOption) (*tvm.CheckedUserTicket, error) {
+ return nil, &tvm.Error{Code: tvm.ErrorMissingServiceTicket}
+}
+
+func (f *fakeTVMClient) GetStatus(ctx context.Context) (tvm.ClientStatusInfo, error) {
+ return tvm.ClientStatusInfo{}, &tvm.Error{Code: tvm.ErrorMissingServiceTicket}
+}
+
+func (f *fakeTVMClient) GetRoles(ctx context.Context) (*tvm.Roles, error) {
+ return nil, errors.New("not implemented")
+}
+
+var _ tvm.Client = &fakeTVMClient{}
+
+func TestHandler_ServiceTicketValidation(t *testing.T) {
+ registry := solomon.NewRegistry(solomon.NewRegistryOpts())
+ h := httppuller.NewHandler(registry, httppuller.WithTVM(&fakeTVMClient{}))
+
+ t.Run("MissingTicket", func(t *testing.T) {
+ w := httptest.NewRecorder()
+ r, _ := http.NewRequest("GET", "/metrics", nil)
+
+ h.ServeHTTP(w, r)
+ assert.Equal(t, 403, w.Code)
+ assert.Equal(t, "missing service ticket\n", w.Body.String())
+ })
+
+ t.Run("InvalidTicket", func(t *testing.T) {
+ w := httptest.NewRecorder()
+ r, _ := http.NewRequest("GET", "/metrics", nil)
+ r.Header.Add("X-Ya-Service-Ticket", "123456")
+
+ h.ServeHTTP(w, r)
+ assert.Equal(t, 403, w.Code)
+ assert.Truef(t, strings.HasPrefix(w.Body.String(), "service ticket check failed"), "body=%q", w.Body.String())
+ })
+
+ t.Run("GoodTicket", func(t *testing.T) {
+ w := httptest.NewRecorder()
+ r, _ := http.NewRequest("GET", "/metrics", nil)
+ r.Header.Add("X-Ya-Service-Ticket", "qwerty")
+
+ h.ServeHTTP(w, r)
+ assert.Equal(t, 200, w.Code)
+ })
+}
diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/ya.make b/library/go/yandex/solomon/reporters/puller/httppuller/ya.make
new file mode 100644
index 0000000000..283ca566af
--- /dev/null
+++ b/library/go/yandex/solomon/reporters/puller/httppuller/ya.make
@@ -0,0 +1,19 @@
+GO_LIBRARY()
+
+SRCS(
+ handler.go
+ logger.go
+ spack.go
+ tvm.go
+)
+
+GO_TEST_SRCS(handler_test.go)
+
+GO_XTEST_SRCS(
+ example_test.go
+ tvm_test.go
+)
+
+END()
+
+RECURSE(gotest)