aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer_test.go
blob: cdbd446f8101db91a3b49c0bb28bacc55ec1b653 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
/*
 *
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpcsync

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
)

const (
	defaultTestTimeout      = 5 * time.Second
	defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)

// TestCallbackSerializer_Schedule_FIFO verifies that callbacks are executed in
// the same order in which they were scheduled.
func (s) TestCallbackSerializer_Schedule_FIFO(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	cs := NewCallbackSerializer(ctx)
	defer cancel()

	// We have two channels, one to record the order of scheduling, and the
	// other to record the order of execution. We spawn a bunch of goroutines
	// which record the order of scheduling and call the actual Schedule()
	// method as well.  The callbacks record the order of execution.
	//
	// We need to grab a lock to record order of scheduling to guarantee that
	// the act of recording and the act of calling Schedule() happen atomically.
	const numCallbacks = 100
	var mu sync.Mutex
	scheduleOrderCh := make(chan int, numCallbacks)
	executionOrderCh := make(chan int, numCallbacks)
	for i := 0; i < numCallbacks; i++ {
		go func(id int) {
			mu.Lock()
			defer mu.Unlock()
			scheduleOrderCh <- id
			cs.Schedule(func(ctx context.Context) {
				select {
				case <-ctx.Done():
					return
				case executionOrderCh <- id:
				}
			})
		}(i)
	}

	// Spawn a couple of goroutines to capture the order or scheduling and the
	// order of execution.
	scheduleOrder := make([]int, numCallbacks)
	executionOrder := make([]int, numCallbacks)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for i := 0; i < numCallbacks; i++ {
			select {
			case <-ctx.Done():
				return
			case id := <-scheduleOrderCh:
				scheduleOrder[i] = id
			}
		}
	}()
	go func() {
		defer wg.Done()
		for i := 0; i < numCallbacks; i++ {
			select {
			case <-ctx.Done():
				return
			case id := <-executionOrderCh:
				executionOrder[i] = id
			}
		}
	}()
	wg.Wait()

	if diff := cmp.Diff(executionOrder, scheduleOrder); diff != "" {
		t.Fatalf("Callbacks are not executed in scheduled order. diff(-want, +got):\n%s", diff)
	}
}

// TestCallbackSerializer_Schedule_Concurrent verifies that all concurrently
// scheduled callbacks get executed.
func (s) TestCallbackSerializer_Schedule_Concurrent(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	cs := NewCallbackSerializer(ctx)
	defer cancel()

	// Schedule callbacks concurrently by calling Schedule() from goroutines.
	// The execution of the callbacks call Done() on the waitgroup, which
	// eventually unblocks the test and allows it to complete.
	const numCallbacks = 100
	var wg sync.WaitGroup
	wg.Add(numCallbacks)
	for i := 0; i < numCallbacks; i++ {
		go func() {
			cs.Schedule(func(context.Context) {
				wg.Done()
			})
		}()
	}

	// We call Wait() on the waitgroup from a goroutine so that we can select on
	// the Wait() being unblocked and the overall test deadline expiring.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-ctx.Done():
		t.Fatal("Timeout waiting for all scheduled callbacks to be executed")
	case <-done:
	}
}

// TestCallbackSerializer_Schedule_Close verifies that callbacks in the queue
// are not executed once Close() returns.
func (s) TestCallbackSerializer_Schedule_Close(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	serializerCtx, serializerCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	cs := NewCallbackSerializer(serializerCtx)

	// Schedule a callback which blocks until the context passed to it is
	// canceled. It also closes a channel to signal that it has started.
	firstCallbackStartedCh := make(chan struct{})
	cs.Schedule(func(ctx context.Context) {
		close(firstCallbackStartedCh)
		<-ctx.Done()
	})

	// Schedule a bunch of callbacks. These should be exeuted since the are
	// scheduled before the serializer is closed.
	const numCallbacks = 10
	callbackCh := make(chan int, numCallbacks)
	for i := 0; i < numCallbacks; i++ {
		num := i
		if !cs.Schedule(func(context.Context) { callbackCh <- num }) {
			t.Fatal("Schedule failed to accept a callback when the serializer is yet to be closed")
		}
	}

	// Ensure that none of the newer callbacks are executed at this point.
	select {
	case <-time.After(defaultTestShortTimeout):
	case <-callbackCh:
		t.Fatal("Newer callback executed when older one is still executing")
	}

	// Wait for the first callback to start before closing the scheduler.
	<-firstCallbackStartedCh

	// Cancel the context which will unblock the first callback. All of the
	// other callbacks (which have not started executing at this point) should
	// be executed after this.
	serializerCancel()

	// Ensure that the newer callbacks are executed.
	for i := 0; i < numCallbacks; i++ {
		select {
		case <-ctx.Done():
			t.Fatal("Timeout when waiting for callback scheduled before close to be executed")
		case num := <-callbackCh:
			if num != i {
				t.Fatalf("Executing callback %d, want %d", num, i)
			}
		}
	}
	<-cs.Done

	done := make(chan struct{})
	if cs.Schedule(func(context.Context) { close(done) }) {
		t.Fatal("Scheduled a callback after closing the serializer")
	}

	// Ensure that the lates callback is executed at this point.
	select {
	case <-time.After(defaultTestShortTimeout):
	case <-done:
		t.Fatal("Newer callback executed when scheduled after closing serializer")
	}
}