/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package rls implements the RLS LB policy.
package rls
import (
"encoding/json"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"unsafe"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/balancergroup"
"google.golang.org/grpc/internal/buffer"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
)
const (
// Name is the name of the RLS LB policy.
//
// It currently has an experimental suffix which will be removed once
// end-to-end testing of the policy is completed.
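//
// As an illustrative sketch (and assuming the registered name resolves to
// "rls_experimental"), a client would select this policy via a service
// config along the lines of:
//
//	{
//	  "loadBalancingConfig": [
//	    {"rls_experimental": <RLS LB policy configuration>}
//	  ]
//	}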
Name = internal.RLSLoadBalancingPolicyName
// Default frequency for data cache purging.
periodicCachePurgeFreq = time.Minute
)
var (
logger = grpclog.Component("rls")
errBalancerClosed = errors.New("rls LB policy is closed")
// The vars below are defined for overriding in unit tests.
// Default exponential backoff strategy for data cache entries.
defaultBackoffStrategy = backoff.Strategy(backoff.DefaultExponential)
// Ticker used for periodic data cache purging.
dataCachePurgeTicker = func() *time.Ticker { return time.NewTicker(periodicCachePurgeFreq) }
// We want every cache entry to live in the cache for at least this
// duration. If we encounter a cache entry whose minimum expiration time is
// in the future, we abort the LRU pass, which may temporarily leave the
// cache too large. This is necessary to ensure that in cases where
// the cache is too small, when we receive an RLS Response, we keep the
// resulting cache entry around long enough for the pending incoming
// requests to be re-processed through the new Picker. If we didn't do this,
// then we'd risk throwing away each RLS response as we receive it, in which
// case we would fail to actually route any of our incoming requests.
minEvictDuration = 5 * time.Second
// Following functions are no-ops in actual code, but can be overridden in
// tests to give tests visibility into exactly when certain events happen.
clientConnUpdateHook = func() {}
dataCachePurgeHook = func() {}
resetBackoffHook = func() {}
)
func init() {
balancer.Register(&rlsBB{})
}
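// rlsBB implements the balancer.Builder interface registered for the RLS LB
// policy.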
type rlsBB struct{}
func (rlsBB) Name() string {
return Name
}
func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
lb := &rlsBalancer{
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
cc: cc,
bopts: opts,
purgeTicker: dataCachePurgeTicker(),
dataCachePurgeHook: dataCachePurgeHook,
lbCfg: &lbConfig{},
pendingMap: make(map[cacheKey]*backoffState),
childPolicies: make(map[string]*childPolicyWrapper),
updateCh: buffer.NewUnbounded(),
}
lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
lb.dataCache = newDataCache(maxCacheSize, lb.logger)
lb.bg = balancergroup.New(cc, opts, lb, lb.logger)
lb.bg.Start()
go lb.run()
return lb
}
// rlsBalancer implements the RLS LB policy.
type rlsBalancer struct {
closed *grpcsync.Event // Fires when Close() is invoked. Guarded by stateMu.
done *grpcsync.Event // Fires when Close() is done.
cc balancer.ClientConn
bopts balancer.BuildOptions
purgeTicker *time.Ticker
dataCachePurgeHook func()
logger *internalgrpclog.PrefixLogger
// If both cacheMu and stateMu need to be acquired, the former must be
// acquired first to prevent a deadlock. This order restriction is due to the
// fact that in places where we need to acquire both the locks, we always
// start off reading the cache.
// cacheMu guards access to the data cache and pending requests map. We
// cannot use an RWMutex here since even an operation like
// dataCache.getEntry() modifies the underlying LRU, which is implemented as
// a doubly linked list.
cacheMu sync.Mutex
dataCache *dataCache // Cache of RLS data.
pendingMap map[cacheKey]*backoffState // Map of pending RLS requests.
// stateMu guards access to all LB policy state.
stateMu sync.Mutex
lbCfg *lbConfig // Most recently received service config.
childPolicyBuilder balancer.Builder // Cached child policy builder.
resolverState resolver.State // Cached resolver state.
ctrlCh *controlChannel // Control channel to the RLS server.
bg *balancergroup.BalancerGroup
childPolicies map[string]*childPolicyWrapper
defaultPolicy *childPolicyWrapper
// A reference to the most recent picker sent to gRPC as part of a state
// update is cached in this field so that we can release the reference to the
// default child policy wrapper when a new picker is created. See
// sendNewPickerLocked() for details.
lastPicker *rlsPicker
// Set during UpdateClientConnState when pushing updates to child policies.
// It prevents state updates from child policies from causing new pickers to be sent
// up the channel. Cleared after all child policies have processed the
// updates sent to them, after which a new picker is sent up the channel.
inhibitPickerUpdates bool
// Channel on which all updates are pushed. Processed in run().
updateCh *buffer.Unbounded
}
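// resumePickerUpdates is pushed onto the update channel once a new config has
// been propagated to all child policies. Handling it clears
// inhibitPickerUpdates, sends a new picker to the parent ClientConn, and
// closes the done channel to unblock UpdateClientConnState().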
type resumePickerUpdates struct {
done chan struct{}
}
// childPolicyIDAndState wraps a child policy id and its state update.
type childPolicyIDAndState struct {
id string
state balancer.State
}
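// controlChannelReady is pushed onto the update channel when the control
// channel to the RLS server transitions back to READY. Handling it resets the
// backoff state for all data cache entries.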
type controlChannelReady struct{}
// run is a long-running goroutine which handles all the updates that the
// balancer wishes to handle. The appropriate update handler pushes the update
// onto a channel that this goroutine selects on, so each update is handled
// asynchronously.
func (b *rlsBalancer) run() {
// We exit out of the for loop below only after `Close()` has been invoked.
// Firing the done event here will ensure that Close() returns only after
// all goroutines are done.
defer func() { b.done.Fire() }()
// Wait for purgeDataCache() goroutine to exit before returning from here.
doneCh := make(chan struct{})
defer func() {
<-doneCh
}()
go b.purgeDataCache(doneCh)
for {
select {
case u, ok := <-b.updateCh.Get():
if !ok {
return
}
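// Release the item just read from the unbounded buffer, allowing the
// next update (if any) to be made available on the channel returned
// by Get().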
b.updateCh.Load()
switch update := u.(type) {
case childPolicyIDAndState:
b.handleChildPolicyStateUpdate(update.id, update.state)
case controlChannelReady:
b.logger.Infof("Resetting backoff state after control channel getting back to READY")
b.cacheMu.Lock()
updatePicker := b.dataCache.resetBackoffState(&backoffState{bs: defaultBackoffStrategy})
b.cacheMu.Unlock()
if updatePicker {
b.sendNewPicker()
}
resetBackoffHook()
case resumePickerUpdates:
b.stateMu.Lock()
b.logger.Infof("Resuming picker updates after config propagation to child policies")
b.inhibitPickerUpdates = false
b.sendNewPickerLocked()
close(update.done)
b.stateMu.Unlock()
default:
b.logger.Errorf("Unsupported update type %T", update)
}
case <-b.closed.Done():
return
}
}
}
// purgeDataCache is a long-running goroutine which periodically deletes expired
// entries. An expired entry is one for which both the expiryTime and
// backoffExpiryTime are in the past.
func (b *rlsBalancer) purgeDataCache(doneCh chan struct{}) {
defer close(doneCh)
for {
select {
case <-b.closed.Done():
return
case <-b.purgeTicker.C:
b.cacheMu.Lock()
updatePicker := b.dataCache.evictExpiredEntries()
b.cacheMu.Unlock()
if updatePicker {
b.sendNewPicker()
}
b.dataCachePurgeHook()
}
}
}
func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
defer clientConnUpdateHook()
b.stateMu.Lock()
if b.closed.HasFired() {
b.stateMu.Unlock()
b.logger.Warningf("Received service config after balancer close: %s", pretty.ToJSON(ccs.BalancerConfig))
return errBalancerClosed
}
newCfg := ccs.BalancerConfig.(*lbConfig)
if b.lbCfg.Equal(newCfg) {
b.stateMu.Unlock()
b.logger.Infof("New service config matches existing config")
return nil
}
b.logger.Infof("Delaying picker updates until config is propagated to and processed by child policies")
b.inhibitPickerUpdates = true
// When the RLS server name changes, the old control channel needs to be
// swapped out for a new one. All state associated with the throttling
// algorithm is stored on a per-control-channel basis; when we swap out
// channels, we also swap out the throttling state.
b.handleControlChannelUpdate(newCfg)
// Any changes to child policy name or configuration needs to be handled by
// either creating new child policies or pushing updates to existing ones.
b.resolverState = ccs.ResolverState
b.handleChildPolicyConfigUpdate(newCfg, &ccs)
// Resize the cache if the size in the config has changed.
resizeCache := newCfg.cacheSizeBytes != b.lbCfg.cacheSizeBytes
// Update the copy of the config in the LB policy before releasing the lock.
b.lbCfg = newCfg
// Enqueue an event which will notify us when the above update has been
// propagated to all child policies, and the child policies have all
// processed their updates, and we have sent a picker update.
done := make(chan struct{})
b.updateCh.Put(resumePickerUpdates{done: done})
b.stateMu.Unlock()
<-done
if resizeCache {
// If the new config changes reduces the size of the data cache, we
// might have to evict entries to get the cache size down to the newly
// specified size.
//
// And we cannot do this operation above (where we compute the
// `resizeCache` boolean) because `cacheMu` needs to be grabbed before
// `stateMu` if we are to hold both locks at the same time.
b.cacheMu.Lock()
b.dataCache.resize(newCfg.cacheSizeBytes)
b.cacheMu.Unlock()
}
return nil
}
// handleControlChannelUpdate handles updates to service config fields which
// influence the control channel to the RLS server.
//
// Caller must hold lb.stateMu.
func (b *rlsBalancer) handleControlChannelUpdate(newCfg *lbConfig) {
if newCfg.lookupService == b.lbCfg.lookupService && newCfg.lookupServiceTimeout == b.lbCfg.lookupServiceTimeout {
return
}
// Create a new control channel and close the existing one.
b.logger.Infof("Creating control channel to RLS server at: %v", newCfg.lookupService)
backToReadyFn := func() {
b.updateCh.Put(controlChannelReady{})
}
ctrlCh, err := newControlChannel(newCfg.lookupService, newCfg.controlChannelServiceConfig, newCfg.lookupServiceTimeout, b.bopts, backToReadyFn)
if err != nil {
// This is very uncommon and usually represents a non-transient error.
// There is not much we can do here other than wait for another update
// which might fix things.
b.logger.Errorf("Failed to create control channel to %q: %v", newCfg.lookupService, err)
return
}
if b.ctrlCh != nil {
b.ctrlCh.close()
}
b.ctrlCh = ctrlCh
}
// handleChildPolicyConfigUpdate handles updates to service config fields which
// influence child policy configuration.
//
// Caller must hold lb.stateMu.
func (b *rlsBalancer) handleChildPolicyConfigUpdate(newCfg *lbConfig, ccs *balancer.ClientConnState) {
// Update child policy builder first since other steps are dependent on this.
if b.childPolicyBuilder == nil || b.childPolicyBuilder.Name() != newCfg.childPolicyName {
b.logger.Infof("Child policy changed to %q", newCfg.childPolicyName)
b.childPolicyBuilder = balancer.Get(newCfg.childPolicyName)
for _, cpw := range b.childPolicies {
// If the child policy has changed, we need to remove the old policy
// from the BalancerGroup and add a new one. The BalancerGroup takes
// care of closing the old one in this case.
b.bg.Remove(cpw.target)
b.bg.Add(cpw.target, b.childPolicyBuilder)
}
}
configSentToDefault := false
if b.lbCfg.defaultTarget != newCfg.defaultTarget {
// If the default target has changed, create a new childPolicyWrapper for
// the new target if required. If a new wrapper is created, add it to the
// childPolicies map and the BalancerGroup.
b.logger.Infof("Default target in LB config changing from %q to %q", b.lbCfg.defaultTarget, newCfg.defaultTarget)
cpw := b.childPolicies[newCfg.defaultTarget]
if cpw == nil {
cpw = newChildPolicyWrapper(newCfg.defaultTarget)
b.childPolicies[newCfg.defaultTarget] = cpw
b.bg.Add(newCfg.defaultTarget, b.childPolicyBuilder)
b.logger.Infof("Child policy %q added to BalancerGroup", newCfg.defaultTarget)
}
if err := b.buildAndPushChildPolicyConfigs(newCfg.defaultTarget, newCfg, ccs); err != nil {
cpw.lamify(err)
}
// If an old default exists, release its reference. If this was the last
// reference, remove the child policy from the BalancerGroup and remove the
// corresponding entry from the childPolicies map.
if b.defaultPolicy != nil {
if b.defaultPolicy.releaseRef() {
delete(b.childPolicies, b.lbCfg.defaultTarget)
b.bg.Remove(b.defaultPolicy.target)
}
}
b.defaultPolicy = cpw
configSentToDefault = true
}
// No change in configuration affecting child policies. Return early.
if b.lbCfg.childPolicyName == newCfg.childPolicyName && b.lbCfg.childPolicyTargetField == newCfg.childPolicyTargetField && childPolicyConfigEqual(b.lbCfg.childPolicyConfig, newCfg.childPolicyConfig) {
return
}
// If fields affecting child policy configuration have changed, the changes
// are pushed to the childPolicyWrapper which handles them appropriately.
for _, cpw := range b.childPolicies {
if configSentToDefault && cpw.target == newCfg.defaultTarget {
// Default target has already been taken care of.
continue
}
if err := b.buildAndPushChildPolicyConfigs(cpw.target, newCfg, ccs); err != nil {
cpw.lamify(err)
}
}
}
// buildAndPushChildPolicyConfigs builds the final child policy configuration by
// adding the `targetField` to the base child policy configuration received in
// RLS LB policy configuration. The `targetField` is set to the passed in
// target, and the resulting configuration is pushed to the child policy
// through the BalancerGroup.
//
// Caller must hold lb.stateMu.
func (b *rlsBalancer) buildAndPushChildPolicyConfigs(target string, newCfg *lbConfig, ccs *balancer.ClientConnState) error {
jsonTarget, err := json.Marshal(target)
if err != nil {
return fmt.Errorf("failed to marshal child policy target %q: %v", target, err)
}
config := newCfg.childPolicyConfig
targetField := newCfg.childPolicyTargetField
config[targetField] = jsonTarget
jsonCfg, err := json.Marshal(config)
if err != nil {
return fmt.Errorf("failed to marshal child policy config %+v: %v", config, err)
}
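// LB config validation is expected to ensure that the child policy
// implements balancer.ConfigParser, so the type assertion below is
// expected to succeed.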
parser, _ := b.childPolicyBuilder.(balancer.ConfigParser)
parsedCfg, err := parser.ParseConfig(jsonCfg)
if err != nil {
return fmt.Errorf("childPolicy config parsing failed: %v", err)
}
state := balancer.ClientConnState{ResolverState: ccs.ResolverState, BalancerConfig: parsedCfg}
b.logger.Infof("Pushing new state to child policy %q: %+v", target, state)
if err := b.bg.UpdateClientConnState(target, state); err != nil {
b.logger.Warningf("UpdateClientConnState(%q, %+v) failed : %v", target, ccs, err)
}
return nil
}
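// ResolverError forwards the resolver error to all child policies through the
// BalancerGroup.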
func (b *rlsBalancer) ResolverError(err error) {
b.bg.ResolverError(err)
}
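// UpdateSubConnState forwards the SubConn state update to the BalancerGroup,
// which routes it to the child policy that owns the SubConn.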
func (b *rlsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b.bg.UpdateSubConnState(sc, state)
}
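// Close stops the periodic cache purge ticker, closes the control channel and
// the BalancerGroup, stops the data cache, and blocks until the run()
// goroutine exits.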
func (b *rlsBalancer) Close() {
b.stateMu.Lock()
b.closed.Fire()
b.purgeTicker.Stop()
if b.ctrlCh != nil {
b.ctrlCh.close()
}
b.bg.Close()
b.stateMu.Unlock()
b.cacheMu.Lock()
b.dataCache.stop()
b.cacheMu.Unlock()
b.updateCh.Close()
<-b.done.Done()
}
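// ExitIdle instructs all child policies, through the BalancerGroup, to exit
// the IDLE state.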
func (b *rlsBalancer) ExitIdle() {
b.bg.ExitIdle()
}
// sendNewPickerLocked computes a new picker and pushes it to the parent ClientConn.
//
// Note that regardless of what connectivity state is reported, the policy will
// return its own picker, and not a picker that unconditionally queues
// (typically used for IDLE or CONNECTING) or a picker that unconditionally
// fails (typically used for TRANSIENT_FAILURE). This is required because,
// irrespective of the connectivity state, we need to be able to perform RLS
// lookups for incoming RPCs and affect the status of queued RPCs based on the
// receipt of RLS responses.
//
// Caller must hold lb.stateMu.
func (b *rlsBalancer) sendNewPickerLocked() {
aggregatedState := b.aggregatedConnectivityState()
// Acquire a separate reference for the picker. This is required to ensure
// that the wrapper held by the old picker is not closed when the default
// target changes in the config, and a new wrapper is created for the new
// default target. See handleChildPolicyConfigUpdate() for how config changes
// affecting the default target are handled.
if b.defaultPolicy != nil {
b.defaultPolicy.acquireRef()
}
picker := &rlsPicker{
kbm: b.lbCfg.kbMap,
origEndpoint: b.bopts.Target.Endpoint(),
lb: b,
defaultPolicy: b.defaultPolicy,
ctrlCh: b.ctrlCh,
maxAge: b.lbCfg.maxAge,
staleAge: b.lbCfg.staleAge,
bg: b.bg,
}
picker.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-picker %p] ", picker))
state := balancer.State{
ConnectivityState: aggregatedState,
Picker: picker,
}
if !b.inhibitPickerUpdates {
b.logger.Infof("New balancer.State: %+v", state)
b.cc.UpdateState(state)
} else {
b.logger.Infof("Delaying picker update: %+v", state)
}
if b.lastPicker != nil {
if b.defaultPolicy != nil {
b.defaultPolicy.releaseRef()
}
}
b.lastPicker = picker
}
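// sendNewPicker acquires stateMu and sends a new picker to the parent
// ClientConn, unless the balancer has been closed.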
func (b *rlsBalancer) sendNewPicker() {
b.stateMu.Lock()
defer b.stateMu.Unlock()
if b.closed.HasFired() {
return
}
b.sendNewPickerLocked()
}
// aggregatedConnectivityState returns the aggregated connectivity state to be
// reported to the parent ClientConn. It is determined as follows:
// - If there is at least one child policy in state READY, the connectivity
// state is READY.
// - Otherwise, if there is at least one child policy in state CONNECTING, the
// connectivity state is CONNECTING.
// - Otherwise, if there is at least one child policy in state IDLE, the
// connectivity state is IDLE.
// - Otherwise, all child policies are in TRANSIENT_FAILURE, and the
// connectivity state is TRANSIENT_FAILURE.
//
// If the RLS policy has no child policies and no configured default target,
// then we will report connectivity state IDLE.
//
// Caller must hold lb.stateMu.
func (b *rlsBalancer) aggregatedConnectivityState() connectivity.State {
if len(b.childPolicies) == 0 && b.lbCfg.defaultTarget == "" {
return connectivity.Idle
}
var readyN, connectingN, idleN int
for _, cpw := range b.childPolicies {
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
switch state.ConnectivityState {
case connectivity.Ready:
readyN++
case connectivity.Connecting:
connectingN++
case connectivity.Idle:
idleN++
}
}
switch {
case readyN > 0:
return connectivity.Ready
case connectingN > 0:
return connectivity.Connecting
case idleN > 0:
return connectivity.Idle
default:
return connectivity.TransientFailure
}
}
// UpdateState is an implementation of the balancergroup.BalancerStateAggregator
// interface. The actual state aggregation is handled asynchronously; this
// method only pushes the state update onto a channel that is read and
// dispatched by the run() goroutine.
func (b *rlsBalancer) UpdateState(id string, state balancer.State) {
b.updateCh.Put(childPolicyIDAndState{id: id, state: state})
}
// handleChildPolicyStateUpdate provides the state aggregator functionality for
// the BalancerGroup.
//
// This method is invoked by the BalancerGroup whenever a child policy sends a
// state update. We cache the child policy's connectivity state and picker for
// two reasons:
// - to suppress connectivity state transitions from TRANSIENT_FAILURE to states
// other than READY
// - to delegate picks to child policies
func (b *rlsBalancer) handleChildPolicyStateUpdate(id string, newState balancer.State) {
b.stateMu.Lock()
defer b.stateMu.Unlock()
cpw := b.childPolicies[id]
if cpw == nil {
// All child policies start with an entry in the map. If the ID is not in
// the map, the policy has either been removed or never existed.
b.logger.Warningf("Received state update %+v for missing child policy %q", newState, id)
return
}
oldState := (*balancer.State)(atomic.LoadPointer(&cpw.state))
if oldState.ConnectivityState == connectivity.TransientFailure && newState.ConnectivityState == connectivity.Connecting {
// Ignore state transitions from TRANSIENT_FAILURE to CONNECTING, and thus
// fail pending RPCs instead of queuing them indefinitely when all
// subChannels are failing, even if the subChannels are bouncing back and
// forth between CONNECTING and TRANSIENT_FAILURE.
return
}
atomic.StorePointer(&cpw.state, unsafe.Pointer(&newState))
b.logger.Infof("Child policy %q has new state %+v", id, newState)
b.sendNewPickerLocked()
}
// acquireChildPolicyReferences attempts to acquire references to
// childPolicyWrappers corresponding to the passed in targets. If there is no
// childPolicyWrapper corresponding to one of the targets, a new one is created
// and added to the BalancerGroup.
func (b *rlsBalancer) acquireChildPolicyReferences(targets []string) []*childPolicyWrapper {
b.stateMu.Lock()
var newChildPolicies []*childPolicyWrapper
for _, target := range targets {
// If the target exists in the LB policy's childPolicies map, a new
// reference is taken here and the wrapper is added to the new list.
if cpw := b.childPolicies[target]; cpw != nil {
cpw.acquireRef()
newChildPolicies = append(newChildPolicies, cpw)
continue
}
// If the target does not exist in the child policy map, then a new
// child policy wrapper is created and added to the new list.
cpw := newChildPolicyWrapper(target)
b.childPolicies[target] = cpw
b.bg.Add(target, b.childPolicyBuilder)
b.logger.Infof("Child policy %q added to BalancerGroup", target)
newChildPolicies = append(newChildPolicies, cpw)
if err := b.buildAndPushChildPolicyConfigs(target, b.lbCfg, &balancer.ClientConnState{
ResolverState: b.resolverState,
}); err != nil {
cpw.lamify(err)
}
}
b.stateMu.Unlock()
return newChildPolicies
}
// releaseChildPolicyReferences releases references to childPolicyWrappers
// corresponding to the passed in targets. If the released reference was the last
// one, the child policy is removed from the BalancerGroup.
func (b *rlsBalancer) releaseChildPolicyReferences(targets []string) {
b.stateMu.Lock()
for _, target := range targets {
if cpw := b.childPolicies[target]; cpw.releaseRef() {
delete(b.childPolicies, cpw.target)
b.bg.Remove(cpw.target)
}
}
b.stateMu.Unlock()
}