diff options
author | gzuykov <gzuykov@yandex-team.com> | 2023-11-12 22:36:24 +0300 |
---|---|---|
committer | gzuykov <gzuykov@yandex-team.com> | 2023-11-12 22:49:44 +0300 |
commit | e24f824435e26ee6d92e01d20813c6f9e23efe8d (patch) | |
tree | 8cc3fab4c28a90a37c4bc12cbd055402ab8d1e20 | |
parent | 7a8477e5ea1f0f47da3c34465a83ac93e5381fb5 (diff) | |
download | ydb-e24f824435e26ee6d92e01d20813c6f9e23efe8d.tar.gz |
library/go: abstract metrics streamer interface
abstract metrics streamer
18 files changed, 136 insertions, 32 deletions
diff --git a/library/go/core/metrics/collect/policy/ya.make b/library/go/core/metrics/collect/policy/ya.make index 2717ef9863..f643ecafcf 100644 --- a/library/go/core/metrics/collect/policy/ya.make +++ b/library/go/core/metrics/collect/policy/ya.make @@ -1 +1,3 @@ -RECURSE(inflight) +RECURSE( + inflight +) diff --git a/library/go/core/metrics/collect/ya.make b/library/go/core/metrics/collect/ya.make index be81763221..d583b0b7f6 100644 --- a/library/go/core/metrics/collect/ya.make +++ b/library/go/core/metrics/collect/ya.make @@ -7,4 +7,6 @@ SRCS( END() -RECURSE(policy) +RECURSE( + policy +) diff --git a/library/go/core/metrics/internal/pkg/metricsutil/ya.make b/library/go/core/metrics/internal/pkg/metricsutil/ya.make index 3058637089..3d705b4380 100644 --- a/library/go/core/metrics/internal/pkg/metricsutil/ya.make +++ b/library/go/core/metrics/internal/pkg/metricsutil/ya.make @@ -1,5 +1,7 @@ GO_LIBRARY() -SRCS(buckets.go) +SRCS( + buckets.go +) END() diff --git a/library/go/core/metrics/internal/pkg/registryutil/ya.make b/library/go/core/metrics/internal/pkg/registryutil/ya.make index 4a1f976d40..80153f9212 100644 --- a/library/go/core/metrics/internal/pkg/registryutil/ya.make +++ b/library/go/core/metrics/internal/pkg/registryutil/ya.make @@ -1,9 +1,13 @@ GO_LIBRARY() -SRCS(registryutil.go) +SRCS( + registryutil.go +) GO_TEST_SRCS(registryutil_test.go) END() -RECURSE(gotest) +RECURSE( + gotest +) diff --git a/library/go/core/metrics/internal/ya.make b/library/go/core/metrics/internal/ya.make index b2a587f35d..5a7d25ca94 100644 --- a/library/go/core/metrics/internal/ya.make +++ b/library/go/core/metrics/internal/ya.make @@ -1 +1,3 @@ -RECURSE(pkg) +RECURSE( + pkg +) diff --git a/library/go/core/metrics/metrics.go b/library/go/core/metrics/metrics.go index 097fca9a55..43beaa47a4 100644 --- a/library/go/core/metrics/metrics.go +++ b/library/go/core/metrics/metrics.go @@ -3,6 +3,7 @@ package metrics import ( "context" + "io" "time" ) @@ -155,6 +156,11 @@ type Registry interface { DurationHistogramVec(name string, buckets DurationBuckets, labels []string) TimerVec } +// MetricsStreamer represents a registry that can stream collected metrics data to some destination +type MetricsStreamer interface { + Stream(context.Context, io.Writer) (int, error) +} + // CollectPolicy defines how registered gauge metrics are updated via collect func. type CollectPolicy interface { RegisteredCounter(counterFunc func() int64) func() int64 diff --git a/library/go/core/metrics/mock/ya.make b/library/go/core/metrics/mock/ya.make index 0ddaf2285b..15dafba2d9 100644 --- a/library/go/core/metrics/mock/ya.make +++ b/library/go/core/metrics/mock/ya.make @@ -3,8 +3,8 @@ GO_LIBRARY() SRCS( counter.go gauge.go - int_gauge.go histogram.go + int_gauge.go registry.go registry_opts.go timer.go diff --git a/library/go/core/metrics/nop/registry.go b/library/go/core/metrics/nop/registry.go index 97ed977ed7..0da02db448 100644 --- a/library/go/core/metrics/nop/registry.go +++ b/library/go/core/metrics/nop/registry.go @@ -1,8 +1,14 @@ package nop -import "github.com/ydb-platform/ydb/library/go/core/metrics" +import ( + "context" + "io" + + "github.com/ydb-platform/ydb/library/go/core/metrics" +) var _ metrics.Registry = (*Registry)(nil) +var _ metrics.MetricsStreamer = (*Registry)(nil) type Registry struct{} @@ -77,3 +83,7 @@ func (r Registry) HistogramVec(_ string, _ metrics.Buckets, _ []string) metrics. func (r Registry) DurationHistogramVec(_ string, _ metrics.DurationBuckets, _ []string) metrics.TimerVec { return DurationHistogramVec{} } + +func (r Registry) Stream(_ context.Context, _ io.Writer) (int, error) { + return 0, nil +} diff --git a/library/go/core/metrics/nop/ya.make b/library/go/core/metrics/nop/ya.make index 279bc22ef4..20959b3be2 100644 --- a/library/go/core/metrics/nop/ya.make +++ b/library/go/core/metrics/nop/ya.make @@ -3,8 +3,8 @@ GO_LIBRARY() SRCS( counter.go gauge.go - int_gauge.go histogram.go + int_gauge.go registry.go timer.go ) diff --git a/library/go/core/metrics/prometheus/registry.go b/library/go/core/metrics/prometheus/registry.go index bad45fe617..e10aef935e 100644 --- a/library/go/core/metrics/prometheus/registry.go +++ b/library/go/core/metrics/prometheus/registry.go @@ -5,6 +5,7 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" "github.com/ydb-platform/ydb/library/go/core/metrics" "github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/metricsutil" "github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/registryutil" @@ -12,6 +13,7 @@ import ( ) var _ metrics.Registry = (*Registry)(nil) +var _ metrics.MetricsStreamer = (*Registry)(nil) type Registry struct { rg *prometheus.Registry @@ -22,6 +24,7 @@ type Registry struct { tags map[string]string prefix string nameSanitizer func(string) string + streamFormat expfmt.Format } // NewRegistry creates new Prometheus backed registry. @@ -31,11 +34,13 @@ func NewRegistry(opts *RegistryOpts) *Registry { m: new(sync.Mutex), subregistries: make(map[string]*Registry), tags: make(map[string]string), + streamFormat: StreamCompact, } if opts != nil { r.prefix = opts.Prefix r.tags = opts.Tags + r.streamFormat = opts.StreamFormat if opts.rg != nil { r.rg = opts.rg } diff --git a/library/go/core/metrics/prometheus/registry_opts.go b/library/go/core/metrics/prometheus/registry_opts.go index fedb019d85..1dab30d40d 100644 --- a/library/go/core/metrics/prometheus/registry_opts.go +++ b/library/go/core/metrics/prometheus/registry_opts.go @@ -4,6 +4,7 @@ import ( "context" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/expfmt" "github.com/ydb-platform/ydb/library/go/core/metrics" "github.com/ydb-platform/ydb/library/go/core/metrics/collect" "github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/registryutil" @@ -15,12 +16,14 @@ type RegistryOpts struct { rg *prometheus.Registry Collectors []func(metrics.Registry) NameSanitizer func(string) string + StreamFormat expfmt.Format } // NewRegistryOpts returns new initialized instance of RegistryOpts. func NewRegistryOpts() *RegistryOpts { return &RegistryOpts{ - Tags: make(map[string]string), + Tags: make(map[string]string), + StreamFormat: StreamCompact, } } @@ -82,3 +85,9 @@ func (o *RegistryOpts) SetNameSanitizer(v func(string) string) *RegistryOpts { o.NameSanitizer = v return o } + +// SetStreamFormat sets default metrics stream format +func (o *RegistryOpts) SetStreamFormat(format expfmt.Format) *RegistryOpts { + o.StreamFormat = format + return o +} diff --git a/library/go/core/metrics/prometheus/registry_test.go b/library/go/core/metrics/prometheus/registry_test.go index 73d071a8de..fe50cebf68 100644 --- a/library/go/core/metrics/prometheus/registry_test.go +++ b/library/go/core/metrics/prometheus/registry_test.go @@ -23,6 +23,7 @@ func TestNewRegistry(t *testing.T) { subregistries: make(map[string]*Registry), tags: map[string]string{}, prefix: "", + streamFormat: StreamCompact, } r := NewRegistry(nil) diff --git a/library/go/core/metrics/prometheus/stream.go b/library/go/core/metrics/prometheus/stream.go new file mode 100644 index 0000000000..44b2963036 --- /dev/null +++ b/library/go/core/metrics/prometheus/stream.go @@ -0,0 +1,32 @@ +package prometheus + +import ( + "context" + "fmt" + "io" + + "github.com/prometheus/common/expfmt" +) + +const ( + StreamText expfmt.Format = expfmt.FmtText + StreamCompact expfmt.Format = expfmt.FmtProtoCompact +) + +func (r *Registry) Stream(_ context.Context, w io.Writer) (int, error) { + metrics, err := r.Gather() + if err != nil { + return 0, fmt.Errorf("cannot gather metrics: %w", err) + } + + enc := expfmt.NewEncoder(w, r.streamFormat) + for _, mf := range metrics { + if err := enc.Encode(mf); err != nil { + return 0, fmt.Errorf("cannot encode metric family: %w", err) + } + } + + // prometheus encoder does not report how much bytes have been written + // so we indicate it by returning -1 instead + return -1, nil +} diff --git a/library/go/core/metrics/prometheus/ya.make b/library/go/core/metrics/prometheus/ya.make index b012835f4b..c5d26f8c1b 100644 --- a/library/go/core/metrics/prometheus/ya.make +++ b/library/go/core/metrics/prometheus/ya.make @@ -3,10 +3,11 @@ GO_LIBRARY() SRCS( counter.go gauge.go - int_gauge.go histogram.go + int_gauge.go registry.go registry_opts.go + stream.go timer.go vec.go ) @@ -22,4 +23,6 @@ GO_TEST_SRCS( END() -RECURSE(gotest) +RECURSE( + gotest +) diff --git a/library/go/core/metrics/solomon/registry.go b/library/go/core/metrics/solomon/registry.go index 0ad4d9378a..5e0bfaa71a 100644 --- a/library/go/core/metrics/solomon/registry.go +++ b/library/go/core/metrics/solomon/registry.go @@ -11,13 +11,15 @@ import ( ) var _ metrics.Registry = (*Registry)(nil) +var _ metrics.MetricsStreamer = (*Registry)(nil) type Registry struct { - separator string - prefix string - tags map[string]string - rated bool - useNameTag bool + separator string + prefix string + tags map[string]string + rated bool + useNameTag bool + streamFormat StreamFormat subregistries map[string]*Registry m *sync.Mutex @@ -27,8 +29,9 @@ type Registry struct { func NewRegistry(opts *RegistryOpts) *Registry { r := &Registry{ - separator: ".", - useNameTag: false, + separator: ".", + useNameTag: false, + streamFormat: StreamSpack, subregistries: make(map[string]*Registry), m: new(sync.Mutex), @@ -42,6 +45,7 @@ func NewRegistry(opts *RegistryOpts) *Registry { r.tags = opts.Tags r.rated = opts.Rated r.useNameTag = opts.UseNameTag + r.streamFormat = opts.StreamFormat for _, collector := range opts.Collectors { collector(r) } diff --git a/library/go/core/metrics/solomon/registry_opts.go b/library/go/core/metrics/solomon/registry_opts.go index c3df17940a..5a68a39283 100644 --- a/library/go/core/metrics/solomon/registry_opts.go +++ b/library/go/core/metrics/solomon/registry_opts.go @@ -9,20 +9,22 @@ import ( ) type RegistryOpts struct { - Separator rune - Prefix string - Tags map[string]string - Rated bool - UseNameTag bool - Collectors []func(metrics.Registry) + Separator rune + Prefix string + Tags map[string]string + Rated bool + UseNameTag bool + Collectors []func(metrics.Registry) + StreamFormat StreamFormat } // NewRegistryOpts returns new initialized instance of RegistryOpts func NewRegistryOpts() *RegistryOpts { return &RegistryOpts{ - Separator: '.', - Tags: make(map[string]string), - UseNameTag: false, + Separator: '.', + Tags: make(map[string]string), + UseNameTag: false, + StreamFormat: StreamSpack, } } @@ -85,3 +87,9 @@ func (o *RegistryOpts) AddCollectors( }) return o } + +// SetStreamFormat sets default sensors stream format +func (o *RegistryOpts) SetStreamFormat(format StreamFormat) *RegistryOpts { + o.StreamFormat = format + return o +} diff --git a/library/go/core/metrics/solomon/stream.go b/library/go/core/metrics/solomon/stream.go index 7cf6d70064..d1481d1efd 100644 --- a/library/go/core/metrics/solomon/stream.go +++ b/library/go/core/metrics/solomon/stream.go @@ -12,6 +12,18 @@ const HeaderSize = 24 type StreamFormat string +const ( + StreamSpack StreamFormat = "spack" + StreamJSON StreamFormat = "json" +) + +func (r *Registry) Stream(ctx context.Context, w io.Writer) (written int, err error) { + if r.streamFormat == StreamJSON { + return r.StreamJSON(ctx, w) + } + return r.StreamSpack(ctx, w, CompressionLz4) +} + func (r *Registry) StreamJSON(ctx context.Context, w io.Writer) (written int, err error) { cw := newCompressedWriter(w, CompressionNone) diff --git a/library/go/core/metrics/solomon/ya.make b/library/go/core/metrics/solomon/ya.make index a4de14cadf..806ef55637 100644 --- a/library/go/core/metrics/solomon/ya.make +++ b/library/go/core/metrics/solomon/ya.make @@ -7,8 +7,8 @@ SRCS( func_gauge.go func_int_gauge.go gauge.go - int_gauge.go histogram.go + int_gauge.go metrics.go metrics_opts.go registry.go @@ -27,18 +27,20 @@ GO_TEST_SRCS( func_gauge_test.go func_int_gauge_test.go gauge_test.go - int_gauge_test.go histogram_test.go + int_gauge_test.go metrics_test.go + race_test.go registry_test.go spack_compression_test.go spack_test.go stream_test.go timer_test.go vec_test.go - race_test.go ) END() -RECURSE(gotest) +RECURSE( + gotest +) |