diff options
author | don-dron <don-dron@yandex-team.com> | 2025-02-12 10:24:02 +0300 |
---|---|---|
committer | don-dron <don-dron@yandex-team.com> | 2025-02-12 10:40:14 +0300 |
commit | 68ee082e0346a833fb5a5dcead60e46e8801c52b (patch) | |
tree | b838c97de69b17b3962ca648a9c24027e64889b0 | |
parent | 170ec5117c02f67dc3013e96e1c7368247e4fc93 (diff) | |
download | ydb-68ee082e0346a833fb5a5dcead60e46e8801c52b.tar.gz |
YT-24116: Add dynconfig for bus server
commit_hash:f9f0449d014d5fb078085a5cb0f46768b3a92de8
-rw-r--r-- | yt/yt/core/bus/client.h | 5 | ||||
-rw-r--r-- | yt/yt/core/bus/server.h | 8 | ||||
-rw-r--r-- | yt/yt/core/bus/tcp/client.cpp | 10 | ||||
-rw-r--r-- | yt/yt/core/bus/tcp/config.cpp | 14 | ||||
-rw-r--r-- | yt/yt/core/bus/tcp/config.h | 41 | ||||
-rw-r--r-- | yt/yt/core/bus/tcp/connection.cpp | 16 | ||||
-rw-r--r-- | yt/yt/core/bus/tcp/connection.h | 4 | ||||
-rw-r--r-- | yt/yt/core/bus/tcp/public.h | 6 | ||||
-rw-r--r-- | yt/yt/core/bus/tcp/server.cpp | 34 |
9 files changed, 129 insertions, 9 deletions
diff --git a/yt/yt/core/bus/client.h b/yt/yt/core/bus/client.h index fa12665fc2..1b9ca361f5 100644 --- a/yt/yt/core/bus/client.h +++ b/yt/yt/core/bus/client.h @@ -2,6 +2,8 @@ #include "public.h" +#include <yt/yt/core/bus/tcp/public.h> + #include <yt/yt/core/ytree/public.h> namespace NYT::NBus { @@ -28,6 +30,9 @@ struct IBusClient //! Typically used for constructing errors. virtual const NYTree::IAttributeDictionary& GetEndpointAttributes() const = 0; + //! Apply new dynamic config. + virtual void OnDynamicConfigChanged(const NBus::TBusClientDynamicConfigPtr& config) = 0; + //! Creates a new bus. /*! * The bus will point to the address supplied during construction. diff --git a/yt/yt/core/bus/server.h b/yt/yt/core/bus/server.h index dd7a35bc13..d217935c36 100644 --- a/yt/yt/core/bus/server.h +++ b/yt/yt/core/bus/server.h @@ -2,6 +2,8 @@ #include "public.h" +#include <yt/yt/core/bus/tcp/public.h> + #include <yt/yt/core/actions/future.h> namespace NYT::NBus { @@ -22,6 +24,12 @@ struct IBusServer */ virtual void Start(IMessageHandlerPtr handler) = 0; + //! Apply new dynamic config. + /* + * \param config New config. + */ + virtual void OnDynamicConfigChanged(const NBus::TBusServerDynamicConfigPtr& config) = 0; + //! Asynchronously stops the listener. /*! * After this call the instance is no longer usable. diff --git a/yt/yt/core/bus/tcp/client.cpp b/yt/yt/core/bus/tcp/client.cpp index 5dd0117bee..81db62b1ab 100644 --- a/yt/yt/core/bus/tcp/client.cpp +++ b/yt/yt/core/bus/tcp/client.cpp @@ -160,6 +160,11 @@ public: return *EndpointAttributes_; } + void OnDynamicConfigChanged(const NBus::TBusClientDynamicConfigPtr& config) override + { + DynamicConfig_.Store(config); + } + IBusPtr CreateBus(IMessageHandlerPtr handler, const TCreateBusOptions& options) override { YT_ASSERT_THREAD_AFFINITY_ANY(); @@ -181,7 +186,6 @@ public: .EndMap()); auto poller = TTcpDispatcher::TImpl::Get()->GetXferPoller(); - auto connection = New<TTcpConnection>( Config_, EConnectionType::Client, @@ -196,7 +200,8 @@ public: std::move(handler), std::move(poller), PacketTranscoderFactory_, - MemoryUsageTracker_); + MemoryUsageTracker_, + DynamicConfig_.Acquire()->NeedRejectConnectionDueMemoryOvercommit); connection->Start(); return New<TTcpClientBusProxy>(std::move(connection)); @@ -204,6 +209,7 @@ public: private: const TBusClientConfigPtr Config_; + TAtomicIntrusivePtr<TBusClientDynamicConfig> DynamicConfig_{New<TBusClientDynamicConfig>()}; IPacketTranscoderFactory* const PacketTranscoderFactory_; const IMemoryUsageTrackerPtr MemoryUsageTracker_; const std::string EndpointDescription_; diff --git a/yt/yt/core/bus/tcp/config.cpp b/yt/yt/core/bus/tcp/config.cpp index f67c6fba6c..809a2d7413 100644 --- a/yt/yt/core/bus/tcp/config.cpp +++ b/yt/yt/core/bus/tcp/config.cpp @@ -126,6 +126,9 @@ TBusServerConfigPtr TBusServerConfig::CreateUds(const std::string& socketPath) return config; } +void TBusServerDynamicConfig::Register(TRegistrar /*registrar*/) +{ } + //////////////////////////////////////////////////////////////////////////////// void TBusConfig::Register(TRegistrar registrar) @@ -170,6 +173,14 @@ void TBusConfig::Register(TRegistrar registrar) //////////////////////////////////////////////////////////////////////////////// +void TBusDynamicConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("need_reject_connection_due_memory_overcommit", &TThis::NeedRejectConnectionDueMemoryOvercommit) + .Default(false); +} + +//////////////////////////////////////////////////////////////////////////////// + void TBusClientConfig::Register(TRegistrar registrar) { registrar.Parameter("address", &TThis::Address) @@ -198,6 +209,9 @@ TBusClientConfigPtr TBusClientConfig::CreateUds(const std::string& socketPath) return config; } +void TBusClientDynamicConfig::Register(TRegistrar /*registrar*/) +{ } + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NBus diff --git a/yt/yt/core/bus/tcp/config.h b/yt/yt/core/bus/tcp/config.h index 491612fabb..d0a5bde08d 100644 --- a/yt/yt/core/bus/tcp/config.h +++ b/yt/yt/core/bus/tcp/config.h @@ -130,6 +130,21 @@ DEFINE_REFCOUNTED_TYPE(TBusConfig) //////////////////////////////////////////////////////////////////////////////// +class TBusDynamicConfig + : public NYTree::TYsonStruct +{ +public: + bool NeedRejectConnectionDueMemoryOvercommit; + + REGISTER_YSON_STRUCT(TBusDynamicConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TBusDynamicConfig) + +//////////////////////////////////////////////////////////////////////////////// + class TBusServerConfig : public TBusConfig { @@ -151,6 +166,19 @@ DEFINE_REFCOUNTED_TYPE(TBusServerConfig) //////////////////////////////////////////////////////////////////////////////// +class TBusServerDynamicConfig + : public TBusDynamicConfig +{ +public: + REGISTER_YSON_STRUCT(TBusServerDynamicConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TBusServerDynamicConfig) + +//////////////////////////////////////////////////////////////////////////////// + class TBusClientConfig : public TBusConfig { @@ -170,5 +198,18 @@ DEFINE_REFCOUNTED_TYPE(TBusClientConfig) //////////////////////////////////////////////////////////////////////////////// +class TBusClientDynamicConfig + : public TBusDynamicConfig +{ +public: + REGISTER_YSON_STRUCT(TBusClientDynamicConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TBusClientDynamicConfig) + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NBus diff --git a/yt/yt/core/bus/tcp/connection.cpp b/yt/yt/core/bus/tcp/connection.cpp index a357e9199d..792cc3d0c4 100644 --- a/yt/yt/core/bus/tcp/connection.cpp +++ b/yt/yt/core/bus/tcp/connection.cpp @@ -114,7 +114,8 @@ TTcpConnection::TTcpConnection( IMessageHandlerPtr handler, IPollerPtr poller, IPacketTranscoderFactory* packetTranscoderFactory, - IMemoryUsageTrackerPtr memoryUsageTracker) + IMemoryUsageTrackerPtr memoryUsageTracker, + bool needRejectConnectionDueMemoryOvercommit) : Config_(std::move(config)) , ConnectionType_(connectionType) , Id_(id) @@ -139,6 +140,7 @@ TTcpConnection::TTcpConnection( , EncryptionMode_(Config_->EncryptionMode) , VerificationMode_(Config_->VerificationMode) , MemoryUsageTracker_(std::move(memoryUsageTracker)) + , NeedRejectConnectionDueMemoryOvercommit_(needRejectConnectionDueMemoryOvercommit) { } TTcpConnection::~TTcpConnection() @@ -596,9 +598,15 @@ void TTcpConnection::InitBuffers() ConnectionType_ == EConnectionType::Server ? GetRefCountedTypeCookie<TTcpServerConnectionWriteBufferTag>() : GetRefCountedTypeCookie<TTcpClientConnectionWriteBufferTag>()); - trackedBlob - .TryReserve(WriteBufferSize) - .ThrowOnError(); + + if (NeedRejectConnectionDueMemoryOvercommit_) { + trackedBlob + .TryReserve(WriteBufferSize) + .ThrowOnError(); + } else { + trackedBlob.Reserve(WriteBufferSize); + } + WriteBuffers_.push_back(std::move(trackedBlob)); } diff --git a/yt/yt/core/bus/tcp/connection.h b/yt/yt/core/bus/tcp/connection.h index bc520f331a..229d244828 100644 --- a/yt/yt/core/bus/tcp/connection.h +++ b/yt/yt/core/bus/tcp/connection.h @@ -88,7 +88,8 @@ public: IMessageHandlerPtr handler, NConcurrency::IPollerPtr poller, IPacketTranscoderFactory* packetTranscoderFactory, - IMemoryUsageTrackerPtr memoryUsageTracker); + IMemoryUsageTrackerPtr memoryUsageTracker, + bool needRejectConnectionDueMemoryOvercommit); ~TTcpConnection(); @@ -279,6 +280,7 @@ private: const EVerificationMode VerificationMode_; const IMemoryUsageTrackerPtr MemoryUsageTracker_; + const bool NeedRejectConnectionDueMemoryOvercommit_; NYTree::IAttributeDictionaryPtr PeerAttributes_; diff --git a/yt/yt/core/bus/tcp/public.h b/yt/yt/core/bus/tcp/public.h index 0c86109c1a..aa86f46c4b 100644 --- a/yt/yt/core/bus/tcp/public.h +++ b/yt/yt/core/bus/tcp/public.h @@ -14,9 +14,15 @@ using TBusNetworkCountersPtr = TIntrusivePtr<TBusNetworkCounters>; DECLARE_REFCOUNTED_CLASS(TMultiplexingBandConfig) DECLARE_REFCOUNTED_CLASS(TTcpDispatcherConfig) DECLARE_REFCOUNTED_CLASS(TTcpDispatcherDynamicConfig) + DECLARE_REFCOUNTED_CLASS(TBusConfig) +DECLARE_REFCOUNTED_CLASS(TBusDynamicConfig) + DECLARE_REFCOUNTED_CLASS(TBusServerConfig) +DECLARE_REFCOUNTED_CLASS(TBusServerDynamicConfig) + DECLARE_REFCOUNTED_CLASS(TBusClientConfig) +DECLARE_REFCOUNTED_CLASS(TBusClientDynamicConfig) struct IPacketTranscoderFactory; diff --git a/yt/yt/core/bus/tcp/server.cpp b/yt/yt/core/bus/tcp/server.cpp index a9fcf0c8a0..2802f2d5d6 100644 --- a/yt/yt/core/bus/tcp/server.cpp +++ b/yt/yt/core/bus/tcp/server.cpp @@ -78,6 +78,13 @@ public: YT_LOG_INFO("Bus server started"); } + void OnDynamicConfigChanged(const NBus::TBusServerDynamicConfigPtr& config) + { + YT_VERIFY(config); + + DynamicConfig_.Store(config); + } + TFuture<void> Stop() { YT_LOG_INFO("Stopping Bus server"); @@ -122,6 +129,7 @@ public: protected: const TBusServerConfigPtr Config_; + TAtomicIntrusivePtr<TBusServerDynamicConfig> DynamicConfig_{New<TBusServerDynamicConfig>()}; const IPollerPtr Poller_; const IMessageHandlerPtr Handler_; IPacketTranscoderFactory* const PacketTranscoderFactory_; @@ -254,7 +262,6 @@ protected: .EndMap()); auto poller = TTcpDispatcher::TImpl::Get()->GetXferPoller(); - auto connection = New<TTcpConnection>( Config_, EConnectionType::Server, @@ -269,7 +276,8 @@ protected: Handler_, std::move(poller), PacketTranscoderFactory_, - MemoryUsageTracker_); + MemoryUsageTracker_, + DynamicConfig_.Acquire()->NeedRejectConnectionDueMemoryOvercommit); { auto guard = WriterGuard(ConnectionsSpinLock_); @@ -435,6 +443,18 @@ public: Server_.Store(server); server->Start(); + server->OnDynamicConfigChanged(DynamicConfig_.Acquire()); + } + + void OnDynamicConfigChanged(const NBus::TBusServerDynamicConfigPtr& config) final + { + YT_VERIFY(config); + + DynamicConfig_.Store(config); + + if (auto server = Server_.Acquire()) { + server->OnDynamicConfigChanged(config); + } } TFuture<void> Stop() final @@ -448,6 +468,7 @@ public: private: const TBusServerConfigPtr Config_; + TAtomicIntrusivePtr<TBusServerDynamicConfig> DynamicConfig_{New<TBusServerDynamicConfig>()}; IPacketTranscoderFactory* const PacketTranscoderFactory_; const IMemoryUsageTrackerPtr MemoryUsageTracker_; @@ -521,6 +542,15 @@ public: } } + void OnDynamicConfigChanged(const NBus::TBusServerDynamicConfigPtr& config) final + { + YT_VERIFY(config); + + for (const auto& server : Servers_) { + server->OnDynamicConfigChanged(config); + } + } + TFuture<void> Stop() final { if (Config_->EnableLocalBypass && Config_->Port) { |