diff options
author | robot-contrib <robot-contrib@yandex-team.com> | 2024-01-30 11:20:39 +0300 |
---|---|---|
committer | robot-contrib <robot-contrib@yandex-team.com> | 2024-01-30 12:12:51 +0300 |
commit | be737fd8956853e06bd2c4f9fcd4a85188f4c172 (patch) | |
tree | 5bd76802fac1096dfd90983c7739d50de367a79f /vendor/github.com/envoyproxy/go-control-plane/pkg/server | |
parent | fe62880c46b1f2c9fec779b0dc39f8a92ce256a5 (diff) | |
download | ydb-be737fd8956853e06bd2c4f9fcd4a85188f4c172.tar.gz |
Update vendor/github.com/envoyproxy/go-control-plane to 0.12.0
Diffstat (limited to 'vendor/github.com/envoyproxy/go-control-plane/pkg/server')
16 files changed, 580 insertions, 245 deletions
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/config.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/config.go new file mode 100644 index 0000000000..b746acfab9 --- /dev/null +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/config.go @@ -0,0 +1,25 @@ +package config + +// Opts for individual xDS implementations that can be +// utilized through the functional opts pattern. +type Opts struct { + // If true respond to ADS requests with a guaranteed resource ordering + Ordered bool +} + +func NewOpts() Opts { + return Opts{ + Ordered: false, + } +} + +// Each xDS implementation should implement their own functional opts. +// It is recommended that config values be added in this package specifically, +// but the individual opts functions should be in their respective +// implementation package so the import looks like the following: +// +// `sotw.WithOrderedADS()` +// `delta.WithOrderedADS()` +// +// this allows for easy inference as to which opt applies to what implementation. +type XDSOption func(*Opts) diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/doc.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/doc.go new file mode 100644 index 0000000000..2c85adfd5f --- /dev/null +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/doc.go @@ -0,0 +1,22 @@ +/* +Config abstracts xDS server options into a unified configuration package +that allows for easy manipulation as well as unified passage of options +to individual xDS server implementations. + +This enables code reduction as well as a unified source of config. Delta +and SOTW might have similar ordered responses through ADS and rather than +duplicating the logic across server implementations, we add the options +in this package which are passed down to each individual spec. + +Each xDS implementation should implement their own functional opts. +It is recommended that config values be added in this package specifically, +but the individual opts functions should be in their respective +implementation package so the import looks like the following: + +`sotw.WithOrderedADS()` +`delta.WithOrderedADS()` + +this allows for easy inference as to which opt applies to what implementation. +*/ + +package config diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/ya.make b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/ya.make new file mode 100644 index 0000000000..e44e5bff33 --- /dev/null +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/ya.make @@ -0,0 +1,10 @@ +GO_LIBRARY() + +LICENSE(Apache-2.0) + +SRCS( + config.go + doc.go +) + +END() diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/server.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/server.go index 5f10266aba..b570b19b27 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/server.go +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/server.go @@ -13,6 +13,7 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/config" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) @@ -30,7 +31,7 @@ type Callbacks interface { // OnStreamDeltaRequest is called once a request is received on a stream. // Returning an error will end processing and close the stream. OnStreamClosed will still be called. OnStreamDeltaRequest(int64, *discovery.DeltaDiscoveryRequest) error - // OnStreamDelatResponse is called immediately prior to sending a response on a stream. + // OnStreamDeltaResponse is called immediately prior to sending a response on a stream. OnStreamDeltaResponse(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse) } @@ -43,15 +44,25 @@ type server struct { // total stream count for counting bi-di streams streamCount int64 ctx context.Context + + // Local configuration flags for individual xDS implementations. + opts config.Opts } // NewServer creates a delta xDS specific server which utilizes a ConfigWatcher and delta Callbacks. -func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server { - return &server{ +func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server { + s := &server{ cache: config, callbacks: callbacks, ctx: ctx, } + + // Parse through our options + for _, opt := range opts { + opt(&s.opts) + } + + return s } func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error { @@ -63,7 +74,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De // a collection of stack allocated watches per request type watches := newWatches() - var node = &core.Node{} + node := &core.Node{} defer func() { watches.Cancel() @@ -72,7 +83,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De } }() - // Sends a response, returns the new stream nonce + // sends a response, returns the new stream nonce send := func(resp cache.DeltaResponse) (string, error) { if resp == nil { return "", errors.New("missing response") @@ -83,13 +94,51 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De return "", err } - streamNonce = streamNonce + 1 + streamNonce++ response.Nonce = strconv.FormatInt(streamNonce, 10) if s.callbacks != nil { s.callbacks.OnStreamDeltaResponse(streamID, resp.GetDeltaRequest(), response) } - return response.Nonce, str.Send(response) + return response.GetNonce(), str.Send(response) + } + + // process a single delta response + process := func(resp cache.DeltaResponse) error { + typ := resp.GetDeltaRequest().GetTypeUrl() + if resp == deltaErrorResponse { + return status.Errorf(codes.Unavailable, typ+" watch failed") + } + + nonce, err := send(resp) + if err != nil { + return err + } + + watch := watches.deltaWatches[typ] + watch.nonce = nonce + + watch.state.SetResourceVersions(resp.GetNextVersionMap()) + watches.deltaWatches[typ] = watch + return nil + } + + // processAll purges the deltaMuxedResponses channel + processAll := func() error { + for { + select { + // We watch the multiplexed channel for incoming responses. + case resp, more := <-watches.deltaMuxedResponses: + if !more { + break + } + if err := process(resp); err != nil { + return err + } + default: + return nil + } + } } if s.callbacks != nil { @@ -102,35 +151,31 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De select { case <-s.ctx.Done(): return nil + // We watch the multiplexed channel for incoming responses. case resp, more := <-watches.deltaMuxedResponses: + // input stream ended or errored out if !more { break } - typ := resp.GetDeltaRequest().GetTypeUrl() - if resp == deltaErrorResponse { - return status.Errorf(codes.Unavailable, typ+" watch failed") - } - - nonce, err := send(resp) - if err != nil { + if err := process(resp); err != nil { return err } - - watch := watches.deltaWatches[typ] - watch.nonce = nonce - - watch.state.SetResourceVersions(resp.GetNextVersionMap()) - watches.deltaWatches[typ] = watch case req, more := <-reqCh: // input stream ended or errored out if !more { return nil } + if req == nil { return status.Errorf(codes.Unavailable, "empty request") } + // make sure all existing responses are processed prior to new requests to avoid deadlock + if err := processAll(); err != nil { + return err + } + if s.callbacks != nil { if err := s.callbacks.OnStreamDeltaRequest(streamID, req); err != nil { return err @@ -139,18 +184,18 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De // The node information might only be set on the first incoming delta discovery request, so store it here so we can // reset it on subsequent requests that omit it. - if req.Node != nil { - node = req.Node + if req.GetNode() != nil { + node = req.GetNode() } else { req.Node = node } // type URL is required for ADS but is implicit for any other xDS stream if defaultTypeURL == resource.AnyType { - if req.TypeUrl == "" { + if req.GetTypeUrl() == "" { return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } - } else if req.TypeUrl == "" { + } else if req.GetTypeUrl() == "" { req.TypeUrl = defaultTypeURL } @@ -173,16 +218,8 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De s.subscribe(req.GetResourceNamesSubscribe(), &watch.state) s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state) - watch.responses = make(chan cache.DeltaResponse, 1) - watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses) + watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watches.deltaMuxedResponses) watches.deltaWatches[typeURL] = watch - - go func() { - resp, more := <-watch.responses - if more { - watches.deltaMuxedResponses <- resp - } - }() } } } diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/watches.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/watches.go index c88548388a..63c4c2d38d 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/watches.go +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/watches.go @@ -17,9 +17,13 @@ type watches struct { // newWatches creates and initializes watches. func newWatches() watches { // deltaMuxedResponses needs a buffer to release go-routines populating it + // + // because deltaMuxedResponses can be populated by an update from the cache + // and a request from the client, we need to create the channel with a buffer + // size of 2x the number of types to avoid deadlocks. return watches{ deltaWatches: make(map[string]watch, int(types.UnknownType)), - deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)), + deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)*2), } } @@ -28,13 +32,14 @@ func (w *watches) Cancel() { for _, watch := range w.deltaWatches { watch.Cancel() } + + close(w.deltaMuxedResponses) } // watch contains the necessary modifiables for receiving resource responses type watch struct { - responses chan cache.DeltaResponse - cancel func() - nonce string + cancel func() + nonce string state stream.StreamState } @@ -44,9 +49,4 @@ func (w *watch) Cancel() { if w.cancel != nil { w.cancel() } - if w.responses != nil { - // w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here - // This is needed to release resources taken by goroutines watching this channel - close(w.responses) - } } diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/ya.make b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/ya.make index 7e3abbda25..77c00272c4 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/ya.make +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/ya.make @@ -11,4 +11,6 @@ GO_TEST_SRCS(watches_test.go) END() -RECURSE(gotest) +RECURSE( + gotest +) diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/rest/v3/ya.make b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/rest/v3/ya.make index d32b575a10..728337851d 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/rest/v3/ya.make +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/rest/v3/ya.make @@ -2,6 +2,8 @@ GO_LIBRARY() LICENSE(Apache-2.0) -SRCS(server.go) +SRCS( + server.go +) END() diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/ads.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/ads.go new file mode 100644 index 0000000000..bbb6dd4b20 --- /dev/null +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/ads.go @@ -0,0 +1,140 @@ +package sotw + +import ( + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/resource/v3" +) + +// process handles a bi-di stream request +func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRequest, defaultTypeURL string) error { + // We make a responder channel here so we can multiplex responses from the dynamic channels. + sw.watches.addWatch(resource.AnyType, &watch{ + // Create a buffered channel the size of the known resource types. + response: make(chan cache.Response, types.UnknownType), + cancel: func() { + close(sw.watches.responders[resource.AnyType].response) + }, + }) + + process := func(resp cache.Response) error { + nonce, err := sw.send(resp) + if err != nil { + return err + } + + sw.watches.responders[resp.GetRequest().GetTypeUrl()].nonce = nonce + return nil + } + + // Instead of creating a separate channel for each incoming request and abandoning the old one + // This algorithm uses (and reuses) a single channel for all request types and guarantees + // the server will send updates over the wire in an ordered fashion. + // Downside is there is no longer back pressure per resource. + // There is potential for a dropped response from the cache but this is not impactful + // to the client since SOTW version handling is global and a new sequence will be + // initiated on a new request. + processAllExcept := func(typeURL string) error { + for { + select { + // We watch the multiplexed ADS channel for incoming responses. + case res := <-sw.watches.responders[resource.AnyType].response: + if res.GetRequest().GetTypeUrl() != typeURL { + if err := process(res); err != nil { + return err + } + } + default: + return nil + } + } + } + + // This control loop strictly orders resources when running in ADS mode. + // It should be treated as a child process of the original process() loop + // and should return on close of stream or error. This will cause the + // cleanup routines in the parent process() loop to execute. + for { + select { + case <-s.ctx.Done(): + return nil + // We only watch the multiplexed channel since all values will come through from process. + case res := <-sw.watches.responders[resource.AnyType].response: + if err := process(res); err != nil { + return status.Errorf(codes.Unavailable, err.Error()) + } + case req, ok := <-reqCh: + // Input stream ended or failed. + if !ok { + return nil + } + + // Received an empty request over the request channel. Can't respond. + if req == nil { + return status.Errorf(codes.Unavailable, "empty request") + } + + // Only first request is guaranteed to hold node info so if it's missing, reassign. + if req.GetNode() != nil { + sw.node = req.GetNode() + } else { + req.Node = sw.node + } + + // Nonces can be reused across streams; we verify nonce only if nonce is not initialized. + nonce := req.GetResponseNonce() + + // type URL is required for ADS but is implicit for xDS + if defaultTypeURL == resource.AnyType { + if req.GetTypeUrl() == "" { + return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") + } + } + + if s.callbacks != nil { + if err := s.callbacks.OnStreamRequest(sw.ID, req); err != nil { + return err + } + } + + if lastResponse, ok := sw.lastDiscoveryResponses[req.GetTypeUrl()]; ok { + if lastResponse.nonce == "" || lastResponse.nonce == nonce { + // Let's record Resource names that a client has received. + sw.streamState.SetKnownResourceNames(req.GetTypeUrl(), lastResponse.resources) + } + } + + typeURL := req.GetTypeUrl() + // Use the multiplexed channel for new watches. + responder := sw.watches.responders[resource.AnyType].response + if w, ok := sw.watches.responders[typeURL]; ok { + // We've found a pre-existing watch, lets check and update if needed. + // If these requirements aren't satisfied, leave an open watch. + if w.nonce == "" || w.nonce == nonce { + w.close() + + // Only process if we have an existing watch otherwise go ahead and create. + if err := processAllExcept(typeURL); err != nil { + return err + } + + sw.watches.addWatch(typeURL, &watch{ + cancel: s.cache.CreateWatch(req, sw.streamState, responder), + response: responder, + }) + } + } else { + // No pre-existing watch exists, let's create one. + // We need to precompute the watches first then open a watch in the cache. + sw.watches.addWatch(typeURL, &watch{ + cancel: s.cache.CreateWatch(req, sw.streamState, responder), + response: responder, + }) + } + } + } +} diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go index 91681237c9..f5be0c57a9 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go @@ -18,17 +18,13 @@ package sotw import ( "context" "errors" - "reflect" "strconv" "sync/atomic" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/config" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) @@ -50,8 +46,23 @@ type Callbacks interface { } // NewServer creates handlers from a config watcher and callbacks. -func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server { - return &server{cache: config, callbacks: callbacks, ctx: ctx} +func NewServer(ctx context.Context, cw cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server { + s := &server{cache: cw, callbacks: callbacks, ctx: ctx, opts: config.NewOpts()} + + // Parse through our options + for _, opt := range opts { + opt(&s.opts) + } + + return s +} + +// WithOrderedADS enables the internal flag to order responses +// strictly. +func WithOrderedADS() config.XDSOption { + return func(o *config.Opts) { + o.Ordered = true + } } type server struct { @@ -61,177 +72,78 @@ type server struct { // streamCount for counting bi-di streams streamCount int64 -} -// Discovery response that is sent over GRPC stream -// We need to record what resource names are already sent to a client -// So if the client requests a new name we can respond back -// regardless current snapshot version (even if it is not changed yet) -type lastDiscoveryResponse struct { - nonce string - resources map[string]struct{} + // Local configuration flags for individual xDS implementations. + opts config.Opts } -// process handles a bi-di stream request -func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error { - // increment stream count - streamID := atomic.AddInt64(&s.streamCount, 1) - - // unique nonce generator for req-resp pairs per xDS stream; the server - // ignores stale nonces. nonce is only modified within send() function. - var streamNonce int64 - - streamState := stream.NewStreamState(false, map[string]string{}) - lastDiscoveryResponses := map[string]lastDiscoveryResponse{} - - // a collection of stack allocated watches per request type - watches := newWatches() - - // node may only be set on the first discovery request - var node = &core.Node{} - - defer func() { - watches.close() - if s.callbacks != nil { - s.callbacks.OnStreamClosed(streamID, node) - } - }() - - // sends a response by serializing to protobuf Any - send := func(resp cache.Response) (string, error) { - if resp == nil { - return "", errors.New("missing response") - } +// streamWrapper abstracts critical data passed around a stream for to be accessed +// through various code paths in the xDS lifecycle. This comes in handy when dealing +// with varying implementation types such as ordered vs unordered resource handling. +type streamWrapper struct { + stream stream.Stream // parent stream object + ID int64 // stream ID in relation to total stream count + nonce int64 // nonce per stream + watches watches // collection of stack allocated watchers per request type + callbacks Callbacks // callbacks for performing actions through stream lifecycle + + node *core.Node // registered xDS client + + // The below fields are used for tracking resource + // cache state and should be maintained per stream. + streamState stream.StreamState + lastDiscoveryResponses map[string]lastDiscoveryResponse +} - out, err := resp.GetDiscoveryResponse() - if err != nil { - return "", err - } +// Send packages the necessary resources before sending on the gRPC stream, +// and sets the current state of the world. +func (s *streamWrapper) send(resp cache.Response) (string, error) { + if resp == nil { + return "", errors.New("missing response") + } - // increment nonce - streamNonce = streamNonce + 1 - out.Nonce = strconv.FormatInt(streamNonce, 10) + out, err := resp.GetDiscoveryResponse() + if err != nil { + return "", err + } - lastResponse := lastDiscoveryResponse{ - nonce: out.Nonce, - resources: make(map[string]struct{}), - } - for _, r := range resp.GetRequest().ResourceNames { - lastResponse.resources[r] = struct{}{} - } - lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse + // increment nonce and convert it to base10 + out.Nonce = strconv.FormatInt(atomic.AddInt64(&s.nonce, 1), 10) - if s.callbacks != nil { - s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out) - } - return out.Nonce, str.Send(out) + lastResponse := lastDiscoveryResponse{ + nonce: out.GetNonce(), + resources: make(map[string]struct{}), } + for _, r := range resp.GetRequest().GetResourceNames() { + lastResponse.resources[r] = struct{}{} + } + s.lastDiscoveryResponses[resp.GetRequest().GetTypeUrl()] = lastResponse + // Register with the callbacks provided that we are sending the response. if s.callbacks != nil { - if err := s.callbacks.OnStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil { - return err - } + s.callbacks.OnStreamResponse(resp.GetContext(), s.ID, resp.GetRequest(), out) } - // recompute dynamic channels for this stream - watches.recompute(s.ctx, reqCh) - - for { - // The list of select cases looks like this: - // 0: <- ctx.Done - // 1: <- reqCh - // 2...: per type watches - index, value, ok := reflect.Select(watches.cases) - switch index { - // ctx.Done() -> if we receive a value here we return as no further computation is needed - case 0: - return nil - // Case 1 handles any request inbound on the stream and handles all initialization as needed - case 1: - // input stream ended or errored out - if !ok { - return nil - } - - req := value.Interface().(*discovery.DiscoveryRequest) - if req == nil { - return status.Errorf(codes.Unavailable, "empty request") - } - - // node field in discovery request is delta-compressed - if req.Node != nil { - node = req.Node - } else { - req.Node = node - } - - // nonces can be reused across streams; we verify nonce only if nonce is not initialized - nonce := req.GetResponseNonce() - - // type URL is required for ADS but is implicit for xDS - if defaultTypeURL == resource.AnyType { - if req.TypeUrl == "" { - return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") - } - } else if req.TypeUrl == "" { - req.TypeUrl = defaultTypeURL - } - - if s.callbacks != nil { - if err := s.callbacks.OnStreamRequest(streamID, req); err != nil { - return err - } - } - - if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) - } - } - - typeURL := req.GetTypeUrl() - responder := make(chan cache.Response, 1) - if w, ok := watches.responders[typeURL]; ok { - // We've found a pre-existing watch, lets check and update if needed. - // If these requirements aren't satisfied, leave an open watch. - if w.nonce == "" || w.nonce == nonce { - w.close() - - watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), - response: responder, - }) - } - } else { - // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. - watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), - response: responder, - }) - } - - // Recompute the dynamic select cases for this stream. - watches.recompute(s.ctx, reqCh) - default: - // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL - if !ok { - // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something? - return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index) - } - - res := value.Interface().(cache.Response) - nonce, err := send(res) - if err != nil { - return err - } + return out.GetNonce(), s.stream.Send(out) +} - watches.responders[res.GetRequest().TypeUrl].nonce = nonce - } +// Shutdown closes all open watches, and notifies API consumers the stream has closed. +func (s *streamWrapper) shutdown() { + s.watches.close() + if s.callbacks != nil { + s.callbacks.OnStreamClosed(s.ID, s.node) } } +// Discovery response that is sent over GRPC stream. +// We need to record what resource names are already sent to a client +// So if the client requests a new name we can respond back +// regardless current snapshot version (even if it is not changed yet) +type lastDiscoveryResponse struct { + nonce string + resources map[string]struct{} +} + // StreamHandler converts a blocking read call to channels and initiates stream processing func (s *server) StreamHandler(stream stream.Stream, typeURL string) error { // a channel for receiving incoming requests diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/watches.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/watches.go index 45670d6a91..d781f663e6 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/watches.go +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/watches.go @@ -60,7 +60,7 @@ func (w *watches) recompute(ctx context.Context, req <-chan *discovery.Discovery } } -// watch contains the necessary modifiables for receiving resource responses +// watch contains the necessary modifiable data for receiving resource responses type watch struct { cancel func() nonce string diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/xds.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/xds.go new file mode 100644 index 0000000000..3b24dec409 --- /dev/null +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/xds.go @@ -0,0 +1,166 @@ +package sotw + +import ( + "reflect" + "sync/atomic" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" +) + +// process handles a bi-di stream request +func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryRequest, defaultTypeURL string) error { + // create our streamWrapper which can be passed down to sub control loops. + // this is useful for abstracting critical information for various types of + // xDS resource processing. + sw := streamWrapper{ + stream: str, + ID: atomic.AddInt64(&s.streamCount, 1), // increment stream count + callbacks: s.callbacks, + node: &core.Node{}, // node may only be set on the first discovery request + + // a collection of stack allocated watches per request type. + watches: newWatches(), + streamState: stream.NewStreamState(false, map[string]string{}), + lastDiscoveryResponses: make(map[string]lastDiscoveryResponse), + } + + // cleanup once our stream has ended. + defer sw.shutdown() + + if s.callbacks != nil { + if err := s.callbacks.OnStreamOpen(str.Context(), sw.ID, defaultTypeURL); err != nil { + return err + } + } + + // do an initial recompute so we can load the first 2 channels: + // <-reqCh + // s.ctx.Done() + sw.watches.recompute(s.ctx, reqCh) + + for { + // The list of select cases looks like this: + // 0: <- ctx.Done + // 1: <- reqCh + // 2...: per type watches + index, value, ok := reflect.Select(sw.watches.cases) + switch index { + // ctx.Done() -> if we receive a value here we return + // as no further computation is needed + case 0: + return nil + // Case 1 handles any request inbound on the stream + // and handles all initialization as needed + case 1: + // input stream ended or failed + if !ok { + return nil + } + + req := value.Interface().(*discovery.DiscoveryRequest) + if req == nil { + return status.Errorf(codes.Unavailable, "empty request") + } + + // Only first request is guaranteed to hold node info so if it's missing, reassign. + if req.GetNode() != nil { + sw.node = req.GetNode() + } else { + req.Node = sw.node + } + + // nonces can be reused across streams; we verify nonce only if nonce is not initialized + nonce := req.GetResponseNonce() + + // type URL is required for ADS but is implicit for xDS + if defaultTypeURL == resource.AnyType { + if req.GetTypeUrl() == "" { + return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") + } + + // When using ADS we need to order responses. + // This is guaranteed in the xDS protocol specification + // as ADS is required to be eventually consistent. + // More details can be found here if interested: + // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations + if s.opts.Ordered { + // send our first request on the stream again so it doesn't get + // lost in processing on the new control loop + // There's a risk (albeit very limited) that we'd end up handling requests in the wrong order here. + // If envoy is using ADS for endpoints, and clusters are added in short sequence, + // the following request might include a new cluster and be discarded as the previous one will be handled after. + go func() { + reqCh <- req + }() + + // Trigger a different code path specifically for ADS. + // We want resource ordering so things don't get sent before they should. + // This is a blocking call and will exit the process function + // on successful completion. + return s.processADS(&sw, reqCh, defaultTypeURL) + } + } else if req.GetTypeUrl() == "" { + req.TypeUrl = defaultTypeURL + } + + if s.callbacks != nil { + if err := s.callbacks.OnStreamRequest(sw.ID, req); err != nil { + return err + } + } + + if lastResponse, ok := sw.lastDiscoveryResponses[req.GetTypeUrl()]; ok { + if lastResponse.nonce == "" || lastResponse.nonce == nonce { + // Let's record Resource names that a client has received. + sw.streamState.SetKnownResourceNames(req.GetTypeUrl(), lastResponse.resources) + } + } + + typeURL := req.GetTypeUrl() + responder := make(chan cache.Response, 1) + if w, ok := sw.watches.responders[typeURL]; ok { + // We've found a pre-existing watch, lets check and update if needed. + // If these requirements aren't satisfied, leave an open watch. + if w.nonce == "" || w.nonce == nonce { + w.close() + + sw.watches.addWatch(typeURL, &watch{ + cancel: s.cache.CreateWatch(req, sw.streamState, responder), + response: responder, + }) + } + } else { + // No pre-existing watch exists, let's create one. + // We need to precompute the watches first then open a watch in the cache. + sw.watches.addWatch(typeURL, &watch{ + cancel: s.cache.CreateWatch(req, sw.streamState, responder), + response: responder, + }) + } + + // Recompute the dynamic select cases for this stream. + sw.watches.recompute(s.ctx, reqCh) + default: + // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL + if !ok { + // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something? + return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index) + } + + res := value.Interface().(cache.Response) + nonce, err := sw.send(res) + if err != nil { + return err + } + + sw.watches.responders[res.GetRequest().GetTypeUrl()].nonce = nonce + } + } +} diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/ya.make b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/ya.make index 48fb263441..6d7dad878f 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/ya.make +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/ya.make @@ -3,8 +3,10 @@ GO_LIBRARY() LICENSE(Apache-2.0) SRCS( + ads.go server.go watches.go + xds.go ) END() diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/stream.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/stream.go index b5832b7d58..1664a941e0 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/stream.go +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/stream.go @@ -6,7 +6,7 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" ) -// Generic RPC stream. +// Generic RPC stream for state of the world. type Stream interface { grpc.ServerStream @@ -14,6 +14,7 @@ type Stream interface { Recv() (*discovery.DiscoveryRequest, error) } +// Generic RPC Stream for the delta based xDS protocol. type DeltaStream interface { grpc.ServerStream @@ -21,7 +22,7 @@ type DeltaStream interface { Recv() (*discovery.DeltaDiscoveryRequest, error) } -// StreamState will keep track of resource state per type on a stream. +// StreamState will keep track of resource cache state per type on a stream. type StreamState struct { // nolint:golint,revive // Indicates whether the delta stream currently has a wildcard watch wildcard bool @@ -34,11 +35,32 @@ type StreamState struct { // nolint:golint,revive // This field stores the last state sent to the client. resourceVersions map[string]string - // knownResourceNames contains resource names that a client has received previously + // knownResourceNames contains resource names that a client has received previously (SOTW). knownResourceNames map[string]map[string]struct{} - // indicates whether the object has been modified since its creation + // First indicates whether the StreamState has been modified since its creation first bool + + // Ordered indicates whether we want an ordered ADS stream or not + ordered bool +} + +// NewStreamState initializes a stream state. +func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState { + state := StreamState{ + wildcard: wildcard, + subscribedResourceNames: map[string]struct{}{}, + resourceVersions: initialResourceVersions, + first: true, + knownResourceNames: map[string]map[string]struct{}{}, + ordered: false, // Ordered comes from the first request since that's when we discover if they want ADS + } + + if initialResourceVersions == nil { + state.resourceVersions = make(map[string]string) + } + + return state } // GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to @@ -71,31 +93,43 @@ func (s *StreamState) WatchesResources(resourceNames map[string]struct{}) bool { return false } +func (s *StreamState) SetWildcard(wildcard bool) { + s.wildcard = wildcard +} + +// GetResourceVersions returns a map of current resources grouped by type URL. func (s *StreamState) GetResourceVersions() map[string]string { return s.resourceVersions } +// SetResourceVersions sets a list of resource versions by type URL and removes the flag +// of "first" since we can safely assume another request has come through the stream. func (s *StreamState) SetResourceVersions(resourceVersions map[string]string) { s.first = false s.resourceVersions = resourceVersions } +// IsFirst returns whether or not the state of the stream is based upon the initial request. func (s *StreamState) IsFirst() bool { return s.first } -func (s *StreamState) SetWildcard(wildcard bool) { - s.wildcard = wildcard -} - +// IsWildcard returns whether or not an xDS client requested in wildcard mode on the initial request. func (s *StreamState) IsWildcard() bool { return s.wildcard } +// GetKnownResourceNames returns the current known list of resources on a SOTW stream. +func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} { + return s.knownResourceNames[url] +} + +// SetKnownResourceNames sets a list of resource names in a stream utilizing the SOTW protocol. func (s *StreamState) SetKnownResourceNames(url string, names map[string]struct{}) { s.knownResourceNames[url] = names } +// SetKnownResourceNamesAsList is a helper function to set resource names as a slice input. func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) { m := map[string]struct{}{} for _, name := range names { @@ -103,24 +137,3 @@ func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) { } s.knownResourceNames[url] = m } - -func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} { - return s.knownResourceNames[url] -} - -// NewStreamState initializes a stream state. -func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState { - state := StreamState{ - wildcard: wildcard, - subscribedResourceNames: map[string]struct{}{}, - resourceVersions: initialResourceVersions, - first: true, - knownResourceNames: map[string]map[string]struct{}{}, - } - - if initialResourceVersions == nil { - state.resourceVersions = make(map[string]string) - } - - return state -} diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/ya.make b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/ya.make index 6cb4c649be..ff0a7e670b 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/ya.make +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/ya.make @@ -2,6 +2,8 @@ GO_LIBRARY() LICENSE(Apache-2.0) -SRCS(stream.go) +SRCS( + stream.go +) END() diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/server.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/server.go index 79b66881c4..c3e91ee032 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/server.go +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/server.go @@ -21,6 +21,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/envoyproxy/go-control-plane/pkg/server/config" "github.com/envoyproxy/go-control-plane/pkg/server/delta/v3" "github.com/envoyproxy/go-control-plane/pkg/server/rest/v3" "github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3" @@ -29,7 +30,6 @@ import ( core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3" extensionconfigservice "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3" listenerservice "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3" @@ -49,7 +49,7 @@ type Server interface { routeservice.ScopedRoutesDiscoveryServiceServer routeservice.VirtualHostDiscoveryServiceServer listenerservice.ListenerDiscoveryServiceServer - discoverygrpc.AggregatedDiscoveryServiceServer + discovery.AggregatedDiscoveryServiceServer secretservice.SecretDiscoveryServiceServer runtimeservice.RuntimeDiscoveryServiceServer extensionconfigservice.ExtensionConfigDiscoveryServiceServer @@ -165,10 +165,10 @@ func (c CallbackFuncs) OnFetchResponse(req *discovery.DiscoveryRequest, resp *di } // NewServer creates handlers from a config watcher and callbacks. -func NewServer(ctx context.Context, config cache.Cache, callbacks Callbacks) Server { +func NewServer(ctx context.Context, config cache.Cache, callbacks Callbacks, opts ...config.XDSOption) Server { return NewServerAdvanced(rest.NewServer(config, callbacks), - sotw.NewServer(ctx, config, callbacks), - delta.NewServer(ctx, config, callbacks), + sotw.NewServer(ctx, config, callbacks, opts...), + delta.NewServer(ctx, config, callbacks, opts...), ) } @@ -186,7 +186,7 @@ func (s *server) StreamHandler(stream stream.Stream, typeURL string) error { return s.sotw.StreamHandler(stream, typeURL) } -func (s *server) StreamAggregatedResources(stream discoverygrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { +func (s *server) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { return s.StreamHandler(stream, resource.AnyType) } @@ -311,7 +311,7 @@ func (s *server) DeltaStreamHandler(stream stream.DeltaStream, typeURL string) e return s.delta.DeltaStreamHandler(stream, typeURL) } -func (s *server) DeltaAggregatedResources(stream discoverygrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error { +func (s *server) DeltaAggregatedResources(stream discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error { return s.DeltaStreamHandler(stream, resource.AnyType) } diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/ya.make b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/ya.make index 26fe186fa4..96ad36cef8 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/ya.make +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/ya.make @@ -15,4 +15,6 @@ GO_XTEST_SRCS( END() -RECURSE(gotest) +RECURSE( + gotest +) |