path: root/vendor/google.golang.org/grpc/internal/balancergroup/balancergroup.go
/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Package balancergroup implements a utility struct to bind multiple balancers
// into one balancer.
package balancergroup

import (
	"fmt"
	"sync"
	"time"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/internal/balancer/gracefulswitch"
	"google.golang.org/grpc/internal/cache"
	"google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/resolver"
)

// subBalancerWrapper keeps the configuration that will be used to start the
// underlying balancer. It can be used to start/stop the underlying balancer.
//
// When the config changes, it will pass the update to the underlying balancer
// if it exists.
//
// TODO: move to a separate file?
type subBalancerWrapper struct {
	// subBalancerWrapper is passed to the sub-balancer as a ClientConn
	// wrapper, only to keep the state and picker. When the sub-balancer is
	// restarted while in the cache, the picker needs to be resent.
	//
	// It also contains the sub-balancer ID, so the parent balancer group can
	// keep track of SubConns/pickers and the sub-balancers they belong to.
	// Some of the actions are forwarded to the parent ClientConn unchanged;
	// others are forwarded to the balancer group along with the sub-balancer
	// ID.
	balancer.ClientConn
	id    string
	group *BalancerGroup

	mu    sync.Mutex
	state balancer.State

	// The static part of the sub-balancer. Keeps the balancer builder and
	// addresses, to be used when restarting the sub-balancer.
	builder balancer.Builder
	// Options to be passed to sub-balancer at the time of creation.
	buildOpts balancer.BuildOptions
	// ccState is a cache of the addresses/balancer config, so when the balancer
	// is restarted after close, it will get the previous update. It's a pointer
	// and is set to nil at init, so when the balancer is built for the first
	// time (not a restart), it won't receive an empty update. Note that this
	// isn't reset to nil when the underlying balancer is closed.
	ccState *balancer.ClientConnState
	// The dynamic part of sub-balancer. Only used when balancer group is
	// started. Gets cleared when sub-balancer is closed.
	balancer *gracefulswitch.Balancer
}

// UpdateState overrides balancer.ClientConn, to keep state and picker.
func (sbc *subBalancerWrapper) UpdateState(state balancer.State) {
	sbc.mu.Lock()
	sbc.state = state
	sbc.group.updateBalancerState(sbc.id, state)
	sbc.mu.Unlock()
}

// NewSubConn overrides balancer.ClientConn, so balancer group can keep track of
// the relation between subconns and sub-balancers.
func (sbc *subBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	return sbc.group.newSubConn(sbc, addrs, opts)
}

func (sbc *subBalancerWrapper) updateBalancerStateWithCachedPicker() {
	sbc.mu.Lock()
	if sbc.state.Picker != nil {
		sbc.group.updateBalancerState(sbc.id, sbc.state)
	}
	sbc.mu.Unlock()
}

func (sbc *subBalancerWrapper) startBalancer() {
	if sbc.balancer == nil {
		sbc.balancer = gracefulswitch.NewBalancer(sbc, sbc.buildOpts)
	}
	sbc.group.logger.Infof("Creating child policy of type %q for locality %q", sbc.builder.Name(), sbc.id)
	sbc.balancer.SwitchTo(sbc.builder)
	if sbc.ccState != nil {
		sbc.balancer.UpdateClientConnState(*sbc.ccState)
	}
}

// exitIdle invokes the sub-balancer's ExitIdle method. Returns a boolean
// indicating whether or not the operation was completed.
func (sbc *subBalancerWrapper) exitIdle() (complete bool) {
	b := sbc.balancer
	if b == nil {
		return true
	}
	b.ExitIdle()
	return true
}

func (sbc *subBalancerWrapper) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	b := sbc.balancer
	if b == nil {
		// This sub-balancer was closed. This can happen when EDS removes a
		// locality. The balancer for this locality was already closed, and the
		// SubConns are being deleted. But SubConn state change can still
		// happen.
		return
	}
	b.UpdateSubConnState(sc, state)
}

func (sbc *subBalancerWrapper) updateClientConnState(s balancer.ClientConnState) error {
	sbc.ccState = &s
	b := sbc.balancer
	if b == nil {
		// This sub-balancer was closed. This should never happen because
		// sub-balancers are closed when the locality is removed from EDS, or
		// the balancer group is closed. There should be no further address
		// updates when either of these happens.
		//
		// This will be a common case with priority support, because a
		// sub-balancer (and the whole balancer group) could be closed because
		// it's the lower priority, but it can still get address updates.
		return nil
	}
	return b.UpdateClientConnState(s)
}

func (sbc *subBalancerWrapper) resolverError(err error) {
	b := sbc.balancer
	if b == nil {
		// This sub-balancer was closed. This should never happen because
		// sub-balancers are closed when the locality is removed from EDS, or
		// the balancer group is closed. There should be no further address
		// updates when either of these happens.
		//
		// This will be a common case with priority support, because a
		// sub-balancer (and the whole balancer group) could be closed because
		// it's the lower priority, but it can still get address updates.
		return
	}
	b.ResolverError(err)
}

func (sbc *subBalancerWrapper) gracefulSwitch(builder balancer.Builder) {
	sbc.builder = builder
	b := sbc.balancer
	// An Add may store the builder without starting the balancer (when the
	// group is not started), leaving the graceful switch balancer nil. In that
	// case we still record the most recent builder here for later use. The
	// graceful switch balancer's presence tracks whether the balancer group is
	// started: nil when closed, present when started.
	if sbc.balancer != nil {
		sbc.group.logger.Infof("Switching child policy %v to type %v", sbc.id, sbc.builder.Name())
		b.SwitchTo(sbc.builder)
	}
}

func (sbc *subBalancerWrapper) stopBalancer() {
	if sbc.balancer == nil {
		return
	}
	sbc.balancer.Close()
	sbc.balancer = nil
}

// BalancerGroup takes a list of balancers, and makes them into one balancer.
//
// Note that this struct doesn't implement balancer.Balancer, because it's not
// intended to be used directly as a balancer. It's expected to be used as a
// sub-balancer manager by a high level balancer.
//
//	Updates from ClientConn are forwarded to sub-balancers
//	- service config update
//	- address update
//	- subConn state change
//	  - find the corresponding balancer and forward
//
//	Actions from sub-balancers are forwarded to the parent ClientConn
//	- new/remove SubConn
//	- picker update and health states change
//	  - sub-pickers are sent to an aggregator provided by the parent, which
//	    will group them into a group-picker. The aggregated connectivity state is
//	    also handled by the aggregator.
//	- resolveNow
//
// Sub-balancers are only built when the balancer group is started. If the
// balancer group is closed, the sub-balancers are also closed. It is
// guaranteed that no updates will be sent to the parent ClientConn from a
// closed balancer group.
type BalancerGroup struct {
	cc        balancer.ClientConn
	buildOpts balancer.BuildOptions
	logger    *grpclog.PrefixLogger

	// stateAggregator is where the state/picker updates will be sent to. It's
	// provided by the parent balancer, to build a picker with all the
	// sub-pickers.
	stateAggregator BalancerStateAggregator

	// outgoingMu guards all operations in the direction:
	// ClientConn-->Sub-balancer. Including start, stop, resolver updates and
	// SubConn state changes.
	//
	// The corresponding boolean outgoingStarted is used to stop further updates
	// to sub-balancers after they are closed.
	outgoingMu         sync.Mutex
	outgoingStarted    bool
	idToBalancerConfig map[string]*subBalancerWrapper
	// Cache for sub-balancers when they are removed.
	balancerCache *cache.TimeoutCache

	// incomingMu is to make sure this balancer group doesn't send updates to cc
	// after it's closed.
	//
	// We don't share the mutex to avoid deadlocks (e.g. a call into a
	// sub-balancer may call back into the balancer group inline, which would
	// deadlock if both required the same mutex).
	//
	// We should never need to hold multiple locks at the same time in this
	// struct. The case where two locks are held can only happen when the
	// underlying balancer calls back into balancer group inline. So there's an
	// implicit lock acquisition order that outgoingMu is locked before
	// incomingMu.

	// incomingMu guards all operations in the direction:
	// Sub-balancer-->ClientConn. Including NewSubConn, RemoveSubConn. It also
	// guards the map from SubConn to balancer ID, so updateSubConnState needs
	// to hold it shortly to find the sub-balancer to forward the update.
	//
	// UpdateState is called by the balancer state aggregator, which decides
	// when and whether to call it.
	//
	// The corresponding boolean incomingStarted is used to stop further updates
	// from sub-balancers after they are closed.
	incomingMu      sync.Mutex
	incomingStarted bool // This boolean only guards calls back to ClientConn.
	scToSubBalancer map[balancer.SubConn]*subBalancerWrapper
}

// DefaultSubBalancerCloseTimeout is defined as a variable instead of const for
// testing.
//
// TODO: make it a parameter for New().
var DefaultSubBalancerCloseTimeout = 15 * time.Minute

// New creates a new BalancerGroup. Note that the BalancerGroup
// needs to be started to work.
func New(cc balancer.ClientConn, bOpts balancer.BuildOptions, stateAggregator BalancerStateAggregator, logger *grpclog.PrefixLogger) *BalancerGroup {
	return &BalancerGroup{
		cc:              cc,
		buildOpts:       bOpts,
		logger:          logger,
		stateAggregator: stateAggregator,

		idToBalancerConfig: make(map[string]*subBalancerWrapper),
		balancerCache:      cache.NewTimeoutCache(DefaultSubBalancerCloseTimeout),
		scToSubBalancer:    make(map[balancer.SubConn]*subBalancerWrapper),
	}
}
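
// Illustrative sketch (not part of this package's API): how a hypothetical
// parent policy might drive a BalancerGroup. The identifiers parentCC,
// parentOpts, aggregator, logger, addrs and childCfg are placeholders assumed
// to be provided by that parent; "round_robin" is used only as an example of
// a registered balancer name.
//
//	bg := balancergroup.New(parentCC, parentOpts, aggregator, logger)
//	bg.Add("child-1", balancer.Get("round_robin"))
//	bg.Start()
//	// Forward resolver updates to the child by id.
//	_ = bg.UpdateClientConnState("child-1", balancer.ClientConnState{
//		ResolverState:  resolver.State{Addresses: addrs},
//		BalancerConfig: childCfg,
//	})
//	// ... later, tear everything down.
//	bg.Close()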

// Start starts the balancer group, including building all the sub-balancers
// and sending the existing addresses to them.
//
// A BalancerGroup can be closed and started later. When a BalancerGroup is
// closed, it can still receive address updates, which will be applied when
// restarted.
func (bg *BalancerGroup) Start() {
	bg.incomingMu.Lock()
	bg.incomingStarted = true
	bg.incomingMu.Unlock()

	bg.outgoingMu.Lock()
	if bg.outgoingStarted {
		bg.outgoingMu.Unlock()
		return
	}

	for _, config := range bg.idToBalancerConfig {
		config.startBalancer()
	}
	bg.outgoingStarted = true
	bg.outgoingMu.Unlock()
}

// AddWithClientConn adds a balancer with the given id to the group. The
// balancer is built with a balancer builder registered with balancerName. The
// given ClientConn is passed to the newly built balancer instead of the one
// passed to balancergroup.New().
//
// TODO: Get rid of the existing Add() API and replace it with this.
func (bg *BalancerGroup) AddWithClientConn(id, balancerName string, cc balancer.ClientConn) error {
	bg.logger.Infof("Adding child policy of type %q for locality %q", balancerName, id)
	builder := balancer.Get(balancerName)
	if builder == nil {
		return fmt.Errorf("unregistered balancer name %q", balancerName)
	}

	// Store data in static map, and then check to see if bg is started.
	bg.outgoingMu.Lock()
	defer bg.outgoingMu.Unlock()
	var sbc *subBalancerWrapper
	// If outgoingStarted is true, search in the cache. Otherwise, the cache is
	// guaranteed to be empty, and searching is unnecessary.
	if bg.outgoingStarted {
		if old, ok := bg.balancerCache.Remove(id); ok {
			sbc, _ = old.(*subBalancerWrapper)
			if sbc != nil && sbc.builder != builder {
				// If the sub-balancer in the cache was built with a different
				// balancer builder, don't use it; clean up the old balancer
				// and behave as if the sub-balancer was not found in the
				// cache.
				//
				// NOTE that this will also drop the cached addresses for this
				// sub-balancer, which seems to be reasonable.
				sbc.stopBalancer()
				// cleanupSubConns must be done before the new balancer starts,
				// otherwise new SubConns created by the new balancer might be
				// removed by mistake.
				bg.cleanupSubConns(sbc)
				sbc = nil
			}
		}
	}
	if sbc == nil {
		sbc = &subBalancerWrapper{
			ClientConn: cc,
			id:         id,
			group:      bg,
			builder:    builder,
			buildOpts:  bg.buildOpts,
		}
		if bg.outgoingStarted {
			// Only start the balancer if bg is started. Otherwise, we only keep the
			// static data.
			sbc.startBalancer()
		}
	} else {
		// When bringing back a sub-balancer from the cache, re-send the cached
		// picker and state.
		sbc.updateBalancerStateWithCachedPicker()
	}
	bg.idToBalancerConfig[id] = sbc
	return nil
}

// Add adds a balancer built by builder to the group, with given id.
func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
	bg.AddWithClientConn(id, builder.Name(), bg.cc)
}

// UpdateBuilder updates the builder for a current child, starting the Graceful
// Switch process for that child.
//
// TODO: update this API to take the name of the new builder instead.
func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) {
	bg.outgoingMu.Lock()
	defer bg.outgoingMu.Unlock()
	// This does not deal with the balancer cache because this call should come
	// after an Add call for a given child balancer. If the child is removed,
	// the caller will call Add if the child comes back, which will then deal
	// with the balancer cache.
	sbc := bg.idToBalancerConfig[id]
	if sbc == nil {
		// Simply ignore it if not present; this is not an error.
		return
	}
	sbc.gracefulSwitch(builder)
}
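
// Illustrative sketch (assumption, not from this file): switching an existing
// child to a different policy type via UpdateBuilder. The lookup mirrors what
// AddWithClientConn does; "pick_first" is used only as an example of a
// registered balancer name.
//
//	if b := balancer.Get("pick_first"); b != nil {
//		bg.UpdateBuilder("child-1", b)
//	}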

// Remove removes the balancer with id from the group.
//
// It doesn't close the balancer immediately. The balancer is kept in a cache,
// and the cleanup work (closing the sub-balancer and removing its SubConns) is
// done when the cache timeout fires.
func (bg *BalancerGroup) Remove(id string) {
	bg.logger.Infof("Removing child policy for locality %q", id)
	bg.outgoingMu.Lock()
	if sbToRemove, ok := bg.idToBalancerConfig[id]; ok {
		if bg.outgoingStarted {
			bg.balancerCache.Add(id, sbToRemove, func() {
				// A sub-balancer evicted from the timeout cache needs to be
				// closed and its SubConns need to be removed, unconditionally.
				// A sub-balancer might be removed (thereby moving it to the
				// cache) around the same time that the balancer group is
				// closed, and by the time we get here the balancer group might
				// already be closed. Checking for `outgoingStarted == true` at
				// that point could lead to a leaked sub-balancer.
				bg.outgoingMu.Lock()
				sbToRemove.stopBalancer()
				bg.outgoingMu.Unlock()
				bg.cleanupSubConns(sbToRemove)
			})
		}
		delete(bg.idToBalancerConfig, id)
	} else {
		bg.logger.Infof("balancer group: trying to remove a non-existing locality from balancer group: %v", id)
	}
	bg.outgoingMu.Unlock()
}
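
// Illustrative sketch (assumption): because Remove only moves the child into
// the timeout cache, re-adding the same id with the same balancer type before
// DefaultSubBalancerCloseTimeout expires revives the cached sub-balancer and
// re-sends its cached picker, instead of building a new balancer.
//
//	bg.Remove("child-1")
//	// ... less than DefaultSubBalancerCloseTimeout later ...
//	bg.Add("child-1", balancer.Get("round_robin")) // reuses the cached child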

// Remove(id) doesn't do cleanup for the sub-balancer. This function does the
// cleanup after the timeout.
func (bg *BalancerGroup) cleanupSubConns(config *subBalancerWrapper) {
	bg.incomingMu.Lock()
	// Remove SubConns. This is only done after the balancer is
	// actually closed.
	//
	// NOTE: if NewSubConn is called by this (closed) balancer later, the
	// SubConn will be leaked. This shouldn't happen if the balancer
	// implementation is correct. To make sure this never happens, we need to
	// add another layer (balancer manager) between balancer group and the
	// sub-balancers.
	for sc, b := range bg.scToSubBalancer {
		if b == config {
			delete(bg.scToSubBalancer, sc)
		}
	}
	bg.incomingMu.Unlock()
}

// connect attempts to connect to all subConns belonging to sb.
func (bg *BalancerGroup) connect(sb *subBalancerWrapper) {
	bg.incomingMu.Lock()
	for sc, b := range bg.scToSubBalancer {
		if b == sb {
			sc.Connect()
		}
	}
	bg.incomingMu.Unlock()
}

// Following are actions from the parent grpc.ClientConn, forward to sub-balancers.

// UpdateSubConnState handles the state for the subconn. It finds the
// corresponding balancer and forwards the update.
func (bg *BalancerGroup) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	bg.incomingMu.Lock()
	config, ok := bg.scToSubBalancer[sc]
	if !ok {
		bg.incomingMu.Unlock()
		return
	}
	if state.ConnectivityState == connectivity.Shutdown {
		// Only delete sc from the map when state changed to Shutdown.
		delete(bg.scToSubBalancer, sc)
	}
	bg.incomingMu.Unlock()

	bg.outgoingMu.Lock()
	config.updateSubConnState(sc, state)
	bg.outgoingMu.Unlock()
}
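
// Illustrative sketch (assumption, not part of this file): a hypothetical
// parent policy implementing balancer.Balancer would typically forward SubConn
// state changes straight to the group, which routes them to the owning child.
//
//	func (p *parentBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
//		p.bg.UpdateSubConnState(sc, s)
//	}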

// UpdateClientConnState handles ClientConnState (including balancer config and
// addresses) from the resolver. It finds the balancer and forwards the update.
func (bg *BalancerGroup) UpdateClientConnState(id string, s balancer.ClientConnState) error {
	bg.outgoingMu.Lock()
	defer bg.outgoingMu.Unlock()
	if config, ok := bg.idToBalancerConfig[id]; ok {
		return config.updateClientConnState(s)
	}
	return nil
}

// ResolverError forwards resolver errors to all sub-balancers.
func (bg *BalancerGroup) ResolverError(err error) {
	bg.outgoingMu.Lock()
	for _, config := range bg.idToBalancerConfig {
		config.resolverError(err)
	}
	bg.outgoingMu.Unlock()
}

// Following are actions from sub-balancers, forward to ClientConn.

// newSubConn forwards the request to the ClientConn, and also records the
// mapping from the new SubConn to its sub-balancer, so that state updates can
// find the right balancer.
//
// One note about removing SubConns: the removal is only forwarded to the
// ClientConn; the SubConn is deleted from the map only when its state changes
// to Shutdown. Since it's just forwarding the action, there's no need for a
// removeSubConn() wrapper function.
func (bg *BalancerGroup) newSubConn(config *subBalancerWrapper, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	// NOTE: if the balancer with this id was already removed, this should also
	// return an error. But since we call stopBalancer when removing the
	// balancer, this shouldn't happen.
	bg.incomingMu.Lock()
	if !bg.incomingStarted {
		bg.incomingMu.Unlock()
		return nil, fmt.Errorf("NewSubConn is called after balancer group is closed")
	}
	sc, err := bg.cc.NewSubConn(addrs, opts)
	if err != nil {
		bg.incomingMu.Unlock()
		return nil, err
	}
	bg.scToSubBalancer[sc] = config
	bg.incomingMu.Unlock()
	return sc, nil
}

// updateBalancerState: forward the new state to balancer state aggregator. The
// aggregator will create an aggregated picker and an aggregated connectivity
// state, then forward to ClientConn.
func (bg *BalancerGroup) updateBalancerState(id string, state balancer.State) {
	bg.logger.Infof("Balancer state update from locality %v, new state: %+v", id, state)

	// Send the new state to the aggregator, without holding the incomingMu.
	// incomingMu protects all calls to the parent ClientConn; this update
	// doesn't necessarily trigger a call to the ClientConn, and should already
	// be protected by the aggregator's mutex if necessary.
	if bg.stateAggregator != nil {
		bg.stateAggregator.UpdateState(id, state)
	}
}

// Close closes the balancer. It stops sub-balancers, and removes the subconns.
// The BalancerGroup can be restarted later.
func (bg *BalancerGroup) Close() {
	bg.incomingMu.Lock()
	if bg.incomingStarted {
		bg.incomingStarted = false
		// Also remove all SubConns.
		for sc := range bg.scToSubBalancer {
			bg.cc.RemoveSubConn(sc)
			delete(bg.scToSubBalancer, sc)
		}
	}
	bg.incomingMu.Unlock()

	// Clear(true) runs the clear callback to close the sub-balancers in the
	// cache. It must be called without holding the outgoing mutex.
	bg.balancerCache.Clear(true)

	bg.outgoingMu.Lock()
	if bg.outgoingStarted {
		bg.outgoingStarted = false
		for _, config := range bg.idToBalancerConfig {
			config.stopBalancer()
		}
	}
	bg.outgoingMu.Unlock()
}
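
// Illustrative sketch (assumption): Close keeps the per-child configuration
// (idToBalancerConfig and each child's cached ccState), so a closed group can
// be started again and each child is rebuilt with its most recent
// ClientConnState.
//
//	bg.Close()
//	// ... later ...
//	bg.Start() // children are rebuilt and receive their cached ccState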

// ExitIdle should be invoked when the parent LB policy's ExitIdle is invoked.
// It triggers ExitIdle on all sub-balancers, or reconnects their SubConns if
// ExitIdle is not supported.
func (bg *BalancerGroup) ExitIdle() {
	bg.outgoingMu.Lock()
	for _, config := range bg.idToBalancerConfig {
		if !config.exitIdle() {
			bg.connect(config)
		}
	}
	bg.outgoingMu.Unlock()
}

// ExitIdleOne instructs the sub-balancer `id` to exit IDLE state, if
// appropriate and possible.
func (bg *BalancerGroup) ExitIdleOne(id string) {
	bg.outgoingMu.Lock()
	if config := bg.idToBalancerConfig[id]; config != nil {
		if !config.exitIdle() {
			bg.connect(config)
		}
	}
	bg.outgoingMu.Unlock()
}