diff options
author | qrort <qrort@yandex-team.com> | 2022-11-30 23:47:12 +0300 |
---|---|---|
committer | qrort <qrort@yandex-team.com> | 2022-11-30 23:47:12 +0300 |
commit | 22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch) | |
tree | bffa27765faf54126ad44bcafa89fadecb7a73d7 /library/go/core/metrics/solomon/stream.go | |
parent | 332b99e2173f0425444abb759eebcb2fafaa9209 (diff) | |
download | ydb-22f8ae0e3f5d68b92aecccdf96c1d841a0334311.tar.gz |
validate canons without yatest_common
Diffstat (limited to 'library/go/core/metrics/solomon/stream.go')
-rw-r--r-- | library/go/core/metrics/solomon/stream.go | 89 |
1 files changed, 89 insertions, 0 deletions
diff --git a/library/go/core/metrics/solomon/stream.go b/library/go/core/metrics/solomon/stream.go new file mode 100644 index 0000000000..26dc768c98 --- /dev/null +++ b/library/go/core/metrics/solomon/stream.go @@ -0,0 +1,89 @@ +package solomon + +import ( + "context" + "encoding/json" + "io" + + "a.yandex-team.ru/library/go/core/xerrors" +) + +const HeaderSize = 24 + +type StreamFormat string + +func (r *Registry) StreamJSON(ctx context.Context, w io.Writer) (written int, err error) { + cw := newCompressedWriter(w, CompressionNone) + + if ctx.Err() != nil { + return written, xerrors.Errorf("streamJSON context error: %w", ctx.Err()) + } + _, err = cw.Write([]byte("{\"metrics\":[")) + if err != nil { + return written, xerrors.Errorf("write metrics failed: %w", err) + } + + first := true + r.metrics.Range(func(_, s interface{}) bool { + if ctx.Err() != nil { + err = xerrors.Errorf("streamJSON context error: %w", ctx.Err()) + return false + } + + // write trailing comma + if !first { + _, err = cw.Write([]byte(",")) + if err != nil { + err = xerrors.Errorf("write metrics failed: %w", err) + return false + } + } + + var b []byte + + b, err = json.Marshal(s) + if err != nil { + err = xerrors.Errorf("marshal metric failed: %w", err) + return false + } + + // write metric json + _, err = cw.Write(b) + if err != nil { + err = xerrors.Errorf("write metric failed: %w", err) + return false + } + + first = false + return true + }) + if err != nil { + return written, err + } + + if ctx.Err() != nil { + return written, xerrors.Errorf("streamJSON context error: %w", ctx.Err()) + } + _, err = cw.Write([]byte("]}")) + if err != nil { + return written, xerrors.Errorf("write metrics failed: %w", err) + } + + if ctx.Err() != nil { + return written, xerrors.Errorf("streamJSON context error: %w", ctx.Err()) + } + err = cw.Close() + if err != nil { + return written, xerrors.Errorf("close failed: %w", err) + } + + return cw.(*noCompressionWriteCloser).written, nil +} + +func (r *Registry) StreamSpack(ctx context.Context, w io.Writer, compression CompressionType) (int, error) { + metrics, err := r.Gather() + if err != nil { + return 0, err + } + return NewSpackEncoder(ctx, compression, metrics).Encode(w) +} |