diff options
author | hcpp <hcpp@ydb.tech> | 2023-11-08 12:09:41 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-11-08 12:56:14 +0300 |
commit | a361f5b98b98b44ea510d274f6769164640dd5e1 (patch) | |
tree | c47c80962c6e2e7b06798238752fd3da0191a3f6 /library/go/yandex/solomon/reporters/puller | |
parent | 9478806fde1f4d40bd5a45e7cbe77237dab613e9 (diff) | |
download | ydb-a361f5b98b98b44ea510d274f6769164640dd5e1.tar.gz |
metrics have been added
Diffstat (limited to 'library/go/yandex/solomon/reporters/puller')
9 files changed, 511 insertions, 0 deletions
diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/example_test.go b/library/go/yandex/solomon/reporters/puller/httppuller/example_test.go new file mode 100644 index 0000000000..c04c81168d --- /dev/null +++ b/library/go/yandex/solomon/reporters/puller/httppuller/example_test.go @@ -0,0 +1,40 @@ +package httppuller_test + +import ( + "net/http" + "time" + + "github.com/ydb-platform/ydb/library/go/core/metrics/solomon" + "github.com/ydb-platform/ydb/library/go/yandex/solomon/reporters/puller/httppuller" + "github.com/ydb-platform/ydb/library/go/yandex/tvm" +) + +func ExampleNewHandler() { + // create metrics registry + opts := solomon.NewRegistryOpts(). + SetSeparator('_'). + SetPrefix("myprefix") + + reg := solomon.NewRegistry(opts) + + // register new metric + cnt := reg.Counter("cyclesCount") + + // pass metric to your function and do job + go func() { + for { + cnt.Inc() + time.Sleep(1 * time.Second) + } + }() + + // start HTTP server with handler on /metrics URI + mux := http.NewServeMux() + mux.Handle("/metrics", httppuller.NewHandler(reg)) + + // Or start + var tvm tvm.Client + mux.Handle("/secure_metrics", httppuller.NewHandler(reg, httppuller.WithTVM(tvm))) + + _ = http.ListenAndServe(":80", mux) +} diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/gotest/ya.make b/library/go/yandex/solomon/reporters/puller/httppuller/gotest/ya.make new file mode 100644 index 0000000000..cf11e75a33 --- /dev/null +++ b/library/go/yandex/solomon/reporters/puller/httppuller/gotest/ya.make @@ -0,0 +1,3 @@ +GO_TEST_FOR(library/go/yandex/solomon/reporters/puller/httppuller) + +END() diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/handler.go b/library/go/yandex/solomon/reporters/puller/httppuller/handler.go new file mode 100644 index 0000000000..9521d41bdc --- /dev/null +++ b/library/go/yandex/solomon/reporters/puller/httppuller/handler.go @@ -0,0 +1,120 @@ +package httppuller + +import ( + "context" + "fmt" + "io" + "net/http" + "reflect" + + "github.com/ydb-platform/ydb/library/go/core/log" + "github.com/ydb-platform/ydb/library/go/core/log/nop" + "github.com/ydb-platform/ydb/library/go/core/metrics/solomon" + "github.com/ydb-platform/ydb/library/go/httputil/headers" + "github.com/ydb-platform/ydb/library/go/httputil/middleware/tvm" +) + +const nilRegistryPanicMsg = "nil registry given" + +type MetricsStreamer interface { + StreamJSON(context.Context, io.Writer) (int, error) + StreamSpack(context.Context, io.Writer, solomon.CompressionType) (int, error) +} + +type handler struct { + registry MetricsStreamer + streamFormat headers.ContentType + checkTicket func(h http.Handler) http.Handler + logger log.Logger +} + +type Option interface { + isOption() +} + +// NewHandler returns new HTTP handler to expose gathered metrics using metrics dumper +func NewHandler(r MetricsStreamer, opts ...Option) http.Handler { + if v := reflect.ValueOf(r); !v.IsValid() || v.Kind() == reflect.Ptr && v.IsNil() { + panic(nilRegistryPanicMsg) + } + + h := handler{ + registry: r, + streamFormat: headers.TypeApplicationJSON, + checkTicket: func(h http.Handler) http.Handler { + return h + }, + logger: &nop.Logger{}, + } + + for _, opt := range opts { + switch o := opt.(type) { + case *tvmOption: + h.checkTicket = tvm.CheckServiceTicket(o.client, tvm.WithAllowedClients(AllFetchers)) + case *spackOption: + h.streamFormat = headers.TypeApplicationXSolomonSpack + case *loggerOption: + h.logger = o.logger + default: + panic(fmt.Sprintf("unsupported option %T", opt)) + } + } + + return h.checkTicket(h) +} + +func (h handler) okSpack(header http.Header) bool { + if h.streamFormat != headers.TypeApplicationXSolomonSpack { + return false + } + for _, header := range header[headers.AcceptKey] { + types, err := headers.ParseAccept(header) + if err != nil { + h.logger.Warn("Can't parse accept header", log.Error(err), log.String("header", header)) + continue + } + for _, acceptableType := range types { + if acceptableType.Type == headers.TypeApplicationXSolomonSpack { + return true + } + } + } + return false +} + +func (h handler) okLZ4Compression(header http.Header) bool { + for _, header := range header[headers.AcceptEncodingKey] { + encodings, err := headers.ParseAcceptEncoding(header) + if err != nil { + h.logger.Warn("Can't parse accept-encoding header", log.Error(err), log.String("header", header)) + continue + } + for _, acceptableEncoding := range encodings { + if acceptableEncoding.Encoding == headers.EncodingLZ4 { + return true + } + } + } + return false +} + +func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if h.okSpack(r.Header) { + compression := solomon.CompressionNone + if h.okLZ4Compression(r.Header) { + compression = solomon.CompressionLz4 + } + w.Header().Set(headers.ContentTypeKey, headers.TypeApplicationXSolomonSpack.String()) + _, err := h.registry.StreamSpack(r.Context(), w, compression) + if err != nil { + h.logger.Error("Failed to write compressed spack", log.Error(err)) + } + return + } + + w.Header().Set(headers.ContentTypeKey, headers.TypeApplicationJSON.String()) + _, err := h.registry.StreamJSON(r.Context(), w) + if err != nil { + h.logger.Error("Failed to write json", log.Error(err)) + } +} diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/handler_test.go b/library/go/yandex/solomon/reporters/puller/httppuller/handler_test.go new file mode 100644 index 0000000000..686f9b60f9 --- /dev/null +++ b/library/go/yandex/solomon/reporters/puller/httppuller/handler_test.go @@ -0,0 +1,197 @@ +package httppuller + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "sort" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/ydb-platform/ydb/library/go/core/metrics" + "github.com/ydb-platform/ydb/library/go/core/metrics/solomon" + "github.com/ydb-platform/ydb/library/go/httputil/headers" +) + +type testMetricsData struct { + Metrics []struct { + Type string `json:"type"` + Labels map[string]string `json:"labels"` + Value float64 `json:"value"` + Histogram struct { + Bounds []float64 `json:"bounds"` + Buckets []int64 `json:"buckets"` + Inf int64 `json:"inf"` + } `json:"hist"` + } `json:"metrics"` +} + +type testStreamer struct{} + +func (s testStreamer) StreamJSON(context.Context, io.Writer) (int, error) { return 0, nil } +func (s testStreamer) StreamSpack(context.Context, io.Writer, solomon.CompressionType) (int, error) { + return 0, nil +} + +func TestHandler_NewHandler(t *testing.T) { + assert.PanicsWithValue(t, nilRegistryPanicMsg, func() { NewHandler(nil) }) + assert.PanicsWithValue(t, nilRegistryPanicMsg, func() { var s *solomon.Registry; NewHandler(s) }) + assert.PanicsWithValue(t, nilRegistryPanicMsg, func() { var ts *testStreamer; NewHandler(ts) }) + assert.NotPanics(t, func() { NewHandler(&solomon.Registry{}) }) + assert.NotPanics(t, func() { NewHandler(&testStreamer{}) }) + assert.NotPanics(t, func() { NewHandler(testStreamer{}) }) +} + +func TestHandler_ServeHTTP(t *testing.T) { + testCases := []struct { + name string + registry *solomon.Registry + expectStatus int + expectedApplicationType headers.ContentType + expectBody []byte + }{ + { + "success_json", + func() *solomon.Registry { + r := solomon.NewRegistry(solomon.NewRegistryOpts()) + + cnt := r.Counter("mycounter") + cnt.Add(42) + + gg := r.Gauge("mygauge") + gg.Set(2.4) + + hs := r.Histogram("myhistogram", metrics.NewBuckets(1, 2, 3)) + hs.RecordValue(0.5) + hs.RecordValue(1.5) + hs.RecordValue(1.7) + hs.RecordValue(2.2) + hs.RecordValue(42) + + return r + }(), + http.StatusOK, + headers.TypeApplicationJSON, + []byte(` + { + "metrics": [ + { + "type": "COUNTER", + "labels": { + "sensor": "mycounter" + }, + "value": 42 + }, + { + "type": "DGAUGE", + "labels": { + "sensor": "mygauge" + }, + "value": 2.4 + }, + { + "type": "HIST", + "labels": { + "sensor": "myhistogram" + }, + "hist": { + "bounds": [ + 1, + 2, + 3 + ], + "buckets": [ + 1, + 2, + 1 + ], + "inf": 1 + } + } + ] + } + `), + }, + { + "success_spack", + func() *solomon.Registry { + r := solomon.NewRegistry(solomon.NewRegistryOpts()) + _ = r.Histogram("histogram", metrics.NewBuckets(0, 0.1, 0.11)) + return r + }(), + http.StatusOK, + headers.TypeApplicationXSolomonSpack, + []byte{ + 0x53, 0x50, // magic + 0x01, 0x01, // version + 0x18, 0x00, // header size + 0x0, // time precision + 0x0, // compression algorithm + 0x7, 0x0, 0x0, 0x0, // label names size + 0xa, 0x0, 0x0, 0x0, // label values size + 0x1, 0x0, 0x0, 0x0, // metric count + 0x1, 0x0, 0x0, 0x0, // point count + // label names pool + 0x73, 0x65, 0x6e, 0x73, 0x6f, 0x72, 0x0, // "sensor" + // label values pool + 0x68, 0x69, 0x73, 0x74, 0x6F, 0x67, 0x72, 0x61, 0x6D, 0x0, // "histogram" + // common time + 0x0, 0x0, 0x0, 0x0, + // common labels + 0x0, + /*types*/ 0x15, + /*flags*/ 0x0, + /*labels*/ 0x1, // ? + /*name*/ 0x0, + /*value*/ 0x0, + /*buckets count*/ 0x3, + /*upper bound 0*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + /*upper bound 1*/ 0x9a, 0x99, 0x99, 0x99, 0x99, 0x99, 0xb9, 0x3f, + /*upper bound 2*/ 0x29, 0x5c, 0x8f, 0xc2, 0xf5, 0x28, 0xbc, 0x3f, + /*counter 0*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + /*counter 1*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + /*counter 2*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + w := httptest.NewRecorder() + r, _ := http.NewRequest("GET", "/metrics", nil) + + var h http.Handler + if tc.expectedApplicationType == headers.TypeApplicationXSolomonSpack { + h = NewHandler(tc.registry, WithSpack()) + } else { + h = NewHandler(tc.registry) + } + + r.Header.Set(headers.AcceptKey, tc.expectedApplicationType.String()) + h.ServeHTTP(w, r) + assert.Equal(t, tc.expectStatus, w.Code) + assert.Equal(t, tc.expectedApplicationType.String(), w.Header().Get(headers.ContentTypeKey)) + + if tc.expectedApplicationType == headers.TypeApplicationXSolomonSpack { + assert.EqualValues(t, tc.expectBody, w.Body.Bytes()) + } else { + var expectedObj, givenObj testMetricsData + err := json.Unmarshal(tc.expectBody, &expectedObj) + assert.NoError(t, err) + err = json.Unmarshal(w.Body.Bytes(), &givenObj) + assert.NoError(t, err) + + sort.Slice(expectedObj.Metrics, func(i, j int) bool { + return expectedObj.Metrics[i].Type < expectedObj.Metrics[j].Type + }) + sort.Slice(givenObj.Metrics, func(i, j int) bool { + return givenObj.Metrics[i].Type < givenObj.Metrics[j].Type + }) + + assert.EqualValues(t, expectedObj, givenObj) + } + }) + } +} diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/logger.go b/library/go/yandex/solomon/reporters/puller/httppuller/logger.go new file mode 100644 index 0000000000..19fe4bf733 --- /dev/null +++ b/library/go/yandex/solomon/reporters/puller/httppuller/logger.go @@ -0,0 +1,15 @@ +package httppuller + +import "github.com/ydb-platform/ydb/library/go/core/log" + +type loggerOption struct { + logger log.Logger +} + +func (*loggerOption) isOption() {} + +func WithLogger(logger log.Logger) Option { + return &loggerOption{ + logger: logger, + } +} diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/spack.go b/library/go/yandex/solomon/reporters/puller/httppuller/spack.go new file mode 100644 index 0000000000..cf59abd52a --- /dev/null +++ b/library/go/yandex/solomon/reporters/puller/httppuller/spack.go @@ -0,0 +1,10 @@ +package httppuller + +type spackOption struct { +} + +func (*spackOption) isOption() {} + +func WithSpack() Option { + return &spackOption{} +} diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/tvm.go b/library/go/yandex/solomon/reporters/puller/httppuller/tvm.go new file mode 100644 index 0000000000..e6afeec115 --- /dev/null +++ b/library/go/yandex/solomon/reporters/puller/httppuller/tvm.go @@ -0,0 +1,27 @@ +package httppuller + +import "github.com/ydb-platform/ydb/library/go/yandex/tvm" + +const ( + FetcherPreTVMID = 2012024 + FetcherTestTVMID = 2012026 + FetcherProdTVMID = 2012028 +) + +var ( + AllFetchers = []tvm.ClientID{ + FetcherPreTVMID, + FetcherTestTVMID, + FetcherProdTVMID, + } +) + +type tvmOption struct { + client tvm.Client +} + +func (*tvmOption) isOption() {} + +func WithTVM(tvm tvm.Client) Option { + return &tvmOption{client: tvm} +} diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/tvm_test.go b/library/go/yandex/solomon/reporters/puller/httppuller/tvm_test.go new file mode 100644 index 0000000000..8eb4d27942 --- /dev/null +++ b/library/go/yandex/solomon/reporters/puller/httppuller/tvm_test.go @@ -0,0 +1,80 @@ +package httppuller_test + +import ( + "context" + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/ydb-platform/ydb/library/go/core/metrics/solomon" + "github.com/ydb-platform/ydb/library/go/yandex/solomon/reporters/puller/httppuller" + "github.com/ydb-platform/ydb/library/go/yandex/tvm" +) + +type fakeTVMClient struct{} + +func (f *fakeTVMClient) GetServiceTicketForAlias(ctx context.Context, alias string) (string, error) { + return "", &tvm.Error{Code: tvm.ErrorMissingServiceTicket} +} + +func (f *fakeTVMClient) GetServiceTicketForID(ctx context.Context, dstID tvm.ClientID) (string, error) { + return "", &tvm.Error{Code: tvm.ErrorMissingServiceTicket} +} + +func (f *fakeTVMClient) CheckServiceTicket(ctx context.Context, ticket string) (*tvm.CheckedServiceTicket, error) { + if ticket == "qwerty" { + return &tvm.CheckedServiceTicket{SrcID: httppuller.FetcherProdTVMID}, nil + } + + return nil, &tvm.Error{Code: tvm.ErrorMissingServiceTicket} +} + +func (f *fakeTVMClient) CheckUserTicket(ctx context.Context, ticket string, opts ...tvm.CheckUserTicketOption) (*tvm.CheckedUserTicket, error) { + return nil, &tvm.Error{Code: tvm.ErrorMissingServiceTicket} +} + +func (f *fakeTVMClient) GetStatus(ctx context.Context) (tvm.ClientStatusInfo, error) { + return tvm.ClientStatusInfo{}, &tvm.Error{Code: tvm.ErrorMissingServiceTicket} +} + +func (f *fakeTVMClient) GetRoles(ctx context.Context) (*tvm.Roles, error) { + return nil, errors.New("not implemented") +} + +var _ tvm.Client = &fakeTVMClient{} + +func TestHandler_ServiceTicketValidation(t *testing.T) { + registry := solomon.NewRegistry(solomon.NewRegistryOpts()) + h := httppuller.NewHandler(registry, httppuller.WithTVM(&fakeTVMClient{})) + + t.Run("MissingTicket", func(t *testing.T) { + w := httptest.NewRecorder() + r, _ := http.NewRequest("GET", "/metrics", nil) + + h.ServeHTTP(w, r) + assert.Equal(t, 403, w.Code) + assert.Equal(t, "missing service ticket\n", w.Body.String()) + }) + + t.Run("InvalidTicket", func(t *testing.T) { + w := httptest.NewRecorder() + r, _ := http.NewRequest("GET", "/metrics", nil) + r.Header.Add("X-Ya-Service-Ticket", "123456") + + h.ServeHTTP(w, r) + assert.Equal(t, 403, w.Code) + assert.Truef(t, strings.HasPrefix(w.Body.String(), "service ticket check failed"), "body=%q", w.Body.String()) + }) + + t.Run("GoodTicket", func(t *testing.T) { + w := httptest.NewRecorder() + r, _ := http.NewRequest("GET", "/metrics", nil) + r.Header.Add("X-Ya-Service-Ticket", "qwerty") + + h.ServeHTTP(w, r) + assert.Equal(t, 200, w.Code) + }) +} diff --git a/library/go/yandex/solomon/reporters/puller/httppuller/ya.make b/library/go/yandex/solomon/reporters/puller/httppuller/ya.make new file mode 100644 index 0000000000..283ca566af --- /dev/null +++ b/library/go/yandex/solomon/reporters/puller/httppuller/ya.make @@ -0,0 +1,19 @@ +GO_LIBRARY() + +SRCS( + handler.go + logger.go + spack.go + tvm.go +) + +GO_TEST_SRCS(handler_test.go) + +GO_XTEST_SRCS( + example_test.go + tvm_test.go +) + +END() + +RECURSE(gotest) |