blob: c88548388acd93e2e6e6aeeecc8668627ff3f40b (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
package delta
import (
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)
// watches for all delta xDS resource types
type watches struct {
deltaWatches map[string]watch
// Opaque resources share a muxed channel
deltaMuxedResponses chan cache.DeltaResponse
}
// newWatches creates and initializes watches.
func newWatches() watches {
// deltaMuxedResponses needs a buffer to release go-routines populating it
return watches{
deltaWatches: make(map[string]watch, int(types.UnknownType)),
deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)),
}
}
// Cancel all watches
func (w *watches) Cancel() {
for _, watch := range w.deltaWatches {
watch.Cancel()
}
}
// watch contains the necessary modifiables for receiving resource responses
type watch struct {
responses chan cache.DeltaResponse
cancel func()
nonce string
state stream.StreamState
}
// Cancel calls terminate and cancel
func (w *watch) Cancel() {
if w.cancel != nil {
w.cancel()
}
if w.responses != nil {
// w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here
// This is needed to release resources taken by goroutines watching this channel
close(w.responses)
}
}
|