aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-02-08 21:53:16 +0300
committerAlexander Smirnov <alex@ydb.tech>2024-02-09 19:19:03 +0300
commit63a65e0d2ad17b408177919a1092ca7a896ea306 (patch)
tree64593e615b4c8019b8a29fe6effa70a9b81ed715
parent81e73067545279c2f53a4f19857c3d85717cd644 (diff)
downloadydb-63a65e0d2ad17b408177919a1092ca7a896ea306.tar.gz
Intermediate changes
-rw-r--r--yt/yt/RpcProxyProtocolVersion.txt2
-rw-r--r--yt/yt/client/cache/rpc.cpp12
-rw-r--r--yt/yt/client/federated/cache.cpp122
-rw-r--r--yt/yt/client/federated/cache.h51
-rw-r--r--yt/yt/client/federated/unittests/cache_ut.cpp91
-rw-r--r--yt/yt_proto/yt/client/cache/proto/config.proto5
6 files changed, 164 insertions, 119 deletions
diff --git a/yt/yt/RpcProxyProtocolVersion.txt b/yt/yt/RpcProxyProtocolVersion.txt
index de98282d4d..0e120bc485 100644
--- a/yt/yt/RpcProxyProtocolVersion.txt
+++ b/yt/yt/RpcProxyProtocolVersion.txt
@@ -16,7 +16,7 @@ SET(YT_RPC_PROXY_SERVER_PROTOCOL_VERSION_MINOR 2)
SET(YT_RPC_MODIFY_ROWS_STRONG_LOCKS_VERSION 2)
# RPC python client has protocol version plus some patch version
-SET(YT_RPC_PYTHON_BINDINGS_PATCH_VERSION 17)
+SET(YT_RPC_PYTHON_BINDINGS_PATCH_VERSION 18)
# YT proto package has protocol version plus some patch version
SET(YT_PROTO_PACKAGE_PATCH_VERSION 13)
diff --git a/yt/yt/client/cache/rpc.cpp b/yt/yt/client/cache/rpc.cpp
index 3ae8626969..2da29bbb64 100644
--- a/yt/yt/client/cache/rpc.cpp
+++ b/yt/yt/client/cache/rpc.cpp
@@ -85,6 +85,18 @@ NApi::NRpcProxy::TConnectionConfigPtr GetConnectionConfig(const TConfig& config)
connectionConfig->RetryingChannel->RetryTimeout = TDuration::MilliSeconds(config.GetRetryTimeout());
}
+ if (config.HasClusterTag()) {
+ connectionConfig->ClusterTag = NApi::TClusterTag(config.GetClusterTag());
+ }
+
+ if (config.HasClockClusterTag()) {
+ connectionConfig->ClockClusterTag = NObjectClient::TCellTag(config.GetClockClusterTag());
+ }
+
+ if (config.HasUdfRegistryPath()) {
+ connectionConfig->UdfRegistryPath = config.GetUdfRegistryPath();
+ }
+
connectionConfig->Postprocess();
return connectionConfig;
diff --git a/yt/yt/client/federated/cache.cpp b/yt/yt/client/federated/cache.cpp
index 79081582d5..39ad3fbe1b 100644
--- a/yt/yt/client/federated/cache.cpp
+++ b/yt/yt/client/federated/cache.cpp
@@ -1,5 +1,6 @@
#include "cache.h"
#include "client.h"
+#include "connection.h"
#include <yt/yt/client/api/options.h>
@@ -21,37 +22,64 @@ public:
TClientsCache(
TClustersConfig clustersConfig,
NApi::TClientOptions options,
- TFederationConfigPtr federationConfig,
+ TConnectionConfigPtr federationConfig,
TString clusterSeparator)
: ClustersConfig_(std::move(clustersConfig))
, Options_(std::move(options))
, FederationConfig_(std::move(federationConfig))
, ClusterSeparator_(std::move(clusterSeparator))
{}
-
protected:
NApi::IClientPtr CreateClient(TStringBuf clusterUrl) override
{
std::vector<TString> clusters;
NYT::NApi::IClientPtr client;
StringSplitter(clusterUrl).SplitByString(ClusterSeparator_).SkipEmpty().Collect(&clusters);
- if (clusters.size() == 1) {
- return NCache::CreateClient(NCache::MakeClusterConfig(ClustersConfig_, clusterUrl), Options_);
- } else {
- std::vector<NYT::NApi::IClientPtr> clients;
- clients.reserve(clusters.size());
- for (auto& cluster : clusters) {
- clients.push_back(GetClient(cluster));
- }
- return NFederated::CreateClient(std::move(clients), FederationConfig_);
+ switch (clusters.size()) {
+ case 0:
+ THROW_ERROR_EXCEPTION("Can't create client without cluster");
+ case 1:
+ return NCache::CreateClient(NCache::MakeClusterConfig(ClustersConfig_, clusterUrl), Options_);
+ default:
+ return CreateFederatedClient(clusters);
+ }
+ }
+private:
+ NApi::IClientPtr CreateFederatedClient(const std::vector<TString>& clusters)
+ {
+ THashSet<TString> seenClusters;
+ for (const auto& connectionConfig : FederationConfig_->RpcProxyConnections) {
+ THROW_ERROR_EXCEPTION_UNLESS(
+ connectionConfig->ClusterUrl,
+ "Cluster url is mandatory for federated client connection config");
+ seenClusters.insert(connectionConfig->ClusterUrl.value());
+ }
+
+ THROW_ERROR_EXCEPTION_UNLESS(
+ clusters.size() == seenClusters.size(),
+ "Desired (%v) and configured (%v) clusters count mismatch",
+ clusters,
+ seenClusters);
+
+ for (const auto& cluster : clusters) {
+ THROW_ERROR_EXCEPTION_UNLESS(
+ seenClusters.contains(cluster),
+ "No federated client configuration for cluster %v", cluster);
+ }
+
+ if (!FederatedConnection_) {
+ NYT::NApi::NRpcProxy::TConnectionOptions options; // TODO(ashishkin): use proper invoker here?
+ FederatedConnection_ = CreateConnection(FederationConfig_, std::move(options));
}
+ return FederatedConnection_->CreateClient(Options_);
}
private:
const TClustersConfig ClustersConfig_;
const NApi::TClientOptions Options_;
- const TFederationConfigPtr FederationConfig_;
+ const NFederated::TConnectionConfigPtr FederationConfig_;
const TString ClusterSeparator_;
+ NApi::IConnectionPtr FederatedConnection_;
};
////////////////////////////////////////////////////////////////////////////////
@@ -59,53 +87,20 @@ private:
} // namespace
IClientsCachePtr CreateFederatedClientsCache(
- TFederationConfigPtr federatedConfig,
- const TClustersConfig& config,
+ TConnectionConfigPtr federatedConfig,
+ const TClustersConfig& clustersConfig,
const NYT::NApi::TClientOptions& options,
TString clusterSeparator)
{
return NYT::New<TClientsCache>(
- std::move(config),
- std::move(options),
+ clustersConfig,
+ options,
std::move(federatedConfig),
std::move(clusterSeparator));
}
IClientsCachePtr CreateFederatedClientsCache(
- TFederationConfigPtr federationConfig,
- const TConfig& config,
- const NYT::NApi::TClientOptions& options,
- TString clusterSeparator)
-{
- TClustersConfig clustersConfig;
- *clustersConfig.MutableDefaultConfig() = config;
-
- return CreateFederatedClientsCache(
- std::move(federationConfig),
- std::move(clustersConfig),
- std::move(options),
- std::move(clusterSeparator));
-}
-
-IClientsCachePtr CreateFederatedClientsCache(
- TString chaosBundleName,
- const TClustersConfig& config,
- const NYT::NApi::TClientOptions& options,
- TString clusterSeparator)
-{
- auto federationConfig = NYT::New<NYT::NClient::NFederated::TFederationConfig>();
- if (!chaosBundleName.empty()) {
- federationConfig->BundleName = std::move(chaosBundleName);
- }
- return CreateFederatedClientsCache(
- std::move(federationConfig),
- std::move(config),
- std::move(options),
- std::move(clusterSeparator));
-}
-
-IClientsCachePtr CreateFederatedClientsCache(
- TString chaosBundleName,
+ TConnectionConfigPtr federatedConfig,
const TConfig& config,
const NYT::NApi::TClientOptions& options,
TString clusterSeparator)
@@ -113,33 +108,10 @@ IClientsCachePtr CreateFederatedClientsCache(
TClustersConfig clustersConfig;
*clustersConfig.MutableDefaultConfig() = config;
- return CreateFederatedClientsCache(
- std::move(chaosBundleName),
+ return NYT::New<TClientsCache>(
std::move(clustersConfig),
options,
- std::move(clusterSeparator));
-}
-
-IClientsCachePtr CreateFederatedClientsCache(
- const TConfig& config,
- TString chaosBundleName,
- TString clusterSeparator)
-{
- return CreateFederatedClientsCache(
- std::move(chaosBundleName),
- config,
- NApi::GetClientOpsFromEnvStatic(),
- std::move(clusterSeparator));
-}
-
-IClientsCachePtr CreateFederatedClientsCache(
- TString chaosBundleName,
- TString clusterSeparator)
-{
- return CreateFederatedClientsCache(
- std::move(chaosBundleName),
- TClustersConfig{},
- NApi::GetClientOpsFromEnvStatic(),
+ std::move(federatedConfig),
std::move(clusterSeparator));
}
diff --git a/yt/yt/client/federated/cache.h b/yt/yt/client/federated/cache.h
index 93b9831d83..ab244a4a1f 100644
--- a/yt/yt/client/federated/cache.h
+++ b/yt/yt/client/federated/cache.h
@@ -13,53 +13,28 @@ using NCache::TConfig;
////////////////////////////////////////////////////////////////////////////////
-//! Creates clients cache which explicitly given federation and clusters config.
-//! Server name is always overwritten with requested.
-//! Creates FederatedClient when clusters are enumerated with separator(default is '+').
-//! For example: seneca-sas+seneca-vla.
+//! Creates clients cache for generic and federated clients.
+//! Federated clients created with federatedConfig, and generic clients created with clustersConfig.
+//! Which client to create decided by cluster url: if several clusters concatenated by clusterSeparator requested then
+//! federated client it is, generic client otherwise.
+//! For example, for "markov" generic client will be created, and for "seneca-sas+seneca-vla" federated one.
IClientsCachePtr CreateFederatedClientsCache(
- TFederationConfigPtr federationConfig,
- const TClustersConfig& config,
+ TConnectionConfigPtr federatedConfig,
+ const TClustersConfig& clustersConfig,
const NYT::NApi::TClientOptions& options,
TString clusterSeparator = "+");
-//! Shortcut to create cache with default federation config.
+//! Creates clients cache for generic and federated clients.
+//! Federated clients created with federatedConfig, and generic clients created with config.
+//! Which client to create decided by cluster url: if several clusters concatenated by clusterSeparator requested then
+//! federated client it is, generic client otherwise.
+//! For example, for "markov" generic client will be created, and for "seneca-sas+seneca-vla" federated one.
IClientsCachePtr CreateFederatedClientsCache(
- TFederationConfigPtr federationConfig,
+ TConnectionConfigPtr federatedConfig,
const TConfig& config,
const NYT::NApi::TClientOptions& options,
TString clusterSeparator = "+");
-//! Shortcut to create cache with default federation config.
-IClientsCachePtr CreateFederatedClientsCache(
- TString chaosBundleName,
- const TClustersConfig& config,
- const NYT::NApi::TClientOptions& options,
- TString clusterSeparator = "+");
-
-//! Creates clients cache which shares same config (except server name).
-//! Shortcut to create cache with default federation config.
-IClientsCachePtr CreateFederatedClientsCache(
- TString chaosBundleName,
- const TConfig& config,
- const NYT::NApi::TClientOptions& options,
- TString clusterSeparator = "+");
-
-//! Creates clients cache which shares same config (except server name).
-//! Shortcut to create cache with default federation config.
-//! Shortcut to use client options from env.
-IClientsCachePtr CreateFederatedClientsCache(
- TString chaosBundleName,
- const TConfig& config,
- TString clusterSeparator = "+");
-
-//! Shortcut to create cache with default federation config.
-//! Shortcut to use client options from env.
-//! Shortcut to create cache with default clusters config.
-IClientsCachePtr CreateFederatedClientsCache(
- TString chaosBundleName,
- TString clusterSeparator = "+");
-
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NClient::NFederated
diff --git a/yt/yt/client/federated/unittests/cache_ut.cpp b/yt/yt/client/federated/unittests/cache_ut.cpp
index d4ef45ffb7..d13bfff5dd 100644
--- a/yt/yt/client/federated/unittests/cache_ut.cpp
+++ b/yt/yt/client/federated/unittests/cache_ut.cpp
@@ -1,7 +1,10 @@
#include <yt/yt/client/federated/cache.h>
#include <yt/yt/client/federated/config.h>
-
+#include <yt/yt/client/api/options.h>
#include <yt/yt/client/cache/cache.h>
+#include <yt/yt/core/misc/error.h>
+
+#include <yt/yt_proto/yt/client/cache/proto/config.pb.h>
#include <library/cpp/testing/gtest/gtest.h>
@@ -16,9 +19,13 @@ using namespace NYT::NApi;
TEST(TFederatedClientsCacheTest, GetSameClient)
{
SetEnv("YT_TOKEN", "AAAA-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA");
- auto cache = CreateFederatedClientsCache("my_bundle");
- auto client1 = cache->GetClient("localhost");
- auto client2 = cache->GetClient("localhost");
+ auto ytClientsCache = CreateFederatedClientsCache(
+ New<TConnectionConfig>(),
+ TClustersConfig{},
+ NApi::GetClientOpsFromEnvStatic());
+
+ auto client1 = ytClientsCache->GetClient("localhost");
+ auto client2 = ytClientsCache->GetClient("localhost");
EXPECT_TRUE(client1 == client2);
@@ -26,7 +33,81 @@ TEST(TFederatedClientsCacheTest, GetSameClient)
// and to remove references to TConnection that it's holding.
// It's because we don't actually create YT Server.
client1->GetConnection()->Terminate();
- client2->GetConnection()->Terminate();
+}
+
+TEST(TFederatedClientsCacheTest, GetFederatedWithEmptyConfig)
+{
+ SetEnv("YT_TOKEN", "AAAA-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA");
+ auto ytClientsCache = CreateFederatedClientsCache(
+ New<TConnectionConfig>(),
+ TClustersConfig{},
+ NApi::GetClientOpsFromEnvStatic());
+
+ EXPECT_THROW(
+ ytClientsCache->GetClient("primary+secondary"),
+ NYT::TErrorException);
+}
+
+TEST(TFederatedClientsCacheTest, ConfigurationAndClusterUrlMismatch1)
+{
+ SetEnv("YT_TOKEN", "AAAA-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA");
+ auto connectionConfig = New<TConnectionConfig>();
+ connectionConfig->BundleName = TString{"my_bundle"};
+ connectionConfig->RpcProxyConnections.push_back(New<NApi::NRpcProxy::TConnectionConfig>());
+ connectionConfig->RpcProxyConnections.back()->ClusterUrl = TString{"primary"};
+ connectionConfig->RpcProxyConnections.push_back(New<NApi::NRpcProxy::TConnectionConfig>());
+ connectionConfig->RpcProxyConnections.back()->ClusterUrl = TString{"secondary"};
+
+ auto ytClientsCache = CreateFederatedClientsCache(
+ connectionConfig,
+ TClustersConfig{},
+ NApi::GetClientOpsFromEnvStatic());
+
+ EXPECT_THROW(
+ ytClientsCache->GetClient("primary+tertiary"),
+ NYT::TErrorException);
+}
+
+TEST(TFederatedClientsCacheTest, ConfigurationAndClusterUrlMismatch2)
+{
+ SetEnv("YT_TOKEN", "AAAA-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA");
+ auto connectionConfig = New<TConnectionConfig>();
+ connectionConfig->BundleName = TString{"my_bundle"};
+ connectionConfig->RpcProxyConnections.push_back(New<NApi::NRpcProxy::TConnectionConfig>());
+ connectionConfig->RpcProxyConnections.back()->ClusterUrl = TString{"primary"};
+ connectionConfig->RpcProxyConnections.push_back(New<NApi::NRpcProxy::TConnectionConfig>());
+ connectionConfig->RpcProxyConnections.back()->ClusterUrl = TString{"secondary"};
+ connectionConfig->RpcProxyConnections.push_back(New<NApi::NRpcProxy::TConnectionConfig>());
+ connectionConfig->RpcProxyConnections.back()->ClusterUrl = TString{"tertiary"};
+
+ auto ytClientsCache = CreateFederatedClientsCache(
+ connectionConfig,
+ TClustersConfig{},
+ NApi::GetClientOpsFromEnvStatic());
+
+ EXPECT_THROW(
+ ytClientsCache->GetClient("primary+tertiary"),
+ NYT::TErrorException);
+}
+
+TEST(TFederatedClientsCacheTest, ConfigurationMissingCluster)
+{
+ SetEnv("YT_TOKEN", "AAAA-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA");
+ auto connectionConfig = New<TConnectionConfig>();
+ connectionConfig->BundleName = TString{"my_bundle"};
+ connectionConfig->RpcProxyConnections.push_back(New<NApi::NRpcProxy::TConnectionConfig>());
+ connectionConfig->RpcProxyConnections.back()->ClusterUrl = TString{"primary"};
+ connectionConfig->RpcProxyConnections.push_back(New<NApi::NRpcProxy::TConnectionConfig>());
+ connectionConfig->RpcProxyConnections.back()->ClusterUrl = TString{"secondary"};
+
+ auto ytClientsCache = CreateFederatedClientsCache(
+ connectionConfig,
+ TClustersConfig{},
+ NApi::GetClientOpsFromEnvStatic());
+
+ EXPECT_THROW(
+ ytClientsCache->GetClient("primary+secondary+tertiary"),
+ NYT::TErrorException);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt_proto/yt/client/cache/proto/config.proto b/yt/yt_proto/yt/client/cache/proto/config.proto
index 6350308d37..0311882125 100644
--- a/yt/yt_proto/yt/client/cache/proto/config.proto
+++ b/yt/yt_proto/yt/client/cache/proto/config.proto
@@ -3,6 +3,7 @@ package NYT.NClient.NCache;
import "yt_proto/yt/core/yson/proto/protobuf_interop.proto";
option (NYT.NYson.NProto.derive_underscore_case_names) = true;
+option go_package = "a.yandex-team.ru/yt/go/proto/client/cache";
enum ECompressionCodec
{
@@ -39,6 +40,10 @@ message TConfig
optional bool EnableProxyDiscovery = 19;
optional bool EnablePowerOfTwoChoicesStrategy = 21;
optional bool EnableSelectQueryTracingTag = 22;
+
+ optional uint32 ClusterTag = 23;
+ optional uint32 ClockClusterTag = 24;
+ optional string UdfRegistryPath = 25;
}
message TClustersConfig