1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
|
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package rls
import (
"context"
"strings"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/rls/internal/test/e2e"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancergroup"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpctest"
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/stubserver"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
)
const (
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 100 * time.Millisecond
)
func init() {
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
}
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
// fakeBackoffStrategy is a fake implementation of the backoff.Strategy
// interface, for tests to inject the backoff duration.
type fakeBackoffStrategy struct {
backoff time.Duration
}
func (f *fakeBackoffStrategy) Backoff(retries int) time.Duration {
return f.backoff
}
// fakeThrottler is a fake implementation of the adaptiveThrottler interface.
type fakeThrottler struct {
throttleFunc func() bool // Fake throttler implementation.
throttleCh chan struct{} // Invocation of ShouldThrottle signals here.
}
func (f *fakeThrottler) ShouldThrottle() bool {
select {
case <-f.throttleCh:
default:
}
f.throttleCh <- struct{}{}
return f.throttleFunc()
}
func (f *fakeThrottler) RegisterBackendResponse(bool) {}
// alwaysThrottlingThrottler returns a fake throttler which always throttles.
func alwaysThrottlingThrottler() *fakeThrottler {
return &fakeThrottler{
throttleFunc: func() bool { return true },
throttleCh: make(chan struct{}, 1),
}
}
// neverThrottlingThrottler returns a fake throttler which never throttles.
func neverThrottlingThrottler() *fakeThrottler {
return &fakeThrottler{
throttleFunc: func() bool { return false },
throttleCh: make(chan struct{}, 1),
}
}
// oneTimeAllowingThrottler returns a fake throttler which does not throttle
// requests until the client RPC succeeds, but throttles everything that comes
// after. This is useful for tests which need to set up a valid cache entry
// before testing other cases.
func oneTimeAllowingThrottler(firstRPCDone *grpcsync.Event) *fakeThrottler {
return &fakeThrottler{
throttleFunc: firstRPCDone.HasFired,
throttleCh: make(chan struct{}, 1),
}
}
func overrideAdaptiveThrottler(t *testing.T, f *fakeThrottler) {
origAdaptiveThrottler := newAdaptiveThrottler
newAdaptiveThrottler = func() adaptiveThrottler { return f }
t.Cleanup(func() { newAdaptiveThrottler = origAdaptiveThrottler })
}
// buildBasicRLSConfig constructs a basic service config for the RLS LB policy
// with header matching rules. This expects the passed child policy name to
// have been registered by the caller.
func buildBasicRLSConfig(childPolicyName, rlsServerAddress string) *e2e.RLSConfig {
return &e2e.RLSConfig{
RouteLookupConfig: &rlspb.RouteLookupConfig{
GrpcKeybuilders: []*rlspb.GrpcKeyBuilder{
{
Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}},
Headers: []*rlspb.NameMatcher{
{Key: "k1", Names: []string{"n1"}},
{Key: "k2", Names: []string{"n2"}},
},
},
},
LookupService: rlsServerAddress,
LookupServiceTimeout: durationpb.New(defaultTestTimeout),
CacheSizeBytes: 1024,
},
RouteLookupChannelServiceConfig: `{"loadBalancingConfig": [{"pick_first": {}}]}`,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicyName},
ChildPolicyConfigTargetFieldName: e2e.RLSChildPolicyTargetNameField,
}
}
// buildBasicRLSConfigWithChildPolicy constructs a very basic service config for
// the RLS LB policy. It also registers a test LB policy which is capable of
// being a child of the RLS LB policy.
func buildBasicRLSConfigWithChildPolicy(t *testing.T, childPolicyName, rlsServerAddress string) *e2e.RLSConfig {
childPolicyName = "test-child-policy" + childPolicyName
e2e.RegisterRLSChildPolicy(childPolicyName, nil)
t.Logf("Registered child policy with name %q", childPolicyName)
return &e2e.RLSConfig{
RouteLookupConfig: &rlspb.RouteLookupConfig{
GrpcKeybuilders: []*rlspb.GrpcKeyBuilder{{Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}}}},
LookupService: rlsServerAddress,
LookupServiceTimeout: durationpb.New(defaultTestTimeout),
CacheSizeBytes: 1024,
},
RouteLookupChannelServiceConfig: `{"loadBalancingConfig": [{"pick_first": {}}]}`,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicyName},
ChildPolicyConfigTargetFieldName: e2e.RLSChildPolicyTargetNameField,
}
}
// startBackend starts a backend implementing the TestService on a local port.
// It returns a channel for tests to get notified whenever an RPC is invoked on
// the backend. This allows tests to ensure that RPCs reach expected backends.
// Also returns the address of the backend.
func startBackend(t *testing.T, sopts ...grpc.ServerOption) (rpcCh chan struct{}, address string) {
t.Helper()
rpcCh = make(chan struct{}, 1)
backend := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
select {
case rpcCh <- struct{}{}:
default:
}
return &testpb.Empty{}, nil
},
}
if err := backend.StartServer(sopts...); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
t.Logf("Started TestService backend at: %q", backend.Address)
t.Cleanup(func() { backend.Stop() })
return rpcCh, backend.Address
}
// startManualResolverWithConfig registers and returns a manual resolver which
// pushes the RLS LB policy's service config on the channel.
func startManualResolverWithConfig(t *testing.T, rlsConfig *e2e.RLSConfig) *manual.Resolver {
t.Helper()
scJSON, err := rlsConfig.ServiceConfigJSON()
if err != nil {
t.Fatal(err)
}
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
r := manual.NewBuilderWithScheme("rls-e2e")
r.InitialState(resolver.State{ServiceConfig: sc})
t.Cleanup(r.Close)
return r
}
// makeTestRPCAndExpectItToReachBackend is a test helper function which makes
// the EmptyCall RPC on the given ClientConn and verifies that it reaches a
// backend. The latter is accomplished by listening on the provided channel
// which gets pushed to whenever the backend in question gets an RPC.
//
// There are many instances where it can take a while before the attempted RPC
// reaches the expected backend. Examples include, but are not limited to:
// - control channel is changed in a config update. The RLS LB policy creates a
// new control channel, and sends a new picker to gRPC. But it takes a while
// before gRPC actually starts using the new picker.
// - test is waiting for a cache entry to expire after which we expect a
// different behavior because we have configured the fake RLS server to return
// different backends.
//
// Therefore, we do not return an error when the RPC fails. Instead, we wait for
// the context to expire before failing.
func makeTestRPCAndExpectItToReachBackend(ctx context.Context, t *testing.T, cc *grpc.ClientConn, ch chan struct{}) {
t.Helper()
// Drain the backend channel before performing the RPC to remove any
// notifications from previous RPCs.
select {
case <-ch:
default:
}
for {
if err := ctx.Err(); err != nil {
t.Fatalf("Timeout when waiting for RPCs to be routed to the given target: %v", err)
}
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
client := testgrpc.NewTestServiceClient(cc)
client.EmptyCall(sCtx, &testpb.Empty{})
select {
case <-sCtx.Done():
case <-ch:
sCancel()
return
}
}
}
// makeTestRPCAndVerifyError is a test helper function which makes the EmptyCall
// RPC on the given ClientConn and verifies that the RPC fails with the given
// status code and error.
//
// Similar to makeTestRPCAndExpectItToReachBackend, retries until expected
// outcome is reached or the provided context has expired.
func makeTestRPCAndVerifyError(ctx context.Context, t *testing.T, cc *grpc.ClientConn, wantCode codes.Code, wantErr error) {
t.Helper()
for {
if err := ctx.Err(); err != nil {
t.Fatalf("Timeout when waiting for RPCs to fail with given error: %v", err)
}
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
client := testgrpc.NewTestServiceClient(cc)
_, err := client.EmptyCall(sCtx, &testpb.Empty{})
// If the RPC fails with the expected code and expected error message (if
// one was provided), we return. Else we retry after blocking for a little
// while to ensure that we don't keep blasting away with RPCs.
if code := status.Code(err); code == wantCode {
if wantErr == nil || strings.Contains(err.Error(), wantErr.Error()) {
sCancel()
return
}
}
<-sCtx.Done()
}
}
// verifyRLSRequest is a test helper which listens on a channel to see if an RLS
// request was received by the fake RLS server. Based on whether the test
// expects a request to be sent out or not, it uses a different timeout.
func verifyRLSRequest(t *testing.T, ch chan struct{}, wantRequest bool) {
t.Helper()
if wantRequest {
select {
case <-time.After(defaultTestTimeout):
t.Fatalf("Timeout when waiting for an RLS request to be sent out")
case <-ch:
}
} else {
select {
case <-time.After(defaultTestShortTimeout):
case <-ch:
t.Fatalf("RLS request sent out when not expecting one")
}
}
}
|