1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
|
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package rls
import (
"bytes"
"encoding/json"
"fmt"
"net/url"
"time"
"github.com/golang/protobuf/ptypes"
durationpb "github.com/golang/protobuf/ptypes/duration"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/rls/internal/keys"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/pretty"
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/protobuf/encoding/protojson"
)
const (
// maxMaxAge is the max_age used when the service config does not specify
// one, and also the upper bound applied when the configured value is
// greater than this value.
maxMaxAge = 5 * time.Minute
// maxCacheSize is the upper limit for cache_size_bytes, enforced because we
// don't fully trust the service config.
// NOTE(review): the trailing comment says "5MB in bytes", but the
// expression evaluates to 41943040 (5 MiB multiplied by 8) — confirm which
// cap is actually intended.
maxCacheSize = 5 * 1024 * 1024 * 8 // 5MB in bytes
// defaultLookupServiceTimeout is the lookup_service_timeout used when the
// service config does not specify one (i.e. the parsed value is zero).
defaultLookupServiceTimeout = 10 * time.Second
// dummyChildPolicyTarget is the placeholder value written into the target
// name field of the child policy config while that config is validated
// during service config parsing.
dummyChildPolicyTarget = "target_name_to_be_filled_in_later"
)
// lbConfig is the internal representation of the RLS LB policy's config. It is
// produced by ParseConfig from the JSON service config, after validation.
type lbConfig struct {
serviceconfig.LoadBalancingConfig
// cacheSizeBytes is the `cache_size_bytes` value from the route lookup
// config, capped at maxCacheSize.
cacheSizeBytes int64 // Keep this field 64-bit aligned.
// kbMap is the key builder map created by keys.MakeBuilderMap from the
// route lookup config.
kbMap keys.BuilderMap
// lookupService is the `lookup_service` target string, stored as it
// appears in the config.
lookupService string
// lookupServiceTimeout is the `lookup_service_timeout` value, or
// defaultLookupServiceTimeout when the config leaves it at zero.
lookupServiceTimeout time.Duration
// maxAge is the `max_age` value, replaced by maxMaxAge when it is unset or
// larger than maxMaxAge.
maxAge time.Duration
// staleAge is the `stale_age` value, or zero when it was ignored for not
// being less than `max_age`.
staleAge time.Duration
// defaultTarget is the `default_target` value from the route lookup config.
defaultTarget string
// childPolicyName is the name of the first registered child policy found
// in the `childPolicy` list.
childPolicyName string
// childPolicyConfig is the selected child policy's config as a JSON object
// map; it includes the target field populated with a placeholder value.
childPolicyConfig map[string]json.RawMessage
// childPolicyTargetField is the `childPolicyConfigTargetFieldName` value.
childPolicyTargetField string
// controlChannelServiceConfig is the raw `routeLookupChannelServiceConfig`
// JSON, set only when it was specified and parsed successfully.
controlChannelServiceConfig string
}
// Equal reports whether lbCfg and other carry the same configuration. The key
// builder map is compared first, followed by the scalar fields, and finally
// the child policy config, which is compared entry by entry.
func (lbCfg *lbConfig) Equal(other *lbConfig) bool {
	if !lbCfg.kbMap.Equal(other.kbMap) {
		return false
	}
	if lbCfg.lookupService != other.lookupService ||
		lbCfg.lookupServiceTimeout != other.lookupServiceTimeout ||
		lbCfg.maxAge != other.maxAge ||
		lbCfg.staleAge != other.staleAge ||
		lbCfg.cacheSizeBytes != other.cacheSizeBytes ||
		lbCfg.defaultTarget != other.defaultTarget ||
		lbCfg.childPolicyName != other.childPolicyName ||
		lbCfg.childPolicyTargetField != other.childPolicyTargetField ||
		lbCfg.controlChannelServiceConfig != other.controlChannelServiceConfig {
		return false
	}
	return childPolicyConfigEqual(lbCfg.childPolicyConfig, other.childPolicyConfig)
}
// childPolicyConfigEqual reports whether a and b contain exactly the same keys
// mapped to byte-for-byte identical raw JSON values. A nil map is considered
// different from a non-nil empty map.
func childPolicyConfigEqual(a, b map[string]json.RawMessage) bool {
	switch {
	case (a == nil) != (b == nil):
		return false
	case len(a) != len(b):
		return false
	}
	for key, want := range a {
		if got, found := b[key]; !found || !bytes.Equal(want, got) {
			return false
		}
	}
	return true
}
// lbConfigJSON resembles the JSON representation of the loadBalancing config
// and makes it easier to unmarshal. The nested values are kept as raw JSON so
// that each one can be decoded and validated separately by ParseConfig.
type lbConfigJSON struct {
// RouteLookupConfig is the raw JSON of the route lookup config; ParseConfig
// decodes it with protojson into an rlspb.RouteLookupConfig.
RouteLookupConfig json.RawMessage
// RouteLookupChannelServiceConfig is the raw JSON of the optional service
// config for the control channel; it is validated when present.
RouteLookupChannelServiceConfig json.RawMessage
// ChildPolicy is the list of candidate child policies; each entry is
// expected to hold exactly one policy-name/config pair.
ChildPolicy []map[string]json.RawMessage
// ChildPolicyConfigTargetFieldName is the name of the field in the child
// policy config that holds the target; it must be non-empty.
ChildPolicyConfigTargetFieldName string
}
// ParseConfig converts the JSON load balancer config in c into the policy's
// internal representation, returning an error when the config is invalid.
//
// The following validations are performed while parsing a config update:
//   - routeLookupConfig:
//     - grpc_keybuilders field:
//       - must have at least one entry
//       - must not have two entries with the same `Name`
//       - within each entry:
//         - must have at least one `Name`
//         - must not have a `Name` with the `service` field unset or empty
//         - within each `headers` entry:
//           - must not have `required_match` set
//           - must not have `key` unset or empty
//         - across all `headers`, `constant_keys` and `extra_keys` fields:
//           - must not have the same `key` specified twice
//           - no `key` must be the empty string
//     - `lookup_service` field must be set and must parse as a target URI
//     - if `max_age` > 5m, it should be set to 5 minutes
//     - if `stale_age` > `max_age`, ignore it
//     - if `stale_age` is set, then `max_age` must also be set
//     - ignore `valid_targets` field
//     - `cache_size_bytes` field must have a value greater than 0, and if
//       its value is greater than 5M, we cap it at 5M
//   - routeLookupChannelServiceConfig:
//     - if specified, must parse as valid service config
//   - childPolicy:
//     - must find a valid child policy with a valid config
//   - childPolicyConfigTargetFieldName:
//     - must be set and non-empty
func (rlsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
	logger.Infof("Received JSON service config: %v", pretty.ToJSON(c))

	// Split the top-level config into its still-raw components.
	var top lbConfigJSON
	if err := json.Unmarshal(c, &top); err != nil {
		return nil, fmt.Errorf("rls: json unmarshal failed for service config %+v: %v", string(c), err)
	}

	// Decode the route lookup config proto (unknown fields are discarded) and
	// validate it.
	routeLookup := new(rlspb.RouteLookupConfig)
	unmarshalOpts := protojson.UnmarshalOptions{DiscardUnknown: true}
	if err := unmarshalOpts.Unmarshal(top.RouteLookupConfig, routeLookup); err != nil {
		return nil, fmt.Errorf("rls: bad RouteLookupConfig proto %+v: %v", string(top.RouteLookupConfig), err)
	}
	cfg, err := parseRLSProto(routeLookup)
	if err != nil {
		return nil, err
	}

	// The control channel service config is optional; when present it must
	// parse as a valid service config.
	if len(top.RouteLookupChannelServiceConfig) != 0 {
		sc := string(top.RouteLookupChannelServiceConfig)
		parseSC := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)
		if result := parseSC(sc); result.Err != nil {
			return nil, fmt.Errorf("rls: bad control channel service config %q: %v", sc, result.Err)
		}
		cfg.controlChannelServiceConfig = sc
	}

	// The target field name is mandatory because it is needed to validate the
	// child policy config.
	targetField := top.ChildPolicyConfigTargetFieldName
	if targetField == "" {
		return nil, fmt.Errorf("rls: childPolicyConfigTargetFieldName field is not set in service config %+v", string(c))
	}
	childName, childCfg, err := parseChildPolicyConfigs(top.ChildPolicy, targetField)
	if err != nil {
		return nil, err
	}

	cfg.childPolicyName = childName
	cfg.childPolicyConfig = childCfg
	cfg.childPolicyTargetField = targetField
	return cfg, nil
}
// parseRLSProto validates the route lookup config proto and converts it into
// an lbConfig with the fields derived from it populated (key builder map,
// lookup service, timeouts, ages, cache size and default target). The child
// policy and control channel fields are left unset.
//
// It returns an error when the key builders are invalid, when
// `lookup_service` is empty, unparsable or uses an unregistered scheme, when
// any duration field cannot be converted, when `stale_age` is set without
// `max_age`, or when `cache_size_bytes` is not greater than zero.
func parseRLSProto(rlsProto *rlspb.RouteLookupConfig) (*lbConfig, error) {
	// Validations specified on the `grpc_keybuilders` field are performed here.
	kbMap, err := keys.MakeBuilderMap(rlsProto)
	if err != nil {
		return nil, err
	}

	// `lookup_service` field must be set and must parse as a target URI.
	lookupService := rlsProto.GetLookupService()
	if lookupService == "" {
		return nil, fmt.Errorf("rls: empty lookup_service in route lookup config %+v", rlsProto)
	}
	parsedTarget, err := url.Parse(lookupService)
	if err != nil {
		// url.Parse() fails if scheme is missing. Retry with default scheme.
		parsedTarget, err = url.Parse(resolver.GetDefaultScheme() + ":///" + lookupService)
		if err != nil {
			return nil, fmt.Errorf("rls: invalid target URI in lookup_service %s", lookupService)
		}
	}
	if parsedTarget.Scheme == "" {
		parsedTarget.Scheme = resolver.GetDefaultScheme()
	}
	if resolver.Get(parsedTarget.Scheme) == nil {
		return nil, fmt.Errorf("rls: unregistered scheme in lookup_service %s", lookupService)
	}

	// An unset (zero) `lookup_service_timeout` falls back to the default.
	lookupServiceTimeout, err := convertDuration(rlsProto.GetLookupServiceTimeout())
	if err != nil {
		return nil, fmt.Errorf("rls: failed to parse lookup_service_timeout in route lookup config %+v: %v", rlsProto, err)
	}
	if lookupServiceTimeout == 0 {
		lookupServiceTimeout = defaultLookupServiceTimeout
	}

	// Validations performed here:
	// - if `max_age` > 5m, it should be set to 5 minutes
	// - if `stale_age` > `max_age`, ignore it
	// - if `stale_age` is set, then `max_age` must also be set
	maxAge, err := convertDuration(rlsProto.GetMaxAge())
	if err != nil {
		return nil, fmt.Errorf("rls: failed to parse max_age in route lookup config %+v: %v", rlsProto, err)
	}
	staleAge, err := convertDuration(rlsProto.GetStaleAge())
	if err != nil {
		return nil, fmt.Errorf("rls: failed to parse staleAge in route lookup config %+v: %v", rlsProto, err)
	}
	if staleAge != 0 && maxAge == 0 {
		return nil, fmt.Errorf("rls: stale_age is set, but max_age is not in route lookup config %+v", rlsProto)
	}
	// The stale age is compared against the configured max age, before the
	// latter is defaulted or capped below.
	if staleAge >= maxAge {
		logger.Infof("rls: stale_age %v is not less than max_age %v, ignoring it", staleAge, maxAge)
		staleAge = 0
	}
	if maxAge == 0 || maxAge > maxMaxAge {
		logger.Infof("rls: max_age in route lookup config is %v, using %v", maxAge, maxMaxAge)
		maxAge = maxMaxAge
	}

	// `cache_size_bytes` field must have a value greater than 0, and if its
	// value is greater than the allowed maximum, we cap it at that maximum.
	cacheSizeBytes := rlsProto.GetCacheSizeBytes()
	if cacheSizeBytes <= 0 {
		return nil, fmt.Errorf("rls: cache_size_bytes must be set to a non-zero value: %+v", rlsProto)
	}
	if cacheSizeBytes > maxCacheSize {
		// Infof (not Info) is required so that the format verbs are expanded.
		logger.Infof("rls: cache_size_bytes %v is too large, setting it to: %v", cacheSizeBytes, maxCacheSize)
		cacheSizeBytes = maxCacheSize
	}

	return &lbConfig{
		kbMap:                kbMap,
		lookupService:        lookupService,
		lookupServiceTimeout: lookupServiceTimeout,
		maxAge:               maxAge,
		staleAge:             staleAge,
		cacheSizeBytes:       cacheSizeBytes,
		defaultTarget:        rlsProto.GetDefaultTarget(),
	}, nil
}
// parseChildPolicyConfigs iterates through the list of child policies, picks
// the first registered policy and validates its config.
//
// Each entry in childPolicies must contain exactly one policy-name/config
// pair. Entries naming an unregistered policy are skipped. For the first
// registered policy, the config is validated with targetFieldName set to a
// placeholder value. On success it returns the policy name and its config as a
// JSON object map (including the placeholder target entry). An error is
// returned when an entry is malformed, when the selected policy does not
// support config parsing, when its config is invalid, or when no registered
// policy is found.
func parseChildPolicyConfigs(childPolicies []map[string]json.RawMessage, targetFieldName string) (string, map[string]json.RawMessage, error) {
	for i, config := range childPolicies {
		if len(config) != 1 {
			return "", nil, fmt.Errorf("rls: invalid childPolicy: entry %v does not contain exactly 1 policy/config pair: %q", i, config)
		}

		// The map holds exactly one entry; the empty loop extracts it.
		var name string
		var rawCfg json.RawMessage
		for name, rawCfg = range config {
		}
		builder := balancer.Get(name)
		if builder == nil {
			continue
		}
		parser, ok := builder.(balancer.ConfigParser)
		if !ok {
			return "", nil, fmt.Errorf("rls: childPolicy %q with config %q does not support config parsing", name, string(rawCfg))
		}

		// To validate child policy configs we do the following:
		// - unmarshal the raw JSON bytes of the child policy config into a map
		// - add an entry with key set to `target_field_name` and a dummy value
		// - marshal the map back to JSON and parse the config using the parser
		//   retrieved previously
		var childConfig map[string]json.RawMessage
		if err := json.Unmarshal(rawCfg, &childConfig); err != nil {
			return "", nil, fmt.Errorf("rls: json unmarshal failed for child policy config %q: %v", string(rawCfg), err)
		}
		if childConfig == nil {
			// A JSON `null` config unmarshals without error but leaves the
			// map nil, and writing to a nil map panics. Treat it as an empty
			// config instead.
			childConfig = make(map[string]json.RawMessage, 1)
		}
		// Marshaling a constant string cannot fail, so the error is ignored.
		childConfig[targetFieldName], _ = json.Marshal(dummyChildPolicyTarget)
		jsonCfg, err := json.Marshal(childConfig)
		if err != nil {
			return "", nil, fmt.Errorf("rls: json marshal failed for child policy config {%+v}: %v", childConfig, err)
		}
		if _, err := parser.ParseConfig(jsonCfg); err != nil {
			return "", nil, fmt.Errorf("rls: childPolicy config validation failed: %v", err)
		}
		return name, childConfig, nil
	}
	return "", nil, fmt.Errorf("rls: invalid childPolicy config: no supported policies found in %+v", childPolicies)
}
// convertDuration turns a protobuf Duration into a time.Duration. A nil
// Duration (i.e. an unset field) yields zero with no error; otherwise the
// result and any error come from ptypes.Duration.
func convertDuration(d *durationpb.Duration) (time.Duration, error) {
	if d != nil {
		return ptypes.Duration(d)
	}
	return 0, nil
}
|