aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/google.golang.org/grpc/orca/call_metrics.go
blob: 558c7bce6a8eea524e7557ea67960454ffcc60dd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
/*
 *
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package orca

import (
	"context"
	"sync"

	"google.golang.org/grpc"
	grpcinternal "google.golang.org/grpc/internal"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/orca/internal"
	"google.golang.org/protobuf/proto"
)

// CallMetricsRecorder allows a service method handler to record per-RPC
// metrics. It contains all utilization-based metrics from
// ServerMetricsRecorder as well as additional request cost metrics.
//
// A recorder scoped to the current RPC is obtained by calling
// CallMetricsRecorderFromContext with the RPC's context.
type CallMetricsRecorder interface {
	ServerMetricsRecorder

	// SetRequestCost sets the request cost metric identified by name to val.
	SetRequestCost(name string, val float64)
	// DeleteRequestCost deletes the request cost metric identified by name to
	// prevent it from being sent.
	DeleteRequestCost(name string)

	// SetNamedMetric sets the named metric identified by name to val.
	SetNamedMetric(name string, val float64)
	// DeleteNamedMetric deletes the named metric identified by name to prevent
	// it from being sent.
	DeleteNamedMetric(name string)
}

// callMetricsRecorderCtxKey is the context key type under which the per-RPC
// *recorderWrapper is stored by newContextWithRecorderWrapper and looked up by
// CallMetricsRecorderFromContext.
type callMetricsRecorderCtxKey struct{}

// CallMetricsRecorderFromContext looks up the per-RPC custom metrics recorder
// attached to ctx and returns it.
//
// The recorder is created lazily: the first call made with a given RPC context
// allocates it, and subsequent calls return the same instance.
//
// A nil CallMetricsRecorder is returned when ctx carries no recorder, which
// will be the case when custom metrics reporting is not enabled.
func CallMetricsRecorderFromContext(ctx context.Context) CallMetricsRecorder {
	if wrapper, found := ctx.Value(callMetricsRecorderCtxKey{}).(*recorderWrapper); found {
		return wrapper.recorder()
	}
	return nil
}

// recorderWrapper is a wrapper around a CallMetricsRecorder to ensure that
// concurrent calls to CallMetricsRecorderFromContext() result in only one
// allocation of the underlying metrics recorder, while also allowing for lazy
// initialization of the recorder itself.
//
// Fields:
//   - once guards the one-time allocation of r performed by recorder().
//   - r is the per-RPC recorder; it stays nil until recorder() is first called.
//   - smp is the optional provider handed to the interceptors; when non-nil,
//     setTrailerMetadata starts from its metrics and merges r's metrics into
//     them.
type recorderWrapper struct {
	once sync.Once
	r    CallMetricsRecorder
	smp  ServerMetricsProvider
}

// recorder returns the wrapped CallMetricsRecorder, allocating it on the first
// call. The allocation is guarded by rw.once, so it happens exactly once for a
// given wrapper regardless of how many times this method is invoked.
func (rw *recorderWrapper) recorder() CallMetricsRecorder {
	allocate := func() {
		rw.r = newServerMetricsRecorder()
	}
	rw.once.Do(allocate)
	return rw.r
}

// setTrailerMetadata attaches the load report for this RPC to the trailer
// metadata of ctx. The entry's key is `internal.TrailerMetadataKey` and its
// value is the binary-encoded orca.OrcaLoadReport protobuf message.
//
// When rw.smp is set, the report starts from the provider's metrics and the
// per-RPC metrics recorded in rw.r are merged into it; otherwise the per-RPC
// metrics are used on their own. rw.r must be non-nil when this is called.
//
// This function is called from the unary and streaming interceptors (unaryInt
// and streamInt). It returns nothing: marshaling and trailer-setting failures
// are only logged at warning level, since the interceptors have no use for
// them.
func (rw *recorderWrapper) setTrailerMetadata(ctx context.Context) {
	var report *ServerMetrics
	if rw.smp == nil {
		report = rw.r.ServerMetrics()
	} else {
		report = rw.smp.ServerMetrics()
		report.merge(rw.r.ServerMetrics())
	}

	encoded, err := proto.Marshal(report.toLoadReportProto())
	if err != nil {
		logger.Warningf("Failed to marshal load report: %v", err)
		return
	}

	trailer := metadata.Pairs(internal.TrailerMetadataKey, string(encoded))
	err = grpc.SetTrailer(ctx, trailer)
	if err != nil {
		logger.Warningf("Failed to set trailer metadata: %v", err)
	}
}

// joinServerOptions is grpcinternal.JoinServerOptions asserted to its concrete
// function type. It is used by CallMetricsServerOption to combine several
// grpc.ServerOption values into one. The type assertion is unchecked, so it
// panics during package initialization if the stored value has a different
// type.
var joinServerOptions = grpcinternal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption)

// CallMetricsServerOption returns a server option which enables the reporting
// of per-RPC custom backend metrics for both unary and streaming RPCs. It does
// so by chaining one unary and one stream interceptor onto the server.
//
// Server applications interested in injecting custom backend metrics should
// pass the server option returned from this function as the first argument to
// grpc.NewServer().
//
// Once the option is installed, an RPC handler obtains its RPC-specific
// recorder [CallMetricsRecorder] by calling CallMetricsRecorderFromContext(),
// and may record custom metrics through it at any time during the RPC
// lifecycle.
//
// The recorded metrics are sent as part of trailer metadata, as a
// binary-encoded [ORCA LoadReport] protobuf message, with the metadata key
// being set to "endpoint-load-metrics-bin".
//
// If a non-nil ServerMetricsProvider is provided, the gRPC server will
// transmit the metrics it provides, overwritten by any per-RPC metrics given
// to the CallMetricsRecorder. A ServerMetricsProvider is typically obtained
// by calling NewServerMetricsRecorder.
//
// [ORCA LoadReport]: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15
func CallMetricsServerOption(smp ServerMetricsProvider) grpc.ServerOption {
	unaryOpt := grpc.ChainUnaryInterceptor(unaryInt(smp))
	streamOpt := grpc.ChainStreamInterceptor(streamInt(smp))
	return joinServerOptions(unaryOpt, streamOpt)
}

// unaryInt builds the unary server interceptor installed by
// CallMetricsServerOption. For every RPC it places a fresh recorderWrapper in
// the handler's context and, once the handler has returned, reports the
// recorded metrics as trailer metadata if the handler ever asked for the
// recorder. smp may be nil.
func unaryInt(smp ServerMetricsProvider) func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		// Only the wrapper is created up front; the recorder inside it is
		// allocated on the first CallMetricsRecorderFromContext() call.
		wrapper := &recorderWrapper{smp: smp}

		resp, err := handler(newContextWithRecorderWrapper(ctx, wrapper), req)

		// The handler has finished, so no further calls to
		// CallMetricsRecorderFromContext() (the only place the recorder gets
		// lazily allocated) are expected and wrapper.r can be read directly.
		if wrapper.r == nil {
			// The recorder was never requested: nothing to report.
			return resp, err
		}
		wrapper.setTrailerMetadata(ctx)
		return resp, err
	}
}

// streamInt builds the stream server interceptor installed by
// CallMetricsServerOption. For every stream it hands the handler a
// wrappedStream whose context carries a fresh recorderWrapper and, once the
// handler has returned, reports the recorded metrics as trailer metadata if
// the handler ever asked for the recorder. smp may be nil.
func streamInt(smp ServerMetricsProvider) func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		// Only the wrapper is created up front; the recorder inside it is
		// allocated on the first CallMetricsRecorderFromContext() call.
		wrapper := &recorderWrapper{smp: smp}
		streamCtx := newContextWithRecorderWrapper(ss.Context(), wrapper)

		err := handler(srv, &wrappedStream{ServerStream: ss, ctx: streamCtx})

		// The handler has finished, so no further calls to
		// CallMetricsRecorderFromContext() (the only place the recorder gets
		// lazily allocated) are expected and wrapper.r can be read directly.
		// The trailer is set using the original stream's context.
		if wrapper.r != nil {
			wrapper.setTrailerMetadata(ss.Context())
		}
		return err
	}
}

// newContextWithRecorderWrapper returns a context derived from ctx that
// carries r under callMetricsRecorderCtxKey, which is where
// CallMetricsRecorderFromContext looks for it.
func newContextWithRecorderWrapper(ctx context.Context, r *recorderWrapper) context.Context {
	key := callMetricsRecorderCtxKey{}
	return context.WithValue(ctx, key, r)
}

// wrappedStream wraps the grpc.ServerStream received by the streaming
// interceptor. It overrides only the Context() method, returning a context
// which contains a reference to the recorderWrapper (and through it the
// CallMetricsRecorder) corresponding to this stream. All other methods are
// those of the embedded grpc.ServerStream.
type wrappedStream struct {
	grpc.ServerStream
	ctx context.Context
}

// Context returns the context stored in the wrapper, taking the place of the
// embedded grpc.ServerStream's Context method.
func (w *wrappedStream) Context() context.Context {
	return w.ctx
}