aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author robot-piglet <robot-piglet@yandex-team.com> 2024-10-14 11:38:29 +0300
committer robot-piglet <robot-piglet@yandex-team.com> 2024-10-14 11:52:32 +0300
commit 86b5d1c9816da4926c2810c27f0b60948c249c21 (patch)
tree 2eeb30ef74165a920bbfef5777170f149a8705f0
parent 37ba3bb82a83bcb55616ce7817c7087413b0e429 (diff)
download ydb-86b5d1c9816da4926c2810c27f0b60948c249c21.tar.gz
Intermediate changes
commit_hash:c7b424f51fc54aa5731f2bff20943a09dbe30095
-rw-r--r-- contrib/libs/cxxsupp/libcxxmsvc/ya.make 3
-rw-r--r-- yt/yt/client/federated/client.cpp 123
-rw-r--r-- yt/yt/client/federated/config.cpp 3
-rw-r--r-- yt/yt/client/federated/config.h 3
-rw-r--r-- yt/yt/client/federated/unittests/client_ut.cpp 14
-rw-r--r-- yt/yt/client/federated/unittests/connection_ut.cpp 87
6 files changed, 167 insertions, 66 deletions
diff --git a/contrib/libs/cxxsupp/libcxxmsvc/ya.make b/contrib/libs/cxxsupp/libcxxmsvc/ya.make
index db9c8f112dc..ea8eefb5cc5 100644
--- a/contrib/libs/cxxsupp/libcxxmsvc/ya.make
+++ b/contrib/libs/cxxsupp/libcxxmsvc/ya.make
@@ -2,7 +2,8 @@
LIBRARY()
-BUILD_ONLY_IF(OS_WINDOWS)
+BUILD_ONLY_IF(MSVC)
+NO_BUILD_IF(CLANG_CL)
LICENSE(
Apache-2.0 AND
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index ff1b7505d60..a2099a6d0e3 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -15,6 +15,8 @@
#include <yt/yt/client/object_client/helpers.h>
+#include <yt/yt/client/security_client/public.h>
+
#include <yt/yt/core/concurrency/periodic_executor.h>
#include <yt/yt/core/net/address.h>
@@ -40,19 +42,20 @@ DECLARE_REFCOUNTED_CLASS(TClient)
////////////////////////////////////////////////////////////////////////////////
-std::optional<TString> GetDataCenterByClient(const IClientPtr& client)
+TFuture<std::optional<TString>> GetDataCenterByClient(const IClientPtr& client)
{
TListNodeOptions options;
options.MaxSize = 1;
- auto items = NConcurrency::WaitFor(client->ListNode(RpcProxiesPath, options))
- .ValueOrThrow();
- auto itemsList = NYTree::ConvertTo<NYTree::IListNodePtr>(items);
- if (!itemsList->GetChildCount()) {
- return std::nullopt;
- }
- auto host = itemsList->GetChildren()[0];
- return NNet::InferYPClusterFromHostName(host->GetValue<TString>());
+ return client->ListNode(RpcProxiesPath, options)
+ .Apply(BIND([] (const NYson::TYsonString& items) {
+ auto itemsList = NYTree::ConvertTo<NYTree::IListNodePtr>(items);
+ if (!itemsList->GetChildCount()) {
+ return std::optional<TString>();
+ }
+ auto host = itemsList->GetChildren()[0];
+ return NNet::InferYPClusterFromHostName(host->GetValue<TString>());
+ }));
}
class TTransaction
@@ -248,17 +251,24 @@ DECLARE_REFCOUNTED_TYPE(TTransaction)
////////////////////////////////////////////////////////////////////////////////
+enum class EClientPriority : ui8
+{
+ Local,
+ Remote,
+ Undefined,
+};
+
DECLARE_REFCOUNTED_STRUCT(TClientDescription)
struct TClientDescription final
{
- TClientDescription(IClientPtr client, int priority)
+ TClientDescription(IClientPtr client, EClientPriority priority)
: Client(std::move(client))
, Priority(priority)
{ }
IClientPtr Client;
- int Priority;
+ EClientPriority Priority;
std::atomic<bool> HasErrors{false};
};
@@ -503,6 +513,7 @@ private:
private:
const TFederationConfigPtr Config_;
const NConcurrency::TPeriodicExecutorPtr Executor_;
+ const TString LocalDatacenter_;
std::vector<TClientDescriptionPtr> UnderlyingClients_;
IClientPtr ActiveClient_;
@@ -593,19 +604,14 @@ TClient::TClient(const std::vector<IClientPtr>& underlyingClients, TFederationCo
NRpc::TDispatcher::Get()->GetLightInvoker(),
BIND(&TClient::CheckClustersHealth, MakeWeak(this)),
Config_->ClusterHealthCheckPeriod))
+ , LocalDatacenter_(NNet::GetLocalYPCluster())
{
YT_VERIFY(!underlyingClients.empty());
UnderlyingClients_.reserve(underlyingClients.size());
- const auto& localDatacenter = NNet::GetLocalYPCluster();
for (const auto& client : underlyingClients) {
- int priority = GetDataCenterByClient(client) == localDatacenter ? 1 : 0;
- UnderlyingClients_.push_back(New<TClientDescription>(client, priority));
+ UnderlyingClients_.push_back(New<TClientDescription>(client, EClientPriority::Undefined));
}
- std::stable_sort(UnderlyingClients_.begin(), UnderlyingClients_.end(), [] (const auto& lhs, const auto& rhs) {
- return lhs->Priority > rhs->Priority;
- });
-
ActiveClient_ = UnderlyingClients_[0]->Client;
ActiveClientIndex_ = 0;
@@ -615,43 +621,54 @@ TClient::TClient(const std::vector<IClientPtr>& underlyingClients, TFederationCo
void TClient::CheckClustersHealth()
{
TCheckClusterLivenessOptions options;
- options.CheckCypressRoot = true;
+ options.CheckCypressRoot = Config_->CheckCypressRoot;
options.CheckTabletCellBundle = Config_->BundleName;
- int activeClientIndex = ActiveClientIndex_.load();
- std::optional<int> betterClientIndex;
-
std::vector<TFuture<void>> checks;
checks.reserve(UnderlyingClients_.size());
-
for (const auto& clientDescription : UnderlyingClients_) {
checks.emplace_back(clientDescription->Client->CheckClusterLiveness(options));
}
- for (int index = 0; index < std::ssize(checks); ++index) {
+ for (int index = 0; index != std::ssize(checks); ++index) {
const auto& check = checks[index];
- bool hasErrors = !NConcurrency::WaitFor(check).IsOK();
- UnderlyingClients_[index]->HasErrors = hasErrors;
- if (!betterClientIndex && !hasErrors && index < activeClientIndex) {
- betterClientIndex = index;
- }
+ auto error = NConcurrency::WaitFor(check);
+ YT_LOG_DEBUG_UNLESS(error.IsOK(), error, "Cluster %Qv is marked as unhealthy",
+ UnderlyingClients_[index]->Client->GetClusterName(/*fetchIfNull*/ false));
+ UnderlyingClients_[index]->HasErrors = !error.IsOK()
+ && !error.FindMatching(NSecurityClient::EErrorCode::AuthorizationError); // Ignore authorization errors.
}
- if (betterClientIndex && ActiveClientIndex_ == activeClientIndex) {
- int newClientIndex = *betterClientIndex;
- auto guard = NThreading::WriterGuard(Lock_);
- ActiveClient_ = UnderlyingClients_[newClientIndex]->Client;
- ActiveClientIndex_ = newClientIndex;
- return;
+ for (int index = 0; index != std::ssize(UnderlyingClients_); ++index) {
+ auto& client = UnderlyingClients_[index];
+ // `Priority` is accessed only from this thread, so it does not require synchronization.
+ if (client->Priority == EClientPriority::Undefined) {
+ auto clientDatacenter = NConcurrency::WaitFor(GetDataCenterByClient(client->Client));
+ if (clientDatacenter.IsOK()) {
+ client->Priority = clientDatacenter.Value() == LocalDatacenter_
+ ? EClientPriority::Local
+ : EClientPriority::Remote;
+ }
+ }
+ }
+ // Choose the error-free client with the best (lowest) priority, starting from the current active client.
+ int betterClientIndex = ActiveClientIndex_.load();
+ auto betterPriority = UnderlyingClients_[betterClientIndex]->HasErrors
+ ? EClientPriority::Undefined
+ : UnderlyingClients_[betterClientIndex]->Priority;
+
+ for (int index = 0; index != std::ssize(UnderlyingClients_); ++index) {
+ const auto& client = UnderlyingClients_[index];
+ if (!client->HasErrors && client->Priority < betterPriority) {
+ betterClientIndex = index;
+ betterPriority = client->Priority;
+ }
}
- // If active cluster is not healthy, try changing it.
- if (UnderlyingClients_[activeClientIndex]->HasErrors) {
+ if (ActiveClientIndex_ != betterClientIndex) {
auto guard = NThreading::WriterGuard(Lock_);
- // Check that active client wasn't changed.
- if (ActiveClientIndex_ == activeClientIndex && UnderlyingClients_[activeClientIndex]->HasErrors) {
- UpdateActiveClient();
- }
+ ActiveClient_ = UnderlyingClients_[betterClientIndex]->Client;
+ ActiveClientIndex_ = betterClientIndex;
}
}
@@ -752,42 +769,34 @@ void TClient::HandleError(const TErrorOr<void>& error, int clientIndex)
if (ActiveClientIndex_ != clientIndex) {
return;
}
-
- auto guard = WriterGuard(Lock_);
- if (ActiveClientIndex_ != clientIndex) {
- return;
- }
-
UpdateActiveClient();
}
void TClient::UpdateActiveClient()
{
- VERIFY_WRITER_SPINLOCK_AFFINITY(Lock_);
-
- int activeClientIndex = ActiveClientIndex_.load();
+ VERIFY_THREAD_AFFINITY_ANY();
for (int index = 0; index < std::ssize(UnderlyingClients_); ++index) {
const auto& clientDescription = UnderlyingClients_[index];
if (!clientDescription->HasErrors) {
- if (activeClientIndex != index) {
+ if (ActiveClientIndex_ != index) {
YT_LOG_DEBUG("Active client was changed (PreviousClientIndex: %v, NewClientIndex: %v)",
- activeClientIndex,
+ ActiveClientIndex_.load(),
index);
+ auto guard = NThreading::WriterGuard(Lock_);
+ ActiveClientIndex_ = index;
+ ActiveClient_ = clientDescription->Client;
}
-
- ActiveClient_ = clientDescription->Client;
- ActiveClientIndex_ = index;
- break;
+ return;
}
}
}
TClient::TActiveClientInfo TClient::GetActiveClient()
{
- auto guard = ReaderGuard(Lock_);
YT_LOG_TRACE("Request will be send to the active client (ClientIndex: %v)",
ActiveClientIndex_.load());
+ auto guard = ReaderGuard(Lock_);
return {ActiveClient_, ActiveClientIndex_.load()};
}
diff --git a/yt/yt/client/federated/config.cpp b/yt/yt/client/federated/config.cpp
index edba2d190ed..f5381e890a4 100644
--- a/yt/yt/client/federated/config.cpp
+++ b/yt/yt/client/federated/config.cpp
@@ -15,6 +15,9 @@ void TFederationConfig::Register(TRegistrar registrar)
.GreaterThan(TDuration::Zero())
.Default(TDuration::Seconds(60));
+ registrar.Parameter("check_cypress_root", &TThis::CheckCypressRoot)
+ .Default(true);
+
registrar.Parameter("cluster_retry_attempts", &TThis::ClusterRetryAttempts)
.GreaterThanOrEqual(0)
.Default(3);
diff --git a/yt/yt/client/federated/config.h b/yt/yt/client/federated/config.h
index 115d0450dcb..d092c3f0193 100644
--- a/yt/yt/client/federated/config.h
+++ b/yt/yt/client/federated/config.h
@@ -22,6 +22,9 @@ public:
//! How often cluster liveness should be checked on the background.
TDuration ClusterHealthCheckPeriod;
+ //! Whether to check Cypress root availability in the liveness check.
+ bool CheckCypressRoot;
+
//! Maximum number of retry attempts to make.
int ClusterRetryAttempts;
diff --git a/yt/yt/client/federated/unittests/client_ut.cpp b/yt/yt/client/federated/unittests/client_ut.cpp
index 0bc009fb6f8..169c1c5039e 100644
--- a/yt/yt/client/federated/unittests/client_ut.cpp
+++ b/yt/yt/client/federated/unittests/client_ut.cpp
@@ -119,13 +119,13 @@ TEST(TFederatedClientTest, Basic)
.WillRepeatedly(Return(VoidFuture));
// Creation of federated client.
- std::vector<IClientPtr> clients{mockClientSas, mockClientVla};
+ std::vector<IClientPtr> clients{mockClientVla, mockClientSas};
auto config = New<TFederationConfig>();
- config->ClusterHealthCheckPeriod = TDuration::Seconds(5);
+ config->ClusterHealthCheckPeriod = TDuration::Seconds(3);
config->ClusterRetryAttempts = 1;
auto federatedClient = CreateClient(clients, config);
- // 1. `vla` client should be used as closest cluster.
+ // 1. `vla` client should be used as first cluster.
// 2. error from `vla` cluster should be received.
// 3. `sas` client should be used as other cluster.
@@ -213,6 +213,9 @@ TEST(TFederatedClientTest, CheckHealth)
EXPECT_CALL(*mockClientSas, LookupRows(data.Path, _, _, _))
.WillOnce(Return(MakeFuture(data.LookupResult2)));
+ // Wait for initialization and for the `local` cluster to be chosen.
+ Sleep(TDuration::Seconds(2));
+
// From `vla`.
{
auto result = federatedClient->LookupRows(data.Path, data.NameTable, data.Keys);
@@ -462,8 +465,6 @@ TEST(TFederatedClientTest, AttachTransaction)
auto mockConnectionVla = New<TStrictMockConnection>();
EXPECT_CALL(*mockConnectionVla, GetClusterTag())
.WillRepeatedly(Return(NObjectClient::TCellTag(456)));
- EXPECT_CALL(*mockClientVla, GetConnection())
- .WillOnce(Return(mockConnectionVla));
// Creation of federated client.
std::vector<IClientPtr> clients{mockClientSas, mockClientVla};
@@ -471,6 +472,9 @@ TEST(TFederatedClientTest, AttachTransaction)
config->ClusterHealthCheckPeriod = TDuration::Seconds(5);
auto federatedClient = CreateClient(clients, config);
+ // Wait for initialization.
+ Sleep(TDuration::Seconds(2));
+
auto mockTransactionSas = New<TStrictMockTransaction>();
auto transactionId = TGuid(0, 123 << 16, 0, 0);
EXPECT_CALL(*mockTransactionSas, GetId())
diff --git a/yt/yt/client/federated/unittests/connection_ut.cpp b/yt/yt/client/federated/unittests/connection_ut.cpp
index 7f7473d0192..e512087a146 100644
--- a/yt/yt/client/federated/unittests/connection_ut.cpp
+++ b/yt/yt/client/federated/unittests/connection_ut.cpp
@@ -35,11 +35,19 @@ TEST(TFederatedConnectionTest, CreateClient)
// To identify best (closest) cluster.
NYson::TYsonString nodesYsonSas(TStringBuf(R"(["a-rpc-proxy-a.sas.yp-c.yandex.net:9013"])"));
EXPECT_CALL(*mockClientSas, ListNode("//sys/rpc_proxies", _))
- .WillRepeatedly(Return(MakeFuture(nodesYsonSas)));
+ .WillOnce(Return(MakeFuture(nodesYsonSas)));
+ EXPECT_CALL(*mockClientSas, GetNode("//test/node", _))
+ .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TGetNodeOptions&) {
+ return MakeFuture(nodesYsonSas);
+ });
NYson::TYsonString nodesYsonVla(TStringBuf(R"(["a-rpc-proxy-a.vla.yp-c.yandex.net:9013"])"));
EXPECT_CALL(*mockClientVla, ListNode("//sys/rpc_proxies", _))
- .WillRepeatedly(Return(MakeFuture(nodesYsonVla)));
+ .WillOnce(Return(MakeFuture(nodesYsonVla)));
+ EXPECT_CALL(*mockClientVla, GetNode("//test/node", _))
+ .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TGetNodeOptions&) {
+ return MakeFuture(nodesYsonVla);
+ });
EXPECT_CALL(*mockClientSas, CheckClusterLiveness(_))
.WillRepeatedly(Return(VoidFuture));
@@ -65,8 +73,81 @@ TEST(TFederatedConnectionTest, CreateClient)
auto connection = CreateConnection({mockConnectionSas, mockConnectionVla}, config);
EXPECT_THAT(connection->GetLoggingTag(), testing::HasSubstr("Clusters: (sas; vla)"));
auto client = connection->CreateClient(clientOptions);
- auto nodes = client->ListNode("//sys/rpc_proxies").Get().ValueOrThrow();
+ auto nodes = client->GetNode("//test/node").Get().ValueOrThrow();
EXPECT_EQ(nodesYsonSas, nodes);
+
+ Sleep(TDuration::Seconds(2));
+ auto nodes2 = client->GetNode("//test/node").Get().ValueOrThrow();
+ EXPECT_EQ(nodesYsonSas, nodes2);
+}
+
+TEST(TFederatedConnectionTest, CreateClientWhenOneClusterUnavailable)
+{
+ auto config = New<TFederationConfig>();
+ config->BundleName = "my_bundle";
+ config->ClusterHealthCheckPeriod = TDuration::Seconds(5);
+
+ auto mockConnectionSas = New<TStrictMockConnection>();
+ auto mockConnectionVla = New<TStrictMockConnection>();
+ auto mockClientSas = New<TStrictMockClient>();
+ auto mockClientVla = New<TStrictMockClient>();
+
+ // To identify best (closest) cluster.
+ NYson::TYsonString nodesYsonSas(TStringBuf(R"(["a-rpc-proxy-a.sas.yp-c.yandex.net:9013"])"));
+ EXPECT_CALL(*mockClientSas, ListNode("//sys/rpc_proxies", _))
+ .WillOnce(Return(MakeFuture<NYson::TYsonString>(TError("Failure"))))
+ .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TListNodeOptions&) {
+ return MakeFuture(nodesYsonSas);
+ });
+ EXPECT_CALL(*mockClientSas, GetNode("//test/node", _))
+ .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TGetNodeOptions&) {
+ return MakeFuture(nodesYsonSas);
+ });
+
+ NYson::TYsonString nodesYsonVla(TStringBuf(R"(["a-rpc-proxy-a.vla.yp-c.yandex.net:9013"])"));
+ EXPECT_CALL(*mockClientVla, ListNode("//sys/rpc_proxies", _))
+ .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TListNodeOptions&) {
+ return MakeFuture(nodesYsonVla);
+ });
+ EXPECT_CALL(*mockClientVla, GetNode("//test/node", _))
+ .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TGetNodeOptions&) {
+ return MakeFuture(nodesYsonVla);
+ });
+
+ EXPECT_CALL(*mockClientSas, CheckClusterLiveness(_))
+ .WillRepeatedly(Return(VoidFuture));
+ EXPECT_CALL(*mockClientVla, CheckClusterLiveness(_))
+ .WillRepeatedly(Return(VoidFuture));
+
+ NApi::TClientOptions clientOptions;
+ EXPECT_CALL(*mockConnectionSas, CreateClient(::testing::Ref(clientOptions)))
+ .WillOnce(Return(mockClientSas));
+ EXPECT_CALL(*mockConnectionVla, CreateClient(::testing::Ref(clientOptions)))
+ .WillOnce(Return(mockClientVla));
+
+ EXPECT_CALL(*mockConnectionSas, GetLoggingTag())
+ .WillOnce(ReturnRefOfCopy(TString("sas")));
+ EXPECT_CALL(*mockConnectionVla, GetLoggingTag())
+ .WillOnce(ReturnRefOfCopy(TString("vla")));
+
+ auto finally = Finally([oldLocalHostName = NNet::GetLocalHostName()] {
+ NNet::WriteLocalHostName(oldLocalHostName);
+ });
+ NNet::WriteLocalHostName("a-rpc-proxy.sas.yp-c.yandex.net");
+
+ auto connection = CreateConnection({mockConnectionSas, mockConnectionVla}, config);
+ EXPECT_THAT(connection->GetLoggingTag(), testing::HasSubstr("Clusters: (sas; vla)"));
+ auto client = connection->CreateClient(clientOptions);
+
+ Sleep(TDuration::Seconds(2));
+
+ auto nodes1 = client->GetNode("//test/node").Get().ValueOrThrow();
+ EXPECT_EQ(nodesYsonVla, nodes1);
+
+ Sleep(TDuration::Seconds(6));
+
+ auto nodes2 = client->GetNode("//test/node").Get().ValueOrThrow();
+ EXPECT_EQ(nodesYsonSas, nodes2);
}
////////////////////////////////////////////////////////////////////////////////