aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/status.go
blob: dca93e02ff589945bcd7db2e856b22bfe4f604f8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
// Copyright 2018 Envoyproxy Authors
//
//   Licensed under the Apache License, Version 2.0 (the "License");
//   you may not use this file except in compliance with the License.
//   You may obtain a copy of the License at
//
//       http://www.apache.org/licenses/LICENSE-2.0
//
//   Unless required by applicable law or agreed to in writing, software
//   distributed under the License is distributed on an "AS IS" BASIS,
//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//   See the License for the specific language governing permissions and
//   limitations under the License.

package cache

import (
	"sort"
	"sync"
	"time"

	core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

// NodeHash computes string identifiers for Envoy nodes.
type NodeHash interface {
	// ID function defines a unique string identifier for the remote Envoy node.
	ID(node *core.Node) string
}

// IDHash uses ID field as the node hash.
type IDHash struct{}

// ID uses the node ID field
func (IDHash) ID(node *core.Node) string {
	if node == nil {
		return ""
	}
	return node.GetId()
}

var _ NodeHash = IDHash{}

// StatusInfo publishes information about nodes that are watching the xDS cache.
type StatusInfo interface {
	// GetNode returns the node metadata.
	GetNode() *core.Node

	// GetNumWatches returns the number of open watches.
	GetNumWatches() int

	// GetNumDeltaWatches returns the number of open delta watches.
	GetNumDeltaWatches() int

	// GetLastWatchRequestTime returns the timestamp of the last discovery watch request.
	GetLastWatchRequestTime() time.Time

	// GetLastDeltaWatchRequestTime returns the timestamp of the last delta discovery watch request.
	GetLastDeltaWatchRequestTime() time.Time
}

// statusInfo tracks the server state for the remote Envoy node.
type statusInfo struct {
	// node is the constant Envoy node metadata.
	node *core.Node

	// watches are indexed channels for the response watches and the original requests.
	watches        map[int64]ResponseWatch
	orderedWatches keys

	// deltaWatches are indexed channels for the delta response watches and the original requests
	deltaWatches        map[int64]DeltaResponseWatch
	orderedDeltaWatches keys

	// the timestamp of the last watch request
	lastWatchRequestTime time.Time

	// the timestamp of the last delta watch request
	lastDeltaWatchRequestTime time.Time

	// mutex to protect the status fields.
	// should not acquire mutex of the parent cache after acquiring this mutex.
	mu sync.RWMutex
}

// ResponseWatch is a watch record keeping both the request and an open channel for the response.
type ResponseWatch struct {
	// Request is the original request for the watch.
	Request *Request

	// Response is the channel to push responses to.
	Response chan Response
}

// DeltaResponseWatch is a watch record keeping both the delta request and an open channel for the delta response.
type DeltaResponseWatch struct {
	// Request is the most recent delta request for the watch
	Request *DeltaRequest

	// Response is the channel to push the delta responses to
	Response chan DeltaResponse

	// VersionMap for the stream
	StreamState stream.StreamState
}

// newStatusInfo initializes a status info data structure.
func newStatusInfo(node *core.Node) *statusInfo {
	out := statusInfo{
		node:           node,
		watches:        make(map[int64]ResponseWatch),
		orderedWatches: make(keys, 0),
		deltaWatches:   make(map[int64]DeltaResponseWatch),
	}
	return &out
}

func (info *statusInfo) GetNode() *core.Node {
	info.mu.RLock()
	defer info.mu.RUnlock()
	return info.node
}

func (info *statusInfo) GetNumWatches() int {
	info.mu.RLock()
	defer info.mu.RUnlock()
	return len(info.watches)
}

func (info *statusInfo) GetNumDeltaWatches() int {
	info.mu.RLock()
	defer info.mu.RUnlock()
	return len(info.deltaWatches)
}

func (info *statusInfo) GetLastWatchRequestTime() time.Time {
	info.mu.RLock()
	defer info.mu.RUnlock()
	return info.lastWatchRequestTime
}

func (info *statusInfo) GetLastDeltaWatchRequestTime() time.Time {
	info.mu.RLock()
	defer info.mu.RUnlock()
	return info.lastDeltaWatchRequestTime
}

// setLastDeltaWatchRequestTime will set the current time of the last delta discovery watch request.
func (info *statusInfo) setLastDeltaWatchRequestTime(t time.Time) {
	info.mu.Lock()
	defer info.mu.Unlock()
	info.lastDeltaWatchRequestTime = t
}

// setDeltaResponseWatch will set the provided delta response watch for the associated watch ID.
func (info *statusInfo) setDeltaResponseWatch(id int64, drw DeltaResponseWatch) {
	info.mu.Lock()
	defer info.mu.Unlock()
	info.deltaWatches[id] = drw
}

// orderResponseWatches will track a list of watch keys and order them if
// true is passed.
func (info *statusInfo) orderResponseWatches() {
	info.orderedWatches = make(keys, len(info.watches))

	var index int
	for id, watch := range info.watches {
		info.orderedWatches[index] = key{
			ID:      id,
			TypeURL: watch.Request.GetTypeUrl(),
		}
		index++
	}

	// Sort our list which we can use in the SetSnapshot functions.
	// This is only run when we enable ADS on the cache.
	sort.Sort(info.orderedWatches)
}

// orderResponseDeltaWatches will track a list of delta watch keys and order them if
// true is passed.
func (info *statusInfo) orderResponseDeltaWatches() {
	info.orderedDeltaWatches = make(keys, len(info.deltaWatches))

	var index int
	for id, deltaWatch := range info.deltaWatches {
		info.orderedDeltaWatches[index] = key{
			ID:      id,
			TypeURL: deltaWatch.Request.GetTypeUrl(),
		}
		index++
	}

	// Sort our list which we can use in the SetSnapshot functions.
	// This is only run when we enable ADS on the cache.
	sort.Sort(info.orderedDeltaWatches)
}