1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
|
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package rls
import (
"errors"
"fmt"
"strings"
"sync/atomic"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/rls/internal/keys"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// errRLSThrottled is the error used to fail a pick when the RLS request was
// throttled by the client and no default target is available to fall back on.
var errRLSThrottled = errors.New("RLS call throttled at client side")

// computeDataCacheEntrySize is the function used to compute the size of a data
// cache entry. It is held in a variable that defaults to dcEntrySize.
// NOTE(review): presumably a variable so that it can be overridden (e.g. in
// tests) — confirm.
var computeDataCacheEntrySize = dcEntrySize
// exitIdler wraps the only method on the BalancerGroup that the picker calls.
// The picker stores a value of this interface type (see the `bg` field on
// rlsPicker) instead of depending on the concrete BalancerGroup type.
type exitIdler interface {
// ExitIdleOne takes the id of a single child.
// NOTE(review): presumably asks the child identified by id to exit the IDLE
// state — confirm against the BalancerGroup implementation.
ExitIdleOne(id string)
}
// rlsPicker selects the subConn to be used for a particular RPC. It does not
// manage subConns directly and delegates to pickers provided by child policies.
type rlsPicker struct {
// The keyBuilder map used to generate RLS keys for the RPC. This is built
// by the LB policy based on the received ServiceConfig.
kbm keys.BuilderMap
// Endpoint from the user's original dial target. Used to set the `host_key`
// field in `extra_keys`.
origEndpoint string
// The RLS LB policy that owns this picker. The picker reaches the shared
// data cache, the pending request map and the mutex guarding them (cacheMu)
// through this pointer, and uses it to push a new picker.
lb *rlsBalancer
// The picker is given its own copy of the below fields from the RLS LB policy
// to avoid having to grab the mutex on the latter.
defaultPolicy *childPolicyWrapper // Child policy for the default target.
ctrlCh *controlChannel // Control channel to the RLS server.
maxAge time.Duration // Cache max age from LB config.
staleAge time.Duration // Cache stale age from LB config.
// Narrow handle on the BalancerGroup, see exitIdler.
// NOTE(review): not referenced by the methods in this part of the file —
// confirm where it is used.
bg exitIdler
// Logger used to report RLS responses and cache update failures.
logger *internalgrpclog.PrefixLogger
}
// isFullMethodNameValid reports whether name is of the form `/service/method`,
// i.e. it starts with a slash and contains exactly two slashes in total.
func isFullMethodNameValid(name string) bool {
	if !strings.HasPrefix(name, "/") {
		return false
	}
	const wantSlashes = 2
	return strings.Count(name, "/") == wantSlashes
}
// Pick makes the routing decision for every outbound RPC.
//
// The method name in info must be of the form `/service/method`; otherwise the
// pick fails with an error. The RLS keys for the RPC are built from the
// outgoing metadata, and the data cache and pending request map are consulted
// while holding p.lb.cacheMu. Depending on what is found, the pick is either:
//   - delegated to the child policies of a valid (unexpired) cache entry,
//   - delegated to the default target (or failed, if there is none) when the
//     RLS request is throttled or the expired entry is in backoff, or
//   - queued by returning balancer.ErrNoSubConnAvailable while an RLS request
//     is outstanding.
func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	if name := info.FullMethodName; !isFullMethodNameValid(name) {
		return balancer.PickResult{}, fmt.Errorf("rls: method name %q is not of the form '/service/method'", name)
	}

	// Build the request's keys using the key builders from LB config. The ok
	// result is deliberately ignored: md is handed to the key builder whether
	// or not the context carried any outgoing metadata.
	md, _ := metadata.FromOutgoingContext(info.Ctx)
	reqKeys := p.kbm.RLSKey(md, p.origEndpoint, info.FullMethodName)

	p.lb.cacheMu.Lock()
	defer p.lb.cacheMu.Unlock()

	// Lookup data cache and pending request map using request path and keys.
	cacheKey := cacheKey{path: info.FullMethodName, keys: reqKeys.Str}
	dcEntry := p.lb.dataCache.getEntry(cacheKey)
	pendingEntry := p.lb.pendingMap[cacheKey]
	now := time.Now()

	switch {
	// No data cache entry. No pending request.
	case dcEntry == nil && pendingEntry == nil:
		throttled := p.sendRouteLookupRequestLocked(cacheKey, &backoffState{bs: defaultBackoffStrategy}, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
		if throttled {
			return p.useDefaultPickIfPossible(info, errRLSThrottled)
		}
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

	// No data cache entry. Pending request exists.
	case dcEntry == nil && pendingEntry != nil:
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

	// Data cache hit. No pending request.
	case dcEntry != nil && pendingEntry == nil:
		if dcEntry.expiryTime.After(now) {
			// The entry is still valid. If it has gone stale and is not in
			// backoff, proactively refresh it; the throttling result is
			// irrelevant here because the existing entry is used either way.
			if !dcEntry.staleTime.IsZero() && dcEntry.staleTime.Before(now) && dcEntry.backoffTime.Before(now) {
				p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData)
			}
			// Delegate to child policies.
			return p.delegateToChildPoliciesLocked(dcEntry, info)
		}

		// We get here only if the data cache entry has expired. If entry is in
		// backoff, delegate to default target or fail the pick.
		if dcEntry.backoffState != nil && dcEntry.backoffTime.After(now) {
			// Avoid propagating the status code received on control plane RPCs to the
			// data plane which can lead to unexpected outcomes as we do not control
			// the status code sent by the control plane. Propagating the status
			// message received from the control plane is still fine, as it could be
			// useful for debugging purposes.
			st := dcEntry.status
			return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
		}

		// We get here only if the entry has expired and is not in backoff.
		throttled := p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
		if throttled {
			return p.useDefaultPickIfPossible(info, errRLSThrottled)
		}
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

	// Data cache hit. Pending request exists.
	default:
		if dcEntry.expiryTime.After(now) {
			return p.delegateToChildPoliciesLocked(dcEntry, info)
		}
		// Data cache entry has expired and pending request exists. Queue pick.
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}
}
// delegateToChildPoliciesLocked walks the child policy wrappers of the given
// cache entry, in order, and hands the pick to the first one that is not in
// TRANSIENT_FAILURE. If every child policy is in TRANSIENT_FAILURE, the pick
// is handed to the last one arbitrarily. A cache entry without any child
// policy wrappers results in the RPC being queued.
//
// On a successful child pick, the header data stored in the cache entry is
// attached to the pick result's metadata under the X-Google-RLS-Data header,
// so that it is sent to the actual backend.
func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
	const rlsDataHeaderName = "x-google-rls-data"

	lastIdx := len(dcEntry.childPolicyWrappers) - 1
	for idx, wrapper := range dcEntry.childPolicyWrappers {
		childState := (*balancer.State)(atomic.LoadPointer(&wrapper.state))

		// Skip child policies in TRANSIENT_FAILURE, except for the last one,
		// which is used when all child policies are in TRANSIENT_FAILURE.
		if childState.ConnectivityState == connectivity.TransientFailure && idx != lastIdx {
			continue
		}

		result, pickErr := childState.Picker.Pick(info)
		if pickErr != nil {
			return result, pickErr
		}

		// Any header data received from the RLS server is stored in the cache
		// entry and is forwarded along with the pick result.
		if result.Metadata != nil {
			result.Metadata.Append(rlsDataHeaderName, dcEntry.headerData)
			return result, nil
		}
		result.Metadata = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData)
		return result, nil
	}

	// In the unlikely event that we have a cache entry with no targets, we end
	// up queueing the RPC.
	return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
