diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-10-14 11:38:29 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-10-14 11:52:32 +0300 |
commit | 86b5d1c9816da4926c2810c27f0b60948c249c21 (patch) | |
tree | 2eeb30ef74165a920bbfef5777170f149a8705f0 | |
parent | 37ba3bb82a83bcb55616ce7817c7087413b0e429 (diff) | |
download | ydb-86b5d1c9816da4926c2810c27f0b60948c249c21.tar.gz |
Intermediate changes
commit_hash:c7b424f51fc54aa5731f2bff20943a09dbe30095
-rw-r--r-- | contrib/libs/cxxsupp/libcxxmsvc/ya.make | 3 | ||||
-rw-r--r-- | yt/yt/client/federated/client.cpp | 123 | ||||
-rw-r--r-- | yt/yt/client/federated/config.cpp | 3 | ||||
-rw-r--r-- | yt/yt/client/federated/config.h | 3 | ||||
-rw-r--r-- | yt/yt/client/federated/unittests/client_ut.cpp | 14 | ||||
-rw-r--r-- | yt/yt/client/federated/unittests/connection_ut.cpp | 87 |
6 files changed, 167 insertions, 66 deletions
diff --git a/contrib/libs/cxxsupp/libcxxmsvc/ya.make b/contrib/libs/cxxsupp/libcxxmsvc/ya.make index db9c8f112dc..ea8eefb5cc5 100644 --- a/contrib/libs/cxxsupp/libcxxmsvc/ya.make +++ b/contrib/libs/cxxsupp/libcxxmsvc/ya.make @@ -2,7 +2,8 @@ LIBRARY() -BUILD_ONLY_IF(OS_WINDOWS) +BUILD_ONLY_IF(MSVC) +NO_BUILD_IF(CLANG_CL) LICENSE( Apache-2.0 AND diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index ff1b7505d60..a2099a6d0e3 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -15,6 +15,8 @@ #include <yt/yt/client/object_client/helpers.h> +#include <yt/yt/client/security_client/public.h> + #include <yt/yt/core/concurrency/periodic_executor.h> #include <yt/yt/core/net/address.h> @@ -40,19 +42,20 @@ DECLARE_REFCOUNTED_CLASS(TClient) //////////////////////////////////////////////////////////////////////////////// -std::optional<TString> GetDataCenterByClient(const IClientPtr& client) +TFuture<std::optional<TString>> GetDataCenterByClient(const IClientPtr& client) { TListNodeOptions options; options.MaxSize = 1; - auto items = NConcurrency::WaitFor(client->ListNode(RpcProxiesPath, options)) - .ValueOrThrow(); - auto itemsList = NYTree::ConvertTo<NYTree::IListNodePtr>(items); - if (!itemsList->GetChildCount()) { - return std::nullopt; - } - auto host = itemsList->GetChildren()[0]; - return NNet::InferYPClusterFromHostName(host->GetValue<TString>()); + return client->ListNode(RpcProxiesPath, options) + .Apply(BIND([] (const NYson::TYsonString& items) { + auto itemsList = NYTree::ConvertTo<NYTree::IListNodePtr>(items); + if (!itemsList->GetChildCount()) { + return std::optional<TString>(); + } + auto host = itemsList->GetChildren()[0]; + return NNet::InferYPClusterFromHostName(host->GetValue<TString>()); + })); } class TTransaction @@ -248,17 +251,24 @@ DECLARE_REFCOUNTED_TYPE(TTransaction) //////////////////////////////////////////////////////////////////////////////// +enum class EClientPriority : ui8 +{ + Local, + Remote, + Undefined, +}; + DECLARE_REFCOUNTED_STRUCT(TClientDescription) struct TClientDescription final { - TClientDescription(IClientPtr client, int priority) + TClientDescription(IClientPtr client, EClientPriority priority) : Client(std::move(client)) , Priority(priority) { } IClientPtr Client; - int Priority; + EClientPriority Priority; std::atomic<bool> HasErrors{false}; }; @@ -503,6 +513,7 @@ private: private: const TFederationConfigPtr Config_; const NConcurrency::TPeriodicExecutorPtr Executor_; + const TString LocalDatacenter_; std::vector<TClientDescriptionPtr> UnderlyingClients_; IClientPtr ActiveClient_; @@ -593,19 +604,14 @@ TClient::TClient(const std::vector<IClientPtr>& underlyingClients, TFederationCo NRpc::TDispatcher::Get()->GetLightInvoker(), BIND(&TClient::CheckClustersHealth, MakeWeak(this)), Config_->ClusterHealthCheckPeriod)) + , LocalDatacenter_(NNet::GetLocalYPCluster()) { YT_VERIFY(!underlyingClients.empty()); UnderlyingClients_.reserve(underlyingClients.size()); - const auto& localDatacenter = NNet::GetLocalYPCluster(); for (const auto& client : underlyingClients) { - int priority = GetDataCenterByClient(client) == localDatacenter ? 1 : 0; - UnderlyingClients_.push_back(New<TClientDescription>(client, priority)); + UnderlyingClients_.push_back(New<TClientDescription>(client, EClientPriority::Undefined)); } - std::stable_sort(UnderlyingClients_.begin(), UnderlyingClients_.end(), [] (const auto& lhs, const auto& rhs) { - return lhs->Priority > rhs->Priority; - }); - ActiveClient_ = UnderlyingClients_[0]->Client; ActiveClientIndex_ = 0; @@ -615,43 +621,54 @@ TClient::TClient(const std::vector<IClientPtr>& underlyingClients, TFederationCo void TClient::CheckClustersHealth() { TCheckClusterLivenessOptions options; - options.CheckCypressRoot = true; + options.CheckCypressRoot = Config_->CheckCypressRoot; options.CheckTabletCellBundle = Config_->BundleName; - int activeClientIndex = ActiveClientIndex_.load(); - std::optional<int> betterClientIndex; - std::vector<TFuture<void>> checks; checks.reserve(UnderlyingClients_.size()); - for (const auto& clientDescription : UnderlyingClients_) { checks.emplace_back(clientDescription->Client->CheckClusterLiveness(options)); } - for (int index = 0; index < std::ssize(checks); ++index) { + for (int index = 0; index != std::ssize(checks); ++index) { const auto& check = checks[index]; - bool hasErrors = !NConcurrency::WaitFor(check).IsOK(); - UnderlyingClients_[index]->HasErrors = hasErrors; - if (!betterClientIndex && !hasErrors && index < activeClientIndex) { - betterClientIndex = index; - } + auto error = NConcurrency::WaitFor(check); + YT_LOG_DEBUG_UNLESS(error.IsOK(), error, "Cluster %Qv is marked as unhealthy", + UnderlyingClients_[index]->Client->GetClusterName(/*fetchIfNull*/ false)); + UnderlyingClients_[index]->HasErrors = !error.IsOK() + && !error.FindMatching(NSecurityClient::EErrorCode::AuthorizationError); // Ignore authorization errors. } - if (betterClientIndex && ActiveClientIndex_ == activeClientIndex) { - int newClientIndex = *betterClientIndex; - auto guard = NThreading::WriterGuard(Lock_); - ActiveClient_ = UnderlyingClients_[newClientIndex]->Client; - ActiveClientIndex_ = newClientIndex; - return; + for (int index = 0; index != std::ssize(UnderlyingClients_); ++index) { + auto& client = UnderlyingClients_[index]; + // `Priority` accessed only from this thread so it is not require synchronization. + if (client->Priority == EClientPriority::Undefined) { + auto clientDatacenter = NConcurrency::WaitFor(GetDataCenterByClient(client->Client)); + if (clientDatacenter.IsOK()) { + client->Priority = clientDatacenter.Value() == LocalDatacenter_ + ? EClientPriority::Local + : EClientPriority::Remote; + } + } + } + // Compute better activeClientIndex. + int betterClientIndex = ActiveClientIndex_.load(); + auto betterPriority = UnderlyingClients_[betterClientIndex]->HasErrors + ? EClientPriority::Undefined + : UnderlyingClients_[betterClientIndex]->Priority; + + for (int index = 0; index != std::ssize(UnderlyingClients_); ++index) { + const auto& client = UnderlyingClients_[index]; + if (!client->HasErrors && client->Priority < betterPriority) { + betterClientIndex = index; + betterPriority = client->Priority; + } } - // If active cluster is not healthy, try changing it. - if (UnderlyingClients_[activeClientIndex]->HasErrors) { + if (ActiveClientIndex_ != betterClientIndex) { auto guard = NThreading::WriterGuard(Lock_); - // Check that active client wasn't changed. - if (ActiveClientIndex_ == activeClientIndex && UnderlyingClients_[activeClientIndex]->HasErrors) { - UpdateActiveClient(); - } + ActiveClient_ = UnderlyingClients_[betterClientIndex]->Client; + ActiveClientIndex_ = betterClientIndex; } } @@ -752,42 +769,34 @@ void TClient::HandleError(const TErrorOr<void>& error, int clientIndex) if (ActiveClientIndex_ != clientIndex) { return; } - - auto guard = WriterGuard(Lock_); - if (ActiveClientIndex_ != clientIndex) { - return; - } - UpdateActiveClient(); } void TClient::UpdateActiveClient() { - VERIFY_WRITER_SPINLOCK_AFFINITY(Lock_); - - int activeClientIndex = ActiveClientIndex_.load(); + VERIFY_THREAD_AFFINITY_ANY(); for (int index = 0; index < std::ssize(UnderlyingClients_); ++index) { const auto& clientDescription = UnderlyingClients_[index]; if (!clientDescription->HasErrors) { - if (activeClientIndex != index) { + if (ActiveClientIndex_ != index) { YT_LOG_DEBUG("Active client was changed (PreviousClientIndex: %v, NewClientIndex: %v)", - activeClientIndex, + ActiveClientIndex_.load(), index); + auto guard = NThreading::WriterGuard(Lock_); + ActiveClientIndex_ = index; + ActiveClient_ = clientDescription->Client; } - - ActiveClient_ = clientDescription->Client; - ActiveClientIndex_ = index; - break; + return; } } } TClient::TActiveClientInfo TClient::GetActiveClient() { - auto guard = ReaderGuard(Lock_); YT_LOG_TRACE("Request will be send to the active client (ClientIndex: %v)", ActiveClientIndex_.load()); + auto guard = ReaderGuard(Lock_); return {ActiveClient_, ActiveClientIndex_.load()}; } diff --git a/yt/yt/client/federated/config.cpp b/yt/yt/client/federated/config.cpp index edba2d190ed..f5381e890a4 100644 --- a/yt/yt/client/federated/config.cpp +++ b/yt/yt/client/federated/config.cpp @@ -15,6 +15,9 @@ void TFederationConfig::Register(TRegistrar registrar) .GreaterThan(TDuration::Zero()) .Default(TDuration::Seconds(60)); + registrar.Parameter("check_cypress_root", &TThis::CheckCypressRoot) + .Default(true); + registrar.Parameter("cluster_retry_attempts", &TThis::ClusterRetryAttempts) .GreaterThanOrEqual(0) .Default(3); diff --git a/yt/yt/client/federated/config.h b/yt/yt/client/federated/config.h index 115d0450dcb..d092c3f0193 100644 --- a/yt/yt/client/federated/config.h +++ b/yt/yt/client/federated/config.h @@ -22,6 +22,9 @@ public: //! How often cluster liveness should be checked on the background. TDuration ClusterHealthCheckPeriod; + //! Checks Cypress root availability in liveness check. + bool CheckCypressRoot; + //! Maximum number of retry attempts to make. int ClusterRetryAttempts; diff --git a/yt/yt/client/federated/unittests/client_ut.cpp b/yt/yt/client/federated/unittests/client_ut.cpp index 0bc009fb6f8..169c1c5039e 100644 --- a/yt/yt/client/federated/unittests/client_ut.cpp +++ b/yt/yt/client/federated/unittests/client_ut.cpp @@ -119,13 +119,13 @@ TEST(TFederatedClientTest, Basic) .WillRepeatedly(Return(VoidFuture)); // Creation of federated client. - std::vector<IClientPtr> clients{mockClientSas, mockClientVla}; + std::vector<IClientPtr> clients{mockClientVla, mockClientSas}; auto config = New<TFederationConfig>(); - config->ClusterHealthCheckPeriod = TDuration::Seconds(5); + config->ClusterHealthCheckPeriod = TDuration::Seconds(3); config->ClusterRetryAttempts = 1; auto federatedClient = CreateClient(clients, config); - // 1. `vla` client should be used as closest cluster. + // 1. `vla` client should be used as first cluster. // 2. error from `vla` cluster should be received. // 3. `sas` client should be used as other cluster. @@ -213,6 +213,9 @@ TEST(TFederatedClientTest, CheckHealth) EXPECT_CALL(*mockClientSas, LookupRows(data.Path, _, _, _)) .WillOnce(Return(MakeFuture(data.LookupResult2))); + // Wait initialization and choose `local` cluster. + Sleep(TDuration::Seconds(2)); + // From `vla`. { auto result = federatedClient->LookupRows(data.Path, data.NameTable, data.Keys); @@ -462,8 +465,6 @@ TEST(TFederatedClientTest, AttachTransaction) auto mockConnectionVla = New<TStrictMockConnection>(); EXPECT_CALL(*mockConnectionVla, GetClusterTag()) .WillRepeatedly(Return(NObjectClient::TCellTag(456))); - EXPECT_CALL(*mockClientVla, GetConnection()) - .WillOnce(Return(mockConnectionVla)); // Creation of federated client. std::vector<IClientPtr> clients{mockClientSas, mockClientVla}; @@ -471,6 +472,9 @@ TEST(TFederatedClientTest, AttachTransaction) config->ClusterHealthCheckPeriod = TDuration::Seconds(5); auto federatedClient = CreateClient(clients, config); + // Wait initialization. + Sleep(TDuration::Seconds(2)); + auto mockTransactionSas = New<TStrictMockTransaction>(); auto transactionId = TGuid(0, 123 << 16, 0, 0); EXPECT_CALL(*mockTransactionSas, GetId()) diff --git a/yt/yt/client/federated/unittests/connection_ut.cpp b/yt/yt/client/federated/unittests/connection_ut.cpp index 7f7473d0192..e512087a146 100644 --- a/yt/yt/client/federated/unittests/connection_ut.cpp +++ b/yt/yt/client/federated/unittests/connection_ut.cpp @@ -35,11 +35,19 @@ TEST(TFederatedConnectionTest, CreateClient) // To identify best (closest) cluster. NYson::TYsonString nodesYsonSas(TStringBuf(R"(["a-rpc-proxy-a.sas.yp-c.yandex.net:9013"])")); EXPECT_CALL(*mockClientSas, ListNode("//sys/rpc_proxies", _)) - .WillRepeatedly(Return(MakeFuture(nodesYsonSas))); + .WillOnce(Return(MakeFuture(nodesYsonSas))); + EXPECT_CALL(*mockClientSas, GetNode("//test/node", _)) + .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TGetNodeOptions&) { + return MakeFuture(nodesYsonSas); + }); NYson::TYsonString nodesYsonVla(TStringBuf(R"(["a-rpc-proxy-a.vla.yp-c.yandex.net:9013"])")); EXPECT_CALL(*mockClientVla, ListNode("//sys/rpc_proxies", _)) - .WillRepeatedly(Return(MakeFuture(nodesYsonVla))); + .WillOnce(Return(MakeFuture(nodesYsonVla))); + EXPECT_CALL(*mockClientVla, GetNode("//test/node", _)) + .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TGetNodeOptions&) { + return MakeFuture(nodesYsonVla); + }); EXPECT_CALL(*mockClientSas, CheckClusterLiveness(_)) .WillRepeatedly(Return(VoidFuture)); @@ -65,8 +73,81 @@ TEST(TFederatedConnectionTest, CreateClient) auto connection = CreateConnection({mockConnectionSas, mockConnectionVla}, config); EXPECT_THAT(connection->GetLoggingTag(), testing::HasSubstr("Clusters: (sas; vla)")); auto client = connection->CreateClient(clientOptions); - auto nodes = client->ListNode("//sys/rpc_proxies").Get().ValueOrThrow(); + auto nodes = client->GetNode("//test/node").Get().ValueOrThrow(); EXPECT_EQ(nodesYsonSas, nodes); + + Sleep(TDuration::Seconds(2)); + auto nodes2 = client->GetNode("//test/node").Get().ValueOrThrow(); + EXPECT_EQ(nodesYsonSas, nodes2); +} + +TEST(TFederatedConnectionTest, CreateClientWhenOneClusterUnavailable) +{ + auto config = New<TFederationConfig>(); + config->BundleName = "my_bundle"; + config->ClusterHealthCheckPeriod = TDuration::Seconds(5); + + auto mockConnectionSas = New<TStrictMockConnection>(); + auto mockConnectionVla = New<TStrictMockConnection>(); + auto mockClientSas = New<TStrictMockClient>(); + auto mockClientVla = New<TStrictMockClient>(); + + // To identify best (closest) cluster. + NYson::TYsonString nodesYsonSas(TStringBuf(R"(["a-rpc-proxy-a.sas.yp-c.yandex.net:9013"])")); + EXPECT_CALL(*mockClientSas, ListNode("//sys/rpc_proxies", _)) + .WillOnce(Return(MakeFuture<NYson::TYsonString>(TError("Failure")))) + .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TListNodeOptions&) { + return MakeFuture(nodesYsonSas); + }); + EXPECT_CALL(*mockClientSas, GetNode("//test/node", _)) + .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TGetNodeOptions&) { + return MakeFuture(nodesYsonSas); + }); + + NYson::TYsonString nodesYsonVla(TStringBuf(R"(["a-rpc-proxy-a.vla.yp-c.yandex.net:9013"])")); + EXPECT_CALL(*mockClientVla, ListNode("//sys/rpc_proxies", _)) + .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TListNodeOptions&) { + return MakeFuture(nodesYsonVla); + }); + EXPECT_CALL(*mockClientVla, GetNode("//test/node", _)) + .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TGetNodeOptions&) { + return MakeFuture(nodesYsonVla); + }); + + EXPECT_CALL(*mockClientSas, CheckClusterLiveness(_)) + .WillRepeatedly(Return(VoidFuture)); + EXPECT_CALL(*mockClientVla, CheckClusterLiveness(_)) + .WillRepeatedly(Return(VoidFuture)); + + NApi::TClientOptions clientOptions; + EXPECT_CALL(*mockConnectionSas, CreateClient(::testing::Ref(clientOptions))) + .WillOnce(Return(mockClientSas)); + EXPECT_CALL(*mockConnectionVla, CreateClient(::testing::Ref(clientOptions))) + .WillOnce(Return(mockClientVla)); + + EXPECT_CALL(*mockConnectionSas, GetLoggingTag()) + .WillOnce(ReturnRefOfCopy(TString("sas"))); + EXPECT_CALL(*mockConnectionVla, GetLoggingTag()) + .WillOnce(ReturnRefOfCopy(TString("vla"))); + + auto finally = Finally([oldLocalHostName = NNet::GetLocalHostName()] { + NNet::WriteLocalHostName(oldLocalHostName); + }); + NNet::WriteLocalHostName("a-rpc-proxy.sas.yp-c.yandex.net"); + + auto connection = CreateConnection({mockConnectionSas, mockConnectionVla}, config); + EXPECT_THAT(connection->GetLoggingTag(), testing::HasSubstr("Clusters: (sas; vla)")); + auto client = connection->CreateClient(clientOptions); + + Sleep(TDuration::Seconds(2)); + + auto nodes1 = client->GetNode("//test/node").Get().ValueOrThrow(); + EXPECT_EQ(nodesYsonVla, nodes1); + + Sleep(TDuration::Seconds(6)); + + auto nodes2 = client->GetNode("//test/node").Get().ValueOrThrow(); + EXPECT_EQ(nodesYsonSas, nodes2); } //////////////////////////////////////////////////////////////////////////////// |