aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/envoyproxy/go-control-plane/pkg/cache
diff options
context:
space:
mode:
authorrobot-contrib <robot-contrib@yandex-team.com>2024-01-30 11:20:39 +0300
committerrobot-contrib <robot-contrib@yandex-team.com>2024-01-30 12:12:51 +0300
commitbe737fd8956853e06bd2c4f9fcd4a85188f4c172 (patch)
tree5bd76802fac1096dfd90983c7739d50de367a79f /vendor/github.com/envoyproxy/go-control-plane/pkg/cache
parentfe62880c46b1f2c9fec779b0dc39f8a92ce256a5 (diff)
downloadydb-be737fd8956853e06bd2c4f9fcd4a85188f4c172.tar.gz
Update vendor/github.com/envoyproxy/go-control-plane to 0.12.0
Diffstat (limited to 'vendor/github.com/envoyproxy/go-control-plane/pkg/cache')
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/types.go10
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/ya.make4
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/cache.go40
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/linear.go25
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/order.go24
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/resource.go37
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/simple.go171
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/status.go54
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/ya.make2
9 files changed, 248 insertions, 119 deletions
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/types.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/types.go
index b99aacb84f..ffaf0a9774 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/types.go
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/types.go
@@ -38,13 +38,17 @@ func (e SkipFetchError) Error() string {
// ResponseType enumeration of supported response types
type ResponseType int
+// NOTE: The order of this enum MATTERS!
+// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#aggregated-discovery-service
+// ADS expects things to be returned in a specific order.
+// See the following issue for details: https://github.com/envoyproxy/go-control-plane/issues/526
const (
- Endpoint ResponseType = iota
- Cluster
+ Cluster ResponseType = iota
+ Endpoint
+ Listener
Route
ScopedRoute
VirtualHost
- Listener
Secret
Runtime
ExtensionConfig
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/ya.make b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/ya.make
index 293b3bb9fc..c7d9e357f2 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/ya.make
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/ya.make
@@ -2,6 +2,8 @@ GO_LIBRARY()
LICENSE(Apache-2.0)
-SRCS(types.go)
+SRCS(
+ types.go
+)
END()
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/cache.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/cache.go
index 55f60e0d6a..5ad8e24140 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/cache.go
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/cache.go
@@ -44,6 +44,10 @@ type DeltaRequest = discovery.DeltaDiscoveryRequest
// ConfigWatcher implementation must be thread-safe.
type ConfigWatcher interface {
// CreateWatch returns a new open watch from a non-empty request.
+ // This is the entrypoint to propagate configuration changes the
+ // provided Response channel. State from the gRPC server is utilized
+ // to make sure consuming cache implementations can see what the server has sent to clients.
+ //
// An individual consumer normally issues a single open watch by each type URL.
//
// The provided channel produces requested resources as responses, once they are available.
@@ -53,6 +57,9 @@ type ConfigWatcher interface {
CreateWatch(*Request, stream.StreamState, chan Response) (cancel func())
// CreateDeltaWatch returns a new open incremental xDS watch.
+ // This is the entrypoint to propagate configuration changes the
+ // provided DeltaResponse channel. State from the gRPC server is utilized
+ // to make sure consuming cache implementations can see what the server has sent to clients.
//
// The provided channel produces requested resources as responses, or spontaneous updates in accordance
// with the incremental xDS specification.
@@ -160,8 +167,10 @@ type RawDeltaResponse struct {
marshaledResponse atomic.Value
}
-var _ Response = &RawResponse{}
-var _ DeltaResponse = &RawDeltaResponse{}
+var (
+ _ Response = &RawResponse{}
+ _ DeltaResponse = &RawDeltaResponse{}
+)
// PassthroughResponse is a pre constructed xDS response that need not go through marshaling transformations.
type PassthroughResponse struct {
@@ -188,8 +197,10 @@ type DeltaPassthroughResponse struct {
ctx context.Context
}
-var _ Response = &PassthroughResponse{}
-var _ DeltaResponse = &DeltaPassthroughResponse{}
+var (
+ _ Response = &PassthroughResponse{}
+ _ DeltaResponse = &DeltaPassthroughResponse{}
+)
// GetDiscoveryResponse performs the marshaling the first time its called and uses the cached response subsequently.
// This is necessary because the marshaled response does not change across the calls.
@@ -218,7 +229,7 @@ func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, erro
marshaledResponse = &discovery.DiscoveryResponse{
VersionInfo: r.Version,
Resources: marshaledResources,
- TypeUrl: r.Request.TypeUrl,
+ TypeUrl: r.GetRequest().GetTypeUrl(),
}
r.marshaledResponse.Store(marshaledResponse)
@@ -249,7 +260,7 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover
marshaledResources[i] = &discovery.Resource{
Name: name,
Resource: &anypb.Any{
- TypeUrl: r.DeltaRequest.TypeUrl,
+ TypeUrl: r.GetDeltaRequest().GetTypeUrl(),
Value: marshaledResource,
},
Version: version,
@@ -259,7 +270,7 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover
marshaledResponse = &discovery.DeltaDiscoveryResponse{
Resources: marshaledResources,
RemovedResources: r.RemovedResources,
- TypeUrl: r.DeltaRequest.TypeUrl,
+ TypeUrl: r.GetDeltaRequest().GetTypeUrl(),
SystemVersionInfo: r.SystemVersionInfo,
}
r.marshaledResponse.Store(marshaledResponse)
@@ -315,14 +326,14 @@ func (r *RawResponse) maybeCreateTTLResource(resource types.ResourceWithTTL) (ty
if err != nil {
return nil, "", err
}
- rsrc.TypeUrl = r.Request.TypeUrl
+ rsrc.TypeUrl = r.GetRequest().GetTypeUrl()
wrappedResource.Resource = rsrc
}
return wrappedResource, deltaResourceTypeURL, nil
}
- return resource.Resource, r.Request.TypeUrl, nil
+ return resource.Resource, r.GetRequest().GetTypeUrl(), nil
}
// GetDiscoveryResponse returns the final passthrough Discovery Response.
@@ -347,19 +358,22 @@ func (r *DeltaPassthroughResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRe
// GetVersion returns the response version.
func (r *PassthroughResponse) GetVersion() (string, error) {
- if r.DiscoveryResponse != nil {
- return r.DiscoveryResponse.VersionInfo, nil
+ discoveryResponse, _ := r.GetDiscoveryResponse()
+ if discoveryResponse != nil {
+ return discoveryResponse.GetVersionInfo(), nil
}
return "", fmt.Errorf("DiscoveryResponse is nil")
}
+
func (r *PassthroughResponse) GetContext() context.Context {
return r.ctx
}
// GetSystemVersion returns the response version.
func (r *DeltaPassthroughResponse) GetSystemVersion() (string, error) {
- if r.DeltaDiscoveryResponse != nil {
- return r.DeltaDiscoveryResponse.SystemVersionInfo, nil
+ deltaDiscoveryResponse, _ := r.GetDeltaDiscoveryResponse()
+ if deltaDiscoveryResponse != nil {
+ return deltaDiscoveryResponse.GetSystemVersionInfo(), nil
}
return "", fmt.Errorf("DeltaDiscoveryResponse is nil")
}
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/linear.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/linear.go
index cf5ab7e268..f7786ac4f9 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/linear.go
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/linear.go
@@ -187,7 +187,7 @@ func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaRe
if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 {
if cache.log != nil {
cache.log.Debugf("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t",
- request.GetNode().GetId(), request.TypeUrl, GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard())
+ request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard())
}
value <- resp
return resp
@@ -299,7 +299,7 @@ func (cache *LinearCache) GetResources() map[string]types.Resource {
}
func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, value chan Response) func() {
- if request.TypeUrl != cache.typeURL {
+ if request.GetTypeUrl() != cache.typeURL {
value <- nil
return nil
}
@@ -312,8 +312,8 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
// strip version prefix if it is present
var lastVersion uint64
var err error
- if strings.HasPrefix(request.VersionInfo, cache.versionPrefix) {
- lastVersion, err = strconv.ParseUint(request.VersionInfo[len(cache.versionPrefix):], 0, 64)
+ if strings.HasPrefix(request.GetVersionInfo(), cache.versionPrefix) {
+ lastVersion, err = strconv.ParseUint(request.GetVersionInfo()[len(cache.versionPrefix):], 0, 64)
} else {
err = errors.New("mis-matched version prefix")
}
@@ -321,13 +321,14 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
cache.mu.Lock()
defer cache.mu.Unlock()
- if err != nil {
+ switch {
+ case err != nil:
stale = true
- staleResources = request.ResourceNames
- } else if len(request.ResourceNames) == 0 {
+ staleResources = request.GetResourceNames()
+ case len(request.GetResourceNames()) == 0:
stale = lastVersion != cache.version
- } else {
- for _, name := range request.ResourceNames {
+ default:
+ for _, name := range request.GetResourceNames() {
// When a resource is removed, its version defaults 0 and it is not considered stale.
if lastVersion < cache.versionVector[name] {
stale = true
@@ -340,7 +341,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
return nil
}
// Create open watches since versions are up to date.
- if len(request.ResourceNames) == 0 {
+ if len(request.GetResourceNames()) == 0 {
cache.watchAll[value] = struct{}{}
return func() {
cache.mu.Lock()
@@ -348,7 +349,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
delete(cache.watchAll, value)
}
}
- for _, name := range request.ResourceNames {
+ for _, name := range request.GetResourceNames() {
set, exists := cache.watches[name]
if !exists {
set = make(watches)
@@ -359,7 +360,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
- for _, name := range request.ResourceNames {
+ for _, name := range request.GetResourceNames() {
set, exists := cache.watches[name]
if exists {
delete(set, value)
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/order.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/order.go
new file mode 100644
index 0000000000..c416946e82
--- /dev/null
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/order.go
@@ -0,0 +1,24 @@
+package cache
+
+// Key is an internal sorting data structure we can use to
+// order responses by Type and their associated watch IDs.
+type key struct {
+ ID int64
+ TypeURL string
+}
+
+// Keys implements Go's sorting.Sort interface
+type keys []key
+
+func (k keys) Len() int {
+ return len(k)
+}
+
+// Less compares the typeURL and determines what order things should be sent.
+func (k keys) Less(i, j int) bool {
+ return GetResponseType(k[i].TypeURL) < GetResponseType(k[j].TypeURL)
+}
+
+func (k keys) Swap(i, j int) {
+ k[i], k[j] = k[j], k[i]
+}
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/resource.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/resource.go
index ea57e4714a..d4c25f2a11 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/resource.go
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/resource.go
@@ -22,15 +22,12 @@ import (
"google.golang.org/protobuf/proto"
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
- core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
- auth "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
runtime "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
- ratelimit "github.com/envoyproxy/go-control-plane/ratelimit/config/ratelimit/v3"
)
// GetResponseType returns the enumeration for a valid xDS type URL.
@@ -93,24 +90,6 @@ func GetResourceName(res types.Resource) string {
switch v := res.(type) {
case *endpoint.ClusterLoadAssignment:
return v.GetClusterName()
- case *cluster.Cluster:
- return v.GetName()
- case *route.RouteConfiguration:
- return v.GetName()
- case *route.ScopedRouteConfiguration:
- return v.GetName()
- case *route.VirtualHost:
- return v.GetName()
- case *listener.Listener:
- return v.GetName()
- case *auth.Secret:
- return v.GetName()
- case *runtime.Runtime:
- return v.GetName()
- case *core.TypedExtensionConfig:
- return v.GetName()
- case *ratelimit.RateLimitConfig:
- return v.GetName()
case types.ResourceWithName:
return v.GetName()
default:
@@ -197,13 +176,13 @@ func mapMerge(dst map[string]bool, src map[string]bool) {
func getClusterReferences(src *cluster.Cluster, out map[resource.Type]map[string]bool) {
endpoints := map[string]bool{}
- switch typ := src.ClusterDiscoveryType.(type) {
+ switch typ := src.GetClusterDiscoveryType().(type) {
case *cluster.Cluster_Type:
if typ.Type == cluster.Cluster_EDS {
- if src.EdsClusterConfig != nil && src.EdsClusterConfig.ServiceName != "" {
- endpoints[src.EdsClusterConfig.ServiceName] = true
+ if src.GetEdsClusterConfig() != nil && src.GetEdsClusterConfig().GetServiceName() != "" {
+ endpoints[src.GetEdsClusterConfig().GetServiceName()] = true
} else {
- endpoints[src.Name] = true
+ endpoints[src.GetName()] = true
}
}
}
@@ -222,8 +201,8 @@ func getListenerReferences(src *listener.Listener, out map[resource.Type]map[str
routes := map[string]bool{}
// Extract route configuration names from HTTP connection manager.
- for _, chain := range src.FilterChains {
- for _, filter := range chain.Filters {
+ for _, chain := range src.GetFilterChains() {
+ for _, filter := range chain.GetFilters() {
config := resource.GetHTTPConnectionManager(filter)
if config == nil {
continue
@@ -236,7 +215,7 @@ func getListenerReferences(src *listener.Listener, out map[resource.Type]map[str
// If the scoped route mapping is embedded, add the referenced route resource names.
for _, s := range config.GetScopedRoutes().GetScopedRouteConfigurationsList().GetScopedRouteConfigurations() {
- routes[s.RouteConfigurationName] = true
+ routes[s.GetRouteConfigurationName()] = true
}
}
}
@@ -254,7 +233,7 @@ func getScopedRouteReferences(src *route.ScopedRouteConfiguration, out map[resou
routes := map[string]bool{}
// For a scoped route configuration, the dependent resource is the RouteConfigurationName.
- routes[src.RouteConfigurationName] = true
+ routes[src.GetRouteConfigurationName()] = true
if len(routes) > 0 {
if _, ok := out[resource.RouteType]; !ok {
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/simple.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/simple.go
index 43fb6a24a6..ebf63f5b6f 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/simple.go
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/simple.go
@@ -193,8 +193,8 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) {
info.mu.Lock()
for id, watch := range info.watches {
// Respond with the current version regardless of whether the version has changed.
- version := snapshot.GetVersion(watch.Request.TypeUrl)
- resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl)
+ version := snapshot.GetVersion(watch.Request.GetTypeUrl())
+ resources := snapshot.GetResourcesAndTTL(watch.Request.GetTypeUrl())
// TODO(snowp): Construct this once per type instead of once per watch.
resourcesWithTTL := map[string]types.ResourceWithTTL{}
@@ -207,7 +207,7 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) {
if len(resourcesWithTTL) == 0 {
continue
}
- cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.ResourceNames, version)
+ cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.GetResourceNames(), version)
err := cache.respond(ctx, watch.Request, watch.Response, resourcesWithTTL, version, true)
if err != nil {
cache.log.Errorf("received error when attempting to respond to watches: %v", err)
@@ -220,7 +220,7 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) {
}
}
-// SetSnapshotCacheContext updates a snapshot for a node.
+// SetSnapshotCache updates a snapshot for a node.
func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapshot ResourceSnapshot) error {
cache.mu.Lock()
defer cache.mu.Unlock()
@@ -232,33 +232,96 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
if info, ok := cache.status[node]; ok {
info.mu.Lock()
defer info.mu.Unlock()
- for id, watch := range info.watches {
- version := snapshot.GetVersion(watch.Request.TypeUrl)
- if version != watch.Request.VersionInfo {
- cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.TypeUrl, watch.Request.ResourceNames, version)
-
- resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl)
- err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false)
- if err != nil {
- return err
- }
- // discard the watch
- delete(info.watches, id)
+ // Respond to SOTW watches for the node.
+ if err := cache.respondSOTWWatches(ctx, info, snapshot); err != nil {
+ return err
+ }
+
+ // Respond to delta watches for the node.
+ return cache.respondDeltaWatches(ctx, info, snapshot)
+ }
+
+ return nil
+}
+
+func (cache *snapshotCache) respondSOTWWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot) error {
+ // responder callback for SOTW watches
+ respond := func(watch ResponseWatch, id int64) error {
+ version := snapshot.GetVersion(watch.Request.GetTypeUrl())
+ if version != watch.Request.GetVersionInfo() {
+ cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.GetTypeUrl(), watch.Request.GetResourceNames(), version)
+ resources := snapshot.GetResourcesAndTTL(watch.Request.GetTypeUrl())
+ err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false)
+ if err != nil {
+ return err
}
+ // discard the watch
+ delete(info.watches, id)
}
+ return nil
+ }
- // We only calculate version hashes when using delta. We don't
- // want to do this when using SOTW so we can avoid unnecessary
- // computational cost if not using delta.
- if len(info.deltaWatches) > 0 {
- err := snapshot.ConstructVersionMap()
+ // If ADS is enabled we need to order response watches so we guarantee
+ // sending them in the correct order. Go's default implementation
+ // of maps are randomized order when ranged over.
+ if cache.ads {
+ info.orderResponseWatches()
+ for _, key := range info.orderedWatches {
+ err := respond(info.watches[key.ID], key.ID)
+ if err != nil {
+ return err
+ }
+ }
+ } else {
+ for id, watch := range info.watches {
+ err := respond(watch, id)
if err != nil {
return err
}
}
+ }
+
+ return nil
+}
- // process our delta watches
+func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot) error {
+ // We only calculate version hashes when using delta. We don't
+ // want to do this when using SOTW so we can avoid unnecessary
+ // computational cost if not using delta.
+ if len(info.deltaWatches) == 0 {
+ return nil
+ }
+
+ err := snapshot.ConstructVersionMap()
+ if err != nil {
+ return err
+ }
+
+ // If ADS is enabled we need to order response delta watches so we guarantee
+ // sending them in the correct order. Go's default implementation
+ // of maps are randomized order when ranged over.
+ if cache.ads {
+ info.orderResponseDeltaWatches()
+ for _, key := range info.orderedDeltaWatches {
+ watch := info.deltaWatches[key.ID]
+ res, err := cache.respondDelta(
+ ctx,
+ snapshot,
+ watch.Request,
+ watch.Response,
+ watch.StreamState,
+ )
+ if err != nil {
+ return err
+ }
+ // If we detect a nil response here, that means there has been no state change
+ // so we don't want to respond or remove any existing resource watches
+ if res != nil {
+ delete(info.deltaWatches, key.ID)
+ }
+ }
+ } else {
for id, watch := range info.deltaWatches {
res, err := cache.respondDelta(
ctx,
@@ -277,11 +340,10 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
}
}
}
-
return nil
}
-// GetSnapshots gets the snapshot for a node, and returns an error if not found.
+// GetSnapshot gets the snapshot for a node, and returns an error if not found.
func (cache *snapshotCache) GetSnapshot(node string) (ResourceSnapshot, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
@@ -324,14 +386,14 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL)
// CreateWatch returns a watch for an xDS request. A nil function may be
// returned if an error occurs.
func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() {
- nodeID := cache.hash.ID(request.Node)
+ nodeID := cache.hash.ID(request.GetNode())
cache.mu.Lock()
defer cache.mu.Unlock()
info, ok := cache.status[nodeID]
if !ok {
- info = newStatusInfo(request.Node)
+ info = newStatusInfo(request.GetNode())
cache.status[nodeID] = info
}
@@ -341,31 +403,30 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str
info.mu.Unlock()
var version string
-
snapshot, exists := cache.snapshots[nodeID]
if exists {
- version = snapshot.GetVersion(request.TypeUrl)
+ version = snapshot.GetVersion(request.GetTypeUrl())
}
if exists {
- knownResourceNames := streamState.GetKnownResourceNames(request.TypeUrl)
+ knownResourceNames := streamState.GetKnownResourceNames(request.GetTypeUrl())
diff := []string{}
- for _, r := range request.ResourceNames {
+ for _, r := range request.GetResourceNames() {
if _, ok := knownResourceNames[r]; !ok {
diff = append(diff, r)
}
}
cache.log.Debugf("nodeID %q requested %s%v and known %v. Diff %v", nodeID,
- request.TypeUrl, request.ResourceNames, knownResourceNames, diff)
+ request.GetTypeUrl(), request.GetResourceNames(), knownResourceNames, diff)
if len(diff) > 0 {
- resources := snapshot.GetResourcesAndTTL(request.TypeUrl)
+ resources := snapshot.GetResourcesAndTTL(request.GetTypeUrl())
for _, name := range diff {
if _, exists := resources[name]; exists {
if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil {
- cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl,
- request.ResourceNames, nodeID, err)
+ cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.GetTypeUrl(),
+ request.GetResourceNames(), nodeID, err)
return nil
}
return func() {}
@@ -375,9 +436,9 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str
}
// if the requested version is up-to-date or missing a response, leave an open watch
- if !exists || request.VersionInfo == version {
+ if !exists || request.GetVersionInfo() == version {
watchID := cache.nextWatchID()
- cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID, request.TypeUrl, request.ResourceNames, nodeID, request.VersionInfo)
+ cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID, request.GetTypeUrl(), request.GetResourceNames(), nodeID, request.GetVersionInfo())
info.mu.Lock()
info.watches[watchID] = ResponseWatch{Request: request, Response: value}
info.mu.Unlock()
@@ -385,10 +446,10 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str
}
// otherwise, the watch may be responded immediately
- resources := snapshot.GetResourcesAndTTL(request.TypeUrl)
+ resources := snapshot.GetResourcesAndTTL(request.GetTypeUrl())
if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil {
- cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl,
- request.ResourceNames, nodeID, err)
+ cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.GetTypeUrl(),
+ request.GetResourceNames(), nodeID, err)
return nil
}
@@ -418,14 +479,14 @@ func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() {
func (cache *snapshotCache) respond(ctx context.Context, request *Request, value chan Response, resources map[string]types.ResourceWithTTL, version string, heartbeat bool) error {
// for ADS, the request names must match the snapshot names
// if they do not, then the watch is never responded, and it is expected that envoy makes another request
- if len(request.ResourceNames) != 0 && cache.ads {
- if err := superset(nameSet(request.ResourceNames), resources); err != nil {
- cache.log.Warnf("ADS mode: not responding to request: %v", err)
+ if len(request.GetResourceNames()) != 0 && cache.ads {
+ if err := superset(nameSet(request.GetResourceNames()), resources); err != nil {
+ cache.log.Warnf("ADS mode: not responding to request %s%v: %v", request.GetTypeUrl(), request.GetResourceNames(), err)
return nil
}
}
- cache.log.Debugf("respond %s%v version %q with version %q", request.TypeUrl, request.ResourceNames, request.VersionInfo, version)
+ cache.log.Debugf("respond %s%v version %q with version %q", request.GetTypeUrl(), request.GetResourceNames(), request.GetVersionInfo(), version)
select {
case value <- createResponse(ctx, request, resources, version, heartbeat):
@@ -441,8 +502,8 @@ func createResponse(ctx context.Context, request *Request, resources map[string]
// Reply only with the requested resources. Envoy may ask each resource
// individually in a separate stream. It is ok to reply with the same version
// on separate streams since requests do not share their response versions.
- if len(request.ResourceNames) != 0 {
- set := nameSet(request.ResourceNames)
+ if len(request.GetResourceNames()) != 0 {
+ set := nameSet(request.GetResourceNames())
for name, resource := range resources {
if set[name] {
filtered = append(filtered, resource)
@@ -465,7 +526,7 @@ func createResponse(ctx context.Context, request *Request, resources map[string]
// CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache.
func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() {
- nodeID := cache.hash.ID(request.Node)
+ nodeID := cache.hash.ID(request.GetNode())
t := request.GetTypeUrl()
cache.mu.Lock()
@@ -473,7 +534,7 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream
info, ok := cache.status[nodeID]
if !ok {
- info = newStatusInfo(request.Node)
+ info = newStatusInfo(request.GetNode())
cache.status[nodeID] = info
}
@@ -520,9 +581,9 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream
// Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change.
func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) (*RawDeltaResponse, error) {
resp := createDeltaResponse(ctx, request, state, resourceContainer{
- resourceMap: snapshot.GetResources(request.TypeUrl),
- versionMap: snapshot.GetVersionMap(request.TypeUrl),
- systemVersion: snapshot.GetVersion(request.TypeUrl),
+ resourceMap: snapshot.GetResources(request.GetTypeUrl()),
+ versionMap: snapshot.GetVersionMap(request.GetTypeUrl()),
+ systemVersion: snapshot.GetVersion(request.GetTypeUrl()),
})
// Only send a response if there were changes
@@ -531,7 +592,7 @@ func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceS
if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsWildcard() && state.IsFirst()) {
if cache.log != nil {
cache.log.Debugf("node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t",
- request.GetNode().GetId(), request.TypeUrl, GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard())
+ request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard())
}
select {
case value <- resp:
@@ -563,7 +624,7 @@ func (cache *snapshotCache) cancelDeltaWatch(nodeID string, watchID int64) func(
// Fetch implements the cache fetch function.
// Fetch is called on multiple streams, so responding to individual names with the same version works.
func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Response, error) {
- nodeID := cache.hash.ID(request.Node)
+ nodeID := cache.hash.ID(request.GetNode())
cache.mu.RLock()
defer cache.mu.RUnlock()
@@ -571,13 +632,13 @@ func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Respon
if snapshot, exists := cache.snapshots[nodeID]; exists {
// Respond only if the request version is distinct from the current snapshot state.
// It might be beneficial to hold the request since Envoy will re-attempt the refresh.
- version := snapshot.GetVersion(request.TypeUrl)
- if request.VersionInfo == version {
+ version := snapshot.GetVersion(request.GetTypeUrl())
+ if request.GetVersionInfo() == version {
cache.log.Warnf("skip fetch: version up to date")
return nil, &types.SkipFetchError{}
}
- resources := snapshot.GetResourcesAndTTL(request.TypeUrl)
+ resources := snapshot.GetResourcesAndTTL(request.GetTypeUrl())
out := createResponse(ctx, request, resources, version, false)
return out, nil
}
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/status.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/status.go
index 84db1f9821..dca93e02ff 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/status.go
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/status.go
@@ -15,6 +15,7 @@
package cache
import (
+ "sort"
"sync"
"time"
@@ -36,7 +37,7 @@ func (IDHash) ID(node *core.Node) string {
if node == nil {
return ""
}
- return node.Id
+ return node.GetId()
}
var _ NodeHash = IDHash{}
@@ -65,10 +66,12 @@ type statusInfo struct {
node *core.Node
// watches are indexed channels for the response watches and the original requests.
- watches map[int64]ResponseWatch
+ watches map[int64]ResponseWatch
+ orderedWatches keys
// deltaWatches are indexed channels for the delta response watches and the original requests
- deltaWatches map[int64]DeltaResponseWatch
+ deltaWatches map[int64]DeltaResponseWatch
+ orderedDeltaWatches keys
// the timestamp of the last watch request
lastWatchRequestTime time.Time
@@ -105,9 +108,10 @@ type DeltaResponseWatch struct {
// newStatusInfo initializes a status info data structure.
func newStatusInfo(node *core.Node) *statusInfo {
out := statusInfo{
- node: node,
- watches: make(map[int64]ResponseWatch),
- deltaWatches: make(map[int64]DeltaResponseWatch),
+ node: node,
+ watches: make(map[int64]ResponseWatch),
+ orderedWatches: make(keys, 0),
+ deltaWatches: make(map[int64]DeltaResponseWatch),
}
return &out
}
@@ -155,3 +159,41 @@ func (info *statusInfo) setDeltaResponseWatch(id int64, drw DeltaResponseWatch)
defer info.mu.Unlock()
info.deltaWatches[id] = drw
}
+
+// orderResponseWatches will track a list of watch keys and order them if
+// true is passed.
+func (info *statusInfo) orderResponseWatches() {
+ info.orderedWatches = make(keys, len(info.watches))
+
+ var index int
+ for id, watch := range info.watches {
+ info.orderedWatches[index] = key{
+ ID: id,
+ TypeURL: watch.Request.GetTypeUrl(),
+ }
+ index++
+ }
+
+ // Sort our list which we can use in the SetSnapshot functions.
+ // This is only run when we enable ADS on the cache.
+ sort.Sort(info.orderedWatches)
+}
+
+// orderResponseDeltaWatches will track a list of delta watch keys and order them if
+// true is passed.
+func (info *statusInfo) orderResponseDeltaWatches() {
+ info.orderedDeltaWatches = make(keys, len(info.deltaWatches))
+
+ var index int
+ for id, deltaWatch := range info.deltaWatches {
+ info.orderedDeltaWatches[index] = key{
+ ID: id,
+ TypeURL: deltaWatch.Request.GetTypeUrl(),
+ }
+ index++
+ }
+
+ // Sort our list which we can use in the SetSnapshot functions.
+ // This is only run when we enable ADS on the cache.
+ sort.Sort(info.orderedDeltaWatches)
+}
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/ya.make b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/ya.make
index 4d64296a9c..d3316316c0 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/ya.make
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/ya.make
@@ -7,6 +7,7 @@ SRCS(
delta.go
linear.go
mux.go
+ order.go
resource.go
resources.go
simple.go
@@ -16,6 +17,7 @@ SRCS(
GO_TEST_SRCS(
linear_test.go
+ order_test.go
status_test.go
)