// useDefaultPickIfPossible hands the pick to the picker of the default target
// when one is configured. Without a default target, the pick fails with
// errOnNoDefault.
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
	fallback := p.defaultPolicy
	if fallback == nil {
		return balancer.PickResult{}, errOnNoDefault
	}
	defaultState := (*balancer.State)(atomic.LoadPointer(&fallback.state))
	return defaultState.Picker.Pick(info)
}
// sendRouteLookupRequestLocked records bs in the pending request map under
// cacheKey and issues an RLS request on the control channel with the given
// keys, reason and stale header data. The response is routed to
// handleRouteLookupResponse.
//
// It is a no-op returning false if a request for cacheKey is already pending.
// Otherwise the return value indicates whether the request was throttled by
// the client-side adaptive throttler; a throttled request does not stay in the
// pending request map.
func (p *rlsPicker) sendRouteLookupRequestLocked(cacheKey cacheKey, bs *backoffState, reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string) bool {
	if existing := p.lb.pendingMap[cacheKey]; existing != nil {
		return false
	}

	// The pending entry must be in place before the request goes out.
	p.lb.pendingMap[cacheKey] = bs

	onResponse := func(targets []string, headerData string, err error) {
		p.handleRouteLookupResponse(cacheKey, targets, headerData, err)
	}
	if !p.ctrlCh.lookup(reqKeys, reason, staleHeaders, onResponse) {
		return false
	}

	// Throttled: no request went out, so nothing is pending.
	delete(p.lb.pendingMap, cacheKey)
	return true
}
// handleRouteLookupResponse is the callback invoked by the control channel upon
// receipt of an RLS response. It updates the data cache entry for cacheKey
// (creating one if required), removes the corresponding entry from the pending
// request map and sends a new picker.
//
// Acquires the write-lock on the cache. Caller must not hold p.lb.cacheMu.
func (p *rlsPicker) handleRouteLookupResponse(cacheKey cacheKey, targets []string, headerData string, err error) {
	p.logger.Infof("Received RLS response for key %+v with targets %+v, headerData %q, err: %v", cacheKey, targets, headerData, err)

	p.lb.cacheMu.Lock()
	defer func() {
		// The request is no longer pending, whatever its outcome, so the
		// pending request map entry is always removed. This runs last, which
		// keeps the pending entry readable by the failure path below.
		p.logger.Infof("Removing pending request entry for key %+v", cacheKey)
		delete(p.lb.pendingMap, cacheKey)
		p.lb.sendNewPicker()
		p.lb.cacheMu.Unlock()
	}()

	// Find the data cache entry for this key, creating it when missing.
	entry := p.lb.dataCache.getEntry(cacheKey)
	if entry == nil {
		entry = &cacheEntry{}
		_, added := p.lb.dataCache.addEntry(cacheKey, entry)
		if !added {
			// This is a very unlikely case where we are unable to add a
			// data cache entry. Log and leave.
			p.logger.Warningf("Failed to add data cache entry for %+v", cacheKey)
			return
		}
	}

	now := time.Now()

	if err == nil {
		// For successful requests, the cache entry is modified as follows:
		// - childPolicyWrappers is set to point to the child policy wrappers
		//   associated with the targets specified in the received response
		// - headerData is set to the value received in the response
		// - expiryTime and earliestEvictTime are set; staleTime is set only
		//   when a stale age is configured
		// - status is set to nil (OK status)
		// - backoff state is reset
		p.setChildPolicyWrappersInCacheEntry(entry, targets)
		entry.headerData = headerData
		entry.expiryTime = now.Add(p.maxAge)
		if p.staleAge != 0 {
			entry.staleTime = now.Add(p.staleAge)
		}
		entry.earliestEvictTime = now.Add(minEvictDuration)
		entry.status = nil
		entry.backoffState = &backoffState{bs: defaultBackoffStrategy}
		entry.backoffTime = time.Time{}
		entry.backoffExpiryTime = time.Time{}
		p.lb.dataCache.updateEntrySize(entry, computeDataCacheEntrySize(cacheKey, entry))
		return
	}

	// For failed requests, the data cache entry is modified as follows:
	// - status is set to error returned from the control channel
	// - current backoff state is available in the pending entry
	//   - `retries` field is incremented and
	//   - backoff state is moved to the data cache
	// - backoffTime is set to the time indicated by the backoff state
	// - backoffExpiryTime is set to twice the backoff time
	// - the backoff timer is set to fire after the backoff duration
	//
	// When a proactive cache refresh fails, the targets and the expiry time of
	// the old entry are left unchanged. This means that the old valid entry
	// keeps being used until it expires, and a new picker is sent when the
	// backoff timer fires.
	entry.status = err
	pendingBackoff := p.lb.pendingMap[cacheKey]
	pendingBackoff.retries++
	backoffDur := pendingBackoff.bs.Backoff(pendingBackoff.retries)
	entry.backoffState = pendingBackoff
	entry.backoffTime = now.Add(backoffDur)
	entry.backoffExpiryTime = now.Add(2 * backoffDur)
	if prev := pendingBackoff.timer; prev != nil {
		prev.Stop()
	}
	pendingBackoff.timer = time.AfterFunc(backoffDur, p.lb.sendNewPicker)
}
// setChildPolicyWrappersInCacheEntry makes the childPolicyWrappers field of the
// cache entry point to the child policy wrappers for the targets specified in
// the RLS response. When the entry already references exactly those targets,
// in the same order, it is left untouched.
//
// Caller must hold a write-lock on p.lb.cacheMu.
func (p *rlsPicker) setChildPolicyWrappersInCacheEntry(dcEntry *cacheEntry, newTargets []string) {
	current := dcEntry.childPolicyWrappers

	// Nothing to do if the entry already points at the right targets. An entry
	// whose list was never set (nil) always goes through the update below.
	if current != nil && len(current) == len(newTargets) {
		identical := true
		for idx := range newTargets {
			if current[idx].target != newTargets[idx] {
				identical = false
				break
			}
		}
		if identical {
			return
		}
	}

	// The list must be replaced. References to the new child policies are
	// acquired before the references to the old ones are released, and only
	// then is the new list swapped in.
	acquired := p.lb.acquireChildPolicyReferences(newTargets)
	staleTargets := make([]string, 0, len(current))
	for _, wrapper := range current {
		staleTargets = append(staleTargets, wrapper.target)
	}
	p.lb.releaseChildPolicyReferences(staleTargets)
	dcEntry.childPolicyWrappers = acquired
}
// dcEntrySize returns the size attributed to a data cache entry: the combined
// byte length of the cache key's path and keys strings and of the entry's
// header data.
func dcEntrySize(key cacheKey, entry *cacheEntry) int64 {
	total := len(key.path)
	total += len(key.keys)
	total += len(entry.headerData)
	return int64(total)
}
|