diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-02-08 21:53:16 +0300 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2024-02-09 19:19:03 +0300 |
commit | 63a65e0d2ad17b408177919a1092ca7a896ea306 (patch) | |
tree | 64593e615b4c8019b8a29fe6effa70a9b81ed715 | |
parent | 81e73067545279c2f53a4f19857c3d85717cd644 (diff) | |
download | ydb-63a65e0d2ad17b408177919a1092ca7a896ea306.tar.gz |
Intermediate changes
-rw-r--r-- | yt/yt/RpcProxyProtocolVersion.txt | 2 | ||||
-rw-r--r-- | yt/yt/client/cache/rpc.cpp | 12 | ||||
-rw-r--r-- | yt/yt/client/federated/cache.cpp | 122 | ||||
-rw-r--r-- | yt/yt/client/federated/cache.h | 51 | ||||
-rw-r--r-- | yt/yt/client/federated/unittests/cache_ut.cpp | 91 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/cache/proto/config.proto | 5 |
6 files changed, 164 insertions, 119 deletions
diff --git a/yt/yt/RpcProxyProtocolVersion.txt b/yt/yt/RpcProxyProtocolVersion.txt index de98282d4d..0e120bc485 100644 --- a/yt/yt/RpcProxyProtocolVersion.txt +++ b/yt/yt/RpcProxyProtocolVersion.txt @@ -16,7 +16,7 @@ SET(YT_RPC_PROXY_SERVER_PROTOCOL_VERSION_MINOR 2) SET(YT_RPC_MODIFY_ROWS_STRONG_LOCKS_VERSION 2) # RPC python client has protocol version plus some patch version -SET(YT_RPC_PYTHON_BINDINGS_PATCH_VERSION 17) +SET(YT_RPC_PYTHON_BINDINGS_PATCH_VERSION 18) # YT proto package has protocol version plus some patch version SET(YT_PROTO_PACKAGE_PATCH_VERSION 13) diff --git a/yt/yt/client/cache/rpc.cpp b/yt/yt/client/cache/rpc.cpp index 3ae8626969..2da29bbb64 100644 --- a/yt/yt/client/cache/rpc.cpp +++ b/yt/yt/client/cache/rpc.cpp @@ -85,6 +85,18 @@ NApi::NRpcProxy::TConnectionConfigPtr GetConnectionConfig(const TConfig& config) connectionConfig->RetryingChannel->RetryTimeout = TDuration::MilliSeconds(config.GetRetryTimeout()); } + if (config.HasClusterTag()) { + connectionConfig->ClusterTag = NApi::TClusterTag(config.GetClusterTag()); + } + + if (config.HasClockClusterTag()) { + connectionConfig->ClockClusterTag = NObjectClient::TCellTag(config.GetClockClusterTag()); + } + + if (config.HasUdfRegistryPath()) { + connectionConfig->UdfRegistryPath = config.GetUdfRegistryPath(); + } + connectionConfig->Postprocess(); return connectionConfig; diff --git a/yt/yt/client/federated/cache.cpp b/yt/yt/client/federated/cache.cpp index 79081582d5..39ad3fbe1b 100644 --- a/yt/yt/client/federated/cache.cpp +++ b/yt/yt/client/federated/cache.cpp @@ -1,5 +1,6 @@ #include "cache.h" #include "client.h" +#include "connection.h" #include <yt/yt/client/api/options.h> @@ -21,37 +22,64 @@ public: TClientsCache( TClustersConfig clustersConfig, NApi::TClientOptions options, - TFederationConfigPtr federationConfig, + TConnectionConfigPtr federationConfig, TString clusterSeparator) : ClustersConfig_(std::move(clustersConfig)) , Options_(std::move(options)) , FederationConfig_(std::move(federationConfig)) , ClusterSeparator_(std::move(clusterSeparator)) {} - protected: NApi::IClientPtr CreateClient(TStringBuf clusterUrl) override { std::vector<TString> clusters; NYT::NApi::IClientPtr client; StringSplitter(clusterUrl).SplitByString(ClusterSeparator_).SkipEmpty().Collect(&clusters); - if (clusters.size() == 1) { - return NCache::CreateClient(NCache::MakeClusterConfig(ClustersConfig_, clusterUrl), Options_); - } else { - std::vector<NYT::NApi::IClientPtr> clients; - clients.reserve(clusters.size()); - for (auto& cluster : clusters) { - clients.push_back(GetClient(cluster)); - } - return NFederated::CreateClient(std::move(clients), FederationConfig_); + switch (clusters.size()) { + case 0: + THROW_ERROR_EXCEPTION("Can't create client without cluster"); + case 1: + return NCache::CreateClient(NCache::MakeClusterConfig(ClustersConfig_, clusterUrl), Options_); + default: + return CreateFederatedClient(clusters); + } + } +private: + NApi::IClientPtr CreateFederatedClient(const std::vector<TString>& clusters) + { + THashSet<TString> seenClusters; + for (const auto& connectionConfig : FederationConfig_->RpcProxyConnections) { + THROW_ERROR_EXCEPTION_UNLESS( + connectionConfig->ClusterUrl, + "Cluster url is mandatory for federated client connection config"); + seenClusters.insert(connectionConfig->ClusterUrl.value()); + } + + THROW_ERROR_EXCEPTION_UNLESS( + clusters.size() == seenClusters.size(), + "Desired (%v) and configured (%v) clusters count mismatch", + clusters, + seenClusters); + + for (const auto& cluster : clusters) { + THROW_ERROR_EXCEPTION_UNLESS( + seenClusters.contains(cluster), + "No federated client configuration for cluster %v", cluster); + } + + if (!FederatedConnection_) { + NYT::NApi::NRpcProxy::TConnectionOptions options; // TODO(ashishkin): use proper invoker here? + FederatedConnection_ = CreateConnection(FederationConfig_, std::move(options)); } + return FederatedConnection_->CreateClient(Options_); } private: const TClustersConfig ClustersConfig_; const NApi::TClientOptions Options_; - const TFederationConfigPtr FederationConfig_; + const NFederated::TConnectionConfigPtr FederationConfig_; const TString ClusterSeparator_; + NApi::IConnectionPtr FederatedConnection_; }; //////////////////////////////////////////////////////////////////////////////// @@ -59,53 +87,20 @@ private: } // namespace IClientsCachePtr CreateFederatedClientsCache( - TFederationConfigPtr federatedConfig, - const TClustersConfig& config, + TConnectionConfigPtr federatedConfig, + const TClustersConfig& clustersConfig, const NYT::NApi::TClientOptions& options, TString clusterSeparator) { return NYT::New<TClientsCache>( - std::move(config), - std::move(options), + clustersConfig, + options, std::move(federatedConfig), std::move(clusterSeparator)); } IClientsCachePtr CreateFederatedClientsCache( - TFederationConfigPtr federationConfig, - const TConfig& config, - const NYT::NApi::TClientOptions& options, - TString clusterSeparator) -{ - TClustersConfig clustersConfig; - *clustersConfig.MutableDefaultConfig() = config; - - return CreateFederatedClientsCache( - std::move(federationConfig), - std::move(clustersConfig), - std::move(options), - std::move(clusterSeparator)); -} - -IClientsCachePtr CreateFederatedClientsCache( - TString chaosBundleName, - const TClustersConfig& config, - const NYT::NApi::TClientOptions& options, - TString clusterSeparator) -{ - auto federationConfig = NYT::New<NYT::NClient::NFederated::TFederationConfig>(); - if (!chaosBundleName.empty()) { - federationConfig->BundleName = std::move(chaosBundleName); - } - return CreateFederatedClientsCache( - std::move(federationConfig), - std::move(config), - std::move(options), - std::move(clusterSeparator)); -} - -IClientsCachePtr CreateFederatedClientsCache( - TString chaosBundleName, + TConnectionConfigPtr federatedConfig, const TConfig& config, const NYT::NApi::TClientOptions& options, TString clusterSeparator) @@ -113,33 +108,10 @@ IClientsCachePtr CreateFederatedClientsCache( TClustersConfig clustersConfig; *clustersConfig.MutableDefaultConfig() = config; - return CreateFederatedClientsCache( - std::move(chaosBundleName), + return NYT::New<TClientsCache>( std::move(clustersConfig), options, - std::move(clusterSeparator)); -} - -IClientsCachePtr CreateFederatedClientsCache( - const TConfig& config, - TString chaosBundleName, - TString clusterSeparator) -{ - return CreateFederatedClientsCache( - std::move(chaosBundleName), - config, - NApi::GetClientOpsFromEnvStatic(), - std::move(clusterSeparator)); -} - -IClientsCachePtr CreateFederatedClientsCache( - TString chaosBundleName, - TString clusterSeparator) -{ - return CreateFederatedClientsCache( - std::move(chaosBundleName), - TClustersConfig{}, - NApi::GetClientOpsFromEnvStatic(), + std::move(federatedConfig), std::move(clusterSeparator)); } diff --git a/yt/yt/client/federated/cache.h b/yt/yt/client/federated/cache.h index 93b9831d83..ab244a4a1f 100644 --- a/yt/yt/client/federated/cache.h +++ b/yt/yt/client/federated/cache.h @@ -13,53 +13,28 @@ using NCache::TConfig; //////////////////////////////////////////////////////////////////////////////// -//! Creates clients cache which explicitly given federation and clusters config. -//! Server name is always overwritten with requested. -//! Creates FederatedClient when clusters are enumerated with separator(default is '+'). -//! For example: seneca-sas+seneca-vla. +//! Creates clients cache for generic and federated clients. +//! Federated clients created with federatedConfig, and generic clients created with clustersConfig. +//! Which client to create decided by cluster url: if several clusters concatenated by clusterSeparator requested then +//! federated client it is, generic client otherwise. +//! For example, for "markov" generic client will be created, and for "seneca-sas+seneca-vla" federated one. IClientsCachePtr CreateFederatedClientsCache( - TFederationConfigPtr federationConfig, - const TClustersConfig& config, + TConnectionConfigPtr federatedConfig, + const TClustersConfig& clustersConfig, const NYT::NApi::TClientOptions& options, TString clusterSeparator = "+"); -//! Shortcut to create cache with default federation config. +//! Creates clients cache for generic and federated clients. +//! Federated clients created with federatedConfig, and generic clients created with config. +//! Which client to create decided by cluster url: if several clusters concatenated by clusterSeparator requested then +//! federated client it is, generic client otherwise. +//! For example, for "markov" generic client will be created, and for "seneca-sas+seneca-vla" federated one. IClientsCachePtr CreateFederatedClientsCache( - TFederationConfigPtr federationConfig, + TConnectionConfigPtr federatedConfig, const TConfig& config, const NYT::NApi::TClientOptions& options, TString clusterSeparator = "+"); -//! Shortcut to create cache with default federation config. -IClientsCachePtr CreateFederatedClientsCache( - TString chaosBundleName, - const TClustersConfig& config, - const NYT::NApi::TClientOptions& options, - TString clusterSeparator = "+"); - -//! Creates clients cache which shares same config (except server name). -//! Shortcut to create cache with default federation config. -IClientsCachePtr CreateFederatedClientsCache( - TString chaosBundleName, - const TConfig& config, - const NYT::NApi::TClientOptions& options, - TString clusterSeparator = "+"); - -//! Creates clients cache which shares same config (except server name). -//! Shortcut to create cache with default federation config. -//! Shortcut to use client options from env. -IClientsCachePtr CreateFederatedClientsCache( - TString chaosBundleName, - const TConfig& config, - TString clusterSeparator = "+"); - -//! Shortcut to create cache with default federation config. -//! Shortcut to use client options from env. -//! Shortcut to create cache with default clusters config. -IClientsCachePtr CreateFederatedClientsCache( - TString chaosBundleName, - TString clusterSeparator = "+"); - //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NClient::NFederated diff --git a/yt/yt/client/federated/unittests/cache_ut.cpp b/yt/yt/client/federated/unittests/cache_ut.cpp index d4ef45ffb7..d13bfff5dd 100644 --- a/yt/yt/client/federated/unittests/cache_ut.cpp +++ b/yt/yt/client/federated/unittests/cache_ut.cpp @@ -1,7 +1,10 @@ #include <yt/yt/client/federated/cache.h> #include <yt/yt/client/federated/config.h> - +#include <yt/yt/client/api/options.h> #include <yt/yt/client/cache/cache.h> +#include <yt/yt/core/misc/error.h> + +#include <yt/yt_proto/yt/client/cache/proto/config.pb.h> #include <library/cpp/testing/gtest/gtest.h> @@ -16,9 +19,13 @@ using namespace NYT::NApi; TEST(TFederatedClientsCacheTest, GetSameClient) { SetEnv("YT_TOKEN", "AAAA-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"); - auto cache = CreateFederatedClientsCache("my_bundle"); - auto client1 = cache->GetClient("localhost"); - auto client2 = cache->GetClient("localhost"); + auto ytClientsCache = CreateFederatedClientsCache( + New<TConnectionConfig>(), + TClustersConfig{}, + NApi::GetClientOpsFromEnvStatic()); + + auto client1 = ytClientsCache->GetClient("localhost"); + auto client2 = ytClientsCache->GetClient("localhost"); EXPECT_TRUE(client1 == client2); @@ -26,7 +33,81 @@ TEST(TFederatedClientsCacheTest, GetSameClient) // and to remove references to TConnection that it's holding. // It's because we don't actually create YT Server. client1->GetConnection()->Terminate(); - client2->GetConnection()->Terminate(); +} + +TEST(TFederatedClientsCacheTest, GetFederatedWithEmptyConfig) +{ + SetEnv("YT_TOKEN", "AAAA-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"); + auto ytClientsCache = CreateFederatedClientsCache( + New<TConnectionConfig>(), + TClustersConfig{}, + NApi::GetClientOpsFromEnvStatic()); + + EXPECT_THROW( + ytClientsCache->GetClient("primary+secondary"), + NYT::TErrorException); +} + +TEST(TFederatedClientsCacheTest, ConfigurationAndClusterUrlMismatch1) +{ + SetEnv("YT_TOKEN", "AAAA-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"); + auto connectionConfig = New<TConnectionConfig>(); + connectionConfig->BundleName = TString{"my_bundle"}; + connectionConfig->RpcProxyConnections.push_back(New<NApi::NRpcProxy::TConnectionConfig>()); + connectionConfig->RpcProxyConnections.back()->ClusterUrl = TString{"primary"}; + connectionConfig->RpcProxyConnections.push_back(New<NApi::NRpcProxy::TConnectionConfig>()); + connectionConfig->RpcProxyConnections.back()->ClusterUrl = TString{"secondary"}; + + auto ytClientsCache = CreateFederatedClientsCache( + connectionConfig, + TClustersConfig{}, + NApi::GetClientOpsFromEnvStatic()); + + EXPECT_THROW( + ytClientsCache->GetClient("primary+tertiary"), + NYT::TErrorException); +} + +TEST(TFederatedClientsCacheTest, ConfigurationAndClusterUrlMismatch2) +{ + SetEnv("YT_TOKEN", "AAAA-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"); + auto connectionConfig = New<TConnectionConfig>(); + connectionConfig->BundleName = TString{"my_bundle"}; + connectionConfig->RpcProxyConnections.push_back(New<NApi::NRpcProxy::TConnectionConfig>()); + connectionConfig->RpcProxyConnections.back()->ClusterUrl = TString{"primary"}; + connectionConfig->RpcProxyConnections.push_back(New<NApi::NRpcProxy::TConnectionConfig>()); + connectionConfig->RpcProxyConnections.back()->ClusterUrl = TString{"secondary"}; + connectionConfig->RpcProxyConnections.push_back(New<NApi::NRpcProxy::TConnectionConfig>()); + connectionConfig->RpcProxyConnections.back()->ClusterUrl = TString{"tertiary"}; + + auto ytClientsCache = CreateFederatedClientsCache( + connectionConfig, + TClustersConfig{}, + NApi::GetClientOpsFromEnvStatic()); + + EXPECT_THROW( + ytClientsCache->GetClient("primary+tertiary"), + NYT::TErrorException); +} + +TEST(TFederatedClientsCacheTest, ConfigurationMissingCluster) +{ + SetEnv("YT_TOKEN", "AAAA-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"); + auto connectionConfig = New<TConnectionConfig>(); + connectionConfig->BundleName = TString{"my_bundle"}; + connectionConfig->RpcProxyConnections.push_back(New<NApi::NRpcProxy::TConnectionConfig>()); + connectionConfig->RpcProxyConnections.back()->ClusterUrl = TString{"primary"}; + connectionConfig->RpcProxyConnections.push_back(New<NApi::NRpcProxy::TConnectionConfig>()); + connectionConfig->RpcProxyConnections.back()->ClusterUrl = TString{"secondary"}; + + auto ytClientsCache = CreateFederatedClientsCache( + connectionConfig, + TClustersConfig{}, + NApi::GetClientOpsFromEnvStatic()); + + EXPECT_THROW( + ytClientsCache->GetClient("primary+secondary+tertiary"), + NYT::TErrorException); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt_proto/yt/client/cache/proto/config.proto b/yt/yt_proto/yt/client/cache/proto/config.proto index 6350308d37..0311882125 100644 --- a/yt/yt_proto/yt/client/cache/proto/config.proto +++ b/yt/yt_proto/yt/client/cache/proto/config.proto @@ -3,6 +3,7 @@ package NYT.NClient.NCache; import "yt_proto/yt/core/yson/proto/protobuf_interop.proto"; option (NYT.NYson.NProto.derive_underscore_case_names) = true; +option go_package = "a.yandex-team.ru/yt/go/proto/client/cache"; enum ECompressionCodec { @@ -39,6 +40,10 @@ message TConfig optional bool EnableProxyDiscovery = 19; optional bool EnablePowerOfTwoChoicesStrategy = 21; optional bool EnableSelectQueryTracingTag = 22; + + optional uint32 ClusterTag = 23; + optional uint32 ClockClusterTag = 24; + optional string UdfRegistryPath = 25; } message TClustersConfig |