aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/google.golang.org/grpc/balancer/rls/control_channel.go
blob: 4acc11d90e944c2f99b4f3a8a64b4b3fdf6a0efd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
/*
 *
 * Copyright 2021 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package rls

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/rls/internal/adaptive"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal"
	internalgrpclog "google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/internal/pretty"
	rlsgrpc "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
	rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
)

// adaptiveThrottler is the client-side throttling hook consulted by the
// control channel before it sends a RouteLookup request to the RLS server.
type adaptiveThrottler interface {
	// ShouldThrottle reports whether the next request should be rejected
	// locally instead of being sent to the RLS server.
	ShouldThrottle() bool
	// RegisterBackendResponse records the outcome of a request that
	// ShouldThrottle let through; throttled reports whether that request
	// should count as throttled. NOTE(review): exact semantics are defined by
	// the adaptive package implementation — confirm there.
	RegisterBackendResponse(throttled bool)
}

// newAdaptiveThrottler builds the throttler for each new control channel. It
// is a package-level variable rather than a plain function, presumably so that
// tests can substitute a fake implementation.
var newAdaptiveThrottler = func() adaptiveThrottler { return adaptive.New() }

// controlChannel is a wrapper around the gRPC channel to the RLS server
// specified in the service config.
type controlChannel struct {
	// rpcTimeout specifies the timeout for the RouteLookup RPC call. The LB
	// policy receives this value in its service config.
	rpcTimeout time.Duration
	// backToReadyFunc is a callback to be invoked when the connectivity state
	// changes from READY --> TRANSIENT_FAILURE --> READY.
	backToReadyFunc func()
	// throttler is an adaptive throttling implementation used to avoid
	// hammering the RLS service while it is overloaded or down.
	throttler adaptiveThrottler

	// cc is the underlying gRPC channel to the RLS server, client is the
	// RouteLookupService stub created on top of cc, and logger prefixes every
	// log line with this control channel's identity.
	cc     *grpc.ClientConn
	client rlsgrpc.RouteLookupServiceClient
	logger *internalgrpclog.PrefixLogger
}

// newControlChannel dials rlsServerName and wraps the resulting gRPC channel in
// a controlChannel.
//
// rpcTimeout bounds every RouteLookup RPC issued through the returned channel.
// backToReadyFunc is stored for the connectivity monitoring goroutine, which is
// started here. When serviceConfig is non-empty it is used as the default
// service config of the underlying gRPC channel. An error is returned if the
// dial options cannot be built or if dialing fails.
func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) {
	ctrlCh := new(controlChannel)
	ctrlCh.rpcTimeout = rpcTimeout
	ctrlCh.backToReadyFunc = backToReadyFunc
	ctrlCh.throttler = newAdaptiveThrottler()
	// The logger prefix embeds the struct's address so that log lines from
	// different control channels can be told apart.
	ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh))

	opts, err := ctrlCh.dialOpts(bOpts, serviceConfig)
	if err != nil {
		return nil, err
	}
	conn, err := grpc.Dial(rlsServerName, opts...)
	if err != nil {
		return nil, err
	}
	ctrlCh.cc = conn
	ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(conn)
	ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName)

	// The monitor exits on its own once the ClientConn reaches SHUTDOWN.
	go ctrlCh.monitorConnectivityState()
	return ctrlCh, nil
}

// dialOpts assembles the dial options for the control plane channel, in order:
//   - the parent channel's authority, used for server authorization;
//   - the parent's context dialer, when one was supplied;
//   - credentials inherited from the parent channel;
//   - when serviceConfig is non-empty, that config as the default service
//     config, with resolver-provided service config disabled.
//
// The only error returned comes from deriving a fallback-mode bundle from the
// parent's credentials bundle.
func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig string) ([]grpc.DialOption, error) {
	// Reusing the parent channel's authority means the RLS server must carry
	// the same identity as the backends. If an attacker injects an RLS config,
	// it therefore cannot redirect application-set headers (which may contain
	// private information) to a server with a different identity.
	opts := []grpc.DialOption{grpc.WithAuthority(bOpts.Authority)}
	if dialer := bOpts.Dialer; dialer != nil {
		opts = append(opts, grpc.WithContextDialer(dialer))
	}

	// Credentials come from the parent channel, including any call creds that
	// are attached to its channel creds.
	switch {
	case bOpts.DialCreds != nil:
		opts = append(opts, grpc.WithTransportCredentials(bOpts.DialCreds.Clone()))
	case bOpts.CredsBundle != nil:
		// Google default credentials are the only bundle expected with RLS.
		// Their "fallback" mode uses TLS/ALTS for transport and keeps the call
		// creds of the parent bundle.
		fallback, err := bOpts.CredsBundle.NewWithMode(internal.CredsBundleModeFallback)
		if err != nil {
			return nil, err
		}
		opts = append(opts, grpc.WithCredentialsBundle(fallback))
	default:
		cc.logger.Warningf("no credentials available, using Insecure")
		opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
	}

	if serviceConfig == "" {
		return opts, nil
	}
	// An explicit service config from the RLS LB policy config takes over from
	// whatever the name resolver would have provided.
	cc.logger.Infof("Disabling service config from the name resolver and instead using: %s", serviceConfig)
	return append(opts, grpc.WithDisableServiceConfig(), grpc.WithDefaultServiceConfig(serviceConfig)), nil
}

// monitorConnectivityState watches the connectivity state of the underlying
// ClientConn for the lifetime of the control channel. newControlChannel runs it
// in its own goroutine. It invokes backToReadyFunc whenever the channel returns
// to READY after having been in TRANSIENT_FAILURE since it was last READY. It
// returns once the ClientConn is observed in SHUTDOWN.
func (cc *controlChannel) monitorConnectivityState() {
	cc.logger.Infof("Starting connectivity state monitoring goroutine")
	// We use two mechanisms to deal with the RLS server being down:
	//   - adaptive throttling for the channel as a whole
	//   - exponential backoff on a per-request basis
	// We need a way to avoid double-penalizing requests by counting failures
	// toward both mechanisms when the RLS server is unreachable.
	//
	// To accomplish this, we monitor the state of the control plane channel. If
	// the state has been TRANSIENT_FAILURE since the last time it was in state
	// READY, and it then transitions into state READY, we invoke
	// backToReadyFunc, which is supplied by the LB policy.
	//
	// The LB policy will iterate through the cache to reset the backoff
	// timeouts in all cache entries. Specifically, this means that it will
	// reset the backoff state and cancel the pending backoff timer. Note that
	// when cancelling the backoff timer, just like when the backoff timer fires
	// normally, a new picker is returned to the channel, to force it to
	// re-process any wait-for-ready RPCs that may still be queued if we failed
	// them while we were in backoff. However, we should optimize this case by
	// returning only one new picker, regardless of how many backoff timers are
	// cancelled.

	// Using the background context is fine here since we check for the ClientConn
	// entering SHUTDOWN and return early in that case.
	ctx := context.Background()

	// first stays true until the channel has been READY once. The initial
	// transition to READY is not a "back to READY" event.
	first := true
	// seenTransientFailure records whether TRANSIENT_FAILURE was observed since
	// the channel was last READY. An excursion out of READY that never passes
	// through TRANSIENT_FAILURE (for example READY -> IDLE -> CONNECTING ->
	// READY) does not indicate an unreachable RLS server, so it must not reset
	// backoff.
	//
	// NOTE(review): states are sampled with GetState, so a TRANSIENT_FAILURE
	// that is shorter than the gap between samples could be missed.
	seenTransientFailure := false
	for {
		// Wait for the control channel to become READY.
		for s := cc.cc.GetState(); s != connectivity.Ready; s = cc.cc.GetState() {
			switch s {
			case connectivity.Shutdown:
				return
			case connectivity.TransientFailure:
				seenTransientFailure = true
			}
			cc.cc.WaitForStateChange(ctx, s)
		}
		cc.logger.Infof("Connectivity state is READY")

		if !first && seenTransientFailure {
			cc.logger.Infof("Control channel back to READY")
			cc.backToReadyFunc()
		}
		first = false
		seenTransientFailure = false

		// Wait for the control channel to move out of READY. Sample the new
		// state once, so that the SHUTDOWN check, the TRANSIENT_FAILURE
		// bookkeeping and the log line all agree on the same value.
		cc.cc.WaitForStateChange(ctx, connectivity.Ready)
		s := cc.cc.GetState()
		if s == connectivity.Shutdown {
			return
		}
		if s == connectivity.TransientFailure {
			seenTransientFailure = true
		}
		cc.logger.Infof("Connectivity state is %s", s)
	}
}

// close shuts down the underlying gRPC channel to the RLS server. The
// connectivity monitoring goroutine returns once it observes the resulting
// SHUTDOWN state.
func (cc *controlChannel) close() {
	cc.logger.Infof("Closing control channel")
	// The error from Close is deliberately discarded: this is teardown and
	// there is nothing meaningful left to do with it.
	_ = cc.cc.Close()
}

// lookupCallback receives the result of a RouteLookup RPC: the targets and
// header data from the response, and the RPC error, if any. When the RPC fails
// and no response is available, targets and headerData are the zero values
// returned by the nil-safe generated protobuf getters.
type lookupCallback func(targets []string, headerData string, err error)

// lookup starts a RouteLookup RPC in a separate goroutine and returns the
// results (and error, if any) in the provided callback.
//
// The returned boolean indicates whether the request was throttled by the
// client-side adaptive throttling algorithm, in which case the provided callback
// will not be invoked.
//
// Each request that is not throttled has its outcome reported to the adaptive
// throttler through RegisterBackendResponse before cb is invoked. A failed RPC
// counts as throttled.
func (cc *controlChannel) lookup(reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string, cb lookupCallback) (throttled bool) {
	if cc.throttler.ShouldThrottle() {
		cc.logger.Infof("RLS request throttled by client-side adaptive throttling")
		return true
	}
	go func() {
		req := &rlspb.RouteLookupRequest{
			TargetType:      "grpc",
			KeyMap:          reqKeys,
			Reason:          reason,
			StaleHeaderData: staleHeaders,
		}
		cc.logger.Infof("Sending RLS request %+v", pretty.ToJSON(req))

		ctx, cancel := context.WithTimeout(context.Background(), cc.rpcTimeout)
		defer cancel()
		resp, err := cc.client.RouteLookup(ctx, req)
		// The throttler can only detect an overloaded or unreachable RLS
		// server if it sees the outcome of the requests it let through.
		// NOTE(review): counting any RPC error as "throttled" mirrors the
		// C-core RLS policy (non-OK status == failure); confirm against gRFC
		// A27, and make sure no other file in this package also reports
		// responses (that would double count).
		cc.throttler.RegisterBackendResponse(err != nil)
		// resp may be nil on error; the generated getters are nil-safe.
		cb(resp.GetTargets(), resp.GetHeaderData(), err)
	}()
	return false
}