1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
package httppuller
import (
"context"
"fmt"
"io"
"net/http"
"reflect"
"github.com/ydb-platform/ydb/library/go/core/log"
"github.com/ydb-platform/ydb/library/go/core/log/nop"
"github.com/ydb-platform/ydb/library/go/core/metrics/solomon"
"github.com/ydb-platform/ydb/library/go/httputil/headers"
"github.com/ydb-platform/ydb/library/go/httputil/middleware/tvm"
)
// nilRegistryPanicMsg is the value NewHandler panics with when it is given a
// nil registry.
const nilRegistryPanicMsg = "nil registry given"
// MetricsStreamer is the metrics source the HTTP handler serves from. It is
// able to write its current metrics to a writer in either JSON or spack form.
type MetricsStreamer interface {
	// StreamJSON writes the metrics to w as JSON.
	// NOTE(review): the int result is presumably the number of bytes
	// written; confirm against the implementation.
	StreamJSON(ctx context.Context, w io.Writer) (int, error)

	// StreamSpack writes the metrics to w in spack format using the given
	// compression.
	// NOTE(review): the int result is presumably the number of bytes
	// written; confirm against the implementation.
	StreamSpack(ctx context.Context, w io.Writer, compression solomon.CompressionType) (int, error)
}
// handler is the http.Handler implementation returned (possibly wrapped) by
// NewHandler.
type handler struct {
	// registry is the metrics source whose contents are streamed in ServeHTTP.
	registry MetricsStreamer

	// streamFormat is the configured output format. Spack is only ever served
	// when this equals headers.TypeApplicationXSolomonSpack; otherwise JSON is
	// used.
	streamFormat headers.ContentType

	// checkTicket wraps the handler before NewHandler returns it. By default
	// it returns the handler unchanged.
	checkTicket func(next http.Handler) http.Handler

	// logger receives warnings about unparsable request headers and errors
	// returned while streaming metrics.
	logger log.Logger
}
// Option customizes the handler built by NewHandler. NewHandler recognizes a
// fixed set of option types and panics when given any other implementation.
type Option interface {
	// isOption is an unexported marker method; it carries no behavior.
	isOption()
}
// NewHandler builds an http.Handler that exposes the metrics provided by r.
//
// By default the handler streams JSON, logs nowhere (nop logger) and performs
// no ticket check. Options may switch the configured format to spack, install
// a logger, or wrap the handler with tvm.CheckServiceTicket restricted to
// AllFetchers.
//
// NewHandler panics when r is a nil interface or a nil pointer, and when it
// is given an option of a type it does not support.
func NewHandler(r MetricsStreamer, opts ...Option) http.Handler {
	// A typed nil pointer stored in the interface is non-nil to a plain
	// comparison, hence the reflection-based check.
	rv := reflect.ValueOf(r)
	if !rv.IsValid() {
		panic(nilRegistryPanicMsg)
	}
	if rv.Kind() == reflect.Ptr && rv.IsNil() {
		panic(nilRegistryPanicMsg)
	}

	var res handler
	res.registry = r
	res.streamFormat = headers.TypeApplicationJSON
	res.logger = &nop.Logger{}
	// Pass-through wrapper used unless a TVM option replaces it.
	res.checkTicket = func(next http.Handler) http.Handler { return next }

	for i := range opts {
		switch typed := opts[i].(type) {
		case *loggerOption:
			res.logger = typed.logger
		case *spackOption:
			res.streamFormat = headers.TypeApplicationXSolomonSpack
		case *tvmOption:
			res.checkTicket = tvm.CheckServiceTicket(typed.client, tvm.WithAllowedClients(AllFetchers))
		default:
			panic(fmt.Sprintf("unsupported option %T", opts[i]))
		}
	}

	return res.checkTicket(res)
}
// okSpack reports whether the response should be sent in spack format.
//
// That is the case only when the handler is configured for spack and at least
// one Accept value of the request lists the spack content type. Accept values
// that cannot be parsed are logged as warnings and skipped.
func (h handler) okSpack(header http.Header) bool {
	spackConfigured := h.streamFormat == headers.TypeApplicationXSolomonSpack
	if !spackConfigured {
		return false
	}

	for _, rawAccept := range header[headers.AcceptKey] {
		accepted, err := headers.ParseAccept(rawAccept)
		if err != nil {
			h.logger.Warn("Can't parse accept header", log.Error(err), log.String("header", rawAccept))
			continue
		}

		for _, candidate := range accepted {
			if candidate.Type == headers.TypeApplicationXSolomonSpack {
				return true
			}
		}
	}

	return false
}
// okLZ4Compression reports whether any Accept-Encoding value of the request
// lists the LZ4 encoding. Values that cannot be parsed are logged as warnings
// and skipped.
func (h handler) okLZ4Compression(header http.Header) bool {
	for _, rawEncoding := range header[headers.AcceptEncodingKey] {
		accepted, err := headers.ParseAcceptEncoding(rawEncoding)
		if err != nil {
			h.logger.Warn("Can't parse accept-encoding header", log.Error(err), log.String("header", rawEncoding))
			continue
		}

		for _, candidate := range accepted {
			if candidate.Encoding == headers.EncodingLZ4 {
				return true
			}
		}
	}

	return false
}
// ServeHTTP streams the registry's metrics to the client.
//
// Spack is used when okSpack accepts the request, with LZ4 compression if the
// request's Accept-Encoding allows it; otherwise the metrics are streamed as
// JSON. The Content-Type header is set before streaming starts. Streaming
// errors are only logged; nothing further is written to the response for them
// here.
func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if !h.okSpack(r.Header) {
		// JSON fallback: spack is not configured or not accepted.
		w.Header().Set(headers.ContentTypeKey, headers.TypeApplicationJSON.String())
		if _, err := h.registry.StreamJSON(r.Context(), w); err != nil {
			h.logger.Error("Failed to write json", log.Error(err))
		}
		return
	}

	compression := solomon.CompressionLz4
	if !h.okLZ4Compression(r.Header) {
		compression = solomon.CompressionNone
	}

	w.Header().Set(headers.ContentTypeKey, headers.TypeApplicationXSolomonSpack.String())
	if _, err := h.registry.StreamSpack(r.Context(), w, compression); err != nil {
		h.logger.Error("Failed to write compressed spack", log.Error(err))
	}
}
|