aboutsummaryrefslogtreecommitdiffstats
path: root/library/go/core/metrics/solomon/stream.go
diff options
context:
space:
mode:
authorqrort <qrort@yandex-team.com>2022-11-30 23:47:12 +0300
committerqrort <qrort@yandex-team.com>2022-11-30 23:47:12 +0300
commit22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch)
treebffa27765faf54126ad44bcafa89fadecb7a73d7 /library/go/core/metrics/solomon/stream.go
parent332b99e2173f0425444abb759eebcb2fafaa9209 (diff)
downloadydb-22f8ae0e3f5d68b92aecccdf96c1d841a0334311.tar.gz
validate canons without yatest_common
Diffstat (limited to 'library/go/core/metrics/solomon/stream.go')
-rw-r--r--library/go/core/metrics/solomon/stream.go89
1 files changed, 89 insertions, 0 deletions
diff --git a/library/go/core/metrics/solomon/stream.go b/library/go/core/metrics/solomon/stream.go
new file mode 100644
index 0000000000..26dc768c98
--- /dev/null
+++ b/library/go/core/metrics/solomon/stream.go
@@ -0,0 +1,89 @@
+package solomon
+
+import (
+ "context"
+ "encoding/json"
+ "io"
+
+ "a.yandex-team.ru/library/go/core/xerrors"
+)
+
+const HeaderSize = 24
+
+type StreamFormat string
+
+func (r *Registry) StreamJSON(ctx context.Context, w io.Writer) (written int, err error) {
+ cw := newCompressedWriter(w, CompressionNone)
+
+ if ctx.Err() != nil {
+ return written, xerrors.Errorf("streamJSON context error: %w", ctx.Err())
+ }
+ _, err = cw.Write([]byte("{\"metrics\":["))
+ if err != nil {
+ return written, xerrors.Errorf("write metrics failed: %w", err)
+ }
+
+ first := true
+ r.metrics.Range(func(_, s interface{}) bool {
+ if ctx.Err() != nil {
+ err = xerrors.Errorf("streamJSON context error: %w", ctx.Err())
+ return false
+ }
+
+ // write trailing comma
+ if !first {
+ _, err = cw.Write([]byte(","))
+ if err != nil {
+ err = xerrors.Errorf("write metrics failed: %w", err)
+ return false
+ }
+ }
+
+ var b []byte
+
+ b, err = json.Marshal(s)
+ if err != nil {
+ err = xerrors.Errorf("marshal metric failed: %w", err)
+ return false
+ }
+
+ // write metric json
+ _, err = cw.Write(b)
+ if err != nil {
+ err = xerrors.Errorf("write metric failed: %w", err)
+ return false
+ }
+
+ first = false
+ return true
+ })
+ if err != nil {
+ return written, err
+ }
+
+ if ctx.Err() != nil {
+ return written, xerrors.Errorf("streamJSON context error: %w", ctx.Err())
+ }
+ _, err = cw.Write([]byte("]}"))
+ if err != nil {
+ return written, xerrors.Errorf("write metrics failed: %w", err)
+ }
+
+ if ctx.Err() != nil {
+ return written, xerrors.Errorf("streamJSON context error: %w", ctx.Err())
+ }
+ err = cw.Close()
+ if err != nil {
+ return written, xerrors.Errorf("close failed: %w", err)
+ }
+
+ return cw.(*noCompressionWriteCloser).written, nil
+}
+
+func (r *Registry) StreamSpack(ctx context.Context, w io.Writer, compression CompressionType) (int, error) {
+ metrics, err := r.Gather()
+ if err != nil {
+ return 0, err
+ }
+ return NewSpackEncoder(ctx, compression, metrics).Encode(w)
+}