//
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#ifndef GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H
#define GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H

#include <grpc/support/port_platform.h>

#include <map>
#include <memory>
#include <set>
#include <util/generic/string.h>
#include <util/string/cast.h>
#include <utility>
#include <vector>

#include "y_absl/base/thread_annotations.h"
#include "y_absl/status/status.h"
#include "y_absl/status/statusor.h"
#include "y_absl/strings/string_view.h"
#include "upb/def.hpp"

#include <grpc/event_engine/event_engine.h>

#include "src/core/ext/xds/xds_api.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/ext/xds/xds_resource_type.h"
#include "src/core/ext/xds/xds_transport.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/uri/uri_parser.h"

namespace grpc_core {

extern TraceFlag grpc_xds_client_trace;
extern TraceFlag grpc_xds_client_refcount_trace;

class XdsClient : public DualRefCounted<XdsClient> {
 public:
  // Resource watcher interface.  Implemented by callers.
  // Note: Most callers will not use this API directly but rather via a
  // resource-type-specific wrapper API provided by the relevant
  // XdsResourceType implementation.
  class ResourceWatcherInterface : public RefCounted<ResourceWatcherInterface> {
   public:
    virtual void OnGenericResourceChanged(
        const XdsResourceType::ResourceData* resource)
        Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
    virtual void OnError(y_absl::Status status)
        Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
    virtual void OnResourceDoesNotExist()
        Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
  };

  XdsClient(
      std::unique_ptr<XdsBootstrap> bootstrap,
      OrphanablePtr<XdsTransportFactory> transport_factory,
      std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
      TString user_agent_name, TString user_agent_version,
      Duration resource_request_timeout = Duration::Seconds(15));
  ~XdsClient() override;

  const XdsBootstrap& bootstrap() const {
    return *bootstrap_;  // ctor asserts that it is non-null
  }

  XdsTransportFactory* transport_factory() const {
    return transport_factory_.get();
  }

  void Orphan() override;

  // Start and cancel watch for a resource.
  //
  // The XdsClient takes ownership of the watcher, but the caller may
  // keep a raw pointer to the watcher, which may be used only for
  // cancellation.  (Because the caller does not own the watcher, the
  // pointer must not be used for any other purpose.)
  // If the caller is going to start a new watch after cancelling the
  // old one, it should set delay_unsubscription to true.
  //
  // The resource type object must be a global singleton, since the first
  // time the XdsClient sees a particular resource type object, it will
  // store the pointer to that object as the authoritative implementation for
  // its type URLs.  The resource type object must outlive the XdsClient object,
  // and it is illegal to start a subsequent watch for the same type URLs using
  // a different resource type object.
  //
  // Note: Most callers will not use this API directly but rather via a
  // resource-type-specific wrapper API provided by the relevant
  // XdsResourceType implementation.
  void WatchResource(const XdsResourceType* type, y_absl::string_view name,
                     RefCountedPtr<ResourceWatcherInterface> watcher);
  void CancelResourceWatch(const XdsResourceType* type,
                           y_absl::string_view listener_name,
                           ResourceWatcherInterface* watcher,
                           bool delay_unsubscription = false);

  // Adds and removes drop stats for cluster_name and eds_service_name.
  RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
      const XdsBootstrap::XdsServer& xds_server, y_absl::string_view cluster_name,
      y_absl::string_view eds_service_name);
  void RemoveClusterDropStats(const XdsBootstrap::XdsServer& xds_server,
                              y_absl::string_view cluster_name,
                              y_absl::string_view eds_service_name,
                              XdsClusterDropStats* cluster_drop_stats);

  // Adds and removes locality stats for cluster_name and eds_service_name
  // for the specified locality.
  RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats(
      const XdsBootstrap::XdsServer& xds_server, y_absl::string_view cluster_name,
      y_absl::string_view eds_service_name,
      RefCountedPtr<XdsLocalityName> locality);
  void RemoveClusterLocalityStats(
      const XdsBootstrap::XdsServer& xds_server, y_absl::string_view cluster_name,
      y_absl::string_view eds_service_name,
      const RefCountedPtr<XdsLocalityName>& locality,
      XdsClusterLocalityStats* cluster_locality_stats);

  // Resets connection backoff state.
  void ResetBackoff();

  // Dumps the active xDS config in JSON format.
  // Individual xDS resource is encoded as envoy.admin.v3.*ConfigDump. Returns
  // envoy.service.status.v3.ClientConfig which also includes the config
  // status (e.g., CLIENT_REQUESTED, CLIENT_ACKED, CLIENT_NACKED).
  //
  // Expected to be invoked by wrapper languages in their CSDS service
  // implementation.
  TString DumpClientConfigBinary();

  grpc_event_engine::experimental::EventEngine* engine() {
    return engine_.get();
  }

 private:
  struct XdsResourceKey {
    TString id;
    std::vector<URI::QueryParam> query_params;

    bool operator<(const XdsResourceKey& other) const {
      int c = id.compare(other.id);
      if (c != 0) return c < 0;
      return query_params < other.query_params;
    }
  };

  struct XdsResourceName {
    TString authority;
    XdsResourceKey key;
  };

  // Contains a channel to the xds server and all the data related to the
  // channel.  Holds a ref to the xds client object.
  class ChannelState : public DualRefCounted<ChannelState> {
   public:
    template <typename T>
    class RetryableCall;

    class AdsCallState;
    class LrsCallState;

    ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
                 const XdsBootstrap::XdsServer& server);
    ~ChannelState() override;

