diff options
author | hcpp <hcpp@ydb.tech> | 2023-11-08 12:09:41 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-11-08 12:56:14 +0300 |
commit | a361f5b98b98b44ea510d274f6769164640dd5e1 (patch) | |
tree | c47c80962c6e2e7b06798238752fd3da0191a3f6 /library/go/core | |
parent | 9478806fde1f4d40bd5a45e7cbe77237dab613e9 (diff) | |
download | ydb-a361f5b98b98b44ea510d274f6769164640dd5e1.tar.gz |
metrics have been added
Diffstat (limited to 'library/go/core')
91 files changed, 8863 insertions, 0 deletions
diff --git a/library/go/core/metrics/buckets.go b/library/go/core/metrics/buckets.go new file mode 100644 index 0000000000..063c0c4418 --- /dev/null +++ b/library/go/core/metrics/buckets.go @@ -0,0 +1,147 @@ +package metrics + +import ( + "sort" + "time" +) + +var ( + _ DurationBuckets = (*durationBuckets)(nil) + _ Buckets = (*buckets)(nil) +) + +const ( + errBucketsCountNeedsGreaterThanZero = "n needs to be > 0" + errBucketsStartNeedsGreaterThanZero = "start needs to be > 0" + errBucketsFactorNeedsGreaterThanOne = "factor needs to be > 1" +) + +type durationBuckets struct { + buckets []time.Duration +} + +// NewDurationBuckets returns new DurationBuckets implementation. +func NewDurationBuckets(bk ...time.Duration) DurationBuckets { + sort.Slice(bk, func(i, j int) bool { + return bk[i] < bk[j] + }) + return durationBuckets{buckets: bk} +} + +func (d durationBuckets) Size() int { + return len(d.buckets) +} + +func (d durationBuckets) MapDuration(dv time.Duration) (idx int) { + for _, bound := range d.buckets { + if dv < bound { + break + } + idx++ + } + return +} + +func (d durationBuckets) UpperBound(idx int) time.Duration { + if idx > d.Size()-1 { + panic("idx is out of bounds") + } + return d.buckets[idx] +} + +type buckets struct { + buckets []float64 +} + +// NewBuckets returns new Buckets implementation. +func NewBuckets(bk ...float64) Buckets { + sort.Slice(bk, func(i, j int) bool { + return bk[i] < bk[j] + }) + return buckets{buckets: bk} +} + +func (d buckets) Size() int { + return len(d.buckets) +} + +func (d buckets) MapValue(v float64) (idx int) { + for _, bound := range d.buckets { + if v < bound { + break + } + idx++ + } + return +} + +func (d buckets) UpperBound(idx int) float64 { + if idx > d.Size()-1 { + panic("idx is out of bounds") + } + return d.buckets[idx] +} + +// MakeLinearBuckets creates a set of linear value buckets. +func MakeLinearBuckets(start, width float64, n int) Buckets { + if n <= 0 { + panic(errBucketsCountNeedsGreaterThanZero) + } + bounds := make([]float64, n) + for i := range bounds { + bounds[i] = start + (float64(i) * width) + } + return NewBuckets(bounds...) +} + +// MakeLinearDurationBuckets creates a set of linear duration buckets. +func MakeLinearDurationBuckets(start, width time.Duration, n int) DurationBuckets { + if n <= 0 { + panic(errBucketsCountNeedsGreaterThanZero) + } + buckets := make([]time.Duration, n) + for i := range buckets { + buckets[i] = start + (time.Duration(i) * width) + } + return NewDurationBuckets(buckets...) +} + +// MakeExponentialBuckets creates a set of exponential value buckets. +func MakeExponentialBuckets(start, factor float64, n int) Buckets { + if n <= 0 { + panic(errBucketsCountNeedsGreaterThanZero) + } + if start <= 0 { + panic(errBucketsStartNeedsGreaterThanZero) + } + if factor <= 1 { + panic(errBucketsFactorNeedsGreaterThanOne) + } + buckets := make([]float64, n) + curr := start + for i := range buckets { + buckets[i] = curr + curr *= factor + } + return NewBuckets(buckets...) +} + +// MakeExponentialDurationBuckets creates a set of exponential duration buckets. +func MakeExponentialDurationBuckets(start time.Duration, factor float64, n int) DurationBuckets { + if n <= 0 { + panic(errBucketsCountNeedsGreaterThanZero) + } + if start <= 0 { + panic(errBucketsStartNeedsGreaterThanZero) + } + if factor <= 1 { + panic(errBucketsFactorNeedsGreaterThanOne) + } + buckets := make([]time.Duration, n) + curr := start + for i := range buckets { + buckets[i] = curr + curr = time.Duration(float64(curr) * factor) + } + return NewDurationBuckets(buckets...) +} diff --git a/library/go/core/metrics/buckets_test.go b/library/go/core/metrics/buckets_test.go new file mode 100644 index 0000000000..70cb6398c2 --- /dev/null +++ b/library/go/core/metrics/buckets_test.go @@ -0,0 +1,183 @@ +package metrics + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewDurationBuckets(t *testing.T) { + buckets := []time.Duration{ + 1 * time.Second, + 3 * time.Second, + 5 * time.Second, + } + bk := NewDurationBuckets(buckets...) + + expect := durationBuckets{ + buckets: []time.Duration{ + 1 * time.Second, + 3 * time.Second, + 5 * time.Second, + }, + } + assert.Equal(t, expect, bk) +} + +func Test_durationBuckets_MapDuration(t *testing.T) { + bk := NewDurationBuckets([]time.Duration{ + 1 * time.Second, + 3 * time.Second, + 5 * time.Second, + }...) + + for i := 0; i <= bk.Size(); i++ { + assert.Equal(t, i, bk.MapDuration(time.Duration(i*2)*time.Second)) + } +} + +func Test_durationBuckets_Size(t *testing.T) { + var buckets []time.Duration + for i := 1; i < 3; i++ { + buckets = append(buckets, time.Duration(i)*time.Second) + bk := NewDurationBuckets(buckets...) + assert.Equal(t, i, bk.Size()) + } +} + +func Test_durationBuckets_UpperBound(t *testing.T) { + bk := NewDurationBuckets([]time.Duration{ + 1 * time.Second, + 2 * time.Second, + 3 * time.Second, + }...) + + assert.Panics(t, func() { bk.UpperBound(999) }) + + for i := 0; i < bk.Size()-1; i++ { + assert.Equal(t, time.Duration(i+1)*time.Second, bk.UpperBound(i)) + } +} + +func TestNewBuckets(t *testing.T) { + bk := NewBuckets(1, 3, 5) + + expect := buckets{ + buckets: []float64{1, 3, 5}, + } + assert.Equal(t, expect, bk) +} + +func Test_buckets_MapValue(t *testing.T) { + bk := NewBuckets(1, 3, 5) + + for i := 0; i <= bk.Size(); i++ { + assert.Equal(t, i, bk.MapValue(float64(i*2))) + } +} + +func Test_buckets_Size(t *testing.T) { + var buckets []float64 + for i := 1; i < 3; i++ { + buckets = append(buckets, float64(i)) + bk := NewBuckets(buckets...) + assert.Equal(t, i, bk.Size()) + } +} + +func Test_buckets_UpperBound(t *testing.T) { + bk := NewBuckets(1, 2, 3) + + assert.Panics(t, func() { bk.UpperBound(999) }) + + for i := 0; i < bk.Size()-1; i++ { + assert.Equal(t, float64(i+1), bk.UpperBound(i)) + } +} + +func TestMakeLinearBuckets_CorrectParameters_NotPanics(t *testing.T) { + assert.NotPanics(t, func() { + assert.Equal(t, + NewBuckets(0.0, 1.0, 2.0), + MakeLinearBuckets(0, 1, 3), + ) + }) +} + +func TestMakeLinearBucketsPanicsOnBadCount(t *testing.T) { + assert.Panics(t, func() { + MakeLinearBuckets(0, 1, 0) + }) +} + +func TestMakeLinearDurationBuckets(t *testing.T) { + assert.NotPanics(t, func() { + assert.Equal(t, + NewDurationBuckets(0, time.Second, 2*time.Second), + MakeLinearDurationBuckets(0*time.Second, 1*time.Second, 3), + ) + }) +} + +func TestMakeLinearDurationBucketsPanicsOnBadCount(t *testing.T) { + assert.Panics(t, func() { + MakeLinearDurationBuckets(0*time.Second, 1*time.Second, 0) + }) +} + +func TestMakeExponentialBuckets(t *testing.T) { + assert.NotPanics(t, func() { + assert.Equal( + t, + NewBuckets(2, 4, 8), + MakeExponentialBuckets(2, 2, 3), + ) + }) +} + +func TestMakeExponentialBucketsPanicsOnBadCount(t *testing.T) { + assert.Panics(t, func() { + MakeExponentialBuckets(2, 2, 0) + }) +} + +func TestMakeExponentialBucketsPanicsOnBadStart(t *testing.T) { + assert.Panics(t, func() { + MakeExponentialBuckets(0, 2, 2) + }) +} + +func TestMakeExponentialBucketsPanicsOnBadFactor(t *testing.T) { + assert.Panics(t, func() { + MakeExponentialBuckets(2, 1, 2) + }) +} + +func TestMakeExponentialDurationBuckets(t *testing.T) { + assert.NotPanics(t, func() { + assert.Equal( + t, + NewDurationBuckets(2*time.Second, 4*time.Second, 8*time.Second), + MakeExponentialDurationBuckets(2*time.Second, 2, 3), + ) + }) +} + +func TestMakeExponentialDurationBucketsPanicsOnBadCount(t *testing.T) { + assert.Panics(t, func() { + MakeExponentialDurationBuckets(2*time.Second, 2, 0) + }) +} + +func TestMakeExponentialDurationBucketsPanicsOnBadStart(t *testing.T) { + assert.Panics(t, func() { + MakeExponentialDurationBuckets(0, 2, 2) + }) +} + +func TestMakeExponentialDurationBucketsPanicsOnBadFactor(t *testing.T) { + assert.Panics(t, func() { + MakeExponentialDurationBuckets(2*time.Second, 1, 2) + }) +} diff --git a/library/go/core/metrics/collect/collect.go b/library/go/core/metrics/collect/collect.go new file mode 100644 index 0000000000..492a2f74a5 --- /dev/null +++ b/library/go/core/metrics/collect/collect.go @@ -0,0 +1,9 @@ +package collect + +import ( + "context" + + "github.com/ydb-platform/ydb/library/go/core/metrics" +) + +type Func func(ctx context.Context, r metrics.Registry, c metrics.CollectPolicy) diff --git a/library/go/core/metrics/collect/policy/inflight/inflight.go b/library/go/core/metrics/collect/policy/inflight/inflight.go new file mode 100644 index 0000000000..bc045fe188 --- /dev/null +++ b/library/go/core/metrics/collect/policy/inflight/inflight.go @@ -0,0 +1,78 @@ +package inflight + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/ydb-platform/ydb/library/go/core/metrics" + "github.com/ydb-platform/ydb/library/go/x/xsync" +) + +var _ metrics.CollectPolicy = (*inflightPolicy)(nil) + +type inflightPolicy struct { + addCollectLock sync.Mutex + collect atomic.Value // func(ctx context.Context) + + minUpdateInterval time.Duration + lastUpdate time.Time + + inflight xsync.SingleInflight +} + +func NewCollectorPolicy(opts ...Option) metrics.CollectPolicy { + c := &inflightPolicy{ + minUpdateInterval: time.Second, + inflight: xsync.NewSingleInflight(), + } + c.collect.Store(func(context.Context) {}) + + for _, opt := range opts { + opt(c) + } + + return c +} + +func (i *inflightPolicy) RegisteredCounter(counterFunc func() int64) func() int64 { + return func() int64 { + i.tryInflightUpdate() + return counterFunc() + } +} + +func (i *inflightPolicy) RegisteredGauge(gaugeFunc func() float64) func() float64 { + return func() float64 { + i.tryInflightUpdate() + return gaugeFunc() + } +} + +func (i *inflightPolicy) AddCollect(collect func(context.Context)) { + oldCollect := i.getCollect() + i.setCollect(func(ctx context.Context) { + oldCollect(ctx) + collect(ctx) + }) +} + +func (i *inflightPolicy) tryInflightUpdate() { + i.inflight.Do(func() { + if time.Since(i.lastUpdate) < i.minUpdateInterval { + return + } + + i.getCollect()(context.Background()) + i.lastUpdate = time.Now() + }) +} + +func (i *inflightPolicy) getCollect() func(context.Context) { + return i.collect.Load().(func(context.Context)) +} + +func (i *inflightPolicy) setCollect(collect func(context.Context)) { + i.collect.Store(collect) +} diff --git a/library/go/core/metrics/collect/policy/inflight/inflight_opts.go b/library/go/core/metrics/collect/policy/inflight/inflight_opts.go new file mode 100644 index 0000000000..cc277b0c71 --- /dev/null +++ b/library/go/core/metrics/collect/policy/inflight/inflight_opts.go @@ -0,0 +1,11 @@ +package inflight + +import "time" + +type Option func(*inflightPolicy) + +func WithMinCollectInterval(interval time.Duration) Option { + return func(c *inflightPolicy) { + c.minUpdateInterval = interval + } +} diff --git a/library/go/core/metrics/collect/policy/inflight/ya.make b/library/go/core/metrics/collect/policy/inflight/ya.make new file mode 100644 index 0000000000..6101e04049 --- /dev/null +++ b/library/go/core/metrics/collect/policy/inflight/ya.make @@ -0,0 +1,8 @@ +GO_LIBRARY() + +SRCS( + inflight.go + inflight_opts.go +) + +END() diff --git a/library/go/core/metrics/collect/policy/ya.make b/library/go/core/metrics/collect/policy/ya.make new file mode 100644 index 0000000000..2717ef9863 --- /dev/null +++ b/library/go/core/metrics/collect/policy/ya.make @@ -0,0 +1 @@ +RECURSE(inflight) diff --git a/library/go/core/metrics/collect/system.go b/library/go/core/metrics/collect/system.go new file mode 100644 index 0000000000..a21e91d632 --- /dev/null +++ b/library/go/core/metrics/collect/system.go @@ -0,0 +1,229 @@ +// dashboard generator for these metrics can be found at: github.com/ydb-platform/ydb/arcadia/library/go/yandex/monitoring-dashboards +package collect + +import ( + "context" + "os" + "runtime" + "runtime/debug" + "time" + + "github.com/prometheus/procfs" + "github.com/ydb-platform/ydb/library/go/core/buildinfo" + "github.com/ydb-platform/ydb/library/go/core/metrics" +) + +var _ Func = GoMetrics + +func GoMetrics(_ context.Context, r metrics.Registry, c metrics.CollectPolicy) { + if r == nil { + return + } + r = r.WithPrefix("go") + + var stats debug.GCStats + stats.PauseQuantiles = make([]time.Duration, 5) // Minimum, 25%, 50%, 75%, and maximum pause times. + var numGoroutine, numThread int + var ms runtime.MemStats + + c.AddCollect(func(context.Context) { + debug.ReadGCStats(&stats) + runtime.ReadMemStats(&ms) + + numThread, _ = runtime.ThreadCreateProfile(nil) + numGoroutine = runtime.NumGoroutine() + }) + + gcRegistry := r.WithPrefix("gc") + gcRegistry.FuncCounter("num", c.RegisteredCounter(func() int64 { + return stats.NumGC + })) + gcRegistry.FuncCounter(r.ComposeName("pause", "total", "ns"), c.RegisteredCounter(func() int64 { + return stats.PauseTotal.Nanoseconds() + })) + gcRegistry.FuncGauge(r.ComposeName("pause", "quantile", "min"), c.RegisteredGauge(func() float64 { + return stats.PauseQuantiles[0].Seconds() + })) + gcRegistry.FuncGauge(r.ComposeName("pause", "quantile", "25"), c.RegisteredGauge(func() float64 { + return stats.PauseQuantiles[1].Seconds() + })) + gcRegistry.FuncGauge(r.ComposeName("pause", "quantile", "50"), c.RegisteredGauge(func() float64 { + return stats.PauseQuantiles[2].Seconds() + })) + gcRegistry.FuncGauge(r.ComposeName("pause", "quantile", "75"), c.RegisteredGauge(func() float64 { + return stats.PauseQuantiles[3].Seconds() + })) + gcRegistry.FuncGauge(r.ComposeName("pause", "quantile", "max"), c.RegisteredGauge(func() float64 { + return stats.PauseQuantiles[4].Seconds() + })) + gcRegistry.FuncGauge(r.ComposeName("last", "ts"), c.RegisteredGauge(func() float64 { + return float64(ms.LastGC) + })) + gcRegistry.FuncCounter(r.ComposeName("forced", "num"), c.RegisteredCounter(func() int64 { + return int64(ms.NumForcedGC) + })) + + r.FuncGauge(r.ComposeName("goroutine", "num"), c.RegisteredGauge(func() float64 { + return float64(numGoroutine) + })) + r.FuncGauge(r.ComposeName("thread", "num"), c.RegisteredGauge(func() float64 { + return float64(numThread) + })) + + memRegistry := r.WithPrefix("mem") + memRegistry.FuncCounter(r.ComposeName("alloc", "total"), c.RegisteredCounter(func() int64 { + return int64(ms.TotalAlloc) + })) + memRegistry.FuncGauge("sys", c.RegisteredGauge(func() float64 { + return float64(ms.Sys) + })) + memRegistry.FuncCounter("lookups", c.RegisteredCounter(func() int64 { + return int64(ms.Lookups) + })) + memRegistry.FuncCounter("mallocs", c.RegisteredCounter(func() int64 { + return int64(ms.Mallocs) + })) + memRegistry.FuncCounter("frees", c.RegisteredCounter(func() int64 { + return int64(ms.Frees) + })) + memRegistry.FuncGauge(r.ComposeName("heap", "alloc"), c.RegisteredGauge(func() float64 { + return float64(ms.HeapAlloc) + })) + memRegistry.FuncGauge(r.ComposeName("heap", "sys"), c.RegisteredGauge(func() float64 { + return float64(ms.HeapSys) + })) + memRegistry.FuncGauge(r.ComposeName("heap", "idle"), c.RegisteredGauge(func() float64 { + return float64(ms.HeapIdle) + })) + memRegistry.FuncGauge(r.ComposeName("heap", "inuse"), c.RegisteredGauge(func() float64 { + return float64(ms.HeapInuse) + })) + memRegistry.FuncGauge(r.ComposeName("heap", "released"), c.RegisteredGauge(func() float64 { + return float64(ms.HeapReleased) + })) + memRegistry.FuncGauge(r.ComposeName("heap", "objects"), c.RegisteredGauge(func() float64 { + return float64(ms.HeapObjects) + })) + + memRegistry.FuncGauge(r.ComposeName("stack", "inuse"), c.RegisteredGauge(func() float64 { + return float64(ms.StackInuse) + })) + memRegistry.FuncGauge(r.ComposeName("stack", "sys"), c.RegisteredGauge(func() float64 { + return float64(ms.StackSys) + })) + + memRegistry.FuncGauge(r.ComposeName("span", "inuse"), c.RegisteredGauge(func() float64 { + return float64(ms.MSpanInuse) + })) + memRegistry.FuncGauge(r.ComposeName("span", "sys"), c.RegisteredGauge(func() float64 { + return float64(ms.MSpanSys) + })) + + memRegistry.FuncGauge(r.ComposeName("cache", "inuse"), c.RegisteredGauge(func() float64 { + return float64(ms.MCacheInuse) + })) + memRegistry.FuncGauge(r.ComposeName("cache", "sys"), c.RegisteredGauge(func() float64 { + return float64(ms.MCacheSys) + })) + + memRegistry.FuncGauge(r.ComposeName("buck", "hash", "sys"), c.RegisteredGauge(func() float64 { + return float64(ms.BuckHashSys) + })) + memRegistry.FuncGauge(r.ComposeName("gc", "sys"), c.RegisteredGauge(func() float64 { + return float64(ms.GCSys) + })) + memRegistry.FuncGauge(r.ComposeName("other", "sys"), c.RegisteredGauge(func() float64 { + return float64(ms.OtherSys) + })) + memRegistry.FuncGauge(r.ComposeName("gc", "next"), c.RegisteredGauge(func() float64 { + return float64(ms.NextGC) + })) + + memRegistry.FuncGauge(r.ComposeName("gc", "cpu", "fraction"), c.RegisteredGauge(func() float64 { + return ms.GCCPUFraction + })) +} + +var _ Func = ProcessMetrics + +func ProcessMetrics(_ context.Context, r metrics.Registry, c metrics.CollectPolicy) { + if r == nil { + return + } + buildVersion := buildinfo.Info.ArcadiaSourceRevision + r.WithTags(map[string]string{"revision": buildVersion}).Gauge("build").Set(1.0) + + pid := os.Getpid() + proc, err := procfs.NewProc(pid) + if err != nil { + return + } + + procRegistry := r.WithPrefix("proc") + + var ioStat procfs.ProcIO + var procStat procfs.ProcStat + var fd int + var cpuWait uint64 + + const clocksPerSec = 100 + + c.AddCollect(func(ctx context.Context) { + if gatheredFD, err := proc.FileDescriptorsLen(); err == nil { + fd = gatheredFD + } + + if gatheredIOStat, err := proc.IO(); err == nil { + ioStat.SyscW = gatheredIOStat.SyscW + ioStat.WriteBytes = gatheredIOStat.WriteBytes + ioStat.SyscR = gatheredIOStat.SyscR + ioStat.ReadBytes = gatheredIOStat.ReadBytes + } + + if gatheredStat, err := proc.Stat(); err == nil { + procStat.UTime = gatheredStat.UTime + procStat.STime = gatheredStat.STime + procStat.RSS = gatheredStat.RSS + } + + if gatheredSched, err := proc.Schedstat(); err == nil { + cpuWait = gatheredSched.WaitingNanoseconds + } + }) + + procRegistry.FuncGauge("fd", c.RegisteredGauge(func() float64 { + return float64(fd) + })) + + ioRegistry := procRegistry.WithPrefix("io") + ioRegistry.FuncCounter(r.ComposeName("read", "count"), c.RegisteredCounter(func() int64 { + return int64(ioStat.SyscR) + })) + ioRegistry.FuncCounter(r.ComposeName("read", "bytes"), c.RegisteredCounter(func() int64 { + return int64(ioStat.ReadBytes) + })) + ioRegistry.FuncCounter(r.ComposeName("write", "count"), c.RegisteredCounter(func() int64 { + return int64(ioStat.SyscW) + })) + ioRegistry.FuncCounter(r.ComposeName("write", "bytes"), c.RegisteredCounter(func() int64 { + return int64(ioStat.WriteBytes) + })) + + cpuRegistry := procRegistry.WithPrefix("cpu") + cpuRegistry.FuncCounter(r.ComposeName("total", "ns"), c.RegisteredCounter(func() int64 { + return int64(procStat.UTime+procStat.STime) * (1_000_000_000 / clocksPerSec) + })) + cpuRegistry.FuncCounter(r.ComposeName("user", "ns"), c.RegisteredCounter(func() int64 { + return int64(procStat.UTime) * (1_000_000_000 / clocksPerSec) + })) + cpuRegistry.FuncCounter(r.ComposeName("system", "ns"), c.RegisteredCounter(func() int64 { + return int64(procStat.STime) * (1_000_000_000 / clocksPerSec) + })) + cpuRegistry.FuncCounter(r.ComposeName("wait", "ns"), c.RegisteredCounter(func() int64 { + return int64(cpuWait) + })) + + procRegistry.FuncGauge(r.ComposeName("mem", "rss"), c.RegisteredGauge(func() float64 { + return float64(procStat.RSS) + })) +} diff --git a/library/go/core/metrics/collect/ya.make b/library/go/core/metrics/collect/ya.make new file mode 100644 index 0000000000..be81763221 --- /dev/null +++ b/library/go/core/metrics/collect/ya.make @@ -0,0 +1,10 @@ +GO_LIBRARY() + +SRCS( + collect.go + system.go +) + +END() + +RECURSE(policy) diff --git a/library/go/core/metrics/gotest/ya.make b/library/go/core/metrics/gotest/ya.make new file mode 100644 index 0000000000..d0bdf91982 --- /dev/null +++ b/library/go/core/metrics/gotest/ya.make @@ -0,0 +1,3 @@ +GO_TEST_FOR(library/go/core/metrics) + +END() diff --git a/library/go/core/metrics/internal/pkg/metricsutil/buckets.go b/library/go/core/metrics/internal/pkg/metricsutil/buckets.go new file mode 100644 index 0000000000..e9501fcceb --- /dev/null +++ b/library/go/core/metrics/internal/pkg/metricsutil/buckets.go @@ -0,0 +1,27 @@ +package metricsutil + +import ( + "sort" + + "github.com/ydb-platform/ydb/library/go/core/metrics" +) + +// BucketsBounds unwraps Buckets bounds to slice of float64. +func BucketsBounds(b metrics.Buckets) []float64 { + bkts := make([]float64, b.Size()) + for i := range bkts { + bkts[i] = b.UpperBound(i) + } + sort.Float64s(bkts) + return bkts +} + +// DurationBucketsBounds unwraps DurationBuckets bounds to slice of float64. +func DurationBucketsBounds(b metrics.DurationBuckets) []float64 { + bkts := make([]float64, b.Size()) + for i := range bkts { + bkts[i] = b.UpperBound(i).Seconds() + } + sort.Float64s(bkts) + return bkts +} diff --git a/library/go/core/metrics/internal/pkg/metricsutil/ya.make b/library/go/core/metrics/internal/pkg/metricsutil/ya.make new file mode 100644 index 0000000000..3058637089 --- /dev/null +++ b/library/go/core/metrics/internal/pkg/metricsutil/ya.make @@ -0,0 +1,5 @@ +GO_LIBRARY() + +SRCS(buckets.go) + +END() diff --git a/library/go/core/metrics/internal/pkg/registryutil/gotest/ya.make b/library/go/core/metrics/internal/pkg/registryutil/gotest/ya.make new file mode 100644 index 0000000000..55c204d140 --- /dev/null +++ b/library/go/core/metrics/internal/pkg/registryutil/gotest/ya.make @@ -0,0 +1,3 @@ +GO_TEST_FOR(library/go/core/metrics/internal/pkg/registryutil) + +END() diff --git a/library/go/core/metrics/internal/pkg/registryutil/registryutil.go b/library/go/core/metrics/internal/pkg/registryutil/registryutil.go new file mode 100644 index 0000000000..ebce50d8cb --- /dev/null +++ b/library/go/core/metrics/internal/pkg/registryutil/registryutil.go @@ -0,0 +1,104 @@ +package registryutil + +import ( + "errors" + "fmt" + "sort" + "strconv" + "strings" + + "github.com/OneOfOne/xxhash" +) + +// BuildRegistryKey creates registry name based on given prefix and tags +func BuildRegistryKey(prefix string, tags map[string]string) string { + var builder strings.Builder + + builder.WriteString(strconv.Quote(prefix)) + builder.WriteRune('{') + builder.WriteString(StringifyTags(tags)) + builder.WriteByte('}') + + return builder.String() +} + +// BuildFQName returns name parts joined by given separator. +// Mainly used to append prefix to registry +func BuildFQName(separator string, parts ...string) (name string) { + var b strings.Builder + for _, p := range parts { + if p == "" { + continue + } + if b.Len() > 0 { + b.WriteString(separator) + } + b.WriteString(strings.Trim(p, separator)) + } + return b.String() +} + +// MergeTags merges 2 sets of tags with the tags from tagsRight overriding values from tagsLeft +func MergeTags(leftTags map[string]string, rightTags map[string]string) map[string]string { + if leftTags == nil && rightTags == nil { + return nil + } + + if len(leftTags) == 0 { + return rightTags + } + + if len(rightTags) == 0 { + return leftTags + } + + newTags := make(map[string]string) + for key, value := range leftTags { + newTags[key] = value + } + for key, value := range rightTags { + newTags[key] = value + } + return newTags +} + +// StringifyTags returns string representation of given tags map. +// It is guaranteed that equal sets of tags will produce equal strings. +func StringifyTags(tags map[string]string) string { + keys := make([]string, 0, len(tags)) + for key := range tags { + keys = append(keys, key) + } + sort.Strings(keys) + + var builder strings.Builder + for i, key := range keys { + if i > 0 { + builder.WriteByte(',') + } + builder.WriteString(key + "=" + tags[key]) + } + + return builder.String() +} + +// VectorHash computes hash of metrics vector element +func VectorHash(tags map[string]string, labels []string) (uint64, error) { + if len(tags) != len(labels) { + return 0, errors.New("inconsistent tags and labels sets") + } + + h := xxhash.New64() + + for _, label := range labels { + v, ok := tags[label] + if !ok { + return 0, fmt.Errorf("label '%s' not found in tags", label) + } + _, _ = h.WriteString(label) + _, _ = h.WriteString(v) + _, _ = h.WriteString(",") + } + + return h.Sum64(), nil +} diff --git a/library/go/core/metrics/internal/pkg/registryutil/registryutil_test.go b/library/go/core/metrics/internal/pkg/registryutil/registryutil_test.go new file mode 100644 index 0000000000..5463f04755 --- /dev/null +++ b/library/go/core/metrics/internal/pkg/registryutil/registryutil_test.go @@ -0,0 +1,48 @@ +package registryutil + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBuildFQName(t *testing.T) { + testCases := []struct { + name string + parts []string + sep string + expected string + }{ + { + name: "empty", + parts: nil, + sep: "_", + expected: "", + }, + { + name: "one part", + parts: []string{"part"}, + sep: "_", + expected: "part", + }, + { + name: "two parts", + parts: []string{"part", "another"}, + sep: "_", + expected: "part_another", + }, + { + name: "parts with sep", + parts: []string{"abcde", "deabc"}, + sep: "abc", + expected: "deabcde", + }, + } + + for _, testCase := range testCases { + c := testCase + t.Run(c.name, func(t *testing.T) { + assert.Equal(t, c.expected, BuildFQName(c.sep, c.parts...)) + }) + } +} diff --git a/library/go/core/metrics/internal/pkg/registryutil/ya.make b/library/go/core/metrics/internal/pkg/registryutil/ya.make new file mode 100644 index 0000000000..4a1f976d40 --- /dev/null +++ b/library/go/core/metrics/internal/pkg/registryutil/ya.make @@ -0,0 +1,9 @@ +GO_LIBRARY() + +SRCS(registryutil.go) + +GO_TEST_SRCS(registryutil_test.go) + +END() + +RECURSE(gotest) diff --git a/library/go/core/metrics/internal/pkg/ya.make b/library/go/core/metrics/internal/pkg/ya.make new file mode 100644 index 0000000000..416d1b3e5d --- /dev/null +++ b/library/go/core/metrics/internal/pkg/ya.make @@ -0,0 +1,4 @@ +RECURSE( + metricsutil + registryutil +) diff --git a/library/go/core/metrics/internal/ya.make b/library/go/core/metrics/internal/ya.make new file mode 100644 index 0000000000..b2a587f35d --- /dev/null +++ b/library/go/core/metrics/internal/ya.make @@ -0,0 +1 @@ +RECURSE(pkg) diff --git a/library/go/core/metrics/metrics.go b/library/go/core/metrics/metrics.go new file mode 100644 index 0000000000..097fca9a55 --- /dev/null +++ b/library/go/core/metrics/metrics.go @@ -0,0 +1,163 @@ +// Package metrics provides interface collecting performance metrics. +package metrics + +import ( + "context" + "time" +) + +// Gauge tracks single float64 value. +type Gauge interface { + Set(value float64) + Add(value float64) +} + +// FuncGauge is Gauge with value provided by callback function. +type FuncGauge interface { + Function() func() float64 +} + +// IntGauge tracks single int64 value. +type IntGauge interface { + Set(value int64) + Add(value int64) +} + +// FuncIntGauge is IntGauge with value provided by callback function. +type FuncIntGauge interface { + Function() func() int64 +} + +// Counter tracks monotonically increasing value. +type Counter interface { + // Inc increments counter by 1. + Inc() + + // Add adds delta to the counter. Delta must be >=0. + Add(delta int64) +} + +// FuncCounter is Counter with value provided by callback function. +type FuncCounter interface { + Function() func() int64 +} + +// Histogram tracks distribution of value. +type Histogram interface { + RecordValue(value float64) +} + +// Timer measures durations. +type Timer interface { + RecordDuration(value time.Duration) +} + +// DurationBuckets defines buckets of the duration histogram. +type DurationBuckets interface { + // Size returns number of buckets. + Size() int + + // MapDuration returns index of the bucket. + // + // index is integer in range [0, Size()). + MapDuration(d time.Duration) int + + // UpperBound of the last bucket is always +Inf. + // + // bucketIndex is integer in range [0, Size()-1). + UpperBound(bucketIndex int) time.Duration +} + +// Buckets defines intervals of the regular histogram. +type Buckets interface { + // Size returns number of buckets. + Size() int + + // MapValue returns index of the bucket. + // + // Index is integer in range [0, Size()). + MapValue(v float64) int + + // UpperBound of the last bucket is always +Inf. + // + // bucketIndex is integer in range [0, Size()-1). + UpperBound(bucketIndex int) float64 +} + +// GaugeVec stores multiple dynamically created gauges. +type GaugeVec interface { + With(map[string]string) Gauge + + // Reset deletes all metrics in vector. + Reset() +} + +// IntGaugeVec stores multiple dynamically created gauges. +type IntGaugeVec interface { + With(map[string]string) IntGauge + + // Reset deletes all metrics in vector. + Reset() +} + +// CounterVec stores multiple dynamically created counters. +type CounterVec interface { + With(map[string]string) Counter + + // Reset deletes all metrics in vector. + Reset() +} + +// TimerVec stores multiple dynamically created timers. +type TimerVec interface { + With(map[string]string) Timer + + // Reset deletes all metrics in vector. + Reset() +} + +// HistogramVec stores multiple dynamically created histograms. +type HistogramVec interface { + With(map[string]string) Histogram + + // Reset deletes all metrics in vector. + Reset() +} + +// Registry creates profiling metrics. +type Registry interface { + // WithTags creates new sub-scope, where each metric has tags attached to it. + WithTags(tags map[string]string) Registry + // WithPrefix creates new sub-scope, where each metric has prefix added to it name. + WithPrefix(prefix string) Registry + + ComposeName(parts ...string) string + + Counter(name string) Counter + CounterVec(name string, labels []string) CounterVec + FuncCounter(name string, function func() int64) FuncCounter + + Gauge(name string) Gauge + GaugeVec(name string, labels []string) GaugeVec + FuncGauge(name string, function func() float64) FuncGauge + + IntGauge(name string) IntGauge + IntGaugeVec(name string, labels []string) IntGaugeVec + FuncIntGauge(name string, function func() int64) FuncIntGauge + + Timer(name string) Timer + TimerVec(name string, labels []string) TimerVec + + Histogram(name string, buckets Buckets) Histogram + HistogramVec(name string, buckets Buckets, labels []string) HistogramVec + + DurationHistogram(name string, buckets DurationBuckets) Timer + DurationHistogramVec(name string, buckets DurationBuckets, labels []string) TimerVec +} + +// CollectPolicy defines how registered gauge metrics are updated via collect func. +type CollectPolicy interface { + RegisteredCounter(counterFunc func() int64) func() int64 + RegisteredGauge(gaugeFunc func() float64) func() float64 + AddCollect(collect func(ctx context.Context)) +} diff --git a/library/go/core/metrics/mock/counter.go b/library/go/core/metrics/mock/counter.go new file mode 100644 index 0000000000..c3016ea1a9 --- /dev/null +++ b/library/go/core/metrics/mock/counter.go @@ -0,0 +1,35 @@ +package mock + +import ( + "github.com/ydb-platform/ydb/library/go/core/metrics" + "go.uber.org/atomic" +) + +var _ metrics.Counter = (*Counter)(nil) + +// Counter tracks monotonically increasing value. +type Counter struct { + Name string + Tags map[string]string + Value *atomic.Int64 +} + +// Inc increments counter by 1. +func (c *Counter) Inc() { + c.Add(1) +} + +// Add adds delta to the counter. Delta must be >=0. +func (c *Counter) Add(delta int64) { + c.Value.Add(delta) +} + +var _ metrics.FuncCounter = (*FuncCounter)(nil) + +type FuncCounter struct { + function func() int64 +} + +func (c FuncCounter) Function() func() int64 { + return c.function +} diff --git a/library/go/core/metrics/mock/gauge.go b/library/go/core/metrics/mock/gauge.go new file mode 100644 index 0000000000..58d2d29beb --- /dev/null +++ b/library/go/core/metrics/mock/gauge.go @@ -0,0 +1,33 @@ +package mock + +import ( + "github.com/ydb-platform/ydb/library/go/core/metrics" + "go.uber.org/atomic" +) + +var _ metrics.Gauge = (*Gauge)(nil) + +// Gauge tracks single float64 value. +type Gauge struct { + Name string + Tags map[string]string + Value *atomic.Float64 +} + +func (g *Gauge) Set(value float64) { + g.Value.Store(value) +} + +func (g *Gauge) Add(value float64) { + g.Value.Add(value) +} + +var _ metrics.FuncGauge = (*FuncGauge)(nil) + +type FuncGauge struct { + function func() float64 +} + +func (g FuncGauge) Function() func() float64 { + return g.function +} diff --git a/library/go/core/metrics/mock/histogram.go b/library/go/core/metrics/mock/histogram.go new file mode 100644 index 0000000000..734d7b5f88 --- /dev/null +++ b/library/go/core/metrics/mock/histogram.go @@ -0,0 +1,40 @@ +package mock + +import ( + "sort" + "sync" + "time" + + "github.com/ydb-platform/ydb/library/go/core/metrics" + "go.uber.org/atomic" +) + +var ( + _ metrics.Histogram = (*Histogram)(nil) + _ metrics.Timer = (*Histogram)(nil) +) + +type Histogram struct { + Name string + Tags map[string]string + BucketBounds []float64 + BucketValues []int64 + InfValue *atomic.Int64 + mutex sync.Mutex +} + +func (h *Histogram) RecordValue(value float64) { + boundIndex := sort.SearchFloat64s(h.BucketBounds, value) + + if boundIndex < len(h.BucketValues) { + h.mutex.Lock() + h.BucketValues[boundIndex] += 1 + h.mutex.Unlock() + } else { + h.InfValue.Inc() + } +} + +func (h *Histogram) RecordDuration(value time.Duration) { + h.RecordValue(value.Seconds()) +} diff --git a/library/go/core/metrics/mock/int_gauge.go b/library/go/core/metrics/mock/int_gauge.go new file mode 100644 index 0000000000..8955107da9 --- /dev/null +++ b/library/go/core/metrics/mock/int_gauge.go @@ -0,0 +1,33 @@ +package mock + +import ( + "github.com/ydb-platform/ydb/library/go/core/metrics" + "go.uber.org/atomic" +) + +var _ metrics.IntGauge = (*IntGauge)(nil) + +// IntGauge tracks single int64 value. +type IntGauge struct { + Name string + Tags map[string]string + Value *atomic.Int64 +} + +func (g *IntGauge) Set(value int64) { + g.Value.Store(value) +} + +func (g *IntGauge) Add(value int64) { + g.Value.Add(value) +} + +var _ metrics.FuncIntGauge = (*FuncIntGauge)(nil) + +type FuncIntGauge struct { + function func() int64 +} + +func (g FuncIntGauge) Function() func() int64 { + return g.function +} diff --git a/library/go/core/metrics/mock/registry.go b/library/go/core/metrics/mock/registry.go new file mode 100644 index 0000000000..77f465f8ea --- /dev/null +++ b/library/go/core/metrics/mock/registry.go @@ -0,0 +1,224 @@ +package mock + +import ( + "sync" + + "github.com/ydb-platform/ydb/library/go/core/metrics" + "github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/metricsutil" + "github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/registryutil" + "go.uber.org/atomic" +) + +var _ metrics.Registry = (*Registry)(nil) + +type Registry struct { + separator string + prefix string + tags map[string]string + allowLoadRegisteredMetrics bool + + subregistries map[string]*Registry + m *sync.Mutex + + metrics *sync.Map +} + +func NewRegistry(opts *RegistryOpts) *Registry { + r := &Registry{ + separator: ".", + + subregistries: make(map[string]*Registry), + m: new(sync.Mutex), + + metrics: new(sync.Map), + } + + if opts != nil { + r.separator = string(opts.Separator) + r.prefix = opts.Prefix + r.tags = opts.Tags + r.allowLoadRegisteredMetrics = opts.AllowLoadRegisteredMetrics + } + + return r +} + +// WithTags creates new sub-scope, where each metric has tags attached to it. +func (r Registry) WithTags(tags map[string]string) metrics.Registry { + return r.newSubregistry(r.prefix, registryutil.MergeTags(r.tags, tags)) +} + +// WithPrefix creates new sub-scope, where each metric has prefix added to it name. +func (r Registry) WithPrefix(prefix string) metrics.Registry { + return r.newSubregistry(registryutil.BuildFQName(r.separator, r.prefix, prefix), r.tags) +} + +func (r Registry) ComposeName(parts ...string) string { + return registryutil.BuildFQName(r.separator, parts...) +} + +func (r Registry) Counter(name string) metrics.Counter { + s := &Counter{ + Name: r.newMetricName(name), + Tags: r.tags, + Value: new(atomic.Int64), + } + + key := registryutil.BuildRegistryKey(s.Name, r.tags) + if val, loaded := r.metrics.LoadOrStore(key, s); loaded { + if r.allowLoadRegisteredMetrics { + return val.(*Counter) + } + panic("metric with key " + key + " already registered") + } + return s +} + +func (r Registry) FuncCounter(name string, function func() int64) metrics.FuncCounter { + metricName := r.newMetricName(name) + key := registryutil.BuildRegistryKey(metricName, r.tags) + s := FuncCounter{function: function} + if _, loaded := r.metrics.LoadOrStore(key, s); loaded { + panic("metric with key " + key + " already registered") + } + return s +} + +func (r Registry) Gauge(name string) metrics.Gauge { + s := &Gauge{ + Name: r.newMetricName(name), + Tags: r.tags, + Value: new(atomic.Float64), + } + + key := registryutil.BuildRegistryKey(s.Name, r.tags) + if val, loaded := r.metrics.LoadOrStore(key, s); loaded { + if r.allowLoadRegisteredMetrics { + return val.(*Gauge) + } + panic("metric with key " + key + " already registered") + } + return s +} + +func (r Registry) FuncGauge(name string, function func() float64) metrics.FuncGauge { + metricName := r.newMetricName(name) + key := registryutil.BuildRegistryKey(metricName, r.tags) + s := FuncGauge{function: function} + if _, loaded := r.metrics.LoadOrStore(key, s); loaded { + panic("metric with key " + key + " already registered") + } + return s +} + +func (r *Registry) IntGauge(name string) metrics.IntGauge { + s := &IntGauge{ + Name: r.newMetricName(name), + Tags: r.tags, + Value: new(atomic.Int64), + } + + key := registryutil.BuildRegistryKey(s.Name, r.tags) + if val, loaded := r.metrics.LoadOrStore(key, s); loaded { + if r.allowLoadRegisteredMetrics { + return val.(*IntGauge) + } + panic("metric with key " + key + " already registered") + } + return s +} + +func (r *Registry) FuncIntGauge(name string, function func() int64) metrics.FuncIntGauge { + metricName := r.newMetricName(name) + key := registryutil.BuildRegistryKey(metricName, r.tags) + s := FuncIntGauge{function: function} + if _, loaded := r.metrics.LoadOrStore(key, s); loaded { + panic("metric with key " + key + " already registered") + } + return s +} + +func (r Registry) Timer(name string) metrics.Timer { + s := &Timer{ + Name: r.newMetricName(name), + Tags: r.tags, + Value: new(atomic.Duration), + } + + key := registryutil.BuildRegistryKey(s.Name, r.tags) + if val, loaded := r.metrics.LoadOrStore(key, s); loaded { + if r.allowLoadRegisteredMetrics { + return val.(*Timer) + } + panic("metric with key " + key + " already registered") + } + return s +} + +func (r Registry) Histogram(name string, buckets metrics.Buckets) metrics.Histogram { + s := &Histogram{ + Name: r.newMetricName(name), + Tags: r.tags, + BucketBounds: metricsutil.BucketsBounds(buckets), + BucketValues: make([]int64, buckets.Size()), + InfValue: new(atomic.Int64), + } + + key := registryutil.BuildRegistryKey(s.Name, r.tags) + if val, loaded := r.metrics.LoadOrStore(key, s); loaded { + if r.allowLoadRegisteredMetrics { + return val.(*Histogram) + } + panic("metric with key " + key + " already registered") + } + return s +} + +func (r Registry) DurationHistogram(name string, buckets metrics.DurationBuckets) metrics.Timer { + s := &Histogram{ + Name: r.newMetricName(name), + Tags: r.tags, + BucketBounds: metricsutil.DurationBucketsBounds(buckets), + BucketValues: make([]int64, buckets.Size()), + InfValue: new(atomic.Int64), + } + + key := registryutil.BuildRegistryKey(s.Name, r.tags) + if val, loaded := r.metrics.LoadOrStore(key, s); loaded { + if r.allowLoadRegisteredMetrics { + return val.(*Histogram) + } + panic("metric with key " + key + " already registered") + } + return s +} + +func (r *Registry) newSubregistry(prefix string, tags map[string]string) *Registry { + registryKey := registryutil.BuildRegistryKey(prefix, tags) + + r.m.Lock() + defer r.m.Unlock() + + if existing, ok := r.subregistries[registryKey]; ok { + return existing + } + + subregistry := &Registry{ + separator: r.separator, + prefix: prefix, + tags: tags, + allowLoadRegisteredMetrics: r.allowLoadRegisteredMetrics, + + subregistries: r.subregistries, + m: r.m, + + metrics: r.metrics, + } + + r.subregistries[registryKey] = subregistry + return subregistry +} + +func (r *Registry) newMetricName(name string) string { + return registryutil.BuildFQName(r.separator, r.prefix, name) +} diff --git a/library/go/core/metrics/mock/registry_opts.go b/library/go/core/metrics/mock/registry_opts.go new file mode 100644 index 0000000000..1cc1c3970d --- /dev/null +++ b/library/go/core/metrics/mock/registry_opts.go @@ -0,0 +1,52 @@ +package mock + +import ( + "github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/registryutil" +) + +type RegistryOpts struct { + Separator rune + Prefix string + Tags map[string]string + AllowLoadRegisteredMetrics bool +} + +// NewRegistryOpts returns new initialized instance of RegistryOpts +func NewRegistryOpts() *RegistryOpts { + return &RegistryOpts{ + Separator: '.', + Tags: make(map[string]string), + } +} + +// SetTags overrides existing tags +func (o *RegistryOpts) SetTags(tags map[string]string) *RegistryOpts { + o.Tags = tags + return o +} + +// AddTags merges given tags with existing +func (o *RegistryOpts) AddTags(tags map[string]string) *RegistryOpts { + for k, v := range tags { + o.Tags[k] = v + } + return o +} + +// SetPrefix overrides existing prefix +func (o *RegistryOpts) SetPrefix(prefix string) *RegistryOpts { + o.Prefix = prefix + return o +} + +// AppendPrefix adds given prefix as postfix to existing using separator +func (o *RegistryOpts) AppendPrefix(prefix string) *RegistryOpts { + o.Prefix = registryutil.BuildFQName(string(o.Separator), o.Prefix, prefix) + return o +} + +// SetSeparator overrides existing separator +func (o *RegistryOpts) SetSeparator(separator rune) *RegistryOpts { + o.Separator = separator + return o +} diff --git a/library/go/core/metrics/mock/timer.go b/library/go/core/metrics/mock/timer.go new file mode 100644 index 0000000000..3ea3629ca9 --- /dev/null +++ b/library/go/core/metrics/mock/timer.go @@ -0,0 +1,21 @@ +package mock + +import ( + "time" + + "github.com/ydb-platform/ydb/library/go/core/metrics" + "go.uber.org/atomic" +) + +var _ metrics.Timer = (*Timer)(nil) + +// Timer measures gauge duration. +type Timer struct { + Name string + Tags map[string]string + Value *atomic.Duration +} + +func (t *Timer) RecordDuration(value time.Duration) { + t.Value.Store(value) +} diff --git a/library/go/core/metrics/mock/vec.go b/library/go/core/metrics/mock/vec.go new file mode 100644 index 0000000000..f1cde3d47c --- /dev/null +++ b/library/go/core/metrics/mock/vec.go @@ -0,0 +1,256 @@ +package mock + +import ( + "sync" + + "github.com/ydb-platform/ydb/library/go/core/metrics" + "github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/registryutil" +) + +type MetricsVector interface { + With(map[string]string) interface{} + + // Reset deletes all metrics in vector. + Reset() +} + +// Vector is base implementation of vector of metrics of any supported type +type Vector struct { + Labels []string + Mtx sync.RWMutex // Protects metrics. + Metrics map[uint64]interface{} + NewMetric func(map[string]string) interface{} +} + +func (v *Vector) With(tags map[string]string) interface{} { + hv, err := registryutil.VectorHash(tags, v.Labels) + if err != nil { + panic(err) + } + + v.Mtx.RLock() + metric, ok := v.Metrics[hv] + v.Mtx.RUnlock() + if ok { + return metric + } + + v.Mtx.Lock() + defer v.Mtx.Unlock() + + metric, ok = v.Metrics[hv] + if !ok { + metric = v.NewMetric(tags) + v.Metrics[hv] = metric + } + + return metric +} + +// Reset deletes all metrics in this vector. +func (v *Vector) Reset() { + v.Mtx.Lock() + defer v.Mtx.Unlock() + + for h := range v.Metrics { + delete(v.Metrics, h) + } +} + +var _ metrics.CounterVec = (*CounterVec)(nil) + +// CounterVec stores counters and +// implements metrics.CounterVec interface +type CounterVec struct { + Vec MetricsVector +} + +// CounterVec creates a new counters vector with given metric name and +// partitioned by the given label names. +func (r *Registry) CounterVec(name string, labels []string) metrics.CounterVec { + return &CounterVec{ + Vec: &Vector{ + Labels: append([]string(nil), labels...), + Metrics: make(map[uint64]interface{}), + NewMetric: func(tags map[string]string) interface{} { + return r.WithTags(tags).Counter(name) + }, + }, + } +} + +// With creates new or returns existing counter with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *CounterVec) With(tags map[string]string) metrics.Counter { + return v.Vec.With(tags).(*Counter) +} + +// Reset deletes all metrics in this vector. +func (v *CounterVec) Reset() { + v.Vec.Reset() +} + +var _ metrics.GaugeVec = new(GaugeVec) + +// GaugeVec stores gauges and +// implements metrics.GaugeVec interface +type GaugeVec struct { + Vec MetricsVector +} + +// GaugeVec creates a new gauges vector with given metric name and +// partitioned by the given label names. +func (r *Registry) GaugeVec(name string, labels []string) metrics.GaugeVec { + return &GaugeVec{ + Vec: &Vector{ + Labels: append([]string(nil), labels...), + Metrics: make(map[uint64]interface{}), + NewMetric: func(tags map[string]string) interface{} { + return r.WithTags(tags).Gauge(name) + }, + }, + } +} + +// With creates new or returns existing gauge with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *GaugeVec) With(tags map[string]string) metrics.Gauge { + return v.Vec.With(tags).(*Gauge) +} + +// Reset deletes all metrics in this vector. +func (v *GaugeVec) Reset() { + v.Vec.Reset() +} + +var _ metrics.IntGaugeVec = new(IntGaugeVec) + +// IntGaugeVec stores gauges and +// implements metrics.IntGaugeVec interface +type IntGaugeVec struct { + Vec MetricsVector +} + +// IntGaugeVec creates a new gauges vector with given metric name and +// partitioned by the given label names. +func (r *Registry) IntGaugeVec(name string, labels []string) metrics.IntGaugeVec { + return &IntGaugeVec{ + Vec: &Vector{ + Labels: append([]string(nil), labels...), + Metrics: make(map[uint64]interface{}), + NewMetric: func(tags map[string]string) interface{} { + return r.WithTags(tags).IntGauge(name) + }, + }, + } +} + +// With creates new or returns existing gauge with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *IntGaugeVec) With(tags map[string]string) metrics.IntGauge { + return v.Vec.With(tags).(*IntGauge) +} + +// Reset deletes all metrics in this vector. +func (v *IntGaugeVec) Reset() { + v.Vec.Reset() +} + +var _ metrics.TimerVec = new(TimerVec) + +// TimerVec stores timers and +// implements metrics.TimerVec interface +type TimerVec struct { + Vec MetricsVector +} + +// TimerVec creates a new timers vector with given metric name and +// partitioned by the given label names. +func (r *Registry) TimerVec(name string, labels []string) metrics.TimerVec { + return &TimerVec{ + Vec: &Vector{ + Labels: append([]string(nil), labels...), + Metrics: make(map[uint64]interface{}), + NewMetric: func(tags map[string]string) interface{} { + return r.WithTags(tags).Timer(name) + }, + }, + } +} + +// With creates new or returns existing timer with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *TimerVec) With(tags map[string]string) metrics.Timer { + return v.Vec.With(tags).(*Timer) +} + +// Reset deletes all metrics in this vector. +func (v *TimerVec) Reset() { + v.Vec.Reset() +} + +var _ metrics.HistogramVec = (*HistogramVec)(nil) + +// HistogramVec stores histograms and +// implements metrics.HistogramVec interface +type HistogramVec struct { + Vec MetricsVector +} + +// HistogramVec creates a new histograms vector with given metric name and buckets and +// partitioned by the given label names. +func (r *Registry) HistogramVec(name string, buckets metrics.Buckets, labels []string) metrics.HistogramVec { + return &HistogramVec{ + Vec: &Vector{ + Labels: append([]string(nil), labels...), + Metrics: make(map[uint64]interface{}), + NewMetric: func(tags map[string]string) interface{} { + return r.WithTags(tags).Histogram(name, buckets) + }, + }, + } +} + +// With creates new or returns existing histogram with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *HistogramVec) With(tags map[string]string) metrics.Histogram { + return v.Vec.With(tags).(*Histogram) +} + +// Reset deletes all metrics in this vector. +func (v *HistogramVec) Reset() { + v.Vec.Reset() +} + +var _ metrics.TimerVec = (*DurationHistogramVec)(nil) + +// DurationHistogramVec stores duration histograms and +// implements metrics.TimerVec interface +type DurationHistogramVec struct { + Vec MetricsVector +} + +// DurationHistogramVec creates a new duration histograms vector with given metric name and buckets and +// partitioned by the given label names. +func (r *Registry) DurationHistogramVec(name string, buckets metrics.DurationBuckets, labels []string) metrics.TimerVec { + return &DurationHistogramVec{ + Vec: &Vector{ + Labels: append([]string(nil), labels...), + Metrics: make(map[uint64]interface{}), + NewMetric: func(tags map[string]string) interface{} { + return r.WithTags(tags).DurationHistogram(name, buckets) + }, + }, + } +} + +// With creates new or returns existing duration histogram with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *DurationHistogramVec) With(tags map[string]string) metrics.Timer { + return v.Vec.With(tags).(*Histogram) +} + +// Reset deletes all metrics in this vector. +func (v *DurationHistogramVec) Reset() { + v.Vec.Reset() +} diff --git a/library/go/core/metrics/mock/ya.make b/library/go/core/metrics/mock/ya.make new file mode 100644 index 0000000000..0ddaf2285b --- /dev/null +++ b/library/go/core/metrics/mock/ya.make @@ -0,0 +1,14 @@ +GO_LIBRARY() + +SRCS( + counter.go + gauge.go + int_gauge.go + histogram.go + registry.go + registry_opts.go + timer.go + vec.go +) + +END() diff --git a/library/go/core/metrics/nop/counter.go b/library/go/core/metrics/nop/counter.go new file mode 100644 index 0000000000..65a36910da --- /dev/null +++ b/library/go/core/metrics/nop/counter.go @@ -0,0 +1,31 @@ +package nop + +import "github.com/ydb-platform/ydb/library/go/core/metrics" + +var _ metrics.Counter = (*Counter)(nil) + +type Counter struct{} + +func (Counter) Inc() {} + +func (Counter) Add(_ int64) {} + +var _ metrics.CounterVec = (*CounterVec)(nil) + +type CounterVec struct{} + +func (t CounterVec) With(_ map[string]string) metrics.Counter { + return Counter{} +} + +func (t CounterVec) Reset() {} + +var _ metrics.FuncCounter = (*FuncCounter)(nil) + +type FuncCounter struct { + function func() int64 +} + +func (c FuncCounter) Function() func() int64 { + return c.function +} diff --git a/library/go/core/metrics/nop/gauge.go b/library/go/core/metrics/nop/gauge.go new file mode 100644 index 0000000000..9ab9ff6d77 --- /dev/null +++ b/library/go/core/metrics/nop/gauge.go @@ -0,0 +1,31 @@ +package nop + +import "github.com/ydb-platform/ydb/library/go/core/metrics" + +var _ metrics.Gauge = (*Gauge)(nil) + +type Gauge struct{} + +func (Gauge) Set(_ float64) {} + +func (Gauge) Add(_ float64) {} + +var _ metrics.GaugeVec = (*GaugeVec)(nil) + +type GaugeVec struct{} + +func (t GaugeVec) With(_ map[string]string) metrics.Gauge { + return Gauge{} +} + +func (t GaugeVec) Reset() {} + +var _ metrics.FuncGauge = (*FuncGauge)(nil) + +type FuncGauge struct { + function func() float64 +} + +func (g FuncGauge) Function() func() float64 { + return g.function +} diff --git a/library/go/core/metrics/nop/histogram.go b/library/go/core/metrics/nop/histogram.go new file mode 100644 index 0000000000..bde571323c --- /dev/null +++ b/library/go/core/metrics/nop/histogram.go @@ -0,0 +1,38 @@ +package nop + +import ( + "time" + + "github.com/ydb-platform/ydb/library/go/core/metrics" +) + +var ( + _ metrics.Histogram = (*Histogram)(nil) + _ metrics.Timer = (*Histogram)(nil) +) + +type Histogram struct{} + +func (Histogram) RecordValue(_ float64) {} + +func (Histogram) RecordDuration(_ time.Duration) {} + +var _ metrics.HistogramVec = (*HistogramVec)(nil) + +type HistogramVec struct{} + +func (t HistogramVec) With(_ map[string]string) metrics.Histogram { + return Histogram{} +} + +func (t HistogramVec) Reset() {} + +var _ metrics.TimerVec = (*DurationHistogramVec)(nil) + +type DurationHistogramVec struct{} + +func (t DurationHistogramVec) With(_ map[string]string) metrics.Timer { + return Histogram{} +} + +func (t DurationHistogramVec) Reset() {} diff --git a/library/go/core/metrics/nop/int_gauge.go b/library/go/core/metrics/nop/int_gauge.go new file mode 100644 index 0000000000..226059a79d --- /dev/null +++ b/library/go/core/metrics/nop/int_gauge.go @@ -0,0 +1,31 @@ +package nop + +import "github.com/ydb-platform/ydb/library/go/core/metrics" + +var _ metrics.IntGauge = (*IntGauge)(nil) + +type IntGauge struct{} + +func (IntGauge) Set(_ int64) {} + +func (IntGauge) Add(_ int64) {} + +var _ metrics.IntGaugeVec = (*IntGaugeVec)(nil) + +type IntGaugeVec struct{} + +func (t IntGaugeVec) With(_ map[string]string) metrics.IntGauge { + return IntGauge{} +} + +func (t IntGaugeVec) Reset() {} + +var _ metrics.FuncIntGauge = (*FuncIntGauge)(nil) + +type FuncIntGauge struct { + function func() int64 +} + +func (g FuncIntGauge) Function() func() int64 { + return g.function +} diff --git a/library/go/core/metrics/nop/registry.go b/library/go/core/metrics/nop/registry.go new file mode 100644 index 0000000000..97ed977ed7 --- /dev/null +++ b/library/go/core/metrics/nop/registry.go @@ -0,0 +1,79 @@ +package nop + +import "github.com/ydb-platform/ydb/library/go/core/metrics" + +var _ metrics.Registry = (*Registry)(nil) + +type Registry struct{} + +func (r Registry) ComposeName(parts ...string) string { + return "" +} + +func (r Registry) WithTags(_ map[string]string) metrics.Registry { + return Registry{} +} + +func (r Registry) WithPrefix(_ string) metrics.Registry { + return Registry{} +} + +func (r Registry) Counter(_ string) metrics.Counter { + return Counter{} +} + +func (r Registry) FuncCounter(_ string, function func() int64) metrics.FuncCounter { + return FuncCounter{function: function} +} + +func (r Registry) Gauge(_ string) metrics.Gauge { + return Gauge{} +} + +func (r Registry) FuncGauge(_ string, function func() float64) metrics.FuncGauge { + return FuncGauge{function: function} +} + +func (r Registry) IntGauge(_ string) metrics.IntGauge { + return IntGauge{} +} + +func (r Registry) FuncIntGauge(_ string, function func() int64) metrics.FuncIntGauge { + return FuncIntGauge{function: function} +} + +func (r Registry) Timer(_ string) metrics.Timer { + return Timer{} +} + +func (r Registry) Histogram(_ string, _ metrics.Buckets) metrics.Histogram { + return Histogram{} +} + +func (r Registry) DurationHistogram(_ string, _ metrics.DurationBuckets) metrics.Timer { + return Histogram{} +} + +func (r Registry) CounterVec(_ string, _ []string) metrics.CounterVec { + return CounterVec{} +} + +func (r Registry) GaugeVec(_ string, _ []string) metrics.GaugeVec { + return GaugeVec{} +} + +func (r Registry) IntGaugeVec(_ string, _ []string) metrics.IntGaugeVec { + return IntGaugeVec{} +} + +func (r Registry) TimerVec(_ string, _ []string) metrics.TimerVec { + return TimerVec{} +} + +func (r Registry) HistogramVec(_ string, _ metrics.Buckets, _ []string) metrics.HistogramVec { + return HistogramVec{} +} + +func (r Registry) DurationHistogramVec(_ string, _ metrics.DurationBuckets, _ []string) metrics.TimerVec { + return DurationHistogramVec{} +} diff --git a/library/go/core/metrics/nop/timer.go b/library/go/core/metrics/nop/timer.go new file mode 100644 index 0000000000..61906032a2 --- /dev/null +++ b/library/go/core/metrics/nop/timer.go @@ -0,0 +1,23 @@ +package nop + +import ( + "time" + + "github.com/ydb-platform/ydb/library/go/core/metrics" +) + +var _ metrics.Timer = (*Timer)(nil) + +type Timer struct{} + +func (Timer) RecordDuration(_ time.Duration) {} + +var _ metrics.TimerVec = (*TimerVec)(nil) + +type TimerVec struct{} + +func (t TimerVec) With(_ map[string]string) metrics.Timer { + return Timer{} +} + +func (t TimerVec) Reset() {} diff --git a/library/go/core/metrics/nop/ya.make b/library/go/core/metrics/nop/ya.make new file mode 100644 index 0000000000..279bc22ef4 --- /dev/null +++ b/library/go/core/metrics/nop/ya.make @@ -0,0 +1,12 @@ +GO_LIBRARY() + +SRCS( + counter.go + gauge.go + int_gauge.go + histogram.go + registry.go + timer.go +) + +END() diff --git a/library/go/core/metrics/prometheus/counter.go b/library/go/core/metrics/prometheus/counter.go new file mode 100644 index 0000000000..1a07063f30 --- /dev/null +++ b/library/go/core/metrics/prometheus/counter.go @@ -0,0 +1,34 @@ +package prometheus + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/ydb-platform/ydb/library/go/core/metrics" +) + +var _ metrics.Counter = (*Counter)(nil) + +// Counter tracks monotonically increasing value. +type Counter struct { + cnt prometheus.Counter +} + +// Inc increments counter by 1. +func (c Counter) Inc() { + c.cnt.Inc() +} + +// Add adds delta to the counter. Delta must be >=0. +func (c Counter) Add(delta int64) { + c.cnt.Add(float64(delta)) +} + +var _ metrics.FuncCounter = (*FuncCounter)(nil) + +type FuncCounter struct { + cnt prometheus.CounterFunc + function func() int64 +} + +func (c FuncCounter) Function() func() int64 { + return c.function +} diff --git a/library/go/core/metrics/prometheus/counter_test.go b/library/go/core/metrics/prometheus/counter_test.go new file mode 100644 index 0000000000..04f0c894f8 --- /dev/null +++ b/library/go/core/metrics/prometheus/counter_test.go @@ -0,0 +1,38 @@ +package prometheus + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" +) + +func TestCounter_Add(t *testing.T) { + c := &Counter{cnt: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "test_counter_add", + })} + + var expectValue int64 = 42 + c.Add(expectValue) + + var res dto.Metric + err := c.cnt.Write(&res) + + assert.NoError(t, err) + assert.Equal(t, expectValue, int64(res.GetCounter().GetValue())) +} + +func TestCounter_Inc(t *testing.T) { + c := &Counter{cnt: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "test_counter_inc", + })} + + var res dto.Metric + for i := 1; i <= 10; i++ { + c.Inc() + err := c.cnt.Write(&res) + assert.NoError(t, err) + assert.Equal(t, int64(i), int64(res.GetCounter().GetValue())) + } +} diff --git a/library/go/core/metrics/prometheus/gauge.go b/library/go/core/metrics/prometheus/gauge.go new file mode 100644 index 0000000000..8683755561 --- /dev/null +++ b/library/go/core/metrics/prometheus/gauge.go @@ -0,0 +1,32 @@ +package prometheus + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/ydb-platform/ydb/library/go/core/metrics" +) + +var _ metrics.Gauge = (*Gauge)(nil) + +// Gauge tracks single float64 value. +type Gauge struct { + gg prometheus.Gauge +} + +func (g Gauge) Set(value float64) { + g.gg.Set(value) +} + +func (g Gauge) Add(value float64) { + g.gg.Add(value) +} + +var _ metrics.FuncGauge = (*FuncGauge)(nil) + +type FuncGauge struct { + ff prometheus.GaugeFunc + function func() float64 +} + +func (g FuncGauge) Function() func() float64 { + return g.function +} diff --git a/library/go/core/metrics/prometheus/gauge_test.go b/library/go/core/metrics/prometheus/gauge_test.go new file mode 100644 index 0000000000..aebb7586c1 --- /dev/null +++ b/library/go/core/metrics/prometheus/gauge_test.go @@ -0,0 +1,39 @@ +package prometheus + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" +) + +func TestGauge_Add(t *testing.T) { + g := &Gauge{gg: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "test_gauge_add", + })} + + var expectValue float64 = 42 + g.Add(expectValue) + + var res dto.Metric + err := g.gg.Write(&res) + + assert.NoError(t, err) + assert.Equal(t, expectValue, res.GetGauge().GetValue()) +} + +func TestGauge_Set(t *testing.T) { + g := &Gauge{gg: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "test_gauge_set", + })} + + var expectValue float64 = 42 + g.Set(expectValue) + + var res dto.Metric + err := g.gg.Write(&res) + + assert.NoError(t, err) + assert.Equal(t, expectValue, res.GetGauge().GetValue()) +} diff --git a/library/go/core/metrics/prometheus/gotest/ya.make b/library/go/core/metrics/prometheus/gotest/ya.make new file mode 100644 index 0000000000..466256dcaa --- /dev/null +++ b/library/go/core/metrics/prometheus/gotest/ya.make @@ -0,0 +1,3 @@ +GO_TEST_FOR(library/go/core/metrics/prometheus) + +END() diff --git a/library/go/core/metrics/prometheus/histogram.go b/library/go/core/metrics/prometheus/histogram.go new file mode 100644 index 0000000000..bd5e0dca66 --- /dev/null +++ b/library/go/core/metrics/prometheus/histogram.go @@ -0,0 +1,22 @@ +package prometheus + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/ydb-platform/ydb/library/go/core/metrics" +) + +var _ metrics.Histogram = (*Histogram)(nil) + +type Histogram struct { + hm prometheus.Observer +} + +func (h Histogram) RecordValue(value float64) { + h.hm.Observe(value) +} + +func (h Histogram) RecordDuration(value time.Duration) { + h.hm.Observe(value.Seconds()) +} diff --git a/library/go/core/metrics/prometheus/histogram_test.go b/library/go/core/metrics/prometheus/histogram_test.go new file mode 100644 index 0000000000..0dec46589c --- /dev/null +++ b/library/go/core/metrics/prometheus/histogram_test.go @@ -0,0 +1,91 @@ +package prometheus + +import ( + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb/library/go/core/metrics" + "github.com/ydb-platform/ydb/library/go/ptr" + "google.golang.org/protobuf/testing/protocmp" +) + +func TestHistogram_RecordValue(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + + h := rg.Histogram("test_histogram_record_value", + metrics.NewBuckets(0.1, 1.0, 15.47, 42.0, 128.256), + ) + + for _, value := range []float64{0.5, 0.7, 34.1234, 127} { + h.RecordValue(value) + } + + expectBuckets := []*dto.Bucket{ + {CumulativeCount: ptr.Uint64(0), UpperBound: ptr.Float64(0.1)}, + {CumulativeCount: ptr.Uint64(2), UpperBound: ptr.Float64(1.0)}, + {CumulativeCount: ptr.Uint64(2), UpperBound: ptr.Float64(15.47)}, + {CumulativeCount: ptr.Uint64(3), UpperBound: ptr.Float64(42.0)}, + {CumulativeCount: ptr.Uint64(4), UpperBound: ptr.Float64(128.256)}, + } + + gathered, err := rg.Gather() + require.NoError(t, err) + + resBuckets := gathered[0].Metric[0].GetHistogram().GetBucket() + + cmpOpts := []cmp.Option{ + cmpopts.IgnoreUnexported(), + protocmp.Transform(), + } + assert.True(t, cmp.Equal(expectBuckets, resBuckets, cmpOpts...), cmp.Diff(expectBuckets, resBuckets, cmpOpts...)) +} + +func TestDurationHistogram_RecordDuration(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + + ht := rg.DurationHistogram("test_histogram_record_value", + metrics.NewDurationBuckets( + 1*time.Millisecond, // 0.1 + 1*time.Second, // 1.0 + 15*time.Second+470*time.Millisecond, // 15.47 + 42*time.Second, // 42.0 + 128*time.Second+256*time.Millisecond, // 128.256 + ), + ) + + values := []time.Duration{ + 500 * time.Millisecond, + 700 * time.Millisecond, + 34*time.Second + 1234*time.Millisecond, + 127 * time.Second, + } + + for _, value := range values { + ht.RecordDuration(value) + } + + expectBuckets := []*dto.Bucket{ + {CumulativeCount: ptr.Uint64(0), UpperBound: ptr.Float64(0.001)}, + {CumulativeCount: ptr.Uint64(2), UpperBound: ptr.Float64(1)}, + {CumulativeCount: ptr.Uint64(2), UpperBound: ptr.Float64(15.47)}, + {CumulativeCount: ptr.Uint64(3), UpperBound: ptr.Float64(42)}, + {CumulativeCount: ptr.Uint64(4), UpperBound: ptr.Float64(128.256)}, + } + + gathered, err := rg.Gather() + require.NoError(t, err) + + resBuckets := gathered[0].Metric[0].GetHistogram().GetBucket() + + cmpOpts := []cmp.Option{ + cmpopts.IgnoreUnexported(), + protocmp.Transform(), + } + + assert.True(t, cmp.Equal(expectBuckets, resBuckets, cmpOpts...), cmp.Diff(expectBuckets, resBuckets, cmpOpts...)) +} diff --git a/library/go/core/metrics/prometheus/int_gauge.go b/library/go/core/metrics/prometheus/int_gauge.go new file mode 100644 index 0000000000..813b87828c --- /dev/null +++ b/library/go/core/metrics/prometheus/int_gauge.go @@ -0,0 +1,32 @@ +package prometheus + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/ydb-platform/ydb/library/go/core/metrics" +) + +var _ metrics.IntGauge = (*IntGauge)(nil) + +// IntGauge tracks single int64 value. +type IntGauge struct { + metrics.Gauge +} + +func (i IntGauge) Set(value int64) { + i.Gauge.Set(float64(value)) +} + +func (i IntGauge) Add(value int64) { + i.Gauge.Add(float64(value)) +} + +var _ metrics.FuncIntGauge = (*FuncIntGauge)(nil) + +type FuncIntGauge struct { + ff prometheus.GaugeFunc + function func() int64 +} + +func (g FuncIntGauge) Function() func() int64 { + return g.function +} diff --git a/library/go/core/metrics/prometheus/registry.go b/library/go/core/metrics/prometheus/registry.go new file mode 100644 index 0000000000..bad45fe617 --- /dev/null +++ b/library/go/core/metrics/prometheus/registry.go @@ -0,0 +1,254 @@ +package prometheus + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/ydb-platform/ydb/library/go/core/metrics" + "github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/metricsutil" + "github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/registryutil" + "github.com/ydb-platform/ydb/library/go/core/xerrors" +) + +var _ metrics.Registry = (*Registry)(nil) + +type Registry struct { + rg *prometheus.Registry + + m *sync.Mutex + subregistries map[string]*Registry + + tags map[string]string + prefix string + nameSanitizer func(string) string +} + +// NewRegistry creates new Prometheus backed registry. +func NewRegistry(opts *RegistryOpts) *Registry { + r := &Registry{ + rg: prometheus.NewRegistry(), + m: new(sync.Mutex), + subregistries: make(map[string]*Registry), + tags: make(map[string]string), + } + + if opts != nil { + r.prefix = opts.Prefix + r.tags = opts.Tags + if opts.rg != nil { + r.rg = opts.rg + } + for _, collector := range opts.Collectors { + collector(r) + } + if opts.NameSanitizer != nil { + r.nameSanitizer = opts.NameSanitizer + } + } + + return r +} + +// WithTags creates new sub-scope, where each metric has tags attached to it. +func (r Registry) WithTags(tags map[string]string) metrics.Registry { + return r.newSubregistry(r.prefix, registryutil.MergeTags(r.tags, tags)) +} + +// WithPrefix creates new sub-scope, where each metric has prefix added to it name. +func (r Registry) WithPrefix(prefix string) metrics.Registry { + return r.newSubregistry(registryutil.BuildFQName("_", r.prefix, prefix), r.tags) +} + +// ComposeName builds FQ name with appropriate separator. +func (r Registry) ComposeName(parts ...string) string { + return registryutil.BuildFQName("_", parts...) +} + +func (r Registry) Counter(name string) metrics.Counter { + name = r.sanitizeName(name) + cnt := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: r.prefix, + Name: name, + ConstLabels: r.tags, + }) + + if err := r.rg.Register(cnt); err != nil { + var existErr prometheus.AlreadyRegisteredError + if xerrors.As(err, &existErr) { + return &Counter{cnt: existErr.ExistingCollector.(prometheus.Counter)} + } + panic(err) + } + + return &Counter{cnt: cnt} +} + +func (r Registry) FuncCounter(name string, function func() int64) metrics.FuncCounter { + name = r.sanitizeName(name) + cnt := prometheus.NewCounterFunc(prometheus.CounterOpts{ + Namespace: r.prefix, + Name: name, + ConstLabels: r.tags, + }, func() float64 { + return float64(function()) + }) + + if err := r.rg.Register(cnt); err != nil { + panic(err) + } + + return &FuncCounter{ + cnt: cnt, + function: function, + } +} + +func (r Registry) Gauge(name string) metrics.Gauge { + name = r.sanitizeName(name) + gg := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: r.prefix, + Name: name, + ConstLabels: r.tags, + }) + + if err := r.rg.Register(gg); err != nil { + var existErr prometheus.AlreadyRegisteredError + if xerrors.As(err, &existErr) { + return &Gauge{gg: existErr.ExistingCollector.(prometheus.Gauge)} + } + panic(err) + } + + return &Gauge{gg: gg} +} + +func (r Registry) FuncGauge(name string, function func() float64) metrics.FuncGauge { + name = r.sanitizeName(name) + ff := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: r.prefix, + Name: name, + ConstLabels: r.tags, + }, function) + if err := r.rg.Register(ff); err != nil { + panic(err) + } + return &FuncGauge{ + ff: ff, + function: function, + } +} + +func (r Registry) IntGauge(name string) metrics.IntGauge { + return &IntGauge{Gauge: r.Gauge(name)} +} + +func (r Registry) FuncIntGauge(name string, function func() int64) metrics.FuncIntGauge { + name = r.sanitizeName(name) + ff := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: r.prefix, + Name: name, + ConstLabels: r.tags, + }, func() float64 { return float64(function()) }) + if err := r.rg.Register(ff); err != nil { + panic(err) + } + return &FuncIntGauge{ + ff: ff, + function: function, + } +} + +func (r Registry) Timer(name string) metrics.Timer { + name = r.sanitizeName(name) + gg := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: r.prefix, + Name: name, + ConstLabels: r.tags, + }) + + if err := r.rg.Register(gg); err != nil { + var existErr prometheus.AlreadyRegisteredError + if xerrors.As(err, &existErr) { + return &Timer{gg: existErr.ExistingCollector.(prometheus.Gauge)} + } + panic(err) + } + + return &Timer{gg: gg} +} + +func (r Registry) Histogram(name string, buckets metrics.Buckets) metrics.Histogram { + name = r.sanitizeName(name) + hm := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: r.prefix, + Name: name, + ConstLabels: r.tags, + Buckets: metricsutil.BucketsBounds(buckets), + }) + + if err := r.rg.Register(hm); err != nil { + var existErr prometheus.AlreadyRegisteredError + if xerrors.As(err, &existErr) { + return &Histogram{hm: existErr.ExistingCollector.(prometheus.Observer)} + } + panic(err) + } + + return &Histogram{hm: hm} +} + +func (r Registry) DurationHistogram(name string, buckets metrics.DurationBuckets) metrics.Timer { + name = r.sanitizeName(name) + hm := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: r.prefix, + Name: name, + ConstLabels: r.tags, + Buckets: metricsutil.DurationBucketsBounds(buckets), + }) + + if err := r.rg.Register(hm); err != nil { + var existErr prometheus.AlreadyRegisteredError + if xerrors.As(err, &existErr) { + return &Histogram{hm: existErr.ExistingCollector.(prometheus.Histogram)} + } + panic(err) + } + + return &Histogram{hm: hm} +} + +// Gather returns raw collected Prometheus metrics. +func (r Registry) Gather() ([]*dto.MetricFamily, error) { + return r.rg.Gather() +} + +func (r *Registry) newSubregistry(prefix string, tags map[string]string) *Registry { + registryKey := registryutil.BuildRegistryKey(prefix, tags) + + r.m.Lock() + defer r.m.Unlock() + + if old, ok := r.subregistries[registryKey]; ok { + return old + } + + subregistry := &Registry{ + rg: r.rg, + m: r.m, + subregistries: r.subregistries, + tags: tags, + prefix: prefix, + nameSanitizer: r.nameSanitizer, + } + + r.subregistries[registryKey] = subregistry + return subregistry +} + +func (r *Registry) sanitizeName(name string) string { + if r.nameSanitizer == nil { + return name + } + return r.nameSanitizer(name) +} diff --git a/library/go/core/metrics/prometheus/registry_opts.go b/library/go/core/metrics/prometheus/registry_opts.go new file mode 100644 index 0000000000..fedb019d85 --- /dev/null +++ b/library/go/core/metrics/prometheus/registry_opts.go @@ -0,0 +1,84 @@ +package prometheus + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" + "github.com/ydb-platform/ydb/library/go/core/metrics" + "github.com/ydb-platform/ydb/library/go/core/metrics/collect" + "github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/registryutil" +) + +type RegistryOpts struct { + Prefix string + Tags map[string]string + rg *prometheus.Registry + Collectors []func(metrics.Registry) + NameSanitizer func(string) string +} + +// NewRegistryOpts returns new initialized instance of RegistryOpts. +func NewRegistryOpts() *RegistryOpts { + return &RegistryOpts{ + Tags: make(map[string]string), + } +} + +// SetTags overrides existing tags. +func (o *RegistryOpts) SetTags(tags map[string]string) *RegistryOpts { + o.Tags = tags + return o +} + +// AddTags merges given tags with existing. +func (o *RegistryOpts) AddTags(tags map[string]string) *RegistryOpts { + for k, v := range tags { + o.Tags[k] = v + } + return o +} + +// SetPrefix overrides existing prefix. +func (o *RegistryOpts) SetPrefix(prefix string) *RegistryOpts { + o.Prefix = prefix + return o +} + +// AppendPrefix adds given prefix as postfix to existing using separator. +func (o *RegistryOpts) AppendPrefix(prefix string) *RegistryOpts { + o.Prefix = registryutil.BuildFQName("_", o.Prefix, prefix) + return o +} + +// SetRegistry sets the given prometheus registry for further usage instead +// of creating a new one. +// +// This is primarily used to unite externally defined metrics with metrics kept +// in the core registry. +func (o *RegistryOpts) SetRegistry(rg *prometheus.Registry) *RegistryOpts { + o.rg = rg + return o +} + +// AddCollectors adds collectors that handle their metrics automatically (e.g. system metrics). +func (o *RegistryOpts) AddCollectors( + ctx context.Context, c metrics.CollectPolicy, collectors ...collect.Func, +) *RegistryOpts { + if len(collectors) == 0 { + return o + } + + o.Collectors = append(o.Collectors, func(r metrics.Registry) { + for _, collector := range collectors { + collector(ctx, r, c) + } + }) + return o +} + +// SetNameSanitizer sets a functions which will be called for each metric's name. +// It allows to alter names, for example to replace invalid characters +func (o *RegistryOpts) SetNameSanitizer(v func(string) string) *RegistryOpts { + o.NameSanitizer = v + return o +} diff --git a/library/go/core/metrics/prometheus/registry_test.go b/library/go/core/metrics/prometheus/registry_test.go new file mode 100644 index 0000000000..73d071a8de --- /dev/null +++ b/library/go/core/metrics/prometheus/registry_test.go @@ -0,0 +1,481 @@ +package prometheus + +import ( + "strings" + "sync" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "github.com/ydb-platform/ydb/library/go/core/metrics" + "github.com/ydb-platform/ydb/library/go/ptr" + "github.com/ydb-platform/ydb/library/go/test/assertpb" + "google.golang.org/protobuf/testing/protocmp" +) + +func TestNewRegistry(t *testing.T) { + expected := &Registry{ + rg: prometheus.NewRegistry(), + m: new(sync.Mutex), + subregistries: make(map[string]*Registry), + tags: map[string]string{}, + prefix: "", + } + + r := NewRegistry(nil) + assert.IsType(t, expected, r) + assert.Equal(t, expected, r) +} + +func TestRegistry_Subregisters(t *testing.T) { + r := NewRegistry(nil) + sr1 := r.WithPrefix("subregister1"). + WithTags(map[string]string{"ololo": "trololo"}) + sr2 := sr1.WithPrefix("subregister2"). + WithTags(map[string]string{"shimba": "boomba"}) + + // check global subregistries map + expectedMap := map[string]*Registry{ + "\"subregister1\"{}": { + rg: r.rg, + m: r.m, + subregistries: r.subregistries, + prefix: "subregister1", + tags: make(map[string]string), + }, + "\"subregister1\"{ololo=trololo}": { + rg: r.rg, + m: r.m, + subregistries: r.subregistries, + tags: map[string]string{"ololo": "trololo"}, + prefix: "subregister1", + }, + "\"subregister1_subregister2\"{ololo=trololo}": { + rg: r.rg, + m: r.m, + subregistries: r.subregistries, + tags: map[string]string{"ololo": "trololo"}, + prefix: "subregister1_subregister2", + }, + "\"subregister1_subregister2\"{ololo=trololo,shimba=boomba}": { + rg: r.rg, + m: r.m, + subregistries: r.subregistries, + tags: map[string]string{"ololo": "trololo", "shimba": "boomba"}, + prefix: "subregister1_subregister2", + }, + } + + assert.EqualValues(t, expectedMap, r.subregistries) + + // top-register write + rCnt := r.Counter("subregisters_count") + rCnt.Add(2) + + // sub-register write + srTm := sr1.Timer("mytimer") + srTm.RecordDuration(42 * time.Second) + + // sub-sub-register write + srHm := sr2.Histogram("myhistogram", metrics.NewBuckets(1, 2, 3)) + srHm.RecordValue(1.5) + + mr, err := r.Gather() + assert.NoError(t, err) + + assert.IsType(t, mr, []*dto.MetricFamily{}) + + expected := []*dto.MetricFamily{ + { + Name: ptr.String("subregister1_mytimer"), + Help: ptr.String(""), + Type: func(mt dto.MetricType) *dto.MetricType { return &mt }(dto.MetricType_GAUGE), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{ + {Name: ptr.String("ololo"), Value: ptr.String("trololo")}, + }, + Gauge: &dto.Gauge{Value: ptr.Float64(42)}, + }, + }, + }, + { + Name: ptr.String("subregister1_subregister2_myhistogram"), + Help: ptr.String(""), + Type: func(mt dto.MetricType) *dto.MetricType { return &mt }(dto.MetricType_HISTOGRAM), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{ + {Name: ptr.String("ololo"), Value: ptr.String("trololo")}, + {Name: ptr.String("shimba"), Value: ptr.String("boomba")}, + }, + Histogram: &dto.Histogram{ + SampleCount: ptr.Uint64(1), + SampleSum: ptr.Float64(1.5), + Bucket: []*dto.Bucket{ + {CumulativeCount: ptr.Uint64(0), UpperBound: ptr.Float64(1)}, + {CumulativeCount: ptr.Uint64(1), UpperBound: ptr.Float64(2)}, + {CumulativeCount: ptr.Uint64(1), UpperBound: ptr.Float64(3)}, + }, + }, + }, + }, + }, + { + Name: ptr.String("subregisters_count"), + Help: ptr.String(""), + Type: func(mt dto.MetricType) *dto.MetricType { return &mt }(dto.MetricType_COUNTER), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{}, + Counter: &dto.Counter{Value: ptr.Float64(2)}, + }, + }, + }, + } + + cmpOpts := []cmp.Option{ + protocmp.Transform(), + } + assert.True(t, cmp.Equal(expected, mr, cmpOpts...), cmp.Diff(expected, mr, cmpOpts...)) +} + +func TestRegistry_Counter(t *testing.T) { + r := NewRegistry(nil) + sr := r.WithPrefix("myprefix"). + WithTags(map[string]string{"ololo": "trololo"}) + + // must panic on empty name + assert.Panics(t, func() { r.Counter("") }) + + srCnt := sr.Counter("mycounter") + srCnt.Add(42) + + mr, err := r.Gather() + assert.NoError(t, err) + + assert.IsType(t, mr, []*dto.MetricFamily{}) + + expected := []*dto.MetricFamily{ + { + Name: ptr.String("myprefix_mycounter"), + Help: ptr.String(""), + Type: func(mt dto.MetricType) *dto.MetricType { return &mt }(dto.MetricType_COUNTER), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{ + {Name: ptr.String("ololo"), Value: ptr.String("trololo")}, + }, + Counter: &dto.Counter{Value: ptr.Float64(42)}, + }, + }, + }, + } + cmpOpts := []cmp.Option{ + protocmp.Transform(), + } + assert.True(t, cmp.Equal(expected, mr, cmpOpts...), cmp.Diff(expected, mr, cmpOpts...)) +} + +func TestRegistry_DurationHistogram(t *testing.T) { + r := NewRegistry(nil) + sr := r.WithPrefix("myprefix"). + WithTags(map[string]string{"ololo": "trololo"}) + + // must panic on empty name + assert.Panics(t, func() { r.DurationHistogram("", nil) }) + + cnt := sr.DurationHistogram("myhistogram", metrics.NewDurationBuckets( + 1*time.Second, 3*time.Second, 5*time.Second, + )) + + cnt.RecordDuration(2 * time.Second) + cnt.RecordDuration(4 * time.Second) + + mr, err := r.Gather() + assert.NoError(t, err) + + assert.IsType(t, mr, []*dto.MetricFamily{}) + + expected := []*dto.MetricFamily{ + { + Name: ptr.String("myprefix_myhistogram"), + Help: ptr.String(""), + Type: func(mt dto.MetricType) *dto.MetricType { return &mt }(dto.MetricType_HISTOGRAM), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{{Name: ptr.String("ololo"), Value: ptr.String("trololo")}}, + Histogram: &dto.Histogram{ + SampleCount: ptr.Uint64(2), + SampleSum: ptr.Float64(6), + Bucket: []*dto.Bucket{ + {CumulativeCount: ptr.Uint64(0), UpperBound: ptr.Float64(1)}, + {CumulativeCount: ptr.Uint64(1), UpperBound: ptr.Float64(3)}, + {CumulativeCount: ptr.Uint64(2), UpperBound: ptr.Float64(5)}, + }, + }, + }, + }, + }, + } + assertpb.Equal(t, expected, mr) +} + +func TestRegistry_Gauge(t *testing.T) { + r := NewRegistry(nil) + sr := r.WithPrefix("myprefix"). + WithTags(map[string]string{"ololo": "trololo"}) + + // must panic on empty name + assert.Panics(t, func() { r.Gauge("") }) + + cnt := sr.Gauge("mygauge") + cnt.Add(42) + + mr, err := r.Gather() + assert.NoError(t, err) + + assert.IsType(t, mr, []*dto.MetricFamily{}) + + expected := []*dto.MetricFamily{ + { + Name: ptr.String("myprefix_mygauge"), + Help: ptr.String(""), + Type: func(mt dto.MetricType) *dto.MetricType { return &mt }(dto.MetricType_GAUGE), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{{Name: ptr.String("ololo"), Value: ptr.String("trololo")}}, + Gauge: &dto.Gauge{Value: ptr.Float64(42)}, + }, + }, + }, + } + assertpb.Equal(t, expected, mr) +} + +func TestRegistry_Histogram(t *testing.T) { + r := NewRegistry(nil) + sr := r.WithPrefix("myprefix"). + WithTags(map[string]string{"ololo": "trololo"}) + + // must panic on empty name + assert.Panics(t, func() { r.Histogram("", nil) }) + + cnt := sr.Histogram("myhistogram", metrics.NewBuckets(1, 3, 5)) + + cnt.RecordValue(2) + cnt.RecordValue(4) + + mr, err := r.Gather() + assert.NoError(t, err) + + assert.IsType(t, mr, []*dto.MetricFamily{}) + + expected := []*dto.MetricFamily{ + { + Name: ptr.String("myprefix_myhistogram"), + Help: ptr.String(""), + Type: func(mt dto.MetricType) *dto.MetricType { return &mt }(dto.MetricType_HISTOGRAM), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{{Name: ptr.String("ololo"), Value: ptr.String("trololo")}}, + Histogram: &dto.Histogram{ + SampleCount: ptr.Uint64(2), + SampleSum: ptr.Float64(6), + Bucket: []*dto.Bucket{ + {CumulativeCount: ptr.Uint64(0), UpperBound: ptr.Float64(1)}, + {CumulativeCount: ptr.Uint64(1), UpperBound: ptr.Float64(3)}, + {CumulativeCount: ptr.Uint64(2), UpperBound: ptr.Float64(5)}, + }, + }, + }, + }, + }, + } + assertpb.Equal(t, expected, mr) +} + +func TestRegistry_Timer(t *testing.T) { + r := NewRegistry(nil) + sr := r.WithPrefix("myprefix"). + WithTags(map[string]string{"ololo": "trololo"}) + + // must panic on empty name + assert.Panics(t, func() { r.Timer("") }) + + cnt := sr.Timer("mytimer") + cnt.RecordDuration(42 * time.Second) + + mr, err := r.Gather() + assert.NoError(t, err) + + assert.IsType(t, mr, []*dto.MetricFamily{}) + + expected := []*dto.MetricFamily{ + { + Name: ptr.String("myprefix_mytimer"), + Help: ptr.String(""), + Type: func(mt dto.MetricType) *dto.MetricType { return &mt }(dto.MetricType_GAUGE), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{{Name: ptr.String("ololo"), Value: ptr.String("trololo")}}, + Gauge: &dto.Gauge{Value: ptr.Float64(42)}, + }, + }, + }, + } + assertpb.Equal(t, expected, mr) +} + +func TestRegistry_WithPrefix(t *testing.T) { + testCases := []struct { + r metrics.Registry + expected string + }{ + { + r: func() metrics.Registry { + return NewRegistry(nil) + }(), + expected: "", + }, + { + r: func() metrics.Registry { + return NewRegistry(nil).WithPrefix("myprefix") + }(), + expected: "myprefix", + }, + { + r: func() metrics.Registry { + return NewRegistry(nil).WithPrefix("__myprefix_") + }(), + expected: "myprefix", + }, + { + r: func() metrics.Registry { + return NewRegistry(nil).WithPrefix("__myprefix_").WithPrefix("mysubprefix______") + }(), + expected: "myprefix_mysubprefix", + }, + } + + for _, tc := range testCases { + t.Run("", func(t *testing.T) { + assert.Equal(t, tc.expected, tc.r.(*Registry).prefix) + }) + } +} + +func TestRegistry_WithTags(t *testing.T) { + testCases := []struct { + r metrics.Registry + expected map[string]string + }{ + { + r: func() metrics.Registry { + return NewRegistry(nil) + }(), + expected: map[string]string{}, + }, + { + r: func() metrics.Registry { + return NewRegistry(nil).WithTags(map[string]string{"shimba": "boomba"}) + }(), + expected: map[string]string{"shimba": "boomba"}, + }, + { + r: func() metrics.Registry { + return NewRegistry(nil). + WithTags(map[string]string{"shimba": "boomba"}). + WithTags(map[string]string{"looken": "tooken"}) + }(), + expected: map[string]string{ + "shimba": "boomba", + "looken": "tooken", + }, + }, + { + r: func() metrics.Registry { + return NewRegistry(nil). + WithTags(map[string]string{"shimba": "boomba"}). + WithTags(map[string]string{"looken": "tooken"}). + WithTags(map[string]string{"shimba": "cooken"}) + }(), + expected: map[string]string{ + "shimba": "cooken", + "looken": "tooken", + }, + }, + } + + for _, tc := range testCases { + t.Run("", func(t *testing.T) { + assert.Equal(t, tc.expected, tc.r.(*Registry).tags) + }) + } +} + +func TestRegistry_WithTags_NoPanic(t *testing.T) { + _ = NewRegistry(nil).WithTags(map[string]string{"foo": "bar"}) + _ = NewRegistry(nil).WithTags(map[string]string{"foo": "bar"}) +} + +func TestRegistry_Counter_NoPanic(t *testing.T) { + r := NewRegistry(nil) + sr := r.WithPrefix("myprefix"). + WithTags(map[string]string{"ololo": "trololo"}) + cntrRaz := sr.Counter("mycounter").(*Counter) + cntrDvaz := sr.Counter("mycounter").(*Counter) + assert.Equal(t, cntrRaz.cnt, cntrDvaz.cnt) + cntrRaz.Add(100) + cntrDvaz.Add(100) + mr, err := r.Gather() + assert.NoError(t, err) + expected := []*dto.MetricFamily{ + { + Name: ptr.String("myprefix_mycounter"), + Help: ptr.String(""), + Type: func(mt dto.MetricType) *dto.MetricType { return &mt }(dto.MetricType_COUNTER), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{{Name: ptr.String("ololo"), Value: ptr.String("trololo")}}, + Counter: &dto.Counter{Value: ptr.Float64(200)}, + }, + }, + }, + } + assertpb.Equal(t, expected, mr) +} + +func TestRegistry_NameSanitizer(t *testing.T) { + testCases := []struct { + opts *RegistryOpts + name string + want string + }{ + { + opts: nil, + name: "some_name", + want: "some_name", + }, + { + opts: NewRegistryOpts().SetNameSanitizer(func(s string) string { + return strings.ReplaceAll(s, "/", "_") + }), + name: "other/name", + want: "other_name", + }, + } + + for _, tc := range testCases { + r := NewRegistry(tc.opts) + _ = r.Counter(tc.name) + mfs, err := r.Gather() + assert.NoError(t, err) + assert.NotEmpty(t, mfs) + + assert.Equal(t, tc.want, *mfs[0].Name) + } +} diff --git a/library/go/core/metrics/prometheus/timer.go b/library/go/core/metrics/prometheus/timer.go new file mode 100644 index 0000000000..3350e5a61d --- /dev/null +++ b/library/go/core/metrics/prometheus/timer.go @@ -0,0 +1,19 @@ +package prometheus + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/ydb-platform/ydb/library/go/core/metrics" +) + +var _ metrics.Timer = (*Timer)(nil) + +// Timer measures gauge duration. +type Timer struct { + gg prometheus.Gauge +} + +func (t Timer) RecordDuration(value time.Duration) { + t.gg.Set(value.Seconds()) +} diff --git a/library/go/core/metrics/prometheus/timer_test.go b/library/go/core/metrics/prometheus/timer_test.go new file mode 100644 index 0000000000..a520b6f477 --- /dev/null +++ b/library/go/core/metrics/prometheus/timer_test.go @@ -0,0 +1,24 @@ +package prometheus + +import ( + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" +) + +func TestTimer_RecordDuration(t *testing.T) { + g := &Timer{gg: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "test_timer_record_duration", + })} + + g.RecordDuration(42 * time.Second) + + var res dto.Metric + err := g.gg.Write(&res) + + assert.NoError(t, err) + assert.Equal(t, float64(42), res.GetGauge().GetValue()) +} diff --git a/library/go/core/metrics/prometheus/vec.go b/library/go/core/metrics/prometheus/vec.go new file mode 100644 index 0000000000..731c7b752a --- /dev/null +++ b/library/go/core/metrics/prometheus/vec.go @@ -0,0 +1,248 @@ +package prometheus + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/ydb-platform/ydb/library/go/core/metrics" + "github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/metricsutil" + "github.com/ydb-platform/ydb/library/go/core/xerrors" +) + +var _ metrics.CounterVec = (*CounterVec)(nil) + +// CounterVec wraps prometheus.CounterVec +// and implements metrics.CounterVec interface. +type CounterVec struct { + vec *prometheus.CounterVec +} + +// CounterVec creates a new counters vector with given metric name and +// partitioned by the given label names. +func (r *Registry) CounterVec(name string, labels []string) metrics.CounterVec { + name = r.sanitizeName(name) + vec := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: r.prefix, + Name: name, + ConstLabels: r.tags, + }, labels) + + if err := r.rg.Register(vec); err != nil { + var existErr prometheus.AlreadyRegisteredError + if xerrors.As(err, &existErr) { + return &CounterVec{vec: existErr.ExistingCollector.(*prometheus.CounterVec)} + } + panic(err) + } + + return &CounterVec{vec: vec} +} + +// With creates new or returns existing counter with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *CounterVec) With(tags map[string]string) metrics.Counter { + return &Counter{cnt: v.vec.With(tags)} +} + +// Reset deletes all metrics in this vector. +func (v *CounterVec) Reset() { + v.vec.Reset() +} + +var _ metrics.GaugeVec = (*GaugeVec)(nil) + +// GaugeVec wraps prometheus.GaugeVec +// and implements metrics.GaugeVec interface. +type GaugeVec struct { + vec *prometheus.GaugeVec +} + +// GaugeVec creates a new gauges vector with given metric name and +// partitioned by the given label names. +func (r *Registry) GaugeVec(name string, labels []string) metrics.GaugeVec { + name = r.sanitizeName(name) + vec := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: r.prefix, + Name: name, + ConstLabels: r.tags, + }, labels) + + if err := r.rg.Register(vec); err != nil { + var existErr prometheus.AlreadyRegisteredError + if xerrors.As(err, &existErr) { + return &GaugeVec{vec: existErr.ExistingCollector.(*prometheus.GaugeVec)} + } + panic(err) + } + + return &GaugeVec{vec: vec} +} + +// With creates new or returns existing gauge with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *GaugeVec) With(tags map[string]string) metrics.Gauge { + return &Gauge{gg: v.vec.With(tags)} +} + +// Reset deletes all metrics in this vector. +func (v *GaugeVec) Reset() { + v.vec.Reset() +} + +// IntGaugeVec wraps prometheus.GaugeVec +// and implements metrics.IntGaugeVec interface. +type IntGaugeVec struct { + vec *prometheus.GaugeVec +} + +// IntGaugeVec creates a new gauges vector with given metric name and +// partitioned by the given label names. +func (r *Registry) IntGaugeVec(name string, labels []string) metrics.IntGaugeVec { + name = r.sanitizeName(name) + vec := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: r.prefix, + Name: name, + ConstLabels: r.tags, + }, labels) + + if err := r.rg.Register(vec); err != nil { + var existErr prometheus.AlreadyRegisteredError + if xerrors.As(err, &existErr) { + return &IntGaugeVec{vec: existErr.ExistingCollector.(*prometheus.GaugeVec)} + } + panic(err) + } + + return &IntGaugeVec{vec: vec} +} + +// With creates new or returns existing gauge with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *IntGaugeVec) With(tags map[string]string) metrics.IntGauge { + return &IntGauge{Gauge{gg: v.vec.With(tags)}} +} + +// Reset deletes all metrics in this vector. +func (v *IntGaugeVec) Reset() { + v.vec.Reset() +} + +var _ metrics.TimerVec = (*TimerVec)(nil) + +// TimerVec wraps prometheus.GaugeVec +// and implements metrics.TimerVec interface. +type TimerVec struct { + vec *prometheus.GaugeVec +} + +// TimerVec creates a new timers vector with given metric name and +// partitioned by the given label names. +func (r *Registry) TimerVec(name string, labels []string) metrics.TimerVec { + name = r.sanitizeName(name) + vec := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: r.prefix, + Name: name, + ConstLabels: r.tags, + }, labels) + + if err := r.rg.Register(vec); err != nil { + var existErr prometheus.AlreadyRegisteredError + if xerrors.As(err, &existErr) { + return &TimerVec{vec: existErr.ExistingCollector.(*prometheus.GaugeVec)} + } + panic(err) + } + + return &TimerVec{vec: vec} +} + +// With creates new or returns existing timer with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *TimerVec) With(tags map[string]string) metrics.Timer { + return &Timer{gg: v.vec.With(tags)} +} + +// Reset deletes all metrics in this vector. +func (v *TimerVec) Reset() { + v.vec.Reset() +} + +var _ metrics.HistogramVec = (*HistogramVec)(nil) + +// HistogramVec wraps prometheus.HistogramVec +// and implements metrics.HistogramVec interface. +type HistogramVec struct { + vec *prometheus.HistogramVec +} + +// HistogramVec creates a new histograms vector with given metric name and buckets and +// partitioned by the given label names. +func (r *Registry) HistogramVec(name string, buckets metrics.Buckets, labels []string) metrics.HistogramVec { + name = r.sanitizeName(name) + vec := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: r.prefix, + Name: name, + ConstLabels: r.tags, + Buckets: metricsutil.BucketsBounds(buckets), + }, labels) + + if err := r.rg.Register(vec); err != nil { + var existErr prometheus.AlreadyRegisteredError + if xerrors.As(err, &existErr) { + return &HistogramVec{vec: existErr.ExistingCollector.(*prometheus.HistogramVec)} + } + panic(err) + } + + return &HistogramVec{vec: vec} +} + +// With creates new or returns existing histogram with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *HistogramVec) With(tags map[string]string) metrics.Histogram { + return &Histogram{hm: v.vec.With(tags)} +} + +// Reset deletes all metrics in this vector. +func (v *HistogramVec) Reset() { + v.vec.Reset() +} + +var _ metrics.TimerVec = (*DurationHistogramVec)(nil) + +// DurationHistogramVec wraps prometheus.HistogramVec +// and implements metrics.TimerVec interface. +type DurationHistogramVec struct { + vec *prometheus.HistogramVec +} + +// DurationHistogramVec creates a new duration histograms vector with given metric name and buckets and +// partitioned by the given label names. +func (r *Registry) DurationHistogramVec(name string, buckets metrics.DurationBuckets, labels []string) metrics.TimerVec { + name = r.sanitizeName(name) + vec := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: r.prefix, + Name: name, + ConstLabels: r.tags, + Buckets: metricsutil.DurationBucketsBounds(buckets), + }, labels) + + if err := r.rg.Register(vec); err != nil { + var existErr prometheus.AlreadyRegisteredError + if xerrors.As(err, &existErr) { + return &DurationHistogramVec{vec: existErr.ExistingCollector.(*prometheus.HistogramVec)} + } + panic(err) + } + + return &DurationHistogramVec{vec: vec} +} + +// With creates new or returns existing duration histogram with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *DurationHistogramVec) With(tags map[string]string) metrics.Timer { + return &Histogram{hm: v.vec.With(tags)} +} + +// Reset deletes all metrics in this vector. +func (v *DurationHistogramVec) Reset() { + v.vec.Reset() +} diff --git a/library/go/core/metrics/prometheus/vec_test.go b/library/go/core/metrics/prometheus/vec_test.go new file mode 100644 index 0000000000..ccf088c17a --- /dev/null +++ b/library/go/core/metrics/prometheus/vec_test.go @@ -0,0 +1,137 @@ +package prometheus + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/ydb-platform/ydb/library/go/core/metrics" +) + +func TestCounterVec(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + vec := rg.CounterVec("ololo", []string{"shimba", "looken"}) + mt := vec.With(map[string]string{ + "shimba": "boomba", + "looken": "tooken", + }) + + assert.IsType(t, &CounterVec{}, vec) + assert.IsType(t, &Counter{}, mt) + + vec.Reset() + + metrics, err := rg.Gather() + assert.NoError(t, err) + assert.Empty(t, metrics) +} + +func TestCounterVec_RegisterAgain(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + vec1 := rg.CounterVec("ololo", []string{"shimba", "looken"}).(*CounterVec) + vec2 := rg.CounterVec("ololo", []string{"shimba", "looken"}).(*CounterVec) + assert.Same(t, vec1.vec, vec2.vec) +} + +func TestGaugeVec(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + vec := rg.GaugeVec("ololo", []string{"shimba", "looken"}) + mt := vec.With(map[string]string{ + "shimba": "boomba", + "looken": "tooken", + }) + + assert.IsType(t, &GaugeVec{}, vec) + assert.IsType(t, &Gauge{}, mt) + + vec.Reset() + + metrics, err := rg.Gather() + assert.NoError(t, err) + assert.Empty(t, metrics) +} + +func TestGaugeVec_RegisterAgain(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + vec1 := rg.GaugeVec("ololo", []string{"shimba", "looken"}).(*GaugeVec) + vec2 := rg.GaugeVec("ololo", []string{"shimba", "looken"}).(*GaugeVec) + assert.Same(t, vec1.vec, vec2.vec) +} + +func TestTimerVec(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + vec := rg.TimerVec("ololo", []string{"shimba", "looken"}) + mt := vec.With(map[string]string{ + "shimba": "boomba", + "looken": "tooken", + }) + + assert.IsType(t, &TimerVec{}, vec) + assert.IsType(t, &Timer{}, mt) + + vec.Reset() + + metrics, err := rg.Gather() + assert.NoError(t, err) + assert.Empty(t, metrics) +} + +func TestTimerVec_RegisterAgain(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + vec1 := rg.TimerVec("ololo", []string{"shimba", "looken"}).(*TimerVec) + vec2 := rg.TimerVec("ololo", []string{"shimba", "looken"}).(*TimerVec) + assert.Same(t, vec1.vec, vec2.vec) +} + +func TestHistogramVec(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + buckets := metrics.NewBuckets(1, 2, 3) + vec := rg.HistogramVec("ololo", buckets, []string{"shimba", "looken"}) + mt := vec.With(map[string]string{ + "shimba": "boomba", + "looken": "tooken", + }) + + assert.IsType(t, &HistogramVec{}, vec) + assert.IsType(t, &Histogram{}, mt) + + vec.Reset() + + metrics, err := rg.Gather() + assert.NoError(t, err) + assert.Empty(t, metrics) +} + +func TestHistogramVec_RegisterAgain(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + buckets := metrics.NewBuckets(1, 2, 3) + vec1 := rg.HistogramVec("ololo", buckets, []string{"shimba", "looken"}).(*HistogramVec) + vec2 := rg.HistogramVec("ololo", buckets, []string{"shimba", "looken"}).(*HistogramVec) + assert.Same(t, vec1.vec, vec2.vec) +} + +func TestDurationHistogramVec(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + buckets := metrics.NewDurationBuckets(1, 2, 3) + vec := rg.DurationHistogramVec("ololo", buckets, []string{"shimba", "looken"}) + mt := vec.With(map[string]string{ + "shimba": "boomba", + "looken": "tooken", + }) + + assert.IsType(t, &DurationHistogramVec{}, vec) + assert.IsType(t, &Histogram{}, mt) + + vec.Reset() + + metrics, err := rg.Gather() + assert.NoError(t, err) + assert.Empty(t, metrics) +} + +func TestDurationHistogramVec_RegisterAgain(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + buckets := metrics.NewDurationBuckets(1, 2, 3) + vec1 := rg.DurationHistogramVec("ololo", buckets, []string{"shimba", "looken"}).(*DurationHistogramVec) + vec2 := rg.DurationHistogramVec("ololo", buckets, []string{"shimba", "looken"}).(*DurationHistogramVec) + assert.Same(t, vec1.vec, vec2.vec) +} diff --git a/library/go/core/metrics/prometheus/ya.make b/library/go/core/metrics/prometheus/ya.make new file mode 100644 index 0000000000..b012835f4b --- /dev/null +++ b/library/go/core/metrics/prometheus/ya.make @@ -0,0 +1,25 @@ +GO_LIBRARY() + +SRCS( + counter.go + gauge.go + int_gauge.go + histogram.go + registry.go + registry_opts.go + timer.go + vec.go +) + +GO_TEST_SRCS( + counter_test.go + gauge_test.go + histogram_test.go + registry_test.go + timer_test.go + vec_test.go +) + +END() + +RECURSE(gotest) diff --git a/library/go/core/metrics/solomon/converter.go b/library/go/core/metrics/solomon/converter.go new file mode 100644 index 0000000000..6976b223ba --- /dev/null +++ b/library/go/core/metrics/solomon/converter.go @@ -0,0 +1,114 @@ +package solomon + +import ( + "fmt" + + dto "github.com/prometheus/client_model/go" + "go.uber.org/atomic" +) + +// PrometheusMetrics converts Prometheus metrics to Solomon metrics. +func PrometheusMetrics(metrics []*dto.MetricFamily) (*Metrics, error) { + s := &Metrics{ + metrics: make([]Metric, 0, len(metrics)), + } + + if len(metrics) == 0 { + return s, nil + } + + for _, mf := range metrics { + if len(mf.Metric) == 0 { + continue + } + + for _, metric := range mf.Metric { + + tags := make(map[string]string, len(metric.Label)) + for _, label := range metric.Label { + tags[label.GetName()] = label.GetValue() + } + + switch *mf.Type { + case dto.MetricType_COUNTER: + s.metrics = append(s.metrics, &Counter{ + name: mf.GetName(), + metricType: typeCounter, + tags: tags, + value: *atomic.NewInt64(int64(metric.Counter.GetValue())), + }) + case dto.MetricType_GAUGE: + s.metrics = append(s.metrics, &Gauge{ + name: mf.GetName(), + metricType: typeGauge, + tags: tags, + value: *atomic.NewFloat64(metric.Gauge.GetValue()), + }) + case dto.MetricType_HISTOGRAM: + bounds := make([]float64, 0, len(metric.Histogram.Bucket)) + values := make([]int64, 0, len(metric.Histogram.Bucket)) + + var prevValuesSum int64 + + for _, bucket := range metric.Histogram.Bucket { + // prometheus uses cumulative buckets where solomon uses instant + bucketValue := int64(bucket.GetCumulativeCount()) + bucketValue -= prevValuesSum + prevValuesSum += bucketValue + + bounds = append(bounds, bucket.GetUpperBound()) + values = append(values, bucketValue) + } + + s.metrics = append(s.metrics, &Histogram{ + name: mf.GetName(), + metricType: typeHistogram, + tags: tags, + bucketBounds: bounds, + bucketValues: values, + infValue: *atomic.NewInt64(int64(metric.Histogram.GetSampleCount()) - prevValuesSum), + }) + case dto.MetricType_SUMMARY: + bounds := make([]float64, 0, len(metric.Summary.Quantile)) + values := make([]int64, 0, len(metric.Summary.Quantile)) + + var prevValuesSum int64 + + for _, bucket := range metric.Summary.GetQuantile() { + // prometheus uses cumulative buckets where solomon uses instant + bucketValue := int64(bucket.GetValue()) + bucketValue -= prevValuesSum + prevValuesSum += bucketValue + + bounds = append(bounds, bucket.GetQuantile()) + values = append(values, bucketValue) + } + + mName := mf.GetName() + + s.metrics = append(s.metrics, &Histogram{ + name: mName, + metricType: typeHistogram, + tags: tags, + bucketBounds: bounds, + bucketValues: values, + infValue: *atomic.NewInt64(int64(*metric.Summary.SampleCount) - prevValuesSum), + }, &Counter{ + name: mName + "_count", + metricType: typeCounter, + tags: tags, + value: *atomic.NewInt64(int64(*metric.Summary.SampleCount)), + }, &Gauge{ + name: mName + "_sum", + metricType: typeGauge, + tags: tags, + value: *atomic.NewFloat64(*metric.Summary.SampleSum), + }) + default: + return nil, fmt.Errorf("unsupported type: %s", mf.Type.String()) + } + } + } + + return s, nil +} diff --git a/library/go/core/metrics/solomon/converter_test.go b/library/go/core/metrics/solomon/converter_test.go new file mode 100644 index 0000000000..5368029038 --- /dev/null +++ b/library/go/core/metrics/solomon/converter_test.go @@ -0,0 +1,200 @@ +package solomon + +import ( + "testing" + + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "github.com/ydb-platform/ydb/library/go/ptr" + "go.uber.org/atomic" +) + +func TestPrometheusMetrics(t *testing.T) { + testCases := []struct { + name string + metrics []*dto.MetricFamily + expect *Metrics + expectErr error + }{ + { + name: "success", + metrics: []*dto.MetricFamily{ + { + Name: ptr.String("subregister1_mygauge"), + Help: ptr.String(""), + Type: ptr.T(dto.MetricType_GAUGE), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{ + {Name: ptr.String("ololo"), Value: ptr.String("trololo")}, + }, + Gauge: &dto.Gauge{Value: ptr.Float64(42)}, + }, + }, + }, + { + Name: ptr.String("subregisters_count"), + Help: ptr.String(""), + Type: ptr.T(dto.MetricType_COUNTER), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{}, + Counter: &dto.Counter{Value: ptr.Float64(2)}, + }, + }, + }, + { + Name: ptr.String("subregister1_subregister2_myhistogram"), + Help: ptr.String(""), + Type: ptr.T(dto.MetricType_HISTOGRAM), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{ + {Name: ptr.String("ololo"), Value: ptr.String("trololo")}, + {Name: ptr.String("shimba"), Value: ptr.String("boomba")}, + }, + Histogram: &dto.Histogram{ + SampleCount: ptr.Uint64(6), + SampleSum: ptr.Float64(4.2), + Bucket: []*dto.Bucket{ + {CumulativeCount: ptr.Uint64(1), UpperBound: ptr.Float64(1)}, // 0.5 written + {CumulativeCount: ptr.Uint64(3), UpperBound: ptr.Float64(2)}, // 1.5 & 1.7 written + {CumulativeCount: ptr.Uint64(4), UpperBound: ptr.Float64(3)}, // 2.2 written + }, + }, + }, + }, + }, + { + Name: ptr.String("metrics_group"), + Help: ptr.String(""), + Type: ptr.T(dto.MetricType_COUNTER), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{}, + Counter: &dto.Counter{Value: ptr.Float64(2)}, + }, + { + Label: []*dto.LabelPair{}, + Counter: &dto.Counter{Value: ptr.Float64(3)}, + }, + }, + }, + }, + expect: &Metrics{ + metrics: []Metric{ + &Gauge{ + name: "subregister1_mygauge", + metricType: typeGauge, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewFloat64(42), + }, + &Counter{ + name: "subregisters_count", + metricType: typeCounter, + tags: map[string]string{}, + value: *atomic.NewInt64(2), + }, + &Histogram{ + name: "subregister1_subregister2_myhistogram", + metricType: typeHistogram, + tags: map[string]string{"ololo": "trololo", "shimba": "boomba"}, + bucketBounds: []float64{1, 2, 3}, + bucketValues: []int64{1, 2, 1}, + infValue: *atomic.NewInt64(2), + }, + // group of metrics + &Counter{ + name: "metrics_group", + metricType: typeCounter, + tags: map[string]string{}, + value: *atomic.NewInt64(2), + }, + &Counter{ + name: "metrics_group", + metricType: typeCounter, + tags: map[string]string{}, + value: *atomic.NewInt64(3), + }, + }, + }, + expectErr: nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + s, err := PrometheusMetrics(tc.metrics) + + if tc.expectErr == nil { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, tc.expectErr.Error()) + } + + assert.Equal(t, tc.expect, s) + }) + } +} + +func TestPrometheusSummaryMetric(t *testing.T) { + src := []*dto.MetricFamily{ + { + Name: ptr.String("subregister1_subregister2_mysummary"), + Help: ptr.String(""), + Type: func(mt dto.MetricType) *dto.MetricType { return &mt }(dto.MetricType_SUMMARY), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{ + {Name: ptr.String("ololo"), Value: ptr.String("trololo")}, + {Name: ptr.String("shimba"), Value: ptr.String("boomba")}, + }, + Summary: &dto.Summary{ + SampleCount: ptr.Uint64(8), + SampleSum: ptr.Float64(4.2), + Quantile: []*dto.Quantile{ + {Value: ptr.Float64(1), Quantile: ptr.Float64(1)}, // 0.5 written + {Value: ptr.Float64(3), Quantile: ptr.Float64(2)}, // 1.5 & 1.7 written + {Value: ptr.Float64(4), Quantile: ptr.Float64(3)}, // 2.2 written + }, + }, + }, + }, + }, + } + + mName := "subregister1_subregister2_mysummary" + mTags := map[string]string{"ololo": "trololo", "shimba": "boomba"} + bBounds := []float64{1, 2, 3} + bValues := []int64{1, 2, 1} + + expect := &Metrics{ + metrics: []Metric{ + &Histogram{ + name: mName, + metricType: typeHistogram, + tags: mTags, + bucketBounds: bBounds, + bucketValues: bValues, + infValue: *atomic.NewInt64(4), + }, + &Counter{ + name: mName + "_count", + metricType: typeCounter, + tags: mTags, + value: *atomic.NewInt64(8), + }, + &Gauge{ + name: mName + "_sum", + metricType: typeGauge, + tags: mTags, + value: *atomic.NewFloat64(4.2), + }, + }, + } + + s, err := PrometheusMetrics(src) + assert.NoError(t, err) + + assert.Equal(t, expect, s) +} diff --git a/library/go/core/metrics/solomon/counter.go b/library/go/core/metrics/solomon/counter.go new file mode 100644 index 0000000000..e37933760c --- /dev/null +++ b/library/go/core/metrics/solomon/counter.go @@ -0,0 +1,97 @@ +package solomon + +import ( + "encoding/json" + "time" + + "github.com/ydb-platform/ydb/library/go/core/metrics" + "go.uber.org/atomic" +) + +var ( + _ metrics.Counter = (*Counter)(nil) + _ Metric = (*Counter)(nil) +) + +// Counter tracks monotonically increasing value. +type Counter struct { + name string + metricType metricType + tags map[string]string + value atomic.Int64 + timestamp *time.Time + + useNameTag bool +} + +// Inc increments counter by 1. +func (c *Counter) Inc() { + c.Add(1) +} + +// Add adds delta to the counter. Delta must be >=0. +func (c *Counter) Add(delta int64) { + c.value.Add(delta) +} + +func (c *Counter) Name() string { + return c.name +} + +func (c *Counter) getType() metricType { + return c.metricType +} + +func (c *Counter) getLabels() map[string]string { + return c.tags +} + +func (c *Counter) getValue() interface{} { + return c.value.Load() +} + +func (c *Counter) getTimestamp() *time.Time { + return c.timestamp +} + +func (c *Counter) getNameTag() string { + if c.useNameTag { + return "name" + } else { + return "sensor" + } +} + +// MarshalJSON implements json.Marshaler. +func (c *Counter) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Type string `json:"type"` + Labels map[string]string `json:"labels"` + Value int64 `json:"value"` + Timestamp *int64 `json:"ts,omitempty"` + }{ + Type: c.metricType.String(), + Value: c.value.Load(), + Labels: func() map[string]string { + labels := make(map[string]string, len(c.tags)+1) + labels[c.getNameTag()] = c.Name() + for k, v := range c.tags { + labels[k] = v + } + return labels + }(), + Timestamp: tsAsRef(c.timestamp), + }) +} + +// Snapshot returns independent copy on metric. +func (c *Counter) Snapshot() Metric { + return &Counter{ + name: c.name, + metricType: c.metricType, + tags: c.tags, + value: *atomic.NewInt64(c.value.Load()), + + useNameTag: c.useNameTag, + } +} diff --git a/library/go/core/metrics/solomon/counter_test.go b/library/go/core/metrics/solomon/counter_test.go new file mode 100644 index 0000000000..09284125d2 --- /dev/null +++ b/library/go/core/metrics/solomon/counter_test.go @@ -0,0 +1,90 @@ +package solomon + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" +) + +func TestCounter_Add(t *testing.T) { + c := &Counter{ + name: "mycounter", + metricType: typeCounter, + tags: map[string]string{"ololo": "trololo"}, + } + + c.Add(1) + assert.Equal(t, int64(1), c.value.Load()) + + c.Add(42) + assert.Equal(t, int64(43), c.value.Load()) + + c.Add(1489) + assert.Equal(t, int64(1532), c.value.Load()) +} + +func TestCounter_Inc(t *testing.T) { + c := &Counter{ + name: "mycounter", + metricType: typeCounter, + tags: map[string]string{"ololo": "trololo"}, + } + + for i := 0; i < 10; i++ { + c.Inc() + } + assert.Equal(t, int64(10), c.value.Load()) + + c.Inc() + c.Inc() + assert.Equal(t, int64(12), c.value.Load()) +} + +func TestCounter_MarshalJSON(t *testing.T) { + c := &Counter{ + name: "mycounter", + metricType: typeCounter, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewInt64(42), + } + + b, err := json.Marshal(c) + assert.NoError(t, err) + + expected := []byte(`{"type":"COUNTER","labels":{"ololo":"trololo","sensor":"mycounter"},"value":42}`) + assert.Equal(t, expected, b) +} + +func TestRatedCounter_MarshalJSON(t *testing.T) { + c := &Counter{ + name: "mycounter", + metricType: typeRated, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewInt64(42), + } + + b, err := json.Marshal(c) + assert.NoError(t, err) + + expected := []byte(`{"type":"RATE","labels":{"ololo":"trololo","sensor":"mycounter"},"value":42}`) + assert.Equal(t, expected, b) +} + +func TestNameTagCounter_MarshalJSON(t *testing.T) { + c := &Counter{ + name: "mycounter", + metricType: typeCounter, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewInt64(42), + + useNameTag: true, + } + + b, err := json.Marshal(c) + assert.NoError(t, err) + + expected := []byte(`{"type":"COUNTER","labels":{"name":"mycounter","ololo":"trololo"},"value":42}`) + assert.Equal(t, expected, b) +} diff --git a/library/go/core/metrics/solomon/func_counter.go b/library/go/core/metrics/solomon/func_counter.go new file mode 100644 index 0000000000..db862869e4 --- /dev/null +++ b/library/go/core/metrics/solomon/func_counter.go @@ -0,0 +1,86 @@ +package solomon + +import ( + "encoding/json" + "time" + + "go.uber.org/atomic" +) + +var _ Metric = (*FuncCounter)(nil) + +// FuncCounter tracks int64 value returned by function. +type FuncCounter struct { + name string + metricType metricType + tags map[string]string + function func() int64 + timestamp *time.Time + useNameTag bool +} + +func (c *FuncCounter) Name() string { + return c.name +} + +func (c *FuncCounter) Function() func() int64 { + return c.function +} + +func (c *FuncCounter) getType() metricType { + return c.metricType +} + +func (c *FuncCounter) getLabels() map[string]string { + return c.tags +} + +func (c *FuncCounter) getValue() interface{} { + return c.function() +} + +func (c *FuncCounter) getTimestamp() *time.Time { + return c.timestamp +} + +func (c *FuncCounter) getNameTag() string { + if c.useNameTag { + return "name" + } else { + return "sensor" + } +} + +// MarshalJSON implements json.Marshaler. +func (c *FuncCounter) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Type string `json:"type"` + Labels map[string]string `json:"labels"` + Value int64 `json:"value"` + Timestamp *int64 `json:"ts,omitempty"` + }{ + Type: c.metricType.String(), + Value: c.function(), + Labels: func() map[string]string { + labels := make(map[string]string, len(c.tags)+1) + labels[c.getNameTag()] = c.Name() + for k, v := range c.tags { + labels[k] = v + } + return labels + }(), + Timestamp: tsAsRef(c.timestamp), + }) +} + +// Snapshot returns independent copy on metric. +func (c *FuncCounter) Snapshot() Metric { + return &Counter{ + name: c.name, + metricType: c.metricType, + tags: c.tags, + value: *atomic.NewInt64(c.function()), + + useNameTag: c.useNameTag, + } +} diff --git a/library/go/core/metrics/solomon/func_counter_test.go b/library/go/core/metrics/solomon/func_counter_test.go new file mode 100644 index 0000000000..7849769d12 --- /dev/null +++ b/library/go/core/metrics/solomon/func_counter_test.go @@ -0,0 +1,82 @@ +package solomon + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" +) + +func TestFuncCounter_Inc(t *testing.T) { + val := new(atomic.Int64) + c := &FuncCounter{ + name: "mycounter", + metricType: typeCounter, + tags: map[string]string{"ololo": "trololo"}, + function: func() int64 { + return val.Load() + }, + } + + val.Store(1) + assert.Equal(t, int64(1), c.Snapshot().(*Counter).value.Load()) + + val.Store(42) + assert.Equal(t, int64(42), c.Snapshot().(*Counter).value.Load()) + +} + +func TestFuncCounter_MarshalJSON(t *testing.T) { + c := &FuncCounter{ + name: "mycounter", + metricType: typeCounter, + tags: map[string]string{"ololo": "trololo"}, + function: func() int64 { + return 42 + }, + } + + b, err := json.Marshal(c) + assert.NoError(t, err) + + expected := []byte(`{"type":"COUNTER","labels":{"ololo":"trololo","sensor":"mycounter"},"value":42}`) + assert.Equal(t, expected, b) +} + +func TestRatedFuncCounter_MarshalJSON(t *testing.T) { + c := &FuncCounter{ + name: "mycounter", + metricType: typeRated, + tags: map[string]string{"ololo": "trololo"}, + function: func() int64 { + return 42 + }, + } + + b, err := json.Marshal(c) + assert.NoError(t, err) + + expected := []byte(`{"type":"RATE","labels":{"ololo":"trololo","sensor":"mycounter"},"value":42}`) + assert.Equal(t, expected, b) +} + +func TestNameTagFuncCounter_MarshalJSON(t *testing.T) { + c := &FuncCounter{ + name: "mycounter", + metricType: typeCounter, + tags: map[string]string{"ololo": "trololo"}, + + function: func() int64 { + return 42 + }, + + useNameTag: true, + } + + b, err := json.Marshal(c) + assert.NoError(t, err) + + expected := []byte(`{"type":"COUNTER","labels":{"name":"mycounter","ololo":"trololo"},"value":42}`) + assert.Equal(t, expected, b) +} diff --git a/library/go/core/metrics/solomon/func_gauge.go b/library/go/core/metrics/solomon/func_gauge.go new file mode 100644 index 0000000000..ce824c6fa8 --- /dev/null +++ b/library/go/core/metrics/solomon/func_gauge.go @@ -0,0 +1,87 @@ +package solomon + +import ( + "encoding/json" + "time" + + "go.uber.org/atomic" +) + +var _ Metric = (*FuncGauge)(nil) + +// FuncGauge tracks float64 value returned by function. +type FuncGauge struct { + name string + metricType metricType + tags map[string]string + function func() float64 + timestamp *time.Time + + useNameTag bool +} + +func (g *FuncGauge) Name() string { + return g.name +} + +func (g *FuncGauge) Function() func() float64 { + return g.function +} + +func (g *FuncGauge) getType() metricType { + return g.metricType +} + +func (g *FuncGauge) getLabels() map[string]string { + return g.tags +} + +func (g *FuncGauge) getValue() interface{} { + return g.function() +} + +func (g *FuncGauge) getTimestamp() *time.Time { + return g.timestamp +} + +func (g *FuncGauge) getNameTag() string { + if g.useNameTag { + return "name" + } else { + return "sensor" + } +} + +// MarshalJSON implements json.Marshaler. +func (g *FuncGauge) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Type string `json:"type"` + Labels map[string]string `json:"labels"` + Value float64 `json:"value"` + Timestamp *int64 `json:"ts,omitempty"` + }{ + Type: g.metricType.String(), + Value: g.function(), + Labels: func() map[string]string { + labels := make(map[string]string, len(g.tags)+1) + labels[g.getNameTag()] = g.Name() + for k, v := range g.tags { + labels[k] = v + } + return labels + }(), + Timestamp: tsAsRef(g.timestamp), + }) +} + +// Snapshot returns independent copy on metric. +func (g *FuncGauge) Snapshot() Metric { + return &Gauge{ + name: g.name, + metricType: g.metricType, + tags: g.tags, + value: *atomic.NewFloat64(g.function()), + + useNameTag: g.useNameTag, + } +} diff --git a/library/go/core/metrics/solomon/func_gauge_test.go b/library/go/core/metrics/solomon/func_gauge_test.go new file mode 100644 index 0000000000..f4317a0cab --- /dev/null +++ b/library/go/core/metrics/solomon/func_gauge_test.go @@ -0,0 +1,64 @@ +package solomon + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" +) + +func TestFuncGauge_Value(t *testing.T) { + val := new(atomic.Float64) + c := &FuncGauge{ + name: "mygauge", + metricType: typeGauge, + tags: map[string]string{"ololo": "trololo"}, + function: func() float64 { + return val.Load() + }, + } + + val.Store(1) + assert.Equal(t, float64(1), c.Snapshot().(*Gauge).value.Load()) + + val.Store(42) + assert.Equal(t, float64(42), c.Snapshot().(*Gauge).value.Load()) + +} + +func TestFunGauge_MarshalJSON(t *testing.T) { + c := &FuncGauge{ + name: "mygauge", + metricType: typeGauge, + tags: map[string]string{"ololo": "trololo"}, + function: func() float64 { + return 42.18 + }, + } + + b, err := json.Marshal(c) + assert.NoError(t, err) + + expected := []byte(`{"type":"DGAUGE","labels":{"ololo":"trololo","sensor":"mygauge"},"value":42.18}`) + assert.Equal(t, expected, b) +} + +func TestNameTagFunGauge_MarshalJSON(t *testing.T) { + c := &FuncGauge{ + name: "mygauge", + metricType: typeGauge, + tags: map[string]string{"ololo": "trololo"}, + function: func() float64 { + return 42.18 + }, + + useNameTag: true, + } + + b, err := json.Marshal(c) + assert.NoError(t, err) + + expected := []byte(`{"type":"DGAUGE","labels":{"name":"mygauge","ololo":"trololo"},"value":42.18}`) + assert.Equal(t, expected, b) +} diff --git a/library/go/core/metrics/solomon/func_int_gauge.go b/library/go/core/metrics/solomon/func_int_gauge.go new file mode 100644 index 0000000000..4e7f22949a --- /dev/null +++ b/library/go/core/metrics/solomon/func_int_gauge.go @@ -0,0 +1,87 @@ +package solomon + +import ( + "encoding/json" + "time" + + "go.uber.org/atomic" +) + +var _ Metric = (*FuncIntGauge)(nil) + +// FuncIntGauge tracks int64 value returned by function. +type FuncIntGauge struct { + name string + metricType metricType + tags map[string]string + function func() int64 + timestamp *time.Time + + useNameTag bool +} + +func (g *FuncIntGauge) Name() string { + return g.name +} + +func (g *FuncIntGauge) Function() func() int64 { + return g.function +} + +func (g *FuncIntGauge) getType() metricType { + return g.metricType +} + +func (g *FuncIntGauge) getLabels() map[string]string { + return g.tags +} + +func (g *FuncIntGauge) getValue() interface{} { + return g.function() +} + +func (g *FuncIntGauge) getTimestamp() *time.Time { + return g.timestamp +} + +func (g *FuncIntGauge) getNameTag() string { + if g.useNameTag { + return "name" + } else { + return "sensor" + } +} + +// MarshalJSON implements json.Marshaler. +func (g *FuncIntGauge) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Type string `json:"type"` + Labels map[string]string `json:"labels"` + Value int64 `json:"value"` + Timestamp *int64 `json:"ts,omitempty"` + }{ + Type: g.metricType.String(), + Value: g.function(), + Labels: func() map[string]string { + labels := make(map[string]string, len(g.tags)+1) + labels[g.getNameTag()] = g.Name() + for k, v := range g.tags { + labels[k] = v + } + return labels + }(), + Timestamp: tsAsRef(g.timestamp), + }) +} + +// Snapshot returns independent copy on metric. +func (g *FuncIntGauge) Snapshot() Metric { + return &IntGauge{ + name: g.name, + metricType: g.metricType, + tags: g.tags, + value: *atomic.NewInt64(g.function()), + + useNameTag: g.useNameTag, + } +} diff --git a/library/go/core/metrics/solomon/func_int_gauge_test.go b/library/go/core/metrics/solomon/func_int_gauge_test.go new file mode 100644 index 0000000000..4a576461e3 --- /dev/null +++ b/library/go/core/metrics/solomon/func_int_gauge_test.go @@ -0,0 +1,64 @@ +package solomon + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" +) + +func TestFuncIntGauge_Value(t *testing.T) { + val := new(atomic.Int64) + c := &FuncIntGauge{ + name: "myintgauge", + metricType: typeIGauge, + tags: map[string]string{"ololo": "trololo"}, + function: func() int64 { + return val.Load() + }, + } + + val.Store(1) + assert.Equal(t, int64(1), c.Snapshot().(*IntGauge).value.Load()) + + val.Store(42) + assert.Equal(t, int64(42), c.Snapshot().(*IntGauge).value.Load()) + +} + +func TestFunIntGauge_MarshalJSON(t *testing.T) { + c := &FuncIntGauge{ + name: "myintgauge", + metricType: typeIGauge, + tags: map[string]string{"ololo": "trololo"}, + function: func() int64 { + return 42 + }, + } + + b, err := json.Marshal(c) + assert.NoError(t, err) + + expected := []byte(`{"type":"IGAUGE","labels":{"ololo":"trololo","sensor":"myintgauge"},"value":42}`) + assert.Equal(t, expected, b) +} + +func TestNameTagFunIntGauge_MarshalJSON(t *testing.T) { + c := &FuncIntGauge{ + name: "myintgauge", + metricType: typeIGauge, + tags: map[string]string{"ololo": "trololo"}, + function: func() int64 { + return 42 + }, + + useNameTag: true, + } + + b, err := json.Marshal(c) + assert.NoError(t, err) + + expected := []byte(`{"type":"IGAUGE","labels":{"name":"myintgauge","ololo":"trololo"},"value":42}`) + assert.Equal(t, expected, b) +} diff --git a/library/go/core/metrics/solomon/gauge.go b/library/go/core/metrics/solomon/gauge.go new file mode 100644 index 0000000000..4660d33c11 --- /dev/null +++ b/library/go/core/metrics/solomon/gauge.go @@ -0,0 +1,115 @@ +package solomon + +import ( + "encoding/json" + "time" + + "github.com/ydb-platform/ydb/library/go/core/metrics" + "go.uber.org/atomic" +) + +var ( + _ metrics.Gauge = (*Gauge)(nil) + _ Metric = (*Gauge)(nil) +) + +// Gauge tracks single float64 value. +type Gauge struct { + name string + metricType metricType + tags map[string]string + value atomic.Float64 + timestamp *time.Time + + useNameTag bool +} + +func NewGauge(name string, value float64, opts ...metricOpts) Gauge { + mOpts := MetricsOpts{} + for _, op := range opts { + op(&mOpts) + } + return Gauge{ + name: name, + metricType: typeGauge, + tags: mOpts.tags, + value: *atomic.NewFloat64(value), + useNameTag: mOpts.useNameTag, + timestamp: mOpts.timestamp, + } +} + +func (g *Gauge) Set(value float64) { + g.value.Store(value) +} + +func (g *Gauge) Add(value float64) { + g.value.Add(value) +} + +func (g *Gauge) Name() string { + return g.name +} + +func (g *Gauge) getType() metricType { + return g.metricType +} + +func (g *Gauge) getLabels() map[string]string { + return g.tags +} + +func (g *Gauge) getValue() interface{} { + return g.value.Load() +} + +func (g *Gauge) getTimestamp() *time.Time { + return g.timestamp +} + +func (g *Gauge) getNameTag() string { + if g.useNameTag { + return "name" + } else { + return "sensor" + } +} + +// MarshalJSON implements json.Marshaler. +func (g *Gauge) MarshalJSON() ([]byte, error) { + metricType := g.metricType.String() + value := g.value.Load() + labels := func() map[string]string { + labels := make(map[string]string, len(g.tags)+1) + labels[g.getNameTag()] = g.Name() + for k, v := range g.tags { + labels[k] = v + } + return labels + }() + + return json.Marshal(struct { + Type string `json:"type"` + Labels map[string]string `json:"labels"` + Value float64 `json:"value"` + Timestamp *int64 `json:"ts,omitempty"` + }{ + Type: metricType, + Value: value, + Labels: labels, + Timestamp: tsAsRef(g.timestamp), + }) +} + +// Snapshot returns independent copy of metric. +func (g *Gauge) Snapshot() Metric { + return &Gauge{ + name: g.name, + metricType: g.metricType, + tags: g.tags, + value: *atomic.NewFloat64(g.value.Load()), + + useNameTag: g.useNameTag, + timestamp: g.timestamp, + } +} diff --git a/library/go/core/metrics/solomon/gauge_test.go b/library/go/core/metrics/solomon/gauge_test.go new file mode 100644 index 0000000000..82659a49c4 --- /dev/null +++ b/library/go/core/metrics/solomon/gauge_test.go @@ -0,0 +1,75 @@ +package solomon + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" +) + +func TestGauge_Add(t *testing.T) { + c := &Gauge{ + name: "mygauge", + metricType: typeGauge, + tags: map[string]string{"ololo": "trololo"}, + } + + c.Add(1) + assert.Equal(t, float64(1), c.value.Load()) + + c.Add(42) + assert.Equal(t, float64(43), c.value.Load()) + + c.Add(14.89) + assert.Equal(t, float64(57.89), c.value.Load()) +} + +func TestGauge_Set(t *testing.T) { + c := &Gauge{ + name: "mygauge", + metricType: typeGauge, + tags: map[string]string{"ololo": "trololo"}, + } + + c.Set(1) + assert.Equal(t, float64(1), c.value.Load()) + + c.Set(42) + assert.Equal(t, float64(42), c.value.Load()) + + c.Set(14.89) + assert.Equal(t, float64(14.89), c.value.Load()) +} + +func TestGauge_MarshalJSON(t *testing.T) { + c := &Gauge{ + name: "mygauge", + metricType: typeGauge, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewFloat64(42.18), + } + + b, err := json.Marshal(c) + assert.NoError(t, err) + + expected := []byte(`{"type":"DGAUGE","labels":{"ololo":"trololo","sensor":"mygauge"},"value":42.18}`) + assert.Equal(t, expected, b) +} + +func TestNameTagGauge_MarshalJSON(t *testing.T) { + c := &Gauge{ + name: "mygauge", + metricType: typeGauge, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewFloat64(42.18), + + useNameTag: true, + } + + b, err := json.Marshal(c) + assert.NoError(t, err) + + expected := []byte(`{"type":"DGAUGE","labels":{"name":"mygauge","ololo":"trololo"},"value":42.18}`) + assert.Equal(t, expected, b) +} diff --git a/library/go/core/metrics/solomon/gotest/ya.make b/library/go/core/metrics/solomon/gotest/ya.make new file mode 100644 index 0000000000..0c386167a4 --- /dev/null +++ b/library/go/core/metrics/solomon/gotest/ya.make @@ -0,0 +1,3 @@ +GO_TEST_FOR(library/go/core/metrics/solomon) + +END() diff --git a/library/go/core/metrics/solomon/histogram.go b/library/go/core/metrics/solomon/histogram.go new file mode 100644 index 0000000000..6f4d3629e0 --- /dev/null +++ b/library/go/core/metrics/solomon/histogram.go @@ -0,0 +1,182 @@ +package solomon + +import ( + "encoding/binary" + "encoding/json" + "io" + "sort" + "sync" + "time" + + "github.com/ydb-platform/ydb/library/go/core/metrics" + "github.com/ydb-platform/ydb/library/go/core/xerrors" + "go.uber.org/atomic" +) + +var ( + _ metrics.Histogram = (*Histogram)(nil) + _ metrics.Timer = (*Histogram)(nil) + _ Metric = (*Histogram)(nil) +) + +type Histogram struct { + name string + metricType metricType + tags map[string]string + bucketBounds []float64 + bucketValues []int64 + infValue atomic.Int64 + mutex sync.Mutex + timestamp *time.Time + useNameTag bool +} + +type histogram struct { + Bounds []float64 `json:"bounds"` + Buckets []int64 `json:"buckets"` + Inf int64 `json:"inf,omitempty"` +} + +func (h *histogram) writeHistogram(w io.Writer) error { + err := writeULEB128(w, uint32(len(h.Buckets))) + if err != nil { + return xerrors.Errorf("writeULEB128 size histogram buckets failed: %w", err) + } + + for _, upperBound := range h.Bounds { + err = binary.Write(w, binary.LittleEndian, float64(upperBound)) + if err != nil { + return xerrors.Errorf("binary.Write upper bound failed: %w", err) + } + } + + for _, bucketValue := range h.Buckets { + err = binary.Write(w, binary.LittleEndian, uint64(bucketValue)) + if err != nil { + return xerrors.Errorf("binary.Write histogram buckets failed: %w", err) + } + } + return nil +} + +func (h *Histogram) RecordValue(value float64) { + boundIndex := sort.SearchFloat64s(h.bucketBounds, value) + + if boundIndex < len(h.bucketValues) { + h.mutex.Lock() + h.bucketValues[boundIndex] += 1 + h.mutex.Unlock() + } else { + h.infValue.Inc() + } +} + +func (h *Histogram) RecordDuration(value time.Duration) { + h.RecordValue(value.Seconds()) +} + +func (h *Histogram) Reset() { + h.mutex.Lock() + defer h.mutex.Unlock() + + h.bucketValues = make([]int64, len(h.bucketValues)) + h.infValue.Store(0) +} + +func (h *Histogram) Name() string { + return h.name +} + +func (h *Histogram) getType() metricType { + return h.metricType +} + +func (h *Histogram) getLabels() map[string]string { + return h.tags +} + +func (h *Histogram) getValue() interface{} { + return histogram{ + Bounds: h.bucketBounds, + Buckets: h.bucketValues, + } +} + +func (h *Histogram) getTimestamp() *time.Time { + return h.timestamp +} + +func (h *Histogram) getNameTag() string { + if h.useNameTag { + return "name" + } else { + return "sensor" + } +} + +// MarshalJSON implements json.Marshaler. +func (h *Histogram) MarshalJSON() ([]byte, error) { + valuesCopy := make([]int64, len(h.bucketValues)) + h.mutex.Lock() + copy(valuesCopy, h.bucketValues) + h.mutex.Unlock() + return json.Marshal(struct { + Type string `json:"type"` + Labels map[string]string `json:"labels"` + Histogram histogram `json:"hist"` + Timestamp *int64 `json:"ts,omitempty"` + }{ + Type: h.metricType.String(), + Histogram: histogram{ + Bounds: h.bucketBounds, + Buckets: valuesCopy, + Inf: h.infValue.Load(), + }, + Labels: func() map[string]string { + labels := make(map[string]string, len(h.tags)+1) + labels[h.getNameTag()] = h.Name() + for k, v := range h.tags { + labels[k] = v + } + return labels + }(), + Timestamp: tsAsRef(h.timestamp), + }) +} + +// Snapshot returns independent copy on metric. +func (h *Histogram) Snapshot() Metric { + bucketBounds := make([]float64, len(h.bucketBounds)) + bucketValues := make([]int64, len(h.bucketValues)) + + copy(bucketBounds, h.bucketBounds) + h.mutex.Lock() + copy(bucketValues, h.bucketValues) + h.mutex.Unlock() + + return &Histogram{ + name: h.name, + metricType: h.metricType, + tags: h.tags, + bucketBounds: bucketBounds, + bucketValues: bucketValues, + infValue: *atomic.NewInt64(h.infValue.Load()), + useNameTag: h.useNameTag, + } +} + +// InitBucketValues cleans internal bucketValues and saves new values in order. +// Length of internal bucketValues stays unchanged. +// If length of slice in argument bucketValues more than length of internal one, +// the first extra element of bucketValues is stored in infValue. +func (h *Histogram) InitBucketValues(bucketValues []int64) { + h.mutex.Lock() + defer h.mutex.Unlock() + + h.bucketValues = make([]int64, len(h.bucketValues)) + h.infValue.Store(0) + copy(h.bucketValues, bucketValues) + if len(bucketValues) > len(h.bucketValues) { + h.infValue.Store(bucketValues[len(h.bucketValues)]) + } +} diff --git a/library/go/core/metrics/solomon/histogram_test.go b/library/go/core/metrics/solomon/histogram_test.go new file mode 100644 index 0000000000..be7042397c --- /dev/null +++ b/library/go/core/metrics/solomon/histogram_test.go @@ -0,0 +1,153 @@ +package solomon + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" +) + +func TestHistogram_MarshalJSON(t *testing.T) { + h := &Histogram{ + name: "myhistogram", + metricType: typeHistogram, + tags: map[string]string{"ololo": "trololo"}, + bucketBounds: []float64{1, 2, 3}, + bucketValues: []int64{1, 2, 1}, + infValue: *atomic.NewInt64(2), + } + + b, err := json.Marshal(h) + assert.NoError(t, err) + + expected := []byte(`{"type":"HIST","labels":{"ololo":"trololo","sensor":"myhistogram"},"hist":{"bounds":[1,2,3],"buckets":[1,2,1],"inf":2}}`) + assert.Equal(t, expected, b) +} + +func TestRatedHistogram_MarshalJSON(t *testing.T) { + h := &Histogram{ + name: "myhistogram", + metricType: typeRatedHistogram, + tags: map[string]string{"ololo": "trololo"}, + bucketBounds: []float64{1, 2, 3}, + bucketValues: []int64{1, 2, 1}, + infValue: *atomic.NewInt64(2), + } + + b, err := json.Marshal(h) + assert.NoError(t, err) + + expected := []byte(`{"type":"HIST_RATE","labels":{"ololo":"trololo","sensor":"myhistogram"},"hist":{"bounds":[1,2,3],"buckets":[1,2,1],"inf":2}}`) + assert.Equal(t, expected, b) +} + +func TestNameTagHistogram_MarshalJSON(t *testing.T) { + h := &Histogram{ + name: "myhistogram", + metricType: typeRatedHistogram, + tags: map[string]string{"ololo": "trololo"}, + bucketBounds: []float64{1, 2, 3}, + bucketValues: []int64{1, 2, 1}, + infValue: *atomic.NewInt64(2), + useNameTag: true, + } + + b, err := json.Marshal(h) + assert.NoError(t, err) + + expected := []byte(`{"type":"HIST_RATE","labels":{"name":"myhistogram","ololo":"trololo"},"hist":{"bounds":[1,2,3],"buckets":[1,2,1],"inf":2}}`) + assert.Equal(t, expected, b) +} + +func TestHistogram_RecordDuration(t *testing.T) { + h := &Histogram{ + name: "myhistogram", + metricType: typeHistogram, + tags: map[string]string{"ololo": "trololo"}, + bucketBounds: []float64{1, 2, 3}, + bucketValues: make([]int64, 3), + } + + h.RecordDuration(500 * time.Millisecond) + h.RecordDuration(1 * time.Second) + h.RecordDuration(1800 * time.Millisecond) + h.RecordDuration(3 * time.Second) + h.RecordDuration(1 * time.Hour) + + expectedValues := []int64{2, 1, 1} + assert.Equal(t, expectedValues, h.bucketValues) + + var expectedInfValue int64 = 1 + assert.Equal(t, expectedInfValue, h.infValue.Load()) +} + +func TestHistogram_RecordValue(t *testing.T) { + h := &Histogram{ + name: "myhistogram", + metricType: typeHistogram, + tags: map[string]string{"ololo": "trololo"}, + bucketBounds: []float64{1, 2, 3}, + bucketValues: make([]int64, 3), + } + + h.RecordValue(0.5) + h.RecordValue(1) + h.RecordValue(1.8) + h.RecordValue(3) + h.RecordValue(60) + + expectedValues := []int64{2, 1, 1} + assert.Equal(t, expectedValues, h.bucketValues) + + var expectedInfValue int64 = 1 + assert.Equal(t, expectedInfValue, h.infValue.Load()) +} + +func TestHistogram_Reset(t *testing.T) { + h := &Histogram{ + name: "myhistogram", + metricType: typeHistogram, + tags: map[string]string{"ololo": "trololo"}, + bucketBounds: []float64{1, 2, 3}, + bucketValues: make([]int64, 3), + } + + h.RecordValue(0.5) + h.RecordValue(1) + h.RecordValue(1.8) + h.RecordValue(3) + h.RecordValue(60) + + assert.Equal(t, []int64{2, 1, 1}, h.bucketValues) + assert.Equal(t, int64(1), h.infValue.Load()) + + h.Reset() + + assert.Equal(t, []int64{0, 0, 0}, h.bucketValues) + assert.Equal(t, int64(0), h.infValue.Load()) +} + +func TestHistogram_InitBucketValues(t *testing.T) { + h := &Histogram{ + name: "myhistogram", + metricType: typeHistogram, + tags: map[string]string{"ololo": "trololo"}, + bucketBounds: []float64{1, 2, 3}, + bucketValues: make([]int64, 3), + } + + valsToInit := []int64{1, 2, 3, 4} + h.InitBucketValues(valsToInit[:2]) + assert.Equal(t, append(valsToInit[:2], 0), h.bucketValues) + assert.Equal(t, *atomic.NewInt64(0), h.infValue) + + h.InitBucketValues(valsToInit[:3]) + assert.Equal(t, valsToInit[:3], h.bucketValues) + assert.Equal(t, *atomic.NewInt64(0), h.infValue) + + h.InitBucketValues(valsToInit) + assert.Equal(t, valsToInit[:3], h.bucketValues) + assert.Equal(t, *atomic.NewInt64(valsToInit[3]), h.infValue) +} diff --git a/library/go/core/metrics/solomon/int_gauge.go b/library/go/core/metrics/solomon/int_gauge.go new file mode 100644 index 0000000000..8733bf11fe --- /dev/null +++ b/library/go/core/metrics/solomon/int_gauge.go @@ -0,0 +1,115 @@ +package solomon + +import ( + "encoding/json" + "time" + + "github.com/ydb-platform/ydb/library/go/core/metrics" + "go.uber.org/atomic" +) + +var ( + _ metrics.IntGauge = (*IntGauge)(nil) + _ Metric = (*IntGauge)(nil) +) + +// IntGauge tracks single float64 value. +type IntGauge struct { + name string + metricType metricType + tags map[string]string + value atomic.Int64 + timestamp *time.Time + + useNameTag bool +} + +func NewIntGauge(name string, value int64, opts ...metricOpts) IntGauge { + mOpts := MetricsOpts{} + for _, op := range opts { + op(&mOpts) + } + return IntGauge{ + name: name, + metricType: typeIGauge, + tags: mOpts.tags, + value: *atomic.NewInt64(value), + useNameTag: mOpts.useNameTag, + timestamp: mOpts.timestamp, + } +} + +func (g *IntGauge) Set(value int64) { + g.value.Store(value) +} + +func (g *IntGauge) Add(value int64) { + g.value.Add(value) +} + +func (g *IntGauge) Name() string { + return g.name +} + +func (g *IntGauge) getType() metricType { + return g.metricType +} + +func (g *IntGauge) getLabels() map[string]string { + return g.tags +} + +func (g *IntGauge) getValue() interface{} { + return g.value.Load() +} + +func (g *IntGauge) getTimestamp() *time.Time { + return g.timestamp +} + +func (g *IntGauge) getNameTag() string { + if g.useNameTag { + return "name" + } else { + return "sensor" + } +} + +// MarshalJSON implements json.Marshaler. +func (g *IntGauge) MarshalJSON() ([]byte, error) { + metricType := g.metricType.String() + value := g.value.Load() + labels := func() map[string]string { + labels := make(map[string]string, len(g.tags)+1) + labels[g.getNameTag()] = g.Name() + for k, v := range g.tags { + labels[k] = v + } + return labels + }() + + return json.Marshal(struct { + Type string `json:"type"` + Labels map[string]string `json:"labels"` + Value int64 `json:"value"` + Timestamp *int64 `json:"ts,omitempty"` + }{ + Type: metricType, + Value: value, + Labels: labels, + Timestamp: tsAsRef(g.timestamp), + }) +} + +// Snapshot returns independent copy of metric. +func (g *IntGauge) Snapshot() Metric { + return &IntGauge{ + name: g.name, + metricType: g.metricType, + tags: g.tags, + value: *atomic.NewInt64(g.value.Load()), + + useNameTag: g.useNameTag, + timestamp: g.timestamp, + } +} diff --git a/library/go/core/metrics/solomon/int_gauge_test.go b/library/go/core/metrics/solomon/int_gauge_test.go new file mode 100644 index 0000000000..5918ef9ac3 --- /dev/null +++ b/library/go/core/metrics/solomon/int_gauge_test.go @@ -0,0 +1,75 @@ +package solomon + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" +) + +func TestIntGauge_Add(t *testing.T) { + c := &IntGauge{ + name: "myintgauge", + metricType: typeIGauge, + tags: map[string]string{"ololo": "trololo"}, + } + + c.Add(1) + assert.Equal(t, int64(1), c.value.Load()) + + c.Add(42) + assert.Equal(t, int64(43), c.value.Load()) + + c.Add(-45) + assert.Equal(t, int64(-2), c.value.Load()) +} + +func TestIntGauge_Set(t *testing.T) { + c := &IntGauge{ + name: "myintgauge", + metricType: typeIGauge, + tags: map[string]string{"ololo": "trololo"}, + } + + c.Set(1) + assert.Equal(t, int64(1), c.value.Load()) + + c.Set(42) + assert.Equal(t, int64(42), c.value.Load()) + + c.Set(-45) + assert.Equal(t, int64(-45), c.value.Load()) +} + +func TestIntGauge_MarshalJSON(t *testing.T) { + c := &IntGauge{ + name: "myintgauge", + metricType: typeIGauge, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewInt64(42), + } + + b, err := json.Marshal(c) + assert.NoError(t, err) + + expected := []byte(`{"type":"IGAUGE","labels":{"ololo":"trololo","sensor":"myintgauge"},"value":42}`) + assert.Equal(t, expected, b) +} + +func TestNameTagIntGauge_MarshalJSON(t *testing.T) { + c := &IntGauge{ + name: "myintgauge", + metricType: typeIGauge, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewInt64(42), + + useNameTag: true, + } + + b, err := json.Marshal(c) + assert.NoError(t, err) + + expected := []byte(`{"type":"IGAUGE","labels":{"name":"myintgauge","ololo":"trololo"},"value":42}`) + assert.Equal(t, expected, b) +} diff --git a/library/go/core/metrics/solomon/metrics.go b/library/go/core/metrics/solomon/metrics.go new file mode 100644 index 0000000000..6b73fd10a6 --- /dev/null +++ b/library/go/core/metrics/solomon/metrics.go @@ -0,0 +1,187 @@ +package solomon + +import ( + "bytes" + "context" + "encoding" + "encoding/json" + "fmt" + "time" + + "github.com/ydb-platform/ydb/library/go/core/xerrors" + "golang.org/x/exp/slices" +) + +// Gather collects all metrics data via snapshots. +func (r Registry) Gather() (*Metrics, error) { + metrics := make([]Metric, 0) + + var err error + r.metrics.Range(func(_, v interface{}) bool { + if s, ok := v.(Metric); ok { + metrics = append(metrics, s.Snapshot()) + return true + } + err = fmt.Errorf("unexpected value type: %T", v) + return false + }) + + if err != nil { + return nil, err + } + + return &Metrics{metrics: metrics}, nil +} + +func NewMetrics(metrics []Metric) Metrics { + return Metrics{metrics: metrics} +} + +func NewMetricsWithTimestamp(metrics []Metric, ts time.Time) Metrics { + return Metrics{metrics: metrics, timestamp: &ts} +} + +type valueType uint8 + +const ( + valueTypeNone valueType = iota + valueTypeOneWithoutTS valueType = 0x01 + valueTypeOneWithTS valueType = 0x02 + valueTypeManyWithTS valueType = 0x03 +) + +type metricType uint8 + +const ( + typeUnspecified metricType = iota + typeGauge metricType = 0x01 + typeCounter metricType = 0x02 + typeRated metricType = 0x03 + typeIGauge metricType = 0x04 + typeHistogram metricType = 0x05 + typeRatedHistogram metricType = 0x06 +) + +func (k metricType) String() string { + switch k { + case typeCounter: + return "COUNTER" + case typeGauge: + return "DGAUGE" + case typeIGauge: + return "IGAUGE" + case typeHistogram: + return "HIST" + case typeRated: + return "RATE" + case typeRatedHistogram: + return "HIST_RATE" + default: + panic("unknown metric type") + } +} + +// Metric is an any abstract solomon Metric. +type Metric interface { + json.Marshaler + + Name() string + getType() metricType + getLabels() map[string]string + getValue() interface{} + getNameTag() string + getTimestamp() *time.Time + + Snapshot() Metric +} + +// Rated marks given Solomon metric or vector as rated. +// Example: +// +// cnt := r.Counter("mycounter") +// Rated(cnt) +// +// cntvec := r.CounterVec("mycounter", []string{"mytag"}) +// Rated(cntvec) +// +// For additional info: https://docs.yandex-team.ru/solomon/data-collection/dataformat/json +func Rated(s interface{}) { + switch st := s.(type) { + case *Counter: + st.metricType = typeRated + case *FuncCounter: + st.metricType = typeRated + case *Histogram: + st.metricType = typeRatedHistogram + + case *CounterVec: + st.vec.rated = true + case *HistogramVec: + st.vec.rated = true + case *DurationHistogramVec: + st.vec.rated = true + } + // any other metrics types are unrateable +} + +var ( + _ json.Marshaler = (*Metrics)(nil) + _ encoding.BinaryMarshaler = (*Metrics)(nil) +) + +type Metrics struct { + metrics []Metric + timestamp *time.Time +} + +// MarshalJSON implements json.Marshaler. +func (s Metrics) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Metrics []Metric `json:"metrics"` + Timestamp *int64 `json:"ts,omitempty"` + }{s.metrics, tsAsRef(s.timestamp)}) +} + +// MarshalBinary implements encoding.BinaryMarshaler. +func (s Metrics) MarshalBinary() ([]byte, error) { + var buf bytes.Buffer + se := NewSpackEncoder(context.Background(), CompressionNone, &s) + n, err := se.Encode(&buf) + if err != nil { + return nil, xerrors.Errorf("encode only %d bytes: %w", n, err) + } + return buf.Bytes(), nil +} + +// SplitToChunks splits Metrics into a slice of chunks, each at most maxChunkSize long. +// The length of returned slice is always at least one. +// Zero maxChunkSize denotes unlimited chunk length. +func (s Metrics) SplitToChunks(maxChunkSize int) []Metrics { + if maxChunkSize == 0 || len(s.metrics) == 0 { + return []Metrics{s} + } + chunks := make([]Metrics, 0, len(s.metrics)/maxChunkSize+1) + + for leftBound := 0; leftBound < len(s.metrics); leftBound += maxChunkSize { + rightBound := leftBound + maxChunkSize + if rightBound > len(s.metrics) { + rightBound = len(s.metrics) + } + chunk := s.metrics[leftBound:rightBound] + chunks = append(chunks, Metrics{metrics: chunk}) + } + return chunks +} + +// List return list of metrics +func (s Metrics) List() []Metric { + return slices.Clone(s.metrics) +} + +func tsAsRef(t *time.Time) *int64 { + if t == nil { + return nil + } + ts := t.Unix() + return &ts +} diff --git a/library/go/core/metrics/solomon/metrics_opts.go b/library/go/core/metrics/solomon/metrics_opts.go new file mode 100644 index 0000000000..d9ade67966 --- /dev/null +++ b/library/go/core/metrics/solomon/metrics_opts.go @@ -0,0 +1,29 @@ +package solomon + +import "time" + +type MetricsOpts struct { + useNameTag bool + tags map[string]string + timestamp *time.Time +} + +type metricOpts func(*MetricsOpts) + +func WithTags(tags map[string]string) func(*MetricsOpts) { + return func(m *MetricsOpts) { + m.tags = tags + } +} + +func WithUseNameTag() func(*MetricsOpts) { + return func(m *MetricsOpts) { + m.useNameTag = true + } +} + +func WithTimestamp(t time.Time) func(*MetricsOpts) { + return func(m *MetricsOpts) { + m.timestamp = &t + } +} diff --git a/library/go/core/metrics/solomon/metrics_test.go b/library/go/core/metrics/solomon/metrics_test.go new file mode 100644 index 0000000000..610fa061a1 --- /dev/null +++ b/library/go/core/metrics/solomon/metrics_test.go @@ -0,0 +1,296 @@ +package solomon + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/ydb-platform/ydb/library/go/core/metrics" + "go.uber.org/atomic" +) + +func TestMetrics_MarshalJSON(t *testing.T) { + s := &Metrics{ + metrics: []Metric{ + &Counter{ + name: "mycounter", + metricType: typeCounter, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewInt64(42), + }, + &Counter{ + name: "myratedcounter", + metricType: typeRated, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewInt64(42), + }, + &Gauge{ + name: "mygauge", + metricType: typeGauge, + tags: map[string]string{"shimba": "boomba"}, + value: *atomic.NewFloat64(14.89), + }, + &Timer{ + name: "mytimer", + metricType: typeGauge, + tags: map[string]string{"looken": "tooken"}, + value: *atomic.NewDuration(1456 * time.Millisecond), + }, + &Histogram{ + name: "myhistogram", + metricType: typeHistogram, + tags: map[string]string{"chicken": "cooken"}, + bucketBounds: []float64{1, 2, 3}, + bucketValues: []int64{1, 2, 1}, + infValue: *atomic.NewInt64(1), + }, + &Histogram{ + name: "myratedhistogram", + metricType: typeRatedHistogram, + tags: map[string]string{"chicken": "cooken"}, + bucketBounds: []float64{1, 2, 3}, + bucketValues: []int64{1, 2, 1}, + infValue: *atomic.NewInt64(1), + }, + &Gauge{ + name: "mytimedgauge", + metricType: typeGauge, + tags: map[string]string{"oki": "toki"}, + value: *atomic.NewFloat64(42.24), + timestamp: timeAsRef(time.Unix(1500000000, 0)), + }, + }, + } + + b, err := json.Marshal(s) + assert.NoError(t, err) + + expected := []byte(`{"metrics":[` + + `{"type":"COUNTER","labels":{"ololo":"trololo","sensor":"mycounter"},"value":42},` + + `{"type":"RATE","labels":{"ololo":"trololo","sensor":"myratedcounter"},"value":42},` + + `{"type":"DGAUGE","labels":{"sensor":"mygauge","shimba":"boomba"},"value":14.89},` + + `{"type":"DGAUGE","labels":{"looken":"tooken","sensor":"mytimer"},"value":1.456},` + + `{"type":"HIST","labels":{"chicken":"cooken","sensor":"myhistogram"},"hist":{"bounds":[1,2,3],"buckets":[1,2,1],"inf":1}},` + + `{"type":"HIST_RATE","labels":{"chicken":"cooken","sensor":"myratedhistogram"},"hist":{"bounds":[1,2,3],"buckets":[1,2,1],"inf":1}},` + + `{"type":"DGAUGE","labels":{"oki":"toki","sensor":"mytimedgauge"},"value":42.24,"ts":1500000000}` + + `]}`) + assert.Equal(t, expected, b) +} + +func timeAsRef(t time.Time) *time.Time { + return &t +} + +func TestMetrics_with_timestamp_MarshalJSON(t *testing.T) { + s := &Metrics{ + metrics: []Metric{ + &Counter{ + name: "mycounter", + metricType: typeCounter, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewInt64(42), + }, + &Gauge{ + name: "mytimedgauge", + metricType: typeGauge, + tags: map[string]string{"oki": "toki"}, + value: *atomic.NewFloat64(42.24), + timestamp: timeAsRef(time.Unix(1500000000, 0)), + }, + }, + timestamp: timeAsRef(time.Unix(1657710477, 0)), + } + + b, err := json.Marshal(s) + assert.NoError(t, err) + + expected := []byte(`{"metrics":[` + + `{"type":"COUNTER","labels":{"ololo":"trololo","sensor":"mycounter"},"value":42},` + + `{"type":"DGAUGE","labels":{"oki":"toki","sensor":"mytimedgauge"},"value":42.24,"ts":1500000000}` + + `],"ts":1657710477}`) + assert.Equal(t, expected, b) +} + +func TestRated(t *testing.T) { + testCases := []struct { + name string + s interface{} + expected Metric + }{ + { + "counter", + &Counter{ + name: "mycounter", + metricType: typeCounter, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewInt64(42), + }, + &Counter{ + name: "mycounter", + metricType: typeRated, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewInt64(42), + }, + }, + { + "gauge", + &Gauge{ + name: "mygauge", + metricType: typeGauge, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewFloat64(42), + }, + &Gauge{ + name: "mygauge", + metricType: typeGauge, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewFloat64(42), + }, + }, + { + "timer", + &Timer{ + name: "mytimer", + metricType: typeGauge, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewDuration(1 * time.Second), + }, + &Timer{ + name: "mytimer", + metricType: typeGauge, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewDuration(1 * time.Second), + }, + }, + { + "histogram", + &Histogram{ + name: "myhistogram", + metricType: typeHistogram, + tags: map[string]string{"ololo": "trololo"}, + bucketBounds: []float64{1, 2, 3}, + infValue: *atomic.NewInt64(0), + }, + &Histogram{ + name: "myhistogram", + metricType: typeRatedHistogram, + tags: map[string]string{"ololo": "trololo"}, + bucketBounds: []float64{1, 2, 3}, + infValue: *atomic.NewInt64(0), + }, + }, + { + "metric_interface", + metrics.Counter(&Counter{ + name: "mycounter", + metricType: typeCounter, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewInt64(42), + }), + &Counter{ + name: "mycounter", + metricType: typeRated, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewInt64(42), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + Rated(tc.s) + assert.Equal(t, tc.expected, tc.s) + }) + } +} + +func TestSplitToChunks(t *testing.T) { + zeroMetrics := Metrics{ + metrics: []Metric{}, + } + oneMetric := Metrics{ + metrics: []Metric{ + &Counter{name: "a"}, + }, + } + twoMetrics := Metrics{ + metrics: []Metric{ + &Counter{name: "a"}, + &Counter{name: "b"}, + }, + } + fourMetrics := Metrics{ + metrics: []Metric{ + &Counter{name: "a"}, + &Counter{name: "b"}, + &Counter{name: "c"}, + &Counter{name: "d"}, + }, + } + fiveMetrics := Metrics{ + metrics: []Metric{ + &Counter{name: "a"}, + &Counter{name: "b"}, + &Counter{name: "c"}, + &Counter{name: "d"}, + &Counter{name: "e"}, + }, + } + + chunks := zeroMetrics.SplitToChunks(2) + assert.Equal(t, 1, len(chunks)) + assert.Equal(t, 0, len(chunks[0].metrics)) + + chunks = oneMetric.SplitToChunks(1) + assert.Equal(t, 1, len(chunks)) + assert.Equal(t, 1, len(chunks[0].metrics)) + assert.Equal(t, "a", chunks[0].metrics[0].Name()) + + chunks = oneMetric.SplitToChunks(2) + assert.Equal(t, 1, len(chunks)) + assert.Equal(t, 1, len(chunks[0].metrics)) + assert.Equal(t, "a", chunks[0].metrics[0].Name()) + + chunks = twoMetrics.SplitToChunks(1) + assert.Equal(t, 2, len(chunks)) + assert.Equal(t, 1, len(chunks[0].metrics)) + assert.Equal(t, 1, len(chunks[1].metrics)) + assert.Equal(t, "a", chunks[0].metrics[0].Name()) + assert.Equal(t, "b", chunks[1].metrics[0].Name()) + + chunks = twoMetrics.SplitToChunks(2) + assert.Equal(t, 1, len(chunks)) + assert.Equal(t, 2, len(chunks[0].metrics)) + assert.Equal(t, "a", chunks[0].metrics[0].Name()) + assert.Equal(t, "b", chunks[0].metrics[1].Name()) + + chunks = fourMetrics.SplitToChunks(2) + assert.Equal(t, 2, len(chunks)) + assert.Equal(t, 2, len(chunks[0].metrics)) + assert.Equal(t, 2, len(chunks[1].metrics)) + assert.Equal(t, "a", chunks[0].metrics[0].Name()) + assert.Equal(t, "b", chunks[0].metrics[1].Name()) + assert.Equal(t, "c", chunks[1].metrics[0].Name()) + assert.Equal(t, "d", chunks[1].metrics[1].Name()) + + chunks = fiveMetrics.SplitToChunks(2) + assert.Equal(t, 3, len(chunks)) + assert.Equal(t, 2, len(chunks[0].metrics)) + assert.Equal(t, 2, len(chunks[1].metrics)) + assert.Equal(t, 1, len(chunks[2].metrics)) + assert.Equal(t, "a", chunks[0].metrics[0].Name()) + assert.Equal(t, "b", chunks[0].metrics[1].Name()) + assert.Equal(t, "c", chunks[1].metrics[0].Name()) + assert.Equal(t, "d", chunks[1].metrics[1].Name()) + assert.Equal(t, "e", chunks[2].metrics[0].Name()) + + chunks = fiveMetrics.SplitToChunks(0) + assert.Equal(t, 1, len(chunks)) + assert.Equal(t, 5, len(chunks[0].metrics)) + assert.Equal(t, "a", chunks[0].metrics[0].Name()) + assert.Equal(t, "b", chunks[0].metrics[1].Name()) + assert.Equal(t, "c", chunks[0].metrics[2].Name()) + assert.Equal(t, "d", chunks[0].metrics[3].Name()) + assert.Equal(t, "e", chunks[0].metrics[4].Name()) +} diff --git a/library/go/core/metrics/solomon/race_test.go b/library/go/core/metrics/solomon/race_test.go new file mode 100644 index 0000000000..32be6f34fb --- /dev/null +++ b/library/go/core/metrics/solomon/race_test.go @@ -0,0 +1,150 @@ +package solomon + +import ( + "bytes" + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb/library/go/core/metrics" +) + +type spinBarrier struct { + count int64 + waiting atomic.Int64 + step atomic.Int64 +} + +func newSpinBarrier(size int) *spinBarrier { + return &spinBarrier{count: int64(size)} +} + +func (b *spinBarrier) wait() { + s := b.step.Load() + w := b.waiting.Add(1) + if w == b.count { + b.waiting.Store(0) + b.step.Add(1) + } else { + for s == b.step.Load() { + // noop + } + } +} + +func TestRaceDurationHistogramVecVersusStreamJson(t *testing.T) { + // Regression test: https://github.com/ydb-platform/ydb/review/2690822/details + registry := NewRegistry(NewRegistryOpts()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const stepCount = 200 + + barrier := newSpinBarrier(2) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + // Consumer + defer wg.Done() + out := bytes.NewBuffer(nil) + for i := 0; i < stepCount; i++ { + out.Reset() + barrier.wait() + _, err := registry.StreamJSON(ctx, out) + if err != nil { + require.ErrorIs(t, err, context.Canceled) + break + } + } + }() + + wg.Add(1) + go func() { + // Producer + defer wg.Done() + + const success = "success" + const version = "version" + vecs := make([]metrics.TimerVec, 0) + buckets := metrics.NewDurationBuckets(1, 2, 3) + ProducerLoop: + for i := 0; i < stepCount; i++ { + barrier.wait() + vec := registry.DurationHistogramVec( + fmt.Sprintf("latency-%v", i), + buckets, + []string{success, version}, + ) + Rated(vec) + vecs = append(vecs, vec) + for _, v := range vecs { + v.With(map[string]string{success: "ok", version: "123"}).RecordDuration(time.Second) + v.With(map[string]string{success: "false", version: "123"}).RecordDuration(time.Millisecond) + } + select { + case <-ctx.Done(): + break ProducerLoop + default: + // noop + } + } + }() + wg.Wait() +} + +func TestRaceDurationHistogramRecordDurationVersusStreamJson(t *testing.T) { + // Regression test: https://github.com/ydb-platform/ydb/review/2690822/details + + registry := NewRegistry(NewRegistryOpts()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const stepCount = 200 + barrier := newSpinBarrier(2) + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + // Consumer + defer wg.Done() + out := bytes.NewBuffer(nil) + for i := 0; i < stepCount; i++ { + out.Reset() + barrier.wait() + _, err := registry.StreamJSON(ctx, out) + if err != nil { + require.ErrorIs(t, err, context.Canceled) + break + } + } + }() + + wg.Add(1) + go func() { + // Producer + defer wg.Done() + + buckets := metrics.NewDurationBuckets(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + hist := registry.DurationHistogram("latency", buckets) + // Rated(hist) + + ProducerLoop: + for i := 0; i < stepCount; i++ { + barrier.wait() + hist.RecordDuration(time.Duration(i % 10)) + select { + case <-ctx.Done(): + break ProducerLoop + default: + // noop + } + } + }() + wg.Wait() +} diff --git a/library/go/core/metrics/solomon/registry.go b/library/go/core/metrics/solomon/registry.go new file mode 100644 index 0000000000..0ad4d9378a --- /dev/null +++ b/library/go/core/metrics/solomon/registry.go @@ -0,0 +1,256 @@ +package solomon + +import ( + "reflect" + "strconv" + "sync" + + "github.com/ydb-platform/ydb/library/go/core/metrics" + "github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/metricsutil" + "github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/registryutil" +) + +var _ metrics.Registry = (*Registry)(nil) + +type Registry struct { + separator string + prefix string + tags map[string]string + rated bool + useNameTag bool + + subregistries map[string]*Registry + m *sync.Mutex + + metrics *sync.Map +} + +func NewRegistry(opts *RegistryOpts) *Registry { + r := &Registry{ + separator: ".", + useNameTag: false, + + subregistries: make(map[string]*Registry), + m: new(sync.Mutex), + + metrics: new(sync.Map), + } + + if opts != nil { + r.separator = string(opts.Separator) + r.prefix = opts.Prefix + r.tags = opts.Tags + r.rated = opts.Rated + r.useNameTag = opts.UseNameTag + for _, collector := range opts.Collectors { + collector(r) + } + } + + return r +} + +// Rated returns copy of registry with rated set to desired value. +func (r Registry) Rated(rated bool) metrics.Registry { + return &Registry{ + separator: r.separator, + prefix: r.prefix, + tags: r.tags, + rated: rated, + useNameTag: r.useNameTag, + + subregistries: r.subregistries, + m: r.m, + + metrics: r.metrics, + } +} + +// WithTags creates new sub-scope, where each metric has tags attached to it. +func (r Registry) WithTags(tags map[string]string) metrics.Registry { + return r.newSubregistry(r.prefix, registryutil.MergeTags(r.tags, tags)) +} + +// WithPrefix creates new sub-scope, where each metric has prefix added to it name. +func (r Registry) WithPrefix(prefix string) metrics.Registry { + return r.newSubregistry(registryutil.BuildFQName(r.separator, r.prefix, prefix), r.tags) +} + +// ComposeName builds FQ name with appropriate separator. +func (r Registry) ComposeName(parts ...string) string { + return registryutil.BuildFQName(r.separator, parts...) +} + +func (r Registry) Counter(name string) metrics.Counter { + s := &Counter{ + name: r.newMetricName(name), + metricType: typeCounter, + tags: r.tags, + + useNameTag: r.useNameTag, + } + + return r.registerMetric(s).(metrics.Counter) +} + +func (r Registry) FuncCounter(name string, function func() int64) metrics.FuncCounter { + s := &FuncCounter{ + name: r.newMetricName(name), + metricType: typeCounter, + tags: r.tags, + function: function, + useNameTag: r.useNameTag, + } + + return r.registerMetric(s).(metrics.FuncCounter) +} + +func (r Registry) Gauge(name string) metrics.Gauge { + s := &Gauge{ + name: r.newMetricName(name), + metricType: typeGauge, + tags: r.tags, + useNameTag: r.useNameTag, + } + + return r.registerMetric(s).(metrics.Gauge) +} + +func (r Registry) FuncGauge(name string, function func() float64) metrics.FuncGauge { + s := &FuncGauge{ + name: r.newMetricName(name), + metricType: typeGauge, + tags: r.tags, + function: function, + useNameTag: r.useNameTag, + } + + return r.registerMetric(s).(metrics.FuncGauge) +} + +func (r Registry) IntGauge(name string) metrics.IntGauge { + s := &IntGauge{ + name: r.newMetricName(name), + metricType: typeIGauge, + tags: r.tags, + useNameTag: r.useNameTag, + } + + return r.registerMetric(s).(metrics.IntGauge) +} + +func (r Registry) FuncIntGauge(name string, function func() int64) metrics.FuncIntGauge { + s := &FuncIntGauge{ + name: r.newMetricName(name), + metricType: typeIGauge, + tags: r.tags, + function: function, + useNameTag: r.useNameTag, + } + + return r.registerMetric(s).(metrics.FuncIntGauge) +} + +func (r Registry) Timer(name string) metrics.Timer { + s := &Timer{ + name: r.newMetricName(name), + metricType: typeGauge, + tags: r.tags, + useNameTag: r.useNameTag, + } + + return r.registerMetric(s).(metrics.Timer) +} + +func (r Registry) Histogram(name string, buckets metrics.Buckets) metrics.Histogram { + s := &Histogram{ + name: r.newMetricName(name), + metricType: typeHistogram, + tags: r.tags, + bucketBounds: metricsutil.BucketsBounds(buckets), + bucketValues: make([]int64, buckets.Size()), + useNameTag: r.useNameTag, + } + + return r.registerMetric(s).(metrics.Histogram) +} + +func (r Registry) DurationHistogram(name string, buckets metrics.DurationBuckets) metrics.Timer { + s := &Histogram{ + name: r.newMetricName(name), + metricType: typeHistogram, + tags: r.tags, + bucketBounds: metricsutil.DurationBucketsBounds(buckets), + bucketValues: make([]int64, buckets.Size()), + useNameTag: r.useNameTag, + } + + return r.registerMetric(s).(metrics.Timer) +} + +func (r *Registry) newSubregistry(prefix string, tags map[string]string) *Registry { + // differ simple and rated registries + keyTags := registryutil.MergeTags(tags, map[string]string{"rated": strconv.FormatBool(r.rated)}) + registryKey := registryutil.BuildRegistryKey(prefix, keyTags) + + r.m.Lock() + defer r.m.Unlock() + + if existing, ok := r.subregistries[registryKey]; ok { + return existing + } + + subregistry := &Registry{ + separator: r.separator, + prefix: prefix, + tags: tags, + rated: r.rated, + useNameTag: r.useNameTag, + + subregistries: r.subregistries, + m: r.m, + + metrics: r.metrics, + } + + r.subregistries[registryKey] = subregistry + return subregistry +} + +func (r *Registry) newMetricName(name string) string { + return registryutil.BuildFQName(r.separator, r.prefix, name) +} + +func (r *Registry) registerMetric(s Metric) Metric { + if r.rated { + Rated(s) + } + + key := r.metricKey(s) + + oldMetric, loaded := r.metrics.LoadOrStore(key, s) + if !loaded { + return s + } + + if reflect.TypeOf(oldMetric) == reflect.TypeOf(s) { + return oldMetric.(Metric) + } else { + r.metrics.Store(key, s) + return s + } +} + +func (r *Registry) unregisterMetric(s Metric) { + if r.rated { + Rated(s) + } + + r.metrics.Delete(r.metricKey(s)) +} + +func (r *Registry) metricKey(s Metric) string { + // differ simple and rated registries + keyTags := registryutil.MergeTags(r.tags, map[string]string{"rated": strconv.FormatBool(r.rated)}) + return registryutil.BuildRegistryKey(s.Name(), keyTags) +} diff --git a/library/go/core/metrics/solomon/registry_opts.go b/library/go/core/metrics/solomon/registry_opts.go new file mode 100644 index 0000000000..c3df17940a --- /dev/null +++ b/library/go/core/metrics/solomon/registry_opts.go @@ -0,0 +1,87 @@ +package solomon + +import ( + "context" + + "github.com/ydb-platform/ydb/library/go/core/metrics" + "github.com/ydb-platform/ydb/library/go/core/metrics/collect" + "github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/registryutil" +) + +type RegistryOpts struct { + Separator rune + Prefix string + Tags map[string]string + Rated bool + UseNameTag bool + Collectors []func(metrics.Registry) +} + +// NewRegistryOpts returns new initialized instance of RegistryOpts +func NewRegistryOpts() *RegistryOpts { + return &RegistryOpts{ + Separator: '.', + Tags: make(map[string]string), + UseNameTag: false, + } +} + +// SetUseNameTag overrides current UseNameTag opt +func (o *RegistryOpts) SetUseNameTag(useNameTag bool) *RegistryOpts { + o.UseNameTag = useNameTag + return o +} + +// SetTags overrides existing tags +func (o *RegistryOpts) SetTags(tags map[string]string) *RegistryOpts { + o.Tags = tags + return o +} + +// AddTags merges given tags with existing +func (o *RegistryOpts) AddTags(tags map[string]string) *RegistryOpts { + for k, v := range tags { + o.Tags[k] = v + } + return o +} + +// SetPrefix overrides existing prefix +func (o *RegistryOpts) SetPrefix(prefix string) *RegistryOpts { + o.Prefix = prefix + return o +} + +// AppendPrefix adds given prefix as postfix to existing using separator +func (o *RegistryOpts) AppendPrefix(prefix string) *RegistryOpts { + o.Prefix = registryutil.BuildFQName(string(o.Separator), o.Prefix, prefix) + return o +} + +// SetSeparator overrides existing separator +func (o *RegistryOpts) SetSeparator(separator rune) *RegistryOpts { + o.Separator = separator + return o +} + +// SetRated overrides existing rated flag +func (o *RegistryOpts) SetRated(rated bool) *RegistryOpts { + o.Rated = rated + return o +} + +// AddCollectors adds collectors that handle their metrics automatically (e.g. system metrics). +func (o *RegistryOpts) AddCollectors( + ctx context.Context, c metrics.CollectPolicy, collectors ...collect.Func, +) *RegistryOpts { + if len(collectors) == 0 { + return o + } + + o.Collectors = append(o.Collectors, func(r metrics.Registry) { + for _, collector := range collectors { + collector(ctx, r, c) + } + }) + return o +} diff --git a/library/go/core/metrics/solomon/registry_test.go b/library/go/core/metrics/solomon/registry_test.go new file mode 100644 index 0000000000..a870203b31 --- /dev/null +++ b/library/go/core/metrics/solomon/registry_test.go @@ -0,0 +1,168 @@ +package solomon + +import ( + "encoding/json" + "fmt" + "reflect" + "sync" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" +) + +func TestRegistry_Gather(t *testing.T) { + r := &Registry{ + separator: ".", + prefix: "myprefix", + tags: make(map[string]string), + subregistries: make(map[string]*Registry), + metrics: func() *sync.Map { + metrics := map[string]Metric{ + "myprefix.mycounter": &Counter{ + name: "myprefix.mycounter", + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewInt64(42), + }, + "myprefix.mygauge": &Gauge{ + name: "myprefix.mygauge", + tags: map[string]string{"shimba": "boomba"}, + value: *atomic.NewFloat64(14.89), + }, + "myprefix.mytimer": &Timer{ + name: "myprefix.mytimer", + tags: map[string]string{"looken": "tooken"}, + value: *atomic.NewDuration(1456 * time.Millisecond), + }, + "myprefix.myhistogram": &Histogram{ + name: "myprefix.myhistogram", + tags: map[string]string{"chicken": "cooken"}, + bucketBounds: []float64{1, 2, 3}, + bucketValues: []int64{1, 2, 1}, + infValue: *atomic.NewInt64(1), + }, + } + + sm := new(sync.Map) + for k, v := range metrics { + sm.Store(k, v) + } + + return sm + }(), + } + + s, err := r.Gather() + assert.NoError(t, err) + + expected := &Metrics{} + r.metrics.Range(func(_, s interface{}) bool { + expected.metrics = append(expected.metrics, s.(Metric)) + return true + }) + + opts := cmp.Options{ + cmp.AllowUnexported(Metrics{}, Counter{}, Gauge{}, Timer{}, Histogram{}), + cmpopts.IgnoreUnexported(sync.Mutex{}, atomic.Duration{}, atomic.Int64{}, atomic.Float64{}), + // this will sort both slices for latest tests as well + cmpopts.SortSlices(func(x, y Metric) bool { + return x.Name() < y.Name() + }), + } + + assert.True(t, cmp.Equal(expected, s, opts...), cmp.Diff(expected, s, opts...)) + + for _, sen := range s.metrics { + var expectedMetric Metric + for _, expSen := range expected.metrics { + if expSen.Name() == sen.Name() { + expectedMetric = expSen + break + } + } + require.NotNil(t, expectedMetric) + + assert.NotEqual(t, fmt.Sprintf("%p", expectedMetric), fmt.Sprintf("%p", sen)) + assert.IsType(t, expectedMetric, sen) + + switch st := sen.(type) { + case *Counter: + assert.NotEqual(t, fmt.Sprintf("%p", expectedMetric.(*Counter)), fmt.Sprintf("%p", st)) + case *Gauge: + assert.NotEqual(t, fmt.Sprintf("%p", expectedMetric.(*Gauge)), fmt.Sprintf("%p", st)) + case *Timer: + assert.NotEqual(t, fmt.Sprintf("%p", expectedMetric.(*Timer)), fmt.Sprintf("%p", st)) + case *Histogram: + assert.NotEqual(t, fmt.Sprintf("%p", expectedMetric.(*Histogram)), fmt.Sprintf("%p", st)) + default: + t.Fatalf("unexpected metric type: %T", sen) + } + } +} + +func TestDoubleRegistration(t *testing.T) { + r := NewRegistry(NewRegistryOpts()) + + c0 := r.Counter("counter") + c1 := r.Counter("counter") + require.Equal(t, c0, c1) + + g0 := r.Gauge("counter") + g1 := r.Gauge("counter") + require.Equal(t, g0, g1) + + c2 := r.Counter("counter") + require.NotEqual(t, reflect.ValueOf(c0).Elem().UnsafeAddr(), reflect.ValueOf(c2).Elem().UnsafeAddr()) +} + +func TestSubregistry(t *testing.T) { + r := NewRegistry(NewRegistryOpts()) + + r0 := r.WithPrefix("one") + r1 := r0.WithPrefix("two") + r2 := r0.WithTags(map[string]string{"foo": "bar"}) + + _ = r0.Counter("counter") + _ = r1.Counter("counter") + _ = r2.Counter("counter") +} + +func TestSubregistry_TagAndPrefixReorder(t *testing.T) { + r := NewRegistry(NewRegistryOpts()) + + r0 := r.WithPrefix("one") + r1 := r.WithTags(map[string]string{"foo": "bar"}) + + r3 := r0.WithTags(map[string]string{"foo": "bar"}) + r4 := r1.WithPrefix("one") + + require.True(t, r3 == r4) +} + +func TestRatedRegistry(t *testing.T) { + r := NewRegistry(NewRegistryOpts().SetRated(true)) + s := r.Counter("counter") + b, _ := json.Marshal(s) + expected := []byte(`{"type":"RATE","labels":{"sensor":"counter"},"value":0}`) + assert.Equal(t, expected, b) +} + +func TestNameTagRegistry(t *testing.T) { + r := NewRegistry(NewRegistryOpts().SetUseNameTag(true)) + s := r.Counter("counter") + + b, _ := json.Marshal(s) + expected := []byte(`{"type":"COUNTER","labels":{"name":"counter"},"value":0}`) + assert.Equal(t, expected, b) + + sr := r.WithTags(map[string]string{"foo": "bar"}) + ssr := sr.Counter("sub_counter") + + b1, _ := json.Marshal(ssr) + expected1 := []byte(`{"type":"COUNTER","labels":{"foo":"bar","name":"sub_counter"},"value":0}`) + assert.Equal(t, expected1, b1) +} diff --git a/library/go/core/metrics/solomon/spack.go b/library/go/core/metrics/solomon/spack.go new file mode 100644 index 0000000000..48938d19b6 --- /dev/null +++ b/library/go/core/metrics/solomon/spack.go @@ -0,0 +1,387 @@ +package solomon + +import ( + "bytes" + "context" + "encoding/binary" + "io" + + "github.com/ydb-platform/ydb/library/go/core/xerrors" +) + +type spackVersion uint16 + +const ( + version11 spackVersion = 0x0101 + version12 spackVersion = 0x0102 +) + +type errWriter struct { + w io.Writer + err error +} + +func (ew *errWriter) binaryWrite(data interface{}) { + if ew.err != nil { + return + } + switch t := data.(type) { + case uint8: + ew.err = binary.Write(ew.w, binary.LittleEndian, data.(uint8)) + case uint16: + ew.err = binary.Write(ew.w, binary.LittleEndian, data.(uint16)) + case uint32: + ew.err = binary.Write(ew.w, binary.LittleEndian, data.(uint32)) + default: + ew.err = xerrors.Errorf("binaryWrite not supported type %v", t) + } +} + +func writeULEB128(w io.Writer, value uint32) error { + remaining := value >> 7 + for remaining != 0 { + err := binary.Write(w, binary.LittleEndian, uint8(value&0x7f|0x80)) + if err != nil { + return xerrors.Errorf("binary.Write failed: %w", err) + } + value = remaining + remaining >>= 7 + } + err := binary.Write(w, binary.LittleEndian, uint8(value&0x7f)) + if err != nil { + return xerrors.Errorf("binary.Write failed: %w", err) + } + return err +} + +type spackMetric struct { + flags uint8 + + nameValueIndex uint32 + labelsCount uint32 + labels bytes.Buffer + + metric Metric +} + +func (s *spackMetric) writeLabelPool(se *spackEncoder, namesIdx map[string]uint32, valuesIdx map[string]uint32, name string, value string) error { + _, ok := namesIdx[name] + if !ok { + namesIdx[name] = se.nameCounter + se.nameCounter++ + _, err := se.labelNamePool.WriteString(name) + if err != nil { + return err + } + err = se.labelNamePool.WriteByte(0) + if err != nil { + return err + } + } + + _, ok = valuesIdx[value] + if !ok { + valuesIdx[value] = se.valueCounter + se.valueCounter++ + _, err := se.labelValuePool.WriteString(value) + if err != nil { + return err + } + err = se.labelValuePool.WriteByte(0) + if err != nil { + return err + } + } + + return nil +} + +func (s *spackMetric) writeLabel(se *spackEncoder, namesIdx map[string]uint32, valuesIdx map[string]uint32, name string, value string) error { + s.labelsCount++ + + err := s.writeLabelPool(se, namesIdx, valuesIdx, name, value) + if err != nil { + return err + } + + err = writeULEB128(&s.labels, uint32(namesIdx[name])) + if err != nil { + return err + } + err = writeULEB128(&s.labels, uint32(valuesIdx[value])) + if err != nil { + return err + } + + return nil +} + +func (s *spackMetric) writeMetric(w io.Writer, version spackVersion) error { + metricValueType := valueTypeOneWithoutTS + if s.metric.getTimestamp() != nil { + metricValueType = valueTypeOneWithTS + } + // library/cpp/monlib/encode/spack/spack_v1_encoder.cpp?rev=r9098142#L190 + types := uint8(s.metric.getType()<<2) | uint8(metricValueType) + err := binary.Write(w, binary.LittleEndian, types) + if err != nil { + return xerrors.Errorf("binary.Write types failed: %w", err) + } + + err = binary.Write(w, binary.LittleEndian, uint8(s.flags)) + if err != nil { + return xerrors.Errorf("binary.Write flags failed: %w", err) + } + if version >= version12 { + err = writeULEB128(w, uint32(s.nameValueIndex)) + if err != nil { + return xerrors.Errorf("writeULEB128 name value index: %w", err) + } + } + err = writeULEB128(w, uint32(s.labelsCount)) + if err != nil { + return xerrors.Errorf("writeULEB128 labels count failed: %w", err) + } + + _, err = w.Write(s.labels.Bytes()) // s.writeLabels(w) + if err != nil { + return xerrors.Errorf("write labels failed: %w", err) + } + if s.metric.getTimestamp() != nil { + err = binary.Write(w, binary.LittleEndian, uint32(s.metric.getTimestamp().Unix())) + if err != nil { + return xerrors.Errorf("write timestamp failed: %w", err) + } + } + + switch s.metric.getType() { + case typeGauge: + err = binary.Write(w, binary.LittleEndian, s.metric.getValue().(float64)) + if err != nil { + return xerrors.Errorf("binary.Write gauge value failed: %w", err) + } + case typeIGauge: + err = binary.Write(w, binary.LittleEndian, s.metric.getValue().(int64)) + if err != nil { + return xerrors.Errorf("binary.Write igauge value failed: %w", err) + } + case typeCounter, typeRated: + err = binary.Write(w, binary.LittleEndian, uint64(s.metric.getValue().(int64))) + if err != nil { + return xerrors.Errorf("binary.Write counter value failed: %w", err) + } + case typeHistogram, typeRatedHistogram: + h := s.metric.getValue().(histogram) + err = h.writeHistogram(w) + if err != nil { + return xerrors.Errorf("writeHistogram failed: %w", err) + } + default: + return xerrors.Errorf("unknown metric type: %v", s.metric.getType()) + } + return nil +} + +type SpackOpts func(*spackEncoder) + +func WithVersion12() func(*spackEncoder) { + return func(se *spackEncoder) { + se.version = version12 + } +} + +type spackEncoder struct { + context context.Context + compression uint8 + version spackVersion + + nameCounter uint32 + valueCounter uint32 + + labelNamePool bytes.Buffer + labelValuePool bytes.Buffer + + metrics Metrics +} + +func NewSpackEncoder(ctx context.Context, compression CompressionType, metrics *Metrics, opts ...SpackOpts) *spackEncoder { + if metrics == nil { + metrics = &Metrics{} + } + se := &spackEncoder{ + context: ctx, + compression: uint8(compression), + version: version11, + metrics: *metrics, + } + for _, op := range opts { + op(se) + } + return se +} + +func (se *spackEncoder) writeLabels() ([]spackMetric, error) { + namesIdx := make(map[string]uint32) + valuesIdx := make(map[string]uint32) + spackMetrics := make([]spackMetric, len(se.metrics.metrics)) + + for idx, metric := range se.metrics.metrics { + m := spackMetric{metric: metric} + + var err error + if se.version >= version12 { + err = m.writeLabelPool(se, namesIdx, valuesIdx, metric.getNameTag(), metric.Name()) + m.nameValueIndex = valuesIdx[metric.getNameTag()] + } else { + err = m.writeLabel(se, namesIdx, valuesIdx, metric.getNameTag(), metric.Name()) + } + if err != nil { + return nil, err + } + + for name, value := range metric.getLabels() { + if err := m.writeLabel(se, namesIdx, valuesIdx, name, value); err != nil { + return nil, err + } + + } + spackMetrics[idx] = m + } + + return spackMetrics, nil +} + +func (se *spackEncoder) Encode(w io.Writer) (written int, err error) { + spackMetrics, err := se.writeLabels() + if err != nil { + return written, xerrors.Errorf("writeLabels failed: %w", err) + } + + err = se.writeHeader(w) + if err != nil { + return written, xerrors.Errorf("writeHeader failed: %w", err) + } + written += HeaderSize + compression := CompressionType(se.compression) + + cw := newCompressedWriter(w, compression) + + err = se.writeLabelNamesPool(cw) + if err != nil { + return written, xerrors.Errorf("writeLabelNamesPool failed: %w", err) + } + + err = se.writeLabelValuesPool(cw) + if err != nil { + return written, xerrors.Errorf("writeLabelValuesPool failed: %w", err) + } + + err = se.writeCommonTime(cw) + if err != nil { + return written, xerrors.Errorf("writeCommonTime failed: %w", err) + } + + err = se.writeCommonLabels(cw) + if err != nil { + return written, xerrors.Errorf("writeCommonLabels failed: %w", err) + } + + err = se.writeMetricsData(cw, spackMetrics) + if err != nil { + return written, xerrors.Errorf("writeMetricsData failed: %w", err) + } + + err = cw.Close() + if err != nil { + return written, xerrors.Errorf("close failed: %w", err) + } + + switch compression { + case CompressionNone: + written += cw.(*noCompressionWriteCloser).written + case CompressionLz4: + written += cw.(*lz4CompressionWriteCloser).written + } + + return written, nil +} + +func (se *spackEncoder) writeHeader(w io.Writer) error { + if se.context.Err() != nil { + return xerrors.Errorf("streamSpack context error: %w", se.context.Err()) + } + ew := &errWriter{w: w} + ew.binaryWrite(uint16(0x5053)) // Magic + ew.binaryWrite(uint16(se.version)) // Version + ew.binaryWrite(uint16(24)) // HeaderSize + ew.binaryWrite(uint8(0)) // TimePrecision(SECONDS) + ew.binaryWrite(uint8(se.compression)) // CompressionAlg + ew.binaryWrite(uint32(se.labelNamePool.Len())) // LabelNamesSize + ew.binaryWrite(uint32(se.labelValuePool.Len())) // LabelValuesSize + ew.binaryWrite(uint32(len(se.metrics.metrics))) // MetricsCount + ew.binaryWrite(uint32(len(se.metrics.metrics))) // PointsCount + if ew.err != nil { + return xerrors.Errorf("binaryWrite failed: %w", ew.err) + } + return nil +} + +func (se *spackEncoder) writeLabelNamesPool(w io.Writer) error { + if se.context.Err() != nil { + return xerrors.Errorf("streamSpack context error: %w", se.context.Err()) + } + _, err := w.Write(se.labelNamePool.Bytes()) + if err != nil { + return xerrors.Errorf("write labelNamePool failed: %w", err) + } + return nil +} + +func (se *spackEncoder) writeLabelValuesPool(w io.Writer) error { + if se.context.Err() != nil { + return xerrors.Errorf("streamSpack context error: %w", se.context.Err()) + } + + _, err := w.Write(se.labelValuePool.Bytes()) + if err != nil { + return xerrors.Errorf("write labelValuePool failed: %w", err) + } + return nil +} + +func (se *spackEncoder) writeCommonTime(w io.Writer) error { + if se.context.Err() != nil { + return xerrors.Errorf("streamSpack context error: %w", se.context.Err()) + } + + if se.metrics.timestamp == nil { + return binary.Write(w, binary.LittleEndian, uint32(0)) + } + return binary.Write(w, binary.LittleEndian, uint32(se.metrics.timestamp.Unix())) +} + +func (se *spackEncoder) writeCommonLabels(w io.Writer) error { + if se.context.Err() != nil { + return xerrors.Errorf("streamSpack context error: %w", se.context.Err()) + } + + _, err := w.Write([]byte{0}) + if err != nil { + return xerrors.Errorf("write commonLabels failed: %w", err) + } + return nil +} + +func (se *spackEncoder) writeMetricsData(w io.Writer, metrics []spackMetric) error { + for _, s := range metrics { + if se.context.Err() != nil { + return xerrors.Errorf("streamSpack context error: %w", se.context.Err()) + } + + err := s.writeMetric(w, se.version) + if err != nil { + return xerrors.Errorf("write metric failed: %w", err) + } + } + return nil +} diff --git a/library/go/core/metrics/solomon/spack_compression.go b/library/go/core/metrics/solomon/spack_compression.go new file mode 100644 index 0000000000..004fe0150d --- /dev/null +++ b/library/go/core/metrics/solomon/spack_compression.go @@ -0,0 +1,162 @@ +package solomon + +import ( + "encoding/binary" + "io" + + "github.com/OneOfOne/xxhash" + "github.com/pierrec/lz4" +) + +type CompressionType uint8 + +const ( + CompressionNone CompressionType = 0x0 + CompressionZlib CompressionType = 0x1 + CompressionZstd CompressionType = 0x2 + CompressionLz4 CompressionType = 0x3 +) + +const ( + compressionFrameLength = 512 * 1024 + hashTableSize = 64 * 1024 +) + +type noCompressionWriteCloser struct { + underlying io.Writer + written int +} + +func (w *noCompressionWriteCloser) Write(p []byte) (int, error) { + n, err := w.underlying.Write(p) + w.written += n + return n, err +} + +func (w *noCompressionWriteCloser) Close() error { + return nil +} + +type lz4CompressionWriteCloser struct { + underlying io.Writer + buffer []byte + table []int + written int +} + +func (w *lz4CompressionWriteCloser) flushFrame() (written int, err error) { + src := w.buffer + dst := make([]byte, lz4.CompressBlockBound(len(src))) + + sz, err := lz4.CompressBlock(src, dst, w.table) + if err != nil { + return written, err + } + + if sz == 0 { + dst = src + } else { + dst = dst[:sz] + } + + err = binary.Write(w.underlying, binary.LittleEndian, uint32(len(dst))) + if err != nil { + return written, err + } + w.written += 4 + + err = binary.Write(w.underlying, binary.LittleEndian, uint32(len(src))) + if err != nil { + return written, err + } + w.written += 4 + + n, err := w.underlying.Write(dst) + if err != nil { + return written, err + } + w.written += n + + checksum := xxhash.Checksum32S(dst, 0x1337c0de) + err = binary.Write(w.underlying, binary.LittleEndian, checksum) + if err != nil { + return written, err + } + w.written += 4 + + w.buffer = w.buffer[:0] + + return written, nil +} + +func (w *lz4CompressionWriteCloser) Write(p []byte) (written int, err error) { + q := p[:] + for len(q) > 0 { + space := compressionFrameLength - len(w.buffer) + if space == 0 { + n, err := w.flushFrame() + if err != nil { + return written, err + } + w.written += n + space = compressionFrameLength + } + length := len(q) + if length > space { + length = space + } + w.buffer = append(w.buffer, q[:length]...) + q = q[length:] + } + return written, nil +} + +func (w *lz4CompressionWriteCloser) Close() error { + var err error + if len(w.buffer) > 0 { + n, err := w.flushFrame() + if err != nil { + return err + } + w.written += n + } + err = binary.Write(w.underlying, binary.LittleEndian, uint32(0)) + if err != nil { + return nil + } + w.written += 4 + + err = binary.Write(w.underlying, binary.LittleEndian, uint32(0)) + if err != nil { + return nil + } + w.written += 4 + + err = binary.Write(w.underlying, binary.LittleEndian, uint32(0)) + if err != nil { + return nil + } + w.written += 4 + + return nil +} + +func newCompressedWriter(w io.Writer, compression CompressionType) io.WriteCloser { + switch compression { + case CompressionNone: + return &noCompressionWriteCloser{w, 0} + case CompressionZlib: + panic("zlib compression not supported") + case CompressionZstd: + panic("zstd compression not supported") + case CompressionLz4: + return &lz4CompressionWriteCloser{ + w, + make([]byte, 0, compressionFrameLength), + make([]int, hashTableSize), + 0, + } + default: + panic("unsupported compression algorithm") + } +} diff --git a/library/go/core/metrics/solomon/spack_compression_test.go b/library/go/core/metrics/solomon/spack_compression_test.go new file mode 100644 index 0000000000..baa8a8d1e9 --- /dev/null +++ b/library/go/core/metrics/solomon/spack_compression_test.go @@ -0,0 +1,26 @@ +package solomon + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" +) + +func compress(t *testing.T, c uint8, s string) []byte { + buf := bytes.Buffer{} + w := newCompressedWriter(&buf, CompressionType(c)) + _, err := w.Write([]byte(s)) + assert.Equal(t, nil, err) + assert.Equal(t, nil, w.Close()) + return buf.Bytes() +} + +func TestCompression_None(t *testing.T) { + assert.Equal(t, []byte(nil), compress(t, uint8(CompressionNone), "")) + assert.Equal(t, []byte{'a'}, compress(t, uint8(CompressionNone), "a")) +} + +func TestCompression_Lz4(t *testing.T) { + assert.Equal(t, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, compress(t, uint8(CompressionLz4), "")) +} diff --git a/library/go/core/metrics/solomon/spack_test.go b/library/go/core/metrics/solomon/spack_test.go new file mode 100644 index 0000000000..64b504bf42 --- /dev/null +++ b/library/go/core/metrics/solomon/spack_test.go @@ -0,0 +1,184 @@ +package solomon + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_metrics_encode(t *testing.T) { + expectHeader := []byte{ + 0x53, 0x50, // magic + 0x01, 0x01, // version + 0x18, 0x00, // header size + 0x0, // time precision + 0x0, // compression algorithm + 0x7, 0x0, 0x0, 0x0, // label names size + 0x8, 0x0, 0x0, 0x0, // label values size + 0x1, 0x0, 0x0, 0x0, // metric count + 0x1, 0x0, 0x0, 0x0, // point count + // label names pool + 0x73, 0x65, 0x6e, 0x73, 0x6f, 0x72, 0x0, // "sensor" + // label values pool + 0x6d, 0x79, 0x67, 0x61, 0x75, 0x67, 0x65, 0x0, // "gauge" + } + + testCases := []struct { + name string + metrics *Metrics + expectCommonTime []byte + expectCommonLabels []byte + expectMetrics [][]byte + expectWritten int + }{ + { + "common-ts+gauge", + &Metrics{ + metrics: []Metric{ + func() Metric { + g := NewGauge("mygauge", 43) + return &g + }(), + }, + timestamp: timeAsRef(time.Unix(1500000000, 0)), + }, + []byte{0x0, 0x2f, 0x68, 0x59}, // common time /1500000000 + []byte{0x0}, // common labels count and indexes + [][]byte{ + { + 0x5, // types + 0x0, // flags + 0x1, // labels index size + 0x0, // indexes of name labels + 0x0, // indexes of value labels + + 0x00, 0x00, 0x00, 0x00, 0x00, 0x80, 0x45, 0x40, // 43 // metrics value + + }, + }, + 57, + }, + { + "gauge+ts", + &Metrics{ + metrics: []Metric{ + func() Metric { + g := NewGauge("mygauge", 43, WithTimestamp(time.Unix(1657710476, 0))) + return &g + }(), + }, + }, + []byte{0x0, 0x0, 0x0, 0x0}, // common time + []byte{0x0}, // common labels count and indexes + [][]byte{ + { + 0x6, // uint8(typeGauge << 2) | uint8(valueTypeOneWithTS) + 0x0, // flags + 0x1, // labels index size + 0x0, // indexes of name labels + 0x0, // indexes of value labels + + 0x8c, 0xa7, 0xce, 0x62, //metric ts + 0x00, 0x00, 0x00, 0x00, 0x00, 0x80, 0x45, 0x40, // 43 // metrics value + + }, + }, + 61, + }, + { + "common-ts+gauge+ts", + &Metrics{ + metrics: []Metric{ + func() Metric { + g := NewGauge("mygauge", 43, WithTimestamp(time.Unix(1657710476, 0))) + return &g + }(), + func() Metric { + g := NewGauge("mygauge", 42, WithTimestamp(time.Unix(1500000000, 0))) + return &g + }(), + }, + timestamp: timeAsRef(time.Unix(1500000000, 0)), + }, + []byte{0x0, 0x2f, 0x68, 0x59}, // common time /1500000000 + []byte{0x0}, // common labels count and indexes + [][]byte{ + { + 0x6, // types + 0x0, // flags + 0x1, // labels index size + 0x0, // indexes of name labels + 0x0, // indexes of value labels + + 0x8c, 0xa7, 0xce, 0x62, //metric ts + 0x00, 0x00, 0x00, 0x00, 0x00, 0x80, 0x45, 0x40, // 43 // metrics value + + }, + { + 0x6, // types + 0x0, // flags + 0x1, // labels index size + 0x0, // indexes of name labels + 0x0, // indexes of value labels + + 0x0, 0x2f, 0x68, 0x59, // metric ts + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x45, 0x40, //42 // metrics value + + }, + }, + 78, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var buf bytes.Buffer + ctx := context.Background() + + written, err := NewSpackEncoder(ctx, CompressionNone, tc.metrics).Encode(&buf) + + assert.NoError(t, err) + assert.Equal(t, tc.expectWritten, written) + + body := buf.Bytes() + setMetricsCount(expectHeader, len(tc.metrics.metrics)) + + require.True(t, bytes.HasPrefix(body, expectHeader)) + body = body[len(expectHeader):] + + require.True(t, bytes.HasPrefix(body, tc.expectCommonTime)) + body = body[len(tc.expectCommonTime):] + + require.True(t, bytes.HasPrefix(body, tc.expectCommonLabels)) + body = body[len(tc.expectCommonLabels):] + + expectButMissing := [][]byte{} + for range tc.expectMetrics { + var seen bool + var val []byte + for _, v := range tc.expectMetrics { + val = v + if bytes.HasPrefix(body, v) { + body = bytes.Replace(body, v, []byte{}, 1) + seen = true + break + } + } + if !seen { + expectButMissing = append(expectButMissing, val) + } + } + assert.Empty(t, body, "unexpected bytes seen") + assert.Empty(t, expectButMissing, "missing metrics bytes") + }) + } +} + +func setMetricsCount(header []byte, count int) { + header[16] = uint8(count) + header[20] = uint8(count) +} diff --git a/library/go/core/metrics/solomon/stream.go b/library/go/core/metrics/solomon/stream.go new file mode 100644 index 0000000000..7cf6d70064 --- /dev/null +++ b/library/go/core/metrics/solomon/stream.go @@ -0,0 +1,89 @@ +package solomon + +import ( + "context" + "encoding/json" + "io" + + "github.com/ydb-platform/ydb/library/go/core/xerrors" +) + +const HeaderSize = 24 + +type StreamFormat string + +func (r *Registry) StreamJSON(ctx context.Context, w io.Writer) (written int, err error) { + cw := newCompressedWriter(w, CompressionNone) + + if ctx.Err() != nil { + return written, xerrors.Errorf("streamJSON context error: %w", ctx.Err()) + } + _, err = cw.Write([]byte("{\"metrics\":[")) + if err != nil { + return written, xerrors.Errorf("write metrics failed: %w", err) + } + + first := true + r.metrics.Range(func(_, s interface{}) bool { + if ctx.Err() != nil { + err = xerrors.Errorf("streamJSON context error: %w", ctx.Err()) + return false + } + + // write trailing comma + if !first { + _, err = cw.Write([]byte(",")) + if err != nil { + err = xerrors.Errorf("write metrics failed: %w", err) + return false + } + } + + var b []byte + + b, err = json.Marshal(s) + if err != nil { + err = xerrors.Errorf("marshal metric failed: %w", err) + return false + } + + // write metric json + _, err = cw.Write(b) + if err != nil { + err = xerrors.Errorf("write metric failed: %w", err) + return false + } + + first = false + return true + }) + if err != nil { + return written, err + } + + if ctx.Err() != nil { + return written, xerrors.Errorf("streamJSON context error: %w", ctx.Err()) + } + _, err = cw.Write([]byte("]}")) + if err != nil { + return written, xerrors.Errorf("write metrics failed: %w", err) + } + + if ctx.Err() != nil { + return written, xerrors.Errorf("streamJSON context error: %w", ctx.Err()) + } + err = cw.Close() + if err != nil { + return written, xerrors.Errorf("close failed: %w", err) + } + + return cw.(*noCompressionWriteCloser).written, nil +} + +func (r *Registry) StreamSpack(ctx context.Context, w io.Writer, compression CompressionType) (int, error) { + metrics, err := r.Gather() + if err != nil { + return 0, err + } + return NewSpackEncoder(ctx, compression, metrics).Encode(w) +} diff --git a/library/go/core/metrics/solomon/stream_test.go b/library/go/core/metrics/solomon/stream_test.go new file mode 100644 index 0000000000..7548f77dbb --- /dev/null +++ b/library/go/core/metrics/solomon/stream_test.go @@ -0,0 +1,595 @@ +package solomon + +import ( + "bytes" + "context" + "encoding/json" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb/library/go/core/metrics" +) + +func Test_streamJson(t *testing.T) { + testCases := []struct { + name string + registry *Registry + expect string + expectWritten int + expectErr error + }{ + { + "success", + func() *Registry { + r := NewRegistry(NewRegistryOpts()) + + cnt := r.Counter("mycounter") + cnt.Add(42) + + gg := r.Gauge("mygauge") + gg.Set(2) + + return r + }(), + `{"metrics":[{"type":"COUNTER","labels":{"sensor":"mycounter"},"value":42},{"type":"DGAUGE","labels":{"sensor":"mygauge"},"value":2}]}`, + 133, + nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + w := httptest.NewRecorder() + ctx := context.Background() + + written, err := tc.registry.StreamJSON(ctx, w) + + if tc.expectErr == nil { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, tc.expectErr.Error()) + } + + assert.Equal(t, tc.expectWritten, written) + assert.Equal(t, len(tc.expect), w.Body.Len()) + + if tc.expect != "" { + var expectedObj, givenObj map[string]interface{} + err = json.Unmarshal([]byte(tc.expect), &expectedObj) + assert.NoError(t, err) + err = json.Unmarshal(w.Body.Bytes(), &givenObj) + assert.NoError(t, err) + + sameMap(t, expectedObj, givenObj) + } + }) + } +} + +func Test_streamSpack(t *testing.T) { + testCases := []struct { + name string + registry *Registry + compression CompressionType + expectHeader []byte + expectLabelNamesPool [][]byte + expectValueNamesPool [][]byte + expectCommonTime []byte + expectCommonLabels []byte + expectMetrics [][]byte + expectWritten int + }{ + { + "counter", + func() *Registry { + r := NewRegistry(NewRegistryOpts()) + + cnt := r.Counter("counter") + cnt.Add(42) + + return r + }(), + CompressionNone, + []byte{ + 0x53, 0x50, // magic + 0x01, 0x01, // version + 0x18, 0x00, // header size + 0x0, // time precision + 0x0, // compression algorithm + 0x7, 0x0, 0x0, 0x0, // label names size + 0x8, 0x0, 0x0, 0x0, // label values size + 0x1, 0x0, 0x0, 0x0, // metric count + 0x1, 0x0, 0x0, 0x0, // point count + }, + [][]byte{ // label names pool + {0x73, 0x65, 0x6e, 0x73, 0x6f, 0x72}, // "sensor" + }, + [][]byte{ // label values pool + {0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72}, // "counter" + }, + []byte{0x0, 0x0, 0x0, 0x0}, // common time + []byte{0x0}, // common labels count and indexes + [][]byte{ + { + 0x9, // types + 0x0, // flags + 0x1, // labels index size + 0x0, // indexes of name labels + 0x0, // indexes of value labels + + 0x2a, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, // 42 // metrics value + }, + }, + 57, + }, + { + "counter_lz4", + func() *Registry { + r := NewRegistry(NewRegistryOpts()) + + cnt := r.Counter("counter") + cnt.Add(0) + + return r + }(), + CompressionLz4, + []byte{ + 0x53, 0x50, // magic + 0x01, 0x01, // version + 0x18, 0x00, // header size + 0x0, // time precision + 0x3, // compression algorithm + 0x7, 0x0, 0x0, 0x0, // label names size + 0x8, 0x0, 0x0, 0x0, // label values size + 0x1, 0x0, 0x0, 0x0, // metric count + 0x1, 0x0, 0x0, 0x0, // point count + 0x23, 0x00, 0x00, 0x00, // compressed length + 0x21, 0x00, 0x00, 0x00, // uncompressed length + 0xf0, 0x12, + }, + [][]byte{ // label names pool + {0x73, 0x65, 0x6e, 0x73, 0x6f, 0x72}, // "sensor" + }, + [][]byte{ // label values pool + {0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72}, // "counter" + }, + []byte{0x0, 0x0, 0x0, 0x0}, // common time + []byte{0x0}, // common labels count and indexes + [][]byte{ + { + 0x9, // types + 0x0, // flags + 0x1, // labels index size + 0x0, // indexes of name labels + 0x0, // indexes of value labels + + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 0 //metrics value + 0x10, 0x11, 0xa4, 0x22, // checksum + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // end stream + }, + }, + 83, + }, + { + "rate", + func() *Registry { + r := NewRegistry(NewRegistryOpts()) + + cnt := r.Counter("counter") + Rated(cnt) + cnt.Add(0) + + return r + }(), + CompressionNone, + []byte{ + 0x53, 0x50, // magic + 0x01, 0x01, // version + 0x18, 0x00, // header size + 0x0, // time precision + 0x0, // compression algorithm + 0x7, 0x0, 0x0, 0x0, // label names size + 0x8, 0x0, 0x0, 0x0, // label values size + 0x1, 0x0, 0x0, 0x0, // metric count + 0x1, 0x0, 0x0, 0x0, // point count + }, + [][]byte{ // label names pool + {0x73, 0x65, 0x6e, 0x73, 0x6f, 0x72}, // "sensor" + }, + [][]byte{ // label values pool + {0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72}, // "counter" + }, + []byte{0x0, 0x0, 0x0, 0x0}, // common time + []byte{0x0}, // common labels count and indexes + [][]byte{ + { + 0xd, // types + 0x0, // flags + 0x1, // labels index size + 0x0, // indexes of name labels + 0x0, // indexes of value labels + + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, //42 // metrics value + }, + }, + 57, + }, + { + "timer", + func() *Registry { + r := NewRegistry(NewRegistryOpts()) + + t := r.Timer("timer") + t.RecordDuration(2 * time.Second) + + return r + }(), + CompressionNone, + []byte{ + 0x53, 0x50, // magic + 0x01, 0x01, // version + 0x18, 0x00, // header size + 0x0, // time precision + 0x0, // compression algorithm + 0x7, 0x0, 0x0, 0x0, // label names size + 0x6, 0x0, 0x0, 0x0, // label values size + 0x1, 0x0, 0x0, 0x0, // metric count + 0x1, 0x0, 0x0, 0x0, // point count + }, + [][]byte{ // label names pool + {0x73, 0x65, 0x6e, 0x73, 0x6f, 0x72}, // "sensor" + }, + [][]byte{ // label values pool + {0x74, 0x69, 0x6d, 0x65, 0x72}, // "timer" + }, + []byte{0x0, 0x0, 0x0, 0x0}, // common time + []byte{0x0}, // common labels count and indexes + [][]byte{ + { + 0x5, // types + 0x0, // flags + 0x1, // labels index size + 0x0, // indexes of name labels + 0x0, // indexes of value labels + + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x40, //2.0 // metrics value + }, + }, + 55, + }, + { + "gauge", + func() *Registry { + r := NewRegistry(NewRegistryOpts()) + + g := r.Gauge("gauge") + g.Set(42) + + return r + }(), + CompressionNone, + []byte{ + 0x53, 0x50, // magic + 0x01, 0x01, // version + 0x18, 0x00, // header size + 0x0, // time precision + 0x0, // compression algorithm + 0x7, 0x0, 0x0, 0x0, // label names size + 0x6, 0x0, 0x0, 0x0, // label values size + 0x1, 0x0, 0x0, 0x0, // metric count + 0x1, 0x0, 0x0, 0x0, // point count + }, + [][]byte{ // label names pool + {0x73, 0x65, 0x6e, 0x73, 0x6f, 0x72}, // "sensor" + }, + [][]byte{ // label values pool + {0x67, 0x61, 0x75, 0x67, 0x65}, // "gauge" + }, + []byte{0x0, 0x0, 0x0, 0x0}, // common time + []byte{0x0}, // common labels count and indexes + [][]byte{ + { + 0x5, // types + 0x0, // flags + 0x1, // labels index size + 0x0, // indexes of name labels + 0x0, // indexes of value labels + + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x45, 0x40, //42 // metrics value + + }, + }, + 55, + }, + { + "histogram", + func() *Registry { + r := NewRegistry(NewRegistryOpts()) + + _ = r.Histogram("histogram", metrics.NewBuckets(0, 0.1, 0.11)) + + return r + }(), + CompressionNone, + []byte{ + 0x53, 0x50, // magic + 0x01, 0x01, // version + 0x18, 0x00, // header size + 0x0, // time precision + 0x0, // compression algorithm + 0x7, 0x0, 0x0, 0x0, // label names size + 0xa, 0x0, 0x0, 0x0, // label values size + 0x1, 0x0, 0x0, 0x0, // metric count + 0x1, 0x0, 0x0, 0x0, // point count + }, + [][]byte{ // label names pool + {0x73, 0x65, 0x6e, 0x73, 0x6f, 0x72}, // "sensor" + }, + [][]byte{ // label values pool + {0x68, 0x69, 0x73, 0x74, 0x6F, 0x67, 0x72, 0x61, 0x6D}, // "histogram" + }, + []byte{0x0, 0x0, 0x0, 0x0}, // common time + []byte{0x0}, // common labels count and indexes + [][]byte{ + { + /*types*/ 0x15, + /*flags*/ 0x0, + /*labels*/ 0x1, // ? + /*name*/ 0x0, + /*value*/ 0x0, + /*buckets count*/ 0x3, + /*upper bound 0*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + /*upper bound 1*/ 0x9a, 0x99, 0x99, 0x99, 0x99, 0x99, 0xb9, 0x3f, + /*upper bound 2*/ 0x29, 0x5c, 0x8f, 0xc2, 0xf5, 0x28, 0xbc, 0x3f, + /*counter 0*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + /*counter 1*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + /*counter 2*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + }, + }, + 100, + }, + { + "rate_histogram", + func() *Registry { + r := NewRegistry(NewRegistryOpts()) + + h := r.Histogram("histogram", metrics.NewBuckets(0, 0.1, 0.11)) + Rated(h) + + return r + }(), + CompressionNone, + []byte{ + 0x53, 0x50, // magic + 0x01, 0x01, // version + 0x18, 0x00, // header size + 0x0, // time precision + 0x0, // compression algorithm + 0x7, 0x0, 0x0, 0x0, // label names size + 0xa, 0x0, 0x0, 0x0, // label values size + 0x1, 0x0, 0x0, 0x0, // metric count + 0x1, 0x0, 0x0, 0x0, // point count + }, + [][]byte{ // label names pool + {0x73, 0x65, 0x6e, 0x73, 0x6f, 0x72}, // "sensor" + }, + [][]byte{ // label values pool + {0x68, 0x69, 0x73, 0x74, 0x6F, 0x67, 0x72, 0x61, 0x6D}, // "histogram" + }, + []byte{0x0, 0x0, 0x0, 0x0}, // common time + []byte{0x0}, // common labels count and indexes + [][]byte{ + { + /*types*/ 0x19, + /*flags*/ 0x0, + /*labels*/ 0x1, // ? + /*name*/ 0x0, + /*value*/ 0x0, + /*buckets count*/ 0x3, + /*upper bound 0*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + /*upper bound 1*/ 0x9a, 0x99, 0x99, 0x99, 0x99, 0x99, 0xb9, 0x3f, + /*upper bound 2*/ 0x29, 0x5c, 0x8f, 0xc2, 0xf5, 0x28, 0xbc, 0x3f, + /*counter 0*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + /*counter 1*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + /*counter 2*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + }, + }, + 100, + }, + { + "counter+timer", + func() *Registry { + r := NewRegistry(NewRegistryOpts()) + + cnt := r.Counter("counter") + cnt.Add(42) + + t := r.Timer("timer") + t.RecordDuration(2 * time.Second) + + return r + }(), + CompressionNone, + []byte{ + 0x53, 0x50, // magic + 0x01, 0x01, // version + 0x18, 0x00, // header size + 0x0, // time precision + 0x0, // compression algorithm + 0x7, 0x0, 0x0, 0x0, // label names size + 0xe, 0x0, 0x0, 0x0, // label values size + 0x2, 0x0, 0x0, 0x0, // metric count + 0x2, 0x0, 0x0, 0x0, // point count + }, + [][]byte{ // label names pool + {0x73, 0x65, 0x6e, 0x73, 0x6f, 0x72}, // "sensor" + }, + [][]byte{ // label values pool + {0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72}, // "counter" + {0x74, 0x69, 0x6d, 0x65, 0x72}, // "timer" + }, + []byte{0x0, 0x0, 0x0, 0x0}, // common time + []byte{0x0}, // common labels count and indexes + [][]byte{ + { + /*types*/ 0x9, + /*flags*/ 0x0, + /*labels*/ 0x1, // ? + /*name*/ 0x0, + /*value*/ 0x0, + /*metrics value*/ 0x2a, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, //42 + }, + { + /*types*/ 0x5, + /*flags*/ 0x0, + /*labels*/ 0x1, // ? + /*name*/ 0x0, + /*value*/ 0x1, + /*metrics value*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x40, //2.0 + + }, + }, + 76, + }, + { + "gauge+histogram", + func() *Registry { + r := NewRegistry(NewRegistryOpts()) + + g := r.Gauge("gauge") + g.Set(42) + + _ = r.Histogram("histogram", metrics.NewBuckets(0, 0.1, 0.11)) + + return r + }(), + CompressionNone, + []byte{ + 0x53, 0x50, // magic + 0x01, 0x01, // version + 0x18, 0x00, // header size + 0x0, // time precision + 0x0, // compression algorithm + 0x7, 0x0, 0x0, 0x0, // label names size + 0x10, 0x0, 0x0, 0x0, // label values size + 0x2, 0x0, 0x0, 0x0, // metric count + 0x2, 0x0, 0x0, 0x0, // point count + }, + [][]byte{ // label names pool + {0x73, 0x65, 0x6e, 0x73, 0x6f, 0x72}, // "sensor" + }, + [][]byte{ // label values pool + {0x67, 0x61, 0x75, 0x67, 0x65}, // "gauge" + {0x68, 0x69, 0x73, 0x74, 0x6F, 0x67, 0x72, 0x61, 0x6D}, // "histogram" + }, + []byte{0x0, 0x0, 0x0, 0x0}, // common time + []byte{0x0}, // common labels count and indexes + [][]byte{ + { + + /*types*/ 0x5, + /*flags*/ 0x0, + /*labels*/ 0x1, // ? + /*name*/ 0x0, + /*value*/ 0x0, + /*metrics value*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x45, 0x40, //42 + }, + { + /*types*/ 0x15, + /*flags*/ 0x0, + /*labels*/ 0x1, // ? + /*name*/ 0x0, + /*value*/ 0x1, + /*buckets count*/ 0x3, + /*upper bound 0*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + /*upper bound 1*/ 0x9a, 0x99, 0x99, 0x99, 0x99, 0x99, 0xb9, 0x3f, + /*upper bound 2*/ 0x29, 0x5c, 0x8f, 0xc2, 0xf5, 0x28, 0xbc, 0x3f, + /*counter 0*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + /*counter 1*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + /*counter 2*/ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + }, + }, + 119, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + w := httptest.NewRecorder() + ctx := context.Background() + + written, err := tc.registry.StreamSpack(ctx, w, tc.compression) + + assert.NoError(t, err) + assert.Equal(t, tc.expectWritten, written) + body := w.Body.Bytes() + require.True(t, bytes.HasPrefix(body, tc.expectHeader)) + body = body[len(tc.expectHeader):] + + t.Logf("expectLabelNamesPool: %v", tc.expectLabelNamesPool) + labelNamesPoolBytes := body[:len(bytes.Join(tc.expectLabelNamesPool, []byte{0x0}))+1] + labelNamesPool := bytes.Split(bytes.Trim(labelNamesPoolBytes, "\x00"), []byte{0x0}) + require.ElementsMatch(t, tc.expectLabelNamesPool, labelNamesPool) + body = body[len(labelNamesPoolBytes):] + + t.Logf("expectValueNamesPool: %v", tc.expectValueNamesPool) + valueNamesPoolBytes := body[:len(bytes.Join(tc.expectValueNamesPool, []byte{0x0}))+1] + valueNamesPool := bytes.Split(bytes.Trim(valueNamesPoolBytes, "\x00"), []byte{0x0}) + require.ElementsMatch(t, tc.expectValueNamesPool, valueNamesPool) + body = body[len(valueNamesPoolBytes):] + + require.True(t, bytes.HasPrefix(body, tc.expectCommonTime)) + body = body[len(tc.expectCommonTime):] + + require.True(t, bytes.HasPrefix(body, tc.expectCommonLabels)) + body = body[len(tc.expectCommonLabels):] + + expectButMissing := [][]byte{} + for idx := range tc.expectMetrics { + var seen bool + var val []byte + for _, v := range tc.expectMetrics { + val = v[:] + fixValueNameIndex(idx, val) + if bytes.HasPrefix(body, val) { + body = bytes.Replace(body, val, []byte{}, 1) + seen = true + break + } + } + if !seen { + expectButMissing = append(expectButMissing, val) + } + } + assert.Empty(t, body, "unexpected bytes seen") + assert.Empty(t, expectButMissing, "missing metrics bytes") + }) + } +} + +func fixValueNameIndex(idx int, metric []byte) { + // ASSUMPTION_FOR_TESTS: the size of the index is always equal to one + // That is, the number of points in the metric is always one + metric[4] = uint8(idx) // fix value name index +} + +func sameMap(t *testing.T, expected, actual map[string]interface{}) bool { + if !assert.Len(t, actual, len(expected)) { + return false + } + + for k := range expected { + actualMetric := actual[k] + if !assert.NotNil(t, actualMetric, "expected key %q not found", k) { + return false + } + + if !assert.ElementsMatch(t, expected[k], actualMetric, "%q must have same elements", k) { + return false + } + } + return true +} diff --git a/library/go/core/metrics/solomon/timer.go b/library/go/core/metrics/solomon/timer.go new file mode 100644 index 0000000000..d36940a9f7 --- /dev/null +++ b/library/go/core/metrics/solomon/timer.go @@ -0,0 +1,91 @@ +package solomon + +import ( + "encoding/json" + "time" + + "github.com/ydb-platform/ydb/library/go/core/metrics" + "go.uber.org/atomic" +) + +var ( + _ metrics.Timer = (*Timer)(nil) + _ Metric = (*Timer)(nil) +) + +// Timer measures gauge duration. +type Timer struct { + name string + metricType metricType + tags map[string]string + value atomic.Duration + timestamp *time.Time + + useNameTag bool +} + +func (t *Timer) RecordDuration(value time.Duration) { + t.value.Store(value) +} + +func (t *Timer) Name() string { + return t.name +} + +func (t *Timer) getType() metricType { + return t.metricType +} + +func (t *Timer) getLabels() map[string]string { + return t.tags +} + +func (t *Timer) getValue() interface{} { + return t.value.Load().Seconds() +} + +func (t *Timer) getTimestamp() *time.Time { + return t.timestamp +} + +func (t *Timer) getNameTag() string { + if t.useNameTag { + return "name" + } else { + return "sensor" + } +} + +// MarshalJSON implements json.Marshaler. +func (t *Timer) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Type string `json:"type"` + Labels map[string]string `json:"labels"` + Value float64 `json:"value"` + Timestamp *int64 `json:"ts,omitempty"` + }{ + Type: t.metricType.String(), + Value: t.value.Load().Seconds(), + Labels: func() map[string]string { + labels := make(map[string]string, len(t.tags)+1) + labels[t.getNameTag()] = t.Name() + for k, v := range t.tags { + labels[k] = v + } + return labels + }(), + Timestamp: tsAsRef(t.timestamp), + }) +} + +// Snapshot returns independent copy on metric. +func (t *Timer) Snapshot() Metric { + return &Timer{ + name: t.name, + metricType: t.metricType, + tags: t.tags, + value: *atomic.NewDuration(t.value.Load()), + + useNameTag: t.useNameTag, + } +} diff --git a/library/go/core/metrics/solomon/timer_test.go b/library/go/core/metrics/solomon/timer_test.go new file mode 100644 index 0000000000..4904815701 --- /dev/null +++ b/library/go/core/metrics/solomon/timer_test.go @@ -0,0 +1,56 @@ +package solomon + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" +) + +func TestTimer_RecordDuration(t *testing.T) { + c := &Timer{ + name: "mytimer", + metricType: typeGauge, + tags: map[string]string{"ololo": "trololo"}, + } + + c.RecordDuration(1 * time.Second) + assert.Equal(t, 1*time.Second, c.value.Load()) + + c.RecordDuration(42 * time.Millisecond) + assert.Equal(t, 42*time.Millisecond, c.value.Load()) +} + +func TestTimerRated_MarshalJSON(t *testing.T) { + c := &Timer{ + name: "mytimer", + metricType: typeRated, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewDuration(42 * time.Millisecond), + } + + b, err := json.Marshal(c) + assert.NoError(t, err) + + expected := []byte(`{"type":"RATE","labels":{"ololo":"trololo","sensor":"mytimer"},"value":0.042}`) + assert.Equal(t, expected, b) +} + +func TestNameTagTimer_MarshalJSON(t *testing.T) { + c := &Timer{ + name: "mytimer", + metricType: typeRated, + tags: map[string]string{"ololo": "trololo"}, + value: *atomic.NewDuration(42 * time.Millisecond), + + useNameTag: true, + } + + b, err := json.Marshal(c) + assert.NoError(t, err) + + expected := []byte(`{"type":"RATE","labels":{"name":"mytimer","ololo":"trololo"},"value":0.042}`) + assert.Equal(t, expected, b) +} diff --git a/library/go/core/metrics/solomon/vec.go b/library/go/core/metrics/solomon/vec.go new file mode 100644 index 0000000000..323919e9f8 --- /dev/null +++ b/library/go/core/metrics/solomon/vec.go @@ -0,0 +1,279 @@ +package solomon + +import ( + "sync" + + "github.com/ydb-platform/ydb/library/go/core/metrics" + "github.com/ydb-platform/ydb/library/go/core/metrics/internal/pkg/registryutil" +) + +// metricsVector is a base implementation of vector of metrics of any supported type. +type metricsVector struct { + labels []string + mtx sync.RWMutex // Protects metrics. + metrics map[uint64]Metric + rated bool + newMetric func(map[string]string) Metric + removeMetric func(m Metric) +} + +func (v *metricsVector) with(tags map[string]string) Metric { + hv, err := registryutil.VectorHash(tags, v.labels) + if err != nil { + panic(err) + } + + v.mtx.RLock() + metric, ok := v.metrics[hv] + v.mtx.RUnlock() + if ok { + return metric + } + + v.mtx.Lock() + defer v.mtx.Unlock() + + metric, ok = v.metrics[hv] + if !ok { + metric = v.newMetric(tags) + v.metrics[hv] = metric + } + + return metric +} + +// reset deletes all metrics in this vector. +func (v *metricsVector) reset() { + v.mtx.Lock() + defer v.mtx.Unlock() + + for h, m := range v.metrics { + delete(v.metrics, h) + v.removeMetric(m) + } +} + +var _ metrics.CounterVec = (*CounterVec)(nil) + +// CounterVec stores counters and +// implements metrics.CounterVec interface. +type CounterVec struct { + vec *metricsVector +} + +// CounterVec creates a new counters vector with given metric name and +// partitioned by the given label names. +func (r *Registry) CounterVec(name string, labels []string) metrics.CounterVec { + var vec *metricsVector + vec = &metricsVector{ + labels: append([]string(nil), labels...), + metrics: make(map[uint64]Metric), + rated: r.rated, + newMetric: func(tags map[string]string) Metric { + return r.Rated(vec.rated). + WithTags(tags). + Counter(name).(*Counter) + }, + removeMetric: func(m Metric) { + r.WithTags(m.getLabels()).(*Registry).unregisterMetric(m) + }, + } + return &CounterVec{vec: vec} +} + +// With creates new or returns existing counter with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *CounterVec) With(tags map[string]string) metrics.Counter { + return v.vec.with(tags).(*Counter) +} + +// Reset deletes all metrics in this vector. +func (v *CounterVec) Reset() { + v.vec.reset() +} + +var _ metrics.GaugeVec = (*GaugeVec)(nil) + +// GaugeVec stores gauges and +// implements metrics.GaugeVec interface. +type GaugeVec struct { + vec *metricsVector +} + +// GaugeVec creates a new gauges vector with given metric name and +// partitioned by the given label names. +func (r *Registry) GaugeVec(name string, labels []string) metrics.GaugeVec { + return &GaugeVec{ + vec: &metricsVector{ + labels: append([]string(nil), labels...), + metrics: make(map[uint64]Metric), + newMetric: func(tags map[string]string) Metric { + return r.WithTags(tags).Gauge(name).(*Gauge) + }, + removeMetric: func(m Metric) { + r.WithTags(m.getLabels()).(*Registry).unregisterMetric(m) + }, + }, + } +} + +// With creates new or returns existing gauge with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *GaugeVec) With(tags map[string]string) metrics.Gauge { + return v.vec.with(tags).(*Gauge) +} + +// Reset deletes all metrics in this vector. +func (v *GaugeVec) Reset() { + v.vec.reset() +} + +var _ metrics.IntGaugeVec = (*IntGaugeVec)(nil) + +// IntGaugeVec stores gauges and +// implements metrics.IntGaugeVec interface. +type IntGaugeVec struct { + vec *metricsVector +} + +// IntGaugeVec creates a new gauges vector with given metric name and +// partitioned by the given label names. +func (r *Registry) IntGaugeVec(name string, labels []string) metrics.IntGaugeVec { + return &IntGaugeVec{ + vec: &metricsVector{ + labels: append([]string(nil), labels...), + metrics: make(map[uint64]Metric), + newMetric: func(tags map[string]string) Metric { + return r.WithTags(tags).IntGauge(name).(*IntGauge) + }, + removeMetric: func(m Metric) { + r.WithTags(m.getLabels()).(*Registry).unregisterMetric(m) + }, + }, + } +} + +// With creates new or returns existing gauge with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *IntGaugeVec) With(tags map[string]string) metrics.IntGauge { + return v.vec.with(tags).(*IntGauge) +} + +// Reset deletes all metrics in this vector. +func (v *IntGaugeVec) Reset() { + v.vec.reset() +} + +var _ metrics.TimerVec = (*TimerVec)(nil) + +// TimerVec stores timers and +// implements metrics.TimerVec interface. +type TimerVec struct { + vec *metricsVector +} + +// TimerVec creates a new timers vector with given metric name and +// partitioned by the given label names. +func (r *Registry) TimerVec(name string, labels []string) metrics.TimerVec { + return &TimerVec{ + vec: &metricsVector{ + labels: append([]string(nil), labels...), + metrics: make(map[uint64]Metric), + newMetric: func(tags map[string]string) Metric { + return r.WithTags(tags).Timer(name).(*Timer) + }, + removeMetric: func(m Metric) { + r.WithTags(m.getLabels()).(*Registry).unregisterMetric(m) + }, + }, + } +} + +// With creates new or returns existing timer with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *TimerVec) With(tags map[string]string) metrics.Timer { + return v.vec.with(tags).(*Timer) +} + +// Reset deletes all metrics in this vector. +func (v *TimerVec) Reset() { + v.vec.reset() +} + +var _ metrics.HistogramVec = (*HistogramVec)(nil) + +// HistogramVec stores histograms and +// implements metrics.HistogramVec interface. +type HistogramVec struct { + vec *metricsVector +} + +// HistogramVec creates a new histograms vector with given metric name and buckets and +// partitioned by the given label names. +func (r *Registry) HistogramVec(name string, buckets metrics.Buckets, labels []string) metrics.HistogramVec { + var vec *metricsVector + vec = &metricsVector{ + labels: append([]string(nil), labels...), + metrics: make(map[uint64]Metric), + rated: r.rated, + newMetric: func(tags map[string]string) Metric { + return r.Rated(vec.rated). + WithTags(tags). + Histogram(name, buckets).(*Histogram) + }, + removeMetric: func(m Metric) { + r.WithTags(m.getLabels()).(*Registry).unregisterMetric(m) + }, + } + return &HistogramVec{vec: vec} +} + +// With creates new or returns existing histogram with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *HistogramVec) With(tags map[string]string) metrics.Histogram { + return v.vec.with(tags).(*Histogram) +} + +// Reset deletes all metrics in this vector. +func (v *HistogramVec) Reset() { + v.vec.reset() +} + +var _ metrics.TimerVec = (*DurationHistogramVec)(nil) + +// DurationHistogramVec stores duration histograms and +// implements metrics.TimerVec interface. +type DurationHistogramVec struct { + vec *metricsVector +} + +// DurationHistogramVec creates a new duration histograms vector with given metric name and buckets and +// partitioned by the given label names. +func (r *Registry) DurationHistogramVec(name string, buckets metrics.DurationBuckets, labels []string) metrics.TimerVec { + var vec *metricsVector + vec = &metricsVector{ + labels: append([]string(nil), labels...), + metrics: make(map[uint64]Metric), + rated: r.rated, + newMetric: func(tags map[string]string) Metric { + return r.Rated(vec.rated). + WithTags(tags). + DurationHistogram(name, buckets).(*Histogram) + }, + removeMetric: func(m Metric) { + r.WithTags(m.getLabels()).(*Registry).unregisterMetric(m) + }, + } + return &DurationHistogramVec{vec: vec} +} + +// With creates new or returns existing duration histogram with given tags from vector. +// It will panic if tags keys set is not equal to vector labels. +func (v *DurationHistogramVec) With(tags map[string]string) metrics.Timer { + return v.vec.with(tags).(*Histogram) +} + +// Reset deletes all metrics in this vector. +func (v *DurationHistogramVec) Reset() { + v.vec.reset() +} diff --git a/library/go/core/metrics/solomon/vec_test.go b/library/go/core/metrics/solomon/vec_test.go new file mode 100644 index 0000000000..cac437f434 --- /dev/null +++ b/library/go/core/metrics/solomon/vec_test.go @@ -0,0 +1,339 @@ +package solomon + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb/library/go/core/metrics" +) + +func TestVec(t *testing.T) { + type args struct { + name string + labels []string + buckets metrics.Buckets + dbuckets metrics.DurationBuckets + } + + testCases := []struct { + name string + args args + expectedType interface{} + expectLabels []string + }{ + { + name: "CounterVec", + args: args{ + name: "cntvec", + labels: []string{"shimba", "looken"}, + }, + expectedType: &CounterVec{}, + expectLabels: []string{"shimba", "looken"}, + }, + { + name: "GaugeVec", + args: args{ + name: "ggvec", + labels: []string{"shimba", "looken"}, + }, + expectedType: &GaugeVec{}, + expectLabels: []string{"shimba", "looken"}, + }, + { + name: "TimerVec", + args: args{ + name: "tvec", + labels: []string{"shimba", "looken"}, + }, + expectedType: &TimerVec{}, + expectLabels: []string{"shimba", "looken"}, + }, + { + name: "HistogramVec", + args: args{ + name: "hvec", + labels: []string{"shimba", "looken"}, + buckets: metrics.NewBuckets(1, 2, 3, 4), + }, + expectedType: &HistogramVec{}, + expectLabels: []string{"shimba", "looken"}, + }, + { + name: "DurationHistogramVec", + args: args{ + name: "dhvec", + labels: []string{"shimba", "looken"}, + dbuckets: metrics.NewDurationBuckets(1, 2, 3, 4), + }, + expectedType: &DurationHistogramVec{}, + expectLabels: []string{"shimba", "looken"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + + switch vect := tc.expectedType.(type) { + case *CounterVec: + vec := rg.CounterVec(tc.args.name, tc.args.labels) + assert.IsType(t, vect, vec) + assert.Equal(t, tc.expectLabels, vec.(*CounterVec).vec.labels) + case *GaugeVec: + vec := rg.GaugeVec(tc.args.name, tc.args.labels) + assert.IsType(t, vect, vec) + assert.Equal(t, tc.expectLabels, vec.(*GaugeVec).vec.labels) + case *TimerVec: + vec := rg.TimerVec(tc.args.name, tc.args.labels) + assert.IsType(t, vect, vec) + assert.Equal(t, tc.expectLabels, vec.(*TimerVec).vec.labels) + case *HistogramVec: + vec := rg.HistogramVec(tc.args.name, tc.args.buckets, tc.args.labels) + assert.IsType(t, vect, vec) + assert.Equal(t, tc.expectLabels, vec.(*HistogramVec).vec.labels) + case *DurationHistogramVec: + vec := rg.DurationHistogramVec(tc.args.name, tc.args.dbuckets, tc.args.labels) + assert.IsType(t, vect, vec) + assert.Equal(t, tc.expectLabels, vec.(*DurationHistogramVec).vec.labels) + default: + t.Errorf("unknown type: %T", vect) + } + }) + } +} + +func TestCounterVecWith(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + + t.Run("plain", func(t *testing.T) { + vec := rg.CounterVec("ololo", []string{"shimba", "looken"}) + tags := map[string]string{ + "shimba": "boomba", + "looken": "tooken", + } + metric := vec.With(tags) + + assert.IsType(t, &CounterVec{}, vec) + assert.IsType(t, &Counter{}, metric) + assert.Equal(t, typeCounter, metric.(*Counter).metricType) + + assert.NotEmpty(t, vec.(*CounterVec).vec.metrics) + vec.Reset() + assert.Empty(t, vec.(*CounterVec).vec.metrics) + assertMetricRemoved(t, rg.WithTags(tags).(*Registry), metric.(*Counter)) + }) + + t.Run("rated", func(t *testing.T) { + vec := rg.CounterVec("ololo", []string{"shimba", "looken"}) + Rated(vec) + tags := map[string]string{ + "shimba": "boomba", + "looken": "tooken", + } + metric := vec.With(tags) + + assert.IsType(t, &CounterVec{}, vec) + assert.IsType(t, &Counter{}, metric) + assert.Equal(t, typeRated, metric.(*Counter).metricType) + + assert.NotEmpty(t, vec.(*CounterVec).vec.metrics) + vec.Reset() + assert.Empty(t, vec.(*CounterVec).vec.metrics) + assertMetricRemoved(t, rg.WithTags(tags).(*Registry), metric.(*Counter)) + }) +} + +func TestGaugeVecWith(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + + vec := rg.GaugeVec("ololo", []string{"shimba", "looken"}) + tags := map[string]string{ + "shimba": "boomba", + "looken": "tooken", + } + metric := vec.With(tags) + + assert.IsType(t, &GaugeVec{}, vec) + assert.IsType(t, &Gauge{}, metric) + assert.Equal(t, typeGauge, metric.(*Gauge).metricType) + + assert.NotEmpty(t, vec.(*GaugeVec).vec.metrics) + vec.Reset() + assert.Empty(t, vec.(*GaugeVec).vec.metrics) + assertMetricRemoved(t, rg.WithTags(tags).(*Registry), metric.(*Gauge)) +} + +func TestTimerVecWith(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + vec := rg.TimerVec("ololo", []string{"shimba", "looken"}) + tags := map[string]string{ + "shimba": "boomba", + "looken": "tooken", + } + metric := vec.With(tags) + + assert.IsType(t, &TimerVec{}, vec) + assert.IsType(t, &Timer{}, metric) + assert.Equal(t, typeGauge, metric.(*Timer).metricType) + + assert.NotEmpty(t, vec.(*TimerVec).vec.metrics) + vec.Reset() + assert.Empty(t, vec.(*TimerVec).vec.metrics) + assertMetricRemoved(t, rg.WithTags(tags).(*Registry), metric.(*Timer)) +} + +func TestHistogramVecWith(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + + t.Run("plain", func(t *testing.T) { + buckets := metrics.NewBuckets(1, 2, 3) + vec := rg.HistogramVec("ololo", buckets, []string{"shimba", "looken"}) + tags := map[string]string{ + "shimba": "boomba", + "looken": "tooken", + } + metric := vec.With(tags) + + assert.IsType(t, &HistogramVec{}, vec) + assert.IsType(t, &Histogram{}, metric) + assert.Equal(t, typeHistogram, metric.(*Histogram).metricType) + + assert.NotEmpty(t, vec.(*HistogramVec).vec.metrics) + vec.Reset() + assert.Empty(t, vec.(*HistogramVec).vec.metrics) + assertMetricRemoved(t, rg.WithTags(tags).(*Registry), metric.(*Histogram)) + }) + + t.Run("rated", func(t *testing.T) { + buckets := metrics.NewBuckets(1, 2, 3) + vec := rg.HistogramVec("ololo", buckets, []string{"shimba", "looken"}) + Rated(vec) + tags := map[string]string{ + "shimba": "boomba", + "looken": "tooken", + } + metric := vec.With(tags) + + assert.IsType(t, &HistogramVec{}, vec) + assert.IsType(t, &Histogram{}, metric) + assert.Equal(t, typeRatedHistogram, metric.(*Histogram).metricType) + + assert.NotEmpty(t, vec.(*HistogramVec).vec.metrics) + vec.Reset() + assert.Empty(t, vec.(*HistogramVec).vec.metrics) + assertMetricRemoved(t, rg.WithTags(tags).(*Registry), metric.(*Histogram)) + }) +} + +func TestDurationHistogramVecWith(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + + t.Run("plain", func(t *testing.T) { + buckets := metrics.NewDurationBuckets(1, 2, 3) + vec := rg.DurationHistogramVec("ololo", buckets, []string{"shimba", "looken"}) + tags := map[string]string{ + "shimba": "boomba", + "looken": "tooken", + } + metric := vec.With(tags) + + assert.IsType(t, &DurationHistogramVec{}, vec) + assert.IsType(t, &Histogram{}, metric) + assert.Equal(t, typeHistogram, metric.(*Histogram).metricType) + + assert.NotEmpty(t, vec.(*DurationHistogramVec).vec.metrics) + vec.Reset() + assert.Empty(t, vec.(*DurationHistogramVec).vec.metrics) + assertMetricRemoved(t, rg.WithTags(tags).(*Registry), metric.(*Histogram)) + }) + + t.Run("rated", func(t *testing.T) { + buckets := metrics.NewDurationBuckets(1, 2, 3) + vec := rg.DurationHistogramVec("ololo", buckets, []string{"shimba", "looken"}) + Rated(vec) + tags := map[string]string{ + "shimba": "boomba", + "looken": "tooken", + } + metric := vec.With(tags) + + assert.IsType(t, &DurationHistogramVec{}, vec) + assert.IsType(t, &Histogram{}, metric) + assert.Equal(t, typeRatedHistogram, metric.(*Histogram).metricType) + + assert.NotEmpty(t, vec.(*DurationHistogramVec).vec.metrics) + vec.Reset() + assert.Empty(t, vec.(*DurationHistogramVec).vec.metrics) + assertMetricRemoved(t, rg.WithTags(tags).(*Registry), metric.(*Histogram)) + }) +} + +func TestMetricsVectorWith(t *testing.T) { + rg := NewRegistry(NewRegistryOpts()) + + name := "ololo" + tags := map[string]string{ + "shimba": "boomba", + "looken": "tooken", + } + + vec := &metricsVector{ + labels: []string{"shimba", "looken"}, + metrics: make(map[uint64]Metric), + newMetric: func(tags map[string]string) Metric { + return rg.WithTags(tags).Counter(name).(*Counter) + }, + removeMetric: func(m Metric) { + rg.WithTags(m.getLabels()).(*Registry).unregisterMetric(m) + }, + } + + // check first counter + metric := vec.with(tags) + require.IsType(t, &Counter{}, metric) + cnt := metric.(*Counter) + assert.Equal(t, name, cnt.name) + assert.Equal(t, tags, cnt.tags) + + // check vector length + assert.Equal(t, 1, len(vec.metrics)) + + // check same counter returned for same tags set + cnt2 := vec.with(tags) + assert.Same(t, cnt, cnt2) + + // check vector length + assert.Equal(t, 1, len(vec.metrics)) + + // return new counter + cnt3 := vec.with(map[string]string{ + "shimba": "boomba", + "looken": "cooken", + }) + assert.NotSame(t, cnt, cnt3) + + // check vector length + assert.Equal(t, 2, len(vec.metrics)) + + // check for panic + assert.Panics(t, func() { + vec.with(map[string]string{"chicken": "cooken"}) + }) + assert.Panics(t, func() { + vec.with(map[string]string{"shimba": "boomba", "chicken": "cooken"}) + }) + + // check reset + vec.reset() + assert.Empty(t, vec.metrics) + assertMetricRemoved(t, rg.WithTags(tags).(*Registry), cnt2) + assertMetricRemoved(t, rg.WithTags(tags).(*Registry), cnt3) +} + +func assertMetricRemoved(t *testing.T, rg *Registry, m Metric) { + t.Helper() + + v, ok := rg.metrics.Load(rg.metricKey(m)) + assert.False(t, ok, v) +} diff --git a/library/go/core/metrics/solomon/ya.make b/library/go/core/metrics/solomon/ya.make new file mode 100644 index 0000000000..a4de14cadf --- /dev/null +++ b/library/go/core/metrics/solomon/ya.make @@ -0,0 +1,44 @@ +GO_LIBRARY() + +SRCS( + converter.go + counter.go + func_counter.go + func_gauge.go + func_int_gauge.go + gauge.go + int_gauge.go + histogram.go + metrics.go + metrics_opts.go + registry.go + registry_opts.go + spack.go + spack_compression.go + stream.go + timer.go + vec.go +) + +GO_TEST_SRCS( + converter_test.go + counter_test.go + func_counter_test.go + func_gauge_test.go + func_int_gauge_test.go + gauge_test.go + int_gauge_test.go + histogram_test.go + metrics_test.go + registry_test.go + spack_compression_test.go + spack_test.go + stream_test.go + timer_test.go + vec_test.go + race_test.go +) + +END() + +RECURSE(gotest) diff --git a/library/go/core/metrics/ya.make b/library/go/core/metrics/ya.make new file mode 100644 index 0000000000..0a42f422af --- /dev/null +++ b/library/go/core/metrics/ya.make @@ -0,0 +1,20 @@ +GO_LIBRARY() + +SRCS( + buckets.go + metrics.go +) + +GO_TEST_SRCS(buckets_test.go) + +END() + +RECURSE( + collect + gotest + internal + mock + nop + prometheus + solomon +) diff --git a/library/go/core/resource/cc/main.go b/library/go/core/resource/cc/main.go new file mode 100644 index 0000000000..50887343d6 --- /dev/null +++ b/library/go/core/resource/cc/main.go @@ -0,0 +1,91 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "io" + "os" + "strings" +) + +func fatalf(msg string, args ...interface{}) { + _, _ = fmt.Fprintf(os.Stderr, msg+"\n", args...) + os.Exit(1) +} + +func generate(w io.Writer, pkg string, blobs [][]byte, keys []string) { + _, _ = fmt.Fprint(w, "// Code generated by github.com/ydb-platform/ydb/library/go/core/resource/cc DO NOT EDIT.\n") + _, _ = fmt.Fprintf(w, "package %s\n\n", pkg) + _, _ = fmt.Fprint(w, "import \"github.com/ydb-platform/ydb/library/go/core/resource\"\n") + + for i := 0; i < len(blobs); i++ { + blob := blobs[i] + + _, _ = fmt.Fprint(w, "\nfunc init() {\n") + + _, _ = fmt.Fprint(w, "\tblob := []byte(") + _, _ = fmt.Fprintf(w, "%+q", blob) + _, _ = fmt.Fprint(w, ")\n") + _, _ = fmt.Fprintf(w, "\tresource.InternalRegister(%q, blob)\n", keys[i]) + _, _ = fmt.Fprint(w, "}\n") + } +} + +func main() { + var pkg, output string + + flag.StringVar(&pkg, "package", "", "package name") + flag.StringVar(&output, "o", "", "output filename") + flag.Parse() + + if flag.NArg()%2 != 0 { + fatalf("cc: must provide even number of arguments") + } + + var keys []string + var blobs [][]byte + for i := 0; 2*i < flag.NArg(); i++ { + file := flag.Arg(2 * i) + key := flag.Arg(2*i + 1) + + if !strings.HasPrefix(key, "notafile") { + fatalf("cc: key argument must start with \"notafile\" string") + } + key = key[8:] + + if file == "-" { + parts := strings.SplitN(key, "=", 2) + if len(parts) != 2 { + fatalf("cc: invalid key syntax: %q", key) + } + + keys = append(keys, parts[0]) + blobs = append(blobs, []byte(parts[1])) + } else { + blob, err := os.ReadFile(file) + if err != nil { + fatalf("cc: %v", err) + } + + keys = append(keys, key) + blobs = append(blobs, blob) + } + } + + f, err := os.Create(output) + if err != nil { + fatalf("cc: %v", err) + } + + b := bufio.NewWriter(f) + generate(b, pkg, blobs, keys) + + if err = b.Flush(); err != nil { + fatalf("cc: %v", err) + } + + if err = f.Close(); err != nil { + fatalf("cc: %v", err) + } +} diff --git a/library/go/core/resource/cc/ya.make b/library/go/core/resource/cc/ya.make new file mode 100644 index 0000000000..4d99fcc9c0 --- /dev/null +++ b/library/go/core/resource/cc/ya.make @@ -0,0 +1,9 @@ +GO_PROGRAM() + +SRCS(main.go) + +GO_TEST_SRCS(generate_test.go) + +END() + +RECURSE(gotest) diff --git a/library/go/core/resource/resource.go b/library/go/core/resource/resource.go new file mode 100644 index 0000000000..686ea73c3b --- /dev/null +++ b/library/go/core/resource/resource.go @@ -0,0 +1,56 @@ +// Package resource provides integration with RESOURCE and RESOURCE_FILES macros. +// +// Use RESOURCE macro to "link" file into the library or executable. +// +// RESOURCE(my_file.txt some_key) +// +// And then retrieve file content in the runtime. +// +// blob := resource.Get("some_key") +// +// Warning: Excessive consumption of resource leads to obesity. +package resource + +import ( + "fmt" + "sort" +) + +var resources = map[string][]byte{} + +// InternalRegister is private API used by generated code. +func InternalRegister(key string, blob []byte) { + if _, ok := resources[key]; ok { + panic(fmt.Sprintf("resource key %q is already defined", key)) + } + + resources[key] = blob +} + +// Get returns content of the file registered by the given key. +// +// If no file was registered for the given key, nil slice is returned. +// +// User should take care, to avoid mutating returned slice. +func Get(key string) []byte { + return resources[key] +} + +// MustGet is like Get, but panics when associated resource is not defined. +func MustGet(key string) []byte { + r, ok := resources[key] + if !ok { + panic(fmt.Sprintf("resource with key %q is not defined", key)) + } + return r +} + +// Keys returns sorted keys of all registered resources inside the binary +func Keys() []string { + keys := make([]string, 0, len(resources)) + for k := range resources { + keys = append(keys, k) + } + sort.Strings(keys) + return keys +} diff --git a/library/go/core/resource/ya.make b/library/go/core/resource/ya.make new file mode 100644 index 0000000000..4860291e25 --- /dev/null +++ b/library/go/core/resource/ya.make @@ -0,0 +1,14 @@ +GO_LIBRARY() + +SRCS(resource.go) + +END() + +RECURSE( + cc + test + test-bin + test-fileonly + test-files + test-keyonly +) |