aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/envoyproxy/go-control-plane/pkg/server
diff options
context:
space:
mode:
authorrobot-contrib <robot-contrib@yandex-team.com>2024-01-30 11:20:39 +0300
committerrobot-contrib <robot-contrib@yandex-team.com>2024-01-30 12:12:51 +0300
commitbe737fd8956853e06bd2c4f9fcd4a85188f4c172 (patch)
tree5bd76802fac1096dfd90983c7739d50de367a79f /vendor/github.com/envoyproxy/go-control-plane/pkg/server
parentfe62880c46b1f2c9fec779b0dc39f8a92ce256a5 (diff)
downloadydb-be737fd8956853e06bd2c4f9fcd4a85188f4c172.tar.gz
Update vendor/github.com/envoyproxy/go-control-plane to 0.12.0
Diffstat (limited to 'vendor/github.com/envoyproxy/go-control-plane/pkg/server')
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/config.go25
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/doc.go22
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/ya.make10
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/server.go103
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/watches.go18
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/ya.make4
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/rest/v3/ya.make4
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/ads.go140
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go236
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/watches.go2
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/xds.go166
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/ya.make2
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/stream.go71
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/ya.make4
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/server.go14
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/ya.make4
16 files changed, 580 insertions, 245 deletions
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/config.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/config.go
new file mode 100644
index 0000000000..b746acfab9
--- /dev/null
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/config.go
@@ -0,0 +1,25 @@
+package config
+
+// Opts for individual xDS implementations that can be
+// utilized through the functional opts pattern.
+type Opts struct {
+ // If true respond to ADS requests with a guaranteed resource ordering
+ Ordered bool
+}
+
+func NewOpts() Opts {
+ return Opts{
+ Ordered: false,
+ }
+}
+
+// Each xDS implementation should implement their own functional opts.
+// It is recommended that config values be added in this package specifically,
+// but the individual opts functions should be in their respective
+// implementation package so the import looks like the following:
+//
+// `sotw.WithOrderedADS()`
+// `delta.WithOrderedADS()`
+//
+// this allows for easy inference as to which opt applies to what implementation.
+type XDSOption func(*Opts)
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/doc.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/doc.go
new file mode 100644
index 0000000000..2c85adfd5f
--- /dev/null
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/doc.go
@@ -0,0 +1,22 @@
+/*
+Config abstracts xDS server options into a unified configuration package
+that allows for easy manipulation as well as unified passage of options
+to individual xDS server implementations.
+
+This enables code reduction as well as a unified source of config. Delta
+and SOTW might have similar ordered responses through ADS and rather than
+duplicating the logic across server implementations, we add the options
+in this package which are passed down to each individual spec.
+
+Each xDS implementation should implement their own functional opts.
+It is recommended that config values be added in this package specifically,
+but the individual opts functions should be in their respective
+implementation package so the import looks like the following:
+
+`sotw.WithOrderedADS()`
+`delta.WithOrderedADS()`
+
+this allows for easy inference as to which opt applies to what implementation.
+*/
+
+package config
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/ya.make b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/ya.make
new file mode 100644
index 0000000000..e44e5bff33
--- /dev/null
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/config/ya.make
@@ -0,0 +1,10 @@
+GO_LIBRARY()
+
+LICENSE(Apache-2.0)
+
+SRCS(
+ config.go
+ doc.go
+)
+
+END()
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/server.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/server.go
index 5f10266aba..b570b19b27 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/server.go
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/server.go
@@ -13,6 +13,7 @@ import (
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
+ "github.com/envoyproxy/go-control-plane/pkg/server/config"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)
@@ -30,7 +31,7 @@ type Callbacks interface {
// OnStreamDeltaRequest is called once a request is received on a stream.
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnStreamDeltaRequest(int64, *discovery.DeltaDiscoveryRequest) error
- // OnStreamDelatResponse is called immediately prior to sending a response on a stream.
+ // OnStreamDeltaResponse is called immediately prior to sending a response on a stream.
OnStreamDeltaResponse(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse)
}
@@ -43,15 +44,25 @@ type server struct {
// total stream count for counting bi-di streams
streamCount int64
ctx context.Context
+
+ // Local configuration flags for individual xDS implementations.
+ opts config.Opts
}
// NewServer creates a delta xDS specific server which utilizes a ConfigWatcher and delta Callbacks.
-func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server {
- return &server{
+func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server {
+ s := &server{
cache: config,
callbacks: callbacks,
ctx: ctx,
}
+
+ // Parse through our options
+ for _, opt := range opts {
+ opt(&s.opts)
+ }
+
+ return s
}
func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error {
@@ -63,7 +74,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
// a collection of stack allocated watches per request type
watches := newWatches()
- var node = &core.Node{}
+ node := &core.Node{}
defer func() {
watches.Cancel()
@@ -72,7 +83,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
}
}()
- // Sends a response, returns the new stream nonce
+ // sends a response, returns the new stream nonce
send := func(resp cache.DeltaResponse) (string, error) {
if resp == nil {
return "", errors.New("missing response")
@@ -83,13 +94,51 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
return "", err
}
- streamNonce = streamNonce + 1
+ streamNonce++
response.Nonce = strconv.FormatInt(streamNonce, 10)
if s.callbacks != nil {
s.callbacks.OnStreamDeltaResponse(streamID, resp.GetDeltaRequest(), response)
}
- return response.Nonce, str.Send(response)
+ return response.GetNonce(), str.Send(response)
+ }
+
+ // process a single delta response
+ process := func(resp cache.DeltaResponse) error {
+ typ := resp.GetDeltaRequest().GetTypeUrl()
+ if resp == deltaErrorResponse {
+ return status.Errorf(codes.Unavailable, typ+" watch failed")
+ }
+
+ nonce, err := send(resp)
+ if err != nil {
+ return err
+ }
+
+ watch := watches.deltaWatches[typ]
+ watch.nonce = nonce
+
+ watch.state.SetResourceVersions(resp.GetNextVersionMap())
+ watches.deltaWatches[typ] = watch
+ return nil
+ }
+
+ // processAll purges the deltaMuxedResponses channel
+ processAll := func() error {
+ for {
+ select {
+ // We watch the multiplexed channel for incoming responses.
+ case resp, more := <-watches.deltaMuxedResponses:
+ if !more {
+ break
+ }
+ if err := process(resp); err != nil {
+ return err
+ }
+ default:
+ return nil
+ }
+ }
}
if s.callbacks != nil {
@@ -102,35 +151,31 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
select {
case <-s.ctx.Done():
return nil
+ // We watch the multiplexed channel for incoming responses.
case resp, more := <-watches.deltaMuxedResponses:
+ // input stream ended or errored out
if !more {
break
}
- typ := resp.GetDeltaRequest().GetTypeUrl()
- if resp == deltaErrorResponse {
- return status.Errorf(codes.Unavailable, typ+" watch failed")
- }
-
- nonce, err := send(resp)
- if err != nil {
+ if err := process(resp); err != nil {
return err
}
-
- watch := watches.deltaWatches[typ]
- watch.nonce = nonce
-
- watch.state.SetResourceVersions(resp.GetNextVersionMap())
- watches.deltaWatches[typ] = watch
case req, more := <-reqCh:
// input stream ended or errored out
if !more {
return nil
}
+
if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}
+ // make sure all existing responses are processed prior to new requests to avoid deadlock
+ if err := processAll(); err != nil {
+ return err
+ }
+
if s.callbacks != nil {
if err := s.callbacks.OnStreamDeltaRequest(streamID, req); err != nil {
return err
@@ -139,18 +184,18 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
// The node information might only be set on the first incoming delta discovery request, so store it here so we can
// reset it on subsequent requests that omit it.
- if req.Node != nil {
- node = req.Node
+ if req.GetNode() != nil {
+ node = req.GetNode()
} else {
req.Node = node
}
// type URL is required for ADS but is implicit for any other xDS stream
if defaultTypeURL == resource.AnyType {
- if req.TypeUrl == "" {
+ if req.GetTypeUrl() == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
}
- } else if req.TypeUrl == "" {
+ } else if req.GetTypeUrl() == "" {
req.TypeUrl = defaultTypeURL
}
@@ -173,16 +218,8 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
s.subscribe(req.GetResourceNamesSubscribe(), &watch.state)
s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state)
- watch.responses = make(chan cache.DeltaResponse, 1)
- watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses)
+ watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watches.deltaMuxedResponses)
watches.deltaWatches[typeURL] = watch
-
- go func() {
- resp, more := <-watch.responses
- if more {
- watches.deltaMuxedResponses <- resp
- }
- }()
}
}
}
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/watches.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/watches.go
index c88548388a..63c4c2d38d 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/watches.go
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/watches.go
@@ -17,9 +17,13 @@ type watches struct {
// newWatches creates and initializes watches.
func newWatches() watches {
// deltaMuxedResponses needs a buffer to release go-routines populating it
+ //
+ // because deltaMuxedResponses can be populated by an update from the cache
+ // and a request from the client, we need to create the channel with a buffer
+ // size of 2x the number of types to avoid deadlocks.
return watches{
deltaWatches: make(map[string]watch, int(types.UnknownType)),
- deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)),
+ deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)*2),
}
}
@@ -28,13 +32,14 @@ func (w *watches) Cancel() {
for _, watch := range w.deltaWatches {
watch.Cancel()
}
+
+ close(w.deltaMuxedResponses)
}
// watch contains the necessary modifiables for receiving resource responses
type watch struct {
- responses chan cache.DeltaResponse
- cancel func()
- nonce string
+ cancel func()
+ nonce string
state stream.StreamState
}
@@ -44,9 +49,4 @@ func (w *watch) Cancel() {
if w.cancel != nil {
w.cancel()
}
- if w.responses != nil {
- // w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here
- // This is needed to release resources taken by goroutines watching this channel
- close(w.responses)
- }
}
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/ya.make b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/ya.make
index 7e3abbda25..77c00272c4 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/ya.make
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/delta/v3/ya.make
@@ -11,4 +11,6 @@ GO_TEST_SRCS(watches_test.go)
END()
-RECURSE(gotest)
+RECURSE(
+ gotest
+)
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/rest/v3/ya.make b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/rest/v3/ya.make
index d32b575a10..728337851d 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/rest/v3/ya.make
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/rest/v3/ya.make
@@ -2,6 +2,8 @@ GO_LIBRARY()
LICENSE(Apache-2.0)
-SRCS(server.go)
+SRCS(
+ server.go
+)
END()
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/ads.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/ads.go
new file mode 100644
index 0000000000..bbb6dd4b20
--- /dev/null
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/ads.go
@@ -0,0 +1,140 @@
+package sotw
+
+import (
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+ "github.com/envoyproxy/go-control-plane/pkg/cache/types"
+ "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
+ "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
+)
+
+// process handles a bi-di stream request
+func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRequest, defaultTypeURL string) error {
+ // We make a responder channel here so we can multiplex responses from the dynamic channels.
+ sw.watches.addWatch(resource.AnyType, &watch{
+ // Create a buffered channel the size of the known resource types.
+ response: make(chan cache.Response, types.UnknownType),
+ cancel: func() {
+ close(sw.watches.responders[resource.AnyType].response)
+ },
+ })
+
+ process := func(resp cache.Response) error {
+ nonce, err := sw.send(resp)
+ if err != nil {
+ return err
+ }
+
+ sw.watches.responders[resp.GetRequest().GetTypeUrl()].nonce = nonce
+ return nil
+ }
+
+ // Instead of creating a separate channel for each incoming request and abandoning the old one
+ // This algorithm uses (and reuses) a single channel for all request types and guarantees
+ // the server will send updates over the wire in an ordered fashion.
+ // Downside is there is no longer back pressure per resource.
+ // There is potential for a dropped response from the cache but this is not impactful
+ // to the client since SOTW version handling is global and a new sequence will be
+ // initiated on a new request.
+ processAllExcept := func(typeURL string) error {
+ for {
+ select {
+ // We watch the multiplexed ADS channel for incoming responses.
+ case res := <-sw.watches.responders[resource.AnyType].response:
+ if res.GetRequest().GetTypeUrl() != typeURL {
+ if err := process(res); err != nil {
+ return err
+ }
+ }
+ default:
+ return nil
+ }
+ }
+ }
+
+ // This control loop strictly orders resources when running in ADS mode.
+ // It should be treated as a child process of the original process() loop
+ // and should return on close of stream or error. This will cause the
+ // cleanup routines in the parent process() loop to execute.
+ for {
+ select {
+ case <-s.ctx.Done():
+ return nil
+ // We only watch the multiplexed channel since all values will come through from process.
+ case res := <-sw.watches.responders[resource.AnyType].response:
+ if err := process(res); err != nil {
+ return status.Errorf(codes.Unavailable, err.Error())
+ }
+ case req, ok := <-reqCh:
+ // Input stream ended or failed.
+ if !ok {
+ return nil
+ }
+
+ // Received an empty request over the request channel. Can't respond.
+ if req == nil {
+ return status.Errorf(codes.Unavailable, "empty request")
+ }
+
+ // Only first request is guaranteed to hold node info so if it's missing, reassign.
+ if req.GetNode() != nil {
+ sw.node = req.GetNode()
+ } else {
+ req.Node = sw.node
+ }
+
+ // Nonces can be reused across streams; we verify nonce only if nonce is not initialized.
+ nonce := req.GetResponseNonce()
+
+ // type URL is required for ADS but is implicit for xDS
+ if defaultTypeURL == resource.AnyType {
+ if req.GetTypeUrl() == "" {
+ return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
+ }
+ }
+
+ if s.callbacks != nil {
+ if err := s.callbacks.OnStreamRequest(sw.ID, req); err != nil {
+ return err
+ }
+ }
+
+ if lastResponse, ok := sw.lastDiscoveryResponses[req.GetTypeUrl()]; ok {
+ if lastResponse.nonce == "" || lastResponse.nonce == nonce {
+ // Let's record Resource names that a client has received.
+ sw.streamState.SetKnownResourceNames(req.GetTypeUrl(), lastResponse.resources)
+ }
+ }
+
+ typeURL := req.GetTypeUrl()
+ // Use the multiplexed channel for new watches.
+ responder := sw.watches.responders[resource.AnyType].response
+ if w, ok := sw.watches.responders[typeURL]; ok {
+ // We've found a pre-existing watch, lets check and update if needed.
+ // If these requirements aren't satisfied, leave an open watch.
+ if w.nonce == "" || w.nonce == nonce {
+ w.close()
+
+ // Only process if we have an existing watch otherwise go ahead and create.
+ if err := processAllExcept(typeURL); err != nil {
+ return err
+ }
+
+ sw.watches.addWatch(typeURL, &watch{
+ cancel: s.cache.CreateWatch(req, sw.streamState, responder),
+ response: responder,
+ })
+ }
+ } else {
+ // No pre-existing watch exists, let's create one.
+ // We need to precompute the watches first then open a watch in the cache.
+ sw.watches.addWatch(typeURL, &watch{
+ cancel: s.cache.CreateWatch(req, sw.streamState, responder),
+ response: responder,
+ })
+ }
+ }
+ }
+}
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go
index 91681237c9..f5be0c57a9 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go
@@ -18,17 +18,13 @@ package sotw
import (
"context"
"errors"
- "reflect"
"strconv"
"sync/atomic"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
- "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
+ "github.com/envoyproxy/go-control-plane/pkg/server/config"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)
@@ -50,8 +46,23 @@ type Callbacks interface {
}
// NewServer creates handlers from a config watcher and callbacks.
-func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server {
- return &server{cache: config, callbacks: callbacks, ctx: ctx}
+func NewServer(ctx context.Context, cw cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server {
+ s := &server{cache: cw, callbacks: callbacks, ctx: ctx, opts: config.NewOpts()}
+
+ // Parse through our options
+ for _, opt := range opts {
+ opt(&s.opts)
+ }
+
+ return s
+}
+
+// WithOrderedADS enables the internal flag to order responses
+// strictly.
+func WithOrderedADS() config.XDSOption {
+ return func(o *config.Opts) {
+ o.Ordered = true
+ }
}
type server struct {
@@ -61,177 +72,78 @@ type server struct {
// streamCount for counting bi-di streams
streamCount int64
-}
-// Discovery response that is sent over GRPC stream
-// We need to record what resource names are already sent to a client
-// So if the client requests a new name we can respond back
-// regardless current snapshot version (even if it is not changed yet)
-type lastDiscoveryResponse struct {
- nonce string
- resources map[string]struct{}
+ // Local configuration flags for individual xDS implementations.
+ opts config.Opts
}
-// process handles a bi-di stream request
-func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error {
- // increment stream count
- streamID := atomic.AddInt64(&s.streamCount, 1)
-
- // unique nonce generator for req-resp pairs per xDS stream; the server
- // ignores stale nonces. nonce is only modified within send() function.
- var streamNonce int64
-
- streamState := stream.NewStreamState(false, map[string]string{})
- lastDiscoveryResponses := map[string]lastDiscoveryResponse{}
-
- // a collection of stack allocated watches per request type
- watches := newWatches()
-
- // node may only be set on the first discovery request
- var node = &core.Node{}
-
- defer func() {
- watches.close()
- if s.callbacks != nil {
- s.callbacks.OnStreamClosed(streamID, node)
- }
- }()
-
- // sends a response by serializing to protobuf Any
- send := func(resp cache.Response) (string, error) {
- if resp == nil {
- return "", errors.New("missing response")
- }
+// streamWrapper abstracts critical data passed around a stream for to be accessed
+// through various code paths in the xDS lifecycle. This comes in handy when dealing
+// with varying implementation types such as ordered vs unordered resource handling.
+type streamWrapper struct {
+ stream stream.Stream // parent stream object
+ ID int64 // stream ID in relation to total stream count
+ nonce int64 // nonce per stream
+ watches watches // collection of stack allocated watchers per request type
+ callbacks Callbacks // callbacks for performing actions through stream lifecycle
+
+ node *core.Node // registered xDS client
+
+ // The below fields are used for tracking resource
+ // cache state and should be maintained per stream.
+ streamState stream.StreamState
+ lastDiscoveryResponses map[string]lastDiscoveryResponse
+}
- out, err := resp.GetDiscoveryResponse()
- if err != nil {
- return "", err
- }
+// Send packages the necessary resources before sending on the gRPC stream,
+// and sets the current state of the world.
+func (s *streamWrapper) send(resp cache.Response) (string, error) {
+ if resp == nil {
+ return "", errors.New("missing response")
+ }
- // increment nonce
- streamNonce = streamNonce + 1
- out.Nonce = strconv.FormatInt(streamNonce, 10)
+ out, err := resp.GetDiscoveryResponse()
+ if err != nil {
+ return "", err
+ }
- lastResponse := lastDiscoveryResponse{
- nonce: out.Nonce,
- resources: make(map[string]struct{}),
- }
- for _, r := range resp.GetRequest().ResourceNames {
- lastResponse.resources[r] = struct{}{}
- }
- lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse
+ // increment nonce and convert it to base10
+ out.Nonce = strconv.FormatInt(atomic.AddInt64(&s.nonce, 1), 10)
- if s.callbacks != nil {
- s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out)
- }
- return out.Nonce, str.Send(out)
+ lastResponse := lastDiscoveryResponse{
+ nonce: out.GetNonce(),
+ resources: make(map[string]struct{}),
}
+ for _, r := range resp.GetRequest().GetResourceNames() {
+ lastResponse.resources[r] = struct{}{}
+ }
+ s.lastDiscoveryResponses[resp.GetRequest().GetTypeUrl()] = lastResponse
+ // Register with the callbacks provided that we are sending the response.
if s.callbacks != nil {
- if err := s.callbacks.OnStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil {
- return err
- }
+ s.callbacks.OnStreamResponse(resp.GetContext(), s.ID, resp.GetRequest(), out)
}
- // recompute dynamic channels for this stream
- watches.recompute(s.ctx, reqCh)
-
- for {
- // The list of select cases looks like this:
- // 0: <- ctx.Done
- // 1: <- reqCh
- // 2...: per type watches
- index, value, ok := reflect.Select(watches.cases)
- switch index {
- // ctx.Done() -> if we receive a value here we return as no further computation is needed
- case 0:
- return nil
- // Case 1 handles any request inbound on the stream and handles all initialization as needed
- case 1:
- // input stream ended or errored out
- if !ok {
- return nil
- }
-
- req := value.Interface().(*discovery.DiscoveryRequest)
- if req == nil {
- return status.Errorf(codes.Unavailable, "empty request")
- }
-
- // node field in discovery request is delta-compressed
- if req.Node != nil {
- node = req.Node
- } else {
- req.Node = node
- }
-
- // nonces can be reused across streams; we verify nonce only if nonce is not initialized
- nonce := req.GetResponseNonce()
-
- // type URL is required for ADS but is implicit for xDS
- if defaultTypeURL == resource.AnyType {
- if req.TypeUrl == "" {
- return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
- }
- } else if req.TypeUrl == "" {
- req.TypeUrl = defaultTypeURL
- }
-
- if s.callbacks != nil {
- if err := s.callbacks.OnStreamRequest(streamID, req); err != nil {
- return err
- }
- }
-
- if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok {
- if lastResponse.nonce == "" || lastResponse.nonce == nonce {
- // Let's record Resource names that a client has received.
- streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources)
- }
- }
-
- typeURL := req.GetTypeUrl()
- responder := make(chan cache.Response, 1)
- if w, ok := watches.responders[typeURL]; ok {
- // We've found a pre-existing watch, lets check and update if needed.
- // If these requirements aren't satisfied, leave an open watch.
- if w.nonce == "" || w.nonce == nonce {
- w.close()
-
- watches.addWatch(typeURL, &watch{
- cancel: s.cache.CreateWatch(req, streamState, responder),
- response: responder,
- })
- }
- } else {
- // No pre-existing watch exists, let's create one.
- // We need to precompute the watches first then open a watch in the cache.
- watches.addWatch(typeURL, &watch{
- cancel: s.cache.CreateWatch(req, streamState, responder),
- response: responder,
- })
- }
-
- // Recompute the dynamic select cases for this stream.
- watches.recompute(s.ctx, reqCh)
- default:
- // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
- if !ok {
- // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something?
- return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index)
- }
-
- res := value.Interface().(cache.Response)
- nonce, err := send(res)
- if err != nil {
- return err
- }
+ return out.GetNonce(), s.stream.Send(out)
+}
- watches.responders[res.GetRequest().TypeUrl].nonce = nonce
- }
+// Shutdown closes all open watches, and notifies API consumers the stream has closed.
+func (s *streamWrapper) shutdown() {
+ s.watches.close()
+ if s.callbacks != nil {
+ s.callbacks.OnStreamClosed(s.ID, s.node)
}
}
+// Discovery response that is sent over GRPC stream.
+// We need to record what resource names are already sent to a client
+// So if the client requests a new name we can respond back
+// regardless current snapshot version (even if it is not changed yet)
+type lastDiscoveryResponse struct {
+ nonce string
+ resources map[string]struct{}
+}
+
// StreamHandler converts a blocking read call to channels and initiates stream processing
func (s *server) StreamHandler(stream stream.Stream, typeURL string) error {
// a channel for receiving incoming requests
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/watches.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/watches.go
index 45670d6a91..d781f663e6 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/watches.go
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/watches.go
@@ -60,7 +60,7 @@ func (w *watches) recompute(ctx context.Context, req <-chan *discovery.Discovery
}
}
-// watch contains the necessary modifiables for receiving resource responses
+// watch contains the necessary modifiable data for receiving resource responses
type watch struct {
cancel func()
nonce string
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/xds.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/xds.go
new file mode 100644
index 0000000000..3b24dec409
--- /dev/null
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/xds.go
@@ -0,0 +1,166 @@
+package sotw
+
+import (
+ "reflect"
+ "sync/atomic"
+
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+ discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+ "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
+ "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
+ "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
+)
+
+// process handles a bi-di stream request
+func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryRequest, defaultTypeURL string) error {
+ // create our streamWrapper which can be passed down to sub control loops.
+ // this is useful for abstracting critical information for various types of
+ // xDS resource processing.
+ sw := streamWrapper{
+ stream: str,
+ ID: atomic.AddInt64(&s.streamCount, 1), // increment stream count
+ callbacks: s.callbacks,
+ node: &core.Node{}, // node may only be set on the first discovery request
+
+ // a collection of stack allocated watches per request type.
+ watches: newWatches(),
+ streamState: stream.NewStreamState(false, map[string]string{}),
+ lastDiscoveryResponses: make(map[string]lastDiscoveryResponse),
+ }
+
+ // cleanup once our stream has ended.
+ defer sw.shutdown()
+
+ if s.callbacks != nil {
+ if err := s.callbacks.OnStreamOpen(str.Context(), sw.ID, defaultTypeURL); err != nil {
+ return err
+ }
+ }
+
+ // do an initial recompute so we can load the first 2 channels:
+ // <-reqCh
+ // s.ctx.Done()
+ sw.watches.recompute(s.ctx, reqCh)
+
+ for {
+ // The list of select cases looks like this:
+ // 0: <- ctx.Done
+ // 1: <- reqCh
+ // 2...: per type watches
+ index, value, ok := reflect.Select(sw.watches.cases)
+ switch index {
+ // ctx.Done() -> if we receive a value here we return
+ // as no further computation is needed
+ case 0:
+ return nil
+ // Case 1 handles any request inbound on the stream
+ // and handles all initialization as needed
+ case 1:
+ // input stream ended or failed
+ if !ok {
+ return nil
+ }
+
+ req := value.Interface().(*discovery.DiscoveryRequest)
+ if req == nil {
+ return status.Errorf(codes.Unavailable, "empty request")
+ }
+
+ // Only first request is guaranteed to hold node info so if it's missing, reassign.
+ if req.GetNode() != nil {
+ sw.node = req.GetNode()
+ } else {
+ req.Node = sw.node
+ }
+
+ // nonces can be reused across streams; we verify nonce only if nonce is not initialized
+ nonce := req.GetResponseNonce()
+
+ // type URL is required for ADS but is implicit for xDS
+ if defaultTypeURL == resource.AnyType {
+ if req.GetTypeUrl() == "" {
+ return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
+ }
+
+ // When using ADS we need to order responses.
+ // This is guaranteed in the xDS protocol specification
+ // as ADS is required to be eventually consistent.
+ // More details can be found here if interested:
+ // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations
+ if s.opts.Ordered {
+ // send our first request on the stream again so it doesn't get
+ // lost in processing on the new control loop
+ // There's a risk (albeit very limited) that we'd end up handling requests in the wrong order here.
+ // If envoy is using ADS for endpoints, and clusters are added in short sequence,
+ // the following request might include a new cluster and be discarded as the previous one will be handled after.
+ go func() {
+ reqCh <- req
+ }()
+
+ // Trigger a different code path specifically for ADS.
+ // We want resource ordering so things don't get sent before they should.
+ // This is a blocking call and will exit the process function
+ // on successful completion.
+ return s.processADS(&sw, reqCh, defaultTypeURL)
+ }
+ } else if req.GetTypeUrl() == "" {
+ req.TypeUrl = defaultTypeURL
+ }
+
+ if s.callbacks != nil {
+ if err := s.callbacks.OnStreamRequest(sw.ID, req); err != nil {
+ return err
+ }
+ }
+
+ if lastResponse, ok := sw.lastDiscoveryResponses[req.GetTypeUrl()]; ok {
+ if lastResponse.nonce == "" || lastResponse.nonce == nonce {
+ // Let's record Resource names that a client has received.
+ sw.streamState.SetKnownResourceNames(req.GetTypeUrl(), lastResponse.resources)
+ }
+ }
+
+ typeURL := req.GetTypeUrl()
+ responder := make(chan cache.Response, 1)
+ if w, ok := sw.watches.responders[typeURL]; ok {
+ // We've found a pre-existing watch, lets check and update if needed.
+ // If these requirements aren't satisfied, leave an open watch.
+ if w.nonce == "" || w.nonce == nonce {
+ w.close()
+
+ sw.watches.addWatch(typeURL, &watch{
+ cancel: s.cache.CreateWatch(req, sw.streamState, responder),
+ response: responder,
+ })
+ }
+ } else {
+ // No pre-existing watch exists, let's create one.
+ // We need to precompute the watches first then open a watch in the cache.
+ sw.watches.addWatch(typeURL, &watch{
+ cancel: s.cache.CreateWatch(req, sw.streamState, responder),
+ response: responder,
+ })
+ }
+
+ // Recompute the dynamic select cases for this stream.
+ sw.watches.recompute(s.ctx, reqCh)
+ default:
+ // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
+ if !ok {
+ // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something?
+ return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index)
+ }
+
+ res := value.Interface().(cache.Response)
+ nonce, err := sw.send(res)
+ if err != nil {
+ return err
+ }
+
+ sw.watches.responders[res.GetRequest().GetTypeUrl()].nonce = nonce
+ }
+ }
+}
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/ya.make b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/ya.make
index 48fb263441..6d7dad878f 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/ya.make
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/ya.make
@@ -3,8 +3,10 @@ GO_LIBRARY()
LICENSE(Apache-2.0)
SRCS(
+ ads.go
server.go
watches.go
+ xds.go
)
END()
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/stream.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/stream.go
index b5832b7d58..1664a941e0 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/stream.go
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/stream.go
@@ -6,7 +6,7 @@ import (
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)
-// Generic RPC stream.
+// Generic RPC stream for state of the world.
type Stream interface {
grpc.ServerStream
@@ -14,6 +14,7 @@ type Stream interface {
Recv() (*discovery.DiscoveryRequest, error)
}
+// Generic RPC Stream for the delta based xDS protocol.
type DeltaStream interface {
grpc.ServerStream
@@ -21,7 +22,7 @@ type DeltaStream interface {
Recv() (*discovery.DeltaDiscoveryRequest, error)
}
-// StreamState will keep track of resource state per type on a stream.
+// StreamState will keep track of resource cache state per type on a stream.
type StreamState struct { // nolint:golint,revive
// Indicates whether the delta stream currently has a wildcard watch
wildcard bool
@@ -34,11 +35,32 @@ type StreamState struct { // nolint:golint,revive
// This field stores the last state sent to the client.
resourceVersions map[string]string
- // knownResourceNames contains resource names that a client has received previously
+ // knownResourceNames contains resource names that a client has received previously (SOTW).
knownResourceNames map[string]map[string]struct{}
- // indicates whether the object has been modified since its creation
+ // First indicates whether the StreamState has been modified since its creation
first bool
+
+ // Ordered indicates whether we want an ordered ADS stream or not
+ ordered bool
+}
+
+// NewStreamState initializes a stream state.
+func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState {
+ state := StreamState{
+ wildcard: wildcard,
+ subscribedResourceNames: map[string]struct{}{},
+ resourceVersions: initialResourceVersions,
+ first: true,
+ knownResourceNames: map[string]map[string]struct{}{},
+ ordered: false, // Ordered comes from the first request since that's when we discover if they want ADS
+ }
+
+ if initialResourceVersions == nil {
+ state.resourceVersions = make(map[string]string)
+ }
+
+ return state
}
// GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to
@@ -71,31 +93,43 @@ func (s *StreamState) WatchesResources(resourceNames map[string]struct{}) bool {
return false
}
+func (s *StreamState) SetWildcard(wildcard bool) {
+ s.wildcard = wildcard
+}
+
+// GetResourceVersions returns a map of current resources grouped by type URL.
func (s *StreamState) GetResourceVersions() map[string]string {
return s.resourceVersions
}
+// SetResourceVersions sets a list of resource versions by type URL and removes the flag
+// of "first" since we can safely assume another request has come through the stream.
func (s *StreamState) SetResourceVersions(resourceVersions map[string]string) {
s.first = false
s.resourceVersions = resourceVersions
}
+// IsFirst returns whether or not the state of the stream is based upon the initial request.
func (s *StreamState) IsFirst() bool {
return s.first
}
-func (s *StreamState) SetWildcard(wildcard bool) {
- s.wildcard = wildcard
-}
-
+// IsWildcard returns whether or not an xDS client requested in wildcard mode on the initial request.
func (s *StreamState) IsWildcard() bool {
return s.wildcard
}
+// GetKnownResourceNames returns the current known list of resources on a SOTW stream.
+func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} {
+ return s.knownResourceNames[url]
+}
+
+// SetKnownResourceNames sets a list of resource names in a stream utilizing the SOTW protocol.
func (s *StreamState) SetKnownResourceNames(url string, names map[string]struct{}) {
s.knownResourceNames[url] = names
}
+// SetKnownResourceNamesAsList is a helper function to set resource names as a slice input.
func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) {
m := map[string]struct{}{}
for _, name := range names {
@@ -103,24 +137,3 @@ func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) {
}
s.knownResourceNames[url] = m
}
-
-func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} {
- return s.knownResourceNames[url]
-}
-
-// NewStreamState initializes a stream state.
-func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState {
- state := StreamState{
- wildcard: wildcard,
- subscribedResourceNames: map[string]struct{}{},
- resourceVersions: initialResourceVersions,
- first: true,
- knownResourceNames: map[string]map[string]struct{}{},
- }
-
- if initialResourceVersions == nil {
- state.resourceVersions = make(map[string]string)
- }
-
- return state
-}
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/ya.make b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/ya.make
index 6cb4c649be..ff0a7e670b 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/ya.make
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/stream/v3/ya.make
@@ -2,6 +2,8 @@ GO_LIBRARY()
LICENSE(Apache-2.0)
-SRCS(stream.go)
+SRCS(
+ stream.go
+)
END()
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/server.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/server.go
index 79b66881c4..c3e91ee032 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/server.go
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/server.go
@@ -21,6 +21,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "github.com/envoyproxy/go-control-plane/pkg/server/config"
"github.com/envoyproxy/go-control-plane/pkg/server/delta/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/rest/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3"
@@ -29,7 +30,6 @@ import (
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
- discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
extensionconfigservice "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3"
listenerservice "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3"
@@ -49,7 +49,7 @@ type Server interface {
routeservice.ScopedRoutesDiscoveryServiceServer
routeservice.VirtualHostDiscoveryServiceServer
listenerservice.ListenerDiscoveryServiceServer
- discoverygrpc.AggregatedDiscoveryServiceServer
+ discovery.AggregatedDiscoveryServiceServer
secretservice.SecretDiscoveryServiceServer
runtimeservice.RuntimeDiscoveryServiceServer
extensionconfigservice.ExtensionConfigDiscoveryServiceServer
@@ -165,10 +165,10 @@ func (c CallbackFuncs) OnFetchResponse(req *discovery.DiscoveryRequest, resp *di
}
// NewServer creates handlers from a config watcher and callbacks.
-func NewServer(ctx context.Context, config cache.Cache, callbacks Callbacks) Server {
+func NewServer(ctx context.Context, config cache.Cache, callbacks Callbacks, opts ...config.XDSOption) Server {
return NewServerAdvanced(rest.NewServer(config, callbacks),
- sotw.NewServer(ctx, config, callbacks),
- delta.NewServer(ctx, config, callbacks),
+ sotw.NewServer(ctx, config, callbacks, opts...),
+ delta.NewServer(ctx, config, callbacks, opts...),
)
}
@@ -186,7 +186,7 @@ func (s *server) StreamHandler(stream stream.Stream, typeURL string) error {
return s.sotw.StreamHandler(stream, typeURL)
}
-func (s *server) StreamAggregatedResources(stream discoverygrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
+func (s *server) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
return s.StreamHandler(stream, resource.AnyType)
}
@@ -311,7 +311,7 @@ func (s *server) DeltaStreamHandler(stream stream.DeltaStream, typeURL string) e
return s.delta.DeltaStreamHandler(stream, typeURL)
}
-func (s *server) DeltaAggregatedResources(stream discoverygrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
+func (s *server) DeltaAggregatedResources(stream discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
return s.DeltaStreamHandler(stream, resource.AnyType)
}
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/ya.make b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/ya.make
index 26fe186fa4..96ad36cef8 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/ya.make
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/v3/ya.make
@@ -15,4 +15,6 @@ GO_XTEST_SRCS(
END()
-RECURSE(gotest)
+RECURSE(
+ gotest
+)