aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgzuykov <gzuykov@yandex-team.com>2023-11-12 22:36:24 +0300
committergzuykov <gzuykov@yandex-team.com>2023-11-12 22:49:44 +0300
commite24f824435e26ee6d92e01d20813c6f9e23efe8d (patch)
tree8cc3fab4c28a90a37c4bc12cbd055402ab8d1e20
parent7a8477e5ea1f0f47da3c34465a83ac93e5381fb5 (diff)
downloadydb-e24f824435e26ee6d92e01d20813c6f9e23efe8d.tar.gz
library/go: abstract metrics streamer interface
abstract metrics streamer
-rw-r--r--library/go/core/metrics/collect/policy/ya.make4
-rw-r--r--library/go/core/metrics/collect/ya.make4
-rw-r--r--library/go/core/metrics/internal/pkg/metricsutil/ya.make4
-rw-r--r--library/go/core/metrics/internal/pkg/registryutil/ya.make8
-rw-r--r--library/go/core/metrics/internal/ya.make4
-rw-r--r--library/go/core/metrics/metrics.go6
-rw-r--r--library/go/core/metrics/mock/ya.make2
-rw-r--r--library/go/core/metrics/nop/registry.go12
-rw-r--r--library/go/core/metrics/nop/ya.make2
-rw-r--r--library/go/core/metrics/prometheus/registry.go5
-rw-r--r--library/go/core/metrics/prometheus/registry_opts.go11
-rw-r--r--library/go/core/metrics/prometheus/registry_test.go1
-rw-r--r--library/go/core/metrics/prometheus/stream.go32
-rw-r--r--library/go/core/metrics/prometheus/ya.make7
-rw-r--r--library/go/core/metrics/solomon/registry.go18
-rw-r--r--library/go/core/metrics/solomon/registry_opts.go26
-rw-r--r--library/go/core/metrics/solomon/stream.go12
-rw-r--r--library/go/core/metrics/solomon/ya.make10
18 files changed, 136 insertions, 32 deletions
diff --git a/library/go/core/metrics/collect/policy/ya.make b/library/go/core/metrics/collect/policy/ya.make
index 2717ef9863..f643ecafcf 100644
--- a/library/go/core/metrics/collect/policy/ya.make
+++ b/library/go/core/metrics/collect/policy/ya.make
@@ -1 +1,3 @@
-RECURSE(inflight)
+RECURSE(
+ inflight
+)
diff --git a/library/go/core/metrics/collect/ya.make b/library/go/core/metrics/collect/ya.make
index be81763221..d583b0b7f6 100644
--- a/library/go/core/metrics/collect/ya.make
+++ b/library/go/core/metrics/collect/ya.make
@@ -7,4 +7,6 @@ SRCS(
END()
-RECURSE(policy)
+RECURSE(
+ policy
+)
diff --git a/library/go/core/metrics/internal/pkg/metricsutil/ya.make b/library/go/core/metrics/internal/pkg/metricsutil/ya.make
index 3058637089..3d705b4380 100644
--- a/library/go/core/metrics/internal/pkg/metricsutil/ya.make
+++ b/library/go/core/metrics/internal/pkg/metricsutil/ya.make
@@ -1,5 +1,7 @@
GO_LIBRARY()
-SRCS(buckets.go)
+SRCS(
+ buckets.go
+)
END()
diff --git a/library/go/core/metrics/internal/pkg/registryutil/ya.make b/library/go/core/metrics/internal/pkg/registryutil/ya.make
index 4a1f976d40..80153f9212 100644
--- a/library/go/core/metrics/internal/pkg/registryutil/ya.make
+++ b/library/go/core/metrics/internal/pkg/registryutil/ya.make
@@ -1,9 +1,13 @@
GO_LIBRARY()
-SRCS(registryutil.go)
+SRCS(
+ registryutil.go
+)
GO_TEST_SRCS(registryutil_test.go)
END()
-RECURSE(gotest)
+RECURSE(
+ gotest
+)
diff --git a/library/go/core/metrics/internal/ya.make b/library/go/core/metrics/internal/ya.make
index b2a587f35d..5a7d25ca94 100644
--- a/library/go/core/metrics/internal/ya.make
+++ b/library/go/core/metrics/internal/ya.make
@@ -1 +1,3 @@
-RECURSE(pkg)
+RECURSE(
+ pkg
+)
diff --git a/library/go/core/metrics/metrics.go b/library/go/core/metrics/metrics.go
index 097fca9a55..43beaa47a4 100644
--- a/library/go/core/metrics/metrics.go
+++ b/library/go/core/metrics/metrics.go
@@ -3,6 +3,7 @@ package metrics
import (
"context"
+ "io"
"time"
)
@@ -155,6 +156,11 @@ type Registry interface {
DurationHistogramVec(name string, buckets DurationBuckets, labels []string) TimerVec
}
+// MetricsStreamer represents a registry that can stream collected metrics data to some destination
+type MetricsStreamer interface {
+ Stream(context.Context, io.Writer) (int, error)
+}
+
// CollectPolicy defines how registered gauge metrics are updated via collect func.
type CollectPolicy interface {
RegisteredCounter(counterFunc func() int64) func() int64
diff --git a/library/go/core/metrics/mock/ya.make b/library/go/core/metrics/mock/ya.make
index 0ddaf2285b..15dafba2d9 100644
--- a/library/go/core/metrics/mock/ya.make
+++ b/library/go/core/metrics/mock/ya.make
@@ -3,8 +3,8 @@ GO_LIBRARY()
SRCS(
counter.go
gauge.go
- int_gauge.go
histogram.go
+ int_gauge.go
registry.go
registry_opts.go
timer.go
diff --git a/library/go/core/metrics/nop/registry.go b/library/go/core/metrics/nop/registry.go
index 97ed977ed7..0da02db448 100644
--- a/library/go/core/metrics/nop/registry.go
+++ b/library/go/core/metrics/nop/registry.go
@@ -1,8 +1,14 @@
package nop
-import "github.com/ydb-platform/ydb/library/go/core/metrics"
+import (
+ "context"
+ "io"
+
+ "github.com/ydb-platform/ydb/library/go/core/metrics"
+)
var _ metrics.Registry = (*Registry)(nil)
+var _ metrics.MetricsStreamer = (*Registry)(nil)
type Registry struct{}
@@ -77,3 +83,7 @@ func (r Registry) HistogramVec(_ string, _ metrics.Buckets, _ []string) metrics.
func (r Registry) DurationHistogramVec(_ string, _ metrics.DurationBuckets, _ []string) metrics.TimerVec {
return DurationHistogramVec{}
}
+
+func (r Registry) Stream(_ context.Context, _ io.Writer) (int, error) {
+ return 0, nil
+}
diff --git a/library/go/core/metrics/nop/ya.make b/library/go/core/metrics/nop/ya.make
index 279bc22ef4..20959b3be2 100644
--- a/library/go/core/metrics/nop/ya.make
+++ b/library/go/core/metrics/nop/ya.make
@@ -3,8 +3,8 @@ GO_LIBRARY()
SRCS(
counter.go
gauge.go
- int_gauge.go
histogram.go
+ int_gauge.go
registry.go
timer.go
)
diff --git a/library/go/core/metrics/prometheus/registry.go b/library/go/core/metrics/prometheus/registry.go
index bad45fe617..e10aef935e 100644
--- a/library/go/core/metrics/prometheus/registry.go
+++ b/library/go/core/metrics/prometheus/registry.go
@@ -5,6 +5,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
+ "github.com/prometheus/common/expfmt"
"github.com/ydb-platform/ydb/library/go/core/metrics"
"github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/metricsutil"
"github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/registryutil"
@@ -12,6 +13,7 @@ import (
)
var _ metrics.Registry = (*Registry)(nil)
+var _ metrics.MetricsStreamer = (*Registry)(nil)
type Registry struct {
rg *prometheus.Registry
@@ -22,6 +24,7 @@ type Registry struct {
tags map[string]string
prefix string
nameSanitizer func(string) string
+ streamFormat expfmt.Format
}
// NewRegistry creates new Prometheus backed registry.
@@ -31,11 +34,13 @@ func NewRegistry(opts *RegistryOpts) *Registry {
m: new(sync.Mutex),
subregistries: make(map[string]*Registry),
tags: make(map[string]string),
+ streamFormat: StreamCompact,
}
if opts != nil {
r.prefix = opts.Prefix
r.tags = opts.Tags
+ r.streamFormat = opts.StreamFormat
if opts.rg != nil {
r.rg = opts.rg
}
diff --git a/library/go/core/metrics/prometheus/registry_opts.go b/library/go/core/metrics/prometheus/registry_opts.go
index fedb019d85..1dab30d40d 100644
--- a/library/go/core/metrics/prometheus/registry_opts.go
+++ b/library/go/core/metrics/prometheus/registry_opts.go
@@ -4,6 +4,7 @@ import (
"context"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/common/expfmt"
"github.com/ydb-platform/ydb/library/go/core/metrics"
"github.com/ydb-platform/ydb/library/go/core/metrics/collect"
"github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/registryutil"
@@ -15,12 +16,14 @@ type RegistryOpts struct {
rg *prometheus.Registry
Collectors []func(metrics.Registry)
NameSanitizer func(string) string
+ StreamFormat expfmt.Format
}
// NewRegistryOpts returns new initialized instance of RegistryOpts.
func NewRegistryOpts() *RegistryOpts {
return &RegistryOpts{
- Tags: make(map[string]string),
+ Tags: make(map[string]string),
+ StreamFormat: StreamCompact,
}
}
@@ -82,3 +85,9 @@ func (o *RegistryOpts) SetNameSanitizer(v func(string) string) *RegistryOpts {
o.NameSanitizer = v
return o
}
+
+// SetStreamFormat sets default metrics stream format
+func (o *RegistryOpts) SetStreamFormat(format expfmt.Format) *RegistryOpts {
+ o.StreamFormat = format
+ return o
+}
diff --git a/library/go/core/metrics/prometheus/registry_test.go b/library/go/core/metrics/prometheus/registry_test.go
index 73d071a8de..fe50cebf68 100644
--- a/library/go/core/metrics/prometheus/registry_test.go
+++ b/library/go/core/metrics/prometheus/registry_test.go
@@ -23,6 +23,7 @@ func TestNewRegistry(t *testing.T) {
subregistries: make(map[string]*Registry),
tags: map[string]string{},
prefix: "",
+ streamFormat: StreamCompact,
}
r := NewRegistry(nil)
diff --git a/library/go/core/metrics/prometheus/stream.go b/library/go/core/metrics/prometheus/stream.go
new file mode 100644
index 0000000000..44b2963036
--- /dev/null
+++ b/library/go/core/metrics/prometheus/stream.go
@@ -0,0 +1,32 @@
+package prometheus
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/prometheus/common/expfmt"
+)
+
+const (
+ StreamText expfmt.Format = expfmt.FmtText
+ StreamCompact expfmt.Format = expfmt.FmtProtoCompact
+)
+
+func (r *Registry) Stream(_ context.Context, w io.Writer) (int, error) {
+ metrics, err := r.Gather()
+ if err != nil {
+ return 0, fmt.Errorf("cannot gather metrics: %w", err)
+ }
+
+ enc := expfmt.NewEncoder(w, r.streamFormat)
+ for _, mf := range metrics {
+ if err := enc.Encode(mf); err != nil {
+ return 0, fmt.Errorf("cannot encode metric family: %w", err)
+ }
+ }
+
+ // prometheus encoder does not report how much bytes have been written
+ // so we indicate it by returning -1 instead
+ return -1, nil
+}
diff --git a/library/go/core/metrics/prometheus/ya.make b/library/go/core/metrics/prometheus/ya.make
index b012835f4b..c5d26f8c1b 100644
--- a/library/go/core/metrics/prometheus/ya.make
+++ b/library/go/core/metrics/prometheus/ya.make
@@ -3,10 +3,11 @@ GO_LIBRARY()
SRCS(
counter.go
gauge.go
- int_gauge.go
histogram.go
+ int_gauge.go
registry.go
registry_opts.go
+ stream.go
timer.go
vec.go
)
@@ -22,4 +23,6 @@ GO_TEST_SRCS(
END()
-RECURSE(gotest)
+RECURSE(
+ gotest
+)
diff --git a/library/go/core/metrics/solomon/registry.go b/library/go/core/metrics/solomon/registry.go
index 0ad4d9378a..5e0bfaa71a 100644
--- a/library/go/core/metrics/solomon/registry.go
+++ b/library/go/core/metrics/solomon/registry.go
@@ -11,13 +11,15 @@ import (
)
var _ metrics.Registry = (*Registry)(nil)
+var _ metrics.MetricsStreamer = (*Registry)(nil)
type Registry struct {
- separator string
- prefix string
- tags map[string]string
- rated bool
- useNameTag bool
+ separator string
+ prefix string
+ tags map[string]string
+ rated bool
+ useNameTag bool
+ streamFormat StreamFormat
subregistries map[string]*Registry
m *sync.Mutex
@@ -27,8 +29,9 @@ type Registry struct {
func NewRegistry(opts *RegistryOpts) *Registry {
r := &Registry{
- separator: ".",
- useNameTag: false,
+ separator: ".",
+ useNameTag: false,
+ streamFormat: StreamSpack,
subregistries: make(map[string]*Registry),
m: new(sync.Mutex),
@@ -42,6 +45,7 @@ func NewRegistry(opts *RegistryOpts) *Registry {
r.tags = opts.Tags
r.rated = opts.Rated
r.useNameTag = opts.UseNameTag
+ r.streamFormat = opts.StreamFormat
for _, collector := range opts.Collectors {
collector(r)
}
diff --git a/library/go/core/metrics/solomon/registry_opts.go b/library/go/core/metrics/solomon/registry_opts.go
index c3df17940a..5a68a39283 100644
--- a/library/go/core/metrics/solomon/registry_opts.go
+++ b/library/go/core/metrics/solomon/registry_opts.go
@@ -9,20 +9,22 @@ import (
)
type RegistryOpts struct {
- Separator rune
- Prefix string
- Tags map[string]string
- Rated bool
- UseNameTag bool
- Collectors []func(metrics.Registry)
+ Separator rune
+ Prefix string
+ Tags map[string]string
+ Rated bool
+ UseNameTag bool
+ Collectors []func(metrics.Registry)
+ StreamFormat StreamFormat
}
// NewRegistryOpts returns new initialized instance of RegistryOpts
func NewRegistryOpts() *RegistryOpts {
return &RegistryOpts{
- Separator: '.',
- Tags: make(map[string]string),
- UseNameTag: false,
+ Separator: '.',
+ Tags: make(map[string]string),
+ UseNameTag: false,
+ StreamFormat: StreamSpack,
}
}
@@ -85,3 +87,9 @@ func (o *RegistryOpts) AddCollectors(
})
return o
}
+
+// SetStreamFormat sets default sensors stream format
+func (o *RegistryOpts) SetStreamFormat(format StreamFormat) *RegistryOpts {
+ o.StreamFormat = format
+ return o
+}
diff --git a/library/go/core/metrics/solomon/stream.go b/library/go/core/metrics/solomon/stream.go
index 7cf6d70064..d1481d1efd 100644
--- a/library/go/core/metrics/solomon/stream.go
+++ b/library/go/core/metrics/solomon/stream.go
@@ -12,6 +12,18 @@ const HeaderSize = 24
type StreamFormat string
+const (
+ StreamSpack StreamFormat = "spack"
+ StreamJSON StreamFormat = "json"
+)
+
+func (r *Registry) Stream(ctx context.Context, w io.Writer) (written int, err error) {
+ if r.streamFormat == StreamJSON {
+ return r.StreamJSON(ctx, w)
+ }
+ return r.StreamSpack(ctx, w, CompressionLz4)
+}
+
func (r *Registry) StreamJSON(ctx context.Context, w io.Writer) (written int, err error) {
cw := newCompressedWriter(w, CompressionNone)
diff --git a/library/go/core/metrics/solomon/ya.make b/library/go/core/metrics/solomon/ya.make
index a4de14cadf..806ef55637 100644
--- a/library/go/core/metrics/solomon/ya.make
+++ b/library/go/core/metrics/solomon/ya.make
@@ -7,8 +7,8 @@ SRCS(
func_gauge.go
func_int_gauge.go
gauge.go
- int_gauge.go
histogram.go
+ int_gauge.go
metrics.go
metrics_opts.go
registry.go
@@ -27,18 +27,20 @@ GO_TEST_SRCS(
func_gauge_test.go
func_int_gauge_test.go
gauge_test.go
- int_gauge_test.go
histogram_test.go
+ int_gauge_test.go
metrics_test.go
+ race_test.go
registry_test.go
spack_compression_test.go
spack_test.go
stream_test.go
timer_test.go
vec_test.go
- race_test.go
)
END()
-RECURSE(gotest)
+RECURSE(
+ gotest
+)