summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <[email protected]>2024-11-02 14:12:17 +0300
committernadya73 <[email protected]>2024-11-02 14:23:12 +0300
commit017d453a2d7b24d5a4348f7d83c2b58d765f19df (patch)
treecde8108bdcd3883ed93ea3b0b1789850c8ddc498
parent81a29448e61ff3ea8edb3fa707a0946560380ceb (diff)
YT-23294: Recreate terminated federated clients
* Changelog entry Type: fix Component: queue-agent Fix expiration of clients for chaos queues and consumers when cluster connection is changed. Pull Request resolved: <https://github.com/ytsaurus/ytsaurus/pull/913> commit_hash:0f4c6c196076fa370cf5a63df803bf1cbeca5bf0
-rw-r--r--yt/yt/client/api/connection.h1
-rw-r--r--yt/yt/client/api/rpc_proxy/connection_impl.cpp6
-rw-r--r--yt/yt/client/api/rpc_proxy/connection_impl.h3
-rw-r--r--yt/yt/client/federated/connection.cpp10
-rw-r--r--yt/yt/client/unittests/mock/connection.h1
5 files changed, 21 insertions, 0 deletions
diff --git a/yt/yt/client/api/connection.h b/yt/yt/client/api/connection.h
index 76d61db5d7a..573bba0bcb6 100644
--- a/yt/yt/client/api/connection.h
+++ b/yt/yt/client/api/connection.h
@@ -62,6 +62,7 @@ struct IConnection
virtual void ClearMetadataCaches() = 0;
virtual void Terminate() = 0;
+ virtual bool IsTerminated() const = 0;
//! Returns a YSON-serialized connection config.
virtual NYson::TYsonString GetConfigYson() const = 0;
diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.cpp b/yt/yt/client/api/rpc_proxy/connection_impl.cpp
index 8bc17c3c524..3a491d85267 100644
--- a/yt/yt/client/api/rpc_proxy/connection_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/connection_impl.cpp
@@ -356,12 +356,18 @@ void TConnection::ClearMetadataCaches()
void TConnection::Terminate()
{
YT_LOG_DEBUG("Terminating connection");
+ Terminated_ = true;
ChannelPool_->Terminate(TError("Connection terminated"));
if (Config_->EnableProxyDiscovery) {
YT_UNUSED_FUTURE(UpdateProxyListExecutor_->Stop());
}
}
+bool TConnection::IsTerminated() const
+{
+ return Terminated_;
+}
+
const TConnectionConfigPtr& TConnection::GetConfig()
{
return Config_;
diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.h b/yt/yt/client/api/rpc_proxy/connection_impl.h
index 8ccdf08e7d2..182bfc864a7 100644
--- a/yt/yt/client/api/rpc_proxy/connection_impl.h
+++ b/yt/yt/client/api/rpc_proxy/connection_impl.h
@@ -49,6 +49,7 @@ public:
void ClearMetadataCaches() override;
void Terminate() override;
+ bool IsTerminated() const override;
NYson::TYsonString GetConfigYson() const override;
@@ -59,6 +60,8 @@ private:
const TConnectionConfigPtr Config_;
+ std::atomic<bool> Terminated_ = false;
+
const TGuid ConnectionId_;
const TString LoggingTag_;
const TString ClusterId_;
diff --git a/yt/yt/client/federated/connection.cpp b/yt/yt/client/federated/connection.cpp
index b50e202bc00..9b0b94d35ed 100644
--- a/yt/yt/client/federated/connection.cpp
+++ b/yt/yt/client/federated/connection.cpp
@@ -81,11 +81,19 @@ public:
void Terminate() override
{
// TODO(bulatman) What about exceptions?
+ Terminated_ = true;
for (auto& connection : Connections_) {
connection->Terminate();
}
}
+ bool IsTerminated() const override
+ {
+ return Terminated_ || AnyOf(Connections_, [](const auto& connection) {
+ return connection->IsTerminated();
+ });
+ }
+
//! Returns a YSON-serialized connection config.
NYson::TYsonString GetConfigYson() const override
{
@@ -107,6 +115,8 @@ private:
const NConcurrency::TActionQueuePtr ActionQueue_;
const TGuid ConnectionId_;
const TString LoggingTag_;
+
+ std::atomic<bool> Terminated_ = false;
};
} // namespace
diff --git a/yt/yt/client/unittests/mock/connection.h b/yt/yt/client/unittests/mock/connection.h
index 3c305580036..6e409efbbe5 100644
--- a/yt/yt/client/unittests/mock/connection.h
+++ b/yt/yt/client/unittests/mock/connection.h
@@ -34,6 +34,7 @@ public:
MOCK_METHOD(void, ClearMetadataCaches, (), (override));
MOCK_METHOD(void, Terminate, (), (override));
+ MOCK_METHOD(bool, IsTerminated, (), (const, override));
MOCK_METHOD(NYson::TYsonString, GetConfigYson, (), (const, override));
};