aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/google.golang.org/grpc/xds/internal/server/rds_handler.go
blob: 722748cbd526a7a4b869b78e308adca8e1c7bb3b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*
 *
 * Copyright 2021 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package server

import (
	"sync"

	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

// rdsHandlerUpdate is the value an rdsHandler sends on its update channel.
// It carries either the latest RouteConfigUpdate for each watched route, keyed
// by route name (updates), or an error reported by a route watch (err). The
// code in this file populates exactly one of the two fields per value.
type rdsHandlerUpdate struct {
	updates map[string]xdsresource.RouteConfigUpdate
	err     error
}

// rdsHandler manages the RDS watches that need to be started for a given
// server side listener's filter chains (i.e. those whose route configuration
// is not inline). It caches the latest update received for each watched route
// and reports results on updateChannel.
type rdsHandler struct {
	// xdsC is the client used to register route configuration watches.
	xdsC XDSClient

	// mu guards updates and cancels. updates holds the latest
	// RouteConfigUpdate received for each route name; cancels holds, for each
	// route name currently being watched, the function that cancels its watch.
	mu      sync.Mutex
	updates map[string]xdsresource.RouteConfigUpdate
	cancels map[string]func()

	// The only rdsHandlerUpdate the wrapped listener cares about is the most
	// recent one, so this channel is opportunistically drained before any new
	// update is sent on it.
	updateChannel chan rdsHandlerUpdate
}

// newRDSHandler returns an rdsHandler that registers its watches through xdsC
// and publishes its results on ch. Both bookkeeping maps start out empty, so
// the returned handler watches nothing until updateRouteNamesToWatch() is
// called with the route names taken from a Listener configuration.
func newRDSHandler(xdsC XDSClient, ch chan rdsHandlerUpdate) *rdsHandler {
	rh := new(rdsHandler)
	rh.xdsC = xdsC
	rh.updateChannel = ch
	rh.updates = map[string]xdsresource.RouteConfigUpdate{}
	rh.cancels = map[string]func(){}
	return rh
}

// updateRouteNamesToWatch reconciles the set of active RDS watches with
// routeNamesToWatch, the route names a server side listener currently needs
// (only the keys of the map are consulted; the bool values are never read).
//
// A watch is started for every requested name that is not already being
// watched, and the watch and cached update of every name that is no longer
// requested are dropped. If, after reconciling, an update is cached for every
// watched route and routeNamesToWatch is non-empty, the complete set of
// updates is pushed on the update channel.
func (rh *rdsHandler) updateRouteNamesToWatch(routeNamesToWatch map[string]bool) {
	rh.mu.Lock()
	defer rh.mu.Unlock()

	// Start a watch for every requested route that is not being watched yet.
	for routeName := range routeNamesToWatch {
		if _, ok := rh.cancels[routeName]; ok {
			continue
		}
		// Pin the name per iteration so the callback below does not observe a
		// later value of the loop variable.
		routeName := routeName
		rh.cancels[routeName] = rh.xdsC.WatchRouteConfig(routeName, func(update xdsresource.RouteConfigUpdate, err error) {
			rh.handleRouteUpdate(routeName, update, err)
		})
	}

	// Cancel the watch and forget the cached update of every route that is no
	// longer requested. Deleting entries while ranging over a map is safe in
	// Go.
	for routeName, cancel := range rh.cancels {
		if _, ok := routeNamesToWatch[routeName]; ok {
			continue
		}
		cancel()
		delete(rh.cancels, routeName)
		delete(rh.updates, routeName)
	}

	// The listener is ready to be updated only once the full list (determined
	// by length) of updates is present, and only if there is anything to
	// watch at all.
	if len(rh.updates) != len(rh.cancels) || len(routeNamesToWatch) == 0 {
		return
	}

	// Send a snapshot rather than rh.updates itself: the receiver reads the
	// map without holding rh.mu, while this handler keeps mutating rh.updates
	// under that mutex. Sharing the live map would be a data race.
	snapshot := make(map[string]xdsresource.RouteConfigUpdate, len(rh.updates))
	for name, cfg := range rh.updates {
		snapshot[name] = cfg
	}
	drainAndPush(rh.updateChannel, rdsHandlerUpdate{updates: snapshot})
}

// handleRouteUpdate is the watch callback for a single route name.
//
// A non-nil err is forwarded on the update channel as-is; the cached updates
// are not modified and the mutex is not taken on that path. Otherwise update
// is stored as the latest route configuration for routeName and, if an update
// is now cached for every watched route, the complete set is pushed on the
// update channel.
func (rh *rdsHandler) handleRouteUpdate(routeName string, update xdsresource.RouteConfigUpdate, err error) {
	if err != nil {
		drainAndPush(rh.updateChannel, rdsHandlerUpdate{err: err})
		return
	}

	rh.mu.Lock()
	defer rh.mu.Unlock()
	rh.updates[routeName] = update

	// The listener is ready to be updated only once the full list (determined
	// by length) of updates has been received.
	if len(rh.updates) != len(rh.cancels) {
		return
	}

	// Send a snapshot rather than rh.updates itself: the receiver reads the
	// map without holding rh.mu, while this handler keeps mutating rh.updates
	// under that mutex. Sharing the live map would be a data race.
	snapshot := make(map[string]xdsresource.RouteConfigUpdate, len(rh.updates))
	for name, cfg := range rh.updates {
		snapshot[name] = cfg
	}
	drainAndPush(rh.updateChannel, rdsHandlerUpdate{updates: snapshot})
}

// drainAndPush discards at most one update that is already pending on ch and
// then sends update on ch. The receive never blocks; the send does, until ch
// accepts the value.
func drainAndPush(ch chan rdsHandlerUpdate, update rdsHandlerUpdate) {
	// Non-blocking receive: throw away a stale update if one is queued.
	select {
	default:
		// Nothing was pending.
	case <-ch:
		// A stale update was pending and has been dropped.
	}

	ch <- update
}

// close releases the handler's resources by invoking the cancel function of
// every active RDS watch. It is meant to be called by the wrapped listener
// when that listener is closed. The cancels and updates maps themselves are
// left as they are.
func (rh *rdsHandler) close() {
	rh.mu.Lock()
	defer rh.mu.Unlock()

	for routeName := range rh.cancels {
		rh.cancels[routeName]()
	}
}