diff options
author | robot-contrib <robot-contrib@yandex-team.com> | 2024-01-30 11:20:39 +0300 |
---|---|---|
committer | robot-contrib <robot-contrib@yandex-team.com> | 2024-01-30 12:12:51 +0300 |
commit | be737fd8956853e06bd2c4f9fcd4a85188f4c172 (patch) | |
tree | 5bd76802fac1096dfd90983c7739d50de367a79f /vendor/github.com/envoyproxy/go-control-plane/pkg/cache | |
parent | fe62880c46b1f2c9fec779b0dc39f8a92ce256a5 (diff) | |
download | ydb-be737fd8956853e06bd2c4f9fcd4a85188f4c172.tar.gz |
Update vendor/github.com/envoyproxy/go-control-plane to 0.12.0
Diffstat (limited to 'vendor/github.com/envoyproxy/go-control-plane/pkg/cache')
9 files changed, 248 insertions, 119 deletions
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/types.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/types.go index b99aacb84f..ffaf0a9774 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/types.go +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/types.go @@ -38,13 +38,17 @@ func (e SkipFetchError) Error() string { // ResponseType enumeration of supported response types type ResponseType int +// NOTE: The order of this enum MATTERS! +// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#aggregated-discovery-service +// ADS expects things to be returned in a specific order. +// See the following issue for details: https://github.com/envoyproxy/go-control-plane/issues/526 const ( - Endpoint ResponseType = iota - Cluster + Cluster ResponseType = iota + Endpoint + Listener Route ScopedRoute VirtualHost - Listener Secret Runtime ExtensionConfig diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/ya.make b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/ya.make index 293b3bb9fc..c7d9e357f2 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/ya.make +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/types/ya.make @@ -2,6 +2,8 @@ GO_LIBRARY() LICENSE(Apache-2.0) -SRCS(types.go) +SRCS( + types.go +) END() diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/cache.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/cache.go index 55f60e0d6a..5ad8e24140 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/cache.go +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/cache.go @@ -44,6 +44,10 @@ type DeltaRequest = discovery.DeltaDiscoveryRequest // ConfigWatcher implementation must be thread-safe. type ConfigWatcher interface { // CreateWatch returns a new open watch from a non-empty request. + // This is the entrypoint to propagate configuration changes the + // provided Response channel. State from the gRPC server is utilized + // to make sure consuming cache implementations can see what the server has sent to clients. + // // An individual consumer normally issues a single open watch by each type URL. // // The provided channel produces requested resources as responses, once they are available. @@ -53,6 +57,9 @@ type ConfigWatcher interface { CreateWatch(*Request, stream.StreamState, chan Response) (cancel func()) // CreateDeltaWatch returns a new open incremental xDS watch. + // This is the entrypoint to propagate configuration changes the + // provided DeltaResponse channel. State from the gRPC server is utilized + // to make sure consuming cache implementations can see what the server has sent to clients. // // The provided channel produces requested resources as responses, or spontaneous updates in accordance // with the incremental xDS specification. @@ -160,8 +167,10 @@ type RawDeltaResponse struct { marshaledResponse atomic.Value } -var _ Response = &RawResponse{} -var _ DeltaResponse = &RawDeltaResponse{} +var ( + _ Response = &RawResponse{} + _ DeltaResponse = &RawDeltaResponse{} +) // PassthroughResponse is a pre constructed xDS response that need not go through marshaling transformations. type PassthroughResponse struct { @@ -188,8 +197,10 @@ type DeltaPassthroughResponse struct { ctx context.Context } -var _ Response = &PassthroughResponse{} -var _ DeltaResponse = &DeltaPassthroughResponse{} +var ( + _ Response = &PassthroughResponse{} + _ DeltaResponse = &DeltaPassthroughResponse{} +) // GetDiscoveryResponse performs the marshaling the first time its called and uses the cached response subsequently. // This is necessary because the marshaled response does not change across the calls. @@ -218,7 +229,7 @@ func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, erro marshaledResponse = &discovery.DiscoveryResponse{ VersionInfo: r.Version, Resources: marshaledResources, - TypeUrl: r.Request.TypeUrl, + TypeUrl: r.GetRequest().GetTypeUrl(), } r.marshaledResponse.Store(marshaledResponse) @@ -249,7 +260,7 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover marshaledResources[i] = &discovery.Resource{ Name: name, Resource: &anypb.Any{ - TypeUrl: r.DeltaRequest.TypeUrl, + TypeUrl: r.GetDeltaRequest().GetTypeUrl(), Value: marshaledResource, }, Version: version, @@ -259,7 +270,7 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover marshaledResponse = &discovery.DeltaDiscoveryResponse{ Resources: marshaledResources, RemovedResources: r.RemovedResources, - TypeUrl: r.DeltaRequest.TypeUrl, + TypeUrl: r.GetDeltaRequest().GetTypeUrl(), SystemVersionInfo: r.SystemVersionInfo, } r.marshaledResponse.Store(marshaledResponse) @@ -315,14 +326,14 @@ func (r *RawResponse) maybeCreateTTLResource(resource types.ResourceWithTTL) (ty if err != nil { return nil, "", err } - rsrc.TypeUrl = r.Request.TypeUrl + rsrc.TypeUrl = r.GetRequest().GetTypeUrl() wrappedResource.Resource = rsrc } return wrappedResource, deltaResourceTypeURL, nil } - return resource.Resource, r.Request.TypeUrl, nil + return resource.Resource, r.GetRequest().GetTypeUrl(), nil } // GetDiscoveryResponse returns the final passthrough Discovery Response. @@ -347,19 +358,22 @@ func (r *DeltaPassthroughResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRe // GetVersion returns the response version. func (r *PassthroughResponse) GetVersion() (string, error) { - if r.DiscoveryResponse != nil { - return r.DiscoveryResponse.VersionInfo, nil + discoveryResponse, _ := r.GetDiscoveryResponse() + if discoveryResponse != nil { + return discoveryResponse.GetVersionInfo(), nil } return "", fmt.Errorf("DiscoveryResponse is nil") } + func (r *PassthroughResponse) GetContext() context.Context { return r.ctx } // GetSystemVersion returns the response version. func (r *DeltaPassthroughResponse) GetSystemVersion() (string, error) { - if r.DeltaDiscoveryResponse != nil { - return r.DeltaDiscoveryResponse.SystemVersionInfo, nil + deltaDiscoveryResponse, _ := r.GetDeltaDiscoveryResponse() + if deltaDiscoveryResponse != nil { + return deltaDiscoveryResponse.GetSystemVersionInfo(), nil } return "", fmt.Errorf("DeltaDiscoveryResponse is nil") } diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/linear.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/linear.go index cf5ab7e268..f7786ac4f9 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/linear.go +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/linear.go @@ -187,7 +187,7 @@ func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaRe if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 { if cache.log != nil { cache.log.Debugf("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", - request.GetNode().GetId(), request.TypeUrl, GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard()) + request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard()) } value <- resp return resp @@ -299,7 +299,7 @@ func (cache *LinearCache) GetResources() map[string]types.Resource { } func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, value chan Response) func() { - if request.TypeUrl != cache.typeURL { + if request.GetTypeUrl() != cache.typeURL { value <- nil return nil } @@ -312,8 +312,8 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va // strip version prefix if it is present var lastVersion uint64 var err error - if strings.HasPrefix(request.VersionInfo, cache.versionPrefix) { - lastVersion, err = strconv.ParseUint(request.VersionInfo[len(cache.versionPrefix):], 0, 64) + if strings.HasPrefix(request.GetVersionInfo(), cache.versionPrefix) { + lastVersion, err = strconv.ParseUint(request.GetVersionInfo()[len(cache.versionPrefix):], 0, 64) } else { err = errors.New("mis-matched version prefix") } @@ -321,13 +321,14 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va cache.mu.Lock() defer cache.mu.Unlock() - if err != nil { + switch { + case err != nil: stale = true - staleResources = request.ResourceNames - } else if len(request.ResourceNames) == 0 { + staleResources = request.GetResourceNames() + case len(request.GetResourceNames()) == 0: stale = lastVersion != cache.version - } else { - for _, name := range request.ResourceNames { + default: + for _, name := range request.GetResourceNames() { // When a resource is removed, its version defaults 0 and it is not considered stale. if lastVersion < cache.versionVector[name] { stale = true @@ -340,7 +341,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va return nil } // Create open watches since versions are up to date. - if len(request.ResourceNames) == 0 { + if len(request.GetResourceNames()) == 0 { cache.watchAll[value] = struct{}{} return func() { cache.mu.Lock() @@ -348,7 +349,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va delete(cache.watchAll, value) } } - for _, name := range request.ResourceNames { + for _, name := range request.GetResourceNames() { set, exists := cache.watches[name] if !exists { set = make(watches) @@ -359,7 +360,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va return func() { cache.mu.Lock() defer cache.mu.Unlock() - for _, name := range request.ResourceNames { + for _, name := range request.GetResourceNames() { set, exists := cache.watches[name] if exists { delete(set, value) diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/order.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/order.go new file mode 100644 index 0000000000..c416946e82 --- /dev/null +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/order.go @@ -0,0 +1,24 @@ +package cache + +// Key is an internal sorting data structure we can use to +// order responses by Type and their associated watch IDs. +type key struct { + ID int64 + TypeURL string +} + +// Keys implements Go's sorting.Sort interface +type keys []key + +func (k keys) Len() int { + return len(k) +} + +// Less compares the typeURL and determines what order things should be sent. +func (k keys) Less(i, j int) bool { + return GetResponseType(k[i].TypeURL) < GetResponseType(k[j].TypeURL) +} + +func (k keys) Swap(i, j int) { + k[i], k[j] = k[j], k[i] +} diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/resource.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/resource.go index ea57e4714a..d4c25f2a11 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/resource.go +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/resource.go @@ -22,15 +22,12 @@ import ( "google.golang.org/protobuf/proto" cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" - core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" - auth "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" runtime "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" - ratelimit "github.com/envoyproxy/go-control-plane/ratelimit/config/ratelimit/v3" ) // GetResponseType returns the enumeration for a valid xDS type URL. @@ -93,24 +90,6 @@ func GetResourceName(res types.Resource) string { switch v := res.(type) { case *endpoint.ClusterLoadAssignment: return v.GetClusterName() - case *cluster.Cluster: - return v.GetName() - case *route.RouteConfiguration: - return v.GetName() - case *route.ScopedRouteConfiguration: - return v.GetName() - case *route.VirtualHost: - return v.GetName() - case *listener.Listener: - return v.GetName() - case *auth.Secret: - return v.GetName() - case *runtime.Runtime: - return v.GetName() - case *core.TypedExtensionConfig: - return v.GetName() - case *ratelimit.RateLimitConfig: - return v.GetName() case types.ResourceWithName: return v.GetName() default: @@ -197,13 +176,13 @@ func mapMerge(dst map[string]bool, src map[string]bool) { func getClusterReferences(src *cluster.Cluster, out map[resource.Type]map[string]bool) { endpoints := map[string]bool{} - switch typ := src.ClusterDiscoveryType.(type) { + switch typ := src.GetClusterDiscoveryType().(type) { case *cluster.Cluster_Type: if typ.Type == cluster.Cluster_EDS { - if src.EdsClusterConfig != nil && src.EdsClusterConfig.ServiceName != "" { - endpoints[src.EdsClusterConfig.ServiceName] = true + if src.GetEdsClusterConfig() != nil && src.GetEdsClusterConfig().GetServiceName() != "" { + endpoints[src.GetEdsClusterConfig().GetServiceName()] = true } else { - endpoints[src.Name] = true + endpoints[src.GetName()] = true } } } @@ -222,8 +201,8 @@ func getListenerReferences(src *listener.Listener, out map[resource.Type]map[str routes := map[string]bool{} // Extract route configuration names from HTTP connection manager. - for _, chain := range src.FilterChains { - for _, filter := range chain.Filters { + for _, chain := range src.GetFilterChains() { + for _, filter := range chain.GetFilters() { config := resource.GetHTTPConnectionManager(filter) if config == nil { continue @@ -236,7 +215,7 @@ func getListenerReferences(src *listener.Listener, out map[resource.Type]map[str // If the scoped route mapping is embedded, add the referenced route resource names. for _, s := range config.GetScopedRoutes().GetScopedRouteConfigurationsList().GetScopedRouteConfigurations() { - routes[s.RouteConfigurationName] = true + routes[s.GetRouteConfigurationName()] = true } } } @@ -254,7 +233,7 @@ func getScopedRouteReferences(src *route.ScopedRouteConfiguration, out map[resou routes := map[string]bool{} // For a scoped route configuration, the dependent resource is the RouteConfigurationName. - routes[src.RouteConfigurationName] = true + routes[src.GetRouteConfigurationName()] = true if len(routes) > 0 { if _, ok := out[resource.RouteType]; !ok { diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/simple.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/simple.go index 43fb6a24a6..ebf63f5b6f 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/simple.go +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/simple.go @@ -193,8 +193,8 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) { info.mu.Lock() for id, watch := range info.watches { // Respond with the current version regardless of whether the version has changed. - version := snapshot.GetVersion(watch.Request.TypeUrl) - resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl) + version := snapshot.GetVersion(watch.Request.GetTypeUrl()) + resources := snapshot.GetResourcesAndTTL(watch.Request.GetTypeUrl()) // TODO(snowp): Construct this once per type instead of once per watch. resourcesWithTTL := map[string]types.ResourceWithTTL{} @@ -207,7 +207,7 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) { if len(resourcesWithTTL) == 0 { continue } - cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.ResourceNames, version) + cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.GetResourceNames(), version) err := cache.respond(ctx, watch.Request, watch.Response, resourcesWithTTL, version, true) if err != nil { cache.log.Errorf("received error when attempting to respond to watches: %v", err) @@ -220,7 +220,7 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) { } } -// SetSnapshotCacheContext updates a snapshot for a node. +// SetSnapshotCache updates a snapshot for a node. func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapshot ResourceSnapshot) error { cache.mu.Lock() defer cache.mu.Unlock() @@ -232,33 +232,96 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh if info, ok := cache.status[node]; ok { info.mu.Lock() defer info.mu.Unlock() - for id, watch := range info.watches { - version := snapshot.GetVersion(watch.Request.TypeUrl) - if version != watch.Request.VersionInfo { - cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.TypeUrl, watch.Request.ResourceNames, version) - - resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl) - err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false) - if err != nil { - return err - } - // discard the watch - delete(info.watches, id) + // Respond to SOTW watches for the node. + if err := cache.respondSOTWWatches(ctx, info, snapshot); err != nil { + return err + } + + // Respond to delta watches for the node. + return cache.respondDeltaWatches(ctx, info, snapshot) + } + + return nil +} + +func (cache *snapshotCache) respondSOTWWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot) error { + // responder callback for SOTW watches + respond := func(watch ResponseWatch, id int64) error { + version := snapshot.GetVersion(watch.Request.GetTypeUrl()) + if version != watch.Request.GetVersionInfo() { + cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.GetTypeUrl(), watch.Request.GetResourceNames(), version) + resources := snapshot.GetResourcesAndTTL(watch.Request.GetTypeUrl()) + err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false) + if err != nil { + return err } + // discard the watch + delete(info.watches, id) } + return nil + } - // We only calculate version hashes when using delta. We don't - // want to do this when using SOTW so we can avoid unnecessary - // computational cost if not using delta. - if len(info.deltaWatches) > 0 { - err := snapshot.ConstructVersionMap() + // If ADS is enabled we need to order response watches so we guarantee + // sending them in the correct order. Go's default implementation + // of maps are randomized order when ranged over. + if cache.ads { + info.orderResponseWatches() + for _, key := range info.orderedWatches { + err := respond(info.watches[key.ID], key.ID) + if err != nil { + return err + } + } + } else { + for id, watch := range info.watches { + err := respond(watch, id) if err != nil { return err } } + } + + return nil +} - // process our delta watches +func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot) error { + // We only calculate version hashes when using delta. We don't + // want to do this when using SOTW so we can avoid unnecessary + // computational cost if not using delta. + if len(info.deltaWatches) == 0 { + return nil + } + + err := snapshot.ConstructVersionMap() + if err != nil { + return err + } + + // If ADS is enabled we need to order response delta watches so we guarantee + // sending them in the correct order. Go's default implementation + // of maps are randomized order when ranged over. + if cache.ads { + info.orderResponseDeltaWatches() + for _, key := range info.orderedDeltaWatches { + watch := info.deltaWatches[key.ID] + res, err := cache.respondDelta( + ctx, + snapshot, + watch.Request, + watch.Response, + watch.StreamState, + ) + if err != nil { + return err + } + // If we detect a nil response here, that means there has been no state change + // so we don't want to respond or remove any existing resource watches + if res != nil { + delete(info.deltaWatches, key.ID) + } + } + } else { for id, watch := range info.deltaWatches { res, err := cache.respondDelta( ctx, @@ -277,11 +340,10 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh } } } - return nil } -// GetSnapshots gets the snapshot for a node, and returns an error if not found. +// GetSnapshot gets the snapshot for a node, and returns an error if not found. func (cache *snapshotCache) GetSnapshot(node string) (ResourceSnapshot, error) { cache.mu.RLock() defer cache.mu.RUnlock() @@ -324,14 +386,14 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL) // CreateWatch returns a watch for an xDS request. A nil function may be // returned if an error occurs. func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() { - nodeID := cache.hash.ID(request.Node) + nodeID := cache.hash.ID(request.GetNode()) cache.mu.Lock() defer cache.mu.Unlock() info, ok := cache.status[nodeID] if !ok { - info = newStatusInfo(request.Node) + info = newStatusInfo(request.GetNode()) cache.status[nodeID] = info } @@ -341,31 +403,30 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str info.mu.Unlock() var version string - snapshot, exists := cache.snapshots[nodeID] if exists { - version = snapshot.GetVersion(request.TypeUrl) + version = snapshot.GetVersion(request.GetTypeUrl()) } if exists { - knownResourceNames := streamState.GetKnownResourceNames(request.TypeUrl) + knownResourceNames := streamState.GetKnownResourceNames(request.GetTypeUrl()) diff := []string{} - for _, r := range request.ResourceNames { + for _, r := range request.GetResourceNames() { if _, ok := knownResourceNames[r]; !ok { diff = append(diff, r) } } cache.log.Debugf("nodeID %q requested %s%v and known %v. Diff %v", nodeID, - request.TypeUrl, request.ResourceNames, knownResourceNames, diff) + request.GetTypeUrl(), request.GetResourceNames(), knownResourceNames, diff) if len(diff) > 0 { - resources := snapshot.GetResourcesAndTTL(request.TypeUrl) + resources := snapshot.GetResourcesAndTTL(request.GetTypeUrl()) for _, name := range diff { if _, exists := resources[name]; exists { if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil { - cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl, - request.ResourceNames, nodeID, err) + cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.GetTypeUrl(), + request.GetResourceNames(), nodeID, err) return nil } return func() {} @@ -375,9 +436,9 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str } // if the requested version is up-to-date or missing a response, leave an open watch - if !exists || request.VersionInfo == version { + if !exists || request.GetVersionInfo() == version { watchID := cache.nextWatchID() - cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID, request.TypeUrl, request.ResourceNames, nodeID, request.VersionInfo) + cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID, request.GetTypeUrl(), request.GetResourceNames(), nodeID, request.GetVersionInfo()) info.mu.Lock() info.watches[watchID] = ResponseWatch{Request: request, Response: value} info.mu.Unlock() @@ -385,10 +446,10 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str } // otherwise, the watch may be responded immediately - resources := snapshot.GetResourcesAndTTL(request.TypeUrl) + resources := snapshot.GetResourcesAndTTL(request.GetTypeUrl()) if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil { - cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl, - request.ResourceNames, nodeID, err) + cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.GetTypeUrl(), + request.GetResourceNames(), nodeID, err) return nil } @@ -418,14 +479,14 @@ func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() { func (cache *snapshotCache) respond(ctx context.Context, request *Request, value chan Response, resources map[string]types.ResourceWithTTL, version string, heartbeat bool) error { // for ADS, the request names must match the snapshot names // if they do not, then the watch is never responded, and it is expected that envoy makes another request - if len(request.ResourceNames) != 0 && cache.ads { - if err := superset(nameSet(request.ResourceNames), resources); err != nil { - cache.log.Warnf("ADS mode: not responding to request: %v", err) + if len(request.GetResourceNames()) != 0 && cache.ads { + if err := superset(nameSet(request.GetResourceNames()), resources); err != nil { + cache.log.Warnf("ADS mode: not responding to request %s%v: %v", request.GetTypeUrl(), request.GetResourceNames(), err) return nil } } - cache.log.Debugf("respond %s%v version %q with version %q", request.TypeUrl, request.ResourceNames, request.VersionInfo, version) + cache.log.Debugf("respond %s%v version %q with version %q", request.GetTypeUrl(), request.GetResourceNames(), request.GetVersionInfo(), version) select { case value <- createResponse(ctx, request, resources, version, heartbeat): @@ -441,8 +502,8 @@ func createResponse(ctx context.Context, request *Request, resources map[string] // Reply only with the requested resources. Envoy may ask each resource // individually in a separate stream. It is ok to reply with the same version // on separate streams since requests do not share their response versions. - if len(request.ResourceNames) != 0 { - set := nameSet(request.ResourceNames) + if len(request.GetResourceNames()) != 0 { + set := nameSet(request.GetResourceNames()) for name, resource := range resources { if set[name] { filtered = append(filtered, resource) @@ -465,7 +526,7 @@ func createResponse(ctx context.Context, request *Request, resources map[string] // CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache. func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { - nodeID := cache.hash.ID(request.Node) + nodeID := cache.hash.ID(request.GetNode()) t := request.GetTypeUrl() cache.mu.Lock() @@ -473,7 +534,7 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream info, ok := cache.status[nodeID] if !ok { - info = newStatusInfo(request.Node) + info = newStatusInfo(request.GetNode()) cache.status[nodeID] = info } @@ -520,9 +581,9 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream // Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) (*RawDeltaResponse, error) { resp := createDeltaResponse(ctx, request, state, resourceContainer{ - resourceMap: snapshot.GetResources(request.TypeUrl), - versionMap: snapshot.GetVersionMap(request.TypeUrl), - systemVersion: snapshot.GetVersion(request.TypeUrl), + resourceMap: snapshot.GetResources(request.GetTypeUrl()), + versionMap: snapshot.GetVersionMap(request.GetTypeUrl()), + systemVersion: snapshot.GetVersion(request.GetTypeUrl()), }) // Only send a response if there were changes @@ -531,7 +592,7 @@ func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceS if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsWildcard() && state.IsFirst()) { if cache.log != nil { cache.log.Debugf("node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", - request.GetNode().GetId(), request.TypeUrl, GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard()) + request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard()) } select { case value <- resp: @@ -563,7 +624,7 @@ func (cache *snapshotCache) cancelDeltaWatch(nodeID string, watchID int64) func( // Fetch implements the cache fetch function. // Fetch is called on multiple streams, so responding to individual names with the same version works. func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Response, error) { - nodeID := cache.hash.ID(request.Node) + nodeID := cache.hash.ID(request.GetNode()) cache.mu.RLock() defer cache.mu.RUnlock() @@ -571,13 +632,13 @@ func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Respon if snapshot, exists := cache.snapshots[nodeID]; exists { // Respond only if the request version is distinct from the current snapshot state. // It might be beneficial to hold the request since Envoy will re-attempt the refresh. - version := snapshot.GetVersion(request.TypeUrl) - if request.VersionInfo == version { + version := snapshot.GetVersion(request.GetTypeUrl()) + if request.GetVersionInfo() == version { cache.log.Warnf("skip fetch: version up to date") return nil, &types.SkipFetchError{} } - resources := snapshot.GetResourcesAndTTL(request.TypeUrl) + resources := snapshot.GetResourcesAndTTL(request.GetTypeUrl()) out := createResponse(ctx, request, resources, version, false) return out, nil } diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/status.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/status.go index 84db1f9821..dca93e02ff 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/status.go +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/status.go @@ -15,6 +15,7 @@ package cache import ( + "sort" "sync" "time" @@ -36,7 +37,7 @@ func (IDHash) ID(node *core.Node) string { if node == nil { return "" } - return node.Id + return node.GetId() } var _ NodeHash = IDHash{} @@ -65,10 +66,12 @@ type statusInfo struct { node *core.Node // watches are indexed channels for the response watches and the original requests. - watches map[int64]ResponseWatch + watches map[int64]ResponseWatch + orderedWatches keys // deltaWatches are indexed channels for the delta response watches and the original requests - deltaWatches map[int64]DeltaResponseWatch + deltaWatches map[int64]DeltaResponseWatch + orderedDeltaWatches keys // the timestamp of the last watch request lastWatchRequestTime time.Time @@ -105,9 +108,10 @@ type DeltaResponseWatch struct { // newStatusInfo initializes a status info data structure. func newStatusInfo(node *core.Node) *statusInfo { out := statusInfo{ - node: node, - watches: make(map[int64]ResponseWatch), - deltaWatches: make(map[int64]DeltaResponseWatch), + node: node, + watches: make(map[int64]ResponseWatch), + orderedWatches: make(keys, 0), + deltaWatches: make(map[int64]DeltaResponseWatch), } return &out } @@ -155,3 +159,41 @@ func (info *statusInfo) setDeltaResponseWatch(id int64, drw DeltaResponseWatch) defer info.mu.Unlock() info.deltaWatches[id] = drw } + +// orderResponseWatches will track a list of watch keys and order them if +// true is passed. +func (info *statusInfo) orderResponseWatches() { + info.orderedWatches = make(keys, len(info.watches)) + + var index int + for id, watch := range info.watches { + info.orderedWatches[index] = key{ + ID: id, + TypeURL: watch.Request.GetTypeUrl(), + } + index++ + } + + // Sort our list which we can use in the SetSnapshot functions. + // This is only run when we enable ADS on the cache. + sort.Sort(info.orderedWatches) +} + +// orderResponseDeltaWatches will track a list of delta watch keys and order them if +// true is passed. +func (info *statusInfo) orderResponseDeltaWatches() { + info.orderedDeltaWatches = make(keys, len(info.deltaWatches)) + + var index int + for id, deltaWatch := range info.deltaWatches { + info.orderedDeltaWatches[index] = key{ + ID: id, + TypeURL: deltaWatch.Request.GetTypeUrl(), + } + index++ + } + + // Sort our list which we can use in the SetSnapshot functions. + // This is only run when we enable ADS on the cache. + sort.Sort(info.orderedDeltaWatches) +} diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/ya.make b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/ya.make index 4d64296a9c..d3316316c0 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/ya.make +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/cache/v3/ya.make @@ -7,6 +7,7 @@ SRCS( delta.go linear.go mux.go + order.go resource.go resources.go simple.go @@ -16,6 +17,7 @@ SRCS( GO_TEST_SRCS( linear_test.go + order_test.go status_test.go ) |