diff options
author | nadya73 <[email protected]> | 2024-11-02 14:12:17 +0300 |
---|---|---|
committer | nadya73 <[email protected]> | 2024-11-02 14:23:12 +0300 |
commit | 017d453a2d7b24d5a4348f7d83c2b58d765f19df (patch) | |
tree | cde8108bdcd3883ed93ea3b0b1789850c8ddc498 | |
parent | 81a29448e61ff3ea8edb3fa707a0946560380ceb (diff) |
YT-23294: Recreate terminated federated clients
* Changelog entry
Type: fix
Component: queue-agent
Fix expiration of clients for chaos queues and consumers when cluster connection is changed.
Pull Request resolved: <https://github.com/ytsaurus/ytsaurus/pull/913>
commit_hash:0f4c6c196076fa370cf5a63df803bf1cbeca5bf0
-rw-r--r-- | yt/yt/client/api/connection.h | 1 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/connection_impl.cpp | 6 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/connection_impl.h | 3 | ||||
-rw-r--r-- | yt/yt/client/federated/connection.cpp | 10 | ||||
-rw-r--r-- | yt/yt/client/unittests/mock/connection.h | 1 |
5 files changed, 21 insertions, 0 deletions
diff --git a/yt/yt/client/api/connection.h b/yt/yt/client/api/connection.h index 76d61db5d7a..573bba0bcb6 100644 --- a/yt/yt/client/api/connection.h +++ b/yt/yt/client/api/connection.h @@ -62,6 +62,7 @@ struct IConnection virtual void ClearMetadataCaches() = 0; virtual void Terminate() = 0; + virtual bool IsTerminated() const = 0; //! Returns a YSON-serialized connection config. virtual NYson::TYsonString GetConfigYson() const = 0; diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.cpp b/yt/yt/client/api/rpc_proxy/connection_impl.cpp index 8bc17c3c524..3a491d85267 100644 --- a/yt/yt/client/api/rpc_proxy/connection_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/connection_impl.cpp @@ -356,12 +356,18 @@ void TConnection::ClearMetadataCaches() void TConnection::Terminate() { YT_LOG_DEBUG("Terminating connection"); + Terminated_ = true; ChannelPool_->Terminate(TError("Connection terminated")); if (Config_->EnableProxyDiscovery) { YT_UNUSED_FUTURE(UpdateProxyListExecutor_->Stop()); } } +bool TConnection::IsTerminated() const +{ + return Terminated_; +} + const TConnectionConfigPtr& TConnection::GetConfig() { return Config_; diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.h b/yt/yt/client/api/rpc_proxy/connection_impl.h index 8ccdf08e7d2..182bfc864a7 100644 --- a/yt/yt/client/api/rpc_proxy/connection_impl.h +++ b/yt/yt/client/api/rpc_proxy/connection_impl.h @@ -49,6 +49,7 @@ public: void ClearMetadataCaches() override; void Terminate() override; + bool IsTerminated() const override; NYson::TYsonString GetConfigYson() const override; @@ -59,6 +60,8 @@ private: const TConnectionConfigPtr Config_; + std::atomic<bool> Terminated_ = false; + const TGuid ConnectionId_; const TString LoggingTag_; const TString ClusterId_; diff --git a/yt/yt/client/federated/connection.cpp b/yt/yt/client/federated/connection.cpp index b50e202bc00..9b0b94d35ed 100644 --- a/yt/yt/client/federated/connection.cpp +++ b/yt/yt/client/federated/connection.cpp @@ -81,11 +81,19 @@ public: void Terminate() override { // TODO(bulatman) What about exceptions? + Terminated_ = true; for (auto& connection : Connections_) { connection->Terminate(); } } + bool IsTerminated() const override + { + return Terminated_ || AnyOf(Connections_, [](const auto& connection) { + return connection->IsTerminated(); + }); + } + //! Returns a YSON-serialized connection config. NYson::TYsonString GetConfigYson() const override { @@ -107,6 +115,8 @@ private: const NConcurrency::TActionQueuePtr ActionQueue_; const TGuid ConnectionId_; const TString LoggingTag_; + + std::atomic<bool> Terminated_ = false; }; } // namespace diff --git a/yt/yt/client/unittests/mock/connection.h b/yt/yt/client/unittests/mock/connection.h index 3c305580036..6e409efbbe5 100644 --- a/yt/yt/client/unittests/mock/connection.h +++ b/yt/yt/client/unittests/mock/connection.h @@ -34,6 +34,7 @@ public: MOCK_METHOD(void, ClearMetadataCaches, (), (override)); MOCK_METHOD(void, Terminate, (), (override)); + MOCK_METHOD(bool, IsTerminated, (), (const, override)); MOCK_METHOD(NYson::TYsonString, GetConfigYson, (), (const, override)); }; |