diff options
author | Alexander Smirnov <alex@ydb.tech> | 2024-01-31 17:22:33 +0300 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2024-01-31 17:22:33 +0300 |
commit | 52be5dbdd420165c68e7e90ba8f1d2f00da041f6 (patch) | |
tree | 5d47f5b2ff4e6a7c8e75d33931a1e683949b7229 /vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go | |
parent | ea57c8867ceca391357c3c5ffcc5ba6738b49adc (diff) | |
parent | 809f0cf2fdfddfbeacc2256ffdbaaf5808ce5ed4 (diff) | |
download | ydb-52be5dbdd420165c68e7e90ba8f1d2f00da041f6.tar.gz |
Merge branch 'mergelibs12' into main
Diffstat (limited to 'vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go')
-rw-r--r-- | vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go | 236 |
1 files changed, 74 insertions, 162 deletions
diff --git a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go index 91681237c9..f5be0c57a9 100644 --- a/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go +++ b/vendor/github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3/server.go @@ -18,17 +18,13 @@ package sotw import ( "context" "errors" - "reflect" "strconv" "sync/atomic" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/config" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) @@ -50,8 +46,23 @@ type Callbacks interface { } // NewServer creates handlers from a config watcher and callbacks. -func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server { - return &server{cache: config, callbacks: callbacks, ctx: ctx} +func NewServer(ctx context.Context, cw cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server { + s := &server{cache: cw, callbacks: callbacks, ctx: ctx, opts: config.NewOpts()} + + // Parse through our options + for _, opt := range opts { + opt(&s.opts) + } + + return s +} + +// WithOrderedADS enables the internal flag to order responses +// strictly. +func WithOrderedADS() config.XDSOption { + return func(o *config.Opts) { + o.Ordered = true + } } type server struct { @@ -61,177 +72,78 @@ type server struct { // streamCount for counting bi-di streams streamCount int64 -} -// Discovery response that is sent over GRPC stream -// We need to record what resource names are already sent to a client -// So if the client requests a new name we can respond back -// regardless current snapshot version (even if it is not changed yet) -type lastDiscoveryResponse struct { - nonce string - resources map[string]struct{} + // Local configuration flags for individual xDS implementations. + opts config.Opts } -// process handles a bi-di stream request -func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error { - // increment stream count - streamID := atomic.AddInt64(&s.streamCount, 1) - - // unique nonce generator for req-resp pairs per xDS stream; the server - // ignores stale nonces. nonce is only modified within send() function. - var streamNonce int64 - - streamState := stream.NewStreamState(false, map[string]string{}) - lastDiscoveryResponses := map[string]lastDiscoveryResponse{} - - // a collection of stack allocated watches per request type - watches := newWatches() - - // node may only be set on the first discovery request - var node = &core.Node{} - - defer func() { - watches.close() - if s.callbacks != nil { - s.callbacks.OnStreamClosed(streamID, node) - } - }() - - // sends a response by serializing to protobuf Any - send := func(resp cache.Response) (string, error) { - if resp == nil { - return "", errors.New("missing response") - } +// streamWrapper abstracts critical data passed around a stream for to be accessed +// through various code paths in the xDS lifecycle. This comes in handy when dealing +// with varying implementation types such as ordered vs unordered resource handling. +type streamWrapper struct { + stream stream.Stream // parent stream object + ID int64 // stream ID in relation to total stream count + nonce int64 // nonce per stream + watches watches // collection of stack allocated watchers per request type + callbacks Callbacks // callbacks for performing actions through stream lifecycle + + node *core.Node // registered xDS client + + // The below fields are used for tracking resource + // cache state and should be maintained per stream. + streamState stream.StreamState + lastDiscoveryResponses map[string]lastDiscoveryResponse +} - out, err := resp.GetDiscoveryResponse() - if err != nil { - return "", err - } +// Send packages the necessary resources before sending on the gRPC stream, +// and sets the current state of the world. +func (s *streamWrapper) send(resp cache.Response) (string, error) { + if resp == nil { + return "", errors.New("missing response") + } - // increment nonce - streamNonce = streamNonce + 1 - out.Nonce = strconv.FormatInt(streamNonce, 10) + out, err := resp.GetDiscoveryResponse() + if err != nil { + return "", err + } - lastResponse := lastDiscoveryResponse{ - nonce: out.Nonce, - resources: make(map[string]struct{}), - } - for _, r := range resp.GetRequest().ResourceNames { - lastResponse.resources[r] = struct{}{} - } - lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse + // increment nonce and convert it to base10 + out.Nonce = strconv.FormatInt(atomic.AddInt64(&s.nonce, 1), 10) - if s.callbacks != nil { - s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out) - } - return out.Nonce, str.Send(out) + lastResponse := lastDiscoveryResponse{ + nonce: out.GetNonce(), + resources: make(map[string]struct{}), } + for _, r := range resp.GetRequest().GetResourceNames() { + lastResponse.resources[r] = struct{}{} + } + s.lastDiscoveryResponses[resp.GetRequest().GetTypeUrl()] = lastResponse + // Register with the callbacks provided that we are sending the response. if s.callbacks != nil { - if err := s.callbacks.OnStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil { - return err - } + s.callbacks.OnStreamResponse(resp.GetContext(), s.ID, resp.GetRequest(), out) } - // recompute dynamic channels for this stream - watches.recompute(s.ctx, reqCh) - - for { - // The list of select cases looks like this: - // 0: <- ctx.Done - // 1: <- reqCh - // 2...: per type watches - index, value, ok := reflect.Select(watches.cases) - switch index { - // ctx.Done() -> if we receive a value here we return as no further computation is needed - case 0: - return nil - // Case 1 handles any request inbound on the stream and handles all initialization as needed - case 1: - // input stream ended or errored out - if !ok { - return nil - } - - req := value.Interface().(*discovery.DiscoveryRequest) - if req == nil { - return status.Errorf(codes.Unavailable, "empty request") - } - - // node field in discovery request is delta-compressed - if req.Node != nil { - node = req.Node - } else { - req.Node = node - } - - // nonces can be reused across streams; we verify nonce only if nonce is not initialized - nonce := req.GetResponseNonce() - - // type URL is required for ADS but is implicit for xDS - if defaultTypeURL == resource.AnyType { - if req.TypeUrl == "" { - return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") - } - } else if req.TypeUrl == "" { - req.TypeUrl = defaultTypeURL - } - - if s.callbacks != nil { - if err := s.callbacks.OnStreamRequest(streamID, req); err != nil { - return err - } - } - - if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) - } - } - - typeURL := req.GetTypeUrl() - responder := make(chan cache.Response, 1) - if w, ok := watches.responders[typeURL]; ok { - // We've found a pre-existing watch, lets check and update if needed. - // If these requirements aren't satisfied, leave an open watch. - if w.nonce == "" || w.nonce == nonce { - w.close() - - watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), - response: responder, - }) - } - } else { - // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. - watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), - response: responder, - }) - } - - // Recompute the dynamic select cases for this stream. - watches.recompute(s.ctx, reqCh) - default: - // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL - if !ok { - // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something? - return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index) - } - - res := value.Interface().(cache.Response) - nonce, err := send(res) - if err != nil { - return err - } + return out.GetNonce(), s.stream.Send(out) +} - watches.responders[res.GetRequest().TypeUrl].nonce = nonce - } +// Shutdown closes all open watches, and notifies API consumers the stream has closed. +func (s *streamWrapper) shutdown() { + s.watches.close() + if s.callbacks != nil { + s.callbacks.OnStreamClosed(s.ID, s.node) } } +// Discovery response that is sent over GRPC stream. +// We need to record what resource names are already sent to a client +// So if the client requests a new name we can respond back +// regardless current snapshot version (even if it is not changed yet) +type lastDiscoveryResponse struct { + nonce string + resources map[string]struct{} +} + // StreamHandler converts a blocking read call to channels and initiates stream processing func (s *server) StreamHandler(stream stream.Stream, typeURL string) error { // a channel for receiving incoming requests |