aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2024-01-31 17:22:33 +0300
committerAlexander Smirnov <alex@ydb.tech>2024-01-31 17:22:33 +0300
commit52be5dbdd420165c68e7e90ba8f1d2f00da041f6 (patch)
tree5d47f5b2ff4e6a7c8e75d33931a1e683949b7229 /vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go
parentea57c8867ceca391357c3c5ffcc5ba6738b49adc (diff)
parent809f0cf2fdfddfbeacc2256ffdbaaf5808ce5ed4 (diff)
downloadydb-52be5dbdd420165c68e7e90ba8f1d2f00da041f6.tar.gz
Merge branch 'mergelibs12' into main
Diffstat (limited to 'vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go')
-rw-r--r--vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go236
1 files changed, 74 insertions, 162 deletions
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go
index 91681237c9..f5be0c57a9 100644
--- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go
+++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go
@@ -18,17 +18,13 @@ package sotw
import (
"context"
"errors"
- "reflect"
"strconv"
"sync/atomic"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
- "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
+ "github.com/envoyproxy/go-control-plane/pkg/server/config"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)
@@ -50,8 +46,23 @@ type Callbacks interface {
}
// NewServer creates handlers from a config watcher and callbacks.
-func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server {
- return &server{cache: config, callbacks: callbacks, ctx: ctx}
+func NewServer(ctx context.Context, cw cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server {
+ s := &server{cache: cw, callbacks: callbacks, ctx: ctx, opts: config.NewOpts()}
+
+ // Parse through our options
+ for _, opt := range opts {
+ opt(&s.opts)
+ }
+
+ return s
+}
+
+// WithOrderedADS enables the internal flag to order responses
+// strictly.
+func WithOrderedADS() config.XDSOption {
+ return func(o *config.Opts) {
+ o.Ordered = true
+ }
}
type server struct {
@@ -61,177 +72,78 @@ type server struct {
// streamCount for counting bi-di streams
streamCount int64
-}
-// Discovery response that is sent over GRPC stream
-// We need to record what resource names are already sent to a client
-// So if the client requests a new name we can respond back
-// regardless current snapshot version (even if it is not changed yet)
-type lastDiscoveryResponse struct {
- nonce string
- resources map[string]struct{}
+ // Local configuration flags for individual xDS implementations.
+ opts config.Opts
}
-// process handles a bi-di stream request
-func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error {
- // increment stream count
- streamID := atomic.AddInt64(&s.streamCount, 1)
-
- // unique nonce generator for req-resp pairs per xDS stream; the server
- // ignores stale nonces. nonce is only modified within send() function.
- var streamNonce int64
-
- streamState := stream.NewStreamState(false, map[string]string{})
- lastDiscoveryResponses := map[string]lastDiscoveryResponse{}
-
- // a collection of stack allocated watches per request type
- watches := newWatches()
-
- // node may only be set on the first discovery request
- var node = &core.Node{}
-
- defer func() {
- watches.close()
- if s.callbacks != nil {
- s.callbacks.OnStreamClosed(streamID, node)
- }
- }()
-
- // sends a response by serializing to protobuf Any
- send := func(resp cache.Response) (string, error) {
- if resp == nil {
- return "", errors.New("missing response")
- }
+// streamWrapper abstracts critical data passed around a stream for to be accessed
+// through various code paths in the xDS lifecycle. This comes in handy when dealing
+// with varying implementation types such as ordered vs unordered resource handling.
+type streamWrapper struct {
+ stream stream.Stream // parent stream object
+ ID int64 // stream ID in relation to total stream count
+ nonce int64 // nonce per stream
+ watches watches // collection of stack allocated watchers per request type
+ callbacks Callbacks // callbacks for performing actions through stream lifecycle
+
+ node *core.Node // registered xDS client
+
+ // The below fields are used for tracking resource
+ // cache state and should be maintained per stream.
+ streamState stream.StreamState
+ lastDiscoveryResponses map[string]lastDiscoveryResponse
+}
- out, err := resp.GetDiscoveryResponse()
- if err != nil {
- return "", err
- }
+// Send packages the necessary resources before sending on the gRPC stream,
+// and sets the current state of the world.
+func (s *streamWrapper) send(resp cache.Response) (string, error) {
+ if resp == nil {
+ return "", errors.New("missing response")
+ }
- // increment nonce
- streamNonce = streamNonce + 1
- out.Nonce = strconv.FormatInt(streamNonce, 10)
+ out, err := resp.GetDiscoveryResponse()
+ if err != nil {
+ return "", err
+ }
- lastResponse := lastDiscoveryResponse{
- nonce: out.Nonce,
- resources: make(map[string]struct{}),
- }
- for _, r := range resp.GetRequest().ResourceNames {
- lastResponse.resources[r] = struct{}{}
- }
- lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse
+ // increment nonce and convert it to base10
+ out.Nonce = strconv.FormatInt(atomic.AddInt64(&s.nonce, 1), 10)
- if s.callbacks != nil {
- s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out)
- }
- return out.Nonce, str.Send(out)
+ lastResponse := lastDiscoveryResponse{
+ nonce: out.GetNonce(),
+ resources: make(map[string]struct{}),
}
+ for _, r := range resp.GetRequest().GetResourceNames() {
+ lastResponse.resources[r] = struct{}{}
+ }
+ s.lastDiscoveryResponses[resp.GetRequest().GetTypeUrl()] = lastResponse
+ // Register with the callbacks provided that we are sending the response.
if s.callbacks != nil {
- if err := s.callbacks.OnStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil {
- return err
- }
+ s.callbacks.OnStreamResponse(resp.GetContext(), s.ID, resp.GetRequest(), out)
}
- // recompute dynamic channels for this stream
- watches.recompute(s.ctx, reqCh)
-
- for {
- // The list of select cases looks like this:
- // 0: <- ctx.Done
- // 1: <- reqCh
- // 2...: per type watches
- index, value, ok := reflect.Select(watches.cases)
- switch index {
- // ctx.Done() -> if we receive a value here we return as no further computation is needed
- case 0:
- return nil
- // Case 1 handles any request inbound on the stream and handles all initialization as needed
- case 1:
- // input stream ended or errored out
- if !ok {
- return nil
- }
-
- req := value.Interface().(*discovery.DiscoveryRequest)
- if req == nil {
- return status.Errorf(codes.Unavailable, "empty request")
- }
-
- // node field in discovery request is delta-compressed
- if req.Node != nil {
- node = req.Node
- } else {
- req.Node = node
- }
-
- // nonces can be reused across streams; we verify nonce only if nonce is not initialized
- nonce := req.GetResponseNonce()
-
- // type URL is required for ADS but is implicit for xDS
- if defaultTypeURL == resource.AnyType {
- if req.TypeUrl == "" {
- return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
- }
- } else if req.TypeUrl == "" {
- req.TypeUrl = defaultTypeURL
- }
-
- if s.callbacks != nil {
- if err := s.callbacks.OnStreamRequest(streamID, req); err != nil {
- return err
- }
- }
-
- if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok {
- if lastResponse.nonce == "" || lastResponse.nonce == nonce {
- // Let's record Resource names that a client has received.
- streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources)
- }
- }
-
- typeURL := req.GetTypeUrl()
- responder := make(chan cache.Response, 1)
- if w, ok := watches.responders[typeURL]; ok {
- // We've found a pre-existing watch, lets check and update if needed.
- // If these requirements aren't satisfied, leave an open watch.
- if w.nonce == "" || w.nonce == nonce {
- w.close()
-
- watches.addWatch(typeURL, &watch{
- cancel: s.cache.CreateWatch(req, streamState, responder),
- response: responder,
- })
- }
- } else {
- // No pre-existing watch exists, let's create one.
- // We need to precompute the watches first then open a watch in the cache.
- watches.addWatch(typeURL, &watch{
- cancel: s.cache.CreateWatch(req, streamState, responder),
- response: responder,
- })
- }
-
- // Recompute the dynamic select cases for this stream.
- watches.recompute(s.ctx, reqCh)
- default:
- // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
- if !ok {
- // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something?
- return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index)
- }
-
- res := value.Interface().(cache.Response)
- nonce, err := send(res)
- if err != nil {
- return err
- }
+ return out.GetNonce(), s.stream.Send(out)
+}
- watches.responders[res.GetRequest().TypeUrl].nonce = nonce
- }
+// Shutdown closes all open watches, and notifies API consumers the stream has closed.
+func (s *streamWrapper) shutdown() {
+ s.watches.close()
+ if s.callbacks != nil {
+ s.callbacks.OnStreamClosed(s.ID, s.node)
}
}
+// Discovery response that is sent over GRPC stream.
+// We need to record what resource names are already sent to a client
+// So if the client requests a new name we can respond back
+// regardless current snapshot version (even if it is not changed yet)
+type lastDiscoveryResponse struct {
+ nonce string
+ resources map[string]struct{}
+}
+
// StreamHandler converts a blocking read call to channels and initiates stream processing
func (s *server) StreamHandler(stream stream.Stream, typeURL string) error {
// a channel for receiving incoming requests