aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordon-dron <don-dron@yandex-team.com>2025-02-12 10:24:02 +0300
committerdon-dron <don-dron@yandex-team.com>2025-02-12 10:40:14 +0300
commit68ee082e0346a833fb5a5dcead60e46e8801c52b (patch)
treeb838c97de69b17b3962ca648a9c24027e64889b0
parent170ec5117c02f67dc3013e96e1c7368247e4fc93 (diff)
downloadydb-68ee082e0346a833fb5a5dcead60e46e8801c52b.tar.gz
YT-24116: Add dynconfig for bus server
commit_hash:f9f0449d014d5fb078085a5cb0f46768b3a92de8
-rw-r--r--yt/yt/core/bus/client.h5
-rw-r--r--yt/yt/core/bus/server.h8
-rw-r--r--yt/yt/core/bus/tcp/client.cpp10
-rw-r--r--yt/yt/core/bus/tcp/config.cpp14
-rw-r--r--yt/yt/core/bus/tcp/config.h41
-rw-r--r--yt/yt/core/bus/tcp/connection.cpp16
-rw-r--r--yt/yt/core/bus/tcp/connection.h4
-rw-r--r--yt/yt/core/bus/tcp/public.h6
-rw-r--r--yt/yt/core/bus/tcp/server.cpp34
9 files changed, 129 insertions, 9 deletions
diff --git a/yt/yt/core/bus/client.h b/yt/yt/core/bus/client.h
index fa12665fc2..1b9ca361f5 100644
--- a/yt/yt/core/bus/client.h
+++ b/yt/yt/core/bus/client.h
@@ -2,6 +2,8 @@
#include "public.h"
+#include <yt/yt/core/bus/tcp/public.h>
+
#include <yt/yt/core/ytree/public.h>
namespace NYT::NBus {
@@ -28,6 +30,9 @@ struct IBusClient
//! Typically used for constructing errors.
virtual const NYTree::IAttributeDictionary& GetEndpointAttributes() const = 0;
+ //! Apply new dynamic config.
+ virtual void OnDynamicConfigChanged(const NBus::TBusClientDynamicConfigPtr& config) = 0;
+
//! Creates a new bus.
/*!
* The bus will point to the address supplied during construction.
diff --git a/yt/yt/core/bus/server.h b/yt/yt/core/bus/server.h
index dd7a35bc13..d217935c36 100644
--- a/yt/yt/core/bus/server.h
+++ b/yt/yt/core/bus/server.h
@@ -2,6 +2,8 @@
#include "public.h"
+#include <yt/yt/core/bus/tcp/public.h>
+
#include <yt/yt/core/actions/future.h>
namespace NYT::NBus {
@@ -22,6 +24,12 @@ struct IBusServer
*/
virtual void Start(IMessageHandlerPtr handler) = 0;
+ //! Apply new dynamic config.
+ /*
+ * \param config New config.
+ */
+ virtual void OnDynamicConfigChanged(const NBus::TBusServerDynamicConfigPtr& config) = 0;
+
//! Asynchronously stops the listener.
/*!
* After this call the instance is no longer usable.
diff --git a/yt/yt/core/bus/tcp/client.cpp b/yt/yt/core/bus/tcp/client.cpp
index 5dd0117bee..81db62b1ab 100644
--- a/yt/yt/core/bus/tcp/client.cpp
+++ b/yt/yt/core/bus/tcp/client.cpp
@@ -160,6 +160,11 @@ public:
return *EndpointAttributes_;
}
+ void OnDynamicConfigChanged(const NBus::TBusClientDynamicConfigPtr& config) override
+ {
+ DynamicConfig_.Store(config);
+ }
+
IBusPtr CreateBus(IMessageHandlerPtr handler, const TCreateBusOptions& options) override
{
YT_ASSERT_THREAD_AFFINITY_ANY();
@@ -181,7 +186,6 @@ public:
.EndMap());
auto poller = TTcpDispatcher::TImpl::Get()->GetXferPoller();
-
auto connection = New<TTcpConnection>(
Config_,
EConnectionType::Client,
@@ -196,7 +200,8 @@ public:
std::move(handler),
std::move(poller),
PacketTranscoderFactory_,
- MemoryUsageTracker_);
+ MemoryUsageTracker_,
+ DynamicConfig_.Acquire()->NeedRejectConnectionDueMemoryOvercommit);
connection->Start();
return New<TTcpClientBusProxy>(std::move(connection));
@@ -204,6 +209,7 @@ public:
private:
const TBusClientConfigPtr Config_;
+ TAtomicIntrusivePtr<TBusClientDynamicConfig> DynamicConfig_{New<TBusClientDynamicConfig>()};
IPacketTranscoderFactory* const PacketTranscoderFactory_;
const IMemoryUsageTrackerPtr MemoryUsageTracker_;
const std::string EndpointDescription_;
diff --git a/yt/yt/core/bus/tcp/config.cpp b/yt/yt/core/bus/tcp/config.cpp
index f67c6fba6c..809a2d7413 100644
--- a/yt/yt/core/bus/tcp/config.cpp
+++ b/yt/yt/core/bus/tcp/config.cpp
@@ -126,6 +126,9 @@ TBusServerConfigPtr TBusServerConfig::CreateUds(const std::string& socketPath)
return config;
}
+void TBusServerDynamicConfig::Register(TRegistrar /*registrar*/)
+{ }
+
////////////////////////////////////////////////////////////////////////////////
void TBusConfig::Register(TRegistrar registrar)
@@ -170,6 +173,14 @@ void TBusConfig::Register(TRegistrar registrar)
////////////////////////////////////////////////////////////////////////////////
+void TBusDynamicConfig::Register(TRegistrar registrar)
+{
+ registrar.Parameter("need_reject_connection_due_memory_overcommit", &TThis::NeedRejectConnectionDueMemoryOvercommit)
+ .Default(false);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
void TBusClientConfig::Register(TRegistrar registrar)
{
registrar.Parameter("address", &TThis::Address)
@@ -198,6 +209,9 @@ TBusClientConfigPtr TBusClientConfig::CreateUds(const std::string& socketPath)
return config;
}
+void TBusClientDynamicConfig::Register(TRegistrar /*registrar*/)
+{ }
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NBus
diff --git a/yt/yt/core/bus/tcp/config.h b/yt/yt/core/bus/tcp/config.h
index 491612fabb..d0a5bde08d 100644
--- a/yt/yt/core/bus/tcp/config.h
+++ b/yt/yt/core/bus/tcp/config.h
@@ -130,6 +130,21 @@ DEFINE_REFCOUNTED_TYPE(TBusConfig)
////////////////////////////////////////////////////////////////////////////////
+class TBusDynamicConfig
+ : public NYTree::TYsonStruct
+{
+public:
+ bool NeedRejectConnectionDueMemoryOvercommit;
+
+ REGISTER_YSON_STRUCT(TBusDynamicConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TBusDynamicConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
class TBusServerConfig
: public TBusConfig
{
@@ -151,6 +166,19 @@ DEFINE_REFCOUNTED_TYPE(TBusServerConfig)
////////////////////////////////////////////////////////////////////////////////
+class TBusServerDynamicConfig
+ : public TBusDynamicConfig
+{
+public:
+ REGISTER_YSON_STRUCT(TBusServerDynamicConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TBusServerDynamicConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
class TBusClientConfig
: public TBusConfig
{
@@ -170,5 +198,18 @@ DEFINE_REFCOUNTED_TYPE(TBusClientConfig)
////////////////////////////////////////////////////////////////////////////////
+class TBusClientDynamicConfig
+ : public TBusDynamicConfig
+{
+public:
+ REGISTER_YSON_STRUCT(TBusClientDynamicConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TBusClientDynamicConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NBus
diff --git a/yt/yt/core/bus/tcp/connection.cpp b/yt/yt/core/bus/tcp/connection.cpp
index a357e9199d..792cc3d0c4 100644
--- a/yt/yt/core/bus/tcp/connection.cpp
+++ b/yt/yt/core/bus/tcp/connection.cpp
@@ -114,7 +114,8 @@ TTcpConnection::TTcpConnection(
IMessageHandlerPtr handler,
IPollerPtr poller,
IPacketTranscoderFactory* packetTranscoderFactory,
- IMemoryUsageTrackerPtr memoryUsageTracker)
+ IMemoryUsageTrackerPtr memoryUsageTracker,
+ bool needRejectConnectionDueMemoryOvercommit)
: Config_(std::move(config))
, ConnectionType_(connectionType)
, Id_(id)
@@ -139,6 +140,7 @@ TTcpConnection::TTcpConnection(
, EncryptionMode_(Config_->EncryptionMode)
, VerificationMode_(Config_->VerificationMode)
, MemoryUsageTracker_(std::move(memoryUsageTracker))
+ , NeedRejectConnectionDueMemoryOvercommit_(needRejectConnectionDueMemoryOvercommit)
{ }
TTcpConnection::~TTcpConnection()
@@ -596,9 +598,15 @@ void TTcpConnection::InitBuffers()
ConnectionType_ == EConnectionType::Server
? GetRefCountedTypeCookie<TTcpServerConnectionWriteBufferTag>()
: GetRefCountedTypeCookie<TTcpClientConnectionWriteBufferTag>());
- trackedBlob
- .TryReserve(WriteBufferSize)
- .ThrowOnError();
+
+ if (NeedRejectConnectionDueMemoryOvercommit_) {
+ trackedBlob
+ .TryReserve(WriteBufferSize)
+ .ThrowOnError();
+ } else {
+ trackedBlob.Reserve(WriteBufferSize);
+ }
+
WriteBuffers_.push_back(std::move(trackedBlob));
}
diff --git a/yt/yt/core/bus/tcp/connection.h b/yt/yt/core/bus/tcp/connection.h
index bc520f331a..229d244828 100644
--- a/yt/yt/core/bus/tcp/connection.h
+++ b/yt/yt/core/bus/tcp/connection.h
@@ -88,7 +88,8 @@ public:
IMessageHandlerPtr handler,
NConcurrency::IPollerPtr poller,
IPacketTranscoderFactory* packetTranscoderFactory,
- IMemoryUsageTrackerPtr memoryUsageTracker);
+ IMemoryUsageTrackerPtr memoryUsageTracker,
+ bool needRejectConnectionDueMemoryOvercommit);
~TTcpConnection();
@@ -279,6 +280,7 @@ private:
const EVerificationMode VerificationMode_;
const IMemoryUsageTrackerPtr MemoryUsageTracker_;
+ const bool NeedRejectConnectionDueMemoryOvercommit_;
NYTree::IAttributeDictionaryPtr PeerAttributes_;
diff --git a/yt/yt/core/bus/tcp/public.h b/yt/yt/core/bus/tcp/public.h
index 0c86109c1a..aa86f46c4b 100644
--- a/yt/yt/core/bus/tcp/public.h
+++ b/yt/yt/core/bus/tcp/public.h
@@ -14,9 +14,15 @@ using TBusNetworkCountersPtr = TIntrusivePtr<TBusNetworkCounters>;
DECLARE_REFCOUNTED_CLASS(TMultiplexingBandConfig)
DECLARE_REFCOUNTED_CLASS(TTcpDispatcherConfig)
DECLARE_REFCOUNTED_CLASS(TTcpDispatcherDynamicConfig)
+
DECLARE_REFCOUNTED_CLASS(TBusConfig)
+DECLARE_REFCOUNTED_CLASS(TBusDynamicConfig)
+
DECLARE_REFCOUNTED_CLASS(TBusServerConfig)
+DECLARE_REFCOUNTED_CLASS(TBusServerDynamicConfig)
+
DECLARE_REFCOUNTED_CLASS(TBusClientConfig)
+DECLARE_REFCOUNTED_CLASS(TBusClientDynamicConfig)
struct IPacketTranscoderFactory;
diff --git a/yt/yt/core/bus/tcp/server.cpp b/yt/yt/core/bus/tcp/server.cpp
index a9fcf0c8a0..2802f2d5d6 100644
--- a/yt/yt/core/bus/tcp/server.cpp
+++ b/yt/yt/core/bus/tcp/server.cpp
@@ -78,6 +78,13 @@ public:
YT_LOG_INFO("Bus server started");
}
+ void OnDynamicConfigChanged(const NBus::TBusServerDynamicConfigPtr& config)
+ {
+ YT_VERIFY(config);
+
+ DynamicConfig_.Store(config);
+ }
+
TFuture<void> Stop()
{
YT_LOG_INFO("Stopping Bus server");
@@ -122,6 +129,7 @@ public:
protected:
const TBusServerConfigPtr Config_;
+ TAtomicIntrusivePtr<TBusServerDynamicConfig> DynamicConfig_{New<TBusServerDynamicConfig>()};
const IPollerPtr Poller_;
const IMessageHandlerPtr Handler_;
IPacketTranscoderFactory* const PacketTranscoderFactory_;
@@ -254,7 +262,6 @@ protected:
.EndMap());
auto poller = TTcpDispatcher::TImpl::Get()->GetXferPoller();
-
auto connection = New<TTcpConnection>(
Config_,
EConnectionType::Server,
@@ -269,7 +276,8 @@ protected:
Handler_,
std::move(poller),
PacketTranscoderFactory_,
- MemoryUsageTracker_);
+ MemoryUsageTracker_,
+ DynamicConfig_.Acquire()->NeedRejectConnectionDueMemoryOvercommit);
{
auto guard = WriterGuard(ConnectionsSpinLock_);
@@ -435,6 +443,18 @@ public:
Server_.Store(server);
server->Start();
+ server->OnDynamicConfigChanged(DynamicConfig_.Acquire());
+ }
+
+ void OnDynamicConfigChanged(const NBus::TBusServerDynamicConfigPtr& config) final
+ {
+ YT_VERIFY(config);
+
+ DynamicConfig_.Store(config);
+
+ if (auto server = Server_.Acquire()) {
+ server->OnDynamicConfigChanged(config);
+ }
}
TFuture<void> Stop() final
@@ -448,6 +468,7 @@ public:
private:
const TBusServerConfigPtr Config_;
+ TAtomicIntrusivePtr<TBusServerDynamicConfig> DynamicConfig_{New<TBusServerDynamicConfig>()};
IPacketTranscoderFactory* const PacketTranscoderFactory_;
const IMemoryUsageTrackerPtr MemoryUsageTracker_;
@@ -521,6 +542,15 @@ public:
}
}
+ void OnDynamicConfigChanged(const NBus::TBusServerDynamicConfigPtr& config) final
+ {
+ YT_VERIFY(config);
+
+ for (const auto& server : Servers_) {
+ server->OnDynamicConfigChanged(config);
+ }
+ }
+
TFuture<void> Stop() final
{
if (Config_->EnableLocalBypass && Config_->Port) {