aboutsummaryrefslogtreecommitdiffstats
path: root/library/go/core/metrics/solomon/stream.go
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-11-08 12:09:41 +0300
committerhcpp <hcpp@ydb.tech>2023-11-08 12:56:14 +0300
commita361f5b98b98b44ea510d274f6769164640dd5e1 (patch)
treec47c80962c6e2e7b06798238752fd3da0191a3f6 /library/go/core/metrics/solomon/stream.go
parent9478806fde1f4d40bd5a45e7cbe77237dab613e9 (diff)
downloadydb-a361f5b98b98b44ea510d274f6769164640dd5e1.tar.gz
metrics have been added
Diffstat (limited to 'library/go/core/metrics/solomon/stream.go')
-rw-r--r--library/go/core/metrics/solomon/stream.go89
1 files changed, 89 insertions, 0 deletions
diff --git a/library/go/core/metrics/solomon/stream.go b/library/go/core/metrics/solomon/stream.go
new file mode 100644
index 0000000000..7cf6d70064
--- /dev/null
+++ b/library/go/core/metrics/solomon/stream.go
@@ -0,0 +1,89 @@
+package solomon
+
+import (
+ "context"
+ "encoding/json"
+ "io"
+
+ "github.com/ydb-platform/ydb/library/go/core/xerrors"
+)
+
+const HeaderSize = 24
+
+type StreamFormat string
+
+func (r *Registry) StreamJSON(ctx context.Context, w io.Writer) (written int, err error) {
+ cw := newCompressedWriter(w, CompressionNone)
+
+ if ctx.Err() != nil {
+ return written, xerrors.Errorf("streamJSON context error: %w", ctx.Err())
+ }
+ _, err = cw.Write([]byte("{\"metrics\":["))
+ if err != nil {
+ return written, xerrors.Errorf("write metrics failed: %w", err)
+ }
+
+ first := true
+ r.metrics.Range(func(_, s interface{}) bool {
+ if ctx.Err() != nil {
+ err = xerrors.Errorf("streamJSON context error: %w", ctx.Err())
+ return false
+ }
+
+ // write trailing comma
+ if !first {
+ _, err = cw.Write([]byte(","))
+ if err != nil {
+ err = xerrors.Errorf("write metrics failed: %w", err)
+ return false
+ }
+ }
+
+ var b []byte
+
+ b, err = json.Marshal(s)
+ if err != nil {
+ err = xerrors.Errorf("marshal metric failed: %w", err)
+ return false
+ }
+
+ // write metric json
+ _, err = cw.Write(b)
+ if err != nil {
+ err = xerrors.Errorf("write metric failed: %w", err)
+ return false
+ }
+
+ first = false
+ return true
+ })
+ if err != nil {
+ return written, err
+ }
+
+ if ctx.Err() != nil {
+ return written, xerrors.Errorf("streamJSON context error: %w", ctx.Err())
+ }
+ _, err = cw.Write([]byte("]}"))
+ if err != nil {
+ return written, xerrors.Errorf("write metrics failed: %w", err)
+ }
+
+ if ctx.Err() != nil {
+ return written, xerrors.Errorf("streamJSON context error: %w", ctx.Err())
+ }
+ err = cw.Close()
+ if err != nil {
+ return written, xerrors.Errorf("close failed: %w", err)
+ }
+
+ return cw.(*noCompressionWriteCloser).written, nil
+}
+
+func (r *Registry) StreamSpack(ctx context.Context, w io.Writer, compression CompressionType) (int, error) {
+ metrics, err := r.Gather()
+ if err != nil {
+ return 0, err
+ }
+ return NewSpackEncoder(ctx, compression, metrics).Encode(w)
+}