    void Orphan() override;

    XdsClient* xds_client() const { return xds_client_.get(); }
    AdsCallState* ads_calld() const;
    LrsCallState* lrs_calld() const;

    void ResetBackoff();

    void MaybeStartLrsCall();
    void StopLrsCallLocked() Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

    // Returns non-OK if there has been an error since the last time the
    // ADS stream saw a response.
    const y_absl::Status& status() const { return status_; }

    void SubscribeLocked(const XdsResourceType* type,
                         const XdsResourceName& name)
        Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
    void UnsubscribeLocked(const XdsResourceType* type,
                           const XdsResourceName& name,
                           bool delay_unsubscription)
        Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

   private:
    void OnConnectivityFailure(y_absl::Status status);

    // Enqueues error notifications to watchers.  Caller must drain
    // XdsClient::work_serializer_ after releasing the lock.
    void SetChannelStatusLocked(y_absl::Status status)
        Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

    // The owning xds client.
    WeakRefCountedPtr<XdsClient> xds_client_;

    const XdsBootstrap::XdsServer& server_;  // Owned by bootstrap.

    OrphanablePtr<XdsTransportFactory::XdsTransport> transport_;

    bool shutting_down_ = false;

    // The retryable XDS calls.
    OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_;
    OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;

    // Stores the most recent accepted resource version for each resource type.
    std::map<const XdsResourceType*, TString /*version*/>
        resource_type_version_map_;

    y_absl::Status status_;
  };

  struct ResourceState {
    std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>>
        watchers;
    // The latest data seen for the resource.
    std::unique_ptr<XdsResourceType::ResourceData> resource;
    XdsApi::ResourceMetadata meta;
    bool ignored_deletion = false;
  };

  struct AuthorityState {
    RefCountedPtr<ChannelState> channel_state;
    std::map<const XdsResourceType*, std::map<XdsResourceKey, ResourceState>>
        resource_map;
  };

  struct LoadReportState {
    struct LocalityState {
      XdsClusterLocalityStats* locality_stats = nullptr;
      XdsClusterLocalityStats::Snapshot deleted_locality_stats;
    };

    XdsClusterDropStats* drop_stats = nullptr;
    XdsClusterDropStats::Snapshot deleted_drop_stats;
    std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
             XdsLocalityName::Less>
        locality_stats;
    Timestamp last_report_time = Timestamp::Now();
  };

  // Load report data.
  using LoadReportMap = std::map<
      std::pair<TString /*cluster_name*/, TString /*eds_service_name*/>,
      LoadReportState>;

  struct LoadReportServer {
    RefCountedPtr<ChannelState> channel_state;
    LoadReportMap load_report_map;
  };

  // Sends an error notification to a specific set of watchers.
  void NotifyWatchersOnErrorLocked(
      const std::map<ResourceWatcherInterface*,
                     RefCountedPtr<ResourceWatcherInterface>>& watchers,
      y_absl::Status status);
  // Sends a resource-does-not-exist notification to a specific set of watchers.
  void NotifyWatchersOnResourceDoesNotExist(
      const std::map<ResourceWatcherInterface*,
                     RefCountedPtr<ResourceWatcherInterface>>& watchers);

  void MaybeRegisterResourceTypeLocked(const XdsResourceType* resource_type)
      Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Gets the type for resource_type, or null if the type is unknown.
  const XdsResourceType* GetResourceTypeLocked(y_absl::string_view resource_type)
      Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  y_absl::StatusOr<XdsResourceName> ParseXdsResourceName(
      y_absl::string_view name, const XdsResourceType* type);
  static TString ConstructFullXdsResourceName(
      y_absl::string_view authority, y_absl::string_view resource_type,
      const XdsResourceKey& key);

  XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
      const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
      const std::set<TString>& clusters) Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  RefCountedPtr<ChannelState> GetOrCreateChannelStateLocked(
      const XdsBootstrap::XdsServer& server, const char* reason)
      Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  std::unique_ptr<XdsBootstrap> bootstrap_;
  OrphanablePtr<XdsTransportFactory> transport_factory_;
  const Duration request_timeout_;
  const bool xds_federation_enabled_;
  XdsApi api_;
  WorkSerializer work_serializer_;
  std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;

  Mutex mu_;

  // Stores resource type objects seen by type URL.
  std::map<y_absl::string_view /*resource_type*/, const XdsResourceType*>
      resource_types_ Y_ABSL_GUARDED_BY(mu_);
  upb::SymbolTable symtab_ Y_ABSL_GUARDED_BY(mu_);

  // Map of existing xDS server channels.
  // Key is owned by the bootstrap config.
  std::map<const XdsBootstrap::XdsServer*, ChannelState*>
      xds_server_channel_map_ Y_ABSL_GUARDED_BY(mu_);

  std::map<TString /*authority*/, AuthorityState> authority_state_map_
      Y_ABSL_GUARDED_BY(mu_);

  // Key is owned by the bootstrap config.
  std::map<const XdsBootstrap::XdsServer*, LoadReportServer>
      xds_load_report_server_map_ Y_ABSL_GUARDED_BY(mu_);

  // Stores started watchers whose resource name was not parsed successfully,
  // waiting to be cancelled or reset in Orphan().
  std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>>
      invalid_watchers_ Y_ABSL_GUARDED_BY(mu_);

  bool shutting_down_ Y_ABSL_GUARDED_BY(mu_) = false;
};

}  // namespace grpc_core

#endif  // GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H